Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
allendowney
GitHub Repository: allendowney/cpython
Path: blob/main/Lib/asyncio/queues.py
12 views
1
__all__ = ('Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty')
2
3
import collections
4
import heapq
5
from types import GenericAlias
6
7
from . import locks
8
from . import mixins
9
10
11
class QueueEmpty(Exception):
    """Raised when Queue.get_nowait() is called on an empty Queue."""
14
15
16
class QueueFull(Exception):
    """Raised when the Queue.put_nowait() method is called on a full Queue."""
19
20
21
class Queue(mixins._LoopBoundMixin):
22
"""A queue, useful for coordinating producer and consumer coroutines.
23
24
If maxsize is less than or equal to zero, the queue size is infinite. If it
25
is an integer greater than 0, then "await put()" will block when the
26
queue reaches maxsize, until an item is removed by get().
27
28
Unlike the standard library Queue, you can reliably know this Queue's size
29
with qsize(), since your single-threaded asyncio application won't be
30
interrupted between calling qsize() and doing an operation on the Queue.
31
"""
32
33
def __init__(self, maxsize=0):
34
self._maxsize = maxsize
35
36
# Futures.
37
self._getters = collections.deque()
38
# Futures.
39
self._putters = collections.deque()
40
self._unfinished_tasks = 0
41
self._finished = locks.Event()
42
self._finished.set()
43
self._init(maxsize)
44
45
# These three are overridable in subclasses.
46
47
def _init(self, maxsize):
48
self._queue = collections.deque()
49
50
def _get(self):
51
return self._queue.popleft()
52
53
def _put(self, item):
54
self._queue.append(item)
55
56
# End of the overridable methods.
57
58
def _wakeup_next(self, waiters):
59
# Wake up the next waiter (if any) that isn't cancelled.
60
while waiters:
61
waiter = waiters.popleft()
62
if not waiter.done():
63
waiter.set_result(None)
64
break
65
66
def __repr__(self):
67
return f'<{type(self).__name__} at {id(self):#x} {self._format()}>'
68
69
def __str__(self):
70
return f'<{type(self).__name__} {self._format()}>'
71
72
__class_getitem__ = classmethod(GenericAlias)
73
74
def _format(self):
75
result = f'maxsize={self._maxsize!r}'
76
if getattr(self, '_queue', None):
77
result += f' _queue={list(self._queue)!r}'
78
if self._getters:
79
result += f' _getters[{len(self._getters)}]'
80
if self._putters:
81
result += f' _putters[{len(self._putters)}]'
82
if self._unfinished_tasks:
83
result += f' tasks={self._unfinished_tasks}'
84
return result
85
86
def qsize(self):
87
"""Number of items in the queue."""
88
return len(self._queue)
89
90
@property
91
def maxsize(self):
92
"""Number of items allowed in the queue."""
93
return self._maxsize
94
95
def empty(self):
96
"""Return True if the queue is empty, False otherwise."""
97
return not self._queue
98
99
def full(self):
100
"""Return True if there are maxsize items in the queue.
101
102
Note: if the Queue was initialized with maxsize=0 (the default),
103
then full() is never True.
104
"""
105
if self._maxsize <= 0:
106
return False
107
else:
108
return self.qsize() >= self._maxsize
109
110
async def put(self, item):
111
"""Put an item into the queue.
112
113
Put an item into the queue. If the queue is full, wait until a free
114
slot is available before adding item.
115
"""
116
while self.full():
117
putter = self._get_loop().create_future()
118
self._putters.append(putter)
119
try:
120
await putter
121
except:
122
putter.cancel() # Just in case putter is not done yet.
123
try:
124
# Clean self._putters from canceled putters.
125
self._putters.remove(putter)
126
except ValueError:
127
# The putter could be removed from self._putters by a
128
# previous get_nowait call.
129
pass
130
if not self.full() and not putter.cancelled():
131
# We were woken up by get_nowait(), but can't take
132
# the call. Wake up the next in line.
133
self._wakeup_next(self._putters)
134
raise
135
return self.put_nowait(item)
136
137
def put_nowait(self, item):
138
"""Put an item into the queue without blocking.
139
140
If no free slot is immediately available, raise QueueFull.
141
"""
142
if self.full():
143
raise QueueFull
144
self._put(item)
145
self._unfinished_tasks += 1
146
self._finished.clear()
147
self._wakeup_next(self._getters)
148
149
async def get(self):
150
"""Remove and return an item from the queue.
151
152
If queue is empty, wait until an item is available.
153
"""
154
while self.empty():
155
getter = self._get_loop().create_future()
156
self._getters.append(getter)
157
try:
158
await getter
159
except:
160
getter.cancel() # Just in case getter is not done yet.
161
try:
162
# Clean self._getters from canceled getters.
163
self._getters.remove(getter)
164
except ValueError:
165
# The getter could be removed from self._getters by a
166
# previous put_nowait call.
167
pass
168
if not self.empty() and not getter.cancelled():
169
# We were woken up by put_nowait(), but can't take
170
# the call. Wake up the next in line.
171
self._wakeup_next(self._getters)
172
raise
173
return self.get_nowait()
174
175
def get_nowait(self):
176
"""Remove and return an item from the queue.
177
178
Return an item if one is immediately available, else raise QueueEmpty.
179
"""
180
if self.empty():
181
raise QueueEmpty
182
item = self._get()
183
self._wakeup_next(self._putters)
184
return item
185
186
def task_done(self):
187
"""Indicate that a formerly enqueued task is complete.
188
189
Used by queue consumers. For each get() used to fetch a task,
190
a subsequent call to task_done() tells the queue that the processing
191
on the task is complete.
192
193
If a join() is currently blocking, it will resume when all items have
194
been processed (meaning that a task_done() call was received for every
195
item that had been put() into the queue).
196
197
Raises ValueError if called more times than there were items placed in
198
the queue.
199
"""
200
if self._unfinished_tasks <= 0:
201
raise ValueError('task_done() called too many times')
202
self._unfinished_tasks -= 1
203
if self._unfinished_tasks == 0:
204
self._finished.set()
205
206
async def join(self):
207
"""Block until all items in the queue have been gotten and processed.
208
209
The count of unfinished tasks goes up whenever an item is added to the
210
queue. The count goes down whenever a consumer calls task_done() to
211
indicate that the item was retrieved and all work on it is complete.
212
When the count of unfinished tasks drops to zero, join() unblocks.
213
"""
214
if self._unfinished_tasks > 0:
215
await self._finished.wait()
216
217
218
class PriorityQueue(Queue):
    """A subclass of Queue; retrieves entries in priority order (lowest first).

    Entries are typically tuples of the form: (priority number, data).
    """

    def _init(self, maxsize):
        """Create the list used as the heap storage."""
        self._queue = []

    def _put(self, item, heappush=heapq.heappush):
        """Push *item* onto the heap.

        *heappush* is bound as a default argument at definition time.
        """
        heappush(self._queue, item)

    def _get(self, heappop=heapq.heappop):
        """Pop and return the smallest entry from the heap.

        *heappop* is bound as a default argument at definition time.
        """
        return heappop(self._queue)
232
233
234
class LifoQueue(Queue):
    """A subclass of Queue that retrieves most recently added entries first."""

    def _init(self, maxsize):
        """Create the list used as a stack."""
        self._queue = []

    def _put(self, item):
        """Push *item* onto the end of the list."""
        self._queue.append(item)

    def _get(self):
        """Pop and return the most recently added item."""
        return self._queue.pop()
245
246