Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
allendowney
GitHub Repository: allendowney/cpython
Path: blob/main/Lib/asyncio/windows_events.py
12 views
1
"""Selector and proactor event loops for Windows."""
2
3
import sys
4
5
if sys.platform != 'win32': # pragma: no cover
6
raise ImportError('win32 only')
7
8
import _overlapped
9
import _winapi
10
import errno
11
import math
12
import msvcrt
13
import socket
14
import struct
15
import time
16
import weakref
17
18
from . import events
19
from . import base_subprocess
20
from . import futures
21
from . import exceptions
22
from . import proactor_events
23
from . import selector_events
24
from . import tasks
25
from . import windows_utils
26
from .log import logger
27
28
29
__all__ = (
30
'SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor',
31
'DefaultEventLoopPolicy', 'WindowsSelectorEventLoopPolicy',
32
'WindowsProactorEventLoopPolicy',
33
)
34
35
36
NULL = _winapi.NULL
37
INFINITE = _winapi.INFINITE
38
ERROR_CONNECTION_REFUSED = 1225
39
ERROR_CONNECTION_ABORTED = 1236
40
41
# Initial delay in seconds for connect_pipe() before retrying to connect
42
CONNECT_PIPE_INIT_DELAY = 0.001
43
44
# Maximum delay in seconds for connect_pipe() before retrying to connect
45
CONNECT_PIPE_MAX_DELAY = 0.100
46
47
48
class _OverlappedFuture(futures.Future):
49
"""Subclass of Future which represents an overlapped operation.
50
51
Cancelling it will immediately cancel the overlapped operation.
52
"""
53
54
def __init__(self, ov, *, loop=None):
55
super().__init__(loop=loop)
56
if self._source_traceback:
57
del self._source_traceback[-1]
58
self._ov = ov
59
60
def _repr_info(self):
61
info = super()._repr_info()
62
if self._ov is not None:
63
state = 'pending' if self._ov.pending else 'completed'
64
info.insert(1, f'overlapped=<{state}, {self._ov.address:#x}>')
65
return info
66
67
def _cancel_overlapped(self):
68
if self._ov is None:
69
return
70
try:
71
self._ov.cancel()
72
except OSError as exc:
73
context = {
74
'message': 'Cancelling an overlapped future failed',
75
'exception': exc,
76
'future': self,
77
}
78
if self._source_traceback:
79
context['source_traceback'] = self._source_traceback
80
self._loop.call_exception_handler(context)
81
self._ov = None
82
83
def cancel(self, msg=None):
84
self._cancel_overlapped()
85
return super().cancel(msg=msg)
86
87
def set_exception(self, exception):
88
super().set_exception(exception)
89
self._cancel_overlapped()
90
91
def set_result(self, result):
92
super().set_result(result)
93
self._ov = None
94
95
96
class _BaseWaitHandleFuture(futures.Future):
97
"""Subclass of Future which represents a wait handle."""
98
99
def __init__(self, ov, handle, wait_handle, *, loop=None):
100
super().__init__(loop=loop)
101
if self._source_traceback:
102
del self._source_traceback[-1]
103
# Keep a reference to the Overlapped object to keep it alive until the
104
# wait is unregistered
105
self._ov = ov
106
self._handle = handle
107
self._wait_handle = wait_handle
108
109
# Should we call UnregisterWaitEx() if the wait completes
110
# or is cancelled?
111
self._registered = True
112
113
def _poll(self):
114
# non-blocking wait: use a timeout of 0 millisecond
115
return (_winapi.WaitForSingleObject(self._handle, 0) ==
116
_winapi.WAIT_OBJECT_0)
117
118
def _repr_info(self):
119
info = super()._repr_info()
120
info.append(f'handle={self._handle:#x}')
121
if self._handle is not None:
122
state = 'signaled' if self._poll() else 'waiting'
123
info.append(state)
124
if self._wait_handle is not None:
125
info.append(f'wait_handle={self._wait_handle:#x}')
126
return info
127
128
def _unregister_wait_cb(self, fut):
129
# The wait was unregistered: it's not safe to destroy the Overlapped
130
# object
131
self._ov = None
132
133
def _unregister_wait(self):
134
if not self._registered:
135
return
136
self._registered = False
137
138
wait_handle = self._wait_handle
139
self._wait_handle = None
140
try:
141
_overlapped.UnregisterWait(wait_handle)
142
except OSError as exc:
143
if exc.winerror != _overlapped.ERROR_IO_PENDING:
144
context = {
145
'message': 'Failed to unregister the wait handle',
146
'exception': exc,
147
'future': self,
148
}
149
if self._source_traceback:
150
context['source_traceback'] = self._source_traceback
151
self._loop.call_exception_handler(context)
152
return
153
# ERROR_IO_PENDING means that the unregister is pending
154
155
self._unregister_wait_cb(None)
156
157
def cancel(self, msg=None):
158
self._unregister_wait()
159
return super().cancel(msg=msg)
160
161
def set_exception(self, exception):
162
self._unregister_wait()
163
super().set_exception(exception)
164
165
def set_result(self, result):
166
self._unregister_wait()
167
super().set_result(result)
168
169
170
class _WaitCancelFuture(_BaseWaitHandleFuture):
171
"""Subclass of Future which represents a wait for the cancellation of a
172
_WaitHandleFuture using an event.
173
"""
174
175
def __init__(self, ov, event, wait_handle, *, loop=None):
176
super().__init__(ov, event, wait_handle, loop=loop)
177
178
self._done_callback = None
179
180
def cancel(self):
181
raise RuntimeError("_WaitCancelFuture must not be cancelled")
182
183
def set_result(self, result):
184
super().set_result(result)
185
if self._done_callback is not None:
186
self._done_callback(self)
187
188
def set_exception(self, exception):
189
super().set_exception(exception)
190
if self._done_callback is not None:
191
self._done_callback(self)
192
193
194
class _WaitHandleFuture(_BaseWaitHandleFuture):
195
def __init__(self, ov, handle, wait_handle, proactor, *, loop=None):
196
super().__init__(ov, handle, wait_handle, loop=loop)
197
self._proactor = proactor
198
self._unregister_proactor = True
199
self._event = _overlapped.CreateEvent(None, True, False, None)
200
self._event_fut = None
201
202
def _unregister_wait_cb(self, fut):
203
if self._event is not None:
204
_winapi.CloseHandle(self._event)
205
self._event = None
206
self._event_fut = None
207
208
# If the wait was cancelled, the wait may never be signalled, so
209
# it's required to unregister it. Otherwise, IocpProactor.close() will
210
# wait forever for an event which will never come.
211
#
212
# If the IocpProactor already received the event, it's safe to call
213
# _unregister() because we kept a reference to the Overlapped object
214
# which is used as a unique key.
215
self._proactor._unregister(self._ov)
216
self._proactor = None
217
218
super()._unregister_wait_cb(fut)
219
220
def _unregister_wait(self):
221
if not self._registered:
222
return
223
self._registered = False
224
225
wait_handle = self._wait_handle
226
self._wait_handle = None
227
try:
228
_overlapped.UnregisterWaitEx(wait_handle, self._event)
229
except OSError as exc:
230
if exc.winerror != _overlapped.ERROR_IO_PENDING:
231
context = {
232
'message': 'Failed to unregister the wait handle',
233
'exception': exc,
234
'future': self,
235
}
236
if self._source_traceback:
237
context['source_traceback'] = self._source_traceback
238
self._loop.call_exception_handler(context)
239
return
240
# ERROR_IO_PENDING is not an error, the wait was unregistered
241
242
self._event_fut = self._proactor._wait_cancel(self._event,
243
self._unregister_wait_cb)
244
245
246
class PipeServer(object):
247
"""Class representing a pipe server.
248
249
This is much like a bound, listening socket.
250
"""
251
def __init__(self, address):
252
self._address = address
253
self._free_instances = weakref.WeakSet()
254
# initialize the pipe attribute before calling _server_pipe_handle()
255
# because this function can raise an exception and the destructor calls
256
# the close() method
257
self._pipe = None
258
self._accept_pipe_future = None
259
self._pipe = self._server_pipe_handle(True)
260
261
def _get_unconnected_pipe(self):
262
# Create new instance and return previous one. This ensures
263
# that (until the server is closed) there is always at least
264
# one pipe handle for address. Therefore if a client attempt
265
# to connect it will not fail with FileNotFoundError.
266
tmp, self._pipe = self._pipe, self._server_pipe_handle(False)
267
return tmp
268
269
def _server_pipe_handle(self, first):
270
# Return a wrapper for a new pipe handle.
271
if self.closed():
272
return None
273
flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
274
if first:
275
flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
276
h = _winapi.CreateNamedPipe(
277
self._address, flags,
278
_winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
279
_winapi.PIPE_WAIT,
280
_winapi.PIPE_UNLIMITED_INSTANCES,
281
windows_utils.BUFSIZE, windows_utils.BUFSIZE,
282
_winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
283
pipe = windows_utils.PipeHandle(h)
284
self._free_instances.add(pipe)
285
return pipe
286
287
def closed(self):
288
return (self._address is None)
289
290
def close(self):
291
if self._accept_pipe_future is not None:
292
self._accept_pipe_future.cancel()
293
self._accept_pipe_future = None
294
# Close all instances which have not been connected to by a client.
295
if self._address is not None:
296
for pipe in self._free_instances:
297
pipe.close()
298
self._pipe = None
299
self._address = None
300
self._free_instances.clear()
301
302
__del__ = close
303
304
305
class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop):
306
"""Windows version of selector event loop."""
307
308
309
class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
310
"""Windows version of proactor event loop using IOCP."""
311
312
def __init__(self, proactor=None):
313
if proactor is None:
314
proactor = IocpProactor()
315
super().__init__(proactor)
316
317
def run_forever(self):
318
try:
319
assert self._self_reading_future is None
320
self.call_soon(self._loop_self_reading)
321
super().run_forever()
322
finally:
323
if self._self_reading_future is not None:
324
ov = self._self_reading_future._ov
325
self._self_reading_future.cancel()
326
# self_reading_future was just cancelled so if it hasn't been
327
# finished yet, it never will be (it's possible that it has
328
# already finished and its callback is waiting in the queue,
329
# where it could still happen if the event loop is restarted).
330
# Unregister it otherwise IocpProactor.close will wait for it
331
# forever
332
if ov is not None:
333
self._proactor._unregister(ov)
334
self._self_reading_future = None
335
336
async def create_pipe_connection(self, protocol_factory, address):
337
f = self._proactor.connect_pipe(address)
338
pipe = await f
339
protocol = protocol_factory()
340
trans = self._make_duplex_pipe_transport(pipe, protocol,
341
extra={'addr': address})
342
return trans, protocol
343
344
async def start_serving_pipe(self, protocol_factory, address):
345
server = PipeServer(address)
346
347
def loop_accept_pipe(f=None):
348
pipe = None
349
try:
350
if f:
351
pipe = f.result()
352
server._free_instances.discard(pipe)
353
354
if server.closed():
355
# A client connected before the server was closed:
356
# drop the client (close the pipe) and exit
357
pipe.close()
358
return
359
360
protocol = protocol_factory()
361
self._make_duplex_pipe_transport(
362
pipe, protocol, extra={'addr': address})
363
364
pipe = server._get_unconnected_pipe()
365
if pipe is None:
366
return
367
368
f = self._proactor.accept_pipe(pipe)
369
except BrokenPipeError:
370
if pipe and pipe.fileno() != -1:
371
pipe.close()
372
self.call_soon(loop_accept_pipe)
373
except OSError as exc:
374
if pipe and pipe.fileno() != -1:
375
self.call_exception_handler({
376
'message': 'Pipe accept failed',
377
'exception': exc,
378
'pipe': pipe,
379
})
380
pipe.close()
381
elif self._debug:
382
logger.warning("Accept pipe failed on pipe %r",
383
pipe, exc_info=True)
384
self.call_soon(loop_accept_pipe)
385
except exceptions.CancelledError:
386
if pipe:
387
pipe.close()
388
else:
389
server._accept_pipe_future = f
390
f.add_done_callback(loop_accept_pipe)
391
392
self.call_soon(loop_accept_pipe)
393
return [server]
394
395
async def _make_subprocess_transport(self, protocol, args, shell,
396
stdin, stdout, stderr, bufsize,
397
extra=None, **kwargs):
398
waiter = self.create_future()
399
transp = _WindowsSubprocessTransport(self, protocol, args, shell,
400
stdin, stdout, stderr, bufsize,
401
waiter=waiter, extra=extra,
402
**kwargs)
403
try:
404
await waiter
405
except (SystemExit, KeyboardInterrupt):
406
raise
407
except BaseException:
408
transp.close()
409
await transp._wait()
410
raise
411
412
return transp
413
414
415
class IocpProactor:
416
"""Proactor implementation using IOCP."""
417
418
def __init__(self, concurrency=INFINITE):
419
self._loop = None
420
self._results = []
421
self._iocp = _overlapped.CreateIoCompletionPort(
422
_overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)
423
self._cache = {}
424
self._registered = weakref.WeakSet()
425
self._unregistered = []
426
self._stopped_serving = weakref.WeakSet()
427
428
def _check_closed(self):
429
if self._iocp is None:
430
raise RuntimeError('IocpProactor is closed')
431
432
def __repr__(self):
433
info = ['overlapped#=%s' % len(self._cache),
434
'result#=%s' % len(self._results)]
435
if self._iocp is None:
436
info.append('closed')
437
return '<%s %s>' % (self.__class__.__name__, " ".join(info))
438
439
def set_loop(self, loop):
440
self._loop = loop
441
442
def select(self, timeout=None):
443
if not self._results:
444
self._poll(timeout)
445
tmp = self._results
446
self._results = []
447
try:
448
return tmp
449
finally:
450
# Needed to break cycles when an exception occurs.
451
tmp = None
452
453
def _result(self, value):
454
fut = self._loop.create_future()
455
fut.set_result(value)
456
return fut
457
458
@staticmethod
459
def finish_socket_func(trans, key, ov):
460
try:
461
return ov.getresult()
462
except OSError as exc:
463
if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
464
_overlapped.ERROR_OPERATION_ABORTED):
465
raise ConnectionResetError(*exc.args)
466
else:
467
raise
468
469
def recv(self, conn, nbytes, flags=0):
470
self._register_with_iocp(conn)
471
ov = _overlapped.Overlapped(NULL)
472
try:
473
if isinstance(conn, socket.socket):
474
ov.WSARecv(conn.fileno(), nbytes, flags)
475
else:
476
ov.ReadFile(conn.fileno(), nbytes)
477
except BrokenPipeError:
478
return self._result(b'')
479
480
return self._register(ov, conn, self.finish_socket_func)
481
482
def recv_into(self, conn, buf, flags=0):
483
self._register_with_iocp(conn)
484
ov = _overlapped.Overlapped(NULL)
485
try:
486
if isinstance(conn, socket.socket):
487
ov.WSARecvInto(conn.fileno(), buf, flags)
488
else:
489
ov.ReadFileInto(conn.fileno(), buf)
490
except BrokenPipeError:
491
return self._result(0)
492
493
return self._register(ov, conn, self.finish_socket_func)
494
495
def recvfrom(self, conn, nbytes, flags=0):
496
self._register_with_iocp(conn)
497
ov = _overlapped.Overlapped(NULL)
498
try:
499
ov.WSARecvFrom(conn.fileno(), nbytes, flags)
500
except BrokenPipeError:
501
return self._result((b'', None))
502
503
return self._register(ov, conn, self.finish_socket_func)
504
505
def recvfrom_into(self, conn, buf, flags=0):
506
self._register_with_iocp(conn)
507
ov = _overlapped.Overlapped(NULL)
508
try:
509
ov.WSARecvFromInto(conn.fileno(), buf, flags)
510
except BrokenPipeError:
511
return self._result((0, None))
512
513
def finish_recv(trans, key, ov):
514
try:
515
return ov.getresult()
516
except OSError as exc:
517
if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
518
_overlapped.ERROR_OPERATION_ABORTED):
519
raise ConnectionResetError(*exc.args)
520
else:
521
raise
522
523
return self._register(ov, conn, finish_recv)
524
525
def sendto(self, conn, buf, flags=0, addr=None):
526
self._register_with_iocp(conn)
527
ov = _overlapped.Overlapped(NULL)
528
529
ov.WSASendTo(conn.fileno(), buf, flags, addr)
530
531
return self._register(ov, conn, self.finish_socket_func)
532
533
def send(self, conn, buf, flags=0):
534
self._register_with_iocp(conn)
535
ov = _overlapped.Overlapped(NULL)
536
if isinstance(conn, socket.socket):
537
ov.WSASend(conn.fileno(), buf, flags)
538
else:
539
ov.WriteFile(conn.fileno(), buf)
540
541
return self._register(ov, conn, self.finish_socket_func)
542
543
def accept(self, listener):
544
self._register_with_iocp(listener)
545
conn = self._get_accept_socket(listener.family)
546
ov = _overlapped.Overlapped(NULL)
547
ov.AcceptEx(listener.fileno(), conn.fileno())
548
549
def finish_accept(trans, key, ov):
550
ov.getresult()
551
# Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work.
552
buf = struct.pack('@P', listener.fileno())
553
conn.setsockopt(socket.SOL_SOCKET,
554
_overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf)
555
conn.settimeout(listener.gettimeout())
556
return conn, conn.getpeername()
557
558
async def accept_coro(future, conn):
559
# Coroutine closing the accept socket if the future is cancelled
560
try:
561
await future
562
except exceptions.CancelledError:
563
conn.close()
564
raise
565
566
future = self._register(ov, listener, finish_accept)
567
coro = accept_coro(future, conn)
568
tasks.ensure_future(coro, loop=self._loop)
569
return future
570
571
def connect(self, conn, address):
572
if conn.type == socket.SOCK_DGRAM:
573
# WSAConnect will complete immediately for UDP sockets so we don't
574
# need to register any IOCP operation
575
_overlapped.WSAConnect(conn.fileno(), address)
576
fut = self._loop.create_future()
577
fut.set_result(None)
578
return fut
579
580
self._register_with_iocp(conn)
581
# The socket needs to be locally bound before we call ConnectEx().
582
try:
583
_overlapped.BindLocal(conn.fileno(), conn.family)
584
except OSError as e:
585
if e.winerror != errno.WSAEINVAL:
586
raise
587
# Probably already locally bound; check using getsockname().
588
if conn.getsockname()[1] == 0:
589
raise
590
ov = _overlapped.Overlapped(NULL)
591
ov.ConnectEx(conn.fileno(), address)
592
593
def finish_connect(trans, key, ov):
594
ov.getresult()
595
# Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work.
596
conn.setsockopt(socket.SOL_SOCKET,
597
_overlapped.SO_UPDATE_CONNECT_CONTEXT, 0)
598
return conn
599
600
return self._register(ov, conn, finish_connect)
601
602
def sendfile(self, sock, file, offset, count):
603
self._register_with_iocp(sock)
604
ov = _overlapped.Overlapped(NULL)
605
offset_low = offset & 0xffff_ffff
606
offset_high = (offset >> 32) & 0xffff_ffff
607
ov.TransmitFile(sock.fileno(),
608
msvcrt.get_osfhandle(file.fileno()),
609
offset_low, offset_high,
610
count, 0, 0)
611
612
return self._register(ov, sock, self.finish_socket_func)
613
614
def accept_pipe(self, pipe):
615
self._register_with_iocp(pipe)
616
ov = _overlapped.Overlapped(NULL)
617
connected = ov.ConnectNamedPipe(pipe.fileno())
618
619
if connected:
620
# ConnectNamePipe() failed with ERROR_PIPE_CONNECTED which means
621
# that the pipe is connected. There is no need to wait for the
622
# completion of the connection.
623
return self._result(pipe)
624
625
def finish_accept_pipe(trans, key, ov):
626
ov.getresult()
627
return pipe
628
629
return self._register(ov, pipe, finish_accept_pipe)
630
631
async def connect_pipe(self, address):
632
delay = CONNECT_PIPE_INIT_DELAY
633
while True:
634
# Unfortunately there is no way to do an overlapped connect to
635
# a pipe. Call CreateFile() in a loop until it doesn't fail with
636
# ERROR_PIPE_BUSY.
637
try:
638
handle = _overlapped.ConnectPipe(address)
639
break
640
except OSError as exc:
641
if exc.winerror != _overlapped.ERROR_PIPE_BUSY:
642
raise
643
644
# ConnectPipe() failed with ERROR_PIPE_BUSY: retry later
645
delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
646
await tasks.sleep(delay)
647
648
return windows_utils.PipeHandle(handle)
649
650
def wait_for_handle(self, handle, timeout=None):
651
"""Wait for a handle.
652
653
Return a Future object. The result of the future is True if the wait
654
completed, or False if the wait did not complete (on timeout).
655
"""
656
return self._wait_for_handle(handle, timeout, False)
657
658
def _wait_cancel(self, event, done_callback):
659
fut = self._wait_for_handle(event, None, True)
660
# add_done_callback() cannot be used because the wait may only complete
661
# in IocpProactor.close(), while the event loop is not running.
662
fut._done_callback = done_callback
663
return fut
664
665
def _wait_for_handle(self, handle, timeout, _is_cancel):
666
self._check_closed()
667
668
if timeout is None:
669
ms = _winapi.INFINITE
670
else:
671
# RegisterWaitForSingleObject() has a resolution of 1 millisecond,
672
# round away from zero to wait *at least* timeout seconds.
673
ms = math.ceil(timeout * 1e3)
674
675
# We only create ov so we can use ov.address as a key for the cache.
676
ov = _overlapped.Overlapped(NULL)
677
wait_handle = _overlapped.RegisterWaitWithQueue(
678
handle, self._iocp, ov.address, ms)
679
if _is_cancel:
680
f = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop)
681
else:
682
f = _WaitHandleFuture(ov, handle, wait_handle, self,
683
loop=self._loop)
684
if f._source_traceback:
685
del f._source_traceback[-1]
686
687
def finish_wait_for_handle(trans, key, ov):
688
# Note that this second wait means that we should only use
689
# this with handles types where a successful wait has no
690
# effect. So events or processes are all right, but locks
691
# or semaphores are not. Also note if the handle is
692
# signalled and then quickly reset, then we may return
693
# False even though we have not timed out.
694
return f._poll()
695
696
self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle)
697
return f
698
699
def _register_with_iocp(self, obj):
700
# To get notifications of finished ops on this objects sent to the
701
# completion port, were must register the handle.
702
if obj not in self._registered:
703
self._registered.add(obj)
704
_overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
705
# XXX We could also use SetFileCompletionNotificationModes()
706
# to avoid sending notifications to completion port of ops
707
# that succeed immediately.
708
709
def _register(self, ov, obj, callback):
710
self._check_closed()
711
712
# Return a future which will be set with the result of the
713
# operation when it completes. The future's value is actually
714
# the value returned by callback().
715
f = _OverlappedFuture(ov, loop=self._loop)
716
if f._source_traceback:
717
del f._source_traceback[-1]
718
if not ov.pending:
719
# The operation has completed, so no need to postpone the
720
# work. We cannot take this short cut if we need the
721
# NumberOfBytes, CompletionKey values returned by
722
# PostQueuedCompletionStatus().
723
try:
724
value = callback(None, None, ov)
725
except OSError as e:
726
f.set_exception(e)
727
else:
728
f.set_result(value)
729
# Even if GetOverlappedResult() was called, we have to wait for the
730
# notification of the completion in GetQueuedCompletionStatus().
731
# Register the overlapped operation to keep a reference to the
732
# OVERLAPPED object, otherwise the memory is freed and Windows may
733
# read uninitialized memory.
734
735
# Register the overlapped operation for later. Note that
736
# we only store obj to prevent it from being garbage
737
# collected too early.
738
self._cache[ov.address] = (f, ov, obj, callback)
739
return f
740
741
def _unregister(self, ov):
742
"""Unregister an overlapped object.
743
744
Call this method when its future has been cancelled. The event can
745
already be signalled (pending in the proactor event queue). It is also
746
safe if the event is never signalled (because it was cancelled).
747
"""
748
self._check_closed()
749
self._unregistered.append(ov)
750
751
def _get_accept_socket(self, family):
752
s = socket.socket(family)
753
s.settimeout(0)
754
return s
755
756
def _poll(self, timeout=None):
757
if timeout is None:
758
ms = INFINITE
759
elif timeout < 0:
760
raise ValueError("negative timeout")
761
else:
762
# GetQueuedCompletionStatus() has a resolution of 1 millisecond,
763
# round away from zero to wait *at least* timeout seconds.
764
ms = math.ceil(timeout * 1e3)
765
if ms >= INFINITE:
766
raise ValueError("timeout too big")
767
768
while True:
769
status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
770
if status is None:
771
break
772
ms = 0
773
774
err, transferred, key, address = status
775
try:
776
f, ov, obj, callback = self._cache.pop(address)
777
except KeyError:
778
if self._loop.get_debug():
779
self._loop.call_exception_handler({
780
'message': ('GetQueuedCompletionStatus() returned an '
781
'unexpected event'),
782
'status': ('err=%s transferred=%s key=%#x address=%#x'
783
% (err, transferred, key, address)),
784
})
785
786
# key is either zero, or it is used to return a pipe
787
# handle which should be closed to avoid a leak.
788
if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
789
_winapi.CloseHandle(key)
790
continue
791
792
if obj in self._stopped_serving:
793
f.cancel()
794
# Don't call the callback if _register() already read the result or
795
# if the overlapped has been cancelled
796
elif not f.done():
797
try:
798
value = callback(transferred, key, ov)
799
except OSError as e:
800
f.set_exception(e)
801
self._results.append(f)
802
else:
803
f.set_result(value)
804
self._results.append(f)
805
finally:
806
f = None
807
808
# Remove unregistered futures
809
for ov in self._unregistered:
810
self._cache.pop(ov.address, None)
811
self._unregistered.clear()
812
813
def _stop_serving(self, obj):
814
# obj is a socket or pipe handle. It will be closed in
815
# BaseProactorEventLoop._stop_serving() which will make any
816
# pending operations fail quickly.
817
self._stopped_serving.add(obj)
818
819
def close(self):
820
if self._iocp is None:
821
# already closed
822
return
823
824
# Cancel remaining registered operations.
825
for fut, ov, obj, callback in list(self._cache.values()):
826
if fut.cancelled():
827
# Nothing to do with cancelled futures
828
pass
829
elif isinstance(fut, _WaitCancelFuture):
830
# _WaitCancelFuture must not be cancelled
831
pass
832
else:
833
try:
834
fut.cancel()
835
except OSError as exc:
836
if self._loop is not None:
837
context = {
838
'message': 'Cancelling a future failed',
839
'exception': exc,
840
'future': fut,
841
}
842
if fut._source_traceback:
843
context['source_traceback'] = fut._source_traceback
844
self._loop.call_exception_handler(context)
845
846
# Wait until all cancelled overlapped complete: don't exit with running
847
# overlapped to prevent a crash. Display progress every second if the
848
# loop is still running.
849
msg_update = 1.0
850
start_time = time.monotonic()
851
next_msg = start_time + msg_update
852
while self._cache:
853
if next_msg <= time.monotonic():
854
logger.debug('%r is running after closing for %.1f seconds',
855
self, time.monotonic() - start_time)
856
next_msg = time.monotonic() + msg_update
857
858
# handle a few events, or timeout
859
self._poll(msg_update)
860
861
self._results = []
862
863
_winapi.CloseHandle(self._iocp)
864
self._iocp = None
865
866
def __del__(self):
867
self.close()
868
869
870
class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport):
871
872
def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
873
self._proc = windows_utils.Popen(
874
args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
875
bufsize=bufsize, **kwargs)
876
877
def callback(f):
878
returncode = self._proc.poll()
879
self._process_exited(returncode)
880
881
f = self._loop._proactor.wait_for_handle(int(self._proc._handle))
882
f.add_done_callback(callback)
883
884
885
SelectorEventLoop = _WindowsSelectorEventLoop
886
887
888
class WindowsSelectorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
889
_loop_factory = SelectorEventLoop
890
891
892
class WindowsProactorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
893
_loop_factory = ProactorEventLoop
894
895
896
DefaultEventLoopPolicy = WindowsProactorEventLoopPolicy
897
898