Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
wiseplat
GitHub Repository: wiseplat/python-code
Path: blob/master/ invest-robot-contest_TinkoffBotTwitch-main/venv/lib/python3.8/site-packages/grpc/_server.py
7764 views
1
# Copyright 2016 gRPC authors.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
# http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
"""Service-side implementation of gRPC Python."""
15
16
import collections
17
from concurrent import futures
18
import enum
19
import logging
20
import threading
21
import time
22
23
import grpc
24
from grpc import _common
25
from grpc import _compression
26
from grpc import _interceptor
27
from grpc._cython import cygrpc
28
import six
29
30
# Module-level logger used for error reporting throughout this file.
_LOGGER = logging.getLogger(__name__)

# Completion-queue tags: _SHUTDOWN_TAG is handed to ``server.shutdown`` and
# _REQUEST_CALL_TAG to ``server.request_call``; both are matched by identity
# in ``_process_event_and_continue``.
_SHUTDOWN_TAG = 'shutdown'
_REQUEST_CALL_TAG = 'request_call'

# Tokens recorded in ``_RPCState.due`` while the corresponding batch is in
# flight; they are removed again by ``_possibly_finish_call``.
_RECEIVE_CLOSE_ON_SERVER_TOKEN = 'receive_close_on_server'
_SEND_INITIAL_METADATA_TOKEN = 'send_initial_metadata'
_RECEIVE_MESSAGE_TOKEN = 'receive_message'
_SEND_MESSAGE_TOKEN = 'send_message'
_SEND_INITIAL_METADATA_AND_SEND_MESSAGE_TOKEN = (
    'send_initial_metadata * send_message')
_SEND_STATUS_FROM_SERVER_TOKEN = 'send_status_from_server'
_SEND_INITIAL_METADATA_AND_SEND_STATUS_FROM_SERVER_TOKEN = (
    'send_initial_metadata * send_status_from_server')

# Values taken by ``_RPCState.client``; compared by identity (``is``).
_OPEN = 'open'
_CLOSED = 'closed'
_CANCELLED = 'cancelled'

# Flags argument passed to cygrpc operations when no flag is requested.
_EMPTY_FLAGS = 0

# Added to ``time.time()`` in ``_serve`` to form the deadline of each
# completion-queue poll.
_DEALLOCATED_SERVER_CHECK_PERIOD_S = 1.0
# NOTE(review): not referenced anywhere in the visible part of this file.
_INF_TIMEOUT = 1e9
53
54
55
def _serialized_request(request_event):
    """Return the message carried by the first operation of a batch event.

    Args:
      request_event: An event exposing ``batch_operations``; its first entry
        must provide a ``message()`` method.

    Returns:
      Whatever ``message()`` of the first batch operation returns.
    """
    first_operation = request_event.batch_operations[0]
    return first_operation.message()
57
58
59
def _application_code(code):
    """Translate an application status code to its cygrpc counterpart.

    Codes absent from ``_common.STATUS_CODE_TO_CYGRPC_STATUS_CODE`` are
    reported as ``cygrpc.StatusCode.unknown``.
    """
    translated = _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE.get(code)
    if translated is None:
        return cygrpc.StatusCode.unknown
    return translated
62
63
64
def _completion_code(state):
    """Pick the status code for an RPC that completed normally.

    Returns ``cygrpc.StatusCode.ok`` when no code was set on ``state``;
    otherwise the translated form of ``state.code``.
    """
    if state.code is not None:
        return _application_code(state.code)
    return cygrpc.StatusCode.ok
69
70
71
def _abortion_code(state, code):
    """Pick the status code for an RPC that is being aborted.

    Args:
      state: Object whose ``code`` attribute holds the code set so far.
      code: Fallback returned unchanged when ``state.code`` is None.

    Returns:
      The translated ``state.code`` when one is set, otherwise ``code``.
    """
    if state.code is not None:
        return _application_code(state.code)
    return code
76
77
78
def _details(state):
    """Return ``state.details``, substituting ``b''`` when it is None."""
    if state.details is None:
        return b''
    return state.details
80
81
82
class _HandlerCallDetails(
        collections.namedtuple('_HandlerCallDetails',
                               ('method', 'invocation_metadata')),
        grpc.HandlerCallDetails):
    """Named tuple of (method, invocation_metadata) handed to handlers."""
88
89
90
class _RPCState(object):
    """Mutable bookkeeping for one RPC, shared between threads.

    Functions in this module take ``condition`` before reading or writing
    the other attributes.
    """

    def __init__(self):
        # Guards the attributes below and signals waiters on changes.
        self.condition = threading.Condition()
        # Tokens of batches that have been started but not yet completed.
        self.due = set()
        # Most recent deserialized request not yet taken by the application.
        self.request = None
        # One of _OPEN, _CLOSED or _CANCELLED.
        self.client = _OPEN
        # True until initial metadata has been scheduled for sending.
        self.initial_metadata_allowed = True
        self.compression_algorithm = None
        # When True, the next message is sent with the no-compress flag.
        self.disable_next_compression = False
        # Values set by the application for the final status.
        self.trailing_metadata = None
        self.code = None
        self.details = None
        # True once a status batch has been started.
        self.statused = False
        # RpcError instances raised by this module on behalf of the RPC.
        self.rpc_errors = []
        # Completion callbacks; replaced by None once they are handed out.
        self.callbacks = []
        # True once the application called ``abort`` on the context.
        self.aborted = False
107
108
109
def _raise_rpc_error(state):
    """Raise a fresh ``grpc.RpcError`` after recording it on ``state``.

    The instance is appended to ``state.rpc_errors`` so that callers can
    later recognise it as one raised by this module.

    Raises:
      grpc.RpcError: Always.
    """
    error = grpc.RpcError()
    state.rpc_errors.append(error)
    raise error
