Path: blob/master/src/java.base/linux/classes/sun/nio/ch/EPollPort.java
40948 views
/*1* Copyright (c) 2008, 2018, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation. Oracle designates this7* particular file as subject to the "Classpath" exception as provided8* by Oracle in the LICENSE file that accompanied this code.9*10* This code is distributed in the hope that it will be useful, but WITHOUT11* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or12* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License13* version 2 for more details (a copy is included in the LICENSE file that14* accompanied this code).15*16* You should have received a copy of the GNU General Public License version17* 2 along with this work; if not, write to the Free Software Foundation,18* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.19*20* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA21* or visit www.oracle.com if you need additional information or have any22* questions.23*/2425package sun.nio.ch;2627import java.nio.channels.spi.AsynchronousChannelProvider;28import java.io.IOException;29import java.util.concurrent.ArrayBlockingQueue;30import java.util.concurrent.RejectedExecutionException;31import java.util.concurrent.atomic.AtomicInteger;3233import static sun.nio.ch.EPoll.EPOLLIN;34import static sun.nio.ch.EPoll.EPOLLONESHOT;35import static sun.nio.ch.EPoll.EPOLL_CTL_ADD;36import static sun.nio.ch.EPoll.EPOLL_CTL_DEL;37import static sun.nio.ch.EPoll.EPOLL_CTL_MOD;383940/**41* AsynchronousChannelGroup implementation based on the Linux epoll facility.42*/4344final class EPollPort45extends Port46{47// maximum number of events to poll at a time48private static final int MAX_EPOLL_EVENTS = 512;4950// errors51private static final int ENOENT = 2;5253// epoll file descriptor54private final int epfd;5556// address of the poll array passed to epoll_wait57private final long address;5859// true if epoll closed60private boolean closed;6162// socket pair used for wakeup63private final int sp[];6465// number of wakeups pending66private final AtomicInteger wakeupCount = new AtomicInteger();6768// encapsulates an event for a channel69static class Event {70final PollableChannel channel;71final int events;7273Event(PollableChannel channel, int events) {74this.channel = channel;75this.events = events;76}7778PollableChannel channel() { return channel; }79int events() { return events; }80}8182// queue of events for cases that a polling thread dequeues more than one83// event84private final ArrayBlockingQueue<Event> queue;85private final Event NEED_TO_POLL = new Event(null, 0);86private final Event EXECUTE_TASK_OR_SHUTDOWN = new Event(null, 0);8788EPollPort(AsynchronousChannelProvider provider, ThreadPool pool)89throws IOException90{91super(provider, pool);9293this.epfd = EPoll.create();94this.address = EPoll.allocatePollArray(MAX_EPOLL_EVENTS);9596// create socket pair for wakeup mechanism97try {98long fds = IOUtil.makePipe(true);99this.sp = new int[]{(int) (fds >>> 32), (int) fds};100} catch (IOException ioe) {101EPoll.freePollArray(address);102FileDispatcherImpl.closeIntFD(epfd);103throw ioe;104}105106// register one end with epoll107EPoll.ctl(epfd, EPOLL_CTL_ADD, sp[0], EPOLLIN);108109// create the queue and offer the special event to ensure that the first110// threads polls111this.queue = new ArrayBlockingQueue<>(MAX_EPOLL_EVENTS);112this.queue.offer(NEED_TO_POLL);113}114115EPollPort start() {116startThreads(new EventHandlerTask());117return this;118}119120/**121* Release all resources122*/123private void implClose() {124synchronized (this) {125if (closed)126return;127closed = true;128}129try { FileDispatcherImpl.closeIntFD(epfd); } catch (IOException ioe) { }130try { FileDispatcherImpl.closeIntFD(sp[0]); } catch (IOException ioe) { }131try { FileDispatcherImpl.closeIntFD(sp[1]); } catch (IOException ioe) { }132EPoll.freePollArray(address);133}134135private void wakeup() {136if (wakeupCount.incrementAndGet() == 1) {137// write byte to socketpair to force wakeup138try {139IOUtil.write1(sp[1], (byte)0);140} catch (IOException x) {141throw new AssertionError(x);142}143}144}145146@Override147void executeOnHandlerTask(Runnable task) {148synchronized (this) {149if (closed)150throw new RejectedExecutionException();151offerTask(task);152wakeup();153}154}155156@Override157void shutdownHandlerTasks() {158/*159* If no tasks are running then just release resources; otherwise160* write to the one end of the socketpair to wakeup any polling threads.161*/162int nThreads = threadCount();163if (nThreads == 0) {164implClose();165} else {166// send wakeup to each thread167while (nThreads-- > 0) {168wakeup();169}170}171}172173// invoke by clients to register a file descriptor174@Override175void startPoll(int fd, int events) {176// update events (or add to epoll on first usage)177int err = EPoll.ctl(epfd, EPOLL_CTL_MOD, fd, (events | EPOLLONESHOT));178if (err == ENOENT)179err = EPoll.ctl(epfd, EPOLL_CTL_ADD, fd, (events | EPOLLONESHOT));180if (err != 0)181throw new AssertionError(); // should not happen182}183184/**185* Task to process events from epoll and dispatch to the channel's186* onEvent handler.187*188* Events are retrieved from epoll in batch and offered to a BlockingQueue189* where they are consumed by handler threads. A special "NEED_TO_POLL"190* event is used to signal one consumer to re-poll when all events have191* been consumed.192*/193private class EventHandlerTask implements Runnable {194private Event poll() throws IOException {195try {196for (;;) {197int n;198do {199n = EPoll.wait(epfd, address, MAX_EPOLL_EVENTS, -1);200} while (n == IOStatus.INTERRUPTED);201202/**203* 'n' events have been read. Here we map them to their204* corresponding channel in batch and queue n-1 so that205* they can be handled by other handler threads. The last206* event is handled by this thread (and so is not queued).207*/208fdToChannelLock.readLock().lock();209try {210while (n-- > 0) {211long eventAddress = EPoll.getEvent(address, n);212int fd = EPoll.getDescriptor(eventAddress);213214// wakeup215if (fd == sp[0]) {216if (wakeupCount.decrementAndGet() == 0) {217// consume one wakeup byte, never more as this218// would interfere with shutdown when there is219// a wakeup byte queued to wake each thread220int nread;221do {222nread = IOUtil.drain1(sp[0]);223} while (nread == IOStatus.INTERRUPTED);224}225226// queue special event if there are more events227// to handle.228if (n > 0) {229queue.offer(EXECUTE_TASK_OR_SHUTDOWN);230continue;231}232return EXECUTE_TASK_OR_SHUTDOWN;233}234235PollableChannel channel = fdToChannel.get(fd);236if (channel != null) {237int events = EPoll.getEvents(eventAddress);238Event ev = new Event(channel, events);239240// n-1 events are queued; This thread handles241// the last one except for the wakeup242if (n > 0) {243queue.offer(ev);244} else {245return ev;246}247}248}249} finally {250fdToChannelLock.readLock().unlock();251}252}253} finally {254// to ensure that some thread will poll when all events have255// been consumed256queue.offer(NEED_TO_POLL);257}258}259260public void run() {261Invoker.GroupAndInvokeCount myGroupAndInvokeCount =262Invoker.getGroupAndInvokeCount();263final boolean isPooledThread = (myGroupAndInvokeCount != null);264boolean replaceMe = false;265Event ev;266try {267for (;;) {268// reset invoke count269if (isPooledThread)270myGroupAndInvokeCount.resetInvokeCount();271272try {273replaceMe = false;274ev = queue.take();275276// no events and this thread has been "selected" to277// poll for more.278if (ev == NEED_TO_POLL) {279try {280ev = poll();281} catch (IOException x) {282x.printStackTrace();283return;284}285}286} catch (InterruptedException x) {287continue;288}289290// handle wakeup to execute task or shutdown291if (ev == EXECUTE_TASK_OR_SHUTDOWN) {292Runnable task = pollTask();293if (task == null) {294// shutdown request295return;296}297// run task (may throw error/exception)298replaceMe = true;299task.run();300continue;301}302303// process event304try {305ev.channel().onEvent(ev.events(), isPooledThread);306} catch (Error x) {307replaceMe = true; throw x;308} catch (RuntimeException x) {309replaceMe = true; throw x;310}311}312} finally {313// last handler to exit when shutdown releases resources314int remaining = threadExit(this, replaceMe);315if (remaining == 0 && isShutdown()) {316implClose();317}318}319}320}321}322323324