Path: blob/master/test/jdk/sun/net/www/protocol/https/HttpsURLConnection/TunnelProxy.java
66649 views
/*1* Copyright (c) 2005, 2021, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation.7*8* This code is distributed in the hope that it will be useful, but WITHOUT9* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or10* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License11* version 2 for more details (a copy is included in the LICENSE file that12* accompanied this code).13*14* You should have received a copy of the GNU General Public License version15* 2 along with this work; if not, write to the Free Software Foundation,16* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.17*18* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA19* or visit www.oracle.com if you need additional information or have any20* questions.21*/2223/*24*25*/2627import java.io.BufferedInputStream;28import java.io.BufferedOutputStream;29import java.io.IOException;30import java.io.InputStream;31import java.io.OutputStream;32import java.net.InetAddress;33import java.net.InetSocketAddress;34import java.net.Socket;35import java.net.URI;36import java.net.URISyntaxException;37import java.nio.BufferOverflowException;38import java.nio.ByteBuffer;39import java.nio.channels.SelectionKey;40import java.nio.channels.Selector;41import java.nio.channels.ServerSocketChannel;42import java.nio.channels.SocketChannel;43import java.util.HashMap;44import java.util.Iterator;45import java.util.Set;4647import sun.net.www.MessageHeader;4849public class TunnelProxy {5051ServerSocketChannel schan;52int threads;53int cperthread;54Server[] servers;5556/**57* Create a <code>TunnelProxy<code> instance with the specified callback object58* for handling requests. One thread is created to handle requests,59* and up to ten TCP connections will be handled simultaneously.60* incoming request61*/6263public TunnelProxy () throws IOException {64this (1, 10, 0);65}6667/**68* Create a <code>TunnelProxy<code> instance with the specified number of69* threads and maximum number of connections per thread. This functions70* the same as the 4 arg constructor, where the port argument is set to zero.71* @param threads the number of threads to create to handle requests72* in parallel73* @param cperthread the number of simultaneous TCP connections to74* handle per thread75*/7677public TunnelProxy (int threads, int cperthread)78throws IOException {79this (threads, cperthread, 0);80}8182/**83* Create a <code>TunnelProxy<code> instance with the specified number84* of threads and maximum number of connections per thread and running on85* the specified port. The specified number of threads are created to86* handle incoming requests, and each thread is allowed87* to handle a number of simultaneous TCP connections.88* @param threads the number of threads to create to handle89* requests in parallel90* @param cperthread the number of simultaneous TCP connections91* to handle per thread92* @param port the port number to bind the server to. <code>Zero</code>93* means choose any free port.94*/9596public TunnelProxy (int threads, int cperthread, int port)97throws IOException {98this(threads, cperthread, null, 0);99}100101/**102* Create a <code>TunnelProxy<code> instance with the specified number103* of threads and maximum number of connections per thread and running on104* the specified port. The specified number of threads are created to105* handle incoming requests, and each thread is allowed106* to handle a number of simultaneous TCP connections.107* @param threads the number of threads to create to handle108* requests in parallel109* @param cperthread the number of simultaneous TCP connections110* to handle per thread111* @param address the address to bind to. null means all addresses.112* @param port the port number to bind the server to. <code>Zero</code>113* means choose any free port.114*/115public TunnelProxy (int threads, int cperthread, InetAddress address, int port)116throws IOException {117schan = ServerSocketChannel.open ();118InetSocketAddress addr = new InetSocketAddress (address, port);119schan.socket().bind (addr);120this.threads = threads;121this.cperthread = cperthread;122servers = new Server [threads];123for (int i=0; i<threads; i++) {124servers[i] = new Server (schan, cperthread);125servers[i].start();126}127}128129/** Tell all threads in the server to exit within 5 seconds.130* This is an abortive termination. Just prior to the thread exiting131* all channels in that thread waiting to be closed are forceably closed.132*/133134public void terminate () {135for (int i=0; i<threads; i++) {136servers[i].terminate ();137}138}139140/**141* return the local port number to which the server is bound.142* @return the local port number143*/144145public int getLocalPort () {146return schan.socket().getLocalPort ();147}148149static class Server extends Thread {150151ServerSocketChannel schan;152Selector selector;153SelectionKey listenerKey;154SelectionKey key; /* the current key being processed */155ByteBuffer consumeBuffer;156int maxconn;157int nconn;158ClosedChannelList clist;159boolean shutdown;160Pipeline pipe1 = null;161Pipeline pipe2 = null;162163Server (ServerSocketChannel schan, int maxconn) {164this.schan = schan;165this.maxconn = maxconn;166nconn = 0;167consumeBuffer = ByteBuffer.allocate (512);168clist = new ClosedChannelList ();169try {170selector = Selector.open ();171schan.configureBlocking (false);172listenerKey = schan.register (selector, SelectionKey.OP_ACCEPT);173} catch (IOException e) {174System.err.println ("Server could not start: " + e);175}176}177178/* Stop the thread as soon as possible */179public synchronized void terminate () {180shutdown = true;181if (pipe1 != null) pipe1.terminate();182if (pipe2 != null) pipe2.terminate();183}184185public void run () {186try {187while (true) {188selector.select (1000);189Set selected = selector.selectedKeys();190Iterator iter = selected.iterator();191while (iter.hasNext()) {192key = (SelectionKey)iter.next();193if (key.equals (listenerKey)) {194SocketChannel sock = schan.accept ();195if (sock == null) {196/* false notification */197iter.remove();198continue;199}200sock.configureBlocking (false);201sock.register (selector, SelectionKey.OP_READ);202nconn ++;203if (nconn == maxconn) {204/* deregister */205listenerKey.cancel ();206listenerKey = null;207}208} else {209if (key.isReadable()) {210boolean closed;211SocketChannel chan = (SocketChannel) key.channel();212if (key.attachment() != null) {213closed = consume (chan);214} else {215closed = read (chan, key);216}217if (closed) {218chan.close ();219key.cancel ();220if (nconn == maxconn) {221listenerKey = schan.register (selector, SelectionKey.OP_ACCEPT);222}223nconn --;224}225}226}227iter.remove();228}229clist.check();230if (shutdown) {231clist.terminate ();232return;233}234}235} catch (IOException e) {236System.out.println ("Server exception: " + e);237// TODO finish238}239}240241/* read all the data off the channel without looking at it242* return true if connection closed243*/244boolean consume (SocketChannel chan) {245try {246consumeBuffer.clear ();247int c = chan.read (consumeBuffer);248if (c == -1)249return true;250} catch (IOException e) {251return true;252}253return false;254}255256/* return true if the connection is closed, false otherwise */257258private boolean read (SocketChannel chan, SelectionKey key) {259boolean res;260try {261InputStream is = new BufferedInputStream (new NioInputStream (chan));262String requestline = readLine (is);263MessageHeader mhead = new MessageHeader (is);264String[] req = requestline.split (" ");265if (req.length < 2) {266/* invalid request line */267return false;268}269String cmd = req[0];270URI uri = null;271if (!("CONNECT".equalsIgnoreCase(cmd))) {272// we expect CONNECT command273return false;274}275try {276uri = new URI("http://" + req[1]);277} catch (URISyntaxException e) {278System.err.println ("Invalid URI: " + e);279res = true;280}281282// CONNECT ack283OutputStream os = new BufferedOutputStream(new NioOutputStream(chan));284byte[] ack = "HTTP/1.1 200 Connection established\r\n\r\n".getBytes();285os.write(ack, 0, ack.length);286os.flush();287288// tunnel anything else289tunnel(is, os, uri);290291res = false;292} catch (IOException e) {293res = true;294}295return res;296}297298private void tunnel(InputStream fromClient, OutputStream toClient, URI serverURI) throws IOException {299Socket sockToServer = new Socket(serverURI.getHost(), serverURI.getPort());300OutputStream toServer = sockToServer.getOutputStream();301InputStream fromServer = sockToServer.getInputStream();302303pipe1 = new Pipeline(fromClient, toServer);304pipe2 = new Pipeline(fromServer, toClient);305// start pump306pipe1.start();307pipe2.start();308// wait them to end309try {310pipe1.join();311} catch (InterruptedException e) {312// No-op313} finally {314sockToServer.close();315}316}317318private String readLine (InputStream is) throws IOException {319boolean done=false, readCR=false;320byte[] b = new byte [512];321int c, l = 0;322323while (!done) {324c = is.read ();325if (c == '\n' && readCR) {326done = true;327} else {328if (c == '\r' && !readCR) {329readCR = true;330} else {331b[l++] = (byte)c;332}333}334}335return new String (b);336}337338/** close the channel associated with the current key by:339* 1. shutdownOutput (send a FIN)340* 2. mark the key so that incoming data is to be consumed and discarded341* 3. After a period, close the socket342*/343344synchronized void orderlyCloseChannel (SelectionKey key) throws IOException {345SocketChannel ch = (SocketChannel)key.channel ();346ch.socket().shutdownOutput();347key.attach (this);348clist.add (key);349}350351synchronized void abortiveCloseChannel (SelectionKey key) throws IOException {352SocketChannel ch = (SocketChannel)key.channel ();353Socket s = ch.socket ();354s.setSoLinger (true, 0);355ch.close();356}357}358359360/**361* Implements blocking reading semantics on top of a non-blocking channel362*/363364static class NioInputStream extends InputStream {365SocketChannel channel;366Selector selector;367ByteBuffer chanbuf;368SelectionKey key;369int available;370byte[] one;371boolean closed;372ByteBuffer markBuf; /* reads may be satisifed from this buffer */373boolean marked;374boolean reset;375int readlimit;376377public NioInputStream (SocketChannel chan) throws IOException {378this.channel = chan;379selector = Selector.open();380chanbuf = ByteBuffer.allocate (1024);381key = chan.register (selector, SelectionKey.OP_READ);382available = 0;383one = new byte[1];384closed = marked = reset = false;385}386387public synchronized int read (byte[] b) throws IOException {388return read (b, 0, b.length);389}390391public synchronized int read () throws IOException {392return read (one, 0, 1);393}394395public synchronized int read (byte[] b, int off, int srclen) throws IOException {396397int canreturn, willreturn;398399if (closed)400return -1;401402if (reset) { /* satisfy from markBuf */403canreturn = markBuf.remaining ();404willreturn = canreturn>srclen ? srclen : canreturn;405markBuf.get(b, off, willreturn);406if (canreturn == willreturn) {407reset = false;408}409} else { /* satisfy from channel */410canreturn = available();411if (canreturn == 0) {412block ();413canreturn = available();414}415willreturn = canreturn>srclen ? srclen : canreturn;416chanbuf.get(b, off, willreturn);417available -= willreturn;418419if (marked) { /* copy into markBuf */420try {421markBuf.put (b, off, willreturn);422} catch (BufferOverflowException e) {423marked = false;424}425}426}427return willreturn;428}429430public synchronized int available () throws IOException {431if (closed)432throw new IOException ("Stream is closed");433434if (reset)435return markBuf.remaining();436437if (available > 0)438return available;439440chanbuf.clear ();441available = channel.read (chanbuf);442if (available > 0)443chanbuf.flip();444else if (available == -1)445throw new IOException ("Stream is closed");446return available;447}448449/**450* block() only called when available==0 and buf is empty451*/452private synchronized void block () throws IOException {453//assert available == 0;454int n = selector.select ();455//assert n == 1;456selector.selectedKeys().clear();457available ();458}459460public void close () throws IOException {461if (closed)462return;463channel.close ();464closed = true;465}466467public synchronized void mark (int readlimit) {468if (closed)469return;470this.readlimit = readlimit;471markBuf = ByteBuffer.allocate (readlimit);472marked = true;473reset = false;474}475476public synchronized void reset () throws IOException {477if (closed )478return;479if (!marked)480throw new IOException ("Stream not marked");481marked = false;482reset = true;483markBuf.flip ();484}485}486487static class NioOutputStream extends OutputStream {488SocketChannel channel;489ByteBuffer buf;490SelectionKey key;491Selector selector;492boolean closed;493byte[] one;494495public NioOutputStream (SocketChannel channel) throws IOException {496this.channel = channel;497selector = Selector.open ();498key = channel.register (selector, SelectionKey.OP_WRITE);499closed = false;500one = new byte [1];501}502503public synchronized void write (int b) throws IOException {504one[0] = (byte)b;505write (one, 0, 1);506}507508public synchronized void write (byte[] b) throws IOException {509write (b, 0, b.length);510}511512public synchronized void write (byte[] b, int off, int len) throws IOException {513if (closed)514throw new IOException ("stream is closed");515516buf = ByteBuffer.allocate (len);517buf.put (b, off, len);518buf.flip ();519int n;520while ((n = channel.write (buf)) < len) {521len -= n;522if (len == 0)523return;524selector.select ();525selector.selectedKeys().clear ();526}527}528529public void close () throws IOException {530if (closed)531return;532channel.close ();533closed = true;534}535}536537/*538* Pipeline object :-539* 1) Will pump every byte from its input stream to output stream540* 2) Is an 'active object'541*/542static class Pipeline implements Runnable {543InputStream in;544OutputStream out;545Thread t;546547public Pipeline(InputStream is, OutputStream os) {548in = is;549out = os;550}551552public void start() {553t = new Thread(this);554t.start();555}556557public void join() throws InterruptedException {558t.join();559}560561public void terminate() {562t.interrupt();563}564565public void run() {566byte[] buffer = new byte[10000];567try {568while (!Thread.interrupted()) {569int len;570while ((len = in.read(buffer)) != -1) {571out.write(buffer, 0, len);572out.flush();573}574}575} catch(IOException e) {576// No-op577} finally {578}579}580}581582/**583* Utilities for synchronization. A condition is584* identified by a string name, and is initialized585* upon first use (ie. setCondition() or waitForCondition()). Threads586* are blocked until some thread calls (or has called) setCondition() for the same587* condition.588* <P>589* A rendezvous built on a condition is also provided for synchronizing590* N threads.591*/592593private static HashMap conditions = new HashMap();594595/*596* Modifiable boolean object597*/598private static class BValue {599boolean v;600}601602/*603* Modifiable int object604*/605private static class IValue {606int v;607IValue (int i) {608v =i;609}610}611612613private static BValue getCond (String condition) {614synchronized (conditions) {615BValue cond = (BValue) conditions.get (condition);616if (cond == null) {617cond = new BValue();618conditions.put (condition, cond);619}620return cond;621}622}623624/**625* Set the condition to true. Any threads that are currently blocked626* waiting on the condition, will be unblocked and allowed to continue.627* Threads that subsequently call waitForCondition() will not block.628* If the named condition did not exist prior to the call, then it is created629* first.630*/631632public static void setCondition (String condition) {633BValue cond = getCond (condition);634synchronized (cond) {635if (cond.v) {636return;637}638cond.v = true;639cond.notifyAll();640}641}642643/**644* If the named condition does not exist, then it is created and initialized645* to false. If the condition exists or has just been created and its value646* is false, then the thread blocks until another thread sets the condition.647* If the condition exists and is already set to true, then this call returns648* immediately without blocking.649*/650651public static void waitForCondition (String condition) {652BValue cond = getCond (condition);653synchronized (cond) {654if (!cond.v) {655try {656cond.wait();657} catch (InterruptedException e) {}658}659}660}661662/* conditions must be locked when accessing this */663static HashMap rv = new HashMap();664665/**666* Force N threads to rendezvous (ie. wait for each other) before proceeding.667* The first thread(s) to call are blocked until the last668* thread makes the call. Then all threads continue.669* <p>670* All threads that call with the same condition name, must use the same value671* for N (or the results may be not be as expected).672* <P>673* Obviously, if fewer than N threads make the rendezvous then the result674* will be a hang.675*/676677public static void rendezvous (String condition, int N) {678BValue cond;679IValue iv;680String name = "RV_"+condition;681682/* get the condition */683684synchronized (conditions) {685cond = (BValue)conditions.get (name);686if (cond == null) {687/* we are first caller */688if (N < 2) {689throw new RuntimeException ("rendezvous must be called with N >= 2");690}691cond = new BValue ();692conditions.put (name, cond);693iv = new IValue (N-1);694rv.put (name, iv);695} else {696/* already initialised, just decrement the counter */697iv = (IValue) rv.get (name);698iv.v --;699}700}701702if (iv.v > 0) {703waitForCondition (name);704} else {705setCondition (name);706synchronized (conditions) {707clearCondition (name);708rv.remove (name);709}710}711}712713/**714* If the named condition exists and is set then remove it, so it can715* be re-initialized and used again. If the condition does not exist, or716* exists but is not set, then the call returns without doing anything.717* Note, some higher level synchronization718* may be needed between clear and the other operations.719*/720721public static void clearCondition(String condition) {722BValue cond;723synchronized (conditions) {724cond = (BValue) conditions.get (condition);725if (cond == null) {726return;727}728synchronized (cond) {729if (cond.v) {730conditions.remove (condition);731}732}733}734}735}736737738