# Vendored copy of aiohttp's ``streams.py`` (from venv/lib/python3.8/site-packages/aiohttp).
# NOTE(review): this file was pasted from a web viewer; page chrome and
# interleaved line numbers have been removed so the module parses again.
import asyncio
import collections
import warnings
from typing import Awaitable, Callable, Deque, Generic, List, Optional, Tuple, TypeVar

from .base_protocol import BaseProtocol
from .helpers import BaseTimerContext, set_exception, set_result
from .log import internal_logger
from .typedefs import Final
# Public API of this module.
__all__ = (
    "EMPTY_PAYLOAD",
    "EofStream",
    "StreamReader",
    "DataQueue",
    "FlowControlDataQueue",
)

# Item type carried by DataQueue / AsyncStreamIterator.
_T = TypeVar("_T")
class EofStream(Exception):
    """eof stream indication.

    Raised by :class:`DataQueue.read` (and propagated through
    payload readers) when the stream ended without more data.
    """
class AsyncStreamIterator(Generic[_T]):
    """Async iterator adapter over a zero-argument read coroutine.

    Iteration stops when ``read_func`` raises :class:`EofStream` or
    returns an empty bytes object.
    """

    def __init__(self, read_func: Callable[[], Awaitable[_T]]) -> None:
        self.read_func = read_func

    def __aiter__(self) -> "AsyncStreamIterator[_T]":
        return self

    async def __anext__(self) -> _T:
        try:
            rv = await self.read_func()
        except EofStream:
            raise StopAsyncIteration
        # An empty bytes chunk also signals end-of-stream.
        if rv == b"":
            raise StopAsyncIteration
        return rv
class ChunkTupleAsyncStreamIterator:
    """Async iterator over ``StreamReader.readchunk()`` results.

    Yields ``(data, end_of_http_chunk)`` tuples; stops on the
    ``(b"", False)`` sentinel that marks EOF.
    """

    def __init__(self, stream: "StreamReader") -> None:
        self._stream = stream

    def __aiter__(self) -> "ChunkTupleAsyncStreamIterator":
        return self

    async def __anext__(self) -> Tuple[bytes, bool]:
        rv = await self._stream.readchunk()
        if rv == (b"", False):
            raise StopAsyncIteration
        return rv
class AsyncStreamReaderMixin:
    """Mixin adding async-iteration helpers to a stream reader.

    The concrete class must provide ``readline``, ``read``, ``readany``
    and ``readchunk`` (hence the ``attr-defined`` ignores below).
    """

    def __aiter__(self) -> AsyncStreamIterator[bytes]:
        return AsyncStreamIterator(self.readline)  # type: ignore[attr-defined]

    def iter_chunked(self, n: int) -> AsyncStreamIterator[bytes]:
        """Returns an asynchronous iterator that yields chunks of size n.

        Python-3.5 available for Python 3.5+ only
        """
        return AsyncStreamIterator(
            lambda: self.read(n)  # type: ignore[attr-defined,no-any-return]
        )

    def iter_any(self) -> AsyncStreamIterator[bytes]:
        """Yield all available data as soon as it is received.

        Python-3.5 available for Python 3.5+ only
        """
        return AsyncStreamIterator(self.readany)  # type: ignore[attr-defined]

    def iter_chunks(self) -> ChunkTupleAsyncStreamIterator:
        """Yield chunks of data as they are received by the server.

        The yielded objects are tuples
        of (bytes, bool) as returned by the StreamReader.readchunk method.

        Python-3.5 available for Python 3.5+ only
        """
        return ChunkTupleAsyncStreamIterator(self)  # type: ignore[arg-type]
