# Source listing scraped from CoCalc (site navigation text removed).
# GitHub Repository: allendowney/cpython
# Path: blob/main/Lib/asyncio/transports.py
"""Abstract Transport class."""

# Public names exported by ``from <module> import *``.  The private
# _FlowControlMixin helper is deliberately not listed.
__all__ = (
    'BaseTransport', 'ReadTransport', 'WriteTransport',
    'Transport', 'DatagramTransport', 'SubprocessTransport',
)
7
8
9
class BaseTransport:
    """Base class for transports.

    Stores the optional ``extra`` mapping and declares the methods that
    are common to every transport.  Apart from get_extra_info(), every
    method here raises NotImplementedError and must be overridden by a
    concrete transport.
    """

    __slots__ = ('_extra',)

    def __init__(self, extra=None):
        """Store *extra*, the mapping consulted by get_extra_info().

        When *extra* is None a fresh empty dict is created, so instances
        never share a default mapping.  A mapping passed by the caller
        is kept by reference, not copied.
        """
        if extra is None:
            extra = {}
        self._extra = extra

    def get_extra_info(self, name, default=None):
        """Get optional transport information.

        Return the value stored under *name* in the extra mapping, or
        *default* when there is no such entry.
        """
        return self._extra.get(name, default)

    def is_closing(self):
        """Return True if the transport is closing or closed."""
        raise NotImplementedError

    def close(self):
        """Close the transport.

        Buffered data will be flushed asynchronously.  No more data
        will be received.  After all buffered data is flushed, the
        protocol's connection_lost() method will (eventually) be
        called with None as its argument.
        """
        raise NotImplementedError

    def set_protocol(self, protocol):
        """Set a new protocol."""
        raise NotImplementedError

    def get_protocol(self):
        """Return the current protocol."""
        raise NotImplementedError
44
45
46
class ReadTransport(BaseTransport):
    """Interface for read-only transports.

    Every method declared here raises NotImplementedError; concrete
    transports override them.
    """

    __slots__ = ()

    def is_reading(self):
        """Return True if the transport is receiving."""
        raise NotImplementedError

    def pause_reading(self):
        """Pause the receiving end.

        No data will be passed to the protocol's data_received()
        method until resume_reading() is called.
        """
        raise NotImplementedError

    def resume_reading(self):
        """Resume the receiving end.

        Data received will once again be passed to the protocol's
        data_received() method.
        """
        raise NotImplementedError
70
71
72
class WriteTransport(BaseTransport):
    """Interface for write-only transports.

    Every method declared here raises NotImplementedError except
    writelines(), which has a default implementation built on write().
    """

    __slots__ = ()

    def set_write_buffer_limits(self, high=None, low=None):
        """Set the high- and low-water limits for write flow control.

        These two values control when to call the protocol's
        pause_writing() and resume_writing() methods.  If specified,
        the low-water limit must be less than or equal to the
        high-water limit.  Neither value can be negative.

        The defaults are implementation-specific.  If only the
        high-water limit is given, the low-water limit defaults to an
        implementation-specific value less than or equal to the
        high-water limit.  Setting high to zero forces low to zero as
        well, and causes pause_writing() to be called whenever the
        buffer becomes non-empty.  Setting low to zero causes
        resume_writing() to be called only once the buffer is empty.
        Use of zero for either limit is generally sub-optimal as it
        reduces opportunities for doing I/O and computation
        concurrently.
        """
        raise NotImplementedError

    def get_write_buffer_size(self):
        """Return the current size of the write buffer."""
        raise NotImplementedError

    def get_write_buffer_limits(self):
        """Get the high and low watermarks for write flow control.

        Return a tuple (low, high) where low and high are
        non-negative numbers of bytes.
        """
        raise NotImplementedError

    def write(self, data):
        """Write some data bytes to the transport.

        This does not block; it buffers the data and arranges for it
        to be sent out asynchronously.
        """
        raise NotImplementedError

    def writelines(self, list_of_data):
        """Write a list (or any iterable) of data bytes to the transport.

        The default implementation concatenates the arguments and
        calls write() on the result.
        """
        # One join and one write() call, rather than one write() per item.
        data = b''.join(list_of_data)
        self.write(data)

    def write_eof(self):
        """Close the write end after flushing buffered data.

        (This is like typing ^D into a UNIX program reading from stdin.)

        Data may still be received.
        """
        raise NotImplementedError

    def can_write_eof(self):
        """Return True if this transport supports write_eof(), False if not."""
        raise NotImplementedError

    def abort(self):
        """Close the transport immediately.

        Buffered data will be lost.  No more data will be received.
        The protocol's connection_lost() method will (eventually) be
        called with None as its argument.
        """
        raise NotImplementedError
