Path: blob/aarch64-shenandoah-jdk8u272-b10/jdk/test/sun/net/www/httptest/TestHttpServer.java
38854 views
/*1* Copyright (c) 2002, 2012, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation.7*8* This code is distributed in the hope that it will be useful, but WITHOUT9* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or10* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License11* version 2 for more details (a copy is included in the LICENSE file that12* accompanied this code).13*14* You should have received a copy of the GNU General Public License version15* 2 along with this work; if not, write to the Free Software Foundation,16* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.17*18* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA19* or visit www.oracle.com if you need additional information or have any20* questions.21*/2223import java.net.*;24import java.io.*;25import java.nio.*;26import java.nio.channels.*;27import sun.net.www.MessageHeader;28import java.util.*;2930/**31* This class implements a simple HTTP server. It uses multiple threads to32* handle connections in parallel, and also multiple connections/requests33* can be handled per thread.34* <p>35* It must be instantiated with a {@link HttpCallback} object to which36* requests are given and must be handled.37* <p>38* Simple synchronization between the client(s) and server can be done39* using the {@link #waitForCondition(String)}, {@link #setCondition(String)} and40* {@link #rendezvous(String,int)} methods.41*42* NOTE NOTE NOTE NOTE NOTE NOTE NOTE43*44* If changes are made here, please sure they are propagated to45* the HTTPS equivalent in the JSSE regression test suite.46*47* NOTE NOTE NOTE NOTE NOTE NOTE NOTE48*/4950public class TestHttpServer {5152ServerSocketChannel schan;53int threads;54int cperthread;55HttpCallback cb;56Server[] servers;5758/**59* Create a <code>TestHttpServer<code> instance with the specified callback object60* for handling requests. One thread is created to handle requests,61* and up to ten TCP connections will be handled simultaneously.62* @param cb the callback object which is invoked to handle each63* incoming request64*/6566public TestHttpServer (HttpCallback cb) throws IOException {67this (cb, 1, 10, 0);68}6970/**71* Create a <code>TestHttpServer<code> instance with the specified number of72* threads and maximum number of connections per thread. This functions73* the same as the 4 arg constructor, where the port argument is set to zero.74* @param cb the callback object which is invoked to handle each75* incoming request76* @param threads the number of threads to create to handle requests77* in parallel78* @param cperthread the number of simultaneous TCP connections to79* handle per thread80*/8182public TestHttpServer (HttpCallback cb, int threads, int cperthread)83throws IOException {84this (cb, threads, cperthread, 0);85}8687/**88* Create a <code>TestHttpServer<code> instance with the specified number89* of threads and maximum number of connections per thread and running on90* the specified port. The specified number of threads are created to91* handle incoming requests, and each thread is allowed92* to handle a number of simultaneous TCP connections.93* @param cb the callback object which is invoked to handle94* each incoming request95* @param threads the number of threads to create to handle96* requests in parallel97* @param cperthread the number of simultaneous TCP connections98* to handle per thread99* @param port the port number to bind the server to. <code>Zero</code>100* means choose any free port.101*/102103public TestHttpServer (HttpCallback cb, int threads, int cperthread, int port)104throws IOException {105schan = ServerSocketChannel.open ();106InetSocketAddress addr = new InetSocketAddress (port);107schan.socket().bind (addr);108this.threads = threads;109this.cb = cb;110this.cperthread = cperthread;111servers = new Server [threads];112for (int i=0; i<threads; i++) {113servers[i] = new Server (cb, schan, cperthread);114servers[i].start();115}116}117118/** Tell all threads in the server to exit within 5 seconds.119* This is an abortive termination. Just prior to the thread exiting120* all channels in that thread waiting to be closed are forceably closed.121*/122123public void terminate () {124for (int i=0; i<threads; i++) {125servers[i].terminate ();126}127}128129/**130* return the local port number to which the server is bound.131* @return the local port number132*/133134public int getLocalPort () {135return schan.socket().getLocalPort ();136}137138static class Server extends Thread {139140ServerSocketChannel schan;141Selector selector;142SelectionKey listenerKey;143SelectionKey key; /* the current key being processed */144HttpCallback cb;145ByteBuffer consumeBuffer;146int maxconn;147int nconn;148ClosedChannelList clist;149boolean shutdown;150151Server (HttpCallback cb, ServerSocketChannel schan, int maxconn) {152this.schan = schan;153this.maxconn = maxconn;154this.cb = cb;155nconn = 0;156consumeBuffer = ByteBuffer.allocate (512);157clist = new ClosedChannelList ();158try {159selector = Selector.open ();160schan.configureBlocking (false);161listenerKey = schan.register (selector, SelectionKey.OP_ACCEPT);162} catch (IOException e) {163System.err.println ("Server could not start: " + e);164}165}166167/* Stop the thread as soon as possible */168public synchronized void terminate () {169shutdown = true;170}171172public void run () {173try {174while (true) {175selector.select (1000);176Set selected = selector.selectedKeys();177Iterator iter = selected.iterator();178while (iter.hasNext()) {179key = (SelectionKey)iter.next();180if (key.equals (listenerKey)) {181SocketChannel sock = schan.accept ();182if (sock == null) {183/* false notification */184iter.remove();185continue;186}187sock.configureBlocking (false);188sock.register (selector, SelectionKey.OP_READ);189nconn ++;190System.out.println("SERVER: new connection. chan[" + sock + "]");191if (nconn == maxconn) {192/* deregister */193listenerKey.cancel ();194listenerKey = null;195}196} else {197if (key.isReadable()) {198boolean closed;199SocketChannel chan = (SocketChannel) key.channel();200System.out.println("SERVER: connection readable. chan[" + chan + "]");201if (key.attachment() != null) {202System.out.println("Server: comsume");203closed = consume (chan);204} else {205closed = read (chan, key);206}207if (closed) {208chan.close ();209key.cancel ();210if (nconn == maxconn) {211listenerKey = schan.register (selector, SelectionKey.OP_ACCEPT);212}213nconn --;214}215}216}217iter.remove();218}219clist.check();220if (shutdown) {221clist.terminate ();222return;223}224}225} catch (IOException e) {226System.out.println ("Server exception: " + e);227// TODO finish228}229}230231/* read all the data off the channel without looking at it232* return true if connection closed233*/234boolean consume (SocketChannel chan) {235try {236consumeBuffer.clear ();237int c = chan.read (consumeBuffer);238if (c == -1)239return true;240} catch (IOException e) {241return true;242}243return false;244}245246/* return true if the connection is closed, false otherwise */247248private boolean read (SocketChannel chan, SelectionKey key) {249HttpTransaction msg;250boolean res;251try {252InputStream is = new BufferedInputStream (new NioInputStream (chan));253String requestline = readLine (is);254MessageHeader mhead = new MessageHeader (is);255String clen = mhead.findValue ("Content-Length");256String trferenc = mhead.findValue ("Transfer-Encoding");257String data = null;258if (trferenc != null && trferenc.equals ("chunked"))259data = new String (readChunkedData (is));260else if (clen != null)261data = new String (readNormalData (is, Integer.parseInt (clen)));262String[] req = requestline.split (" ");263if (req.length < 2) {264/* invalid request line */265return false;266}267String cmd = req[0];268URI uri = null;269try {270uri = new URI (req[1]);271msg = new HttpTransaction (this, cmd, uri, mhead, data, null, key);272cb.request (msg);273} catch (URISyntaxException e) {274System.err.println ("Invalid URI: " + e);275msg = new HttpTransaction (this, cmd, null, null, null, null, key);276msg.sendResponse (501, "Whatever");277}278res = false;279} catch (IOException e) {280res = true;281}282return res;283}284285byte[] readNormalData (InputStream is, int len) throws IOException {286byte [] buf = new byte [len];287int c, off=0, remain=len;288while (remain > 0 && ((c=is.read (buf, off, remain))>0)) {289remain -= c;290off += c;291}292return buf;293}294295private void readCRLF(InputStream is) throws IOException {296int cr = is.read();297int lf = is.read();298299if (((cr & 0xff) != 0x0d) ||300((lf & 0xff) != 0x0a)) {301throw new IOException(302"Expected <CR><LF>: got '" + cr + "/" + lf + "'");303}304}305306byte[] readChunkedData (InputStream is) throws IOException {307LinkedList l = new LinkedList ();308int total = 0;309for (int len=readChunkLen(is); len!=0; len=readChunkLen(is)) {310l.add (readNormalData(is, len));311total += len;312readCRLF(is); // CRLF at end of chunk313}314readCRLF(is); // CRLF at end of Chunked Stream.315byte[] buf = new byte [total];316Iterator i = l.iterator();317int x = 0;318while (i.hasNext()) {319byte[] b = (byte[])i.next();320System.arraycopy (b, 0, buf, x, b.length);321x += b.length;322}323return buf;324}325326private int readChunkLen (InputStream is) throws IOException {327int c, len=0;328boolean done=false, readCR=false;329while (!done) {330c = is.read ();331if (c == '\n' && readCR) {332done = true;333} else {334if (c == '\r' && !readCR) {335readCR = true;336} else {337int x=0;338if (c >= 'a' && c <= 'f') {339x = c - 'a' + 10;340} else if (c >= 'A' && c <= 'F') {341x = c - 'A' + 10;342} else if (c >= '0' && c <= '9') {343x = c - '0';344}345len = len * 16 + x;346}347}348}349return len;350}351352private String readLine (InputStream is) throws IOException {353boolean done=false, readCR=false;354byte[] b = new byte [512];355int c, l = 0;356357while (!done) {358c = is.read ();359if (c == '\n' && readCR) {360done = true;361} else {362if (c == '\r' && !readCR) {363readCR = true;364} else {365b[l++] = (byte)c;366}367}368}369return new String (b);370}371372/** close the channel associated with the current key by:373* 1. shutdownOutput (send a FIN)374* 2. mark the key so that incoming data is to be consumed and discarded375* 3. After a period, close the socket376*/377378synchronized void orderlyCloseChannel (SelectionKey key) throws IOException {379SocketChannel ch = (SocketChannel)key.channel ();380System.out.println("SERVER: orderlyCloseChannel chan[" + ch + "]");381ch.socket().shutdownOutput();382key.attach (this);383clist.add (key);384}385386synchronized void abortiveCloseChannel (SelectionKey key) throws IOException {387SocketChannel ch = (SocketChannel)key.channel ();388System.out.println("SERVER: abortiveCloseChannel chan[" + ch + "]");389390Socket s = ch.socket ();391s.setSoLinger (true, 0);392ch.close();393}394}395396397/**398* Implements blocking reading semantics on top of a non-blocking channel399*/400401static class NioInputStream extends InputStream {402SocketChannel channel;403Selector selector;404ByteBuffer chanbuf;405SelectionKey key;406int available;407byte[] one;408boolean closed;409ByteBuffer markBuf; /* reads may be satisifed from this buffer */410boolean marked;411boolean reset;412int readlimit;413414public NioInputStream (SocketChannel chan) throws IOException {415this.channel = chan;416selector = Selector.open();417chanbuf = ByteBuffer.allocate (1024);418key = chan.register (selector, SelectionKey.OP_READ);419available = 0;420one = new byte[1];421closed = marked = reset = false;422}423424public synchronized int read (byte[] b) throws IOException {425return read (b, 0, b.length);426}427428public synchronized int read () throws IOException {429return read (one, 0, 1);430}431432public synchronized int read (byte[] b, int off, int srclen) throws IOException {433434int canreturn, willreturn;435436if (closed)437return -1;438439if (reset) { /* satisfy from markBuf */440canreturn = markBuf.remaining ();441willreturn = canreturn>srclen ? srclen : canreturn;442markBuf.get(b, off, willreturn);443if (canreturn == willreturn) {444reset = false;445}446} else { /* satisfy from channel */447canreturn = available();448if (canreturn == 0) {449block ();450canreturn = available();451}452willreturn = canreturn>srclen ? srclen : canreturn;453chanbuf.get(b, off, willreturn);454available -= willreturn;455456if (marked) { /* copy into markBuf */457try {458markBuf.put (b, off, willreturn);459} catch (BufferOverflowException e) {460marked = false;461}462}463}464return willreturn;465}466467public synchronized int available () throws IOException {468if (closed)469throw new IOException ("Stream is closed");470471if (reset)472return markBuf.remaining();473474if (available > 0)475return available;476477chanbuf.clear ();478available = channel.read (chanbuf);479if (available > 0)480chanbuf.flip();481else if (available == -1)482throw new IOException ("Stream is closed");483return available;484}485486/**487* block() only called when available==0 and buf is empty488*/489private synchronized void block () throws IOException {490//assert available == 0;491int n = selector.select ();492//assert n == 1;493selector.selectedKeys().clear();494available ();495}496497public void close () throws IOException {498if (closed)499return;500channel.close ();501closed = true;502}503504public synchronized void mark (int readlimit) {505if (closed)506return;507this.readlimit = readlimit;508markBuf = ByteBuffer.allocate (readlimit);509marked = true;510reset = false;511}512513public synchronized void reset () throws IOException {514if (closed )515return;516if (!marked)517throw new IOException ("Stream not marked");518marked = false;519reset = true;520markBuf.flip ();521}522}523524static class NioOutputStream extends OutputStream {525SocketChannel channel;526ByteBuffer buf;527SelectionKey key;528Selector selector;529boolean closed;530byte[] one;531532public NioOutputStream (SocketChannel channel) throws IOException {533this.channel = channel;534selector = Selector.open ();535key = channel.register (selector, SelectionKey.OP_WRITE);536closed = false;537one = new byte [1];538}539540public synchronized void write (int b) throws IOException {541one[0] = (byte)b;542write (one, 0, 1);543}544545public synchronized void write (byte[] b) throws IOException {546write (b, 0, b.length);547}548549public synchronized void write (byte[] b, int off, int len) throws IOException {550if (closed)551throw new IOException ("stream is closed");552553buf = ByteBuffer.allocate (len);554buf.put (b, off, len);555buf.flip ();556int n;557while ((n = channel.write (buf)) < len) {558len -= n;559if (len == 0)560return;561selector.select ();562selector.selectedKeys().clear ();563}564}565566public void close () throws IOException {567if (closed)568return;569channel.close ();570closed = true;571}572}573574/**575* Utilities for synchronization. A condition is576* identified by a string name, and is initialized577* upon first use (ie. setCondition() or waitForCondition()). Threads578* are blocked until some thread calls (or has called) setCondition() for the same579* condition.580* <P>581* A rendezvous built on a condition is also provided for synchronizing582* N threads.583*/584585private static HashMap conditions = new HashMap();586587/*588* Modifiable boolean object589*/590private static class BValue {591boolean v;592}593594/*595* Modifiable int object596*/597private static class IValue {598int v;599IValue (int i) {600v =i;601}602}603604605private static BValue getCond (String condition) {606synchronized (conditions) {607BValue cond = (BValue) conditions.get (condition);608if (cond == null) {609cond = new BValue();610conditions.put (condition, cond);611}612return cond;613}614}615616/**617* Set the condition to true. Any threads that are currently blocked618* waiting on the condition, will be unblocked and allowed to continue.619* Threads that subsequently call waitForCondition() will not block.620* If the named condition did not exist prior to the call, then it is created621* first.622*/623624public static void setCondition (String condition) {625BValue cond = getCond (condition);626synchronized (cond) {627if (cond.v) {628return;629}630cond.v = true;631cond.notifyAll();632}633}634635/**636* If the named condition does not exist, then it is created and initialized637* to false. If the condition exists or has just been created and its value638* is false, then the thread blocks until another thread sets the condition.639* If the condition exists and is already set to true, then this call returns640* immediately without blocking.641*/642643public static void waitForCondition (String condition) {644BValue cond = getCond (condition);645synchronized (cond) {646if (!cond.v) {647try {648cond.wait();649} catch (InterruptedException e) {}650}651}652}653654/* conditions must be locked when accessing this */655static HashMap rv = new HashMap();656657/**658* Force N threads to rendezvous (ie. wait for each other) before proceeding.659* The first thread(s) to call are blocked until the last660* thread makes the call. Then all threads continue.661* <p>662* All threads that call with the same condition name, must use the same value663* for N (or the results may be not be as expected).664* <P>665* Obviously, if fewer than N threads make the rendezvous then the result666* will be a hang.667*/668669public static void rendezvous (String condition, int N) {670BValue cond;671IValue iv;672String name = "RV_"+condition;673674/* get the condition */675676synchronized (conditions) {677cond = (BValue)conditions.get (name);678if (cond == null) {679/* we are first caller */680if (N < 2) {681throw new RuntimeException ("rendezvous must be called with N >= 2");682}683cond = new BValue ();684conditions.put (name, cond);685iv = new IValue (N-1);686rv.put (name, iv);687} else {688/* already initialised, just decrement the counter */689iv = (IValue) rv.get (name);690iv.v --;691}692}693694if (iv.v > 0) {695waitForCondition (name);696} else {697setCondition (name);698synchronized (conditions) {699clearCondition (name);700rv.remove (name);701}702}703}704705/**706* If the named condition exists and is set then remove it, so it can707* be re-initialized and used again. If the condition does not exist, or708* exists but is not set, then the call returns without doing anything.709* Note, some higher level synchronization710* may be needed between clear and the other operations.711*/712713public static void clearCondition(String condition) {714BValue cond;715synchronized (conditions) {716cond = (BValue) conditions.get (condition);717if (cond == null) {718return;719}720synchronized (cond) {721if (cond.v) {722conditions.remove (condition);723}724}725}726}727}728729730