// Path: blob/aarch64-shenandoah-jdk8u272-b10/jdk/src/windows/classes/sun/nio/ch/WindowsSelectorImpl.java
// 32288 views
/*1* Copyright (c) 2002, 2013, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation. Oracle designates this7* particular file as subject to the "Classpath" exception as provided8* by Oracle in the LICENSE file that accompanied this code.9*10* This code is distributed in the hope that it will be useful, but WITHOUT11* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or12* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License13* version 2 for more details (a copy is included in the LICENSE file that14* accompanied this code).15*16* You should have received a copy of the GNU General Public License version17* 2 along with this work; if not, write to the Free Software Foundation,18* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.19*20* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA21* or visit www.oracle.com if you need additional information or have any22* questions.23*/2425/*26*/272829package sun.nio.ch;3031import java.nio.channels.spi.SelectorProvider;32import java.nio.channels.Selector;33import java.nio.channels.ClosedSelectorException;34import java.nio.channels.Pipe;35import java.nio.channels.SelectableChannel;36import java.io.IOException;37import java.nio.channels.CancelledKeyException;38import java.util.List;39import java.util.ArrayList;40import java.util.HashMap;41import java.util.Iterator;42import sun.misc.Unsafe;4344/**45* A multi-threaded implementation of Selector for Windows.46*47* @author Konstantin Kladko48* @author Mark Reinhold49*/5051final class WindowsSelectorImpl extends SelectorImpl {52private static final Unsafe unsafe = Unsafe.getUnsafe();53private static int addressSize = unsafe.addressSize();5455private static int dependsArch(int 
value32, int value64) {56return (addressSize == 4) ? value32 : value64;57}5859// Initial capacity of the poll array60private final int INIT_CAP = 8;61// Maximum number of sockets for select().62// Should be INIT_CAP times a power of 263private final static int MAX_SELECTABLE_FDS = 1024;6465// Size of FD_SET struct to allocate a buffer for it in SubSelector,66// aligned to 8 bytes on 64-bit:67// struct { unsigned int fd_count; SOCKET fd_array[MAX_SELECTABLE_FDS]; }.68private static final long SIZEOF_FD_SET = dependsArch(694 + MAX_SELECTABLE_FDS * 4, // SOCKET = unsigned int704 + MAX_SELECTABLE_FDS * 8 + 4); // SOCKET = unsigned __int647172// The list of SelectableChannels serviced by this Selector. Every mod73// MAX_SELECTABLE_FDS entry is bogus, to align this array with the poll74// array, where the corresponding entry is occupied by the wakeupSocket75private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[INIT_CAP];7677// The global native poll array holds file decriptors and event masks78private PollArrayWrapper pollWrapper;7980// The number of valid entries in poll array, including entries occupied81// by wakeup socket handle.82private int totalChannels = 1;8384// Number of helper threads needed for select. 
We need one thread per85// each additional set of MAX_SELECTABLE_FDS - 1 channels.86private int threadsCount = 0;8788// A list of helper threads for select.89private final List<SelectThread> threads = new ArrayList<SelectThread>();9091//Pipe used as a wakeup object.92private final Pipe wakeupPipe;9394// File descriptors corresponding to source and sink95private final int wakeupSourceFd, wakeupSinkFd;9697// Lock for close cleanup98private Object closeLock = new Object();99100// Maps file descriptors to their indices in pollArray101private final static class FdMap extends HashMap<Integer, MapEntry> {102static final long serialVersionUID = 0L;103private MapEntry get(int desc) {104return get(new Integer(desc));105}106private MapEntry put(SelectionKeyImpl ski) {107return put(new Integer(ski.channel.getFDVal()), new MapEntry(ski));108}109private MapEntry remove(SelectionKeyImpl ski) {110Integer fd = new Integer(ski.channel.getFDVal());111MapEntry x = get(fd);112if ((x != null) && (x.ski.channel == ski.channel))113return remove(fd);114return null;115}116}117118// class for fdMap entries119private final static class MapEntry {120SelectionKeyImpl ski;121long updateCount = 0;122long clearedCount = 0;123MapEntry(SelectionKeyImpl ski) {124this.ski = ski;125}126}127private final FdMap fdMap = new FdMap();128129// SubSelector for the main thread130private final SubSelector subSelector = new SubSelector();131132private long timeout; //timeout for poll133134// Lock for interrupt triggering and clearing135private final Object interruptLock = new Object();136private volatile boolean interruptTriggered = false;137138WindowsSelectorImpl(SelectorProvider sp) throws IOException {139super(sp);140pollWrapper = new PollArrayWrapper(INIT_CAP);141wakeupPipe = Pipe.open();142wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();143144// Disable the Nagle algorithm so that the wakeup is more immediate145SinkChannelImpl sink = 
(SinkChannelImpl)wakeupPipe.sink();146(sink.sc).socket().setTcpNoDelay(true);147wakeupSinkFd = ((SelChImpl)sink).getFDVal();148149pollWrapper.addWakeupSocket(wakeupSourceFd, 0);150}151152protected int doSelect(long timeout) throws IOException {153if (channelArray == null)154throw new ClosedSelectorException();155this.timeout = timeout; // set selector timeout156processDeregisterQueue();157if (interruptTriggered) {158resetWakeupSocket();159return 0;160}161// Calculate number of helper threads needed for poll. If necessary162// threads are created here and start waiting on startLock163adjustThreadsCount();164finishLock.reset(); // reset finishLock165// Wakeup helper threads, waiting on startLock, so they start polling.166// Redundant threads will exit here after wakeup.167startLock.startThreads();168// do polling in the main thread. Main thread is responsible for169// first MAX_SELECTABLE_FDS entries in pollArray.170try {171begin();172try {173subSelector.poll();174} catch (IOException e) {175finishLock.setException(e); // Save this exception176}177// Main thread is out of poll(). Wakeup others and wait for them178if (threads.size() > 0)179finishLock.waitForHelperThreads();180} finally {181end();182}183// Done with poll(). Set wakeupSocket to nonsignaled for the next run.184finishLock.checkForException();185processDeregisterQueue();186int updated = updateSelectedKeys();187// Done with poll(). Set wakeupSocket to nonsignaled for the next run.188resetWakeupSocket();189return updated;190}191192// Helper threads wait on this lock for the next poll.193private final StartLock startLock = new StartLock();194195private final class StartLock {196// A variable which distinguishes the current run of doSelect from the197// previous one. 
Incrementing runsCounter and notifying threads will198// trigger another round of poll.199private long runsCounter;200// Triggers threads, waiting on this lock to start polling.201private synchronized void startThreads() {202runsCounter++; // next run203notifyAll(); // wake up threads.204}205// This function is called by a helper thread to wait for the206// next round of poll(). It also checks, if this thread became207// redundant. If yes, it returns true, notifying the thread208// that it should exit.209private synchronized boolean waitForStart(SelectThread thread) {210while (true) {211while (runsCounter == thread.lastRun) {212try {213startLock.wait();214} catch (InterruptedException e) {215Thread.currentThread().interrupt();216}217}218if (thread.isZombie()) { // redundant thread219return true; // will cause run() to exit.220} else {221thread.lastRun = runsCounter; // update lastRun222return false; // will cause run() to poll.223}224}225}226}227228// Main thread waits on this lock, until all helper threads are done229// with poll().230private final FinishLock finishLock = new FinishLock();231232private final class FinishLock {233// Number of helper threads, that did not finish yet.234private int threadsToFinish;235236// IOException which occurred during the last run.237IOException exception = null;238239// Called before polling.240private void reset() {241threadsToFinish = threads.size(); // helper threads242}243244// Each helper thread invokes this function on finishLock, when245// the thread is done with poll().246private synchronized void threadFinished() {247if (threadsToFinish == threads.size()) { // finished poll() first248// if finished first, wakeup others249wakeup();250}251threadsToFinish--;252if (threadsToFinish == 0) // all helper threads finished poll().253notify(); // notify the main thread254}255256// The main thread invokes this function on finishLock to wait257// for helper threads to finish poll().258private synchronized void 
waitForHelperThreads() {259if (threadsToFinish == threads.size()) {260// no helper threads finished yet. Wakeup them up.261wakeup();262}263while (threadsToFinish != 0) {264try {265finishLock.wait();266} catch (InterruptedException e) {267// Interrupted - set interrupted state.268Thread.currentThread().interrupt();269}270}271}272273// sets IOException for this run274private synchronized void setException(IOException e) {275exception = e;276}277278// Checks if there was any exception during the last run.279// If yes, throws it280private void checkForException() throws IOException {281if (exception == null)282return;283StringBuffer message = new StringBuffer("An exception occurred" +284" during the execution of select(): \n");285message.append(exception);286message.append('\n');287exception = null;288throw new IOException(message.toString());289}290}291292private final class SubSelector {293private final int pollArrayIndex; // starting index in pollArray to poll294// These arrays will hold result of native select().295// The first element of each array is the number of selected sockets.296// Other elements are file descriptors of selected sockets.297private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1];298private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1];299private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1];300// Buffer for readfds, writefds and exceptfds structs that are passed301// to native select().302private final long fdsBuffer = unsafe.allocateMemory(SIZEOF_FD_SET * 6);303304private SubSelector() {305this.pollArrayIndex = 0; // main thread306}307308private SubSelector(int threadIndex) { // helper threads309this.pollArrayIndex = (threadIndex + 1) * MAX_SELECTABLE_FDS;310}311312private int poll() throws IOException{ // poll for the main thread313return poll0(pollWrapper.pollArrayAddress,314Math.min(totalChannels, MAX_SELECTABLE_FDS),315readFds, writeFds, exceptFds, timeout, fdsBuffer);316}317318private int poll(int index) 
throws IOException {319// poll for helper threads320return poll0(pollWrapper.pollArrayAddress +321(pollArrayIndex * PollArrayWrapper.SIZE_POLLFD),322Math.min(MAX_SELECTABLE_FDS,323totalChannels - (index + 1) * MAX_SELECTABLE_FDS),324readFds, writeFds, exceptFds, timeout, fdsBuffer);325}326327private native int poll0(long pollAddress, int numfds,328int[] readFds, int[] writeFds, int[] exceptFds, long timeout, long fdsBuffer);329330private int processSelectedKeys(long updateCount) {331int numKeysUpdated = 0;332numKeysUpdated += processFDSet(updateCount, readFds,333Net.POLLIN,334false);335numKeysUpdated += processFDSet(updateCount, writeFds,336Net.POLLCONN |337Net.POLLOUT,338false);339numKeysUpdated += processFDSet(updateCount, exceptFds,340Net.POLLIN |341Net.POLLCONN |342Net.POLLOUT,343true);344return numKeysUpdated;345}346347/**348* Note, clearedCount is used to determine if the readyOps have349* been reset in this select operation. updateCount is used to350* tell if a key has been counted as updated in this select351* operation.352*353* me.updateCount <= me.clearedCount <= updateCount354*/355private int processFDSet(long updateCount, int[] fds, int rOps,356boolean isExceptFds)357{358int numKeysUpdated = 0;359for (int i = 1; i <= fds[0]; i++) {360int desc = fds[i];361if (desc == wakeupSourceFd) {362synchronized (interruptLock) {363interruptTriggered = true;364}365continue;366}367MapEntry me = fdMap.get(desc);368// If me is null, the key was deregistered in the previous369// processDeregisterQueue.370if (me == null)371continue;372SelectionKeyImpl sk = me.ski;373374// The descriptor may be in the exceptfds set because there is375// OOB data queued to the socket. 
If there is OOB data then it376// is discarded and the key is not added to the selected set.377if (isExceptFds &&378(sk.channel() instanceof SocketChannelImpl) &&379discardUrgentData(desc))380{381continue;382}383384if (selectedKeys.contains(sk)) { // Key in selected set385if (me.clearedCount != updateCount) {386if (sk.channel.translateAndSetReadyOps(rOps, sk) &&387(me.updateCount != updateCount)) {388me.updateCount = updateCount;389numKeysUpdated++;390}391} else { // The readyOps have been set; now add392if (sk.channel.translateAndUpdateReadyOps(rOps, sk) &&393(me.updateCount != updateCount)) {394me.updateCount = updateCount;395numKeysUpdated++;396}397}398me.clearedCount = updateCount;399} else { // Key is not in selected set yet400if (me.clearedCount != updateCount) {401sk.channel.translateAndSetReadyOps(rOps, sk);402if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {403selectedKeys.add(sk);404me.updateCount = updateCount;405numKeysUpdated++;406}407} else { // The readyOps have been set; now add408sk.channel.translateAndUpdateReadyOps(rOps, sk);409if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {410selectedKeys.add(sk);411me.updateCount = updateCount;412numKeysUpdated++;413}414}415me.clearedCount = updateCount;416}417}418return numKeysUpdated;419}420421private void freeFDSetBuffer() {422unsafe.freeMemory(fdsBuffer);423}424}425426// Represents a helper thread used for select.427private final class SelectThread extends Thread {428private final int index; // index of this thread429final SubSelector subSelector;430private long lastRun = 0; // last run number431private volatile boolean zombie;432// Creates a new thread433private SelectThread(int i) {434this.index = i;435this.subSelector = new SubSelector(i);436//make sure we wait for next round of poll437this.lastRun = startLock.runsCounter;438}439void makeZombie() {440zombie = true;441}442boolean isZombie() {443return zombie;444}445public void run() {446while (true) { // poll loop447// wait for the start of poll. 
If this thread has become448// redundant, then exit.449if (startLock.waitForStart(this)) {450subSelector.freeFDSetBuffer();451return;452}453// call poll()454try {455subSelector.poll(index);456} catch (IOException e) {457// Save this exception and let other threads finish.458finishLock.setException(e);459}460// notify main thread, that this thread has finished, and461// wakeup others, if this thread is the first to finish.462finishLock.threadFinished();463}464}465}466467// After some channels registered/deregistered, the number of required468// helper threads may have changed. Adjust this number.469private void adjustThreadsCount() {470if (threadsCount > threads.size()) {471// More threads needed. Start more threads.472for (int i = threads.size(); i < threadsCount; i++) {473SelectThread newThread = new SelectThread(i);474threads.add(newThread);475newThread.setDaemon(true);476newThread.start();477}478} else if (threadsCount < threads.size()) {479// Some threads become redundant. Remove them from the threads List.480for (int i = threads.size() - 1 ; i >= threadsCount; i--)481threads.remove(i).makeZombie();482}483}484485// Sets Windows wakeup socket to a signaled state.486private void setWakeupSocket() {487setWakeupSocket0(wakeupSinkFd);488}489private native void setWakeupSocket0(int wakeupSinkFd);490491// Sets Windows wakeup socket to a non-signaled state.492private void resetWakeupSocket() {493synchronized (interruptLock) {494if (interruptTriggered == false)495return;496resetWakeupSocket0(wakeupSourceFd);497interruptTriggered = false;498}499}500501private native void resetWakeupSocket0(int wakeupSourceFd);502503private native boolean discardUrgentData(int fd);504505// We increment this counter on each call to updateSelectedKeys()506// each entry in SubSelector.fdsMap has a memorized value of507// updateCount. When we increment numKeysUpdated we set updateCount508// for the corresponding entry to its current value. 
This is used to509// avoid counting the same key more than once - the same key can510// appear in readfds and writefds.511private long updateCount = 0;512513// Update ops of the corresponding Channels. Add the ready keys to the514// ready queue.515private int updateSelectedKeys() {516updateCount++;517int numKeysUpdated = 0;518numKeysUpdated += subSelector.processSelectedKeys(updateCount);519for (SelectThread t: threads) {520numKeysUpdated += t.subSelector.processSelectedKeys(updateCount);521}522return numKeysUpdated;523}524525protected void implClose() throws IOException {526synchronized (closeLock) {527if (channelArray != null) {528if (pollWrapper != null) {529// prevent further wakeup530synchronized (interruptLock) {531interruptTriggered = true;532}533wakeupPipe.sink().close();534wakeupPipe.source().close();535for(int i = 1; i < totalChannels; i++) { // Deregister channels536if (i % MAX_SELECTABLE_FDS != 0) { // skip wakeupEvent537deregister(channelArray[i]);538SelectableChannel selch = channelArray[i].channel();539if (!selch.isOpen() && !selch.isRegistered())540((SelChImpl)selch).kill();541}542}543pollWrapper.free();544pollWrapper = null;545selectedKeys = null;546channelArray = null;547// Make all remaining helper threads exit548for (SelectThread t: threads)549t.makeZombie();550startLock.startThreads();551subSelector.freeFDSetBuffer();552}553}554}555}556557protected void implRegister(SelectionKeyImpl ski) {558synchronized (closeLock) {559if (pollWrapper == null)560throw new ClosedSelectorException();561growIfNeeded();562channelArray[totalChannels] = ski;563ski.setIndex(totalChannels);564fdMap.put(ski);565keys.add(ski);566pollWrapper.addEntry(totalChannels, ski);567totalChannels++;568}569}570571private void growIfNeeded() {572if (channelArray.length == totalChannels) {573int newSize = totalChannels * 2; // Make a larger array574SelectionKeyImpl temp[] = new SelectionKeyImpl[newSize];575System.arraycopy(channelArray, 1, temp, 1, totalChannels - 1);576channelArray 
= temp;577pollWrapper.grow(newSize);578}579if (totalChannels % MAX_SELECTABLE_FDS == 0) { // more threads needed580pollWrapper.addWakeupSocket(wakeupSourceFd, totalChannels);581totalChannels++;582threadsCount++;583}584}585586protected void implDereg(SelectionKeyImpl ski) throws IOException{587int i = ski.getIndex();588assert (i >= 0);589synchronized (closeLock) {590if (i != totalChannels - 1) {591// Copy end one over it592SelectionKeyImpl endChannel = channelArray[totalChannels-1];593channelArray[i] = endChannel;594endChannel.setIndex(i);595pollWrapper.replaceEntry(pollWrapper, totalChannels - 1,596pollWrapper, i);597}598ski.setIndex(-1);599}600channelArray[totalChannels - 1] = null;601totalChannels--;602if ( totalChannels != 1 && totalChannels % MAX_SELECTABLE_FDS == 1) {603totalChannels--;604threadsCount--; // The last thread has become redundant.605}606fdMap.remove(ski); // Remove the key from fdMap, keys and selectedKeys607keys.remove(ski);608selectedKeys.remove(ski);609deregister(ski);610SelectableChannel selch = ski.channel();611if (!selch.isOpen() && !selch.isRegistered())612((SelChImpl)selch).kill();613}614615public void putEventOps(SelectionKeyImpl sk, int ops) {616synchronized (closeLock) {617if (pollWrapper == null)618throw new ClosedSelectorException();619// make sure this sk has not been removed yet620int index = sk.getIndex();621if (index == -1)622throw new CancelledKeyException();623pollWrapper.putEventOps(index, ops);624}625}626627public Selector wakeup() {628synchronized (interruptLock) {629if (!interruptTriggered) {630setWakeupSocket();631interruptTriggered = true;632}633}634return this;635}636637static {638IOUtil.load();639}640}641642643