146
147
148
class Transport(ReadTransport, WriteTransport):
    """Interface representing a bidirectional transport.

    There may be several implementations, but typically, the user does
    not implement new transports; rather, the platform provides some
    useful transports that are implemented using the platform's best
    practices.

    The user never instantiates a transport directly; they call a
    utility function, passing it a protocol factory and other
    information necessary to create the transport and protocol.  (E.g.
    EventLoop.create_connection() or EventLoop.create_server().)

    The utility function will asynchronously create a transport and a
    protocol and hook them up by calling the protocol's
    connection_made() method, passing it the transport.

    The implementation here raises NotImplementedError for every
    method except get_extra_info(), which reads the extra mapping, and
    writelines(), which joins its argument and calls write() once.
    """

    __slots__ = ()
170
171
172
class DatagramTransport(BaseTransport):
    """Interface for datagram (UDP) transports.

    Every method declared here raises NotImplementedError; concrete
    transports override them.
    """

    __slots__ = ()

    def sendto(self, data, addr=None):
        """Send data to the transport.

        This does not block; it buffers the data and arranges for it
        to be sent out asynchronously.

        addr is the target socket address.  If addr is None, the
        target address given at transport creation is used.
        """
        raise NotImplementedError

    def abort(self):
        """Close the transport immediately.

        Buffered data will be lost.  No more data will be received.
        The protocol's connection_lost() method will (eventually) be
        called with None as its argument.
        """
        raise NotImplementedError
195
196
197
class SubprocessTransport(BaseTransport):
    """Interface for subprocess transports.

    Every method declared here raises NotImplementedError; concrete
    transports override them.
    """

    __slots__ = ()

    def get_pid(self):
        """Get subprocess id."""
        raise NotImplementedError

    def get_returncode(self):
        """Get subprocess returncode.

        See also:
        https://docs.python.org/3/library/subprocess.html#subprocess.Popen.returncode
        """
        raise NotImplementedError

    def get_pipe_transport(self, fd):
        """Get transport for pipe with number fd."""
        raise NotImplementedError

    def send_signal(self, signal):
        """Send signal to subprocess.

        See also:
        https://docs.python.org/3/library/subprocess.html#subprocess.Popen.send_signal
        """
        raise NotImplementedError

    def terminate(self):
        """Stop the subprocess.

        Alias for close() method.

        On Posix OSs the method sends SIGTERM to the subprocess.
        On Windows the Win32 API function TerminateProcess()
        is called to stop the subprocess.

        See also:
        https://docs.python.org/3/library/subprocess.html#subprocess.Popen.terminate
        """
        raise NotImplementedError

    def kill(self):
        """Kill the subprocess.

        On Posix OSs the function sends SIGKILL to the subprocess.
        On Windows kill() is an alias for terminate().

        See also:
        https://docs.python.org/3/library/subprocess.html#subprocess.Popen.kill
        """
        raise NotImplementedError
