Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
allendowney
GitHub Repository: allendowney/cpython
Path: blob/main/Lib/asyncio/sslproto.py
12 views
1
import collections
2
import enum
3
import warnings
4
try:
5
import ssl
6
except ImportError: # pragma: no cover
7
ssl = None
8
9
from . import constants
10
from . import exceptions
11
from . import protocols
12
from . import transports
13
from .log import logger
14
15
if ssl is not None:
    # Exception types that the handshake, read, write and shutdown code in
    # this module catches separately from ssl.SSLError instead of treating
    # them as failures.  The name is left undefined when the ssl module is
    # unavailable; SSLProtocol.__init__ raises RuntimeError in that case.
    SSLAgainErrors = (ssl.SSLWantReadError, ssl.SSLSyscallError)
17
18
19
class SSLProtocolState(enum.Enum):
    """States of the SSL layer state machine.

    Each member's value is its own name as a string.
    """

    # No TLS session is active (initial state, and the state after teardown).
    UNWRAPPED = "UNWRAPPED"
    # The TLS handshake is in progress.
    DO_HANDSHAKE = "DO_HANDSHAKE"
    # Handshake completed; application data may flow.
    WRAPPED = "WRAPPED"
    # Closing: remaining data is being flushed before shutdown.
    FLUSHING = "FLUSHING"
    # Closing: the TLS shutdown is being performed.
    SHUTDOWN = "SHUTDOWN"
25
26
27
class AppProtocolState(enum.Enum):
    """Lifecycle of the application protocol (https://git.io/fj59P).

    The callbacks are expected in this order::

        INIT -cm-> CON_MADE [-dr*->] [-er-> EOF?] -cl-> CON_LOST

    where

    * cm: connection_made()
    * dr: data_received()
    * er: eof_received()
    * cl: connection_lost()
    """

    STATE_INIT = "STATE_INIT"
    STATE_CON_MADE = "STATE_CON_MADE"
    STATE_EOF = "STATE_EOF"
    STATE_CON_LOST = "STATE_CON_LOST"
41
42
43
def _create_transport_context(server_side, server_hostname):
    """Return a default SSL context for a client connection.

    Used when the caller supplied no SSLContext (e.g. passed ssl=True).

    Raises ValueError when *server_side* is true, because a server cannot
    work with an implicit context.  Hostname checking is switched off when
    *server_hostname* is empty or None.
    """
    if server_side:
        raise ValueError('Server side SSL needs a valid SSLContext')

    # create_default_context() gives up-to-date, secure client settings.
    context = ssl.create_default_context()
    if server_hostname:
        return context

    context.check_hostname = False
    return context
55
56
57
def add_flowcontrol_defaults(high, low, kb):
    """Resolve flow-control water marks, filling in missing values.

    * both None: high is ``kb * 1024`` and low is a quarter of it;
    * only *low* given: high is four times *low*;
    * only *high* given: low is ``high // 4``.

    Returns the pair ``(high, low)``.  Raises ValueError unless
    ``high >= low >= 0``.
    """
    if high is not None:
        hi = high
    elif low is not None:
        hi = 4 * low
    else:
        hi = kb * 1024

    lo = hi // 4 if low is None else low

    if not hi >= lo >= 0:
        raise ValueError('high (%r) must be >= low (%r) must be >= 0' %
                         (hi, lo))

    return hi, lo
