Path: blob/master/ invest-robot-contest_TinkoffBotTwitch-main/venv/lib/python3.8/site-packages/grpc/_server.py
7764 views
# Copyright 2016 gRPC authors.1#2# Licensed under the Apache License, Version 2.0 (the "License");3# you may not use this file except in compliance with the License.4# You may obtain a copy of the License at5#6# http://www.apache.org/licenses/LICENSE-2.07#8# Unless required by applicable law or agreed to in writing, software9# distributed under the License is distributed on an "AS IS" BASIS,10# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.11# See the License for the specific language governing permissions and12# limitations under the License.13"""Service-side implementation of gRPC Python."""1415import collections16from concurrent import futures17import enum18import logging19import threading20import time2122import grpc23from grpc import _common24from grpc import _compression25from grpc import _interceptor26from grpc._cython import cygrpc27import six2829_LOGGER = logging.getLogger(__name__)3031_SHUTDOWN_TAG = 'shutdown'32_REQUEST_CALL_TAG = 'request_call'3334_RECEIVE_CLOSE_ON_SERVER_TOKEN = 'receive_close_on_server'35_SEND_INITIAL_METADATA_TOKEN = 'send_initial_metadata'36_RECEIVE_MESSAGE_TOKEN = 'receive_message'37_SEND_MESSAGE_TOKEN = 'send_message'38_SEND_INITIAL_METADATA_AND_SEND_MESSAGE_TOKEN = (39'send_initial_metadata * send_message')40_SEND_STATUS_FROM_SERVER_TOKEN = 'send_status_from_server'41_SEND_INITIAL_METADATA_AND_SEND_STATUS_FROM_SERVER_TOKEN = (42'send_initial_metadata * send_status_from_server')4344_OPEN = 'open'45_CLOSED = 'closed'46_CANCELLED = 'cancelled'4748_EMPTY_FLAGS = 04950_DEALLOCATED_SERVER_CHECK_PERIOD_S = 1.051_INF_TIMEOUT = 1e9525354def _serialized_request(request_event):55return request_event.batch_operations[0].message()565758def _application_code(code):59cygrpc_code = _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE.get(code)60return cygrpc.StatusCode.unknown if cygrpc_code is None else cygrpc_code616263def _completion_code(state):64if state.code is None:65return cygrpc.StatusCode.ok66else:67return _application_code(state.code)686970def _abortion_code(state, code):71if state.code is None:72return code73else:74return _application_code(state.code)757677def _details(state):78return b'' if state.details is None else state.details798081class _HandlerCallDetails(82collections.namedtuple('_HandlerCallDetails', (83'method',84'invocation_metadata',85)), grpc.HandlerCallDetails):86pass878889class _RPCState(object):9091def __init__(self):92self.condition = threading.Condition()93self.due = set()94self.request = None95self.client = _OPEN96self.initial_metadata_allowed = True97self.compression_algorithm = None98self.disable_next_compression = False99self.trailing_metadata = None100self.code = None101self.details = None102self.statused = False103self.rpc_errors = []104self.callbacks = []105self.aborted = False106107108def _raise_rpc_error(state):109rpc_error = grpc.RpcError()110state.rpc_errors.append(rpc_error)111raise rpc_error112113114def _possibly_finish_call(state, token):115state.due.remove(token)116if not _is_rpc_state_active(state) and not state.due:117callbacks = state.callbacks118state.callbacks = None119return state, callbacks120else:121return None, ()122123124def _send_status_from_server(state, token):125126def send_status_from_server(unused_send_status_from_server_event):127with state.condition:128return _possibly_finish_call(state, token)129130return send_status_from_server131132133def _get_initial_metadata(state, metadata):134with state.condition:135if state.compression_algorithm:136compression_metadata = (137_compression.compression_algorithm_to_metadata(138state.compression_algorithm),)139if metadata is None:140return compression_metadata141else:142return compression_metadata + tuple(metadata)143else:144return metadata145146147def _get_initial_metadata_operation(state, metadata):148operation = cygrpc.SendInitialMetadataOperation(149_get_initial_metadata(state, metadata), _EMPTY_FLAGS)150return operation151152153def _abort(state, call, code, details):154if state.client is not _CANCELLED:155effective_code = _abortion_code(state, code)156effective_details = details if state.details is None else state.details157if state.initial_metadata_allowed:158operations = (159_get_initial_metadata_operation(state, None),160cygrpc.SendStatusFromServerOperation(state.trailing_metadata,161effective_code,162effective_details,163_EMPTY_FLAGS),164)165token = _SEND_INITIAL_METADATA_AND_SEND_STATUS_FROM_SERVER_TOKEN166else:167operations = (cygrpc.SendStatusFromServerOperation(168state.trailing_metadata, effective_code, effective_details,169_EMPTY_FLAGS),)170token = _SEND_STATUS_FROM_SERVER_TOKEN171call.start_server_batch(operations,172_send_status_from_server(state, token))173state.statused = True174state.due.add(token)175176177def _receive_close_on_server(state):178179def receive_close_on_server(receive_close_on_server_event):180with state.condition:181if receive_close_on_server_event.batch_operations[0].cancelled():182state.client = _CANCELLED183elif state.client is _OPEN:184state.client = _CLOSED185state.condition.notify_all()186return _possibly_finish_call(state, _RECEIVE_CLOSE_ON_SERVER_TOKEN)187188return receive_close_on_server189190191def _receive_message(state, call, request_deserializer):192193def receive_message(receive_message_event):194serialized_request = _serialized_request(receive_message_event)195if serialized_request is None:196with state.condition:197if state.client is _OPEN:198state.client = _CLOSED199state.condition.notify_all()200return _possibly_finish_call(state, _RECEIVE_MESSAGE_TOKEN)201else:202request = _common.deserialize(serialized_request,203request_deserializer)204with state.condition:205if request is None:206_abort(state, call, cygrpc.StatusCode.internal,207b'Exception deserializing request!')208else:209state.request = request210state.condition.notify_all()211return _possibly_finish_call(state, _RECEIVE_MESSAGE_TOKEN)212213return receive_message214215216def _send_initial_metadata(state):217218def send_initial_metadata(unused_send_initial_metadata_event):219with state.condition:220return _possibly_finish_call(state, _SEND_INITIAL_METADATA_TOKEN)221222return send_initial_metadata223224225def _send_message(state, token):226227def send_message(unused_send_message_event):228with state.condition:229state.condition.notify_all()230return _possibly_finish_call(state, token)231232return send_message233234235class _Context(grpc.ServicerContext):236237def __init__(self, rpc_event, state, request_deserializer):238self._rpc_event = rpc_event239self._state = state240self._request_deserializer = request_deserializer241242def is_active(self):243with self._state.condition:244return _is_rpc_state_active(self._state)245246def time_remaining(self):247return max(self._rpc_event.call_details.deadline - time.time(), 0)248249def cancel(self):250self._rpc_event.call.cancel()251252def add_callback(self, callback):253with self._state.condition:254if self._state.callbacks is None:255return False256else:257self._state.callbacks.append(callback)258return True259260def disable_next_message_compression(self):261with self._state.condition:262self._state.disable_next_compression = True263264def invocation_metadata(self):265return self._rpc_event.invocation_metadata266267def peer(self):268return _common.decode(self._rpc_event.call.peer())269270def peer_identities(self):271return cygrpc.peer_identities(self._rpc_event.call)272273def peer_identity_key(self):274id_key = cygrpc.peer_identity_key(self._rpc_event.call)275return id_key if id_key is None else _common.decode(id_key)276277def auth_context(self):278return {279_common.decode(key): value for key, value in six.iteritems(280cygrpc.auth_context(self._rpc_event.call))281}282283def set_compression(self, compression):284with self._state.condition:285self._state.compression_algorithm = compression286287def send_initial_metadata(self, initial_metadata):288with self._state.condition:289if self._state.client is _CANCELLED:290_raise_rpc_error(self._state)291else:292if self._state.initial_metadata_allowed:293operation = _get_initial_metadata_operation(294self._state, initial_metadata)295self._rpc_event.call.start_server_batch(296(operation,), _send_initial_metadata(self._state))297self._state.initial_metadata_allowed = False298self._state.due.add(_SEND_INITIAL_METADATA_TOKEN)299else:300raise ValueError('Initial metadata no longer allowed!')301302def set_trailing_metadata(self, trailing_metadata):303with self._state.condition:304self._state.trailing_metadata = trailing_metadata305306def trailing_metadata(self):307return self._state.trailing_metadata308309def abort(self, code, details):310# treat OK like other invalid arguments: fail the RPC311if code == grpc.StatusCode.OK:312_LOGGER.error(313'abort() called with StatusCode.OK; returning UNKNOWN')314code = grpc.StatusCode.UNKNOWN315details = ''316with self._state.condition:317self._state.code = code318self._state.details = _common.encode(details)319self._state.aborted = True320raise Exception()321322def abort_with_status(self, status):323self._state.trailing_metadata = status.trailing_metadata324self.abort(status.code, status.details)325326def set_code(self, code):327with self._state.condition:328self._state.code = code329330def code(self):331return self._state.code332333def set_details(self, details):334with self._state.condition:335self._state.details = _common.encode(details)336337def details(self):338return self._state.details339340def _finalize_state(self):341pass342343344class _RequestIterator(object):345346def __init__(self, state, call, request_deserializer):347self._state = state348self._call = call349self._request_deserializer = request_deserializer350351def _raise_or_start_receive_message(self):352if self._state.client is _CANCELLED:353_raise_rpc_error(self._state)354elif not _is_rpc_state_active(self._state):355raise StopIteration()356else:357self._call.start_server_batch(358(cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),359_receive_message(self._state, self._call,360self._request_deserializer))361self._state.due.add(_RECEIVE_MESSAGE_TOKEN)362363def _look_for_request(self):364if self._state.client is _CANCELLED:365_raise_rpc_error(self._state)366elif (self._state.request is None and367_RECEIVE_MESSAGE_TOKEN not in self._state.due):368raise StopIteration()369else:370request = self._state.request371self._state.request = None372return request373374raise AssertionError() # should never run375376def _next(self):377with self._state.condition:378self._raise_or_start_receive_message()379while True:380self._state.condition.wait()381request = self._look_for_request()382if request is not None:383return request384385def __iter__(self):386return self387388def __next__(self):389return self._next()390391def next(self):392return self._next()393394395def _unary_request(rpc_event, state, request_deserializer):396397def unary_request():398with state.condition:399if not _is_rpc_state_active(state):400return None401else:402rpc_event.call.start_server_batch(403(cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),404_receive_message(state, rpc_event.call,405request_deserializer))406state.due.add(_RECEIVE_MESSAGE_TOKEN)407while True:408state.condition.wait()409if state.request is None:410if state.client is _CLOSED:411details = '"{}" requires exactly one request message.'.format(412rpc_event.call_details.method)413_abort(state, rpc_event.call,414cygrpc.StatusCode.unimplemented,415_common.encode(details))416return None417elif state.client is _CANCELLED:418return None419else:420request = state.request421state.request = None422return request423424return unary_request425426427def _call_behavior(rpc_event,428state,429behavior,430argument,431request_deserializer,432send_response_callback=None):433from grpc import _create_servicer_context434with _create_servicer_context(rpc_event, state,435request_deserializer) as context:436try:437response_or_iterator = None438if send_response_callback is not None:439response_or_iterator = behavior(argument, context,440send_response_callback)441else:442response_or_iterator = behavior(argument, context)443return response_or_iterator, True444except Exception as exception: # pylint: disable=broad-except445with state.condition:446if state.aborted:447_abort(state, rpc_event.call, cygrpc.StatusCode.unknown,448b'RPC Aborted')449elif exception not in state.rpc_errors:450details = 'Exception calling application: {}'.format(451exception)452_LOGGER.exception(details)453_abort(state, rpc_event.call, cygrpc.StatusCode.unknown,454_common.encode(details))455return None, False456457458def _take_response_from_response_iterator(rpc_event, state, response_iterator):459try:460return next(response_iterator), True461except StopIteration:462return None, True463except Exception as exception: # pylint: disable=broad-except464with state.condition:465if state.aborted:466_abort(state, rpc_event.call, cygrpc.StatusCode.unknown,467b'RPC Aborted')468elif exception not in state.rpc_errors:469details = 'Exception iterating responses: {}'.format(exception)470_LOGGER.exception(details)471_abort(state, rpc_event.call, cygrpc.StatusCode.unknown,472_common.encode(details))473return None, False474475476def _serialize_response(rpc_event, state, response, response_serializer):477serialized_response = _common.serialize(response, response_serializer)478if serialized_response is None:479with state.condition:480_abort(state, rpc_event.call, cygrpc.StatusCode.internal,481b'Failed to serialize response!')482return None483else:484return serialized_response485486487def _get_send_message_op_flags_from_state(state):488if state.disable_next_compression:489return cygrpc.WriteFlag.no_compress490else:491return _EMPTY_FLAGS492493494def _reset_per_message_state(state):495with state.condition:496state.disable_next_compression = False497498499def _send_response(rpc_event, state, serialized_response):500with state.condition:501if not _is_rpc_state_active(state):502return False503else:504if state.initial_metadata_allowed:505operations = (506_get_initial_metadata_operation(state, None),507cygrpc.SendMessageOperation(508serialized_response,509_get_send_message_op_flags_from_state(state)),510)511state.initial_metadata_allowed = False512token = _SEND_INITIAL_METADATA_AND_SEND_MESSAGE_TOKEN513else:514operations = (cygrpc.SendMessageOperation(515serialized_response,516_get_send_message_op_flags_from_state(state)),)517token = _SEND_MESSAGE_TOKEN518rpc_event.call.start_server_batch(operations,519_send_message(state, token))520state.due.add(token)521_reset_per_message_state(state)522while True:523state.condition.wait()524if token not in state.due:525return _is_rpc_state_active(state)526527528def _status(rpc_event, state, serialized_response):529with state.condition:530if state.client is not _CANCELLED:531code = _completion_code(state)532details = _details(state)533operations = [534cygrpc.SendStatusFromServerOperation(state.trailing_metadata,535code, details,536_EMPTY_FLAGS),537]538if state.initial_metadata_allowed:539operations.append(_get_initial_metadata_operation(state, None))540if serialized_response is not None:541operations.append(542cygrpc.SendMessageOperation(543serialized_response,544_get_send_message_op_flags_from_state(state)))545rpc_event.call.start_server_batch(546operations,547_send_status_from_server(state, _SEND_STATUS_FROM_SERVER_TOKEN))548state.statused = True549_reset_per_message_state(state)550state.due.add(_SEND_STATUS_FROM_SERVER_TOKEN)551552553def _unary_response_in_pool(rpc_event, state, behavior, argument_thunk,554request_deserializer, response_serializer):555cygrpc.install_context_from_request_call_event(rpc_event)556try:557argument = argument_thunk()558if argument is not None:559response, proceed = _call_behavior(rpc_event, state, behavior,560argument, request_deserializer)561if proceed:562serialized_response = _serialize_response(563rpc_event, state, response, response_serializer)564if serialized_response is not None:565_status(rpc_event, state, serialized_response)566finally:567cygrpc.uninstall_context()568569570def _stream_response_in_pool(rpc_event, state, behavior, argument_thunk,571request_deserializer, response_serializer):572cygrpc.install_context_from_request_call_event(rpc_event)573574def send_response(response):575if response is None:576_status(rpc_event, state, None)577else:578serialized_response = _serialize_response(rpc_event, state,579response,580response_serializer)581if serialized_response is not None:582_send_response(rpc_event, state, serialized_response)583584try:585argument = argument_thunk()586if argument is not None:587if hasattr(behavior, 'experimental_non_blocking'588) and behavior.experimental_non_blocking:589_call_behavior(rpc_event,590state,591behavior,592argument,593request_deserializer,594send_response_callback=send_response)595else:596response_iterator, proceed = _call_behavior(597rpc_event, state, behavior, argument, request_deserializer)598if proceed:599_send_message_callback_to_blocking_iterator_adapter(600rpc_event, state, send_response, response_iterator)601finally:602cygrpc.uninstall_context()603604605def _is_rpc_state_active(state):606return state.client is not _CANCELLED and not state.statused607608609def _send_message_callback_to_blocking_iterator_adapter(rpc_event, state,610send_response_callback,611response_iterator):612while True:613response, proceed = _take_response_from_response_iterator(614rpc_event, state, response_iterator)615if proceed:616send_response_callback(response)617if not _is_rpc_state_active(state):618break619else:620break621622623def _select_thread_pool_for_behavior(behavior, default_thread_pool):624if hasattr(behavior, 'experimental_thread_pool') and isinstance(625behavior.experimental_thread_pool, futures.ThreadPoolExecutor):626return behavior.experimental_thread_pool627else:628return default_thread_pool629630631def _handle_unary_unary(rpc_event, state, method_handler, default_thread_pool):632unary_request = _unary_request(rpc_event, state,633method_handler.request_deserializer)634thread_pool = _select_thread_pool_for_behavior(method_handler.unary_unary,635default_thread_pool)636return thread_pool.submit(_unary_response_in_pool, rpc_event, state,637method_handler.unary_unary, unary_request,638method_handler.request_deserializer,639method_handler.response_serializer)640641642def _handle_unary_stream(rpc_event, state, method_handler, default_thread_pool):643unary_request = _unary_request(rpc_event, state,644method_handler.request_deserializer)645thread_pool = _select_thread_pool_for_behavior(method_handler.unary_stream,646default_thread_pool)647return thread_pool.submit(_stream_response_in_pool, rpc_event, state,648method_handler.unary_stream, unary_request,649method_handler.request_deserializer,650method_handler.response_serializer)651652653def _handle_stream_unary(rpc_event, state, method_handler, default_thread_pool):654request_iterator = _RequestIterator(state, rpc_event.call,655method_handler.request_deserializer)656thread_pool = _select_thread_pool_for_behavior(method_handler.stream_unary,657default_thread_pool)658return thread_pool.submit(_unary_response_in_pool, rpc_event, state,659method_handler.stream_unary,660lambda: request_iterator,661method_handler.request_deserializer,662method_handler.response_serializer)663664665def _handle_stream_stream(rpc_event, state, method_handler,666default_thread_pool):667request_iterator = _RequestIterator(state, rpc_event.call,668method_handler.request_deserializer)669thread_pool = _select_thread_pool_for_behavior(method_handler.stream_stream,670default_thread_pool)671return thread_pool.submit(_stream_response_in_pool, rpc_event, state,672method_handler.stream_stream,673lambda: request_iterator,674method_handler.request_deserializer,675method_handler.response_serializer)676677678def _find_method_handler(rpc_event, generic_handlers, interceptor_pipeline):679680def query_handlers(handler_call_details):681for generic_handler in generic_handlers:682method_handler = generic_handler.service(handler_call_details)683if method_handler is not None:684return method_handler685return None686687handler_call_details = _HandlerCallDetails(688_common.decode(rpc_event.call_details.method),689rpc_event.invocation_metadata)690691if interceptor_pipeline is not None:692return interceptor_pipeline.execute(query_handlers,693handler_call_details)694else:695return query_handlers(handler_call_details)696697698def _reject_rpc(rpc_event, status, details):699rpc_state = _RPCState()700operations = (701_get_initial_metadata_operation(rpc_state, None),702cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),703cygrpc.SendStatusFromServerOperation(None, status, details,704_EMPTY_FLAGS),705)706rpc_event.call.start_server_batch(operations, lambda ignored_event: (707rpc_state,708(),709))710return rpc_state711712713def _handle_with_method_handler(rpc_event, method_handler, thread_pool):714state = _RPCState()715with state.condition:716rpc_event.call.start_server_batch(717(cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),),718_receive_close_on_server(state))719state.due.add(_RECEIVE_CLOSE_ON_SERVER_TOKEN)720if method_handler.request_streaming:721if method_handler.response_streaming:722return state, _handle_stream_stream(rpc_event, state,723method_handler, thread_pool)724else:725return state, _handle_stream_unary(rpc_event, state,726method_handler, thread_pool)727else:728if method_handler.response_streaming:729return state, _handle_unary_stream(rpc_event, state,730method_handler, thread_pool)731else:732return state, _handle_unary_unary(rpc_event, state,733method_handler, thread_pool)734735736def _handle_call(rpc_event, generic_handlers, interceptor_pipeline, thread_pool,737concurrency_exceeded):738if not rpc_event.success:739return None, None740if rpc_event.call_details.method is not None:741try:742method_handler = _find_method_handler(rpc_event, generic_handlers,743interceptor_pipeline)744except Exception as exception: # pylint: disable=broad-except745details = 'Exception servicing handler: {}'.format(exception)746_LOGGER.exception(details)747return _reject_rpc(rpc_event, cygrpc.StatusCode.unknown,748b'Error in service handler!'), None749if method_handler is None:750return _reject_rpc(rpc_event, cygrpc.StatusCode.unimplemented,751b'Method not found!'), None752elif concurrency_exceeded:753return _reject_rpc(rpc_event, cygrpc.StatusCode.resource_exhausted,754b'Concurrent RPC limit exceeded!'), None755else:756return _handle_with_method_handler(rpc_event, method_handler,757thread_pool)758else:759return None, None760761762@enum.unique763class _ServerStage(enum.Enum):764STOPPED = 'stopped'765STARTED = 'started'766GRACE = 'grace'767768769class _ServerState(object):770771# pylint: disable=too-many-arguments772def __init__(self, completion_queue, server, generic_handlers,773interceptor_pipeline, thread_pool, maximum_concurrent_rpcs):774self.lock = threading.RLock()775self.completion_queue = completion_queue776self.server = server777self.generic_handlers = list(generic_handlers)778self.interceptor_pipeline = interceptor_pipeline779self.thread_pool = thread_pool780self.stage = _ServerStage.STOPPED781self.termination_event = threading.Event()782self.shutdown_events = [self.termination_event]783self.maximum_concurrent_rpcs = maximum_concurrent_rpcs784self.active_rpc_count = 0785786# TODO(https://github.com/grpc/grpc/issues/6597): eliminate these fields.787self.rpc_states = set()788self.due = set()789790# A "volatile" flag to interrupt the daemon serving thread791self.server_deallocated = False792793794def _add_generic_handlers(state, generic_handlers):795with state.lock:796state.generic_handlers.extend(generic_handlers)797798799def _add_insecure_port(state, address):800with state.lock:801return state.server.add_http2_port(address)802803804def _add_secure_port(state, address, server_credentials):805with state.lock:806return state.server.add_http2_port(address,807server_credentials._credentials)808809810def _request_call(state):811state.server.request_call(state.completion_queue, state.completion_queue,812_REQUEST_CALL_TAG)813state.due.add(_REQUEST_CALL_TAG)814815816# TODO(https://github.com/grpc/grpc/issues/6597): delete this function.817def _stop_serving(state):818if not state.rpc_states and not state.due:819state.server.destroy()820for shutdown_event in state.shutdown_events:821shutdown_event.set()822state.stage = _ServerStage.STOPPED823return True824else:825return False826827828def _on_call_completed(state):829with state.lock:830state.active_rpc_count -= 1831832833def _process_event_and_continue(state, event):834should_continue = True835if event.tag is _SHUTDOWN_TAG:836with state.lock:837state.due.remove(_SHUTDOWN_TAG)838if _stop_serving(state):839should_continue = False840elif event.tag is _REQUEST_CALL_TAG:841with state.lock:842state.due.remove(_REQUEST_CALL_TAG)843concurrency_exceeded = (844state.maximum_concurrent_rpcs is not None and845state.active_rpc_count >= state.maximum_concurrent_rpcs)846rpc_state, rpc_future = _handle_call(event, state.generic_handlers,847state.interceptor_pipeline,848state.thread_pool,849concurrency_exceeded)850if rpc_state is not None:851state.rpc_states.add(rpc_state)852if rpc_future is not None:853state.active_rpc_count += 1854rpc_future.add_done_callback(855lambda unused_future: _on_call_completed(state))856if state.stage is _ServerStage.STARTED:857_request_call(state)858elif _stop_serving(state):859should_continue = False860else:861rpc_state, callbacks = event.tag(event)862for callback in callbacks:863try:864callback()865except Exception: # pylint: disable=broad-except866_LOGGER.exception('Exception calling callback!')867if rpc_state is not None:868with state.lock:869state.rpc_states.remove(rpc_state)870if _stop_serving(state):871should_continue = False872return should_continue873874875def _serve(state):876while True:877timeout = time.time() + _DEALLOCATED_SERVER_CHECK_PERIOD_S878event = state.completion_queue.poll(timeout)879if state.server_deallocated:880_begin_shutdown_once(state)881if event.completion_type != cygrpc.CompletionType.queue_timeout:882if not _process_event_and_continue(state, event):883return884# We want to force the deletion of the previous event885# ~before~ we poll again; if the event has a reference886# to a shutdown Call object, this can induce spinlock.887event = None888889890def _begin_shutdown_once(state):891with state.lock:892if state.stage is _ServerStage.STARTED:893state.server.shutdown(state.completion_queue, _SHUTDOWN_TAG)894state.stage = _ServerStage.GRACE895state.due.add(_SHUTDOWN_TAG)896897898def _stop(state, grace):899with state.lock:900if state.stage is _ServerStage.STOPPED:901shutdown_event = threading.Event()902shutdown_event.set()903return shutdown_event904else:905_begin_shutdown_once(state)906shutdown_event = threading.Event()907state.shutdown_events.append(shutdown_event)908if grace is None:909state.server.cancel_all_calls()910else:911912def cancel_all_calls_after_grace():913shutdown_event.wait(timeout=grace)914with state.lock:915state.server.cancel_all_calls()916917thread = threading.Thread(target=cancel_all_calls_after_grace)918thread.start()919return shutdown_event920shutdown_event.wait()921return shutdown_event922923924def _start(state):925with state.lock:926if state.stage is not _ServerStage.STOPPED:927raise ValueError('Cannot start already-started server!')928state.server.start()929state.stage = _ServerStage.STARTED930_request_call(state)931932thread = threading.Thread(target=_serve, args=(state,))933thread.daemon = True934thread.start()935936937def _validate_generic_rpc_handlers(generic_rpc_handlers):938for generic_rpc_handler in generic_rpc_handlers:939service_attribute = getattr(generic_rpc_handler, 'service', None)940if service_attribute is None:941raise AttributeError(942'"{}" must conform to grpc.GenericRpcHandler type but does '943'not have "service" method!'.format(generic_rpc_handler))944945946def _augment_options(base_options, compression):947compression_option = _compression.create_channel_option(compression)948return tuple(base_options) + compression_option949950951class _Server(grpc.Server):952953# pylint: disable=too-many-arguments954def __init__(self, thread_pool, generic_handlers, interceptors, options,955maximum_concurrent_rpcs, compression, xds):956completion_queue = cygrpc.CompletionQueue()957server = cygrpc.Server(_augment_options(options, compression), xds)958server.register_completion_queue(completion_queue)959self._state = _ServerState(completion_queue, server, generic_handlers,960_interceptor.service_pipeline(interceptors),961thread_pool, maximum_concurrent_rpcs)962963def add_generic_rpc_handlers(self, generic_rpc_handlers):964_validate_generic_rpc_handlers(generic_rpc_handlers)965_add_generic_handlers(self._state, generic_rpc_handlers)966967def add_insecure_port(self, address):968return _common.validate_port_binding_result(969address, _add_insecure_port(self._state, _common.encode(address)))970971def add_secure_port(self, address, server_credentials):972return _common.validate_port_binding_result(973address,974_add_secure_port(self._state, _common.encode(address),975server_credentials))976977def start(self):978_start(self._state)979980def wait_for_termination(self, timeout=None):981# NOTE(https://bugs.python.org/issue35935)982# Remove this workaround once threading.Event.wait() is working with983# CTRL+C across platforms.984return _common.wait(self._state.termination_event.wait,985self._state.termination_event.is_set,986timeout=timeout)987988def stop(self, grace):989return _stop(self._state, grace)990991def __del__(self):992if hasattr(self, '_state'):993# We can not grab a lock in __del__(), so set a flag to signal the994# serving daemon thread (if it exists) to initiate shutdown.995self._state.server_deallocated = True996997998def create_server(thread_pool, generic_rpc_handlers, interceptors, options,999maximum_concurrent_rpcs, compression, xds):1000_validate_generic_rpc_handlers(generic_rpc_handlers)1001return _Server(thread_pool, generic_rpc_handlers, interceptors, options,1002maximum_concurrent_rpcs, compression, xds)100310041005