Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
wiseplat
GitHub Repository: wiseplat/python-code
Path: blob/master/ invest-robot-contest_TinkoffBotTwitch-main/venv/lib/python3.8/site-packages/grpc/_channel.py
7796 views
1
# Copyright 2016 gRPC authors.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
# http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
"""Invocation-side implementation of gRPC Python."""
15
16
import copy
17
import functools
18
import logging
19
import os
20
import sys
21
import threading
22
import time
23
24
import grpc
25
from grpc import _common
26
from grpc import _compression
27
from grpc import _grpcio_metadata
28
from grpc._cython import cygrpc
29
import grpc.experimental
30
31
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# Identifier string built from the installed grpcio version.
_USER_AGENT = 'grpc-python/{}'.format(_grpcio_metadata.__version__)

# Flags value meaning "no flags set"; passed to the cygrpc operation
# constructors used in this module.
_EMPTY_FLAGS = 0

# NOTE(rbellevi): No guarantees are given about the maintenance of this
# environment variable.
# True whenever GRPC_SINGLE_THREADED_UNARY_STREAM is present in the
# environment, regardless of its value (even an empty string).
_DEFAULT_SINGLE_THREADED_UNARY_STREAM = os.getenv(
    "GRPC_SINGLE_THREADED_UNARY_STREAM") is not None

# Operation types seeded into `_RPCState.due` when a unary-unary RPC is
# started; they mirror the operations submitted in its initial batch.
_UNARY_UNARY_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.send_message,
    cygrpc.OperationType.send_close_from_client,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_message,
    cygrpc.OperationType.receive_status_on_client,
)
# Same idea for a unary-stream RPC: no receive_message is due up front;
# one is added per call to the response iterator's `_next`.
_UNARY_STREAM_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.send_message,
    cygrpc.OperationType.send_close_from_client,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_status_on_client,
)
# NOTE(review): the two stream-request tuples below are presumably consumed
# by the stream-unary / stream-stream callables defined elsewhere in this
# file - confirm against those callers.
_STREAM_UNARY_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_message,
    cygrpc.OperationType.receive_status_on_client,
)
_STREAM_STREAM_INITIAL_DUE = (
    cygrpc.OperationType.send_initial_metadata,
    cygrpc.OperationType.receive_initial_metadata,
    cygrpc.OperationType.receive_status_on_client,
)

# Log message text; presumably used where channel connectivity subscription
# callbacks are invoked (not visible in this part of the file).
_CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = (
    'Exception calling channel subscription callback!')

# repr() template for an RPC that terminated with StatusCode.OK.
# Placeholders: class name, status code, details.
_OK_RENDEZVOUS_REPR_FORMAT = ('<{} of RPC that terminated with:\n'
                              '\tstatus = {}\n'
                              '\tdetails = "{}"\n'
                              '>')

# repr() template for an RPC that terminated with a non-OK status.
# Placeholders: class name, status code, details, debug error string.
_NON_OK_RENDEZVOUS_REPR_FORMAT = ('<{} of RPC that terminated with:\n'
                                  '\tstatus = {}\n'
                                  '\tdetails = "{}"\n'
                                  '\tdebug_error_string = "{}"\n'
                                  '>')
82
83
84
def _deadline(timeout):
    """Turn a relative timeout into an absolute deadline.

    Args:
      timeout: A duration in seconds, or None for "no deadline".

    Returns:
      None when `timeout` is None; otherwise the current `time.time()` value
      plus `timeout`.
    """
    if timeout is None:
        return None
    return time.time() + timeout
86
87
88
def _unknown_code_details(unknown_cygrpc_code, details):
    """Build the details text used when a status code cannot be mapped.

    Args:
      unknown_cygrpc_code: The unrecognized code value as received.
      details: The details value that accompanied the code.

    Returns:
      A human-readable string embedding both values.
    """
    template = 'Server sent unknown code {} and details "{}"'
    return template.format(unknown_cygrpc_code, details)
91
92
93
class _RPCState(object):
    """Mutable record of a single RPC's progress and outcome.

    Every attribute is meant to be read and written while holding
    `condition`.

    Attributes:
      condition: threading.Condition guarding all other members; `notify_all`
        is called on it when the state of the RPC has changed.
      due: Set of cygrpc.OperationType values whose completion events are
        still expected.
      initial_metadata: Initial metadata of the RPC, or None if not yet known.
      response: The most recently deserialized response, or None.
      trailing_metadata: Trailing metadata of the RPC, or None if not yet
        known.
      code: Final status code, or None while the RPC has not terminated.
      details: Final status details, or None.
      debug_error_string: Debug error string reported with the status, or
        None.
      cancelled: Whether cancellation was requested before termination.
      callbacks: List of callables to run at termination; replaced by None
        once they have been collected for execution.
      fork_epoch: Value of cygrpc.get_fork_epoch() at construction time.
    """

    def __init__(self, due, initial_metadata, trailing_metadata, code, details):
        # `condition` guards all members of _RPCState. `notify_all` is called on
        # `condition` when the state of the RPC has changed.
        self.condition = threading.Condition()

        # The cygrpc.OperationType objects representing events due from the RPC's
        # completion queue. If an operation is in `due`, it is guaranteed that
        # `operate()` has been called on a corresponding operation. But the
        # converse is not true. That is, in the case of failed `operate()`
        # calls, there may briefly be events in `due` that do not correspond to
        # operations submitted to Core.
        self.due = set(due)

        # Outcome of the RPC as known so far.
        self.code = code
        self.details = details
        self.debug_error_string = None

        # Payload and metadata received so far.
        self.initial_metadata = initial_metadata
        self.trailing_metadata = trailing_metadata
        self.response = None

        # The semantics of grpc.Future.cancel and grpc.Future.cancelled are
        # slightly wonky, so they have to be tracked separately from the rest of the
        # result of the RPC. This field tracks whether cancellation was requested
        # prior to termination of the RPC.
        self.cancelled = False

        self.callbacks = []
        self.fork_epoch = cygrpc.get_fork_epoch()

    def reset_postfork_child(self):
        """Replace `condition` with a brand-new threading.Condition."""
        self.condition = threading.Condition()
124
125
126
def _abort(state, code, details):
    """Mark an RPC as terminated locally, unless it already has a status.

    Does nothing when `state.code` is already set. Otherwise records `code`
    and `details`, fills in empty initial metadata if none was received, and
    sets the trailing metadata to an empty tuple.

    Args:
      state: The _RPCState to update in place.
      code: The status code to record.
      details: The status details to record.
    """
    if state.code is not None:
        # A status was already recorded; the first one wins.
        return
    state.code = code
    state.details = details
    if state.initial_metadata is None:
        state.initial_metadata = ()
    state.trailing_metadata = ()
133
134
135
def _handle_event(event, state, response_deserializer):
    """Fold one completion-queue event into the RPC state.

    The caller is expected to hold `state.condition`.

    Args:
      event: An event whose `batch_operations` are the completed operations.
      state: The _RPCState to update in place.
      response_deserializer: Deserializer handed to `_common.deserialize` for
        received messages.

    Returns:
      The list of termination callbacks to run. It is empty unless this event
      contained the receive_status_on_client operation.
    """
    callbacks = []
    for batch_operation in event.batch_operations:
        operation_type = batch_operation.type()
        # Every completed operation must have been registered in `due`.
        state.due.remove(operation_type)
        if operation_type == cygrpc.OperationType.receive_initial_metadata:
            state.initial_metadata = batch_operation.initial_metadata()
        elif operation_type == cygrpc.OperationType.receive_message:
            serialized_response = batch_operation.message()
            if serialized_response is not None:
                response = _common.deserialize(serialized_response,
                                               response_deserializer)
                if response is None:
                    details = 'Exception deserializing response!'
                    _abort(state, grpc.StatusCode.INTERNAL, details)
                else:
                    state.response = response
        elif operation_type == cygrpc.OperationType.receive_status_on_client:
            state.trailing_metadata = batch_operation.trailing_metadata()
            if state.code is None:
                # Keep the raw code so that an unmapped value can be reported
                # verbatim instead of as the `None` returned by the lookup.
                cygrpc_code = batch_operation.code()
                code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE.get(
                    cygrpc_code)
                if code is None:
                    state.code = grpc.StatusCode.UNKNOWN
                    state.details = _unknown_code_details(
                        cygrpc_code, batch_operation.details())
                else:
                    state.code = code
                    state.details = batch_operation.details()
                    state.debug_error_string = batch_operation.error_string()
            # The RPC has terminated: hand the callbacks to the caller and
            # mark the list as consumed so later registrations are refused.
            callbacks.extend(state.callbacks)
            state.callbacks = None
    return callbacks
