Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
PojavLauncherTeam
GitHub Repository: PojavLauncherTeam/jdk17u
Path: blob/master/test/jdk/sun/net/www/protocol/https/HttpsURLConnection/TunnelProxy.java
66649 views
1
/*
2
* Copyright (c) 2005, 2021, Oracle and/or its affiliates. All rights reserved.
3
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4
*
5
* This code is free software; you can redistribute it and/or modify it
6
* under the terms of the GNU General Public License version 2 only, as
7
* published by the Free Software Foundation.
8
*
9
* This code is distributed in the hope that it will be useful, but WITHOUT
10
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12
* version 2 for more details (a copy is included in the LICENSE file that
13
* accompanied this code).
14
*
15
* You should have received a copy of the GNU General Public License version
16
* 2 along with this work; if not, write to the Free Software Foundation,
17
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18
*
19
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20
* or visit www.oracle.com if you need additional information or have any
21
* questions.
22
*/
23
24
/*
25
*
26
*/
27
28
import java.io.BufferedInputStream;
29
import java.io.BufferedOutputStream;
30
import java.io.IOException;
31
import java.io.InputStream;
32
import java.io.OutputStream;
33
import java.net.InetAddress;
34
import java.net.InetSocketAddress;
35
import java.net.Socket;
36
import java.net.URI;
37
import java.net.URISyntaxException;
38
import java.nio.BufferOverflowException;
39
import java.nio.ByteBuffer;
40
import java.nio.channels.SelectionKey;
41
import java.nio.channels.Selector;
42
import java.nio.channels.ServerSocketChannel;
43
import java.nio.channels.SocketChannel;
44
import java.util.HashMap;
45
import java.util.Iterator;
46
import java.util.Set;
47
48
import sun.net.www.MessageHeader;
49
50
public class TunnelProxy {
51
52
/* Listening channel; opened and bound by the 4-arg constructor and handed to every Server. */
ServerSocketChannel schan;
53
/* Number of Server threads created by the constructor; also the loop bound in terminate(). */
int threads;
54
/* Per-thread connection limit; passed to each Server as its maxconn. */
int cperthread;
55
/* The Server threads, created and started by the constructor. */
Server[] servers;
56
57
/**
 * Creates a <code>TunnelProxy</code> with default sizing: a single
 * server thread that handles up to ten simultaneous TCP connections,
 * listening on a port chosen by the system.
 *
 * @throws IOException if the listening socket cannot be opened or bound
 */
public TunnelProxy() throws IOException {
    // 1 thread, 10 connections per thread; the two-arg form supplies port 0
    this(1, 10);
}
67
68
/**
 * Creates a <code>TunnelProxy</code> with the given number of server
 * threads and per-thread connection limit. No bind address is given and
 * the port is zero, so the system picks a free port.
 *
 * @param threads    the number of threads to create to handle requests
 *                   in parallel
 * @param cperthread the number of simultaneous TCP connections to
 *                   handle per thread
 * @throws IOException if the listening socket cannot be opened or bound
 */
public TunnelProxy(int threads, int cperthread) throws IOException {
    // no explicit address, port 0 == any free port
    this(threads, cperthread, null, 0);
}
82
83
/**
 * Creates a <code>TunnelProxy</code> with the given number of server
 * threads and per-thread connection limit, listening on the given port.
 * No bind address is specified. The requested number of threads is
 * created to handle incoming requests, and each thread is allowed to
 * handle a number of simultaneous TCP connections.
 *
 * @param threads    the number of threads to create to handle
 *                   requests in parallel
 * @param cperthread the number of simultaneous TCP connections
 *                   to handle per thread
 * @param port       the port number to bind the server to.
 *                   <code>Zero</code> means choose any free port.
 * @throws IOException if the listening socket cannot be opened or bound
 */
public TunnelProxy(int threads, int cperthread, int port) throws IOException {
    // Forward the caller's port; it used to be silently replaced by 0.
    this(threads, cperthread, null, port);
}
101
102
/**
 * Creates a <code>TunnelProxy</code> bound to the given address and port.
 * The listening channel is opened and bound, then the requested number of
 * {@code Server} threads is created and started; each thread is allowed to
 * handle a number of simultaneous TCP connections.
 *
 * @param threads    the number of threads to create to handle
 *                   requests in parallel
 * @param cperthread the number of simultaneous TCP connections
 *                   to handle per thread
 * @param address    the address to bind to. null means all addresses.
 * @param port       the port number to bind the server to.
 *                   <code>Zero</code> means choose any free port.
 * @throws IOException if the listening channel cannot be opened or bound
 */
public TunnelProxy(int threads, int cperthread, InetAddress address, int port)
        throws IOException {
    schan = ServerSocketChannel.open();
    try {
        schan.socket().bind(new InetSocketAddress(address, port));
    } catch (IOException | RuntimeException e) {
        // The bind was refused (or the port was out of range): release the
        // freshly opened channel instead of leaking it, then report the
        // original failure to the caller.
        try {
            schan.close();
        } catch (IOException closeFailure) {
            e.addSuppressed(closeFailure);
        }
        throw e;
    }
    this.threads = threads;
    this.cperthread = cperthread;
    servers = new Server[threads];
    for (int i = 0; i < threads; i++) {
        servers[i] = new Server(schan, cperthread);
        servers[i].start();
    }
}
129
130
/** Tell all threads in the server to exit within 5 seconds.
131
* This is an abortive termination. Just prior to the thread exiting
132
* all channels in that thread waiting to be closed are forceably closed.
133
*/
134
135
public void terminate () {
136
for (int i=0; i<threads; i++) {
137
servers[i].terminate ();
138
}
139
}
140
141
/**
142
* return the local port number to which the server is bound.
143
* @return the local port number
144
*/
145
146
public int getLocalPort () {
147
return schan.socket().getLocalPort ();
148
}
149
150
static class Server extends Thread {
151
152
ServerSocketChannel schan;
153
Selector selector;
154
SelectionKey listenerKey;
155
SelectionKey key; /* the current key being processed */
156
ByteBuffer consumeBuffer;
157
int maxconn;
158
int nconn;
159
ClosedChannelList clist;
160
boolean shutdown;
161
Pipeline pipe1 = null;
162
Pipeline pipe2 = null;
163
164
Server (ServerSocketChannel schan, int maxconn) {
165
this.schan = schan;
166
this.maxconn = maxconn;
167
nconn = 0;
168
consumeBuffer = ByteBuffer.allocate (512);
169
clist = new ClosedChannelList ();
170
try {
171
selector = Selector.open ();
172
schan.configureBlocking (false);
173
listenerKey = schan.register (selector, SelectionKey.OP_ACCEPT);
174
} catch (IOException e) {
175
System.err.println ("Server could not start: " + e);
176
}
177
}
178
179
/* Stop the thread as soon as possible */
180
public synchronized void terminate () {
181
shutdown = true;
182
if (pipe1 != null) pipe1.terminate();
183
if (pipe2 != null) pipe2.terminate();
184
}
185
186
public void run () {
187
try {
188
while (true) {
189
selector.select (1000);
190
Set selected = selector.selectedKeys();
191
Iterator iter = selected.iterator();
192
while (iter.hasNext()) {
193
key = (SelectionKey)iter.next();
194
if (key.equals (listenerKey)) {
195
SocketChannel sock = schan.accept ();
196
if (sock == null) {
197
/* false notification */
198
iter.remove();
199
continue;
200
}
201
sock.configureBlocking (false);
202
sock.register (selector, SelectionKey.OP_READ);
203
nconn ++;
204
if (nconn == maxconn) {
205
/* deregister */
206
listenerKey.cancel ();
207
listenerKey = null;
208
}
209
} else {
210
if (key.isReadable()) {
211
boolean closed;
212
SocketChannel chan = (SocketChannel) key.channel();
213
if (key.attachment() != null) {
214
closed = consume (chan);
215
} else {
216
closed = read (chan, key);
217
}
218
if (closed) {
219
chan.close ();
220
key.cancel ();
221
if (nconn == maxconn) {
222
listenerKey = schan.register (selector, SelectionKey.OP_ACCEPT);
223
}
224
nconn --;
225
}
226
}
227
}
228
iter.remove();
229
}
230
clist.check();
231
if (shutdown) {
232
clist.terminate ();
233
return;
234
}
235
}
236
} catch (IOException e) {
237
System.out.println ("Server exception: " + e);
238
// TODO finish
239
}
240
}
241
242
/* read all the data off the channel without looking at it
243
* return true if connection closed
244
*/
245
boolean consume (SocketChannel chan) {
246
try {
247
consumeBuffer.clear ();
248
int c = chan.read (consumeBuffer);
249
if (c == -1)
250
return true;
251
} catch (IOException e) {
252
return true;
253
}
254
return false;
255
}
256
257
/* return true if the connection is closed, false otherwise */
258
259
private boolean read (SocketChannel chan, SelectionKey key) {
260
boolean res;
261
try {
262
InputStream is = new BufferedInputStream (new NioInputStream (chan));
263
String requestline = readLine (is);
264
MessageHeader mhead = new MessageHeader (is);
265
String[] req = requestline.split (" ");
266
if (req.length < 2) {
267
/* invalid request line */
268
return false;
269
}
270
String cmd = req[0];
271
URI uri = null;
272
if (!("CONNECT".equalsIgnoreCase(cmd))) {
273
// we expect CONNECT command
274
return false;
275
}
276
try {
277
uri = new URI("http://" + req[1]);
278
} catch (URISyntaxException e) {
279
System.err.println ("Invalid URI: " + e);
280
res = true;
281
}
282
283
// CONNECT ack
284
OutputStream os = new BufferedOutputStream(new NioOutputStream(chan));
285
byte[] ack = "HTTP/1.1 200 Connection established\r\n\r\n".getBytes();
286
os.write(ack, 0, ack.length);
287
os.flush();
288
289
// tunnel anything else
290
tunnel(is, os, uri);
291
292
res = false;
293
} catch (IOException e) {
294
res = true;
295
}
296
return res;
297
}
298
299
private void tunnel(InputStream fromClient, OutputStream toClient, URI serverURI) throws IOException {
300
Socket sockToServer = new Socket(serverURI.getHost(), serverURI.getPort());
301
OutputStream toServer = sockToServer.getOutputStream();
302
InputStream fromServer = sockToServer.getInputStream();
303
304
pipe1 = new Pipeline(fromClient, toServer);
305
pipe2 = new Pipeline(fromServer, toClient);
306
// start pump
307
pipe1.start();
308
pipe2.start();
309
// wait them to end
310
try {
311
pipe1.join();
312
} catch (InterruptedException e) {
313
// No-op
314
} finally {
315
sockToServer.close();
316
}
317
}
318
319
private String readLine (InputStream is) throws IOException {
320
boolean done=false, readCR=false;
321
byte[] b = new byte [512];
322
int c, l = 0;
323
324
while (!done) {
325
c = is.read ();
326
if (c == '\n' && readCR) {
327
done = true;
328
} else {
329
if (c == '\r' && !readCR) {
330
readCR = true;
331
} else {
332
b[l++] = (byte)c;
333
}
334
}
335
}
336
return new String (b);
337
}
338
339
/** close the channel associated with the current key by:
340
* 1. shutdownOutput (send a FIN)
341
* 2. mark the key so that incoming data is to be consumed and discarded
342
* 3. After a period, close the socket
343
*/
344
345
synchronized void orderlyCloseChannel (SelectionKey key) throws IOException {
346
SocketChannel ch = (SocketChannel)key.channel ();
347
ch.socket().shutdownOutput();
348
key.attach (this);
349
clist.add (key);
350
}
351
352
synchronized void abortiveCloseChannel (SelectionKey key) throws IOException {
353
SocketChannel ch = (SocketChannel)key.channel ();
354
Socket s = ch.socket ();
355
s.setSoLinger (true, 0);
356
ch.close();
357
}
358
}
359
360
361
/**
362
* Implements blocking reading semantics on top of a non-blocking channel
363
*/
364
365
static class NioInputStream extends InputStream {
366
SocketChannel channel;
367
Selector selector;
368
ByteBuffer chanbuf;
369
SelectionKey key;
370
int available;
371
byte[] one;
372
boolean closed;
373
ByteBuffer markBuf; /* reads may be satisifed from this buffer */
374
boolean marked;
375
boolean reset;
376
int readlimit;
377
378
public NioInputStream (SocketChannel chan) throws IOException {
379
this.channel = chan;
380
selector = Selector.open();
381
chanbuf = ByteBuffer.allocate (1024);
382
key = chan.register (selector, SelectionKey.OP_READ);
383
available = 0;
384
one = new byte[1];
385
closed = marked = reset = false;
386
}
387
388
public synchronized int read (byte[] b) throws IOException {
389
return read (b, 0, b.length);
390
}
391
392
public synchronized int read () throws IOException {
393
return read (one, 0, 1);
394
}
395
396
public synchronized int read (byte[] b, int off, int srclen) throws IOException {
397
398
int canreturn, willreturn;
399
400
if (closed)
401
return -1;
402
403
if (reset) { /* satisfy from markBuf */
404
canreturn = markBuf.remaining ();
405
willreturn = canreturn>srclen ? srclen : canreturn;
406
markBuf.get(b, off, willreturn);
407
if (canreturn == willreturn) {
408
reset = false;
409
}
410
} else { /* satisfy from channel */
411
canreturn = available();
412
if (canreturn == 0) {
413
block ();
414
canreturn = available();
415
}
416
willreturn = canreturn>srclen ? srclen : canreturn;
417
chanbuf.get(b, off, willreturn);
418
available -= willreturn;
419
420
if (marked) { /* copy into markBuf */
421
try {
422
markBuf.put (b, off, willreturn);
423
} catch (BufferOverflowException e) {
424
marked = false;
425
}
426
}
427
}
428
return willreturn;
429
}
430
431
public synchronized int available () throws IOException {
432
if (closed)
433
throw new IOException ("Stream is closed");
434
435
if (reset)
436
return markBuf.remaining();
437
438
if (available > 0)
439
return available;
440
441
chanbuf.clear ();
442
available = channel.read (chanbuf);
443
if (available > 0)
444
chanbuf.flip();
445
else if (available == -1)
446
throw new IOException ("Stream is closed");
447
return available;
448
}
449
450
/**
451
* block() only called when available==0 and buf is empty
452
*/
453
private synchronized void block () throws IOException {
454
//assert available == 0;
455
int n = selector.select ();
456
//assert n == 1;
457
selector.selectedKeys().clear();
458
available ();
459
}
460
461
public void close () throws IOException {
462
if (closed)
463
return;
464
channel.close ();
465
closed = true;
466
}
467
468
public synchronized void mark (int readlimit) {
469
if (closed)
470
return;
471
this.readlimit = readlimit;
472
markBuf = ByteBuffer.allocate (readlimit);
473
marked = true;
474
reset = false;
475
}
476
477
public synchronized void reset () throws IOException {
478
if (closed )
479
return;
480
if (!marked)
481
throw new IOException ("Stream not marked");
482
marked = false;
483
reset = true;
484
markBuf.flip ();
485
}
486
}
487
488
static class NioOutputStream extends OutputStream {
489
SocketChannel channel;
490
ByteBuffer buf;
491
SelectionKey key;
492
Selector selector;
493
boolean closed;
494
byte[] one;
495
496
public NioOutputStream (SocketChannel channel) throws IOException {
497
this.channel = channel;
498
selector = Selector.open ();
499
key = channel.register (selector, SelectionKey.OP_WRITE);
500
closed = false;
501
one = new byte [1];
502
}
503
504
public synchronized void write (int b) throws IOException {
505
one[0] = (byte)b;
506
write (one, 0, 1);
507
}
508
509
public synchronized void write (byte[] b) throws IOException {
510
write (b, 0, b.length);
511
}
512
513
public synchronized void write (byte[] b, int off, int len) throws IOException {
514
if (closed)
515
throw new IOException ("stream is closed");
516
517
buf = ByteBuffer.allocate (len);
518
buf.put (b, off, len);
519
buf.flip ();
520
int n;
521
while ((n = channel.write (buf)) < len) {
522
len -= n;
523
if (len == 0)
524
return;
525
selector.select ();
526
selector.selectedKeys().clear ();
527
}
528
}
529
530
public void close () throws IOException {
531
if (closed)
532
return;
533
channel.close ();
534
closed = true;
535
}
536
}
537
538
/*
539
* Pipeline object :-
540
* 1) Will pump every byte from its input stream to output stream
541
* 2) Is an 'active object'
542
*/
543
static class Pipeline implements Runnable {
544
InputStream in;
545
OutputStream out;
546
Thread t;
547
548
public Pipeline(InputStream is, OutputStream os) {
549
in = is;
550
out = os;
551
}
552
553
public void start() {
554
t = new Thread(this);
555
t.start();
556
}
557
558
public void join() throws InterruptedException {
559
t.join();
560
}
561
562
public void terminate() {
563
t.interrupt();
564
}
565
566
public void run() {
567
byte[] buffer = new byte[10000];
568
try {
569
while (!Thread.interrupted()) {
570
int len;
571
while ((len = in.read(buffer)) != -1) {
572
out.write(buffer, 0, len);
573
out.flush();
574
}
575
}
576
} catch(IOException e) {
577
// No-op
578
} finally {
579
}
580
}
581
}
582
583
/**
584
* Utilities for synchronization. A condition is
585
* identified by a string name, and is initialized
586
* upon first use (ie. setCondition() or waitForCondition()). Threads
587
* are blocked until some thread calls (or has called) setCondition() for the same
588
* condition.
589
* <P>
590
* A rendezvous built on a condition is also provided for synchronizing
591
* N threads.
592
*/
593
594
private static HashMap conditions = new HashMap();
595
596
/*
597
* Modifiable boolean object
598
*/
599
private static class BValue {
600
boolean v;
601
}
602
603
/*
604
* Modifiable int object
605
*/
606
private static class IValue {
607
int v;
608
IValue (int i) {
609
v =i;
610
}
611
}
612
613
614
private static BValue getCond (String condition) {
615
synchronized (conditions) {
616
BValue cond = (BValue) conditions.get (condition);
617
if (cond == null) {
618
cond = new BValue();
619
conditions.put (condition, cond);
620
}
621
return cond;
622
}
623
}
624
625
/**
626
* Set the condition to true. Any threads that are currently blocked
627
* waiting on the condition, will be unblocked and allowed to continue.
628
* Threads that subsequently call waitForCondition() will not block.
629
* If the named condition did not exist prior to the call, then it is created
630
* first.
631
*/
632
633
public static void setCondition (String condition) {
634
BValue cond = getCond (condition);
635
synchronized (cond) {
636
if (cond.v) {
637
return;
638
}
639
cond.v = true;
640
cond.notifyAll();
641
}
642
}
643
644
/**
645
* If the named condition does not exist, then it is created and initialized
646
* to false. If the condition exists or has just been created and its value
647
* is false, then the thread blocks until another thread sets the condition.
648
* If the condition exists and is already set to true, then this call returns
649
* immediately without blocking.
650
*/
651
652
public static void waitForCondition (String condition) {
653
BValue cond = getCond (condition);
654
synchronized (cond) {
655
if (!cond.v) {
656
try {
657
cond.wait();
658
} catch (InterruptedException e) {}
659
}
660
}
661
}
662
663
/* conditions must be locked when accessing this */
664
static HashMap rv = new HashMap();
665
666
/**
667
* Force N threads to rendezvous (ie. wait for each other) before proceeding.
668
* The first thread(s) to call are blocked until the last
669
* thread makes the call. Then all threads continue.
670
* <p>
671
* All threads that call with the same condition name, must use the same value
672
* for N (or the results may be not be as expected).
673
* <P>
674
* Obviously, if fewer than N threads make the rendezvous then the result
675
* will be a hang.
676
*/
677
678
public static void rendezvous (String condition, int N) {
679
BValue cond;
680
IValue iv;
681
String name = "RV_"+condition;
682
683
/* get the condition */
684
685
synchronized (conditions) {
686
cond = (BValue)conditions.get (name);
687
if (cond == null) {
688
/* we are first caller */
689
if (N < 2) {
690
throw new RuntimeException ("rendezvous must be called with N >= 2");
691
}
692
cond = new BValue ();
693
conditions.put (name, cond);
694
iv = new IValue (N-1);
695
rv.put (name, iv);
696
} else {
697
/* already initialised, just decrement the counter */
698
iv = (IValue) rv.get (name);
699
iv.v --;
700
}
701
}
702
703
if (iv.v > 0) {
704
waitForCondition (name);
705
} else {
706
setCondition (name);
707
synchronized (conditions) {
708
clearCondition (name);
709
rv.remove (name);
710
}
711
}
712
}
713
714
/**
715
* If the named condition exists and is set then remove it, so it can
716
* be re-initialized and used again. If the condition does not exist, or
717
* exists but is not set, then the call returns without doing anything.
718
* Note, some higher level synchronization
719
* may be needed between clear and the other operations.
720
*/
721
722
public static void clearCondition(String condition) {
723
BValue cond;
724
synchronized (conditions) {
725
cond = (BValue) conditions.get (condition);
726
if (cond == null) {
727
return;
728
}
729
synchronized (cond) {
730
if (cond.v) {
731
conditions.remove (condition);
732
}
733
}
734
}
735
}
736
}
737
738