113
114
115
def _possibly_finish_call(state, token):
    """Mark ``token`` as completed and report whether the RPC is finished.

    Args:
      state: The ``_RPCState`` of the call.
      token: The due token of the batch that just completed.

    Returns:
      ``(state, callbacks)`` when the RPC is no longer active and nothing
      remains due (``state.callbacks`` is then set to None); otherwise
      ``(None, ())``.
    """
    state.due.remove(token)
    if _is_rpc_state_active(state) or state.due:
        return None, ()
    pending_callbacks = state.callbacks
    state.callbacks = None
    return state, pending_callbacks
123
124
125
def _send_status_from_server(state, token):
    """Build the completion callback for a status-sending batch.

    The returned callable ignores its event argument and, under
    ``state.condition``, finishes ``token`` via ``_possibly_finish_call``.
    """

    def send_status_from_server(unused_send_status_from_server_event):
        with state.condition:
            outcome = _possibly_finish_call(state, token)
            return outcome

    return send_status_from_server
132
133
134
def _get_initial_metadata(state, metadata):
    """Combine compression metadata (if any) with the given metadata.

    Args:
      state: Object with ``condition`` and ``compression_algorithm``.
      metadata: Initial metadata supplied by the caller, or None.

    Returns:
      ``metadata`` unchanged when no compression algorithm is set;
      otherwise a tuple starting with the compression entry, followed by
      the entries of ``metadata`` when it is not None.
    """
    with state.condition:
        if not state.compression_algorithm:
            return metadata
        compression_metadata = (
            _compression.compression_algorithm_to_metadata(
                state.compression_algorithm),)
        if metadata is None:
            return compression_metadata
        return compression_metadata + tuple(metadata)
146
147
148
def _get_initial_metadata_operation(state, metadata):
    """Create the cygrpc operation that sends the initial metadata.

    The metadata is first passed through ``_get_initial_metadata`` and the
    operation is built with ``_EMPTY_FLAGS``.
    """
    return cygrpc.SendInitialMetadataOperation(
        _get_initial_metadata(state, metadata), _EMPTY_FLAGS)
152
153
154
def _abort(state, call, code, details):
    """Start the batch that terminates the RPC with an error status.

    Does nothing when the client has already cancelled. A code or details
    previously stored on ``state`` take precedence over the arguments.
    Callers in this module invoke this while holding ``state.condition``.

    Args:
      state: The ``_RPCState`` of the call.
      call: The call object on which the batch is started.
      code: Status code used when ``state.code`` is None.
      details: Details used when ``state.details`` is None.
    """
    if state.client is _CANCELLED:
        return
    effective_code = _abortion_code(state, code)
    if state.details is None:
        effective_details = details
    else:
        effective_details = state.details
    # Initial metadata has to precede the status if it was never sent.
    if state.initial_metadata_allowed:
        leading_operations = (_get_initial_metadata_operation(state, None),)
        token = _SEND_INITIAL_METADATA_AND_SEND_STATUS_FROM_SERVER_TOKEN
    else:
        leading_operations = ()
        token = _SEND_STATUS_FROM_SERVER_TOKEN
    status_operation = cygrpc.SendStatusFromServerOperation(
        state.trailing_metadata, effective_code, effective_details,
        _EMPTY_FLAGS)
    call.start_server_batch(leading_operations + (status_operation,),
                            _send_status_from_server(state, token))
    state.statused = True
    state.due.add(token)
176
177
178
def _receive_close_on_server(state):
    """Build the completion callback for the receive-close batch.

    The callback records on ``state.client`` whether the client cancelled
    or (if still open) closed, wakes all waiters on ``state.condition`` and
    finishes the receive-close token.
    """

    def receive_close_on_server(receive_close_on_server_event):
        with state.condition:
            close_operation = receive_close_on_server_event.batch_operations[0]
            if close_operation.cancelled():
                state.client = _CANCELLED
            elif state.client is _OPEN:
                state.client = _CLOSED
            state.condition.notify_all()
            return _possibly_finish_call(state, _RECEIVE_CLOSE_ON_SERVER_TOKEN)

    return receive_close_on_server
190
191
192
def _receive_message(state, call, request_deserializer):
    """Build the completion callback for a receive-message batch.

    Args:
      state: The ``_RPCState`` of the call.
      call: The call object, used to abort on deserialization failure.
      request_deserializer: Passed to ``_common.deserialize``.

    Returns:
      A callable taking the batch event. With no message it marks an open
      client as closed; with a message it stores the deserialized request
      on ``state`` or aborts the RPC when deserialization yielded None. In
      every case waiters are notified and the receive token is finished.
    """

    def receive_message(receive_message_event):
        serialized_request = _serialized_request(receive_message_event)
        if serialized_request is None:
            with state.condition:
                if state.client is _OPEN:
                    state.client = _CLOSED
                state.condition.notify_all()
                return _possibly_finish_call(state, _RECEIVE_MESSAGE_TOKEN)

        # Deserialization runs before the condition is taken.
        request = _common.deserialize(serialized_request,
                                      request_deserializer)
        with state.condition:
            if request is not None:
                state.request = request
            else:
                _abort(state, call, cygrpc.StatusCode.internal,
                       b'Exception deserializing request!')
            state.condition.notify_all()
            return _possibly_finish_call(state, _RECEIVE_MESSAGE_TOKEN)

    return receive_message
215
216
217
def _send_initial_metadata(state):
    """Build the completion callback for a send-initial-metadata batch.

    The returned callable ignores its event argument and finishes the
    send-initial-metadata token under ``state.condition``.
    """

    def send_initial_metadata(unused_send_initial_metadata_event):
        with state.condition:
            outcome = _possibly_finish_call(state,
                                            _SEND_INITIAL_METADATA_TOKEN)
            return outcome

    return send_initial_metadata
224
225
226
def _send_message(state, token):
    """Build the completion callback for a send-message batch.

    The returned callable ignores its event argument, wakes every waiter on
    ``state.condition`` and then finishes ``token``.
    """

    def send_message(unused_send_message_event):
        with state.condition:
            state.condition.notify_all()
            outcome = _possibly_finish_call(state, token)
            return outcome

    return send_message
