Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
PojavLauncherTeam
GitHub Repository: PojavLauncherTeam/mobile
Path: blob/master/src/java.base/unix/classes/sun/nio/ch/UnixAsynchronousSocketChannelImpl.java
41137 views
1
/*
2
* Copyright (c) 2008, 2021, Oracle and/or its affiliates. All rights reserved.
3
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4
*
5
* This code is free software; you can redistribute it and/or modify it
6
* under the terms of the GNU General Public License version 2 only, as
7
* published by the Free Software Foundation. Oracle designates this
8
* particular file as subject to the "Classpath" exception as provided
9
* by Oracle in the LICENSE file that accompanied this code.
10
*
11
* This code is distributed in the hope that it will be useful, but WITHOUT
12
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14
* version 2 for more details (a copy is included in the LICENSE file that
15
* accompanied this code).
16
*
17
* You should have received a copy of the GNU General Public License version
18
* 2 along with this work; if not, write to the Free Software Foundation,
19
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20
*
21
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22
* or visit www.oracle.com if you need additional information or have any
23
* questions.
24
*/
25
26
package sun.nio.ch;
27
28
import java.nio.channels.*;
29
import java.nio.ByteBuffer;
30
import java.net.*;
31
import java.util.concurrent.*;
32
import java.io.IOException;
33
import java.io.FileDescriptor;
34
35
import sun.net.ConnectionResetException;
36
import sun.net.NetHooks;
37
import sun.net.util.SocketExceptions;
38
import sun.security.action.GetPropertyAction;
39
40
/**
41
* Unix implementation of AsynchronousSocketChannel
42
*/
43
44
class UnixAsynchronousSocketChannelImpl
45
extends AsynchronousSocketChannelImpl implements Port.PollableChannel
46
{
47
// Dispatcher used for the native close of this channel's file descriptor and
// passed to IOUtil for reads and writes.
private static final NativeDispatcher nd = new SocketDispatcher();

// Kind of pending operation; used as the context of a PendingFuture so that
// onCancel can tell which operation is being cancelled.
private enum OpType { CONNECT, READ, WRITE }

// When true, implRead never attempts the read on the initiating thread.
// Taken from the "sun.nio.ch.disableSynchronousRead" system property; an
// empty value counts as true, anything else is parsed as a boolean.
// NOTE(review): "false" is presumably the default used when the property is
// not set -- confirm against GetPropertyAction.privilegedGetProperty.
private static final boolean disableSynchronousRead;
static {
    String configured = GetPropertyAction.privilegedGetProperty(
        "sun.nio.ch.disableSynchronousRead", "false");
    disableSynchronousRead =
        configured.isEmpty() || Boolean.parseBoolean(configured);
}
57
58
private final Port port;
59
private final int fdVal;
60
61
// used to ensure that the context for I/O operations that complete
62
// ascynrhonously is visible to the pooled threads handling I/O events.
63
private final Object updateLock = new Object();
64
65
// pending connect (updateLock)
66
private boolean connectPending;
67
private CompletionHandler<Void,Object> connectHandler;
68
private Object connectAttachment;
69
private PendingFuture<Void,Object> connectFuture;
70
71
// pending remote address (stateLock)
72
private SocketAddress pendingRemote;
73
74
// pending read (updateLock)
75
private boolean readPending;
76
private boolean isScatteringRead;
77
private ByteBuffer readBuffer;
78
private ByteBuffer[] readBuffers;
79
private Runnable readScopeHandleReleasers;
80
private CompletionHandler<Number,Object> readHandler;
81
private Object readAttachment;
82
private PendingFuture<Number,Object> readFuture;
83
private Future<?> readTimer;
84
85
// pending write (updateLock)
86
private boolean writePending;
87
private boolean isGatheringWrite;
88
private ByteBuffer writeBuffer;
89
private ByteBuffer[] writeBuffers;
90
private Runnable writeScopeHandleReleasers;
91
private CompletionHandler<Number,Object> writeHandler;
92
private Object writeAttachment;
93
private PendingFuture<Number,Object> writeFuture;
94
private Future<?> writeTimer;
95
96
97
/**
 * Creates a channel in the given port (channel group), switches its file
 * descriptor to non-blocking mode and registers the descriptor with the port.
 *
 * The file descriptor is closed before the exception propagates if either
 * step fails, so a failed construction does not leak the descriptor.
 *
 * @param  port  the port (channel group) this channel belongs to
 * @throws IOException if the descriptor cannot be made non-blocking
 * @throws ShutdownChannelGroupException if the registration is rejected
 *         because the group is shut down
 */
UnixAsynchronousSocketChannelImpl(Port port)
    throws IOException
{
    super(port);

    // set non-blocking
    try {
        IOUtil.configureBlocking(fd, false);
    } catch (IOException x) {
        nd.close(fd);
        throw x;
    }

    this.port = port;
    this.fdVal = IOUtil.fdVal(fd);

    // add mapping from file descriptor to this channel; registration is
    // refused once the group is shut down, in which case the descriptor
    // must be closed here because nobody else will ever see this channel
    try {
        port.register(fdVal, this);
    } catch (ShutdownChannelGroupException x) {
        try {
            nd.close(fd);
        } catch (IOException closeFailure) {
            // keep the original cause; record the close failure alongside it
            x.addSuppressed(closeFailure);
        }
        throw x;
    }
}
116
117
/**
 * Constructor for sockets created by UnixAsynchronousServerSocketChannelImpl.
 * Wraps an existing file descriptor, makes it non-blocking and registers it
 * with the port.
 *
 * @throws IOException if the descriptor cannot be made non-blocking, or
 *         (wrapping ShutdownChannelGroupException) if the group has been
 *         shut down and no longer accepts new channels
 */
UnixAsynchronousSocketChannelImpl(Port port,
                                  FileDescriptor fd,
                                  InetSocketAddress remote)
    throws IOException
{
    super(port, fd, remote);

    this.fdVal = IOUtil.fdVal(fd);
    IOUtil.configureBlocking(fd, false);

    try {
        port.register(fdVal, this);
    } catch (ShutdownChannelGroupException groupShutdown) {
        // registering a new channel after the group is shutdown is refused;
        // surface it to the caller as an I/O failure
        throw new IOException(groupShutdown);
    }

    this.port = port;
}
138
139
@Override
140
public AsynchronousChannelGroupImpl group() {
141
return port;
142
}
143
144
// register events for outstanding I/O operations, caller already owns updateLock
145
private void updateEvents() {
146
assert Thread.holdsLock(updateLock);
147
int events = 0;
148
if (readPending)
149
events |= Net.POLLIN;
150
if (connectPending || writePending)
151
events |= Net.POLLOUT;
152
if (events != 0)
153
port.startPoll(fdVal, events);
154
}
155
156
// register events for outstanding I/O operations
157
private void lockAndUpdateEvents() {
158
synchronized (updateLock) {
159
updateEvents();
160
}
161
}
162
163
// invoke to finish read and/or write operations
164
private void finish(boolean mayInvokeDirect,
165
boolean readable,
166
boolean writable)
167
{
168
boolean finishRead = false;
169
boolean finishWrite = false;
170
boolean finishConnect = false;
171
172
// map event to pending result
173
synchronized (updateLock) {
174
if (readable && this.readPending) {
175
this.readPending = false;
176
finishRead = true;
177
}
178
if (writable) {
179
if (this.writePending) {
180
this.writePending = false;
181
finishWrite = true;
182
} else if (this.connectPending) {
183
this.connectPending = false;
184
finishConnect = true;
185
}
186
}
187
}
188
189
// complete the I/O operation. Special case for when channel is
190
// ready for both reading and writing. In that case, submit task to
191
// complete write if write operation has a completion handler.
192
if (finishRead) {
193
if (finishWrite)
194
finishWrite(false);
195
finishRead(mayInvokeDirect);
196
return;
197
}
198
if (finishWrite) {
199
finishWrite(mayInvokeDirect);
200
}
201
if (finishConnect) {
202
finishConnect(mayInvokeDirect);
203
}
204
}
205
206
/**
207
* Invoked by event handler thread when file descriptor is polled
208
*/
209
@Override
210
public void onEvent(int events, boolean mayInvokeDirect) {
211
boolean readable = (events & Net.POLLIN) > 0;
212
boolean writable = (events & Net.POLLOUT) > 0;
213
if ((events & (Net.POLLERR | Net.POLLHUP)) > 0) {
214
readable = true;
215
writable = true;
216
}
217
finish(mayInvokeDirect, readable, writable);
218
}
219
220
@Override
221
void implClose() throws IOException {
222
// remove the mapping
223
port.unregister(fdVal);
224
225
// close file descriptor
226
nd.close(fd);
227
228
// All outstanding I/O operations are required to fail
229
finish(false, true, true);
230
}
231
232
@Override
233
public void onCancel(PendingFuture<?,?> task) {
234
if (task.getContext() == OpType.CONNECT)
235
killConnect();
236
if (task.getContext() == OpType.READ)
237
killReading();
238
if (task.getContext() == OpType.WRITE)
239
killWriting();
240
}
241
242
// -- connect --
243
244
private void setConnected() throws IOException {
245
synchronized (stateLock) {
246
state = ST_CONNECTED;
247
localAddress = Net.localAddress(fd);
248
remoteAddress = (InetSocketAddress)pendingRemote;
249
}
250
}
251
252
private void finishConnect(boolean mayInvokeDirect) {
253
Throwable e = null;
254
try {
255
begin();
256
checkConnect(fdVal);
257
setConnected();
258
} catch (Throwable x) {
259
if (x instanceof ClosedChannelException)
260
x = new AsynchronousCloseException();
261
e = x;
262
} finally {
263
end();
264
}
265
if (e != null) {
266
if (e instanceof IOException) {
267
var isa = (InetSocketAddress)pendingRemote;
268
e = SocketExceptions.of((IOException)e, isa);
269
}
270
// close channel if connection cannot be established
271
try {
272
close();
273
} catch (Throwable suppressed) {
274
e.addSuppressed(suppressed);
275
}
276
}
277
278
// invoke handler and set result
279
CompletionHandler<Void,Object> handler = connectHandler;
280
connectHandler = null;
281
Object att = connectAttachment;
282
PendingFuture<Void,Object> future = connectFuture;
283
if (handler == null) {
284
future.setResult(null, e);
285
} else {
286
if (mayInvokeDirect) {
287
Invoker.invokeUnchecked(handler, att, null, e);
288
} else {
289
Invoker.invokeIndirectly(this, handler, att, null, e);
290
}
291
}
292
}
293
294
@Override
295
@SuppressWarnings("unchecked")
296
<A> Future<Void> implConnect(SocketAddress remote,
297
A attachment,
298
CompletionHandler<Void,? super A> handler)
299
{
300
if (!isOpen()) {
301
Throwable e = new ClosedChannelException();
302
if (handler == null) {
303
return CompletedFuture.withFailure(e);
304
} else {
305
Invoker.invoke(this, handler, attachment, null, e);
306
return null;
307
}
308
}
309
310
InetSocketAddress isa = Net.checkAddress(remote);
311
312
// permission check
313
@SuppressWarnings("removal")
314
SecurityManager sm = System.getSecurityManager();
315
if (sm != null)
316
sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());
317
318
// check and set state
319
boolean notifyBeforeTcpConnect;
320
synchronized (stateLock) {
321
if (state == ST_CONNECTED)
322
throw new AlreadyConnectedException();
323
if (state == ST_PENDING)
324
throw new ConnectionPendingException();
325
state = ST_PENDING;
326
pendingRemote = remote;
327
notifyBeforeTcpConnect = (localAddress == null);
328
}
329
330
Throwable e = null;
331
try {
332
begin();
333
// notify hook if unbound
334
if (notifyBeforeTcpConnect)
335
NetHooks.beforeTcpConnect(fd, isa.getAddress(), isa.getPort());
336
int n = Net.connect(fd, isa.getAddress(), isa.getPort());
337
if (n == IOStatus.UNAVAILABLE) {
338
// connection could not be established immediately
339
PendingFuture<Void,A> result = null;
340
synchronized (updateLock) {
341
if (handler == null) {
342
result = new PendingFuture<Void,A>(this, OpType.CONNECT);
343
this.connectFuture = (PendingFuture<Void,Object>)result;
344
} else {
345
this.connectHandler = (CompletionHandler<Void,Object>)handler;
346
this.connectAttachment = attachment;
347
}
348
this.connectPending = true;
349
updateEvents();
350
}
351
return result;
352
}
353
setConnected();
354
} catch (Throwable x) {
355
if (x instanceof ClosedChannelException)
356
x = new AsynchronousCloseException();
357
e = x;
358
} finally {
359
end();
360
}
361
362
// close channel if connect fails
363
if (e != null) {
364
if (e instanceof IOException) {
365
e = SocketExceptions.of((IOException)e, isa);
366
}
367
try {
368
close();
369
} catch (Throwable suppressed) {
370
e.addSuppressed(suppressed);
371
}
372
}
373
if (handler == null) {
374
return CompletedFuture.withResult(null, e);
375
} else {
376
Invoker.invoke(this, handler, attachment, null, e);
377
return null;
378
}
379
}
380
381
// -- read --
382
383
private void finishRead(boolean mayInvokeDirect) {
384
int n = -1;
385
Throwable exc = null;
386
387
// copy fields as we can't access them after reading is re-enabled.
388
boolean scattering = isScatteringRead;
389
CompletionHandler<Number,Object> handler = readHandler;
390
Object att = readAttachment;
391
PendingFuture<Number,Object> future = readFuture;
392
Future<?> timeout = readTimer;
393
394
try {
395
begin();
396
397
if (scattering) {
398
n = (int)IOUtil.read(fd, readBuffers, true, nd);
399
} else {
400
n = IOUtil.read(fd, readBuffer, -1, true, nd);
401
}
402
if (n == IOStatus.UNAVAILABLE) {
403
// spurious wakeup, is this possible?
404
synchronized (updateLock) {
405
readPending = true;
406
}
407
return;
408
}
409
410
// allow objects to be GC'ed.
411
this.readBuffer = null;
412
this.readBuffers = null;
413
this.readAttachment = null;
414
this.readHandler = null;
415
IOUtil.releaseScopes(readScopeHandleReleasers);
416
417
// allow another read to be initiated
418
enableReading();
419
420
} catch (Throwable x) {
421
enableReading();
422
if (x instanceof ClosedChannelException)
423
x = new AsynchronousCloseException();
424
if (x instanceof ConnectionResetException)
425
x = new IOException(x.getMessage());
426
exc = x;
427
} finally {
428
// restart poll in case of concurrent write
429
if (!(exc instanceof AsynchronousCloseException))
430
lockAndUpdateEvents();
431
end();
432
}
433
434
// cancel the associated timer
435
if (timeout != null)
436
timeout.cancel(false);
437
438
// create result
439
Number result = (exc != null) ? null : (scattering) ?
440
(Number)Long.valueOf(n) : (Number)Integer.valueOf(n);
441
442
// invoke handler or set result
443
if (handler == null) {
444
future.setResult(result, exc);
445
} else {
446
if (mayInvokeDirect) {
447
Invoker.invokeUnchecked(handler, att, result, exc);
448
} else {
449
Invoker.invokeIndirectly(this, handler, att, result, exc);
450
}
451
}
452
}
453
454
private Runnable readTimeoutTask = new Runnable() {
455
public void run() {
456
CompletionHandler<Number,Object> handler = null;
457
Object att = null;
458
PendingFuture<Number,Object> future = null;
459
460
synchronized (updateLock) {
461
if (!readPending)
462
return;
463
readPending = false;
464
handler = readHandler;
465
att = readAttachment;
466
future = readFuture;
467
}
468
469
// kill further reading before releasing waiters
470
enableReading(true);
471
472
// invoke handler or set result
473
Exception exc = new InterruptedByTimeoutException();
474
if (handler == null) {
475
future.setFailure(exc);
476
} else {
477
AsynchronousChannel ch = UnixAsynchronousSocketChannelImpl.this;
478
Invoker.invokeIndirectly(ch, handler, att, null, exc);
479
}
480
}
481
};
482
483
/**
484
* Initiates a read or scattering read operation
485
*/
486
@Override
487
@SuppressWarnings("unchecked")
488
<V extends Number,A> Future<V> implRead(boolean isScatteringRead,
489
ByteBuffer dst,
490
ByteBuffer[] dsts,
491
long timeout,
492
TimeUnit unit,
493
A attachment,
494
CompletionHandler<V,? super A> handler)
495
{
496
// A synchronous read is not attempted if disallowed by system property
497
// or, we are using a fixed thread pool and the completion handler may
498
// not be invoked directly (because the thread is not a pooled thread or
499
// there are too many handlers on the stack).
500
Invoker.GroupAndInvokeCount myGroupAndInvokeCount = null;
501
boolean invokeDirect = false;
502
boolean attemptRead = false;
503
if (!disableSynchronousRead) {
504
if (handler == null) {
505
attemptRead = true;
506
} else {
507
myGroupAndInvokeCount = Invoker.getGroupAndInvokeCount();
508
invokeDirect = Invoker.mayInvokeDirect(myGroupAndInvokeCount, port);
509
// okay to attempt read with user thread pool
510
attemptRead = invokeDirect || !port.isFixedThreadPool();
511
}
512
}
513
514
int n = IOStatus.UNAVAILABLE;
515
Throwable exc = null;
516
boolean pending = false;
517
518
try {
519
begin();
520
521
if (attemptRead) {
522
if (isScatteringRead) {
523
n = (int)IOUtil.read(fd, dsts, true, nd);
524
} else {
525
n = IOUtil.read(fd, dst, -1, true, nd);
526
}
527
}
528
529
if (n == IOStatus.UNAVAILABLE) {
530
PendingFuture<V,A> result = null;
531
synchronized (updateLock) {
532
this.isScatteringRead = isScatteringRead;
533
this.readScopeHandleReleasers = IOUtil.acquireScopes(dst, dsts);
534
this.readBuffer = dst;
535
this.readBuffers = dsts;
536
if (handler == null) {
537
this.readHandler = null;
538
result = new PendingFuture<V,A>(this, OpType.READ);
539
this.readFuture = (PendingFuture<Number,Object>)result;
540
this.readAttachment = null;
541
} else {
542
this.readHandler = (CompletionHandler<Number,Object>)handler;
543
this.readAttachment = attachment;
544
this.readFuture = null;
545
}
546
if (timeout > 0L) {
547
this.readTimer = port.schedule(readTimeoutTask, timeout, unit);
548
}
549
this.readPending = true;
550
updateEvents();
551
}
552
pending = true;
553
return result;
554
}
555
} catch (Throwable x) {
556
if (x instanceof ClosedChannelException)
557
x = new AsynchronousCloseException();
558
if (x instanceof ConnectionResetException)
559
x = new IOException(x.getMessage());
560
exc = x;
561
} finally {
562
if (!pending)
563
enableReading();
564
end();
565
}
566
567
Number result = (exc != null) ? null : (isScatteringRead) ?
568
(Number)Long.valueOf(n) : (Number)Integer.valueOf(n);
569
570
// read completed immediately
571
if (handler != null) {
572
if (invokeDirect) {
573
Invoker.invokeDirect(myGroupAndInvokeCount, handler, attachment, (V)result, exc);
574
} else {
575
Invoker.invokeIndirectly(this, handler, attachment, (V)result, exc);
576
}
577
return null;
578
} else {
579
return CompletedFuture.withResult((V)result, exc);
580
}
581
}
582
583
// -- write --
584
585
private void finishWrite(boolean mayInvokeDirect) {
586
int n = -1;
587
Throwable exc = null;
588
589
// copy fields as we can't access them after reading is re-enabled.
590
boolean gathering = this.isGatheringWrite;
591
CompletionHandler<Number,Object> handler = this.writeHandler;
592
Object att = this.writeAttachment;
593
PendingFuture<Number,Object> future = this.writeFuture;
594
Future<?> timer = this.writeTimer;
595
596
try {
597
begin();
598
599
if (gathering) {
600
n = (int)IOUtil.write(fd, writeBuffers, true, nd);
601
} else {
602
n = IOUtil.write(fd, writeBuffer, -1, true, nd);
603
}
604
if (n == IOStatus.UNAVAILABLE) {
605
// spurious wakeup, is this possible?
606
synchronized (updateLock) {
607
writePending = true;
608
}
609
return;
610
}
611
612
// allow objects to be GC'ed.
613
this.writeBuffer = null;
614
this.writeBuffers = null;
615
this.writeAttachment = null;
616
this.writeHandler = null;
617
IOUtil.releaseScopes(writeScopeHandleReleasers);
618
619
// allow another write to be initiated
620
enableWriting();
621
622
} catch (Throwable x) {
623
enableWriting();
624
if (x instanceof ClosedChannelException)
625
x = new AsynchronousCloseException();
626
exc = x;
627
} finally {
628
// restart poll in case of concurrent write
629
if (!(exc instanceof AsynchronousCloseException))
630
lockAndUpdateEvents();
631
end();
632
}
633
634
// cancel the associated timer
635
if (timer != null)
636
timer.cancel(false);
637
638
// create result
639
Number result = (exc != null) ? null : (gathering) ?
640
(Number)Long.valueOf(n) : (Number)Integer.valueOf(n);
641
642
// invoke handler or set result
643
if (handler == null) {
644
future.setResult(result, exc);
645
} else {
646
if (mayInvokeDirect) {
647
Invoker.invokeUnchecked(handler, att, result, exc);
648
} else {
649
Invoker.invokeIndirectly(this, handler, att, result, exc);
650
}
651
}
652
}
653
654
private Runnable writeTimeoutTask = new Runnable() {
655
public void run() {
656
CompletionHandler<Number,Object> handler = null;
657
Object att = null;
658
PendingFuture<Number,Object> future = null;
659
660
synchronized (updateLock) {
661
if (!writePending)
662
return;
663
writePending = false;
664
handler = writeHandler;
665
att = writeAttachment;
666
future = writeFuture;
667
}
668
669
// kill further writing before releasing waiters
670
enableWriting(true);
671
672
// invoke handler or set result
673
Exception exc = new InterruptedByTimeoutException();
674
if (handler != null) {
675
Invoker.invokeIndirectly(UnixAsynchronousSocketChannelImpl.this,
676
handler, att, null, exc);
677
} else {
678
future.setFailure(exc);
679
}
680
}
681
};
682
683
/**
684
* Initiates a read or scattering read operation
685
*/
686
@Override
687
@SuppressWarnings("unchecked")
688
<V extends Number,A> Future<V> implWrite(boolean isGatheringWrite,
689
ByteBuffer src,
690
ByteBuffer[] srcs,
691
long timeout,
692
TimeUnit unit,
693
A attachment,
694
CompletionHandler<V,? super A> handler)
695
{
696
Invoker.GroupAndInvokeCount myGroupAndInvokeCount =
697
Invoker.getGroupAndInvokeCount();
698
boolean invokeDirect = Invoker.mayInvokeDirect(myGroupAndInvokeCount, port);
699
boolean attemptWrite = (handler == null) || invokeDirect ||
700
!port.isFixedThreadPool(); // okay to attempt write with user thread pool
701
702
int n = IOStatus.UNAVAILABLE;
703
Throwable exc = null;
704
boolean pending = false;
705
706
try {
707
begin();
708
709
if (attemptWrite) {
710
if (isGatheringWrite) {
711
n = (int)IOUtil.write(fd, srcs, true, nd);
712
} else {
713
n = IOUtil.write(fd, src, -1, true, nd);
714
}
715
}
716
717
if (n == IOStatus.UNAVAILABLE) {
718
PendingFuture<V,A> result = null;
719
synchronized (updateLock) {
720
this.isGatheringWrite = isGatheringWrite;
721
this.writeScopeHandleReleasers = IOUtil.acquireScopes(src, srcs);
722
this.writeBuffer = src;
723
this.writeBuffers = srcs;
724
if (handler == null) {
725
this.writeHandler = null;
726
result = new PendingFuture<V,A>(this, OpType.WRITE);
727
this.writeFuture = (PendingFuture<Number,Object>)result;
728
this.writeAttachment = null;
729
} else {
730
this.writeHandler = (CompletionHandler<Number,Object>)handler;
731
this.writeAttachment = attachment;
732
this.writeFuture = null;
733
}
734
if (timeout > 0L) {
735
this.writeTimer = port.schedule(writeTimeoutTask, timeout, unit);
736
}
737
this.writePending = true;
738
updateEvents();
739
}
740
pending = true;
741
return result;
742
}
743
} catch (Throwable x) {
744
if (x instanceof ClosedChannelException)
745
x = new AsynchronousCloseException();
746
exc = x;
747
} finally {
748
if (!pending)
749
enableWriting();
750
end();
751
}
752
753
Number result = (exc != null) ? null : (isGatheringWrite) ?
754
(Number)Long.valueOf(n) : (Number)Integer.valueOf(n);
755
756
// write completed immediately
757
if (handler != null) {
758
if (invokeDirect) {
759
Invoker.invokeDirect(myGroupAndInvokeCount, handler, attachment, (V)result, exc);
760
} else {
761
Invoker.invokeIndirectly(this, handler, attachment, (V)result, exc);
762
}
763
return null;
764
} else {
765
return CompletedFuture.withResult((V)result, exc);
766
}
767
}
768
769
// -- Native methods --
770
771
/**
 * Native check of the outcome of a non-blocking connect on the given
 * descriptor; finishConnect calls it once the descriptor is reported ready.
 *
 * @param  fdVal  int value of the socket's file descriptor
 * @throws IOException presumably when the connection was not established
 *         (native implementation not visible here -- confirm)
 */
private static native void checkConnect(int fdVal) throws IOException;

// Runs IOUtil.load() at class initialization; presumably this loads the
// native library that implements checkConnect -- confirm.
static {
    IOUtil.load();
}
776
}
777
778