Path: blob/aarch64-shenandoah-jdk8u272-b10/jdk/src/solaris/demo/jni/Poller/PollingServer.java
32287 views
/*1* Copyright (c) 1999, 2011, Oracle and/or its affiliates. All rights reserved.2*3* Redistribution and use in source and binary forms, with or without4* modification, are permitted provided that the following conditions5* are met:6*7* - Redistributions of source code must retain the above copyright8* notice, this list of conditions and the following disclaimer.9*10* - Redistributions in binary form must reproduce the above copyright11* notice, this list of conditions and the following disclaimer in the12* documentation and/or other materials provided with the distribution.13*14* - Neither the name of Oracle nor the names of its15* contributors may be used to endorse or promote products derived16* from this software without specific prior written permission.17*18* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS19* IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,20* THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR21* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR22* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,23* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,24* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR25* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF26* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING27* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS28* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.29*/3031/*32* This source code is provided to illustrate the usage of a given feature33* or technique and has been deliberately simplified. Additional steps34* required for a production-quality application, such as security checks,35* input validation and proper error handling, might not be present in36* this sample code.37*/383940import java.io.*;41import java.net.*;42import java.lang.Byte;4344/**45* Simple Java "server" using the Poller class46* to multiplex on incoming connections. Note47* that handoff of events, via linked Q is not48* actually be a performance booster here, since49* the processing of events is cheaper than50* the overhead in scheduling/executing them.51* Although this demo does allow for concurrency52* in handling connections, it uses a rather53* primitive "gang scheduling" policy to keep54* the code simpler.55*/5657public class PollingServer58{59public final static int MAXCONN = 10000;60public final static int PORTNUM = 4444;61public final static int BYTESPEROP = 10;6263/**64* This synchronization object protects access to certain65* data (bytesRead,eventsToProcess) by concurrent Consumer threads.66*/67private final static Object eventSync = new Object();6869private static InputStream[] instr = new InputStream[MAXCONN];70private static int[] mapping = new int[65535];71private static LinkedQueue linkedQ = new LinkedQueue();72private static int bytesRead = 0;73private static int bytesToRead;74private static int eventsToProcess=0;7576public PollingServer(int concurrency) {77Socket[] sockArr = new Socket[MAXCONN];78long timestart, timestop;79short[] revents = new short[MAXCONN];80int[] fds = new int[MAXCONN];81int bytes;82Poller Mux;83int serverFd;84int totalConn=0;85int connects=0;8687System.out.println ("Serv: Initializing port " + PORTNUM);88try {8990ServerSocket skMain = new ServerSocket (PORTNUM);91/*92* Create the Poller object Mux, allow for up to MAXCONN93* sockets/filedescriptors to be polled.94*/95Mux = new Poller(MAXCONN);96serverFd = Mux.add(skMain, Poller.POLLIN);9798Socket ctrlSock = skMain.accept();99100BufferedReader ctrlReader =101new BufferedReader(new InputStreamReader(ctrlSock.getInputStream()));102String ctrlString = ctrlReader.readLine();103bytesToRead = Integer.valueOf(ctrlString).intValue();104ctrlString = ctrlReader.readLine();105totalConn = Integer.valueOf(ctrlString).intValue();106107System.out.println("Receiving " + bytesToRead + " bytes from " +108totalConn + " client connections");109110timestart = System.currentTimeMillis();111112/*113* Start the consumer threads to read data.114*/115for (int consumerThread = 0;116consumerThread < concurrency; consumerThread++ ) {117new Consumer(consumerThread).start();118}119120/*121* Take connections, read Data122*/123int numEvents=0;124125while ( bytesRead < bytesToRead ) {126127int loopWaits=0;128while (eventsToProcess > 0) {129synchronized (eventSync) {130loopWaits++;131if (eventsToProcess <= 0) break;132try { eventSync.wait(); } catch (Exception e) {e.printStackTrace();};133}134}135if (loopWaits > 1)136System.out.println("Done waiting...loops = " + loopWaits +137" events " + numEvents +138" bytes read : " + bytesRead );139140if (bytesRead >= bytesToRead) break; // may be done!141142/*143* Wait for events144*/145numEvents = Mux.waitMultiple(100, fds, revents);146synchronized (eventSync) {147eventsToProcess = numEvents;148}149/*150* Process all the events we got from Mux.waitMultiple151*/152int cnt = 0;153while ( (cnt < numEvents) && (bytesRead < bytesToRead) ) {154int fd = fds[cnt];155156if (revents[cnt] == Poller.POLLIN) {157if (fd == serverFd) {158/*159* New connection coming in on the ServerSocket160* Add the socket to the Mux, keep track of mapping161* the fdval returned by Mux.add to the connection.162*/163sockArr[connects] = skMain.accept();164instr[connects] = sockArr[connects].getInputStream();165int fdval = Mux.add(sockArr[connects], Poller.POLLIN);166mapping[fdval] = connects;167synchronized(eventSync) {168eventsToProcess--; // just processed this one!169}170connects++;171} else {172/*173* We've got data from this client connection.174* Put it on the queue for the consumer threads to process.175*/176linkedQ.put(new Integer(fd));177}178} else {179System.out.println("Got revents[" + cnt + "] == " + revents[cnt]);180}181cnt++;182}183}184timestop = System.currentTimeMillis();185System.out.println("Time for all reads (" + totalConn +186" sockets) : " + (timestop-timestart));187188// Tell the client it can now go away189byte[] buff = new byte[BYTESPEROP];190ctrlSock.getOutputStream().write(buff,0,BYTESPEROP);191192// Tell the cunsumer threads they can exit.193for (int cThread = 0; cThread < concurrency; cThread++ ) {194linkedQ.put(new Integer(-1));195}196} catch (Exception exc) { exc.printStackTrace(); }197}198199/*200* main ... just check if a concurrency was specified201*/202public static void main (String args[])203{204int concurrency;205206if (args.length == 1)207concurrency = java.lang.Integer.valueOf(args[0]).intValue();208else209concurrency = Poller.getNumCPUs() + 1;210PollingServer server = new PollingServer(concurrency);211}212213/*214* This class is for handling the Client data.215* The PollingServer spawns off a number of these based upon216* the number of CPUs (or concurrency argument).217* Each just loops grabbing events off the queue and218* processing them.219*/220class Consumer extends Thread {221private int threadNumber;222public Consumer(int i) { threadNumber = i; }223224public void run() {225byte[] buff = new byte[BYTESPEROP];226int bytes = 0;227228InputStream instream;229while (bytesRead < bytesToRead) {230try {231Integer Fd = (Integer) linkedQ.take();232int fd = Fd.intValue();233if (fd == -1) break; /* got told we could exit */234235/*236* We have to map the fd value returned from waitMultiple237* to the actual input stream associated with that fd.238* Take a look at how the Mux.add() was done to see how239* we stored that.240*/241int map = mapping[fd];242instream = instr[map];243bytes = instream.read(buff,0,BYTESPEROP);244} catch (Exception e) { System.out.println(e.toString()); }245246if (bytes > 0) {247/*248* Any real server would do some synchronized and some249* unsynchronized work on behalf of the client, and250* most likely send some data back...but this is a251* gross oversimplification.252*/253synchronized(eventSync) {254bytesRead += bytes;255eventsToProcess--;256if (eventsToProcess <= 0) {257eventSync.notify();258}259}260}261}262}263}264}265266267