234
235
236
class _Context(grpc.ServicerContext):
    """ServicerContext backed by a request-call event and an ``_RPCState``."""

    def __init__(self, rpc_event, state, request_deserializer):
        """Store the event, the per-RPC state and the request deserializer."""
        self._rpc_event = rpc_event
        self._state = state
        self._request_deserializer = request_deserializer

    def is_active(self):
        """Return whether the RPC is neither cancelled nor statused."""
        with self._state.condition:
            return _is_rpc_state_active(self._state)

    def time_remaining(self):
        """Return the call deadline minus ``time.time()``, never below 0."""
        remaining = self._rpc_event.call_details.deadline - time.time()
        return max(remaining, 0)

    def cancel(self):
        """Cancel the underlying call."""
        self._rpc_event.call.cancel()

    def add_callback(self, callback):
        """Register ``callback``; return False if callbacks were handed out."""
        with self._state.condition:
            if self._state.callbacks is None:
                return False
            self._state.callbacks.append(callback)
            return True

    def disable_next_message_compression(self):
        """Request that the next message be sent uncompressed."""
        with self._state.condition:
            self._state.disable_next_compression = True

    def invocation_metadata(self):
        """Return the invocation metadata carried by the request event."""
        return self._rpc_event.invocation_metadata

    def peer(self):
        """Return the decoded peer string of the call."""
        raw_peer = self._rpc_event.call.peer()
        return _common.decode(raw_peer)

    def peer_identities(self):
        """Return ``cygrpc.peer_identities`` for the call."""
        return cygrpc.peer_identities(self._rpc_event.call)

    def peer_identity_key(self):
        """Return the decoded peer identity key, or None when absent."""
        id_key = cygrpc.peer_identity_key(self._rpc_event.call)
        if id_key is None:
            return id_key
        return _common.decode(id_key)

    def auth_context(self):
        """Return the call's auth context with decoded keys."""
        raw_context = cygrpc.auth_context(self._rpc_event.call)
        return {
            _common.decode(key): value
            for key, value in six.iteritems(raw_context)
        }

    def set_compression(self, compression):
        """Record the compression algorithm for this RPC."""
        with self._state.condition:
            self._state.compression_algorithm = compression

    def send_initial_metadata(self, initial_metadata):
        """Start sending ``initial_metadata`` to the client.

        Raises:
          grpc.RpcError: If the client has cancelled the RPC.
          ValueError: If initial metadata was already scheduled.
        """
        state = self._state
        with state.condition:
            if state.client is _CANCELLED:
                _raise_rpc_error(state)
            elif not state.initial_metadata_allowed:
                raise ValueError('Initial metadata no longer allowed!')
            else:
                operation = _get_initial_metadata_operation(
                    state, initial_metadata)
                self._rpc_event.call.start_server_batch(
                    (operation,), _send_initial_metadata(state))
                state.initial_metadata_allowed = False
                state.due.add(_SEND_INITIAL_METADATA_TOKEN)

    def set_trailing_metadata(self, trailing_metadata):
        """Record the trailing metadata to send with the status."""
        with self._state.condition:
            self._state.trailing_metadata = trailing_metadata

    def trailing_metadata(self):
        """Return the trailing metadata recorded so far."""
        return self._state.trailing_metadata

    def abort(self, code, details):
        """Record an error status and raise to unwind the handler.

        Raises:
          Exception: Always, after the state has been marked as aborted.
        """
        # treat OK like other invalid arguments: fail the RPC
        if code == grpc.StatusCode.OK:
            _LOGGER.error(
                'abort() called with StatusCode.OK; returning UNKNOWN')
            code = grpc.StatusCode.UNKNOWN
            details = ''
        state = self._state
        with state.condition:
            state.code = code
            state.details = _common.encode(details)
            state.aborted = True
            raise Exception()

    def abort_with_status(self, status):
        """Abort using the code, details and trailing metadata of ``status``."""
        self._state.trailing_metadata = status.trailing_metadata
        self.abort(status.code, status.details)

    def set_code(self, code):
        """Record the status code to send on completion."""
        with self._state.condition:
            self._state.code = code

    def code(self):
        """Return the status code recorded so far."""
        return self._state.code

    def set_details(self, details):
        """Record the (encoded) status details to send on completion."""
        encoded = _common.encode(details)
        with self._state.condition:
            self._state.details = encoded

    def details(self):
        """Return the encoded status details recorded so far."""
        return self._state.details

    def _finalize_state(self):
        """Intentionally does nothing in this implementation."""
        pass
343
344
345
class _RequestIterator(object):
    """Blocking iterator over the requests of a request-streaming RPC."""

    def __init__(self, state, call, request_deserializer):
        """Store the per-RPC state, the call and the request deserializer."""
        self._state = state
        self._call = call
        self._request_deserializer = request_deserializer

    def _raise_or_start_receive_message(self):
        """Start receiving the next message, or raise if that is impossible.

        Raises:
          grpc.RpcError: If the client has cancelled.
          StopIteration: If the RPC is no longer active.
        """
        state = self._state
        if state.client is _CANCELLED:
            _raise_rpc_error(state)
        elif not _is_rpc_state_active(state):
            raise StopIteration()
        else:
            on_received = _receive_message(state, self._call,
                                           self._request_deserializer)
            self._call.start_server_batch(
                (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), on_received)
            state.due.add(_RECEIVE_MESSAGE_TOKEN)

    def _look_for_request(self):
        """Take the pending request off the state, if there is one.

        Returns:
          The stored request (cleared from the state), which may be None
          while the receive batch is still due.

        Raises:
          grpc.RpcError: If the client has cancelled.
          StopIteration: If no request is stored and no receive is due.
        """
        state = self._state
        if state.client is _CANCELLED:
            _raise_rpc_error(state)
        elif (state.request is None and
              _RECEIVE_MESSAGE_TOKEN not in state.due):
            raise StopIteration()
        else:
            taken = state.request
            state.request = None
            return taken

        raise AssertionError()  # should never run

    def _next(self):
        """Block until the next request arrives and return it."""
        with self._state.condition:
            self._raise_or_start_receive_message()
            while True:
                self._state.condition.wait()
                candidate = self._look_for_request()
                if candidate is not None:
                    return candidate

    def __iter__(self):
        return self

    def __next__(self):
        return self._next()

    def next(self):
        return self._next()
