// Path: blob/master/src/java.base/unix/classes/sun/nio/ch/UnixAsynchronousSocketChannelImpl.java
// 41137 views
/*1* Copyright (c) 2008, 2021, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation. Oracle designates this7* particular file as subject to the "Classpath" exception as provided8* by Oracle in the LICENSE file that accompanied this code.9*10* This code is distributed in the hope that it will be useful, but WITHOUT11* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or12* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License13* version 2 for more details (a copy is included in the LICENSE file that14* accompanied this code).15*16* You should have received a copy of the GNU General Public License version17* 2 along with this work; if not, write to the Free Software Foundation,18* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.19*20* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA21* or visit www.oracle.com if you need additional information or have any22* questions.23*/2425package sun.nio.ch;2627import java.nio.channels.*;28import java.nio.ByteBuffer;29import java.net.*;30import java.util.concurrent.*;31import java.io.IOException;32import java.io.FileDescriptor;3334import sun.net.ConnectionResetException;35import sun.net.NetHooks;36import sun.net.util.SocketExceptions;37import sun.security.action.GetPropertyAction;3839/**40* Unix implementation of AsynchronousSocketChannel41*/4243class UnixAsynchronousSocketChannelImpl44extends AsynchronousSocketChannelImpl implements Port.PollableChannel45{46private static final NativeDispatcher nd = new SocketDispatcher();47private static enum OpType { CONNECT, READ, WRITE };4849private static final boolean disableSynchronousRead;50static {51String propValue = 
GetPropertyAction.privilegedGetProperty(52"sun.nio.ch.disableSynchronousRead", "false");53disableSynchronousRead = propValue.isEmpty() ?54true : Boolean.parseBoolean(propValue);55}5657private final Port port;58private final int fdVal;5960// used to ensure that the context for I/O operations that complete61// ascynrhonously is visible to the pooled threads handling I/O events.62private final Object updateLock = new Object();6364// pending connect (updateLock)65private boolean connectPending;66private CompletionHandler<Void,Object> connectHandler;67private Object connectAttachment;68private PendingFuture<Void,Object> connectFuture;6970// pending remote address (stateLock)71private SocketAddress pendingRemote;7273// pending read (updateLock)74private boolean readPending;75private boolean isScatteringRead;76private ByteBuffer readBuffer;77private ByteBuffer[] readBuffers;78private Runnable readScopeHandleReleasers;79private CompletionHandler<Number,Object> readHandler;80private Object readAttachment;81private PendingFuture<Number,Object> readFuture;82private Future<?> readTimer;8384// pending write (updateLock)85private boolean writePending;86private boolean isGatheringWrite;87private ByteBuffer writeBuffer;88private ByteBuffer[] writeBuffers;89private Runnable writeScopeHandleReleasers;90private CompletionHandler<Number,Object> writeHandler;91private Object writeAttachment;92private PendingFuture<Number,Object> writeFuture;93private Future<?> writeTimer;949596UnixAsynchronousSocketChannelImpl(Port port)97throws IOException98{99super(port);100101// set non-blocking102try {103IOUtil.configureBlocking(fd, false);104} catch (IOException x) {105nd.close(fd);106throw x;107}108109this.port = port;110this.fdVal = IOUtil.fdVal(fd);111112// add mapping from file descriptor to this channel113port.register(fdVal, this);114}115116// Constructor for sockets created by UnixAsynchronousServerSocketChannelImpl117UnixAsynchronousSocketChannelImpl(Port port,118FileDescriptor 
fd,119InetSocketAddress remote)120throws IOException121{122super(port, fd, remote);123124this.fdVal = IOUtil.fdVal(fd);125IOUtil.configureBlocking(fd, false);126127try {128port.register(fdVal, this);129} catch (ShutdownChannelGroupException x) {130// ShutdownChannelGroupException thrown if we attempt to register a131// new channel after the group is shutdown132throw new IOException(x);133}134135this.port = port;136}137138@Override139public AsynchronousChannelGroupImpl group() {140return port;141}142143// register events for outstanding I/O operations, caller already owns updateLock144private void updateEvents() {145assert Thread.holdsLock(updateLock);146int events = 0;147if (readPending)148events |= Net.POLLIN;149if (connectPending || writePending)150events |= Net.POLLOUT;151if (events != 0)152port.startPoll(fdVal, events);153}154155// register events for outstanding I/O operations156private void lockAndUpdateEvents() {157synchronized (updateLock) {158updateEvents();159}160}161162// invoke to finish read and/or write operations163private void finish(boolean mayInvokeDirect,164boolean readable,165boolean writable)166{167boolean finishRead = false;168boolean finishWrite = false;169boolean finishConnect = false;170171// map event to pending result172synchronized (updateLock) {173if (readable && this.readPending) {174this.readPending = false;175finishRead = true;176}177if (writable) {178if (this.writePending) {179this.writePending = false;180finishWrite = true;181} else if (this.connectPending) {182this.connectPending = false;183finishConnect = true;184}185}186}187188// complete the I/O operation. Special case for when channel is189// ready for both reading and writing. 
In that case, submit task to190// complete write if write operation has a completion handler.191if (finishRead) {192if (finishWrite)193finishWrite(false);194finishRead(mayInvokeDirect);195return;196}197if (finishWrite) {198finishWrite(mayInvokeDirect);199}200if (finishConnect) {201finishConnect(mayInvokeDirect);202}203}204205/**206* Invoked by event handler thread when file descriptor is polled207*/208@Override209public void onEvent(int events, boolean mayInvokeDirect) {210boolean readable = (events & Net.POLLIN) > 0;211boolean writable = (events & Net.POLLOUT) > 0;212if ((events & (Net.POLLERR | Net.POLLHUP)) > 0) {213readable = true;214writable = true;215}216finish(mayInvokeDirect, readable, writable);217}218219@Override220void implClose() throws IOException {221// remove the mapping222port.unregister(fdVal);223224// close file descriptor225nd.close(fd);226227// All outstanding I/O operations are required to fail228finish(false, true, true);229}230231@Override232public void onCancel(PendingFuture<?,?> task) {233if (task.getContext() == OpType.CONNECT)234killConnect();235if (task.getContext() == OpType.READ)236killReading();237if (task.getContext() == OpType.WRITE)238killWriting();239}240241// -- connect --242243private void setConnected() throws IOException {244synchronized (stateLock) {245state = ST_CONNECTED;246localAddress = Net.localAddress(fd);247remoteAddress = (InetSocketAddress)pendingRemote;248}249}250251private void finishConnect(boolean mayInvokeDirect) {252Throwable e = null;253try {254begin();255checkConnect(fdVal);256setConnected();257} catch (Throwable x) {258if (x instanceof ClosedChannelException)259x = new AsynchronousCloseException();260e = x;261} finally {262end();263}264if (e != null) {265if (e instanceof IOException) {266var isa = (InetSocketAddress)pendingRemote;267e = SocketExceptions.of((IOException)e, isa);268}269// close channel if connection cannot be established270try {271close();272} catch (Throwable suppressed) 
{273e.addSuppressed(suppressed);274}275}276277// invoke handler and set result278CompletionHandler<Void,Object> handler = connectHandler;279connectHandler = null;280Object att = connectAttachment;281PendingFuture<Void,Object> future = connectFuture;282if (handler == null) {283future.setResult(null, e);284} else {285if (mayInvokeDirect) {286Invoker.invokeUnchecked(handler, att, null, e);287} else {288Invoker.invokeIndirectly(this, handler, att, null, e);289}290}291}292293@Override294@SuppressWarnings("unchecked")295<A> Future<Void> implConnect(SocketAddress remote,296A attachment,297CompletionHandler<Void,? super A> handler)298{299if (!isOpen()) {300Throwable e = new ClosedChannelException();301if (handler == null) {302return CompletedFuture.withFailure(e);303} else {304Invoker.invoke(this, handler, attachment, null, e);305return null;306}307}308309InetSocketAddress isa = Net.checkAddress(remote);310311// permission check312@SuppressWarnings("removal")313SecurityManager sm = System.getSecurityManager();314if (sm != null)315sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());316317// check and set state318boolean notifyBeforeTcpConnect;319synchronized (stateLock) {320if (state == ST_CONNECTED)321throw new AlreadyConnectedException();322if (state == ST_PENDING)323throw new ConnectionPendingException();324state = ST_PENDING;325pendingRemote = remote;326notifyBeforeTcpConnect = (localAddress == null);327}328329Throwable e = null;330try {331begin();332// notify hook if unbound333if (notifyBeforeTcpConnect)334NetHooks.beforeTcpConnect(fd, isa.getAddress(), isa.getPort());335int n = Net.connect(fd, isa.getAddress(), isa.getPort());336if (n == IOStatus.UNAVAILABLE) {337// connection could not be established immediately338PendingFuture<Void,A> result = null;339synchronized (updateLock) {340if (handler == null) {341result = new PendingFuture<Void,A>(this, OpType.CONNECT);342this.connectFuture = (PendingFuture<Void,Object>)result;343} else {344this.connectHandler 
= (CompletionHandler<Void,Object>)handler;345this.connectAttachment = attachment;346}347this.connectPending = true;348updateEvents();349}350return result;351}352setConnected();353} catch (Throwable x) {354if (x instanceof ClosedChannelException)355x = new AsynchronousCloseException();356e = x;357} finally {358end();359}360361// close channel if connect fails362if (e != null) {363if (e instanceof IOException) {364e = SocketExceptions.of((IOException)e, isa);365}366try {367close();368} catch (Throwable suppressed) {369e.addSuppressed(suppressed);370}371}372if (handler == null) {373return CompletedFuture.withResult(null, e);374} else {375Invoker.invoke(this, handler, attachment, null, e);376return null;377}378}379380// -- read --381382private void finishRead(boolean mayInvokeDirect) {383int n = -1;384Throwable exc = null;385386// copy fields as we can't access them after reading is re-enabled.387boolean scattering = isScatteringRead;388CompletionHandler<Number,Object> handler = readHandler;389Object att = readAttachment;390PendingFuture<Number,Object> future = readFuture;391Future<?> timeout = readTimer;392393try {394begin();395396if (scattering) {397n = (int)IOUtil.read(fd, readBuffers, true, nd);398} else {399n = IOUtil.read(fd, readBuffer, -1, true, nd);400}401if (n == IOStatus.UNAVAILABLE) {402// spurious wakeup, is this possible?403synchronized (updateLock) {404readPending = true;405}406return;407}408409// allow objects to be GC'ed.410this.readBuffer = null;411this.readBuffers = null;412this.readAttachment = null;413this.readHandler = null;414IOUtil.releaseScopes(readScopeHandleReleasers);415416// allow another read to be initiated417enableReading();418419} catch (Throwable x) {420enableReading();421if (x instanceof ClosedChannelException)422x = new AsynchronousCloseException();423if (x instanceof ConnectionResetException)424x = new IOException(x.getMessage());425exc = x;426} finally {427// restart poll in case of concurrent write428if (!(exc instanceof 
AsynchronousCloseException))429lockAndUpdateEvents();430end();431}432433// cancel the associated timer434if (timeout != null)435timeout.cancel(false);436437// create result438Number result = (exc != null) ? null : (scattering) ?439(Number)Long.valueOf(n) : (Number)Integer.valueOf(n);440441// invoke handler or set result442if (handler == null) {443future.setResult(result, exc);444} else {445if (mayInvokeDirect) {446Invoker.invokeUnchecked(handler, att, result, exc);447} else {448Invoker.invokeIndirectly(this, handler, att, result, exc);449}450}451}452453private Runnable readTimeoutTask = new Runnable() {454public void run() {455CompletionHandler<Number,Object> handler = null;456Object att = null;457PendingFuture<Number,Object> future = null;458459synchronized (updateLock) {460if (!readPending)461return;462readPending = false;463handler = readHandler;464att = readAttachment;465future = readFuture;466}467468// kill further reading before releasing waiters469enableReading(true);470471// invoke handler or set result472Exception exc = new InterruptedByTimeoutException();473if (handler == null) {474future.setFailure(exc);475} else {476AsynchronousChannel ch = UnixAsynchronousSocketChannelImpl.this;477Invoker.invokeIndirectly(ch, handler, att, null, exc);478}479}480};481482/**483* Initiates a read or scattering read operation484*/485@Override486@SuppressWarnings("unchecked")487<V extends Number,A> Future<V> implRead(boolean isScatteringRead,488ByteBuffer dst,489ByteBuffer[] dsts,490long timeout,491TimeUnit unit,492A attachment,493CompletionHandler<V,? 
super A> handler)494{495// A synchronous read is not attempted if disallowed by system property496// or, we are using a fixed thread pool and the completion handler may497// not be invoked directly (because the thread is not a pooled thread or498// there are too many handlers on the stack).499Invoker.GroupAndInvokeCount myGroupAndInvokeCount = null;500boolean invokeDirect = false;501boolean attemptRead = false;502if (!disableSynchronousRead) {503if (handler == null) {504attemptRead = true;505} else {506myGroupAndInvokeCount = Invoker.getGroupAndInvokeCount();507invokeDirect = Invoker.mayInvokeDirect(myGroupAndInvokeCount, port);508// okay to attempt read with user thread pool509attemptRead = invokeDirect || !port.isFixedThreadPool();510}511}512513int n = IOStatus.UNAVAILABLE;514Throwable exc = null;515boolean pending = false;516517try {518begin();519520if (attemptRead) {521if (isScatteringRead) {522n = (int)IOUtil.read(fd, dsts, true, nd);523} else {524n = IOUtil.read(fd, dst, -1, true, nd);525}526}527528if (n == IOStatus.UNAVAILABLE) {529PendingFuture<V,A> result = null;530synchronized (updateLock) {531this.isScatteringRead = isScatteringRead;532this.readScopeHandleReleasers = IOUtil.acquireScopes(dst, dsts);533this.readBuffer = dst;534this.readBuffers = dsts;535if (handler == null) {536this.readHandler = null;537result = new PendingFuture<V,A>(this, OpType.READ);538this.readFuture = (PendingFuture<Number,Object>)result;539this.readAttachment = null;540} else {541this.readHandler = (CompletionHandler<Number,Object>)handler;542this.readAttachment = attachment;543this.readFuture = null;544}545if (timeout > 0L) {546this.readTimer = port.schedule(readTimeoutTask, timeout, unit);547}548this.readPending = true;549updateEvents();550}551pending = true;552return result;553}554} catch (Throwable x) {555if (x instanceof ClosedChannelException)556x = new AsynchronousCloseException();557if (x instanceof ConnectionResetException)558x = new IOException(x.getMessage());559exc = 
x;560} finally {561if (!pending)562enableReading();563end();564}565566Number result = (exc != null) ? null : (isScatteringRead) ?567(Number)Long.valueOf(n) : (Number)Integer.valueOf(n);568569// read completed immediately570if (handler != null) {571if (invokeDirect) {572Invoker.invokeDirect(myGroupAndInvokeCount, handler, attachment, (V)result, exc);573} else {574Invoker.invokeIndirectly(this, handler, attachment, (V)result, exc);575}576return null;577} else {578return CompletedFuture.withResult((V)result, exc);579}580}581582// -- write --583584private void finishWrite(boolean mayInvokeDirect) {585int n = -1;586Throwable exc = null;587588// copy fields as we can't access them after reading is re-enabled.589boolean gathering = this.isGatheringWrite;590CompletionHandler<Number,Object> handler = this.writeHandler;591Object att = this.writeAttachment;592PendingFuture<Number,Object> future = this.writeFuture;593Future<?> timer = this.writeTimer;594595try {596begin();597598if (gathering) {599n = (int)IOUtil.write(fd, writeBuffers, true, nd);600} else {601n = IOUtil.write(fd, writeBuffer, -1, true, nd);602}603if (n == IOStatus.UNAVAILABLE) {604// spurious wakeup, is this possible?605synchronized (updateLock) {606writePending = true;607}608return;609}610611// allow objects to be GC'ed.612this.writeBuffer = null;613this.writeBuffers = null;614this.writeAttachment = null;615this.writeHandler = null;616IOUtil.releaseScopes(writeScopeHandleReleasers);617618// allow another write to be initiated619enableWriting();620621} catch (Throwable x) {622enableWriting();623if (x instanceof ClosedChannelException)624x = new AsynchronousCloseException();625exc = x;626} finally {627// restart poll in case of concurrent write628if (!(exc instanceof AsynchronousCloseException))629lockAndUpdateEvents();630end();631}632633// cancel the associated timer634if (timer != null)635timer.cancel(false);636637// create result638Number result = (exc != null) ? 
null : (gathering) ?639(Number)Long.valueOf(n) : (Number)Integer.valueOf(n);640641// invoke handler or set result642if (handler == null) {643future.setResult(result, exc);644} else {645if (mayInvokeDirect) {646Invoker.invokeUnchecked(handler, att, result, exc);647} else {648Invoker.invokeIndirectly(this, handler, att, result, exc);649}650}651}652653private Runnable writeTimeoutTask = new Runnable() {654public void run() {655CompletionHandler<Number,Object> handler = null;656Object att = null;657PendingFuture<Number,Object> future = null;658659synchronized (updateLock) {660if (!writePending)661return;662writePending = false;663handler = writeHandler;664att = writeAttachment;665future = writeFuture;666}667668// kill further writing before releasing waiters669enableWriting(true);670671// invoke handler or set result672Exception exc = new InterruptedByTimeoutException();673if (handler != null) {674Invoker.invokeIndirectly(UnixAsynchronousSocketChannelImpl.this,675handler, att, null, exc);676} else {677future.setFailure(exc);678}679}680};681682/**683* Initiates a read or scattering read operation684*/685@Override686@SuppressWarnings("unchecked")687<V extends Number,A> Future<V> implWrite(boolean isGatheringWrite,688ByteBuffer src,689ByteBuffer[] srcs,690long timeout,691TimeUnit unit,692A attachment,693CompletionHandler<V,? 
super A> handler)694{695Invoker.GroupAndInvokeCount myGroupAndInvokeCount =696Invoker.getGroupAndInvokeCount();697boolean invokeDirect = Invoker.mayInvokeDirect(myGroupAndInvokeCount, port);698boolean attemptWrite = (handler == null) || invokeDirect ||699!port.isFixedThreadPool(); // okay to attempt write with user thread pool700701int n = IOStatus.UNAVAILABLE;702Throwable exc = null;703boolean pending = false;704705try {706begin();707708if (attemptWrite) {709if (isGatheringWrite) {710n = (int)IOUtil.write(fd, srcs, true, nd);711} else {712n = IOUtil.write(fd, src, -1, true, nd);713}714}715716if (n == IOStatus.UNAVAILABLE) {717PendingFuture<V,A> result = null;718synchronized (updateLock) {719this.isGatheringWrite = isGatheringWrite;720this.writeScopeHandleReleasers = IOUtil.acquireScopes(src, srcs);721this.writeBuffer = src;722this.writeBuffers = srcs;723if (handler == null) {724this.writeHandler = null;725result = new PendingFuture<V,A>(this, OpType.WRITE);726this.writeFuture = (PendingFuture<Number,Object>)result;727this.writeAttachment = null;728} else {729this.writeHandler = (CompletionHandler<Number,Object>)handler;730this.writeAttachment = attachment;731this.writeFuture = null;732}733if (timeout > 0L) {734this.writeTimer = port.schedule(writeTimeoutTask, timeout, unit);735}736this.writePending = true;737updateEvents();738}739pending = true;740return result;741}742} catch (Throwable x) {743if (x instanceof ClosedChannelException)744x = new AsynchronousCloseException();745exc = x;746} finally {747if (!pending)748enableWriting();749end();750}751752Number result = (exc != null) ? 
null : (isGatheringWrite) ?753(Number)Long.valueOf(n) : (Number)Integer.valueOf(n);754755// write completed immediately756if (handler != null) {757if (invokeDirect) {758Invoker.invokeDirect(myGroupAndInvokeCount, handler, attachment, (V)result, exc);759} else {760Invoker.invokeIndirectly(this, handler, attachment, (V)result, exc);761}762return null;763} else {764return CompletedFuture.withResult((V)result, exc);765}766}767768// -- Native methods --769770private static native void checkConnect(int fdVal) throws IOException;771772static {773IOUtil.load();774}775}776777778