Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
PojavLauncherTeam
GitHub Repository: PojavLauncherTeam/jdk17u
Path: blob/master/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java
67707 views
1
/*
2
* Copyright (c) 2015, 2021, Oracle and/or its affiliates. All rights reserved.
3
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4
*
5
* This code is free software; you can redistribute it and/or modify it
6
* under the terms of the GNU General Public License version 2 only, as
7
* published by the Free Software Foundation. Oracle designates this
8
* particular file as subject to the "Classpath" exception as provided
9
* by Oracle in the LICENSE file that accompanied this code.
10
*
11
* This code is distributed in the hope that it will be useful, but WITHOUT
12
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14
* version 2 for more details (a copy is included in the LICENSE file that
15
* accompanied this code).
16
*
17
* You should have received a copy of the GNU General Public License version
18
* 2 along with this work; if not, write to the Free Software Foundation,
19
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20
*
21
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22
* or visit www.oracle.com if you need additional information or have any
23
* questions.
24
*/
25
26
package jdk.internal.net.http;
27
28
import java.io.EOFException;
29
import java.io.IOException;
30
import java.io.UncheckedIOException;
31
import java.lang.invoke.MethodHandles;
32
import java.lang.invoke.VarHandle;
33
import java.net.URI;
34
import java.nio.ByteBuffer;
35
import java.util.ArrayList;
36
import java.util.Collections;
37
import java.util.List;
38
import java.util.concurrent.CompletableFuture;
39
import java.util.concurrent.ConcurrentLinkedDeque;
40
import java.util.concurrent.ConcurrentLinkedQueue;
41
import java.util.concurrent.Executor;
42
import java.util.concurrent.Flow;
43
import java.util.concurrent.Flow.Subscription;
44
import java.util.concurrent.atomic.AtomicReference;
45
import java.util.function.BiPredicate;
46
import java.net.http.HttpClient;
47
import java.net.http.HttpHeaders;
48
import java.net.http.HttpRequest;
49
import java.net.http.HttpResponse;
50
import java.net.http.HttpResponse.BodySubscriber;
51
import jdk.internal.net.http.common.*;
52
import jdk.internal.net.http.frame.*;
53
import jdk.internal.net.http.hpack.DecodingCallback;
54
55
/**
56
* Http/2 Stream handling.
57
*
58
* REQUESTS
59
*
60
* sendHeadersOnly() -- assembles HEADERS frame and puts on connection outbound Q
61
*
62
* sendRequest() -- sendHeadersOnly() + sendBody()
63
*
64
* sendBodyAsync() -- calls sendBody() in an executor thread.
65
*
66
* sendHeadersAsync() -- calls sendHeadersOnly() which does not block
67
*
68
* sendRequestAsync() -- calls sendRequest() in an executor thread
69
*
70
* RESPONSES
71
*
72
* Multiple responses can be received per request. Responses are queued up on
73
* a LinkedList of CF<HttpResponse> and the first one on the list is completed
74
* with the next response
75
*
76
* getResponseAsync() -- queries list of response CFs and returns first one
77
* if one exists. Otherwise, creates one and adds it to list
78
* and returns it. Completion is achieved through the
79
* incoming() upcall from connection reader thread.
80
*
81
* getResponse() -- calls getResponseAsync() and waits for CF to complete
82
*
83
* responseBodyAsync() -- calls responseBody() in an executor thread.
84
*
85
* incoming() -- entry point called from connection reader thread. Frames are
86
* either handled immediately without blocking or for data frames
87
* placed on the stream's inputQ which is consumed by the stream's
88
* reader thread.
89
*
90
* PushedStream sub class
91
* ======================
92
* Sending side methods are not used because the request comes from a PUSH_PROMISE
93
* frame sent by the server. When a PUSH_PROMISE is received the PushedStream
94
* is created. PushedStream does not use responseCF list as there can be only
95
* one response. The CF is created when the object is created and when the response
96
* HEADERS frame is received the object is completed.
97
*/
98
class Stream<T> extends ExchangeImpl<T> {
99
100
private static final String COOKIE_HEADER = "Cookie";
101
final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
102
103
final ConcurrentLinkedQueue<Http2Frame> inputQ = new ConcurrentLinkedQueue<>();
104
final SequentialScheduler sched =
105
SequentialScheduler.lockingScheduler(this::schedule);
106
final SubscriptionBase userSubscription =
107
new SubscriptionBase(sched, this::cancel, this::onSubscriptionError);
108
109
/**
110
* This stream's identifier. Assigned lazily by the HTTP2Connection before
111
* the stream's first frame is sent.
112
*/
113
protected volatile int streamid;
114
115
long requestContentLen;
116
117
final Http2Connection connection;
118
final HttpRequestImpl request;
119
final HeadersConsumer rspHeadersConsumer;
120
final HttpHeadersBuilder responseHeadersBuilder;
121
final HttpHeaders requestPseudoHeaders;
122
volatile HttpResponse.BodySubscriber<T> responseSubscriber;
123
final HttpRequest.BodyPublisher requestPublisher;
124
volatile RequestSubscriber requestSubscriber;
125
volatile int responseCode;
126
volatile Response response;
127
// The exception with which this stream was canceled.
128
private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
129
final CompletableFuture<Void> requestBodyCF = new MinimalFuture<>();
130
volatile CompletableFuture<T> responseBodyCF;
131
volatile HttpResponse.BodySubscriber<T> pendingResponseSubscriber;
132
volatile boolean stopRequested;
133
134
/** True if END_STREAM has been seen in a frame received on this stream. */
135
private volatile boolean remotelyClosed;
136
private volatile boolean closed;
137
private volatile boolean endStreamSent;
138
// Indicates the first reason that was invoked when sending a ResetFrame
139
// to the server. A streamState of 0 indicates that no reset was sent.
140
// (see markStream(int code))
141
private volatile int streamState; // assigned using STREAM_STATE varhandle.
142
private volatile boolean deRegistered; // assigned using DEREGISTERED varhandle.
143
144
// state flags
145
private boolean requestSent, responseReceived;
146
147
// send lock: prevent sending DataFrames after reset occurred.
148
private final Object sendLock = new Object();
149
150
/**
151
* A reference to this Stream's connection Send Window controller. The
152
* stream MUST acquire the appropriate amount of Send Window before
153
* sending any data. Will be null for PushStreams, as they cannot send data.
154
*/
155
private final WindowController windowController;
156
private final WindowUpdateSender windowUpdater;
157
158
@Override
159
HttpConnection connection() {
160
return connection.connection;
161
}
162
163
/**
164
* Invoked either from incoming() -> {receiveDataFrame() or receiveResetFrame() }
165
* of after user subscription window has re-opened, from SubscriptionBase.request()
166
*/
167
private void schedule() {
168
boolean onCompleteCalled = false;
169
HttpResponse.BodySubscriber<T> subscriber = responseSubscriber;
170
try {
171
if (subscriber == null) {
172
subscriber = responseSubscriber = pendingResponseSubscriber;
173
if (subscriber == null) {
174
// can't process anything yet
175
return;
176
} else {
177
if (debug.on()) debug.log("subscribing user subscriber");
178
subscriber.onSubscribe(userSubscription);
179
}
180
}
181
while (!inputQ.isEmpty()) {
182
Http2Frame frame = inputQ.peek();
183
if (frame instanceof ResetFrame) {
184
inputQ.remove();
185
handleReset((ResetFrame)frame, subscriber);
186
return;
187
}
188
DataFrame df = (DataFrame)frame;
189
boolean finished = df.getFlag(DataFrame.END_STREAM);
190
191
List<ByteBuffer> buffers = df.getData();
192
List<ByteBuffer> dsts = Collections.unmodifiableList(buffers);
193
int size = Utils.remaining(dsts, Integer.MAX_VALUE);
194
if (size == 0 && finished) {
195
inputQ.remove();
196
connection.ensureWindowUpdated(df); // must update connection window
197
Log.logTrace("responseSubscriber.onComplete");
198
if (debug.on()) debug.log("incoming: onComplete");
199
sched.stop();
200
connection.decrementStreamsCount(streamid);
201
subscriber.onComplete();
202
onCompleteCalled = true;
203
setEndStreamReceived();
204
return;
205
} else if (userSubscription.tryDecrement()) {
206
inputQ.remove();
207
Log.logTrace("responseSubscriber.onNext {0}", size);
208
if (debug.on()) debug.log("incoming: onNext(%d)", size);
209
try {
210
subscriber.onNext(dsts);
211
} catch (Throwable t) {
212
connection.dropDataFrame(df); // must update connection window
213
throw t;
214
}
215
if (consumed(df)) {
216
Log.logTrace("responseSubscriber.onComplete");
217
if (debug.on()) debug.log("incoming: onComplete");
218
sched.stop();
219
connection.decrementStreamsCount(streamid);
220
subscriber.onComplete();
221
onCompleteCalled = true;
222
setEndStreamReceived();
223
return;
224
}
225
} else {
226
if (stopRequested) break;
227
return;
228
}
229
}
230
} catch (Throwable throwable) {
231
errorRef.compareAndSet(null, throwable);
232
} finally {
233
if (sched.isStopped()) drainInputQueue();
234
}
235
236
Throwable t = errorRef.get();
237
if (t != null) {
238
sched.stop();
239
try {
240
if (!onCompleteCalled) {
241
if (debug.on())
242
debug.log("calling subscriber.onError: %s", (Object) t);
243
subscriber.onError(t);
244
} else {
245
if (debug.on())
246
debug.log("already completed: dropping error %s", (Object) t);
247
}
248
} catch (Throwable x) {
249
Log.logError("Subscriber::onError threw exception: {0}", t);
250
} finally {
251
cancelImpl(t);
252
drainInputQueue();
253
}
254
}
255
}
256
257
// must only be called from the scheduler schedule() loop.
258
// ensure that all received data frames are accounted for
259
// in the connection window flow control if the scheduler
260
// is stopped before all the data is consumed.
261
private void drainInputQueue() {
262
Http2Frame frame;
263
while ((frame = inputQ.poll()) != null) {
264
if (frame instanceof DataFrame) {
265
connection.dropDataFrame((DataFrame)frame);
266
}
267
}
268
}
269
270
@Override
271
void nullBody(HttpResponse<T> resp, Throwable t) {
272
if (debug.on()) debug.log("nullBody: streamid=%d", streamid);
273
// We should have an END_STREAM data frame waiting in the inputQ.
274
// We need a subscriber to force the scheduler to process it.
275
pendingResponseSubscriber = HttpResponse.BodySubscribers.replacing(null);
276
sched.runOrSchedule();
277
}
278
279
// Callback invoked after the Response BodySubscriber has consumed the
280
// buffers contained in a DataFrame.
281
// Returns true if END_STREAM is reached, false otherwise.
282
private boolean consumed(DataFrame df) {
283
// RFC 7540 6.1:
284
// The entire DATA frame payload is included in flow control,
285
// including the Pad Length and Padding fields if present
286
int len = df.payloadLength();
287
boolean endStream = df.getFlag(DataFrame.END_STREAM);
288
if (len == 0) return endStream;
289
290
connection.windowUpdater.update(len);
291
292
if (!endStream) {
293
// Don't send window update on a stream which is
294
// closed or half closed.
295
windowUpdater.update(len);
296
}
297
298
// true: end of stream; false: more data coming
299
return endStream;
300
}
301
302
boolean deRegister() {
303
return DEREGISTERED.compareAndSet(this, false, true);
304
}
305
306
@Override
307
CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler,
308
boolean returnConnectionToPool,
309
Executor executor)
310
{
311
try {
312
Log.logTrace("Reading body on stream {0}", streamid);
313
debug.log("Getting BodySubscriber for: " + response);
314
BodySubscriber<T> bodySubscriber = handler.apply(new ResponseInfoImpl(response));
315
CompletableFuture<T> cf = receiveData(bodySubscriber, executor);
316
317
PushGroup<?> pg = exchange.getPushGroup();
318
if (pg != null) {
319
// if an error occurs make sure it is recorded in the PushGroup
320
cf = cf.whenComplete((t, e) -> pg.pushError(e));
321
}
322
return cf;
323
} catch (Throwable t) {
324
// may be thrown by handler.apply
325
cancelImpl(t);
326
return MinimalFuture.failedFuture(t);
327
}
328
}
329
330
@Override
331
public String toString() {
332
return "streamid: " + streamid;
333
}
334
335
private void receiveDataFrame(DataFrame df) {
336
inputQ.add(df);
337
sched.runOrSchedule();
338
}
339
340
/** Handles a RESET frame. RESET is always handled inline in the queue. */
341
private void receiveResetFrame(ResetFrame frame) {
342
inputQ.add(frame);
343
sched.runOrSchedule();
344
}
345
346
/**
347
* Records the first reason which was invoked when sending a ResetFrame
348
* to the server in the streamState, and return the previous value
349
* of the streamState. This is an atomic operation.
350
* A possible use of this method would be to send a ResetFrame only
351
* if no previous reset frame has been sent.
352
* For instance: <pre>{@code
353
* if (markStream(ResetFrame.CANCEL) == 0) {
354
* connection.sendResetFrame(streamId, ResetFrame.CANCEL);
355
* }
356
* }</pre>
357
* @param code the reason code as per HTTP/2 protocol
358
* @return the previous value of the stream state.
359
*/
360
int markStream(int code) {
361
if (code == 0) return streamState;
362
synchronized (sendLock) {
363
return (int) STREAM_STATE.compareAndExchange(this, 0, code);
364
}
365
}
366
367
private void sendDataFrame(DataFrame frame) {
368
synchronized (sendLock) {
369
// must not send DataFrame after reset.
370
if (streamState == 0) {
371
connection.sendDataFrame(frame);
372
}
373
}
374
}
375
376
// pushes entire response body into response subscriber
377
// blocking when required by local or remote flow control
378
CompletableFuture<T> receiveData(BodySubscriber<T> bodySubscriber, Executor executor) {
379
// We want to allow the subscriber's getBody() method to block so it
380
// can work with InputStreams. So, we offload execution.
381
responseBodyCF = ResponseSubscribers.getBodyAsync(executor, bodySubscriber,
382
new MinimalFuture<>(), this::cancelImpl);
383
384
if (isCanceled()) {
385
Throwable t = getCancelCause();
386
responseBodyCF.completeExceptionally(t);
387
} else {
388
pendingResponseSubscriber = bodySubscriber;
389
sched.runOrSchedule(); // in case data waiting already to be processed
390
}
391
return responseBodyCF;
392
}
393
394
@Override
395
CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
396
return sendBodyImpl().thenApply( v -> this);
397
}
398
399
Stream(Http2Connection connection,
400
Exchange<T> e,
401
WindowController windowController)
402
{
403
super(e);
404
this.connection = connection;
405
this.windowController = windowController;
406
this.request = e.request();
407
this.requestPublisher = request.requestPublisher; // may be null
408
this.responseHeadersBuilder = new HttpHeadersBuilder();
409
this.rspHeadersConsumer = new HeadersConsumer();
410
this.requestPseudoHeaders = createPseudoHeaders(request);
411
this.windowUpdater = new StreamWindowUpdateSender(connection);
412
}
413
414
private boolean checkRequestCancelled() {
415
if (exchange.multi.requestCancelled()) {
416
if (errorRef.get() == null) cancel();
417
else sendCancelStreamFrame();
418
return true;
419
}
420
return false;
421
}
422
423
/**
424
* Entry point from Http2Connection reader thread.
425
*
426
* Data frames will be removed by response body thread.
427
*/
428
void incoming(Http2Frame frame) throws IOException {
429
if (debug.on()) debug.log("incoming: %s", frame);
430
var cancelled = checkRequestCancelled() || closed;
431
if ((frame instanceof HeaderFrame)) {
432
HeaderFrame hframe = (HeaderFrame) frame;
433
if (hframe.endHeaders()) {
434
Log.logTrace("handling response (streamid={0})", streamid);
435
handleResponse();
436
}
437
if (hframe.getFlag(HeaderFrame.END_STREAM)) {
438
if (debug.on()) debug.log("handling END_STREAM: %d", streamid);
439
receiveDataFrame(new DataFrame(streamid, DataFrame.END_STREAM, List.of()));
440
}
441
} else if (frame instanceof DataFrame) {
442
if (cancelled) connection.dropDataFrame((DataFrame) frame);
443
else receiveDataFrame((DataFrame) frame);
444
} else {
445
if (!cancelled) otherFrame(frame);
446
}
447
}
448
449
void otherFrame(Http2Frame frame) throws IOException {
450
switch (frame.type()) {
451
case WindowUpdateFrame.TYPE -> incoming_windowUpdate((WindowUpdateFrame) frame);
452
case ResetFrame.TYPE -> incoming_reset((ResetFrame) frame);
453
case PriorityFrame.TYPE -> incoming_priority((PriorityFrame) frame);
454
455
default -> throw new IOException("Unexpected frame: " + frame);
456
}
457
}
458
459
// The Hpack decoder decodes into one of these consumers of name,value pairs
460
461
DecodingCallback rspHeadersConsumer() {
462
return rspHeadersConsumer;
463
}
464
465
protected void handleResponse() throws IOException {
466
HttpHeaders responseHeaders = responseHeadersBuilder.build();
467
responseCode = (int)responseHeaders
468
.firstValueAsLong(":status")
469
.orElseThrow(() -> new IOException("no statuscode in response"));
470
471
response = new Response(
472
request, exchange, responseHeaders, connection(),
473
responseCode, HttpClient.Version.HTTP_2);
474
475
/* TODO: review if needs to be removed
476
the value is not used, but in case `content-length` doesn't parse as
477
long, there will be NumberFormatException. If left as is, make sure
478
code up the stack handles NFE correctly. */
479
responseHeaders.firstValueAsLong("content-length");
480
481
if (Log.headers()) {
482
StringBuilder sb = new StringBuilder("RESPONSE HEADERS:\n");
483
Log.dumpHeaders(sb, " ", responseHeaders);
484
Log.logHeaders(sb.toString());
485
}
486
487
// this will clear the response headers
488
rspHeadersConsumer.reset();
489
490
completeResponse(response);
491
}
492
493
void incoming_reset(ResetFrame frame) {
494
Log.logTrace("Received RST_STREAM on stream {0}", streamid);
495
if (endStreamReceived()) {
496
Log.logTrace("Ignoring RST_STREAM frame received on remotely closed stream {0}", streamid);
497
} else if (closed) {
498
Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
499
} else {
500
Flow.Subscriber<?> subscriber =
501
responseSubscriber == null ? pendingResponseSubscriber : responseSubscriber;
502
if (response == null && subscriber == null) {
503
// we haven't receive the headers yet, and won't receive any!
504
// handle reset now.
505
handleReset(frame, subscriber);
506
} else {
507
// put it in the input queue in order to read all
508
// pending data frames first. Indeed, a server may send
509
// RST_STREAM after sending END_STREAM, in which case we should
510
// ignore it. However, we won't know if we have received END_STREAM
511
// or not until all pending data frames are read.
512
receiveResetFrame(frame);
513
// RST_STREAM was pushed to the queue. It will be handled by
514
// asyncReceive after all pending data frames have been
515
// processed.
516
Log.logTrace("RST_STREAM pushed in queue for stream {0}", streamid);
517
}
518
}
519
}
520
521
void handleReset(ResetFrame frame, Flow.Subscriber<?> subscriber) {
522
Log.logTrace("Handling RST_STREAM on stream {0}", streamid);
523
if (!closed) {
524
synchronized (this) {
525
if (closed) {
526
if (debug.on()) debug.log("Stream already closed: ignoring RESET");
527
return;
528
}
529
closed = true;
530
}
531
try {
532
int error = frame.getErrorCode();
533
IOException e = new IOException("Received RST_STREAM: "
534
+ ErrorFrame.stringForCode(error));
535
if (errorRef.compareAndSet(null, e)) {
536
if (subscriber != null) {
537
subscriber.onError(e);
538
}
539
}
540
completeResponseExceptionally(e);
541
if (!requestBodyCF.isDone()) {
542
requestBodyCF.completeExceptionally(errorRef.get()); // we may be sending the body..
543
}
544
if (responseBodyCF != null) {
545
responseBodyCF.completeExceptionally(errorRef.get());
546
}
547
} finally {
548
connection.decrementStreamsCount(streamid);
549
connection.closeStream(streamid);
550
}
551
} else {
552
Log.logTrace("Ignoring RST_STREAM frame received on closed stream {0}", streamid);
553
}
554
}
555
556
void incoming_priority(PriorityFrame frame) {
557
// TODO: implement priority
558
throw new UnsupportedOperationException("Not implemented");
559
}
560
561
private void incoming_windowUpdate(WindowUpdateFrame frame)
562
throws IOException
563
{
564
int amount = frame.getUpdate();
565
if (amount <= 0) {
566
Log.logTrace("Resetting stream: {0}, Window Update amount: {1}",
567
streamid, amount);
568
connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR);
569
} else {
570
assert streamid != 0;
571
boolean success = windowController.increaseStreamWindow(amount, streamid);
572
if (!success) { // overflow
573
connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR);
574
}
575
}
576
}
577
578
void incoming_pushPromise(HttpRequestImpl pushRequest,
579
PushedStream<T> pushStream)
580
throws IOException
581
{
582
if (Log.requests()) {
583
Log.logRequest("PUSH_PROMISE: " + pushRequest.toString());
584
}
585
PushGroup<T> pushGroup = exchange.getPushGroup();
586
if (pushGroup == null || exchange.multi.requestCancelled()) {
587
Log.logTrace("Rejecting push promise stream " + streamid);
588
connection.resetStream(pushStream.streamid, ResetFrame.REFUSED_STREAM);
589
pushStream.close();
590
return;
591
}
592
593
PushGroup.Acceptor<T> acceptor = null;
594
boolean accepted = false;
595
try {
596
acceptor = pushGroup.acceptPushRequest(pushRequest);
597
accepted = acceptor.accepted();
598
} catch (Throwable t) {
599
if (debug.on())
600
debug.log("PushPromiseHandler::applyPushPromise threw exception %s",
601
(Object)t);
602
}
603
if (!accepted) {
604
// cancel / reject
605
IOException ex = new IOException("Stream " + streamid + " cancelled by users handler");
606
if (Log.trace()) {
607
Log.logTrace("No body subscriber for {0}: {1}", pushRequest,
608
ex.getMessage());
609
}
610
pushStream.cancelImpl(ex);
611
return;
612
}
613
614
assert accepted && acceptor != null;
615
CompletableFuture<HttpResponse<T>> pushResponseCF = acceptor.cf();
616
HttpResponse.BodyHandler<T> pushHandler = acceptor.bodyHandler();
617
assert pushHandler != null;
618
619
pushStream.requestSent();
620
pushStream.setPushHandler(pushHandler); // TODO: could wrap the handler to throw on acceptPushPromise ?
621
// setup housekeeping for when the push is received
622
// TODO: deal with ignoring of CF anti-pattern
623
CompletableFuture<HttpResponse<T>> cf = pushStream.responseCF();
624
cf.whenComplete((HttpResponse<T> resp, Throwable t) -> {
625
t = Utils.getCompletionCause(t);
626
if (Log.trace()) {
627
Log.logTrace("Push completed on stream {0} for {1}{2}",
628
pushStream.streamid, resp,
629
((t==null) ? "": " with exception " + t));
630
}
631
if (t != null) {
632
pushGroup.pushError(t);
633
pushResponseCF.completeExceptionally(t);
634
} else {
635
pushResponseCF.complete(resp);
636
}
637
pushGroup.pushCompleted();
638
});
639
640
}
641
642
private OutgoingHeaders<Stream<T>> headerFrame(long contentLength) {
643
HttpHeadersBuilder h = request.getSystemHeadersBuilder();
644
if (contentLength > 0) {
645
h.setHeader("content-length", Long.toString(contentLength));
646
}
647
HttpHeaders sysh = filterHeaders(h.build());
648
HttpHeaders userh = filterHeaders(request.getUserHeaders());
649
// Filter context restricted from userHeaders
650
userh = HttpHeaders.of(userh.map(), Utils.CONTEXT_RESTRICTED(client()));
651
652
// Don't override Cookie values that have been set by the CookieHandler.
653
final HttpHeaders uh = userh;
654
BiPredicate<String, String> overrides =
655
(k, v) -> COOKIE_HEADER.equalsIgnoreCase(k)
656
|| uh.firstValue(k).isEmpty();
657
658
// Filter any headers from systemHeaders that are set in userHeaders
659
// except for "Cookie:" - user cookies will be appended to system
660
// cookies
661
sysh = HttpHeaders.of(sysh.map(), overrides);
662
663
OutgoingHeaders<Stream<T>> f = new OutgoingHeaders<>(sysh, userh, this);
664
if (contentLength == 0) {
665
f.setFlag(HeadersFrame.END_STREAM);
666
endStreamSent = true;
667
}
668
return f;
669
}
670
671
private boolean hasProxyAuthorization(HttpHeaders headers) {
672
return headers.firstValue("proxy-authorization")
673
.isPresent();
674
}
675
676
// Determines whether we need to build a new HttpHeader object.
677
//
678
// Ideally we should pass the filter to OutgoingHeaders refactor the
679
// code that creates the HeaderFrame to honor the filter.
680
// We're not there yet - so depending on the filter we need to
681
// apply and the content of the header we will try to determine
682
// whether anything might need to be filtered.
683
// If nothing needs filtering then we can just use the
684
// original headers.
685
private boolean needsFiltering(HttpHeaders headers,
686
BiPredicate<String, String> filter) {
687
if (filter == Utils.PROXY_TUNNEL_FILTER || filter == Utils.PROXY_FILTER) {
688
// we're either connecting or proxying
689
// slight optimization: we only need to filter out
690
// disabled schemes, so if there are none just
691
// pass through.
692
return Utils.proxyHasDisabledSchemes(filter == Utils.PROXY_TUNNEL_FILTER)
693
&& hasProxyAuthorization(headers);
694
} else {
695
// we're talking to a server, either directly or through
696
// a tunnel.
697
// Slight optimization: we only need to filter out
698
// proxy authorization headers, so if there are none just
699
// pass through.
700
return hasProxyAuthorization(headers);
701
}
702
}
703
704
private HttpHeaders filterHeaders(HttpHeaders headers) {
705
HttpConnection conn = connection();
706
BiPredicate<String, String> filter = conn.headerFilter(request);
707
if (needsFiltering(headers, filter)) {
708
return HttpHeaders.of(headers.map(), filter);
709
}
710
return headers;
711
}
712
713
private static HttpHeaders createPseudoHeaders(HttpRequest request) {
714
HttpHeadersBuilder hdrs = new HttpHeadersBuilder();
715
String method = request.method();
716
hdrs.setHeader(":method", method);
717
URI uri = request.uri();
718
hdrs.setHeader(":scheme", uri.getScheme());
719
// TODO: userinfo deprecated. Needs to be removed
720
hdrs.setHeader(":authority", uri.getAuthority());
721
// TODO: ensure header names beginning with : not in user headers
722
String query = uri.getRawQuery();
723
String path = uri.getRawPath();
724
if (path == null || path.isEmpty()) {
725
if (method.equalsIgnoreCase("OPTIONS")) {
726
path = "*";
727
} else {
728
path = "/";
729
}
730
}
731
if (query != null) {
732
path += "?" + query;
733
}
734
hdrs.setHeader(":path", Utils.encode(path));
735
return hdrs.build();
736
}
737
738
HttpHeaders getRequestPseudoHeaders() {
739
return requestPseudoHeaders;
740
}
741
742
/** Sets endStreamReceived. Should be called only once. */
743
void setEndStreamReceived() {
744
if (debug.on()) debug.log("setEndStreamReceived: streamid=%d", streamid);
745
assert remotelyClosed == false: "Unexpected endStream already set";
746
remotelyClosed = true;
747
responseReceived();
748
}
749
750
/** Tells whether, or not, the END_STREAM Flag has been seen in any frame
751
* received on this stream. */
752
private boolean endStreamReceived() {
753
return remotelyClosed;
754
}
755
756
@Override
757
CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
758
if (debug.on()) debug.log("sendHeadersOnly()");
759
if (Log.requests() && request != null) {
760
Log.logRequest(request.toString());
761
}
762
if (requestPublisher != null) {
763
requestContentLen = requestPublisher.contentLength();
764
} else {
765
requestContentLen = 0;
766
}
767
768
// At this point the stream doesn't have a streamid yet.
769
// It will be allocated if we send the request headers.
770
Throwable t = errorRef.get();
771
if (t != null) {
772
if (debug.on()) debug.log("stream already cancelled, headers not sent: %s", (Object)t);
773
return MinimalFuture.failedFuture(t);
774
}
775
776
// sending the headers will cause the allocation of the stream id
777
OutgoingHeaders<Stream<T>> f = headerFrame(requestContentLen);
778
connection.sendFrame(f);
779
CompletableFuture<ExchangeImpl<T>> cf = new MinimalFuture<>();
780
cf.complete(this); // #### good enough for now
781
return cf;
782
}
783
784
@Override
785
void released() {
786
if (streamid > 0) {
787
if (debug.on()) debug.log("Released stream %d", streamid);
788
// remove this stream from the Http2Connection map.
789
connection.decrementStreamsCount(streamid);
790
connection.closeStream(streamid);
791
} else {
792
if (debug.on()) debug.log("Can't release stream %d", streamid);
793
}
794
}
795
796
@Override
797
void completed() {
798
// There should be nothing to do here: the stream should have
799
// been already closed (or will be closed shortly after).
800
}
801
802
boolean registerStream(int id, boolean registerIfCancelled) {
803
boolean cancelled = closed || exchange.multi.requestCancelled();
804
if (!cancelled || registerIfCancelled) {
805
this.streamid = id;
806
connection.putStream(this, streamid);
807
if (debug.on()) {
808
debug.log("Stream %d registered (cancelled: %b, registerIfCancelled: %b)",
809
streamid, cancelled, registerIfCancelled);
810
}
811
}
812
return !cancelled;
813
}
814
815
void signalWindowUpdate() {
816
RequestSubscriber subscriber = requestSubscriber;
817
assert subscriber != null;
818
if (debug.on()) debug.log("Signalling window update");
819
subscriber.sendScheduler.runOrSchedule();
820
}
821
822
static final ByteBuffer COMPLETED = ByteBuffer.allocate(0);
823
class RequestSubscriber implements Flow.Subscriber<ByteBuffer> {
824
// can be < 0 if the actual length is not known.
825
private final long contentLength;
826
private volatile long remainingContentLength;
827
private volatile Subscription subscription;
828
829
// Holds the outgoing data. There will be at most 2 outgoing ByteBuffers.
830
// 1) The data that was published by the request body Publisher, and
831
// 2) the COMPLETED sentinel, since onComplete can be invoked without demand.
832
final ConcurrentLinkedDeque<ByteBuffer> outgoing = new ConcurrentLinkedDeque<>();
833
834
private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
835
// A scheduler used to honor window updates. Writing must be paused
836
// when the window is exhausted, and resumed when the window acquires
837
// some space. The sendScheduler makes it possible to implement this
838
// behaviour in an asynchronous non-blocking way.
839
// See RequestSubscriber::trySend below.
840
final SequentialScheduler sendScheduler;
841
842
RequestSubscriber(long contentLen) {
843
this.contentLength = contentLen;
844
this.remainingContentLength = contentLen;
845
this.sendScheduler =
846
SequentialScheduler.lockingScheduler(this::trySend);
847
}
848
849
@Override
850
public void onSubscribe(Flow.Subscription subscription) {
851
if (this.subscription != null) {
852
throw new IllegalStateException("already subscribed");
853
}
854
this.subscription = subscription;
855
if (debug.on())
856
debug.log("RequestSubscriber: onSubscribe, request 1");
857
subscription.request(1);
858
}
859
860
@Override
861
public void onNext(ByteBuffer item) {
862
if (debug.on())
863
debug.log("RequestSubscriber: onNext(%d)", item.remaining());
864
int size = outgoing.size();
865
assert size == 0 : "non-zero size: " + size;
866
onNextImpl(item);
867
}
868
869
private void onNextImpl(ByteBuffer item) {
870
// Got some more request body bytes to send.
871
if (requestBodyCF.isDone()) {
872
// stream already cancelled, probably in timeout
873
sendScheduler.stop();
874
subscription.cancel();
875
return;
876
}
877
outgoing.add(item);
878
sendScheduler.runOrSchedule();
879
}
880
881
@Override
882
public void onError(Throwable throwable) {
883
if (debug.on())
884
debug.log(() -> "RequestSubscriber: onError: " + throwable);
885
// ensure that errors are handled within the flow.
886
if (errorRef.compareAndSet(null, throwable)) {
887
sendScheduler.runOrSchedule();
888
}
889
}
890
891
@Override
892
public void onComplete() {
893
if (debug.on()) debug.log("RequestSubscriber: onComplete");
894
int size = outgoing.size();
895
assert size == 0 || size == 1 : "non-zero or one size: " + size;
896
// last byte of request body has been obtained.
897
// ensure that everything is completed within the flow.
898
onNextImpl(COMPLETED);
899
}
900
901
// Attempts to send the data, if any.
902
// Handles errors and completion state.
903
// Pause writing if the send window is exhausted, resume it if the
904
// send window has some bytes that can be acquired.
905
void trySend() {
906
try {
907
// handle errors raised by onError;
908
Throwable t = errorRef.get();
909
if (t != null) {
910
sendScheduler.stop();
911
if (requestBodyCF.isDone()) return;
912
subscription.cancel();
913
requestBodyCF.completeExceptionally(t);
914
cancelImpl(t);
915
return;
916
}
917
int state = streamState;
918
919
do {
920
// handle COMPLETED;
921
ByteBuffer item = outgoing.peekFirst();
922
if (item == null) return;
923
else if (item == COMPLETED) {
924
sendScheduler.stop();
925
complete();
926
return;
927
}
928
929
// handle bytes to send downstream
930
while (item.hasRemaining() && state == 0) {
931
if (debug.on()) debug.log("trySend: %d", item.remaining());
932
DataFrame df = getDataFrame(item);
933
if (df == null) {
934
if (debug.on())
935
debug.log("trySend: can't send yet: %d", item.remaining());
936
return; // the send window is exhausted: come back later
937
}
938
939
if (contentLength > 0) {
940
remainingContentLength -= df.getDataLength();
941
if (remainingContentLength < 0) {
942
String msg = connection().getConnectionFlow()
943
+ " stream=" + streamid + " "
944
+ "[" + Thread.currentThread().getName() + "] "
945
+ "Too many bytes in request body. Expected: "
946
+ contentLength + ", got: "
947
+ (contentLength - remainingContentLength);
948
assert streamid > 0;
949
connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
950
throw new IOException(msg);
951
} else if (remainingContentLength == 0) {
952
assert !endStreamSent : "internal error, send data after END_STREAM flag";
953
df.setFlag(DataFrame.END_STREAM);
954
endStreamSent = true;
955
}
956
} else {
957
assert !endStreamSent : "internal error, send data after END_STREAM flag";
958
}
959
if ((state = streamState) != 0) {
960
if (debug.on()) debug.log("trySend: cancelled: %s", String.valueOf(t));
961
break;
962
}
963
if (debug.on())
964
debug.log("trySend: sending: %d", df.getDataLength());
965
sendDataFrame(df);
966
}
967
if (state != 0) break;
968
assert !item.hasRemaining();
969
ByteBuffer b = outgoing.removeFirst();
970
assert b == item;
971
} while (outgoing.peekFirst() != null);
972
973
if (state != 0) {
974
t = errorRef.get();
975
if (t == null) t = new IOException(ResetFrame.stringForCode(streamState));
976
throw t;
977
}
978
979
if (debug.on()) debug.log("trySend: request 1");
980
subscription.request(1);
981
} catch (Throwable ex) {
982
if (debug.on()) debug.log("trySend: ", ex);
983
sendScheduler.stop();
984
subscription.cancel();
985
requestBodyCF.completeExceptionally(ex);
986
// need to cancel the stream to 1. tell the server
987
// we don't want to receive any more data and
988
// 2. ensure that the operation ref count will be
989
// decremented on the HttpClient.
990
cancelImpl(ex);
991
}
992
}
993
994
private void complete() throws IOException {
995
long remaining = remainingContentLength;
996
long written = contentLength - remaining;
997
if (remaining > 0) {
998
connection.resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
999
// let trySend() handle the exception
1000
throw new IOException(connection().getConnectionFlow()
1001
+ " stream=" + streamid + " "
1002
+ "[" + Thread.currentThread().getName() +"] "
1003
+ "Too few bytes returned by the publisher ("
1004
+ written + "/"
1005
+ contentLength + ")");
1006
}
1007
if (!endStreamSent) {
1008
endStreamSent = true;
1009
connection.sendDataFrame(getEmptyEndStreamDataFrame());
1010
}
1011
requestBodyCF.complete(null);
1012
}
1013
}
1014
1015
/**
1016
* Send a RESET frame to tell server to stop sending data on this stream
1017
*/
1018
@Override
1019
public CompletableFuture<Void> ignoreBody() {
1020
try {
1021
connection.resetStream(streamid, ResetFrame.STREAM_CLOSED);
1022
return MinimalFuture.completedFuture(null);
1023
} catch (Throwable e) {
1024
Log.logTrace("Error resetting stream {0}", e.toString());
1025
return MinimalFuture.failedFuture(e);
1026
}
1027
}
1028
1029
DataFrame getDataFrame(ByteBuffer buffer) {
1030
int requestAmount = Math.min(connection.getMaxSendFrameSize(), buffer.remaining());
1031
// blocks waiting for stream send window, if exhausted
1032
int actualAmount = windowController.tryAcquire(requestAmount, streamid, this);
1033
if (actualAmount <= 0) return null;
1034
ByteBuffer outBuf = Utils.sliceWithLimitedCapacity(buffer, actualAmount);
1035
DataFrame df = new DataFrame(streamid, 0 , outBuf);
1036
return df;
1037
}
1038
1039
private DataFrame getEmptyEndStreamDataFrame() {
1040
return new DataFrame(streamid, DataFrame.END_STREAM, List.of());
1041
}
1042
1043
/**
1044
* A List of responses relating to this stream. Normally there is only
1045
* one response, but intermediate responses like 100 are allowed
1046
* and must be passed up to higher level before continuing. Deals with races
1047
* such as if responses are returned before the CFs get created by
1048
* getResponseAsync()
1049
*/
1050
1051
final List<CompletableFuture<Response>> response_cfs = new ArrayList<>(5);
1052
1053
@Override
1054
CompletableFuture<Response> getResponseAsync(Executor executor) {
1055
CompletableFuture<Response> cf;
1056
// The code below deals with race condition that can be caused when
1057
// completeResponse() is being called before getResponseAsync()
1058
synchronized (response_cfs) {
1059
if (!response_cfs.isEmpty()) {
1060
// This CompletableFuture was created by completeResponse().
1061
// it will be already completed.
1062
cf = response_cfs.remove(0);
1063
// if we find a cf here it should be already completed.
1064
// finding a non completed cf should not happen. just assert it.
1065
assert cf.isDone() : "Removing uncompleted response: could cause code to hang!";
1066
} else {
1067
// getResponseAsync() is called first. Create a CompletableFuture
1068
// that will be completed by completeResponse() when
1069
// completeResponse() is called.
1070
cf = new MinimalFuture<>();
1071
response_cfs.add(cf);
1072
}
1073
}
1074
if (executor != null && !cf.isDone()) {
1075
// protect from executing later chain of CompletableFuture operations from SelectorManager thread
1076
cf = cf.thenApplyAsync(r -> r, executor);
1077
}
1078
Log.logTrace("Response future (stream={0}) is: {1}", streamid, cf);
1079
PushGroup<?> pg = exchange.getPushGroup();
1080
if (pg != null) {
1081
// if an error occurs make sure it is recorded in the PushGroup
1082
cf = cf.whenComplete((t,e) -> pg.pushError(Utils.getCompletionCause(e)));
1083
}
1084
return cf;
1085
}
1086
1087
/**
1088
* Completes the first uncompleted CF on list, and removes it. If there is no
1089
* uncompleted CF then creates one (completes it) and adds to list
1090
*/
1091
void completeResponse(Response resp) {
1092
synchronized (response_cfs) {
1093
CompletableFuture<Response> cf;
1094
int cfs_len = response_cfs.size();
1095
for (int i=0; i<cfs_len; i++) {
1096
cf = response_cfs.get(i);
1097
if (!cf.isDone()) {
1098
Log.logTrace("Completing response (streamid={0}): {1}",
1099
streamid, cf);
1100
if (debug.on())
1101
debug.log("Completing responseCF(%d) with response headers", i);
1102
response_cfs.remove(cf);
1103
cf.complete(resp);
1104
return;
1105
} // else we found the previous response: just leave it alone.
1106
}
1107
cf = MinimalFuture.completedFuture(resp);
1108
Log.logTrace("Created completed future (streamid={0}): {1}",
1109
streamid, cf);
1110
if (debug.on())
1111
debug.log("Adding completed responseCF(0) with response headers");
1112
response_cfs.add(cf);
1113
}
1114
}
1115
1116
// methods to update state and remove stream when finished
1117
1118
synchronized void requestSent() {
1119
requestSent = true;
1120
if (responseReceived) {
1121
if (debug.on()) debug.log("requestSent: streamid=%d", streamid);
1122
close();
1123
} else {
1124
if (debug.on()) {
1125
debug.log("requestSent: streamid=%d but response not received", streamid);
1126
}
1127
}
1128
}
1129
1130
synchronized void responseReceived() {
1131
responseReceived = true;
1132
if (requestSent) {
1133
if (debug.on()) debug.log("responseReceived: streamid=%d", streamid);
1134
close();
1135
} else {
1136
if (debug.on()) {
1137
debug.log("responseReceived: streamid=%d but request not sent", streamid);
1138
}
1139
}
1140
}
1141
1142
/**
1143
* same as above but for errors
1144
*/
1145
void completeResponseExceptionally(Throwable t) {
1146
synchronized (response_cfs) {
1147
// use index to avoid ConcurrentModificationException
1148
// caused by removing the CF from within the loop.
1149
for (int i = 0; i < response_cfs.size(); i++) {
1150
CompletableFuture<Response> cf = response_cfs.get(i);
1151
if (!cf.isDone()) {
1152
response_cfs.remove(i);
1153
cf.completeExceptionally(t);
1154
return;
1155
}
1156
}
1157
response_cfs.add(MinimalFuture.failedFuture(t));
1158
}
1159
}
1160
1161
CompletableFuture<Void> sendBodyImpl() {
1162
requestBodyCF.whenComplete((v, t) -> requestSent());
1163
try {
1164
if (requestPublisher != null) {
1165
final RequestSubscriber subscriber = new RequestSubscriber(requestContentLen);
1166
requestPublisher.subscribe(requestSubscriber = subscriber);
1167
} else {
1168
// there is no request body, therefore the request is complete,
1169
// END_STREAM has already sent with outgoing headers
1170
requestBodyCF.complete(null);
1171
}
1172
} catch (Throwable t) {
1173
cancelImpl(t);
1174
requestBodyCF.completeExceptionally(t);
1175
}
1176
return requestBodyCF;
1177
}
1178
1179
@Override
1180
void cancel() {
1181
if ((streamid == 0)) {
1182
cancel(new IOException("Stream cancelled before streamid assigned"));
1183
} else {
1184
cancel(new IOException("Stream " + streamid + " cancelled"));
1185
}
1186
}
1187
1188
void onSubscriptionError(Throwable t) {
1189
errorRef.compareAndSet(null, t);
1190
if (debug.on()) debug.log("Got subscription error: %s", (Object)t);
1191
// This is the special case where the subscriber
1192
// has requested an illegal number of items.
1193
// In this case, the error doesn't come from
1194
// upstream, but from downstream, and we need to
1195
// handle the error without waiting for the inputQ
1196
// to be exhausted.
1197
stopRequested = true;
1198
sched.runOrSchedule();
1199
}
1200
1201
@Override
1202
void cancel(IOException cause) {
1203
cancelImpl(cause);
1204
}
1205
1206
void connectionClosing(Throwable cause) {
1207
Flow.Subscriber<?> subscriber =
1208
responseSubscriber == null ? pendingResponseSubscriber : responseSubscriber;
1209
errorRef.compareAndSet(null, cause);
1210
if (subscriber != null && !sched.isStopped() && !inputQ.isEmpty()) {
1211
sched.runOrSchedule();
1212
} else cancelImpl(cause);
1213
}
1214
1215
// This method sends a RST_STREAM frame
1216
void cancelImpl(Throwable e) {
1217
errorRef.compareAndSet(null, e);
1218
if (debug.on()) {
1219
if (streamid == 0) debug.log("cancelling stream: %s", (Object)e);
1220
else debug.log("cancelling stream %d: %s", streamid, e);
1221
}
1222
if (Log.trace()) {
1223
if (streamid == 0) Log.logTrace("cancelling stream: {0}\n", e);
1224
else Log.logTrace("cancelling stream {0}: {1}\n", streamid, e);
1225
}
1226
boolean closing;
1227
if (closing = !closed) { // assigning closing to !closed
1228
synchronized (this) {
1229
if (closing = !closed) { // assigning closing to !closed
1230
closed=true;
1231
}
1232
}
1233
}
1234
if (closing) { // true if the stream has not been closed yet
1235
if (responseSubscriber != null || pendingResponseSubscriber != null)
1236
sched.runOrSchedule();
1237
}
1238
completeResponseExceptionally(e);
1239
if (!requestBodyCF.isDone()) {
1240
requestBodyCF.completeExceptionally(errorRef.get()); // we may be sending the body..
1241
}
1242
if (responseBodyCF != null) {
1243
responseBodyCF.completeExceptionally(errorRef.get());
1244
}
1245
try {
1246
// will send a RST_STREAM frame
1247
if (streamid != 0 && streamState == 0) {
1248
e = Utils.getCompletionCause(e);
1249
if (e instanceof EOFException) {
1250
// read EOF: no need to try & send reset
1251
connection.decrementStreamsCount(streamid);
1252
connection.closeStream(streamid);
1253
} else {
1254
// no use to send CANCEL if already closed.
1255
sendCancelStreamFrame();
1256
}
1257
}
1258
} catch (Throwable ex) {
1259
Log.logError(ex);
1260
}
1261
}
1262
1263
void sendCancelStreamFrame() {
1264
// do not reset a stream until it has a streamid.
1265
if (streamid > 0 && markStream(ResetFrame.CANCEL) == 0) {
1266
connection.resetStream(streamid, ResetFrame.CANCEL);
1267
}
1268
close();
1269
}
1270
1271
// This method doesn't send any frame
1272
void close() {
1273
if (closed) return;
1274
synchronized(this) {
1275
if (closed) return;
1276
closed = true;
1277
}
1278
if (debug.on()) debug.log("close stream %d", streamid);
1279
Log.logTrace("Closing stream {0}", streamid);
1280
connection.closeStream(streamid);
1281
Log.logTrace("Stream {0} closed", streamid);
1282
}
1283
1284
static class PushedStream<T> extends Stream<T> {
1285
final PushGroup<T> pushGroup;
1286
// push streams need the response CF allocated up front as it is
1287
// given directly to user via the multi handler callback function.
1288
final CompletableFuture<Response> pushCF;
1289
CompletableFuture<HttpResponse<T>> responseCF;
1290
final HttpRequestImpl pushReq;
1291
HttpResponse.BodyHandler<T> pushHandler;
1292
1293
PushedStream(PushGroup<T> pushGroup,
1294
Http2Connection connection,
1295
Exchange<T> pushReq) {
1296
// ## no request body possible, null window controller
1297
super(connection, pushReq, null);
1298
this.pushGroup = pushGroup;
1299
this.pushReq = pushReq.request();
1300
this.pushCF = new MinimalFuture<>();
1301
this.responseCF = new MinimalFuture<>();
1302
1303
}
1304
1305
CompletableFuture<HttpResponse<T>> responseCF() {
1306
return responseCF;
1307
}
1308
1309
synchronized void setPushHandler(HttpResponse.BodyHandler<T> pushHandler) {
1310
this.pushHandler = pushHandler;
1311
}
1312
1313
synchronized HttpResponse.BodyHandler<T> getPushHandler() {
1314
// ignored parameters to function can be used as BodyHandler
1315
return this.pushHandler;
1316
}
1317
1318
// Following methods call the super class but in case of
1319
// error record it in the PushGroup. The error method is called
1320
// with a null value when no error occurred (is a no-op)
1321
@Override
1322
CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
1323
return super.sendBodyAsync()
1324
.whenComplete((ExchangeImpl<T> v, Throwable t)
1325
-> pushGroup.pushError(Utils.getCompletionCause(t)));
1326
}
1327
1328
@Override
1329
CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
1330
return super.sendHeadersAsync()
1331
.whenComplete((ExchangeImpl<T> ex, Throwable t)
1332
-> pushGroup.pushError(Utils.getCompletionCause(t)));
1333
}
1334
1335
@Override
1336
CompletableFuture<Response> getResponseAsync(Executor executor) {
1337
CompletableFuture<Response> cf = pushCF.whenComplete(
1338
(v, t) -> pushGroup.pushError(Utils.getCompletionCause(t)));
1339
if(executor!=null && !cf.isDone()) {
1340
cf = cf.thenApplyAsync( r -> r, executor);
1341
}
1342
return cf;
1343
}
1344
1345
@Override
1346
CompletableFuture<T> readBodyAsync(
1347
HttpResponse.BodyHandler<T> handler,
1348
boolean returnConnectionToPool,
1349
Executor executor)
1350
{
1351
return super.readBodyAsync(handler, returnConnectionToPool, executor)
1352
.whenComplete((v, t) -> pushGroup.pushError(t));
1353
}
1354
1355
@Override
1356
void completeResponse(Response r) {
1357
Log.logResponse(r::toString);
1358
pushCF.complete(r); // not strictly required for push API
1359
// start reading the body using the obtained BodySubscriber
1360
CompletableFuture<Void> start = new MinimalFuture<>();
1361
start.thenCompose( v -> readBodyAsync(getPushHandler(), false, getExchange().executor()))
1362
.whenComplete((T body, Throwable t) -> {
1363
if (t != null) {
1364
responseCF.completeExceptionally(t);
1365
} else {
1366
HttpResponseImpl<T> resp =
1367
new HttpResponseImpl<>(r.request, r, null, body, getExchange());
1368
responseCF.complete(resp);
1369
}
1370
});
1371
start.completeAsync(() -> null, getExchange().executor());
1372
}
1373
1374
@Override
1375
void completeResponseExceptionally(Throwable t) {
1376
pushCF.completeExceptionally(t);
1377
}
1378
1379
// @Override
1380
// synchronized void responseReceived() {
1381
// super.responseReceived();
1382
// }
1383
1384
// create and return the PushResponseImpl
1385
@Override
1386
protected void handleResponse() {
1387
HttpHeaders responseHeaders = responseHeadersBuilder.build();
1388
responseCode = (int)responseHeaders
1389
.firstValueAsLong(":status")
1390
.orElse(-1);
1391
1392
if (responseCode == -1) {
1393
completeResponseExceptionally(new IOException("No status code"));
1394
}
1395
1396
this.response = new Response(
1397
pushReq, exchange, responseHeaders, connection(),
1398
responseCode, HttpClient.Version.HTTP_2);
1399
1400
/* TODO: review if needs to be removed
1401
the value is not used, but in case `content-length` doesn't parse
1402
as long, there will be NumberFormatException. If left as is, make
1403
sure code up the stack handles NFE correctly. */
1404
responseHeaders.firstValueAsLong("content-length");
1405
1406
if (Log.headers()) {
1407
StringBuilder sb = new StringBuilder("RESPONSE HEADERS");
1408
sb.append(" (streamid=").append(streamid).append("):\n");
1409
Log.dumpHeaders(sb, " ", responseHeaders);
1410
Log.logHeaders(sb.toString());
1411
}
1412
1413
rspHeadersConsumer.reset();
1414
1415
// different implementations for normal streams and pushed streams
1416
completeResponse(response);
1417
}
1418
}
1419
1420
final class StreamWindowUpdateSender extends WindowUpdateSender {
1421
1422
StreamWindowUpdateSender(Http2Connection connection) {
1423
super(connection);
1424
}
1425
1426
@Override
1427
int getStreamId() {
1428
return streamid;
1429
}
1430
1431
@Override
1432
String dbgString() {
1433
String dbg = dbgString;
1434
if (dbg != null) return dbg;
1435
if (streamid == 0) {
1436
return connection.dbgString() + ":WindowUpdateSender(stream: ?)";
1437
} else {
1438
dbg = connection.dbgString() + ":WindowUpdateSender(stream: " + streamid + ")";
1439
return dbgString = dbg;
1440
}
1441
}
1442
}
1443
1444
/**
1445
* Returns true if this exchange was canceled.
1446
* @return true if this exchange was canceled.
1447
*/
1448
synchronized boolean isCanceled() {
1449
return errorRef.get() != null;
1450
}
1451
1452
/**
1453
* Returns the cause for which this exchange was canceled, if available.
1454
* @return the cause for which this exchange was canceled, if available.
1455
*/
1456
synchronized Throwable getCancelCause() {
1457
return errorRef.get();
1458
}
1459
1460
final String dbgString() {
1461
return connection.dbgString() + "/Stream("+streamid+")";
1462
}
1463
1464
private class HeadersConsumer extends Http2Connection.ValidatingHeadersConsumer {
1465
1466
void reset() {
1467
super.reset();
1468
responseHeadersBuilder.clear();
1469
debug.log("Response builder cleared, ready to receive new headers.");
1470
}
1471
1472
@Override
1473
public void onDecoded(CharSequence name, CharSequence value)
1474
throws UncheckedIOException
1475
{
1476
String n = name.toString();
1477
String v = value.toString();
1478
super.onDecoded(n, v);
1479
responseHeadersBuilder.addHeader(n, v);
1480
if (Log.headers() && Log.trace()) {
1481
Log.logTrace("RECEIVED HEADER (streamid={0}): {1}: {2}",
1482
streamid, n, v);
1483
}
1484
}
1485
}
1486
1487
// VarHandles for the "streamState" and "deRegistered" fields of Stream.
private static final VarHandle STREAM_STATE;
private static final VarHandle DEREGISTERED;
static {
    try {
        final MethodHandles.Lookup lookup = MethodHandles.lookup();
        STREAM_STATE = lookup.findVarHandle(Stream.class, "streamState", int.class);
        DEREGISTERED = lookup.findVarHandle(Stream.class, "deRegistered", boolean.class);
    } catch (Exception x) {
        // a failed lookup makes the class unusable: fail initialization
        throw new ExceptionInInitializerError(x);
    }
}
1499
}
1500
1501