Path: blob/aarch64-shenandoah-jdk8u272-b10/jdk/src/share/classes/sun/nio/ch/AsynchronousChannelGroupImpl.java
38918 views
/*1* Copyright (c) 2008, 2017, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation. Oracle designates this7* particular file as subject to the "Classpath" exception as provided8* by Oracle in the LICENSE file that accompanied this code.9*10* This code is distributed in the hope that it will be useful, but WITHOUT11* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or12* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License13* version 2 for more details (a copy is included in the LICENSE file that14* accompanied this code).15*16* You should have received a copy of the GNU General Public License version17* 2 along with this work; if not, write to the Free Software Foundation,18* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.19*20* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA21* or visit www.oracle.com if you need additional information or have any22* questions.23*/2425package sun.nio.ch;2627import java.nio.channels.Channel;28import java.nio.channels.AsynchronousChannelGroup;29import java.nio.channels.spi.AsynchronousChannelProvider;30import java.io.IOException;31import java.io.FileDescriptor;32import java.util.Queue;33import java.util.concurrent.*;34import java.util.concurrent.atomic.AtomicInteger;35import java.util.concurrent.atomic.AtomicBoolean;36import java.security.PrivilegedAction;37import java.security.AccessController;38import java.security.AccessControlContext;39import sun.security.action.GetIntegerAction;4041/**42* Base implementation of AsynchronousChannelGroup43*/4445abstract class AsynchronousChannelGroupImpl46extends AsynchronousChannelGroup implements Executor47{48// number of internal threads handling I/O events when using an unbounded49// thread pool. Internal threads do not dispatch to completion handlers.50private static final int internalThreadCount = AccessController.doPrivileged(51new GetIntegerAction("sun.nio.ch.internalThreadPoolSize", 1));5253// associated thread pool54private final ThreadPool pool;5556// number of tasks running (including internal)57private final AtomicInteger threadCount = new AtomicInteger();5859// associated Executor for timeouts60private ScheduledThreadPoolExecutor timeoutExecutor;6162// task queue for when using a fixed thread pool. In that case, thread63// waiting on I/O events must be awokon to poll tasks from this queue.64private final Queue<Runnable> taskQueue;6566// group shutdown67private final AtomicBoolean shutdown = new AtomicBoolean();68private final Object shutdownNowLock = new Object();69private volatile boolean terminateInitiated;7071AsynchronousChannelGroupImpl(AsynchronousChannelProvider provider,72ThreadPool pool)73{74super(provider);75this.pool = pool;7677if (pool.isFixedThreadPool()) {78taskQueue = new ConcurrentLinkedQueue<Runnable>();79} else {80taskQueue = null; // not used81}8283// use default thread factory as thread should not be visible to84// application (it doesn't execute completion handlers).85this.timeoutExecutor = (ScheduledThreadPoolExecutor)86Executors.newScheduledThreadPool(1, ThreadPool.defaultThreadFactory());87this.timeoutExecutor.setRemoveOnCancelPolicy(true);88}8990final ExecutorService executor() {91return pool.executor();92}9394final boolean isFixedThreadPool() {95return pool.isFixedThreadPool();96}9798final int fixedThreadCount() {99if (isFixedThreadPool()) {100return pool.poolSize();101} else {102return pool.poolSize() + internalThreadCount;103}104}105106private Runnable bindToGroup(final Runnable task) {107final AsynchronousChannelGroupImpl thisGroup = this;108return new Runnable() {109public void run() {110Invoker.bindToGroup(thisGroup);111task.run();112}113};114}115116private void startInternalThread(final Runnable task) {117AccessController.doPrivileged(new PrivilegedAction<Void>() {118@Override119public Void run() {120// internal threads should not be visible to application so121// cannot use user-supplied thread factory122ThreadPool.defaultThreadFactory().newThread(task).start();123return null;124}125});126}127128protected final void startThreads(Runnable task) {129if (!isFixedThreadPool()) {130for (int i=0; i<internalThreadCount; i++) {131startInternalThread(task);132threadCount.incrementAndGet();133}134}135if (pool.poolSize() > 0) {136task = bindToGroup(task);137try {138for (int i=0; i<pool.poolSize(); i++) {139pool.executor().execute(task);140threadCount.incrementAndGet();141}142} catch (RejectedExecutionException x) {143// nothing we can do144}145}146}147148final int threadCount() {149return threadCount.get();150}151152/**153* Invoked by tasks as they terminate154*/155final int threadExit(Runnable task, boolean replaceMe) {156if (replaceMe) {157try {158if (Invoker.isBoundToAnyGroup()) {159// submit new task to replace this thread160pool.executor().execute(bindToGroup(task));161} else {162// replace internal thread163startInternalThread(task);164}165return threadCount.get();166} catch (RejectedExecutionException x) {167// unable to replace168}169}170return threadCount.decrementAndGet();171}172173/**174* Wakes up a thread waiting for I/O events to execute the given task.175*/176abstract void executeOnHandlerTask(Runnable task);177178/**179* For a fixed thread pool the task is queued to a thread waiting on I/O180* events. For other thread pools we simply submit the task to the thread181* pool.182*/183final void executeOnPooledThread(Runnable task) {184if (isFixedThreadPool()) {185executeOnHandlerTask(task);186} else {187pool.executor().execute(bindToGroup(task));188}189}190191final void offerTask(Runnable task) {192taskQueue.offer(task);193}194195final Runnable pollTask() {196return (taskQueue == null) ? null : taskQueue.poll();197}198199final Future<?> schedule(Runnable task, long timeout, TimeUnit unit) {200try {201return timeoutExecutor.schedule(task, timeout, unit);202} catch (RejectedExecutionException rej) {203if (terminateInitiated) {204// no timeout scheduled as group is terminating205return null;206}207throw new AssertionError(rej);208}209}210211@Override212public final boolean isShutdown() {213return shutdown.get();214}215216@Override217public final boolean isTerminated() {218return pool.executor().isTerminated();219}220221/**222* Returns true if there are no channels in the group223*/224abstract boolean isEmpty();225226/**227* Attaches a foreign channel to this group.228*/229abstract Object attachForeignChannel(Channel channel, FileDescriptor fdo)230throws IOException;231232/**233* Detaches a foreign channel from this group.234*/235abstract void detachForeignChannel(Object key);236237/**238* Closes all channels in the group239*/240abstract void closeAllChannels() throws IOException;241242/**243* Shutdown all tasks waiting for I/O events.244*/245abstract void shutdownHandlerTasks();246247private void shutdownExecutors() {248AccessController.doPrivileged(249new PrivilegedAction<Void>() {250public Void run() {251pool.executor().shutdown();252timeoutExecutor.shutdown();253return null;254}255},256null,257new RuntimePermission("modifyThread"));258}259260@Override261public final void shutdown() {262if (shutdown.getAndSet(true)) {263// already shutdown264return;265}266// if there are channels in the group then shutdown will continue267// when the last channel is closed268if (!isEmpty()) {269return;270}271// initiate termination (acquire shutdownNowLock to ensure that other272// threads invoking shutdownNow will block).273synchronized (shutdownNowLock) {274if (!terminateInitiated) {275terminateInitiated = true;276shutdownHandlerTasks();277shutdownExecutors();278}279}280}281282@Override283public final void shutdownNow() throws IOException {284shutdown.set(true);285synchronized (shutdownNowLock) {286if (!terminateInitiated) {287terminateInitiated = true;288closeAllChannels();289shutdownHandlerTasks();290shutdownExecutors();291}292}293}294295/**296* For use by AsynchronousFileChannel to release resources without shutting297* down the thread pool.298*/299final void detachFromThreadPool() {300if (shutdown.getAndSet(true))301throw new AssertionError("Already shutdown");302if (!isEmpty())303throw new AssertionError("Group not empty");304shutdownHandlerTasks();305}306307@Override308public final boolean awaitTermination(long timeout, TimeUnit unit)309throws InterruptedException310{311return pool.executor().awaitTermination(timeout, unit);312}313314/**315* Executes the given command on one of the channel group's pooled threads.316*/317@Override318public final void execute(Runnable task) {319SecurityManager sm = System.getSecurityManager();320if (sm != null) {321// when a security manager is installed then the user's task322// must be run with the current calling context323final AccessControlContext acc = AccessController.getContext();324final Runnable delegate = task;325task = new Runnable() {326@Override327public void run() {328AccessController.doPrivileged(new PrivilegedAction<Void>() {329@Override330public Void run() {331delegate.run();332return null;333}334}, acc);335}336};337}338executeOnPooledThread(task);339}340}341342343