168
169
170
def _event_handler(state, response_deserializer):
    """Create the per-event callback handed to `call.operate`.

    Args:
      state: The _RPCState updated by each event.
      response_deserializer: Deserializer used for received messages.

    Returns:
      A function taking an event. It applies the event to `state` under
      `state.condition`, notifies waiters, runs any termination callbacks
      outside the lock, and returns True when no operations remain due and
      the state's fork epoch is not older than the current one.
    """

    def handle_event(event):
        with state.condition:
            callbacks = _handle_event(event, state, response_deserializer)
            state.condition.notify_all()
            done = not state.due
        for callback in callbacks:
            try:
                callback()
            except Exception as e:  # pylint: disable=broad-except
                # NOTE(rbellevi): We suppress but log errors here so as not to
                # kill the channel spin thread.
                # Callbacks registered via add_done_callback are
                # functools.partial objects, but those registered via
                # add_callback are plain callables without a `func`
                # attribute; fall back to the callback itself so that logging
                # cannot raise from inside this handler.
                logging.error('Exception in callback %s: %s',
                              repr(getattr(callback, 'func', callback)),
                              repr(e))
        return done and state.fork_epoch >= cygrpc.get_fork_epoch()

    return handle_event
188
189
190
#pylint: disable=too-many-statements
191
def _consume_request_iterator(request_iterator, state, call, request_serializer,
                              event_handler):
    """Consume a request iterator supplied by the user.

    Starts a cygrpc.ForkManagedThread that pulls requests from
    `request_iterator` one at a time, serializes each and submits it as a
    send_message operation, waiting for that send to complete before pulling
    the next request. Once the iterator is exhausted a send_close_from_client
    operation is submitted. The thread is neither returned nor joined.

    Args:
      request_iterator: The user's iterator of request values.
      state: The _RPCState of the RPC.
      call: The call object; its `operate` and `cancel` methods are used.
      request_serializer: Serializer handed to `_common.serialize`.
      event_handler: Tag passed to `call.operate` for each submitted batch.
    """

    def consume_request_iterator():  # pylint: disable=too-many-branches
        # Iterate over the request iterator until it is exhausted or an error
        # condition is encountered.
        while True:
            # Ensures return_from_user_request_generator() is called exactly
            # once per enter_user_request_generator().
            return_from_user_request_generator_invoked = False
            try:
                # The thread may die in user-code. Do not block fork for this.
                cygrpc.enter_user_request_generator()
                request = next(request_iterator)
            except StopIteration:
                break
            except Exception:  # pylint: disable=broad-except
                cygrpc.return_from_user_request_generator()
                return_from_user_request_generator_invoked = True
                code = grpc.StatusCode.UNKNOWN
                details = 'Exception iterating requests!'
                _LOGGER.exception(details)
                call.cancel(_common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code],
                            details)
                # NOTE(review): unlike the serialization-failure path below,
                # this _abort runs without holding state.condition - confirm
                # that this is intentional.
                _abort(state, code, details)
                return
            finally:
                if not return_from_user_request_generator_invoked:
                    cygrpc.return_from_user_request_generator()
            serialized_request = _common.serialize(request, request_serializer)
            with state.condition:
                if state.code is None and not state.cancelled:
                    if serialized_request is None:
                        code = grpc.StatusCode.INTERNAL
                        details = 'Exception serializing request!'
                        call.cancel(
                            _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code],
                            details)
                        _abort(state, code, details)
                        return
                    else:
                        # Register the operation as due before submitting it;
                        # roll back if the submission is refused.
                        state.due.add(cygrpc.OperationType.send_message)
                        operations = (cygrpc.SendMessageOperation(
                            serialized_request, _EMPTY_FLAGS),)
                        operating = call.operate(operations, event_handler)
                        if not operating:
                            state.due.remove(cygrpc.OperationType.send_message)
                            return

                        def _done():
                            # Finished waiting once the RPC has a status or
                            # the send_message is no longer due.
                            return (state.code is not None or
                                    cygrpc.OperationType.send_message
                                    not in state.due)

                        _common.wait(state.condition.wait,
                                     _done,
                                     spin_cb=functools.partial(
                                         cygrpc.block_if_fork_in_progress,
                                         state))
                        if state.code is not None:
                            return
                else:
                    # The RPC already terminated or was cancelled: stop
                    # consuming requests.
                    return
        # The iterator was exhausted normally: half-close the call if the RPC
        # is still in progress.
        with state.condition:
            if state.code is None:
                state.due.add(cygrpc.OperationType.send_close_from_client)
                operations = (
                    cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),)
                operating = call.operate(operations, event_handler)
                if not operating:
                    state.due.remove(
                        cygrpc.OperationType.send_close_from_client)

    consumption_thread = cygrpc.ForkManagedThread(
        target=consume_request_iterator)
    consumption_thread.setDaemon(True)
    consumption_thread.start()
267
268
269
def _rpc_state_string(class_name, rpc_state):
    """Render a human-readable description of an RPC's state.

    Args:
      class_name: Name to show for the object being described.
      rpc_state: The _RPCState to describe; read under its `condition`.

    Returns:
      A short placeholder while the RPC has no status, otherwise a multi-line
      summary of status and details (plus the debug error string for non-OK
      statuses).
    """
    with rpc_state.condition:
        status = rpc_state.code
        if status is None:
            return '<{} object>'.format(class_name)
        if status is grpc.StatusCode.OK:
            return _OK_RENDEZVOUS_REPR_FORMAT.format(class_name, status,
                                                     rpc_state.details)
        return _NON_OK_RENDEZVOUS_REPR_FORMAT.format(
            class_name, status, rpc_state.details,
            rpc_state.debug_error_string)
281
282
283
class _InactiveRpcError(grpc.RpcError, grpc.Call, grpc.Future):
    """An RPC error that is detached from any running RPC.

    The RPC represented by the state object must not be in-progress or
    cancelled. The relevant fields of that state are copied at construction
    time, so this object never changes afterwards and all of its Future
    methods answer immediately.

    Attributes:
      _state: An instance of _RPCState.
    """

    def __init__(self, state):
        with state.condition:
            self._state = snapshot = _RPCState(
                (), copy.deepcopy(state.initial_metadata),
                copy.deepcopy(state.trailing_metadata), state.code,
                copy.deepcopy(state.details))
            snapshot.response = copy.copy(state.response)
            snapshot.debug_error_string = copy.copy(state.debug_error_string)

    def initial_metadata(self):
        """Return the copied initial metadata."""
        return self._state.initial_metadata

    def trailing_metadata(self):
        """Return the copied trailing metadata."""
        return self._state.trailing_metadata

    def code(self):
        """Return the status code of the failed RPC."""
        return self._state.code

    def details(self):
        """Return the status details, passed through `_common.decode`."""
        return _common.decode(self._state.details)

    def debug_error_string(self):
        """Return the debug error string, passed through `_common.decode`."""
        return _common.decode(self._state.debug_error_string)

    def _repr(self):
        return _rpc_state_string(self.__class__.__name__, self._state)

    def __repr__(self):
        return self._repr()

    def __str__(self):
        return self._repr()

    def cancel(self):
        """See grpc.Future.cancel."""
        # Nothing is running, so there is nothing to cancel.
        return False

    def cancelled(self):
        """See grpc.Future.cancelled."""
        return False

    def running(self):
        """See grpc.Future.running."""
        return False

    def done(self):
        """See grpc.Future.done."""
        return True

    def result(self, timeout=None):  # pylint: disable=unused-argument
        """See grpc.Future.result."""
        raise self

    def exception(self, timeout=None):  # pylint: disable=unused-argument
        """See grpc.Future.exception."""
        return self

    def traceback(self, timeout=None):  # pylint: disable=unused-argument
        """See grpc.Future.traceback."""
        # Raise and immediately catch ourselves to obtain a traceback object.
        try:
            raise self
        except grpc.RpcError:
            _, _, trace = sys.exc_info()
            return trace

    def add_done_callback(self, fn, timeout=None):  # pylint: disable=unused-argument
        """See grpc.Future.add_done_callback."""
        # Already done: invoke the callback right away.
        fn(self)
