// Path: blob/master/src/java.base/unix/classes/sun/nio/ch/PollSelectorImpl.java
// 41137 views
/*1* Copyright (c) 2001, 2018, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation. Oracle designates this7* particular file as subject to the "Classpath" exception as provided8* by Oracle in the LICENSE file that accompanied this code.9*10* This code is distributed in the hope that it will be useful, but WITHOUT11* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or12* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License13* version 2 for more details (a copy is included in the LICENSE file that14* accompanied this code).15*16* You should have received a copy of the GNU General Public License version17* 2 along with this work; if not, write to the Free Software Foundation,18* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.19*20* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA21* or visit www.oracle.com if you need additional information or have any22* questions.23*/24package sun.nio.ch;2526import java.io.IOException;27import java.nio.channels.ClosedSelectorException;28import java.nio.channels.SelectionKey;29import java.nio.channels.Selector;30import java.nio.channels.spi.SelectorProvider;31import java.util.ArrayDeque;32import java.util.ArrayList;33import java.util.Deque;34import java.util.List;35import java.util.concurrent.TimeUnit;36import java.util.function.Consumer;3738import jdk.internal.misc.Unsafe;3940/**41* Selector implementation based on poll42*/4344class PollSelectorImpl extends SelectorImpl {4546// initial capacity of poll array47private static final int INITIAL_CAPACITY = 16;4849// poll array, grows as needed50private int pollArrayCapacity = INITIAL_CAPACITY;51private int pollArraySize;52private AllocatedNativeObject pollArray;5354// 
file descriptors used for interrupt55private final int fd0;56private final int fd1;5758// keys for file descriptors in poll array, synchronize on selector59private final List<SelectionKeyImpl> pollKeys = new ArrayList<>();6061// pending updates, queued by putEventOps62private final Object updateLock = new Object();63private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();6465// interrupt triggering and clearing66private final Object interruptLock = new Object();67private boolean interruptTriggered;6869PollSelectorImpl(SelectorProvider sp) throws IOException {70super(sp);7172int size = pollArrayCapacity * SIZE_POLLFD;73this.pollArray = new AllocatedNativeObject(size, false);7475try {76long fds = IOUtil.makePipe(false);77this.fd0 = (int) (fds >>> 32);78this.fd1 = (int) fds;79} catch (IOException ioe) {80pollArray.free();81throw ioe;82}8384// wakeup support85synchronized (this) {86setFirst(fd0, Net.POLLIN);87}88}8990private void ensureOpen() {91if (!isOpen())92throw new ClosedSelectorException();93}9495@Override96protected int doSelect(Consumer<SelectionKey> action, long timeout)97throws IOException98{99assert Thread.holdsLock(this);100101int to = (int) Math.min(timeout, Integer.MAX_VALUE); // max poll timeout102boolean blocking = (to != 0);103boolean timedPoll = (to > 0);104105processUpdateQueue();106processDeregisterQueue();107try {108begin(blocking);109110int numPolled;111do {112long startTime = timedPoll ? 
System.nanoTime() : 0;113numPolled = poll(pollArray.address(), pollArraySize, to);114if (numPolled == IOStatus.INTERRUPTED && timedPoll) {115// timed poll interrupted so need to adjust timeout116long adjust = System.nanoTime() - startTime;117to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS);118if (to <= 0) {119// timeout expired so no retry120numPolled = 0;121}122}123} while (numPolled == IOStatus.INTERRUPTED);124assert numPolled <= pollArraySize;125126} finally {127end(blocking);128}129130processDeregisterQueue();131return processEvents(action);132}133134/**135* Process changes to the interest ops.136*/137private void processUpdateQueue() {138assert Thread.holdsLock(this);139140synchronized (updateLock) {141SelectionKeyImpl ski;142while ((ski = updateKeys.pollFirst()) != null) {143int newEvents = ski.translateInterestOps();144if (ski.isValid()) {145int index = ski.getIndex();146assert index >= 0 && index < pollArraySize;147if (index > 0) {148assert pollKeys.get(index) == ski;149if (newEvents == 0) {150remove(ski);151} else {152update(ski, newEvents);153}154} else if (newEvents != 0) {155add(ski, newEvents);156}157}158}159}160}161162/**163* Process the polled events.164* If the interrupt fd has been selected, drain it and clear the interrupt.165*/166private int processEvents(Consumer<SelectionKey> action)167throws IOException168{169assert Thread.holdsLock(this);170assert pollArraySize > 0 && pollArraySize == pollKeys.size();171172int numKeysUpdated = 0;173for (int i = 1; i < pollArraySize; i++) {174int rOps = getReventOps(i);175if (rOps != 0) {176SelectionKeyImpl ski = pollKeys.get(i);177assert ski.getFDVal() == getDescriptor(i);178if (ski.isValid()) {179numKeysUpdated += processReadyEvents(rOps, ski, action);180}181}182}183184// check for interrupt185if (getReventOps(0) != 0) {186assert getDescriptor(0) == fd0;187clearInterrupt();188}189190return numKeysUpdated;191}192193@Override194protected void implClose() throws IOException {195assert 
!isOpen();196assert Thread.holdsLock(this);197198// prevent further wakeup199synchronized (interruptLock) {200interruptTriggered = true;201}202203pollArray.free();204FileDispatcherImpl.closeIntFD(fd0);205FileDispatcherImpl.closeIntFD(fd1);206}207208@Override209protected void implRegister(SelectionKeyImpl ski) {210assert ski.getIndex() == 0;211ensureOpen();212}213214@Override215protected void implDereg(SelectionKeyImpl ski) throws IOException {216assert !ski.isValid();217assert Thread.holdsLock(this);218219// remove from poll array220int index = ski.getIndex();221if (index > 0) {222remove(ski);223}224}225226@Override227public void setEventOps(SelectionKeyImpl ski) {228ensureOpen();229synchronized (updateLock) {230updateKeys.addLast(ski);231}232}233234@Override235public Selector wakeup() {236synchronized (interruptLock) {237if (!interruptTriggered) {238try {239IOUtil.write1(fd1, (byte)0);240} catch (IOException ioe) {241throw new InternalError(ioe);242}243interruptTriggered = true;244}245}246return this;247}248249private void clearInterrupt() throws IOException {250synchronized (interruptLock) {251IOUtil.drain(fd0);252interruptTriggered = false;253}254}255256/**257* Sets the first pollfd enty in the poll array to the given fd258*/259private void setFirst(int fd, int ops) {260assert pollArraySize == 0;261assert pollKeys.isEmpty();262263putDescriptor(0, fd);264putEventOps(0, ops);265pollArraySize = 1;266267pollKeys.add(null); // dummy element268}269270/**271* Adds a pollfd entry to the poll array, expanding the poll array if needed.272*/273private void add(SelectionKeyImpl ski, int ops) {274expandIfNeeded();275276int index = pollArraySize;277assert index > 0;278putDescriptor(index, ski.getFDVal());279putEventOps(index, ops);280putReventOps(index, 0);281ski.setIndex(index);282pollArraySize++;283284pollKeys.add(ski);285assert pollKeys.size() == pollArraySize;286}287288/**289* Update the events of pollfd entry.290*/291private void update(SelectionKeyImpl ski, int ops) 
{292int index = ski.getIndex();293assert index > 0 && index < pollArraySize;294assert getDescriptor(index) == ski.getFDVal();295putEventOps(index, ops);296}297298/**299* Removes a pollfd entry from the poll array300*/301private void remove(SelectionKeyImpl ski) {302int index = ski.getIndex();303assert index > 0 && index < pollArraySize;304assert getDescriptor(index) == ski.getFDVal();305306// replace pollfd at index with the last pollfd in array307int lastIndex = pollArraySize - 1;308if (lastIndex != index) {309SelectionKeyImpl lastKey = pollKeys.get(lastIndex);310assert lastKey.getIndex() == lastIndex;311int lastFd = getDescriptor(lastIndex);312int lastOps = getEventOps(lastIndex);313int lastRevents = getReventOps(lastIndex);314assert lastKey.getFDVal() == lastFd;315putDescriptor(index, lastFd);316putEventOps(index, lastOps);317putReventOps(index, lastRevents);318pollKeys.set(index, lastKey);319lastKey.setIndex(index);320}321pollKeys.remove(lastIndex);322pollArraySize--;323assert pollKeys.size() == pollArraySize;324325ski.setIndex(0);326}327328/**329* Expand poll array if at capacity330*/331private void expandIfNeeded() {332if (pollArraySize == pollArrayCapacity) {333int oldSize = pollArrayCapacity * SIZE_POLLFD;334int newCapacity = pollArrayCapacity + INITIAL_CAPACITY;335int newSize = newCapacity * SIZE_POLLFD;336AllocatedNativeObject newPollArray = new AllocatedNativeObject(newSize, false);337Unsafe.getUnsafe().copyMemory(pollArray.address(), newPollArray.address(), oldSize);338pollArray.free();339pollArray = newPollArray;340pollArrayCapacity = newCapacity;341}342}343344private static final short SIZE_POLLFD = 8;345private static final short FD_OFFSET = 0;346private static final short EVENT_OFFSET = 4;347private static final short REVENT_OFFSET = 6;348349private void putDescriptor(int i, int fd) {350int offset = SIZE_POLLFD * i + FD_OFFSET;351pollArray.putInt(offset, fd);352}353354private int getDescriptor(int i) {355int offset = SIZE_POLLFD * i + 
FD_OFFSET;356return pollArray.getInt(offset);357}358359private void putEventOps(int i, int event) {360int offset = SIZE_POLLFD * i + EVENT_OFFSET;361pollArray.putShort(offset, (short)event);362}363364private int getEventOps(int i) {365int offset = SIZE_POLLFD * i + EVENT_OFFSET;366return pollArray.getShort(offset);367}368369private void putReventOps(int i, int revent) {370int offset = SIZE_POLLFD * i + REVENT_OFFSET;371pollArray.putShort(offset, (short)revent);372}373374private int getReventOps(int i) {375int offset = SIZE_POLLFD * i + REVENT_OFFSET;376return pollArray.getShort(offset);377}378379private static native int poll(long pollAddress, int numfds, int timeout);380381static {382IOUtil.load();383}384}385386387