394
395
396
def _unary_request(rpc_event, state, request_deserializer):
    """Build a thunk that blocks for the single request of a unary RPC.

    Returns:
      A zero-argument callable. It returns None when the RPC is inactive,
      when the client cancelled, or when the client closed without sending
      a message (the RPC is then aborted as unimplemented); otherwise it
      returns the deserialized request.
    """

    def unary_request():
        with state.condition:
            if not _is_rpc_state_active(state):
                return None
            rpc_event.call.start_server_batch(
                (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                _receive_message(state, rpc_event.call, request_deserializer))
            state.due.add(_RECEIVE_MESSAGE_TOKEN)
            while True:
                state.condition.wait()
                if state.request is not None:
                    received = state.request
                    state.request = None
                    return received
                if state.client is _CLOSED:
                    details = '"{}" requires exactly one request message.'.format(
                        rpc_event.call_details.method)
                    _abort(state, rpc_event.call,
                           cygrpc.StatusCode.unimplemented,
                           _common.encode(details))
                    return None
                if state.client is _CANCELLED:
                    return None

    return unary_request
426
427
428
def _call_behavior(rpc_event,
                   state,
                   behavior,
                   argument,
                   request_deserializer,
                   send_response_callback=None):
    """Invoke the application behavior inside a servicer context.

    Args:
      rpc_event: The request-call event of the RPC.
      state: The ``_RPCState`` of the call.
      behavior: The application callable.
      argument: The request or request iterator passed to ``behavior``.
      request_deserializer: Forwarded to the servicer context factory.
      send_response_callback: When not None, passed to ``behavior`` as a
        third positional argument.

    Returns:
      ``(result, True)`` on success. ``(None, False)`` when ``behavior``
      raised; in that case the RPC is aborted unless the exception is one
      of the errors recorded in ``state.rpc_errors``.
    """
    from grpc import _create_servicer_context
    with _create_servicer_context(rpc_event, state,
                                  request_deserializer) as context:
        try:
            if send_response_callback is None:
                outcome = behavior(argument, context)
            else:
                outcome = behavior(argument, context, send_response_callback)
            return outcome, True
        except Exception as exception:  # pylint: disable=broad-except
            with state.condition:
                if state.aborted:
                    _abort(state, rpc_event.call, cygrpc.StatusCode.unknown,
                           b'RPC Aborted')
                elif exception not in state.rpc_errors:
                    details = 'Exception calling application: {}'.format(
                        exception)
                    _LOGGER.exception(details)
                    _abort(state, rpc_event.call, cygrpc.StatusCode.unknown,
                           _common.encode(details))
            return None, False
457
458
459
def _take_response_from_response_iterator(rpc_event, state, response_iterator):
    """Pull the next response from the application's iterator.

    Returns:
      ``(response, True)`` for a produced response, ``(None, True)`` when
      the iterator is exhausted, and ``(None, False)`` when iterating
      raised (the RPC is then aborted unless the exception is one of the
      errors recorded in ``state.rpc_errors``).
    """
    try:
        response = next(response_iterator)
    except StopIteration:
        return None, True
    except Exception as exception:  # pylint: disable=broad-except
        with state.condition:
            if state.aborted:
                _abort(state, rpc_event.call, cygrpc.StatusCode.unknown,
                       b'RPC Aborted')
            elif exception not in state.rpc_errors:
                details = 'Exception iterating responses: {}'.format(exception)
                _LOGGER.exception(details)
                _abort(state, rpc_event.call, cygrpc.StatusCode.unknown,
                       _common.encode(details))
        return None, False
    return response, True
475
476
477
def _serialize_response(rpc_event, state, response, response_serializer):
    """Serialize ``response``, aborting the RPC when that fails.

    Returns:
      The serialized response, or None after the RPC was aborted because
      ``_common.serialize`` returned None.
    """
    serialized_response = _common.serialize(response, response_serializer)
    if serialized_response is not None:
        return serialized_response
    with state.condition:
        _abort(state, rpc_event.call, cygrpc.StatusCode.internal,
               b'Failed to serialize response!')
    return None
486
487
488
def _get_send_message_op_flags_from_state(state):
    """Return the write flags to use for the next outgoing message.

    ``cygrpc.WriteFlag.no_compress`` when compression is disabled for the
    next message, ``_EMPTY_FLAGS`` otherwise.
    """
    return (cygrpc.WriteFlag.no_compress
            if state.disable_next_compression else _EMPTY_FLAGS)
493
494
495
def _reset_per_message_state(state):
    """Clear the settings that apply to a single outgoing message.

    Currently this only resets ``state.disable_next_compression`` to False,
    while holding ``state.condition``.
    """
    condition = state.condition
    with condition:
        state.disable_next_compression = False
498
499
500
def _send_response(rpc_event, state, serialized_response):
    """Send one serialized response and block until the batch completes.

    Initial metadata is bundled into the same batch when it has not been
    sent yet.

    Returns:
      False immediately when the RPC is not active; otherwise whether the
      RPC is still active once the send token is no longer due.
    """
    with state.condition:
        if not _is_rpc_state_active(state):
            return False
        if state.initial_metadata_allowed:
            leading_operations = (
                _get_initial_metadata_operation(state, None),)
            token = _SEND_INITIAL_METADATA_AND_SEND_MESSAGE_TOKEN
        else:
            leading_operations = ()
            token = _SEND_MESSAGE_TOKEN
        message_operation = cygrpc.SendMessageOperation(
            serialized_response, _get_send_message_op_flags_from_state(state))
        if leading_operations:
            state.initial_metadata_allowed = False
        rpc_event.call.start_server_batch(
            leading_operations + (message_operation,),
            _send_message(state, token))
        state.due.add(token)
        _reset_per_message_state(state)
        # Wait at least once, then until the batch callback removed the token.
        state.condition.wait()
        while token in state.due:
            state.condition.wait()
        return _is_rpc_state_active(state)
527
528
529
def _status(rpc_event, state, serialized_response):
    """Start the batch that completes the RPC with its final status.

    Does nothing when the client has cancelled. The batch always carries
    the status; initial metadata is added when it was not sent yet, and a
    message is added when ``serialized_response`` is not None.
    """
    with state.condition:
        if state.client is _CANCELLED:
            return
        status_operation = cygrpc.SendStatusFromServerOperation(
            state.trailing_metadata, _completion_code(state), _details(state),
            _EMPTY_FLAGS)
        operations = [status_operation]
        if state.initial_metadata_allowed:
            operations.append(_get_initial_metadata_operation(state, None))
        if serialized_response is not None:
            flags = _get_send_message_op_flags_from_state(state)
            operations.append(
                cygrpc.SendMessageOperation(serialized_response, flags))
        on_complete = _send_status_from_server(
            state, _SEND_STATUS_FROM_SERVER_TOKEN)
        rpc_event.call.start_server_batch(operations, on_complete)
        state.statused = True
        _reset_per_message_state(state)
        state.due.add(_SEND_STATUS_FROM_SERVER_TOKEN)
552
553
554
def _unary_response_in_pool(rpc_event, state, behavior, argument_thunk,
                            request_deserializer, response_serializer):
    """Run a response-unary behavior to completion on a pool thread.

    Obtains the argument from ``argument_thunk``, calls the behavior,
    serializes its response and sends it together with the final status.
    Each step is skipped when the previous one produced nothing. The
    cygrpc context installed for the call is always uninstalled.
    """
    cygrpc.install_context_from_request_call_event(rpc_event)
    try:
        argument = argument_thunk()
        if argument is None:
            return
        response, proceed = _call_behavior(rpc_event, state, behavior,
                                           argument, request_deserializer)
        if not proceed:
            return
        serialized_response = _serialize_response(rpc_event, state, response,
                                                  response_serializer)
        if serialized_response is not None:
            _status(rpc_event, state, serialized_response)
    finally:
        cygrpc.uninstall_context()
569
570
571
def _stream_response_in_pool(rpc_event, state, behavior, argument_thunk,
                             request_deserializer, response_serializer):
    """Run a response-streaming behavior to completion on a pool thread.

    A behavior with a truthy ``experimental_non_blocking`` attribute is
    given a ``send_response`` callback; any other behavior is expected to
    return an iterator, which is drained through the same callback. The
    cygrpc context installed for the call is always uninstalled.
    """
    cygrpc.install_context_from_request_call_event(rpc_event)

    def send_response(response):
        # A None response marks the end of the stream.
        if response is None:
            _status(rpc_event, state, None)
            return
        serialized_response = _serialize_response(rpc_event, state, response,
                                                  response_serializer)
        if serialized_response is not None:
            _send_response(rpc_event, state, serialized_response)

    try:
        argument = argument_thunk()
        if argument is not None:
            if getattr(behavior, 'experimental_non_blocking', False):
                _call_behavior(rpc_event,
                               state,
                               behavior,
                               argument,
                               request_deserializer,
                               send_response_callback=send_response)
            else:
                response_iterator, proceed = _call_behavior(
                    rpc_event, state, behavior, argument, request_deserializer)
                if proceed:
                    _send_message_callback_to_blocking_iterator_adapter(
                        rpc_event, state, send_response, response_iterator)
    finally:
        cygrpc.uninstall_context()
604
605
606
def _is_rpc_state_active(state):
    """Return True unless the client cancelled or a status was started."""
    if state.client is _CANCELLED:
        return False
    return not state.statused
608
609
610
def _send_message_callback_to_blocking_iterator_adapter(rpc_event, state,
                                                        send_response_callback,
                                                        response_iterator):
    """Feed every item of ``response_iterator`` to ``send_response_callback``.

    Stops when taking a response fails, or after a callback invocation
    once the RPC is no longer active. Exhaustion of the iterator is
    forwarded to the callback as a None response.
    """
    while True:
        response, proceed = _take_response_from_response_iterator(
            rpc_event, state, response_iterator)
        if not proceed:
            break
        send_response_callback(response)
        if not _is_rpc_state_active(state):
            break
622
623
624
def _select_thread_pool_for_behavior(behavior, default_thread_pool):
    """Choose the executor on which ``behavior`` should run.

    Returns:
      ``behavior.experimental_thread_pool`` when that attribute exists and
      is a ``futures.ThreadPoolExecutor``; otherwise ``default_thread_pool``.
    """
    override = getattr(behavior, 'experimental_thread_pool', None)
    if isinstance(override, futures.ThreadPoolExecutor):
        return override
    return default_thread_pool
630
631
632
def _handle_unary_unary(rpc_event, state, method_handler, default_thread_pool):
    """Submit a unary-request, unary-response RPC to a thread pool.

    Returns:
      The future returned by the pool's ``submit``.
    """
    request_thunk = _unary_request(rpc_event, state,
                                   method_handler.request_deserializer)
    pool = _select_thread_pool_for_behavior(method_handler.unary_unary,
                                            default_thread_pool)
    return pool.submit(_unary_response_in_pool, rpc_event, state,
                       method_handler.unary_unary, request_thunk,
                       method_handler.request_deserializer,
                       method_handler.response_serializer)
641
642
643
def _handle_unary_stream(rpc_event, state, method_handler, default_thread_pool):
    """Submit a unary-request, streaming-response RPC to a thread pool.

    Returns:
      The future returned by the pool's ``submit``.
    """
    request_thunk = _unary_request(rpc_event, state,
                                   method_handler.request_deserializer)
    pool = _select_thread_pool_for_behavior(method_handler.unary_stream,
                                            default_thread_pool)
    return pool.submit(_stream_response_in_pool, rpc_event, state,
                       method_handler.unary_stream, request_thunk,
                       method_handler.request_deserializer,
                       method_handler.response_serializer)
652
653
654
def _handle_stream_unary(rpc_event, state, method_handler, default_thread_pool):
    """Submit a streaming-request, unary-response RPC to a thread pool.

    The behavior receives a ``_RequestIterator`` as its argument.

    Returns:
      The future returned by the pool's ``submit``.
    """
    request_iterator = _RequestIterator(state, rpc_event.call,
                                        method_handler.request_deserializer)
    pool = _select_thread_pool_for_behavior(method_handler.stream_unary,
                                            default_thread_pool)

    def iterator_thunk():
        return request_iterator

    return pool.submit(_unary_response_in_pool, rpc_event, state,
                       method_handler.stream_unary, iterator_thunk,
                       method_handler.request_deserializer,
                       method_handler.response_serializer)
664
665
666
def _handle_stream_stream(rpc_event, state, method_handler,
                          default_thread_pool):
    """Submit a streaming-request, streaming-response RPC to a thread pool.

    The behavior receives a ``_RequestIterator`` as its argument.

    Returns:
      The future returned by the pool's ``submit``.
    """
    request_iterator = _RequestIterator(state, rpc_event.call,
                                        method_handler.request_deserializer)
    pool = _select_thread_pool_for_behavior(method_handler.stream_stream,
                                            default_thread_pool)

    def iterator_thunk():
        return request_iterator

    return pool.submit(_stream_response_in_pool, rpc_event, state,
                       method_handler.stream_stream, iterator_thunk,
                       method_handler.request_deserializer,
                       method_handler.response_serializer)
677
678
679
def _find_method_handler(rpc_event, generic_handlers, interceptor_pipeline):
    """Look up the method handler responsible for an incoming RPC.

    Args:
      rpc_event: The request-call event; supplies method and metadata.
      generic_handlers: Handlers queried in order via ``service``.
      interceptor_pipeline: When not None, its ``execute`` wraps the lookup.

    Returns:
      The first non-None handler reported by a generic handler (or
      whatever the interceptor pipeline returns), or None if there is none.
    """

    def query_handlers(handler_call_details):
        for generic_handler in generic_handlers:
            found = generic_handler.service(handler_call_details)
            if found is not None:
                return found
        return None

    method_name = _common.decode(rpc_event.call_details.method)
    handler_call_details = _HandlerCallDetails(method_name,
                                               rpc_event.invocation_metadata)

    if interceptor_pipeline is None:
        return query_handlers(handler_call_details)
    return interceptor_pipeline.execute(query_handlers, handler_call_details)
697
698
699
def _reject_rpc(rpc_event, status, details):
    """Answer an RPC with the given status without invoking any handler.

    A single batch sends initial metadata, receives the client's close and
    sends the status. Its completion callback reports the fresh state with
    no callbacks to run.

    Returns:
      The ``_RPCState`` created for the rejected call.
    """
    rpc_state = _RPCState()

    def on_batch_complete(ignored_event):
        return rpc_state, ()

    operations = (
        _get_initial_metadata_operation(rpc_state, None),
        cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),
        cygrpc.SendStatusFromServerOperation(None, status, details,
                                             _EMPTY_FLAGS),
    )
    rpc_event.call.start_server_batch(operations, on_batch_complete)
    return rpc_state
