Path: blob/master/src/java.base/macosx/classes/sun/nio/ch/KQueuePort.java
41137 views
/*1* Copyright (c) 2012, 2018, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation. Oracle designates this7* particular file as subject to the "Classpath" exception as provided8* by Oracle in the LICENSE file that accompanied this code.9*10* This code is distributed in the hope that it will be useful, but WITHOUT11* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or12* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License13* version 2 for more details (a copy is included in the LICENSE file that14* accompanied this code).15*16* You should have received a copy of the GNU General Public License version17* 2 along with this work; if not, write to the Free Software Foundation,18* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.19*20* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA21* or visit www.oracle.com if you need additional information or have any22* questions.23*/2425package sun.nio.ch;2627import java.nio.channels.spi.AsynchronousChannelProvider;28import java.io.IOException;29import java.util.concurrent.ArrayBlockingQueue;30import java.util.concurrent.RejectedExecutionException;31import java.util.concurrent.atomic.AtomicInteger;3233import static sun.nio.ch.KQueue.EVFILT_READ;34import static sun.nio.ch.KQueue.EVFILT_WRITE;35import static sun.nio.ch.KQueue.EV_ADD;36import static sun.nio.ch.KQueue.EV_ONESHOT;3738/**39* AsynchronousChannelGroup implementation based on the BSD kqueue facility.40*/4142final class KQueuePort43extends Port44{45// maximum number of events to poll at a time46private static final int MAX_KEVENTS_TO_POLL = 512;4748// kqueue file descriptor49private final int kqfd;5051// address of the poll array passed to kqueue_wait52private final long address;5354// true if kqueue closed55private boolean closed;5657// socket pair used for wakeup58private final int sp[];5960// number of wakeups pending61private final AtomicInteger wakeupCount = new AtomicInteger();6263// encapsulates an event for a channel64static class Event {65final PollableChannel channel;66final int events;6768Event(PollableChannel channel, int events) {69this.channel = channel;70this.events = events;71}7273PollableChannel channel() { return channel; }74int events() { return events; }75}7677// queue of events for cases that a polling thread dequeues more than one78// event79private final ArrayBlockingQueue<Event> queue;80private final Event NEED_TO_POLL = new Event(null, 0);81private final Event EXECUTE_TASK_OR_SHUTDOWN = new Event(null, 0);8283KQueuePort(AsynchronousChannelProvider provider, ThreadPool pool)84throws IOException85{86super(provider, pool);8788this.kqfd = KQueue.create();89this.address = KQueue.allocatePollArray(MAX_KEVENTS_TO_POLL);9091// create socket pair for wakeup mechanism92try {93long fds = IOUtil.makePipe(true);94this.sp = new int[]{(int) (fds >>> 32), (int) fds};95} catch (IOException ioe) {96KQueue.freePollArray(address);97FileDispatcherImpl.closeIntFD(kqfd);98throw ioe;99}100101// register one end with kqueue102KQueue.register(kqfd, sp[0], EVFILT_READ, EV_ADD);103104// create the queue and offer the special event to ensure that the first105// threads polls106this.queue = new ArrayBlockingQueue<>(MAX_KEVENTS_TO_POLL);107this.queue.offer(NEED_TO_POLL);108}109110KQueuePort start() {111startThreads(new EventHandlerTask());112return this;113}114115/**116* Release all resources117*/118private void implClose() {119synchronized (this) {120if (closed)121return;122closed = true;123}124125try { FileDispatcherImpl.closeIntFD(kqfd); } catch (IOException ioe) { }126try { FileDispatcherImpl.closeIntFD(sp[0]); } catch (IOException ioe) { }127try { FileDispatcherImpl.closeIntFD(sp[1]); } catch (IOException ioe) { }128KQueue.freePollArray(address);129}130131private void wakeup() {132if (wakeupCount.incrementAndGet() == 1) {133// write byte to socketpair to force wakeup134try {135IOUtil.write1(sp[1], (byte)0);136} catch (IOException x) {137throw new AssertionError(x);138}139}140}141142@Override143void executeOnHandlerTask(Runnable task) {144synchronized (this) {145if (closed)146throw new RejectedExecutionException();147offerTask(task);148wakeup();149}150}151152@Override153void shutdownHandlerTasks() {154/*155* If no tasks are running then just release resources; otherwise156* write to the one end of the socketpair to wakeup any polling threads.157*/158int nThreads = threadCount();159if (nThreads == 0) {160implClose();161} else {162// send wakeup to each thread163while (nThreads-- > 0) {164wakeup();165}166}167}168169// invoked by clients to register a file descriptor170@Override171void startPoll(int fd, int events) {172// We use a separate filter for read and write events.173// TBD: Measure cost of EV_ONESHOT vs. EV_CLEAR, either will do here.174int err = 0;175int flags = (EV_ADD|EV_ONESHOT);176if ((events & Net.POLLIN) > 0)177err = KQueue.register(kqfd, fd, EVFILT_READ, flags);178if (err == 0 && (events & Net.POLLOUT) > 0)179err = KQueue.register(kqfd, fd, EVFILT_WRITE, flags);180if (err != 0)181throw new InternalError("kevent failed: " + err); // should not happen182}183184/**185* Task to process events from kqueue and dispatch to the channel's186* onEvent handler.187*188* Events are retrieved from kqueue in batch and offered to a BlockingQueue189* where they are consumed by handler threads. A special "NEED_TO_POLL"190* event is used to signal one consumer to re-poll when all events have191* been consumed.192*/193private class EventHandlerTask implements Runnable {194private Event poll() throws IOException {195try {196for (;;) {197int n;198do {199n = KQueue.poll(kqfd, address, MAX_KEVENTS_TO_POLL, -1L);200} while (n == IOStatus.INTERRUPTED);201202/**203* 'n' events have been read. Here we map them to their204* corresponding channel in batch and queue n-1 so that205* they can be handled by other handler threads. The last206* event is handled by this thread (and so is not queued).207*/208fdToChannelLock.readLock().lock();209try {210while (n-- > 0) {211long keventAddress = KQueue.getEvent(address, n);212int fd = KQueue.getDescriptor(keventAddress);213214// wakeup215if (fd == sp[0]) {216if (wakeupCount.decrementAndGet() == 0) {217// consume one wakeup byte, never more as this218// would interfere with shutdown when there is219// a wakeup byte queued to wake each thread220int nread;221do {222nread = IOUtil.drain1(sp[0]);223} while (nread == IOStatus.INTERRUPTED);224}225226// queue special event if there are more events227// to handle.228if (n > 0) {229queue.offer(EXECUTE_TASK_OR_SHUTDOWN);230continue;231}232return EXECUTE_TASK_OR_SHUTDOWN;233}234235PollableChannel channel = fdToChannel.get(fd);236if (channel != null) {237int filter = KQueue.getFilter(keventAddress);238int events = 0;239if (filter == EVFILT_READ)240events = Net.POLLIN;241else if (filter == EVFILT_WRITE)242events = Net.POLLOUT;243244Event ev = new Event(channel, events);245246// n-1 events are queued; This thread handles247// the last one except for the wakeup248if (n > 0) {249queue.offer(ev);250} else {251return ev;252}253}254}255} finally {256fdToChannelLock.readLock().unlock();257}258}259} finally {260// to ensure that some thread will poll when all events have261// been consumed262queue.offer(NEED_TO_POLL);263}264}265266public void run() {267Invoker.GroupAndInvokeCount myGroupAndInvokeCount =268Invoker.getGroupAndInvokeCount();269final boolean isPooledThread = (myGroupAndInvokeCount != null);270boolean replaceMe = false;271Event ev;272try {273for (;;) {274// reset invoke count275if (isPooledThread)276myGroupAndInvokeCount.resetInvokeCount();277278try {279replaceMe = false;280ev = queue.take();281282// no events and this thread has been "selected" to283// poll for more.284if (ev == NEED_TO_POLL) {285try {286ev = poll();287} catch (IOException x) {288x.printStackTrace();289return;290}291}292} catch (InterruptedException x) {293continue;294}295296// handle wakeup to execute task or shutdown297if (ev == EXECUTE_TASK_OR_SHUTDOWN) {298Runnable task = pollTask();299if (task == null) {300// shutdown request301return;302}303// run task (may throw error/exception)304replaceMe = true;305task.run();306continue;307}308309// process event310try {311ev.channel().onEvent(ev.events(), isPooledThread);312} catch (Error x) {313replaceMe = true; throw x;314} catch (RuntimeException x) {315replaceMe = true; throw x;316}317}318} finally {319// last handler to exit when shutdown releases resources320int remaining = threadExit(this, replaceMe);321if (remaining == 0 && isShutdown()) {322implClose();323}324}325}326}327}328329330