Path: blob/master/src/java.base/unix/classes/sun/nio/ch/SourceChannelImpl.java
41137 views
/*1* Copyright (c) 2000, 2018, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation. Oracle designates this7* particular file as subject to the "Classpath" exception as provided8* by Oracle in the LICENSE file that accompanied this code.9*10* This code is distributed in the hope that it will be useful, but WITHOUT11* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or12* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License13* version 2 for more details (a copy is included in the LICENSE file that14* accompanied this code).15*16* You should have received a copy of the GNU General Public License version17* 2 along with this work; if not, write to the Free Software Foundation,18* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.19*20* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA21* or visit www.oracle.com if you need additional information or have any22* questions.23*/2425package sun.nio.ch;2627import java.io.FileDescriptor;28import java.io.IOException;29import java.nio.ByteBuffer;30import java.nio.channels.AsynchronousCloseException;31import java.nio.channels.ClosedChannelException;32import java.nio.channels.NotYetConnectedException;33import java.nio.channels.Pipe;34import java.nio.channels.SelectionKey;35import java.nio.channels.spi.SelectorProvider;36import java.util.Objects;37import java.util.concurrent.locks.ReentrantLock;3839class SourceChannelImpl40extends Pipe.SourceChannel41implements SelChImpl42{43// Used to make native read and write calls44private static final NativeDispatcher nd = new FileDispatcherImpl();4546// The file descriptor associated with this channel47private final FileDescriptor fd;48private final int fdVal;4950// Lock held by current reading thread51private final ReentrantLock readLock = new ReentrantLock();5253// Lock held by any thread that modifies the state fields declared below54// DO NOT invoke a blocking I/O operation while holding this lock!55private final Object stateLock = new Object();5657// -- The following fields are protected by stateLock5859// Channel state60private static final int ST_INUSE = 0;61private static final int ST_CLOSING = 1;62private static final int ST_CLOSED = 2;63private int state;6465// ID of native thread doing read, for signalling66private long thread;6768// -- End of fields protected by stateLock697071public FileDescriptor getFD() {72return fd;73}7475public int getFDVal() {76return fdVal;77}7879SourceChannelImpl(SelectorProvider sp, FileDescriptor fd) {80super(sp);81this.fd = fd;82this.fdVal = IOUtil.fdVal(fd);83}8485/**86* Closes the read end of the pipe if there are no read operation in87* progress and the channel is not registered with a Selector.88*/89private boolean tryClose() throws IOException {90assert Thread.holdsLock(stateLock) && state == ST_CLOSING;91if (thread == 0 && !isRegistered()) {92state = ST_CLOSED;93nd.close(fd);94return true;95} else {96return false;97}98}99100/**101* Invokes tryClose to attempt to close the read end of the pipe.102*103* This method is used for deferred closing by I/O and Selector operations.104*/105private void tryFinishClose() {106try {107tryClose();108} catch (IOException ignore) { }109}110111/**112* Closes this channel when configured in blocking mode.113*114* If there is a read operation in progress then the read-end of the pipe115* is pre-closed and the reader is signalled, in which case the final close116* is deferred until the reader aborts.117*/118private void implCloseBlockingMode() throws IOException {119synchronized (stateLock) {120assert state < ST_CLOSING;121state = ST_CLOSING;122if (!tryClose()) {123long th = thread;124if (th != 0) {125nd.preClose(fd);126NativeThread.signal(th);127}128}129}130}131132/**133* Closes this channel when configured in non-blocking mode.134*135* If the channel is registered with a Selector then the close is deferred136* until the channel is flushed from all Selectors.137*/138private void implCloseNonBlockingMode() throws IOException {139synchronized (stateLock) {140assert state < ST_CLOSING;141state = ST_CLOSING;142}143// wait for any read operation to complete before trying to close144readLock.lock();145readLock.unlock();146synchronized (stateLock) {147if (state == ST_CLOSING) {148tryClose();149}150}151}152153/**154* Invoked by implCloseChannel to close the channel.155*/156@Override157protected void implCloseSelectableChannel() throws IOException {158assert !isOpen();159if (isBlocking()) {160implCloseBlockingMode();161} else {162implCloseNonBlockingMode();163}164}165@Override166public void kill() {167synchronized (stateLock) {168assert !isOpen();169if (state == ST_CLOSING) {170tryFinishClose();171}172}173}174175@Override176protected void implConfigureBlocking(boolean block) throws IOException {177readLock.lock();178try {179synchronized (stateLock) {180if (!isOpen())181throw new ClosedChannelException();182IOUtil.configureBlocking(fd, block);183}184} finally {185readLock.unlock();186}187}188189public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl ski) {190int intOps = ski.nioInterestOps();191int oldOps = ski.nioReadyOps();192int newOps = initialOps;193194if ((ops & Net.POLLNVAL) != 0)195throw new Error("POLLNVAL detected");196197if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) {198newOps = intOps;199ski.nioReadyOps(newOps);200return (newOps & ~oldOps) != 0;201}202203if (((ops & Net.POLLIN) != 0) &&204((intOps & SelectionKey.OP_READ) != 0))205newOps |= SelectionKey.OP_READ;206207ski.nioReadyOps(newOps);208return (newOps & ~oldOps) != 0;209}210211public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl ski) {212return translateReadyOps(ops, ski.nioReadyOps(), ski);213}214215public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl ski) {216return translateReadyOps(ops, 0, ski);217}218219public int translateInterestOps(int ops) {220int newOps = 0;221if (ops == SelectionKey.OP_READ)222newOps |= Net.POLLIN;223return newOps;224}225226/**227* Marks the beginning of a read operation that might block.228*229* @throws ClosedChannelException if the channel is closed230* @throws NotYetConnectedException if the channel is not yet connected231*/232private void beginRead(boolean blocking) throws ClosedChannelException {233if (blocking) {234// set hook for Thread.interrupt235begin();236}237synchronized (stateLock) {238if (!isOpen())239throw new ClosedChannelException();240if (blocking)241thread = NativeThread.current();242}243}244245/**246* Marks the end of a read operation that may have blocked.247*248* @throws AsynchronousCloseException if the channel was closed due to this249* thread being interrupted on a blocking read operation.250*/251private void endRead(boolean blocking, boolean completed)252throws AsynchronousCloseException253{254if (blocking) {255synchronized (stateLock) {256thread = 0;257if (state == ST_CLOSING) {258tryFinishClose();259}260}261// remove hook for Thread.interrupt262end(completed);263}264}265266@Override267public int read(ByteBuffer dst) throws IOException {268Objects.requireNonNull(dst);269270readLock.lock();271try {272boolean blocking = isBlocking();273int n = 0;274try {275beginRead(blocking);276n = IOUtil.read(fd, dst, -1, nd);277if (blocking) {278while (IOStatus.okayToRetry(n) && isOpen()) {279park(Net.POLLIN);280n = IOUtil.read(fd, dst, -1, nd);281}282}283} finally {284endRead(blocking, n > 0);285assert IOStatus.check(n);286}287return IOStatus.normalize(n);288} finally {289readLock.unlock();290}291}292293@Override294public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {295Objects.checkFromIndexSize(offset, length, dsts.length);296297readLock.lock();298try {299boolean blocking = isBlocking();300long n = 0;301try {302beginRead(blocking);303n = IOUtil.read(fd, dsts, offset, length, nd);304if (blocking) {305while (IOStatus.okayToRetry(n) && isOpen()) {306park(Net.POLLIN);307n = IOUtil.read(fd, dsts, offset, length, nd);308}309}310} finally {311endRead(blocking, n > 0);312assert IOStatus.check(n);313}314return IOStatus.normalize(n);315} finally {316readLock.unlock();317}318}319320@Override321public long read(ByteBuffer[] dsts) throws IOException {322return read(dsts, 0, dsts.length);323}324}325326327