Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
PojavLauncherTeam
GitHub Repository: PojavLauncherTeam/jdk17u
Path: blob/master/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java
67707 views
1
/*
2
* Copyright (c) 2015, 2021, Oracle and/or its affiliates. All rights reserved.
3
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4
*
5
* This code is free software; you can redistribute it and/or modify it
6
* under the terms of the GNU General Public License version 2 only, as
7
* published by the Free Software Foundation. Oracle designates this
8
* particular file as subject to the "Classpath" exception as provided
9
* by Oracle in the LICENSE file that accompanied this code.
10
*
11
* This code is distributed in the hope that it will be useful, but WITHOUT
12
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14
* version 2 for more details (a copy is included in the LICENSE file that
15
* accompanied this code).
16
*
17
* You should have received a copy of the GNU General Public License version
18
* 2 along with this work; if not, write to the Free Software Foundation,
19
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20
*
21
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22
* or visit www.oracle.com if you need additional information or have any
23
* questions.
24
*/
25
26
package jdk.internal.net.http;
27
28
import java.io.EOFException;
29
import java.io.IOException;
30
import java.io.UncheckedIOException;
31
import java.net.InetSocketAddress;
32
import java.net.URI;
33
import java.nio.ByteBuffer;
34
import java.nio.charset.StandardCharsets;
35
import java.util.Iterator;
36
import java.util.List;
37
import java.util.Locale;
38
import java.util.Map;
39
import java.util.Set;
40
import java.util.concurrent.CompletableFuture;
41
import java.util.ArrayList;
42
import java.util.Objects;
43
import java.util.concurrent.ConcurrentMap;
44
import java.util.concurrent.ConcurrentHashMap;
45
import java.util.concurrent.ConcurrentLinkedQueue;
46
import java.util.concurrent.Flow;
47
import java.util.function.Function;
48
import java.util.function.Supplier;
49
import javax.net.ssl.SSLEngine;
50
import javax.net.ssl.SSLException;
51
import java.net.http.HttpClient;
52
import java.net.http.HttpHeaders;
53
import jdk.internal.net.http.HttpConnection.HttpPublisher;
54
import jdk.internal.net.http.common.FlowTube;
55
import jdk.internal.net.http.common.FlowTube.TubeSubscriber;
56
import jdk.internal.net.http.common.HttpHeadersBuilder;
57
import jdk.internal.net.http.common.Log;
58
import jdk.internal.net.http.common.Logger;
59
import jdk.internal.net.http.common.MinimalFuture;
60
import jdk.internal.net.http.common.SequentialScheduler;
61
import jdk.internal.net.http.common.Utils;
62
import jdk.internal.net.http.frame.ContinuationFrame;
63
import jdk.internal.net.http.frame.DataFrame;
64
import jdk.internal.net.http.frame.ErrorFrame;
65
import jdk.internal.net.http.frame.FramesDecoder;
66
import jdk.internal.net.http.frame.FramesEncoder;
67
import jdk.internal.net.http.frame.GoAwayFrame;
68
import jdk.internal.net.http.frame.HeaderFrame;
69
import jdk.internal.net.http.frame.HeadersFrame;
70
import jdk.internal.net.http.frame.Http2Frame;
71
import jdk.internal.net.http.frame.MalformedFrame;
72
import jdk.internal.net.http.frame.OutgoingHeaders;
73
import jdk.internal.net.http.frame.PingFrame;
74
import jdk.internal.net.http.frame.PushPromiseFrame;
75
import jdk.internal.net.http.frame.ResetFrame;
76
import jdk.internal.net.http.frame.SettingsFrame;
77
import jdk.internal.net.http.frame.WindowUpdateFrame;
78
import jdk.internal.net.http.hpack.Encoder;
79
import jdk.internal.net.http.hpack.Decoder;
80
import jdk.internal.net.http.hpack.DecodingCallback;
81
import static java.nio.charset.StandardCharsets.UTF_8;
82
import static jdk.internal.net.http.frame.SettingsFrame.*;
83
84
/**
85
* An Http2Connection. Encapsulates the socket(channel) and any SSLEngine used
86
* over it. Contains an HttpConnection which hides the SocketChannel SSL stuff.
87
*
88
* Http2Connections belong to a Http2ClientImpl, (one of) which belongs
89
* to a HttpClientImpl.
90
*
91
* Creation cases:
92
* 1) upgraded HTTP/1.1 plain tcp connection
93
* 2) prior knowledge directly created plain tcp connection
94
* 3) directly created HTTP/2 SSL connection which uses ALPN.
95
*
96
* Sending is done by writing directly to underlying HttpConnection object which
97
* is operating in async mode. No flow control applies on output at this level
98
* and all writes are just executed as puts to an output queue belonging to HttpConnection.
99
* Flow control is implemented by HTTP/2 protocol itself.
100
*
101
* Hpack header compression
102
* and outgoing stream creation is also done here, because these operations
103
* must be synchronized at the socket level. Stream objects send frames simply
104
* by placing them on the connection's output Queue. sendFrame() is called
105
* from a higher level (Stream) thread.
106
*
107
* asyncReceive(ByteBuffer) is always called from the selector thread. It assembles
108
* incoming Http2Frames, and directs them to the appropriate Stream.incoming()
109
* or handles them directly itself. This thread performs hpack decompression
110
* and incoming stream creation (Server push). Incoming frames destined for a
111
* stream are provided by calling Stream.incoming().
112
*/
113
class Http2Connection {
114
115
final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
116
final static Logger DEBUG_LOGGER =
117
Utils.getDebugLogger("Http2Connection"::toString, Utils.DEBUG);
118
private final Logger debugHpack =
119
Utils.getHpackLogger(this::dbgString, Utils.DEBUG_HPACK);
120
static final ByteBuffer EMPTY_TRIGGER = ByteBuffer.allocate(0);
121
122
static private final int MAX_CLIENT_STREAM_ID = Integer.MAX_VALUE; // 2147483647
123
static private final int MAX_SERVER_STREAM_ID = Integer.MAX_VALUE - 1; // 2147483646
124
125
/**
126
* Flag set when no more streams to be opened on this connection.
127
* Two cases where it is used.
128
*
129
* 1. Two connections to the same server were opened concurrently, in which
130
* case one of them will be put in the cache, and the second will expire
131
* when all its opened streams (which usually should be a single client
132
* stream + possibly some additional push-promise server streams) complete.
133
* 2. A cached connection reaches its maximum number of streams (~ 2^31-1)
134
* either server / or client allocated, in which case it will be taken
135
* out of the cache - allowing a new connection to replace it. It will
136
* expire when all its still open streams (which could be many) eventually
137
* complete.
138
*/
139
private boolean finalStream;
140
141
/*
142
* ByteBuffer pooling strategy for HTTP/2 protocol.
143
*
144
* In general there are 4 points where ByteBuffers are used:
145
* - incoming/outgoing frames from/to ByteBuffers plus incoming/outgoing
146
* encrypted data in case of SSL connection.
147
*
148
* 1. Outgoing frames encoded to ByteBuffers.
149
*
150
* Outgoing ByteBuffers are created with required size and frequently
151
* small (except DataFrames, etc). At this place no pools at all. All
152
* outgoing buffers should eventually be collected by GC.
153
*
154
* 2. Incoming ByteBuffers (decoded to frames).
155
*
156
* Here, total elimination of BB pool is not a good idea.
157
* We don't know how many bytes we will receive through network.
158
*
159
* A possible future improvement ( currently not implemented ):
160
* Allocate buffers of reasonable size. The following life of the BB:
161
* - If all frames decoded from the BB are other than DataFrame and
162
* HeaderFrame (and HeaderFrame subclasses) BB is returned to pool,
163
* - If a DataFrame is decoded from the BB. In that case DataFrame refers
164
* to sub-buffer obtained by slice(). Such a BB is never returned to the
165
* pool and will eventually be GC'ed.
166
* - If a HeadersFrame is decoded from the BB. Then header decoding is
167
* performed inside processFrame method and the buffer could be released
168
* back to pool.
169
*
170
* 3. SSL encrypted buffers ( received ).
171
*
172
* The current implementation recycles encrypted buffers read from the
173
* channel. The pool of buffers has a maximum size of 3, SocketTube.MAX_BUFFERS,
174
* direct buffers which are shared by all connections on a given client.
175
* The pool is used by all SSL connections - whether HTTP/1.1 or HTTP/2,
176
* but only for SSL encrypted buffers that circulate between the SocketTube
177
* Publisher and the SSLFlowDelegate Reader. Limiting the pool to this
178
* particular segment allows the use of direct buffers, thus avoiding any
179
* additional copy in the NIO socket channel implementation. See
180
* HttpClientImpl.SSLDirectBufferSupplier, SocketTube.SSLDirectBufferSource,
181
* and SSLTube.recycler.
182
*/
183
184
185
// A small class that allows to control frames with respect to the state of
186
// the connection preface. Any data received before the connection
187
// preface is sent will be buffered.
188
private final class FramesController {
189
volatile boolean prefaceSent;
190
volatile List<ByteBuffer> pending;
191
192
boolean processReceivedData(FramesDecoder decoder, ByteBuffer buf)
193
throws IOException
194
{
195
// if preface is not sent, buffers data in the pending list
196
if (!prefaceSent) {
197
if (debug.on())
198
debug.log("Preface not sent: buffering %d", buf.remaining());
199
synchronized (this) {
200
if (!prefaceSent) {
201
if (pending == null) pending = new ArrayList<>();
202
pending.add(buf);
203
if (debug.on())
204
debug.log("there are now %d bytes buffered waiting for preface to be sent"
205
+ Utils.remaining(pending)
206
);
207
return false;
208
}
209
}
210
}
211
212
// Preface is sent. Checks for pending data and flush it.
213
// We rely on this method being called from within the Http2TubeSubscriber
214
// scheduler, so we know that no other thread could execute this method
215
// concurrently while we're here.
216
// This ensures that later incoming buffers will not
217
// be processed before we have flushed the pending queue.
218
// No additional synchronization is therefore necessary here.
219
List<ByteBuffer> pending = this.pending;
220
this.pending = null;
221
if (pending != null) {
222
// flush pending data
223
if (debug.on()) debug.log(() -> "Processing buffered data: "
224
+ Utils.remaining(pending));
225
for (ByteBuffer b : pending) {
226
decoder.decode(b);
227
}
228
}
229
// push the received buffer to the frames decoder.
230
if (buf != EMPTY_TRIGGER) {
231
if (debug.on()) debug.log("Processing %d", buf.remaining());
232
decoder.decode(buf);
233
}
234
return true;
235
}
236
237
// Mark that the connection preface is sent
238
void markPrefaceSent() {
239
assert !prefaceSent;
240
synchronized (this) {
241
prefaceSent = true;
242
}
243
}
244
}
245
246
volatile boolean closed;
247
248
//-------------------------------------
249
final HttpConnection connection;
250
private final Http2ClientImpl client2;
251
private final ConcurrentMap<Integer,Stream<?>> streams = new ConcurrentHashMap<>();
252
private int nextstreamid;
253
private int nextPushStream = 2;
254
// actual stream ids are not allocated until the Headers frame is ready
255
// to be sent. The following two fields are updated as soon as a stream
256
// is created and assigned to a connection. They are checked before
257
// assigning a stream to a connection.
258
private int lastReservedClientStreamid = 1;
259
private int lastReservedServerStreamid = 0;
260
private int numReservedClientStreams = 0; // count of current streams
261
private int numReservedServerStreams = 0; // count of current streams
262
private final Encoder hpackOut;
263
private final Decoder hpackIn;
264
final SettingsFrame clientSettings;
265
private volatile SettingsFrame serverSettings;
266
private final String key; // for HttpClientImpl.connections map
267
private final FramesDecoder framesDecoder;
268
private final FramesEncoder framesEncoder = new FramesEncoder();
269
270
/**
271
* Send Window controller for both connection and stream windows.
272
* Each of this connection's Streams MUST use this controller.
273
*/
274
private final WindowController windowController = new WindowController();
275
private final FramesController framesController = new FramesController();
276
private final Http2TubeSubscriber subscriber;
277
final ConnectionWindowUpdateSender windowUpdater;
278
private volatile Throwable cause;
279
private volatile Supplier<ByteBuffer> initial;
280
281
static final int DEFAULT_FRAME_SIZE = 16 * 1024;
282
283
284
// TODO: need list of control frames from other threads
285
// that need to be sent
286
287
private Http2Connection(HttpConnection connection,
288
Http2ClientImpl client2,
289
int nextstreamid,
290
String key) {
291
this.connection = connection;
292
this.client2 = client2;
293
this.subscriber = new Http2TubeSubscriber(client2.client());
294
this.nextstreamid = nextstreamid;
295
this.key = key;
296
this.clientSettings = this.client2.getClientSettings();
297
this.framesDecoder = new FramesDecoder(this::processFrame,
298
clientSettings.getParameter(SettingsFrame.MAX_FRAME_SIZE));
299
// serverSettings will be updated by server
300
this.serverSettings = SettingsFrame.defaultRFCSettings();
301
this.hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE));
302
this.hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE));
303
if (debugHpack.on()) {
304
debugHpack.log("For the record:" + super.toString());
305
debugHpack.log("Decoder created: %s", hpackIn);
306
debugHpack.log("Encoder created: %s", hpackOut);
307
}
308
this.windowUpdater = new ConnectionWindowUpdateSender(this,
309
client2.getConnectionWindowSize(clientSettings));
310
}
311
312
/**
313
* Case 1) Create from upgraded HTTP/1.1 connection.
314
* Is ready to use. Can't be SSL. exchange is the Exchange
315
* that initiated the connection, whose response will be delivered
316
* on a Stream.
317
*/
318
private Http2Connection(HttpConnection connection,
319
Http2ClientImpl client2,
320
Exchange<?> exchange,
321
Supplier<ByteBuffer> initial)
322
throws IOException, InterruptedException
323
{
324
this(connection,
325
client2,
326
3, // stream 1 is registered during the upgrade
327
keyFor(connection));
328
reserveStream(true);
329
Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize());
330
331
Stream<?> initialStream = createStream(exchange);
332
boolean opened = initialStream.registerStream(1, true);
333
if (debug.on() && !opened) {
334
debug.log("Initial stream was cancelled - but connection is maintained: " +
335
"reset frame will need to be sent later");
336
}
337
windowController.registerStream(1, getInitialSendWindowSize());
338
initialStream.requestSent();
339
// Upgrading:
340
// set callbacks before sending preface - makes sure anything that
341
// might be sent by the server will come our way.
342
this.initial = initial;
343
connectFlows(connection);
344
sendConnectionPreface();
345
if (!opened) {
346
debug.log("ensure reset frame is sent to cancel initial stream");
347
initialStream.sendCancelStreamFrame();
348
}
349
350
}
351
352
// Used when upgrading an HTTP/1.1 connection to HTTP/2 after receiving
353
// agreement from the server. Async style but completes immediately, because
354
// the connection is already connected.
355
static CompletableFuture<Http2Connection> createAsync(HttpConnection connection,
356
Http2ClientImpl client2,
357
Exchange<?> exchange,
358
Supplier<ByteBuffer> initial)
359
{
360
return MinimalFuture.supply(() -> new Http2Connection(connection, client2, exchange, initial));
361
}
362
363
// Requires TLS handshake. So, is really async
364
static CompletableFuture<Http2Connection> createAsync(HttpRequestImpl request,
365
Http2ClientImpl h2client,
366
Exchange<?> exchange) {
367
assert request.secure();
368
AbstractAsyncSSLConnection connection = (AbstractAsyncSSLConnection)
369
HttpConnection.getConnection(request.getAddress(),
370
h2client.client(),
371
request,
372
HttpClient.Version.HTTP_2);
373
374
// Expose the underlying connection to the exchange's aborter so it can
375
// be closed if a timeout occurs.
376
exchange.connectionAborter.connection(connection);
377
378
return connection.connectAsync(exchange)
379
.thenCompose(unused -> connection.finishConnect())
380
.thenCompose(unused -> checkSSLConfig(connection))
381
.thenCompose(notused-> {
382
CompletableFuture<Http2Connection> cf = new MinimalFuture<>();
383
try {
384
Http2Connection hc = new Http2Connection(request, h2client, connection);
385
cf.complete(hc);
386
} catch (IOException e) {
387
cf.completeExceptionally(e);
388
}
389
return cf; } );
390
}
391
392
/**
 * Cases 2) 3): creates a connection that was not obtained through an
 * HTTP/1.1 upgrade. Stream ids start at 1.
 *
 * @param request the request to be sent
 * @param h2client the Http2ClientImpl this connection is created for
 * @param connection the underlying connection
 */
