Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
PojavLauncherTeam
GitHub Repository: PojavLauncherTeam/openjdk-multiarch-jdk8u
Path: blob/aarch64-shenandoah-jdk8u272-b10/jdk/src/solaris/demo/jni/Poller/PollingServer.java
32287 views
1
/*
2
* Copyright (c) 1999, 2011, Oracle and/or its affiliates. All rights reserved.
3
*
4
* Redistribution and use in source and binary forms, with or without
5
* modification, are permitted provided that the following conditions
6
* are met:
7
*
8
* - Redistributions of source code must retain the above copyright
9
* notice, this list of conditions and the following disclaimer.
10
*
11
* - Redistributions in binary form must reproduce the above copyright
12
* notice, this list of conditions and the following disclaimer in the
13
* documentation and/or other materials provided with the distribution.
14
*
15
* - Neither the name of Oracle nor the names of its
16
* contributors may be used to endorse or promote products derived
17
* from this software without specific prior written permission.
18
*
19
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
20
* IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
21
* THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
23
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
24
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
25
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
26
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
27
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
28
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
29
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30
*/
31
32
/*
33
* This source code is provided to illustrate the usage of a given feature
34
* or technique and has been deliberately simplified. Additional steps
35
* required for a production-quality application, such as security checks,
36
* input validation and proper error handling, might not be present in
37
* this sample code.
38
*/
39
40
41
import java.io.*;
42
import java.net.*;
43
import java.lang.Byte;
44
45
/**
46
* Simple Java "server" using the Poller class
47
* to multiplex on incoming connections. Note
48
* that handoff of events, via linked Q is not
49
* actually a performance booster here, since
50
* the processing of events is cheaper than
51
* the overhead in scheduling/executing them.
52
* Although this demo does allow for concurrency
53
* in handling connections, it uses a rather
54
* primitive "gang scheduling" policy to keep
55
* the code simpler.
56
*/
57
58
public class PollingServer
59
{
60
public final static int MAXCONN = 10000;
61
public final static int PORTNUM = 4444;
62
public final static int BYTESPEROP = 10;
63
64
/**
65
* This synchronization object protects access to certain
66
* data (bytesRead,eventsToProcess) by concurrent Consumer threads.
67
*/
68
private final static Object eventSync = new Object();
69
70
private static InputStream[] instr = new InputStream[MAXCONN];
71
private static int[] mapping = new int[65535];
72
private static LinkedQueue linkedQ = new LinkedQueue();
73
private static int bytesRead = 0;
74
private static int bytesToRead;
75
private static int eventsToProcess=0;
76
77
public PollingServer(int concurrency) {
78
Socket[] sockArr = new Socket[MAXCONN];
79
long timestart, timestop;
80
short[] revents = new short[MAXCONN];
81
int[] fds = new int[MAXCONN];
82
int bytes;
83
Poller Mux;
84
int serverFd;
85
int totalConn=0;
86
int connects=0;
87
88
System.out.println ("Serv: Initializing port " + PORTNUM);
89
try {
90
91
ServerSocket skMain = new ServerSocket (PORTNUM);
92
/*
93
* Create the Poller object Mux, allow for up to MAXCONN
94
* sockets/filedescriptors to be polled.
95
*/
96
Mux = new Poller(MAXCONN);
97
serverFd = Mux.add(skMain, Poller.POLLIN);
98
99
Socket ctrlSock = skMain.accept();
100
101
BufferedReader ctrlReader =
102
new BufferedReader(new InputStreamReader(ctrlSock.getInputStream()));
103
String ctrlString = ctrlReader.readLine();
104
bytesToRead = Integer.valueOf(ctrlString).intValue();
105
ctrlString = ctrlReader.readLine();
106
totalConn = Integer.valueOf(ctrlString).intValue();
107
108
System.out.println("Receiving " + bytesToRead + " bytes from " +
109
totalConn + " client connections");
110
111
timestart = System.currentTimeMillis();
112
113
/*
114
* Start the consumer threads to read data.
115
*/
116
for (int consumerThread = 0;
117
consumerThread < concurrency; consumerThread++ ) {
118
new Consumer(consumerThread).start();
119
}
120
121
/*
122
* Take connections, read Data
123
*/
124
int numEvents=0;
125
126
while ( bytesRead < bytesToRead ) {
127
128
int loopWaits=0;
129
while (eventsToProcess > 0) {
130
synchronized (eventSync) {
131
loopWaits++;
132
if (eventsToProcess <= 0) break;
133
try { eventSync.wait(); } catch (Exception e) {e.printStackTrace();};
134
}
135
}
136
if (loopWaits > 1)
137
System.out.println("Done waiting...loops = " + loopWaits +
138
" events " + numEvents +
139
" bytes read : " + bytesRead );
140
141
if (bytesRead >= bytesToRead) break; // may be done!
142
143
/*
144
* Wait for events
145
*/
146
numEvents = Mux.waitMultiple(100, fds, revents);
147
synchronized (eventSync) {
148
eventsToProcess = numEvents;
149
}
150
/*
151
* Process all the events we got from Mux.waitMultiple
152
*/
153
int cnt = 0;
154
while ( (cnt < numEvents) && (bytesRead < bytesToRead) ) {
155
int fd = fds[cnt];
156
157
if (revents[cnt] == Poller.POLLIN) {
158
if (fd == serverFd) {
159
/*
160
* New connection coming in on the ServerSocket
161
* Add the socket to the Mux, keep track of mapping
162
* the fdval returned by Mux.add to the connection.
163
*/
164
sockArr[connects] = skMain.accept();
165
instr[connects] = sockArr[connects].getInputStream();
166
int fdval = Mux.add(sockArr[connects], Poller.POLLIN);
167
mapping[fdval] = connects;
168
synchronized(eventSync) {
169
eventsToProcess--; // just processed this one!
170
}
171
connects++;
172
} else {
173
/*
174
* We've got data from this client connection.
175
* Put it on the queue for the consumer threads to process.
176
*/
177
linkedQ.put(new Integer(fd));
178
}
179
} else {
180
System.out.println("Got revents[" + cnt + "] == " + revents[cnt]);
181
}
182
cnt++;
183
}
184
}
185
timestop = System.currentTimeMillis();
186
System.out.println("Time for all reads (" + totalConn +
187
" sockets) : " + (timestop-timestart));
188
189
// Tell the client it can now go away
190
byte[] buff = new byte[BYTESPEROP];
191
ctrlSock.getOutputStream().write(buff,0,BYTESPEROP);
192
193
// Tell the cunsumer threads they can exit.
194
for (int cThread = 0; cThread < concurrency; cThread++ ) {
195
linkedQ.put(new Integer(-1));
196
}
197
} catch (Exception exc) { exc.printStackTrace(); }
198
}
199
200
/*
201
* main ... just check if a concurrency was specified
202
*/
203
public static void main (String args[])
204
{
205
int concurrency;
206
207
if (args.length == 1)
208
concurrency = java.lang.Integer.valueOf(args[0]).intValue();
209
else
210
concurrency = Poller.getNumCPUs() + 1;
211
PollingServer server = new PollingServer(concurrency);
212
}
213
214
/*
215
* This class is for handling the Client data.
216
* The PollingServer spawns off a number of these based upon
217
* the number of CPUs (or concurrency argument).
218
* Each just loops grabbing events off the queue and
219
* processing them.
220
*/
221
class Consumer extends Thread {
222
private int threadNumber;
223
public Consumer(int i) { threadNumber = i; }
224
225
public void run() {
226
byte[] buff = new byte[BYTESPEROP];
227
int bytes = 0;
228
229
InputStream instream;
230
while (bytesRead < bytesToRead) {
231
try {
232
Integer Fd = (Integer) linkedQ.take();
233
int fd = Fd.intValue();
234
if (fd == -1) break; /* got told we could exit */
235
236
/*
237
* We have to map the fd value returned from waitMultiple
238
* to the actual input stream associated with that fd.
239
* Take a look at how the Mux.add() was done to see how
240
* we stored that.
241
*/
242
int map = mapping[fd];
243
instream = instr[map];
244
bytes = instream.read(buff,0,BYTESPEROP);
245
} catch (Exception e) { System.out.println(e.toString()); }
246
247
if (bytes > 0) {
248
/*
249
* Any real server would do some synchronized and some
250
* unsynchronized work on behalf of the client, and
251
* most likely send some data back...but this is a
252
* gross oversimplification.
253
*/
254
synchronized(eventSync) {
255
bytesRead += bytes;
256
eventsToProcess--;
257
if (eventsToProcess <= 0) {
258
eventSync.notify();
259
}
260
}
261
}
262
}
263
}
264
}
265
}
266
267