Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
PojavLauncherTeam
GitHub Repository: PojavLauncherTeam/mobile
Path: blob/master/src/java.base/windows/classes/sun/nio/ch/WindowsSelectorImpl.java
41139 views
1
/*
2
* Copyright (c) 2002, 2021, Oracle and/or its affiliates. All rights reserved.
3
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4
*
5
* This code is free software; you can redistribute it and/or modify it
6
* under the terms of the GNU General Public License version 2 only, as
7
* published by the Free Software Foundation. Oracle designates this
8
* particular file as subject to the "Classpath" exception as provided
9
* by Oracle in the LICENSE file that accompanied this code.
10
*
11
* This code is distributed in the hope that it will be useful, but WITHOUT
12
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14
* version 2 for more details (a copy is included in the LICENSE file that
15
* accompanied this code).
16
*
17
* You should have received a copy of the GNU General Public License version
18
* 2 along with this work; if not, write to the Free Software Foundation,
19
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20
*
21
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22
* or visit www.oracle.com if you need additional information or have any
23
* questions.
24
*/
25
26
package sun.nio.ch;
27
28
import java.io.IOException;
29
import java.nio.channels.ClosedSelectorException;
30
import java.nio.channels.Pipe;
31
import java.nio.channels.SelectionKey;
32
import java.nio.channels.Selector;
33
import java.nio.channels.SelectableChannel;
34
import java.nio.channels.spi.SelectorProvider;
35
import java.util.ArrayDeque;
36
import java.util.ArrayList;
37
import java.util.Deque;
38
import java.util.HashMap;
39
import java.util.List;
40
import java.util.function.Consumer;
41
import jdk.internal.misc.Unsafe;
42
43
/**
44
* A multi-threaded implementation of Selector for Windows.
45
*
46
* @author Konstantin Kladko
47
* @author Mark Reinhold
48
*/
49
50
class WindowsSelectorImpl extends SelectorImpl {
51
private static final Unsafe unsafe = Unsafe.getUnsafe();
52
private static int addressSize = unsafe.addressSize();
53
54
private static int dependsArch(int value32, int value64) {
55
return (addressSize == 4) ? value32 : value64;
56
}
57
58
// Initial capacity of the poll array
59
private final int INIT_CAP = 8;
60
// Maximum number of sockets for select().
61
// Should be INIT_CAP times a power of 2
62
private static final int MAX_SELECTABLE_FDS = 1024;
63
64
// Size of FD_SET struct to allocate a buffer for it in SubSelector,
65
// aligned to 8 bytes on 64-bit:
66
// struct { unsigned int fd_count; SOCKET fd_array[MAX_SELECTABLE_FDS]; }.
67
private static final long SIZEOF_FD_SET = dependsArch(
68
4 + MAX_SELECTABLE_FDS * 4, // SOCKET = unsigned int
69
4 + MAX_SELECTABLE_FDS * 8 + 4); // SOCKET = unsigned __int64
70
71
// The list of SelectableChannels serviced by this Selector. Every mod
72
// MAX_SELECTABLE_FDS entry is bogus, to align this array with the poll
73
// array, where the corresponding entry is occupied by the wakeupSocket
74
private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[INIT_CAP];
75
76
// The global native poll array holds file descriptors and event masks
77
private PollArrayWrapper pollWrapper;
78
79
// The number of valid entries in poll array, including entries occupied
80
// by wakeup socket handle.
81
private int totalChannels = 1;
82
83
// Number of helper threads needed for select. We need one thread per
84
// each additional set of MAX_SELECTABLE_FDS - 1 channels.
85
private int threadsCount = 0;
86
87
// A list of helper threads for select.
88
private final List<SelectThread> threads = new ArrayList<SelectThread>();
89
90
//Pipe used as a wakeup object.
91
private final Pipe wakeupPipe;
92
93
// File descriptors corresponding to source and sink
94
private final int wakeupSourceFd, wakeupSinkFd;
95
96
// Maps file descriptors to their indices in pollArray
97
private static final class FdMap extends HashMap<Integer, MapEntry> {
98
static final long serialVersionUID = 0L;
99
private MapEntry get(int desc) {
100
return get(Integer.valueOf(desc));
101
}
102
private MapEntry put(SelectionKeyImpl ski) {
103
return put(Integer.valueOf(ski.getFDVal()), new MapEntry(ski));
104
}
105
private MapEntry remove(SelectionKeyImpl ski) {
106
Integer fd = Integer.valueOf(ski.getFDVal());
107
MapEntry x = get(fd);
108
if ((x != null) && (x.ski.channel() == ski.channel()))
109
return remove(fd);
110
return null;
111
}
112
}
113
114
// class for fdMap entries
115
private static final class MapEntry {
116
final SelectionKeyImpl ski;
117
long updateCount = 0;
118
MapEntry(SelectionKeyImpl ski) {
119
this.ski = ski;
120
}
121
}
122
private final FdMap fdMap = new FdMap();
123
124
// SubSelector for the main thread
125
private final SubSelector subSelector = new SubSelector();
126
127
private long timeout; //timeout for poll
128
129
// Lock for interrupt triggering and clearing
130
private final Object interruptLock = new Object();
131
private volatile boolean interruptTriggered;
132
133
// pending new registrations/updates, queued by implRegister and setEventOps
134
private final Object updateLock = new Object();
135
private final Deque<SelectionKeyImpl> newKeys = new ArrayDeque<>();
136
private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();
137
138
139
WindowsSelectorImpl(SelectorProvider sp) throws IOException {
140
super(sp);
141
pollWrapper = new PollArrayWrapper(INIT_CAP);
142
wakeupPipe = new PipeImpl(sp, false);
143
wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();
144
wakeupSinkFd = ((SelChImpl)wakeupPipe.sink()).getFDVal();
145
pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
146
}
147
148
private void ensureOpen() {
149
if (!isOpen())
150
throw new ClosedSelectorException();
151
}
152
153
@Override
154
protected int doSelect(Consumer<SelectionKey> action, long timeout)
155
throws IOException
156
{
157
assert Thread.holdsLock(this);
158
this.timeout = timeout; // set selector timeout
159
processUpdateQueue();
160
processDeregisterQueue();
161
if (interruptTriggered) {
162
resetWakeupSocket();
163
return 0;
164
}
165
// Calculate number of helper threads needed for poll. If necessary
166
// threads are created here and start waiting on startLock
167
adjustThreadsCount();
168
finishLock.reset(); // reset finishLock
169
// Wakeup helper threads, waiting on startLock, so they start polling.
170
// Redundant threads will exit here after wakeup.
171
startLock.startThreads();
172
// do polling in the main thread. Main thread is responsible for
173
// first MAX_SELECTABLE_FDS entries in pollArray.
174
try {
175
begin();
176
try {
177
subSelector.poll();
178
} catch (IOException e) {
179
finishLock.setException(e); // Save this exception
180
}
181
// Main thread is out of poll(). Wakeup others and wait for them
182
if (threads.size() > 0)
183
finishLock.waitForHelperThreads();
184
} finally {
185
end();
186
}
187
// Done with poll(). Set wakeupSocket to nonsignaled for the next run.
188
finishLock.checkForException();
189
processDeregisterQueue();
190
int updated = updateSelectedKeys(action);
191
// Done with poll(). Set wakeupSocket to nonsignaled for the next run.
192
resetWakeupSocket();
193
return updated;
194
}
195
196
/**
197
* Process new registrations and changes to the interest ops.
198
*/
199
private void processUpdateQueue() {
200
assert Thread.holdsLock(this);
201
202
synchronized (updateLock) {
203
SelectionKeyImpl ski;
204
205
// new registrations
206
while ((ski = newKeys.pollFirst()) != null) {
207
if (ski.isValid()) {
208
growIfNeeded();
209
channelArray[totalChannels] = ski;
210
ski.setIndex(totalChannels);
211
pollWrapper.putEntry(totalChannels, ski);
212
totalChannels++;
213
MapEntry previous = fdMap.put(ski);
214
assert previous == null;
215
}
216
}
217
218
// changes to interest ops
219
while ((ski = updateKeys.pollFirst()) != null) {
220
int events = ski.translateInterestOps();
221
int fd = ski.getFDVal();
222
if (ski.isValid() && fdMap.containsKey(fd)) {
223
int index = ski.getIndex();
224
assert index >= 0 && index < totalChannels;
225
pollWrapper.putEventOps(index, events);
226
}
227
}
228
}
229
}
230
231
// Helper threads wait on this lock for the next poll.
232
private final StartLock startLock = new StartLock();
233
234
private final class StartLock {
235
// A variable which distinguishes the current run of doSelect from the
236
// previous one. Incrementing runsCounter and notifying threads will
237
// trigger another round of poll.
238
private long runsCounter;
239
// Triggers threads, waiting on this lock to start polling.
240
private synchronized void startThreads() {
241
runsCounter++; // next run
242
notifyAll(); // wake up threads.
243
}
244
// This function is called by a helper thread to wait for the
245
// next round of poll(). It also checks, if this thread became
246
// redundant. If yes, it returns true, notifying the thread
247
// that it should exit.
248
private synchronized boolean waitForStart(SelectThread thread) {
249
while (true) {
250
while (runsCounter == thread.lastRun) {
251
try {
252
startLock.wait();
253
} catch (InterruptedException e) {
254
Thread.currentThread().interrupt();
255
}
256
}
257
if (thread.isZombie()) { // redundant thread
258
return true; // will cause run() to exit.
259
} else {
260
thread.lastRun = runsCounter; // update lastRun
261
return false; // will cause run() to poll.
262
}
263
}
264
}
265
}
266
267
// Main thread waits on this lock, until all helper threads are done
268
// with poll().
269
private final FinishLock finishLock = new FinishLock();
270
271
private final class FinishLock {
272
// Number of helper threads, that did not finish yet.
273
private int threadsToFinish;
274
275
// IOException which occurred during the last run.
276
IOException exception = null;
277
278
// Called before polling.
279
private void reset() {
280
threadsToFinish = threads.size(); // helper threads
281
}
282
283
// Each helper thread invokes this function on finishLock, when
284
// the thread is done with poll().
285
private synchronized void threadFinished() {
286
if (threadsToFinish == threads.size()) { // finished poll() first
287
// if finished first, wakeup others
288
wakeup();
289
}
290
threadsToFinish--;
291
if (threadsToFinish == 0) // all helper threads finished poll().
292
notify(); // notify the main thread
293
}
294
295
// The main thread invokes this function on finishLock to wait
296
// for helper threads to finish poll().
297
private synchronized void waitForHelperThreads() {
298
if (threadsToFinish == threads.size()) {
299
// no helper threads finished yet. Wakeup them up.
300
wakeup();
301
}
302
while (threadsToFinish != 0) {
303
try {
304
finishLock.wait();
305
} catch (InterruptedException e) {
306
// Interrupted - set interrupted state.
307
Thread.currentThread().interrupt();
308
}
309
}
310
}
311
312
// sets IOException for this run
313
private synchronized void setException(IOException e) {
314
exception = e;
315
}
316
317
// Checks if there was any exception during the last run.
318
// If yes, throws it
319
private void checkForException() throws IOException {
320
if (exception == null)
321
return;
322
String message = "An exception occurred" +
323
" during the execution of select(): \n" +
324
exception + '\n';
325
exception = null;
326
throw new IOException(message);
327
}
328
}
329
330
private final class SubSelector {
331
private final int pollArrayIndex; // starting index in pollArray to poll
332
// These arrays will hold result of native select().
333
// The first element of each array is the number of selected sockets.
334
// Other elements are file descriptors of selected sockets.
335
private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1];
336
private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1];
337
private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1];
338
// Buffer for readfds, writefds and exceptfds structs that are passed
339
// to native select().
340
private final long fdsBuffer = unsafe.allocateMemory(SIZEOF_FD_SET * 3);
341
342
private SubSelector() {
343
this.pollArrayIndex = 0; // main thread
344
}
345
346
private SubSelector(int threadIndex) { // helper threads
347
this.pollArrayIndex = (threadIndex + 1) * MAX_SELECTABLE_FDS;
348
}
349
350
private int poll() throws IOException{ // poll for the main thread
351
return poll0(pollWrapper.pollArrayAddress,
352
Math.min(totalChannels, MAX_SELECTABLE_FDS),
353
readFds, writeFds, exceptFds, timeout, fdsBuffer);
354
}
355
356
private int poll(int index) throws IOException {
357
// poll for helper threads
358
return poll0(pollWrapper.pollArrayAddress +
359
(pollArrayIndex * PollArrayWrapper.SIZE_POLLFD),
360
Math.min(MAX_SELECTABLE_FDS,
361
totalChannels - (index + 1) * MAX_SELECTABLE_FDS),
362
readFds, writeFds, exceptFds, timeout, fdsBuffer);
363
}
364
365
private native int poll0(long pollAddress, int numfds,
366
int[] readFds, int[] writeFds, int[] exceptFds, long timeout, long fdsBuffer);
367
368
private int processSelectedKeys(long updateCount, Consumer<SelectionKey> action)
369
throws IOException
370
{
371
int numKeysUpdated = 0;
372
numKeysUpdated += processFDSet(updateCount, action, readFds,
373
Net.POLLIN,
374
false);
375
numKeysUpdated += processFDSet(updateCount, action, writeFds,
376
Net.POLLCONN |
377
Net.POLLOUT,
378
false);
379
numKeysUpdated += processFDSet(updateCount, action, exceptFds,
380
Net.POLLIN |
381
Net.POLLCONN |
382
Net.POLLOUT,
383
true);
384
return numKeysUpdated;
385
}
386
387
/**
388
* updateCount is used to tell if a key has been counted as updated
389
* in this select operation.
390
*
391
* me.updateCount <= updateCount
392
*/
393
private int processFDSet(long updateCount,
394
Consumer<SelectionKey> action,
395
int[] fds, int rOps,
396
boolean isExceptFds)
397
throws IOException
398
{
399
int numKeysUpdated = 0;
400
for (int i = 1; i <= fds[0]; i++) {
401
int desc = fds[i];
402
if (desc == wakeupSourceFd) {
403
synchronized (interruptLock) {
404
interruptTriggered = true;
405
}
406
continue;
407
}
408
MapEntry me = fdMap.get(desc);
409
// If me is null, the key was deregistered in the previous
410
// processDeregisterQueue.
411
if (me == null)
412
continue;
413
SelectionKeyImpl ski = me.ski;
414
415
// The descriptor may be in the exceptfds set because there is
416
// OOB data queued to the socket. If there is OOB data then it
417
// is discarded and the key is not added to the selected set.
418
SelectableChannel sc = ski.channel();
419
if (isExceptFds && (sc instanceof SocketChannelImpl)
420
&& ((SocketChannelImpl) sc).isNetSocket()
421
&& Net.discardOOB(ski.getFD())) {
422
continue;
423
}
424
425
int updated = processReadyEvents(rOps, ski, action);
426
if (updated > 0 && me.updateCount != updateCount) {
427
me.updateCount = updateCount;
428
numKeysUpdated++;
429
}
430
}
431
return numKeysUpdated;
432
}
433
434
private void freeFDSetBuffer() {
435
unsafe.freeMemory(fdsBuffer);
436
}
437
}
438
439
// Represents a helper thread used for select.
440
private final class SelectThread extends Thread {
441
private final int index; // index of this thread
442
final SubSelector subSelector;
443
private long lastRun = 0; // last run number
444
private volatile boolean zombie;
445
// Creates a new thread
446
private SelectThread(int i) {
447
super(null, null, "SelectorHelper", 0, false);
448
this.index = i;
449
this.subSelector = new SubSelector(i);
450
//make sure we wait for next round of poll
451
this.lastRun = startLock.runsCounter;
452
}
453
void makeZombie() {
454
zombie = true;
455
}
456
boolean isZombie() {
457
return zombie;
458
}
459
public void run() {
460
while (true) { // poll loop
461
// wait for the start of poll. If this thread has become
462
// redundant, then exit.
463
if (startLock.waitForStart(this)) {
464
subSelector.freeFDSetBuffer();
465
return;
466
}
467
// call poll()
468
try {
469
subSelector.poll(index);
470
} catch (IOException e) {
471
// Save this exception and let other threads finish.
472
finishLock.setException(e);
473
}
474
// notify main thread, that this thread has finished, and
475
// wakeup others, if this thread is the first to finish.
476
finishLock.threadFinished();
477
}
478
}
479
}
480
481
// After some channels registered/deregistered, the number of required
482
// helper threads may have changed. Adjust this number.
483
private void adjustThreadsCount() {
484
if (threadsCount > threads.size()) {
485
// More threads needed. Start more threads.
486
for (int i = threads.size(); i < threadsCount; i++) {
487
SelectThread newThread = new SelectThread(i);
488
threads.add(newThread);
489
newThread.setDaemon(true);
490
newThread.start();
491
}
492
} else if (threadsCount < threads.size()) {
493
// Some threads become redundant. Remove them from the threads List.
494
for (int i = threads.size() - 1 ; i >= threadsCount; i--)
495
threads.remove(i).makeZombie();
496
}
497
}
498
499
// Sets Windows wakeup socket to a signaled state.
500
private void setWakeupSocket() {
501
setWakeupSocket0(wakeupSinkFd);
502
}
503
private native void setWakeupSocket0(int wakeupSinkFd);
504
505
// Sets Windows wakeup socket to a non-signaled state.
506
private void resetWakeupSocket() {
507
synchronized (interruptLock) {
508
if (interruptTriggered == false)
509
return;
510
resetWakeupSocket0(wakeupSourceFd);
511
interruptTriggered = false;
512
}
513
}
514
515
private native void resetWakeupSocket0(int wakeupSourceFd);
516
517
// We increment this counter on each call to updateSelectedKeys()
518
// each entry in fdMap has a memorized value of
519
// updateCount. When we increment numKeysUpdated we set updateCount
520
// for the corresponding entry to its current value. This is used to
521
// avoid counting the same key more than once - the same key can
522
// appear in readfds and writefds.
523
private long updateCount = 0;
524
525
// Update ops of the corresponding Channels. Add the ready keys to the
526
// ready queue.
527
private int updateSelectedKeys(Consumer<SelectionKey> action) throws IOException {
528
updateCount++;
529
int numKeysUpdated = 0;
530
numKeysUpdated += subSelector.processSelectedKeys(updateCount, action);
531
for (SelectThread t: threads) {
532
numKeysUpdated += t.subSelector.processSelectedKeys(updateCount, action);
533
}
534
return numKeysUpdated;
535
}
536
537
@Override
538
protected void implClose() throws IOException {
539
assert !isOpen();
540
assert Thread.holdsLock(this);
541
542
// prevent further wakeup
543
synchronized (interruptLock) {
544
interruptTriggered = true;
545
}
546
547
wakeupPipe.sink().close();
548
wakeupPipe.source().close();
549
pollWrapper.free();
550
551
// Make all remaining helper threads exit
552
for (SelectThread t: threads)
553
t.makeZombie();
554
startLock.startThreads();
555
subSelector.freeFDSetBuffer();
556
}
557
558
@Override
559
protected void implRegister(SelectionKeyImpl ski) {
560
ensureOpen();
561
synchronized (updateLock) {
562
newKeys.addLast(ski);
563
}
564
}
565
566
private void growIfNeeded() {
567
if (channelArray.length == totalChannels) {
568
int newSize = totalChannels * 2; // Make a larger array
569
SelectionKeyImpl temp[] = new SelectionKeyImpl[newSize];
570
System.arraycopy(channelArray, 1, temp, 1, totalChannels - 1);
571
channelArray = temp;
572
pollWrapper.grow(newSize);
573
}
574
if (totalChannels % MAX_SELECTABLE_FDS == 0) { // more threads needed
575
pollWrapper.addWakeupSocket(wakeupSourceFd, totalChannels);
576
totalChannels++;
577
threadsCount++;
578
}
579
}
580
581
@Override
582
protected void implDereg(SelectionKeyImpl ski) {
583
assert !ski.isValid();
584
assert Thread.holdsLock(this);
585
586
if (fdMap.remove(ski) != null) {
587
int i = ski.getIndex();
588
assert (i >= 0);
589
590
if (i != totalChannels - 1) {
591
// Copy end one over it
592
SelectionKeyImpl endChannel = channelArray[totalChannels-1];
593
channelArray[i] = endChannel;
594
endChannel.setIndex(i);
595
pollWrapper.replaceEntry(pollWrapper, totalChannels-1, pollWrapper, i);
596
}
597
ski.setIndex(-1);
598
599
channelArray[totalChannels - 1] = null;
600
totalChannels--;
601
if (totalChannels != 1 && totalChannels % MAX_SELECTABLE_FDS == 1) {
602
totalChannels--;
603
threadsCount--; // The last thread has become redundant.
604
}
605
}
606
}
607
608
@Override
609
public void setEventOps(SelectionKeyImpl ski) {
610
ensureOpen();
611
synchronized (updateLock) {
612
updateKeys.addLast(ski);
613
}
614
}
615
616
@Override
617
public Selector wakeup() {
618
synchronized (interruptLock) {
619
if (!interruptTriggered) {
620
setWakeupSocket();
621
interruptTriggered = true;
622
}
623
}
624
return this;
625
}
626
627
static {
628
IOUtil.load();
629
}
630
}
631
632