class StreamReader(AsyncStreamReaderMixin):
    """An enhancement of asyncio.StreamReader.

    Supports asynchronous iteration by line, chunk or as available::

        async for line in reader:
            ...
        async for chunk in reader.iter_chunked(1024):
            ...
        async for slice in reader.iter_any():
            ...

    """

    # Total logical bytes ever fed (class default; shadowed per instance).
    total_bytes = 0

    def __init__(
        self,
        protocol: BaseProtocol,
        limit: int,
        *,
        timer: Optional[BaseTimerContext] = None,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        self._protocol = protocol
        # Flow control watermarks: resume below low, pause above high.
        self._low_water = limit
        self._high_water = limit * 2
        if loop is None:
            loop = asyncio.get_event_loop()
        self._loop = loop
        self._size = 0
        self._cursor = 0
        self._http_chunk_splits = None  # type: Optional[List[int]]
        self._buffer = collections.deque()  # type: Deque[bytes]
        self._buffer_offset = 0
        self._eof = False
        self._waiter = None  # type: Optional[asyncio.Future[None]]
        self._eof_waiter = None  # type: Optional[asyncio.Future[None]]
        self._exception = None  # type: Optional[BaseException]
        self._timer = timer
        self._eof_callbacks = []  # type: List[Callable[[], None]]

    def __repr__(self) -> str:
        info = [self.__class__.__name__]
        if self._size:
            info.append("%d bytes" % self._size)
        if self._eof:
            info.append("eof")
        if self._low_water != 2 ** 16:  # default limit
            info.append("low=%d high=%d" % (self._low_water, self._high_water))
        if self._waiter:
            info.append("w=%r" % self._waiter)
        if self._exception:
            info.append("e=%r" % self._exception)
        return "<%s>" % " ".join(info)

    def get_read_buffer_limits(self) -> Tuple[int, int]:
        """Return the (low, high) flow-control watermarks."""
        return (self._low_water, self._high_water)

    def exception(self) -> Optional[BaseException]:
        return self._exception

    def set_exception(self, exc: BaseException) -> None:
        """Fail the stream: wake both waiters with *exc* and drop callbacks."""
        self._exception = exc
        self._eof_callbacks.clear()

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_exception(waiter, exc)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_exception(waiter, exc)

    def on_eof(self, callback: Callable[[], None]) -> None:
        """Register *callback* to run at EOF (immediately if already at EOF)."""
        if self._eof:
            try:
                callback()
            except Exception:
                internal_logger.exception("Exception in eof callback")
        else:
            self._eof_callbacks.append(callback)

    def feed_eof(self) -> None:
        """Mark end of stream, wake all waiters and fire EOF callbacks."""
        self._eof = True

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        waiter = self._eof_waiter
        if waiter is not None:
            self._eof_waiter = None
            set_result(waiter, None)

        for cb in self._eof_callbacks:
            try:
                cb()
            except Exception:
                internal_logger.exception("Exception in eof callback")

        self._eof_callbacks.clear()

    def is_eof(self) -> bool:
        """Return True if 'feed_eof' was called."""
        return self._eof

    def at_eof(self) -> bool:
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    async def wait_eof(self) -> None:
        """Block until EOF is fed (no-op if already at EOF)."""
        if self._eof:
            return

        assert self._eof_waiter is None
        self._eof_waiter = self._loop.create_future()
        try:
            await self._eof_waiter
        finally:
            self._eof_waiter = None

    def unread_data(self, data: bytes) -> None:
        """rollback reading some data from stream, inserting it to buffer head."""
        warnings.warn(
            "unread_data() is deprecated "
            "and will be removed in future releases (#3260)",
            DeprecationWarning,
            stacklevel=2,
        )
        if not data:
            return

        # Normalize the head chunk before prepending so offsets stay valid.
        if self._buffer_offset:
            self._buffer[0] = self._buffer[0][self._buffer_offset :]
            self._buffer_offset = 0
        self._size += len(data)
        self._cursor -= len(data)
        self._buffer.appendleft(data)
        self._eof_counter = 0

    # TODO: size is ignored, remove the param later
    def feed_data(self, data: bytes, size: int = 0) -> None:
        """Append *data* to the buffer, wake the reader, maybe pause protocol."""
        assert not self._eof, "feed_data after feed_eof"

        if not data:
            return

        self._size += len(data)
        self._buffer.append(data)
        self.total_bytes += len(data)

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

        if self._size > self._high_water and not self._protocol._reading_paused:
            self._protocol.pause_reading()

    def begin_http_chunk_receiving(self) -> None:
        if self._http_chunk_splits is None:
            if self.total_bytes:
                raise RuntimeError(
                    "Called begin_http_chunk_receiving when "
                    "some data was already fed"
                )
            self._http_chunk_splits = []

    def end_http_chunk_receiving(self) -> None:
        if self._http_chunk_splits is None:
            raise RuntimeError(
                "Called end_http_chunk_receiving without calling "
                "begin_http_chunk_receiving first"
            )

        # self._http_chunk_splits contains logical byte offsets from start of
        # the body transfer. Each offset is the offset of the end of a chunk.
        # "Logical" means bytes, accessible for a user.
        # If no chunks containing logical data were received, current position
        # is definitely zero.
        pos = self._http_chunk_splits[-1] if self._http_chunk_splits else 0

        if self.total_bytes == pos:
            # We should not add empty chunks here. So we check for that.
            # Note, when chunked + gzip is used, we can receive a chunk
            # of compressed data, but that data may not be enough for gzip FSM
            # to yield any uncompressed data. That's why current position may
            # not change after receiving a chunk.
            return

        self._http_chunk_splits.append(self.total_bytes)

        # wake up readchunk when end of http chunk received
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

    async def _wait(self, func_name: str) -> None:
        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have an unexpected behaviour. It would not possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError(
                "%s() called while another coroutine is "
                "already waiting for incoming data" % func_name
            )

        waiter = self._waiter = self._loop.create_future()
        try:
            if self._timer:
                with self._timer:
                    await waiter
            else:
                await waiter
        finally:
            self._waiter = None

    async def readline(self) -> bytes:
        return await self.readuntil()

    async def readuntil(self, separator: bytes = b"\n") -> bytes:
        """Read until *separator* (inclusive) or EOF.

        Raises ValueError for an empty separator or when the accumulated
        chunk exceeds the high watermark.
        """
        seplen = len(separator)
        if seplen == 0:
            raise ValueError("Separator should be at least one-byte string")

        if self._exception is not None:
            raise self._exception

        chunk = b""
        chunk_size = 0
        not_enough = True

        while not_enough:
            while self._buffer and not_enough:
                offset = self._buffer_offset
                ichar = self._buffer[0].find(separator, offset) + 1
                # Read from current offset to found separator or to the end.
                data = self._read_nowait_chunk(ichar - offset if ichar else -1)
                chunk += data
                chunk_size += len(data)
                if ichar:
                    not_enough = False

                if chunk_size > self._high_water:
                    raise ValueError("Chunk too big")

            if self._eof:
                break

            if not_enough:
                await self._wait("readuntil")

        return chunk

    async def read(self, n: int = -1) -> bytes:
        """Read up to *n* bytes (all data until EOF when n < 0)."""
        if self._exception is not None:
            raise self._exception

        # migration problem; with DataQueue you have to catch
        # EofStream exception, so common way is to run payload.read() inside
        # infinite loop. what can cause real infinite loop with StreamReader
        # lets keep this code one major release.
        if __debug__:
            if self._eof and not self._buffer:
                self._eof_counter = getattr(self, "_eof_counter", 0) + 1
                if self._eof_counter > 5:
                    internal_logger.warning(
                        "Multiple access to StreamReader in eof state, "
                        "might be infinite loop.",
                        stack_info=True,
                    )

        if not n:
            return b""

        if n < 0:
            # This used to just loop creating a new waiter hoping to
            # collect everything in self._buffer, but that would
            # deadlock if the subprocess sends more than self.limit
            # bytes. So just call self.readany() until EOF.
            blocks = []
            while True:
                block = await self.readany()
                if not block:
                    break
                blocks.append(block)
            return b"".join(blocks)

        # TODO: should be `if` instead of `while`
        # because waiter maybe triggered on chunk end,
        # without feeding any data
        while not self._buffer and not self._eof:
            await self._wait("read")

        return self._read_nowait(n)

    async def readany(self) -> bytes:
        """Return whatever data is buffered, waiting for at least one chunk."""
        if self._exception is not None:
            raise self._exception

        # TODO: should be `if` instead of `while`
        # because waiter maybe triggered on chunk end,
        # without feeding any data
        while not self._buffer and not self._eof:
            await self._wait("readany")

        return self._read_nowait(-1)

    async def readchunk(self) -> Tuple[bytes, bool]:
        """Returns a tuple of (data, end_of_http_chunk).

        When chunked transfer
        encoding is used, end_of_http_chunk is a boolean indicating if the end
        of the data corresponds to the end of a HTTP chunk , otherwise it is
        always False.
        """
        while True:
            if self._exception is not None:
                raise self._exception

            while self._http_chunk_splits:
                pos = self._http_chunk_splits.pop(0)
                if pos == self._cursor:
                    return (b"", True)
                if pos > self._cursor:
                    return (self._read_nowait(pos - self._cursor), True)
                internal_logger.warning(
                    "Skipping HTTP chunk end due to data "
                    "consumption beyond chunk boundary"
                )

            if self._buffer:
                return (self._read_nowait_chunk(-1), False)
                # return (self._read_nowait(-1), False)

            if self._eof:
                # Special case for signifying EOF.
                # (b'', True) is not a final return value actually.
                return (b"", False)

            await self._wait("readchunk")

    async def readexactly(self, n: int) -> bytes:
        """Read exactly *n* bytes or raise IncompleteReadError at EOF."""
        if self._exception is not None:
            raise self._exception

        blocks = []  # type: List[bytes]
        while n > 0:
            block = await self.read(n)
            if not block:
                partial = b"".join(blocks)
                raise asyncio.IncompleteReadError(partial, len(partial) + n)
            blocks.append(block)
            n -= len(block)

        return b"".join(blocks)

    def read_nowait(self, n: int = -1) -> bytes:
        # default was changed to be consistent with .read(-1)
        #
        # I believe the most users don't know about the method and
        # they are not affected.
        if self._exception is not None:
            raise self._exception

        if self._waiter and not self._waiter.done():
            raise RuntimeError(
                "Called while some coroutine is waiting for incoming data."
            )

        return self._read_nowait(n)

    def _read_nowait_chunk(self, n: int) -> bytes:
        """Pop up to *n* bytes from the head buffer chunk (all if n == -1)."""
        first_buffer = self._buffer[0]
        offset = self._buffer_offset
        if n != -1 and len(first_buffer) - offset > n:
            data = first_buffer[offset : offset + n]
            self._buffer_offset += n

        elif offset:
            self._buffer.popleft()
            data = first_buffer[offset:]
            self._buffer_offset = 0

        else:
            data = self._buffer.popleft()

        self._size -= len(data)
        self._cursor += len(data)

        chunk_splits = self._http_chunk_splits
        # Prevent memory leak: drop useless chunk splits
        while chunk_splits and chunk_splits[0] < self._cursor:
            chunk_splits.pop(0)

        if self._size < self._low_water and self._protocol._reading_paused:
            self._protocol.resume_reading()
        return data

    def _read_nowait(self, n: int) -> bytes:
        """Read not more than n bytes, or whole buffer if n == -1"""
        chunks = []

        while self._buffer:
            chunk = self._read_nowait_chunk(n)
            chunks.append(chunk)
            if n != -1:
                n -= len(chunk)
                if n == 0:
                    break

        return b"".join(chunks) if chunks else b""
class EmptyStreamReader(StreamReader):  # lgtm [py/missing-call-to-init]
    """A StreamReader that is permanently at EOF and holds no data.

    Deliberately skips ``StreamReader.__init__`` (no protocol/loop needed).
    """

    def __init__(self) -> None:
        pass

    def exception(self) -> Optional[BaseException]:
        return None

    def set_exception(self, exc: BaseException) -> None:
        pass

    def on_eof(self, callback: Callable[[], None]) -> None:
        # Already at EOF: invoke the callback immediately.
        try:
            callback()
        except Exception:
            internal_logger.exception("Exception in eof callback")

    def feed_eof(self) -> None:
        pass

    def is_eof(self) -> bool:
        return True

    def at_eof(self) -> bool:
        return True

    async def wait_eof(self) -> None:
        return

    def feed_data(self, data: bytes, n: int = 0) -> None:
        pass

    async def readline(self) -> bytes:
        return b""

    async def read(self, n: int = -1) -> bytes:
        return b""

    # TODO add async def readuntil

    async def readany(self) -> bytes:
        return b""

    async def readchunk(self) -> Tuple[bytes, bool]:
        return (b"", True)

    async def readexactly(self, n: int) -> bytes:
        raise asyncio.IncompleteReadError(b"", n)

    def read_nowait(self, n: int = -1) -> bytes:
        return b""
# Shared singleton payload for responses/requests without a body.
EMPTY_PAYLOAD: Final[StreamReader] = EmptyStreamReader()
class DataQueue(Generic[_T]):
    """DataQueue is a general-purpose blocking queue with one reader."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        self._eof = False
        self._waiter = None  # type: Optional[asyncio.Future[None]]
        self._exception = None  # type: Optional[BaseException]
        self._size = 0
        self._buffer = collections.deque()  # type: Deque[Tuple[_T, int]]

    def __len__(self) -> int:
        return len(self._buffer)

    def is_eof(self) -> bool:
        return self._eof

    def at_eof(self) -> bool:
        return self._eof and not self._buffer

    def exception(self) -> Optional[BaseException]:
        return self._exception

    def set_exception(self, exc: BaseException) -> None:
        """Fail the queue: mark EOF and wake the waiting reader with *exc*."""
        self._eof = True
        self._exception = exc

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_exception(waiter, exc)

    def feed_data(self, data: _T, size: int = 0) -> None:
        """Enqueue *data* (with accounted *size*) and wake the reader."""
        self._size += size
        self._buffer.append((data, size))

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

    def feed_eof(self) -> None:
        self._eof = True

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            set_result(waiter, None)

    async def read(self) -> _T:
        """Dequeue one item; raise the stored exception or EofStream at end."""
        if not self._buffer and not self._eof:
            assert not self._waiter
            self._waiter = self._loop.create_future()
            try:
                await self._waiter
            except (asyncio.CancelledError, asyncio.TimeoutError):
                self._waiter = None
                raise

        if self._buffer:
            data, size = self._buffer.popleft()
            self._size -= size
            return data
        else:
            if self._exception is not None:
                raise self._exception
            else:
                raise EofStream

    def __aiter__(self) -> AsyncStreamIterator[_T]:
        return AsyncStreamIterator(self.read)
class FlowControlDataQueue(DataQueue[_T]):
    """FlowControlDataQueue resumes and pauses an underlying stream.

    It is a destination for parsed data.
    """

    def __init__(
        self, protocol: BaseProtocol, limit: int, *, loop: asyncio.AbstractEventLoop
    ) -> None:
        super().__init__(loop=loop)

        self._protocol = protocol
        # Pause threshold: twice the configured limit, matching StreamReader.
        self._limit = limit * 2

    def feed_data(self, data: _T, size: int = 0) -> None:
        super().feed_data(data, size)

        if self._size > self._limit and not self._protocol._reading_paused:
            self._protocol.pause_reading()

    async def read(self) -> _T:
        try:
            return await super().read()
        finally:
            # Resume the transport once the backlog drains below the limit.
            if self._size < self._limit and self._protocol._reading_paused:
                self._protocol.resume_reading()