Path: blob/master/src/java.base/aix/classes/sun/nio/ch/AixPollPort.java
41139 views
/*1* Copyright (c) 2008, 2013, Oracle and/or its affiliates. All rights reserved.2* Copyright (c) 2012 SAP SE. All rights reserved.3* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.4*5* This code is free software; you can redistribute it and/or modify it6* under the terms of the GNU General Public License version 2 only, as7* published by the Free Software Foundation. Oracle designates this8* particular file as subject to the "Classpath" exception as provided9* by Oracle in the LICENSE file that accompanied this code.10*11* This code is distributed in the hope that it will be useful, but WITHOUT12* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or13* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License14* version 2 for more details (a copy is included in the LICENSE file that15* accompanied this code).16*17* You should have received a copy of the GNU General Public License version18* 2 along with this work; if not, write to the Free Software Foundation,19* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.20*21* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA22* or visit www.oracle.com if you need additional information or have any23* questions.24*/2526package sun.nio.ch;2728import java.nio.channels.spi.AsynchronousChannelProvider;29import java.io.IOException;30import java.util.HashSet;31import java.util.Iterator;32import java.util.concurrent.ArrayBlockingQueue;33import java.util.concurrent.RejectedExecutionException;34import java.util.concurrent.atomic.AtomicInteger;35import java.util.concurrent.locks.ReentrantLock;36import jdk.internal.misc.Unsafe;3738/**39* AsynchronousChannelGroup implementation based on the AIX pollset framework.40*/41final class AixPollPort42extends Port43{44private static final Unsafe unsafe = Unsafe.getUnsafe();4546static {47IOUtil.load();48init();49}5051/**52* struct pollfd {53* int fd;54* short events;55* short revents;56* }57*/58private static final int SIZEOF_POLLFD = eventSize();59private static final int OFFSETOF_EVENTS = eventsOffset();60private static final int OFFSETOF_REVENTS = reventsOffset();61private static final int OFFSETOF_FD = fdOffset();6263// opcodes64private static final int PS_ADD = 0x0;65private static final int PS_MOD = 0x1;66private static final int PS_DELETE = 0x2;6768// maximum number of events to poll at a time69private static final int MAX_POLL_EVENTS = 512;7071// pollset ID72private final int pollset;7374// true if port is closed75private boolean closed;7677// socket pair used for wakeup78private final int sp[];7980// socket pair used to indicate pending pollsetCtl calls81// Background info: pollsetCtl blocks when another thread is in a pollsetPoll call.82private final int ctlSp[];8384// number of wakeups pending85private final AtomicInteger wakeupCount = new AtomicInteger();8687// address of the poll array passed to pollset_poll88private final long address;8990// encapsulates an event for a channel91static class Event {92final PollableChannel channel;93final int events;9495Event(PollableChannel channel, int events) {96this.channel = channel;97this.events = events;98}99100PollableChannel channel() { return channel; }101int events() { return events; }102}103104// queue of events for cases that a polling thread dequeues more than one105// event106private final ArrayBlockingQueue<Event> queue;107private final Event NEED_TO_POLL = new Event(null, 0);108private final Event EXECUTE_TASK_OR_SHUTDOWN = new Event(null, 0);109private final Event CONTINUE_AFTER_CTL_EVENT = new Event(null, 0);110111// encapsulates a pollset control event for a file descriptor112static class ControlEvent {113final int fd;114final int events;115final boolean removeOnly;116int error = 0;117118ControlEvent(int fd, int events, boolean removeOnly) {119this.fd = fd;120this.events = events;121this.removeOnly = removeOnly;122}123124int fd() { return fd; }125int events() { return events; }126boolean removeOnly() { return removeOnly; }127int error() { return error; }128void setError(int error) { this.error = error; }129}130131// queue of control events that need to be processed132// (this object is also used for synchronization)133private final HashSet<ControlEvent> controlQueue = new HashSet<ControlEvent>();134135// lock used to check whether a poll operation is ongoing136private final ReentrantLock controlLock = new ReentrantLock();137138AixPollPort(AsynchronousChannelProvider provider, ThreadPool pool)139throws IOException140{141super(provider, pool);142143// open pollset144this.pollset = pollsetCreate();145146// create socket pair for wakeup mechanism147int[] sv = new int[2];148try {149socketpair(sv);150// register one end with pollset151pollsetCtl(pollset, PS_ADD, sv[0], Net.POLLIN);152} catch (IOException x) {153pollsetDestroy(pollset);154throw x;155}156this.sp = sv;157158// create socket pair for pollset control mechanism159sv = new int[2];160try {161socketpair(sv);162// register one end with pollset163pollsetCtl(pollset, PS_ADD, sv[0], Net.POLLIN);164} catch (IOException x) {165pollsetDestroy(pollset);166throw x;167}168this.ctlSp = sv;169170// allocate the poll array171this.address = allocatePollArray(MAX_POLL_EVENTS);172173// create the queue and offer the special event to ensure that the first174// threads polls175this.queue = new ArrayBlockingQueue<Event>(MAX_POLL_EVENTS);176this.queue.offer(NEED_TO_POLL);177}178179AixPollPort start() {180startThreads(new EventHandlerTask());181return this;182}183184/**185* Release all resources186*/187private void implClose() {188synchronized (this) {189if (closed)190return;191closed = true;192}193freePollArray(address);194close0(sp[0]);195close0(sp[1]);196close0(ctlSp[0]);197close0(ctlSp[1]);198pollsetDestroy(pollset);199}200201private void wakeup() {202if (wakeupCount.incrementAndGet() == 1) {203// write byte to socketpair to force wakeup204try {205interrupt(sp[1]);206} catch (IOException x) {207throw new AssertionError(x);208}209}210}211212@Override213void executeOnHandlerTask(Runnable task) {214synchronized (this) {215if (closed)216throw new RejectedExecutionException();217offerTask(task);218wakeup();219}220}221222@Override223void shutdownHandlerTasks() {224/*225* If no tasks are running then just release resources; otherwise226* write to the one end of the socketpair to wakeup any polling threads.227*/228int nThreads = threadCount();229if (nThreads == 0) {230implClose();231} else {232// send interrupt to each thread233while (nThreads-- > 0) {234wakeup();235}236}237}238239// invoke by clients to register a file descriptor240@Override241void startPoll(int fd, int events) {242queueControlEvent(new ControlEvent(fd, events, false));243}244245// Callback method for implementations that need special handling when fd is removed246@Override247protected void preUnregister(int fd) {248queueControlEvent(new ControlEvent(fd, 0, true));249}250251// Add control event into queue and wait for completion.252// In case the control lock is free, this method also tries to apply the control change directly.253private void queueControlEvent(ControlEvent ev) {254// pollsetCtl blocks when a poll call is ongoing. This is very probable.255// Therefore we let the polling thread do the pollsetCtl call.256synchronized (controlQueue) {257controlQueue.add(ev);258// write byte to socketpair to force wakeup259try {260interrupt(ctlSp[1]);261} catch (IOException x) {262throw new AssertionError(x);263}264do {265// Directly empty queue if no poll call is ongoing.266if (controlLock.tryLock()) {267try {268processControlQueue();269} finally {270controlLock.unlock();271}272} else {273try {274// Do not starve in case the polling thread returned before275// we could write to ctlSp[1] but the polling thread did not276// release the control lock until we checked. Therefore, use277// a timed wait for the time being.278controlQueue.wait(100);279} catch (InterruptedException e) {280// ignore exception and try again281}282}283} while (controlQueue.contains(ev));284}285if (ev.error() != 0) {286throw new AssertionError();287}288}289290// Process all events currently stored in the control queue.291private void processControlQueue() {292synchronized (controlQueue) {293// On Aix it is only possible to set the event294// bits on the first call of pollsetCtl. Later295// calls only add bits, but cannot remove them.296// Therefore, we always remove the file297// descriptor ignoring the error and then add it.298Iterator<ControlEvent> iter = controlQueue.iterator();299while (iter.hasNext()) {300ControlEvent ev = iter.next();301pollsetCtl(pollset, PS_DELETE, ev.fd(), 0);302if (!ev.removeOnly()) {303ev.setError(pollsetCtl(pollset, PS_MOD, ev.fd(), ev.events()));304}305iter.remove();306}307controlQueue.notifyAll();308}309}310311/*312* Task to process events from pollset and dispatch to the channel's313* onEvent handler.314*315* Events are retreived from pollset in batch and offered to a BlockingQueue316* where they are consumed by handler threads. A special "NEED_TO_POLL"317* event is used to signal one consumer to re-poll when all events have318* been consumed.319*/320private class EventHandlerTask implements Runnable {321private Event poll() throws IOException {322try {323for (;;) {324int n;325controlLock.lock();326try {327n = pollsetPoll(pollset, address, MAX_POLL_EVENTS);328} finally {329controlLock.unlock();330}331/*332* 'n' events have been read. Here we map them to their333* corresponding channel in batch and queue n-1 so that334* they can be handled by other handler threads. The last335* event is handled by this thread (and so is not queued).336*/337fdToChannelLock.readLock().lock();338try {339while (n-- > 0) {340long eventAddress = getEvent(address, n);341int fd = getDescriptor(eventAddress);342343// To emulate one shot semantic we need to remove344// the file descriptor here.345if (fd != sp[0] && fd != ctlSp[0]) {346synchronized (controlQueue) {347pollsetCtl(pollset, PS_DELETE, fd, 0);348}349}350351// wakeup352if (fd == sp[0]) {353if (wakeupCount.decrementAndGet() == 0) {354// no more wakeups so drain pipe355drain1(sp[0]);356}357358// queue special event if there are more events359// to handle.360if (n > 0) {361queue.offer(EXECUTE_TASK_OR_SHUTDOWN);362continue;363}364return EXECUTE_TASK_OR_SHUTDOWN;365}366367// wakeup to process control event368if (fd == ctlSp[0]) {369synchronized (controlQueue) {370drain1(ctlSp[0]);371processControlQueue();372}373if (n > 0) {374continue;375}376return CONTINUE_AFTER_CTL_EVENT;377}378379PollableChannel channel = fdToChannel.get(fd);380if (channel != null) {381int events = getRevents(eventAddress);382Event ev = new Event(channel, events);383384// n-1 events are queued; This thread handles385// the last one except for the wakeup386if (n > 0) {387queue.offer(ev);388} else {389return ev;390}391}392}393} finally {394fdToChannelLock.readLock().unlock();395}396}397} finally {398// to ensure that some thread will poll when all events have399// been consumed400queue.offer(NEED_TO_POLL);401}402}403404public void run() {405Invoker.GroupAndInvokeCount myGroupAndInvokeCount =406Invoker.getGroupAndInvokeCount();407final boolean isPooledThread = (myGroupAndInvokeCount != null);408boolean replaceMe = false;409Event ev;410try {411for (;;) {412// reset invoke count413if (isPooledThread)414myGroupAndInvokeCount.resetInvokeCount();415416try {417replaceMe = false;418ev = queue.take();419420// no events and this thread has been "selected" to421// poll for more.422if (ev == NEED_TO_POLL) {423try {424ev = poll();425} catch (IOException x) {426x.printStackTrace();427return;428}429}430} catch (InterruptedException x) {431continue;432}433434// contine after we processed a control event435if (ev == CONTINUE_AFTER_CTL_EVENT) {436continue;437}438439// handle wakeup to execute task or shutdown440if (ev == EXECUTE_TASK_OR_SHUTDOWN) {441Runnable task = pollTask();442if (task == null) {443// shutdown request444return;445}446// run task (may throw error/exception)447replaceMe = true;448task.run();449continue;450}451452// process event453try {454ev.channel().onEvent(ev.events(), isPooledThread);455} catch (Error x) {456replaceMe = true; throw x;457} catch (RuntimeException x) {458replaceMe = true; throw x;459}460}461} finally {462// last handler to exit when shutdown releases resources463int remaining = threadExit(this, replaceMe);464if (remaining == 0 && isShutdown()) {465implClose();466}467}468}469}470471/**472* Allocates a poll array to handle up to {@code count} events.473*/474private static long allocatePollArray(int count) {475return unsafe.allocateMemory(count * SIZEOF_POLLFD);476}477478/**479* Free a poll array480*/481private static void freePollArray(long address) {482unsafe.freeMemory(address);483}484485/**486* Returns event[i];487*/488private static long getEvent(long address, int i) {489return address + (SIZEOF_POLLFD*i);490}491492/**493* Returns event->fd494*/495private static int getDescriptor(long eventAddress) {496return unsafe.getInt(eventAddress + OFFSETOF_FD);497}498499/**500* Returns event->events501*/502private static int getEvents(long eventAddress) {503return unsafe.getChar(eventAddress + OFFSETOF_EVENTS);504}505506/**507* Returns event->revents508*/509private static int getRevents(long eventAddress) {510return unsafe.getChar(eventAddress + OFFSETOF_REVENTS);511}512513// -- Native methods --514515private static native void init();516517private static native int eventSize();518519private static native int eventsOffset();520521private static native int reventsOffset();522523private static native int fdOffset();524525private static native int pollsetCreate() throws IOException;526527private static native int pollsetCtl(int pollset, int opcode, int fd, int events);528529private static native int pollsetPoll(int pollset, long pollAddress, int numfds)530throws IOException;531532private static native void pollsetDestroy(int pollset);533534private static native void socketpair(int[] sv) throws IOException;535536private static native void interrupt(int fd) throws IOException;537538private static native void drain1(int fd) throws IOException;539540private static native void close0(int fd);541}542543544