Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
PojavLauncherTeam
GitHub Repository: PojavLauncherTeam/openjdk-multiarch-jdk8u
Path: blob/aarch64-shenandoah-jdk8u272-b10/jdk/src/solaris/classes/sun/nio/ch/UnixAsynchronousSocketChannelImpl.java
32288 views
1
/*
2
* Copyright (c) 2008, 2018, Oracle and/or its affiliates. All rights reserved.
3
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4
*
5
* This code is free software; you can redistribute it and/or modify it
6
* under the terms of the GNU General Public License version 2 only, as
7
* published by the Free Software Foundation. Oracle designates this
8
* particular file as subject to the "Classpath" exception as provided
9
* by Oracle in the LICENSE file that accompanied this code.
10
*
11
* This code is distributed in the hope that it will be useful, but WITHOUT
12
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14
* version 2 for more details (a copy is included in the LICENSE file that
15
* accompanied this code).
16
*
17
* You should have received a copy of the GNU General Public License version
18
* 2 along with this work; if not, write to the Free Software Foundation,
19
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20
*
21
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22
* or visit www.oracle.com if you need additional information or have any
23
* questions.
24
*/
25
26
package sun.nio.ch;
27
28
import java.nio.channels.*;
29
import java.nio.ByteBuffer;
30
import java.net.*;
31
import java.util.concurrent.*;
32
import java.io.IOException;
33
import java.io.FileDescriptor;
34
import java.security.AccessController;
35
import sun.net.NetHooks;
36
import sun.security.action.GetPropertyAction;
37
38
/**
39
* Unix implementation of AsynchronousSocketChannel
40
*/
41
42
class UnixAsynchronousSocketChannelImpl
43
extends AsynchronousSocketChannelImpl implements Port.PollableChannel
44
{
45
private final static NativeDispatcher nd = new SocketDispatcher();
46
private static enum OpType { CONNECT, READ, WRITE };
47
48
// When true, implRead never attempts the read on the initiating thread.
// Controlled by the "sun.nio.ch.disableSynchronousRead" system property:
// an empty value counts as true, anything else is parsed as a boolean.
private static final boolean disableSynchronousRead;
static {
    String setting = AccessController.doPrivileged(
        new GetPropertyAction("sun.nio.ch.disableSynchronousRead", "false"));
    disableSynchronousRead = setting.isEmpty() || Boolean.parseBoolean(setting);
}
55
56
private final Port port;
57
private final int fdVal;
58
59
// used to ensure that the context for I/O operations that complete
60
// asynchronously is visible to the pooled threads handling I/O events.
61
private final Object updateLock = new Object();
62
63
// pending connect (updateLock)
64
private boolean connectPending;
65
private CompletionHandler<Void,Object> connectHandler;
66
private Object connectAttachment;
67
private PendingFuture<Void,Object> connectFuture;
68
69
// pending remote address (stateLock)
70
private SocketAddress pendingRemote;
71
72
// pending read (updateLock)
73
private boolean readPending;
74
private boolean isScatteringRead;
75
private ByteBuffer readBuffer;
76
private ByteBuffer[] readBuffers;
77
private CompletionHandler<Number,Object> readHandler;
78
private Object readAttachment;
79
private PendingFuture<Number,Object> readFuture;
80
private Future<?> readTimer;
81
82
// pending write (updateLock)
83
private boolean writePending;
84
private boolean isGatheringWrite;
85
private ByteBuffer writeBuffer;
86
private ByteBuffer[] writeBuffers;
87
private CompletionHandler<Number,Object> writeHandler;
88
private Object writeAttachment;
89
private PendingFuture<Number,Object> writeFuture;
90
private Future<?> writeTimer;
91
92
93
UnixAsynchronousSocketChannelImpl(Port port)
94
throws IOException
95
{
96
super(port);
97
98
// set non-blocking
99
try {
100
IOUtil.configureBlocking(fd, false);
101
} catch (IOException x) {
102
nd.close(fd);
103
throw x;
104
}
105
106
this.port = port;
107
this.fdVal = IOUtil.fdVal(fd);
108
109
// add mapping from file descriptor to this channel
110
port.register(fdVal, this);
111
}
112
113
// Constructor for sockets created by UnixAsynchronousServerSocketChannelImpl
114
UnixAsynchronousSocketChannelImpl(Port port,
115
FileDescriptor fd,
116
InetSocketAddress remote)
117
throws IOException
118
{
119
super(port, fd, remote);
120
121
this.fdVal = IOUtil.fdVal(fd);
122
IOUtil.configureBlocking(fd, false);
123
124
try {
125
port.register(fdVal, this);
126
} catch (ShutdownChannelGroupException x) {
127
// ShutdownChannelGroupException thrown if we attempt to register a
128
// new channel after the group is shutdown
129
throw new IOException(x);
130
}
131
132
this.port = port;
133
}
134
135
@Override
136
public AsynchronousChannelGroupImpl group() {
137
return port;
138
}
139
140
// register events for outstanding I/O operations, caller already owns updateLock
141
private void updateEvents() {
142
assert Thread.holdsLock(updateLock);
143
int events = 0;
144
if (readPending)
145
events |= Net.POLLIN;
146
if (connectPending || writePending)
147
events |= Net.POLLOUT;
148
if (events != 0)
149
port.startPoll(fdVal, events);
150
}
151
152
// register events for outstanding I/O operations
153
private void lockAndUpdateEvents() {
154
synchronized (updateLock) {
155
updateEvents();
156
}
157
}
158
159
// invoke to finish read and/or write operations
160
private void finish(boolean mayInvokeDirect,
161
boolean readable,
162
boolean writable)
163
{
164
boolean finishRead = false;
165
boolean finishWrite = false;
166
boolean finishConnect = false;
167
168
// map event to pending result
169
synchronized (updateLock) {
170
if (readable && this.readPending) {
171
this.readPending = false;
172
finishRead = true;
173
}
174
if (writable) {
175
if (this.writePending) {
176
this.writePending = false;
177
finishWrite = true;
178
} else if (this.connectPending) {
179
this.connectPending = false;
180
finishConnect = true;
181
}
182
}
183
}
184
185
// complete the I/O operation. Special case for when channel is
186
// ready for both reading and writing. In that case, submit task to
187
// complete write if write operation has a completion handler.
188
if (finishRead) {
189
if (finishWrite)
190
finishWrite(false);
191
finishRead(mayInvokeDirect);
192
return;
193
}
194
if (finishWrite) {
195
finishWrite(mayInvokeDirect);
196
}
197
if (finishConnect) {
198
finishConnect(mayInvokeDirect);
199
}
200
}
201
202
/**
203
* Invoked by event handler thread when file descriptor is polled
204
*/
205
@Override
206
public void onEvent(int events, boolean mayInvokeDirect) {
207
boolean readable = (events & Net.POLLIN) > 0;
208
boolean writable = (events & Net.POLLOUT) > 0;
209
if ((events & (Net.POLLERR | Net.POLLHUP)) > 0) {
210
readable = true;
211
writable = true;
212
}
213
finish(mayInvokeDirect, readable, writable);
214
}
215
216
@Override
217
void implClose() throws IOException {
218
// remove the mapping
219
port.unregister(fdVal);
220
221
// close file descriptor
222
nd.close(fd);
223
224
// All outstanding I/O operations are required to fail
225
finish(false, true, true);
226
}
227
228
@Override
229
public void onCancel(PendingFuture<?,?> task) {
230
if (task.getContext() == OpType.CONNECT)
231
killConnect();
232
if (task.getContext() == OpType.READ)
233
killReading();
234
if (task.getContext() == OpType.WRITE)
235
killWriting();
236
}
237
238
// -- connect --
239
240
private void setConnected() throws IOException {
241
synchronized (stateLock) {
242
state = ST_CONNECTED;
243
localAddress = Net.localAddress(fd);
244
remoteAddress = (InetSocketAddress)pendingRemote;
245
}
246
}
247
248
private void finishConnect(boolean mayInvokeDirect) {
249
Throwable e = null;
250
try {
251
begin();
252
checkConnect(fdVal);
253
setConnected();
254
} catch (Throwable x) {
255
if (x instanceof ClosedChannelException)
256
x = new AsynchronousCloseException();
257
e = x;
258
} finally {
259
end();
260
}
261
if (e != null) {
262
// close channel if connection cannot be established
263
try {
264
close();
265
} catch (Throwable suppressed) {
266
e.addSuppressed(suppressed);
267
}
268
}
269
270
// invoke handler and set result
271
CompletionHandler<Void,Object> handler = connectHandler;
272
connectHandler = null;
273
Object att = connectAttachment;
274
PendingFuture<Void,Object> future = connectFuture;
275
if (handler == null) {
276
future.setResult(null, e);
277
} else {
278
if (mayInvokeDirect) {
279
Invoker.invokeUnchecked(handler, att, null, e);
280
} else {
281
Invoker.invokeIndirectly(this, handler, att, null, e);
282
}
283
}
284
}
285
286
@Override
287
@SuppressWarnings("unchecked")
288
<A> Future<Void> implConnect(SocketAddress remote,
289
A attachment,
290
CompletionHandler<Void,? super A> handler)
291
{
292
if (!isOpen()) {
293
Throwable e = new ClosedChannelException();
294
if (handler == null) {
295
return CompletedFuture.withFailure(e);
296
} else {
297
Invoker.invoke(this, handler, attachment, null, e);
298
return null;
299
}
300
}
301
302
InetSocketAddress isa = Net.checkAddress(remote);
303
304
// permission check
305
SecurityManager sm = System.getSecurityManager();
306
if (sm != null)
307
sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());
308
309
// check and set state
310
boolean notifyBeforeTcpConnect;
311
synchronized (stateLock) {
312
if (state == ST_CONNECTED)
313
throw new AlreadyConnectedException();
314
if (state == ST_PENDING)
315
throw new ConnectionPendingException();
316
state = ST_PENDING;
317
pendingRemote = remote;
318
notifyBeforeTcpConnect = (localAddress == null);
319
}
320
321
Throwable e = null;
322
try {
323
begin();
324
// notify hook if unbound
325
if (notifyBeforeTcpConnect)
326
NetHooks.beforeTcpConnect(fd, isa.getAddress(), isa.getPort());
327
int n = Net.connect(fd, isa.getAddress(), isa.getPort());
328
if (n == IOStatus.UNAVAILABLE) {
329
// connection could not be established immediately
330
PendingFuture<Void,A> result = null;
331
synchronized (updateLock) {
332
if (handler == null) {
333
result = new PendingFuture<Void,A>(this, OpType.CONNECT);
334
this.connectFuture = (PendingFuture<Void,Object>)result;
335
} else {
336
this.connectHandler = (CompletionHandler<Void,Object>)handler;
337
this.connectAttachment = attachment;
338
}
339
this.connectPending = true;
340
updateEvents();
341
}
342
return result;
343
}
344
setConnected();
345
} catch (Throwable x) {
346
if (x instanceof ClosedChannelException)
347
x = new AsynchronousCloseException();
348
e = x;
349
} finally {
350
end();
351
}
352
353
// close channel if connect fails
354
if (e != null) {
355
try {
356
close();
357
} catch (Throwable suppressed) {
358
e.addSuppressed(suppressed);
359
}
360
}
361
if (handler == null) {
362
return CompletedFuture.withResult(null, e);
363
} else {
364
Invoker.invoke(this, handler, attachment, null, e);
365
return null;
366
}
367
}
368
369
// -- read --
370
371
private void finishRead(boolean mayInvokeDirect) {
372
int n = -1;
373
Throwable exc = null;
374
375
// copy fields as we can't access them after reading is re-enabled.
376
boolean scattering = isScatteringRead;
377
CompletionHandler<Number,Object> handler = readHandler;
378
Object att = readAttachment;
379
PendingFuture<Number,Object> future = readFuture;
380
Future<?> timeout = readTimer;
381
382
try {
383
begin();
384
385
if (scattering) {
386
n = (int)IOUtil.read(fd, readBuffers, nd);
387
} else {
388
n = IOUtil.read(fd, readBuffer, -1, nd);
389
}
390
if (n == IOStatus.UNAVAILABLE) {
391
// spurious wakeup, is this possible?
392
synchronized (updateLock) {
393
readPending = true;
394
}
395
return;
396
}
397
398
// allow objects to be GC'ed.
399
this.readBuffer = null;
400
this.readBuffers = null;
401
this.readAttachment = null;
402
this.readHandler = null;
403
404
// allow another read to be initiated
405
enableReading();
406
407
} catch (Throwable x) {
408
enableReading();
409
if (x instanceof ClosedChannelException)
410
x = new AsynchronousCloseException();
411
exc = x;
412
} finally {
413
// restart poll in case of concurrent write
414
if (!(exc instanceof AsynchronousCloseException))
415
lockAndUpdateEvents();
416
end();
417
}
418
419
// cancel the associated timer
420
if (timeout != null)
421
timeout.cancel(false);
422
423
// create result
424
Number result = (exc != null) ? null : (scattering) ?
425
(Number)Long.valueOf(n) : (Number)Integer.valueOf(n);
426
427
// invoke handler or set result
428
if (handler == null) {
429
future.setResult(result, exc);
430
} else {
431
if (mayInvokeDirect) {
432
Invoker.invokeUnchecked(handler, att, result, exc);
433
} else {
434
Invoker.invokeIndirectly(this, handler, att, result, exc);
435
}
436
}
437
}
438
439
private Runnable readTimeoutTask = new Runnable() {
440
public void run() {
441
CompletionHandler<Number,Object> handler = null;
442
Object att = null;
443
PendingFuture<Number,Object> future = null;
444
445
synchronized (updateLock) {
446
if (!readPending)
447
return;
448
readPending = false;
449
handler = readHandler;
450
att = readAttachment;
451
future = readFuture;
452
}
453
454
// kill further reading before releasing waiters
455
enableReading(true);
456
457
// invoke handler or set result
458
Exception exc = new InterruptedByTimeoutException();
459
if (handler == null) {
460
future.setFailure(exc);
461
} else {
462
AsynchronousChannel ch = UnixAsynchronousSocketChannelImpl.this;
463
Invoker.invokeIndirectly(ch, handler, att, null, exc);
464
}
465
}
466
};
467
468
/**
469
* Initiates a read or scattering read operation
470
*/
471
@Override
472
@SuppressWarnings("unchecked")
473
<V extends Number,A> Future<V> implRead(boolean isScatteringRead,
474
ByteBuffer dst,
475
ByteBuffer[] dsts,
476
long timeout,
477
TimeUnit unit,
478
A attachment,
479
CompletionHandler<V,? super A> handler)
480
{
481
// A synchronous read is not attempted if disallowed by system property
482
// or, we are using a fixed thread pool and the completion handler may
483
// not be invoked directly (because the thread is not a pooled thread or
484
// there are too many handlers on the stack).
485
Invoker.GroupAndInvokeCount myGroupAndInvokeCount = null;
486
boolean invokeDirect = false;
487
boolean attemptRead = false;
488
if (!disableSynchronousRead) {
489
if (handler == null) {
490
attemptRead = true;
491
} else {
492
myGroupAndInvokeCount = Invoker.getGroupAndInvokeCount();
493
invokeDirect = Invoker.mayInvokeDirect(myGroupAndInvokeCount, port);
494
// okay to attempt read with user thread pool
495
attemptRead = invokeDirect || !port.isFixedThreadPool();
496
}
497
}
498
499
int n = IOStatus.UNAVAILABLE;
500
Throwable exc = null;
501
boolean pending = false;
502
503
try {
504
begin();
505
506
if (attemptRead) {
507
if (isScatteringRead) {
508
n = (int)IOUtil.read(fd, dsts, nd);
509
} else {
510
n = IOUtil.read(fd, dst, -1, nd);
511
}
512
}
513
514
if (n == IOStatus.UNAVAILABLE) {
515
PendingFuture<V,A> result = null;
516
synchronized (updateLock) {
517
this.isScatteringRead = isScatteringRead;
518
this.readBuffer = dst;
519
this.readBuffers = dsts;
520
if (handler == null) {
521
this.readHandler = null;
522
result = new PendingFuture<V,A>(this, OpType.READ);
523
this.readFuture = (PendingFuture<Number,Object>)result;
524
this.readAttachment = null;
525
} else {
526
this.readHandler = (CompletionHandler<Number,Object>)handler;
527
this.readAttachment = attachment;
528
this.readFuture = null;
529
}
530
if (timeout > 0L) {
531
this.readTimer = port.schedule(readTimeoutTask, timeout, unit);
532
}
533
this.readPending = true;
534
updateEvents();
535
}
536
pending = true;
537
return result;
538
}
539
} catch (Throwable x) {
540
if (x instanceof ClosedChannelException)
541
x = new AsynchronousCloseException();
542
exc = x;
543
} finally {
544
if (!pending)
545
enableReading();
546
end();
547
}
548
549
Number result = (exc != null) ? null : (isScatteringRead) ?
550
(Number)Long.valueOf(n) : (Number)Integer.valueOf(n);
551
552
// read completed immediately
553
if (handler != null) {
554
if (invokeDirect) {
555
Invoker.invokeDirect(myGroupAndInvokeCount, handler, attachment, (V)result, exc);
556
} else {
557
Invoker.invokeIndirectly(this, handler, attachment, (V)result, exc);
558
}
559
return null;
560
} else {
561
return CompletedFuture.withResult((V)result, exc);
562
}
563
}
564
565
// -- write --
566
567
private void finishWrite(boolean mayInvokeDirect) {
568
int n = -1;
569
Throwable exc = null;
570
571
// copy fields as we can't access them after reading is re-enabled.
572
boolean gathering = this.isGatheringWrite;
573
CompletionHandler<Number,Object> handler = this.writeHandler;
574
Object att = this.writeAttachment;
575
PendingFuture<Number,Object> future = this.writeFuture;
576
Future<?> timer = this.writeTimer;
577
578
try {
579
begin();
580
581
if (gathering) {
582
n = (int)IOUtil.write(fd, writeBuffers, nd);
583
} else {
584
n = IOUtil.write(fd, writeBuffer, -1, nd);
585
}
586
if (n == IOStatus.UNAVAILABLE) {
587
// spurious wakeup, is this possible?
588
synchronized (updateLock) {
589
writePending = true;
590
}
591
return;
592
}
593
594
// allow objects to be GC'ed.
595
this.writeBuffer = null;
596
this.writeBuffers = null;
597
this.writeAttachment = null;
598
this.writeHandler = null;
599
600
// allow another write to be initiated
601
enableWriting();
602
603
} catch (Throwable x) {
604
enableWriting();
605
if (x instanceof ClosedChannelException)
606
x = new AsynchronousCloseException();
607
exc = x;
608
} finally {
609
// restart poll in case of concurrent write
610
if (!(exc instanceof AsynchronousCloseException))
611
lockAndUpdateEvents();
612
end();
613
}
614
615
// cancel the associated timer
616
if (timer != null)
617
timer.cancel(false);
618
619
// create result
620
Number result = (exc != null) ? null : (gathering) ?
621
(Number)Long.valueOf(n) : (Number)Integer.valueOf(n);
622
623
// invoke handler or set result
624
if (handler == null) {
625
future.setResult(result, exc);
626
} else {
627
if (mayInvokeDirect) {
628
Invoker.invokeUnchecked(handler, att, result, exc);
629
} else {
630
Invoker.invokeIndirectly(this, handler, att, result, exc);
631
}
632
}
633
}
634
635
private Runnable writeTimeoutTask = new Runnable() {
636
public void run() {
637
CompletionHandler<Number,Object> handler = null;
638
Object att = null;
639
PendingFuture<Number,Object> future = null;
640
641
synchronized (updateLock) {
642
if (!writePending)
643
return;
644
writePending = false;
645
handler = writeHandler;
646
att = writeAttachment;
647
future = writeFuture;
648
}
649
650
// kill further writing before releasing waiters
651
enableWriting(true);
652
653
// invoke handler or set result
654
Exception exc = new InterruptedByTimeoutException();
655
if (handler != null) {
656
Invoker.invokeIndirectly(UnixAsynchronousSocketChannelImpl.this,
657
handler, att, null, exc);
658
} else {
659
future.setFailure(exc);
660
}
661
}
662
};
663
664
/**
665
* Initiates a read or scattering read operation
666
*/
667
@Override
668
@SuppressWarnings("unchecked")
669
<V extends Number,A> Future<V> implWrite(boolean isGatheringWrite,
670
ByteBuffer src,
671
ByteBuffer[] srcs,
672
long timeout,
673
TimeUnit unit,
674
A attachment,
675
CompletionHandler<V,? super A> handler)
676
{
677
Invoker.GroupAndInvokeCount myGroupAndInvokeCount =
678
Invoker.getGroupAndInvokeCount();
679
boolean invokeDirect = Invoker.mayInvokeDirect(myGroupAndInvokeCount, port);
680
boolean attemptWrite = (handler == null) || invokeDirect ||
681
!port.isFixedThreadPool(); // okay to attempt write with user thread pool
682
683
int n = IOStatus.UNAVAILABLE;
684
Throwable exc = null;
685
boolean pending = false;
686
687
try {
688
begin();
689
690
if (attemptWrite) {
691
if (isGatheringWrite) {
692
n = (int)IOUtil.write(fd, srcs, nd);
693
} else {
694
n = IOUtil.write(fd, src, -1, nd);
695
}
696
}
697
698
if (n == IOStatus.UNAVAILABLE) {
699
PendingFuture<V,A> result = null;
700
synchronized (updateLock) {
701
this.isGatheringWrite = isGatheringWrite;
702
this.writeBuffer = src;
703
this.writeBuffers = srcs;
704
if (handler == null) {
705
this.writeHandler = null;
706
result = new PendingFuture<V,A>(this, OpType.WRITE);
707
this.writeFuture = (PendingFuture<Number,Object>)result;
708
this.writeAttachment = null;
709
} else {
710
this.writeHandler = (CompletionHandler<Number,Object>)handler;
711
this.writeAttachment = attachment;
712
this.writeFuture = null;
713
}
714
if (timeout > 0L) {
715
this.writeTimer = port.schedule(writeTimeoutTask, timeout, unit);
716
}
717
this.writePending = true;
718
updateEvents();
719
}
720
pending = true;
721
return result;
722
}
723
} catch (Throwable x) {
724
if (x instanceof ClosedChannelException)
725
x = new AsynchronousCloseException();
726
exc = x;
727
} finally {
728
if (!pending)
729
enableWriting();
730
end();
731
}
732
733
Number result = (exc != null) ? null : (isGatheringWrite) ?
734
(Number)Long.valueOf(n) : (Number)Integer.valueOf(n);
735
736
// write completed immediately
737
if (handler != null) {
738
if (invokeDirect) {
739
Invoker.invokeDirect(myGroupAndInvokeCount, handler, attachment, (V)result, exc);
740
} else {
741
Invoker.invokeIndirectly(this, handler, attachment, (V)result, exc);
742
}
743
return null;
744
} else {
745
return CompletedFuture.withResult((V)result, exc);
746
}
747
}
748
749
// -- Native methods --
750
751
// Native check on the descriptor of a pending connect; finishConnect calls
// it before marking the channel connected.
// NOTE(review): presumably throws IOException when the connection attempt
// failed -- confirm against the native implementation.
private static native void checkConnect(int fdVal) throws IOException;
752
753
// Runs IOUtil.load() when this class is initialized.
// NOTE(review): presumably this loads the native library backing
// checkConnect -- confirm against IOUtil.
static { IOUtil.load(); }
756
}
757
758