Path: blob/aarch64-shenandoah-jdk8u272-b10/jdk/test/sun/net/www/protocol/https/HttpsURLConnection/TunnelProxy.java
38889 views
/*1* Copyright (c) 2005, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation.7*8* This code is distributed in the hope that it will be useful, but WITHOUT9* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or10* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License11* version 2 for more details (a copy is included in the LICENSE file that12* accompanied this code).13*14* You should have received a copy of the GNU General Public License version15* 2 along with this work; if not, write to the Free Software Foundation,16* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.17*18* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA19* or visit www.oracle.com if you need additional information or have any20* questions.21*/2223/*24*25*/2627import java.net.*;28import java.io.*;29import java.nio.*;30import java.nio.channels.*;31import sun.net.www.MessageHeader;32import java.util.*;3334public class TunnelProxy {3536ServerSocketChannel schan;37int threads;38int cperthread;39Server[] servers;4041/**42* Create a <code>TunnelProxy<code> instance with the specified callback object43* for handling requests. One thread is created to handle requests,44* and up to ten TCP connections will be handled simultaneously.45* @param cb the callback object which is invoked to handle each46* incoming request47*/4849public TunnelProxy () throws IOException {50this (1, 10, 0);51}5253/**54* Create a <code>TunnelProxy<code> instance with the specified number of55* threads and maximum number of connections per thread. This functions56* the same as the 4 arg constructor, where the port argument is set to zero.57* @param cb the callback object which is invoked to handle each58* incoming request59* @param threads the number of threads to create to handle requests60* in parallel61* @param cperthread the number of simultaneous TCP connections to62* handle per thread63*/6465public TunnelProxy (int threads, int cperthread)66throws IOException {67this (threads, cperthread, 0);68}6970/**71* Create a <code>TunnelProxy<code> instance with the specified number72* of threads and maximum number of connections per thread and running on73* the specified port. The specified number of threads are created to74* handle incoming requests, and each thread is allowed75* to handle a number of simultaneous TCP connections.76* @param cb the callback object which is invoked to handle77* each incoming request78* @param threads the number of threads to create to handle79* requests in parallel80* @param cperthread the number of simultaneous TCP connections81* to handle per thread82* @param port the port number to bind the server to. <code>Zero</code>83* means choose any free port.84*/8586public TunnelProxy (int threads, int cperthread, int port)87throws IOException {88schan = ServerSocketChannel.open ();89InetSocketAddress addr = new InetSocketAddress (port);90schan.socket().bind (addr);91this.threads = threads;92this.cperthread = cperthread;93servers = new Server [threads];94for (int i=0; i<threads; i++) {95servers[i] = new Server (schan, cperthread);96servers[i].start();97}98}99100/** Tell all threads in the server to exit within 5 seconds.101* This is an abortive termination. Just prior to the thread exiting102* all channels in that thread waiting to be closed are forceably closed.103*/104105public void terminate () {106for (int i=0; i<threads; i++) {107servers[i].terminate ();108}109}110111/**112* return the local port number to which the server is bound.113* @return the local port number114*/115116public int getLocalPort () {117return schan.socket().getLocalPort ();118}119120static class Server extends Thread {121122ServerSocketChannel schan;123Selector selector;124SelectionKey listenerKey;125SelectionKey key; /* the current key being processed */126ByteBuffer consumeBuffer;127int maxconn;128int nconn;129ClosedChannelList clist;130boolean shutdown;131Pipeline pipe1 = null;132Pipeline pipe2 = null;133134Server (ServerSocketChannel schan, int maxconn) {135this.schan = schan;136this.maxconn = maxconn;137nconn = 0;138consumeBuffer = ByteBuffer.allocate (512);139clist = new ClosedChannelList ();140try {141selector = Selector.open ();142schan.configureBlocking (false);143listenerKey = schan.register (selector, SelectionKey.OP_ACCEPT);144} catch (IOException e) {145System.err.println ("Server could not start: " + e);146}147}148149/* Stop the thread as soon as possible */150public synchronized void terminate () {151shutdown = true;152if (pipe1 != null) pipe1.terminate();153if (pipe2 != null) pipe2.terminate();154}155156public void run () {157try {158while (true) {159selector.select (1000);160Set selected = selector.selectedKeys();161Iterator iter = selected.iterator();162while (iter.hasNext()) {163key = (SelectionKey)iter.next();164if (key.equals (listenerKey)) {165SocketChannel sock = schan.accept ();166if (sock == null) {167/* false notification */168iter.remove();169continue;170}171sock.configureBlocking (false);172sock.register (selector, SelectionKey.OP_READ);173nconn ++;174if (nconn == maxconn) {175/* deregister */176listenerKey.cancel ();177listenerKey = null;178}179} else {180if (key.isReadable()) {181boolean closed;182SocketChannel chan = (SocketChannel) key.channel();183if (key.attachment() != null) {184closed = consume (chan);185} else {186closed = read (chan, key);187}188if (closed) {189chan.close ();190key.cancel ();191if (nconn == maxconn) {192listenerKey = schan.register (selector, SelectionKey.OP_ACCEPT);193}194nconn --;195}196}197}198iter.remove();199}200clist.check();201if (shutdown) {202clist.terminate ();203return;204}205}206} catch (IOException e) {207System.out.println ("Server exception: " + e);208// TODO finish209}210}211212/* read all the data off the channel without looking at it213* return true if connection closed214*/215boolean consume (SocketChannel chan) {216try {217consumeBuffer.clear ();218int c = chan.read (consumeBuffer);219if (c == -1)220return true;221} catch (IOException e) {222return true;223}224return false;225}226227/* return true if the connection is closed, false otherwise */228229private boolean read (SocketChannel chan, SelectionKey key) {230HttpTransaction msg;231boolean res;232try {233InputStream is = new BufferedInputStream (new NioInputStream (chan));234String requestline = readLine (is);235MessageHeader mhead = new MessageHeader (is);236String[] req = requestline.split (" ");237if (req.length < 2) {238/* invalid request line */239return false;240}241String cmd = req[0];242URI uri = null;243if (!("CONNECT".equalsIgnoreCase(cmd))) {244// we expect CONNECT command245return false;246}247try {248uri = new URI("http://" + req[1]);249} catch (URISyntaxException e) {250System.err.println ("Invalid URI: " + e);251res = true;252}253254// CONNECT ack255OutputStream os = new BufferedOutputStream(new NioOutputStream(chan));256byte[] ack = "HTTP/1.1 200 Connection established\r\n\r\n".getBytes();257os.write(ack, 0, ack.length);258os.flush();259260// tunnel anything else261tunnel(is, os, uri);262263res = false;264} catch (IOException e) {265res = true;266}267return res;268}269270private void tunnel(InputStream fromClient, OutputStream toClient, URI serverURI) throws IOException {271Socket sockToServer = new Socket(serverURI.getHost(), serverURI.getPort());272OutputStream toServer = sockToServer.getOutputStream();273InputStream fromServer = sockToServer.getInputStream();274275pipe1 = new Pipeline(fromClient, toServer);276pipe2 = new Pipeline(fromServer, toClient);277// start pump278pipe1.start();279pipe2.start();280// wait them to end281try {282pipe1.join();283} catch (InterruptedException e) {284// No-op285} finally {286sockToServer.close();287}288}289290private String readLine (InputStream is) throws IOException {291boolean done=false, readCR=false;292byte[] b = new byte [512];293int c, l = 0;294295while (!done) {296c = is.read ();297if (c == '\n' && readCR) {298done = true;299} else {300if (c == '\r' && !readCR) {301readCR = true;302} else {303b[l++] = (byte)c;304}305}306}307return new String (b);308}309310/** close the channel associated with the current key by:311* 1. shutdownOutput (send a FIN)312* 2. mark the key so that incoming data is to be consumed and discarded313* 3. After a period, close the socket314*/315316synchronized void orderlyCloseChannel (SelectionKey key) throws IOException {317SocketChannel ch = (SocketChannel)key.channel ();318ch.socket().shutdownOutput();319key.attach (this);320clist.add (key);321}322323synchronized void abortiveCloseChannel (SelectionKey key) throws IOException {324SocketChannel ch = (SocketChannel)key.channel ();325Socket s = ch.socket ();326s.setSoLinger (true, 0);327ch.close();328}329}330331332/**333* Implements blocking reading semantics on top of a non-blocking channel334*/335336static class NioInputStream extends InputStream {337SocketChannel channel;338Selector selector;339ByteBuffer chanbuf;340SelectionKey key;341int available;342byte[] one;343boolean closed;344ByteBuffer markBuf; /* reads may be satisifed from this buffer */345boolean marked;346boolean reset;347int readlimit;348349public NioInputStream (SocketChannel chan) throws IOException {350this.channel = chan;351selector = Selector.open();352chanbuf = ByteBuffer.allocate (1024);353key = chan.register (selector, SelectionKey.OP_READ);354available = 0;355one = new byte[1];356closed = marked = reset = false;357}358359public synchronized int read (byte[] b) throws IOException {360return read (b, 0, b.length);361}362363public synchronized int read () throws IOException {364return read (one, 0, 1);365}366367public synchronized int read (byte[] b, int off, int srclen) throws IOException {368369int canreturn, willreturn;370371if (closed)372return -1;373374if (reset) { /* satisfy from markBuf */375canreturn = markBuf.remaining ();376willreturn = canreturn>srclen ? srclen : canreturn;377markBuf.get(b, off, willreturn);378if (canreturn == willreturn) {379reset = false;380}381} else { /* satisfy from channel */382canreturn = available();383if (canreturn == 0) {384block ();385canreturn = available();386}387willreturn = canreturn>srclen ? srclen : canreturn;388chanbuf.get(b, off, willreturn);389available -= willreturn;390391if (marked) { /* copy into markBuf */392try {393markBuf.put (b, off, willreturn);394} catch (BufferOverflowException e) {395marked = false;396}397}398}399return willreturn;400}401402public synchronized int available () throws IOException {403if (closed)404throw new IOException ("Stream is closed");405406if (reset)407return markBuf.remaining();408409if (available > 0)410return available;411412chanbuf.clear ();413available = channel.read (chanbuf);414if (available > 0)415chanbuf.flip();416else if (available == -1)417throw new IOException ("Stream is closed");418return available;419}420421/**422* block() only called when available==0 and buf is empty423*/424private synchronized void block () throws IOException {425//assert available == 0;426int n = selector.select ();427//assert n == 1;428selector.selectedKeys().clear();429available ();430}431432public void close () throws IOException {433if (closed)434return;435channel.close ();436closed = true;437}438439public synchronized void mark (int readlimit) {440if (closed)441return;442this.readlimit = readlimit;443markBuf = ByteBuffer.allocate (readlimit);444marked = true;445reset = false;446}447448public synchronized void reset () throws IOException {449if (closed )450return;451if (!marked)452throw new IOException ("Stream not marked");453marked = false;454reset = true;455markBuf.flip ();456}457}458459static class NioOutputStream extends OutputStream {460SocketChannel channel;461ByteBuffer buf;462SelectionKey key;463Selector selector;464boolean closed;465byte[] one;466467public NioOutputStream (SocketChannel channel) throws IOException {468this.channel = channel;469selector = Selector.open ();470key = channel.register (selector, SelectionKey.OP_WRITE);471closed = false;472one = new byte [1];473}474475public synchronized void write (int b) throws IOException {476one[0] = (byte)b;477write (one, 0, 1);478}479480public synchronized void write (byte[] b) throws IOException {481write (b, 0, b.length);482}483484public synchronized void write (byte[] b, int off, int len) throws IOException {485if (closed)486throw new IOException ("stream is closed");487488buf = ByteBuffer.allocate (len);489buf.put (b, off, len);490buf.flip ();491int n;492while ((n = channel.write (buf)) < len) {493len -= n;494if (len == 0)495return;496selector.select ();497selector.selectedKeys().clear ();498}499}500501public void close () throws IOException {502if (closed)503return;504channel.close ();505closed = true;506}507}508509/*510* Pipeline object :-511* 1) Will pump every byte from its input stream to output stream512* 2) Is an 'active object'513*/514static class Pipeline implements Runnable {515InputStream in;516OutputStream out;517Thread t;518519public Pipeline(InputStream is, OutputStream os) {520in = is;521out = os;522}523524public void start() {525t = new Thread(this);526t.start();527}528529public void join() throws InterruptedException {530t.join();531}532533public void terminate() {534t.interrupt();535}536537public void run() {538byte[] buffer = new byte[10000];539try {540while (!Thread.interrupted()) {541int len;542while ((len = in.read(buffer)) != -1) {543out.write(buffer, 0, len);544out.flush();545}546}547} catch(IOException e) {548// No-op549} finally {550}551}552}553554/**555* Utilities for synchronization. A condition is556* identified by a string name, and is initialized557* upon first use (ie. setCondition() or waitForCondition()). Threads558* are blocked until some thread calls (or has called) setCondition() for the same559* condition.560* <P>561* A rendezvous built on a condition is also provided for synchronizing562* N threads.563*/564565private static HashMap conditions = new HashMap();566567/*568* Modifiable boolean object569*/570private static class BValue {571boolean v;572}573574/*575* Modifiable int object576*/577private static class IValue {578int v;579IValue (int i) {580v =i;581}582}583584585private static BValue getCond (String condition) {586synchronized (conditions) {587BValue cond = (BValue) conditions.get (condition);588if (cond == null) {589cond = new BValue();590conditions.put (condition, cond);591}592return cond;593}594}595596/**597* Set the condition to true. Any threads that are currently blocked598* waiting on the condition, will be unblocked and allowed to continue.599* Threads that subsequently call waitForCondition() will not block.600* If the named condition did not exist prior to the call, then it is created601* first.602*/603604public static void setCondition (String condition) {605BValue cond = getCond (condition);606synchronized (cond) {607if (cond.v) {608return;609}610cond.v = true;611cond.notifyAll();612}613}614615/**616* If the named condition does not exist, then it is created and initialized617* to false. If the condition exists or has just been created and its value618* is false, then the thread blocks until another thread sets the condition.619* If the condition exists and is already set to true, then this call returns620* immediately without blocking.621*/622623public static void waitForCondition (String condition) {624BValue cond = getCond (condition);625synchronized (cond) {626if (!cond.v) {627try {628cond.wait();629} catch (InterruptedException e) {}630}631}632}633634/* conditions must be locked when accessing this */635static HashMap rv = new HashMap();636637/**638* Force N threads to rendezvous (ie. wait for each other) before proceeding.639* The first thread(s) to call are blocked until the last640* thread makes the call. Then all threads continue.641* <p>642* All threads that call with the same condition name, must use the same value643* for N (or the results may be not be as expected).644* <P>645* Obviously, if fewer than N threads make the rendezvous then the result646* will be a hang.647*/648649public static void rendezvous (String condition, int N) {650BValue cond;651IValue iv;652String name = "RV_"+condition;653654/* get the condition */655656synchronized (conditions) {657cond = (BValue)conditions.get (name);658if (cond == null) {659/* we are first caller */660if (N < 2) {661throw new RuntimeException ("rendezvous must be called with N >= 2");662}663cond = new BValue ();664conditions.put (name, cond);665iv = new IValue (N-1);666rv.put (name, iv);667} else {668/* already initialised, just decrement the counter */669iv = (IValue) rv.get (name);670iv.v --;671}672}673674if (iv.v > 0) {675waitForCondition (name);676} else {677setCondition (name);678synchronized (conditions) {679clearCondition (name);680rv.remove (name);681}682}683}684685/**686* If the named condition exists and is set then remove it, so it can687* be re-initialized and used again. If the condition does not exist, or688* exists but is not set, then the call returns without doing anything.689* Note, some higher level synchronization690* may be needed between clear and the other operations.691*/692693public static void clearCondition(String condition) {694BValue cond;695synchronized (conditions) {696cond = (BValue) conditions.get (condition);697if (cond == null) {698return;699}700synchronized (cond) {701if (cond.v) {702conditions.remove (condition);703}704}705}706}707}708709710