Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
PojavLauncherTeam
GitHub Repository: PojavLauncherTeam/mobile
Path: blob/master/src/java.base/linux/classes/sun/nio/ch/EPollPort.java
40948 views
1
/*
2
* Copyright (c) 2008, 2018, Oracle and/or its affiliates. All rights reserved.
3
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4
*
5
* This code is free software; you can redistribute it and/or modify it
6
* under the terms of the GNU General Public License version 2 only, as
7
* published by the Free Software Foundation. Oracle designates this
8
* particular file as subject to the "Classpath" exception as provided
9
* by Oracle in the LICENSE file that accompanied this code.
10
*
11
* This code is distributed in the hope that it will be useful, but WITHOUT
12
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14
* version 2 for more details (a copy is included in the LICENSE file that
15
* accompanied this code).
16
*
17
* You should have received a copy of the GNU General Public License version
18
* 2 along with this work; if not, write to the Free Software Foundation,
19
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20
*
21
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22
* or visit www.oracle.com if you need additional information or have any
23
* questions.
24
*/
25
26
package sun.nio.ch;
27
28
import java.nio.channels.spi.AsynchronousChannelProvider;
29
import java.io.IOException;
30
import java.util.concurrent.ArrayBlockingQueue;
31
import java.util.concurrent.RejectedExecutionException;
32
import java.util.concurrent.atomic.AtomicInteger;
33
34
import static sun.nio.ch.EPoll.EPOLLIN;
35
import static sun.nio.ch.EPoll.EPOLLONESHOT;
36
import static sun.nio.ch.EPoll.EPOLL_CTL_ADD;
37
import static sun.nio.ch.EPoll.EPOLL_CTL_DEL;
38
import static sun.nio.ch.EPoll.EPOLL_CTL_MOD;
39
40
41
/**
42
* AsynchronousChannelGroup implementation based on the Linux epoll facility.
43
*/
44
45
final class EPollPort
46
extends Port
47
{
48
// maximum number of events to poll at a time
49
private static final int MAX_EPOLL_EVENTS = 512;
50
51
// errors
52
private static final int ENOENT = 2;
53
54
// epoll file descriptor
55
private final int epfd;
56
57
// address of the poll array passed to epoll_wait
58
private final long address;
59
60
// true if epoll closed
61
private boolean closed;
62
63
// socket pair used for wakeup
64
private final int sp[];
65
66
// number of wakeups pending
67
private final AtomicInteger wakeupCount = new AtomicInteger();
68
69
// encapsulates an event for a channel
70
static class Event {
71
final PollableChannel channel;
72
final int events;
73
74
Event(PollableChannel channel, int events) {
75
this.channel = channel;
76
this.events = events;
77
}
78
79
PollableChannel channel() { return channel; }
80
int events() { return events; }
81
}
82
83
// queue of events for cases that a polling thread dequeues more than one
84
// event
85
private final ArrayBlockingQueue<Event> queue;
86
private final Event NEED_TO_POLL = new Event(null, 0);
87
private final Event EXECUTE_TASK_OR_SHUTDOWN = new Event(null, 0);
88
89
EPollPort(AsynchronousChannelProvider provider, ThreadPool pool)
90
throws IOException
91
{
92
super(provider, pool);
93
94
this.epfd = EPoll.create();
95
this.address = EPoll.allocatePollArray(MAX_EPOLL_EVENTS);
96
97
// create socket pair for wakeup mechanism
98
try {
99
long fds = IOUtil.makePipe(true);
100
this.sp = new int[]{(int) (fds >>> 32), (int) fds};
101
} catch (IOException ioe) {
102
EPoll.freePollArray(address);
103
FileDispatcherImpl.closeIntFD(epfd);
104
throw ioe;
105
}
106
107
// register one end with epoll
108
EPoll.ctl(epfd, EPOLL_CTL_ADD, sp[0], EPOLLIN);
109
110
// create the queue and offer the special event to ensure that the first
111
// threads polls
112
this.queue = new ArrayBlockingQueue<>(MAX_EPOLL_EVENTS);
113
this.queue.offer(NEED_TO_POLL);
114
}
115
116
EPollPort start() {
117
startThreads(new EventHandlerTask());
118
return this;
119
}
120
121
/**
122
* Release all resources
123
*/
124
private void implClose() {
125
synchronized (this) {
126
if (closed)
127
return;
128
closed = true;
129
}
130
try { FileDispatcherImpl.closeIntFD(epfd); } catch (IOException ioe) { }
131
try { FileDispatcherImpl.closeIntFD(sp[0]); } catch (IOException ioe) { }
132
try { FileDispatcherImpl.closeIntFD(sp[1]); } catch (IOException ioe) { }
133
EPoll.freePollArray(address);
134
}
135
136
private void wakeup() {
137
if (wakeupCount.incrementAndGet() == 1) {
138
// write byte to socketpair to force wakeup
139
try {
140
IOUtil.write1(sp[1], (byte)0);
141
} catch (IOException x) {
142
throw new AssertionError(x);
143
}
144
}
145
}
146
147
@Override
148
void executeOnHandlerTask(Runnable task) {
149
synchronized (this) {
150
if (closed)
151
throw new RejectedExecutionException();
152
offerTask(task);
153
wakeup();
154
}
155
}
156
157
@Override
158
void shutdownHandlerTasks() {
159
/*
160
* If no tasks are running then just release resources; otherwise
161
* write to the one end of the socketpair to wakeup any polling threads.
162
*/
163
int nThreads = threadCount();
164
if (nThreads == 0) {
165
implClose();
166
} else {
167
// send wakeup to each thread
168
while (nThreads-- > 0) {
169
wakeup();
170
}
171
}
172
}
173
174
// invoke by clients to register a file descriptor
175
@Override
176
void startPoll(int fd, int events) {
177
// update events (or add to epoll on first usage)
178
int err = EPoll.ctl(epfd, EPOLL_CTL_MOD, fd, (events | EPOLLONESHOT));
179
if (err == ENOENT)
180
err = EPoll.ctl(epfd, EPOLL_CTL_ADD, fd, (events | EPOLLONESHOT));
181
if (err != 0)
182
throw new AssertionError(); // should not happen
183
}
184
185
/**
186
* Task to process events from epoll and dispatch to the channel's
187
* onEvent handler.
188
*
189
* Events are retrieved from epoll in batch and offered to a BlockingQueue
190
* where they are consumed by handler threads. A special "NEED_TO_POLL"
191
* event is used to signal one consumer to re-poll when all events have
192
* been consumed.
193
*/
194
private class EventHandlerTask implements Runnable {
195
private Event poll() throws IOException {
196
try {
197
for (;;) {
198
int n;
199
do {
200
n = EPoll.wait(epfd, address, MAX_EPOLL_EVENTS, -1);
201
} while (n == IOStatus.INTERRUPTED);
202
203
/**
204
* 'n' events have been read. Here we map them to their
205
* corresponding channel in batch and queue n-1 so that
206
* they can be handled by other handler threads. The last
207
* event is handled by this thread (and so is not queued).
208
*/
209
fdToChannelLock.readLock().lock();
210
try {
211
while (n-- > 0) {
212
long eventAddress = EPoll.getEvent(address, n);
213
int fd = EPoll.getDescriptor(eventAddress);
214
215
// wakeup
216
if (fd == sp[0]) {
217
if (wakeupCount.decrementAndGet() == 0) {
218
// consume one wakeup byte, never more as this
219
// would interfere with shutdown when there is
220
// a wakeup byte queued to wake each thread
221
int nread;
222
do {
223
nread = IOUtil.drain1(sp[0]);
224
} while (nread == IOStatus.INTERRUPTED);
225
}
226
227
// queue special event if there are more events
228
// to handle.
229
if (n > 0) {
230
queue.offer(EXECUTE_TASK_OR_SHUTDOWN);
231
continue;
232
}
233
return EXECUTE_TASK_OR_SHUTDOWN;
234
}
235
236
PollableChannel channel = fdToChannel.get(fd);
237
if (channel != null) {
238
int events = EPoll.getEvents(eventAddress);
239
Event ev = new Event(channel, events);
240
241
// n-1 events are queued; This thread handles
242
// the last one except for the wakeup
243
if (n > 0) {
244
queue.offer(ev);
245
} else {
246
return ev;
247
}
248
}
249
}
250
} finally {
251
fdToChannelLock.readLock().unlock();
252
}
253
}
254
} finally {
255
// to ensure that some thread will poll when all events have
256
// been consumed
257
queue.offer(NEED_TO_POLL);
258
}
259
}
260
261
public void run() {
262
Invoker.GroupAndInvokeCount myGroupAndInvokeCount =
263
Invoker.getGroupAndInvokeCount();
264
final boolean isPooledThread = (myGroupAndInvokeCount != null);
265
boolean replaceMe = false;
266
Event ev;
267
try {
268
for (;;) {
269
// reset invoke count
270
if (isPooledThread)
271
myGroupAndInvokeCount.resetInvokeCount();
272
273
try {
274
replaceMe = false;
275
ev = queue.take();
276
277
// no events and this thread has been "selected" to
278
// poll for more.
279
if (ev == NEED_TO_POLL) {
280
try {
281
ev = poll();
282
} catch (IOException x) {
283
x.printStackTrace();
284
return;
285
}
286
}
287
} catch (InterruptedException x) {
288
continue;
289
}
290
291
// handle wakeup to execute task or shutdown
292
if (ev == EXECUTE_TASK_OR_SHUTDOWN) {
293
Runnable task = pollTask();
294
if (task == null) {
295
// shutdown request
296
return;
297
}
298
// run task (may throw error/exception)
299
replaceMe = true;
300
task.run();
301
continue;
302
}
303
304
// process event
305
try {
306
ev.channel().onEvent(ev.events(), isPooledThread);
307
} catch (Error x) {
308
replaceMe = true; throw x;
309
} catch (RuntimeException x) {
310
replaceMe = true; throw x;
311
}
312
}
313
} finally {
314
// last handler to exit when shutdown releases resources
315
int remaining = threadExit(this, replaceMe);
316
if (remaining == 0 && isShutdown()) {
317
implClose();
318
}
319
}
320
}
321
}
322
}
323
324