Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
PojavLauncherTeam
GitHub Repository: PojavLauncherTeam/mobile
Path: blob/master/src/java.base/aix/classes/sun/nio/ch/AixPollPort.java
41139 views
1
/*
2
* Copyright (c) 2008, 2013, Oracle and/or its affiliates. All rights reserved.
3
* Copyright (c) 2012 SAP SE. All rights reserved.
4
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5
*
6
* This code is free software; you can redistribute it and/or modify it
7
* under the terms of the GNU General Public License version 2 only, as
8
* published by the Free Software Foundation. Oracle designates this
9
* particular file as subject to the "Classpath" exception as provided
10
* by Oracle in the LICENSE file that accompanied this code.
11
*
12
* This code is distributed in the hope that it will be useful, but WITHOUT
13
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
14
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
15
* version 2 for more details (a copy is included in the LICENSE file that
16
* accompanied this code).
17
*
18
* You should have received a copy of the GNU General Public License version
19
* 2 along with this work; if not, write to the Free Software Foundation,
20
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
21
*
22
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
23
* or visit www.oracle.com if you need additional information or have any
24
* questions.
25
*/
26
27
package sun.nio.ch;
28
29
import java.nio.channels.spi.AsynchronousChannelProvider;
30
import java.io.IOException;
31
import java.util.HashSet;
32
import java.util.Iterator;
33
import java.util.concurrent.ArrayBlockingQueue;
34
import java.util.concurrent.RejectedExecutionException;
35
import java.util.concurrent.atomic.AtomicInteger;
36
import java.util.concurrent.locks.ReentrantLock;
37
import jdk.internal.misc.Unsafe;
38
39
/**
40
* AsynchronousChannelGroup implementation based on the AIX pollset framework.
41
*/
42
final class AixPollPort
43
extends Port
44
{
45
private static final Unsafe unsafe = Unsafe.getUnsafe();

static {
    // Static initializers run in textual order, so this block executes
    // before the native layout queries (eventSize() etc.) below.
    IOUtil.load();
    init();
}

/**
 * Layout of the native poll structure; size and field offsets are
 * obtained from native code rather than hard-coded.
 *
 * struct pollfd {
 *     int   fd;
 *     short events;
 *     short revents;
 * }
 */
private static final int SIZEOF_POLLFD    = eventSize();
private static final int OFFSETOF_EVENTS  = eventsOffset();
private static final int OFFSETOF_REVENTS = reventsOffset();
private static final int OFFSETOF_FD      = fdOffset();

// opcodes passed to pollsetCtl
private static final int PS_ADD    = 0x0;
private static final int PS_MOD    = 0x1;
private static final int PS_DELETE = 0x2;

// maximum number of events to poll at a time
private static final int MAX_POLL_EVENTS = 512;

// pollset ID
private final int pollset;

// true if port is closed; read and written under synchronized (this)
private boolean closed;

// socket pair used for wakeup: sp[0] is registered with the pollset,
// sp[1] is written to in order to wake a polling thread
private final int[] sp;

// socket pair used to indicate pending pollsetCtl calls
// Background info: pollsetCtl blocks when another thread is in a pollsetPoll call.
private final int[] ctlSp;

// number of wakeups pending
private final AtomicInteger wakeupCount = new AtomicInteger();

// address of the poll array passed to pollset_poll
private final long address;
90
91
// encapsulates an event for a channel
92
static class Event {
93
final PollableChannel channel;
94
final int events;
95
96
Event(PollableChannel channel, int events) {
97
this.channel = channel;
98
this.events = events;
99
}
100
101
PollableChannel channel() { return channel; }
102
int events() { return events; }
103
}
104
105
// queue of events for cases that a polling thread dequeues more than one
106
// event
107
private final ArrayBlockingQueue<Event> queue;
108
private final Event NEED_TO_POLL = new Event(null, 0);
109
private final Event EXECUTE_TASK_OR_SHUTDOWN = new Event(null, 0);
110
private final Event CONTINUE_AFTER_CTL_EVENT = new Event(null, 0);
111
112
// encapsulates a pollset control event for a file descriptor
113
static class ControlEvent {
114
final int fd;
115
final int events;
116
final boolean removeOnly;
117
int error = 0;
118
119
ControlEvent(int fd, int events, boolean removeOnly) {
120
this.fd = fd;
121
this.events = events;
122
this.removeOnly = removeOnly;
123
}
124
125
int fd() { return fd; }
126
int events() { return events; }
127
boolean removeOnly() { return removeOnly; }
128
int error() { return error; }
129
void setError(int error) { this.error = error; }
130
}
131
132
// queue of control events that need to be processed
133
// (this object is also used for synchronization)
134
private final HashSet<ControlEvent> controlQueue = new HashSet<ControlEvent>();
135
136
// lock used to check whether a poll operation is ongoing
137
private final ReentrantLock controlLock = new ReentrantLock();
138
139
AixPollPort(AsynchronousChannelProvider provider, ThreadPool pool)
140
throws IOException
141
{
142
super(provider, pool);
143
144
// open pollset
145
this.pollset = pollsetCreate();
146
147
// create socket pair for wakeup mechanism
148
int[] sv = new int[2];
149
try {
150
socketpair(sv);
151
// register one end with pollset
152
pollsetCtl(pollset, PS_ADD, sv[0], Net.POLLIN);
153
} catch (IOException x) {
154
pollsetDestroy(pollset);
155
throw x;
156
}
157
this.sp = sv;
158
159
// create socket pair for pollset control mechanism
160
sv = new int[2];
161
try {
162
socketpair(sv);
163
// register one end with pollset
164
pollsetCtl(pollset, PS_ADD, sv[0], Net.POLLIN);
165
} catch (IOException x) {
166
pollsetDestroy(pollset);
167
throw x;
168
}
169
this.ctlSp = sv;
170
171
// allocate the poll array
172
this.address = allocatePollArray(MAX_POLL_EVENTS);
173
174
// create the queue and offer the special event to ensure that the first
175
// threads polls
176
this.queue = new ArrayBlockingQueue<Event>(MAX_POLL_EVENTS);
177
this.queue.offer(NEED_TO_POLL);
178
}
179
180
AixPollPort start() {
181
startThreads(new EventHandlerTask());
182
return this;
183
}
184
185
/**
186
* Release all resources
187
*/
188
private void implClose() {
189
synchronized (this) {
190
if (closed)
191
return;
192
closed = true;
193
}
194
freePollArray(address);
195
close0(sp[0]);
196
close0(sp[1]);
197
close0(ctlSp[0]);
198
close0(ctlSp[1]);
199
pollsetDestroy(pollset);
200
}
201
202
private void wakeup() {
203
if (wakeupCount.incrementAndGet() == 1) {
204
// write byte to socketpair to force wakeup
205
try {
206
interrupt(sp[1]);
207
} catch (IOException x) {
208
throw new AssertionError(x);
209
}
210
}
211
}
212
213
@Override
214
void executeOnHandlerTask(Runnable task) {
215
synchronized (this) {
216
if (closed)
217
throw new RejectedExecutionException();
218
offerTask(task);
219
wakeup();
220
}
221
}
222
223
@Override
224
void shutdownHandlerTasks() {
225
/*
226
* If no tasks are running then just release resources; otherwise
227
* write to the one end of the socketpair to wakeup any polling threads.
228
*/
229
int nThreads = threadCount();
230
if (nThreads == 0) {
231
implClose();
232
} else {
233
// send interrupt to each thread
234
while (nThreads-- > 0) {
235
wakeup();
236
}
237
}
238
}
239
240
// invoked by clients to register a file descriptor
241
@Override
242
void startPoll(int fd, int events) {
243
queueControlEvent(new ControlEvent(fd, events, false));
244
}
245
246
// Callback method for implementations that need special handling when fd is removed
247
@Override
248
protected void preUnregister(int fd) {
249
queueControlEvent(new ControlEvent(fd, 0, true));
250
}
251
252
// Add control event into queue and wait for completion.
253
// In case the control lock is free, this method also tries to apply the control change directly.
254
private void queueControlEvent(ControlEvent ev) {
255
// pollsetCtl blocks when a poll call is ongoing. This is very probable.
256
// Therefore we let the polling thread do the pollsetCtl call.
257
synchronized (controlQueue) {
258
controlQueue.add(ev);
259
// write byte to socketpair to force wakeup
260
try {
261
interrupt(ctlSp[1]);
262
} catch (IOException x) {
263
throw new AssertionError(x);
264
}
265
do {
266
// Directly empty queue if no poll call is ongoing.
267
if (controlLock.tryLock()) {
268
try {
269
processControlQueue();
270
} finally {
271
controlLock.unlock();
272
}
273
} else {
274
try {
275
// Do not starve in case the polling thread returned before
276
// we could write to ctlSp[1] but the polling thread did not
277
// release the control lock until we checked. Therefore, use
278
// a timed wait for the time being.
279
controlQueue.wait(100);
280
} catch (InterruptedException e) {
281
// ignore exception and try again
282
}
283
}
284
} while (controlQueue.contains(ev));
285
}
286
if (ev.error() != 0) {
287
throw new AssertionError();
288
}
289
}
290
291
// Process all events currently stored in the control queue.
292
private void processControlQueue() {
293
synchronized (controlQueue) {
294
// On Aix it is only possible to set the event
295
// bits on the first call of pollsetCtl. Later
296
// calls only add bits, but cannot remove them.
297
// Therefore, we always remove the file
298
// descriptor ignoring the error and then add it.
299
Iterator<ControlEvent> iter = controlQueue.iterator();
300
while (iter.hasNext()) {
301
ControlEvent ev = iter.next();
302
pollsetCtl(pollset, PS_DELETE, ev.fd(), 0);
303
if (!ev.removeOnly()) {
304
ev.setError(pollsetCtl(pollset, PS_MOD, ev.fd(), ev.events()));
305
}
306
iter.remove();
307
}
308
controlQueue.notifyAll();
309
}
310
}
311
312
/*
313
* Task to process events from pollset and dispatch to the channel's
314
* onEvent handler.
315
*
316
* Events are retrieved from pollset in batch and offered to a BlockingQueue
317
* where they are consumed by handler threads. A special "NEED_TO_POLL"
318
* event is used to signal one consumer to re-poll when all events have
319
* been consumed.
320
*/
321
private class EventHandlerTask implements Runnable {
322
private Event poll() throws IOException {
323
try {
324
for (;;) {
325
int n;
326
controlLock.lock();
327
try {
328
n = pollsetPoll(pollset, address, MAX_POLL_EVENTS);
329
} finally {
330
controlLock.unlock();
331
}
332
/*
333
* 'n' events have been read. Here we map them to their
334
* corresponding channel in batch and queue n-1 so that
335
* they can be handled by other handler threads. The last
336
* event is handled by this thread (and so is not queued).
337
*/
338
fdToChannelLock.readLock().lock();
339
try {
340
while (n-- > 0) {
341
long eventAddress = getEvent(address, n);
342
int fd = getDescriptor(eventAddress);
343
344
// To emulate one shot semantic we need to remove
345
// the file descriptor here.
346
if (fd != sp[0] && fd != ctlSp[0]) {
347
synchronized (controlQueue) {
348
pollsetCtl(pollset, PS_DELETE, fd, 0);
349
}
350
}
351
352
// wakeup
353
if (fd == sp[0]) {
354
if (wakeupCount.decrementAndGet() == 0) {
355
// no more wakeups so drain pipe
356
drain1(sp[0]);
357
}
358
359
// queue special event if there are more events
360
// to handle.
361
if (n > 0) {
362
queue.offer(EXECUTE_TASK_OR_SHUTDOWN);
363
continue;
364
}
365
return EXECUTE_TASK_OR_SHUTDOWN;
366
}
367
368
// wakeup to process control event
369
if (fd == ctlSp[0]) {
370
synchronized (controlQueue) {
371
drain1(ctlSp[0]);
372
processControlQueue();
373
}
374
if (n > 0) {
375
continue;
376
}
377
return CONTINUE_AFTER_CTL_EVENT;
378
}
379
380
PollableChannel channel = fdToChannel.get(fd);
381
if (channel != null) {
382
int events = getRevents(eventAddress);
383
Event ev = new Event(channel, events);
384
385
// n-1 events are queued; This thread handles
386
// the last one except for the wakeup
387
if (n > 0) {
388
queue.offer(ev);
389
} else {
390
return ev;
391
}
392
}
393
}
394
} finally {
395
fdToChannelLock.readLock().unlock();
396
}
397
}
398
} finally {
399
// to ensure that some thread will poll when all events have
400
// been consumed
401
queue.offer(NEED_TO_POLL);
402
}
403
}
404
405
public void run() {
406
Invoker.GroupAndInvokeCount myGroupAndInvokeCount =
407
Invoker.getGroupAndInvokeCount();
408
final boolean isPooledThread = (myGroupAndInvokeCount != null);
409
boolean replaceMe = false;
410
Event ev;
411
try {
412
for (;;) {
413
// reset invoke count
414
if (isPooledThread)
415
myGroupAndInvokeCount.resetInvokeCount();
416
417
try {
418
replaceMe = false;
419
ev = queue.take();
420
421
// no events and this thread has been "selected" to
422
// poll for more.
423
if (ev == NEED_TO_POLL) {
424
try {
425
ev = poll();
426
} catch (IOException x) {
427
x.printStackTrace();
428
return;
429
}
430
}
431
} catch (InterruptedException x) {
432
continue;
433
}
434
435
// contine after we processed a control event
436
if (ev == CONTINUE_AFTER_CTL_EVENT) {
437
continue;
438
}
439
440
// handle wakeup to execute task or shutdown
441
if (ev == EXECUTE_TASK_OR_SHUTDOWN) {
442
Runnable task = pollTask();
443
if (task == null) {
444
// shutdown request
445
return;
446
}
447
// run task (may throw error/exception)
448
replaceMe = true;
449
task.run();
450
continue;
451
}
452
453
// process event
454
try {
455
ev.channel().onEvent(ev.events(), isPooledThread);
456
} catch (Error x) {
457
replaceMe = true; throw x;
458
} catch (RuntimeException x) {
459
replaceMe = true; throw x;
460
}
461
}
462
} finally {
463
// last handler to exit when shutdown releases resources
464
int remaining = threadExit(this, replaceMe);
465
if (remaining == 0 && isShutdown()) {
466
implClose();
467
}
468
}
469
}
470
}
471
472
/**
473
* Allocates a poll array to handle up to {@code count} events.
474
*/
475
private static long allocatePollArray(int count) {
476
return unsafe.allocateMemory(count * SIZEOF_POLLFD);
477
}
478
479
/**
480
* Free a poll array
481
*/
482
private static void freePollArray(long address) {
483
unsafe.freeMemory(address);
484
}
485
486
/**
487
* Returns event[i];
488
*/
489
private static long getEvent(long address, int i) {
490
return address + (SIZEOF_POLLFD*i);
491
}
492
493
/**
494
* Returns event->fd
495
*/
496
private static int getDescriptor(long eventAddress) {
497
return unsafe.getInt(eventAddress + OFFSETOF_FD);
498
}
499
500
/**
501
* Returns event->events
502
*/
503
private static int getEvents(long eventAddress) {
504
return unsafe.getChar(eventAddress + OFFSETOF_EVENTS);
505
}
506
507
/**
508
* Returns event->revents
509
*/
510
private static int getRevents(long eventAddress) {
511
return unsafe.getChar(eventAddress + OFFSETOF_REVENTS);
512
}
513
514
// -- Native methods --
515
516
private static native void init();
517
518
private static native int eventSize();
519
520
private static native int eventsOffset();
521
522
private static native int reventsOffset();
523
524
private static native int fdOffset();
525
526
private static native int pollsetCreate() throws IOException;
527
528
private static native int pollsetCtl(int pollset, int opcode, int fd, int events);
529
530
private static native int pollsetPoll(int pollset, long pollAddress, int numfds)
531
throws IOException;
532
533
private static native void pollsetDestroy(int pollset);
534
535
private static native void socketpair(int[] sv) throws IOException;
536
537
private static native void interrupt(int fd) throws IOException;
538
539
private static native void drain1(int fd) throws IOException;
540
541
private static native void close0(int fd);
542
}
543
544