Path: blob/master/src/java.base/windows/classes/sun/nio/ch/WindowsAsynchronousFileChannelImpl.java
41139 views
/*1* Copyright (c) 2008, 2019, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation. Oracle designates this7* particular file as subject to the "Classpath" exception as provided8* by Oracle in the LICENSE file that accompanied this code.9*10* This code is distributed in the hope that it will be useful, but WITHOUT11* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or12* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License13* version 2 for more details (a copy is included in the LICENSE file that14* accompanied this code).15*16* You should have received a copy of the GNU General Public License version17* 2 along with this work; if not, write to the Free Software Foundation,18* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.19*20* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA21* or visit www.oracle.com if you need additional information or have any22* questions.23*/2425package sun.nio.ch;2627import java.nio.channels.*;28import java.util.concurrent.*;29import java.nio.ByteBuffer;30import java.nio.BufferOverflowException;31import java.io.IOException;32import java.io.FileDescriptor;33import jdk.internal.access.SharedSecrets;34import jdk.internal.access.JavaIOFileDescriptorAccess;3536/**37* Windows implementation of AsynchronousFileChannel using overlapped I/O.38*/3940public class WindowsAsynchronousFileChannelImpl41extends AsynchronousFileChannelImpl42implements Iocp.OverlappedChannel, Groupable43{44private static final JavaIOFileDescriptorAccess fdAccess =45SharedSecrets.getJavaIOFileDescriptorAccess();4647// error when EOF is detected asynchronously.48private static final int ERROR_HANDLE_EOF = 38;4950// Lazy initialization of default I/O completion port51private static class DefaultIocpHolder {52static final Iocp defaultIocp = defaultIocp();53private static Iocp defaultIocp() {54try {55return new Iocp(null, ThreadPool.createDefault()).start();56} catch (IOException ioe) {57throw new InternalError(ioe);58}59}60}6162// Used for force/truncate/size methods63private static final FileDispatcher nd = new FileDispatcherImpl();6465// The handle is extracted for use in native methods invoked from this class.66private final long handle;6768// The key that identifies the channel's association with the I/O port69private final int completionKey;7071// I/O completion port (group)72private final Iocp iocp;7374private final boolean isDefaultIocp;7576// Caches OVERLAPPED structure for each outstanding I/O operation77private final PendingIoCache ioCache;787980private WindowsAsynchronousFileChannelImpl(FileDescriptor fdObj,81boolean reading,82boolean writing,83Iocp iocp,84boolean isDefaultIocp)85throws IOException86{87super(fdObj, reading, writing, iocp.executor());88this.handle = fdAccess.getHandle(fdObj);89this.iocp = iocp;90this.isDefaultIocp = isDefaultIocp;91this.ioCache = new PendingIoCache();92this.completionKey = iocp.associate(this, handle);93}9495public static AsynchronousFileChannel open(FileDescriptor fdo,96boolean reading,97boolean writing,98ThreadPool pool)99throws IOException100{101Iocp iocp;102boolean isDefaultIocp;103if (pool == null) {104iocp = DefaultIocpHolder.defaultIocp;105isDefaultIocp = true;106} else {107iocp = new Iocp(null, pool).start();108isDefaultIocp = false;109}110try {111return new112WindowsAsynchronousFileChannelImpl(fdo, reading, writing, iocp, isDefaultIocp);113} catch (IOException x) {114// error binding to port so need to close it (if created for this channel)115if (!isDefaultIocp)116iocp.implClose();117throw x;118}119}120121@Override122public <V,A> PendingFuture<V,A> getByOverlapped(long overlapped) {123return ioCache.remove(overlapped);124}125126@Override127public void close() throws IOException {128closeLock.writeLock().lock();129try {130if (closed)131return; // already closed132closed = true;133} finally {134closeLock.writeLock().unlock();135}136137// invalidate all locks held for this channel138invalidateAllLocks();139140// close the file141nd.close(fdObj);142143// waits until all I/O operations have completed144ioCache.close();145146// disassociate from port147iocp.disassociate(completionKey);148149// for the non-default group close the port150if (!isDefaultIocp)151iocp.detachFromThreadPool();152}153154@Override155public AsynchronousChannelGroupImpl group() {156return iocp;157}158159/**160* Translates Throwable to IOException161*/162private static IOException toIOException(Throwable x) {163if (x instanceof IOException) {164if (x instanceof ClosedChannelException)165x = new AsynchronousCloseException();166return (IOException)x;167}168return new IOException(x);169}170171@Override172public long size() throws IOException {173try {174begin();175return nd.size(fdObj);176} finally {177end();178}179}180181@Override182public AsynchronousFileChannel truncate(long size) throws IOException {183if (size < 0)184throw new IllegalArgumentException("Negative size");185if (!writing)186throw new NonWritableChannelException();187try {188begin();189if (size > nd.size(fdObj))190return this;191nd.truncate(fdObj, size);192} finally {193end();194}195return this;196}197198@Override199public void force(boolean metaData) throws IOException {200try {201begin();202nd.force(fdObj, metaData);203} finally {204end();205}206}207208// -- file locking --209210/**211* Task that initiates locking operation and handles completion result.212*/213private class LockTask<A> implements Runnable, Iocp.ResultHandler {214private final long position;215private final FileLockImpl fli;216private final PendingFuture<FileLock,A> result;217218LockTask(long position,219FileLockImpl fli,220PendingFuture<FileLock,A> result)221{222this.position = position;223this.fli = fli;224this.result = result;225}226227@Override228public void run() {229long overlapped = 0L;230try {231begin();232233// allocate OVERLAPPED structure234overlapped = ioCache.add(result);235236// synchronize on result to avoid race with handler thread237// when lock is acquired immediately.238synchronized (result) {239int n = lockFile(handle, position, fli.size(), fli.isShared(),240overlapped);241if (n == IOStatus.UNAVAILABLE) {242// I/O is pending243return;244}245// acquired lock immediately246result.setResult(fli);247}248249} catch (Throwable x) {250// lock failed or channel closed251removeFromFileLockTable(fli);252result.setFailure(toIOException(x));253if (overlapped != 0L)254ioCache.remove(overlapped);255} finally {256end();257}258259// invoke completion handler260Invoker.invoke(result);261}262263@Override264public void completed(int bytesTransferred, boolean canInvokeDirect) {265// release waiters and invoke completion handler266result.setResult(fli);267if (canInvokeDirect) {268Invoker.invokeUnchecked(result);269} else {270Invoker.invoke(result);271}272}273274@Override275public void failed(int error, IOException x) {276// lock not acquired so remove from lock table277removeFromFileLockTable(fli);278279// release waiters280if (isOpen()) {281result.setFailure(x);282} else {283result.setFailure(new AsynchronousCloseException());284}285Invoker.invoke(result);286}287}288289@Override290<A> Future<FileLock> implLock(final long position,291final long size,292final boolean shared,293A attachment,294final CompletionHandler<FileLock,? super A> handler)295{296if (shared && !reading)297throw new NonReadableChannelException();298if (!shared && !writing)299throw new NonWritableChannelException();300301// add to lock table302FileLockImpl fli = addToFileLockTable(position, size, shared);303if (fli == null) {304Throwable exc = new ClosedChannelException();305if (handler == null)306return CompletedFuture.withFailure(exc);307Invoker.invoke(this, handler, attachment, null, exc);308return null;309}310311// create Future and task that will be invoked to acquire lock312PendingFuture<FileLock,A> result =313new PendingFuture<FileLock,A>(this, handler, attachment);314LockTask<A> lockTask = new LockTask<A>(position, fli, result);315result.setContext(lockTask);316317// initiate I/O318lockTask.run();319return result;320}321322static final int NO_LOCK = -1; // Failed to lock323static final int LOCKED = 0; // Obtained requested lock324325@Override326public FileLock tryLock(long position, long size, boolean shared)327throws IOException328{329if (shared && !reading)330throw new NonReadableChannelException();331if (!shared && !writing)332throw new NonWritableChannelException();333334// add to lock table335final FileLockImpl fli = addToFileLockTable(position, size, shared);336if (fli == null)337throw new ClosedChannelException();338339boolean gotLock = false;340try {341begin();342// try to acquire the lock343int res = nd.lock(fdObj, false, position, size, shared);344if (res == NO_LOCK)345return null;346gotLock = true;347return fli;348} finally {349if (!gotLock)350removeFromFileLockTable(fli);351end();352}353}354355@Override356protected void implRelease(FileLockImpl fli) throws IOException {357nd.release(fdObj, fli.position(), fli.size());358}359360/**361* Task that initiates read operation and handles completion result.362*/363private class ReadTask<A> implements Runnable, Iocp.ResultHandler {364private final ByteBuffer dst;365private final int pos, rem; // buffer position/remaining366private final long position; // file position367private final PendingFuture<Integer,A> result;368369// set to dst if direct; otherwise set to substituted direct buffer370private volatile ByteBuffer buf;371372ReadTask(ByteBuffer dst,373int pos,374int rem,375long position,376PendingFuture<Integer,A> result)377{378this.dst = dst;379this.pos = pos;380this.rem = rem;381this.position = position;382this.result = result;383}384385void releaseBufferIfSubstituted() {386if (buf != dst)387Util.releaseTemporaryDirectBuffer(buf);388}389390void updatePosition(int bytesTransferred) {391// if the I/O succeeded then adjust buffer position392if (bytesTransferred > 0) {393if (buf == dst) {394try {395dst.position(pos + bytesTransferred);396} catch (IllegalArgumentException x) {397// someone has changed the position; ignore398}399} else {400// had to substitute direct buffer401buf.position(bytesTransferred).flip();402try {403dst.put(buf);404} catch (BufferOverflowException x) {405// someone has changed the position; ignore406}407}408}409}410411@Override412public void run() {413int n = -1;414long overlapped = 0L;415long address;416417// Substitute a native buffer if not direct418if (dst instanceof DirectBuffer) {419buf = dst;420address = ((DirectBuffer)dst).address() + pos;421} else {422buf = Util.getTemporaryDirectBuffer(rem);423address = ((DirectBuffer)buf).address();424}425426boolean pending = false;427try {428begin();429430// allocate OVERLAPPED431overlapped = ioCache.add(result);432433// initiate read434n = readFile(handle, address, rem, position, overlapped);435if (n == IOStatus.UNAVAILABLE) {436// I/O is pending437pending = true;438return;439} else if (n == IOStatus.EOF) {440result.setResult(n);441} else {442throw new InternalError("Unexpected result: " + n);443}444445} catch (Throwable x) {446// failed to initiate read447result.setFailure(toIOException(x));448if (overlapped != 0L)449ioCache.remove(overlapped);450} finally {451if (!pending)452// release resources453releaseBufferIfSubstituted();454end();455}456457// invoke completion handler458Invoker.invoke(result);459}460461/**462* Executed when the I/O has completed463*/464@Override465public void completed(int bytesTransferred, boolean canInvokeDirect) {466updatePosition(bytesTransferred);467468// return direct buffer to cache if substituted469releaseBufferIfSubstituted();470471// release waiters and invoke completion handler472result.setResult(bytesTransferred);473if (canInvokeDirect) {474Invoker.invokeUnchecked(result);475} else {476Invoker.invoke(result);477}478}479480@Override481public void failed(int error, IOException x) {482// if EOF detected asynchronously then it is reported as error483if (error == ERROR_HANDLE_EOF) {484completed(-1, false);485} else {486// return direct buffer to cache if substituted487releaseBufferIfSubstituted();488489// release waiters490if (isOpen()) {491result.setFailure(x);492} else {493result.setFailure(new AsynchronousCloseException());494}495Invoker.invoke(result);496}497}498}499500@Override501<A> Future<Integer> implRead(ByteBuffer dst,502long position,503A attachment,504CompletionHandler<Integer,? super A> handler)505{506if (!reading)507throw new NonReadableChannelException();508if (position < 0)509throw new IllegalArgumentException("Negative position");510if (dst.isReadOnly())511throw new IllegalArgumentException("Read-only buffer");512513// check if channel is closed514if (!isOpen()) {515Throwable exc = new ClosedChannelException();516if (handler == null)517return CompletedFuture.withFailure(exc);518Invoker.invoke(this, handler, attachment, null, exc);519return null;520}521522int pos = dst.position();523int lim = dst.limit();524assert (pos <= lim);525int rem = (pos <= lim ? lim - pos : 0);526527// no space remaining528if (rem == 0) {529if (handler == null)530return CompletedFuture.withResult(0);531Invoker.invoke(this, handler, attachment, 0, null);532return null;533}534535// create Future and task that initiates read536PendingFuture<Integer,A> result =537new PendingFuture<Integer,A>(this, handler, attachment);538ReadTask<A> readTask = new ReadTask<A>(dst, pos, rem, position, result);539result.setContext(readTask);540541// initiate I/O542readTask.run();543return result;544}545546/**547* Task that initiates write operation and handles completion result.548*/549private class WriteTask<A> implements Runnable, Iocp.ResultHandler {550private final ByteBuffer src;551private final int pos, rem; // buffer position/remaining552private final long position; // file position553private final PendingFuture<Integer,A> result;554555// set to src if direct; otherwise set to substituted direct buffer556private volatile ByteBuffer buf;557558WriteTask(ByteBuffer src,559int pos,560int rem,561long position,562PendingFuture<Integer,A> result)563{564this.src = src;565this.pos = pos;566this.rem = rem;567this.position = position;568this.result = result;569}570571void releaseBufferIfSubstituted() {572if (buf != src)573Util.releaseTemporaryDirectBuffer(buf);574}575576void updatePosition(int bytesTransferred) {577// if the I/O succeeded then adjust buffer position578if (bytesTransferred > 0) {579try {580src.position(pos + bytesTransferred);581} catch (IllegalArgumentException x) {582// someone has changed the position583}584}585}586587@Override588public void run() {589int n = -1;590long overlapped = 0L;591long address;592593// Substitute a native buffer if not direct594if (src instanceof DirectBuffer) {595buf = src;596address = ((DirectBuffer)src).address() + pos;597} else {598buf = Util.getTemporaryDirectBuffer(rem);599buf.put(src);600buf.flip();601// temporarily restore position as we don't know how many bytes602// will be written603src.position(pos);604address = ((DirectBuffer)buf).address();605}606607try {608begin();609610// allocate an OVERLAPPED structure611overlapped = ioCache.add(result);612613// initiate the write614n = writeFile(handle, address, rem, position, overlapped);615if (n == IOStatus.UNAVAILABLE) {616// I/O is pending617return;618} else {619throw new InternalError("Unexpected result: " + n);620}621622} catch (Throwable x) {623// failed to initiate read:624result.setFailure(toIOException(x));625626// release resources627releaseBufferIfSubstituted();628if (overlapped != 0L)629ioCache.remove(overlapped);630631} finally {632end();633}634635// invoke completion handler636Invoker.invoke(result);637}638639/**640* Executed when the I/O has completed641*/642@Override643public void completed(int bytesTransferred, boolean canInvokeDirect) {644updatePosition(bytesTransferred);645646// return direct buffer to cache if substituted647releaseBufferIfSubstituted();648649// release waiters and invoke completion handler650result.setResult(bytesTransferred);651if (canInvokeDirect) {652Invoker.invokeUnchecked(result);653} else {654Invoker.invoke(result);655}656}657658@Override659public void failed(int error, IOException x) {660// return direct buffer to cache if substituted661releaseBufferIfSubstituted();662663// release waiters and invoker completion handler664if (isOpen()) {665result.setFailure(x);666} else {667result.setFailure(new AsynchronousCloseException());668}669Invoker.invoke(result);670}671}672673<A> Future<Integer> implWrite(ByteBuffer src,674long position,675A attachment,676CompletionHandler<Integer,? super A> handler)677{678if (!writing)679throw new NonWritableChannelException();680if (position < 0)681throw new IllegalArgumentException("Negative position");682683// check if channel is closed684if (!isOpen()) {685Throwable exc = new ClosedChannelException();686if (handler == null)687return CompletedFuture.withFailure(exc);688Invoker.invoke(this, handler, attachment, null, exc);689return null;690}691692int pos = src.position();693int lim = src.limit();694assert (pos <= lim);695int rem = (pos <= lim ? lim - pos : 0);696697// nothing to write698if (rem == 0) {699if (handler == null)700return CompletedFuture.withResult(0);701Invoker.invoke(this, handler, attachment, 0, null);702return null;703}704705// create Future and task to initiate write706PendingFuture<Integer,A> result =707new PendingFuture<Integer,A>(this, handler, attachment);708WriteTask<A> writeTask = new WriteTask<A>(src, pos, rem, position, result);709result.setContext(writeTask);710711// initiate I/O712writeTask.run();713return result;714}715716// -- Native methods --717718private static native int readFile(long handle, long address, int len,719long offset, long overlapped) throws IOException;720721private static native int writeFile(long handle, long address, int len,722long offset, long overlapped) throws IOException;723724private static native int lockFile(long handle, long position, long size,725boolean shared, long overlapped) throws IOException;726727static {728IOUtil.load();729}730}731732733