Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
PojavLauncherTeam
GitHub Repository: PojavLauncherTeam/openjdk-multiarch-jdk8u
Path: blob/aarch64-shenandoah-jdk8u272-b10/jdk/src/windows/classes/sun/nio/ch/WindowsSelectorImpl.java
32288 views
1
/*
2
* Copyright (c) 2002, 2013, Oracle and/or its affiliates. All rights reserved.
3
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4
*
5
* This code is free software; you can redistribute it and/or modify it
6
* under the terms of the GNU General Public License version 2 only, as
7
* published by the Free Software Foundation. Oracle designates this
8
* particular file as subject to the "Classpath" exception as provided
9
* by Oracle in the LICENSE file that accompanied this code.
10
*
11
* This code is distributed in the hope that it will be useful, but WITHOUT
12
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14
* version 2 for more details (a copy is included in the LICENSE file that
15
* accompanied this code).
16
*
17
* You should have received a copy of the GNU General Public License version
18
* 2 along with this work; if not, write to the Free Software Foundation,
19
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20
*
21
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22
* or visit www.oracle.com if you need additional information or have any
23
* questions.
24
*/
25
26
/*
27
*/
28
29
30
package sun.nio.ch;
31
32
import java.nio.channels.spi.SelectorProvider;
33
import java.nio.channels.Selector;
34
import java.nio.channels.ClosedSelectorException;
35
import java.nio.channels.Pipe;
36
import java.nio.channels.SelectableChannel;
37
import java.io.IOException;
38
import java.nio.channels.CancelledKeyException;
39
import java.util.List;
40
import java.util.ArrayList;
41
import java.util.HashMap;
42
import java.util.Iterator;
43
import sun.misc.Unsafe;
44
45
/**
46
* A multi-threaded implementation of Selector for Windows.
47
*
48
* @author Konstantin Kladko
49
* @author Mark Reinhold
50
*/
51
52
final class WindowsSelectorImpl extends SelectorImpl {
53
private static final Unsafe unsafe = Unsafe.getUnsafe();
54
private static int addressSize = unsafe.addressSize();
55
56
private static int dependsArch(int value32, int value64) {
57
return (addressSize == 4) ? value32 : value64;
58
}
59
60
// Initial capacity of the poll array
61
private final int INIT_CAP = 8;
62
// Maximum number of sockets for select().
63
// Should be INIT_CAP times a power of 2
64
private final static int MAX_SELECTABLE_FDS = 1024;
65
66
// Size of FD_SET struct to allocate a buffer for it in SubSelector,
67
// aligned to 8 bytes on 64-bit:
68
// struct { unsigned int fd_count; SOCKET fd_array[MAX_SELECTABLE_FDS]; }.
69
private static final long SIZEOF_FD_SET = dependsArch(
70
4 + MAX_SELECTABLE_FDS * 4, // SOCKET = unsigned int
71
4 + MAX_SELECTABLE_FDS * 8 + 4); // SOCKET = unsigned __int64
72
73
// The list of SelectableChannels serviced by this Selector. Every mod
74
// MAX_SELECTABLE_FDS entry is bogus, to align this array with the poll
75
// array, where the corresponding entry is occupied by the wakeupSocket
76
private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[INIT_CAP];
77
78
// The global native poll array holds file descriptors and event masks
79
private PollArrayWrapper pollWrapper;
80
81
// The number of valid entries in poll array, including entries occupied
82
// by wakeup socket handle.
83
private int totalChannels = 1;
84
85
// Number of helper threads needed for select. We need one thread per
86
// each additional set of MAX_SELECTABLE_FDS - 1 channels.
87
private int threadsCount = 0;
88
89
// A list of helper threads for select.
90
private final List<SelectThread> threads = new ArrayList<SelectThread>();
91
92
//Pipe used as a wakeup object.
93
private final Pipe wakeupPipe;
94
95
// File descriptors corresponding to source and sink
96
private final int wakeupSourceFd, wakeupSinkFd;
97
98
// Lock for close cleanup
99
private Object closeLock = new Object();
100
101
// Maps file descriptors to their indices in pollArray
102
private final static class FdMap extends HashMap<Integer, MapEntry> {
103
static final long serialVersionUID = 0L;
104
private MapEntry get(int desc) {
105
return get(new Integer(desc));
106
}
107
private MapEntry put(SelectionKeyImpl ski) {
108
return put(new Integer(ski.channel.getFDVal()), new MapEntry(ski));
109
}
110
private MapEntry remove(SelectionKeyImpl ski) {
111
Integer fd = new Integer(ski.channel.getFDVal());
112
MapEntry x = get(fd);
113
if ((x != null) && (x.ski.channel == ski.channel))
114
return remove(fd);
115
return null;
116
}
117
}
118
119
// class for fdMap entries
120
private final static class MapEntry {
121
SelectionKeyImpl ski;
122
long updateCount = 0;
123
long clearedCount = 0;
124
MapEntry(SelectionKeyImpl ski) {
125
this.ski = ski;
126
}
127
}
128
private final FdMap fdMap = new FdMap();
129
130
// SubSelector for the main thread
131
private final SubSelector subSelector = new SubSelector();
132
133
private long timeout; //timeout for poll
134
135
// Lock for interrupt triggering and clearing
136
private final Object interruptLock = new Object();
137
private volatile boolean interruptTriggered = false;
138
139
WindowsSelectorImpl(SelectorProvider sp) throws IOException {
140
super(sp);
141
pollWrapper = new PollArrayWrapper(INIT_CAP);
142
wakeupPipe = Pipe.open();
143
wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();
144
145
// Disable the Nagle algorithm so that the wakeup is more immediate
146
SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();
147
(sink.sc).socket().setTcpNoDelay(true);
148
wakeupSinkFd = ((SelChImpl)sink).getFDVal();
149
150
pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
151
}
152
153
protected int doSelect(long timeout) throws IOException {
154
if (channelArray == null)
155
throw new ClosedSelectorException();
156
this.timeout = timeout; // set selector timeout
157
processDeregisterQueue();
158
if (interruptTriggered) {
159
resetWakeupSocket();
160
return 0;
161
}
162
// Calculate number of helper threads needed for poll. If necessary
163
// threads are created here and start waiting on startLock
164
adjustThreadsCount();
165
finishLock.reset(); // reset finishLock
166
// Wakeup helper threads, waiting on startLock, so they start polling.
167
// Redundant threads will exit here after wakeup.
168
startLock.startThreads();
169
// do polling in the main thread. Main thread is responsible for
170
// first MAX_SELECTABLE_FDS entries in pollArray.
171
try {
172
begin();
173
try {
174
subSelector.poll();
175
} catch (IOException e) {
176
finishLock.setException(e); // Save this exception
177
}
178
// Main thread is out of poll(). Wakeup others and wait for them
179
if (threads.size() > 0)
180
finishLock.waitForHelperThreads();
181
} finally {
182
end();
183
}
184
// Done with poll(). Set wakeupSocket to nonsignaled for the next run.
185
finishLock.checkForException();
186
processDeregisterQueue();
187
int updated = updateSelectedKeys();
188
// Done with poll(). Set wakeupSocket to nonsignaled for the next run.
189
resetWakeupSocket();
190
return updated;
191
}
192
193
// Helper threads wait on this lock for the next poll.
194
private final StartLock startLock = new StartLock();
195
196
private final class StartLock {
197
// A variable which distinguishes the current run of doSelect from the
198
// previous one. Incrementing runsCounter and notifying threads will
199
// trigger another round of poll.
200
private long runsCounter;
201
// Triggers threads, waiting on this lock to start polling.
202
private synchronized void startThreads() {
203
runsCounter++; // next run
204
notifyAll(); // wake up threads.
205
}
206
// This function is called by a helper thread to wait for the
207
// next round of poll(). It also checks, if this thread became
208
// redundant. If yes, it returns true, notifying the thread
209
// that it should exit.
210
private synchronized boolean waitForStart(SelectThread thread) {
211
while (true) {
212
while (runsCounter == thread.lastRun) {
213
try {
214
startLock.wait();
215
} catch (InterruptedException e) {
216
Thread.currentThread().interrupt();
217
}
218
}
219
if (thread.isZombie()) { // redundant thread
220
return true; // will cause run() to exit.
221
} else {
222
thread.lastRun = runsCounter; // update lastRun
223
return false; // will cause run() to poll.
224
}
225
}
226
}
227
}
228
229
// Main thread waits on this lock, until all helper threads are done
230
// with poll().
231
private final FinishLock finishLock = new FinishLock();
232
233
private final class FinishLock {
234
// Number of helper threads, that did not finish yet.
235
private int threadsToFinish;
236
237
// IOException which occurred during the last run.
238
IOException exception = null;
239
240
// Called before polling.
241
private void reset() {
242
threadsToFinish = threads.size(); // helper threads
243
}
244
245
// Each helper thread invokes this function on finishLock, when
246
// the thread is done with poll().
247
private synchronized void threadFinished() {
248
if (threadsToFinish == threads.size()) { // finished poll() first
249
// if finished first, wakeup others
250
wakeup();
251
}
252
threadsToFinish--;
253
if (threadsToFinish == 0) // all helper threads finished poll().
254
notify(); // notify the main thread
255
}
256
257
// The main thread invokes this function on finishLock to wait
258
// for helper threads to finish poll().
259
private synchronized void waitForHelperThreads() {
260
if (threadsToFinish == threads.size()) {
261
// no helper threads finished yet. Wakeup them up.
262
wakeup();
263
}
264
while (threadsToFinish != 0) {
265
try {
266
finishLock.wait();
267
} catch (InterruptedException e) {
268
// Interrupted - set interrupted state.
269
Thread.currentThread().interrupt();
270
}
271
}
272
}
273
274
// sets IOException for this run
275
private synchronized void setException(IOException e) {
276
exception = e;
277
}
278
279
// Checks if there was any exception during the last run.
280
// If yes, throws it
281
private void checkForException() throws IOException {
282
if (exception == null)
283
return;
284
StringBuffer message = new StringBuffer("An exception occurred" +
285
" during the execution of select(): \n");
286
message.append(exception);
287
message.append('\n');
288
exception = null;
289
throw new IOException(message.toString());
290
}
291
}
292
293
private final class SubSelector {
294
private final int pollArrayIndex; // starting index in pollArray to poll
295
// These arrays will hold result of native select().
296
// The first element of each array is the number of selected sockets.
297
// Other elements are file descriptors of selected sockets.
298
private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1];
299
private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1];
300
private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1];
301
// Buffer for readfds, writefds and exceptfds structs that are passed
302
// to native select().
303
private final long fdsBuffer = unsafe.allocateMemory(SIZEOF_FD_SET * 6);
304
305
private SubSelector() {
306
this.pollArrayIndex = 0; // main thread
307
}
308
309
private SubSelector(int threadIndex) { // helper threads
310
this.pollArrayIndex = (threadIndex + 1) * MAX_SELECTABLE_FDS;
311
}
312
313
private int poll() throws IOException{ // poll for the main thread
314
return poll0(pollWrapper.pollArrayAddress,
315
Math.min(totalChannels, MAX_SELECTABLE_FDS),
316
readFds, writeFds, exceptFds, timeout, fdsBuffer);
317
}
318
319
private int poll(int index) throws IOException {
320
// poll for helper threads
321
return poll0(pollWrapper.pollArrayAddress +
322
(pollArrayIndex * PollArrayWrapper.SIZE_POLLFD),
323
Math.min(MAX_SELECTABLE_FDS,
324
totalChannels - (index + 1) * MAX_SELECTABLE_FDS),
325
readFds, writeFds, exceptFds, timeout, fdsBuffer);
326
}
327
328
private native int poll0(long pollAddress, int numfds,
329
int[] readFds, int[] writeFds, int[] exceptFds, long timeout, long fdsBuffer);
330
331
private int processSelectedKeys(long updateCount) {
332
int numKeysUpdated = 0;
333
numKeysUpdated += processFDSet(updateCount, readFds,
334
Net.POLLIN,
335
false);
336
numKeysUpdated += processFDSet(updateCount, writeFds,
337
Net.POLLCONN |
338
Net.POLLOUT,
339
false);
340
numKeysUpdated += processFDSet(updateCount, exceptFds,
341
Net.POLLIN |
342
Net.POLLCONN |
343
Net.POLLOUT,
344
true);
345
return numKeysUpdated;
346
}
347
348
/**
349
* Note, clearedCount is used to determine if the readyOps have
350
* been reset in this select operation. updateCount is used to
351
* tell if a key has been counted as updated in this select
352
* operation.
353
*
354
* me.updateCount <= me.clearedCount <= updateCount
355
*/
356
private int processFDSet(long updateCount, int[] fds, int rOps,
357
boolean isExceptFds)
358
{
359
int numKeysUpdated = 0;
360
for (int i = 1; i <= fds[0]; i++) {
361
int desc = fds[i];
362
if (desc == wakeupSourceFd) {
363
synchronized (interruptLock) {
364
interruptTriggered = true;
365
}
366
continue;
367
}
368
MapEntry me = fdMap.get(desc);
369
// If me is null, the key was deregistered in the previous
370
// processDeregisterQueue.
371
if (me == null)
372
continue;
373
SelectionKeyImpl sk = me.ski;
374
375
// The descriptor may be in the exceptfds set because there is
376
// OOB data queued to the socket. If there is OOB data then it
377
// is discarded and the key is not added to the selected set.
378
if (isExceptFds &&
379
(sk.channel() instanceof SocketChannelImpl) &&
380
discardUrgentData(desc))
381
{
382
continue;
383
}
384
385
if (selectedKeys.contains(sk)) { // Key in selected set
386
if (me.clearedCount != updateCount) {
387
if (sk.channel.translateAndSetReadyOps(rOps, sk) &&
388
(me.updateCount != updateCount)) {
389
me.updateCount = updateCount;
390
numKeysUpdated++;
391
}
392
} else { // The readyOps have been set; now add
393
if (sk.channel.translateAndUpdateReadyOps(rOps, sk) &&
394
(me.updateCount != updateCount)) {
395
me.updateCount = updateCount;
396
numKeysUpdated++;
397
}
398
}
399
me.clearedCount = updateCount;
400
} else { // Key is not in selected set yet
401
if (me.clearedCount != updateCount) {
402
sk.channel.translateAndSetReadyOps(rOps, sk);
403
if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
404
selectedKeys.add(sk);
405
me.updateCount = updateCount;
406
numKeysUpdated++;
407
}
408
} else { // The readyOps have been set; now add
409
sk.channel.translateAndUpdateReadyOps(rOps, sk);
410
if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
411
selectedKeys.add(sk);
412
me.updateCount = updateCount;
413
numKeysUpdated++;
414
}
415
}
416
me.clearedCount = updateCount;
417
}
418
}
419
return numKeysUpdated;
420
}
421
422
private void freeFDSetBuffer() {
423
unsafe.freeMemory(fdsBuffer);
424
}
425
}
426
427
// Represents a helper thread used for select.
428
private final class SelectThread extends Thread {
429
private final int index; // index of this thread
430
final SubSelector subSelector;
431
private long lastRun = 0; // last run number
432
private volatile boolean zombie;
433
// Creates a new thread
434
private SelectThread(int i) {
435
this.index = i;
436
this.subSelector = new SubSelector(i);
437
//make sure we wait for next round of poll
438
this.lastRun = startLock.runsCounter;
439
}
440
void makeZombie() {
441
zombie = true;
442
}
443
boolean isZombie() {
444
return zombie;
445
}
446
public void run() {
447
while (true) { // poll loop
448
// wait for the start of poll. If this thread has become
449
// redundant, then exit.
450
if (startLock.waitForStart(this)) {
451
subSelector.freeFDSetBuffer();
452
return;
453
}
454
// call poll()
455
try {
456
subSelector.poll(index);
457
} catch (IOException e) {
458
// Save this exception and let other threads finish.
459
finishLock.setException(e);
460
}
461
// notify main thread, that this thread has finished, and
462
// wakeup others, if this thread is the first to finish.
463
finishLock.threadFinished();
464
}
465
}
466
}
467
468
// After some channels registered/deregistered, the number of required
469
// helper threads may have changed. Adjust this number.
470
private void adjustThreadsCount() {
471
if (threadsCount > threads.size()) {
472
// More threads needed. Start more threads.
473
for (int i = threads.size(); i < threadsCount; i++) {
474
SelectThread newThread = new SelectThread(i);
475
threads.add(newThread);
476
newThread.setDaemon(true);
477
newThread.start();
478
}
479
} else if (threadsCount < threads.size()) {
480
// Some threads become redundant. Remove them from the threads List.
481
for (int i = threads.size() - 1 ; i >= threadsCount; i--)
482
threads.remove(i).makeZombie();
483
}
484
}
485
486
// Sets Windows wakeup socket to a signaled state.
487
private void setWakeupSocket() {
488
setWakeupSocket0(wakeupSinkFd);
489
}
490
private native void setWakeupSocket0(int wakeupSinkFd);
491
492
// Sets Windows wakeup socket to a non-signaled state.
493
private void resetWakeupSocket() {
494
synchronized (interruptLock) {
495
if (interruptTriggered == false)
496
return;
497
resetWakeupSocket0(wakeupSourceFd);
498
interruptTriggered = false;
499
}
500
}
501
502
private native void resetWakeupSocket0(int wakeupSourceFd);
503
504
private native boolean discardUrgentData(int fd);
505
506
// We increment this counter on each call to updateSelectedKeys()
507
// Each entry in fdMap has a memorized value of
508
// updateCount. When we increment numKeysUpdated we set updateCount
509
// for the corresponding entry to its current value. This is used to
510
// avoid counting the same key more than once - the same key can
511
// appear in readfds and writefds.
512
private long updateCount = 0;
513
514
// Update ops of the corresponding Channels. Add the ready keys to the
515
// ready queue.
516
private int updateSelectedKeys() {
517
updateCount++;
518
int numKeysUpdated = 0;
519
numKeysUpdated += subSelector.processSelectedKeys(updateCount);
520
for (SelectThread t: threads) {
521
numKeysUpdated += t.subSelector.processSelectedKeys(updateCount);
522
}
523
return numKeysUpdated;
524
}
525
526
protected void implClose() throws IOException {
527
synchronized (closeLock) {
528
if (channelArray != null) {
529
if (pollWrapper != null) {
530
// prevent further wakeup
531
synchronized (interruptLock) {
532
interruptTriggered = true;
533
}
534
wakeupPipe.sink().close();
535
wakeupPipe.source().close();
536
for(int i = 1; i < totalChannels; i++) { // Deregister channels
537
if (i % MAX_SELECTABLE_FDS != 0) { // skip wakeupEvent
538
deregister(channelArray[i]);
539
SelectableChannel selch = channelArray[i].channel();
540
if (!selch.isOpen() && !selch.isRegistered())
541
((SelChImpl)selch).kill();
542
}
543
}
544
pollWrapper.free();
545
pollWrapper = null;
546
selectedKeys = null;
547
channelArray = null;
548
// Make all remaining helper threads exit
549
for (SelectThread t: threads)
550
t.makeZombie();
551
startLock.startThreads();
552
subSelector.freeFDSetBuffer();
553
}
554
}
555
}
556
}
557
558
protected void implRegister(SelectionKeyImpl ski) {
559
synchronized (closeLock) {
560
if (pollWrapper == null)
561
throw new ClosedSelectorException();
562
growIfNeeded();
563
channelArray[totalChannels] = ski;
564
ski.setIndex(totalChannels);
565
fdMap.put(ski);
566
keys.add(ski);
567
pollWrapper.addEntry(totalChannels, ski);
568
totalChannels++;
569
}
570
}
571
572
private void growIfNeeded() {
573
if (channelArray.length == totalChannels) {
574
int newSize = totalChannels * 2; // Make a larger array
575
SelectionKeyImpl temp[] = new SelectionKeyImpl[newSize];
576
System.arraycopy(channelArray, 1, temp, 1, totalChannels - 1);
577
channelArray = temp;
578
pollWrapper.grow(newSize);
579
}
580
if (totalChannels % MAX_SELECTABLE_FDS == 0) { // more threads needed
581
pollWrapper.addWakeupSocket(wakeupSourceFd, totalChannels);
582
totalChannels++;
583
threadsCount++;
584
}
585
}
586
587
protected void implDereg(SelectionKeyImpl ski) throws IOException{
588
int i = ski.getIndex();
589
assert (i >= 0);
590
synchronized (closeLock) {
591
if (i != totalChannels - 1) {
592
// Copy end one over it
593
SelectionKeyImpl endChannel = channelArray[totalChannels-1];
594
channelArray[i] = endChannel;
595
endChannel.setIndex(i);
596
pollWrapper.replaceEntry(pollWrapper, totalChannels - 1,
597
pollWrapper, i);
598
}
599
ski.setIndex(-1);
600
}
601
channelArray[totalChannels - 1] = null;
602
totalChannels--;
603
if ( totalChannels != 1 && totalChannels % MAX_SELECTABLE_FDS == 1) {
604
totalChannels--;
605
threadsCount--; // The last thread has become redundant.
606
}
607
fdMap.remove(ski); // Remove the key from fdMap, keys and selectedKeys
608
keys.remove(ski);
609
selectedKeys.remove(ski);
610
deregister(ski);
611
SelectableChannel selch = ski.channel();
612
if (!selch.isOpen() && !selch.isRegistered())
613
((SelChImpl)selch).kill();
614
}
615
616
public void putEventOps(SelectionKeyImpl sk, int ops) {
617
synchronized (closeLock) {
618
if (pollWrapper == null)
619
throw new ClosedSelectorException();
620
// make sure this sk has not been removed yet
621
int index = sk.getIndex();
622
if (index == -1)
623
throw new CancelledKeyException();
624
pollWrapper.putEventOps(index, ops);
625
}
626
}
627
628
public Selector wakeup() {
629
synchronized (interruptLock) {
630
if (!interruptTriggered) {
631
setWakeupSocket();
632
interruptTriggered = true;
633
}
634
}
635
return this;
636
}
637
638
// Runs IOUtil.load() when this class is initialized.
// NOTE(review): presumably this loads the native library that backs the
// native methods declared in this class - confirm in IOUtil.
static {
    IOUtil.load();
}
641
}
642
643