Path: blob/master/src/java.base/windows/classes/sun/nio/ch/WindowsSelectorImpl.java
41139 views
/*1* Copyright (c) 2002, 2021, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation. Oracle designates this7* particular file as subject to the "Classpath" exception as provided8* by Oracle in the LICENSE file that accompanied this code.9*10* This code is distributed in the hope that it will be useful, but WITHOUT11* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or12* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License13* version 2 for more details (a copy is included in the LICENSE file that14* accompanied this code).15*16* You should have received a copy of the GNU General Public License version17* 2 along with this work; if not, write to the Free Software Foundation,18* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.19*20* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA21* or visit www.oracle.com if you need additional information or have any22* questions.23*/2425package sun.nio.ch;2627import java.io.IOException;28import java.nio.channels.ClosedSelectorException;29import java.nio.channels.Pipe;30import java.nio.channels.SelectionKey;31import java.nio.channels.Selector;32import java.nio.channels.SelectableChannel;33import java.nio.channels.spi.SelectorProvider;34import java.util.ArrayDeque;35import java.util.ArrayList;36import java.util.Deque;37import java.util.HashMap;38import java.util.List;39import java.util.function.Consumer;40import jdk.internal.misc.Unsafe;4142/**43* A multi-threaded implementation of Selector for Windows.44*45* @author Konstantin Kladko46* @author Mark Reinhold47*/4849class WindowsSelectorImpl extends SelectorImpl {50private static final Unsafe unsafe = Unsafe.getUnsafe();51private static int addressSize = unsafe.addressSize();5253private static int dependsArch(int value32, int value64) {54return (addressSize == 4) ? value32 : value64;55}5657// Initial capacity of the poll array58private final int INIT_CAP = 8;59// Maximum number of sockets for select().60// Should be INIT_CAP times a power of 261private static final int MAX_SELECTABLE_FDS = 1024;6263// Size of FD_SET struct to allocate a buffer for it in SubSelector,64// aligned to 8 bytes on 64-bit:65// struct { unsigned int fd_count; SOCKET fd_array[MAX_SELECTABLE_FDS]; }.66private static final long SIZEOF_FD_SET = dependsArch(674 + MAX_SELECTABLE_FDS * 4, // SOCKET = unsigned int684 + MAX_SELECTABLE_FDS * 8 + 4); // SOCKET = unsigned __int646970// The list of SelectableChannels serviced by this Selector. Every mod71// MAX_SELECTABLE_FDS entry is bogus, to align this array with the poll72// array, where the corresponding entry is occupied by the wakeupSocket73private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[INIT_CAP];7475// The global native poll array holds file decriptors and event masks76private PollArrayWrapper pollWrapper;7778// The number of valid entries in poll array, including entries occupied79// by wakeup socket handle.80private int totalChannels = 1;8182// Number of helper threads needed for select. We need one thread per83// each additional set of MAX_SELECTABLE_FDS - 1 channels.84private int threadsCount = 0;8586// A list of helper threads for select.87private final List<SelectThread> threads = new ArrayList<SelectThread>();8889//Pipe used as a wakeup object.90private final Pipe wakeupPipe;9192// File descriptors corresponding to source and sink93private final int wakeupSourceFd, wakeupSinkFd;9495// Maps file descriptors to their indices in pollArray96private static final class FdMap extends HashMap<Integer, MapEntry> {97static final long serialVersionUID = 0L;98private MapEntry get(int desc) {99return get(Integer.valueOf(desc));100}101private MapEntry put(SelectionKeyImpl ski) {102return put(Integer.valueOf(ski.getFDVal()), new MapEntry(ski));103}104private MapEntry remove(SelectionKeyImpl ski) {105Integer fd = Integer.valueOf(ski.getFDVal());106MapEntry x = get(fd);107if ((x != null) && (x.ski.channel() == ski.channel()))108return remove(fd);109return null;110}111}112113// class for fdMap entries114private static final class MapEntry {115final SelectionKeyImpl ski;116long updateCount = 0;117MapEntry(SelectionKeyImpl ski) {118this.ski = ski;119}120}121private final FdMap fdMap = new FdMap();122123// SubSelector for the main thread124private final SubSelector subSelector = new SubSelector();125126private long timeout; //timeout for poll127128// Lock for interrupt triggering and clearing129private final Object interruptLock = new Object();130private volatile boolean interruptTriggered;131132// pending new registrations/updates, queued by implRegister and setEventOps133private final Object updateLock = new Object();134private final Deque<SelectionKeyImpl> newKeys = new ArrayDeque<>();135private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();136137138WindowsSelectorImpl(SelectorProvider sp) throws IOException {139super(sp);140pollWrapper = new PollArrayWrapper(INIT_CAP);141wakeupPipe = new PipeImpl(sp, false);142wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();143wakeupSinkFd = ((SelChImpl)wakeupPipe.sink()).getFDVal();144pollWrapper.addWakeupSocket(wakeupSourceFd, 0);145}146147private void ensureOpen() {148if (!isOpen())149throw new ClosedSelectorException();150}151152@Override153protected int doSelect(Consumer<SelectionKey> action, long timeout)154throws IOException155{156assert Thread.holdsLock(this);157this.timeout = timeout; // set selector timeout158processUpdateQueue();159processDeregisterQueue();160if (interruptTriggered) {161resetWakeupSocket();162return 0;163}164// Calculate number of helper threads needed for poll. If necessary165// threads are created here and start waiting on startLock166adjustThreadsCount();167finishLock.reset(); // reset finishLock168// Wakeup helper threads, waiting on startLock, so they start polling.169// Redundant threads will exit here after wakeup.170startLock.startThreads();171// do polling in the main thread. Main thread is responsible for172// first MAX_SELECTABLE_FDS entries in pollArray.173try {174begin();175try {176subSelector.poll();177} catch (IOException e) {178finishLock.setException(e); // Save this exception179}180// Main thread is out of poll(). Wakeup others and wait for them181if (threads.size() > 0)182finishLock.waitForHelperThreads();183} finally {184end();185}186// Done with poll(). Set wakeupSocket to nonsignaled for the next run.187finishLock.checkForException();188processDeregisterQueue();189int updated = updateSelectedKeys(action);190// Done with poll(). Set wakeupSocket to nonsignaled for the next run.191resetWakeupSocket();192return updated;193}194195/**196* Process new registrations and changes to the interest ops.197*/198private void processUpdateQueue() {199assert Thread.holdsLock(this);200201synchronized (updateLock) {202SelectionKeyImpl ski;203204// new registrations205while ((ski = newKeys.pollFirst()) != null) {206if (ski.isValid()) {207growIfNeeded();208channelArray[totalChannels] = ski;209ski.setIndex(totalChannels);210pollWrapper.putEntry(totalChannels, ski);211totalChannels++;212MapEntry previous = fdMap.put(ski);213assert previous == null;214}215}216217// changes to interest ops218while ((ski = updateKeys.pollFirst()) != null) {219int events = ski.translateInterestOps();220int fd = ski.getFDVal();221if (ski.isValid() && fdMap.containsKey(fd)) {222int index = ski.getIndex();223assert index >= 0 && index < totalChannels;224pollWrapper.putEventOps(index, events);225}226}227}228}229230// Helper threads wait on this lock for the next poll.231private final StartLock startLock = new StartLock();232233private final class StartLock {234// A variable which distinguishes the current run of doSelect from the235// previous one. Incrementing runsCounter and notifying threads will236// trigger another round of poll.237private long runsCounter;238// Triggers threads, waiting on this lock to start polling.239private synchronized void startThreads() {240runsCounter++; // next run241notifyAll(); // wake up threads.242}243// This function is called by a helper thread to wait for the244// next round of poll(). It also checks, if this thread became245// redundant. If yes, it returns true, notifying the thread246// that it should exit.247private synchronized boolean waitForStart(SelectThread thread) {248while (true) {249while (runsCounter == thread.lastRun) {250try {251startLock.wait();252} catch (InterruptedException e) {253Thread.currentThread().interrupt();254}255}256if (thread.isZombie()) { // redundant thread257return true; // will cause run() to exit.258} else {259thread.lastRun = runsCounter; // update lastRun260return false; // will cause run() to poll.261}262}263}264}265266// Main thread waits on this lock, until all helper threads are done267// with poll().268private final FinishLock finishLock = new FinishLock();269270private final class FinishLock {271// Number of helper threads, that did not finish yet.272private int threadsToFinish;273274// IOException which occurred during the last run.275IOException exception = null;276277// Called before polling.278private void reset() {279threadsToFinish = threads.size(); // helper threads280}281282// Each helper thread invokes this function on finishLock, when283// the thread is done with poll().284private synchronized void threadFinished() {285if (threadsToFinish == threads.size()) { // finished poll() first286// if finished first, wakeup others287wakeup();288}289threadsToFinish--;290if (threadsToFinish == 0) // all helper threads finished poll().291notify(); // notify the main thread292}293294// The main thread invokes this function on finishLock to wait295// for helper threads to finish poll().296private synchronized void waitForHelperThreads() {297if (threadsToFinish == threads.size()) {298// no helper threads finished yet. Wakeup them up.299wakeup();300}301while (threadsToFinish != 0) {302try {303finishLock.wait();304} catch (InterruptedException e) {305// Interrupted - set interrupted state.306Thread.currentThread().interrupt();307}308}309}310311// sets IOException for this run312private synchronized void setException(IOException e) {313exception = e;314}315316// Checks if there was any exception during the last run.317// If yes, throws it318private void checkForException() throws IOException {319if (exception == null)320return;321String message = "An exception occurred" +322" during the execution of select(): \n" +323exception + '\n';324exception = null;325throw new IOException(message);326}327}328329private final class SubSelector {330private final int pollArrayIndex; // starting index in pollArray to poll331// These arrays will hold result of native select().332// The first element of each array is the number of selected sockets.333// Other elements are file descriptors of selected sockets.334private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1];335private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1];336private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1];337// Buffer for readfds, writefds and exceptfds structs that are passed338// to native select().339private final long fdsBuffer = unsafe.allocateMemory(SIZEOF_FD_SET * 3);340341private SubSelector() {342this.pollArrayIndex = 0; // main thread343}344345private SubSelector(int threadIndex) { // helper threads346this.pollArrayIndex = (threadIndex + 1) * MAX_SELECTABLE_FDS;347}348349private int poll() throws IOException{ // poll for the main thread350return poll0(pollWrapper.pollArrayAddress,351Math.min(totalChannels, MAX_SELECTABLE_FDS),352readFds, writeFds, exceptFds, timeout, fdsBuffer);353}354355private int poll(int index) throws IOException {356// poll for helper threads357return poll0(pollWrapper.pollArrayAddress +358(pollArrayIndex * PollArrayWrapper.SIZE_POLLFD),359Math.min(MAX_SELECTABLE_FDS,360totalChannels - (index + 1) * MAX_SELECTABLE_FDS),361readFds, writeFds, exceptFds, timeout, fdsBuffer);362}363364private native int poll0(long pollAddress, int numfds,365int[] readFds, int[] writeFds, int[] exceptFds, long timeout, long fdsBuffer);366367private int processSelectedKeys(long updateCount, Consumer<SelectionKey> action)368throws IOException369{370int numKeysUpdated = 0;371numKeysUpdated += processFDSet(updateCount, action, readFds,372Net.POLLIN,373false);374numKeysUpdated += processFDSet(updateCount, action, writeFds,375Net.POLLCONN |376Net.POLLOUT,377false);378numKeysUpdated += processFDSet(updateCount, action, exceptFds,379Net.POLLIN |380Net.POLLCONN |381Net.POLLOUT,382true);383return numKeysUpdated;384}385386/**387* updateCount is used to tell if a key has been counted as updated388* in this select operation.389*390* me.updateCount <= updateCount391*/392private int processFDSet(long updateCount,393Consumer<SelectionKey> action,394int[] fds, int rOps,395boolean isExceptFds)396throws IOException397{398int numKeysUpdated = 0;399for (int i = 1; i <= fds[0]; i++) {400int desc = fds[i];401if (desc == wakeupSourceFd) {402synchronized (interruptLock) {403interruptTriggered = true;404}405continue;406}407MapEntry me = fdMap.get(desc);408// If me is null, the key was deregistered in the previous409// processDeregisterQueue.410if (me == null)411continue;412SelectionKeyImpl ski = me.ski;413414// The descriptor may be in the exceptfds set because there is415// OOB data queued to the socket. If there is OOB data then it416// is discarded and the key is not added to the selected set.417SelectableChannel sc = ski.channel();418if (isExceptFds && (sc instanceof SocketChannelImpl)419&& ((SocketChannelImpl) sc).isNetSocket()420&& Net.discardOOB(ski.getFD())) {421continue;422}423424int updated = processReadyEvents(rOps, ski, action);425if (updated > 0 && me.updateCount != updateCount) {426me.updateCount = updateCount;427numKeysUpdated++;428}429}430return numKeysUpdated;431}432433private void freeFDSetBuffer() {434unsafe.freeMemory(fdsBuffer);435}436}437438// Represents a helper thread used for select.439private final class SelectThread extends Thread {440private final int index; // index of this thread441final SubSelector subSelector;442private long lastRun = 0; // last run number443private volatile boolean zombie;444// Creates a new thread445private SelectThread(int i) {446super(null, null, "SelectorHelper", 0, false);447this.index = i;448this.subSelector = new SubSelector(i);449//make sure we wait for next round of poll450this.lastRun = startLock.runsCounter;451}452void makeZombie() {453zombie = true;454}455boolean isZombie() {456return zombie;457}458public void run() {459while (true) { // poll loop460// wait for the start of poll. If this thread has become461// redundant, then exit.462if (startLock.waitForStart(this)) {463subSelector.freeFDSetBuffer();464return;465}466// call poll()467try {468subSelector.poll(index);469} catch (IOException e) {470// Save this exception and let other threads finish.471finishLock.setException(e);472}473// notify main thread, that this thread has finished, and474// wakeup others, if this thread is the first to finish.475finishLock.threadFinished();476}477}478}479480// After some channels registered/deregistered, the number of required481// helper threads may have changed. Adjust this number.482private void adjustThreadsCount() {483if (threadsCount > threads.size()) {484// More threads needed. Start more threads.485for (int i = threads.size(); i < threadsCount; i++) {486SelectThread newThread = new SelectThread(i);487threads.add(newThread);488newThread.setDaemon(true);489newThread.start();490}491} else if (threadsCount < threads.size()) {492// Some threads become redundant. Remove them from the threads List.493for (int i = threads.size() - 1 ; i >= threadsCount; i--)494threads.remove(i).makeZombie();495}496}497498// Sets Windows wakeup socket to a signaled state.499private void setWakeupSocket() {500setWakeupSocket0(wakeupSinkFd);501}502private native void setWakeupSocket0(int wakeupSinkFd);503504// Sets Windows wakeup socket to a non-signaled state.505private void resetWakeupSocket() {506synchronized (interruptLock) {507if (interruptTriggered == false)508return;509resetWakeupSocket0(wakeupSourceFd);510interruptTriggered = false;511}512}513514private native void resetWakeupSocket0(int wakeupSourceFd);515516// We increment this counter on each call to updateSelectedKeys()517// each entry in SubSelector.fdsMap has a memorized value of518// updateCount. When we increment numKeysUpdated we set updateCount519// for the corresponding entry to its current value. This is used to520// avoid counting the same key more than once - the same key can521// appear in readfds and writefds.522private long updateCount = 0;523524// Update ops of the corresponding Channels. Add the ready keys to the525// ready queue.526private int updateSelectedKeys(Consumer<SelectionKey> action) throws IOException {527updateCount++;528int numKeysUpdated = 0;529numKeysUpdated += subSelector.processSelectedKeys(updateCount, action);530for (SelectThread t: threads) {531numKeysUpdated += t.subSelector.processSelectedKeys(updateCount, action);532}533return numKeysUpdated;534}535536@Override537protected void implClose() throws IOException {538assert !isOpen();539assert Thread.holdsLock(this);540541// prevent further wakeup542synchronized (interruptLock) {543interruptTriggered = true;544}545546wakeupPipe.sink().close();547wakeupPipe.source().close();548pollWrapper.free();549550// Make all remaining helper threads exit551for (SelectThread t: threads)552t.makeZombie();553startLock.startThreads();554subSelector.freeFDSetBuffer();555}556557@Override558protected void implRegister(SelectionKeyImpl ski) {559ensureOpen();560synchronized (updateLock) {561newKeys.addLast(ski);562}563}564565private void growIfNeeded() {566if (channelArray.length == totalChannels) {567int newSize = totalChannels * 2; // Make a larger array568SelectionKeyImpl temp[] = new SelectionKeyImpl[newSize];569System.arraycopy(channelArray, 1, temp, 1, totalChannels - 1);570channelArray = temp;571pollWrapper.grow(newSize);572}573if (totalChannels % MAX_SELECTABLE_FDS == 0) { // more threads needed574pollWrapper.addWakeupSocket(wakeupSourceFd, totalChannels);575totalChannels++;576threadsCount++;577}578}579580@Override581protected void implDereg(SelectionKeyImpl ski) {582assert !ski.isValid();583assert Thread.holdsLock(this);584585if (fdMap.remove(ski) != null) {586int i = ski.getIndex();587assert (i >= 0);588589if (i != totalChannels - 1) {590// Copy end one over it591SelectionKeyImpl endChannel = channelArray[totalChannels-1];592channelArray[i] = endChannel;593endChannel.setIndex(i);594pollWrapper.replaceEntry(pollWrapper, totalChannels-1, pollWrapper, i);595}596ski.setIndex(-1);597598channelArray[totalChannels - 1] = null;599totalChannels--;600if (totalChannels != 1 && totalChannels % MAX_SELECTABLE_FDS == 1) {601totalChannels--;602threadsCount--; // The last thread has become redundant.603}604}605}606607@Override608public void setEventOps(SelectionKeyImpl ski) {609ensureOpen();610synchronized (updateLock) {611updateKeys.addLast(ski);612}613}614615@Override616public Selector wakeup() {617synchronized (interruptLock) {618if (!interruptTriggered) {619setWakeupSocket();620interruptTriggered = true;621}622}623return this;624}625626static {627IOUtil.load();628}629}630631632