Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
PojavLauncherTeam
GitHub Repository: PojavLauncherTeam/jdk17u
Path: blob/master/test/jdk/java/net/httpclient/AggregateRequestBodyTest.java
66644 views
1
/*
2
* Copyright (c) 2020, 2021, Oracle and/or its affiliates. All rights reserved.
3
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4
*
5
* This code is free software; you can redistribute it and/or modify it
6
* under the terms of the GNU General Public License version 2 only, as
7
* published by the Free Software Foundation.
8
*
9
* This code is distributed in the hope that it will be useful, but WITHOUT
10
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12
* version 2 for more details (a copy is included in the LICENSE file that
13
* accompanied this code).
14
*
15
* You should have received a copy of the GNU General Public License version
16
* 2 along with this work; if not, write to the Free Software Foundation,
17
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18
*
19
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20
* or visit www.oracle.com if you need additional information or have any
21
* questions.
22
*/
23
24
/*
25
* @test
26
* @bug 8252374
27
* @library /test/lib http2/server
28
* @build jdk.test.lib.net.SimpleSSLContext HttpServerAdapters
29
* ReferenceTracker AggregateRequestBodyTest
30
* @modules java.base/sun.net.www.http
31
* java.net.http/jdk.internal.net.http.common
32
* java.net.http/jdk.internal.net.http.frame
33
* java.net.http/jdk.internal.net.http.hpack
34
* @run testng/othervm -Djdk.internal.httpclient.debug=true
35
* -Djdk.httpclient.HttpClient.log=requests,responses,errors
36
* AggregateRequestBodyTest
37
* @summary Tests HttpRequest.BodyPublishers::concat
38
*/
39
40
import java.net.InetAddress;
41
import java.net.InetSocketAddress;
42
import java.net.URI;
43
import java.net.http.HttpClient;
44
import java.net.http.HttpRequest;
45
import java.net.http.HttpRequest.BodyPublisher;
46
import java.net.http.HttpRequest.BodyPublishers;
47
import java.net.http.HttpResponse;
48
import java.net.http.HttpResponse.BodyHandlers;
49
import java.nio.ByteBuffer;
50
import java.util.Arrays;
51
import java.util.LinkedHashMap;
52
import java.util.List;
53
import java.util.Map;
54
import java.util.concurrent.CompletableFuture;
55
import java.util.concurrent.CompletionException;
56
import java.util.concurrent.ConcurrentHashMap;
57
import java.util.concurrent.ConcurrentLinkedDeque;
58
import java.util.concurrent.ConcurrentMap;
59
import java.util.concurrent.Executor;
60
import java.util.concurrent.Executors;
61
import java.util.concurrent.Flow;
62
import java.util.concurrent.Flow.Subscriber;
63
import java.util.concurrent.Flow.Subscription;
64
import java.util.concurrent.TimeUnit;
65
import java.util.concurrent.TimeoutException;
66
import java.util.concurrent.atomic.AtomicLong;
67
import java.util.concurrent.atomic.AtomicReference;
68
import java.util.function.Consumer;
69
import java.util.function.Supplier;
70
import java.util.stream.Collectors;
71
import java.util.stream.LongStream;
72
import java.util.stream.Stream;
73
import javax.net.ssl.SSLContext;
74
75
import com.sun.net.httpserver.HttpServer;
76
import com.sun.net.httpserver.HttpsConfigurator;
77
import com.sun.net.httpserver.HttpsServer;
78
import jdk.test.lib.net.SimpleSSLContext;
79
import org.testng.Assert;
80
import org.testng.ITestContext;
81
import org.testng.ITestResult;
82
import org.testng.SkipException;
83
import org.testng.annotations.AfterClass;
84
import org.testng.annotations.AfterTest;
85
import org.testng.annotations.BeforeMethod;
86
import org.testng.annotations.BeforeTest;
87
import org.testng.annotations.DataProvider;
88
import org.testng.annotations.Test;
89
90
import static java.lang.System.out;
91
import static org.testng.Assert.assertEquals;
92
import static org.testng.Assert.assertFalse;
93
import static org.testng.Assert.assertTrue;
94
import static org.testng.Assert.expectThrows;
95
96
public class AggregateRequestBodyTest implements HttpServerAdapters {
97
98
// TLS context shared by the HTTPS servers and by every client built by this test.
SSLContext sslContext;

// One echo server per protocol/scheme combination.
HttpTestServer http1TestServer;    // HTTP/1.1    ( http )
HttpTestServer https1TestServer;   // HTTPS/1.1   ( https )
HttpTestServer http2TestServer;    // HTTP/2      ( h2c )
HttpTestServer https2TestServer;   // HTTP/2      ( h2 )

// Request URIs matching the four servers above.
String http1URI, https1URI, http2URI, https2URI;

static final int RESPONSE_CODE = 200;
static final int ITERATION_COUNT = 4;
static final Class<IllegalArgumentException> IAE = IllegalArgumentException.class;
static final Class<CompletionException> CE = CompletionException.class;

// a shared executor helps reduce the amount of threads created by the test
static final Executor executor = new TestExecutor(Executors.newCachedThreadPool());

// Failures recorded by executor tasks and by failed test methods, keyed by name.
static final ConcurrentMap<String, Throwable> FAILURES = new ConcurrentHashMap<>();
static volatile boolean tasksFailed;

static final AtomicLong serverCount = new AtomicLong();
static final AtomicLong clientCount = new AtomicLong();

// Reference point used by now() to print elapsed time.
static final long start = System.nanoTime();
119
public static String now() {
120
long now = System.nanoTime() - start;
121
long secs = now / 1000_000_000;
122
long mill = (now % 1000_000_000) / 1000_000;
123
long nan = now % 1000_000;
124
return String.format("[%d s, %d ms, %d ns] ", secs, mill, nan);
125
}
126
127
final ReferenceTracker TRACKER = ReferenceTracker.INSTANCE;
128
private volatile HttpClient sharedClient;
129
130
static class TestExecutor implements Executor {
131
final AtomicLong tasks = new AtomicLong();
132
Executor executor;
133
TestExecutor(Executor executor) {
134
this.executor = executor;
135
}
136
137
@Override
138
public void execute(Runnable command) {
139
long id = tasks.incrementAndGet();
140
executor.execute(() -> {
141
try {
142
command.run();
143
} catch (Throwable t) {
144
tasksFailed = true;
145
System.out.printf(now() + "Task %s failed: %s%n", id, t);
146
System.err.printf(now() + "Task %s failed: %s%n", id, t);
147
FAILURES.putIfAbsent("Task " + id, t);
148
throw t;
149
}
150
});
151
}
152
}
153
154
protected boolean stopAfterFirstFailure() {
155
return Boolean.getBoolean("jdk.internal.httpclient.debug");
156
}
157
158
final AtomicReference<SkipException> skiptests = new AtomicReference<>();
159
void checkSkip() {
160
var skip = skiptests.get();
161
if (skip != null) throw skip;
162
}
163
static String name(ITestResult result) {
164
var params = result.getParameters();
165
return result.getName()
166
+ (params == null ? "()" : Arrays.toString(result.getParameters()));
167
}
168
169
@BeforeMethod
170
void beforeMethod(ITestContext context) {
171
if (stopAfterFirstFailure() && context.getFailedTests().size() > 0) {
172
if (skiptests.get() == null) {
173
SkipException skip = new SkipException("some tests failed");
174
skip.setStackTrace(new StackTraceElement[0]);
175
skiptests.compareAndSet(null, skip);
176
}
177
}
178
}
179
180
@AfterClass
181
static final void printFailedTests(ITestContext context) {
182
out.println("\n=========================");
183
try {
184
var failed = context.getFailedTests().getAllResults().stream()
185
.collect(Collectors.toMap(r -> name(r), ITestResult::getThrowable));
186
FAILURES.putAll(failed);
187
188
out.printf("%n%sCreated %d servers and %d clients%n",
189
now(), serverCount.get(), clientCount.get());
190
if (FAILURES.isEmpty()) return;
191
out.println("Failed tests: ");
192
FAILURES.entrySet().forEach((e) -> {
193
out.printf("\t%s: %s%n", e.getKey(), e.getValue());
194
e.getValue().printStackTrace(out);
195
e.getValue().printStackTrace();
196
});
197
if (tasksFailed) {
198
System.out.println("WARNING: Some tasks failed");
199
}
200
} finally {
201
out.println("\n=========================\n");
202
}
203
}
204
205
private String[] uris() {
206
return new String[] {
207
http1URI,
208
https1URI,
209
http2URI,
210
https2URI,
211
};
212
}
213
214
static AtomicLong URICOUNT = new AtomicLong();
215
216
@DataProvider(name = "variants")
217
public Object[][] variants(ITestContext context) {
218
if (stopAfterFirstFailure() && context.getFailedTests().size() > 0) {
219
return new Object[0][];
220
}
221
String[] uris = uris();
222
Object[][] result = new Object[uris.length * 2][];
223
int i = 0;
224
for (boolean sameClient : List.of(false, true)) {
225
for (String uri : uris()) {
226
result[i++] = new Object[]{uri, sameClient};
227
}
228
}
229
assert i == uris.length * 2;
230
return result;
231
}
232
233
private HttpClient makeNewClient() {
234
clientCount.incrementAndGet();
235
HttpClient client = HttpClient.newBuilder()
236
.proxy(HttpClient.Builder.NO_PROXY)
237
.executor(executor)
238
.sslContext(sslContext)
239
.build();
240
return TRACKER.track(client);
241
}
242
243
HttpClient newHttpClient(boolean share) {
244
if (!share) return makeNewClient();
245
HttpClient shared = sharedClient;
246
if (shared != null) return shared;
247
synchronized (this) {
248
shared = sharedClient;
249
if (shared == null) {
250
shared = sharedClient = makeNewClient();
251
}
252
return shared;
253
}
254
}
255
256
static final List<String> BODIES = List.of(
257
"Lorem ipsum",
258
"dolor sit amet",
259
"consectetur adipiscing elit, sed do eiusmod tempor",
260
"quis nostrud exercitation ullamco",
261
"laboris nisi",
262
"ut",
263
"aliquip ex ea commodo consequat." +
264
"Duis aute irure dolor in reprehenderit in voluptate velit esse" +
265
"cillum dolore eu fugiat nulla pariatur.",
266
"Excepteur sint occaecat cupidatat non proident."
267
);
268
269
static BodyPublisher[] publishers(String... content) {
270
if (content == null) return null;
271
BodyPublisher[] result = new BodyPublisher[content.length];
272
for (int i=0; i < content.length ; i++) {
273
result[i] = content[i] == null ? null : BodyPublishers.ofString(content[i]);
274
}
275
return result;
276
}
277
278
static String[] strings(String... s) {
279
return s;
280
}
281
282
@DataProvider(name = "sparseContent")
283
Object[][] nulls() {
284
return new Object[][] {
285
{"null array", null},
286
{"null element", strings((String)null)},
287
{"null first element", strings(null, "one")},
288
{"null second element", strings( "one", null)},
289
{"null third element", strings( "one", "two", null)},
290
{"null fourth element", strings( "one", "two", "three", null)},
291
{"null random element", strings( "one", "two", "three", null, "five")},
292
};
293
}
294
295
static List<Long> lengths(long... lengths) {
296
return LongStream.of(lengths)
297
.mapToObj(Long::valueOf)
298
.collect(Collectors.toList());
299
}
300
301
@DataProvider(name = "contentLengths")
302
Object[][] contentLengths() {
303
return new Object[][] {
304
{-1, lengths(-1)},
305
{-42, lengths(-42)},
306
{42, lengths(42)},
307
{42, lengths(10, 0, 20, 0, 12)},
308
{-1, lengths(10, 0, 20, -1, 12)},
309
{-1, lengths(-1, 0, 20, 10, 12)},
310
{-1, lengths(10, 0, 20, 12, -1)},
311
{-1, lengths(10, 0, 20, -10, 12)},
312
{-1, lengths(-10, 0, 20, 10, 12)},
313
{-1, lengths(10, 0, 20, 12, -10)},
314
{-1, lengths(10, 0, Long.MIN_VALUE, -1, 12)},
315
{-1, lengths(-1, 0, Long.MIN_VALUE, 10, 12)},
316
{-1, lengths(10, Long.MIN_VALUE, 20, 12, -1)},
317
{Long.MAX_VALUE, lengths(10, Long.MAX_VALUE - 42L, 20, 0, 12)},
318
{-1, lengths(10, Long.MAX_VALUE - 40L, 20, 0, 12)},
319
{-1, lengths(10, Long.MAX_VALUE - 12L, 20, 0, 12)},
320
{-1, lengths(10, Long.MAX_VALUE/2L, Long.MAX_VALUE/2L + 1L, 0, 12)},
321
{-1, lengths(10, Long.MAX_VALUE/2L, -1, Long.MAX_VALUE/2L + 1L, 12)},
322
{-1, lengths(10, Long.MAX_VALUE, 12, Long.MAX_VALUE, 20)},
323
{-1, lengths(10, Long.MAX_VALUE, Long.MAX_VALUE, 12, 20)},
324
{-1, lengths(0, Long.MAX_VALUE, Long.MAX_VALUE, 12, 20)},
325
{-1, lengths(Long.MAX_VALUE, Long.MAX_VALUE, 12, 0, 20)}
326
};
327
}
328
329
@DataProvider(name="negativeRequests")
330
Object[][] negativeRequests() {
331
return new Object[][] {
332
{0L}, {-1L}, {-2L}, {Long.MIN_VALUE + 1L}, {Long.MIN_VALUE}
333
};
334
}
335
336
337
static class ContentLengthPublisher implements BodyPublisher {
338
final long length;
339
ContentLengthPublisher(long length) {
340
this.length = length;
341
}
342
@Override
343
public long contentLength() {
344
return length;
345
}
346
347
@Override
348
public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
349
}
350
351
static ContentLengthPublisher[] of(List<Long> lengths) {
352
return lengths.stream()
353
.map(ContentLengthPublisher::new)
354
.toArray(ContentLengthPublisher[]::new);
355
}
356
}
357
358
/**
359
* A dummy publisher that allows to call onError on its subscriber (or not...).
360
*/
361
static class PublishWithError implements BodyPublisher {
362
final ConcurrentHashMap<Subscriber<?>, ErrorSubscription> subscribers = new ConcurrentHashMap<>();
363
final long length;
364
final List<String> content;
365
final int errorAt;
366
final Supplier<? extends Throwable> errorSupplier;
367
PublishWithError(List<String> content, int errorAt, Supplier<? extends Throwable> supplier) {
368
this.content = content;
369
this.errorAt = errorAt;
370
this.errorSupplier = supplier;
371
length = content.stream().mapToInt(String::length).sum();
372
}
373
374
boolean hasErrors() {
375
return errorAt < content.size();
376
}
377
378
@Override
379
public long contentLength() {
380
return length;
381
}
382
383
@Override
384
public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
385
ErrorSubscription subscription = new ErrorSubscription(subscriber);
386
subscribers.put(subscriber, subscription);
387
subscriber.onSubscribe(subscription);
388
}
389
390
class ErrorSubscription implements Flow.Subscription {
391
volatile boolean cancelled;
392
volatile int at;
393
final Subscriber<? super ByteBuffer> subscriber;
394
ErrorSubscription(Subscriber<? super ByteBuffer> subscriber) {
395
this.subscriber = subscriber;
396
}
397
@Override
398
public void request(long n) {
399
while (!cancelled && --n >= 0 && at < Math.min(errorAt+1, content.size())) {
400
if (at++ == errorAt) {
401
subscriber.onError(errorSupplier.get());
402
return;
403
} else if (at <= content.size()){
404
subscriber.onNext(ByteBuffer.wrap(
405
content.get(at-1).getBytes()));
406
if (at == content.size()) {
407
subscriber.onComplete();
408
return;
409
}
410
}
411
}
412
}
413
414
@Override
415
public void cancel() {
416
cancelled = true;
417
}
418
}
419
}
420
421
static class RequestSubscriber implements Flow.Subscriber<ByteBuffer> {
422
CompletableFuture<Subscription> subscriptionCF = new CompletableFuture<>();
423
ConcurrentLinkedDeque<ByteBuffer> items = new ConcurrentLinkedDeque<>();
424
CompletableFuture<List<ByteBuffer>> resultCF = new CompletableFuture<>();
425
426
@Override
427
public void onSubscribe(Subscription subscription) {
428
this.subscriptionCF.complete(subscription);
429
}
430
431
@Override
432
public void onNext(ByteBuffer item) {
433
items.addLast(item);
434
}
435
436
@Override
437
public void onError(Throwable throwable) {
438
resultCF.completeExceptionally(throwable);
439
}
440
441
@Override
442
public void onComplete() {
443
resultCF.complete(items.stream().collect(Collectors.toUnmodifiableList()));
444
}
445
446
CompletableFuture<List<ByteBuffer>> resultCF() { return resultCF; }
447
}
448
449
static String stringFromBuffer(ByteBuffer buffer) {
450
byte[] bytes = new byte[buffer.remaining()];
451
buffer.get(bytes);
452
return new String(bytes);
453
}
454
455
String stringFromBytes(Stream<ByteBuffer> buffers) {
456
return buffers.map(AggregateRequestBodyTest::stringFromBuffer)
457
.collect(Collectors.joining());
458
}
459
460
static PublishWithError withNoError(String content) {
461
return new PublishWithError(List.of(content), 1,
462
() -> new AssertionError("Should not happen!"));
463
}
464
465
static PublishWithError withNoError(List<String> content) {
466
return new PublishWithError(content, content.size(),
467
() -> new AssertionError("Should not happen!"));
468
}
469
470
@Test(dataProvider = "sparseContent") // checks that NPE is thrown
471
public void testNullPointerException(String description, String[] content) {
472
checkSkip();
473
BodyPublisher[] publishers = publishers(content);
474
Assert.assertThrows(NullPointerException.class, () -> BodyPublishers.concat(publishers));
475
}
476
477
// Verifies that an empty array creates a "noBody" publisher
478
@Test
479
public void testEmpty() {
480
checkSkip();
481
BodyPublisher publisher = BodyPublishers.concat();
482
RequestSubscriber subscriber = new RequestSubscriber();
483
assertEquals(publisher.contentLength(), 0);
484
publisher.subscribe(subscriber);
485
subscriber.subscriptionCF.thenAccept(s -> s.request(1));
486
List<ByteBuffer> result = subscriber.resultCF.join();
487
assertEquals(result, List.of());
488
assertTrue(subscriber.items.isEmpty());;
489
}
490
491
// verifies that error emitted by upstream publishers are propagated downstream.
492
@Test(dataProvider = "sparseContent") // nulls are replaced with error publisher
493
public void testOnError(String description, String[] content) {
494
checkSkip();
495
final RequestSubscriber subscriber = new RequestSubscriber();
496
final PublishWithError errorPublisher;
497
final BodyPublisher[] publishers;
498
String result = BODIES.stream().collect(Collectors.joining());
499
if (content == null) {
500
content = List.of(result).toArray(String[]::new);
501
errorPublisher = new PublishWithError(BODIES, BODIES.size(),
502
() -> new AssertionError("Unexpected!!"));
503
publishers = List.of(errorPublisher).toArray(new BodyPublisher[0]);
504
description = "No error";
505
} else {
506
publishers = publishers(content);
507
description = description.replace("null", "error at");
508
errorPublisher = new PublishWithError(BODIES, 2, () -> new Exception("expected"));
509
}
510
result = "";
511
boolean hasErrors = false;
512
for (int i=0; i < content.length; i++) {
513
if (content[i] == null) {
514
publishers[i] = errorPublisher;
515
if (hasErrors) continue;
516
if (!errorPublisher.hasErrors()) {
517
result = result + errorPublisher
518
.content.stream().collect(Collectors.joining());
519
} else {
520
result = result + errorPublisher.content
521
.stream().limit(errorPublisher.errorAt)
522
.collect(Collectors.joining());
523
result = result + "<error>";
524
hasErrors = true;
525
}
526
} else if (!hasErrors) {
527
result = result + content[i];
528
}
529
}
530
BodyPublisher publisher = BodyPublishers.concat(publishers);
531
publisher.subscribe(subscriber);
532
subscriber.subscriptionCF.thenAccept(s -> s.request(Long.MAX_VALUE));
533
if (errorPublisher.hasErrors()) {
534
CompletionException ce = expectThrows(CompletionException.class,
535
() -> subscriber.resultCF.join());
536
out.println(description + ": got expected " + ce);
537
assertEquals(ce.getCause().getClass(), Exception.class);
538
assertEquals(stringFromBytes(subscriber.items.stream()) + "<error>", result);
539
} else {
540
assertEquals(stringFromBytes(subscriber.resultCF.join().stream()), result);
541
out.println(description + ": got expected result: " + result);
542
}
543
}
544
545
// Verifies that if an upstream publisher has an unknown length, the
546
// aggregate publisher will have an unknown length as well. Otherwise
547
// the length should be known.
548
@Test(dataProvider = "sparseContent") // nulls are replaced with unknown length
549
public void testUnknownContentLength(String description, String[] content) {
550
checkSkip();
551
if (content == null) {
552
content = BODIES.toArray(String[]::new);
553
description = "BODIES (known length)";
554
} else {
555
description = description.replace("null", "length(-1)");
556
}
557
BodyPublisher[] publishers = publishers(content);
558
BodyPublisher nolength = new BodyPublisher() {
559
final BodyPublisher missing = BodyPublishers.ofString("missing");
560
@Override
561
public long contentLength() { return -1; }
562
@Override
563
public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
564
missing.subscribe(subscriber);
565
}
566
};
567
long length = 0;
568
for (int i=0; i < content.length; i++) {
569
if (content[i] == null) {
570
publishers[i] = nolength;
571
length = -1;
572
} else if (length >= 0) {
573
length += content[i].length();
574
}
575
}
576
out.printf("testUnknownContentLength(%s): %d%n", description, length);
577
BodyPublisher publisher = BodyPublishers.concat(publishers);
578
assertEquals(publisher.contentLength(), length,
579
description.replace("null", "length(-1)"));
580
}
581
582
private static final Throwable completionCause(CompletionException x) {
583
while (x.getCause() instanceof CompletionException) {
584
x = (CompletionException)x.getCause();
585
}
586
return x.getCause();
587
}
588
589
@Test(dataProvider = "negativeRequests")
590
public void testNegativeRequest(long n) {
591
checkSkip();
592
assert n <= 0 : "test for negative request called with n > 0 : " + n;
593
BodyPublisher[] publishers = ContentLengthPublisher.of(List.of(1L, 2L, 3L));
594
BodyPublisher publisher = BodyPublishers.concat(publishers);
595
RequestSubscriber subscriber = new RequestSubscriber();
596
publisher.subscribe(subscriber);
597
Subscription subscription = subscriber.subscriptionCF.join();
598
subscription.request(n);
599
CompletionException expected = expectThrows(CE, () -> subscriber.resultCF.join());
600
Throwable cause = completionCause(expected);
601
if (cause instanceof IllegalArgumentException) {
602
System.out.printf("Got expected IAE for %d: %s%n", n, cause);
603
} else {
604
throw new AssertionError("Unexpected exception: " + cause,
605
(cause == null) ? expected : cause);
606
}
607
}
608
609
static BodyPublisher[] ofStrings(String... strings) {
610
return Stream.of(strings).map(BodyPublishers::ofString).toArray(BodyPublisher[]::new);
611
}
612
613
@Test
614
public void testPositiveRequests() {
615
checkSkip();
616
// A composite array of publishers
617
BodyPublisher[] publishers = Stream.of(
618
Stream.of(ofStrings("Lorem", " ", "ipsum", " ")),
619
Stream.of(BodyPublishers.concat(ofStrings("dolor", " ", "sit", " ", "amet", ", "))),
620
Stream.<BodyPublisher>of(withNoError(List.of("consectetur", " ", "adipiscing"))),
621
Stream.of(ofStrings(" ")),
622
Stream.of(BodyPublishers.concat(ofStrings("elit", ".")))
623
).flatMap((s) -> s).toArray(BodyPublisher[]::new);
624
BodyPublisher publisher = BodyPublishers.concat(publishers);
625
626
// Test that we can request all 13 items in a single request call.
627
RequestSubscriber requestSubscriber1 = new RequestSubscriber();
628
publisher.subscribe(requestSubscriber1);
629
Subscription subscription1 = requestSubscriber1.subscriptionCF.join();
630
subscription1.request(16);
631
assertTrue(requestSubscriber1.resultCF().isDone());
632
List<ByteBuffer> list1 = requestSubscriber1.resultCF().join();
633
String result1 = stringFromBytes(list1.stream());
634
assertEquals(result1, "Lorem ipsum dolor sit amet, consectetur adipiscing elit.");
635
System.out.println("Got expected sentence with one request: \"%s\"".formatted(result1));
636
637
// Test that we can split our requests call any which way we want
638
// (whether in the 'middle of a publisher' or at the boundaries.
639
RequestSubscriber requestSubscriber2 = new RequestSubscriber();
640
publisher.subscribe(requestSubscriber2);
641
Subscription subscription2 = requestSubscriber2.subscriptionCF.join();
642
subscription2.request(1);
643
assertFalse(requestSubscriber2.resultCF().isDone());
644
subscription2.request(10);
645
assertFalse(requestSubscriber2.resultCF().isDone());
646
subscription2.request(4);
647
assertFalse(requestSubscriber2.resultCF().isDone());
648
subscription2.request(1);
649
assertTrue(requestSubscriber2.resultCF().isDone());
650
List<ByteBuffer> list2 = requestSubscriber2.resultCF().join();
651
String result2 = stringFromBytes(list2.stream());
652
assertEquals(result2, "Lorem ipsum dolor sit amet, consectetur adipiscing elit.");
653
System.out.println("Got expected sentence with 4 requests: \"%s\"".formatted(result1));
654
}
655
656
@Test(dataProvider = "contentLengths")
657
public void testContentLength(long expected, List<Long> lengths) {
658
checkSkip();
659
BodyPublisher[] publishers = ContentLengthPublisher.of(lengths);
660
BodyPublisher aggregate = BodyPublishers.concat(publishers);
661
assertEquals(aggregate.contentLength(), expected,
662
"Unexpected result for %s".formatted(lengths));
663
}
664
665
// Verifies that cancelling the subscription ensure that downstream
666
// publishers are no longer subscribed etc...
667
@Test
668
public void testCancel() {
669
checkSkip();
670
BodyPublisher[] publishers = BODIES.stream()
671
.map(BodyPublishers::ofString)
672
.toArray(BodyPublisher[]::new);
673
BodyPublisher publisher = BodyPublishers.concat(publishers);
674
675
assertEquals(publisher.contentLength(),
676
BODIES.stream().mapToInt(String::length).sum());
677
Map<RequestSubscriber, String> subscribers = new LinkedHashMap<>();
678
679
for (int n=0; n < BODIES.size(); n++) {
680
681
String description = String.format(
682
"cancel after %d/%d onNext() invocations",
683
n, BODIES.size());
684
RequestSubscriber subscriber = new RequestSubscriber();
685
publisher.subscribe(subscriber);
686
Subscription subscription = subscriber.subscriptionCF.join();
687
subscribers.put(subscriber, description);
688
689
// receive half the data
690
for (int i = 0; i < n; i++) {
691
subscription.request(1);
692
ByteBuffer buffer = subscriber.items.pop();
693
}
694
695
// cancel subscription
696
subscription.cancel();
697
// request the rest...
698
subscription.request(Long.MAX_VALUE);
699
}
700
701
CompletableFuture[] results = subscribers.keySet()
702
.stream().map(RequestSubscriber::resultCF)
703
.toArray(CompletableFuture[]::new);
704
CompletableFuture<?> any = CompletableFuture.anyOf(results);
705
706
// subscription was cancelled, so nothing should be received...
707
try {
708
TimeoutException x = Assert.expectThrows(TimeoutException.class,
709
() -> any.get(5, TimeUnit.SECONDS));
710
out.println("Got expected " + x);
711
} finally {
712
subscribers.keySet().stream()
713
.filter(rs -> rs.resultCF.isDone())
714
.forEach(rs -> System.err.printf(
715
"Failed: %s completed with %s",
716
subscribers.get(rs), rs.resultCF));
717
}
718
Consumer<RequestSubscriber> check = (rs) -> {
719
Assert.assertTrue(rs.items.isEmpty(), subscribers.get(rs) + " has items");
720
Assert.assertFalse(rs.resultCF.isDone(), subscribers.get(rs) + " was not cancelled");
721
out.println(subscribers.get(rs) + ": PASSED");
722
};
723
subscribers.keySet().stream().forEach(check);
724
}
725
726
// Verifies that cancelling the subscription is propagated downstream
727
@Test
728
public void testCancelSubscription() {
729
checkSkip();
730
PublishWithError upstream = new PublishWithError(BODIES, BODIES.size(),
731
() -> new AssertionError("should not come here"));
732
BodyPublisher publisher = BodyPublishers.concat(upstream);
733
734
assertEquals(publisher.contentLength(),
735
BODIES.stream().mapToInt(String::length).sum());
736
Map<RequestSubscriber, String> subscribers = new LinkedHashMap<>();
737
738
for (int n=0; n < BODIES.size(); n++) {
739
740
String description = String.format(
741
"cancel after %d/%d onNext() invocations",
742
n, BODIES.size());
743
RequestSubscriber subscriber = new RequestSubscriber();
744
publisher.subscribe(subscriber);
745
Subscription subscription = subscriber.subscriptionCF.join();
746
subscribers.put(subscriber, description);
747
748
// receive half the data
749
for (int i = 0; i < n; i++) {
750
subscription.request(1);
751
ByteBuffer buffer = subscriber.items.pop();
752
}
753
754
// cancel subscription
755
subscription.cancel();
756
// request the rest...
757
subscription.request(Long.MAX_VALUE);
758
assertTrue(upstream.subscribers.get(subscriber).cancelled,
759
description + " upstream subscription not cancelled");
760
out.println(description + " upstream subscription was properly cancelled");
761
}
762
763
CompletableFuture[] results = subscribers.keySet()
764
.stream().map(RequestSubscriber::resultCF)
765
.toArray(CompletableFuture[]::new);
766
CompletableFuture<?> any = CompletableFuture.anyOf(results);
767
768
// subscription was cancelled, so nothing should be received...
769
try {
770
TimeoutException x = Assert.expectThrows(TimeoutException.class,
771
() -> any.get(5, TimeUnit.SECONDS));
772
out.println("Got expected " + x);
773
} finally {
774
subscribers.keySet().stream()
775
.filter(rs -> rs.resultCF.isDone())
776
.forEach(rs -> System.err.printf(
777
"Failed: %s completed with %s",
778
subscribers.get(rs), rs.resultCF));
779
}
780
Consumer<RequestSubscriber> check = (rs) -> {
781
Assert.assertTrue(rs.items.isEmpty(), subscribers.get(rs) + " has items");
782
Assert.assertFalse(rs.resultCF.isDone(), subscribers.get(rs) + " was not cancelled");
783
out.println(subscribers.get(rs) + ": PASSED");
784
};
785
subscribers.keySet().stream().forEach(check);
786
787
}
788
789
@Test(dataProvider = "variants")
790
public void test(String uri, boolean sameClient) throws Exception {
791
checkSkip();
792
System.out.println("Request to " + uri);
793
794
HttpClient client = newHttpClient(sameClient);
795
796
BodyPublisher publisher = BodyPublishers.concat(
797
BODIES.stream()
798
.map(BodyPublishers::ofString)
799
.toArray(HttpRequest.BodyPublisher[]::new)
800
);
801
HttpRequest request = HttpRequest.newBuilder(URI.create(uri))
802
.POST(publisher)
803
.build();
804
for (int i = 0; i < ITERATION_COUNT; i++) {
805
System.out.println("Iteration: " + i);
806
HttpResponse<String> response = client.send(request, BodyHandlers.ofString());
807
int expectedResponse = RESPONSE_CODE;
808
if (response.statusCode() != expectedResponse)
809
throw new RuntimeException("wrong response code " + Integer.toString(response.statusCode()));
810
assertEquals(response.body(), BODIES.stream().collect(Collectors.joining()));
811
}
812
System.out.println("test: DONE");
813
}
814
815
@BeforeTest
816
public void setup() throws Exception {
817
sslContext = new SimpleSSLContext().get();
818
if (sslContext == null)
819
throw new AssertionError("Unexpected null sslContext");
820
821
HttpTestHandler handler = new HttpTestEchoHandler();
822
InetSocketAddress loopback = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
823
824
HttpServer http1 = HttpServer.create(loopback, 0);
825
http1TestServer = HttpTestServer.of(http1);
826
http1TestServer.addHandler(handler, "/http1/echo/");
827
http1URI = "http://" + http1TestServer.serverAuthority() + "/http1/echo/x";
828
829
HttpsServer https1 = HttpsServer.create(loopback, 0);
830
https1.setHttpsConfigurator(new HttpsConfigurator(sslContext));
831
https1TestServer = HttpTestServer.of(https1);
832
https1TestServer.addHandler(handler, "/https1/echo/");
833
https1URI = "https://" + https1TestServer.serverAuthority() + "/https1/echo/x";
834
835
// HTTP/2
836
http2TestServer = HttpTestServer.of(new Http2TestServer("localhost", false, 0));
837
http2TestServer.addHandler(handler, "/http2/echo/");
838
http2URI = "http://" + http2TestServer.serverAuthority() + "/http2/echo/x";
839
840
https2TestServer = HttpTestServer.of(new Http2TestServer("localhost", true, sslContext));
841
https2TestServer.addHandler(handler, "/https2/echo/");
842
https2URI = "https://" + https2TestServer.serverAuthority() + "/https2/echo/x";
843
844
serverCount.addAndGet(4);
845
http1TestServer.start();
846
https1TestServer.start();
847
http2TestServer.start();
848
https2TestServer.start();
849
}
850
851
@AfterTest
852
public void teardown() throws Exception {
853
String sharedClientName =
854
sharedClient == null ? null : sharedClient.toString();
855
sharedClient = null;
856
Thread.sleep(100);
857
AssertionError fail = TRACKER.check(500);
858
try {
859
http1TestServer.stop();
860
https1TestServer.stop();
861
http2TestServer.stop();
862
https2TestServer.stop();
863
} finally {
864
if (fail != null) {
865
if (sharedClientName != null) {
866
System.err.println("Shared client name is: " + sharedClientName);
867
}
868
throw fail;
869
}
870
}
871
}
872
}
873
874