Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
PojavLauncherTeam
GitHub Repository: PojavLauncherTeam/openjdk-multiarch-jdk8u
Path: blob/aarch64-shenandoah-jdk8u272-b10/jdk/test/java/nio/channels/AsyncCloseAndInterrupt.java
38813 views
1
/*
2
* Copyright (c) 2002, 2016, Oracle and/or its affiliates. All rights reserved.
3
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4
*
5
* This code is free software; you can redistribute it and/or modify it
6
* under the terms of the GNU General Public License version 2 only, as
7
* published by the Free Software Foundation.
8
*
9
* This code is distributed in the hope that it will be useful, but WITHOUT
10
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12
* version 2 for more details (a copy is included in the LICENSE file that
13
* accompanied this code).
14
*
15
* You should have received a copy of the GNU General Public License version
16
* 2 along with this work; if not, write to the Free Software Foundation,
17
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18
*
19
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20
* or visit www.oracle.com if you need additional information or have any
21
* questions.
22
*/
23
24
/* @test
25
* @bug 4460583 4470470 4840199 6419424 6710579 6596323 6824135 6395224 7142919
26
* 8151582
27
* @run main/othervm AsyncCloseAndInterrupt
28
* @summary Comprehensive test of asynchronous closing and interruption
29
* @author Mark Reinhold
30
*/
31
32
import java.io.*;
33
import java.net.*;
34
import java.nio.channels.*;
35
import java.nio.ByteBuffer;
36
import java.util.ArrayList;
37
import java.util.List;
38
import java.util.concurrent.ExecutorService;
39
import java.util.concurrent.Executors;
40
import java.util.concurrent.ThreadFactory;
41
import java.util.concurrent.Callable;
42
import java.util.concurrent.Future;
43
import java.util.concurrent.TimeUnit;
44
45
public class AsyncCloseAndInterrupt {
46
47
static PrintStream log = System.err;
48
49
static void sleep(int ms) {
50
try {
51
Thread.sleep(ms);
52
} catch (InterruptedException x) { }
53
}
54
55
// Wildcard address localized to this machine -- Windoze doesn't allow
56
// connecting to a server socket that was previously bound to a true
57
// wildcard, namely new InetSocketAddress((InetAddress)null, 0).
58
//
59
private static InetSocketAddress wildcardAddress;
60
61
62
// Server socket that blindly accepts all connections
63
64
static ServerSocketChannel acceptor;
65
66
private static void initAcceptor() throws IOException {
67
acceptor = ServerSocketChannel.open();
68
acceptor.socket().bind(wildcardAddress);
69
70
Thread th = new Thread("Acceptor") {
71
public void run() {
72
try {
73
for (;;) {
74
SocketChannel sc = acceptor.accept();
75
}
76
} catch (IOException x) {
77
x.printStackTrace();
78
}
79
}
80
};
81
82
th.setDaemon(true);
83
th.start();
84
}
85
86
87
// Server socket that refuses all connections
88
89
static ServerSocketChannel refuser;
90
91
private static void initRefuser() throws IOException {
92
refuser = ServerSocketChannel.open();
93
refuser.bind(wildcardAddress, 1); // use minimum backlog
94
}
95
96
// Dead pipe source and sink
97
98
static Pipe.SourceChannel deadSource;
99
static Pipe.SinkChannel deadSink;
100
101
private static void initPipes() throws IOException {
102
if (deadSource != null)
103
deadSource.close();
104
deadSource = Pipe.open().source();
105
if (deadSink != null)
106
deadSink.close();
107
deadSink = Pipe.open().sink();
108
}
109
110
111
// Files
112
113
private static File fifoFile = null; // File that blocks on reads and writes
114
private static File diskFile = null; // Disk file
115
116
private static void initFile() throws Exception {
117
118
diskFile = File.createTempFile("aci", ".tmp");
119
diskFile.deleteOnExit();
120
FileChannel fc = new FileOutputStream(diskFile).getChannel();
121
buffer.clear();
122
if (fc.write(buffer) != buffer.capacity())
123
throw new RuntimeException("Cannot create disk file");
124
fc.close();
125
126
if (TestUtil.onWindows()) {
127
log.println("WARNING: Cannot completely test FileChannels on Windows");
128
return;
129
}
130
fifoFile = new File("x.fifo");
131
if (fifoFile.exists()) {
132
if (!fifoFile.delete())
133
throw new IOException("Cannot delete existing fifo " + fifoFile);
134
}
135
Process p = Runtime.getRuntime().exec("mkfifo " + fifoFile);
136
if (p.waitFor() != 0)
137
throw new IOException("Error creating fifo");
138
new RandomAccessFile(fifoFile, "rw").close();
139
140
}
141
142
143
// Channel factories
144
145
static abstract class ChannelFactory {
146
private final String name;
147
ChannelFactory(String name) {
148
this.name = name;
149
}
150
public String toString() {
151
return name;
152
}
153
abstract InterruptibleChannel create() throws IOException;
154
}
155
156
static ChannelFactory socketChannelFactory
157
= new ChannelFactory("SocketChannel") {
158
InterruptibleChannel create() throws IOException {
159
return SocketChannel.open();
160
}
161
};
162
163
static ChannelFactory connectedSocketChannelFactory
164
= new ChannelFactory("SocketChannel") {
165
InterruptibleChannel create() throws IOException {
166
SocketAddress sa = acceptor.socket().getLocalSocketAddress();
167
return SocketChannel.open(sa);
168
}
169
};
170
171
static ChannelFactory serverSocketChannelFactory
172
= new ChannelFactory("ServerSocketChannel") {
173
InterruptibleChannel create() throws IOException {
174
ServerSocketChannel ssc = ServerSocketChannel.open();
175
ssc.socket().bind(wildcardAddress);
176
return ssc;
177
}
178
};
179
180
static ChannelFactory datagramChannelFactory
181
= new ChannelFactory("DatagramChannel") {
182
InterruptibleChannel create() throws IOException {
183
DatagramChannel dc = DatagramChannel.open();
184
InetAddress lb = InetAddress.getByName("127.0.0.1");
185
dc.bind(new InetSocketAddress(lb, 0));
186
dc.connect(new InetSocketAddress(lb, 80));
187
return dc;
188
}
189
};
190
191
static ChannelFactory pipeSourceChannelFactory
192
= new ChannelFactory("Pipe.SourceChannel") {
193
InterruptibleChannel create() throws IOException {
194
// ## arrange to close sink
195
return Pipe.open().source();
196
}
197
};
198
199
static ChannelFactory pipeSinkChannelFactory
200
= new ChannelFactory("Pipe.SinkChannel") {
201
InterruptibleChannel create() throws IOException {
202
// ## arrange to close source
203
return Pipe.open().sink();
204
}
205
};
206
207
static ChannelFactory fifoFileChannelFactory
208
= new ChannelFactory("FileChannel") {
209
InterruptibleChannel create() throws IOException {
210
return new RandomAccessFile(fifoFile, "rw").getChannel();
211
}
212
};
213
214
static ChannelFactory diskFileChannelFactory
215
= new ChannelFactory("FileChannel") {
216
InterruptibleChannel create() throws IOException {
217
return new RandomAccessFile(diskFile, "rw").getChannel();
218
}
219
};
220
221
222
// I/O operations
223
224
static abstract class Op {
225
private final String name;
226
protected Op(String name) {
227
this.name = name;
228
}
229
abstract void doIO(InterruptibleChannel ich) throws IOException;
230
void setup() throws IOException { }
231
public String toString() { return name; }
232
}
233
234
static ByteBuffer buffer = ByteBuffer.allocateDirect(1 << 20);
235
236
static ByteBuffer[] buffers = new ByteBuffer[] {
237
ByteBuffer.allocateDirect(1 << 19),
238
ByteBuffer.allocateDirect(1 << 19)
239
};
240
241
static void clearBuffers() {
242
buffers[0].clear();
243
buffers[1].clear();
244
}
245
246
static void show(Channel ch) {
247
log.print("Channel " + (ch.isOpen() ? "open" : "closed"));
248
if (ch.isOpen() && (ch instanceof SocketChannel)) {
249
SocketChannel sc = (SocketChannel)ch;
250
if (sc.socket().isInputShutdown())
251
log.print(", input shutdown");
252
if (sc.socket().isOutputShutdown())
253
log.print(", output shutdown");
254
}
255
log.println();
256
}
257
258
static final Op READ = new Op("read") {
259
void doIO(InterruptibleChannel ich) throws IOException {
260
ReadableByteChannel rbc = (ReadableByteChannel)ich;
261
buffer.clear();
262
int n = rbc.read(buffer);
263
log.println("Read returned " + n);
264
show(rbc);
265
if (rbc.isOpen()
266
&& (n == -1)
267
&& (rbc instanceof SocketChannel)
268
&& ((SocketChannel)rbc).socket().isInputShutdown()) {
269
return;
270
}
271
throw new RuntimeException("Read succeeded");
272
}
273
};
274
275
static final Op READV = new Op("readv") {
276
void doIO(InterruptibleChannel ich) throws IOException {
277
ScatteringByteChannel sbc = (ScatteringByteChannel)ich;
278
clearBuffers();
279
int n = (int)sbc.read(buffers);
280
log.println("Read returned " + n);
281
show(sbc);
282
if (sbc.isOpen()
283
&& (n == -1)
284
&& (sbc instanceof SocketChannel)
285
&& ((SocketChannel)sbc).socket().isInputShutdown()) {
286
return;
287
}
288
throw new RuntimeException("Read succeeded");
289
}
290
};
291
292
static final Op RECEIVE = new Op("receive") {
293
void doIO(InterruptibleChannel ich) throws IOException {
294
DatagramChannel dc = (DatagramChannel)ich;
295
buffer.clear();
296
dc.receive(buffer);
297
show(dc);
298
throw new RuntimeException("Read succeeded");
299
}
300
};
301
302
static final Op WRITE = new Op("write") {
303
void doIO(InterruptibleChannel ich) throws IOException {
304
305
WritableByteChannel wbc = (WritableByteChannel)ich;
306
307
SocketChannel sc = null;
308
if (wbc instanceof SocketChannel)
309
sc = (SocketChannel)wbc;
310
311
int n = 0;
312
for (;;) {
313
buffer.clear();
314
int d = wbc.write(buffer);
315
n += d;
316
if (!wbc.isOpen())
317
break;
318
if ((sc != null) && sc.socket().isOutputShutdown())
319
break;
320
}
321
log.println("Wrote " + n + " bytes");
322
show(wbc);
323
}
324
};
325
326
static final Op WRITEV = new Op("writev") {
327
void doIO(InterruptibleChannel ich) throws IOException {
328
329
GatheringByteChannel gbc = (GatheringByteChannel)ich;
330
331
SocketChannel sc = null;
332
if (gbc instanceof SocketChannel)
333
sc = (SocketChannel)gbc;
334
335
int n = 0;
336
for (;;) {
337
clearBuffers();
338
int d = (int)gbc.write(buffers);
339
n += d;
340
if (!gbc.isOpen())
341
break;
342
if ((sc != null) && sc.socket().isOutputShutdown())
343
break;
344
}
345
log.println("Wrote " + n + " bytes");
346
show(gbc);
347
348
}
349
};
350
351
static final Op CONNECT = new Op("connect") {
352
void setup() {
353
waitPump("connect waiting for pumping refuser ...");
354
}
355
void doIO(InterruptibleChannel ich) throws IOException {
356
SocketChannel sc = (SocketChannel)ich;
357
if (sc.connect(refuser.socket().getLocalSocketAddress()))
358
throw new RuntimeException("Connection succeeded");
359
throw new RuntimeException("Connection did not block");
360
}
361
};
362
363
static final Op FINISH_CONNECT = new Op("finishConnect") {
364
void setup() {
365
waitPump("finishConnect waiting for pumping refuser ...");
366
}
367
void doIO(InterruptibleChannel ich) throws IOException {
368
SocketChannel sc = (SocketChannel)ich;
369
sc.configureBlocking(false);
370
SocketAddress sa = refuser.socket().getLocalSocketAddress();
371
if (sc.connect(sa))
372
throw new RuntimeException("Connection succeeded");
373
sc.configureBlocking(true);
374
if (sc.finishConnect())
375
throw new RuntimeException("Connection succeeded");
376
throw new RuntimeException("Connection did not block");
377
}
378
};
379
380
static final Op ACCEPT = new Op("accept") {
381
void doIO(InterruptibleChannel ich) throws IOException {
382
ServerSocketChannel ssc = (ServerSocketChannel)ich;
383
ssc.accept();
384
throw new RuntimeException("Accept succeeded");
385
}
386
};
387
388
// Use only with diskFileChannelFactory
389
static final Op TRANSFER_TO = new Op("transferTo") {
390
void doIO(InterruptibleChannel ich) throws IOException {
391
FileChannel fc = (FileChannel)ich;
392
long n = fc.transferTo(0, fc.size(), deadSink);
393
log.println("Transferred " + n + " bytes");
394
show(fc);
395
}
396
};
397
398
// Use only with diskFileChannelFactory
399
static final Op TRANSFER_FROM = new Op("transferFrom") {
400
void doIO(InterruptibleChannel ich) throws IOException {
401
FileChannel fc = (FileChannel)ich;
402
long n = fc.transferFrom(deadSource, 0, 1 << 20);
403
log.println("Transferred " + n + " bytes");
404
show(fc);
405
}
406
};
407
408
409
410
// Test modes
411
412
static final int TEST_PREINTR = 0; // Interrupt thread before I/O
413
static final int TEST_INTR = 1; // Interrupt thread during I/O
414
static final int TEST_CLOSE = 2; // Close channel during I/O
415
static final int TEST_SHUTI = 3; // Shutdown input during I/O
416
static final int TEST_SHUTO = 4; // Shutdown output during I/O
417
418
static final String[] testName = new String[] {
419
"pre-interrupt", "interrupt", "close",
420
"shutdown-input", "shutdown-output"
421
};
422
423
424
static class Tester extends TestThread {
425
426
private InterruptibleChannel ch;
427
private Op op;
428
private int test;
429
volatile boolean ready = false;
430
431
protected Tester(ChannelFactory cf, InterruptibleChannel ch,
432
Op op, int test)
433
{
434
super(cf + "/" + op + "/" + testName[test]);
435
this.ch = ch;
436
this.op = op;
437
this.test = test;
438
}
439
440
@SuppressWarnings("fallthrough")
441
private void caught(Channel ch, IOException x) {
442
String xn = x.getClass().getName();
443
switch (test) {
444
445
case TEST_PREINTR:
446
case TEST_INTR:
447
if (!xn.equals("java.nio.channels.ClosedByInterruptException"))
448
throw new RuntimeException("Wrong exception thrown: " + x);
449
break;
450
451
case TEST_CLOSE:
452
case TEST_SHUTO:
453
if (!xn.equals("java.nio.channels.AsynchronousCloseException"))
454
throw new RuntimeException("Wrong exception thrown: " + x);
455
break;
456
457
case TEST_SHUTI:
458
if (TestUtil.onWindows())
459
break;
460
// FALL THROUGH
461
462
default:
463
throw new Error(x);
464
}
465
466
if (ch.isOpen()) {
467
if (test == TEST_SHUTO) {
468
SocketChannel sc = (SocketChannel)ch;
469
if (!sc.socket().isOutputShutdown())
470
throw new RuntimeException("Output not shutdown");
471
} else if ((test == TEST_INTR) && (op == TRANSFER_FROM)) {
472
// Let this case pass -- CBIE applies to other channel
473
} else {
474
throw new RuntimeException("Channel still open");
475
}
476
}
477
478
log.println("Thrown as expected: " + x);
479
}
480
481
final void go() throws Exception {
482
if (test == TEST_PREINTR)
483
Thread.currentThread().interrupt();
484
ready = true;
485
try {
486
op.doIO(ch);
487
} catch (ClosedByInterruptException x) {
488
caught(ch, x);
489
} catch (AsynchronousCloseException x) {
490
caught(ch, x);
491
} finally {
492
ch.close();
493
}
494
}
495
496
}
497
498
private static volatile boolean pumpDone = false;
499
private static volatile boolean pumpReady = false;
500
501
private static void waitPump(String msg){
502
log.println(msg);
503
while (!pumpReady){
504
sleep(200);
505
}
506
log.println(msg + " done");
507
}
508
509
// Create a pump thread dedicated to saturate refuser's connection backlog
510
private static Future<Integer> pumpRefuser(ExecutorService pumperExecutor) {
511
512
Callable<Integer> pumpTask = new Callable<Integer>() {
513
514
@Override
515
public Integer call() throws IOException {
516
// Can't reliably saturate connection backlog on Windows Server editions
517
assert !TestUtil.onWindows();
518
log.println("Start pumping refuser ...");
519
List<SocketChannel> refuserClients = new ArrayList<>();
520
521
// Saturate the refuser's connection backlog so that further connection
522
// attempts will be blocked
523
pumpReady = false;
524
while (!pumpDone) {
525
SocketChannel sc = SocketChannel.open();
526
sc.configureBlocking(false);
527
boolean connected = sc.connect(refuser.socket().getLocalSocketAddress());
528
529
// Assume that the connection backlog is saturated if a
530
// client cannot connect to the refuser within 50 milliseconds
531
long start = System.currentTimeMillis();
532
while (!pumpReady && !connected
533
&& (System.currentTimeMillis() - start < 50)) {
534
connected = sc.finishConnect();
535
}
536
537
if (connected) {
538
// Retain so that finalizer doesn't close
539
refuserClients.add(sc);
540
} else {
541
sc.close();
542
pumpReady = true;
543
}
544
}
545
546
for (SocketChannel sc : refuserClients) {
547
sc.close();
548
}
549
refuser.close();
550
551
log.println("Stop pumping refuser ...");
552
return refuserClients.size();
553
}
554
};
555
556
return pumperExecutor.submit(pumpTask);
557
}
558
559
// Test
560
static void test(ChannelFactory cf, Op op, int test)
561
throws Exception
562
{
563
log.println();
564
initPipes();
565
InterruptibleChannel ch = cf.create();
566
Tester t = new Tester(cf, ch, op, test);
567
log.println(t);
568
op.setup();
569
t.start();
570
do {
571
sleep(50);
572
} while (!t.ready);
573
574
switch (test) {
575
576
case TEST_INTR:
577
t.interrupt();
578
break;
579
580
case TEST_CLOSE:
581
ch.close();
582
break;
583
584
case TEST_SHUTI:
585
if (TestUtil.onWindows()) {
586
log.println("WARNING: Asynchronous shutdown not working on Windows");
587
ch.close();
588
} else {
589
((SocketChannel)ch).socket().shutdownInput();
590
}
591
break;
592
593
case TEST_SHUTO:
594
if (TestUtil.onWindows()) {
595
log.println("WARNING: Asynchronous shutdown not working on Windows");
596
ch.close();
597
} else {
598
((SocketChannel)ch).socket().shutdownOutput();
599
}
600
break;
601
602
default:
603
break;
604
}
605
606
t.finishAndThrow(500);
607
}
608
609
610
static void test(ChannelFactory cf, Op op) throws Exception {
611
// Test INTR cases before PREINTER cases since sometimes
612
// interrupted threads can't load classes
613
test(cf, op, TEST_INTR);
614
test(cf, op, TEST_PREINTR);
615
616
// Bugs, see FileChannelImpl for details
617
if (op == TRANSFER_FROM) {
618
log.println("WARNING: transferFrom/close not tested");
619
return;
620
}
621
if ((op == TRANSFER_TO) && !TestUtil.onWindows()) {
622
log.println("WARNING: transferTo/close not tested");
623
return;
624
}
625
626
test(cf, op, TEST_CLOSE);
627
}
628
629
static void test(ChannelFactory cf)
630
throws Exception
631
{
632
InterruptibleChannel ch = cf.create(); // Sample channel
633
ch.close();
634
635
if (ch instanceof ReadableByteChannel) {
636
test(cf, READ);
637
if (ch instanceof SocketChannel)
638
test(cf, READ, TEST_SHUTI);
639
}
640
641
if (ch instanceof ScatteringByteChannel) {
642
test(cf, READV);
643
if (ch instanceof SocketChannel)
644
test(cf, READV, TEST_SHUTI);
645
}
646
647
if (ch instanceof DatagramChannel) {
648
test(cf, RECEIVE);
649
650
// Return here: We can't effectively test writes since, if they
651
// block, they do so only for a fleeting moment unless the network
652
// interface is overloaded.
653
return;
654
655
}
656
657
if (ch instanceof WritableByteChannel) {
658
test(cf, WRITE);
659
if (ch instanceof SocketChannel)
660
test(cf, WRITE, TEST_SHUTO);
661
}
662
663
if (ch instanceof GatheringByteChannel) {
664
test(cf, WRITEV);
665
if (ch instanceof SocketChannel)
666
test(cf, WRITEV, TEST_SHUTO);
667
}
668
669
}
670
671
public static void main(String[] args) throws Exception {
672
673
wildcardAddress = new InetSocketAddress(InetAddress.getLocalHost(), 0);
674
initAcceptor();
675
if (!TestUtil.onWindows())
676
initRefuser();
677
initPipes();
678
initFile();
679
680
if (TestUtil.onWindows()) {
681
log.println("WARNING: Cannot test FileChannel transfer operations"
682
+ " on Windows");
683
} else {
684
test(diskFileChannelFactory, TRANSFER_TO);
685
test(diskFileChannelFactory, TRANSFER_FROM);
686
}
687
if (fifoFile != null)
688
test(fifoFileChannelFactory);
689
690
// Testing positional file reads and writes is impractical: It requires
691
// access to a large file soft-mounted via NFS, and even then isn't
692
// completely guaranteed to work.
693
//
694
// Testing map is impractical and arguably unnecessary: It's
695
// unclear under what conditions mmap(2) will actually block.
696
697
test(connectedSocketChannelFactory);
698
699
if (TestUtil.onWindows()) {
700
log.println("WARNING Cannot reliably test connect/finishConnect"
701
+ " operations on Windows");
702
} else {
703
// Only the following tests need refuser's connection backlog
704
// to be saturated
705
ExecutorService pumperExecutor =
706
Executors.newSingleThreadExecutor(
707
new ThreadFactory() {
708
709
@Override
710
public Thread newThread(Runnable r) {
711
Thread t = new Thread(r);
712
t.setDaemon(true);
713
t.setName("Pumper");
714
return t;
715
}
716
});
717
718
pumpDone = false;
719
try {
720
Future<Integer> pumpFuture = pumpRefuser(pumperExecutor);
721
waitPump("\nWait for initial Pump");
722
723
test(socketChannelFactory, CONNECT);
724
test(socketChannelFactory, FINISH_CONNECT);
725
726
pumpDone = true;
727
Integer newConn = pumpFuture.get(30, TimeUnit.SECONDS);
728
log.println("Pump " + newConn + " connections.");
729
} finally {
730
pumperExecutor.shutdown();
731
}
732
}
733
734
test(serverSocketChannelFactory, ACCEPT);
735
test(datagramChannelFactory);
736
test(pipeSourceChannelFactory);
737
test(pipeSinkChannelFactory);
738
}
739
}
740
741