Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
PojavLauncherTeam
GitHub Repository: PojavLauncherTeam/openjdk-multiarch-jdk8u
Path: blob/aarch64-shenandoah-jdk8u272-b10/jdk/src/windows/classes/sun/nio/ch/WindowsAsynchronousSocketChannelImpl.java
32288 views
1
/*
2
* Copyright (c) 2008, 2013, Oracle and/or its affiliates. All rights reserved.
3
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4
*
5
* This code is free software; you can redistribute it and/or modify it
6
* under the terms of the GNU General Public License version 2 only, as
7
* published by the Free Software Foundation. Oracle designates this
8
* particular file as subject to the "Classpath" exception as provided
9
* by Oracle in the LICENSE file that accompanied this code.
10
*
11
* This code is distributed in the hope that it will be useful, but WITHOUT
12
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14
* version 2 for more details (a copy is included in the LICENSE file that
15
* accompanied this code).
16
*
17
* You should have received a copy of the GNU General Public License version
18
* 2 along with this work; if not, write to the Free Software Foundation,
19
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20
*
21
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22
* or visit www.oracle.com if you need additional information or have any
23
* questions.
24
*/
25
26
package sun.nio.ch;
27
28
import java.nio.channels.*;
29
import java.nio.ByteBuffer;
30
import java.nio.BufferOverflowException;
31
import java.net.*;
32
import java.util.concurrent.*;
33
import java.io.IOException;
34
import java.security.AccessController;
35
import java.security.PrivilegedActionException;
36
import java.security.PrivilegedExceptionAction;
37
import sun.misc.Unsafe;
38
39
/**
40
* Windows implementation of AsynchronousSocketChannel using overlapped I/O.
41
*/
42
43
class WindowsAsynchronousSocketChannelImpl
44
extends AsynchronousSocketChannelImpl implements Iocp.OverlappedChannel
45
{
46
private static final Unsafe unsafe = Unsafe.getUnsafe();
47
private static int addressSize = unsafe.addressSize();
48
49
private static int dependsArch(int value32, int value64) {
50
return (addressSize == 4) ? value32 : value64;
51
}
52
53
/*
54
* typedef struct _WSABUF {
55
* u_long len;
56
* char FAR * buf;
57
* } WSABUF;
58
*/
59
private static final int SIZEOF_WSABUF = dependsArch(8, 16);
60
private static final int OFFSETOF_LEN = 0;
61
private static final int OFFSETOF_BUF = dependsArch(4, 8);
62
63
// maximum vector size for scatter/gather I/O
64
private static final int MAX_WSABUF = 16;
65
66
private static final int SIZEOF_WSABUFARRAY = MAX_WSABUF * SIZEOF_WSABUF;
67
68
69
// Native socket handle. Each use must be bracketed by begin()/end().
final long handle;

// the I/O completion port that this socket is associated with
private final Iocp iocp;

// Completion key identifying this channel when an I/O completes. It remains
// 0 when the association was skipped because the group was already shut down.
private final int completionKey;

// A pending I/O operation is tied to an OVERLAPPED structure, and that
// structure can only be released once the I/O completion event has been
// posted to the completion port. Where I/O operations complete immediately
// it is possible that more than two OVERLAPPED structures are in use.
private final PendingIoCache ioCache;

// per-channel native arrays of WSABUF structures, one for reads and one
// for writes
private final long readBufferArray;
private final long writeBufferArray;


/**
 * Creates the channel and associates its socket with the given completion
 * port.
 *
 * @param iocp the completion port to associate the socket with
 * @param failIfGroupShutdown when {@code true}, a
 *        ShutdownChannelGroupException thrown by the association closes the
 *        socket and is rethrown; when {@code false} it is tolerated and the
 *        completion key is left at 0
 * @throws IOException if the association fails with an I/O error; the socket
 *         is closed before the exception propagates
 */
WindowsAsynchronousSocketChannelImpl(Iocp iocp, boolean failIfGroupShutdown)
    throws IOException
{
    super(iocp);

    // associate the socket with the completion port
    final long socket = IOUtil.fdVal(fd);
    int assignedKey = 0;
    try {
        assignedKey = iocp.associate(this, socket);
    } catch (ShutdownChannelGroupException x) {
        if (failIfGroupShutdown) {
            closesocket0(socket);
            throw x;
        }
        // otherwise carry on with a completion key of 0
    } catch (IOException x) {
        closesocket0(socket);
        throw x;
    }

    this.handle = socket;
    this.iocp = iocp;
    this.completionKey = assignedKey;
    this.ioCache = new PendingIoCache();

    // allocate the WSABUF arrays
    this.readBufferArray = unsafe.allocateMemory(SIZEOF_WSABUFARRAY);
    this.writeBufferArray = unsafe.allocateMemory(SIZEOF_WSABUFARRAY);
}

/**
 * Creates the channel, failing if the group has already been shut down.
 * Equivalent to the two-argument constructor with
 * {@code failIfGroupShutdown == true}.
 */
WindowsAsynchronousSocketChannelImpl(Iocp iocp) throws IOException {
    this(iocp, true);
}
122
123
@Override
124
public AsynchronousChannelGroupImpl group() {
125
return iocp;
126
}
127
128
/**
129
* Invoked by Iocp when an I/O operation competes.
130
*/
131
@Override
132
public <V,A> PendingFuture<V,A> getByOverlapped(long overlapped) {
133
return ioCache.remove(overlapped);
134
}
135
136
// invoked by WindowsAsynchronousServerSocketChannelImpl
137
long handle() {
138
return handle;
139
}
140
141
// invoked by WindowsAsynchronousServerSocketChannelImpl when new connection
142
// accept
143
void setConnected(InetSocketAddress localAddress,
144
InetSocketAddress remoteAddress)
145
{
146
synchronized (stateLock) {
147
state = ST_CONNECTED;
148
this.localAddress = localAddress;
149
this.remoteAddress = remoteAddress;
150
}
151
}
152
153
@Override
154
void implClose() throws IOException {
155
// close socket (may cause outstanding async I/O operations to fail).
156
closesocket0(handle);
157
158
// waits until all I/O operations have completed
159
ioCache.close();
160
161
// release arrays of WSABUF structures
162
unsafe.freeMemory(readBufferArray);
163
unsafe.freeMemory(writeBufferArray);
164
165
// finally disassociate from the completion port (key can be 0 if
166
// channel created when group is shutdown)
167
if (completionKey != 0)
168
iocp.disassociate(completionKey);
169
}
170
171
@Override
172
public void onCancel(PendingFuture<?,?> task) {
173
if (task.getContext() instanceof ConnectTask)
174
killConnect();
175
if (task.getContext() instanceof ReadTask)
176
killReading();
177
if (task.getContext() instanceof WriteTask)
178
killWriting();
179
}
180
181
/**
182
* Implements the task to initiate a connection and the handler to
183
* consume the result when the connection is established (or fails).
184
*/
185
private class ConnectTask<A> implements Runnable, Iocp.ResultHandler {
186
private final InetSocketAddress remote;
187
private final PendingFuture<Void,A> result;
188
189
ConnectTask(InetSocketAddress remote, PendingFuture<Void,A> result) {
190
this.remote = remote;
191
this.result = result;
192
}
193
194
private void closeChannel() {
195
try {
196
close();
197
} catch (IOException ignore) { }
198
}
199
200
private IOException toIOException(Throwable x) {
201
if (x instanceof IOException) {
202
if (x instanceof ClosedChannelException)
203
x = new AsynchronousCloseException();
204
return (IOException)x;
205
}
206
return new IOException(x);
207
}
208
209
/**
210
* Invoke after a connection is successfully established.
211
*/
212
private void afterConnect() throws IOException {
213
updateConnectContext(handle);
214
synchronized (stateLock) {
215
state = ST_CONNECTED;
216
remoteAddress = remote;
217
}
218
}
219
220
/**
221
* Task to initiate a connection.
222
*/
223
@Override
224
public void run() {
225
long overlapped = 0L;
226
Throwable exc = null;
227
try {
228
begin();
229
230
// synchronize on result to allow this thread handle the case
231
// where the connection is established immediately.
232
synchronized (result) {
233
overlapped = ioCache.add(result);
234
// initiate the connection
235
int n = connect0(handle, Net.isIPv6Available(), remote.getAddress(),
236
remote.getPort(), overlapped);
237
if (n == IOStatus.UNAVAILABLE) {
238
// connection is pending
239
return;
240
}
241
242
// connection established immediately
243
afterConnect();
244
result.setResult(null);
245
}
246
} catch (Throwable x) {
247
if (overlapped != 0L)
248
ioCache.remove(overlapped);
249
exc = x;
250
} finally {
251
end();
252
}
253
254
if (exc != null) {
255
closeChannel();
256
result.setFailure(toIOException(exc));
257
}
258
Invoker.invoke(result);
259
}
260
261
/**
262
* Invoked by handler thread when connection established.
263
*/
264
@Override
265
public void completed(int bytesTransferred, boolean canInvokeDirect) {
266
Throwable exc = null;
267
try {
268
begin();
269
afterConnect();
270
result.setResult(null);
271
} catch (Throwable x) {
272
// channel is closed or unable to finish connect
273
exc = x;
274
} finally {
275
end();
276
}
277
278
// can't close channel while in begin/end block
279
if (exc != null) {
280
closeChannel();
281
result.setFailure(toIOException(exc));
282
}
283
284
if (canInvokeDirect) {
285
Invoker.invokeUnchecked(result);
286
} else {
287
Invoker.invoke(result);
288
}
289
}
290
291
/**
292
* Invoked by handler thread when failed to establish connection.
293
*/
294
@Override
295
public void failed(int error, IOException x) {
296
if (isOpen()) {
297
closeChannel();
298
result.setFailure(x);
299
} else {
300
result.setFailure(new AsynchronousCloseException());
301
}
302
Invoker.invoke(result);
303
}
304
}
305
306
private void doPrivilegedBind(final SocketAddress sa) throws IOException {
307
try {
308
AccessController.doPrivileged(new PrivilegedExceptionAction<Void>() {
309
public Void run() throws IOException {
310
bind(sa);
311
return null;
312
}
313
});
314
} catch (PrivilegedActionException e) {
315
throw (IOException) e.getException();
316
}
317
}
318
319
@Override
320
<A> Future<Void> implConnect(SocketAddress remote,
321
A attachment,
322
CompletionHandler<Void,? super A> handler)
323
{
324
if (!isOpen()) {
325
Throwable exc = new ClosedChannelException();
326
if (handler == null)
327
return CompletedFuture.withFailure(exc);
328
Invoker.invoke(this, handler, attachment, null, exc);
329
return null;
330
}
331
332
InetSocketAddress isa = Net.checkAddress(remote);
333
334
// permission check
335
SecurityManager sm = System.getSecurityManager();
336
if (sm != null)
337
sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());
338
339
// check and update state
340
// ConnectEx requires the socket to be bound to a local address
341
IOException bindException = null;
342
synchronized (stateLock) {
343
if (state == ST_CONNECTED)
344
throw new AlreadyConnectedException();
345
if (state == ST_PENDING)
346
throw new ConnectionPendingException();
347
if (localAddress == null) {
348
try {
349
SocketAddress any = new InetSocketAddress(0);
350
if (sm == null) {
351
bind(any);
352
} else {
353
doPrivilegedBind(any);
354
}
355
} catch (IOException x) {
356
bindException = x;
357
}
358
}
359
if (bindException == null)
360
state = ST_PENDING;
361
}
362
363
// handle bind failure
364
if (bindException != null) {
365
try {
366
close();
367
} catch (IOException ignore) { }
368
if (handler == null)
369
return CompletedFuture.withFailure(bindException);
370
Invoker.invoke(this, handler, attachment, null, bindException);
371
return null;
372
}
373
374
// setup task
375
PendingFuture<Void,A> result =
376
new PendingFuture<Void,A>(this, handler, attachment);
377
ConnectTask<A> task = new ConnectTask<A>(isa, result);
378
result.setContext(task);
379
380
// initiate I/O
381
if (Iocp.supportsThreadAgnosticIo()) {
382
task.run();
383
} else {
384
Invoker.invokeOnThreadInThreadPool(this, task);
385
}
386
return result;
387
}
388
389
/**
390
* Implements the task to initiate a read and the handler to consume the
391
* result when the read completes.
392
*/
393
private class ReadTask<V,A> implements Runnable, Iocp.ResultHandler {
394
private final ByteBuffer[] bufs;
395
private final int numBufs;
396
private final boolean scatteringRead;
397
private final PendingFuture<V,A> result;
398
399
// set by run method
400
private ByteBuffer[] shadow;
401
402
ReadTask(ByteBuffer[] bufs,
403
boolean scatteringRead,
404
PendingFuture<V,A> result)
405
{
406
this.bufs = bufs;
407
this.numBufs = (bufs.length > MAX_WSABUF) ? MAX_WSABUF : bufs.length;
408
this.scatteringRead = scatteringRead;
409
this.result = result;
410
}
411
412
/**
413
* Invoked prior to read to prepare the WSABUF array. Where necessary,
414
* it substitutes non-direct buffers with direct buffers.
415
*/
416
void prepareBuffers() {
417
shadow = new ByteBuffer[numBufs];
418
long address = readBufferArray;
419
for (int i=0; i<numBufs; i++) {
420
ByteBuffer dst = bufs[i];
421
int pos = dst.position();
422
int lim = dst.limit();
423
assert (pos <= lim);
424
int rem = (pos <= lim ? lim - pos : 0);
425
long a;
426
if (!(dst instanceof DirectBuffer)) {
427
// substitute with direct buffer
428
ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
429
shadow[i] = bb;
430
a = ((DirectBuffer)bb).address();
431
} else {
432
shadow[i] = dst;
433
a = ((DirectBuffer)dst).address() + pos;
434
}
435
unsafe.putAddress(address + OFFSETOF_BUF, a);
436
unsafe.putInt(address + OFFSETOF_LEN, rem);
437
address += SIZEOF_WSABUF;
438
}
439
}
440
441
/**
442
* Invoked after a read has completed to update the buffer positions
443
* and release any substituted buffers.
444
*/
445
void updateBuffers(int bytesRead) {
446
for (int i=0; i<numBufs; i++) {
447
ByteBuffer nextBuffer = shadow[i];
448
int pos = nextBuffer.position();
449
int len = nextBuffer.remaining();
450
if (bytesRead >= len) {
451
bytesRead -= len;
452
int newPosition = pos + len;
453
try {
454
nextBuffer.position(newPosition);
455
} catch (IllegalArgumentException x) {
456
// position changed by another
457
}
458
} else { // Buffers not completely filled
459
if (bytesRead > 0) {
460
assert(pos + bytesRead < (long)Integer.MAX_VALUE);
461
int newPosition = pos + bytesRead;
462
try {
463
nextBuffer.position(newPosition);
464
} catch (IllegalArgumentException x) {
465
// position changed by another
466
}
467
}
468
break;
469
}
470
}
471
472
// Put results from shadow into the slow buffers
473
for (int i=0; i<numBufs; i++) {
474
if (!(bufs[i] instanceof DirectBuffer)) {
475
shadow[i].flip();
476
try {
477
bufs[i].put(shadow[i]);
478
} catch (BufferOverflowException x) {
479
// position changed by another
480
}
481
}
482
}
483
}
484
485
void releaseBuffers() {
486
for (int i=0; i<numBufs; i++) {
487
if (!(bufs[i] instanceof DirectBuffer)) {
488
Util.releaseTemporaryDirectBuffer(shadow[i]);
489
}
490
}
491
}
492
493
@Override
494
@SuppressWarnings("unchecked")
495
public void run() {
496
long overlapped = 0L;
497
boolean prepared = false;
498
boolean pending = false;
499
500
try {
501
begin();
502
503
// substitute non-direct buffers
504
prepareBuffers();
505
prepared = true;
506
507
// get an OVERLAPPED structure (from the cache or allocate)
508
overlapped = ioCache.add(result);
509
510
// initiate read
511
int n = read0(handle, numBufs, readBufferArray, overlapped);
512
if (n == IOStatus.UNAVAILABLE) {
513
// I/O is pending
514
pending = true;
515
return;
516
}
517
if (n == IOStatus.EOF) {
518
// input shutdown
519
enableReading();
520
if (scatteringRead) {
521
result.setResult((V)Long.valueOf(-1L));
522
} else {
523
result.setResult((V)Integer.valueOf(-1));
524
}
525
} else {
526
throw new InternalError("Read completed immediately");
527
}
528
} catch (Throwable x) {
529
// failed to initiate read
530
// reset read flag before releasing waiters
531
enableReading();
532
if (x instanceof ClosedChannelException)
533
x = new AsynchronousCloseException();
534
if (!(x instanceof IOException))
535
x = new IOException(x);
536
result.setFailure(x);
537
} finally {
538
// release resources if I/O not pending
539
if (!pending) {
540
if (overlapped != 0L)
541
ioCache.remove(overlapped);
542
if (prepared)
543
releaseBuffers();
544
}
545
end();
546
}
547
548
// invoke completion handler
549
Invoker.invoke(result);
550
}
551
552
/**
553
* Executed when the I/O has completed
554
*/
555
@Override
556
@SuppressWarnings("unchecked")
557
public void completed(int bytesTransferred, boolean canInvokeDirect) {
558
if (bytesTransferred == 0) {
559
bytesTransferred = -1; // EOF
560
} else {
561
updateBuffers(bytesTransferred);
562
}
563
564
// return direct buffer to cache if substituted
565
releaseBuffers();
566
567
// release waiters if not already released by timeout
568
synchronized (result) {
569
if (result.isDone())
570
return;
571
enableReading();
572
if (scatteringRead) {
573
result.setResult((V)Long.valueOf(bytesTransferred));
574
} else {
575
result.setResult((V)Integer.valueOf(bytesTransferred));
576
}
577
}
578
if (canInvokeDirect) {
579
Invoker.invokeUnchecked(result);
580
} else {
581
Invoker.invoke(result);
582
}
583
}
584
585
@Override
586
public void failed(int error, IOException x) {
587
// return direct buffer to cache if substituted
588
releaseBuffers();
589
590
// release waiters if not already released by timeout
591
if (!isOpen())
592
x = new AsynchronousCloseException();
593
594
synchronized (result) {
595
if (result.isDone())
596
return;
597
enableReading();
598
result.setFailure(x);
599
}
600
Invoker.invoke(result);
601
}
602
603
/**
604
* Invoked if timeout expires before it is cancelled
605
*/
606
void timeout() {
607
// synchronize on result as the I/O could complete/fail
608
synchronized (result) {
609
if (result.isDone())
610
return;
611
612
// kill further reading before releasing waiters
613
enableReading(true);
614
result.setFailure(new InterruptedByTimeoutException());
615
}
616
617
// invoke handler without any locks
618
Invoker.invoke(result);
619
}
620
}
621
622
@Override
623
<V extends Number,A> Future<V> implRead(boolean isScatteringRead,
624
ByteBuffer dst,
625
ByteBuffer[] dsts,
626
long timeout,
627
TimeUnit unit,
628
A attachment,
629
CompletionHandler<V,? super A> handler)
630
{
631
// setup task
632
PendingFuture<V,A> result =
633
new PendingFuture<V,A>(this, handler, attachment);
634
ByteBuffer[] bufs;
635
if (isScatteringRead) {
636
bufs = dsts;
637
} else {
638
bufs = new ByteBuffer[1];
639
bufs[0] = dst;
640
}
641
final ReadTask<V,A> readTask =
642
new ReadTask<V,A>(bufs, isScatteringRead, result);
643
result.setContext(readTask);
644
645
// schedule timeout
646
if (timeout > 0L) {
647
Future<?> timeoutTask = iocp.schedule(new Runnable() {
648
public void run() {
649
readTask.timeout();
650
}
651
}, timeout, unit);
652
result.setTimeoutTask(timeoutTask);
653
}
654
655
// initiate I/O
656
if (Iocp.supportsThreadAgnosticIo()) {
657
readTask.run();
658
} else {
659
Invoker.invokeOnThreadInThreadPool(this, readTask);
660
}
661
return result;
662
}
663
664
/**
665
* Implements the task to initiate a write and the handler to consume the
666
* result when the write completes.
667
*/
668
private class WriteTask<V,A> implements Runnable, Iocp.ResultHandler {
669
private final ByteBuffer[] bufs;
670
private final int numBufs;
671
private final boolean gatheringWrite;
672
private final PendingFuture<V,A> result;
673
674
// set by run method
675
private ByteBuffer[] shadow;
676
677
WriteTask(ByteBuffer[] bufs,
678
boolean gatheringWrite,
679
PendingFuture<V,A> result)
680
{
681
this.bufs = bufs;
682
this.numBufs = (bufs.length > MAX_WSABUF) ? MAX_WSABUF : bufs.length;
683
this.gatheringWrite = gatheringWrite;
684
this.result = result;
685
}
686
687
/**
688
* Invoked prior to write to prepare the WSABUF array. Where necessary,
689
* it substitutes non-direct buffers with direct buffers.
690
*/
691
void prepareBuffers() {
692
shadow = new ByteBuffer[numBufs];
693
long address = writeBufferArray;
694
for (int i=0; i<numBufs; i++) {
695
ByteBuffer src = bufs[i];
696
int pos = src.position();
697
int lim = src.limit();
698
assert (pos <= lim);
699
int rem = (pos <= lim ? lim - pos : 0);
700
long a;
701
if (!(src instanceof DirectBuffer)) {
702
// substitute with direct buffer
703
ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
704
bb.put(src);
705
bb.flip();
706
src.position(pos); // leave heap buffer untouched for now
707
shadow[i] = bb;
708
a = ((DirectBuffer)bb).address();
709
} else {
710
shadow[i] = src;
711
a = ((DirectBuffer)src).address() + pos;
712
}
713
unsafe.putAddress(address + OFFSETOF_BUF, a);
714
unsafe.putInt(address + OFFSETOF_LEN, rem);
715
address += SIZEOF_WSABUF;
716
}
717
}
718
719
/**
720
* Invoked after a write has completed to update the buffer positions
721
* and release any substituted buffers.
722
*/
723
void updateBuffers(int bytesWritten) {
724
// Notify the buffers how many bytes were taken
725
for (int i=0; i<numBufs; i++) {
726
ByteBuffer nextBuffer = bufs[i];
727
int pos = nextBuffer.position();
728
int lim = nextBuffer.limit();
729
int len = (pos <= lim ? lim - pos : lim);
730
if (bytesWritten >= len) {
731
bytesWritten -= len;
732
int newPosition = pos + len;
733
try {
734
nextBuffer.position(newPosition);
735
} catch (IllegalArgumentException x) {
736
// position changed by someone else
737
}
738
} else { // Buffers not completely filled
739
if (bytesWritten > 0) {
740
assert(pos + bytesWritten < (long)Integer.MAX_VALUE);
741
int newPosition = pos + bytesWritten;
742
try {
743
nextBuffer.position(newPosition);
744
} catch (IllegalArgumentException x) {
745
// position changed by someone else
746
}
747
}
748
break;
749
}
750
}
751
}
752
753
void releaseBuffers() {
754
for (int i=0; i<numBufs; i++) {
755
if (!(bufs[i] instanceof DirectBuffer)) {
756
Util.releaseTemporaryDirectBuffer(shadow[i]);
757
}
758
}
759
}
760
761
@Override
762
//@SuppressWarnings("unchecked")
763
public void run() {
764
long overlapped = 0L;
765
boolean prepared = false;
766
boolean pending = false;
767
boolean shutdown = false;
768
769
try {
770
begin();
771
772
// substitute non-direct buffers
773
prepareBuffers();
774
prepared = true;
775
776
// get an OVERLAPPED structure (from the cache or allocate)
777
overlapped = ioCache.add(result);
778
int n = write0(handle, numBufs, writeBufferArray, overlapped);
779
if (n == IOStatus.UNAVAILABLE) {
780
// I/O is pending
781
pending = true;
782
return;
783
}
784
if (n == IOStatus.EOF) {
785
// special case for shutdown output
786
shutdown = true;
787
throw new ClosedChannelException();
788
}
789
// write completed immediately
790
throw new InternalError("Write completed immediately");
791
} catch (Throwable x) {
792
// write failed. Enable writing before releasing waiters.
793
enableWriting();
794
if (!shutdown && (x instanceof ClosedChannelException))
795
x = new AsynchronousCloseException();
796
if (!(x instanceof IOException))
797
x = new IOException(x);
798
result.setFailure(x);
799
} finally {
800
// release resources if I/O not pending
801
if (!pending) {
802
if (overlapped != 0L)
803
ioCache.remove(overlapped);
804
if (prepared)
805
releaseBuffers();
806
}
807
end();
808
}
809
810
// invoke completion handler
811
Invoker.invoke(result);
812
}
813
814
/**
815
* Executed when the I/O has completed
816
*/
817
@Override
818
@SuppressWarnings("unchecked")
819
public void completed(int bytesTransferred, boolean canInvokeDirect) {
820
updateBuffers(bytesTransferred);
821
822
// return direct buffer to cache if substituted
823
releaseBuffers();
824
825
// release waiters if not already released by timeout
826
synchronized (result) {
827
if (result.isDone())
828
return;
829
enableWriting();
830
if (gatheringWrite) {
831
result.setResult((V)Long.valueOf(bytesTransferred));
832
} else {
833
result.setResult((V)Integer.valueOf(bytesTransferred));
834
}
835
}
836
if (canInvokeDirect) {
837
Invoker.invokeUnchecked(result);
838
} else {
839
Invoker.invoke(result);
840
}
841
}
842
843
@Override
844
public void failed(int error, IOException x) {
845
// return direct buffer to cache if substituted
846
releaseBuffers();
847
848
// release waiters if not already released by timeout
849
if (!isOpen())
850
x = new AsynchronousCloseException();
851
852
synchronized (result) {
853
if (result.isDone())
854
return;
855
enableWriting();
856
result.setFailure(x);
857
}
858
Invoker.invoke(result);
859
}
860
861
/**
862
* Invoked if timeout expires before it is cancelled
863
*/
864
void timeout() {
865
// synchronize on result as the I/O could complete/fail
866
synchronized (result) {
867
if (result.isDone())
868
return;
869
870
// kill further writing before releasing waiters
871
enableWriting(true);
872
result.setFailure(new InterruptedByTimeoutException());
873
}
874
875
// invoke handler without any locks
876
Invoker.invoke(result);
877
}
878
}
879
880
@Override
881
<V extends Number,A> Future<V> implWrite(boolean gatheringWrite,
882
ByteBuffer src,
883
ByteBuffer[] srcs,
884
long timeout,
885
TimeUnit unit,
886
A attachment,
887
CompletionHandler<V,? super A> handler)
888
{
889
// setup task
890
PendingFuture<V,A> result =
891
new PendingFuture<V,A>(this, handler, attachment);
892
ByteBuffer[] bufs;
893
if (gatheringWrite) {
894
bufs = srcs;
895
} else {
896
bufs = new ByteBuffer[1];
897
bufs[0] = src;
898
}
899
final WriteTask<V,A> writeTask =
900
new WriteTask<V,A>(bufs, gatheringWrite, result);
901
result.setContext(writeTask);
902
903
// schedule timeout
904
if (timeout > 0L) {
905
Future<?> timeoutTask = iocp.schedule(new Runnable() {
906
public void run() {
907
writeTask.timeout();
908
}
909
}, timeout, unit);
910
result.setTimeoutTask(timeoutTask);
911
}
912
913
// initiate I/O (can only be done from thread in thread pool)
914
// initiate I/O
915
if (Iocp.supportsThreadAgnosticIo()) {
916
writeTask.run();
917
} else {
918
Invoker.invokeOnThreadInThreadPool(this, writeTask);
919
}
920
return result;
921
}
922
923
// -- Native methods --
924
925
private static native void initIDs();
926
927
private static native int connect0(long socket, boolean preferIPv6,
928
InetAddress remote, int remotePort, long overlapped) throws IOException;
929
930
private static native void updateConnectContext(long socket) throws IOException;
931
932
private static native int read0(long socket, int count, long addres, long overlapped)
933
throws IOException;
934
935
private static native int write0(long socket, int count, long address,
936
long overlapped) throws IOException;
937
938
private static native void shutdown0(long socket, int how) throws IOException;
939
940
private static native void closesocket0(long socket) throws IOException;
941
942
static {
943
IOUtil.load();
944
initIDs();
945
}
946
}
947
948