359
360
361
class _Rendezvous(grpc.RpcError, grpc.RpcContext):
    """Base class for RPC iterators.

    Subclasses supply `_next` and `debug_error_string`.

    Attributes:
      _state: An instance of _RPCState.
      _call: An instance of SegregatedCall or IntegratedCall.
        In either case, the _call object is expected to have operate, cancel,
        and next_event methods.
      _response_deserializer: A callable taking bytes and return a Python
        object.
      _deadline: A float representing the deadline of the RPC in seconds. Or
        possibly None, to represent an RPC with no deadline at all.
    """

    def __init__(self, state, call, response_deserializer, deadline):
        super(_Rendezvous, self).__init__()
        self._state = state
        self._call = call
        self._response_deserializer = response_deserializer
        self._deadline = deadline

    def is_active(self):
        """See grpc.RpcContext.is_active"""
        with self._state.condition:
            return self._state.code is None

    def time_remaining(self):
        """See grpc.RpcContext.time_remaining"""
        with self._state.condition:
            if self._deadline is None:
                return None
            # Never report a negative amount of time.
            return max(self._deadline - time.time(), 0)

    def cancel(self):
        """See grpc.RpcContext.cancel"""
        with self._state.condition:
            if self._state.code is not None:
                # Already terminated; nothing left to cancel.
                return False
            code = grpc.StatusCode.CANCELLED
            details = 'Locally cancelled by application!'
            self._call.cancel(_common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code],
                              details)
            self._state.cancelled = True
            _abort(self._state, code, details)
            self._state.condition.notify_all()
            return True

    def add_callback(self, callback):
        """See grpc.RpcContext.add_callback"""
        with self._state.condition:
            pending = self._state.callbacks
            if pending is None:
                # Callbacks were already collected for execution.
                return False
            pending.append(callback)
            return True

    def __iter__(self):
        return self

    def next(self):
        return self._next()

    def __next__(self):
        return self._next()

    def _next(self):
        raise NotImplementedError()

    def debug_error_string(self):
        raise NotImplementedError()

    def _repr(self):
        return _rpc_state_string(self.__class__.__name__, self._state)

    def __repr__(self):
        return self._repr()

    def __str__(self):
        return self._repr()

    def __del__(self):
        # Cancel an RPC that is still in flight when its handle is collected.
        state = self._state
        with state.condition:
            if state.code is None:
                state.code = grpc.StatusCode.CANCELLED
                state.details = 'Cancelled upon garbage collection!'
                state.cancelled = True
                self._call.cancel(
                    _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[state.code],
                    state.details)
                state.condition.notify_all()
453
454
455
class _SingleThreadedRendezvous(_Rendezvous, grpc.Call, grpc.Future):  # pylint: disable=too-many-ancestors
    """An RPC iterator operating entirely on a single thread.

    The __next__ method of _SingleThreadedRendezvous does not depend on the
    existence of any other thread, including the "channel spin thread".
    However, this means that its interface is entirely synchronous. So this
    class cannot completely fulfill the grpc.Future interface. The result,
    exception, and traceback methods will never block and will instead raise
    an exception if calling the method would result in blocking.

    This means that these methods are safe to call from add_done_callback
    handlers.
    """

    def _is_complete(self):
        """Return True once the RPC has a status code."""
        return self._state.code is not None

    def cancelled(self):
        """Return whether cancellation was requested before termination."""
        with self._state.condition:
            return self._state.cancelled

    def running(self):
        """Return True while the RPC has no status code yet."""
        with self._state.condition:
            return self._state.code is None

    def done(self):
        """Return True once the RPC has a status code."""
        with self._state.condition:
            return self._state.code is not None

    def result(self, timeout=None):
        """Returns the result of the computation or raises its exception.

        This method will never block. Instead, it will raise an exception
        if calling this method would otherwise result in blocking.

        Since this method will never block, any `timeout` argument passed will
        be ignored.
        """
        del timeout
        with self._state.condition:
            if not self._is_complete():
                raise grpc.experimental.UsageError(
                    "_SingleThreadedRendezvous only supports result() when the RPC is complete."
                )
            if self._state.code is grpc.StatusCode.OK:
                return self._state.response
            elif self._state.cancelled:
                raise grpc.FutureCancelledError()
            else:
                raise self

    def exception(self, timeout=None):
        """Return the exception raised by the computation.

        This method will never block. Instead, it will raise an exception
        if calling this method would otherwise result in blocking.

        Since this method will never block, any `timeout` argument passed will
        be ignored.
        """
        del timeout
        with self._state.condition:
            if not self._is_complete():
                raise grpc.experimental.UsageError(
                    "_SingleThreadedRendezvous only supports exception() when the RPC is complete."
                )
            if self._state.code is grpc.StatusCode.OK:
                return None
            elif self._state.cancelled:
                raise grpc.FutureCancelledError()
            else:
                return self

    def traceback(self, timeout=None):
        """Access the traceback of the exception raised by the computation.

        This method will never block. Instead, it will raise an exception
        if calling this method would otherwise result in blocking.

        Since this method will never block, any `timeout` argument passed will
        be ignored.
        """
        del timeout
        with self._state.condition:
            if not self._is_complete():
                raise grpc.experimental.UsageError(
                    "_SingleThreadedRendezvous only supports traceback() when the RPC is complete."
                )
            if self._state.code is grpc.StatusCode.OK:
                return None
            elif self._state.cancelled:
                raise grpc.FutureCancelledError()
            else:
                # Raise and catch ourselves to obtain a traceback object.
                try:
                    raise self
                except grpc.RpcError:
                    return sys.exc_info()[2]

    def add_done_callback(self, fn):
        """Arrange for `fn(self)` to be called when the RPC terminates.

        If the RPC has already terminated, `fn(self)` is called immediately,
        after the state lock has been released.
        """
        with self._state.condition:
            if self._state.code is None:
                self._state.callbacks.append(functools.partial(fn, self))
                return

        fn(self)

    def initial_metadata(self):
        """See grpc.Call.initial_metadata"""
        with self._state.condition:
            # NOTE(gnossen): Based on our initial call batch, we are guaranteed
            # to receive initial metadata before any messages.
            while self._state.initial_metadata is None:
                self._consume_next_event()
            return self._state.initial_metadata

    def trailing_metadata(self):
        """See grpc.Call.trailing_metadata"""
        with self._state.condition:
            if self._state.trailing_metadata is None:
                raise grpc.experimental.UsageError(
                    "Cannot get trailing metadata until RPC is completed.")
            return self._state.trailing_metadata

    def code(self):
        """See grpc.Call.code"""
        with self._state.condition:
            if self._state.code is None:
                raise grpc.experimental.UsageError(
                    "Cannot get code until RPC is completed.")
            return self._state.code

    def details(self):
        """See grpc.Call.details"""
        with self._state.condition:
            if self._state.details is None:
                raise grpc.experimental.UsageError(
                    "Cannot get details until RPC is completed.")
            return _common.decode(self._state.details)

    def _consume_next_event(self):
        """Fetch one event from the call, apply it and run its callbacks.

        Returns:
          The event obtained from `self._call.next_event()`.
        """
        event = self._call.next_event()
        with self._state.condition:
            callbacks = _handle_event(event, self._state,
                                      self._response_deserializer)
            for callback in callbacks:
                # NOTE(gnossen): We intentionally allow exceptions to bubble up
                # to the user when running on a single thread.
                callback()
        return event

    def _next_response(self):
        """Consume events until a response or the end of the RPC is reached.

        Returns:
          The next response; it is cleared from the state before returning.

        Raises:
          StopIteration: The RPC terminated with StatusCode.OK.
          _SingleThreadedRendezvous: (self) The RPC terminated with any other
            status.
        """
        while True:
            self._consume_next_event()
            with self._state.condition:
                if self._state.response is not None:
                    response = self._state.response
                    self._state.response = None
                    return response
                elif cygrpc.OperationType.receive_message not in self._state.due:
                    # The receive completed without a message: decide based
                    # on the status, or keep consuming if none arrived yet.
                    if self._state.code is grpc.StatusCode.OK:
                        raise StopIteration()
                    elif self._state.code is not None:
                        raise self

    def _next(self):
        """Request one more message and return it once it is available."""
        with self._state.condition:
            if self._state.code is None:
                # We tentatively add the operation as expected and remove
                # it if the enqueue operation fails. This allows us to guarantee that
                # if an event has been submitted to the core completion queue,
                # it is in `due`. If we waited until after a successful
                # enqueue operation then a signal could interrupt this
                # thread between the enqueue operation and the addition of the
                # operation to `due`. This would cause an exception on the
                # channel spin thread when the operation completes and no
                # corresponding operation would be present in state.due.
                # Note that, since `condition` is held through this block, there is
                # no data race on `due`.
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), None)
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
        return self._next_response()

    def debug_error_string(self):
        """Return the decoded debug error string of the terminated RPC.

        Raises:
          grpc.experimental.UsageError: No debug error string has been
            recorded yet.
        """
        with self._state.condition:
            if self._state.debug_error_string is None:
                raise grpc.experimental.UsageError(
                    "Cannot get debug error string until RPC is completed.")
            return _common.decode(self._state.debug_error_string)
