// Path: blob/aarch64-shenandoah-jdk8u272-b10/jdk/src/solaris/classes/sun/nio/ch/EPollPort.java
// 32288 views
/*1* Copyright (c) 2008, 2013, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation. Oracle designates this7* particular file as subject to the "Classpath" exception as provided8* by Oracle in the LICENSE file that accompanied this code.9*10* This code is distributed in the hope that it will be useful, but WITHOUT11* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or12* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License13* version 2 for more details (a copy is included in the LICENSE file that14* accompanied this code).15*16* You should have received a copy of the GNU General Public License version17* 2 along with this work; if not, write to the Free Software Foundation,18* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.19*20* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA21* or visit www.oracle.com if you need additional information or have any22* questions.23*/2425package sun.nio.ch;2627import java.nio.channels.spi.AsynchronousChannelProvider;28import java.io.IOException;29import java.util.concurrent.ArrayBlockingQueue;30import java.util.concurrent.RejectedExecutionException;31import java.util.concurrent.atomic.AtomicInteger;32import static sun.nio.ch.EPoll.*;3334/**35* AsynchronousChannelGroup implementation based on the Linux epoll facility.36*/3738final class EPollPort39extends Port40{41// maximum number of events to poll at a time42private static final int MAX_EPOLL_EVENTS = 512;4344// errors45private static final int ENOENT = 2;4647// epoll file descriptor48private final int epfd;4950// true if epoll closed51private boolean closed;5253// socket pair used for wakeup54private final int sp[];5556// number of wakeups pending57private final 
AtomicInteger wakeupCount = new AtomicInteger();5859// address of the poll array passed to epoll_wait60private final long address;6162// encapsulates an event for a channel63static class Event {64final PollableChannel channel;65final int events;6667Event(PollableChannel channel, int events) {68this.channel = channel;69this.events = events;70}7172PollableChannel channel() { return channel; }73int events() { return events; }74}7576// queue of events for cases that a polling thread dequeues more than one77// event78private final ArrayBlockingQueue<Event> queue;79private final Event NEED_TO_POLL = new Event(null, 0);80private final Event EXECUTE_TASK_OR_SHUTDOWN = new Event(null, 0);8182EPollPort(AsynchronousChannelProvider provider, ThreadPool pool)83throws IOException84{85super(provider, pool);8687// open epoll88this.epfd = epollCreate();8990// create socket pair for wakeup mechanism91int[] sv = new int[2];92try {93socketpair(sv);94// register one end with epoll95epollCtl(epfd, EPOLL_CTL_ADD, sv[0], Net.POLLIN);96} catch (IOException x) {97close0(epfd);98throw x;99}100this.sp = sv;101102// allocate the poll array103this.address = allocatePollArray(MAX_EPOLL_EVENTS);104105// create the queue and offer the special event to ensure that the first106// threads polls107this.queue = new ArrayBlockingQueue<Event>(MAX_EPOLL_EVENTS);108this.queue.offer(NEED_TO_POLL);109}110111EPollPort start() {112startThreads(new EventHandlerTask());113return this;114}115116/**117* Release all resources118*/119private void implClose() {120synchronized (this) {121if (closed)122return;123closed = true;124}125freePollArray(address);126close0(sp[0]);127close0(sp[1]);128close0(epfd);129}130131private void wakeup() {132if (wakeupCount.incrementAndGet() == 1) {133// write byte to socketpair to force wakeup134try {135interrupt(sp[1]);136} catch (IOException x) {137throw new AssertionError(x);138}139}140}141142@Override143void executeOnHandlerTask(Runnable task) {144synchronized (this) {145if 
(closed)146throw new RejectedExecutionException();147offerTask(task);148wakeup();149}150}151152@Override153void shutdownHandlerTasks() {154/*155* If no tasks are running then just release resources; otherwise156* write to the one end of the socketpair to wakeup any polling threads.157*/158int nThreads = threadCount();159if (nThreads == 0) {160implClose();161} else {162// send interrupt to each thread163while (nThreads-- > 0) {164wakeup();165}166}167}168169// invoke by clients to register a file descriptor170@Override171void startPoll(int fd, int events) {172// update events (or add to epoll on first usage)173int err = epollCtl(epfd, EPOLL_CTL_MOD, fd, (events | EPOLLONESHOT));174if (err == ENOENT)175err = epollCtl(epfd, EPOLL_CTL_ADD, fd, (events | EPOLLONESHOT));176if (err != 0)177throw new AssertionError(); // should not happen178}179180/*181* Task to process events from epoll and dispatch to the channel's182* onEvent handler.183*184* Events are retreived from epoll in batch and offered to a BlockingQueue185* where they are consumed by handler threads. A special "NEED_TO_POLL"186* event is used to signal one consumer to re-poll when all events have187* been consumed.188*/189private class EventHandlerTask implements Runnable {190private Event poll() throws IOException {191try {192for (;;) {193int n = epollWait(epfd, address, MAX_EPOLL_EVENTS);194/*195* 'n' events have been read. Here we map them to their196* corresponding channel in batch and queue n-1 so that197* they can be handled by other handler threads. 
The last198* event is handled by this thread (and so is not queued).199*/200fdToChannelLock.readLock().lock();201try {202while (n-- > 0) {203long eventAddress = getEvent(address, n);204int fd = getDescriptor(eventAddress);205206// wakeup207if (fd == sp[0]) {208if (wakeupCount.decrementAndGet() == 0) {209// no more wakeups so drain pipe210drain1(sp[0]);211}212213// queue special event if there are more events214// to handle.215if (n > 0) {216queue.offer(EXECUTE_TASK_OR_SHUTDOWN);217continue;218}219return EXECUTE_TASK_OR_SHUTDOWN;220}221222PollableChannel channel = fdToChannel.get(fd);223if (channel != null) {224int events = getEvents(eventAddress);225Event ev = new Event(channel, events);226227// n-1 events are queued; This thread handles228// the last one except for the wakeup229if (n > 0) {230queue.offer(ev);231} else {232return ev;233}234}235}236} finally {237fdToChannelLock.readLock().unlock();238}239}240} finally {241// to ensure that some thread will poll when all events have242// been consumed243queue.offer(NEED_TO_POLL);244}245}246247public void run() {248Invoker.GroupAndInvokeCount myGroupAndInvokeCount =249Invoker.getGroupAndInvokeCount();250final boolean isPooledThread = (myGroupAndInvokeCount != null);251boolean replaceMe = false;252Event ev;253try {254for (;;) {255// reset invoke count256if (isPooledThread)257myGroupAndInvokeCount.resetInvokeCount();258259try {260replaceMe = false;261ev = queue.take();262263// no events and this thread has been "selected" to264// poll for more.265if (ev == NEED_TO_POLL) {266try {267ev = poll();268} catch (IOException x) {269x.printStackTrace();270return;271}272}273} catch (InterruptedException x) {274continue;275}276277// handle wakeup to execute task or shutdown278if (ev == EXECUTE_TASK_OR_SHUTDOWN) {279Runnable task = pollTask();280if (task == null) {281// shutdown request282return;283}284// run task (may throw error/exception)285replaceMe = true;286task.run();287continue;288}289290// process event291try 
{292ev.channel().onEvent(ev.events(), isPooledThread);293} catch (Error x) {294replaceMe = true; throw x;295} catch (RuntimeException x) {296replaceMe = true; throw x;297}298}299} finally {300// last handler to exit when shutdown releases resources301int remaining = threadExit(this, replaceMe);302if (remaining == 0 && isShutdown()) {303implClose();304}305}306}307}308309// -- Native methods --310311private static native void socketpair(int[] sv) throws IOException;312313private static native void interrupt(int fd) throws IOException;314315private static native void drain1(int fd) throws IOException;316317private static native void close0(int fd);318319static {320IOUtil.load();321}322}323324325