Path: blob/aarch64-shenandoah-jdk8u272-b10/jdk/src/solaris/classes/sun/nio/ch/KQueuePort.java
32288 views
/*1* Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation. Oracle designates this7* particular file as subject to the "Classpath" exception as provided8* by Oracle in the LICENSE file that accompanied this code.9*10* This code is distributed in the hope that it will be useful, but WITHOUT11* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or12* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License13* version 2 for more details (a copy is included in the LICENSE file that14* accompanied this code).15*16* You should have received a copy of the GNU General Public License version17* 2 along with this work; if not, write to the Free Software Foundation,18* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.19*20* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA21* or visit www.oracle.com if you need additional information or have any22* questions.23*/2425package sun.nio.ch;2627import java.nio.channels.spi.AsynchronousChannelProvider;28import java.io.IOException;29import java.util.concurrent.ArrayBlockingQueue;30import java.util.concurrent.RejectedExecutionException;31import java.util.concurrent.atomic.AtomicInteger;32import static sun.nio.ch.KQueue.*;3334/**35* AsynchronousChannelGroup implementation based on the BSD kqueue facility.36*/3738final class KQueuePort39extends Port40{41// maximum number of events to poll at a time42private static final int MAX_KEVENTS_TO_POLL = 512;4344// kqueue file descriptor45private final int kqfd;4647// true if kqueue closed48private boolean closed;4950// socket pair used for wakeup51private final int sp[];5253// number of wakeups pending54private final AtomicInteger wakeupCount = new AtomicInteger();5556// address of the poll array passed to kqueue_wait57private final long address;5859// encapsulates an event for a channel60static class Event {61final PollableChannel channel;62final int events;6364Event(PollableChannel channel, int events) {65this.channel = channel;66this.events = events;67}6869PollableChannel channel() { return channel; }70int events() { return events; }71}7273// queue of events for cases that a polling thread dequeues more than one74// event75private final ArrayBlockingQueue<Event> queue;76private final Event NEED_TO_POLL = new Event(null, 0);77private final Event EXECUTE_TASK_OR_SHUTDOWN = new Event(null, 0);7879KQueuePort(AsynchronousChannelProvider provider, ThreadPool pool)80throws IOException81{82super(provider, pool);8384// open kqueue85this.kqfd = kqueue();8687// create socket pair for wakeup mechanism88int[] sv = new int[2];89try {90socketpair(sv);9192// register one end with kqueue93keventRegister(kqfd, sv[0], EVFILT_READ, EV_ADD);94} catch (IOException x) {95close0(kqfd);96throw x;97}98this.sp = sv;99100// allocate the poll array101this.address = allocatePollArray(MAX_KEVENTS_TO_POLL);102103// create the queue and offer the special event to ensure that the first104// threads polls105this.queue = new ArrayBlockingQueue<Event>(MAX_KEVENTS_TO_POLL);106this.queue.offer(NEED_TO_POLL);107}108109KQueuePort start() {110startThreads(new EventHandlerTask());111return this;112}113114/**115* Release all resources116*/117private void implClose() {118synchronized (this) {119if (closed)120return;121closed = true;122}123freePollArray(address);124close0(sp[0]);125close0(sp[1]);126close0(kqfd);127}128129private void wakeup() {130if (wakeupCount.incrementAndGet() == 1) {131// write byte to socketpair to force wakeup132try {133interrupt(sp[1]);134} catch (IOException x) {135throw new AssertionError(x);136}137}138}139140@Override141void executeOnHandlerTask(Runnable task) {142synchronized (this) {143if (closed)144throw new RejectedExecutionException();145offerTask(task);146wakeup();147}148}149150@Override151void shutdownHandlerTasks() {152/*153* If no tasks are running then just release resources; otherwise154* write to the one end of the socketpair to wakeup any polling threads.155*/156int nThreads = threadCount();157if (nThreads == 0) {158implClose();159} else {160// send interrupt to each thread161while (nThreads-- > 0) {162wakeup();163}164}165}166167// invoked by clients to register a file descriptor168@Override169void startPoll(int fd, int events) {170// We use a separate filter for read and write events.171// TBD: Measure cost of EV_ONESHOT vs. EV_CLEAR, either will do here.172int err = 0;173int flags = (EV_ADD|EV_ONESHOT);174if ((events & Net.POLLIN) > 0)175err = keventRegister(kqfd, fd, EVFILT_READ, flags);176if (err == 0 && (events & Net.POLLOUT) > 0)177err = keventRegister(kqfd, fd, EVFILT_WRITE, flags);178if (err != 0)179throw new InternalError("kevent failed: " + err); // should not happen180}181182/*183* Task to process events from kqueue and dispatch to the channel's184* onEvent handler.185*186* Events are retreived from kqueue in batch and offered to a BlockingQueue187* where they are consumed by handler threads. A special "NEED_TO_POLL"188* event is used to signal one consumer to re-poll when all events have189* been consumed.190*/191private class EventHandlerTask implements Runnable {192private Event poll() throws IOException {193try {194for (;;) {195int n = keventPoll(kqfd, address, MAX_KEVENTS_TO_POLL);196/*197* 'n' events have been read. Here we map them to their198* corresponding channel in batch and queue n-1 so that199* they can be handled by other handler threads. The last200* event is handled by this thread (and so is not queued).201*/202fdToChannelLock.readLock().lock();203try {204while (n-- > 0) {205long keventAddress = getEvent(address, n);206int fd = getDescriptor(keventAddress);207208// wakeup209if (fd == sp[0]) {210if (wakeupCount.decrementAndGet() == 0) {211// no more wakeups so drain pipe212drain1(sp[0]);213}214215// queue special event if there are more events216// to handle.217if (n > 0) {218queue.offer(EXECUTE_TASK_OR_SHUTDOWN);219continue;220}221return EXECUTE_TASK_OR_SHUTDOWN;222}223224PollableChannel channel = fdToChannel.get(fd);225if (channel != null) {226int filter = getFilter(keventAddress);227int events = 0;228if (filter == EVFILT_READ)229events = Net.POLLIN;230else if (filter == EVFILT_WRITE)231events = Net.POLLOUT;232233Event ev = new Event(channel, events);234235// n-1 events are queued; This thread handles236// the last one except for the wakeup237if (n > 0) {238queue.offer(ev);239} else {240return ev;241}242}243}244} finally {245fdToChannelLock.readLock().unlock();246}247}248} finally {249// to ensure that some thread will poll when all events have250// been consumed251queue.offer(NEED_TO_POLL);252}253}254255public void run() {256Invoker.GroupAndInvokeCount myGroupAndInvokeCount =257Invoker.getGroupAndInvokeCount();258final boolean isPooledThread = (myGroupAndInvokeCount != null);259boolean replaceMe = false;260Event ev;261try {262for (;;) {263// reset invoke count264if (isPooledThread)265myGroupAndInvokeCount.resetInvokeCount();266267try {268replaceMe = false;269ev = queue.take();270271// no events and this thread has been "selected" to272// poll for more.273if (ev == NEED_TO_POLL) {274try {275ev = poll();276} catch (IOException x) {277x.printStackTrace();278return;279}280}281} catch (InterruptedException x) {282continue;283}284285// handle wakeup to execute task or shutdown286if (ev == EXECUTE_TASK_OR_SHUTDOWN) {287Runnable task = pollTask();288if (task == null) {289// shutdown request290return;291}292// run task (may throw error/exception)293replaceMe = true;294task.run();295continue;296}297298// process event299try {300ev.channel().onEvent(ev.events(), isPooledThread);301} catch (Error x) {302replaceMe = true; throw x;303} catch (RuntimeException x) {304replaceMe = true; throw x;305}306}307} finally {308// last handler to exit when shutdown releases resources309int remaining = threadExit(this, replaceMe);310if (remaining == 0 && isShutdown()) {311implClose();312}313}314}315}316317// -- Native methods --318319private static native void socketpair(int[] sv) throws IOException;320321private static native void interrupt(int fd) throws IOException;322323private static native void drain1(int fd) throws IOException;324325private static native void close0(int fd);326327static {328IOUtil.load();329}330}331332333