249
250
251
class _FlowControlMixin(Transport):
    """All the logic for (write) flow control in a mix-in base class.

    The subclass must implement get_write_buffer_size().  It must call
    _maybe_pause_protocol() whenever the write buffer size increases,
    and _maybe_resume_protocol() whenever it decreases.  It may also
    override set_write_buffer_limits() (e.g. to specify different
    defaults).

    The subclass constructor must call super().__init__(extra, loop).
    This will call _set_write_buffer_limits() to install the default
    limits; an overridden set_write_buffer_limits() is not invoked by
    the constructor.

    The pause/resume helpers read self._protocol, which this class
    never assigns; the subclass is expected to provide it.

    The user may call set_write_buffer_limits() and
    get_write_buffer_size(), and their protocol's pause_writing() and
    resume_writing() may be called.
    """

    __slots__ = ('_loop', '_protocol_paused', '_high_water', '_low_water')

    def __init__(self, extra=None, loop=None):
        """Initialise flow-control state with the default limits.

        *loop* is required despite its None default; it is the object
        whose call_exception_handler() receives errors raised by the
        protocol's pause_writing() / resume_writing().
        """
        super().__init__(extra)
        assert loop is not None
        self._loop = loop
        self._protocol_paused = False
        self._set_write_buffer_limits()

    def _maybe_pause_protocol(self):
        """Pause the protocol if the buffer has grown past the high mark.

        Does nothing when the buffer size is at or below the high-water
        limit, or when the protocol is already paused.
        """
        size = self.get_write_buffer_size()
        if size <= self._high_water:
            return
        if not self._protocol_paused:
            # Flip the flag before calling out, so the state is already
            # "paused" even if pause_writing() raises.
            self._protocol_paused = True
            try:
                self._protocol.pause_writing()
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException as exc:
                # Any other failure is reported to the loop's exception
                # handler instead of propagating to the caller.
                self._loop.call_exception_handler({
                    'message': 'protocol.pause_writing() failed',
                    'exception': exc,
                    'transport': self,
                    'protocol': self._protocol,
                })

    def _maybe_resume_protocol(self):
        """Resume a paused protocol once the buffer drains to the low mark.

        Does nothing unless the protocol is currently paused and the
        buffer size is at or below the low-water limit.
        """
        if (self._protocol_paused and
                self.get_write_buffer_size() <= self._low_water):
            # Flip the flag before calling out, so the state is already
            # "resumed" even if resume_writing() raises.
            self._protocol_paused = False
            try:
                self._protocol.resume_writing()
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException as exc:
                # Any other failure is reported to the loop's exception
                # handler instead of propagating to the caller.
                self._loop.call_exception_handler({
                    'message': 'protocol.resume_writing() failed',
                    'exception': exc,
                    'transport': self,
                    'protocol': self._protocol,
                })

    def get_write_buffer_limits(self):
        """Return the current limits as a tuple (low, high)."""
        return (self._low_water, self._high_water)

    def _set_write_buffer_limits(self, high=None, low=None):
        """Validate and store the high- and low-water limits.

        Defaults: with neither given, high is 64 KiB; with only low
        given, high is 4 * low.  Whenever low is not given it is
        high // 4.

        Raises ValueError unless high >= low >= 0.
        """
        if high is None:
            if low is None:
                high = 64 * 1024
            else:
                high = 4 * low
        if low is None:
            low = high // 4

        if not high >= low >= 0:
            raise ValueError(
                f'high ({high!r}) must be >= low ({low!r}) must be >= 0')

        self._high_water = high
        self._low_water = low

    def set_write_buffer_limits(self, high=None, low=None):
        """Set the write flow-control limits and re-check the buffer.

        After storing the limits, the protocol is paused right away if
        the current buffer size already exceeds the new high-water
        limit.  Raises ValueError unless high >= low >= 0.
        """
        self._set_write_buffer_limits(high=high, low=low)
        self._maybe_pause_protocol()

    def get_write_buffer_size(self):
        """Return the current size of the write buffer.

        Must be implemented by the subclass.
        """
        raise NotImplementedError
336
337