650
651
652
class _MultiThreadedRendezvous(_Rendezvous, grpc.Call, grpc.Future):  # pylint: disable=too-many-ancestors
    """An RPC iterator that depends on a channel spin thread.

    This iterator relies upon a per-channel thread running in the background,
    dequeueing events from the completion queue, and notifying threads waiting
    on the threading.Condition object in the _RPCState object.

    This extra thread allows _MultiThreadedRendezvous to fulfill the grpc.Future interface
    and to mediate a bidirectional streaming RPC.
    """

    def initial_metadata(self):
        """See grpc.Call.initial_metadata"""
        with self._state.condition:

            def _done():
                return self._state.initial_metadata is not None

            # Wait on the condition until initial metadata is available.
            _common.wait(self._state.condition.wait, _done)
            return self._state.initial_metadata

    def trailing_metadata(self):
        """See grpc.Call.trailing_metadata"""
        with self._state.condition:

            def _done():
                return self._state.trailing_metadata is not None

            _common.wait(self._state.condition.wait, _done)
            return self._state.trailing_metadata

    def code(self):
        """See grpc.Call.code"""
        with self._state.condition:

            def _done():
                return self._state.code is not None

            _common.wait(self._state.condition.wait, _done)
            return self._state.code

    def details(self):
        """See grpc.Call.details"""
        with self._state.condition:

            def _done():
                return self._state.details is not None

            _common.wait(self._state.condition.wait, _done)
            return _common.decode(self._state.details)

    def debug_error_string(self):
        """Wait for and return the decoded debug error string of the RPC."""
        with self._state.condition:

            def _done():
                return self._state.debug_error_string is not None

            # NOTE(review): debug_error_string is only assigned when the
            # status event supplies a known code while no code was recorded
            # yet; after a local abort/cancel it appears to stay None, in
            # which case this wait would not finish - confirm.
            _common.wait(self._state.condition.wait, _done)
            return _common.decode(self._state.debug_error_string)

    def cancelled(self):
        """Return whether cancellation was requested before termination."""
        with self._state.condition:
            return self._state.cancelled

    def running(self):
        """Return True while the RPC has no status code yet."""
        with self._state.condition:
            return self._state.code is None

    def done(self):
        """Return True once the RPC has a status code."""
        with self._state.condition:
            return self._state.code is not None

    def _is_complete(self):
        """Completion predicate used for the waits in the Future methods."""
        return self._state.code is not None

    def result(self, timeout=None):
        """Returns the result of the computation or raises its exception.

        See grpc.Future.result for the full API contract.
        """
        with self._state.condition:
            timed_out = _common.wait(self._state.condition.wait,
                                     self._is_complete,
                                     timeout=timeout)
            if timed_out:
                raise grpc.FutureTimeoutError()
            else:
                if self._state.code is grpc.StatusCode.OK:
                    return self._state.response
                elif self._state.cancelled:
                    raise grpc.FutureCancelledError()
                else:
                    raise self

    def exception(self, timeout=None):
        """Return the exception raised by the computation.

        See grpc.Future.exception for the full API contract.
        """
        with self._state.condition:
            timed_out = _common.wait(self._state.condition.wait,
                                     self._is_complete,
                                     timeout=timeout)
            if timed_out:
                raise grpc.FutureTimeoutError()
            else:
                if self._state.code is grpc.StatusCode.OK:
                    return None
                elif self._state.cancelled:
                    raise grpc.FutureCancelledError()
                else:
                    return self

    def traceback(self, timeout=None):
        """Access the traceback of the exception raised by the computation.

        See grpc.Future.traceback for the full API contract.
        """
        with self._state.condition:
            timed_out = _common.wait(self._state.condition.wait,
                                     self._is_complete,
                                     timeout=timeout)
            if timed_out:
                raise grpc.FutureTimeoutError()
            else:
                if self._state.code is grpc.StatusCode.OK:
                    return None
                elif self._state.cancelled:
                    raise grpc.FutureCancelledError()
                else:
                    # Raise and catch ourselves to obtain a traceback object.
                    try:
                        raise self
                    except grpc.RpcError:
                        return sys.exc_info()[2]

    def add_done_callback(self, fn):
        """Arrange for `fn(self)` to be called when the RPC terminates.

        If the RPC has already terminated, `fn(self)` is called immediately,
        after the state lock has been released.
        """
        with self._state.condition:
            if self._state.code is None:
                self._state.callbacks.append(functools.partial(fn, self))
                return

        fn(self)

    def _next(self):
        """Request one more message and wait until it or the status arrives.

        Returns:
          The next response; it is cleared from the state before returning.

        Raises:
          StopIteration: The RPC terminated with StatusCode.OK.
          _MultiThreadedRendezvous: (self) The RPC terminated with any other
            status.
        """
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(self._state,
                                               self._response_deserializer)
                # Register the receive as due before submitting it; roll back
                # if the submission is refused.
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler)
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self

            def _response_ready():
                # Ready when a response is stored, or when the receive is no
                # longer due and the RPC has a status.
                return (self._state.response is not None or
                        (cygrpc.OperationType.receive_message
                         not in self._state.due and
                         self._state.code is not None))

            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
                    raise self
827
828
829
def _start_unary_request(request, timeout, request_serializer):
    """Compute the deadline and serialize the single request of an RPC.

    Args:
      request: The request value.
      timeout: Relative timeout in seconds, or None.
      request_serializer: Serializer handed to `_common.serialize`.

    Returns:
      A `(deadline, serialized_request, error)` triple. On success `error` is
      None; when serialization yields None, `serialized_request` is None and
      `error` is an _InactiveRpcError with StatusCode.INTERNAL.
    """
    deadline = _deadline(timeout)
    serialized_request = _common.serialize(request, request_serializer)
    if serialized_request is not None:
        return deadline, serialized_request, None
    failed_state = _RPCState((), (), (), grpc.StatusCode.INTERNAL,
                             'Exception serializing request!')
    return deadline, None, _InactiveRpcError(failed_state)