712
713
714
def _handle_with_method_handler(rpc_event, method_handler, thread_pool):
    """Start servicing an RPC with the handler that was found for it.

    Starts the receive-close batch and dispatches to the handling function
    matching the handler's request/response streaming flags, all while
    holding the new state's condition.

    Returns:
      ``(state, future)``: the new ``_RPCState`` and the future of the
      submitted work.
    """
    state = _RPCState()
    with state.condition:
        rpc_event.call.start_server_batch(
            (cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),),
            _receive_close_on_server(state))
        state.due.add(_RECEIVE_CLOSE_ON_SERVER_TOKEN)
        if method_handler.request_streaming:
            if method_handler.response_streaming:
                handle = _handle_stream_stream
            else:
                handle = _handle_stream_unary
        elif method_handler.response_streaming:
            handle = _handle_unary_stream
        else:
            handle = _handle_unary_unary
        return state, handle(rpc_event, state, method_handler, thread_pool)
735
736
737
def _handle_call(rpc_event, generic_handlers, interceptor_pipeline, thread_pool,
                 concurrency_exceeded):
    """Route a request-call event to a handler or reject it.

    Returns:
      ``(None, None)`` for an unsuccessful event or one without a method;
      ``(rpc_state, None)`` when the RPC was rejected (handler lookup
      raised, no handler found, or the concurrency limit was exceeded);
      otherwise the ``(rpc_state, future)`` pair of the started RPC.
    """
    if not rpc_event.success:
        return None, None
    if rpc_event.call_details.method is None:
        return None, None

    try:
        method_handler = _find_method_handler(rpc_event, generic_handlers,
                                              interceptor_pipeline)
    except Exception as exception:  # pylint: disable=broad-except
        details = 'Exception servicing handler: {}'.format(exception)
        _LOGGER.exception(details)
        return _reject_rpc(rpc_event, cygrpc.StatusCode.unknown,
                           b'Error in service handler!'), None

    if method_handler is None:
        return _reject_rpc(rpc_event, cygrpc.StatusCode.unimplemented,
                           b'Method not found!'), None
    if concurrency_exceeded:
        return _reject_rpc(rpc_event, cygrpc.StatusCode.resource_exhausted,
                           b'Concurrent RPC limit exceeded!'), None
    return _handle_with_method_handler(rpc_event, method_handler, thread_pool)
