Path: blob/master/ invest-robot-contest_TinkoffBotTwitch-main/venv/lib/python3.8/site-packages/grpc/_channel.py
7796 views
# Copyright 2016 gRPC authors.1#2# Licensed under the Apache License, Version 2.0 (the "License");3# you may not use this file except in compliance with the License.4# You may obtain a copy of the License at5#6# http://www.apache.org/licenses/LICENSE-2.07#8# Unless required by applicable law or agreed to in writing, software9# distributed under the License is distributed on an "AS IS" BASIS,10# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.11# See the License for the specific language governing permissions and12# limitations under the License.13"""Invocation-side implementation of gRPC Python."""1415import copy16import functools17import logging18import os19import sys20import threading21import time2223import grpc24from grpc import _common25from grpc import _compression26from grpc import _grpcio_metadata27from grpc._cython import cygrpc28import grpc.experimental2930_LOGGER = logging.getLogger(__name__)3132_USER_AGENT = 'grpc-python/{}'.format(_grpcio_metadata.__version__)3334_EMPTY_FLAGS = 03536# NOTE(rbellevi): No guarantees are given about the maintenance of this37# environment variable.38_DEFAULT_SINGLE_THREADED_UNARY_STREAM = os.getenv(39"GRPC_SINGLE_THREADED_UNARY_STREAM") is not None4041_UNARY_UNARY_INITIAL_DUE = (42cygrpc.OperationType.send_initial_metadata,43cygrpc.OperationType.send_message,44cygrpc.OperationType.send_close_from_client,45cygrpc.OperationType.receive_initial_metadata,46cygrpc.OperationType.receive_message,47cygrpc.OperationType.receive_status_on_client,48)49_UNARY_STREAM_INITIAL_DUE = (50cygrpc.OperationType.send_initial_metadata,51cygrpc.OperationType.send_message,52cygrpc.OperationType.send_close_from_client,53cygrpc.OperationType.receive_initial_metadata,54cygrpc.OperationType.receive_status_on_client,55)56_STREAM_UNARY_INITIAL_DUE = (57cygrpc.OperationType.send_initial_metadata,58cygrpc.OperationType.receive_initial_metadata,59cygrpc.OperationType.receive_message,60cygrpc.OperationType.receive_status_on_client,61)62_STREAM_STREAM_INITIAL_DUE = (63cygrpc.OperationType.send_initial_metadata,64cygrpc.OperationType.receive_initial_metadata,65cygrpc.OperationType.receive_status_on_client,66)6768_CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = (69'Exception calling channel subscription callback!')7071_OK_RENDEZVOUS_REPR_FORMAT = ('<{} of RPC that terminated with:\n'72'\tstatus = {}\n'73'\tdetails = "{}"\n'74'>')7576_NON_OK_RENDEZVOUS_REPR_FORMAT = ('<{} of RPC that terminated with:\n'77'\tstatus = {}\n'78'\tdetails = "{}"\n'79'\tdebug_error_string = "{}"\n'80'>')818283def _deadline(timeout):84return None if timeout is None else time.time() + timeout858687def _unknown_code_details(unknown_cygrpc_code, details):88return 'Server sent unknown code {} and details "{}"'.format(89unknown_cygrpc_code, details)909192class _RPCState(object):9394def __init__(self, due, initial_metadata, trailing_metadata, code, details):95# `condition` guards all members of _RPCState. `notify_all` is called on96# `condition` when the state of the RPC has changed.97self.condition = threading.Condition()9899# The cygrpc.OperationType objects representing events due from the RPC's100# completion queue. If an operation is in `due`, it is guaranteed that101# `operate()` has been called on a corresponding operation. But the102# converse is not true. That is, in the case of failed `operate()`103# calls, there may briefly be events in `due` that do not correspond to104# operations submitted to Core.105self.due = set(due)106self.initial_metadata = initial_metadata107self.response = None108self.trailing_metadata = trailing_metadata109self.code = code110self.details = details111self.debug_error_string = None112113# The semantics of grpc.Future.cancel and grpc.Future.cancelled are114# slightly wonky, so they have to be tracked separately from the rest of the115# result of the RPC. This field tracks whether cancellation was requested116# prior to termination of the RPC.117self.cancelled = False118self.callbacks = []119self.fork_epoch = cygrpc.get_fork_epoch()120121def reset_postfork_child(self):122self.condition = threading.Condition()123124125def _abort(state, code, details):126if state.code is None:127state.code = code128state.details = details129if state.initial_metadata is None:130state.initial_metadata = ()131state.trailing_metadata = ()132133134def _handle_event(event, state, response_deserializer):135callbacks = []136for batch_operation in event.batch_operations:137operation_type = batch_operation.type()138state.due.remove(operation_type)139if operation_type == cygrpc.OperationType.receive_initial_metadata:140state.initial_metadata = batch_operation.initial_metadata()141elif operation_type == cygrpc.OperationType.receive_message:142serialized_response = batch_operation.message()143if serialized_response is not None:144response = _common.deserialize(serialized_response,145response_deserializer)146if response is None:147details = 'Exception deserializing response!'148_abort(state, grpc.StatusCode.INTERNAL, details)149else:150state.response = response151elif operation_type == cygrpc.OperationType.receive_status_on_client:152state.trailing_metadata = batch_operation.trailing_metadata()153if state.code is None:154code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE.get(155batch_operation.code())156if code is None:157state.code = grpc.StatusCode.UNKNOWN158state.details = _unknown_code_details(159code, batch_operation.details())160else:161state.code = code162state.details = batch_operation.details()163state.debug_error_string = batch_operation.error_string()164callbacks.extend(state.callbacks)165state.callbacks = None166return callbacks167168169def _event_handler(state, response_deserializer):170171def handle_event(event):172with state.condition:173callbacks = _handle_event(event, state, response_deserializer)174state.condition.notify_all()175done = not state.due176for callback in callbacks:177try:178callback()179except Exception as e: # pylint: disable=broad-except180# NOTE(rbellevi): We suppress but log errors here so as not to181# kill the channel spin thread.182logging.error('Exception in callback %s: %s',183repr(callback.func), repr(e))184return done and state.fork_epoch >= cygrpc.get_fork_epoch()185186return handle_event187188189#pylint: disable=too-many-statements190def _consume_request_iterator(request_iterator, state, call, request_serializer,191event_handler):192"""Consume a request iterator supplied by the user."""193194def consume_request_iterator(): # pylint: disable=too-many-branches195# Iterate over the request iterator until it is exhausted or an error196# condition is encountered.197while True:198return_from_user_request_generator_invoked = False199try:200# The thread may die in user-code. Do not block fork for this.201cygrpc.enter_user_request_generator()202request = next(request_iterator)203except StopIteration:204break205except Exception: # pylint: disable=broad-except206cygrpc.return_from_user_request_generator()207return_from_user_request_generator_invoked = True208code = grpc.StatusCode.UNKNOWN209details = 'Exception iterating requests!'210_LOGGER.exception(details)211call.cancel(_common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code],212details)213_abort(state, code, details)214return215finally:216if not return_from_user_request_generator_invoked:217cygrpc.return_from_user_request_generator()218serialized_request = _common.serialize(request, request_serializer)219with state.condition:220if state.code is None and not state.cancelled:221if serialized_request is None:222code = grpc.StatusCode.INTERNAL223details = 'Exception serializing request!'224call.cancel(225_common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code],226details)227_abort(state, code, details)228return229else:230state.due.add(cygrpc.OperationType.send_message)231operations = (cygrpc.SendMessageOperation(232serialized_request, _EMPTY_FLAGS),)233operating = call.operate(operations, event_handler)234if not operating:235state.due.remove(cygrpc.OperationType.send_message)236return237238def _done():239return (state.code is not None or240cygrpc.OperationType.send_message241not in state.due)242243_common.wait(state.condition.wait,244_done,245spin_cb=functools.partial(246cygrpc.block_if_fork_in_progress,247state))248if state.code is not None:249return250else:251return252with state.condition:253if state.code is None:254state.due.add(cygrpc.OperationType.send_close_from_client)255operations = (256cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),)257operating = call.operate(operations, event_handler)258if not operating:259state.due.remove(260cygrpc.OperationType.send_close_from_client)261262consumption_thread = cygrpc.ForkManagedThread(263target=consume_request_iterator)264consumption_thread.setDaemon(True)265consumption_thread.start()266267268def _rpc_state_string(class_name, rpc_state):269"""Calculates error string for RPC."""270with rpc_state.condition:271if rpc_state.code is None:272return '<{} object>'.format(class_name)273elif rpc_state.code is grpc.StatusCode.OK:274return _OK_RENDEZVOUS_REPR_FORMAT.format(class_name, rpc_state.code,275rpc_state.details)276else:277return _NON_OK_RENDEZVOUS_REPR_FORMAT.format(278class_name, rpc_state.code, rpc_state.details,279rpc_state.debug_error_string)280281282class _InactiveRpcError(grpc.RpcError, grpc.Call, grpc.Future):283"""An RPC error not tied to the execution of a particular RPC.284285The RPC represented by the state object must not be in-progress or286cancelled.287288Attributes:289_state: An instance of _RPCState.290"""291292def __init__(self, state):293with state.condition:294self._state = _RPCState((), copy.deepcopy(state.initial_metadata),295copy.deepcopy(state.trailing_metadata),296state.code, copy.deepcopy(state.details))297self._state.response = copy.copy(state.response)298self._state.debug_error_string = copy.copy(state.debug_error_string)299300def initial_metadata(self):301return self._state.initial_metadata302303def trailing_metadata(self):304return self._state.trailing_metadata305306def code(self):307return self._state.code308309def details(self):310return _common.decode(self._state.details)311312def debug_error_string(self):313return _common.decode(self._state.debug_error_string)314315def _repr(self):316return _rpc_state_string(self.__class__.__name__, self._state)317318def __repr__(self):319return self._repr()320321def __str__(self):322return self._repr()323324def cancel(self):325"""See grpc.Future.cancel."""326return False327328def cancelled(self):329"""See grpc.Future.cancelled."""330return False331332def running(self):333"""See grpc.Future.running."""334return False335336def done(self):337"""See grpc.Future.done."""338return True339340def result(self, timeout=None): # pylint: disable=unused-argument341"""See grpc.Future.result."""342raise self343344def exception(self, timeout=None): # pylint: disable=unused-argument345"""See grpc.Future.exception."""346return self347348def traceback(self, timeout=None): # pylint: disable=unused-argument349"""See grpc.Future.traceback."""350try:351raise self352except grpc.RpcError:353return sys.exc_info()[2]354355def add_done_callback(self, fn, timeout=None): # pylint: disable=unused-argument356"""See grpc.Future.add_done_callback."""357fn(self)358359360class _Rendezvous(grpc.RpcError, grpc.RpcContext):361"""An RPC iterator.362363Attributes:364_state: An instance of _RPCState.365_call: An instance of SegregatedCall or IntegratedCall.366In either case, the _call object is expected to have operate, cancel,367and next_event methods.368_response_deserializer: A callable taking bytes and return a Python369object.370_deadline: A float representing the deadline of the RPC in seconds. Or371possibly None, to represent an RPC with no deadline at all.372"""373374def __init__(self, state, call, response_deserializer, deadline):375super(_Rendezvous, self).__init__()376self._state = state377self._call = call378self._response_deserializer = response_deserializer379self._deadline = deadline380381def is_active(self):382"""See grpc.RpcContext.is_active"""383with self._state.condition:384return self._state.code is None385386def time_remaining(self):387"""See grpc.RpcContext.time_remaining"""388with self._state.condition:389if self._deadline is None:390return None391else:392return max(self._deadline - time.time(), 0)393394def cancel(self):395"""See grpc.RpcContext.cancel"""396with self._state.condition:397if self._state.code is None:398code = grpc.StatusCode.CANCELLED399details = 'Locally cancelled by application!'400self._call.cancel(401_common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], details)402self._state.cancelled = True403_abort(self._state, code, details)404self._state.condition.notify_all()405return True406else:407return False408409def add_callback(self, callback):410"""See grpc.RpcContext.add_callback"""411with self._state.condition:412if self._state.callbacks is None:413return False414else:415self._state.callbacks.append(callback)416return True417418def __iter__(self):419return self420421def next(self):422return self._next()423424def __next__(self):425return self._next()426427def _next(self):428raise NotImplementedError()429430def debug_error_string(self):431raise NotImplementedError()432433def _repr(self):434return _rpc_state_string(self.__class__.__name__, self._state)435436def __repr__(self):437return self._repr()438439def __str__(self):440return self._repr()441442def __del__(self):443with self._state.condition:444if self._state.code is None:445self._state.code = grpc.StatusCode.CANCELLED446self._state.details = 'Cancelled upon garbage collection!'447self._state.cancelled = True448self._call.cancel(449_common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[self._state.code],450self._state.details)451self._state.condition.notify_all()452453454class _SingleThreadedRendezvous(_Rendezvous, grpc.Call, grpc.Future): # pylint: disable=too-many-ancestors455"""An RPC iterator operating entirely on a single thread.456457The __next__ method of _SingleThreadedRendezvous does not depend on the458existence of any other thread, including the "channel spin thread".459However, this means that its interface is entirely synchronous. So this460class cannot completely fulfill the grpc.Future interface. The result,461exception, and traceback methods will never block and will instead raise462an exception if calling the method would result in blocking.463464This means that these methods are safe to call from add_done_callback465handlers.466"""467468def _is_complete(self):469return self._state.code is not None470471def cancelled(self):472with self._state.condition:473return self._state.cancelled474475def running(self):476with self._state.condition:477return self._state.code is None478479def done(self):480with self._state.condition:481return self._state.code is not None482483def result(self, timeout=None):484"""Returns the result of the computation or raises its exception.485486This method will never block. Instead, it will raise an exception487if calling this method would otherwise result in blocking.488489Since this method will never block, any `timeout` argument passed will490be ignored.491"""492del timeout493with self._state.condition:494if not self._is_complete():495raise grpc.experimental.UsageError(496"_SingleThreadedRendezvous only supports result() when the RPC is complete."497)498if self._state.code is grpc.StatusCode.OK:499return self._state.response500elif self._state.cancelled:501raise grpc.FutureCancelledError()502else:503raise self504505def exception(self, timeout=None):506"""Return the exception raised by the computation.507508This method will never block. Instead, it will raise an exception509if calling this method would otherwise result in blocking.510511Since this method will never block, any `timeout` argument passed will512be ignored.513"""514del timeout515with self._state.condition:516if not self._is_complete():517raise grpc.experimental.UsageError(518"_SingleThreadedRendezvous only supports exception() when the RPC is complete."519)520if self._state.code is grpc.StatusCode.OK:521return None522elif self._state.cancelled:523raise grpc.FutureCancelledError()524else:525return self526527def traceback(self, timeout=None):528"""Access the traceback of the exception raised by the computation.529530This method will never block. Instead, it will raise an exception531if calling this method would otherwise result in blocking.532533Since this method will never block, any `timeout` argument passed will534be ignored.535"""536del timeout537with self._state.condition:538if not self._is_complete():539raise grpc.experimental.UsageError(540"_SingleThreadedRendezvous only supports traceback() when the RPC is complete."541)542if self._state.code is grpc.StatusCode.OK:543return None544elif self._state.cancelled:545raise grpc.FutureCancelledError()546else:547try:548raise self549except grpc.RpcError:550return sys.exc_info()[2]551552def add_done_callback(self, fn):553with self._state.condition:554if self._state.code is None:555self._state.callbacks.append(functools.partial(fn, self))556return557558fn(self)559560def initial_metadata(self):561"""See grpc.Call.initial_metadata"""562with self._state.condition:563# NOTE(gnossen): Based on our initial call batch, we are guaranteed564# to receive initial metadata before any messages.565while self._state.initial_metadata is None:566self._consume_next_event()567return self._state.initial_metadata568569def trailing_metadata(self):570"""See grpc.Call.trailing_metadata"""571with self._state.condition:572if self._state.trailing_metadata is None:573raise grpc.experimental.UsageError(574"Cannot get trailing metadata until RPC is completed.")575return self._state.trailing_metadata576577def code(self):578"""See grpc.Call.code"""579with self._state.condition:580if self._state.code is None:581raise grpc.experimental.UsageError(582"Cannot get code until RPC is completed.")583return self._state.code584585def details(self):586"""See grpc.Call.details"""587with self._state.condition:588if self._state.details is None:589raise grpc.experimental.UsageError(590"Cannot get details until RPC is completed.")591return _common.decode(self._state.details)592593def _consume_next_event(self):594event = self._call.next_event()595with self._state.condition:596callbacks = _handle_event(event, self._state,597self._response_deserializer)598for callback in callbacks:599# NOTE(gnossen): We intentionally allow exceptions to bubble up600# to the user when running on a single thread.601callback()602return event603604def _next_response(self):605while True:606self._consume_next_event()607with self._state.condition:608if self._state.response is not None:609response = self._state.response610self._state.response = None611return response612elif cygrpc.OperationType.receive_message not in self._state.due:613if self._state.code is grpc.StatusCode.OK:614raise StopIteration()615elif self._state.code is not None:616raise self617618def _next(self):619with self._state.condition:620if self._state.code is None:621# We tentatively add the operation as expected and remove622# it if the enqueue operation fails. This allows us to guarantee that623# if an event has been submitted to the core completion queue,624# it is in `due`. If we waited until after a successful625# enqueue operation then a signal could interrupt this626# thread between the enqueue operation and the addition of the627# operation to `due`. This would cause an exception on the628# channel spin thread when the operation completes and no629# corresponding operation would be present in state.due.630# Note that, since `condition` is held through this block, there is631# no data race on `due`.632self._state.due.add(cygrpc.OperationType.receive_message)633operating = self._call.operate(634(cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), None)635if not operating:636self._state.due.remove(cygrpc.OperationType.receive_message)637elif self._state.code is grpc.StatusCode.OK:638raise StopIteration()639else:640raise self641return self._next_response()642643def debug_error_string(self):644with self._state.condition:645if self._state.debug_error_string is None:646raise grpc.experimental.UsageError(647"Cannot get debug error string until RPC is completed.")648return _common.decode(self._state.debug_error_string)649650651class _MultiThreadedRendezvous(_Rendezvous, grpc.Call, grpc.Future): # pylint: disable=too-many-ancestors652"""An RPC iterator that depends on a channel spin thread.653654This iterator relies upon a per-channel thread running in the background,655dequeueing events from the completion queue, and notifying threads waiting656on the threading.Condition object in the _RPCState object.657658This extra thread allows _MultiThreadedRendezvous to fulfill the grpc.Future interface659and to mediate a bidirection streaming RPC.660"""661662def initial_metadata(self):663"""See grpc.Call.initial_metadata"""664with self._state.condition:665666def _done():667return self._state.initial_metadata is not None668669_common.wait(self._state.condition.wait, _done)670return self._state.initial_metadata671672def trailing_metadata(self):673"""See grpc.Call.trailing_metadata"""674with self._state.condition:675676def _done():677return self._state.trailing_metadata is not None678679_common.wait(self._state.condition.wait, _done)680return self._state.trailing_metadata681682def code(self):683"""See grpc.Call.code"""684with self._state.condition:685686def _done():687return self._state.code is not None688689_common.wait(self._state.condition.wait, _done)690return self._state.code691692def details(self):693"""See grpc.Call.details"""694with self._state.condition:695696def _done():697return self._state.details is not None698699_common.wait(self._state.condition.wait, _done)700return _common.decode(self._state.details)701702def debug_error_string(self):703with self._state.condition:704705def _done():706return self._state.debug_error_string is not None707708_common.wait(self._state.condition.wait, _done)709return _common.decode(self._state.debug_error_string)710711def cancelled(self):712with self._state.condition:713return self._state.cancelled714715def running(self):716with self._state.condition:717return self._state.code is None718719def done(self):720with self._state.condition:721return self._state.code is not None722723def _is_complete(self):724return self._state.code is not None725726def result(self, timeout=None):727"""Returns the result of the computation or raises its exception.728729See grpc.Future.result for the full API contract.730"""731with self._state.condition:732timed_out = _common.wait(self._state.condition.wait,733self._is_complete,734timeout=timeout)735if timed_out:736raise grpc.FutureTimeoutError()737else:738if self._state.code is grpc.StatusCode.OK:739return self._state.response740elif self._state.cancelled:741raise grpc.FutureCancelledError()742else:743raise self744745def exception(self, timeout=None):746"""Return the exception raised by the computation.747748See grpc.Future.exception for the full API contract.749"""750with self._state.condition:751timed_out = _common.wait(self._state.condition.wait,752self._is_complete,753timeout=timeout)754if timed_out:755raise grpc.FutureTimeoutError()756else:757if self._state.code is grpc.StatusCode.OK:758return None759elif self._state.cancelled:760raise grpc.FutureCancelledError()761else:762return self763764def traceback(self, timeout=None):765"""Access the traceback of the exception raised by the computation.766767See grpc.future.traceback for the full API contract.768"""769with self._state.condition:770timed_out = _common.wait(self._state.condition.wait,771self._is_complete,772timeout=timeout)773if timed_out:774raise grpc.FutureTimeoutError()775else:776if self._state.code is grpc.StatusCode.OK:777return None778elif self._state.cancelled:779raise grpc.FutureCancelledError()780else:781try:782raise self783except grpc.RpcError:784return sys.exc_info()[2]785786def add_done_callback(self, fn):787with self._state.condition:788if self._state.code is None:789self._state.callbacks.append(functools.partial(fn, self))790return791792fn(self)793794def _next(self):795with self._state.condition:796if self._state.code is None:797event_handler = _event_handler(self._state,798self._response_deserializer)799self._state.due.add(cygrpc.OperationType.receive_message)800operating = self._call.operate(801(cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),802event_handler)803if not operating:804self._state.due.remove(cygrpc.OperationType.receive_message)805elif self._state.code is grpc.StatusCode.OK:806raise StopIteration()807else:808raise self809810def _response_ready():811return (self._state.response is not None or812(cygrpc.OperationType.receive_message813not in self._state.due and814self._state.code is not None))815816_common.wait(self._state.condition.wait, _response_ready)817if self._state.response is not None:818response = self._state.response819self._state.response = None820return response821elif cygrpc.OperationType.receive_message not in self._state.due:822if self._state.code is grpc.StatusCode.OK:823raise StopIteration()824elif self._state.code is not None:825raise self826827828def _start_unary_request(request, timeout, request_serializer):829deadline = _deadline(timeout)830serialized_request = _common.serialize(request, request_serializer)831if serialized_request is None:832state = _RPCState((), (), (), grpc.StatusCode.INTERNAL,833'Exception serializing request!')834error = _InactiveRpcError(state)835return deadline, None, error836else:837return deadline, serialized_request, None838839840def _end_unary_response_blocking(state, call, with_call, deadline):841if state.code is grpc.StatusCode.OK:842if with_call:843rendezvous = _MultiThreadedRendezvous(state, call, None, deadline)844return state.response, rendezvous845else:846return state.response847else:848raise _InactiveRpcError(state)849850851def _stream_unary_invocation_operationses(metadata, initial_metadata_flags):852return (853(854cygrpc.SendInitialMetadataOperation(metadata,855initial_metadata_flags),856cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),857cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),858),859(cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),860)861862863def _stream_unary_invocation_operationses_and_tags(metadata,864initial_metadata_flags):865return tuple((866operations,867None,868) for operations in _stream_unary_invocation_operationses(869metadata, initial_metadata_flags))870871872def _determine_deadline(user_deadline):873parent_deadline = cygrpc.get_deadline_from_context()874if parent_deadline is None and user_deadline is None:875return None876elif parent_deadline is not None and user_deadline is None:877return parent_deadline878elif user_deadline is not None and parent_deadline is None:879return user_deadline880else:881return min(parent_deadline, user_deadline)882883884class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):885886# pylint: disable=too-many-arguments887def __init__(self, channel, managed_call, method, request_serializer,888response_deserializer):889self._channel = channel890self._managed_call = managed_call891self._method = method892self._request_serializer = request_serializer893self._response_deserializer = response_deserializer894self._context = cygrpc.build_census_context()895896def _prepare(self, request, timeout, metadata, wait_for_ready, compression):897deadline, serialized_request, rendezvous = _start_unary_request(898request, timeout, self._request_serializer)899initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(900wait_for_ready)901augmented_metadata = _compression.augment_metadata(902metadata, compression)903if serialized_request is None:904return None, None, None, rendezvous905else:906state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None)907operations = (908cygrpc.SendInitialMetadataOperation(augmented_metadata,909initial_metadata_flags),910cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),911cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),912cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),913cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),914cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),915)916return state, operations, deadline, None917918def _blocking(self, request, timeout, metadata, credentials, wait_for_ready,919compression):920state, operations, deadline, rendezvous = self._prepare(921request, timeout, metadata, wait_for_ready, compression)922if state is None:923raise rendezvous # pylint: disable-msg=raising-bad-type924else:925call = self._channel.segregated_call(926cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,927self._method, None, _determine_deadline(deadline), metadata,928None if credentials is None else credentials._credentials, ((929operations,930None,931),), self._context)932event = call.next_event()933_handle_event(event, state, self._response_deserializer)934return state, call935936def __call__(self,937request,938timeout=None,939metadata=None,940credentials=None,941wait_for_ready=None,942compression=None):943state, call, = self._blocking(request, timeout, metadata, credentials,944wait_for_ready, compression)945return _end_unary_response_blocking(state, call, False, None)946947def with_call(self,948request,949timeout=None,950metadata=None,951credentials=None,952wait_for_ready=None,953compression=None):954state, call, = self._blocking(request, timeout, metadata, credentials,955wait_for_ready, compression)956return _end_unary_response_blocking(state, call, True, None)957958def future(self,959request,960timeout=None,961metadata=None,962credentials=None,963wait_for_ready=None,964compression=None):965state, operations, deadline, rendezvous = self._prepare(966request, timeout, metadata, wait_for_ready, compression)967if state is None:968raise rendezvous # pylint: disable-msg=raising-bad-type969else:970event_handler = _event_handler(state, self._response_deserializer)971call = self._managed_call(972cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,973self._method, None, deadline, metadata,974None if credentials is None else credentials._credentials,975(operations,), event_handler, self._context)976return _MultiThreadedRendezvous(state, call,977self._response_deserializer,978deadline)979980981class _SingleThreadedUnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):982983# pylint: disable=too-many-arguments984def __init__(self, channel, method, request_serializer,985response_deserializer):986self._channel = channel987self._method = method988self._request_serializer = request_serializer989self._response_deserializer = response_deserializer990self._context = cygrpc.build_census_context()991992def __call__( # pylint: disable=too-many-locals993self,994request,995timeout=None,996metadata=None,997credentials=None,998wait_for_ready=None,999compression=None):1000deadline = _deadline(timeout)1001serialized_request = _common.serialize(request,1002self._request_serializer)1003if serialized_request is None:1004state = _RPCState((), (), (), grpc.StatusCode.INTERNAL,1005'Exception serializing request!')1006raise _InactiveRpcError(state)10071008state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)1009call_credentials = None if credentials is None else credentials._credentials1010initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(1011wait_for_ready)1012augmented_metadata = _compression.augment_metadata(1013metadata, compression)1014operations = (1015(cygrpc.SendInitialMetadataOperation(augmented_metadata,1016initial_metadata_flags),1017cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),1018cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS)),1019(cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),),1020(cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),1021)1022operations_and_tags = tuple((ops, None) for ops in operations)1023call = self._channel.segregated_call(1024cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method,1025None, _determine_deadline(deadline), metadata, call_credentials,1026operations_and_tags, self._context)1027return _SingleThreadedRendezvous(state, call,1028self._response_deserializer, deadline)102910301031class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):10321033# pylint: disable=too-many-arguments1034def __init__(self, channel, managed_call, method, request_serializer,1035response_deserializer):1036self._channel = channel1037self._managed_call = managed_call1038self._method = method1039self._request_serializer = request_serializer1040self._response_deserializer = response_deserializer1041self._context = cygrpc.build_census_context()10421043def __call__( # pylint: disable=too-many-locals1044self,1045request,1046timeout=None,1047metadata=None,1048credentials=None,1049wait_for_ready=None,1050compression=None):1051deadline, serialized_request, rendezvous = _start_unary_request(1052request, timeout, self._request_serializer)1053initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(1054wait_for_ready)1055if serialized_request is None:1056raise rendezvous # pylint: disable-msg=raising-bad-type1057else:1058augmented_metadata = _compression.augment_metadata(1059metadata, compression)1060state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)1061operationses = (1062(1063cygrpc.SendInitialMetadataOperation(augmented_metadata,1064initial_metadata_flags),1065cygrpc.SendMessageOperation(serialized_request,1066_EMPTY_FLAGS),1067cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),1068cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),1069),1070(cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),1071)1072call = self._managed_call(1073cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS,1074self._method, None, _determine_deadline(deadline), metadata,1075None if credentials is None else credentials._credentials,1076operationses, _event_handler(state,1077self._response_deserializer),1078self._context)1079return _MultiThreadedRendezvous(state, call,1080self._response_deserializer,1081deadline)108210831084class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):10851086# pylint: disable=too-many-arguments1087def __init__(self, channel, managed_call, method, request_serializer,1088response_deserializer):1089self._channel = channel1090self._managed_call = managed_call1091self._method = method1092self._request_serializer = request_serializer1093self._response_deserializer = response_deserializer1094self._context = cygrpc.build_census_context()10951096def _blocking(self, request_iterator, timeout, metadata, credentials,1097wait_for_ready, compression):1098deadline = _deadline(timeout)1099state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)1100initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(1101wait_for_ready)1102augmented_metadata = _compression.augment_metadata(1103metadata, compression)1104call = self._channel.segregated_call(1105cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method,1106None, _determine_deadline(deadline), augmented_metadata,1107None if credentials is None else credentials._credentials,1108_stream_unary_invocation_operationses_and_tags(1109augmented_metadata, initial_metadata_flags), self._context)1110_consume_request_iterator(request_iterator, state, call,1111self._request_serializer, None)1112while True:1113event = call.next_event()1114with state.condition:1115_handle_event(event, state, self._response_deserializer)1116state.condition.notify_all()1117if not state.due:1118break1119return state, call11201121def __call__(self,1122request_iterator,1123timeout=None,1124metadata=None,1125credentials=None,1126wait_for_ready=None,1127compression=None):1128state, call, = self._blocking(request_iterator, timeout, metadata,1129credentials, wait_for_ready, compression)1130return _end_unary_response_blocking(state, call, False, None)11311132def with_call(self,1133request_iterator,1134timeout=None,1135metadata=None,1136credentials=None,1137wait_for_ready=None,1138compression=None):1139state, call, = self._blocking(request_iterator, timeout, metadata,1140credentials, wait_for_ready, compression)1141return _end_unary_response_blocking(state, call, True, None)11421143def future(self,1144request_iterator,1145timeout=None,1146metadata=None,1147credentials=None,1148wait_for_ready=None,1149compression=None):1150deadline = _deadline(timeout)1151state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)1152event_handler = _event_handler(state, self._response_deserializer)1153initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(1154wait_for_ready)1155augmented_metadata = _compression.augment_metadata(1156metadata, compression)1157call = self._managed_call(1158cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method,1159None, deadline, augmented_metadata,1160None if credentials is None else credentials._credentials,1161_stream_unary_invocation_operationses(metadata,1162initial_metadata_flags),1163event_handler, self._context)1164_consume_request_iterator(request_iterator, state, call,1165self._request_serializer, event_handler)1166return _MultiThreadedRendezvous(state, call,1167self._response_deserializer, deadline)116811691170class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):11711172# pylint: disable=too-many-arguments1173def __init__(self, channel, managed_call, method, request_serializer,1174response_deserializer):1175self._channel = channel1176self._managed_call = managed_call1177self._method = method1178self._request_serializer = request_serializer1179self._response_deserializer = response_deserializer1180self._context = cygrpc.build_census_context()11811182def __call__(self,1183request_iterator,1184timeout=None,1185metadata=None,1186credentials=None,1187wait_for_ready=None,1188compression=None):1189deadline = _deadline(timeout)1190state = _RPCState(_STREAM_STREAM_INITIAL_DUE, None, None, None, None)1191initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(1192wait_for_ready)1193augmented_metadata = _compression.augment_metadata(1194metadata, compression)1195operationses = (1196(1197cygrpc.SendInitialMetadataOperation(augmented_metadata,1198initial_metadata_flags),1199cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),1200),1201(cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),1202)1203event_handler = _event_handler(state, self._response_deserializer)1204call = self._managed_call(1205cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method,1206None, _determine_deadline(deadline), augmented_metadata,1207None if credentials is None else credentials._credentials,1208operationses, event_handler, self._context)1209_consume_request_iterator(request_iterator, state, call,1210self._request_serializer, event_handler)1211return _MultiThreadedRendezvous(state, call,1212self._response_deserializer, deadline)121312141215class _InitialMetadataFlags(int):1216"""Stores immutable initial metadata flags"""12171218def __new__(cls, value=_EMPTY_FLAGS):1219value &= cygrpc.InitialMetadataFlags.used_mask1220return super(_InitialMetadataFlags, cls).__new__(cls, value)12211222def with_wait_for_ready(self, wait_for_ready):1223if wait_for_ready is not None:1224if wait_for_ready:1225return self.__class__(self | cygrpc.InitialMetadataFlags.wait_for_ready | \1226cygrpc.InitialMetadataFlags.wait_for_ready_explicitly_set)1227elif not wait_for_ready:1228return self.__class__(self & ~cygrpc.InitialMetadataFlags.wait_for_ready | \1229cygrpc.InitialMetadataFlags.wait_for_ready_explicitly_set)1230return self123112321233class _ChannelCallState(object):12341235def __init__(self, channel):1236self.lock = threading.Lock()1237self.channel = channel1238self.managed_calls = 01239self.threading = False12401241def reset_postfork_child(self):1242self.managed_calls = 012431244def __del__(self):1245try:1246self.channel.close(cygrpc.StatusCode.cancelled,1247'Channel deallocated!')1248except (TypeError, AttributeError):1249pass125012511252def _run_channel_spin_thread(state):12531254def channel_spin():1255while True:1256cygrpc.block_if_fork_in_progress(state)1257event = state.channel.next_call_event()1258if event.completion_type == cygrpc.CompletionType.queue_timeout:1259continue1260call_completed = event.tag(event)1261if call_completed:1262with state.lock:1263state.managed_calls -= 11264if state.managed_calls == 0:1265return12661267channel_spin_thread = cygrpc.ForkManagedThread(target=channel_spin)1268channel_spin_thread.setDaemon(True)1269channel_spin_thread.start()127012711272def _channel_managed_call_management(state):12731274# pylint: disable=too-many-arguments1275def create(flags, method, host, deadline, metadata, credentials,1276operationses, event_handler, context):1277"""Creates a cygrpc.IntegratedCall.12781279Args:1280flags: An integer bitfield of call flags.1281method: The RPC method.1282host: A host string for the created call.1283deadline: A float to be the deadline of the created call or None if1284the call is to have an infinite deadline.1285metadata: The metadata for the call or None.1286credentials: A cygrpc.CallCredentials or None.1287operationses: An iterable of iterables of cygrpc.Operations to be1288started on the call.1289event_handler: A behavior to call to handle the events resultant from1290the operations on the call.1291context: Context object for distributed tracing.1292Returns:1293A cygrpc.IntegratedCall with which to conduct an RPC.1294"""1295operationses_and_tags = tuple((1296operations,1297event_handler,1298) for operations in operationses)1299with state.lock:1300call = state.channel.integrated_call(flags, method, host, deadline,1301metadata, credentials,1302operationses_and_tags, context)1303if state.managed_calls == 0:1304state.managed_calls = 11305_run_channel_spin_thread(state)1306else:1307state.managed_calls += 11308return call13091310return create131113121313class _ChannelConnectivityState(object):13141315def __init__(self, channel):1316self.lock = threading.RLock()1317self.channel = channel1318self.polling = False1319self.connectivity = None1320self.try_to_connect = False1321self.callbacks_and_connectivities = []1322self.delivering = False13231324def reset_postfork_child(self):1325self.polling = False1326self.connectivity = None1327self.try_to_connect = False1328self.callbacks_and_connectivities = []1329self.delivering = False133013311332def _deliveries(state):1333callbacks_needing_update = []1334for callback_and_connectivity in state.callbacks_and_connectivities:1335callback, callback_connectivity, = callback_and_connectivity1336if callback_connectivity is not state.connectivity:1337callbacks_needing_update.append(callback)1338callback_and_connectivity[1] = state.connectivity1339return callbacks_needing_update134013411342def _deliver(state, initial_connectivity, initial_callbacks):1343connectivity = initial_connectivity1344callbacks = initial_callbacks1345while True:1346for callback in callbacks:1347cygrpc.block_if_fork_in_progress(state)1348try:1349callback(connectivity)1350except Exception: # pylint: disable=broad-except1351_LOGGER.exception(1352_CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE)1353with state.lock:1354callbacks = _deliveries(state)1355if callbacks:1356connectivity = state.connectivity1357else:1358state.delivering = False1359return136013611362def _spawn_delivery(state, callbacks):1363delivering_thread = cygrpc.ForkManagedThread(target=_deliver,1364args=(1365state,1366state.connectivity,1367callbacks,1368))1369delivering_thread.setDaemon(True)1370delivering_thread.start()1371state.delivering = True137213731374# NOTE(https://github.com/grpc/grpc/issues/3064): We'd rather not poll.1375def _poll_connectivity(state, channel, initial_try_to_connect):1376try_to_connect = initial_try_to_connect1377connectivity = channel.check_connectivity_state(try_to_connect)1378with state.lock:1379state.connectivity = (1380_common.1381CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[connectivity])1382callbacks = tuple(1383callback for callback, unused_but_known_to_be_none_connectivity in1384state.callbacks_and_connectivities)1385for callback_and_connectivity in state.callbacks_and_connectivities:1386callback_and_connectivity[1] = state.connectivity1387if callbacks:1388_spawn_delivery(state, callbacks)1389while True:1390event = channel.watch_connectivity_state(connectivity,1391time.time() + 0.2)1392cygrpc.block_if_fork_in_progress(state)1393with state.lock:1394if not state.callbacks_and_connectivities and not state.try_to_connect:1395state.polling = False1396state.connectivity = None1397break1398try_to_connect = state.try_to_connect1399state.try_to_connect = False1400if event.success or try_to_connect:1401connectivity = channel.check_connectivity_state(try_to_connect)1402with state.lock:1403state.connectivity = (1404_common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[1405connectivity])1406if not state.delivering:1407callbacks = _deliveries(state)1408if callbacks:1409_spawn_delivery(state, callbacks)141014111412def _subscribe(state, callback, try_to_connect):1413with state.lock:1414if not state.callbacks_and_connectivities and not state.polling:1415polling_thread = cygrpc.ForkManagedThread(1416target=_poll_connectivity,1417args=(state, state.channel, bool(try_to_connect)))1418polling_thread.setDaemon(True)1419polling_thread.start()1420state.polling = True1421state.callbacks_and_connectivities.append([callback, None])1422elif not state.delivering and state.connectivity is not None:1423_spawn_delivery(state, (callback,))1424state.try_to_connect |= bool(try_to_connect)1425state.callbacks_and_connectivities.append(1426[callback, state.connectivity])1427else:1428state.try_to_connect |= bool(try_to_connect)1429state.callbacks_and_connectivities.append([callback, None])143014311432def _unsubscribe(state, callback):1433with state.lock:1434for index, (subscribed_callback, unused_connectivity) in enumerate(1435state.callbacks_and_connectivities):1436if callback == subscribed_callback:1437state.callbacks_and_connectivities.pop(index)1438break143914401441def _augment_options(base_options, compression):1442compression_option = _compression.create_channel_option(compression)1443return tuple(base_options) + compression_option + ((1444cygrpc.ChannelArgKey.primary_user_agent_string,1445_USER_AGENT,1446),)144714481449def _separate_channel_options(options):1450"""Separates core channel options from Python channel options."""1451core_options = []1452python_options = []1453for pair in options:1454if pair[0] == grpc.experimental.ChannelOptions.SingleThreadedUnaryStream:1455python_options.append(pair)1456else:1457core_options.append(pair)1458return python_options, core_options145914601461class Channel(grpc.Channel):1462"""A cygrpc.Channel-backed implementation of grpc.Channel."""14631464def __init__(self, target, options, credentials, compression):1465"""Constructor.14661467Args:1468target: The target to which to connect.1469options: Configuration options for the channel.1470credentials: A cygrpc.ChannelCredentials or None.1471compression: An optional value indicating the compression method to be1472used over the lifetime of the channel.1473"""1474python_options, core_options = _separate_channel_options(options)1475self._single_threaded_unary_stream = _DEFAULT_SINGLE_THREADED_UNARY_STREAM1476self._process_python_options(python_options)1477self._channel = cygrpc.Channel(1478_common.encode(target), _augment_options(core_options, compression),1479credentials)1480self._call_state = _ChannelCallState(self._channel)1481self._connectivity_state = _ChannelConnectivityState(self._channel)1482cygrpc.fork_register_channel(self)1483if cygrpc.g_gevent_activated:1484cygrpc.gevent_increment_channel_count()14851486def _process_python_options(self, python_options):1487"""Sets channel attributes according to python-only channel options."""1488for pair in python_options:1489if pair[0] == grpc.experimental.ChannelOptions.SingleThreadedUnaryStream:1490self._single_threaded_unary_stream = True14911492def subscribe(self, callback, try_to_connect=None):1493_subscribe(self._connectivity_state, callback, try_to_connect)14941495def unsubscribe(self, callback):1496_unsubscribe(self._connectivity_state, callback)14971498def unary_unary(self,1499method,1500request_serializer=None,1501response_deserializer=None):1502return _UnaryUnaryMultiCallable(1503self._channel, _channel_managed_call_management(self._call_state),1504_common.encode(method), request_serializer, response_deserializer)15051506def unary_stream(self,1507method,1508request_serializer=None,1509response_deserializer=None):1510# NOTE(rbellevi): Benchmarks have shown that running a unary-stream RPC1511# on a single Python thread results in an appreciable speed-up. However,1512# due to slight differences in capability, the multi-threaded variant1513# remains the default.1514if self._single_threaded_unary_stream:1515return _SingleThreadedUnaryStreamMultiCallable(1516self._channel, _common.encode(method), request_serializer,1517response_deserializer)1518else:1519return _UnaryStreamMultiCallable(1520self._channel,1521_channel_managed_call_management(self._call_state),1522_common.encode(method), request_serializer,1523response_deserializer)15241525def stream_unary(self,1526method,1527request_serializer=None,1528response_deserializer=None):1529return _StreamUnaryMultiCallable(1530self._channel, _channel_managed_call_management(self._call_state),1531_common.encode(method), request_serializer, response_deserializer)15321533def stream_stream(self,1534method,1535request_serializer=None,1536response_deserializer=None):1537return _StreamStreamMultiCallable(1538self._channel, _channel_managed_call_management(self._call_state),1539_common.encode(method), request_serializer, response_deserializer)15401541def _unsubscribe_all(self):1542state = self._connectivity_state1543if state:1544with state.lock:1545del state.callbacks_and_connectivities[:]15461547def _close(self):1548self._unsubscribe_all()1549self._channel.close(cygrpc.StatusCode.cancelled, 'Channel closed!')1550cygrpc.fork_unregister_channel(self)1551if cygrpc.g_gevent_activated:1552cygrpc.gevent_decrement_channel_count()15531554def _close_on_fork(self):1555self._unsubscribe_all()1556self._channel.close_on_fork(cygrpc.StatusCode.cancelled,1557'Channel closed due to fork')15581559def __enter__(self):1560return self15611562def __exit__(self, exc_type, exc_val, exc_tb):1563self._close()1564return False15651566def close(self):1567self._close()15681569def __del__(self):1570# TODO(https://github.com/grpc/grpc/issues/12531): Several releases1571# after 1.12 (1.16 or thereabouts?) add a "self._channel.close" call1572# here (or more likely, call self._close() here). We don't do this today1573# because many valid use cases today allow the channel to be deleted1574# immediately after stubs are created. After a sufficient period of time1575# has passed for all users to be trusted to freeze out to their channels1576# for as long as they are in use and to close them after using them,1577# then deletion of this grpc._channel.Channel instance can be made to1578# effect closure of the underlying cygrpc.Channel instance.1579try:1580self._unsubscribe_all()1581except: # pylint: disable=bare-except1582# Exceptions in __del__ are ignored by Python anyway, but they can1583# keep spamming logs. Just silence them.1584pass158515861587