Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
PojavLauncherTeam
GitHub Repository: PojavLauncherTeam/openjdk-multiarch-jdk8u
Path: blob/aarch64-shenandoah-jdk8u272-b10/jdk/src/solaris/classes/sun/nio/ch/KQueuePort.java
32288 views
1
/*
2
* Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
3
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4
*
5
* This code is free software; you can redistribute it and/or modify it
6
* under the terms of the GNU General Public License version 2 only, as
7
* published by the Free Software Foundation. Oracle designates this
8
* particular file as subject to the "Classpath" exception as provided
9
* by Oracle in the LICENSE file that accompanied this code.
10
*
11
* This code is distributed in the hope that it will be useful, but WITHOUT
12
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14
* version 2 for more details (a copy is included in the LICENSE file that
15
* accompanied this code).
16
*
17
* You should have received a copy of the GNU General Public License version
18
* 2 along with this work; if not, write to the Free Software Foundation,
19
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20
*
21
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22
* or visit www.oracle.com if you need additional information or have any
23
* questions.
24
*/
25
26
package sun.nio.ch;
27
28
import java.nio.channels.spi.AsynchronousChannelProvider;
29
import java.io.IOException;
30
import java.util.concurrent.ArrayBlockingQueue;
31
import java.util.concurrent.RejectedExecutionException;
32
import java.util.concurrent.atomic.AtomicInteger;
33
import static sun.nio.ch.KQueue.*;
34
35
/**
36
* AsynchronousChannelGroup implementation based on the BSD kqueue facility.
37
*/
38
39
final class KQueuePort
40
extends Port
41
{
42
// maximum number of events to poll at a time
43
private static final int MAX_KEVENTS_TO_POLL = 512;
44
45
// kqueue file descriptor
46
private final int kqfd;
47
48
// true if kqueue closed
49
private boolean closed;
50
51
// socket pair used for wakeup
52
private final int sp[];
53
54
// number of wakeups pending
55
private final AtomicInteger wakeupCount = new AtomicInteger();
56
57
// address of the poll array passed to kqueue_wait
58
private final long address;
59
60
// encapsulates an event for a channel
61
static class Event {
62
final PollableChannel channel;
63
final int events;
64
65
Event(PollableChannel channel, int events) {
66
this.channel = channel;
67
this.events = events;
68
}
69
70
PollableChannel channel() { return channel; }
71
int events() { return events; }
72
}
73
74
// queue of events for cases that a polling thread dequeues more than one
75
// event
76
private final ArrayBlockingQueue<Event> queue;
77
private final Event NEED_TO_POLL = new Event(null, 0);
78
private final Event EXECUTE_TASK_OR_SHUTDOWN = new Event(null, 0);
79
80
KQueuePort(AsynchronousChannelProvider provider, ThreadPool pool)
81
throws IOException
82
{
83
super(provider, pool);
84
85
// open kqueue
86
this.kqfd = kqueue();
87
88
// create socket pair for wakeup mechanism
89
int[] sv = new int[2];
90
try {
91
socketpair(sv);
92
93
// register one end with kqueue
94
keventRegister(kqfd, sv[0], EVFILT_READ, EV_ADD);
95
} catch (IOException x) {
96
close0(kqfd);
97
throw x;
98
}
99
this.sp = sv;
100
101
// allocate the poll array
102
this.address = allocatePollArray(MAX_KEVENTS_TO_POLL);
103
104
// create the queue and offer the special event to ensure that the first
105
// threads polls
106
this.queue = new ArrayBlockingQueue<Event>(MAX_KEVENTS_TO_POLL);
107
this.queue.offer(NEED_TO_POLL);
108
}
109
110
KQueuePort start() {
111
startThreads(new EventHandlerTask());
112
return this;
113
}
114
115
/**
116
* Release all resources
117
*/
118
private void implClose() {
119
synchronized (this) {
120
if (closed)
121
return;
122
closed = true;
123
}
124
freePollArray(address);
125
close0(sp[0]);
126
close0(sp[1]);
127
close0(kqfd);
128
}
129
130
private void wakeup() {
131
if (wakeupCount.incrementAndGet() == 1) {
132
// write byte to socketpair to force wakeup
133
try {
134
interrupt(sp[1]);
135
} catch (IOException x) {
136
throw new AssertionError(x);
137
}
138
}
139
}
140
141
@Override
142
void executeOnHandlerTask(Runnable task) {
143
synchronized (this) {
144
if (closed)
145
throw new RejectedExecutionException();
146
offerTask(task);
147
wakeup();
148
}
149
}
150
151
@Override
152
void shutdownHandlerTasks() {
153
/*
154
* If no tasks are running then just release resources; otherwise
155
* write to the one end of the socketpair to wakeup any polling threads.
156
*/
157
int nThreads = threadCount();
158
if (nThreads == 0) {
159
implClose();
160
} else {
161
// send interrupt to each thread
162
while (nThreads-- > 0) {
163
wakeup();
164
}
165
}
166
}
167
168
// invoked by clients to register a file descriptor
169
@Override
170
void startPoll(int fd, int events) {
171
// We use a separate filter for read and write events.
172
// TBD: Measure cost of EV_ONESHOT vs. EV_CLEAR, either will do here.
173
int err = 0;
174
int flags = (EV_ADD|EV_ONESHOT);
175
if ((events & Net.POLLIN) > 0)
176
err = keventRegister(kqfd, fd, EVFILT_READ, flags);
177
if (err == 0 && (events & Net.POLLOUT) > 0)
178
err = keventRegister(kqfd, fd, EVFILT_WRITE, flags);
179
if (err != 0)
180
throw new InternalError("kevent failed: " + err); // should not happen
181
}
182
183
/*
184
* Task to process events from kqueue and dispatch to the channel's
185
* onEvent handler.
186
*
187
* Events are retreived from kqueue in batch and offered to a BlockingQueue
188
* where they are consumed by handler threads. A special "NEED_TO_POLL"
189
* event is used to signal one consumer to re-poll when all events have
190
* been consumed.
191
*/
192
private class EventHandlerTask implements Runnable {
193
private Event poll() throws IOException {
194
try {
195
for (;;) {
196
int n = keventPoll(kqfd, address, MAX_KEVENTS_TO_POLL);
197
/*
198
* 'n' events have been read. Here we map them to their
199
* corresponding channel in batch and queue n-1 so that
200
* they can be handled by other handler threads. The last
201
* event is handled by this thread (and so is not queued).
202
*/
203
fdToChannelLock.readLock().lock();
204
try {
205
while (n-- > 0) {
206
long keventAddress = getEvent(address, n);
207
int fd = getDescriptor(keventAddress);
208
209
// wakeup
210
if (fd == sp[0]) {
211
if (wakeupCount.decrementAndGet() == 0) {
212
// no more wakeups so drain pipe
213
drain1(sp[0]);
214
}
215
216
// queue special event if there are more events
217
// to handle.
218
if (n > 0) {
219
queue.offer(EXECUTE_TASK_OR_SHUTDOWN);
220
continue;
221
}
222
return EXECUTE_TASK_OR_SHUTDOWN;
223
}
224
225
PollableChannel channel = fdToChannel.get(fd);
226
if (channel != null) {
227
int filter = getFilter(keventAddress);
228
int events = 0;
229
if (filter == EVFILT_READ)
230
events = Net.POLLIN;
231
else if (filter == EVFILT_WRITE)
232
events = Net.POLLOUT;
233
234
Event ev = new Event(channel, events);
235
236
// n-1 events are queued; This thread handles
237
// the last one except for the wakeup
238
if (n > 0) {
239
queue.offer(ev);
240
} else {
241
return ev;
242
}
243
}
244
}
245
} finally {
246
fdToChannelLock.readLock().unlock();
247
}
248
}
249
} finally {
250
// to ensure that some thread will poll when all events have
251
// been consumed
252
queue.offer(NEED_TO_POLL);
253
}
254
}
255
256
public void run() {
257
Invoker.GroupAndInvokeCount myGroupAndInvokeCount =
258
Invoker.getGroupAndInvokeCount();
259
final boolean isPooledThread = (myGroupAndInvokeCount != null);
260
boolean replaceMe = false;
261
Event ev;
262
try {
263
for (;;) {
264
// reset invoke count
265
if (isPooledThread)
266
myGroupAndInvokeCount.resetInvokeCount();
267
268
try {
269
replaceMe = false;
270
ev = queue.take();
271
272
// no events and this thread has been "selected" to
273
// poll for more.
274
if (ev == NEED_TO_POLL) {
275
try {
276
ev = poll();
277
} catch (IOException x) {
278
x.printStackTrace();
279
return;
280
}
281
}
282
} catch (InterruptedException x) {
283
continue;
284
}
285
286
// handle wakeup to execute task or shutdown
287
if (ev == EXECUTE_TASK_OR_SHUTDOWN) {
288
Runnable task = pollTask();
289
if (task == null) {
290
// shutdown request
291
return;
292
}
293
// run task (may throw error/exception)
294
replaceMe = true;
295
task.run();
296
continue;
297
}
298
299
// process event
300
try {
301
ev.channel().onEvent(ev.events(), isPooledThread);
302
} catch (Error x) {
303
replaceMe = true; throw x;
304
} catch (RuntimeException x) {
305
replaceMe = true; throw x;
306
}
307
}
308
} finally {
309
// last handler to exit when shutdown releases resources
310
int remaining = threadExit(this, replaceMe);
311
if (remaining == 0 && isShutdown()) {
312
implClose();
313
}
314
}
315
}
316
}
317
318
// -- Native methods --
319
320
private static native void socketpair(int[] sv) throws IOException;
321
322
private static native void interrupt(int fd) throws IOException;
323
324
private static native void drain1(int fd) throws IOException;
325
326
private static native void close0(int fd);
327
328
static {
329
IOUtil.load();
330
}
331
}
332
333