Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
PojavLauncherTeam
GitHub Repository: PojavLauncherTeam/openjdk-multiarch-jdk8u
Path: blob/aarch64-shenandoah-jdk8u272-b10/jdk/test/sun/net/www/protocol/https/HttpsURLConnection/TunnelProxy.java
38889 views
1
/*
2
* Copyright (c) 2005, Oracle and/or its affiliates. All rights reserved.
3
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4
*
5
* This code is free software; you can redistribute it and/or modify it
6
* under the terms of the GNU General Public License version 2 only, as
7
* published by the Free Software Foundation.
8
*
9
* This code is distributed in the hope that it will be useful, but WITHOUT
10
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12
* version 2 for more details (a copy is included in the LICENSE file that
13
* accompanied this code).
14
*
15
* You should have received a copy of the GNU General Public License version
16
* 2 along with this work; if not, write to the Free Software Foundation,
17
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18
*
19
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20
* or visit www.oracle.com if you need additional information or have any
21
* questions.
22
*/
23
24
/*
25
*
26
*/
27
28
import java.net.*;
29
import java.io.*;
30
import java.nio.*;
31
import java.nio.channels.*;
32
import sun.net.www.MessageHeader;
33
import java.util.*;
34
35
public class TunnelProxy {
36
37
ServerSocketChannel schan;
38
int threads;
39
int cperthread;
40
Server[] servers;
41
42
/**
43
* Create a <code>TunnelProxy<code> instance with the specified callback object
44
* for handling requests. One thread is created to handle requests,
45
* and up to ten TCP connections will be handled simultaneously.
46
* @param cb the callback object which is invoked to handle each
47
* incoming request
48
*/
49
50
public TunnelProxy () throws IOException {
51
this (1, 10, 0);
52
}
53
54
/**
55
* Create a <code>TunnelProxy<code> instance with the specified number of
56
* threads and maximum number of connections per thread. This functions
57
* the same as the 4 arg constructor, where the port argument is set to zero.
58
* @param cb the callback object which is invoked to handle each
59
* incoming request
60
* @param threads the number of threads to create to handle requests
61
* in parallel
62
* @param cperthread the number of simultaneous TCP connections to
63
* handle per thread
64
*/
65
66
public TunnelProxy (int threads, int cperthread)
67
throws IOException {
68
this (threads, cperthread, 0);
69
}
70
71
/**
72
* Create a <code>TunnelProxy<code> instance with the specified number
73
* of threads and maximum number of connections per thread and running on
74
* the specified port. The specified number of threads are created to
75
* handle incoming requests, and each thread is allowed
76
* to handle a number of simultaneous TCP connections.
77
* @param cb the callback object which is invoked to handle
78
* each incoming request
79
* @param threads the number of threads to create to handle
80
* requests in parallel
81
* @param cperthread the number of simultaneous TCP connections
82
* to handle per thread
83
* @param port the port number to bind the server to. <code>Zero</code>
84
* means choose any free port.
85
*/
86
87
public TunnelProxy (int threads, int cperthread, int port)
88
throws IOException {
89
schan = ServerSocketChannel.open ();
90
InetSocketAddress addr = new InetSocketAddress (port);
91
schan.socket().bind (addr);
92
this.threads = threads;
93
this.cperthread = cperthread;
94
servers = new Server [threads];
95
for (int i=0; i<threads; i++) {
96
servers[i] = new Server (schan, cperthread);
97
servers[i].start();
98
}
99
}
100
101
/** Tell all threads in the server to exit within 5 seconds.
102
* This is an abortive termination. Just prior to the thread exiting
103
* all channels in that thread waiting to be closed are forceably closed.
104
*/
105
106
public void terminate () {
107
for (int i=0; i<threads; i++) {
108
servers[i].terminate ();
109
}
110
}
111
112
/**
113
* return the local port number to which the server is bound.
114
* @return the local port number
115
*/
116
117
public int getLocalPort () {
118
return schan.socket().getLocalPort ();
119
}
120
121
static class Server extends Thread {
122
123
ServerSocketChannel schan;
124
Selector selector;
125
SelectionKey listenerKey;
126
SelectionKey key; /* the current key being processed */
127
ByteBuffer consumeBuffer;
128
int maxconn;
129
int nconn;
130
ClosedChannelList clist;
131
boolean shutdown;
132
Pipeline pipe1 = null;
133
Pipeline pipe2 = null;
134
135
Server (ServerSocketChannel schan, int maxconn) {
136
this.schan = schan;
137
this.maxconn = maxconn;
138
nconn = 0;
139
consumeBuffer = ByteBuffer.allocate (512);
140
clist = new ClosedChannelList ();
141
try {
142
selector = Selector.open ();
143
schan.configureBlocking (false);
144
listenerKey = schan.register (selector, SelectionKey.OP_ACCEPT);
145
} catch (IOException e) {
146
System.err.println ("Server could not start: " + e);
147
}
148
}
149
150
/* Stop the thread as soon as possible */
151
public synchronized void terminate () {
152
shutdown = true;
153
if (pipe1 != null) pipe1.terminate();
154
if (pipe2 != null) pipe2.terminate();
155
}
156
157
public void run () {
158
try {
159
while (true) {
160
selector.select (1000);
161
Set selected = selector.selectedKeys();
162
Iterator iter = selected.iterator();
163
while (iter.hasNext()) {
164
key = (SelectionKey)iter.next();
165
if (key.equals (listenerKey)) {
166
SocketChannel sock = schan.accept ();
167
if (sock == null) {
168
/* false notification */
169
iter.remove();
170
continue;
171
}
172
sock.configureBlocking (false);
173
sock.register (selector, SelectionKey.OP_READ);
174
nconn ++;
175
if (nconn == maxconn) {
176
/* deregister */
177
listenerKey.cancel ();
178
listenerKey = null;
179
}
180
} else {
181
if (key.isReadable()) {
182
boolean closed;
183
SocketChannel chan = (SocketChannel) key.channel();
184
if (key.attachment() != null) {
185
closed = consume (chan);
186
} else {
187
closed = read (chan, key);
188
}
189
if (closed) {
190
chan.close ();
191
key.cancel ();
192
if (nconn == maxconn) {
193
listenerKey = schan.register (selector, SelectionKey.OP_ACCEPT);
194
}
195
nconn --;
196
}
197
}
198
}
199
iter.remove();
200
}
201
clist.check();
202
if (shutdown) {
203
clist.terminate ();
204
return;
205
}
206
}
207
} catch (IOException e) {
208
System.out.println ("Server exception: " + e);
209
// TODO finish
210
}
211
}
212
213
/* read all the data off the channel without looking at it
214
* return true if connection closed
215
*/
216
boolean consume (SocketChannel chan) {
217
try {
218
consumeBuffer.clear ();
219
int c = chan.read (consumeBuffer);
220
if (c == -1)
221
return true;
222
} catch (IOException e) {
223
return true;
224
}
225
return false;
226
}
227
228
/* return true if the connection is closed, false otherwise */
229
230
private boolean read (SocketChannel chan, SelectionKey key) {
231
HttpTransaction msg;
232
boolean res;
233
try {
234
InputStream is = new BufferedInputStream (new NioInputStream (chan));
235
String requestline = readLine (is);
236
MessageHeader mhead = new MessageHeader (is);
237
String[] req = requestline.split (" ");
238
if (req.length < 2) {
239
/* invalid request line */
240
return false;
241
}
242
String cmd = req[0];
243
URI uri = null;
244
if (!("CONNECT".equalsIgnoreCase(cmd))) {
245
// we expect CONNECT command
246
return false;
247
}
248
try {
249
uri = new URI("http://" + req[1]);
250
} catch (URISyntaxException e) {
251
System.err.println ("Invalid URI: " + e);
252
res = true;
253
}
254
255
// CONNECT ack
256
OutputStream os = new BufferedOutputStream(new NioOutputStream(chan));
257
byte[] ack = "HTTP/1.1 200 Connection established\r\n\r\n".getBytes();
258
os.write(ack, 0, ack.length);
259
os.flush();
260
261
// tunnel anything else
262
tunnel(is, os, uri);
263
264
res = false;
265
} catch (IOException e) {
266
res = true;
267
}
268
return res;
269
}
270
271
private void tunnel(InputStream fromClient, OutputStream toClient, URI serverURI) throws IOException {
272
Socket sockToServer = new Socket(serverURI.getHost(), serverURI.getPort());
273
OutputStream toServer = sockToServer.getOutputStream();
274
InputStream fromServer = sockToServer.getInputStream();
275
276
pipe1 = new Pipeline(fromClient, toServer);
277
pipe2 = new Pipeline(fromServer, toClient);
278
// start pump
279
pipe1.start();
280
pipe2.start();
281
// wait them to end
282
try {
283
pipe1.join();
284
} catch (InterruptedException e) {
285
// No-op
286
} finally {
287
sockToServer.close();
288
}
289
}
290
291
private String readLine (InputStream is) throws IOException {
292
boolean done=false, readCR=false;
293
byte[] b = new byte [512];
294
int c, l = 0;
295
296
while (!done) {
297
c = is.read ();
298
if (c == '\n' && readCR) {
299
done = true;
300
} else {
301
if (c == '\r' && !readCR) {
302
readCR = true;
303
} else {
304
b[l++] = (byte)c;
305
}
306
}
307
}
308
return new String (b);
309
}
310
311
/** close the channel associated with the current key by:
312
* 1. shutdownOutput (send a FIN)
313
* 2. mark the key so that incoming data is to be consumed and discarded
314
* 3. After a period, close the socket
315
*/
316
317
synchronized void orderlyCloseChannel (SelectionKey key) throws IOException {
318
SocketChannel ch = (SocketChannel)key.channel ();
319
ch.socket().shutdownOutput();
320
key.attach (this);
321
clist.add (key);
322
}
323
324
synchronized void abortiveCloseChannel (SelectionKey key) throws IOException {
325
SocketChannel ch = (SocketChannel)key.channel ();
326
Socket s = ch.socket ();
327
s.setSoLinger (true, 0);
328
ch.close();
329
}
330
}
331
332
333
/**
334
* Implements blocking reading semantics on top of a non-blocking channel
335
*/
336
337
static class NioInputStream extends InputStream {
338
SocketChannel channel;
339
Selector selector;
340
ByteBuffer chanbuf;
341
SelectionKey key;
342
int available;
343
byte[] one;
344
boolean closed;
345
ByteBuffer markBuf; /* reads may be satisifed from this buffer */
346
boolean marked;
347
boolean reset;
348
int readlimit;
349
350
public NioInputStream (SocketChannel chan) throws IOException {
351
this.channel = chan;
352
selector = Selector.open();
353
chanbuf = ByteBuffer.allocate (1024);
354
key = chan.register (selector, SelectionKey.OP_READ);
355
available = 0;
356
one = new byte[1];
357
closed = marked = reset = false;
358
}
359
360
public synchronized int read (byte[] b) throws IOException {
361
return read (b, 0, b.length);
362
}
363
364
public synchronized int read () throws IOException {
365
return read (one, 0, 1);
366
}
367
368
public synchronized int read (byte[] b, int off, int srclen) throws IOException {
369
370
int canreturn, willreturn;
371
372
if (closed)
373
return -1;
374
375
if (reset) { /* satisfy from markBuf */
376
canreturn = markBuf.remaining ();
377
willreturn = canreturn>srclen ? srclen : canreturn;
378
markBuf.get(b, off, willreturn);
379
if (canreturn == willreturn) {
380
reset = false;
381
}
382
} else { /* satisfy from channel */
383
canreturn = available();
384
if (canreturn == 0) {
385
block ();
386
canreturn = available();
387
}
388
willreturn = canreturn>srclen ? srclen : canreturn;
389
chanbuf.get(b, off, willreturn);
390
available -= willreturn;
391
392
if (marked) { /* copy into markBuf */
393
try {
394
markBuf.put (b, off, willreturn);
395
} catch (BufferOverflowException e) {
396
marked = false;
397
}
398
}
399
}
400
return willreturn;
401
}
402
403
public synchronized int available () throws IOException {
404
if (closed)
405
throw new IOException ("Stream is closed");
406
407
if (reset)
408
return markBuf.remaining();
409
410
if (available > 0)
411
return available;
412
413
chanbuf.clear ();
414
available = channel.read (chanbuf);
415
if (available > 0)
416
chanbuf.flip();
417
else if (available == -1)
418
throw new IOException ("Stream is closed");
419
return available;
420
}
421
422
/**
423
* block() only called when available==0 and buf is empty
424
*/
425
private synchronized void block () throws IOException {
426
//assert available == 0;
427
int n = selector.select ();
428
//assert n == 1;
429
selector.selectedKeys().clear();
430
available ();
431
}
432
433
public void close () throws IOException {
434
if (closed)
435
return;
436
channel.close ();
437
closed = true;
438
}
439
440
public synchronized void mark (int readlimit) {
441
if (closed)
442
return;
443
this.readlimit = readlimit;
444
markBuf = ByteBuffer.allocate (readlimit);
445
marked = true;
446
reset = false;
447
}
448
449
public synchronized void reset () throws IOException {
450
if (closed )
451
return;
452
if (!marked)
453
throw new IOException ("Stream not marked");
454
marked = false;
455
reset = true;
456
markBuf.flip ();
457
}
458
}
459
460
static class NioOutputStream extends OutputStream {
461
SocketChannel channel;
462
ByteBuffer buf;
463
SelectionKey key;
464
Selector selector;
465
boolean closed;
466
byte[] one;
467
468
public NioOutputStream (SocketChannel channel) throws IOException {
469
this.channel = channel;
470
selector = Selector.open ();
471
key = channel.register (selector, SelectionKey.OP_WRITE);
472
closed = false;
473
one = new byte [1];
474
}
475
476
public synchronized void write (int b) throws IOException {
477
one[0] = (byte)b;
478
write (one, 0, 1);
479
}
480
481
public synchronized void write (byte[] b) throws IOException {
482
write (b, 0, b.length);
483
}
484
485
public synchronized void write (byte[] b, int off, int len) throws IOException {
486
if (closed)
487
throw new IOException ("stream is closed");
488
489
buf = ByteBuffer.allocate (len);
490
buf.put (b, off, len);
491
buf.flip ();
492
int n;
493
while ((n = channel.write (buf)) < len) {
494
len -= n;
495
if (len == 0)
496
return;
497
selector.select ();
498
selector.selectedKeys().clear ();
499
}
500
}
501
502
public void close () throws IOException {
503
if (closed)
504
return;
505
channel.close ();
506
closed = true;
507
}
508
}
509
510
/*
511
* Pipeline object :-
512
* 1) Will pump every byte from its input stream to output stream
513
* 2) Is an 'active object'
514
*/
515
static class Pipeline implements Runnable {
516
InputStream in;
517
OutputStream out;
518
Thread t;
519
520
public Pipeline(InputStream is, OutputStream os) {
521
in = is;
522
out = os;
523
}
524
525
public void start() {
526
t = new Thread(this);
527
t.start();
528
}
529
530
public void join() throws InterruptedException {
531
t.join();
532
}
533
534
public void terminate() {
535
t.interrupt();
536
}
537
538
public void run() {
539
byte[] buffer = new byte[10000];
540
try {
541
while (!Thread.interrupted()) {
542
int len;
543
while ((len = in.read(buffer)) != -1) {
544
out.write(buffer, 0, len);
545
out.flush();
546
}
547
}
548
} catch(IOException e) {
549
// No-op
550
} finally {
551
}
552
}
553
}
554
555
/**
556
* Utilities for synchronization. A condition is
557
* identified by a string name, and is initialized
558
* upon first use (ie. setCondition() or waitForCondition()). Threads
559
* are blocked until some thread calls (or has called) setCondition() for the same
560
* condition.
561
* <P>
562
* A rendezvous built on a condition is also provided for synchronizing
563
* N threads.
564
*/
565
566
private static HashMap conditions = new HashMap();
567
568
/*
569
* Modifiable boolean object
570
*/
571
private static class BValue {
572
boolean v;
573
}
574
575
/*
576
* Modifiable int object
577
*/
578
private static class IValue {
579
int v;
580
IValue (int i) {
581
v =i;
582
}
583
}
584
585
586
private static BValue getCond (String condition) {
587
synchronized (conditions) {
588
BValue cond = (BValue) conditions.get (condition);
589
if (cond == null) {
590
cond = new BValue();
591
conditions.put (condition, cond);
592
}
593
return cond;
594
}
595
}
596
597
/**
598
* Set the condition to true. Any threads that are currently blocked
599
* waiting on the condition, will be unblocked and allowed to continue.
600
* Threads that subsequently call waitForCondition() will not block.
601
* If the named condition did not exist prior to the call, then it is created
602
* first.
603
*/
604
605
public static void setCondition (String condition) {
606
BValue cond = getCond (condition);
607
synchronized (cond) {
608
if (cond.v) {
609
return;
610
}
611
cond.v = true;
612
cond.notifyAll();
613
}
614
}
615
616
/**
617
* If the named condition does not exist, then it is created and initialized
618
* to false. If the condition exists or has just been created and its value
619
* is false, then the thread blocks until another thread sets the condition.
620
* If the condition exists and is already set to true, then this call returns
621
* immediately without blocking.
622
*/
623
624
public static void waitForCondition (String condition) {
625
BValue cond = getCond (condition);
626
synchronized (cond) {
627
if (!cond.v) {
628
try {
629
cond.wait();
630
} catch (InterruptedException e) {}
631
}
632
}
633
}
634
635
/* conditions must be locked when accessing this */
636
static HashMap rv = new HashMap();
637
638
/**
639
* Force N threads to rendezvous (ie. wait for each other) before proceeding.
640
* The first thread(s) to call are blocked until the last
641
* thread makes the call. Then all threads continue.
642
* <p>
643
* All threads that call with the same condition name, must use the same value
644
* for N (or the results may be not be as expected).
645
* <P>
646
* Obviously, if fewer than N threads make the rendezvous then the result
647
* will be a hang.
648
*/
649
650
public static void rendezvous (String condition, int N) {
651
BValue cond;
652
IValue iv;
653
String name = "RV_"+condition;
654
655
/* get the condition */
656
657
synchronized (conditions) {
658
cond = (BValue)conditions.get (name);
659
if (cond == null) {
660
/* we are first caller */
661
if (N < 2) {
662
throw new RuntimeException ("rendezvous must be called with N >= 2");
663
}
664
cond = new BValue ();
665
conditions.put (name, cond);
666
iv = new IValue (N-1);
667
rv.put (name, iv);
668
} else {
669
/* already initialised, just decrement the counter */
670
iv = (IValue) rv.get (name);
671
iv.v --;
672
}
673
}
674
675
if (iv.v > 0) {
676
waitForCondition (name);
677
} else {
678
setCondition (name);
679
synchronized (conditions) {
680
clearCondition (name);
681
rv.remove (name);
682
}
683
}
684
}
685
686
/**
687
* If the named condition exists and is set then remove it, so it can
688
* be re-initialized and used again. If the condition does not exist, or
689
* exists but is not set, then the call returns without doing anything.
690
* Note, some higher level synchronization
691
* may be needed between clear and the other operations.
692
*/
693
694
public static void clearCondition(String condition) {
695
BValue cond;
696
synchronized (conditions) {
697
cond = (BValue) conditions.get (condition);
698
if (cond == null) {
699
return;
700
}
701
synchronized (cond) {
702
if (cond.v) {
703
conditions.remove (condition);
704
}
705
}
706
}
707
}
708
}
709
710