Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
PojavLauncherTeam
GitHub Repository: PojavLauncherTeam/mobile
Path: blob/master/src/java.base/windows/classes/sun/nio/ch/WindowsAsynchronousSocketChannelImpl.java
41139 views
1
/*
2
* Copyright (c) 2008, 2021, Oracle and/or its affiliates. All rights reserved.
3
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4
*
5
* This code is free software; you can redistribute it and/or modify it
6
* under the terms of the GNU General Public License version 2 only, as
7
* published by the Free Software Foundation. Oracle designates this
8
* particular file as subject to the "Classpath" exception as provided
9
* by Oracle in the LICENSE file that accompanied this code.
10
*
11
* This code is distributed in the hope that it will be useful, but WITHOUT
12
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14
* version 2 for more details (a copy is included in the LICENSE file that
15
* accompanied this code).
16
*
17
* You should have received a copy of the GNU General Public License version
18
* 2 along with this work; if not, write to the Free Software Foundation,
19
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20
*
21
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22
* or visit www.oracle.com if you need additional information or have any
23
* questions.
24
*/
25
26
package sun.nio.ch;
27
28
import java.nio.channels.*;
29
import java.nio.ByteBuffer;
30
import java.nio.BufferOverflowException;
31
import java.net.*;
32
import java.util.concurrent.*;
33
import java.io.IOException;
34
import java.security.AccessController;
35
import java.security.PrivilegedActionException;
36
import java.security.PrivilegedExceptionAction;
37
import jdk.internal.misc.Unsafe;
38
import sun.net.util.SocketExceptions;
39
40
/**
41
* Windows implementation of AsynchronousSocketChannel using overlapped I/O.
42
*/
43
44
class WindowsAsynchronousSocketChannelImpl
45
extends AsynchronousSocketChannelImpl implements Iocp.OverlappedChannel
46
{
47
private static final Unsafe unsafe = Unsafe.getUnsafe();
48
private static int addressSize = unsafe.addressSize();
49
50
private static int dependsArch(int value32, int value64) {
51
return (addressSize == 4) ? value32 : value64;
52
}
53
54
/*
55
* typedef struct _WSABUF {
56
* u_long len;
57
* char FAR * buf;
58
* } WSABUF;
59
*/
60
private static final int SIZEOF_WSABUF = dependsArch(8, 16);
61
private static final int OFFSETOF_LEN = 0;
62
private static final int OFFSETOF_BUF = dependsArch(4, 8);
63
64
// maximum vector size for scatter/gather I/O
65
private static final int MAX_WSABUF = 16;
66
67
private static final int SIZEOF_WSABUFARRAY = MAX_WSABUF * SIZEOF_WSABUF;
68
69
70
// socket handle. Use begin()/end() around each usage of this handle.
71
final long handle;
72
73
// I/O completion port that the socket is associated with
74
private final Iocp iocp;
75
76
// completion key to identify channel when I/O completes
77
private final int completionKey;
78
79
// Pending I/O operations are tied to an OVERLAPPED structure that can only
80
// be released when the I/O completion event is posted to the completion
81
// port. Where I/O operations complete immediately then it is possible
82
// there may be more than two OVERLAPPED structures in use.
83
private final PendingIoCache ioCache;
84
85
// per-channel arrays of WSABUF structures
86
private final long readBufferArray;
87
private final long writeBufferArray;
88
89
90
WindowsAsynchronousSocketChannelImpl(Iocp iocp, boolean failIfGroupShutdown)
91
throws IOException
92
{
93
super(iocp);
94
95
// associate socket with default completion port
96
long h = IOUtil.fdVal(fd);
97
int key = 0;
98
try {
99
key = iocp.associate(this, h);
100
} catch (ShutdownChannelGroupException x) {
101
if (failIfGroupShutdown) {
102
closesocket0(h);
103
throw x;
104
}
105
} catch (IOException x) {
106
closesocket0(h);
107
throw x;
108
}
109
110
this.handle = h;
111
this.iocp = iocp;
112
this.completionKey = key;
113
this.ioCache = new PendingIoCache();
114
115
// allocate WSABUF arrays
116
this.readBufferArray = unsafe.allocateMemory(SIZEOF_WSABUFARRAY);
117
this.writeBufferArray = unsafe.allocateMemory(SIZEOF_WSABUFARRAY);
118
}
119
120
WindowsAsynchronousSocketChannelImpl(Iocp iocp) throws IOException {
121
this(iocp, true);
122
}
123
124
@Override
125
public AsynchronousChannelGroupImpl group() {
126
return iocp;
127
}
128
129
/**
130
* Invoked by Iocp when an I/O operation competes.
131
*/
132
@Override
133
public <V,A> PendingFuture<V,A> getByOverlapped(long overlapped) {
134
return ioCache.remove(overlapped);
135
}
136
137
// invoked by WindowsAsynchronousServerSocketChannelImpl
138
long handle() {
139
return handle;
140
}
141
142
// invoked by WindowsAsynchronousServerSocketChannelImpl when new connection
143
// accept
144
void setConnected(InetSocketAddress localAddress,
145
InetSocketAddress remoteAddress)
146
{
147
synchronized (stateLock) {
148
state = ST_CONNECTED;
149
this.localAddress = localAddress;
150
this.remoteAddress = remoteAddress;
151
}
152
}
153
154
@Override
155
void implClose() throws IOException {
156
// close socket (may cause outstanding async I/O operations to fail).
157
closesocket0(handle);
158
159
// waits until all I/O operations have completed
160
ioCache.close();
161
162
// release arrays of WSABUF structures
163
unsafe.freeMemory(readBufferArray);
164
unsafe.freeMemory(writeBufferArray);
165
166
// finally disassociate from the completion port (key can be 0 if
167
// channel created when group is shutdown)
168
if (completionKey != 0)
169
iocp.disassociate(completionKey);
170
}
171
172
@Override
173
public void onCancel(PendingFuture<?,?> task) {
174
if (task.getContext() instanceof ConnectTask)
175
killConnect();
176
if (task.getContext() instanceof ReadTask)
177
killReading();
178
if (task.getContext() instanceof WriteTask)
179
killWriting();
180
}
181
182
/**
183
* Implements the task to initiate a connection and the handler to
184
* consume the result when the connection is established (or fails).
185
*/
186
private class ConnectTask<A> implements Runnable, Iocp.ResultHandler {
187
private final InetSocketAddress remote;
188
private final PendingFuture<Void,A> result;
189
190
ConnectTask(InetSocketAddress remote, PendingFuture<Void,A> result) {
191
this.remote = remote;
192
this.result = result;
193
}
194
195
private void closeChannel() {
196
try {
197
close();
198
} catch (IOException ignore) { }
199
}
200
201
private IOException toIOException(Throwable x) {
202
if (x instanceof IOException) {
203
if (x instanceof ClosedChannelException)
204
x = new AsynchronousCloseException();
205
return (IOException)x;
206
}
207
return new IOException(x);
208
}
209
210
/**
211
* Invoke after a connection is successfully established.
212
*/
213
private void afterConnect() throws IOException {
214
updateConnectContext(handle);
215
synchronized (stateLock) {
216
state = ST_CONNECTED;
217
remoteAddress = remote;
218
}
219
}
220
221
/**
222
* Task to initiate a connection.
223
*/
224
@Override
225
public void run() {
226
long overlapped = 0L;
227
Throwable exc = null;
228
try {
229
begin();
230
231
// synchronize on result to allow this thread handle the case
232
// where the connection is established immediately.
233
synchronized (result) {
234
overlapped = ioCache.add(result);
235
// initiate the connection
236
int n = connect0(handle, Net.isIPv6Available(), remote.getAddress(),
237
remote.getPort(), overlapped);
238
if (n == IOStatus.UNAVAILABLE) {
239
// connection is pending
240
return;
241
}
242
243
// connection established immediately
244
afterConnect();
245
result.setResult(null);
246
}
247
} catch (Throwable x) {
248
if (overlapped != 0L)
249
ioCache.remove(overlapped);
250
exc = x;
251
} finally {
252
end();
253
}
254
255
if (exc != null) {
256
closeChannel();
257
exc = SocketExceptions.of(toIOException(exc), remote);
258
result.setFailure(exc);
259
}
260
Invoker.invoke(result);
261
}
262
263
/**
264
* Invoked by handler thread when connection established.
265
*/
266
@Override
267
public void completed(int bytesTransferred, boolean canInvokeDirect) {
268
Throwable exc = null;
269
try {
270
begin();
271
afterConnect();
272
result.setResult(null);
273
} catch (Throwable x) {
274
// channel is closed or unable to finish connect
275
exc = x;
276
} finally {
277
end();
278
}
279
280
// can't close channel while in begin/end block
281
if (exc != null) {
282
closeChannel();
283
IOException ee = toIOException(exc);
284
ee = SocketExceptions.of(ee, remote);
285
result.setFailure(ee);
286
}
287
288
if (canInvokeDirect) {
289
Invoker.invokeUnchecked(result);
290
} else {
291
Invoker.invoke(result);
292
}
293
}
294
295
/**
296
* Invoked by handler thread when failed to establish connection.
297
*/
298
@Override
299
public void failed(int error, IOException x) {
300
x = SocketExceptions.of(x, remote);
301
if (isOpen()) {
302
closeChannel();
303
result.setFailure(x);
304
} else {
305
x = SocketExceptions.of(new AsynchronousCloseException(), remote);
306
result.setFailure(x);
307
}
308
Invoker.invoke(result);
309
}
310
}
311
312
@SuppressWarnings("removal")
313
private void doPrivilegedBind(final SocketAddress sa) throws IOException {
314
try {
315
AccessController.doPrivileged(new PrivilegedExceptionAction<Void>() {
316
public Void run() throws IOException {
317
bind(sa);
318
return null;
319
}
320
});
321
} catch (PrivilegedActionException e) {
322
throw (IOException) e.getException();
323
}
324
}
325
326
@Override
327
<A> Future<Void> implConnect(SocketAddress remote,
328
A attachment,
329
CompletionHandler<Void,? super A> handler)
330
{
331
if (!isOpen()) {
332
Throwable exc = new ClosedChannelException();
333
if (handler == null)
334
return CompletedFuture.withFailure(exc);
335
Invoker.invoke(this, handler, attachment, null, exc);
336
return null;
337
}
338
339
InetSocketAddress isa = Net.checkAddress(remote);
340
341
// permission check
342
@SuppressWarnings("removal")
343
SecurityManager sm = System.getSecurityManager();
344
if (sm != null)
345
sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());
346
347
// check and update state
348
// ConnectEx requires the socket to be bound to a local address
349
IOException bindException = null;
350
synchronized (stateLock) {
351
if (state == ST_CONNECTED)
352
throw new AlreadyConnectedException();
353
if (state == ST_PENDING)
354
throw new ConnectionPendingException();
355
if (localAddress == null) {
356
try {
357
SocketAddress any = new InetSocketAddress(0);
358
if (sm == null) {
359
bind(any);
360
} else {
361
doPrivilegedBind(any);
362
}
363
} catch (IOException x) {
364
bindException = x;
365
}
366
}
367
if (bindException == null)
368
state = ST_PENDING;
369
}
370
371
// handle bind failure
372
if (bindException != null) {
373
try {
374
close();
375
} catch (IOException ignore) { }
376
if (handler == null)
377
return CompletedFuture.withFailure(bindException);
378
Invoker.invoke(this, handler, attachment, null, bindException);
379
return null;
380
}
381
382
// setup task
383
PendingFuture<Void,A> result =
384
new PendingFuture<Void,A>(this, handler, attachment);
385
ConnectTask<A> task = new ConnectTask<A>(isa, result);
386
result.setContext(task);
387
388
// initiate I/O
389
task.run();
390
return result;
391
}
392
393
/**
394
* Implements the task to initiate a read and the handler to consume the
395
* result when the read completes.
396
*/
397
private class ReadTask<V,A> implements Runnable, Iocp.ResultHandler {
398
private final ByteBuffer[] bufs;
399
private final int numBufs;
400
private final boolean scatteringRead;
401
private final PendingFuture<V,A> result;
402
403
// set by run method
404
private ByteBuffer[] shadow;
405
private Runnable scopeHandleReleasers;
406
407
ReadTask(ByteBuffer[] bufs,
408
boolean scatteringRead,
409
PendingFuture<V,A> result)
410
{
411
this.bufs = bufs;
412
this.numBufs = (bufs.length > MAX_WSABUF) ? MAX_WSABUF : bufs.length;
413
this.scatteringRead = scatteringRead;
414
this.result = result;
415
}
416
417
/**
418
* Invoked prior to read to prepare the WSABUF array. Where necessary,
419
* it substitutes non-direct buffers with direct buffers.
420
*/
421
void prepareBuffers() {
422
scopeHandleReleasers = IOUtil.acquireScopes(bufs);
423
shadow = new ByteBuffer[numBufs];
424
long address = readBufferArray;
425
for (int i=0; i<numBufs; i++) {
426
ByteBuffer dst = bufs[i];
427
int pos = dst.position();
428
int lim = dst.limit();
429
assert (pos <= lim);
430
int rem = (pos <= lim ? lim - pos : 0);
431
long a;
432
if (!(dst instanceof DirectBuffer)) {
433
// substitute with direct buffer
434
ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
435
shadow[i] = bb;
436
a = IOUtil.bufferAddress(bb);
437
} else {
438
shadow[i] = dst;
439
a = IOUtil.bufferAddress(dst) + pos;
440
}
441
unsafe.putAddress(address + OFFSETOF_BUF, a);
442
unsafe.putInt(address + OFFSETOF_LEN, rem);
443
address += SIZEOF_WSABUF;
444
}
445
}
446
447
/**
448
* Invoked after a read has completed to update the buffer positions
449
* and release any substituted buffers.
450
*/
451
void updateBuffers(int bytesRead) {
452
for (int i=0; i<numBufs; i++) {
453
ByteBuffer nextBuffer = shadow[i];
454
int pos = nextBuffer.position();
455
int len = nextBuffer.remaining();
456
if (bytesRead >= len) {
457
bytesRead -= len;
458
int newPosition = pos + len;
459
try {
460
nextBuffer.position(newPosition);
461
} catch (IllegalArgumentException x) {
462
// position changed by another
463
}
464
} else { // Buffers not completely filled
465
if (bytesRead > 0) {
466
assert(pos + bytesRead < (long)Integer.MAX_VALUE);
467
int newPosition = pos + bytesRead;
468
try {
469
nextBuffer.position(newPosition);
470
} catch (IllegalArgumentException x) {
471
// position changed by another
472
}
473
}
474
break;
475
}
476
}
477
478
// Put results from shadow into the slow buffers
479
for (int i=0; i<numBufs; i++) {
480
if (!(bufs[i] instanceof DirectBuffer)) {
481
shadow[i].flip();
482
try {
483
bufs[i].put(shadow[i]);
484
} catch (BufferOverflowException x) {
485
// position changed by another
486
}
487
}
488
}
489
}
490
491
void releaseBuffers() {
492
for (int i=0; i<numBufs; i++) {
493
if (!(bufs[i] instanceof DirectBuffer)) {
494
Util.releaseTemporaryDirectBuffer(shadow[i]);
495
}
496
}
497
IOUtil.releaseScopes(scopeHandleReleasers);
498
}
499
500
@Override
501
@SuppressWarnings("unchecked")
502
public void run() {
503
long overlapped = 0L;
504
boolean prepared = false;
505
boolean pending = false;
506
507
try {
508
begin();
509
510
// substitute non-direct buffers
511
prepareBuffers();
512
prepared = true;
513
514
// get an OVERLAPPED structure (from the cache or allocate)
515
overlapped = ioCache.add(result);
516
517
// initiate read
518
int n = read0(handle, numBufs, readBufferArray, overlapped);
519
if (n == IOStatus.UNAVAILABLE) {
520
// I/O is pending
521
pending = true;
522
return;
523
}
524
if (n == IOStatus.EOF) {
525
// input shutdown
526
enableReading();
527
if (scatteringRead) {
528
result.setResult((V)Long.valueOf(-1L));
529
} else {
530
result.setResult((V)Integer.valueOf(-1));
531
}
532
} else {
533
throw new InternalError("Read completed immediately");
534
}
535
} catch (Throwable x) {
536
// failed to initiate read
537
// reset read flag before releasing waiters
538
enableReading();
539
if (x instanceof ClosedChannelException)
540
x = new AsynchronousCloseException();
541
if (!(x instanceof IOException))
542
x = new IOException(x);
543
result.setFailure(x);
544
} finally {
545
// release resources if I/O not pending
546
if (!pending) {
547
if (overlapped != 0L)
548
ioCache.remove(overlapped);
549
if (prepared)
550
releaseBuffers();
551
}
552
end();
553
}
554
555
// invoke completion handler
556
Invoker.invoke(result);
557
}
558
559
/**
560
* Executed when the I/O has completed
561
*/
562
@Override
563
@SuppressWarnings("unchecked")
564
public void completed(int bytesTransferred, boolean canInvokeDirect) {
565
if (bytesTransferred == 0) {
566
bytesTransferred = -1; // EOF
567
} else {
568
updateBuffers(bytesTransferred);
569
}
570
571
// return direct buffer to cache if substituted
572
releaseBuffers();
573
574
// release waiters if not already released by timeout
575
synchronized (result) {
576
if (result.isDone())
577
return;
578
enableReading();
579
if (scatteringRead) {
580
result.setResult((V)Long.valueOf(bytesTransferred));
581
} else {
582
result.setResult((V)Integer.valueOf(bytesTransferred));
583
}
584
}
585
if (canInvokeDirect) {
586
Invoker.invokeUnchecked(result);
587
} else {
588
Invoker.invoke(result);
589
}
590
}
591
592
@Override
593
public void failed(int error, IOException x) {
594
// return direct buffer to cache if substituted
595
releaseBuffers();
596
597
// release waiters if not already released by timeout
598
if (!isOpen())
599
x = new AsynchronousCloseException();
600
601
synchronized (result) {
602
if (result.isDone())
603
return;
604
enableReading();
605
result.setFailure(x);
606
}
607
Invoker.invoke(result);
608
}
609
610
/**
611
* Invoked if timeout expires before it is cancelled
612
*/
613
void timeout() {
614
// synchronize on result as the I/O could complete/fail
615
synchronized (result) {
616
if (result.isDone())
617
return;
618
619
// kill further reading before releasing waiters
620
enableReading(true);
621
result.setFailure(new InterruptedByTimeoutException());
622
}
623
624
// invoke handler without any locks
625
Invoker.invoke(result);
626
}
627
}
628
629
@Override
630
<V extends Number,A> Future<V> implRead(boolean isScatteringRead,
631
ByteBuffer dst,
632
ByteBuffer[] dsts,
633
long timeout,
634
TimeUnit unit,
635
A attachment,
636
CompletionHandler<V,? super A> handler)
637
{
638
// setup task
639
PendingFuture<V,A> result =
640
new PendingFuture<V,A>(this, handler, attachment);
641
ByteBuffer[] bufs;
642
if (isScatteringRead) {
643
bufs = dsts;
644
} else {
645
bufs = new ByteBuffer[1];
646
bufs[0] = dst;
647
}
648
final ReadTask<V,A> readTask =
649
new ReadTask<V,A>(bufs, isScatteringRead, result);
650
result.setContext(readTask);
651
652
// schedule timeout
653
if (timeout > 0L) {
654
Future<?> timeoutTask = iocp.schedule(new Runnable() {
655
public void run() {
656
readTask.timeout();
657
}
658
}, timeout, unit);
659
result.setTimeoutTask(timeoutTask);
660
}
661
662
// initiate I/O
663
readTask.run();
664
return result;
665
}
666
667
/**
668
* Implements the task to initiate a write and the handler to consume the
669
* result when the write completes.
670
*/
671
private class WriteTask<V,A> implements Runnable, Iocp.ResultHandler {
672
private final ByteBuffer[] bufs;
673
private final int numBufs;
674
private final boolean gatheringWrite;
675
private final PendingFuture<V,A> result;
676
677
// set by run method
678
private ByteBuffer[] shadow;
679
private Runnable scopeHandleReleasers;
680
681
WriteTask(ByteBuffer[] bufs,
682
boolean gatheringWrite,
683
PendingFuture<V,A> result)
684
{
685
this.bufs = bufs;
686
this.numBufs = (bufs.length > MAX_WSABUF) ? MAX_WSABUF : bufs.length;
687
this.gatheringWrite = gatheringWrite;
688
this.result = result;
689
}
690
691
/**
692
* Invoked prior to write to prepare the WSABUF array. Where necessary,
693
* it substitutes non-direct buffers with direct buffers.
694
*/
695
void prepareBuffers() {
696
scopeHandleReleasers = IOUtil.acquireScopes(bufs);
697
shadow = new ByteBuffer[numBufs];
698
long address = writeBufferArray;
699
for (int i=0; i<numBufs; i++) {
700
ByteBuffer src = bufs[i];
701
int pos = src.position();
702
int lim = src.limit();
703
assert (pos <= lim);
704
int rem = (pos <= lim ? lim - pos : 0);
705
long a;
706
if (!(src instanceof DirectBuffer)) {
707
// substitute with direct buffer
708
ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
709
bb.put(src);
710
bb.flip();
711
src.position(pos); // leave heap buffer untouched for now
712
shadow[i] = bb;
713
a = IOUtil.bufferAddress(bb);
714
} else {
715
shadow[i] = src;
716
a = IOUtil.bufferAddress(src) + pos;
717
}
718
unsafe.putAddress(address + OFFSETOF_BUF, a);
719
unsafe.putInt(address + OFFSETOF_LEN, rem);
720
address += SIZEOF_WSABUF;
721
}
722
}
723
724
/**
725
* Invoked after a write has completed to update the buffer positions
726
* and release any substituted buffers.
727
*/
728
void updateBuffers(int bytesWritten) {
729
// Notify the buffers how many bytes were taken
730
for (int i=0; i<numBufs; i++) {
731
ByteBuffer nextBuffer = bufs[i];
732
int pos = nextBuffer.position();
733
int lim = nextBuffer.limit();
734
int len = (pos <= lim ? lim - pos : lim);
735
if (bytesWritten >= len) {
736
bytesWritten -= len;
737
int newPosition = pos + len;
738
try {
739
nextBuffer.position(newPosition);
740
} catch (IllegalArgumentException x) {
741
// position changed by someone else
742
}
743
} else { // Buffers not completely filled
744
if (bytesWritten > 0) {
745
assert(pos + bytesWritten < (long)Integer.MAX_VALUE);
746
int newPosition = pos + bytesWritten;
747
try {
748
nextBuffer.position(newPosition);
749
} catch (IllegalArgumentException x) {
750
// position changed by someone else
751
}
752
}
753
break;
754
}
755
}
756
}
757
758
void releaseBuffers() {
759
for (int i=0; i<numBufs; i++) {
760
if (!(bufs[i] instanceof DirectBuffer)) {
761
Util.releaseTemporaryDirectBuffer(shadow[i]);
762
}
763
}
764
IOUtil.releaseScopes(scopeHandleReleasers);
765
}
766
767
@Override
768
//@SuppressWarnings("unchecked")
769
public void run() {
770
long overlapped = 0L;
771
boolean prepared = false;
772
boolean pending = false;
773
boolean shutdown = false;
774
775
try {
776
begin();
777
778
// substitute non-direct buffers
779
prepareBuffers();
780
prepared = true;
781
782
// get an OVERLAPPED structure (from the cache or allocate)
783
overlapped = ioCache.add(result);
784
int n = write0(handle, numBufs, writeBufferArray, overlapped);
785
if (n == IOStatus.UNAVAILABLE) {
786
// I/O is pending
787
pending = true;
788
return;
789
}
790
if (n == IOStatus.EOF) {
791
// special case for shutdown output
792
shutdown = true;
793
throw new ClosedChannelException();
794
}
795
// write completed immediately
796
throw new InternalError("Write completed immediately");
797
} catch (Throwable x) {
798
// write failed. Enable writing before releasing waiters.
799
enableWriting();
800
if (!shutdown && (x instanceof ClosedChannelException))
801
x = new AsynchronousCloseException();
802
if (!(x instanceof IOException))
803
x = new IOException(x);
804
result.setFailure(x);
805
} finally {
806
// release resources if I/O not pending
807
if (!pending) {
808
if (overlapped != 0L)
809
ioCache.remove(overlapped);
810
if (prepared)
811
releaseBuffers();
812
}
813
end();
814
}
815
816
// invoke completion handler
817
Invoker.invoke(result);
818
}
819
820
/**
821
* Executed when the I/O has completed
822
*/
823
@Override
824
@SuppressWarnings("unchecked")
825
public void completed(int bytesTransferred, boolean canInvokeDirect) {
826
updateBuffers(bytesTransferred);
827
828
// return direct buffer to cache if substituted
829
releaseBuffers();
830
831
// release waiters if not already released by timeout
832
synchronized (result) {
833
if (result.isDone())
834
return;
835
enableWriting();
836
if (gatheringWrite) {
837
result.setResult((V)Long.valueOf(bytesTransferred));
838
} else {
839
result.setResult((V)Integer.valueOf(bytesTransferred));
840
}
841
}
842
if (canInvokeDirect) {
843
Invoker.invokeUnchecked(result);
844
} else {
845
Invoker.invoke(result);
846
}
847
}
848
849
@Override
850
public void failed(int error, IOException x) {
851
// return direct buffer to cache if substituted
852
releaseBuffers();
853
854
// release waiters if not already released by timeout
855
if (!isOpen())
856
x = new AsynchronousCloseException();
857
858
synchronized (result) {
859
if (result.isDone())
860
return;
861
enableWriting();
862
result.setFailure(x);
863
}
864
Invoker.invoke(result);
865
}
866
867
/**
868
* Invoked if timeout expires before it is cancelled
869
*/
870
void timeout() {
871
// synchronize on result as the I/O could complete/fail
872
synchronized (result) {
873
if (result.isDone())
874
return;
875
876
// kill further writing before releasing waiters
877
enableWriting(true);
878
result.setFailure(new InterruptedByTimeoutException());
879
}
880
881
// invoke handler without any locks
882
Invoker.invoke(result);
883
}
884
}
885
886
@Override
887
<V extends Number,A> Future<V> implWrite(boolean gatheringWrite,
888
ByteBuffer src,
889
ByteBuffer[] srcs,
890
long timeout,
891
TimeUnit unit,
892
A attachment,
893
CompletionHandler<V,? super A> handler)
894
{
895
// setup task
896
PendingFuture<V,A> result =
897
new PendingFuture<V,A>(this, handler, attachment);
898
ByteBuffer[] bufs;
899
if (gatheringWrite) {
900
bufs = srcs;
901
} else {
902
bufs = new ByteBuffer[1];
903
bufs[0] = src;
904
}
905
final WriteTask<V,A> writeTask =
906
new WriteTask<V,A>(bufs, gatheringWrite, result);
907
result.setContext(writeTask);
908
909
// schedule timeout
910
if (timeout > 0L) {
911
Future<?> timeoutTask = iocp.schedule(new Runnable() {
912
public void run() {
913
writeTask.timeout();
914
}
915
}, timeout, unit);
916
result.setTimeoutTask(timeoutTask);
917
}
918
919
// initiate I/O
920
writeTask.run();
921
return result;
922
}
923
924
// -- Native methods --
925
926
private static native void initIDs();
927
928
private static native int connect0(long socket, boolean preferIPv6,
929
InetAddress remote, int remotePort, long overlapped) throws IOException;
930
931
private static native void updateConnectContext(long socket) throws IOException;
932
933
private static native int read0(long socket, int count, long addres, long overlapped)
934
throws IOException;
935
936
private static native int write0(long socket, int count, long address,
937
long overlapped) throws IOException;
938
939
private static native void shutdown0(long socket, int how) throws IOException;
940
941
private static native void closesocket0(long socket) throws IOException;
942
943
static {
944
IOUtil.load();
945
initIDs();
946
}
947
}
948
949