839
840
841
def _end_unary_response_blocking(state, call, with_call, deadline):
    """Turn the final state of a blocking unary-response RPC into its result.

    Args:
      state: The _RPCState of the finished RPC.
      call: The call object, wrapped into the returned rendezvous if asked.
      with_call: Whether to also return a call object.
      deadline: Deadline recorded on the returned rendezvous.

    Returns:
      The response, or a `(response, rendezvous)` pair when `with_call` is
      true.

    Raises:
      _InactiveRpcError: The RPC did not terminate with StatusCode.OK.
    """
    if state.code is not grpc.StatusCode.OK:
        raise _InactiveRpcError(state)
    if not with_call:
        return state.response
    rendezvous = _MultiThreadedRendezvous(state, call, None, deadline)
    return state.response, rendezvous
850
851
852
def _stream_unary_invocation_operationses(metadata, initial_metadata_flags):
    """Build the two operation batches that open a stream-unary RPC.

    Args:
      metadata: Metadata for the send-initial-metadata operation.
      initial_metadata_flags: Flags for the send-initial-metadata operation.

    Returns:
      A 2-tuple of batches: the first sends initial metadata and requests the
      message and the status; the second requests the initial metadata.
    """
    opening_batch = (
        cygrpc.SendInitialMetadataOperation(metadata, initial_metadata_flags),
        cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
        cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
    )
    initial_metadata_batch = (
        cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),)
    return (opening_batch, initial_metadata_batch)
862
863
864
def _stream_unary_invocation_operationses_and_tags(metadata,
                                                   initial_metadata_flags):
    """Pair each stream-unary opening batch with a None tag.

    Args:
      metadata: Metadata for the send-initial-metadata operation.
      initial_metadata_flags: Flags for the send-initial-metadata operation.

    Returns:
      A tuple of `(operations, None)` pairs, one per opening batch.
    """
    batches = _stream_unary_invocation_operationses(metadata,
                                                    initial_metadata_flags)
    return tuple((batch, None) for batch in batches)
871
872
873
def _determine_deadline(user_deadline):
    """Combine the caller's deadline with the one from the current context.

    Args:
      user_deadline: The deadline requested by the caller, or None.

    Returns:
      None when neither deadline is set, the one that is set when only one
      is, and the earlier of the two otherwise.
    """
    parent_deadline = cygrpc.get_deadline_from_context()
    known = [
        candidate for candidate in (parent_deadline, user_deadline)
        if candidate is not None
    ]
    return min(known) if known else None
883
884
885
class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
    """Invokes a unary-unary RPC, either blocking or as a future."""

    # pylint: disable=too-many-arguments
    def __init__(self, channel, managed_call, method, request_serializer,
                 response_deserializer):
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()

    def _prepare(self, request, timeout, metadata, wait_for_ready, compression):
        """Serialize the request and assemble the single operation batch.

        Returns:
          `(state, operations, deadline, error)`. When the request cannot be
          serialized the first three are None and `error` is the
          _InactiveRpcError to raise; otherwise `error` is None.
        """
        deadline, serialized_request, rendezvous = _start_unary_request(
            request, timeout, self._request_serializer)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready)
        augmented_metadata = _compression.augment_metadata(
            metadata, compression)
        if serialized_request is None:
            return None, None, None, rendezvous
        state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None)
        # One batch carrying the whole exchange, in the same order as
        # _UNARY_UNARY_INITIAL_DUE.
        operations = (
            cygrpc.SendInitialMetadataOperation(augmented_metadata,
                                                initial_metadata_flags),
            cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
            cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
        )
        return state, operations, deadline, None

    def _blocking(self, request, timeout, metadata, credentials, wait_for_ready,
                  compression):
        """Run the RPC on a segregated call and wait for its single event.

        Returns:
          `(state, call)` once the event has been applied to `state`.

        Raises:
          _InactiveRpcError: The request could not be serialized.
        """
        state, operations, deadline, rendezvous = self._prepare(
            request, timeout, metadata, wait_for_ready, compression)
        if state is None:
            raise rendezvous  # pylint: disable-msg=raising-bad-type
        operations_and_tags = ((operations, None),)
        call = self._channel.segregated_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method,
            None, _determine_deadline(deadline), metadata,
            None if credentials is None else credentials._credentials,
            operations_and_tags, self._context)
        event = call.next_event()
        _handle_event(event, state, self._response_deserializer)
        return state, call

    def __call__(self,
                 request,
                 timeout=None,
                 metadata=None,
                 credentials=None,
                 wait_for_ready=None,
                 compression=None):
        """Invoke the RPC synchronously and return its response."""
        state, call = self._blocking(request, timeout, metadata, credentials,
                                     wait_for_ready, compression)
        return _end_unary_response_blocking(state, call, False, None)

    def with_call(self,
                  request,
                  timeout=None,
                  metadata=None,
                  credentials=None,
                  wait_for_ready=None,
                  compression=None):
        """Invoke the RPC synchronously; return the response and a call."""
        state, call = self._blocking(request, timeout, metadata, credentials,
                                     wait_for_ready, compression)
        return _end_unary_response_blocking(state, call, True, None)

    def future(self,
               request,
               timeout=None,
               metadata=None,
               credentials=None,
               wait_for_ready=None,
               compression=None):
        """Start the RPC on a managed call and return its future.

        Raises:
          _InactiveRpcError: The request could not be serialized.
        """
        state, operations, deadline, rendezvous = self._prepare(
            request, timeout, metadata, wait_for_ready, compression)
        if state is None:
            raise rendezvous  # pylint: disable-msg=raising-bad-type
        event_handler = _event_handler(state, self._response_deserializer)
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method,
            None, deadline, metadata,
            None if credentials is None else credentials._credentials,
            (operations,), event_handler, self._context)
        return _MultiThreadedRendezvous(state, call,
                                        self._response_deserializer, deadline)
980
981
982
class _SingleThreadedUnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
    """Unary-stream multi-callable built on ``channel.segregated_call``.

    Invocations start the call directly on the channel and wrap it in a
    ``_SingleThreadedRendezvous``; no managed call or event handler is used.
    """

    # pylint: disable=too-many-arguments
    def __init__(self, channel, method, request_serializer,
                 response_deserializer):
        """Store the channel, method and (de)serializers for later calls."""
        self._channel = channel
        self._method = method
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()

    def __call__(  # pylint: disable=too-many-locals
            self,
            request,
            timeout=None,
            metadata=None,
            credentials=None,
            wait_for_ready=None,
            compression=None):
        """Serialize ``request``, start the call and return its rendezvous.

        Raises:
          _InactiveRpcError: with status INTERNAL when serialization of the
            request yields ``None``.
        """
        deadline = _deadline(timeout)
        serialized_request = _common.serialize(request,
                                               self._request_serializer)
        if serialized_request is None:
            failed_state = _RPCState((), (), (), grpc.StatusCode.INTERNAL,
                                     'Exception serializing request!')
            raise _InactiveRpcError(failed_state)

        state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
        call_credentials = (credentials._credentials
                            if credentials is not None else None)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready)
        augmented_metadata = _compression.augment_metadata(
            metadata, compression)

        # Three independent batches: everything the client sends, then the
        # final status, then the server's initial metadata.
        send_batch = (
            cygrpc.SendInitialMetadataOperation(augmented_metadata,
                                                initial_metadata_flags),
            cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
            cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
        )
        status_batch = (cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),)
        initial_metadata_batch = (
            cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),)
        # Each batch is paired with a None tag.
        operations_and_tags = (
            (send_batch, None),
            (status_batch, None),
            (initial_metadata_batch, None),
        )

        call = self._channel.segregated_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method,
            None, _determine_deadline(deadline), metadata, call_credentials,
            operations_and_tags, self._context)
        return _SingleThreadedRendezvous(state, call,
                                         self._response_deserializer, deadline)
