Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
PojavLauncherTeam
GitHub Repository: PojavLauncherTeam/openjdk-multiarch-jdk8u
Path: blob/aarch64-shenandoah-jdk8u272-b10/jdk/src/solaris/classes/sun/nio/ch/EPollPort.java
32288 views
1
/*
2
* Copyright (c) 2008, 2013, Oracle and/or its affiliates. All rights reserved.
3
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4
*
5
* This code is free software; you can redistribute it and/or modify it
6
* under the terms of the GNU General Public License version 2 only, as
7
* published by the Free Software Foundation. Oracle designates this
8
* particular file as subject to the "Classpath" exception as provided
9
* by Oracle in the LICENSE file that accompanied this code.
10
*
11
* This code is distributed in the hope that it will be useful, but WITHOUT
12
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14
* version 2 for more details (a copy is included in the LICENSE file that
15
* accompanied this code).
16
*
17
* You should have received a copy of the GNU General Public License version
18
* 2 along with this work; if not, write to the Free Software Foundation,
19
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20
*
21
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22
* or visit www.oracle.com if you need additional information or have any
23
* questions.
24
*/
25
26
package sun.nio.ch;
27
28
import java.nio.channels.spi.AsynchronousChannelProvider;
29
import java.io.IOException;
30
import java.util.concurrent.ArrayBlockingQueue;
31
import java.util.concurrent.RejectedExecutionException;
32
import java.util.concurrent.atomic.AtomicInteger;
33
import static sun.nio.ch.EPoll.*;
34
35
/**
36
* AsynchronousChannelGroup implementation based on the Linux epoll facility.
37
*/
38
39
final class EPollPort
40
extends Port
41
{
42
// maximum number of events to poll at a time
43
private static final int MAX_EPOLL_EVENTS = 512;
44
45
// errors
46
private static final int ENOENT = 2;
47
48
// epoll file descriptor
49
private final int epfd;
50
51
// true if epoll closed
52
private boolean closed;
53
54
// socket pair used for wakeup
55
private final int sp[];
56
57
// number of wakeups pending
58
private final AtomicInteger wakeupCount = new AtomicInteger();
59
60
// address of the poll array passed to epoll_wait
61
private final long address;
62
63
// encapsulates an event for a channel
64
static class Event {
65
final PollableChannel channel;
66
final int events;
67
68
Event(PollableChannel channel, int events) {
69
this.channel = channel;
70
this.events = events;
71
}
72
73
PollableChannel channel() { return channel; }
74
int events() { return events; }
75
}
76
77
// queue of events for cases that a polling thread dequeues more than one
78
// event
79
private final ArrayBlockingQueue<Event> queue;
80
private final Event NEED_TO_POLL = new Event(null, 0);
81
private final Event EXECUTE_TASK_OR_SHUTDOWN = new Event(null, 0);
82
83
EPollPort(AsynchronousChannelProvider provider, ThreadPool pool)
84
throws IOException
85
{
86
super(provider, pool);
87
88
// open epoll
89
this.epfd = epollCreate();
90
91
// create socket pair for wakeup mechanism
92
int[] sv = new int[2];
93
try {
94
socketpair(sv);
95
// register one end with epoll
96
epollCtl(epfd, EPOLL_CTL_ADD, sv[0], Net.POLLIN);
97
} catch (IOException x) {
98
close0(epfd);
99
throw x;
100
}
101
this.sp = sv;
102
103
// allocate the poll array
104
this.address = allocatePollArray(MAX_EPOLL_EVENTS);
105
106
// create the queue and offer the special event to ensure that the first
107
// threads polls
108
this.queue = new ArrayBlockingQueue<Event>(MAX_EPOLL_EVENTS);
109
this.queue.offer(NEED_TO_POLL);
110
}
111
112
EPollPort start() {
113
startThreads(new EventHandlerTask());
114
return this;
115
}
116
117
/**
118
* Release all resources
119
*/
120
private void implClose() {
121
synchronized (this) {
122
if (closed)
123
return;
124
closed = true;
125
}
126
freePollArray(address);
127
close0(sp[0]);
128
close0(sp[1]);
129
close0(epfd);
130
}
131
132
private void wakeup() {
133
if (wakeupCount.incrementAndGet() == 1) {
134
// write byte to socketpair to force wakeup
135
try {
136
interrupt(sp[1]);
137
} catch (IOException x) {
138
throw new AssertionError(x);
139
}
140
}
141
}
142
143
@Override
144
void executeOnHandlerTask(Runnable task) {
145
synchronized (this) {
146
if (closed)
147
throw new RejectedExecutionException();
148
offerTask(task);
149
wakeup();
150
}
151
}
152
153
@Override
154
void shutdownHandlerTasks() {
155
/*
156
* If no tasks are running then just release resources; otherwise
157
* write to the one end of the socketpair to wakeup any polling threads.
158
*/
159
int nThreads = threadCount();
160
if (nThreads == 0) {
161
implClose();
162
} else {
163
// send interrupt to each thread
164
while (nThreads-- > 0) {
165
wakeup();
166
}
167
}
168
}
169
170
// invoke by clients to register a file descriptor
171
@Override
172
void startPoll(int fd, int events) {
173
// update events (or add to epoll on first usage)
174
int err = epollCtl(epfd, EPOLL_CTL_MOD, fd, (events | EPOLLONESHOT));
175
if (err == ENOENT)
176
err = epollCtl(epfd, EPOLL_CTL_ADD, fd, (events | EPOLLONESHOT));
177
if (err != 0)
178
throw new AssertionError(); // should not happen
179
}
180
181
/*
182
* Task to process events from epoll and dispatch to the channel's
183
* onEvent handler.
184
*
185
* Events are retreived from epoll in batch and offered to a BlockingQueue
186
* where they are consumed by handler threads. A special "NEED_TO_POLL"
187
* event is used to signal one consumer to re-poll when all events have
188
* been consumed.
189
*/
190
private class EventHandlerTask implements Runnable {
191
private Event poll() throws IOException {
192
try {
193
for (;;) {
194
int n = epollWait(epfd, address, MAX_EPOLL_EVENTS);
195
/*
196
* 'n' events have been read. Here we map them to their
197
* corresponding channel in batch and queue n-1 so that
198
* they can be handled by other handler threads. The last
199
* event is handled by this thread (and so is not queued).
200
*/
201
fdToChannelLock.readLock().lock();
202
try {
203
while (n-- > 0) {
204
long eventAddress = getEvent(address, n);
205
int fd = getDescriptor(eventAddress);
206
207
// wakeup
208
if (fd == sp[0]) {
209
if (wakeupCount.decrementAndGet() == 0) {
210
// no more wakeups so drain pipe
211
drain1(sp[0]);
212
}
213
214
// queue special event if there are more events
215
// to handle.
216
if (n > 0) {
217
queue.offer(EXECUTE_TASK_OR_SHUTDOWN);
218
continue;
219
}
220
return EXECUTE_TASK_OR_SHUTDOWN;
221
}
222
223
PollableChannel channel = fdToChannel.get(fd);
224
if (channel != null) {
225
int events = getEvents(eventAddress);
226
Event ev = new Event(channel, events);
227
228
// n-1 events are queued; This thread handles
229
// the last one except for the wakeup
230
if (n > 0) {
231
queue.offer(ev);
232
} else {
233
return ev;
234
}
235
}
236
}
237
} finally {
238
fdToChannelLock.readLock().unlock();
239
}
240
}
241
} finally {
242
// to ensure that some thread will poll when all events have
243
// been consumed
244
queue.offer(NEED_TO_POLL);
245
}
246
}
247
248
public void run() {
249
Invoker.GroupAndInvokeCount myGroupAndInvokeCount =
250
Invoker.getGroupAndInvokeCount();
251
final boolean isPooledThread = (myGroupAndInvokeCount != null);
252
boolean replaceMe = false;
253
Event ev;
254
try {
255
for (;;) {
256
// reset invoke count
257
if (isPooledThread)
258
myGroupAndInvokeCount.resetInvokeCount();
259
260
try {
261
replaceMe = false;
262
ev = queue.take();
263
264
// no events and this thread has been "selected" to
265
// poll for more.
266
if (ev == NEED_TO_POLL) {
267
try {
268
ev = poll();
269
} catch (IOException x) {
270
x.printStackTrace();
271
return;
272
}
273
}
274
} catch (InterruptedException x) {
275
continue;
276
}
277
278
// handle wakeup to execute task or shutdown
279
if (ev == EXECUTE_TASK_OR_SHUTDOWN) {
280
Runnable task = pollTask();
281
if (task == null) {
282
// shutdown request
283
return;
284
}
285
// run task (may throw error/exception)
286
replaceMe = true;
287
task.run();
288
continue;
289
}
290
291
// process event
292
try {
293
ev.channel().onEvent(ev.events(), isPooledThread);
294
} catch (Error x) {
295
replaceMe = true; throw x;
296
} catch (RuntimeException x) {
297
replaceMe = true; throw x;
298
}
299
}
300
} finally {
301
// last handler to exit when shutdown releases resources
302
int remaining = threadExit(this, replaceMe);
303
if (remaining == 0 && isShutdown()) {
304
implClose();
305
}
306
}
307
}
308
}
309
310
// -- Native methods --
311
312
private static native void socketpair(int[] sv) throws IOException;
313
314
private static native void interrupt(int fd) throws IOException;
315
316
private static native void drain1(int fd) throws IOException;
317
318
private static native void close0(int fd);
319
320
static {
321
IOUtil.load();
322
}
323
}
324
325