// Path: blob/master/src/java.base/windows/classes/sun/nio/ch/Iocp.java
// 41139 views
/*1* Copyright (c) 2008, 2019, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation. Oracle designates this7* particular file as subject to the "Classpath" exception as provided8* by Oracle in the LICENSE file that accompanied this code.9*10* This code is distributed in the hope that it will be useful, but WITHOUT11* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or12* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License13* version 2 for more details (a copy is included in the LICENSE file that14* accompanied this code).15*16* You should have received a copy of the GNU General Public License version17* 2 along with this work; if not, write to the Free Software Foundation,18* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.19*20* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA21* or visit www.oracle.com if you need additional information or have any22* questions.23*/2425package sun.nio.ch;2627import java.nio.channels.*;28import java.nio.channels.spi.AsynchronousChannelProvider;29import java.io.Closeable;30import java.io.IOException;31import java.io.FileDescriptor;32import java.util.*;33import java.util.concurrent.*;34import java.util.concurrent.locks.ReadWriteLock;35import java.util.concurrent.locks.ReentrantReadWriteLock;36import jdk.internal.misc.Unsafe;3738/**39* Windows implementation of AsynchronousChannelGroup encapsulating an I/O40* completion port.41*/4243class Iocp extends AsynchronousChannelGroupImpl {44private static final Unsafe unsafe = Unsafe.getUnsafe();45private static final long INVALID_HANDLE_VALUE = -1L;4647// maps completion key to channel48private final ReadWriteLock keyToChannelLock = new ReentrantReadWriteLock();49private 
final Map<Integer,OverlappedChannel> keyToChannel =50new HashMap<Integer,OverlappedChannel>();51private int nextCompletionKey;5253// handle to completion port54private final long port;5556// true if port has been closed57private boolean closed;5859// the set of "stale" OVERLAPPED structures. These OVERLAPPED structures60// relate to I/O operations where the completion notification was not61// received in a timely manner after the channel is closed.62private final Set<Long> staleIoSet = new HashSet<Long>();6364Iocp(AsynchronousChannelProvider provider, ThreadPool pool)65throws IOException66{67super(provider, pool);68this.port =69createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, fixedThreadCount());70this.nextCompletionKey = 1;71}7273Iocp start() {74startThreads(new EventHandlerTask());75return this;76}7778/*79* Channels implements this interface support overlapped I/O and can be80* associated with a completion port.81*/82static interface OverlappedChannel extends Closeable {83/**84* Returns a reference to the pending I/O result.85*/86<V,A> PendingFuture<V,A> getByOverlapped(long overlapped);87}8889// release all resources90void implClose() {91synchronized (this) {92if (closed)93return;94closed = true;95}96close0(port);97synchronized (staleIoSet) {98for (Long ov: staleIoSet) {99unsafe.freeMemory(ov);100}101staleIoSet.clear();102}103}104105@Override106boolean isEmpty() {107keyToChannelLock.writeLock().lock();108try {109return keyToChannel.isEmpty();110} finally {111keyToChannelLock.writeLock().unlock();112}113}114115@Override116final Object attachForeignChannel(final Channel channel, FileDescriptor fdObj)117throws IOException118{119int key = associate(new OverlappedChannel() {120public <V,A> PendingFuture<V,A> getByOverlapped(long overlapped) {121return null;122}123public void close() throws IOException {124channel.close();125}126}, 0L);127return Integer.valueOf(key);128}129130@Override131final void detachForeignChannel(Object key) 
{132disassociate((Integer)key);133}134135@Override136void closeAllChannels() {137/**138* On Windows the close operation will close the socket/file handle139* and then wait until all outstanding I/O operations have aborted.140* This is necessary as each channel's cache of OVERLAPPED structures141* can only be freed once all I/O operations have completed. As I/O142* completion requires a lookup of the keyToChannel then we must close143* the channels when not holding the write lock.144*/145final int MAX_BATCH_SIZE = 32;146OverlappedChannel channels[] = new OverlappedChannel[MAX_BATCH_SIZE];147int count;148do {149// grab a batch of up to 32 channels150keyToChannelLock.writeLock().lock();151count = 0;152try {153for (Integer key: keyToChannel.keySet()) {154channels[count++] = keyToChannel.get(key);155if (count >= MAX_BATCH_SIZE)156break;157}158} finally {159keyToChannelLock.writeLock().unlock();160}161162// close them163for (int i=0; i<count; i++) {164try {165channels[i].close();166} catch (IOException ignore) { }167}168} while (count > 0);169}170171private void wakeup() {172try {173postQueuedCompletionStatus(port, 0);174} catch (IOException e) {175// should not happen176throw new AssertionError(e);177}178}179180@Override181void executeOnHandlerTask(Runnable task) {182synchronized (this) {183if (closed)184throw new RejectedExecutionException();185offerTask(task);186wakeup();187}188189}190191@Override192void shutdownHandlerTasks() {193// shutdown all handler threads194int nThreads = threadCount();195while (nThreads-- > 0) {196wakeup();197}198}199200/**201* Associate the given handle with this group202*/203int associate(OverlappedChannel ch, long handle) throws IOException {204keyToChannelLock.writeLock().lock();205206// generate a completion key (if not shutdown)207int key;208try {209if (isShutdown())210throw new ShutdownChannelGroupException();211212// generate unique key213do {214key = nextCompletionKey++;215} while ((key == 0) || keyToChannel.containsKey(key));216217// 
associate with I/O completion port218if (handle != 0L) {219createIoCompletionPort(handle, port, key, 0);220}221222// setup mapping223keyToChannel.put(key, ch);224} finally {225keyToChannelLock.writeLock().unlock();226}227return key;228}229230/**231* Disassociate channel from the group.232*/233void disassociate(int key) {234boolean checkForShutdown = false;235236keyToChannelLock.writeLock().lock();237try {238keyToChannel.remove(key);239240// last key to be removed so check if group is shutdown241if (keyToChannel.isEmpty())242checkForShutdown = true;243244} finally {245keyToChannelLock.writeLock().unlock();246}247248// continue shutdown249if (checkForShutdown && isShutdown()) {250try {251shutdownNow();252} catch (IOException ignore) { }253}254}255256/**257* Invoked when a channel associated with this port is closed before258* notifications for all outstanding I/O operations have been received.259*/260void makeStale(Long overlapped) {261synchronized (staleIoSet) {262staleIoSet.add(overlapped);263}264}265266/**267* Checks if the given OVERLAPPED is stale and if so, releases it.268*/269private void checkIfStale(long ov) {270synchronized (staleIoSet) {271boolean removed = staleIoSet.remove(ov);272if (removed) {273unsafe.freeMemory(ov);274}275}276}277278/**279* The handler for consuming the result of an asynchronous I/O operation.280*/281static interface ResultHandler {282/**283* Invoked if the I/O operation completes successfully.284*/285public void completed(int bytesTransferred, boolean canInvokeDirect);286287/**288* Invoked if the I/O operation fails.289*/290public void failed(int error, IOException ioe);291}292293// Creates IOException for the given I/O error.294private static IOException translateErrorToIOException(int error) {295String msg = getErrorMessage(error);296if (msg == null)297msg = "Unknown error: 0x0" + Integer.toHexString(error);298return new IOException(msg);299}300301/**302* Long-running task servicing system-wide or per-file completion 
port303*/304private class EventHandlerTask implements Runnable {305public void run() {306Invoker.GroupAndInvokeCount myGroupAndInvokeCount =307Invoker.getGroupAndInvokeCount();308boolean canInvokeDirect = (myGroupAndInvokeCount != null);309CompletionStatus ioResult = new CompletionStatus();310boolean replaceMe = false;311312try {313for (;;) {314// reset invoke count315if (myGroupAndInvokeCount != null)316myGroupAndInvokeCount.resetInvokeCount();317318// wait for I/O completion event319// An error here is fatal (thread will not be replaced)320replaceMe = false;321try {322getQueuedCompletionStatus(port, ioResult);323} catch (IOException x) {324// should not happen325x.printStackTrace();326return;327}328329// handle wakeup to execute task or shutdown330if (ioResult.completionKey() == 0 &&331ioResult.overlapped() == 0L)332{333Runnable task = pollTask();334if (task == null) {335// shutdown request336return;337}338339// run task340// (if error/exception then replace thread)341replaceMe = true;342task.run();343continue;344}345346// map key to channel347OverlappedChannel ch = null;348keyToChannelLock.readLock().lock();349try {350ch = keyToChannel.get(ioResult.completionKey());351if (ch == null) {352checkIfStale(ioResult.overlapped());353continue;354}355} finally {356keyToChannelLock.readLock().unlock();357}358359// lookup I/O request360PendingFuture<?,?> result = ch.getByOverlapped(ioResult.overlapped());361if (result == null) {362// we get here if the OVERLAPPED structure is associated363// with an I/O operation on a channel that was closed364// but the I/O operation event wasn't read in a timely365// manner. 
Alternatively, it may be related to a366// tryLock operation as the OVERLAPPED structures for367// these operations are not in the I/O cache.368checkIfStale(ioResult.overlapped());369continue;370}371372// synchronize on result in case I/O completed immediately373// and was handled by initiator374synchronized (result) {375if (result.isDone()) {376continue;377}378// not handled by initiator379}380381// invoke I/O result handler382int error = ioResult.error();383ResultHandler rh = (ResultHandler)result.getContext();384replaceMe = true; // (if error/exception then replace thread)385if (error == 0) {386rh.completed(ioResult.bytesTransferred(), canInvokeDirect);387} else {388rh.failed(error, translateErrorToIOException(error));389}390}391} finally {392// last thread to exit when shutdown releases resources393int remaining = threadExit(this, replaceMe);394if (remaining == 0 && isShutdown()) {395implClose();396}397}398}399}400401/**402* Container for data returned by GetQueuedCompletionStatus403*/404private static class CompletionStatus {405private int error;406private int bytesTransferred;407private int completionKey;408private long overlapped;409410private CompletionStatus() { }411int error() { return error; }412int bytesTransferred() { return bytesTransferred; }413int completionKey() { return completionKey; }414long overlapped() { return overlapped; }415}416417// -- native methods --418419private static native void initIDs();420421private static native long createIoCompletionPort(long handle,422long existingPort, int completionKey, int concurrency) throws IOException;423424private static native void close0(long handle);425426private static native void getQueuedCompletionStatus(long completionPort,427CompletionStatus status) throws IOException;428429private static native void postQueuedCompletionStatus(long completionPort,430int completionKey) throws IOException;431432private static native String getErrorMessage(int error);433434static 
{435IOUtil.load();436initIDs();437}438}439440441