Path: blob/master/src/java.base/windows/classes/sun/nio/ch/WindowsAsynchronousSocketChannelImpl.java
41139 views
/*1* Copyright (c) 2008, 2021, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation. Oracle designates this7* particular file as subject to the "Classpath" exception as provided8* by Oracle in the LICENSE file that accompanied this code.9*10* This code is distributed in the hope that it will be useful, but WITHOUT11* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or12* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License13* version 2 for more details (a copy is included in the LICENSE file that14* accompanied this code).15*16* You should have received a copy of the GNU General Public License version17* 2 along with this work; if not, write to the Free Software Foundation,18* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.19*20* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA21* or visit www.oracle.com if you need additional information or have any22* questions.23*/2425package sun.nio.ch;2627import java.nio.channels.*;28import java.nio.ByteBuffer;29import java.nio.BufferOverflowException;30import java.net.*;31import java.util.concurrent.*;32import java.io.IOException;33import java.security.AccessController;34import java.security.PrivilegedActionException;35import java.security.PrivilegedExceptionAction;36import jdk.internal.misc.Unsafe;37import sun.net.util.SocketExceptions;3839/**40* Windows implementation of AsynchronousSocketChannel using overlapped I/O.41*/4243class WindowsAsynchronousSocketChannelImpl44extends AsynchronousSocketChannelImpl implements Iocp.OverlappedChannel45{46private static final Unsafe unsafe = Unsafe.getUnsafe();47private static int addressSize = unsafe.addressSize();4849private static int dependsArch(int value32, int value64) {50return (addressSize == 4) ? value32 : value64;51}5253/*54* typedef struct _WSABUF {55* u_long len;56* char FAR * buf;57* } WSABUF;58*/59private static final int SIZEOF_WSABUF = dependsArch(8, 16);60private static final int OFFSETOF_LEN = 0;61private static final int OFFSETOF_BUF = dependsArch(4, 8);6263// maximum vector size for scatter/gather I/O64private static final int MAX_WSABUF = 16;6566private static final int SIZEOF_WSABUFARRAY = MAX_WSABUF * SIZEOF_WSABUF;676869// socket handle. Use begin()/end() around each usage of this handle.70final long handle;7172// I/O completion port that the socket is associated with73private final Iocp iocp;7475// completion key to identify channel when I/O completes76private final int completionKey;7778// Pending I/O operations are tied to an OVERLAPPED structure that can only79// be released when the I/O completion event is posted to the completion80// port. Where I/O operations complete immediately then it is possible81// there may be more than two OVERLAPPED structures in use.82private final PendingIoCache ioCache;8384// per-channel arrays of WSABUF structures85private final long readBufferArray;86private final long writeBufferArray;878889WindowsAsynchronousSocketChannelImpl(Iocp iocp, boolean failIfGroupShutdown)90throws IOException91{92super(iocp);9394// associate socket with default completion port95long h = IOUtil.fdVal(fd);96int key = 0;97try {98key = iocp.associate(this, h);99} catch (ShutdownChannelGroupException x) {100if (failIfGroupShutdown) {101closesocket0(h);102throw x;103}104} catch (IOException x) {105closesocket0(h);106throw x;107}108109this.handle = h;110this.iocp = iocp;111this.completionKey = key;112this.ioCache = new PendingIoCache();113114// allocate WSABUF arrays115this.readBufferArray = unsafe.allocateMemory(SIZEOF_WSABUFARRAY);116this.writeBufferArray = unsafe.allocateMemory(SIZEOF_WSABUFARRAY);117}118119WindowsAsynchronousSocketChannelImpl(Iocp iocp) throws IOException {120this(iocp, true);121}122123@Override124public AsynchronousChannelGroupImpl group() {125return iocp;126}127128/**129* Invoked by Iocp when an I/O operation competes.130*/131@Override132public <V,A> PendingFuture<V,A> getByOverlapped(long overlapped) {133return ioCache.remove(overlapped);134}135136// invoked by WindowsAsynchronousServerSocketChannelImpl137long handle() {138return handle;139}140141// invoked by WindowsAsynchronousServerSocketChannelImpl when new connection142// accept143void setConnected(InetSocketAddress localAddress,144InetSocketAddress remoteAddress)145{146synchronized (stateLock) {147state = ST_CONNECTED;148this.localAddress = localAddress;149this.remoteAddress = remoteAddress;150}151}152153@Override154void implClose() throws IOException {155// close socket (may cause outstanding async I/O operations to fail).156closesocket0(handle);157158// waits until all I/O operations have completed159ioCache.close();160161// release arrays of WSABUF structures162unsafe.freeMemory(readBufferArray);163unsafe.freeMemory(writeBufferArray);164165// finally disassociate from the completion port (key can be 0 if166// channel created when group is shutdown)167if (completionKey != 0)168iocp.disassociate(completionKey);169}170171@Override172public void onCancel(PendingFuture<?,?> task) {173if (task.getContext() instanceof ConnectTask)174killConnect();175if (task.getContext() instanceof ReadTask)176killReading();177if (task.getContext() instanceof WriteTask)178killWriting();179}180181/**182* Implements the task to initiate a connection and the handler to183* consume the result when the connection is established (or fails).184*/185private class ConnectTask<A> implements Runnable, Iocp.ResultHandler {186private final InetSocketAddress remote;187private final PendingFuture<Void,A> result;188189ConnectTask(InetSocketAddress remote, PendingFuture<Void,A> result) {190this.remote = remote;191this.result = result;192}193194private void closeChannel() {195try {196close();197} catch (IOException ignore) { }198}199200private IOException toIOException(Throwable x) {201if (x instanceof IOException) {202if (x instanceof ClosedChannelException)203x = new AsynchronousCloseException();204return (IOException)x;205}206return new IOException(x);207}208209/**210* Invoke after a connection is successfully established.211*/212private void afterConnect() throws IOException {213updateConnectContext(handle);214synchronized (stateLock) {215state = ST_CONNECTED;216remoteAddress = remote;217}218}219220/**221* Task to initiate a connection.222*/223@Override224public void run() {225long overlapped = 0L;226Throwable exc = null;227try {228begin();229230// synchronize on result to allow this thread handle the case231// where the connection is established immediately.232synchronized (result) {233overlapped = ioCache.add(result);234// initiate the connection235int n = connect0(handle, Net.isIPv6Available(), remote.getAddress(),236remote.getPort(), overlapped);237if (n == IOStatus.UNAVAILABLE) {238// connection is pending239return;240}241242// connection established immediately243afterConnect();244result.setResult(null);245}246} catch (Throwable x) {247if (overlapped != 0L)248ioCache.remove(overlapped);249exc = x;250} finally {251end();252}253254if (exc != null) {255closeChannel();256exc = SocketExceptions.of(toIOException(exc), remote);257result.setFailure(exc);258}259Invoker.invoke(result);260}261262/**263* Invoked by handler thread when connection established.264*/265@Override266public void completed(int bytesTransferred, boolean canInvokeDirect) {267Throwable exc = null;268try {269begin();270afterConnect();271result.setResult(null);272} catch (Throwable x) {273// channel is closed or unable to finish connect274exc = x;275} finally {276end();277}278279// can't close channel while in begin/end block280if (exc != null) {281closeChannel();282IOException ee = toIOException(exc);283ee = SocketExceptions.of(ee, remote);284result.setFailure(ee);285}286287if (canInvokeDirect) {288Invoker.invokeUnchecked(result);289} else {290Invoker.invoke(result);291}292}293294/**295* Invoked by handler thread when failed to establish connection.296*/297@Override298public void failed(int error, IOException x) {299x = SocketExceptions.of(x, remote);300if (isOpen()) {301closeChannel();302result.setFailure(x);303} else {304x = SocketExceptions.of(new AsynchronousCloseException(), remote);305result.setFailure(x);306}307Invoker.invoke(result);308}309}310311@SuppressWarnings("removal")312private void doPrivilegedBind(final SocketAddress sa) throws IOException {313try {314AccessController.doPrivileged(new PrivilegedExceptionAction<Void>() {315public Void run() throws IOException {316bind(sa);317return null;318}319});320} catch (PrivilegedActionException e) {321throw (IOException) e.getException();322}323}324325@Override326<A> Future<Void> implConnect(SocketAddress remote,327A attachment,328CompletionHandler<Void,? super A> handler)329{330if (!isOpen()) {331Throwable exc = new ClosedChannelException();332if (handler == null)333return CompletedFuture.withFailure(exc);334Invoker.invoke(this, handler, attachment, null, exc);335return null;336}337338InetSocketAddress isa = Net.checkAddress(remote);339340// permission check341@SuppressWarnings("removal")342SecurityManager sm = System.getSecurityManager();343if (sm != null)344sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());345346// check and update state347// ConnectEx requires the socket to be bound to a local address348IOException bindException = null;349synchronized (stateLock) {350if (state == ST_CONNECTED)351throw new AlreadyConnectedException();352if (state == ST_PENDING)353throw new ConnectionPendingException();354if (localAddress == null) {355try {356SocketAddress any = new InetSocketAddress(0);357if (sm == null) {358bind(any);359} else {360doPrivilegedBind(any);361}362} catch (IOException x) {363bindException = x;364}365}366if (bindException == null)367state = ST_PENDING;368}369370// handle bind failure371if (bindException != null) {372try {373close();374} catch (IOException ignore) { }375if (handler == null)376return CompletedFuture.withFailure(bindException);377Invoker.invoke(this, handler, attachment, null, bindException);378return null;379}380381// setup task382PendingFuture<Void,A> result =383new PendingFuture<Void,A>(this, handler, attachment);384ConnectTask<A> task = new ConnectTask<A>(isa, result);385result.setContext(task);386387// initiate I/O388task.run();389return result;390}391392/**393* Implements the task to initiate a read and the handler to consume the394* result when the read completes.395*/396private class ReadTask<V,A> implements Runnable, Iocp.ResultHandler {397private final ByteBuffer[] bufs;398private final int numBufs;399private final boolean scatteringRead;400private final PendingFuture<V,A> result;401402// set by run method403private ByteBuffer[] shadow;404private Runnable scopeHandleReleasers;405406ReadTask(ByteBuffer[] bufs,407boolean scatteringRead,408PendingFuture<V,A> result)409{410this.bufs = bufs;411this.numBufs = (bufs.length > MAX_WSABUF) ? MAX_WSABUF : bufs.length;412this.scatteringRead = scatteringRead;413this.result = result;414}415416/**417* Invoked prior to read to prepare the WSABUF array. Where necessary,418* it substitutes non-direct buffers with direct buffers.419*/420void prepareBuffers() {421scopeHandleReleasers = IOUtil.acquireScopes(bufs);422shadow = new ByteBuffer[numBufs];423long address = readBufferArray;424for (int i=0; i<numBufs; i++) {425ByteBuffer dst = bufs[i];426int pos = dst.position();427int lim = dst.limit();428assert (pos <= lim);429int rem = (pos <= lim ? lim - pos : 0);430long a;431if (!(dst instanceof DirectBuffer)) {432// substitute with direct buffer433ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);434shadow[i] = bb;435a = IOUtil.bufferAddress(bb);436} else {437shadow[i] = dst;438a = IOUtil.bufferAddress(dst) + pos;439}440unsafe.putAddress(address + OFFSETOF_BUF, a);441unsafe.putInt(address + OFFSETOF_LEN, rem);442address += SIZEOF_WSABUF;443}444}445446/**447* Invoked after a read has completed to update the buffer positions448* and release any substituted buffers.449*/450void updateBuffers(int bytesRead) {451for (int i=0; i<numBufs; i++) {452ByteBuffer nextBuffer = shadow[i];453int pos = nextBuffer.position();454int len = nextBuffer.remaining();455if (bytesRead >= len) {456bytesRead -= len;457int newPosition = pos + len;458try {459nextBuffer.position(newPosition);460} catch (IllegalArgumentException x) {461// position changed by another462}463} else { // Buffers not completely filled464if (bytesRead > 0) {465assert(pos + bytesRead < (long)Integer.MAX_VALUE);466int newPosition = pos + bytesRead;467try {468nextBuffer.position(newPosition);469} catch (IllegalArgumentException x) {470// position changed by another471}472}473break;474}475}476477// Put results from shadow into the slow buffers478for (int i=0; i<numBufs; i++) {479if (!(bufs[i] instanceof DirectBuffer)) {480shadow[i].flip();481try {482bufs[i].put(shadow[i]);483} catch (BufferOverflowException x) {484// position changed by another485}486}487}488}489490void releaseBuffers() {491for (int i=0; i<numBufs; i++) {492if (!(bufs[i] instanceof DirectBuffer)) {493Util.releaseTemporaryDirectBuffer(shadow[i]);494}495}496IOUtil.releaseScopes(scopeHandleReleasers);497}498499@Override500@SuppressWarnings("unchecked")501public void run() {502long overlapped = 0L;503boolean prepared = false;504boolean pending = false;505506try {507begin();508509// substitute non-direct buffers510prepareBuffers();511prepared = true;512513// get an OVERLAPPED structure (from the cache or allocate)514overlapped = ioCache.add(result);515516// initiate read517int n = read0(handle, numBufs, readBufferArray, overlapped);518if (n == IOStatus.UNAVAILABLE) {519// I/O is pending520pending = true;521return;522}523if (n == IOStatus.EOF) {524// input shutdown525enableReading();526if (scatteringRead) {527result.setResult((V)Long.valueOf(-1L));528} else {529result.setResult((V)Integer.valueOf(-1));530}531} else {532throw new InternalError("Read completed immediately");533}534} catch (Throwable x) {535// failed to initiate read536// reset read flag before releasing waiters537enableReading();538if (x instanceof ClosedChannelException)539x = new AsynchronousCloseException();540if (!(x instanceof IOException))541x = new IOException(x);542result.setFailure(x);543} finally {544// release resources if I/O not pending545if (!pending) {546if (overlapped != 0L)547ioCache.remove(overlapped);548if (prepared)549releaseBuffers();550}551end();552}553554// invoke completion handler555Invoker.invoke(result);556}557558/**559* Executed when the I/O has completed560*/561@Override562@SuppressWarnings("unchecked")563public void completed(int bytesTransferred, boolean canInvokeDirect) {564if (bytesTransferred == 0) {565bytesTransferred = -1; // EOF566} else {567updateBuffers(bytesTransferred);568}569570// return direct buffer to cache if substituted571releaseBuffers();572573// release waiters if not already released by timeout574synchronized (result) {575if (result.isDone())576return;577enableReading();578if (scatteringRead) {579result.setResult((V)Long.valueOf(bytesTransferred));580} else {581result.setResult((V)Integer.valueOf(bytesTransferred));582}583}584if (canInvokeDirect) {585Invoker.invokeUnchecked(result);586} else {587Invoker.invoke(result);588}589}590591@Override592public void failed(int error, IOException x) {593// return direct buffer to cache if substituted594releaseBuffers();595596// release waiters if not already released by timeout597if (!isOpen())598x = new AsynchronousCloseException();599600synchronized (result) {601if (result.isDone())602return;603enableReading();604result.setFailure(x);605}606Invoker.invoke(result);607}608609/**610* Invoked if timeout expires before it is cancelled611*/612void timeout() {613// synchronize on result as the I/O could complete/fail614synchronized (result) {615if (result.isDone())616return;617618// kill further reading before releasing waiters619enableReading(true);620result.setFailure(new InterruptedByTimeoutException());621}622623// invoke handler without any locks624Invoker.invoke(result);625}626}627628@Override629<V extends Number,A> Future<V> implRead(boolean isScatteringRead,630ByteBuffer dst,631ByteBuffer[] dsts,632long timeout,633TimeUnit unit,634A attachment,635CompletionHandler<V,? super A> handler)636{637// setup task638PendingFuture<V,A> result =639new PendingFuture<V,A>(this, handler, attachment);640ByteBuffer[] bufs;641if (isScatteringRead) {642bufs = dsts;643} else {644bufs = new ByteBuffer[1];645bufs[0] = dst;646}647final ReadTask<V,A> readTask =648new ReadTask<V,A>(bufs, isScatteringRead, result);649result.setContext(readTask);650651// schedule timeout652if (timeout > 0L) {653Future<?> timeoutTask = iocp.schedule(new Runnable() {654public void run() {655readTask.timeout();656}657}, timeout, unit);658result.setTimeoutTask(timeoutTask);659}660661// initiate I/O662readTask.run();663return result;664}665666/**667* Implements the task to initiate a write and the handler to consume the668* result when the write completes.669*/670private class WriteTask<V,A> implements Runnable, Iocp.ResultHandler {671private final ByteBuffer[] bufs;672private final int numBufs;673private final boolean gatheringWrite;674private final PendingFuture<V,A> result;675676// set by run method677private ByteBuffer[] shadow;678private Runnable scopeHandleReleasers;679680WriteTask(ByteBuffer[] bufs,681boolean gatheringWrite,682PendingFuture<V,A> result)683{684this.bufs = bufs;685this.numBufs = (bufs.length > MAX_WSABUF) ? MAX_WSABUF : bufs.length;686this.gatheringWrite = gatheringWrite;687this.result = result;688}689690/**691* Invoked prior to write to prepare the WSABUF array. Where necessary,692* it substitutes non-direct buffers with direct buffers.693*/694void prepareBuffers() {695scopeHandleReleasers = IOUtil.acquireScopes(bufs);696shadow = new ByteBuffer[numBufs];697long address = writeBufferArray;698for (int i=0; i<numBufs; i++) {699ByteBuffer src = bufs[i];700int pos = src.position();701int lim = src.limit();702assert (pos <= lim);703int rem = (pos <= lim ? lim - pos : 0);704long a;705if (!(src instanceof DirectBuffer)) {706// substitute with direct buffer707ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);708bb.put(src);709bb.flip();710src.position(pos); // leave heap buffer untouched for now711shadow[i] = bb;712a = IOUtil.bufferAddress(bb);713} else {714shadow[i] = src;715a = IOUtil.bufferAddress(src) + pos;716}717unsafe.putAddress(address + OFFSETOF_BUF, a);718unsafe.putInt(address + OFFSETOF_LEN, rem);719address += SIZEOF_WSABUF;720}721}722723/**724* Invoked after a write has completed to update the buffer positions725* and release any substituted buffers.726*/727void updateBuffers(int bytesWritten) {728// Notify the buffers how many bytes were taken729for (int i=0; i<numBufs; i++) {730ByteBuffer nextBuffer = bufs[i];731int pos = nextBuffer.position();732int lim = nextBuffer.limit();733int len = (pos <= lim ? lim - pos : lim);734if (bytesWritten >= len) {735bytesWritten -= len;736int newPosition = pos + len;737try {738nextBuffer.position(newPosition);739} catch (IllegalArgumentException x) {740// position changed by someone else741}742} else { // Buffers not completely filled743if (bytesWritten > 0) {744assert(pos + bytesWritten < (long)Integer.MAX_VALUE);745int newPosition = pos + bytesWritten;746try {747nextBuffer.position(newPosition);748} catch (IllegalArgumentException x) {749// position changed by someone else750}751}752break;753}754}755}756757void releaseBuffers() {758for (int i=0; i<numBufs; i++) {759if (!(bufs[i] instanceof DirectBuffer)) {760Util.releaseTemporaryDirectBuffer(shadow[i]);761}762}763IOUtil.releaseScopes(scopeHandleReleasers);764}765766@Override767//@SuppressWarnings("unchecked")768public void run() {769long overlapped = 0L;770boolean prepared = false;771boolean pending = false;772boolean shutdown = false;773774try {775begin();776777// substitute non-direct buffers778prepareBuffers();779prepared = true;780781// get an OVERLAPPED structure (from the cache or allocate)782overlapped = ioCache.add(result);783int n = write0(handle, numBufs, writeBufferArray, overlapped);784if (n == IOStatus.UNAVAILABLE) {785// I/O is pending786pending = true;787return;788}789if (n == IOStatus.EOF) {790// special case for shutdown output791shutdown = true;792throw new ClosedChannelException();793}794// write completed immediately795throw new InternalError("Write completed immediately");796} catch (Throwable x) {797// write failed. Enable writing before releasing waiters.798enableWriting();799if (!shutdown && (x instanceof ClosedChannelException))800x = new AsynchronousCloseException();801if (!(x instanceof IOException))802x = new IOException(x);803result.setFailure(x);804} finally {805// release resources if I/O not pending806if (!pending) {807if (overlapped != 0L)808ioCache.remove(overlapped);809if (prepared)810releaseBuffers();811}812end();813}814815// invoke completion handler816Invoker.invoke(result);817}818819/**820* Executed when the I/O has completed821*/822@Override823@SuppressWarnings("unchecked")824public void completed(int bytesTransferred, boolean canInvokeDirect) {825updateBuffers(bytesTransferred);826827// return direct buffer to cache if substituted828releaseBuffers();829830// release waiters if not already released by timeout831synchronized (result) {832if (result.isDone())833return;834enableWriting();835if (gatheringWrite) {836result.setResult((V)Long.valueOf(bytesTransferred));837} else {838result.setResult((V)Integer.valueOf(bytesTransferred));839}840}841if (canInvokeDirect) {842Invoker.invokeUnchecked(result);843} else {844Invoker.invoke(result);845}846}847848@Override849public void failed(int error, IOException x) {850// return direct buffer to cache if substituted851releaseBuffers();852853// release waiters if not already released by timeout854if (!isOpen())855x = new AsynchronousCloseException();856857synchronized (result) {858if (result.isDone())859return;860enableWriting();861result.setFailure(x);862}863Invoker.invoke(result);864}865866/**867* Invoked if timeout expires before it is cancelled868*/869void timeout() {870// synchronize on result as the I/O could complete/fail871synchronized (result) {872if (result.isDone())873return;874875// kill further writing before releasing waiters876enableWriting(true);877result.setFailure(new InterruptedByTimeoutException());878}879880// invoke handler without any locks881Invoker.invoke(result);882}883}884885@Override886<V extends Number,A> Future<V> implWrite(boolean gatheringWrite,887ByteBuffer src,888ByteBuffer[] srcs,889long timeout,890TimeUnit unit,891A attachment,892CompletionHandler<V,? super A> handler)893{894// setup task895PendingFuture<V,A> result =896new PendingFuture<V,A>(this, handler, attachment);897ByteBuffer[] bufs;898if (gatheringWrite) {899bufs = srcs;900} else {901bufs = new ByteBuffer[1];902bufs[0] = src;903}904final WriteTask<V,A> writeTask =905new WriteTask<V,A>(bufs, gatheringWrite, result);906result.setContext(writeTask);907908// schedule timeout909if (timeout > 0L) {910Future<?> timeoutTask = iocp.schedule(new Runnable() {911public void run() {912writeTask.timeout();913}914}, timeout, unit);915result.setTimeoutTask(timeoutTask);916}917918// initiate I/O919writeTask.run();920return result;921}922923// -- Native methods --924925private static native void initIDs();926927private static native int connect0(long socket, boolean preferIPv6,928InetAddress remote, int remotePort, long overlapped) throws IOException;929930private static native void updateConnectContext(long socket) throws IOException;931932private static native int read0(long socket, int count, long addres, long overlapped)933throws IOException;934935private static native int write0(long socket, int count, long address,936long overlapped) throws IOException;937938private static native void shutdown0(long socket, int how) throws IOException;939940private static native void closesocket0(long socket) throws IOException;941942static {943IOUtil.load();944initIDs();945}946}947948949