761
762
763
@enum.unique
class _ServerStage(enum.Enum):
    """Lifecycle stage of a server: stopped, started, or in grace."""

    STOPPED = 'stopped'
    STARTED = 'started'
    GRACE = 'grace'
768
769
770
class _ServerState(object):
    """Mutable state of a server shared by API calls and the serving thread."""

    # pylint: disable=too-many-arguments
    def __init__(self, completion_queue, server, generic_handlers,
                 interceptor_pipeline, thread_pool, maximum_concurrent_rpcs):
        """Initialise a stopped server state.

        ``generic_handlers`` is copied into a new list; every other argument
        is stored as given.
        """
        self.lock = threading.RLock()
        self.completion_queue = completion_queue
        self.server = server
        self.generic_handlers = list(generic_handlers)
        self.interceptor_pipeline = interceptor_pipeline
        self.thread_pool = thread_pool
        self.stage = _ServerStage.STOPPED
        # Set once serving has stopped; also the first shutdown event.
        self.termination_event = threading.Event()
        self.shutdown_events = [self.termination_event]
        # None means no limit is enforced.
        self.maximum_concurrent_rpcs = maximum_concurrent_rpcs
        self.active_rpc_count = 0

        # TODO(https://github.com/grpc/grpc/issues/6597): eliminate these fields.
        self.rpc_states = set()
        self.due = set()

        # A "volatile" flag to interrupt the daemon serving thread
        self.server_deallocated = False