76
77
78
class _SSLProtocolTransport(transports._FlowControlMixin,
                            transports.Transport):
    """Transport object handed to the application protocol.

    It holds no I/O logic of its own: every operation is forwarded to the
    SSLProtocol instance passed to the constructor.  Once close() has been
    called a second time the reference to that protocol is dropped.
    """

    _start_tls_compatible = True
    _sendfile_compatible = constants._SendfileMode.FALLBACK

    def __init__(self, loop, ssl_protocol):
        self._loop = loop
        self._ssl_protocol = ssl_protocol
        self._closed = False

    def get_extra_info(self, name, default=None):
        """Get optional transport information."""
        return self._ssl_protocol._get_extra_info(name, default)

    def set_protocol(self, protocol):
        """Replace the application protocol served by the SSL protocol."""
        self._ssl_protocol._set_app_protocol(protocol)

    def get_protocol(self):
        """Return the current application protocol."""
        return self._ssl_protocol._app_protocol

    def is_closing(self):
        """Return True once close(), abort() or a forced close was issued."""
        return self._closed

    def close(self):
        """Close the transport.

        Buffered data will be flushed asynchronously.  No more data
        will be received.  After all buffered data is flushed, the
        protocol's connection_lost() method will (eventually) be called
        with None as its argument.
        """
        if not self._closed:
            self._closed = True
            self._ssl_protocol._start_shutdown()
        else:
            # Already closed: just drop the reference to the SSL protocol.
            self._ssl_protocol = None

    def __del__(self, _warnings=warnings):
        if not self._closed:
            self._closed = True
            # source=self lets tracemalloc-aware warning output point at the
            # place where the leaked transport was allocated.
            _warnings.warn(
                "unclosed transport <asyncio._SSLProtocolTransport "
                "object>", ResourceWarning, source=self)

    def is_reading(self):
        """Return True unless the application paused reading."""
        return not self._ssl_protocol._app_reading_paused

    def pause_reading(self):
        """Pause the receiving end.

        No data will be passed to the protocol's data_received()
        method until resume_reading() is called.
        """
        self._ssl_protocol._pause_reading()

    def resume_reading(self):
        """Resume the receiving end.

        Data received will once again be passed to the protocol's
        data_received() method.
        """
        self._ssl_protocol._resume_reading()

    def set_write_buffer_limits(self, high=None, low=None):
        """Set the high- and low-water limits for write flow control.

        These two values control when to call the protocol's
        pause_writing() and resume_writing() methods.  If specified,
        the low-water limit must be less than or equal to the
        high-water limit.  Neither value can be negative.

        The defaults are implementation-specific.  If only the
        high-water limit is given, the low-water limit defaults to an
        implementation-specific value less than or equal to the
        high-water limit.  Setting high to zero forces low to zero as
        well, and causes pause_writing() to be called whenever the
        buffer becomes non-empty.  Setting low to zero causes
        resume_writing() to be called only once the buffer is empty.
        Use of zero for either limit is generally sub-optimal as it
        reduces opportunities for doing I/O and computation
        concurrently.
        """
        self._ssl_protocol._set_write_buffer_limits(high, low)
        # Re-evaluate immediately: the new limits may require pausing or
        # resuming the application right now.
        self._ssl_protocol._control_app_writing()

    def get_write_buffer_limits(self):
        """Return the write flow-control limits as ``(low, high)``."""
        return (self._ssl_protocol._outgoing_low_water,
                self._ssl_protocol._outgoing_high_water)

    def get_write_buffer_size(self):
        """Return the current size of the write buffers."""
        return self._ssl_protocol._get_write_buffer_size()

    def set_read_buffer_limits(self, high=None, low=None):
        """Set the high- and low-water limits for read flow control.

        These two values control when to call the upstream transport's
        pause_reading() and resume_reading() methods.  If specified,
        the low-water limit must be less than or equal to the
        high-water limit.  Neither value can be negative.

        The defaults are implementation-specific.  If only the
        high-water limit is given, the low-water limit defaults to an
        implementation-specific value less than or equal to the
        high-water limit.  Setting high to zero forces low to zero as
        well, and causes pause_reading() to be called whenever the
        buffer becomes non-empty.  Setting low to zero causes
        resume_reading() to be called only once the buffer is empty.
        Use of zero for either limit is generally sub-optimal as it
        reduces opportunities for doing I/O and computation
        concurrently.
        """
        self._ssl_protocol._set_read_buffer_limits(high, low)
        self._ssl_protocol._control_ssl_reading()

    def get_read_buffer_limits(self):
        """Return the read flow-control limits as ``(low, high)``."""
        return (self._ssl_protocol._incoming_low_water,
                self._ssl_protocol._incoming_high_water)

    def get_read_buffer_size(self):
        """Return the current size of the read buffer."""
        return self._ssl_protocol._get_read_buffer_size()

    @property
    def _protocol_paused(self):
        # Required for sendfile fallback pause_writing/resume_writing logic
        return self._ssl_protocol._app_writing_paused

    def write(self, data):
        """Write some data bytes to the transport.

        This does not block; it buffers the data and arranges for it
        to be sent out asynchronously.

        Raises TypeError if *data* is not bytes, bytearray or memoryview.
        Empty data is ignored.
        """
        if not isinstance(data, (bytes, bytearray, memoryview)):
            raise TypeError(f"data: expecting a bytes-like instance, "
                            f"got {type(data).__name__}")
        if not data:
            return
        self._ssl_protocol._write_appdata((data,))

    def writelines(self, list_of_data):
        """Write a list (or any iterable) of data bytes to the transport.

        The items are handed to the SSL protocol as-is, without being
        concatenated or type-checked here.
        """
        self._ssl_protocol._write_appdata(list_of_data)

    def write_eof(self):
        """Close the write end after flushing buffered data.

        This raises :exc:`NotImplementedError` right now.
        """
        raise NotImplementedError

    def can_write_eof(self):
        """Return True if this transport supports write_eof(), False if not."""
        return False

    def abort(self):
        """Close the transport immediately.

        Buffered data will be lost.  No more data will be received.
        The protocol's connection_lost() method will (eventually) be
        called with None as its argument.
        """
        self._closed = True
        if self._ssl_protocol is not None:
            self._ssl_protocol._abort()

    def _force_close(self, exc):
        """Mark the transport closed and abort the SSL protocol with *exc*."""
        self._closed = True
        # The protocol reference is dropped by a repeated close(); guard the
        # same way abort() does instead of failing with AttributeError.
        if self._ssl_protocol is not None:
            self._ssl_protocol._abort(exc)

    def _test__append_write_backlog(self, data):
        # for test only
        self._ssl_protocol._write_backlog.append(data)
        self._ssl_protocol._write_buffer_size += len(data)
