Path: blob/master/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java
67707 views
/*1* Copyright (c) 2015, 2021, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation. Oracle designates this7* particular file as subject to the "Classpath" exception as provided8* by Oracle in the LICENSE file that accompanied this code.9*10* This code is distributed in the hope that it will be useful, but WITHOUT11* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or12* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License13* version 2 for more details (a copy is included in the LICENSE file that14* accompanied this code).15*16* You should have received a copy of the GNU General Public License version17* 2 along with this work; if not, write to the Free Software Foundation,18* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.19*20* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA21* or visit www.oracle.com if you need additional information or have any22* questions.23*/2425package jdk.internal.net.http;2627import java.io.EOFException;28import java.io.IOException;29import java.io.UncheckedIOException;30import java.lang.invoke.MethodHandles;31import java.lang.invoke.VarHandle;32import java.net.URI;33import java.nio.ByteBuffer;34import java.util.ArrayList;35import java.util.Collections;36import java.util.List;37import java.util.concurrent.CompletableFuture;38import java.util.concurrent.ConcurrentLinkedDeque;39import java.util.concurrent.ConcurrentLinkedQueue;40import java.util.concurrent.Executor;41import java.util.concurrent.Flow;42import java.util.concurrent.Flow.Subscription;43import java.util.concurrent.atomic.AtomicReference;44import java.util.function.BiPredicate;45import java.net.http.HttpClient;46import java.net.http.HttpHeaders;47import java.net.http.HttpRequest;48import java.net.http.HttpResponse;49import java.net.http.HttpResponse.BodySubscriber;50import jdk.internal.net.http.common.*;51import jdk.internal.net.http.frame.*;52import jdk.internal.net.http.hpack.DecodingCallback;5354/**55* Http/2 Stream handling.56*57* REQUESTS58*59* sendHeadersOnly() -- assembles HEADERS frame and puts on connection outbound Q60*61* sendRequest() -- sendHeadersOnly() + sendBody()62*63* sendBodyAsync() -- calls sendBody() in an executor thread.64*65* sendHeadersAsync() -- calls sendHeadersOnly() which does not block66*67* sendRequestAsync() -- calls sendRequest() in an executor thread68*69* RESPONSES70*71* Multiple responses can be received per request. Responses are queued up on72* a LinkedList of CF<HttpResponse> and the first one on the list is completed73* with the next response74*75* getResponseAsync() -- queries list of response CFs and returns first one76* if one exists. Otherwise, creates one and adds it to list77* and returns it. Completion is achieved through the78* incoming() upcall from connection reader thread.79*80* getResponse() -- calls getResponseAsync() and waits for CF to complete81*82* responseBodyAsync() -- calls responseBody() in an executor thread.83*84* incoming() -- entry point called from connection reader thread. Frames are85* either handled immediately without blocking or for data frames86* placed on the stream's inputQ which is consumed by the stream's87* reader thread.88*89* PushedStream sub class90* ======================91* Sending side methods are not used because the request comes from a PUSH_PROMISE92* frame sent by the server. When a PUSH_PROMISE is received the PushedStream93* is created. PushedStream does not use responseCF list as there can be only94* one response. The CF is created when the object created and when the response95* HEADERS frame is received the object is completed.96*/97class Stream<T> extends ExchangeImpl<T> {9899private static final String COOKIE_HEADER = "Cookie";100final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);101102final ConcurrentLinkedQueue<Http2Frame> inputQ = new ConcurrentLinkedQueue<>();103final SequentialScheduler sched =104SequentialScheduler.lockingScheduler(this::schedule);105final SubscriptionBase userSubscription =106new SubscriptionBase(sched, this::cancel, this::onSubscriptionError);107108/**109* This stream's identifier. Assigned lazily by the HTTP2Connection before110* the stream's first frame is sent.111*/112protected volatile int streamid;113114long requestContentLen;115116final Http2Connection connection;117final HttpRequestImpl request;118final HeadersConsumer rspHeadersConsumer;119final HttpHeadersBuilder responseHeadersBuilder;120final HttpHeaders requestPseudoHeaders;121volatile HttpResponse.BodySubscriber<T> responseSubscriber;122final HttpRequest.BodyPublisher requestPublisher;123volatile RequestSubscriber requestSubscriber;124volatile int responseCode;125volatile Response response;126// The exception with which this stream was canceled.127private final AtomicReference<Throwable> errorRef = new AtomicReference<>();128final CompletableFuture<Void> requestBodyCF = new MinimalFuture<>();129volatile CompletableFuture<T> responseBodyCF;130volatile HttpResponse.BodySubscriber<T> pendingResponseSubscriber;131volatile boolean stopRequested;132133/** True if END_STREAM has been seen in a frame received on this stream. */134private volatile boolean remotelyClosed;135private volatile boolean closed;136private volatile boolean endStreamSent;137// Indicates the first reason that was invoked when sending a ResetFrame138// to the server. A streamState of 0 indicates that no reset was sent.139// (see markStream(int code)140private volatile int streamState; // assigned using STREAM_STATE varhandle.141private volatile boolean deRegistered; // assigned using DEREGISTERED varhandle.142143// state flags144private boolean requestSent, responseReceived;145146// send lock: prevent sending DataFrames after reset occurred.147private final Object sendLock = new Object();148149/**150* A reference to this Stream's connection Send Window controller. The151* stream MUST acquire the appropriate amount of Send Window before152* sending any data. Will be null for PushStreams, as they cannot send data.153*/154private final WindowController windowController;155private final WindowUpdateSender windowUpdater;156157@Override158HttpConnection connection() {159return connection.connection;160}161162/**163* Invoked either from incoming() -> {receiveDataFrame() or receiveResetFrame() }164* of after user subscription window has re-opened, from SubscriptionBase.request()165*/166private void schedule() {167boolean onCompleteCalled = false;168HttpResponse.BodySubscriber<T> subscriber = responseSubscriber;169try {170if (subscriber == null) {171subscriber = responseSubscriber = pendingResponseSubscriber;172if (subscriber == null) {173// can't process anything yet174return;175} else {176if (debug.on()) debug.log("subscribing user subscriber");177subscriber.onSubscribe(userSubscription);178}179}180while (!inputQ.isEmpty()) {181Http2Frame frame = inputQ.peek();182if (frame instanceof ResetFrame) {183inputQ.remove();184handleReset((ResetFrame)frame, subscriber);185return;186}187DataFrame df = (DataFrame)frame;188boolean finished = df.getFlag(DataFrame.END_STREAM);189190List<ByteBuffer> buffers = df.getData();191List<ByteBuffer> dsts = Collections.unmodifiableList(buffers);192int size = Utils.remaining(dsts, Integer.MAX_VALUE);193if (size == 0 && finished) {194inputQ.remove();195connection.ensureWindowUpdated(df); // must update connection window196Log.logTrace("responseSubscriber.onComplete");197if (debug.on()) debug.log("incoming: onComplete");198sched.stop();199connection.decrementStreamsCount(streamid);200subscriber.onComplete();201onCompleteCalled = true;202setEndStreamReceived();203return;204} else if (userSubscription.tryDecrement()) {205inputQ.remove();206Log.logTrace("responseSubscriber.onNext {0}", size);207if (debug.on()) debug.log("incoming: onNext(%d)", size);208try {209subscriber.onNext(dsts);210} catch (Throwable t) {211connection.dropDataFrame(df); // must update connection window212throw t;213}214if (consumed(df)) {215Log.logTrace("responseSubscriber.onComplete");216if (debug.on()) debug.log("incoming: onComplete");217sched.stop();218connection.decrementStreamsCount(streamid);219subscriber.onComplete();220onCompleteCalled = true;221setEndStreamReceived();222return;223}224} else {225if (stopRequested) break;226return;227}228}229} catch (Throwable throwable) {230errorRef.compareAndSet(null, throwable);231} finally {232if (sched.isStopped()) drainInputQueue();233}234235Throwable t = errorRef.get();236if (t != null) {237sched.stop();238try {239if (!onCompleteCalled) {240if (debug.on())241debug.log("calling subscriber.onError: %s", (Object) t);242subscriber.onError(t);243} else {244if (debug.on())245debug.log("already completed: dropping error %s", (Object) t);246}247} catch (Throwable x) {248Log.logError("Subscriber::onError threw exception: {0}", t);249} finally {250cancelImpl(t);251drainInputQueue();252}253}254}255256// must only be called from the scheduler schedule() loop.257// ensure that all received data frames are accounted for258// in the connection window flow control if the scheduler259// is stopped before all the data is consumed.260private void drainInputQueue() {261Http2Frame frame;262while ((frame = inputQ.poll()) != null) {263if (frame instanceof DataFrame) {264connection.dropDataFrame((DataFrame)frame);265}266}267}268269@Override270void nullBody(HttpResponse<T> resp, Throwable t) {271if (debug.on()) debug.log("nullBody: streamid=%d", streamid);272// We should have an END_STREAM data frame waiting in the inputQ.273// We need a subscriber to force the scheduler to process it.274pendingResponseSubscriber = HttpResponse.BodySubscribers.replacing(null);275sched.runOrSchedule();276}277278// Callback invoked after the Response BodySubscriber has consumed the279// buffers contained in a DataFrame.280// Returns true if END_STREAM is reached, false otherwise.281private boolean consumed(DataFrame df) {282// RFC 7540 6.1:283// The entire DATA frame payload is included in flow control,284// including the Pad Length and Padding fields if present285int len = df.payloadLength();286boolean endStream = df.getFlag(DataFrame.END_STREAM);287if (len == 0) return endStream;288289connection.windowUpdater.update(len);290291if (!endStream) {292// Don't send window update on a stream which is293// closed or half closed.294windowUpdater.update(len);295}296297// true: end of stream; false: more data coming298return endStream;299}300301boolean deRegister() {302return DEREGISTERED.compareAndSet(this, false, true);303}304305@Override306CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler,307boolean returnConnectionToPool,308Executor executor)309{310try {311Log.logTrace("Reading body on stream {0}", streamid);312debug.log("Getting BodySubscriber for: " + response);313BodySubscriber<T> bodySubscriber = handler.apply(new ResponseInfoImpl(response));314CompletableFuture<T> cf = receiveData(bodySubscriber, executor);315316PushGroup<?> pg = exchange.getPushGroup();317if (pg != null) {318// if an error occurs make sure it is recorded in the PushGroup319cf = cf.whenComplete((t, e) -> pg.pushError(e));320}321return cf;322} catch (Throwable t) {323// may be thrown by handler.apply324cancelImpl(t);325return MinimalFuture.failedFuture(t);326}327}328329@Override330public String toString() {331return "streamid: " + streamid;332}333334private void receiveDataFrame(DataFrame df) {335inputQ.add(df);336sched.runOrSchedule();337}338339/** Handles a RESET frame. RESET is always handled inline in the queue. */340private void receiveResetFrame(ResetFrame frame) {341inputQ.add(frame);342sched.runOrSchedule();343}344345/**346* Records the first reason which was invoked when sending a ResetFrame347* to the server in the streamState, and return the previous value348* of the streamState. This is an atomic operation.349* A possible use of this method would be to send a ResetFrame only350* if no previous reset frame has been sent.351* For instance: <pre>{@code352* if (markStream(ResetFrame.CANCEL) == 0) {353* connection.sendResetFrame(streamId, ResetFrame.CANCEL);354* }355* }</pre>356* @param code the reason code as per HTTP/2 protocol357* @return the previous value of the stream state.358*/359int markStream(int code) {360if (code == 0) return streamState;361synchronized (sendLock) {362return (int) STREAM_STATE.compareAndExchange(this, 0, code);363}364}365366private void sendDataFrame(DataFrame frame) {367synchronized (sendLock) {368// must not send DataFrame after reset.369if (streamState == 0) {370connection.sendDataFrame(frame);371}372}373}374375// pushes entire response body into response subscriber376// blocking when required by local or remote flow control377CompletableFuture<T> receiveData(BodySubscriber<T> bodySubscriber, Executor executor) {378// We want to allow the subscriber's getBody() method to block so it379// can work with InputStreams. So, we offload execution.380responseBodyCF = ResponseSubscribers.getBodyAsync(executor, bodySubscriber,381new MinimalFuture<>(), this::cancelImpl);382383if (isCanceled()) {384Throwable t = getCancelCause();385responseBodyCF.completeExceptionally(t);386} else {387pendingResponseSubscriber = bodySubscriber;388sched.runOrSchedule(); // in case data waiting already to be processed389}390return responseBodyCF;391}392393@Override394CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {395return sendBodyImpl().thenApply( v -> this);396}397398Stream(Http2Connection connection,399Exchange<T> e,400WindowController windowController)401{402super(e);403this.connection = connection;404this.windowController = windowController;405this.request = e.request();406this.requestPublisher = request.requestPublisher; // may be null407this.responseHeadersBuilder = new HttpHeadersBuilder();408this.rspHeadersConsumer = new HeadersConsumer();409this.requestPseudoHeaders = createPseudoHeaders(request);410this.windowUpdater = new StreamWindowUpdateSender(connection);411}412413private boolean checkRequestCancelled() {414if (exchange.multi.requestCancelled()) {415if (errorRef.get() == null) cancel();416else sendCancelStreamFrame();417return true;418}419return false;420}421422/**423* Entry point from Http2Connection reader thread.424*425* Data frames will be removed by response body thread.426*/427void incoming(Http2Frame frame) throws IOException {428if (debug.on()) debug.log("incoming: %s", frame);429var cancelled = checkRequestCancelled() || closed;430if ((frame instanceof HeaderFrame)) {431HeaderFrame hframe = (HeaderFrame) frame;432if (hframe.endHeaders()) {433Log.logTrace("handling response (streamid={0})", streamid);434handleResponse();435}436if (hframe.getFlag(HeaderFrame.END_STREAM)) {437if (debug.on()) debug.log("handling END_STREAM: %d", streamid);438receiveDataFrame(new DataFrame(streamid, DataFrame.END_STREAM, List.of()));439}440} else if (frame instanceof DataFrame) {441if (cancelled) connection.dropDataFrame((DataFrame) frame);442else receiveDataFrame((DataFrame) frame);443} else {444if (!cancelled) otherFrame(frame);445}446}447448void otherFrame(Http2Frame frame) throws IOException {449switch (frame.type()) {450case WindowUpdateFrame.TYPE -> incoming_windowUpdate((WindowUpdateFrame) frame);451case ResetFrame.TYPE -> incoming_reset((ResetFrame) frame);452case PriorityFrame.TYPE -> incoming_priority((PriorityFrame) frame);453454default -> throw new IOException("Unexpected frame: " + frame);455}456}457458// The Hpack decoder decodes into one of these consumers of name,value pairs459460DecodingCallback rspHeadersConsumer() {461return rspHeadersConsumer;462}463464protected void handleResponse() throws IOException {465HttpHeaders responseHeaders = responseHeadersBuilder.build();466responseCode = (int)responseHeaders467.firstValueAsLong(":status")468.orElseThrow(() -> new IOException("no statuscode in response"));469470response = new Response(471request, exchange, responseHeaders, connection(),472responseCode, HttpClient.Version.HTTP_2);473474/* TODO: review if needs to be removed475the value is not used, but in case `content-length` doesn't parse as476long, there will be NumberFormatException. If left as is, make sure477code up the stack handles NFE correctly. */478responseHeaders.firstValueAsLong("content-length");479480if (Log.headers()) {481StringBuilder sb = new StringBuilder("RESPONSE HEADERS:\n");482Log.dumpHeaders(sb, " ", responseHeaders);483Log.logHeaders(sb.toString());484}485486// this will clear the response headers487rspHeadersConsumer.reset();488489completeResponse(response);490}491492void incoming_reset(ResetFrame frame) {493Log.logTrace("Received RST_STREAM on stream {0}", streamid);494if (endStreamReceived()) {495Log.logTrace("Ignoring RST_STREAM frame received on remotely closed stream {0}", streamid);496} else if (closed) {497Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);498} else {499Flow.Subscriber<?> subscriber =500responseSubscriber == null ? pendingResponseSubscriber : responseSubscriber;501if (response == null && subscriber == null) {502// we haven't receive the headers yet, and won't receive any!503// handle reset now.504handleReset(frame, subscriber);505} else {506// put it in the input queue in order to read all507// pending data frames first. Indeed, a server may send508// RST_STREAM after sending END_STREAM, in which case we should509// ignore it. However, we won't know if we have received END_STREAM510// or not until all pending data frames are read.511receiveResetFrame(frame);512// RST_STREAM was pushed to the queue. It will be handled by513// asyncReceive after all pending data frames have been514// processed.515Log.logTrace("RST_STREAM pushed in queue for stream {0}", streamid);516}517}518}519520void handleReset(ResetFrame frame, Flow.Subscriber<?> subscriber) {521Log.logTrace("Handling RST_STREAM on stream {0}", streamid);522if (!closed) {523synchronized (this) {524if (closed) {525if (debug.on()) debug.log("Stream already closed: ignoring RESET");526return;527}528closed = true;529}530try {531int error = frame.getErrorCode();532IOException e = new IOException("Received RST_STREAM: "533+ ErrorFrame.stringForCode(error));534if (errorRef.compareAndSet(null, e)) {535if (subscriber != null) {536subscriber.onError(e);537}538}539completeResponseExceptionally(e);540if (!requestBodyCF.isDone()) {541requestBodyCF.completeExceptionally(errorRef.get()); // we may be sending the body..542}543if (responseBodyCF != null) {544responseBodyCF.completeExceptionally(errorRef.get());545}546} finally {547connection.decrementStreamsCount(streamid);548connection.closeStream(streamid);549}550} else {551Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);552}553}554555void incoming_priority(PriorityFrame frame) {556// TODO: implement priority557throw new UnsupportedOperationException("Not implemented");558}559560private void incoming_windowUpdate(WindowUpdateFrame frame)561throws IOException562{563int amount = frame.getUpdate();564if (amount <= 0) {565Log.logTrace("Resetting stream: {0}, Window Update amount: {1}",566streamid, amount);567connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR);568} else {569assert streamid != 0;570boolean success = windowController.increaseStreamWindow(amount, streamid);571if (!success) { // overflow572connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR);573}574}575}576577void incoming_pushPromise(HttpRequestImpl pushRequest,578PushedStream<T> pushStream)579throws IOException580{581if (Log.requests()) {582Log.logRequest("PUSH_PROMISE: " + pushRequest.toString());583}584PushGroup<T> pushGroup = exchange.getPushGroup();585if (pushGroup == null || exchange.multi.requestCancelled()) {586Log.logTrace("Rejecting push promise stream " + streamid);587connection.resetStream(pushStream.streamid, ResetFrame.REFUSED_STREAM);588pushStream.close();589return;590}591592PushGroup.Acceptor<T> acceptor = null;593boolean accepted = false;594try {595acceptor = pushGroup.acceptPushRequest(pushRequest);596accepted = acceptor.accepted();597} catch (Throwable t) {598if (debug.on())599debug.log("PushPromiseHandler::applyPushPromise threw exception %s",600(Object)t);601}602if (!accepted) {603// cancel / reject604IOException ex = new IOException("Stream " + streamid + " cancelled by users handler");605if (Log.trace()) {606Log.logTrace("No body subscriber for {0}: {1}", pushRequest,607ex.getMessage());608}609pushStream.cancelImpl(ex);610return;611}612613assert accepted && acceptor != null;614CompletableFuture<HttpResponse<T>> pushResponseCF = acceptor.cf();615HttpResponse.BodyHandler<T> pushHandler = acceptor.bodyHandler();616assert pushHandler != null;617618pushStream.requestSent();619pushStream.setPushHandler(pushHandler); // TODO: could wrap the handler to throw on acceptPushPromise ?620// setup housekeeping for when the push is received621// TODO: deal with ignoring of CF anti-pattern622CompletableFuture<HttpResponse<T>> cf = pushStream.responseCF();623cf.whenComplete((HttpResponse<T> resp, Throwable t) -> {624t = Utils.getCompletionCause(t);625if (Log.trace()) {626Log.logTrace("Push completed on stream {0} for {1}{2}",627pushStream.streamid, resp,628((t==null) ? "": " with exception " + t));629}630if (t != null) {631pushGroup.pushError(t);632pushResponseCF.completeExceptionally(t);633} else {634pushResponseCF.complete(resp);635}636pushGroup.pushCompleted();637});638639}640641private OutgoingHeaders<Stream<T>> headerFrame(long contentLength) {642HttpHeadersBuilder h = request.getSystemHeadersBuilder();643if (contentLength > 0) {644h.setHeader("content-length", Long.toString(contentLength));645}646HttpHeaders sysh = filterHeaders(h.build());647HttpHeaders userh = filterHeaders(request.getUserHeaders());648// Filter context restricted from userHeaders649userh = HttpHeaders.of(userh.map(), Utils.CONTEXT_RESTRICTED(client()));650651// Don't override Cookie values that have been set by the CookieHandler.652final HttpHeaders uh = userh;653BiPredicate<String, String> overrides =654(k, v) -> COOKIE_HEADER.equalsIgnoreCase(k)655|| uh.firstValue(k).isEmpty();656657// Filter any headers from systemHeaders that are set in userHeaders658// except for "Cookie:" - user cookies will be appended to system659// cookies660sysh = HttpHeaders.of(sysh.map(), overrides);661662OutgoingHeaders<Stream<T>> f = new OutgoingHeaders<>(sysh, userh, this);663if (contentLength == 0) {664f.setFlag(HeadersFrame.END_STREAM);665endStreamSent = true;666}667return f;668}669670private boolean hasProxyAuthorization(HttpHeaders headers) {671return headers.firstValue("proxy-authorization")672.isPresent();673}674675// Determines whether we need to build a new HttpHeader object.676//677// Ideally we should pass the filter to OutgoingHeaders refactor the678// code that creates the HeaderFrame to honor the filter.679// We're not there yet - so depending on the filter we need to680// apply and the content of the header we will try to determine681// whether anything might need to be filtered.682// If nothing needs filtering then we can just use the683// original headers.684private boolean needsFiltering(HttpHeaders headers,685BiPredicate<String, String> filter) {686if (filter == Utils.PROXY_TUNNEL_FILTER || filter == Utils.PROXY_FILTER) {687// we're either connecting or proxying688// slight optimization: we only need to filter out689// disabled schemes, so if there are none just690// pass through.691return Utils.proxyHasDisabledSchemes(filter == Utils.PROXY_TUNNEL_FILTER)692&& hasProxyAuthorization(headers);693} else {694// we're talking to a server, either directly or through695// a tunnel.696// Slight optimization: we only need to filter out697// proxy authorization headers, so if there are none just698// pass through.699return hasProxyAuthorization(headers);700}701}702703private HttpHeaders filterHeaders(HttpHeaders headers) {704HttpConnection conn = connection();705BiPredicate<String, String> filter = conn.headerFilter(request);706if (needsFiltering(headers, filter)) {707return HttpHeaders.of(headers.map(), filter);708}709return headers;710}711712private static HttpHeaders createPseudoHeaders(HttpRequest request) {713HttpHeadersBuilder hdrs = new HttpHeadersBuilder();714String method = request.method();715hdrs.setHeader(":method", method);716URI uri = request.uri();717hdrs.setHeader(":scheme", uri.getScheme());718// TODO: userinfo deprecated. Needs to be removed719hdrs.setHeader(":authority", uri.getAuthority());720// TODO: ensure header names beginning with : not in user headers721String query = uri.getRawQuery();722String path = uri.getRawPath();723if (path == null || path.isEmpty()) {724if (method.equalsIgnoreCase("OPTIONS")) {725path = "*";726} else {727path = "/";728}729}730if (query != null) {731path += "?" + query;732}733hdrs.setHeader(":path", Utils.encode(path));734return hdrs.build();735}736737HttpHeaders getRequestPseudoHeaders() {738return requestPseudoHeaders;739}740741/** Sets endStreamReceived. Should be called only once. */742void setEndStreamReceived() {743if (debug.on()) debug.log("setEndStreamReceived: streamid=%d", streamid);744assert remotelyClosed == false: "Unexpected endStream already set";745remotelyClosed = true;746responseReceived();747}748749/** Tells whether, or not, the END_STREAM Flag has been seen in any frame750* received on this stream. */751private boolean endStreamReceived() {752return remotelyClosed;753}754755@Override756CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {757if (debug.on()) debug.log("sendHeadersOnly()");758if (Log.requests() && request != null) {759Log.logRequest(request.toString());760}761if (requestPublisher != null) {762requestContentLen = requestPublisher.contentLength();763} else {764requestContentLen = 0;765}766767// At this point the stream doesn't have a streamid yet.768// It will be allocated if we send the request headers.769Throwable t = errorRef.get();770if (t != null) {771if (debug.on()) debug.log("stream already cancelled, headers not sent: %s", (Object)t);772return MinimalFuture.failedFuture(t);773}774775// sending the headers will cause the allocation of the stream id776OutgoingHeaders<Stream<T>> f = headerFrame(requestContentLen);777connection.sendFrame(f);778CompletableFuture<ExchangeImpl<T>> cf = new MinimalFuture<>();779cf.complete(this); // #### good enough for now780return cf;781}782783@Override784void released() {785if (streamid > 0) {786if (debug.on()) debug.log("Released stream %d", streamid);787// remove this stream from the Http2Connection map.788connection.decrementStreamsCount(streamid);789connection.closeStream(streamid);790} else {791if (debug.on()) debug.log("Can't release stream %d", streamid);792}793}794795@Override796void completed() {797// There should be nothing to do here: the stream should have798// been already closed (or will be closed shortly after).799}800801boolean registerStream(int id, boolean registerIfCancelled) {802boolean cancelled = closed || exchange.multi.requestCancelled();803if (!cancelled || registerIfCancelled) {804this.streamid = id;805connection.putStream(this, streamid);806if (debug.on()) {807debug.log("Stream %d registered (cancelled: %b, registerIfCancelled: %b)",808streamid, cancelled, registerIfCancelled);809}810}811return !cancelled;812}813814void signalWindowUpdate() {815RequestSubscriber subscriber = requestSubscriber;816assert subscriber != null;817if (debug.on()) debug.log("Signalling window update");818subscriber.sendScheduler.runOrSchedule();819}820821static final ByteBuffer COMPLETED = ByteBuffer.allocate(0);822class RequestSubscriber implements Flow.Subscriber<ByteBuffer> {823// can be < 0 if the actual length is not known.824private final long contentLength;825private volatile long remainingContentLength;826private volatile Subscription subscription;827828// Holds the outgoing data. There will be at most 2 outgoing ByteBuffers.829// 1) The data that was published by the request body Publisher, and830// 2) the COMPLETED sentinel, since onComplete can be invoked without demand.831final ConcurrentLinkedDeque<ByteBuffer> outgoing = new ConcurrentLinkedDeque<>();832833private final AtomicReference<Throwable> errorRef = new AtomicReference<>();834// A scheduler used to honor window updates. Writing must be paused835// when the window is exhausted, and resumed when the window acquires836// some space. The sendScheduler makes it possible to implement this837// behaviour in an asynchronous non-blocking way.838// See RequestSubscriber::trySend below.839final SequentialScheduler sendScheduler;840841RequestSubscriber(long contentLen) {842this.contentLength = contentLen;843this.remainingContentLength = contentLen;844this.sendScheduler =845SequentialScheduler.lockingScheduler(this::trySend);846}847848@Override849public void onSubscribe(Flow.Subscription subscription) {850if (this.subscription != null) {851throw new IllegalStateException("already subscribed");852}853this.subscription = subscription;854if (debug.on())855debug.log("RequestSubscriber: onSubscribe, request 1");856subscription.request(1);857}858859@Override860public void onNext(ByteBuffer item) {861if (debug.on())862debug.log("RequestSubscriber: onNext(%d)", item.remaining());863int size = outgoing.size();864assert size == 0 : "non-zero size: " + size;865onNextImpl(item);866}867868private void onNextImpl(ByteBuffer item) {869// Got some more request body bytes to send.870if (requestBodyCF.isDone()) {871// stream already cancelled, probably in timeout872sendScheduler.stop();873subscription.cancel();874return;875}876outgoing.add(item);877sendScheduler.runOrSchedule();878}879880@Override881public void onError(Throwable throwable) {882if (debug.on())883debug.log(() -> "RequestSubscriber: onError: " + throwable);884// ensure that errors are handled within the flow.885if (errorRef.compareAndSet(null, throwable)) {886sendScheduler.runOrSchedule();887}888}889890@Override891public void onComplete() {892if (debug.on()) debug.log("RequestSubscriber: onComplete");893int size = outgoing.size();894assert size == 0 || size == 1 : "non-zero or one size: " + size;895// last byte of request body has been obtained.896// ensure that everything is completed within the flow.897onNextImpl(COMPLETED);898}899900// Attempts to send the data, if any.901// Handles errors and completion state.902// Pause writing if the send window is exhausted, resume it if the903// send window has some bytes that can be acquired.904void trySend() {905try {906// handle errors raised by onError;907Throwable t = errorRef.get();908if (t != null) {909sendScheduler.stop();910if (requestBodyCF.isDone()) return;911subscription.cancel();912requestBodyCF.completeExceptionally(t);913cancelImpl(t);914return;915}916int state = streamState;917918do {919// handle COMPLETED;920ByteBuffer item = outgoing.peekFirst();921if (item == null) return;922else if (item == COMPLETED) {923sendScheduler.stop();924complete();925return;926}927928// handle bytes to send downstream929while (item.hasRemaining() && state == 0) {930if (debug.on()) debug.log("trySend: %d", item.remaining());931DataFrame df = getDataFrame(item);932if (df == null) {933if (debug.on())934debug.log("trySend: can't send yet: %d", item.remaining());935return; // the send window is exhausted: come back later936}937938if (contentLength > 0) {939remainingContentLength -= df.getDataLength();940if (remainingContentLength < 0) {941String msg = connection().getConnectionFlow()942+ " stream=" + streamid + " "943+ "[" + Thread.currentThread().getName() + "] "944+ "Too many bytes in request body. Expected: "945+ contentLength + ", got: "946+ (contentLength - remainingContentLength);947assert streamid > 0;948connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR);949throw new IOException(msg);950} else if (remainingContentLength == 0) {951assert !endStreamSent : "internal error, send data after END_STREAM flag";952df.setFlag(DataFrame.END_STREAM);953endStreamSent = true;954}955} else {956assert !endStreamSent : "internal error, send data after END_STREAM flag";957}958if ((state = streamState) != 0) {959if (debug.on()) debug.log("trySend: cancelled: %s", String.valueOf(t));960break;961}962if (debug.on())963debug.log("trySend: sending: %d", df.getDataLength());964sendDataFrame(df);965}966if (state != 0) break;967assert !item.hasRemaining();968ByteBuffer b = outgoing.removeFirst();969assert b == item;970} while (outgoing.peekFirst() != null);971972if (state != 0) {973t = errorRef.get();974if (t == null) t = new IOException(ResetFrame.stringForCode(streamState));975throw t;976}977978if (debug.on()) debug.log("trySend: request 1");979subscription.request(1);980} catch (Throwable ex) {981if (debug.on()) debug.log("trySend: ", ex);982sendScheduler.stop();983subscription.cancel();984requestBodyCF.completeExceptionally(ex);985// need to cancel the stream to 1. tell the server986// we don't want to receive any more data and987// 2. ensure that the operation ref count will be988// decremented on the HttpClient.989cancelImpl(ex);990}991}992993private void complete() throws IOException {994long remaining = remainingContentLength;995long written = contentLength - remaining;996if (remaining > 0) {997connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR);998// let trySend() handle the exception999throw new IOException(connection().getConnectionFlow()1000+ " stream=" + streamid + " "1001+ "[" + Thread.currentThread().getName() +"] "1002+ "Too few bytes returned by the publisher ("1003+ written + "/"1004+ contentLength + ")");1005}1006if (!endStreamSent) {1007endStreamSent = true;1008connection.sendDataFrame(getEmptyEndStreamDataFrame());1009}1010requestBodyCF.complete(null);1011}1012}10131014/**1015* Send a RESET frame to tell server to stop sending data on this stream1016*/1017@Override1018public CompletableFuture<Void> ignoreBody() {1019try {1020connection.resetStream(streamid, ResetFrame.STREAM_CLOSED);1021return MinimalFuture.completedFuture(null);1022} catch (Throwable e) {1023Log.logTrace("Error resetting stream {0}", e.toString());1024return MinimalFuture.failedFuture(e);1025}1026}10271028DataFrame getDataFrame(ByteBuffer buffer) {1029int requestAmount = Math.min(connection.getMaxSendFrameSize(), buffer.remaining());1030// blocks waiting for stream send window, if exhausted1031int actualAmount = windowController.tryAcquire(requestAmount, streamid, this);1032if (actualAmount <= 0) return null;1033ByteBuffer outBuf = Utils.sliceWithLimitedCapacity(buffer, actualAmount);1034DataFrame df = new DataFrame(streamid, 0 , outBuf);1035return df;1036}10371038private DataFrame getEmptyEndStreamDataFrame() {1039return new DataFrame(streamid, DataFrame.END_STREAM, List.of());1040}10411042/**1043* A List of responses relating to this stream. Normally there is only1044* one response, but intermediate responses like 100 are allowed1045* and must be passed up to higher level before continuing. Deals with races1046* such as if responses are returned before the CFs get created by1047* getResponseAsync()1048*/10491050final List<CompletableFuture<Response>> response_cfs = new ArrayList<>(5);10511052@Override1053CompletableFuture<Response> getResponseAsync(Executor executor) {1054CompletableFuture<Response> cf;1055// The code below deals with race condition that can be caused when1056// completeResponse() is being called before getResponseAsync()1057synchronized (response_cfs) {1058if (!response_cfs.isEmpty()) {1059// This CompletableFuture was created by completeResponse().1060// it will be already completed.1061cf = response_cfs.remove(0);1062// if we find a cf here it should be already completed.1063// finding a non completed cf should not happen. just assert it.1064assert cf.isDone() : "Removing uncompleted response: could cause code to hang!";1065} else {1066// getResponseAsync() is called first. Create a CompletableFuture1067// that will be completed by completeResponse() when1068// completeResponse() is called.1069cf = new MinimalFuture<>();1070response_cfs.add(cf);1071}1072}1073if (executor != null && !cf.isDone()) {1074// protect from executing later chain of CompletableFuture operations from SelectorManager thread1075cf = cf.thenApplyAsync(r -> r, executor);1076}1077Log.logTrace("Response future (stream={0}) is: {1}", streamid, cf);1078PushGroup<?> pg = exchange.getPushGroup();1079if (pg != null) {1080// if an error occurs make sure it is recorded in the PushGroup1081cf = cf.whenComplete((t,e) -> pg.pushError(Utils.getCompletionCause(e)));1082}1083return cf;1084}10851086/**1087* Completes the first uncompleted CF on list, and removes it. If there is no1088* uncompleted CF then creates one (completes it) and adds to list1089*/1090void completeResponse(Response resp) {1091synchronized (response_cfs) {1092CompletableFuture<Response> cf;1093int cfs_len = response_cfs.size();1094for (int i=0; i<cfs_len; i++) {1095cf = response_cfs.get(i);1096if (!cf.isDone()) {1097Log.logTrace("Completing response (streamid={0}): {1}",1098streamid, cf);1099if (debug.on())1100debug.log("Completing responseCF(%d) with response headers", i);1101response_cfs.remove(cf);1102cf.complete(resp);1103return;1104} // else we found the previous response: just leave it alone.1105}1106cf = MinimalFuture.completedFuture(resp);1107Log.logTrace("Created completed future (streamid={0}): {1}",1108streamid, cf);1109if (debug.on())1110debug.log("Adding completed responseCF(0) with response headers");1111response_cfs.add(cf);1112}1113}11141115// methods to update state and remove stream when finished11161117synchronized void requestSent() {1118requestSent = true;1119if (responseReceived) {1120if (debug.on()) debug.log("requestSent: streamid=%d", streamid);1121close();1122} else {1123if (debug.on()) {1124debug.log("requestSent: streamid=%d but response not received", streamid);1125}1126}1127}11281129synchronized void responseReceived() {1130responseReceived = true;1131if (requestSent) {1132if (debug.on()) debug.log("responseReceived: streamid=%d", streamid);1133close();1134} else {1135if (debug.on()) {1136debug.log("responseReceived: streamid=%d but request not sent", streamid);1137}1138}1139}11401141/**1142* same as above but for errors1143*/1144void completeResponseExceptionally(Throwable t) {1145synchronized (response_cfs) {1146// use index to avoid ConcurrentModificationException1147// caused by removing the CF from within the loop.1148for (int i = 0; i < response_cfs.size(); i++) {1149CompletableFuture<Response> cf = response_cfs.get(i);1150if (!cf.isDone()) {1151response_cfs.remove(i);1152cf.completeExceptionally(t);1153return;1154}1155}1156response_cfs.add(MinimalFuture.failedFuture(t));1157}1158}11591160CompletableFuture<Void> sendBodyImpl() {1161requestBodyCF.whenComplete((v, t) -> requestSent());1162try {1163if (requestPublisher != null) {1164final RequestSubscriber subscriber = new RequestSubscriber(requestContentLen);1165requestPublisher.subscribe(requestSubscriber = subscriber);1166} else {1167// there is no request body, therefore the request is complete,1168// END_STREAM has already sent with outgoing headers1169requestBodyCF.complete(null);1170}1171} catch (Throwable t) {1172cancelImpl(t);1173requestBodyCF.completeExceptionally(t);1174}1175return requestBodyCF;1176}11771178@Override1179void cancel() {1180if ((streamid == 0)) {1181cancel(new IOException("Stream cancelled before streamid assigned"));1182} else {1183cancel(new IOException("Stream " + streamid + " cancelled"));1184}1185}11861187void onSubscriptionError(Throwable t) {1188errorRef.compareAndSet(null, t);1189if (debug.on()) debug.log("Got subscription error: %s", (Object)t);1190// This is the special case where the subscriber1191// has requested an illegal number of items.1192// In this case, the error doesn't come from1193// upstream, but from downstream, and we need to1194// handle the error without waiting for the inputQ1195// to be exhausted.1196stopRequested = true;1197sched.runOrSchedule();1198}11991200@Override1201void cancel(IOException cause) {1202cancelImpl(cause);1203}12041205void connectionClosing(Throwable cause) {1206Flow.Subscriber<?> subscriber =1207responseSubscriber == null ? pendingResponseSubscriber : responseSubscriber;1208errorRef.compareAndSet(null, cause);1209if (subscriber != null && !sched.isStopped() && !inputQ.isEmpty()) {1210sched.runOrSchedule();1211} else cancelImpl(cause);1212}12131214// This method sends a RST_STREAM frame1215void cancelImpl(Throwable e) {1216errorRef.compareAndSet(null, e);1217if (debug.on()) {1218if (streamid == 0) debug.log("cancelling stream: %s", (Object)e);1219else debug.log("cancelling stream %d: %s", streamid, e);1220}1221if (Log.trace()) {1222if (streamid == 0) Log.logTrace("cancelling stream: {0}\n", e);1223else Log.logTrace("cancelling stream {0}: {1}\n", streamid, e);1224}1225boolean closing;1226if (closing = !closed) { // assigning closing to !closed1227synchronized (this) {1228if (closing = !closed) { // assigning closing to !closed1229closed=true;1230}1231}1232}1233if (closing) { // true if the stream has not been closed yet1234if (responseSubscriber != null || pendingResponseSubscriber != null)1235sched.runOrSchedule();1236}1237completeResponseExceptionally(e);1238if (!requestBodyCF.isDone()) {1239requestBodyCF.completeExceptionally(errorRef.get()); // we may be sending the body..1240}1241if (responseBodyCF != null) {1242responseBodyCF.completeExceptionally(errorRef.get());1243}1244try {1245// will send a RST_STREAM frame1246if (streamid != 0 && streamState == 0) {1247e = Utils.getCompletionCause(e);1248if (e instanceof EOFException) {1249// read EOF: no need to try & send reset1250connection.decrementStreamsCount(streamid);1251connection.closeStream(streamid);1252} else {1253// no use to send CANCEL if already closed.1254sendCancelStreamFrame();1255}1256}1257} catch (Throwable ex) {1258Log.logError(ex);1259}1260}12611262void sendCancelStreamFrame() {1263// do not reset a stream until it has a streamid.1264if (streamid > 0 && markStream(ResetFrame.CANCEL) == 0) {1265connection.resetStream(streamid, ResetFrame.CANCEL);1266}1267close();1268}12691270// This method doesn't send any frame1271void close() {1272if (closed) return;1273synchronized(this) {1274if (closed) return;1275closed = true;1276}1277if (debug.on()) debug.log("close stream %d", streamid);1278Log.logTrace("Closing stream {0}", streamid);1279connection.closeStream(streamid);1280Log.logTrace("Stream {0} closed", streamid);1281}12821283static class PushedStream<T> extends Stream<T> {1284final PushGroup<T> pushGroup;1285// push streams need the response CF allocated up front as it is1286// given directly to user via the multi handler callback function.1287final CompletableFuture<Response> pushCF;1288CompletableFuture<HttpResponse<T>> responseCF;1289final HttpRequestImpl pushReq;1290HttpResponse.BodyHandler<T> pushHandler;12911292PushedStream(PushGroup<T> pushGroup,1293Http2Connection connection,1294Exchange<T> pushReq) {1295// ## no request body possible, null window controller1296super(connection, pushReq, null);1297this.pushGroup = pushGroup;1298this.pushReq = pushReq.request();1299this.pushCF = new MinimalFuture<>();1300this.responseCF = new MinimalFuture<>();13011302}13031304CompletableFuture<HttpResponse<T>> responseCF() {1305return responseCF;1306}13071308synchronized void setPushHandler(HttpResponse.BodyHandler<T> pushHandler) {1309this.pushHandler = pushHandler;1310}13111312synchronized HttpResponse.BodyHandler<T> getPushHandler() {1313// ignored parameters to function can be used as BodyHandler1314return this.pushHandler;1315}13161317// Following methods call the super class but in case of1318// error record it in the PushGroup. The error method is called1319// with a null value when no error occurred (is a no-op)1320@Override1321CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {1322return super.sendBodyAsync()1323.whenComplete((ExchangeImpl<T> v, Throwable t)1324-> pushGroup.pushError(Utils.getCompletionCause(t)));1325}13261327@Override1328CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {1329return super.sendHeadersAsync()1330.whenComplete((ExchangeImpl<T> ex, Throwable t)1331-> pushGroup.pushError(Utils.getCompletionCause(t)));1332}13331334@Override1335CompletableFuture<Response> getResponseAsync(Executor executor) {1336CompletableFuture<Response> cf = pushCF.whenComplete(1337(v, t) -> pushGroup.pushError(Utils.getCompletionCause(t)));1338if(executor!=null && !cf.isDone()) {1339cf = cf.thenApplyAsync( r -> r, executor);1340}1341return cf;1342}13431344@Override1345CompletableFuture<T> readBodyAsync(1346HttpResponse.BodyHandler<T> handler,1347boolean returnConnectionToPool,1348Executor executor)1349{1350return super.readBodyAsync(handler, returnConnectionToPool, executor)1351.whenComplete((v, t) -> pushGroup.pushError(t));1352}13531354@Override1355void completeResponse(Response r) {1356Log.logResponse(r::toString);1357pushCF.complete(r); // not strictly required for push API1358// start reading the body using the obtained BodySubscriber1359CompletableFuture<Void> start = new MinimalFuture<>();1360start.thenCompose( v -> readBodyAsync(getPushHandler(), false, getExchange().executor()))1361.whenComplete((T body, Throwable t) -> {1362if (t != null) {1363responseCF.completeExceptionally(t);1364} else {1365HttpResponseImpl<T> resp =1366new HttpResponseImpl<>(r.request, r, null, body, getExchange());1367responseCF.complete(resp);1368}1369});1370start.completeAsync(() -> null, getExchange().executor());1371}13721373@Override1374void completeResponseExceptionally(Throwable t) {1375pushCF.completeExceptionally(t);1376}13771378// @Override1379// synchronized void responseReceived() {1380// super.responseReceived();1381// }13821383// create and return the PushResponseImpl1384@Override1385protected void handleResponse() {1386HttpHeaders responseHeaders = responseHeadersBuilder.build();1387responseCode = (int)responseHeaders1388.firstValueAsLong(":status")1389.orElse(-1);13901391if (responseCode == -1) {1392completeResponseExceptionally(new IOException("No status code"));1393}13941395this.response = new Response(1396pushReq, exchange, responseHeaders, connection(),1397responseCode, HttpClient.Version.HTTP_2);13981399/* TODO: review if needs to be removed1400the value is not used, but in case `content-length` doesn't parse1401as long, there will be NumberFormatException. If left as is, make1402sure code up the stack handles NFE correctly. */1403responseHeaders.firstValueAsLong("content-length");14041405if (Log.headers()) {1406StringBuilder sb = new StringBuilder("RESPONSE HEADERS");1407sb.append(" (streamid=").append(streamid).append("):\n");1408Log.dumpHeaders(sb, " ", responseHeaders);1409Log.logHeaders(sb.toString());1410}14111412rspHeadersConsumer.reset();14131414// different implementations for normal streams and pushed streams1415completeResponse(response);1416}1417}14181419final class StreamWindowUpdateSender extends WindowUpdateSender {14201421StreamWindowUpdateSender(Http2Connection connection) {1422super(connection);1423}14241425@Override1426int getStreamId() {1427return streamid;1428}14291430@Override1431String dbgString() {1432String dbg = dbgString;1433if (dbg != null) return dbg;1434if (streamid == 0) {1435return connection.dbgString() + ":WindowUpdateSender(stream: ?)";1436} else {1437dbg = connection.dbgString() + ":WindowUpdateSender(stream: " + streamid + ")";1438return dbgString = dbg;1439}1440}1441}14421443/**1444* Returns true if this exchange was canceled.1445* @return true if this exchange was canceled.1446*/1447synchronized boolean isCanceled() {1448return errorRef.get() != null;1449}14501451/**1452* Returns the cause for which this exchange was canceled, if available.1453* @return the cause for which this exchange was canceled, if available.1454*/1455synchronized Throwable getCancelCause() {1456return errorRef.get();1457}14581459final String dbgString() {1460return connection.dbgString() + "/Stream("+streamid+")";1461}14621463private class HeadersConsumer extends Http2Connection.ValidatingHeadersConsumer {14641465void reset() {1466super.reset();1467responseHeadersBuilder.clear();1468debug.log("Response builder cleared, ready to receive new headers.");1469}14701471@Override1472public void onDecoded(CharSequence name, CharSequence value)1473throws UncheckedIOException1474{1475String n = name.toString();1476String v = value.toString();1477super.onDecoded(n, v);1478responseHeadersBuilder.addHeader(n, v);1479if (Log.headers() && Log.trace()) {1480Log.logTrace("RECEIVED HEADER (streamid={0}): {1}: {2}",1481streamid, n, v);1482}1483}1484}14851486private static final VarHandle STREAM_STATE;1487private static final VarHandle DEREGISTERED;1488static {1489try {1490STREAM_STATE = MethodHandles.lookup()1491.findVarHandle(Stream.class, "streamState", int.class);1492DEREGISTERED = MethodHandles.lookup()1493.findVarHandle(Stream.class, "deRegistered", boolean.class);1494} catch (Exception x) {1495throw new ExceptionInInitializerError(x);1496}1497}1498}149915001501