Path: blob/aarch64-shenandoah-jdk8u272-b10/jdk/src/windows/classes/sun/nio/ch/WindowsAsynchronousFileChannelImpl.java
32288 views
/*
 * Copyright (c) 2008, 2013, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package sun.nio.ch;

import java.nio.channels.*;
import java.util.concurrent.*;
import java.nio.ByteBuffer;
import java.nio.BufferOverflowException;
import java.io.IOException;
import java.io.FileDescriptor;
import sun.misc.SharedSecrets;
import sun.misc.JavaIOFileDescriptorAccess;

/**
 * Windows implementation of AsynchronousFileChannel using overlapped I/O.
 *
 * <p> Each read, write and lock request is represented by a task object
 * ({@code ReadTask}, {@code WriteTask}, {@code LockTask}) that both initiates
 * the native operation and acts as the {@code Iocp.ResultHandler} that is
 * called back when the operation completes or fails.
 */

public class WindowsAsynchronousFileChannelImpl
    extends AsynchronousFileChannelImpl
    implements Iocp.OverlappedChannel, Groupable
{
    private static final JavaIOFileDescriptorAccess fdAccess =
        SharedSecrets.getJavaIOFileDescriptorAccess();

    // error when EOF is detected asynchronously.
    private static final int ERROR_HANDLE_EOF = 38;

    // Lazy initialization of default I/O completion port
    private static class DefaultIocpHolder {
        static final Iocp defaultIocp = defaultIocp();

        /**
         * Creates and starts the default completion port, backed by
         * {@code ThreadPool.createDefault()}. An {@code IOException} is
         * rethrown as an {@code InternalError} with the cause preserved.
         */
        private static Iocp defaultIocp() {
            try {
                return new Iocp(null, ThreadPool.createDefault()).start();
            } catch (IOException ioe) {
                throw new InternalError(ioe);
            }
        }
    }

    // Used for force/truncate/size methods
    private static final FileDispatcher nd = new FileDispatcherImpl();

    // The handle is extracted for use in native methods invoked from this class.
    private final long handle;

    // The key that identifies the channel's association with the I/O port
    private final int completionKey;

    // I/O completion port (group)
    private final Iocp iocp;

    // true if iocp is the shared default port rather than one created for
    // this channel
    private final boolean isDefaultIocp;

    // Caches OVERLAPPED structure for each outstanding I/O operation
    private final PendingIoCache ioCache;


    /**
     * Creates the channel and associates its file handle with the given
     * completion port.
     *
     * @throws IOException if thrown while associating the handle with the port
     */
    private WindowsAsynchronousFileChannelImpl(FileDescriptor fdObj,
                                               boolean reading,
                                               boolean writing,
                                               Iocp iocp,
                                               boolean isDefaultIocp)
        throws IOException
    {
        super(fdObj, reading, writing, iocp.executor());
        this.handle = fdAccess.getHandle(fdObj);
        this.iocp = iocp;
        this.isDefaultIocp = isDefaultIocp;
        this.ioCache = new PendingIoCache();
        // association is done last, once all other fields are assigned
        this.completionKey = iocp.associate(this, handle);
    }

    /**
     * Opens a channel on the given file descriptor.
     *
     * @param fdo      the file descriptor wrapping the open file handle
     * @param reading  whether the channel is open for reading
     * @param writing  whether the channel is open for writing
     * @param pool     the thread pool to bind a new completion port to, or
     *                 {@code null} to use the lazily created default port
     * @return the new channel
     * @throws IOException if the port cannot be started or the channel cannot
     *         be bound to it
     */
    public static AsynchronousFileChannel open(FileDescriptor fdo,
                                               boolean reading,
                                               boolean writing,
                                               ThreadPool pool)
        throws IOException
    {
        Iocp iocp;
        boolean isDefaultIocp;
        if (pool == null) {
            iocp = DefaultIocpHolder.defaultIocp;
            isDefaultIocp = true;
        } else {
            iocp = new Iocp(null, pool).start();
            isDefaultIocp = false;
        }
        try {
            return new
                WindowsAsynchronousFileChannelImpl(fdo, reading, writing, iocp, isDefaultIocp);
        } catch (IOException x) {
            // error binding to port so need to close it (if created for this channel)
            if (!isDefaultIocp) {
                iocp.implClose();
            }
            throw x;
        }
    }

    /**
     * Removes the entry for the given OVERLAPPED address from this channel's
     * pending I/O cache and returns the result of that removal.
     */
    @Override
    public <V,A> PendingFuture<V,A> getByOverlapped(long overlapped) {
        return ioCache.remove(overlapped);
    }

    /**
     * Closes this channel. Only the first invocation performs the close
     * sequence; later invocations return immediately.
     */
    @Override
    public void close() throws IOException {
        // the closed flag is flipped under the write lock; the remaining
        // steps run after the lock has been released
        closeLock.writeLock().lock();
        try {
            if (closed) {
                return;     // already closed
            }
            closed = true;
        } finally {
            closeLock.writeLock().unlock();
        }

        // invalidate all locks held for this channel
        invalidateAllLocks();

        // close the file
        close0(handle);

        // waits until all I/O operations have completed
        ioCache.close();

        // disassociate from port
        iocp.disassociate(completionKey);

        // for the non-default group close the port
        if (!isDefaultIocp) {
            iocp.detachFromThreadPool();
        }
    }

    /**
     * Returns the completion port that this channel is associated with.
     */
    @Override
    public AsynchronousChannelGroupImpl group() {
        return iocp;
    }

    /**
     * Translates Throwable to IOException.
     *
     * <p> A {@code ClosedChannelException} is replaced by a new
     * {@code AsynchronousCloseException}; any other {@code IOException} is
     * returned unchanged; every other throwable is wrapped as the cause of a
     * new {@code IOException}.
     */
    private static IOException toIOException(Throwable x) {
        if (x instanceof IOException) {
            if (x instanceof ClosedChannelException) {
                x = new AsynchronousCloseException();
            }
            return (IOException)x;
        }
        return new IOException(x);
    }

    /**
     * Returns the file size as reported by the file dispatcher. The call is
     * bracketed by the inherited {@code begin()}/{@code end()} pair.
     */
    @Override
    public long size() throws IOException {
        try {
            begin();
            return nd.size(fdObj);
        } finally {
            end();
        }
    }

    /**
     * Truncates the file to the given size. Nothing is done when the given
     * size is greater than the current size.
     *
     * @throws IllegalArgumentException if {@code size} is negative
     * @throws NonWritableChannelException if the channel is not open for writing
     */
    @Override
    public AsynchronousFileChannel truncate(long size) throws IOException {
        if (size < 0) {
            throw new IllegalArgumentException("Negative size");
        }
        if (!writing) {
            throw new NonWritableChannelException();
        }
        try {
            begin();
            if (size > nd.size(fdObj)) {
                return this;
            }
            nd.truncate(fdObj, size);
        } finally {
            end();
        }
        return this;
    }

    /**
     * Delegates to the file dispatcher's {@code force}, bracketed by the
     * inherited {@code begin()}/{@code end()} pair.
     */
    @Override
    public void force(boolean metaData) throws IOException {
        try {
            begin();
            nd.force(fdObj, metaData);
        } finally {
            end();
        }
    }

    // -- file locking --

    /**
     * Task that initiates locking operation and handles completion result.
     */
    private class LockTask<A> implements Runnable, Iocp.ResultHandler {
        private final long position;
        private final FileLockImpl fli;
        private final PendingFuture<FileLock,A> result;

        LockTask(long position,
                 FileLockImpl fli,
                 PendingFuture<FileLock,A> result)
        {
            this.position = position;
            this.fli = fli;
            this.result = result;
        }

        /**
         * Initiates the lock. If the native call reports the operation as
         * pending this method returns without completing the result; the
         * outcome is then delivered through {@code completed}/{@code failed}.
         */
        @Override
        public void run() {
            long overlapped = 0L;
            boolean pending = false;
            try {
                begin();

                // allocate OVERLAPPED structure
                overlapped = ioCache.add(result);

                // synchronize on result to avoid race with handler thread
                // when lock is acquired immediately.
                synchronized (result) {
                    int n = lockFile(handle, position, fli.size(), fli.isShared(),
                                     overlapped);
                    if (n == IOStatus.UNAVAILABLE) {
                        // I/O is pending
                        pending = true;
                        return;
                    }
                    // acquired lock immediately
                    result.setResult(fli);
                }

            } catch (Throwable x) {
                // lock failed or channel closed
                removeFromFileLockTable(fli);
                result.setFailure(toIOException(x));
            } finally {
                // the cache entry is kept only while the operation is pending
                if (!pending && overlapped != 0L) {
                    ioCache.remove(overlapped);
                }
                end();
            }

            // invoke completion handler
            Invoker.invoke(result);
        }

        @Override
        public void completed(int bytesTransferred, boolean canInvokeDirect) {
            // release waiters and invoke completion handler
            result.setResult(fli);
            if (canInvokeDirect) {
                Invoker.invokeUnchecked(result);
            } else {
                Invoker.invoke(result);
            }
        }

        @Override
        public void failed(int error, IOException x) {
            // lock not acquired so remove from lock table
            removeFromFileLockTable(fli);

            // release waiters
            if (isOpen()) {
                result.setFailure(x);
            } else {
                result.setFailure(new AsynchronousCloseException());
            }
            Invoker.invoke(result);
        }
    }

    /**
     * Initiates an asynchronous lock request.
     *
     * @return the pending future, or {@code null} when the lock table entry
     *         could not be added and a handler was supplied (the handler is
     *         then invoked with a {@code ClosedChannelException})
     * @throws NonReadableChannelException if a shared lock is requested on a
     *         channel not open for reading
     * @throws NonWritableChannelException if an exclusive lock is requested on
     *         a channel not open for writing
     */
    @Override
    <A> Future<FileLock> implLock(final long position,
                                  final long size,
                                  final boolean shared,
                                  A attachment,
                                  final CompletionHandler<FileLock,? super A> handler)
    {
        if (shared && !reading) {
            throw new NonReadableChannelException();
        }
        if (!shared && !writing) {
            throw new NonWritableChannelException();
        }

        // add to lock table
        FileLockImpl fli = addToFileLockTable(position, size, shared);
        if (fli == null) {
            Throwable exc = new ClosedChannelException();
            if (handler == null) {
                return CompletedFuture.withFailure(exc);
            }
            Invoker.invoke(this, handler, attachment, null, exc);
            return null;
        }

        // create Future and task that will be invoked to acquire lock
        PendingFuture<FileLock,A> result =
            new PendingFuture<FileLock,A>(this, handler, attachment);
        LockTask<A> lockTask = new LockTask<A>(position, fli, result);
        result.setContext(lockTask);

        // initiate I/O
        if (Iocp.supportsThreadAgnosticIo()) {
            lockTask.run();
        } else {
            boolean executed = false;
            try {
                Invoker.invokeOnThreadInThreadPool(this, lockTask);
                executed = true;
            } finally {
                if (!executed) {
                    // rollback
                    removeFromFileLockTable(fli);
                }
            }
        }
        return result;
    }

    static final int NO_LOCK = -1;      // Failed to lock
    static final int LOCKED = 0;        // Obtained requested lock

    /**
     * Attempts to acquire a lock without waiting.
     *
     * @return the lock, or {@code null} if the dispatcher reports
     *         {@code NO_LOCK}
     * @throws ClosedChannelException if the lock table entry cannot be added
     */
    @Override
    public FileLock tryLock(long position, long size, boolean shared)
        throws IOException
    {
        if (shared && !reading) {
            throw new NonReadableChannelException();
        }
        if (!shared && !writing) {
            throw new NonWritableChannelException();
        }

        // add to lock table
        final FileLockImpl fli = addToFileLockTable(position, size, shared);
        if (fli == null) {
            throw new ClosedChannelException();
        }

        boolean gotLock = false;
        try {
            begin();
            // try to acquire the lock
            int res = nd.lock(fdObj, false, position, size, shared);
            if (res == NO_LOCK) {
                return null;
            }
            gotLock = true;
            return fli;
        } finally {
            // undo the lock table entry on both the NO_LOCK and exception paths
            if (!gotLock) {
                removeFromFileLockTable(fli);
            }
            end();
        }
    }

    /**
     * Releases the region covered by the given lock via the file dispatcher.
     */
    @Override
    protected void implRelease(FileLockImpl fli) throws IOException {
        nd.release(fdObj, fli.position(), fli.size());
    }

    /**
     * Task that initiates read operation and handles completion result.
     */
    private class ReadTask<A> implements Runnable, Iocp.ResultHandler {
        private final ByteBuffer dst;
        private final int pos, rem;     // buffer position/remaining
        private final long position;    // file position
        private final PendingFuture<Integer,A> result;

        // set to dst if direct; otherwise set to substituted direct buffer
        private volatile ByteBuffer buf;

        ReadTask(ByteBuffer dst,
                 int pos,
                 int rem,
                 long position,
                 PendingFuture<Integer,A> result)
        {
            this.dst = dst;
            this.pos = pos;
            this.rem = rem;
            this.position = position;
            this.result = result;
        }

        /**
         * Returns the temporary direct buffer to the cache when one was
         * substituted for {@code dst}.
         */
        void releaseBufferIfSubstituted() {
            if (buf != dst) {
                Util.releaseTemporaryDirectBuffer(buf);
            }
        }

        /**
         * Advances {@code dst} by the number of bytes read, copying from the
         * substituted buffer when one was used. Nothing is done when
         * {@code bytesTransferred} is not positive.
         */
        void updatePosition(int bytesTransferred) {
            // if the I/O succeeded then adjust buffer position
            if (bytesTransferred > 0) {
                if (buf == dst) {
                    try {
                        dst.position(pos + bytesTransferred);
                    } catch (IllegalArgumentException x) {
                        // someone has changed the position; ignore
                    }
                } else {
                    // had to substitute direct buffer
                    buf.position(bytesTransferred).flip();
                    try {
                        dst.put(buf);
                    } catch (BufferOverflowException x) {
                        // someone has changed the position; ignore
                    }
                }
            }
        }

        /**
         * Initiates the read. The native call is expected to report either
         * "pending" or EOF; any other return value is treated as an internal
         * error and reported through the result as an {@code IOException}.
         */
        @Override
        public void run() {
            int n = -1;
            long overlapped = 0L;
            long address;

            // Substitute a native buffer if not direct
            if (dst instanceof DirectBuffer) {
                buf = dst;
                address = ((DirectBuffer)dst).address() + pos;
            } else {
                buf = Util.getTemporaryDirectBuffer(rem);
                address = ((DirectBuffer)buf).address();
            }

            boolean pending = false;
            try {
                begin();

                // allocate OVERLAPPED
                overlapped = ioCache.add(result);

                // initiate read
                n = readFile(handle, address, rem, position, overlapped);
                if (n == IOStatus.UNAVAILABLE) {
                    // I/O is pending
                    pending = true;
                    return;
                } else if (n == IOStatus.EOF) {
                    result.setResult(n);
                } else {
                    throw new InternalError("Unexpected result: " + n);
                }

            } catch (Throwable x) {
                // failed to initiate read
                result.setFailure(toIOException(x));
            } finally {
                if (!pending) {
                    // release resources
                    if (overlapped != 0L) {
                        ioCache.remove(overlapped);
                    }
                    releaseBufferIfSubstituted();
                }
                end();
            }

            // invoke completion handler
            Invoker.invoke(result);
        }

        /**
         * Executed when the I/O has completed
         */
        @Override
        public void completed(int bytesTransferred, boolean canInvokeDirect) {
            updatePosition(bytesTransferred);

            // return direct buffer to cache if substituted
            releaseBufferIfSubstituted();

            // release waiters and invoke completion handler
            result.setResult(bytesTransferred);
            if (canInvokeDirect) {
                Invoker.invokeUnchecked(result);
            } else {
                Invoker.invoke(result);
            }
        }

        @Override
        public void failed(int error, IOException x) {
            // if EOF detected asynchronously then it is reported as error
            if (error == ERROR_HANDLE_EOF) {
                completed(-1, false);
            } else {
                // return direct buffer to cache if substituted
                releaseBufferIfSubstituted();

                // release waiters
                if (isOpen()) {
                    result.setFailure(x);
                } else {
                    result.setFailure(new AsynchronousCloseException());
                }
                Invoker.invoke(result);
            }
        }
    }

    /**
     * Initiates an asynchronous read into {@code dst} from the given file
     * position.
     *
     * @return the pending (or already completed) future, or {@code null} when
     *         a handler was supplied and the request was completed
     *         immediately (channel closed, or no space remaining in
     *         {@code dst})
     * @throws NonReadableChannelException if the channel is not open for reading
     * @throws IllegalArgumentException if {@code position} is negative or
     *         {@code dst} is read-only
     */
    @Override
    <A> Future<Integer> implRead(ByteBuffer dst,
                                 long position,
                                 A attachment,
                                 CompletionHandler<Integer,? super A> handler)
    {
        if (!reading) {
            throw new NonReadableChannelException();
        }
        if (position < 0) {
            throw new IllegalArgumentException("Negative position");
        }
        if (dst.isReadOnly()) {
            throw new IllegalArgumentException("Read-only buffer");
        }

        // check if channel is closed
        if (!isOpen()) {
            Throwable exc = new ClosedChannelException();
            if (handler == null) {
                return CompletedFuture.withFailure(exc);
            }
            Invoker.invoke(this, handler, attachment, null, exc);
            return null;
        }

        int pos = dst.position();
        int lim = dst.limit();
        assert (pos <= lim);
        int rem = (pos <= lim ? lim - pos : 0);

        // no space remaining
        if (rem == 0) {
            if (handler == null) {
                return CompletedFuture.withResult(0);
            }
            Invoker.invoke(this, handler, attachment, 0, null);
            return null;
        }

        // create Future and task that initiates read
        PendingFuture<Integer,A> result =
            new PendingFuture<Integer,A>(this, handler, attachment);
        ReadTask<A> readTask = new ReadTask<A>(dst, pos, rem, position, result);
        result.setContext(readTask);

        // initiate I/O
        if (Iocp.supportsThreadAgnosticIo()) {
            readTask.run();
        } else {
            Invoker.invokeOnThreadInThreadPool(this, readTask);
        }
        return result;
    }

    /**
     * Task that initiates write operation and handles completion result.
     */
    private class WriteTask<A> implements Runnable, Iocp.ResultHandler {
        private final ByteBuffer src;
        private final int pos, rem;     // buffer position/remaining
        private final long position;    // file position
        private final PendingFuture<Integer,A> result;

        // set to src if direct; otherwise set to substituted direct buffer
        private volatile ByteBuffer buf;

        WriteTask(ByteBuffer src,
                  int pos,
                  int rem,
                  long position,
                  PendingFuture<Integer,A> result)
        {
            this.src = src;
            this.pos = pos;
            this.rem = rem;
            this.position = position;
            this.result = result;
        }

        /**
         * Returns the temporary direct buffer to the cache when one was
         * substituted for {@code src}.
         */
        void releaseBufferIfSubstituted() {
            if (buf != src) {
                Util.releaseTemporaryDirectBuffer(buf);
            }
        }

        /**
         * Advances {@code src} by the number of bytes written. Nothing is
         * done when {@code bytesTransferred} is not positive.
         */
        void updatePosition(int bytesTransferred) {
            // if the I/O succeeded then adjust buffer position
            if (bytesTransferred > 0) {
                try {
                    src.position(pos + bytesTransferred);
                } catch (IllegalArgumentException x) {
                    // someone has changed the position
                }
            }
        }

        /**
         * Initiates the write. The native call is expected to report
         * "pending"; any other return value is treated as an internal error
         * and reported through the result as an {@code IOException}.
         */
        @Override
        public void run() {
            int n = -1;
            long overlapped = 0L;
            long address;

            // Substitute a native buffer if not direct
            if (src instanceof DirectBuffer) {
                buf = src;
                address = ((DirectBuffer)src).address() + pos;
            } else {
                buf = Util.getTemporaryDirectBuffer(rem);
                buf.put(src);
                buf.flip();
                // temporarily restore position as we don't know how many bytes
                // will be written
                src.position(pos);
                address = ((DirectBuffer)buf).address();
            }

            try {
                begin();

                // allocate an OVERLAPPED structure
                overlapped = ioCache.add(result);

                // initiate the write
                n = writeFile(handle, address, rem, position, overlapped);
                if (n == IOStatus.UNAVAILABLE) {
                    // I/O is pending
                    return;
                } else {
                    throw new InternalError("Unexpected result: " + n);
                }

            } catch (Throwable x) {
                // failed to initiate write
                result.setFailure(toIOException(x));

                // release resources
                if (overlapped != 0L) {
                    ioCache.remove(overlapped);
                }
                releaseBufferIfSubstituted();

            } finally {
                end();
            }

            // invoke completion handler
            Invoker.invoke(result);
        }

        /**
         * Executed when the I/O has completed
         */
        @Override
        public void completed(int bytesTransferred, boolean canInvokeDirect) {
            updatePosition(bytesTransferred);

            // return direct buffer to cache if substituted
            releaseBufferIfSubstituted();

            // release waiters and invoke completion handler
            result.setResult(bytesTransferred);
            if (canInvokeDirect) {
                Invoker.invokeUnchecked(result);
            } else {
                Invoker.invoke(result);
            }
        }

        @Override
        public void failed(int error, IOException x) {
            // return direct buffer to cache if substituted
            releaseBufferIfSubstituted();

            // release waiters and invoke completion handler
            if (isOpen()) {
                result.setFailure(x);
            } else {
                result.setFailure(new AsynchronousCloseException());
            }
            Invoker.invoke(result);
        }
    }

    /**
     * Initiates an asynchronous write of {@code src} at the given file
     * position.
     *
     * @return the pending (or already completed) future, or {@code null} when
     *         a handler was supplied and the request was completed
     *         immediately (channel closed, or nothing remaining in
     *         {@code src})
     * @throws NonWritableChannelException if the channel is not open for writing
     * @throws IllegalArgumentException if {@code position} is negative
     */
    @Override
    <A> Future<Integer> implWrite(ByteBuffer src,
                                  long position,
                                  A attachment,
                                  CompletionHandler<Integer,? super A> handler)
    {
        if (!writing) {
            throw new NonWritableChannelException();
        }
        if (position < 0) {
            throw new IllegalArgumentException("Negative position");
        }

        // check if channel is closed
        if (!isOpen()) {
            Throwable exc = new ClosedChannelException();
            if (handler == null) {
                return CompletedFuture.withFailure(exc);
            }
            Invoker.invoke(this, handler, attachment, null, exc);
            return null;
        }

        int pos = src.position();
        int lim = src.limit();
        assert (pos <= lim);
        int rem = (pos <= lim ? lim - pos : 0);

        // nothing to write
        if (rem == 0) {
            if (handler == null) {
                return CompletedFuture.withResult(0);
            }
            Invoker.invoke(this, handler, attachment, 0, null);
            return null;
        }

        // create Future and task to initiate write
        PendingFuture<Integer,A> result =
            new PendingFuture<Integer,A>(this, handler, attachment);
        WriteTask<A> writeTask = new WriteTask<A>(src, pos, rem, position, result);
        result.setContext(writeTask);

        // initiate I/O
        if (Iocp.supportsThreadAgnosticIo()) {
            writeTask.run();
        } else {
            Invoker.invokeOnThreadInThreadPool(this, writeTask);
        }
        return result;
    }

    // -- Native methods --

    private static native int readFile(long handle, long address, int len,
        long offset, long overlapped) throws IOException;

    private static native int writeFile(long handle, long address, int len,
        long offset, long overlapped) throws IOException;

    private static native int lockFile(long handle, long position, long size,
        boolean shared, long overlapped) throws IOException;

    private static native void close0(long handle);

    static {
        IOUtil.load();
    }
}