Path: blob/aarch64-shenandoah-jdk8u272-b10/jdk/src/windows/classes/sun/nio/ch/WindowsAsynchronousSocketChannelImpl.java
32288 views
/*1* Copyright (c) 2008, 2013, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation. Oracle designates this7* particular file as subject to the "Classpath" exception as provided8* by Oracle in the LICENSE file that accompanied this code.9*10* This code is distributed in the hope that it will be useful, but WITHOUT11* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or12* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License13* version 2 for more details (a copy is included in the LICENSE file that14* accompanied this code).15*16* You should have received a copy of the GNU General Public License version17* 2 along with this work; if not, write to the Free Software Foundation,18* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.19*20* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA21* or visit www.oracle.com if you need additional information or have any22* questions.23*/2425package sun.nio.ch;2627import java.nio.channels.*;28import java.nio.ByteBuffer;29import java.nio.BufferOverflowException;30import java.net.*;31import java.util.concurrent.*;32import java.io.IOException;33import java.security.AccessController;34import java.security.PrivilegedActionException;35import java.security.PrivilegedExceptionAction;36import sun.misc.Unsafe;3738/**39* Windows implementation of AsynchronousSocketChannel using overlapped I/O.40*/4142class WindowsAsynchronousSocketChannelImpl43extends AsynchronousSocketChannelImpl implements Iocp.OverlappedChannel44{45private static final Unsafe unsafe = Unsafe.getUnsafe();46private static int addressSize = unsafe.addressSize();4748private static int dependsArch(int value32, int value64) {49return (addressSize == 4) ? value32 : value64;50}5152/*53* typedef struct _WSABUF {54* u_long len;55* char FAR * buf;56* } WSABUF;57*/58private static final int SIZEOF_WSABUF = dependsArch(8, 16);59private static final int OFFSETOF_LEN = 0;60private static final int OFFSETOF_BUF = dependsArch(4, 8);6162// maximum vector size for scatter/gather I/O63private static final int MAX_WSABUF = 16;6465private static final int SIZEOF_WSABUFARRAY = MAX_WSABUF * SIZEOF_WSABUF;666768// socket handle. Use begin()/end() around each usage of this handle.69final long handle;7071// I/O completion port that the socket is associated with72private final Iocp iocp;7374// completion key to identify channel when I/O completes75private final int completionKey;7677// Pending I/O operations are tied to an OVERLAPPED structure that can only78// be released when the I/O completion event is posted to the completion79// port. Where I/O operations complete immediately then it is possible80// there may be more than two OVERLAPPED structures in use.81private final PendingIoCache ioCache;8283// per-channel arrays of WSABUF structures84private final long readBufferArray;85private final long writeBufferArray;868788WindowsAsynchronousSocketChannelImpl(Iocp iocp, boolean failIfGroupShutdown)89throws IOException90{91super(iocp);9293// associate socket with default completion port94long h = IOUtil.fdVal(fd);95int key = 0;96try {97key = iocp.associate(this, h);98} catch (ShutdownChannelGroupException x) {99if (failIfGroupShutdown) {100closesocket0(h);101throw x;102}103} catch (IOException x) {104closesocket0(h);105throw x;106}107108this.handle = h;109this.iocp = iocp;110this.completionKey = key;111this.ioCache = new PendingIoCache();112113// allocate WSABUF arrays114this.readBufferArray = unsafe.allocateMemory(SIZEOF_WSABUFARRAY);115this.writeBufferArray = unsafe.allocateMemory(SIZEOF_WSABUFARRAY);116}117118WindowsAsynchronousSocketChannelImpl(Iocp iocp) throws IOException {119this(iocp, true);120}121122@Override123public AsynchronousChannelGroupImpl group() {124return iocp;125}126127/**128* Invoked by Iocp when an I/O operation competes.129*/130@Override131public <V,A> PendingFuture<V,A> getByOverlapped(long overlapped) {132return ioCache.remove(overlapped);133}134135// invoked by WindowsAsynchronousServerSocketChannelImpl136long handle() {137return handle;138}139140// invoked by WindowsAsynchronousServerSocketChannelImpl when new connection141// accept142void setConnected(InetSocketAddress localAddress,143InetSocketAddress remoteAddress)144{145synchronized (stateLock) {146state = ST_CONNECTED;147this.localAddress = localAddress;148this.remoteAddress = remoteAddress;149}150}151152@Override153void implClose() throws IOException {154// close socket (may cause outstanding async I/O operations to fail).155closesocket0(handle);156157// waits until all I/O operations have completed158ioCache.close();159160// release arrays of WSABUF structures161unsafe.freeMemory(readBufferArray);162unsafe.freeMemory(writeBufferArray);163164// finally disassociate from the completion port (key can be 0 if165// channel created when group is shutdown)166if (completionKey != 0)167iocp.disassociate(completionKey);168}169170@Override171public void onCancel(PendingFuture<?,?> task) {172if (task.getContext() instanceof ConnectTask)173killConnect();174if (task.getContext() instanceof ReadTask)175killReading();176if (task.getContext() instanceof WriteTask)177killWriting();178}179180/**181* Implements the task to initiate a connection and the handler to182* consume the result when the connection is established (or fails).183*/184private class ConnectTask<A> implements Runnable, Iocp.ResultHandler {185private final InetSocketAddress remote;186private final PendingFuture<Void,A> result;187188ConnectTask(InetSocketAddress remote, PendingFuture<Void,A> result) {189this.remote = remote;190this.result = result;191}192193private void closeChannel() {194try {195close();196} catch (IOException ignore) { }197}198199private IOException toIOException(Throwable x) {200if (x instanceof IOException) {201if (x instanceof ClosedChannelException)202x = new AsynchronousCloseException();203return (IOException)x;204}205return new IOException(x);206}207208/**209* Invoke after a connection is successfully established.210*/211private void afterConnect() throws IOException {212updateConnectContext(handle);213synchronized (stateLock) {214state = ST_CONNECTED;215remoteAddress = remote;216}217}218219/**220* Task to initiate a connection.221*/222@Override223public void run() {224long overlapped = 0L;225Throwable exc = null;226try {227begin();228229// synchronize on result to allow this thread handle the case230// where the connection is established immediately.231synchronized (result) {232overlapped = ioCache.add(result);233// initiate the connection234int n = connect0(handle, Net.isIPv6Available(), remote.getAddress(),235remote.getPort(), overlapped);236if (n == IOStatus.UNAVAILABLE) {237// connection is pending238return;239}240241// connection established immediately242afterConnect();243result.setResult(null);244}245} catch (Throwable x) {246if (overlapped != 0L)247ioCache.remove(overlapped);248exc = x;249} finally {250end();251}252253if (exc != null) {254closeChannel();255result.setFailure(toIOException(exc));256}257Invoker.invoke(result);258}259260/**261* Invoked by handler thread when connection established.262*/263@Override264public void completed(int bytesTransferred, boolean canInvokeDirect) {265Throwable exc = null;266try {267begin();268afterConnect();269result.setResult(null);270} catch (Throwable x) {271// channel is closed or unable to finish connect272exc = x;273} finally {274end();275}276277// can't close channel while in begin/end block278if (exc != null) {279closeChannel();280result.setFailure(toIOException(exc));281}282283if (canInvokeDirect) {284Invoker.invokeUnchecked(result);285} else {286Invoker.invoke(result);287}288}289290/**291* Invoked by handler thread when failed to establish connection.292*/293@Override294public void failed(int error, IOException x) {295if (isOpen()) {296closeChannel();297result.setFailure(x);298} else {299result.setFailure(new AsynchronousCloseException());300}301Invoker.invoke(result);302}303}304305private void doPrivilegedBind(final SocketAddress sa) throws IOException {306try {307AccessController.doPrivileged(new PrivilegedExceptionAction<Void>() {308public Void run() throws IOException {309bind(sa);310return null;311}312});313} catch (PrivilegedActionException e) {314throw (IOException) e.getException();315}316}317318@Override319<A> Future<Void> implConnect(SocketAddress remote,320A attachment,321CompletionHandler<Void,? super A> handler)322{323if (!isOpen()) {324Throwable exc = new ClosedChannelException();325if (handler == null)326return CompletedFuture.withFailure(exc);327Invoker.invoke(this, handler, attachment, null, exc);328return null;329}330331InetSocketAddress isa = Net.checkAddress(remote);332333// permission check334SecurityManager sm = System.getSecurityManager();335if (sm != null)336sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());337338// check and update state339// ConnectEx requires the socket to be bound to a local address340IOException bindException = null;341synchronized (stateLock) {342if (state == ST_CONNECTED)343throw new AlreadyConnectedException();344if (state == ST_PENDING)345throw new ConnectionPendingException();346if (localAddress == null) {347try {348SocketAddress any = new InetSocketAddress(0);349if (sm == null) {350bind(any);351} else {352doPrivilegedBind(any);353}354} catch (IOException x) {355bindException = x;356}357}358if (bindException == null)359state = ST_PENDING;360}361362// handle bind failure363if (bindException != null) {364try {365close();366} catch (IOException ignore) { }367if (handler == null)368return CompletedFuture.withFailure(bindException);369Invoker.invoke(this, handler, attachment, null, bindException);370return null;371}372373// setup task374PendingFuture<Void,A> result =375new PendingFuture<Void,A>(this, handler, attachment);376ConnectTask<A> task = new ConnectTask<A>(isa, result);377result.setContext(task);378379// initiate I/O380if (Iocp.supportsThreadAgnosticIo()) {381task.run();382} else {383Invoker.invokeOnThreadInThreadPool(this, task);384}385return result;386}387388/**389* Implements the task to initiate a read and the handler to consume the390* result when the read completes.391*/392private class ReadTask<V,A> implements Runnable, Iocp.ResultHandler {393private final ByteBuffer[] bufs;394private final int numBufs;395private final boolean scatteringRead;396private final PendingFuture<V,A> result;397398// set by run method399private ByteBuffer[] shadow;400401ReadTask(ByteBuffer[] bufs,402boolean scatteringRead,403PendingFuture<V,A> result)404{405this.bufs = bufs;406this.numBufs = (bufs.length > MAX_WSABUF) ? MAX_WSABUF : bufs.length;407this.scatteringRead = scatteringRead;408this.result = result;409}410411/**412* Invoked prior to read to prepare the WSABUF array. Where necessary,413* it substitutes non-direct buffers with direct buffers.414*/415void prepareBuffers() {416shadow = new ByteBuffer[numBufs];417long address = readBufferArray;418for (int i=0; i<numBufs; i++) {419ByteBuffer dst = bufs[i];420int pos = dst.position();421int lim = dst.limit();422assert (pos <= lim);423int rem = (pos <= lim ? lim - pos : 0);424long a;425if (!(dst instanceof DirectBuffer)) {426// substitute with direct buffer427ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);428shadow[i] = bb;429a = ((DirectBuffer)bb).address();430} else {431shadow[i] = dst;432a = ((DirectBuffer)dst).address() + pos;433}434unsafe.putAddress(address + OFFSETOF_BUF, a);435unsafe.putInt(address + OFFSETOF_LEN, rem);436address += SIZEOF_WSABUF;437}438}439440/**441* Invoked after a read has completed to update the buffer positions442* and release any substituted buffers.443*/444void updateBuffers(int bytesRead) {445for (int i=0; i<numBufs; i++) {446ByteBuffer nextBuffer = shadow[i];447int pos = nextBuffer.position();448int len = nextBuffer.remaining();449if (bytesRead >= len) {450bytesRead -= len;451int newPosition = pos + len;452try {453nextBuffer.position(newPosition);454} catch (IllegalArgumentException x) {455// position changed by another456}457} else { // Buffers not completely filled458if (bytesRead > 0) {459assert(pos + bytesRead < (long)Integer.MAX_VALUE);460int newPosition = pos + bytesRead;461try {462nextBuffer.position(newPosition);463} catch (IllegalArgumentException x) {464// position changed by another465}466}467break;468}469}470471// Put results from shadow into the slow buffers472for (int i=0; i<numBufs; i++) {473if (!(bufs[i] instanceof DirectBuffer)) {474shadow[i].flip();475try {476bufs[i].put(shadow[i]);477} catch (BufferOverflowException x) {478// position changed by another479}480}481}482}483484void releaseBuffers() {485for (int i=0; i<numBufs; i++) {486if (!(bufs[i] instanceof DirectBuffer)) {487Util.releaseTemporaryDirectBuffer(shadow[i]);488}489}490}491492@Override493@SuppressWarnings("unchecked")494public void run() {495long overlapped = 0L;496boolean prepared = false;497boolean pending = false;498499try {500begin();501502// substitute non-direct buffers503prepareBuffers();504prepared = true;505506// get an OVERLAPPED structure (from the cache or allocate)507overlapped = ioCache.add(result);508509// initiate read510int n = read0(handle, numBufs, readBufferArray, overlapped);511if (n == IOStatus.UNAVAILABLE) {512// I/O is pending513pending = true;514return;515}516if (n == IOStatus.EOF) {517// input shutdown518enableReading();519if (scatteringRead) {520result.setResult((V)Long.valueOf(-1L));521} else {522result.setResult((V)Integer.valueOf(-1));523}524} else {525throw new InternalError("Read completed immediately");526}527} catch (Throwable x) {528// failed to initiate read529// reset read flag before releasing waiters530enableReading();531if (x instanceof ClosedChannelException)532x = new AsynchronousCloseException();533if (!(x instanceof IOException))534x = new IOException(x);535result.setFailure(x);536} finally {537// release resources if I/O not pending538if (!pending) {539if (overlapped != 0L)540ioCache.remove(overlapped);541if (prepared)542releaseBuffers();543}544end();545}546547// invoke completion handler548Invoker.invoke(result);549}550551/**552* Executed when the I/O has completed553*/554@Override555@SuppressWarnings("unchecked")556public void completed(int bytesTransferred, boolean canInvokeDirect) {557if (bytesTransferred == 0) {558bytesTransferred = -1; // EOF559} else {560updateBuffers(bytesTransferred);561}562563// return direct buffer to cache if substituted564releaseBuffers();565566// release waiters if not already released by timeout567synchronized (result) {568if (result.isDone())569return;570enableReading();571if (scatteringRead) {572result.setResult((V)Long.valueOf(bytesTransferred));573} else {574result.setResult((V)Integer.valueOf(bytesTransferred));575}576}577if (canInvokeDirect) {578Invoker.invokeUnchecked(result);579} else {580Invoker.invoke(result);581}582}583584@Override585public void failed(int error, IOException x) {586// return direct buffer to cache if substituted587releaseBuffers();588589// release waiters if not already released by timeout590if (!isOpen())591x = new AsynchronousCloseException();592593synchronized (result) {594if (result.isDone())595return;596enableReading();597result.setFailure(x);598}599Invoker.invoke(result);600}601602/**603* Invoked if timeout expires before it is cancelled604*/605void timeout() {606// synchronize on result as the I/O could complete/fail607synchronized (result) {608if (result.isDone())609return;610611// kill further reading before releasing waiters612enableReading(true);613result.setFailure(new InterruptedByTimeoutException());614}615616// invoke handler without any locks617Invoker.invoke(result);618}619}620621@Override622<V extends Number,A> Future<V> implRead(boolean isScatteringRead,623ByteBuffer dst,624ByteBuffer[] dsts,625long timeout,626TimeUnit unit,627A attachment,628CompletionHandler<V,? super A> handler)629{630// setup task631PendingFuture<V,A> result =632new PendingFuture<V,A>(this, handler, attachment);633ByteBuffer[] bufs;634if (isScatteringRead) {635bufs = dsts;636} else {637bufs = new ByteBuffer[1];638bufs[0] = dst;639}640final ReadTask<V,A> readTask =641new ReadTask<V,A>(bufs, isScatteringRead, result);642result.setContext(readTask);643644// schedule timeout645if (timeout > 0L) {646Future<?> timeoutTask = iocp.schedule(new Runnable() {647public void run() {648readTask.timeout();649}650}, timeout, unit);651result.setTimeoutTask(timeoutTask);652}653654// initiate I/O655if (Iocp.supportsThreadAgnosticIo()) {656readTask.run();657} else {658Invoker.invokeOnThreadInThreadPool(this, readTask);659}660return result;661}662663/**664* Implements the task to initiate a write and the handler to consume the665* result when the write completes.666*/667private class WriteTask<V,A> implements Runnable, Iocp.ResultHandler {668private final ByteBuffer[] bufs;669private final int numBufs;670private final boolean gatheringWrite;671private final PendingFuture<V,A> result;672673// set by run method674private ByteBuffer[] shadow;675676WriteTask(ByteBuffer[] bufs,677boolean gatheringWrite,678PendingFuture<V,A> result)679{680this.bufs = bufs;681this.numBufs = (bufs.length > MAX_WSABUF) ? MAX_WSABUF : bufs.length;682this.gatheringWrite = gatheringWrite;683this.result = result;684}685686/**687* Invoked prior to write to prepare the WSABUF array. Where necessary,688* it substitutes non-direct buffers with direct buffers.689*/690void prepareBuffers() {691shadow = new ByteBuffer[numBufs];692long address = writeBufferArray;693for (int i=0; i<numBufs; i++) {694ByteBuffer src = bufs[i];695int pos = src.position();696int lim = src.limit();697assert (pos <= lim);698int rem = (pos <= lim ? lim - pos : 0);699long a;700if (!(src instanceof DirectBuffer)) {701// substitute with direct buffer702ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);703bb.put(src);704bb.flip();705src.position(pos); // leave heap buffer untouched for now706shadow[i] = bb;707a = ((DirectBuffer)bb).address();708} else {709shadow[i] = src;710a = ((DirectBuffer)src).address() + pos;711}712unsafe.putAddress(address + OFFSETOF_BUF, a);713unsafe.putInt(address + OFFSETOF_LEN, rem);714address += SIZEOF_WSABUF;715}716}717718/**719* Invoked after a write has completed to update the buffer positions720* and release any substituted buffers.721*/722void updateBuffers(int bytesWritten) {723// Notify the buffers how many bytes were taken724for (int i=0; i<numBufs; i++) {725ByteBuffer nextBuffer = bufs[i];726int pos = nextBuffer.position();727int lim = nextBuffer.limit();728int len = (pos <= lim ? lim - pos : lim);729if (bytesWritten >= len) {730bytesWritten -= len;731int newPosition = pos + len;732try {733nextBuffer.position(newPosition);734} catch (IllegalArgumentException x) {735// position changed by someone else736}737} else { // Buffers not completely filled738if (bytesWritten > 0) {739assert(pos + bytesWritten < (long)Integer.MAX_VALUE);740int newPosition = pos + bytesWritten;741try {742nextBuffer.position(newPosition);743} catch (IllegalArgumentException x) {744// position changed by someone else745}746}747break;748}749}750}751752void releaseBuffers() {753for (int i=0; i<numBufs; i++) {754if (!(bufs[i] instanceof DirectBuffer)) {755Util.releaseTemporaryDirectBuffer(shadow[i]);756}757}758}759760@Override761//@SuppressWarnings("unchecked")762public void run() {763long overlapped = 0L;764boolean prepared = false;765boolean pending = false;766boolean shutdown = false;767768try {769begin();770771// substitute non-direct buffers772prepareBuffers();773prepared = true;774775// get an OVERLAPPED structure (from the cache or allocate)776overlapped = ioCache.add(result);777int n = write0(handle, numBufs, writeBufferArray, overlapped);778if (n == IOStatus.UNAVAILABLE) {779// I/O is pending780pending = true;781return;782}783if (n == IOStatus.EOF) {784// special case for shutdown output785shutdown = true;786throw new ClosedChannelException();787}788// write completed immediately789throw new InternalError("Write completed immediately");790} catch (Throwable x) {791// write failed. Enable writing before releasing waiters.792enableWriting();793if (!shutdown && (x instanceof ClosedChannelException))794x = new AsynchronousCloseException();795if (!(x instanceof IOException))796x = new IOException(x);797result.setFailure(x);798} finally {799// release resources if I/O not pending800if (!pending) {801if (overlapped != 0L)802ioCache.remove(overlapped);803if (prepared)804releaseBuffers();805}806end();807}808809// invoke completion handler810Invoker.invoke(result);811}812813/**814* Executed when the I/O has completed815*/816@Override817@SuppressWarnings("unchecked")818public void completed(int bytesTransferred, boolean canInvokeDirect) {819updateBuffers(bytesTransferred);820821// return direct buffer to cache if substituted822releaseBuffers();823824// release waiters if not already released by timeout825synchronized (result) {826if (result.isDone())827return;828enableWriting();829if (gatheringWrite) {830result.setResult((V)Long.valueOf(bytesTransferred));831} else {832result.setResult((V)Integer.valueOf(bytesTransferred));833}834}835if (canInvokeDirect) {836Invoker.invokeUnchecked(result);837} else {838Invoker.invoke(result);839}840}841842@Override843public void failed(int error, IOException x) {844// return direct buffer to cache if substituted845releaseBuffers();846847// release waiters if not already released by timeout848if (!isOpen())849x = new AsynchronousCloseException();850851synchronized (result) {852if (result.isDone())853return;854enableWriting();855result.setFailure(x);856}857Invoker.invoke(result);858}859860/**861* Invoked if timeout expires before it is cancelled862*/863void timeout() {864// synchronize on result as the I/O could complete/fail865synchronized (result) {866if (result.isDone())867return;868869// kill further writing before releasing waiters870enableWriting(true);871result.setFailure(new InterruptedByTimeoutException());872}873874// invoke handler without any locks875Invoker.invoke(result);876}877}878879@Override880<V extends Number,A> Future<V> implWrite(boolean gatheringWrite,881ByteBuffer src,882ByteBuffer[] srcs,883long timeout,884TimeUnit unit,885A attachment,886CompletionHandler<V,? super A> handler)887{888// setup task889PendingFuture<V,A> result =890new PendingFuture<V,A>(this, handler, attachment);891ByteBuffer[] bufs;892if (gatheringWrite) {893bufs = srcs;894} else {895bufs = new ByteBuffer[1];896bufs[0] = src;897}898final WriteTask<V,A> writeTask =899new WriteTask<V,A>(bufs, gatheringWrite, result);900result.setContext(writeTask);901902// schedule timeout903if (timeout > 0L) {904Future<?> timeoutTask = iocp.schedule(new Runnable() {905public void run() {906writeTask.timeout();907}908}, timeout, unit);909result.setTimeoutTask(timeoutTask);910}911912// initiate I/O (can only be done from thread in thread pool)913// initiate I/O914if (Iocp.supportsThreadAgnosticIo()) {915writeTask.run();916} else {917Invoker.invokeOnThreadInThreadPool(this, writeTask);918}919return result;920}921922// -- Native methods --923924private static native void initIDs();925926private static native int connect0(long socket, boolean preferIPv6,927InetAddress remote, int remotePort, long overlapped) throws IOException;928929private static native void updateConnectContext(long socket) throws IOException;930931private static native int read0(long socket, int count, long addres, long overlapped)932throws IOException;933934private static native int write0(long socket, int count, long address,935long overlapped) throws IOException;936937private static native void shutdown0(long socket, int how) throws IOException;938939private static native void closesocket0(long socket) throws IOException;940941static {942IOUtil.load();943initIDs();944}945}946947948