1030
1031
1032
class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
    """Unary-stream multi-callable that runs its RPCs on managed calls."""

    # pylint: disable=too-many-arguments
    def __init__(self, channel, managed_call, method, request_serializer,
                 response_deserializer):
        """Store the channel, call factory, method and (de)serializers."""
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()

    def __call__(  # pylint: disable=too-many-locals
            self,
            request,
            timeout=None,
            metadata=None,
            credentials=None,
            wait_for_ready=None,
            compression=None):
        """Start the RPC and return a ``_MultiThreadedRendezvous`` for it.

        When ``_start_unary_request`` yields no serialized request, the
        rendezvous it produced instead is raised as the error.
        """
        deadline, serialized_request, rendezvous = _start_unary_request(
            request, timeout, self._request_serializer)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready)
        if serialized_request is None:
            raise rendezvous  # pylint: disable-msg=raising-bad-type

        augmented_metadata = _compression.augment_metadata(
            metadata, compression)
        state = _RPCState(_UNARY_STREAM_INITIAL_DUE, None, None, None, None)
        # First batch: everything the client sends plus the final status.
        outbound_and_status = (
            cygrpc.SendInitialMetadataOperation(augmented_metadata,
                                                initial_metadata_flags),
            cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
            cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
            cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
        )
        # Second batch: the server's initial metadata.
        inbound_initial_metadata = (
            cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),)
        operationses = (outbound_and_status, inbound_initial_metadata)

        effective_deadline = _determine_deadline(deadline)
        call_credentials = (credentials._credentials
                            if credentials is not None else None)
        event_handler = _event_handler(state, self._response_deserializer)
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method,
            None, effective_deadline, metadata, call_credentials,
            operationses, event_handler, self._context)
        return _MultiThreadedRendezvous(state, call,
                                        self._response_deserializer, deadline)
1083
1084
1085
class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
    """Stream-unary multi-callable.

    Synchronous invocations (``__call__`` / ``with_call``) run on a
    segregated call whose events are pulled on the calling thread.
    ``future`` runs on a managed call and returns a rendezvous.
    """

    # pylint: disable=too-many-arguments
    def __init__(self, channel, managed_call, method, request_serializer,
                 response_deserializer):
        """Store the channel, call factory, method and (de)serializers."""
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()

    def _blocking(self, request_iterator, timeout, metadata, credentials,
                  wait_for_ready, compression):
        """Run the RPC on the calling thread until no operation is due.

        Returns:
          A ``(state, call)`` pair for the caller to resolve.
        """
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready)
        augmented_metadata = _compression.augment_metadata(
            metadata, compression)
        call = self._channel.segregated_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method,
            None, _determine_deadline(deadline), augmented_metadata,
            None if credentials is None else credentials._credentials,
            _stream_unary_invocation_operationses_and_tags(
                augmented_metadata, initial_metadata_flags), self._context)
        _consume_request_iterator(request_iterator, state, call,
                                  self._request_serializer, None)
        # Pull events one at a time.  Each is applied to the state under the
        # state's condition and waiters are notified; the loop stops once
        # nothing is due any more.
        while True:
            event = call.next_event()
            with state.condition:
                _handle_event(event, state, self._response_deserializer)
                state.condition.notify_all()
                if not state.due:
                    break
        return state, call

    def __call__(self,
                 request_iterator,
                 timeout=None,
                 metadata=None,
                 credentials=None,
                 wait_for_ready=None,
                 compression=None):
        """Run the RPC synchronously and resolve it with the ``False`` flag."""
        state, call, = self._blocking(request_iterator, timeout, metadata,
                                      credentials, wait_for_ready, compression)
        return _end_unary_response_blocking(state, call, False, None)

    def with_call(self,
                  request_iterator,
                  timeout=None,
                  metadata=None,
                  credentials=None,
                  wait_for_ready=None,
                  compression=None):
        """Run the RPC synchronously and resolve it with the ``True`` flag."""
        state, call, = self._blocking(request_iterator, timeout, metadata,
                                      credentials, wait_for_ready, compression)
        return _end_unary_response_blocking(state, call, True, None)

    def future(self,
               request_iterator,
               timeout=None,
               metadata=None,
               credentials=None,
               wait_for_ready=None,
               compression=None):
        """Start the RPC on a managed call and return a rendezvous for it.

        Returns:
          A ``_MultiThreadedRendezvous`` wrapping the started call.
        """
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
        event_handler = _event_handler(state, self._response_deserializer)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready)
        augmented_metadata = _compression.augment_metadata(
            metadata, compression)
        # The send-initial-metadata operation must be built from the
        # augmented metadata, exactly as _blocking() does.  Building it from
        # the raw metadata meant the per-call `compression` argument never
        # reached the operation.
        # NOTE(review): _blocking() also wraps the deadline in
        # _determine_deadline(); this path passes the raw deadline --
        # confirm whether that difference is intended.
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method,
            None, deadline, augmented_metadata,
            None if credentials is None else credentials._credentials,
            _stream_unary_invocation_operationses(augmented_metadata,
                                                  initial_metadata_flags),
            event_handler, self._context)
        _consume_request_iterator(request_iterator, state, call,
                                  self._request_serializer, event_handler)
        return _MultiThreadedRendezvous(state, call,
                                        self._response_deserializer, deadline)
1169
1170
1171
class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
    """Stream-stream multi-callable that runs its RPCs on managed calls."""

    # pylint: disable=too-many-arguments
    def __init__(self, channel, managed_call, method, request_serializer,
                 response_deserializer):
        """Store the channel, call factory, method and (de)serializers."""
        self._channel = channel
        self._managed_call = managed_call
        self._method = method
        self._request_serializer = request_serializer
        self._response_deserializer = response_deserializer
        self._context = cygrpc.build_census_context()

    def __call__(self,
                 request_iterator,
                 timeout=None,
                 metadata=None,
                 credentials=None,
                 wait_for_ready=None,
                 compression=None):
        """Start the RPC, begin consuming requests, return its rendezvous."""
        deadline = _deadline(timeout)
        state = _RPCState(_STREAM_STREAM_INITIAL_DUE, None, None, None, None)
        initial_metadata_flags = _InitialMetadataFlags().with_wait_for_ready(
            wait_for_ready)
        augmented_metadata = _compression.augment_metadata(
            metadata, compression)

        # First batch: send initial metadata and receive the final status.
        metadata_and_status = (
            cygrpc.SendInitialMetadataOperation(augmented_metadata,
                                                initial_metadata_flags),
            cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
        )
        # Second batch: the server's initial metadata.
        inbound_initial_metadata = (
            cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),)
        operationses = (metadata_and_status, inbound_initial_metadata)

        event_handler = _event_handler(state, self._response_deserializer)
        effective_deadline = _determine_deadline(deadline)
        call_credentials = (credentials._credentials
                            if credentials is not None else None)
        call = self._managed_call(
            cygrpc.PropagationConstants.GRPC_PROPAGATE_DEFAULTS, self._method,
            None, effective_deadline, augmented_metadata, call_credentials,
            operationses, event_handler, self._context)
        _consume_request_iterator(request_iterator, state, call,
                                  self._request_serializer, event_handler)
        return _MultiThreadedRendezvous(state, call,
                                        self._response_deserializer, deadline)
1214
1215
1216
class _InitialMetadataFlags(int):
    """Stores immutable initial metadata flags"""

    def __new__(cls, value=_EMPTY_FLAGS):
        """Create the flags, keeping only the bits covered by ``used_mask``."""
        masked = value & cygrpc.InitialMetadataFlags.used_mask
        return super(_InitialMetadataFlags, cls).__new__(cls, masked)

    def with_wait_for_ready(self, wait_for_ready):
        """Return flags reflecting an explicit wait-for-ready choice.

        Args:
          wait_for_ready: ``None`` to leave the flags untouched, otherwise a
            value whose truthiness sets or clears the wait-for-ready bit.

        Returns:
          ``self`` when ``wait_for_ready`` is ``None``; otherwise a new
          instance with the bit set or cleared and the "explicitly set" bit
          turned on.
        """
        if wait_for_ready is None:
            return self
        ready_bit = cygrpc.InitialMetadataFlags.wait_for_ready
        explicit_bit = cygrpc.InitialMetadataFlags.wait_for_ready_explicitly_set
        if wait_for_ready:
            return self.__class__(self | ready_bit | explicit_bit)
        return self.__class__((self & ~ready_bit) | explicit_bit)