793
794
795
def _add_generic_handlers(state, generic_handlers):
    """Append ``generic_handlers`` to the server's handlers under its lock."""
    lock = state.lock
    with lock:
        state.generic_handlers.extend(generic_handlers)
798
799
800
def _add_insecure_port(state, address):
    """Call ``add_http2_port(address)`` on the server under its lock.

    Returns:
      Whatever ``state.server.add_http2_port`` returns.
    """
    with state.lock:
        bound = state.server.add_http2_port(address)
        return bound
803
804
805
def _add_secure_port(state, address, server_credentials):
    """Call ``add_http2_port`` with credentials on the server under its lock.

    The ``_credentials`` attribute of ``server_credentials`` is what gets
    passed on.

    Returns:
      Whatever ``state.server.add_http2_port`` returns.
    """
    with state.lock:
        bound = state.server.add_http2_port(address,
                                            server_credentials._credentials)
        return bound
809
810
811
def _request_call(state):
    """Ask the server for the next incoming call and record it as due.

    The completion queue is passed twice to ``request_call``, together with
    ``_REQUEST_CALL_TAG``.
    """
    queue = state.completion_queue
    state.server.request_call(queue, queue, _REQUEST_CALL_TAG)
    state.due.add(_REQUEST_CALL_TAG)
815
816
817
# TODO(https://github.com/grpc/grpc/issues/6597): delete this function.
def _stop_serving(state):
    """Tear the server down if nothing is outstanding any more.

    Returns:
      False, without side effects, while RPC states or due tags remain.
      Otherwise the server is destroyed, every shutdown event is set, the
      stage becomes STOPPED and True is returned.
    """
    if state.rpc_states or state.due:
        return False
    state.server.destroy()
    for shutdown_event in state.shutdown_events:
        shutdown_event.set()
    state.stage = _ServerStage.STOPPED
    return True
827
828
829
def _on_call_completed(state):
    """Decrement the server's active RPC counter under its lock."""
    lock = state.lock
    with lock:
        state.active_rpc_count -= 1
832
833
834
def _process_event_and_continue(state, event):
    """Handle one completion-queue event.

    Three kinds of event are distinguished by ``event.tag``: the shutdown
    tag, the request-call tag, and any other tag, which is treated as a
    batch callback to be invoked with the event.

    Returns:
      False when the server was stopped as a consequence of this event and
      the serving loop should end; True otherwise.
    """
    if event.tag is _SHUTDOWN_TAG:
        with state.lock:
            state.due.remove(_SHUTDOWN_TAG)
            if _stop_serving(state):
                return False
        return True

    if event.tag is _REQUEST_CALL_TAG:
        with state.lock:
            state.due.remove(_REQUEST_CALL_TAG)
            concurrency_exceeded = (
                state.maximum_concurrent_rpcs is not None and
                state.active_rpc_count >= state.maximum_concurrent_rpcs)
            rpc_state, rpc_future = _handle_call(event, state.generic_handlers,
                                                 state.interceptor_pipeline,
                                                 state.thread_pool,
                                                 concurrency_exceeded)
            if rpc_state is not None:
                state.rpc_states.add(rpc_state)
            if rpc_future is not None:
                state.active_rpc_count += 1
                rpc_future.add_done_callback(
                    lambda unused_future: _on_call_completed(state))
            # Keep accepting calls only while the server is started.
            if state.stage is _ServerStage.STARTED:
                _request_call(state)
            elif _stop_serving(state):
                return False
        return True

    # Any other tag is the completion callback of a batch.
    rpc_state, callbacks = event.tag(event)
    for callback in callbacks:
        try:
            callback()
        except Exception:  # pylint: disable=broad-except
            _LOGGER.exception('Exception calling callback!')
    if rpc_state is not None:
        with state.lock:
            state.rpc_states.remove(rpc_state)
            if _stop_serving(state):
                return False
    return True
874
875
876
def _serve(state):
    """Poll the completion queue and process events until serving stops.

    Each poll uses a deadline ``_DEALLOCATED_SERVER_CHECK_PERIOD_S`` from
    now, so that the ``server_deallocated`` flag is checked after every
    poll and shutdown can begin once it is set.
    """
    while True:
        deadline = time.time() + _DEALLOCATED_SERVER_CHECK_PERIOD_S
        event = state.completion_queue.poll(deadline)
        if state.server_deallocated:
            _begin_shutdown_once(state)
        is_real_event = (
            event.completion_type != cygrpc.CompletionType.queue_timeout)
        if is_real_event and not _process_event_and_continue(state, event):
            return
        # We want to force the deletion of the previous event
        # ~before~ we poll again; if the event has a reference
        # to a shutdown Call object, this can induce spinlock.
        event = None
