Path: blob/aarch64-shenandoah-jdk8u272-b10/jdk/src/share/classes/sun/nio/ch/AsynchronousSocketChannelImpl.java
38918 views
/*1* Copyright (c) 2008, 2018, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation. Oracle designates this7* particular file as subject to the "Classpath" exception as provided8* by Oracle in the LICENSE file that accompanied this code.9*10* This code is distributed in the hope that it will be useful, but WITHOUT11* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or12* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License13* version 2 for more details (a copy is included in the LICENSE file that14* accompanied this code).15*16* You should have received a copy of the GNU General Public License version17* 2 along with this work; if not, write to the Free Software Foundation,18* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.19*20* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA21* or visit www.oracle.com if you need additional information or have any22* questions.23*/2425package sun.nio.ch;2627import java.nio.ByteBuffer;28import java.nio.channels.*;29import java.net.SocketOption;30import java.net.StandardSocketOptions;31import java.net.SocketAddress;32import java.net.InetSocketAddress;33import java.io.IOException;34import java.io.FileDescriptor;35import java.util.Set;36import java.util.HashSet;37import java.util.Collections;38import java.util.concurrent.*;39import java.util.concurrent.locks.*;40import sun.net.NetHooks;41import sun.net.ExtendedOptionsImpl;42import sun.net.ExtendedOptionsHelper;4344/**45* Base implementation of AsynchronousSocketChannel46*/4748abstract class AsynchronousSocketChannelImpl49extends AsynchronousSocketChannel50implements Cancellable, Groupable51{52protected final FileDescriptor fd;5354// protects state, localAddress, and remoteAddress55protected final Object stateLock = new Object();5657protected volatile InetSocketAddress localAddress = null;58protected volatile InetSocketAddress remoteAddress = null;5960// State, increases monotonically61static final int ST_UNINITIALIZED = -1;62static final int ST_UNCONNECTED = 0;63static final int ST_PENDING = 1;64static final int ST_CONNECTED = 2;65protected volatile int state = ST_UNINITIALIZED;6667// reading state68private final Object readLock = new Object();69private boolean reading;70private boolean readShutdown;71private boolean readKilled; // further reading disallowed due to timeout7273// writing state74private final Object writeLock = new Object();75private boolean writing;76private boolean writeShutdown;77private boolean writeKilled; // further writing disallowed due to timeout7879// close support80private final ReadWriteLock closeLock = new ReentrantReadWriteLock();81private volatile boolean open = true;8283// set true when exclusive binding is on and SO_REUSEADDR is emulated84private boolean isReuseAddress;8586AsynchronousSocketChannelImpl(AsynchronousChannelGroupImpl group)87throws IOException88{89super(group.provider());90this.fd = Net.socket(true);91this.state = ST_UNCONNECTED;92}9394// Constructor for sockets obtained from AsynchronousServerSocketChannelImpl95AsynchronousSocketChannelImpl(AsynchronousChannelGroupImpl group,96FileDescriptor fd,97InetSocketAddress remote)98throws IOException99{100super(group.provider());101this.fd = fd;102this.state = ST_CONNECTED;103this.localAddress = Net.localAddress(fd);104this.remoteAddress = remote;105}106107@Override108public final boolean isOpen() {109return open;110}111112/**113* Marks beginning of access to file descriptor/handle114*/115final void begin() throws IOException {116closeLock.readLock().lock();117if (!isOpen())118throw new ClosedChannelException();119}120121/**122* Marks end of access to file descriptor/handle123*/124final void end() {125closeLock.readLock().unlock();126}127128/**129* Invoked to close socket and release other resources.130*/131abstract void implClose() throws IOException;132133@Override134public final void close() throws IOException {135// synchronize with any threads initiating asynchronous operations136closeLock.writeLock().lock();137try {138if (!open)139return; // already closed140open = false;141} finally {142closeLock.writeLock().unlock();143}144implClose();145}146147final void enableReading(boolean killed) {148synchronized (readLock) {149reading = false;150if (killed)151readKilled = true;152}153}154155final void enableReading() {156enableReading(false);157}158159final void enableWriting(boolean killed) {160synchronized (writeLock) {161writing = false;162if (killed)163writeKilled = true;164}165}166167final void enableWriting() {168enableWriting(false);169}170171final void killReading() {172synchronized (readLock) {173readKilled = true;174}175}176177final void killWriting() {178synchronized (writeLock) {179writeKilled = true;180}181}182183final void killConnect() {184// when a connect is cancelled then the connection may have been185// established so prevent reading or writing.186killReading();187killWriting();188}189190/**191* Invoked by connect to initiate the connect operation.192*/193abstract <A> Future<Void> implConnect(SocketAddress remote,194A attachment,195CompletionHandler<Void,? super A> handler);196197@Override198public final Future<Void> connect(SocketAddress remote) {199return implConnect(remote, null, null);200}201202@Override203public final <A> void connect(SocketAddress remote,204A attachment,205CompletionHandler<Void,? super A> handler)206{207if (handler == null)208throw new NullPointerException("'handler' is null");209implConnect(remote, attachment, handler);210}211212/**213* Invoked by read to initiate the I/O operation.214*/215abstract <V extends Number,A> Future<V> implRead(boolean isScatteringRead,216ByteBuffer dst,217ByteBuffer[] dsts,218long timeout,219TimeUnit unit,220A attachment,221CompletionHandler<V,? super A> handler);222223@SuppressWarnings("unchecked")224private <V extends Number,A> Future<V> read(boolean isScatteringRead,225ByteBuffer dst,226ByteBuffer[] dsts,227long timeout,228TimeUnit unit,229A att,230CompletionHandler<V,? super A> handler)231{232if (!isOpen()) {233Throwable e = new ClosedChannelException();234if (handler == null)235return CompletedFuture.withFailure(e);236Invoker.invoke(this, handler, att, null, e);237return null;238}239240if (remoteAddress == null)241throw new NotYetConnectedException();242243boolean hasSpaceToRead = isScatteringRead || dst.hasRemaining();244boolean shutdown = false;245246// check and update state247synchronized (readLock) {248if (readKilled)249throw new IllegalStateException("Reading not allowed due to timeout or cancellation");250if (reading)251throw new ReadPendingException();252if (readShutdown) {253shutdown = true;254} else {255if (hasSpaceToRead) {256reading = true;257}258}259}260261// immediately complete with -1 if shutdown for read262// immediately complete with 0 if no space remaining263if (shutdown || !hasSpaceToRead) {264Number result;265if (isScatteringRead) {266result = (shutdown) ? Long.valueOf(-1L) : Long.valueOf(0L);267} else {268result = (shutdown) ? -1 : 0;269}270if (handler == null)271return CompletedFuture.withResult((V)result);272Invoker.invoke(this, handler, att, (V)result, null);273return null;274}275276return implRead(isScatteringRead, dst, dsts, timeout, unit, att, handler);277}278279@Override280public final Future<Integer> read(ByteBuffer dst) {281if (dst.isReadOnly())282throw new IllegalArgumentException("Read-only buffer");283return read(false, dst, null, 0L, TimeUnit.MILLISECONDS, null, null);284}285286@Override287public final <A> void read(ByteBuffer dst,288long timeout,289TimeUnit unit,290A attachment,291CompletionHandler<Integer,? super A> handler)292{293if (handler == null)294throw new NullPointerException("'handler' is null");295if (dst.isReadOnly())296throw new IllegalArgumentException("Read-only buffer");297read(false, dst, null, timeout, unit, attachment, handler);298}299300@Override301public final <A> void read(ByteBuffer[] dsts,302int offset,303int length,304long timeout,305TimeUnit unit,306A attachment,307CompletionHandler<Long,? super A> handler)308{309if (handler == null)310throw new NullPointerException("'handler' is null");311if ((offset < 0) || (length < 0) || (offset > dsts.length - length))312throw new IndexOutOfBoundsException();313ByteBuffer[] bufs = Util.subsequence(dsts, offset, length);314for (int i=0; i<bufs.length; i++) {315if (bufs[i].isReadOnly())316throw new IllegalArgumentException("Read-only buffer");317}318read(true, null, bufs, timeout, unit, attachment, handler);319}320321/**322* Invoked by write to initiate the I/O operation.323*/324abstract <V extends Number,A> Future<V> implWrite(boolean isGatheringWrite,325ByteBuffer src,326ByteBuffer[] srcs,327long timeout,328TimeUnit unit,329A attachment,330CompletionHandler<V,? super A> handler);331332@SuppressWarnings("unchecked")333private <V extends Number,A> Future<V> write(boolean isGatheringWrite,334ByteBuffer src,335ByteBuffer[] srcs,336long timeout,337TimeUnit unit,338A att,339CompletionHandler<V,? super A> handler)340{341boolean hasDataToWrite = isGatheringWrite || src.hasRemaining();342343boolean closed = false;344if (isOpen()) {345if (remoteAddress == null)346throw new NotYetConnectedException();347// check and update state348synchronized (writeLock) {349if (writeKilled)350throw new IllegalStateException("Writing not allowed due to timeout or cancellation");351if (writing)352throw new WritePendingException();353if (writeShutdown) {354closed = true;355} else {356if (hasDataToWrite)357writing = true;358}359}360} else {361closed = true;362}363364// channel is closed or shutdown for write365if (closed) {366Throwable e = new ClosedChannelException();367if (handler == null)368return CompletedFuture.withFailure(e);369Invoker.invoke(this, handler, att, null, e);370return null;371}372373// nothing to write so complete immediately374if (!hasDataToWrite) {375Number result = (isGatheringWrite) ? (Number)0L : (Number)0;376if (handler == null)377return CompletedFuture.withResult((V)result);378Invoker.invoke(this, handler, att, (V)result, null);379return null;380}381382return implWrite(isGatheringWrite, src, srcs, timeout, unit, att, handler);383}384385@Override386public final Future<Integer> write(ByteBuffer src) {387return write(false, src, null, 0L, TimeUnit.MILLISECONDS, null, null);388}389390@Override391public final <A> void write(ByteBuffer src,392long timeout,393TimeUnit unit,394A attachment,395CompletionHandler<Integer,? super A> handler)396{397if (handler == null)398throw new NullPointerException("'handler' is null");399write(false, src, null, timeout, unit, attachment, handler);400}401402@Override403public final <A> void write(ByteBuffer[] srcs,404int offset,405int length,406long timeout,407TimeUnit unit,408A attachment,409CompletionHandler<Long,? super A> handler)410{411if (handler == null)412throw new NullPointerException("'handler' is null");413if ((offset < 0) || (length < 0) || (offset > srcs.length - length))414throw new IndexOutOfBoundsException();415srcs = Util.subsequence(srcs, offset, length);416write(true, null, srcs, timeout, unit, attachment, handler);417}418419@Override420public final AsynchronousSocketChannel bind(SocketAddress local)421throws IOException422{423try {424begin();425synchronized (stateLock) {426if (state == ST_PENDING)427throw new ConnectionPendingException();428if (localAddress != null)429throw new AlreadyBoundException();430InetSocketAddress isa = (local == null) ?431new InetSocketAddress(0) : Net.checkAddress(local);432SecurityManager sm = System.getSecurityManager();433if (sm != null) {434sm.checkListen(isa.getPort());435}436NetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort());437Net.bind(fd, isa.getAddress(), isa.getPort());438localAddress = Net.localAddress(fd);439}440} finally {441end();442}443return this;444}445446@Override447public final SocketAddress getLocalAddress() throws IOException {448if (!isOpen())449throw new ClosedChannelException();450return Net.getRevealedLocalAddress(localAddress);451}452453@Override454public final <T> AsynchronousSocketChannel setOption(SocketOption<T> name, T value)455throws IOException456{457if (name == null)458throw new NullPointerException();459if (!supportedOptions().contains(name))460throw new UnsupportedOperationException("'" + name + "' not supported");461462try {463begin();464if (writeShutdown)465throw new IOException("Connection has been shutdown for writing");466if (name == StandardSocketOptions.SO_REUSEADDR &&467Net.useExclusiveBind())468{469// SO_REUSEADDR emulated when using exclusive bind470isReuseAddress = (Boolean)value;471} else {472Net.setSocketOption(fd, Net.UNSPEC, name, value);473}474return this;475} finally {476end();477}478}479480@Override481@SuppressWarnings("unchecked")482public final <T> T getOption(SocketOption<T> name) throws IOException {483if (name == null)484throw new NullPointerException();485if (!supportedOptions().contains(name))486throw new UnsupportedOperationException("'" + name + "' not supported");487488try {489begin();490if (name == StandardSocketOptions.SO_REUSEADDR &&491Net.useExclusiveBind())492{493// SO_REUSEADDR emulated when using exclusive bind494return (T)Boolean.valueOf(isReuseAddress);495}496return (T) Net.getSocketOption(fd, Net.UNSPEC, name);497} finally {498end();499}500}501502private static class DefaultOptionsHolder {503static final Set<SocketOption<?>> defaultOptions = defaultOptions();504505private static Set<SocketOption<?>> defaultOptions() {506HashSet<SocketOption<?>> set = new HashSet<SocketOption<?>>(5);507set.add(StandardSocketOptions.SO_SNDBUF);508set.add(StandardSocketOptions.SO_RCVBUF);509set.add(StandardSocketOptions.SO_KEEPALIVE);510set.add(StandardSocketOptions.SO_REUSEADDR);511set.add(StandardSocketOptions.TCP_NODELAY);512if (ExtendedOptionsImpl.flowSupported()) {513set.add(jdk.net.ExtendedSocketOptions.SO_FLOW_SLA);514}515set.addAll(ExtendedOptionsHelper.keepAliveOptions());516return Collections.unmodifiableSet(set);517}518}519520@Override521public final Set<SocketOption<?>> supportedOptions() {522return DefaultOptionsHolder.defaultOptions;523}524525@Override526public final SocketAddress getRemoteAddress() throws IOException {527if (!isOpen())528throw new ClosedChannelException();529return remoteAddress;530}531532@Override533public final AsynchronousSocketChannel shutdownInput() throws IOException {534try {535begin();536if (remoteAddress == null)537throw new NotYetConnectedException();538synchronized (readLock) {539if (!readShutdown) {540Net.shutdown(fd, Net.SHUT_RD);541readShutdown = true;542}543}544} finally {545end();546}547return this;548}549550@Override551public final AsynchronousSocketChannel shutdownOutput() throws IOException {552try {553begin();554if (remoteAddress == null)555throw new NotYetConnectedException();556synchronized (writeLock) {557if (!writeShutdown) {558Net.shutdown(fd, Net.SHUT_WR);559writeShutdown = true;560}561}562} finally {563end();564}565return this;566}567568@Override569public final String toString() {570StringBuilder sb = new StringBuilder();571sb.append(this.getClass().getName());572sb.append('[');573synchronized (stateLock) {574if (!isOpen()) {575sb.append("closed");576} else {577switch (state) {578case ST_UNCONNECTED:579sb.append("unconnected");580break;581case ST_PENDING:582sb.append("connection-pending");583break;584case ST_CONNECTED:585sb.append("connected");586if (readShutdown)587sb.append(" ishut");588if (writeShutdown)589sb.append(" oshut");590break;591}592if (localAddress != null) {593sb.append(" local=");594sb.append(595Net.getRevealedLocalAddressAsString(localAddress));596}597if (remoteAddress != null) {598sb.append(" remote=");599sb.append(remoteAddress.toString());600}601}602}603sb.append(']');604return sb.toString();605}606}607608609