1232
1233
1234
class _ChannelCallState(object):
    """Bookkeeping shared by the managed calls of one channel.

    Attributes are a lock, the wrapped channel, a count of managed calls and
    a ``threading`` flag that starts out ``False``.
    """

    def __init__(self, channel):
        self.channel = channel
        self.lock = threading.Lock()
        self.threading = False
        self.managed_calls = 0

    def reset_postfork_child(self):
        """Zero the managed-call count (only that attribute is reset)."""
        self.managed_calls = 0

    def __del__(self):
        # Best-effort close on deallocation.  TypeError and AttributeError
        # are swallowed -- presumably because the channel or module globals
        # may already be torn down when this runs.
        try:
            self.channel.close(cygrpc.StatusCode.cancelled,
                               'Channel deallocated!')
        except (TypeError, AttributeError):
            pass
1251
1252
1253
def _run_channel_spin_thread(state):
    """Start a daemon thread that dispatches the channel's call events.

    The thread keeps pulling events from ``state.channel`` and invoking each
    event's tag.  Whenever a tag reports completion, the managed-call count
    is decremented under ``state.lock``; the thread exits when it hits zero.
    """

    def dispatch_call_events():
        while True:
            cygrpc.block_if_fork_in_progress(state)
            event = state.channel.next_call_event()
            if event.completion_type == cygrpc.CompletionType.queue_timeout:
                # Nothing arrived; poll again.
                continue
            if not event.tag(event):
                continue
            with state.lock:
                state.managed_calls -= 1
                if state.managed_calls == 0:
                    return

    spinner = cygrpc.ForkManagedThread(target=dispatch_call_events)
    spinner.setDaemon(True)
    spinner.start()
1271
1272
1273
def _channel_managed_call_management(state):
    """Build a factory for managed calls bound to ``state``.

    Returns:
      A ``create`` function that starts an integrated call on
      ``state.channel`` and keeps ``state.managed_calls`` up to date.
    """

    # pylint: disable=too-many-arguments
    def create(flags, method, host, deadline, metadata, credentials,
               operationses, event_handler, context):
        """Creates a cygrpc.IntegratedCall.

        Args:
          flags: An integer bitfield of call flags.
          method: The RPC method.
          host: A host string for the created call.
          deadline: A float to be the deadline of the created call or None if
            the call is to have an infinite deadline.
          metadata: The metadata for the call or None.
          credentials: A cygrpc.CallCredentials or None.
          operationses: An iterable of iterables of cygrpc.Operations to be
            started on the call.
          event_handler: A behavior to call to handle the events resultant from
            the operations on the call.
          context: Context object for distributed tracing.
        Returns:
          A cygrpc.IntegratedCall with which to conduct an RPC.
        """
        # Every batch of operations is tagged with the same event handler.
        tagged_batches = tuple(
            (batch, event_handler) for batch in operationses)
        with state.lock:
            call = state.channel.integrated_call(flags, method, host, deadline,
                                                 metadata, credentials,
                                                 tagged_batches, context)
            # The first managed call is the one that starts the spin thread.
            is_first_managed_call = state.managed_calls == 0
            state.managed_calls += 1
            if is_first_managed_call:
                _run_channel_spin_thread(state)
            return call

    return create
1312
1313
1314
class _ChannelConnectivityState(object):
    """Mutable connectivity-subscription state for one channel.

    Holds a re-entrant lock, the wrapped channel, the polling and delivering
    flags, the last known connectivity, a pending try-to-connect flag and
    the list of ``[callback, connectivity]`` subscriptions.
    """

    def __init__(self, channel):
        self.lock = threading.RLock()
        self.channel = channel
        # The remaining attributes start in the same state a post-fork reset
        # puts them in.
        self.reset_postfork_child()

    def reset_postfork_child(self):
        """Return everything except the lock and channel to initial values."""
        self.polling = False
        self.connectivity = None
        self.try_to_connect = False
        self.callbacks_and_connectivities = []
        self.delivering = False
1331
1332
1333
def _deliveries(state):
    """Collect the callbacks that have not yet seen the current connectivity.

    Each subscription whose recorded connectivity is not the very object in
    ``state.connectivity`` is updated in place to the current value, and its
    callback is added to the result.

    Returns:
      A list of the callbacks needing an update, in subscription order.
    """
    current = state.connectivity
    outdated_callbacks = []
    for subscription in state.callbacks_and_connectivities:
        callback, last_seen = subscription
        if last_seen is current:
            continue
        outdated_callbacks.append(callback)
        subscription[1] = current
    return outdated_callbacks
1341
1342
1343
def _deliver(state, initial_connectivity, initial_callbacks):
    """Call subscribers with connectivity updates until none are outstanding.

    Starts with the given callbacks and connectivity.  After each round it
    asks ``_deliveries`` (under ``state.lock``) which callbacks still need
    the current value.  When there are none it clears ``state.delivering``
    and returns.  An exception raised by a callback is logged, not
    propagated.
    """
    connectivity, pending = initial_connectivity, initial_callbacks
    while True:
        for notify in pending:
            cygrpc.block_if_fork_in_progress(state)
            try:
                notify(connectivity)
            except Exception:  # pylint: disable=broad-except
                _LOGGER.exception(
                    _CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE)
        with state.lock:
            pending = _deliveries(state)
            if not pending:
                state.delivering = False
                return
            connectivity = state.connectivity
1361
1362
1363
def _spawn_delivery(state, callbacks):
    """Start a daemon thread running ``_deliver`` and mark delivery active.

    The thread is handed the connectivity recorded on ``state`` at the time
    of the call, together with ``callbacks``.
    """
    delivery_args = (state, state.connectivity, callbacks)
    worker = cygrpc.ForkManagedThread(target=_deliver, args=delivery_args)
    worker.setDaemon(True)
    worker.start()
    state.delivering = True
1373
1374
1375
# NOTE(https://github.com/grpc/grpc/issues/3064): We'd rather not poll.
def _poll_connectivity(state, channel, initial_try_to_connect):
    """Poll the channel's connectivity and fan changes out to subscribers.

    Runs until there are neither subscriptions nor a pending try-to-connect
    request.  At that point ``state.polling`` and ``state.connectivity`` are
    reset and the function returns.
    """
    to_channel_connectivity = (
        _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY)

    try_to_connect = initial_try_to_connect
    connectivity = channel.check_connectivity_state(try_to_connect)
    with state.lock:
        state.connectivity = to_channel_connectivity[connectivity]
        # Every subscription present at this point gets the first reading.
        callbacks = tuple(
            callback for callback, unused_but_known_to_be_none_connectivity in
            state.callbacks_and_connectivities)
        for subscription in state.callbacks_and_connectivities:
            subscription[1] = state.connectivity
        if callbacks:
            _spawn_delivery(state, callbacks)

    while True:
        # Watch for a change, giving up after 0.2 seconds.
        event = channel.watch_connectivity_state(connectivity,
                                                 time.time() + 0.2)
        cygrpc.block_if_fork_in_progress(state)
        with state.lock:
            if not (state.callbacks_and_connectivities or
                    state.try_to_connect):
                state.polling = False
                state.connectivity = None
                break
            # Take the pending try-to-connect request and clear it.
            try_to_connect = state.try_to_connect
            state.try_to_connect = False
        if not (event.success or try_to_connect):
            continue
        connectivity = channel.check_connectivity_state(try_to_connect)
        with state.lock:
            state.connectivity = to_channel_connectivity[connectivity]
            if state.delivering:
                # Skip while a delivery is marked active.
                continue
            callbacks = _deliveries(state)
            if callbacks:
                _spawn_delivery(state, callbacks)