889
890
891
def _begin_shutdown_once(state):
    """Initiate server shutdown if the server is currently started.

    Moves the stage to GRACE and records the shutdown tag as due. Does
    nothing in any other stage, so repeated calls are harmless.
    """
    with state.lock:
        if state.stage is not _ServerStage.STARTED:
            return
        state.server.shutdown(state.completion_queue, _SHUTDOWN_TAG)
        state.stage = _ServerStage.GRACE
        state.due.add(_SHUTDOWN_TAG)
897
898
899
def _stop(state, grace):
    """Stop the server, optionally after a grace period.

    Args:
      state: The ``_ServerState`` of the server.
      grace: None to cancel all calls immediately and block until the
        server has stopped; otherwise the timeout passed to the event wait
        of a background thread that cancels all calls afterwards.

    Returns:
      A ``threading.Event`` that is set once the server has stopped. It is
      already set when the server was stopped on entry or ``grace`` is None.
    """
    with state.lock:
        if state.stage is _ServerStage.STOPPED:
            already_stopped = threading.Event()
            already_stopped.set()
            return already_stopped

        _begin_shutdown_once(state)
        shutdown_event = threading.Event()
        state.shutdown_events.append(shutdown_event)
        if grace is not None:

            def cancel_all_calls_after_grace():
                shutdown_event.wait(timeout=grace)
                with state.lock:
                    state.server.cancel_all_calls()

            canceller = threading.Thread(target=cancel_all_calls_after_grace)
            canceller.start()
            return shutdown_event
        state.server.cancel_all_calls()
    # Only reached when grace is None; the lock has been released here.
    shutdown_event.wait()
    return shutdown_event
923
924
925
def _start(state):
    """Start the server and its daemon serving thread.

    Raises:
      ValueError: If the server is not in the STOPPED stage.
    """
    with state.lock:
        if state.stage is not _ServerStage.STOPPED:
            raise ValueError('Cannot start already-started server!')
        state.server.start()
        state.stage = _ServerStage.STARTED
        _request_call(state)

        serving_thread = threading.Thread(target=_serve, args=(state,))
        serving_thread.daemon = True
        serving_thread.start()
936
937
938
def _validate_generic_rpc_handlers(generic_rpc_handlers):
    """Check that every handler exposes a ``service`` attribute.

    Raises:
      AttributeError: For the first handler whose ``service`` attribute is
        missing or None.
    """
    for candidate in generic_rpc_handlers:
        if getattr(candidate, 'service', None) is not None:
            continue
        raise AttributeError(
            '"{}" must conform to grpc.GenericRpcHandler type but does '
            'not have "service" method!'.format(candidate))
945
946
947
def _augment_options(base_options, compression):
    """Return ``base_options`` as a tuple extended by the compression option.

    The extra entries come from ``_compression.create_channel_option``.
    """
    extra_options = _compression.create_channel_option(compression)
    return tuple(base_options) + extra_options
950
951
952
class _Server(grpc.Server):
    """``grpc.Server`` implementation delegating to module-level helpers."""

    # pylint: disable=too-many-arguments
    def __init__(self, thread_pool, generic_handlers, interceptors, options,
                 maximum_concurrent_rpcs, compression, xds):
        """Create the cygrpc server, its completion queue and the state."""
        completion_queue = cygrpc.CompletionQueue()
        server = cygrpc.Server(_augment_options(options, compression), xds)
        server.register_completion_queue(completion_queue)
        pipeline = _interceptor.service_pipeline(interceptors)
        self._state = _ServerState(completion_queue, server, generic_handlers,
                                   pipeline, thread_pool,
                                   maximum_concurrent_rpcs)

    def add_generic_rpc_handlers(self, generic_rpc_handlers):
        """Validate the handlers and register them with the server."""
        _validate_generic_rpc_handlers(generic_rpc_handlers)
        _add_generic_handlers(self._state, generic_rpc_handlers)

    def add_insecure_port(self, address):
        """Bind ``address`` without credentials and validate the result."""
        bound = _add_insecure_port(self._state, _common.encode(address))
        return _common.validate_port_binding_result(address, bound)

    def add_secure_port(self, address, server_credentials):
        """Bind ``address`` with credentials and validate the result."""
        bound = _add_secure_port(self._state, _common.encode(address),
                                 server_credentials)
        return _common.validate_port_binding_result(address, bound)

    def start(self):
        """Start the server."""
        _start(self._state)

    def wait_for_termination(self, timeout=None):
        """Wait on the termination event via ``_common.wait``."""
        # NOTE(https://bugs.python.org/issue35935)
        # Remove this workaround once threading.Event.wait() is working with
        # CTRL+C across platforms.
        termination_event = self._state.termination_event
        return _common.wait(termination_event.wait,
                            termination_event.is_set,
                            timeout=timeout)

    def stop(self, grace):
        """Stop the server; see ``_stop`` for the meaning of ``grace``."""
        return _stop(self._state, grace)

    def __del__(self):
        if hasattr(self, '_state'):
            # We can not grab a lock in __del__(), so set a flag to signal the
            # serving daemon thread (if it exists) to initiate shutdown.
            self._state.server_deallocated = True
997
998
999
def create_server(thread_pool, generic_rpc_handlers, interceptors, options,
                  maximum_concurrent_rpcs, compression, xds):
    """Validate the handlers and construct a ``_Server``.

    All arguments are forwarded unchanged to the ``_Server`` constructor.

    Raises:
      AttributeError: If a handler lacks a ``service`` attribute.
    """
    _validate_generic_rpc_handlers(generic_rpc_handlers)
    server = _Server(thread_pool, generic_rpc_handlers, interceptors, options,
                     maximum_concurrent_rpcs, compression, xds)
    return server
1004
1005