Path: blob/aarch64-shenandoah-jdk8u272-b10/jdk/src/windows/classes/sun/nio/ch/Iocp.java
32288 views
/*1* Copyright (c) 2008, 2013, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation. Oracle designates this7* particular file as subject to the "Classpath" exception as provided8* by Oracle in the LICENSE file that accompanied this code.9*10* This code is distributed in the hope that it will be useful, but WITHOUT11* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or12* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License13* version 2 for more details (a copy is included in the LICENSE file that14* accompanied this code).15*16* You should have received a copy of the GNU General Public License version17* 2 along with this work; if not, write to the Free Software Foundation,18* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.19*20* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA21* or visit www.oracle.com if you need additional information or have any22* questions.23*/2425package sun.nio.ch;2627import java.nio.channels.*;28import java.nio.channels.spi.AsynchronousChannelProvider;29import java.io.Closeable;30import java.io.IOException;31import java.io.FileDescriptor;32import java.util.*;33import java.util.concurrent.*;34import java.util.concurrent.locks.ReadWriteLock;35import java.util.concurrent.locks.ReentrantReadWriteLock;36import java.security.AccessController;37import sun.security.action.GetPropertyAction;38import sun.misc.Unsafe;3940/**41* Windows implementation of AsynchronousChannelGroup encapsulating an I/O42* completion port.43*/4445class Iocp extends AsynchronousChannelGroupImpl {46private static final Unsafe unsafe = Unsafe.getUnsafe();47private static final long INVALID_HANDLE_VALUE = -1L;48private static final boolean supportsThreadAgnosticIo;4950// maps completion key to channel51private final ReadWriteLock keyToChannelLock = new ReentrantReadWriteLock();52private final Map<Integer,OverlappedChannel> keyToChannel =53new HashMap<Integer,OverlappedChannel>();54private int nextCompletionKey;5556// handle to completion port57private final long port;5859// true if port has been closed60private boolean closed;6162// the set of "stale" OVERLAPPED structures. These OVERLAPPED structures63// relate to I/O operations where the completion notification was not64// received in a timely manner after the channel is closed.65private final Set<Long> staleIoSet = new HashSet<Long>();6667Iocp(AsynchronousChannelProvider provider, ThreadPool pool)68throws IOException69{70super(provider, pool);71this.port =72createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, fixedThreadCount());73this.nextCompletionKey = 1;74}7576Iocp start() {77startThreads(new EventHandlerTask());78return this;79}8081/*82* Channels implements this interface support overlapped I/O and can be83* associated with a completion port.84*/85static interface OverlappedChannel extends Closeable {86/**87* Returns a reference to the pending I/O result.88*/89<V,A> PendingFuture<V,A> getByOverlapped(long overlapped);90}9192/**93* Indicates if this operating system supports thread agnostic I/O.94*/95static boolean supportsThreadAgnosticIo() {96return supportsThreadAgnosticIo;97}9899// release all resources100void implClose() {101synchronized (this) {102if (closed)103return;104closed = true;105}106close0(port);107synchronized (staleIoSet) {108for (Long ov: staleIoSet) {109unsafe.freeMemory(ov);110}111staleIoSet.clear();112}113}114115@Override116boolean isEmpty() {117keyToChannelLock.writeLock().lock();118try {119return keyToChannel.isEmpty();120} finally {121keyToChannelLock.writeLock().unlock();122}123}124125@Override126final Object attachForeignChannel(final Channel channel, FileDescriptor fdObj)127throws IOException128{129int key = associate(new OverlappedChannel() {130public <V,A> PendingFuture<V,A> getByOverlapped(long overlapped) {131return null;132}133public void close() throws IOException {134channel.close();135}136}, 0L);137return Integer.valueOf(key);138}139140@Override141final void detachForeignChannel(Object key) {142disassociate((Integer)key);143}144145@Override146void closeAllChannels() {147/**148* On Windows the close operation will close the socket/file handle149* and then wait until all outstanding I/O operations have aborted.150* This is necessary as each channel's cache of OVERLAPPED structures151* can only be freed once all I/O operations have completed. As I/O152* completion requires a lookup of the keyToChannel then we must close153* the channels when not holding the write lock.154*/155final int MAX_BATCH_SIZE = 32;156OverlappedChannel channels[] = new OverlappedChannel[MAX_BATCH_SIZE];157int count;158do {159// grab a batch of up to 32 channels160keyToChannelLock.writeLock().lock();161count = 0;162try {163for (Integer key: keyToChannel.keySet()) {164channels[count++] = keyToChannel.get(key);165if (count >= MAX_BATCH_SIZE)166break;167}168} finally {169keyToChannelLock.writeLock().unlock();170}171172// close them173for (int i=0; i<count; i++) {174try {175channels[i].close();176} catch (IOException ignore) { }177}178} while (count > 0);179}180181private void wakeup() {182try {183postQueuedCompletionStatus(port, 0);184} catch (IOException e) {185// should not happen186throw new AssertionError(e);187}188}189190@Override191void executeOnHandlerTask(Runnable task) {192synchronized (this) {193if (closed)194throw new RejectedExecutionException();195offerTask(task);196wakeup();197}198199}200201@Override202void shutdownHandlerTasks() {203// shutdown all handler threads204int nThreads = threadCount();205while (nThreads-- > 0) {206wakeup();207}208}209210/**211* Associate the given handle with this group212*/213int associate(OverlappedChannel ch, long handle) throws IOException {214keyToChannelLock.writeLock().lock();215216// generate a completion key (if not shutdown)217int key;218try {219if (isShutdown())220throw new ShutdownChannelGroupException();221222// generate unique key223do {224key = nextCompletionKey++;225} while ((key == 0) || keyToChannel.containsKey(key));226227// associate with I/O completion port228if (handle != 0L) {229createIoCompletionPort(handle, port, key, 0);230}231232// setup mapping233keyToChannel.put(key, ch);234} finally {235keyToChannelLock.writeLock().unlock();236}237return key;238}239240/**241* Disassociate channel from the group.242*/243void disassociate(int key) {244boolean checkForShutdown = false;245246keyToChannelLock.writeLock().lock();247try {248keyToChannel.remove(key);249250// last key to be removed so check if group is shutdown251if (keyToChannel.isEmpty())252checkForShutdown = true;253254} finally {255keyToChannelLock.writeLock().unlock();256}257258// continue shutdown259if (checkForShutdown && isShutdown()) {260try {261shutdownNow();262} catch (IOException ignore) { }263}264}265266/**267* Invoked when a channel associated with this port is closed before268* notifications for all outstanding I/O operations have been received.269*/270void makeStale(Long overlapped) {271synchronized (staleIoSet) {272staleIoSet.add(overlapped);273}274}275276/**277* Checks if the given OVERLAPPED is stale and if so, releases it.278*/279private void checkIfStale(long ov) {280synchronized (staleIoSet) {281boolean removed = staleIoSet.remove(ov);282if (removed) {283unsafe.freeMemory(ov);284}285}286}287288/**289* The handler for consuming the result of an asynchronous I/O operation.290*/291static interface ResultHandler {292/**293* Invoked if the I/O operation completes successfully.294*/295public void completed(int bytesTransferred, boolean canInvokeDirect);296297/**298* Invoked if the I/O operation fails.299*/300public void failed(int error, IOException ioe);301}302303// Creates IOException for the given I/O error.304private static IOException translateErrorToIOException(int error) {305String msg = getErrorMessage(error);306if (msg == null)307msg = "Unknown error: 0x0" + Integer.toHexString(error);308return new IOException(msg);309}310311/**312* Long-running task servicing system-wide or per-file completion port313*/314private class EventHandlerTask implements Runnable {315public void run() {316Invoker.GroupAndInvokeCount myGroupAndInvokeCount =317Invoker.getGroupAndInvokeCount();318boolean canInvokeDirect = (myGroupAndInvokeCount != null);319CompletionStatus ioResult = new CompletionStatus();320boolean replaceMe = false;321322try {323for (;;) {324// reset invoke count325if (myGroupAndInvokeCount != null)326myGroupAndInvokeCount.resetInvokeCount();327328// wait for I/O completion event329// A error here is fatal (thread will not be replaced)330replaceMe = false;331try {332getQueuedCompletionStatus(port, ioResult);333} catch (IOException x) {334// should not happen335x.printStackTrace();336return;337}338339// handle wakeup to execute task or shutdown340if (ioResult.completionKey() == 0 &&341ioResult.overlapped() == 0L)342{343Runnable task = pollTask();344if (task == null) {345// shutdown request346return;347}348349// run task350// (if error/exception then replace thread)351replaceMe = true;352task.run();353continue;354}355356// map key to channel357OverlappedChannel ch = null;358keyToChannelLock.readLock().lock();359try {360ch = keyToChannel.get(ioResult.completionKey());361if (ch == null) {362checkIfStale(ioResult.overlapped());363continue;364}365} finally {366keyToChannelLock.readLock().unlock();367}368369// lookup I/O request370PendingFuture<?,?> result = ch.getByOverlapped(ioResult.overlapped());371if (result == null) {372// we get here if the OVERLAPPED structure is associated373// with an I/O operation on a channel that was closed374// but the I/O operation event wasn't read in a timely375// manner. Alternatively, it may be related to a376// tryLock operation as the OVERLAPPED structures for377// these operations are not in the I/O cache.378checkIfStale(ioResult.overlapped());379continue;380}381382// synchronize on result in case I/O completed immediately383// and was handled by initiator384synchronized (result) {385if (result.isDone()) {386continue;387}388// not handled by initiator389}390391// invoke I/O result handler392int error = ioResult.error();393ResultHandler rh = (ResultHandler)result.getContext();394replaceMe = true; // (if error/exception then replace thread)395if (error == 0) {396rh.completed(ioResult.bytesTransferred(), canInvokeDirect);397} else {398rh.failed(error, translateErrorToIOException(error));399}400}401} finally {402// last thread to exit when shutdown releases resources403int remaining = threadExit(this, replaceMe);404if (remaining == 0 && isShutdown()) {405implClose();406}407}408}409}410411/**412* Container for data returned by GetQueuedCompletionStatus413*/414private static class CompletionStatus {415private int error;416private int bytesTransferred;417private int completionKey;418private long overlapped;419420private CompletionStatus() { }421int error() { return error; }422int bytesTransferred() { return bytesTransferred; }423int completionKey() { return completionKey; }424long overlapped() { return overlapped; }425}426427// -- native methods --428429private static native void initIDs();430431private static native long createIoCompletionPort(long handle,432long existingPort, int completionKey, int concurrency) throws IOException;433434private static native void close0(long handle);435436private static native void getQueuedCompletionStatus(long completionPort,437CompletionStatus status) throws IOException;438439private static native void postQueuedCompletionStatus(long completionPort,440int completionKey) throws IOException;441442private static native String getErrorMessage(int error);443444static {445IOUtil.load();446initIDs();447448// thread agnostic I/O on Vista/2008 or newer449String osversion = AccessController.doPrivileged(450new GetPropertyAction("os.version"));451String vers[] = osversion.split("\\.");452supportsThreadAgnosticIo = Integer.parseInt(vers[0]) >= 6;453}454}455456457