1411
1412
1413
def _subscribe(state, callback, try_to_connect):
    """Register ``callback`` for connectivity updates.

    Under ``state.lock`` one of three things happens:
      * no subscriptions and no poller: start the polling thread and record
        the callback with no known connectivity;
      * no delivery active and a connectivity is known: spawn a delivery for
        this callback and record it with the current connectivity;
      * otherwise: record the callback with no known connectivity.
    In the last two cases ``try_to_connect`` is OR-ed into the pending flag.
    """
    wants_connect = bool(try_to_connect)
    with state.lock:
        if not (state.callbacks_and_connectivities or state.polling):
            poller = cygrpc.ForkManagedThread(target=_poll_connectivity,
                                              args=(state, state.channel,
                                                    wants_connect))
            poller.setDaemon(True)
            poller.start()
            state.polling = True
            state.callbacks_and_connectivities.append([callback, None])
            return

        can_deliver_now = (not state.delivering and
                           state.connectivity is not None)
        if can_deliver_now:
            _spawn_delivery(state, (callback,))
        state.try_to_connect |= wants_connect
        recorded = state.connectivity if can_deliver_now else None
        state.callbacks_and_connectivities.append([callback, recorded])
1431
1432
1433
def _unsubscribe(state, callback):
    """Remove the first subscription whose callback equals ``callback``.

    Does nothing when no subscription matches.  Runs under ``state.lock``.
    """
    with state.lock:
        subscriptions = state.callbacks_and_connectivities
        for position, (candidate, _) in enumerate(subscriptions):
            if callback == candidate:
                del subscriptions[position]
                break
1440
1441
1442
def _augment_options(base_options, compression):
    """Append compression and user-agent entries to the channel options.

    Returns:
      A tuple made of ``base_options``, then whatever
      ``_compression.create_channel_option`` yields for ``compression``,
      then a primary-user-agent entry carrying ``_USER_AGENT``.
    """
    compression_option = _compression.create_channel_option(compression)
    user_agent_option = (
        cygrpc.ChannelArgKey.primary_user_agent_string,
        _USER_AGENT,
    )
    return tuple(base_options) + compression_option + (user_agent_option,)
1448
1449
1450
def _separate_channel_options(options):
    """Separates core channel options from Python channel options.

    An option is Python-only when its key (element 0) equals
    ``grpc.experimental.ChannelOptions.SingleThreadedUnaryStream``.

    Returns:
      A ``(python_options, core_options)`` pair of lists, each preserving
      the input order.
    """
    python_options, core_options = [], []
    for option in options:
        is_python_only = (
            option[0] ==
            grpc.experimental.ChannelOptions.SingleThreadedUnaryStream)
        bucket = python_options if is_python_only else core_options
        bucket.append(option)
    return python_options, core_options
1460
1461
1462
class Channel(grpc.Channel):
    """A cygrpc.Channel-backed implementation of grpc.Channel."""

    def __init__(self, target, options, credentials, compression):
        """Constructor.

        Args:
          target: The target to which to connect.
          options: Configuration options for the channel.
          credentials: A cygrpc.ChannelCredentials or None.
          compression: An optional value indicating the compression method to be
            used over the lifetime of the channel.
        """
        python_options, core_options = _separate_channel_options(options)
        # Start from the environment-derived default; a Python-only option
        # may switch it on below.
        self._single_threaded_unary_stream = _DEFAULT_SINGLE_THREADED_UNARY_STREAM
        self._process_python_options(python_options)

        encoded_target = _common.encode(target)
        channel_arguments = _augment_options(core_options, compression)
        self._channel = cygrpc.Channel(encoded_target, channel_arguments,
                                       credentials)
        self._call_state = _ChannelCallState(self._channel)
        self._connectivity_state = _ChannelConnectivityState(self._channel)
        cygrpc.fork_register_channel(self)
        if cygrpc.g_gevent_activated:
            cygrpc.gevent_increment_channel_count()

    def _process_python_options(self, python_options):
        """Sets channel attributes according to python-only channel options."""
        single_threaded_key = (
            grpc.experimental.ChannelOptions.SingleThreadedUnaryStream)
        for option in python_options:
            # Only the key is inspected; the option's value is not read.
            if option[0] == single_threaded_key:
                self._single_threaded_unary_stream = True

    def subscribe(self, callback, try_to_connect=None):
        """Register ``callback`` for connectivity updates on this channel."""
        _subscribe(self._connectivity_state, callback, try_to_connect)

    def unsubscribe(self, callback):
        """Remove a previously registered connectivity ``callback``."""
        _unsubscribe(self._connectivity_state, callback)

    def unary_unary(self,
                    method,
                    request_serializer=None,
                    response_deserializer=None):
        """Create a unary-unary multi-callable for ``method``."""
        managed_call = _channel_managed_call_management(self._call_state)
        return _UnaryUnaryMultiCallable(self._channel, managed_call,
                                        _common.encode(method),
                                        request_serializer,
                                        response_deserializer)

    def unary_stream(self,
                     method,
                     request_serializer=None,
                     response_deserializer=None):
        """Create a unary-stream multi-callable for ``method``."""
        # NOTE(rbellevi): Benchmarks have shown that running a unary-stream RPC
        # on a single Python thread results in an appreciable speed-up. However,
        # due to slight differences in capability, the multi-threaded variant
        # remains the default.
        if self._single_threaded_unary_stream:
            return _SingleThreadedUnaryStreamMultiCallable(
                self._channel, _common.encode(method), request_serializer,
                response_deserializer)
        managed_call = _channel_managed_call_management(self._call_state)
        return _UnaryStreamMultiCallable(self._channel, managed_call,
                                         _common.encode(method),
                                         request_serializer,
                                         response_deserializer)

    def stream_unary(self,
                     method,
                     request_serializer=None,
                     response_deserializer=None):
        """Create a stream-unary multi-callable for ``method``."""
        managed_call = _channel_managed_call_management(self._call_state)
        return _StreamUnaryMultiCallable(self._channel, managed_call,
                                         _common.encode(method),
                                         request_serializer,
                                         response_deserializer)

    def stream_stream(self,
                      method,
                      request_serializer=None,
                      response_deserializer=None):
        """Create a stream-stream multi-callable for ``method``."""
        managed_call = _channel_managed_call_management(self._call_state)
        return _StreamStreamMultiCallable(self._channel, managed_call,
                                          _common.encode(method),
                                          request_serializer,
                                          response_deserializer)

    def _unsubscribe_all(self):
        """Drop every connectivity subscription, emptying the list in place."""
        state = self._connectivity_state
        if not state:
            return
        with state.lock:
            del state.callbacks_and_connectivities[:]

    def _close(self):
        """Unsubscribe everything, close the channel and unregister it."""
        self._unsubscribe_all()
        self._channel.close(cygrpc.StatusCode.cancelled, 'Channel closed!')
        cygrpc.fork_unregister_channel(self)
        if cygrpc.g_gevent_activated:
            cygrpc.gevent_decrement_channel_count()

    def _close_on_fork(self):
        """Unsubscribe everything and close the channel because of a fork."""
        self._unsubscribe_all()
        self._channel.close_on_fork(cygrpc.StatusCode.cancelled,
                                    'Channel closed due to fork')

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._close()
        # Returning False lets any exception from the `with` body propagate.
        return False

    def close(self):
        """Close the channel."""
        self._close()

    def __del__(self):
        # TODO(https://github.com/grpc/grpc/issues/12531): Several releases
        # after 1.12 (1.16 or thereabouts?) add a "self._channel.close" call
        # here (or more likely, call self._close() here). We don't do this today
        # because many valid use cases today allow the channel to be deleted
        # immediately after stubs are created. Once enough time has passed
        # that all users can be trusted to hold on to their channels for as
        # long as they are in use, and to close them afterwards, deletion of
        # this grpc._channel.Channel instance can be made to close the
        # underlying cygrpc.Channel instance.
        try:
            self._unsubscribe_all()
        except:  # pylint: disable=bare-except
            # Exceptions in __del__ are ignored by Python anyway, but they can
            # keep spamming logs. Just silence them.
            pass
1586
1587