258
259
260
class SSLProtocol(protocols.BufferedProtocol):
    """Protocol that runs TLS between a lower transport and an app protocol.

    Bytes received from the lower transport are written into an incoming
    ssl.MemoryBIO; decrypted data read from the SSL object is delivered to
    the application protocol.  Application writes are queued in a backlog,
    pushed through the SSL object, and whatever accumulates in the outgoing
    MemoryBIO is written to the lower transport.
    """

    max_size = 256 * 1024   # Buffer size passed to read()

    _handshake_start_time = None
    _handshake_timeout_handle = None
    _shutdown_timeout_handle = None

    def __init__(self, loop, app_protocol, sslcontext, waiter,
                 server_side=False, server_hostname=None,
                 call_connection_made=True,
                 ssl_handshake_timeout=None,
                 ssl_shutdown_timeout=None):
        """Set up buffers, the SSL object and flow-control state.

        A false *sslcontext* is replaced by a default client context.
        Timeouts default to the values in ``constants``.

        Raises RuntimeError if the ssl module is unavailable and ValueError
        if a timeout is given but not positive.
        """
        if ssl is None:
            raise RuntimeError("stdlib ssl module not available")

        self._ssl_buffer = bytearray(self.max_size)
        self._ssl_buffer_view = memoryview(self._ssl_buffer)

        if ssl_handshake_timeout is None:
            ssl_handshake_timeout = constants.SSL_HANDSHAKE_TIMEOUT
        elif ssl_handshake_timeout <= 0:
            raise ValueError(
                f"ssl_handshake_timeout should be a positive number, "
                f"got {ssl_handshake_timeout}")
        if ssl_shutdown_timeout is None:
            ssl_shutdown_timeout = constants.SSL_SHUTDOWN_TIMEOUT
        elif ssl_shutdown_timeout <= 0:
            raise ValueError(
                f"ssl_shutdown_timeout should be a positive number, "
                f"got {ssl_shutdown_timeout}")

        if not sslcontext:
            sslcontext = _create_transport_context(
                server_side, server_hostname)

        self._server_side = server_side
        if server_hostname and not server_side:
            self._server_hostname = server_hostname
        else:
            self._server_hostname = None
        self._sslcontext = sslcontext
        # SSL-specific extra info. More info are set when the handshake
        # completes.
        self._extra = dict(sslcontext=sslcontext)

        # App data write buffering
        self._write_backlog = collections.deque()
        self._write_buffer_size = 0

        self._waiter = waiter
        self._loop = loop
        self._set_app_protocol(app_protocol)
        self._app_transport = None
        self._app_transport_created = False
        # transport, ex: SelectorSocketTransport
        self._transport = None
        self._ssl_handshake_timeout = ssl_handshake_timeout
        self._ssl_shutdown_timeout = ssl_shutdown_timeout
        # SSL and state machine
        self._incoming = ssl.MemoryBIO()
        self._outgoing = ssl.MemoryBIO()
        self._state = SSLProtocolState.UNWRAPPED
        self._conn_lost = 0  # Set when connection_lost called
        if call_connection_made:
            self._app_state = AppProtocolState.STATE_INIT
        else:
            self._app_state = AppProtocolState.STATE_CON_MADE
        self._sslobj = self._sslcontext.wrap_bio(
            self._incoming, self._outgoing,
            server_side=self._server_side,
            server_hostname=self._server_hostname)

        # Flow Control

        self._ssl_writing_paused = False

        self._app_reading_paused = False

        self._ssl_reading_paused = False
        self._incoming_high_water = 0
        self._incoming_low_water = 0
        self._set_read_buffer_limits()
        self._eof_received = False

        self._app_writing_paused = False
        self._outgoing_high_water = 0
        self._outgoing_low_water = 0
        self._set_write_buffer_limits()
        self._get_app_transport()

    def _set_app_protocol(self, app_protocol):
        """Install *app_protocol* and record whether it is buffer-based."""
        self._app_protocol = app_protocol
        # Make fast hasattr check first
        if (hasattr(app_protocol, 'get_buffer') and
                isinstance(app_protocol, protocols.BufferedProtocol)):
            self._app_protocol_get_buffer = app_protocol.get_buffer
            self._app_protocol_buffer_updated = app_protocol.buffer_updated
            self._app_protocol_is_buffer = True
        else:
            self._app_protocol_is_buffer = False

    def _wakeup_waiter(self, exc=None):
        """Resolve the waiter future (if any) with *exc* or None, once."""
        if self._waiter is None:
            return
        if not self._waiter.cancelled():
            if exc is not None:
                self._waiter.set_exception(exc)
            else:
                self._waiter.set_result(None)
        self._waiter = None

    def _get_app_transport(self):
        """Return the app-facing transport, creating it on first use.

        Raises RuntimeError if the transport was already created and has
        since been discarded.
        """
        if self._app_transport is None:
            if self._app_transport_created:
                raise RuntimeError('Creating _SSLProtocolTransport twice')
            self._app_transport = _SSLProtocolTransport(self._loop, self)
            self._app_transport_created = True
        return self._app_transport

    def connection_made(self, transport):
        """Called when the low-level connection is made.

        Start the SSL handshake.
        """
        self._transport = transport
        self._start_handshake()

    def connection_lost(self, exc):
        """Called when the low-level connection is lost or closed.

        The argument is an exception object or None (the latter
        meaning a regular EOF is received or the connection was
        aborted or closed).
        """
        self._write_backlog.clear()
        # Keep the size counter consistent with the now-empty backlog.
        self._write_buffer_size = 0
        self._outgoing.read()
        self._conn_lost += 1

        # Just mark the app transport as closed so that its __del__
        # doesn't complain.
        if self._app_transport is not None:
            self._app_transport._closed = True

        if self._state != SSLProtocolState.DO_HANDSHAKE:
            if (
                self._app_state == AppProtocolState.STATE_CON_MADE or
                self._app_state == AppProtocolState.STATE_EOF
            ):
                self._app_state = AppProtocolState.STATE_CON_LOST
                self._loop.call_soon(self._app_protocol.connection_lost, exc)
        self._set_state(SSLProtocolState.UNWRAPPED)
        self._transport = None
        self._app_transport = None
        self._app_protocol = None
        self._wakeup_waiter(exc)

        if self._shutdown_timeout_handle:
            self._shutdown_timeout_handle.cancel()
            self._shutdown_timeout_handle = None
        if self._handshake_timeout_handle:
            self._handshake_timeout_handle.cancel()
            self._handshake_timeout_handle = None

    def get_buffer(self, n):
        """Return the receive buffer, regrown if smaller than required.

        A size hint that is non-positive or above ``max_size`` is replaced
        by ``max_size``.
        """
        want = n
        if want <= 0 or want > self.max_size:
            want = self.max_size
        if len(self._ssl_buffer) < want:
            self._ssl_buffer = bytearray(want)
            self._ssl_buffer_view = memoryview(self._ssl_buffer)
        return self._ssl_buffer_view

    def buffer_updated(self, nbytes):
        """Feed *nbytes* received bytes to the SSL object and advance."""
        self._incoming.write(self._ssl_buffer_view[:nbytes])

        if self._state == SSLProtocolState.DO_HANDSHAKE:
            self._do_handshake()

        elif self._state == SSLProtocolState.WRAPPED:
            self._do_read()

        elif self._state == SSLProtocolState.FLUSHING:
            self._do_flush()

        elif self._state == SSLProtocolState.SHUTDOWN:
            self._do_shutdown()

    def eof_received(self):
        """Called when the other end of the low-level stream
        is half-closed.

        If this returns a false value (including None), the transport
        will close itself.  If it returns a true value, closing the
        transport is up to the protocol.
        """
        self._eof_received = True
        try:
            if self._loop.get_debug():
                logger.debug("%r received EOF", self)

            if self._state == SSLProtocolState.DO_HANDSHAKE:
                self._on_handshake_complete(ConnectionResetError)

            elif self._state == SSLProtocolState.WRAPPED:
                self._set_state(SSLProtocolState.FLUSHING)
                if self._app_reading_paused:
                    # Flushing continues from _resume_reading().
                    return True
                else:
                    self._do_flush()

            elif self._state == SSLProtocolState.FLUSHING:
                self._do_write()
                self._set_state(SSLProtocolState.SHUTDOWN)
                self._do_shutdown()

            elif self._state == SSLProtocolState.SHUTDOWN:
                self._do_shutdown()

        except Exception:
            self._transport.close()
            raise

    def _get_extra_info(self, name, default=None):
        """Look *name* up in the SSL extras, then in the lower transport."""
        if name in self._extra:
            return self._extra[name]
        elif self._transport is not None:
            return self._transport.get_extra_info(name, default)
        else:
            return default

    def _set_state(self, new_state):
        """Move the state machine to *new_state*.

        Switching to UNWRAPPED is always allowed; otherwise only the forward
        steps listed below are.  Raises RuntimeError on any other switch.
        """
        allowed = (
            new_state == SSLProtocolState.UNWRAPPED
            or (self._state, new_state) in (
                (SSLProtocolState.UNWRAPPED, SSLProtocolState.DO_HANDSHAKE),
                (SSLProtocolState.DO_HANDSHAKE, SSLProtocolState.WRAPPED),
                (SSLProtocolState.WRAPPED, SSLProtocolState.FLUSHING),
                (SSLProtocolState.FLUSHING, SSLProtocolState.SHUTDOWN),
            )
        )

        if allowed:
            self._state = new_state

        else:
            raise RuntimeError(
                'cannot switch state from {} to {}'.format(
                    self._state, new_state))

    # Handshake flow

    def _start_handshake(self):
        """Enter DO_HANDSHAKE, arm the timeout and run the first step."""
        if self._loop.get_debug():
            logger.debug("%r starts SSL handshake", self)
            self._handshake_start_time = self._loop.time()
        else:
            self._handshake_start_time = None

        self._set_state(SSLProtocolState.DO_HANDSHAKE)

        # start handshake timeout count down
        self._handshake_timeout_handle = \
            self._loop.call_later(self._ssl_handshake_timeout,
                                  self._check_handshake_timeout)

        self._do_handshake()

    def _check_handshake_timeout(self):
        """Abort the connection if the handshake is still running."""
        if self._state == SSLProtocolState.DO_HANDSHAKE:
            msg = (
                f"SSL handshake is taking longer than "
                f"{self._ssl_handshake_timeout} seconds: "
                f"aborting the connection"
            )
            self._fatal_error(ConnectionAbortedError(msg))

    def _do_handshake(self):
        """Run one handshake step and dispatch on its outcome."""
        try:
            self._sslobj.do_handshake()
        except SSLAgainErrors:
            # Not finished yet: send out whatever the SSL object produced.
            self._process_outgoing()
        except ssl.SSLError as exc:
            self._on_handshake_complete(exc)
        else:
            self._on_handshake_complete(None)

    def _on_handshake_complete(self, handshake_exc):
        """Finish the handshake, successfully or with *handshake_exc*.

        On failure the state returns to UNWRAPPED, the error is reported via
        _fatal_error() and the waiter is woken with the exception.  On
        success the SSL details are published as extra info, the app
        protocol's connection_made() is called if still pending, the waiter
        is woken and reading starts.
        """
        if self._handshake_timeout_handle is not None:
            self._handshake_timeout_handle.cancel()
            self._handshake_timeout_handle = None

        sslobj = self._sslobj
        try:
            if handshake_exc is None:
                self._set_state(SSLProtocolState.WRAPPED)
            else:
                raise handshake_exc

            peercert = sslobj.getpeercert()
        except Exception as exc:
            self._set_state(SSLProtocolState.UNWRAPPED)
            if isinstance(exc, ssl.CertificateError):
                msg = 'SSL handshake failed on verifying the certificate'
            else:
                msg = 'SSL handshake failed'
            self._fatal_error(exc, msg)
            self._wakeup_waiter(exc)
            return

        # The start time is only recorded when debug mode was already on in
        # _start_handshake(); skip the log if it was enabled afterwards.
        if self._loop.get_debug() and self._handshake_start_time is not None:
            dt = self._loop.time() - self._handshake_start_time
            logger.debug("%r: SSL handshake took %.1f ms", self, dt * 1e3)

        # Add extra info that becomes available after handshake.
        self._extra.update(peercert=peercert,
                           cipher=sslobj.cipher(),
                           compression=sslobj.compression(),
                           ssl_object=sslobj)
        if self._app_state == AppProtocolState.STATE_INIT:
            self._app_state = AppProtocolState.STATE_CON_MADE
            self._app_protocol.connection_made(self._get_app_transport())
        self._wakeup_waiter()
        self._do_read()

    # Shutdown flow

    def _start_shutdown(self):
        """Begin closing: abort mid-handshake, otherwise flush and shut down.

        Does nothing if shutdown is already under way or nothing is wrapped.
        """
        if (
            self._state in (
                SSLProtocolState.FLUSHING,
                SSLProtocolState.SHUTDOWN,
                SSLProtocolState.UNWRAPPED
            )
        ):
            return
        if self._app_transport is not None:
            self._app_transport._closed = True
        if self._state == SSLProtocolState.DO_HANDSHAKE:
            self._abort()
        else:
            self._set_state(SSLProtocolState.FLUSHING)
            self._shutdown_timeout_handle = self._loop.call_later(
                self._ssl_shutdown_timeout,
                self._check_shutdown_timeout
            )
            self._do_flush()

    def _check_shutdown_timeout(self):
        """Force-close the lower transport if shutdown has not finished."""
        if (
            self._state in (
                SSLProtocolState.FLUSHING,
                SSLProtocolState.SHUTDOWN
            )
        ):
            self._transport._force_close(
                exceptions.TimeoutError('SSL shutdown timed out'))

    def _do_flush(self):
        """Read pending data, then move on to the SHUTDOWN step."""
        self._do_read()
        self._set_state(SSLProtocolState.SHUTDOWN)
        self._do_shutdown()

    def _do_shutdown(self):
        """Run one step of the SSL shutdown.

        unwrap() is skipped when EOF was already received from the peer.
        """
        try:
            if not self._eof_received:
                self._sslobj.unwrap()
        except SSLAgainErrors:
            self._process_outgoing()
        except ssl.SSLError as exc:
            self._on_shutdown_complete(exc)
        else:
            self._process_outgoing()
            self._call_eof_received()
            self._on_shutdown_complete(None)

    def _on_shutdown_complete(self, shutdown_exc):
        """Cancel the shutdown timer and close (or fail) the transport."""
        if self._shutdown_timeout_handle is not None:
            self._shutdown_timeout_handle.cancel()
            self._shutdown_timeout_handle = None

        if shutdown_exc:
            self._fatal_error(shutdown_exc)
        else:
            self._loop.call_soon(self._transport.close)

    def _abort(self, exc=None):
        """Drop to UNWRAPPED and tear down the lower transport immediately.

        *exc* is optional so that both ``_abort()`` and ``_abort(exc)`` work;
        the app transport's _force_close() passes the exception.  When an
        exception is given it is forwarded to the lower transport's
        _force_close(); otherwise the lower transport is aborted.
        """
        self._set_state(SSLProtocolState.UNWRAPPED)
        if self._transport is not None:
            if exc is None:
                self._transport.abort()
            else:
                self._transport._force_close(exc)

    # Outgoing flow

    def _write_appdata(self, list_of_data):
        """Queue application data and try to write it out.

        Data is discarded when the connection is closing or closed; a
        warning is logged once the number of such writes reaches
        LOG_THRESHOLD_FOR_CONNLOST_WRITES.
        """
        if (
            self._state in (
                SSLProtocolState.FLUSHING,
                SSLProtocolState.SHUTDOWN,
                SSLProtocolState.UNWRAPPED
            )
        ):
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('SSL connection is closed')
            self._conn_lost += 1
            return

        for data in list_of_data:
            self._write_backlog.append(data)
            self._write_buffer_size += len(data)

        try:
            # During the handshake the data just stays queued.
            if self._state == SSLProtocolState.WRAPPED:
                self._do_write()

        except Exception as ex:
            self._fatal_error(ex, 'Fatal error on SSL protocol')

    def _do_write(self):
        """Push backlog items through the SSL object, then send the output."""
        try:
            while self._write_backlog:
                data = self._write_backlog[0]
                count = self._sslobj.write(data)
                data_len = len(data)
                if count < data_len:
                    # Partial write: keep the unwritten tail at the front.
                    self._write_backlog[0] = data[count:]
                    self._write_buffer_size -= count
                else:
                    del self._write_backlog[0]
                    self._write_buffer_size -= data_len
        except SSLAgainErrors:
            pass
        self._process_outgoing()

    def _process_outgoing(self):
        """Write pending outgoing BIO data to the lower transport.

        Skipped while the lower transport has paused writing.  App-side
        write flow control is re-evaluated in either case.
        """
        if not self._ssl_writing_paused:
            data = self._outgoing.read()
            if data:
                self._transport.write(data)
        self._control_app_writing()

    # Incoming flow

    def _do_read(self):
        """Deliver decrypted data to the app, then service pending writes.

        Only active in the WRAPPED and FLUSHING states.  Any exception is
        routed to _fatal_error().
        """
        if (
            self._state not in (
                SSLProtocolState.WRAPPED,
                SSLProtocolState.FLUSHING,
            )
        ):
            return
        try:
            if not self._app_reading_paused:
                if self._app_protocol_is_buffer:
                    self._do_read__buffered()
                else:
                    self._do_read__copied()
                if self._write_backlog:
                    self._do_write()
                else:
                    self._process_outgoing()
            self._control_ssl_reading()
        except Exception as ex:
            self._fatal_error(ex, 'Fatal error on SSL protocol')

    def _do_read__buffered(self):
        """Read decrypted data straight into the app protocol's buffer."""
        offset = 0
        count = 1

        buf = self._app_protocol_get_buffer(self._get_read_buffer_size())

        try:
            # Slice through a memoryview: slicing e.g. a bytearray directly
            # would copy, and data read into that copy would never reach the
            # app's buffer.  The view is released before buffer_updated() so
            # the app is free to resize its buffer there.
            with memoryview(buf) as view:
                wants = len(view)
                count = self._sslobj.read(wants, view)

                if count > 0:
                    offset = count
                    while offset < wants:
                        count = self._sslobj.read(wants - offset,
                                                  view[offset:])
                        if count > 0:
                            offset += count
                        else:
                            break
                    else:
                        # Buffer filled completely: more data may be
                        # waiting, so schedule another read.
                        self._loop.call_soon(self._do_read)
        except SSLAgainErrors:
            pass
        if offset > 0:
            self._app_protocol_buffer_updated(offset)
        if not count:
            # close_notify
            self._call_eof_received()
            self._start_shutdown()

    def _do_read__copied(self):
        """Read all available decrypted data and pass it to data_received()."""
        # Non-empty sentinel: if the very first read raises, this must not
        # look like an EOF below.
        chunk = b'1'
        chunks = []

        try:
            while True:
                chunk = self._sslobj.read(self.max_size)
                if not chunk:
                    break
                chunks.append(chunk)
        except SSLAgainErrors:
            pass
        if len(chunks) == 1:
            # Single chunk: hand it over without an extra join/copy.
            self._app_protocol.data_received(chunks[0])
        elif chunks:
            self._app_protocol.data_received(b''.join(chunks))
        if not chunk:
            # close_notify
            self._call_eof_received()
            self._start_shutdown()

    def _call_eof_received(self):
        """Call the app protocol's eof_received() at most once."""
        try:
            if self._app_state == AppProtocolState.STATE_CON_MADE:
                self._app_state = AppProtocolState.STATE_EOF
                keep_open = self._app_protocol.eof_received()
                if keep_open:
                    logger.warning('returning true from eof_received() '
                                   'has no effect when using ssl')
        except (KeyboardInterrupt, SystemExit):
            raise
        except BaseException as ex:
            self._fatal_error(ex, 'Error calling eof_received()')

    # Flow control for writes from APP socket

    def _control_app_writing(self):
        """Pause or resume the app protocol according to the write buffer.

        Exceptions raised by the app's pause_writing()/resume_writing() are
        reported to the loop's exception handler (KeyboardInterrupt and
        SystemExit propagate).
        """
        size = self._get_write_buffer_size()
        if size >= self._outgoing_high_water and not self._app_writing_paused:
            self._app_writing_paused = True
            try:
                self._app_protocol.pause_writing()
            except (KeyboardInterrupt, SystemExit):
                raise
            except BaseException as exc:
                self._loop.call_exception_handler({
                    'message': 'protocol.pause_writing() failed',
                    'exception': exc,
                    'transport': self._app_transport,
                    'protocol': self,
                })
        elif size <= self._outgoing_low_water and self._app_writing_paused:
            self._app_writing_paused = False
            try:
                self._app_protocol.resume_writing()
            except (KeyboardInterrupt, SystemExit):
                raise
            except BaseException as exc:
                self._loop.call_exception_handler({
                    'message': 'protocol.resume_writing() failed',
                    'exception': exc,
                    'transport': self._app_transport,
                    'protocol': self,
                })

    def _get_write_buffer_size(self):
        """Return pending outgoing BIO bytes plus queued backlog bytes."""
        return self._outgoing.pending + self._write_buffer_size

    def _set_write_buffer_limits(self, high=None, low=None):
        """Store the write water marks, applying defaults where omitted."""
        high, low = add_flowcontrol_defaults(
            high, low, constants.FLOW_CONTROL_HIGH_WATER_SSL_WRITE)
        self._outgoing_high_water = high
        self._outgoing_low_water = low

    # Flow control for reads to APP socket

    def _pause_reading(self):
        """Stop delivering data to the app protocol."""
        self._app_reading_paused = True

    def _resume_reading(self):
        """Resume delivery and schedule the step matching the state."""
        if self._app_reading_paused:
            self._app_reading_paused = False

            def resume():
                if self._state == SSLProtocolState.WRAPPED:
                    self._do_read()
                elif self._state == SSLProtocolState.FLUSHING:
                    self._do_flush()
                elif self._state == SSLProtocolState.SHUTDOWN:
                    self._do_shutdown()
            self._loop.call_soon(resume)

    # Flow control for reads from SSL socket

    def _control_ssl_reading(self):
        """Pause or resume the lower transport based on the incoming BIO."""
        size = self._get_read_buffer_size()
        if size >= self._incoming_high_water and not self._ssl_reading_paused:
            self._ssl_reading_paused = True
            self._transport.pause_reading()
        elif size <= self._incoming_low_water and self._ssl_reading_paused:
            self._ssl_reading_paused = False
            self._transport.resume_reading()

    def _set_read_buffer_limits(self, high=None, low=None):
        """Store the read water marks, applying defaults where omitted."""
        high, low = add_flowcontrol_defaults(
            high, low, constants.FLOW_CONTROL_HIGH_WATER_SSL_READ)
        self._incoming_high_water = high
        self._incoming_low_water = low

    def _get_read_buffer_size(self):
        """Return the number of bytes pending in the incoming BIO."""
        return self._incoming.pending

    # Flow control for writes to SSL socket

    def pause_writing(self):
        """Called when the low-level transport's buffer goes over
        the high-water mark.
        """
        assert not self._ssl_writing_paused
        self._ssl_writing_paused = True

    def resume_writing(self):
        """Called when the low-level transport's buffer drains below
        the low-water mark.
        """
        assert self._ssl_writing_paused
        self._ssl_writing_paused = False
        self._process_outgoing()

    def _fatal_error(self, exc, message='Fatal error on transport'):
        """Force-close the lower transport and report *exc*.

        OSError is only logged, and only in debug mode.  CancelledError is
        not reported.  Anything else goes to the loop's exception handler.
        """
        if self._transport:
            self._transport._force_close(exc)

        if isinstance(exc, OSError):
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        elif not isinstance(exc, exceptions.CancelledError):
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self._transport,
                'protocol': self,
            })
923
924