Path: blob/master/test/jdk/java/net/httpclient/AggregateRequestBodyTest.java
66644 views
/*1* Copyright (c) 2020, 2021, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation.7*8* This code is distributed in the hope that it will be useful, but WITHOUT9* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or10* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License11* version 2 for more details (a copy is included in the LICENSE file that12* accompanied this code).13*14* You should have received a copy of the GNU General Public License version15* 2 along with this work; if not, write to the Free Software Foundation,16* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.17*18* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA19* or visit www.oracle.com if you need additional information or have any20* questions.21*/2223/*24* @test25* @bug 825237426* @library /test/lib http2/server27* @build jdk.test.lib.net.SimpleSSLContext HttpServerAdapters28* ReferenceTracker AggregateRequestBodyTest29* @modules java.base/sun.net.www.http30* java.net.http/jdk.internal.net.http.common31* java.net.http/jdk.internal.net.http.frame32* java.net.http/jdk.internal.net.http.hpack33* @run testng/othervm -Djdk.internal.httpclient.debug=true34* -Djdk.httpclient.HttpClient.log=requests,responses,errors35* AggregateRequestBodyTest36* @summary Tests HttpRequest.BodyPublishers::concat37*/3839import java.net.InetAddress;40import java.net.InetSocketAddress;41import java.net.URI;42import java.net.http.HttpClient;43import java.net.http.HttpRequest;44import java.net.http.HttpRequest.BodyPublisher;45import java.net.http.HttpRequest.BodyPublishers;46import java.net.http.HttpResponse;47import java.net.http.HttpResponse.BodyHandlers;48import java.nio.ByteBuffer;49import java.util.Arrays;50import java.util.LinkedHashMap;51import java.util.List;52import java.util.Map;53import java.util.concurrent.CompletableFuture;54import java.util.concurrent.CompletionException;55import java.util.concurrent.ConcurrentHashMap;56import java.util.concurrent.ConcurrentLinkedDeque;57import java.util.concurrent.ConcurrentMap;58import java.util.concurrent.Executor;59import java.util.concurrent.Executors;60import java.util.concurrent.Flow;61import java.util.concurrent.Flow.Subscriber;62import java.util.concurrent.Flow.Subscription;63import java.util.concurrent.TimeUnit;64import java.util.concurrent.TimeoutException;65import java.util.concurrent.atomic.AtomicLong;66import java.util.concurrent.atomic.AtomicReference;67import java.util.function.Consumer;68import java.util.function.Supplier;69import java.util.stream.Collectors;70import java.util.stream.LongStream;71import java.util.stream.Stream;72import javax.net.ssl.SSLContext;7374import com.sun.net.httpserver.HttpServer;75import com.sun.net.httpserver.HttpsConfigurator;76import com.sun.net.httpserver.HttpsServer;77import jdk.test.lib.net.SimpleSSLContext;78import org.testng.Assert;79import org.testng.ITestContext;80import org.testng.ITestResult;81import org.testng.SkipException;82import org.testng.annotations.AfterClass;83import org.testng.annotations.AfterTest;84import org.testng.annotations.BeforeMethod;85import org.testng.annotations.BeforeTest;86import org.testng.annotations.DataProvider;87import org.testng.annotations.Test;8889import static java.lang.System.out;90import static org.testng.Assert.assertEquals;91import static org.testng.Assert.assertFalse;92import static org.testng.Assert.assertTrue;93import static org.testng.Assert.expectThrows;9495public class AggregateRequestBodyTest implements HttpServerAdapters {9697SSLContext sslContext;98HttpTestServer http1TestServer; // HTTP/1.1 ( http )99HttpTestServer https1TestServer; // HTTPS/1.1 ( https )100HttpTestServer http2TestServer; // HTTP/2 ( h2c )101HttpTestServer https2TestServer; // HTTP/2 ( h2 )102String http1URI;103String https1URI;104String http2URI;105String https2URI;106107static final int RESPONSE_CODE = 200;108static final int ITERATION_COUNT = 4;109static final Class<IllegalArgumentException> IAE = IllegalArgumentException.class;110static final Class<CompletionException> CE = CompletionException.class;111// a shared executor helps reduce the amount of threads created by the test112static final Executor executor = new TestExecutor(Executors.newCachedThreadPool());113static final ConcurrentMap<String, Throwable> FAILURES = new ConcurrentHashMap<>();114static volatile boolean tasksFailed;115static final AtomicLong serverCount = new AtomicLong();116static final AtomicLong clientCount = new AtomicLong();117static final long start = System.nanoTime();118public static String now() {119long now = System.nanoTime() - start;120long secs = now / 1000_000_000;121long mill = (now % 1000_000_000) / 1000_000;122long nan = now % 1000_000;123return String.format("[%d s, %d ms, %d ns] ", secs, mill, nan);124}125126final ReferenceTracker TRACKER = ReferenceTracker.INSTANCE;127private volatile HttpClient sharedClient;128129static class TestExecutor implements Executor {130final AtomicLong tasks = new AtomicLong();131Executor executor;132TestExecutor(Executor executor) {133this.executor = executor;134}135136@Override137public void execute(Runnable command) {138long id = tasks.incrementAndGet();139executor.execute(() -> {140try {141command.run();142} catch (Throwable t) {143tasksFailed = true;144System.out.printf(now() + "Task %s failed: %s%n", id, t);145System.err.printf(now() + "Task %s failed: %s%n", id, t);146FAILURES.putIfAbsent("Task " + id, t);147throw t;148}149});150}151}152153protected boolean stopAfterFirstFailure() {154return Boolean.getBoolean("jdk.internal.httpclient.debug");155}156157final AtomicReference<SkipException> skiptests = new AtomicReference<>();158void checkSkip() {159var skip = skiptests.get();160if (skip != null) throw skip;161}162static String name(ITestResult result) {163var params = result.getParameters();164return result.getName()165+ (params == null ? "()" : Arrays.toString(result.getParameters()));166}167168@BeforeMethod169void beforeMethod(ITestContext context) {170if (stopAfterFirstFailure() && context.getFailedTests().size() > 0) {171if (skiptests.get() == null) {172SkipException skip = new SkipException("some tests failed");173skip.setStackTrace(new StackTraceElement[0]);174skiptests.compareAndSet(null, skip);175}176}177}178179@AfterClass180static final void printFailedTests(ITestContext context) {181out.println("\n=========================");182try {183var failed = context.getFailedTests().getAllResults().stream()184.collect(Collectors.toMap(r -> name(r), ITestResult::getThrowable));185FAILURES.putAll(failed);186187out.printf("%n%sCreated %d servers and %d clients%n",188now(), serverCount.get(), clientCount.get());189if (FAILURES.isEmpty()) return;190out.println("Failed tests: ");191FAILURES.entrySet().forEach((e) -> {192out.printf("\t%s: %s%n", e.getKey(), e.getValue());193e.getValue().printStackTrace(out);194e.getValue().printStackTrace();195});196if (tasksFailed) {197System.out.println("WARNING: Some tasks failed");198}199} finally {200out.println("\n=========================\n");201}202}203204private String[] uris() {205return new String[] {206http1URI,207https1URI,208http2URI,209https2URI,210};211}212213static AtomicLong URICOUNT = new AtomicLong();214215@DataProvider(name = "variants")216public Object[][] variants(ITestContext context) {217if (stopAfterFirstFailure() && context.getFailedTests().size() > 0) {218return new Object[0][];219}220String[] uris = uris();221Object[][] result = new Object[uris.length * 2][];222int i = 0;223for (boolean sameClient : List.of(false, true)) {224for (String uri : uris()) {225result[i++] = new Object[]{uri, sameClient};226}227}228assert i == uris.length * 2;229return result;230}231232private HttpClient makeNewClient() {233clientCount.incrementAndGet();234HttpClient client = HttpClient.newBuilder()235.proxy(HttpClient.Builder.NO_PROXY)236.executor(executor)237.sslContext(sslContext)238.build();239return TRACKER.track(client);240}241242HttpClient newHttpClient(boolean share) {243if (!share) return makeNewClient();244HttpClient shared = sharedClient;245if (shared != null) return shared;246synchronized (this) {247shared = sharedClient;248if (shared == null) {249shared = sharedClient = makeNewClient();250}251return shared;252}253}254255static final List<String> BODIES = List.of(256"Lorem ipsum",257"dolor sit amet",258"consectetur adipiscing elit, sed do eiusmod tempor",259"quis nostrud exercitation ullamco",260"laboris nisi",261"ut",262"aliquip ex ea commodo consequat." +263"Duis aute irure dolor in reprehenderit in voluptate velit esse" +264"cillum dolore eu fugiat nulla pariatur.",265"Excepteur sint occaecat cupidatat non proident."266);267268static BodyPublisher[] publishers(String... content) {269if (content == null) return null;270BodyPublisher[] result = new BodyPublisher[content.length];271for (int i=0; i < content.length ; i++) {272result[i] = content[i] == null ? null : BodyPublishers.ofString(content[i]);273}274return result;275}276277static String[] strings(String... s) {278return s;279}280281@DataProvider(name = "sparseContent")282Object[][] nulls() {283return new Object[][] {284{"null array", null},285{"null element", strings((String)null)},286{"null first element", strings(null, "one")},287{"null second element", strings( "one", null)},288{"null third element", strings( "one", "two", null)},289{"null fourth element", strings( "one", "two", "three", null)},290{"null random element", strings( "one", "two", "three", null, "five")},291};292}293294static List<Long> lengths(long... lengths) {295return LongStream.of(lengths)296.mapToObj(Long::valueOf)297.collect(Collectors.toList());298}299300@DataProvider(name = "contentLengths")301Object[][] contentLengths() {302return new Object[][] {303{-1, lengths(-1)},304{-42, lengths(-42)},305{42, lengths(42)},306{42, lengths(10, 0, 20, 0, 12)},307{-1, lengths(10, 0, 20, -1, 12)},308{-1, lengths(-1, 0, 20, 10, 12)},309{-1, lengths(10, 0, 20, 12, -1)},310{-1, lengths(10, 0, 20, -10, 12)},311{-1, lengths(-10, 0, 20, 10, 12)},312{-1, lengths(10, 0, 20, 12, -10)},313{-1, lengths(10, 0, Long.MIN_VALUE, -1, 12)},314{-1, lengths(-1, 0, Long.MIN_VALUE, 10, 12)},315{-1, lengths(10, Long.MIN_VALUE, 20, 12, -1)},316{Long.MAX_VALUE, lengths(10, Long.MAX_VALUE - 42L, 20, 0, 12)},317{-1, lengths(10, Long.MAX_VALUE - 40L, 20, 0, 12)},318{-1, lengths(10, Long.MAX_VALUE - 12L, 20, 0, 12)},319{-1, lengths(10, Long.MAX_VALUE/2L, Long.MAX_VALUE/2L + 1L, 0, 12)},320{-1, lengths(10, Long.MAX_VALUE/2L, -1, Long.MAX_VALUE/2L + 1L, 12)},321{-1, lengths(10, Long.MAX_VALUE, 12, Long.MAX_VALUE, 20)},322{-1, lengths(10, Long.MAX_VALUE, Long.MAX_VALUE, 12, 20)},323{-1, lengths(0, Long.MAX_VALUE, Long.MAX_VALUE, 12, 20)},324{-1, lengths(Long.MAX_VALUE, Long.MAX_VALUE, 12, 0, 20)}325};326}327328@DataProvider(name="negativeRequests")329Object[][] negativeRequests() {330return new Object[][] {331{0L}, {-1L}, {-2L}, {Long.MIN_VALUE + 1L}, {Long.MIN_VALUE}332};333}334335336static class ContentLengthPublisher implements BodyPublisher {337final long length;338ContentLengthPublisher(long length) {339this.length = length;340}341@Override342public long contentLength() {343return length;344}345346@Override347public void subscribe(Subscriber<? super ByteBuffer> subscriber) {348}349350static ContentLengthPublisher[] of(List<Long> lengths) {351return lengths.stream()352.map(ContentLengthPublisher::new)353.toArray(ContentLengthPublisher[]::new);354}355}356357/**358* A dummy publisher that allows to call onError on its subscriber (or not...).359*/360static class PublishWithError implements BodyPublisher {361final ConcurrentHashMap<Subscriber<?>, ErrorSubscription> subscribers = new ConcurrentHashMap<>();362final long length;363final List<String> content;364final int errorAt;365final Supplier<? extends Throwable> errorSupplier;366PublishWithError(List<String> content, int errorAt, Supplier<? extends Throwable> supplier) {367this.content = content;368this.errorAt = errorAt;369this.errorSupplier = supplier;370length = content.stream().mapToInt(String::length).sum();371}372373boolean hasErrors() {374return errorAt < content.size();375}376377@Override378public long contentLength() {379return length;380}381382@Override383public void subscribe(Subscriber<? super ByteBuffer> subscriber) {384ErrorSubscription subscription = new ErrorSubscription(subscriber);385subscribers.put(subscriber, subscription);386subscriber.onSubscribe(subscription);387}388389class ErrorSubscription implements Flow.Subscription {390volatile boolean cancelled;391volatile int at;392final Subscriber<? super ByteBuffer> subscriber;393ErrorSubscription(Subscriber<? super ByteBuffer> subscriber) {394this.subscriber = subscriber;395}396@Override397public void request(long n) {398while (!cancelled && --n >= 0 && at < Math.min(errorAt+1, content.size())) {399if (at++ == errorAt) {400subscriber.onError(errorSupplier.get());401return;402} else if (at <= content.size()){403subscriber.onNext(ByteBuffer.wrap(404content.get(at-1).getBytes()));405if (at == content.size()) {406subscriber.onComplete();407return;408}409}410}411}412413@Override414public void cancel() {415cancelled = true;416}417}418}419420static class RequestSubscriber implements Flow.Subscriber<ByteBuffer> {421CompletableFuture<Subscription> subscriptionCF = new CompletableFuture<>();422ConcurrentLinkedDeque<ByteBuffer> items = new ConcurrentLinkedDeque<>();423CompletableFuture<List<ByteBuffer>> resultCF = new CompletableFuture<>();424425@Override426public void onSubscribe(Subscription subscription) {427this.subscriptionCF.complete(subscription);428}429430@Override431public void onNext(ByteBuffer item) {432items.addLast(item);433}434435@Override436public void onError(Throwable throwable) {437resultCF.completeExceptionally(throwable);438}439440@Override441public void onComplete() {442resultCF.complete(items.stream().collect(Collectors.toUnmodifiableList()));443}444445CompletableFuture<List<ByteBuffer>> resultCF() { return resultCF; }446}447448static String stringFromBuffer(ByteBuffer buffer) {449byte[] bytes = new byte[buffer.remaining()];450buffer.get(bytes);451return new String(bytes);452}453454String stringFromBytes(Stream<ByteBuffer> buffers) {455return buffers.map(AggregateRequestBodyTest::stringFromBuffer)456.collect(Collectors.joining());457}458459static PublishWithError withNoError(String content) {460return new PublishWithError(List.of(content), 1,461() -> new AssertionError("Should not happen!"));462}463464static PublishWithError withNoError(List<String> content) {465return new PublishWithError(content, content.size(),466() -> new AssertionError("Should not happen!"));467}468469@Test(dataProvider = "sparseContent") // checks that NPE is thrown470public void testNullPointerException(String description, String[] content) {471checkSkip();472BodyPublisher[] publishers = publishers(content);473Assert.assertThrows(NullPointerException.class, () -> BodyPublishers.concat(publishers));474}475476// Verifies that an empty array creates a "noBody" publisher477@Test478public void testEmpty() {479checkSkip();480BodyPublisher publisher = BodyPublishers.concat();481RequestSubscriber subscriber = new RequestSubscriber();482assertEquals(publisher.contentLength(), 0);483publisher.subscribe(subscriber);484subscriber.subscriptionCF.thenAccept(s -> s.request(1));485List<ByteBuffer> result = subscriber.resultCF.join();486assertEquals(result, List.of());487assertTrue(subscriber.items.isEmpty());;488}489490// verifies that error emitted by upstream publishers are propagated downstream.491@Test(dataProvider = "sparseContent") // nulls are replaced with error publisher492public void testOnError(String description, String[] content) {493checkSkip();494final RequestSubscriber subscriber = new RequestSubscriber();495final PublishWithError errorPublisher;496final BodyPublisher[] publishers;497String result = BODIES.stream().collect(Collectors.joining());498if (content == null) {499content = List.of(result).toArray(String[]::new);500errorPublisher = new PublishWithError(BODIES, BODIES.size(),501() -> new AssertionError("Unexpected!!"));502publishers = List.of(errorPublisher).toArray(new BodyPublisher[0]);503description = "No error";504} else {505publishers = publishers(content);506description = description.replace("null", "error at");507errorPublisher = new PublishWithError(BODIES, 2, () -> new Exception("expected"));508}509result = "";510boolean hasErrors = false;511for (int i=0; i < content.length; i++) {512if (content[i] == null) {513publishers[i] = errorPublisher;514if (hasErrors) continue;515if (!errorPublisher.hasErrors()) {516result = result + errorPublisher517.content.stream().collect(Collectors.joining());518} else {519result = result + errorPublisher.content520.stream().limit(errorPublisher.errorAt)521.collect(Collectors.joining());522result = result + "<error>";523hasErrors = true;524}525} else if (!hasErrors) {526result = result + content[i];527}528}529BodyPublisher publisher = BodyPublishers.concat(publishers);530publisher.subscribe(subscriber);531subscriber.subscriptionCF.thenAccept(s -> s.request(Long.MAX_VALUE));532if (errorPublisher.hasErrors()) {533CompletionException ce = expectThrows(CompletionException.class,534() -> subscriber.resultCF.join());535out.println(description + ": got expected " + ce);536assertEquals(ce.getCause().getClass(), Exception.class);537assertEquals(stringFromBytes(subscriber.items.stream()) + "<error>", result);538} else {539assertEquals(stringFromBytes(subscriber.resultCF.join().stream()), result);540out.println(description + ": got expected result: " + result);541}542}543544// Verifies that if an upstream publisher has an unknown length, the545// aggregate publisher will have an unknown length as well. Otherwise546// the length should be known.547@Test(dataProvider = "sparseContent") // nulls are replaced with unknown length548public void testUnknownContentLength(String description, String[] content) {549checkSkip();550if (content == null) {551content = BODIES.toArray(String[]::new);552description = "BODIES (known length)";553} else {554description = description.replace("null", "length(-1)");555}556BodyPublisher[] publishers = publishers(content);557BodyPublisher nolength = new BodyPublisher() {558final BodyPublisher missing = BodyPublishers.ofString("missing");559@Override560public long contentLength() { return -1; }561@Override562public void subscribe(Subscriber<? super ByteBuffer> subscriber) {563missing.subscribe(subscriber);564}565};566long length = 0;567for (int i=0; i < content.length; i++) {568if (content[i] == null) {569publishers[i] = nolength;570length = -1;571} else if (length >= 0) {572length += content[i].length();573}574}575out.printf("testUnknownContentLength(%s): %d%n", description, length);576BodyPublisher publisher = BodyPublishers.concat(publishers);577assertEquals(publisher.contentLength(), length,578description.replace("null", "length(-1)"));579}580581private static final Throwable completionCause(CompletionException x) {582while (x.getCause() instanceof CompletionException) {583x = (CompletionException)x.getCause();584}585return x.getCause();586}587588@Test(dataProvider = "negativeRequests")589public void testNegativeRequest(long n) {590checkSkip();591assert n <= 0 : "test for negative request called with n > 0 : " + n;592BodyPublisher[] publishers = ContentLengthPublisher.of(List.of(1L, 2L, 3L));593BodyPublisher publisher = BodyPublishers.concat(publishers);594RequestSubscriber subscriber = new RequestSubscriber();595publisher.subscribe(subscriber);596Subscription subscription = subscriber.subscriptionCF.join();597subscription.request(n);598CompletionException expected = expectThrows(CE, () -> subscriber.resultCF.join());599Throwable cause = completionCause(expected);600if (cause instanceof IllegalArgumentException) {601System.out.printf("Got expected IAE for %d: %s%n", n, cause);602} else {603throw new AssertionError("Unexpected exception: " + cause,604(cause == null) ? expected : cause);605}606}607608static BodyPublisher[] ofStrings(String... strings) {609return Stream.of(strings).map(BodyPublishers::ofString).toArray(BodyPublisher[]::new);610}611612@Test613public void testPositiveRequests() {614checkSkip();615// A composite array of publishers616BodyPublisher[] publishers = Stream.of(617Stream.of(ofStrings("Lorem", " ", "ipsum", " ")),618Stream.of(BodyPublishers.concat(ofStrings("dolor", " ", "sit", " ", "amet", ", "))),619Stream.<BodyPublisher>of(withNoError(List.of("consectetur", " ", "adipiscing"))),620Stream.of(ofStrings(" ")),621Stream.of(BodyPublishers.concat(ofStrings("elit", ".")))622).flatMap((s) -> s).toArray(BodyPublisher[]::new);623BodyPublisher publisher = BodyPublishers.concat(publishers);624625// Test that we can request all 13 items in a single request call.626RequestSubscriber requestSubscriber1 = new RequestSubscriber();627publisher.subscribe(requestSubscriber1);628Subscription subscription1 = requestSubscriber1.subscriptionCF.join();629subscription1.request(16);630assertTrue(requestSubscriber1.resultCF().isDone());631List<ByteBuffer> list1 = requestSubscriber1.resultCF().join();632String result1 = stringFromBytes(list1.stream());633assertEquals(result1, "Lorem ipsum dolor sit amet, consectetur adipiscing elit.");634System.out.println("Got expected sentence with one request: \"%s\"".formatted(result1));635636// Test that we can split our requests call any which way we want637// (whether in the 'middle of a publisher' or at the boundaries.638RequestSubscriber requestSubscriber2 = new RequestSubscriber();639publisher.subscribe(requestSubscriber2);640Subscription subscription2 = requestSubscriber2.subscriptionCF.join();641subscription2.request(1);642assertFalse(requestSubscriber2.resultCF().isDone());643subscription2.request(10);644assertFalse(requestSubscriber2.resultCF().isDone());645subscription2.request(4);646assertFalse(requestSubscriber2.resultCF().isDone());647subscription2.request(1);648assertTrue(requestSubscriber2.resultCF().isDone());649List<ByteBuffer> list2 = requestSubscriber2.resultCF().join();650String result2 = stringFromBytes(list2.stream());651assertEquals(result2, "Lorem ipsum dolor sit amet, consectetur adipiscing elit.");652System.out.println("Got expected sentence with 4 requests: \"%s\"".formatted(result1));653}654655@Test(dataProvider = "contentLengths")656public void testContentLength(long expected, List<Long> lengths) {657checkSkip();658BodyPublisher[] publishers = ContentLengthPublisher.of(lengths);659BodyPublisher aggregate = BodyPublishers.concat(publishers);660assertEquals(aggregate.contentLength(), expected,661"Unexpected result for %s".formatted(lengths));662}663664// Verifies that cancelling the subscription ensure that downstream665// publishers are no longer subscribed etc...666@Test667public void testCancel() {668checkSkip();669BodyPublisher[] publishers = BODIES.stream()670.map(BodyPublishers::ofString)671.toArray(BodyPublisher[]::new);672BodyPublisher publisher = BodyPublishers.concat(publishers);673674assertEquals(publisher.contentLength(),675BODIES.stream().mapToInt(String::length).sum());676Map<RequestSubscriber, String> subscribers = new LinkedHashMap<>();677678for (int n=0; n < BODIES.size(); n++) {679680String description = String.format(681"cancel after %d/%d onNext() invocations",682n, BODIES.size());683RequestSubscriber subscriber = new RequestSubscriber();684publisher.subscribe(subscriber);685Subscription subscription = subscriber.subscriptionCF.join();686subscribers.put(subscriber, description);687688// receive half the data689for (int i = 0; i < n; i++) {690subscription.request(1);691ByteBuffer buffer = subscriber.items.pop();692}693694// cancel subscription695subscription.cancel();696// request the rest...697subscription.request(Long.MAX_VALUE);698}699700CompletableFuture[] results = subscribers.keySet()701.stream().map(RequestSubscriber::resultCF)702.toArray(CompletableFuture[]::new);703CompletableFuture<?> any = CompletableFuture.anyOf(results);704705// subscription was cancelled, so nothing should be received...706try {707TimeoutException x = Assert.expectThrows(TimeoutException.class,708() -> any.get(5, TimeUnit.SECONDS));709out.println("Got expected " + x);710} finally {711subscribers.keySet().stream()712.filter(rs -> rs.resultCF.isDone())713.forEach(rs -> System.err.printf(714"Failed: %s completed with %s",715subscribers.get(rs), rs.resultCF));716}717Consumer<RequestSubscriber> check = (rs) -> {718Assert.assertTrue(rs.items.isEmpty(), subscribers.get(rs) + " has items");719Assert.assertFalse(rs.resultCF.isDone(), subscribers.get(rs) + " was not cancelled");720out.println(subscribers.get(rs) + ": PASSED");721};722subscribers.keySet().stream().forEach(check);723}724725// Verifies that cancelling the subscription is propagated downstream726@Test727public void testCancelSubscription() {728checkSkip();729PublishWithError upstream = new PublishWithError(BODIES, BODIES.size(),730() -> new AssertionError("should not come here"));731BodyPublisher publisher = BodyPublishers.concat(upstream);732733assertEquals(publisher.contentLength(),734BODIES.stream().mapToInt(String::length).sum());735Map<RequestSubscriber, String> subscribers = new LinkedHashMap<>();736737for (int n=0; n < BODIES.size(); n++) {738739String description = String.format(740"cancel after %d/%d onNext() invocations",741n, BODIES.size());742RequestSubscriber subscriber = new RequestSubscriber();743publisher.subscribe(subscriber);744Subscription subscription = subscriber.subscriptionCF.join();745subscribers.put(subscriber, description);746747// receive half the data748for (int i = 0; i < n; i++) {749subscription.request(1);750ByteBuffer buffer = subscriber.items.pop();751}752753// cancel subscription754subscription.cancel();755// request the rest...756subscription.request(Long.MAX_VALUE);757assertTrue(upstream.subscribers.get(subscriber).cancelled,758description + " upstream subscription not cancelled");759out.println(description + " upstream subscription was properly cancelled");760}761762CompletableFuture[] results = subscribers.keySet()763.stream().map(RequestSubscriber::resultCF)764.toArray(CompletableFuture[]::new);765CompletableFuture<?> any = CompletableFuture.anyOf(results);766767// subscription was cancelled, so nothing should be received...768try {769TimeoutException x = Assert.expectThrows(TimeoutException.class,770() -> any.get(5, TimeUnit.SECONDS));771out.println("Got expected " + x);772} finally {773subscribers.keySet().stream()774.filter(rs -> rs.resultCF.isDone())775.forEach(rs -> System.err.printf(776"Failed: %s completed with %s",777subscribers.get(rs), rs.resultCF));778}779Consumer<RequestSubscriber> check = (rs) -> {780Assert.assertTrue(rs.items.isEmpty(), subscribers.get(rs) + " has items");781Assert.assertFalse(rs.resultCF.isDone(), subscribers.get(rs) + " was not cancelled");782out.println(subscribers.get(rs) + ": PASSED");783};784subscribers.keySet().stream().forEach(check);785786}787788@Test(dataProvider = "variants")789public void test(String uri, boolean sameClient) throws Exception {790checkSkip();791System.out.println("Request to " + uri);792793HttpClient client = newHttpClient(sameClient);794795BodyPublisher publisher = BodyPublishers.concat(796BODIES.stream()797.map(BodyPublishers::ofString)798.toArray(HttpRequest.BodyPublisher[]::new)799);800HttpRequest request = HttpRequest.newBuilder(URI.create(uri))801.POST(publisher)802.build();803for (int i = 0; i < ITERATION_COUNT; i++) {804System.out.println("Iteration: " + i);805HttpResponse<String> response = client.send(request, BodyHandlers.ofString());806int expectedResponse = RESPONSE_CODE;807if (response.statusCode() != expectedResponse)808throw new RuntimeException("wrong response code " + Integer.toString(response.statusCode()));809assertEquals(response.body(), BODIES.stream().collect(Collectors.joining()));810}811System.out.println("test: DONE");812}813814@BeforeTest815public void setup() throws Exception {816sslContext = new SimpleSSLContext().get();817if (sslContext == null)818throw new AssertionError("Unexpected null sslContext");819820HttpTestHandler handler = new HttpTestEchoHandler();821InetSocketAddress loopback = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);822823HttpServer http1 = HttpServer.create(loopback, 0);824http1TestServer = HttpTestServer.of(http1);825http1TestServer.addHandler(handler, "/http1/echo/");826http1URI = "http://" + http1TestServer.serverAuthority() + "/http1/echo/x";827828HttpsServer https1 = HttpsServer.create(loopback, 0);829https1.setHttpsConfigurator(new HttpsConfigurator(sslContext));830https1TestServer = HttpTestServer.of(https1);831https1TestServer.addHandler(handler, "/https1/echo/");832https1URI = "https://" + https1TestServer.serverAuthority() + "/https1/echo/x";833834// HTTP/2835http2TestServer = HttpTestServer.of(new Http2TestServer("localhost", false, 0));836http2TestServer.addHandler(handler, "/http2/echo/");837http2URI = "http://" + http2TestServer.serverAuthority() + "/http2/echo/x";838839https2TestServer = HttpTestServer.of(new Http2TestServer("localhost", true, sslContext));840https2TestServer.addHandler(handler, "/https2/echo/");841https2URI = "https://" + https2TestServer.serverAuthority() + "/https2/echo/x";842843serverCount.addAndGet(4);844http1TestServer.start();845https1TestServer.start();846http2TestServer.start();847https2TestServer.start();848}849850@AfterTest851public void teardown() throws Exception {852String sharedClientName =853sharedClient == null ? null : sharedClient.toString();854sharedClient = null;855Thread.sleep(100);856AssertionError fail = TRACKER.check(500);857try {858http1TestServer.stop();859https1TestServer.stop();860http2TestServer.stop();861https2TestServer.stop();862} finally {863if (fail != null) {864if (sharedClientName != null) {865System.err.println("Shared client name is: " + sharedClientName);866}867throw fail;868}869}870}871}872873874