private Http2Connection(HttpRequestImpl request,
                        Http2ClientImpl h2client,
                        HttpConnection connection)
        throws IOException
{
    this(connection, h2client, 1, keyFor(request.uri(), request.proxy()));

    Log.logTrace("Connection send window size {0} ", windowController.connectionWindowSize());

    // it is now safe to resume async reading.
    connectFlows(connection);
    sendConnectionPreface();
}
413
414
private void connectFlows(HttpConnection connection) {
415
FlowTube tube = connection.getConnectionFlow();
416
// Connect the flow to our Http2TubeSubscriber:
417
tube.connectFlows(connection.publisher(), subscriber);
418
}
419
420
final HttpClientImpl client() {
421
return client2.client();
422
}
423
424
// call these before assigning a request/stream to a connection
425
// if false returned then a new Http2Connection is required
426
// if true, the stream may be assigned to this connection
427
// for server push, if false returned, then the stream should be cancelled
428
synchronized boolean reserveStream(boolean clientInitiated) throws IOException {
429
if (finalStream) {
430
return false;
431
}
432
if (clientInitiated && (lastReservedClientStreamid + 2) >= MAX_CLIENT_STREAM_ID) {
433
setFinalStream();
434
client2.deleteConnection(this);
435
return false;
436
} else if (!clientInitiated && (lastReservedServerStreamid + 2) >= MAX_SERVER_STREAM_ID) {
437
setFinalStream();
438
client2.deleteConnection(this);
439
return false;
440
}
441
if (clientInitiated)
442
lastReservedClientStreamid+=2;
443
else
444
lastReservedServerStreamid+=2;
445
446
assert numReservedClientStreams >= 0;
447
assert numReservedServerStreams >= 0;
448
if (clientInitiated &&numReservedClientStreams >= maxConcurrentClientInitiatedStreams()) {
449
throw new IOException("too many concurrent streams");
450
} else if (clientInitiated) {
451
numReservedClientStreams++;
452
}
453
if (!clientInitiated && numReservedServerStreams >= maxConcurrentServerInitiatedStreams()) {
454
return false;
455
} else if (!clientInitiated) {
456
numReservedServerStreams++;
457
}
458
return true;
459
}
460
461
/**
462
* Throws an IOException if h2 was not negotiated
463
*/
464
private static CompletableFuture<?> checkSSLConfig(AbstractAsyncSSLConnection aconn) {
465
assert aconn.isSecure();
466
467
Function<String, CompletableFuture<Void>> checkAlpnCF = (alpn) -> {
468
CompletableFuture<Void> cf = new MinimalFuture<>();
469
SSLEngine engine = aconn.getEngine();
470
String engineAlpn = engine.getApplicationProtocol();
471
assert Objects.equals(alpn, engineAlpn)
472
: "alpn: %s, engine: %s".formatted(alpn, engineAlpn);
473
474
DEBUG_LOGGER.log("checkSSLConfig: alpn: %s", alpn );
475
476
if (alpn == null || !alpn.equals("h2")) {
477
String msg;
478
if (alpn == null) {
479
Log.logSSL("ALPN not supported");
480
msg = "ALPN not supported";
481
} else {
482
switch (alpn) {
483
case "":
484
Log.logSSL(msg = "No ALPN negotiated");
485
break;
486
case "http/1.1":
487
Log.logSSL( msg = "HTTP/1.1 ALPN returned");
488
break;
489
default:
490
Log.logSSL(msg = "Unexpected ALPN: " + alpn);
491
cf.completeExceptionally(new IOException(msg));
492
}
493
}
494
cf.completeExceptionally(new ALPNException(msg, aconn));
495
return cf;
496
}
497
cf.complete(null);
498
return cf;
499
};
500
501
return aconn.getALPN()
502
.whenComplete((r,t) -> {
503
if (t != null && t instanceof SSLException) {
504
// something went wrong during the initial handshake
505
// close the connection
506
aconn.close();
507
}
508
})
509
.thenCompose(checkAlpnCF);
510
}
511
512
synchronized boolean finalStream() {
513
return finalStream;
514
}
515
516
/**
517
* Mark this connection so no more streams created on it and it will close when
518
* all are complete.
519
*/
520
synchronized void setFinalStream() {
521
finalStream = true;
522
}
523
524
static String keyFor(HttpConnection connection) {
525
boolean isProxy = connection.isProxied(); // tunnel or plain clear connection through proxy
526
boolean isSecure = connection.isSecure();
527
InetSocketAddress addr = connection.address();
528
InetSocketAddress proxyAddr = connection.proxy();
529
assert isProxy == (proxyAddr != null);
530
531
return keyString(isSecure, proxyAddr, addr.getHostString(), addr.getPort());
532
}
533
534
static String keyFor(URI uri, InetSocketAddress proxy) {
535
boolean isSecure = uri.getScheme().equalsIgnoreCase("https");
536
537
String host = uri.getHost();
538
int port = uri.getPort();
539
return keyString(isSecure, proxy, host, port);
540
}
541
542
543
// Compute the key for an HttpConnection in the Http2ClientImpl pool:
544
// The key string follows one of the three forms below:
545
// {C,S}:H:host:port
546
// C:P:proxy-host:proxy-port
547
// S:T:H:host:port;P:proxy-host:proxy-port
548
// C indicates clear text connection "http"
549
// S indicates secure "https"
550
// H indicates host (direct) connection
551
// P indicates proxy
552
// T indicates a tunnel connection through a proxy
553
//
554
// The first form indicates a direct connection to a server:
555
// - direct clear connection to an HTTP host:
556
// e.g.: "C:H:foo.com:80"
557
// - direct secure connection to an HTTPS host:
558
// e.g.: "S:H:foo.com:443"
559
// The second form indicates a clear connection to an HTTP/1.1 proxy:
560
// e.g.: "C:P:myproxy:8080"
561
// The third form indicates a secure tunnel connection to an HTTPS
562
// host through an HTTP/1.1 proxy:
563
// e.g: "S:T:H:foo.com:80;P:myproxy:8080"
564
static String keyString(boolean secure, InetSocketAddress proxy, String host, int port) {
565
if (secure && port == -1)
566
port = 443;
567
else if (!secure && port == -1)
568
port = 80;
569
var key = (secure ? "S:" : "C:");
570
if (proxy != null && !secure) {
571
// clear connection through proxy
572
key = key + "P:" + proxy.getHostString() + ":" + proxy.getPort();
573
} else if (proxy == null) {
574
// direct connection to host
575
key = key + "H:" + host + ":" + port;
576
} else {
577
// tunnel connection through proxy
578
key = key + "T:H:" + host + ":" + port + ";P:" + proxy.getHostString() + ":" + proxy.getPort();
579
}
580
return key;
581
}
582
583
String key() {
584
return this.key;
585
}
586
587
boolean offerConnection() {
588
return client2.offerConnection(this);
589
}
590
591
private HttpPublisher publisher() {
592
return connection.publisher();
593
}
594
595
private void decodeHeaders(HeaderFrame frame, DecodingCallback decoder)
596
throws IOException
597
{
598
if (debugHpack.on()) debugHpack.log("decodeHeaders(%s)", decoder);
599
600
boolean endOfHeaders = frame.getFlag(HeaderFrame.END_HEADERS);
601
602
List<ByteBuffer> buffers = frame.getHeaderBlock();
603
int len = buffers.size();
604
for (int i = 0; i < len; i++) {
605
ByteBuffer b = buffers.get(i);
606
hpackIn.decode(b, endOfHeaders && (i == len - 1), decoder);
607
}
608
}
609
610
final int getInitialSendWindowSize() {
611
return serverSettings.getParameter(INITIAL_WINDOW_SIZE);
612
}
613
614
final int maxConcurrentClientInitiatedStreams() {
615
return serverSettings.getParameter(MAX_CONCURRENT_STREAMS);
616
}
617
618
final int maxConcurrentServerInitiatedStreams() {
619
return clientSettings.getParameter(MAX_CONCURRENT_STREAMS);
620
}
621
622
void close() {
623
Log.logTrace("Closing HTTP/2 connection: to {0}", connection.address());
624
GoAwayFrame f = new GoAwayFrame(0,
625
ErrorFrame.NO_ERROR,
626
"Requested by user".getBytes(UTF_8));
627
// TODO: set last stream. For now zero ok.
628
sendFrame(f);
629
}
630
631
long count;
632
final void asyncReceive(ByteBuffer buffer) {
633
// We don't need to read anything and
634
// we don't want to send anything back to the server
635
// until the connection preface has been sent.
636
// Therefore we're going to wait if needed before reading
637
// (and thus replying) to anything.
638
// Starting to reply to something (e.g send an ACK to a
639
// SettingsFrame sent by the server) before the connection
640
// preface is fully sent might result in the server
641
// sending a GOAWAY frame with 'invalid_preface'.
642
//
643
// Note: asyncReceive is only called from the Http2TubeSubscriber
644
// sequential scheduler.
645
try {
646
Supplier<ByteBuffer> bs = initial;
647
// ensure that we always handle the initial buffer first,
648
// if any.
649
if (bs != null) {
650
initial = null;
651
ByteBuffer b = bs.get();
652
if (b.hasRemaining()) {
653
long c = ++count;
654
if (debug.on())
655
debug.log(() -> "H2 Receiving Initial(" + c +"): " + b.remaining());
656
framesController.processReceivedData(framesDecoder, b);
657
}
658
}
659
ByteBuffer b = buffer;
660
// the Http2TubeSubscriber scheduler ensures that the order of incoming
661
// buffers is preserved.
662
if (b == EMPTY_TRIGGER) {
663
if (debug.on()) debug.log("H2 Received EMPTY_TRIGGER");
664
boolean prefaceSent = framesController.prefaceSent;
665
assert prefaceSent;
666
// call framesController.processReceivedData to potentially
667
// trigger the processing of all the data buffered there.
668
framesController.processReceivedData(framesDecoder, buffer);
669
if (debug.on()) debug.log("H2 processed buffered data");
670
} else {
671
long c = ++count;
672
if (debug.on())
673
debug.log("H2 Receiving(%d): %d", c, b.remaining());
674
framesController.processReceivedData(framesDecoder, buffer);
675
if (debug.on()) debug.log("H2 processed(%d)", c);
676
}
677
} catch (Throwable e) {
678
String msg = Utils.stackTrace(e);
679
Log.logTrace(msg);
680
shutdown(e);
681
}
682
}
683
684
Throwable getRecordedCause() {
685
return cause;
686
}
687
688
void shutdown(Throwable t) {
689
if (debug.on()) debug.log(() -> "Shutting down h2c (closed="+closed+"): " + t);
690
if (closed == true) return;
691
synchronized (this) {
692
if (closed == true) return;
693
closed = true;
694
}
695
if (Log.errors()) {
696
if (!(t instanceof EOFException) || isActive()) {
697
Log.logError(t);
698
} else if (t != null) {
699
Log.logError("Shutting down connection: {0}", t.getMessage());
700
}
701
}
702
Throwable initialCause = this.cause;
703
if (initialCause == null) this.cause = t;
704
client2.deleteConnection(this);
705
for (Stream<?> s : streams.values()) {
706
try {
707
s.connectionClosing(t);
708
} catch (Throwable e) {
709
Log.logError("Failed to close stream {0}: {1}", s.streamid, e);
710
}
711
}
712
connection.close();
713
}
714
715
/**
716
* Streams initiated by a client MUST use odd-numbered stream
717
* identifiers; those initiated by the server MUST use even-numbered
718
* stream identifiers.
719
*/
720
private static final boolean isServerInitiatedStream(int streamid) {
721
return (streamid & 0x1) == 0;
722
}
723
724
/**
725
* Handles stream 0 (common) frames that apply to whole connection and passes
726
* other stream specific frames to that Stream object.
727
*
728
* Invokes Stream.incoming() which is expected to process frame without
729
* blocking.
730
*/
731
void processFrame(Http2Frame frame) throws IOException {
732
Log.logFrames(frame, "IN");
733
int streamid = frame.streamid();
734
if (frame instanceof MalformedFrame) {
735
Log.logError(((MalformedFrame) frame).getMessage());
736
if (streamid == 0) {
737
framesDecoder.close("Malformed frame on stream 0");
738
protocolError(((MalformedFrame) frame).getErrorCode(),
739
((MalformedFrame) frame).getMessage());
740
} else {
741
if (debug.on())
742
debug.log(() -> "Reset stream: " + ((MalformedFrame) frame).getMessage());
743
resetStream(streamid, ((MalformedFrame) frame).getErrorCode());
744
}
745
return;
746
}
747
if (streamid == 0) {
748
handleConnectionFrame(frame);
749
} else {
750
if (frame instanceof SettingsFrame) {
751
// The stream identifier for a SETTINGS frame MUST be zero
752
framesDecoder.close(
753
"The stream identifier for a SETTINGS frame MUST be zero");
754
protocolError(GoAwayFrame.PROTOCOL_ERROR);
755
return;
756
}
757
758
Stream<?> stream = getStream(streamid);
759
if (stream == null) {
760
// Should never receive a frame with unknown stream id
761
762
if (frame instanceof HeaderFrame) {
763
// always decode the headers as they may affect
764
// connection-level HPACK decoding state
765
DecodingCallback decoder = new ValidatingHeadersConsumer();
766
try {
767
decodeHeaders((HeaderFrame) frame, decoder);
768
} catch (UncheckedIOException e) {
769
protocolError(ResetFrame.PROTOCOL_ERROR, e.getMessage());
770
return;
771
}
772
}
773
774
if (!(frame instanceof ResetFrame)) {
775
if (frame instanceof DataFrame) {
776
dropDataFrame((DataFrame)frame);
777
}
778
if (isServerInitiatedStream(streamid)) {
779
if (streamid < nextPushStream) {
780
// trailing data on a cancelled push promise stream,
781
// reset will already have been sent, ignore
782
Log.logTrace("Ignoring cancelled push promise frame " + frame);
783
} else {
784
resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
785
}
786
} else if (streamid >= nextstreamid) {
787
// otherwise the stream has already been reset/closed
788
resetStream(streamid, ResetFrame.PROTOCOL_ERROR);
789
}
790
}
791
return;
792
}
793
if (frame instanceof PushPromiseFrame) {
794
PushPromiseFrame pp = (PushPromiseFrame)frame;
795
try {
796
handlePushPromise(stream, pp);
797
} catch (UncheckedIOException e) {
798
protocolError(ResetFrame.PROTOCOL_ERROR, e.getMessage());
799
return;
800
}
801
} else if (frame instanceof HeaderFrame) {
802
// decode headers (or continuation)
803
try {
804
decodeHeaders((HeaderFrame) frame, stream.rspHeadersConsumer());
805
} catch (UncheckedIOException e) {
806
debug.log("Error decoding headers: " + e.getMessage(), e);
807
protocolError(ResetFrame.PROTOCOL_ERROR, e.getMessage());
808
return;
809
}
810
stream.incoming(frame);
811
} else {
812
stream.incoming(frame);
813
}
814
}
815
}
816
817
final void dropDataFrame(DataFrame df) {
818
if (closed) return;
819
if (debug.on()) {
820
debug.log("Dropping data frame for stream %d (%d payload bytes)",
821
df.streamid(), df.payloadLength());
822
}
823
ensureWindowUpdated(df);
824
}
825
826
final void ensureWindowUpdated(DataFrame df) {
827
try {
828
if (closed) return;
829
int length = df.payloadLength();
830
if (length > 0) {
831
windowUpdater.update(length);
832
}
833
} catch(Throwable t) {
834
Log.logError("Unexpected exception while updating window: {0}", (Object)t);
835
}
836
}
837
838
private <T> void handlePushPromise(Stream<T> parent, PushPromiseFrame pp)
839
throws IOException
840
{
841
// always decode the headers as they may affect connection-level HPACK
842
// decoding state
843
HeaderDecoder decoder = new HeaderDecoder();
844
decodeHeaders(pp, decoder);
845
846
HttpRequestImpl parentReq = parent.request;
847
int promisedStreamid = pp.getPromisedStream();
848
if (promisedStreamid != nextPushStream) {
849
resetStream(promisedStreamid, ResetFrame.PROTOCOL_ERROR);
850
return;
851
} else if (!reserveStream(false)) {
852
resetStream(promisedStreamid, ResetFrame.REFUSED_STREAM);
853
return;
854
} else {
855
nextPushStream += 2;
856
}
857
858
HttpHeaders headers = decoder.headers();
859
HttpRequestImpl pushReq = HttpRequestImpl.createPushRequest(parentReq, headers);
860
Exchange<T> pushExch = new Exchange<>(pushReq, parent.exchange.multi);
861
Stream.PushedStream<T> pushStream = createPushStream(parent, pushExch);
862
pushExch.exchImpl = pushStream;
863
pushStream.registerStream(promisedStreamid, true);
864
parent.incoming_pushPromise(pushReq, pushStream);
865
}
866
867
private void handleConnectionFrame(Http2Frame frame)
868
throws IOException
869
{
870
switch (frame.type()) {
871
case SettingsFrame.TYPE -> handleSettings((SettingsFrame) frame);
872
case PingFrame.TYPE -> handlePing((PingFrame) frame);
873
case GoAwayFrame.TYPE -> handleGoAway((GoAwayFrame) frame);
874
case WindowUpdateFrame.TYPE -> handleWindowUpdate((WindowUpdateFrame) frame);
875
876
default -> protocolError(ErrorFrame.PROTOCOL_ERROR);
877
}
878
}
879
880
void resetStream(int streamid, int code) {
881
try {
882
if (connection.channel().isOpen()) {
883
// no need to try & send a reset frame if the
884
// connection channel is already closed.
885
Log.logError(
886
"Resetting stream {0,number,integer} with error code {1,number,integer}",
887
streamid, code);
888
markStream(streamid, code);
889
ResetFrame frame = new ResetFrame(streamid, code);
890
sendFrame(frame);
891
} else if (debug.on()) {
892
debug.log("Channel already closed, no need to reset stream %d",
893
streamid);
894
}
895
} finally {
896
decrementStreamsCount(streamid);
897
closeStream(streamid);
898
}
899
}
900
901
private void markStream(int streamid, int code) {
902
Stream<?> s = streams.get(streamid);
903
if (s != null) s.markStream(code);
904
}
905
906
// reduce count of streams by 1 if stream still exists
907
synchronized void decrementStreamsCount(int streamid) {
908
Stream<?> s = streams.get(streamid);
909
if (s == null || !s.deRegister())
910
return;
911
if (streamid % 2 == 1) {
912
numReservedClientStreams--;
913
assert numReservedClientStreams >= 0 :
914
"negative client stream count for stream=" + streamid;
915
} else {
916
numReservedServerStreams--;
917
assert numReservedServerStreams >= 0 :
918
"negative server stream count for stream=" + streamid;
919
}
920
}
921
922
void closeStream(int streamid) {
923
if (debug.on()) debug.log("Closed stream %d", streamid);
924
boolean isClient = (streamid % 2) == 1;
925
Stream<?> s = streams.remove(streamid);
926
if (s != null) {
927
// decrement the reference count on the HttpClientImpl
928
// to allow the SelectorManager thread to exit if no
929
// other operation is pending and the facade is no
930
// longer referenced.
931
client().streamUnreference();
932
}
933
// ## Remove s != null. It is a hack for delayed cancellation,reset
934
if (s != null && !(s instanceof Stream.PushedStream)) {
935
// Since PushStreams have no request body, then they have no
936
// corresponding entry in the window controller.
937
windowController.removeStream(streamid);
938
}
939
if (finalStream() && streams.isEmpty()) {
940
// should be only 1 stream, but there might be more if server push
941
close();
942
}
943
}
944
945
/**
946
* Increments this connection's send Window by the amount in the given frame.
947
*/
948
private void handleWindowUpdate(WindowUpdateFrame f)
949
throws IOException
950
{
951
int amount = f.getUpdate();
952
if (amount <= 0) {
953
// ## temporarily disable to workaround a bug in Jetty where it
954
// ## sends Window updates with a 0 update value.
955
//protocolError(ErrorFrame.PROTOCOL_ERROR);
956
} else {
957
boolean success = windowController.increaseConnectionWindow(amount);
958
if (!success) {
959
protocolError(ErrorFrame.FLOW_CONTROL_ERROR); // overflow
960
}
961
}
962
}
963
964
private void protocolError(int errorCode)
965
throws IOException
966
{
967
protocolError(errorCode, null);
968
}
969
970
private void protocolError(int errorCode, String msg)
971
throws IOException
972
{
973
GoAwayFrame frame = new GoAwayFrame(0, errorCode);
974
sendFrame(frame);
975
shutdown(new IOException("protocol error" + (msg == null?"":(": " + msg))));
976
}
977
978
private void handleSettings(SettingsFrame frame)
979
throws IOException
980
{
981
assert frame.streamid() == 0;
982
if (!frame.getFlag(SettingsFrame.ACK)) {
983
int newWindowSize = frame.getParameter(INITIAL_WINDOW_SIZE);
984
if (newWindowSize != -1) {
985
int oldWindowSize = serverSettings.getParameter(INITIAL_WINDOW_SIZE);
986
int diff = newWindowSize - oldWindowSize;
987
if (diff != 0) {
988
windowController.adjustActiveStreams(diff);
989
}
990
}
991
992
serverSettings.update(frame);
993
sendFrame(new SettingsFrame(SettingsFrame.ACK));
994
}
995
}
996
997
private void handlePing(PingFrame frame)
998
throws IOException
999
{
1000
frame.setFlag(PingFrame.ACK);
1001
sendUnorderedFrame(frame);
1002
}
1003
1004
private void handleGoAway(GoAwayFrame frame)
1005
throws IOException
1006
{
1007
shutdown(new IOException(
1008
String.valueOf(connection.channel().getLocalAddress())
1009
+": GOAWAY received"));
1010
}
1011
1012
/**
1013
* Max frame size we are allowed to send
1014
*/
1015
public int getMaxSendFrameSize() {
1016
int param = serverSettings.getParameter(MAX_FRAME_SIZE);
1017
if (param == -1) {
1018
param = DEFAULT_FRAME_SIZE;
1019
}
1020
return param;
1021
}
1022
1023
/**
1024
* Max frame size we will receive
1025
*/
1026
public int getMaxReceiveFrameSize() {
1027
return clientSettings.getParameter(MAX_FRAME_SIZE);
1028
}
1029
1030
private static final String CLIENT_PREFACE = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
1031
1032
private static final byte[] PREFACE_BYTES =
1033
CLIENT_PREFACE.getBytes(StandardCharsets.ISO_8859_1);
1034
1035
/**
1036
* Sends Connection preface and Settings frame with current preferred
1037
* values
1038
*/
1039
private void sendConnectionPreface() throws IOException {
1040
Log.logTrace("{0}: start sending connection preface to {1}",
1041
connection.channel().getLocalAddress(),
1042
connection.address());
1043
SettingsFrame sf = new SettingsFrame(clientSettings);
1044
ByteBuffer buf = framesEncoder.encodeConnectionPreface(PREFACE_BYTES, sf);
1045
Log.logFrames(sf, "OUT");
1046
// send preface bytes and SettingsFrame together
1047
HttpPublisher publisher = publisher();
1048
publisher.enqueueUnordered(List.of(buf));
1049
publisher.signalEnqueued();
1050
// mark preface sent.
1051
framesController.markPrefaceSent();
1052
Log.logTrace("PREFACE_BYTES sent");
1053
Log.logTrace("Settings Frame sent");
1054
1055
// send a Window update for the receive buffer we are using
1056
// minus the initial 64 K -1 specified in protocol:
1057
// RFC 7540, Section 6.9.2:
1058
// "[...] the connection flow-control window is set to the default
1059
// initial window size until a WINDOW_UPDATE frame is received."
1060
//
1061
// Note that the default initial window size, not to be confused
1062
// with the initial window size, is defined by RFC 7540 as
1063
// 64K -1.
1064
final int len = windowUpdater.initialWindowSize - DEFAULT_INITIAL_WINDOW_SIZE;
1065
if (len != 0) {
1066
if (Log.channel()) {
1067
Log.logChannel("Sending initial connection window update frame: {0} ({1} - {2})",
1068
len, windowUpdater.initialWindowSize, DEFAULT_INITIAL_WINDOW_SIZE);
1069
}
1070
windowUpdater.sendWindowUpdate(len);
1071
}
1072
// there will be an ACK to the windows update - which should
1073
// cause any pending data stored before the preface was sent to be
1074
// flushed (see PrefaceController).
1075
Log.logTrace("finished sending connection preface");
1076
if (debug.on())
1077
debug.log("Triggering processing of buffered data"
1078
+ " after sending connection preface");
1079
subscriber.onNext(List.of(EMPTY_TRIGGER));
1080
}
1081
1082
/**
1083
* Returns an existing Stream with given id, or null if doesn't exist
1084
*/
1085
@SuppressWarnings("unchecked")
1086
<T> Stream<T> getStream(int streamid) {
1087
return (Stream<T>)streams.get(streamid);
1088
}
1089
1090
/**
1091
* Creates Stream with given id.
1092
*/
1093
final <T> Stream<T> createStream(Exchange<T> exchange) {
1094
Stream<T> stream = new Stream<>(this, exchange, windowController);
1095
return stream;
1096
}
1097
1098
<T> Stream.PushedStream<T> createPushStream(Stream<T> parent, Exchange<T> pushEx) {
1099
PushGroup<T> pg = parent.exchange.getPushGroup();
1100
return new Stream.PushedStream<>(pg, this, pushEx);
1101
}
1102
1103
<T> void putStream(Stream<T> stream, int streamid) {
1104
// increment the reference count on the HttpClientImpl
1105
// to prevent the SelectorManager thread from exiting until
1106
// the stream is closed.
1107
client().streamReference();
1108
streams.put(streamid, stream);
1109
}
1110
1111
/**
1112
* Encode the headers into a List<ByteBuffer> and then create HEADERS
1113
* and CONTINUATION frames from the list and return the List<Http2Frame>.
1114
*/
1115
private List<HeaderFrame> encodeHeaders(OutgoingHeaders<Stream<?>> frame) {
1116
// max value of frame size is clamped by default frame size to avoid OOM
1117
int bufferSize = Math.min(Math.max(getMaxSendFrameSize(), 1024), DEFAULT_FRAME_SIZE);
1118
List<ByteBuffer> buffers = encodeHeadersImpl(
1119
bufferSize,
1120
frame.getAttachment().getRequestPseudoHeaders(),
1121
frame.getUserHeaders(),
1122
frame.getSystemHeaders());
1123
1124
List<HeaderFrame> frames = new ArrayList<>(buffers.size());
1125
Iterator<ByteBuffer> bufIterator = buffers.iterator();
1126
HeaderFrame oframe = new HeadersFrame(frame.streamid(), frame.getFlags(), bufIterator.next());
1127
frames.add(oframe);
1128
while(bufIterator.hasNext()) {
1129
oframe = new ContinuationFrame(frame.streamid(), bufIterator.next());
1130
frames.add(oframe);
1131
}
1132
oframe.setFlag(HeaderFrame.END_HEADERS);
1133
return frames;
1134
}
1135
1136
// Dedicated cache for headers encoding ByteBuffer.
1137
// There can be no concurrent access to this buffer as all access to this buffer
1138
// and its content happen within a single critical code block section protected
1139
// by the sendLock. / (see sendFrame())
1140
// private final ByteBufferPool headerEncodingPool = new ByteBufferPool();
1141
1142
private ByteBuffer getHeaderBuffer(int size) {
1143
ByteBuffer buf = ByteBuffer.allocate(size);
1144
buf.limit(size);
1145
return buf;
1146
}
1147
1148
/*
1149
* Encodes all the headers from the given HttpHeaders into the given List
1150
* of buffers.
1151
*
1152
* From https://tools.ietf.org/html/rfc7540#section-8.1.2 :
1153
*
1154
* ...Just as in HTTP/1.x, header field names are strings of ASCII
1155
* characters that are compared in a case-insensitive fashion. However,
1156
* header field names MUST be converted to lowercase prior to their
1157
* encoding in HTTP/2...
1158
*/
1159
private List<ByteBuffer> encodeHeadersImpl(int bufferSize, HttpHeaders... headers) {
1160
ByteBuffer buffer = getHeaderBuffer(bufferSize);
1161
List<ByteBuffer> buffers = new ArrayList<>();
1162
for(HttpHeaders header : headers) {
1163
for (Map.Entry<String, List<String>> e : header.map().entrySet()) {
1164
String lKey = e.getKey().toLowerCase(Locale.US);
1165
List<String> values = e.getValue();
1166
for (String value : values) {
1167
hpackOut.header(lKey, value);
1168
while (!hpackOut.encode(buffer)) {
1169
buffer.flip();
1170
buffers.add(buffer);
1171
buffer = getHeaderBuffer(bufferSize);
1172
}
1173
}
1174
}
1175
}
1176
buffer.flip();
1177
buffers.add(buffer);
1178
return buffers;
1179
}
1180
1181
1182
private List<ByteBuffer> encodeHeaders(OutgoingHeaders<Stream<?>> oh, Stream<?> stream) {
1183
oh.streamid(stream.streamid);
1184
if (Log.headers()) {
1185
StringBuilder sb = new StringBuilder("HEADERS FRAME (stream=");
1186
sb.append(stream.streamid).append(")\n");
1187
Log.dumpHeaders(sb, " ", oh.getAttachment().getRequestPseudoHeaders());
1188
Log.dumpHeaders(sb, " ", oh.getSystemHeaders());
1189
Log.dumpHeaders(sb, " ", oh.getUserHeaders());
1190
Log.logHeaders(sb.toString());
1191
}
1192
List<HeaderFrame> frames = encodeHeaders(oh);
1193
return encodeFrames(frames);
1194
}
1195
1196
private List<ByteBuffer> encodeFrames(List<HeaderFrame> frames) {
1197
if (Log.frames()) {
1198
frames.forEach(f -> Log.logFrames(f, "OUT"));
1199
}
1200
return framesEncoder.encodeFrames(frames);
1201
}
1202
1203
private Stream<?> registerNewStream(OutgoingHeaders<Stream<?>> oh) {
1204
Stream<?> stream = oh.getAttachment();
1205
assert stream.streamid == 0;
1206
int streamid = nextstreamid;
1207
if (stream.registerStream(streamid, false)) {
1208
// set outgoing window here. This allows thread sending
1209
// body to proceed.
1210
nextstreamid += 2;
1211
windowController.registerStream(streamid, getInitialSendWindowSize());
1212
return stream;
1213
} else {
1214
stream.cancelImpl(new IOException("Request cancelled"));
1215
if (finalStream() && streams.isEmpty()) {
1216
close();
1217
}
1218
return null;
1219
}
1220
}
1221
1222
// Monitor that sendFrame() synchronizes on while encoding and enqueueing a frame.
private final Object sendlock = new Object();
1223
1224
void sendFrame(Http2Frame frame) {
1225
try {
1226
HttpPublisher publisher = publisher();
1227
synchronized (sendlock) {
1228
if (frame instanceof OutgoingHeaders) {
1229
@SuppressWarnings("unchecked")
1230
OutgoingHeaders<Stream<?>> oh = (OutgoingHeaders<Stream<?>>) frame;
1231
Stream<?> stream = registerNewStream(oh);
1232
// provide protection from inserting unordered frames between Headers and Continuation
1233
if (stream != null) {
1234
publisher.enqueue(encodeHeaders(oh, stream));
1235
}
1236
} else {
1237
publisher.enqueue(encodeFrame(frame));
1238
}
1239
}
1240
publisher.signalEnqueued();
1241
} catch (IOException e) {
1242
if (!closed) {
1243
Log.logError(e);
1244
shutdown(e);
1245
}
1246
}
1247
}
1248
1249
private List<ByteBuffer> encodeFrame(Http2Frame frame) {
1250
Log.logFrames(frame, "OUT");
1251
return framesEncoder.encodeFrame(frame);
1252
}
1253
1254
void sendDataFrame(DataFrame frame) {
1255
try {
1256
HttpPublisher publisher = publisher();
1257
publisher.enqueue(encodeFrame(frame));
1258
publisher.signalEnqueued();
1259
} catch (IOException e) {
1260
if (!closed) {
1261
Log.logError(e);
1262
shutdown(e);
1263
}
1264
}
1265
}
1266
1267
/*
1268
* Direct call of the method bypasses synchronization on "sendlock" and
1269
* allowed only of control frames: WindowUpdateFrame, PingFrame and etc.
1270
* prohibited for such frames as DataFrame, HeadersFrame, ContinuationFrame.
1271
*/
1272
void sendUnorderedFrame(Http2Frame frame) {
1273
try {
1274
HttpPublisher publisher = publisher();
1275
publisher.enqueueUnordered(encodeFrame(frame));
1276
publisher.signalEnqueued();
1277
} catch (IOException e) {
1278
if (!closed) {
1279
Log.logError(e);
1280
shutdown(e);
1281
}
1282
}
1283
}
1284
1285
/**
1286
* A simple tube subscriber for reading from the connection flow.
1287
*/
1288
final class Http2TubeSubscriber implements TubeSubscriber {
1289
private volatile Flow.Subscription subscription;
1290
private volatile boolean completed;
1291
private volatile boolean dropped;
1292
private volatile Throwable error;
1293
private final ConcurrentLinkedQueue<ByteBuffer> queue
1294
= new ConcurrentLinkedQueue<>();
1295
private final SequentialScheduler scheduler =
1296
SequentialScheduler.lockingScheduler(this::processQueue);
1297
private final HttpClientImpl client;
1298
1299
Http2TubeSubscriber(HttpClientImpl client) {
1300
this.client = Objects.requireNonNull(client);
1301
}
1302
1303
final void processQueue() {
1304
try {
1305
while (!queue.isEmpty() && !scheduler.isStopped()) {
1306
ByteBuffer buffer = queue.poll();
1307
if (debug.on())
1308
debug.log("sending %d to Http2Connection.asyncReceive",
1309
buffer.remaining());
1310
asyncReceive(buffer);
1311
}
1312
} catch (Throwable t) {
1313
Throwable x = error;
1314
if (x == null) error = t;
1315
} finally {
1316
Throwable x = error;
1317
if (x != null) {
1318
if (debug.on()) debug.log("Stopping scheduler", x);
1319
scheduler.stop();
1320
Http2Connection.this.shutdown(x);
1321
}
1322
}
1323
}
1324
1325
private final void runOrSchedule() {
1326
if (client.isSelectorThread()) {
1327
scheduler.runOrSchedule(client.theExecutor());
1328
} else scheduler.runOrSchedule();
1329
}
1330
1331
@Override
1332
public void onSubscribe(Flow.Subscription subscription) {
1333
// supports being called multiple time.
1334
// doesn't cancel the previous subscription, since that is
1335
// most probably the same as the new subscription.
1336
assert this.subscription == null || dropped == false;
1337
this.subscription = subscription;
1338
dropped = false;
1339
// TODO FIXME: request(1) should be done by the delegate.
1340
if (!completed) {
1341
if (debug.on())
1342
debug.log("onSubscribe: requesting Long.MAX_VALUE for reading");
1343
subscription.request(Long.MAX_VALUE);
1344
} else {
1345
if (debug.on()) debug.log("onSubscribe: already completed");
1346
}
1347
}
1348
1349
@Override
1350
public void onNext(List<ByteBuffer> item) {
1351
if (debug.on()) debug.log(() -> "onNext: got " + Utils.remaining(item)
1352
+ " bytes in " + item.size() + " buffers");
1353
queue.addAll(item);
1354
runOrSchedule();
1355
}
1356
1357
@Override
1358
public void onError(Throwable throwable) {
1359
if (debug.on()) debug.log(() -> "onError: " + throwable);
1360
error = throwable;
1361
completed = true;
1362
runOrSchedule();
1363
}
1364
1365
@Override
1366
public void onComplete() {
1367
String msg = isActive()
1368
? "EOF reached while reading"
1369
: "Idle connection closed by HTTP/2 peer";
1370
if (debug.on()) debug.log(msg);
1371
error = new EOFException(msg);
1372
completed = true;
1373
runOrSchedule();
1374
}
1375
1376
@Override
1377
public void dropSubscription() {
1378
if (debug.on()) debug.log("dropSubscription");
1379
// we could probably set subscription to null here...
1380
// then we might not need the 'dropped' boolean?
1381
dropped = true;
1382
}
1383
}
1384
1385
synchronized boolean isActive() {
1386
return numReservedClientStreams > 0 || numReservedServerStreams > 0;
1387
}
1388
1389
@Override
1390
public final String toString() {
1391
return dbgString();
1392
}
1393
1394
final String dbgString() {
1395
return "Http2Connection("
1396
+ connection.getConnectionFlow() + ")";
1397
}
1398
1399
static class HeaderDecoder extends ValidatingHeadersConsumer {
1400
1401
HttpHeadersBuilder headersBuilder;
1402
1403
HeaderDecoder() {
1404
this.headersBuilder = new HttpHeadersBuilder();
1405
}
1406
1407
@Override
1408
public void onDecoded(CharSequence name, CharSequence value) {
1409
String n = name.toString();
1410
String v = value.toString();
1411
super.onDecoded(n, v);
1412
headersBuilder.addHeader(n, v);
1413
}
1414
1415
HttpHeaders headers() {
1416
return headersBuilder.build();
1417
}
1418
}
1419
1420
/*
1421
* Checks RFC 7540 rules (relaxed) compliance regarding pseudo-headers.
1422
*/
1423
static class ValidatingHeadersConsumer implements DecodingCallback {
1424
1425
private static final Set<String> PSEUDO_HEADERS =
1426
Set.of(":authority", ":method", ":path", ":scheme", ":status");
1427
1428
/** Used to check that if there are pseudo-headers, they go first */
1429
private boolean pseudoHeadersEnded;
1430
1431
/**
1432
* Called when END_HEADERS was received. This consumer may be invoked
1433
* again after reset() is called, but for a whole new set of headers.
1434
*/
1435
void reset() {
1436
pseudoHeadersEnded = false;
1437
}
1438
1439
@Override
1440
public void onDecoded(CharSequence name, CharSequence value)
1441
throws UncheckedIOException
1442
{
1443
String n = name.toString();
1444
if (n.startsWith(":")) {
1445
if (pseudoHeadersEnded) {
1446
throw newException("Unexpected pseudo-header '%s'", n);
1447
} else if (!PSEUDO_HEADERS.contains(n)) {
1448
throw newException("Unknown pseudo-header '%s'", n);
1449
}
1450
} else {
1451
pseudoHeadersEnded = true;
1452
if (!Utils.isValidName(n)) {
1453
throw newException("Bad header name '%s'", n);
1454
}
1455
}
1456
String v = value.toString();
1457
if (!Utils.isValidValue(v)) {
1458
throw newException("Bad header value '%s'", v);
1459
}
1460
}
1461
1462
private UncheckedIOException newException(String message, String header)
1463
{
1464
return new UncheckedIOException(
1465
new IOException(String.format(message, header)));
1466
}
1467
}
1468
1469
static final class ConnectionWindowUpdateSender extends WindowUpdateSender {
1470
1471
final int initialWindowSize;
1472
public ConnectionWindowUpdateSender(Http2Connection connection,
1473
int initialWindowSize) {
1474
super(connection, initialWindowSize);
1475
this.initialWindowSize = initialWindowSize;
1476
}
1477
1478
@Override
1479
int getStreamId() {
1480
return 0;
1481
}
1482
}
1483
1484
/**
1485
* Thrown when https handshake negotiates http/1.1 alpn instead of h2
1486
*/
1487
static final class ALPNException extends IOException {
1488
private static final long serialVersionUID = 0L;
1489
final transient AbstractAsyncSSLConnection connection;
1490
1491
ALPNException(String msg, AbstractAsyncSSLConnection connection) {
1492
super(msg);
1493
this.connection = connection;
1494
}
1495
1496
AbstractAsyncSSLConnection getConnection() {
1497
return connection;
1498
}
1499
}
1500
}
1501
1502