Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
PojavLauncherTeam
GitHub Repository: PojavLauncherTeam/openjdk-multiarch-jdk8u
Path: blob/aarch64-shenandoah-jdk8u272-b10/jdk/test/sun/net/www/protocol/https/TestHttpsServer.java
38867 views
1
/*
2
* Copyright (c) 2002, 2012, Oracle and/or its affiliates. All rights reserved.
3
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4
*
5
* This code is free software; you can redistribute it and/or modify it
6
* under the terms of the GNU General Public License version 2 only, as
7
* published by the Free Software Foundation.
8
*
9
* This code is distributed in the hope that it will be useful, but WITHOUT
10
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12
* version 2 for more details (a copy is included in the LICENSE file that
13
* accompanied this code).
14
*
15
* You should have received a copy of the GNU General Public License version
16
* 2 along with this work; if not, write to the Free Software Foundation,
17
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18
*
19
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20
* or visit www.oracle.com if you need additional information or have any
21
* questions.
22
*/
23
24
import java.net.*;
25
import java.io.*;
26
import java.nio.*;
27
import java.nio.channels.*;
28
import sun.net.www.MessageHeader;
29
import java.util.*;
30
import javax.net.ssl.*;
31
import javax.net.ssl.SSLEngineResult.*;
32
import java.security.*;
33
34
/**
35
* This class implements a simple HTTPS server. It uses multiple threads to
36
* handle connections in parallel, and will spin off a new thread to handle
37
* each request. (this is easier to implement with SSLEngine)
38
* <p>
39
* It must be instantiated with a {@link HttpCallback} object to which
40
* requests are given and must be handled.
41
* <p>
42
* Simple synchronization between the client(s) and server can be done
43
* using the {@link #waitForCondition(String)}, {@link #setCondition(String)} and
44
* {@link #rendezvous(String,int)} methods.
45
*
46
* NOTE NOTE NOTE NOTE NOTE NOTE NOTE
47
*
48
* If you make a change in here, please don't forget to make the
49
* corresponding change in the J2SE equivalent.
50
*
51
* NOTE NOTE NOTE NOTE NOTE NOTE NOTE
52
*/
53
54
public class TestHttpsServer {
55
56
ServerSocketChannel schan;
57
int threads;
58
int cperthread;
59
HttpCallback cb;
60
Server[] servers;
61
62
// ssl related fields
63
static SSLContext sslCtx;
64
65
/**
 * Creates a {@code TestHttpsServer} that hands every request to the given
 * callback. The server is created with one server thread, a limit of ten
 * connections for that thread, and port zero (any free port).
 *
 * @param cb the callback object which is invoked to handle each
 *           incoming request
 * @throws IOException if the underlying constructor fails to open or bind
 *                     the server channel
 */
public TestHttpsServer(HttpCallback cb) throws IOException {
    // Chain through the three-argument form, which itself supplies port 0.
    this(cb, 1, 10);
}
76
77
/**
 * Creates a {@code TestHttpsServer} with the given number of threads and
 * per-thread connection limit. Equivalent to the four-argument constructor
 * with the port argument set to zero.
 *
 * @param cb         the callback object which is invoked to handle each
 *                   incoming request
 * @param threads    the number of threads to create to handle requests
 *                   in parallel
 * @param cperthread the number of simultaneous TCP connections to
 *                   handle per thread
 * @throws IOException if the server channel cannot be opened or bound
 */
public TestHttpsServer(HttpCallback cb, int threads, int cperthread)
        throws IOException {
    this(cb, threads, cperthread, /* port: any free port */ 0);
}
93
94
/**
95
* Create a <code>TestHttpsServer<code> instance with the specified number
96
* of threads and maximum number of connections per thread and running on
97
* the specified port. The specified number of threads are created to
98
* handle incoming requests, and each thread is allowed
99
* to handle a number of simultaneous TCP connections.
100
* @param cb the callback object which is invoked to handle
101
* each incoming request
102
* @param threads the number of threads to create to handle
103
* requests in parallel
104
* @param cperthread the number of simultaneous TCP connections
105
* to handle per thread
106
* @param port the port number to bind the server to. <code>Zero</code>
107
* means choose any free port.
108
*/
109
110
public TestHttpsServer (HttpCallback cb, int threads, int cperthread, int port)
111
throws IOException {
112
schan = ServerSocketChannel.open ();
113
InetSocketAddress addr = new InetSocketAddress (port);
114
schan.socket().bind (addr);
115
this.threads = threads;
116
this.cb = cb;
117
this.cperthread = cperthread;
118
119
try {
120
// create and initialize a SSLContext
121
KeyStore ks = KeyStore.getInstance("JKS");
122
KeyStore ts = KeyStore.getInstance("JKS");
123
char[] passphrase = "passphrase".toCharArray();
124
125
ks.load(new FileInputStream(System.getProperty("javax.net.ssl.keyStore")), passphrase);
126
ts.load(new FileInputStream(System.getProperty("javax.net.ssl.trustStore")), passphrase);
127
128
KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
129
kmf.init(ks, passphrase);
130
131
TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
132
tmf.init(ts);
133
134
sslCtx = SSLContext.getInstance("TLS");
135
136
sslCtx.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
137
138
servers = new Server [threads];
139
for (int i=0; i<threads; i++) {
140
servers[i] = new Server (cb, schan, cperthread);
141
servers[i].start();
142
}
143
} catch (Exception ex) {
144
throw new RuntimeException("test failed. cause: "+ex.getMessage());
145
}
146
}
147
148
/** Tell all threads in the server to exit within 5 seconds.
149
* This is an abortive termination. Just prior to the thread exiting
150
* all channels in that thread waiting to be closed are forceably closed.
151
*/
152
153
public void terminate () {
154
for (int i=0; i<threads; i++) {
155
servers[i].terminate ();
156
}
157
}
158
159
/**
160
* return the local port number to which the server is bound.
161
* @return the local port number
162
*/
163
164
public int getLocalPort () {
165
return schan.socket().getLocalPort ();
166
}
167
168
static class Server extends Thread {
169
170
ServerSocketChannel schan;
171
Selector selector;
172
SelectionKey listenerKey;
173
SelectionKey key; /* the current key being processed */
174
HttpCallback cb;
175
ByteBuffer consumeBuffer;
176
int maxconn;
177
int nconn;
178
ClosedChannelList clist;
179
boolean shutdown;
180
181
/*
 * Sets up one acceptor thread: stores its collaborators, opens a private
 * selector and registers the shared listening channel for OP_ACCEPT.
 * A failure to do so is only reported on System.err.
 */
Server(HttpCallback cb, ServerSocketChannel schan, int maxconn) {
    this.cb = cb;
    this.schan = schan;
    this.maxconn = maxconn;
    this.nconn = 0;
    this.consumeBuffer = ByteBuffer.allocate(512);
    this.clist = new ClosedChannelList();
    try {
        this.selector = Selector.open();
        // A channel must be non-blocking before it can be registered.
        schan.configureBlocking(false);
        this.listenerKey = schan.register(this.selector, SelectionKey.OP_ACCEPT);
    } catch (IOException e) {
        System.err.println("Server could not start: " + e);
    }
}
196
197
/* Stop the thread as soon as possible */
198
public synchronized void terminate () {
199
shutdown = true;
200
}
201
202
public void run () {
203
try {
204
while (true) {
205
selector.select (1000);
206
Set selected = selector.selectedKeys();
207
Iterator iter = selected.iterator();
208
while (iter.hasNext()) {
209
key = (SelectionKey)iter.next();
210
if (key.equals (listenerKey)) {
211
SocketChannel sock = schan.accept ();
212
if (sock == null) {
213
/* false notification */
214
iter.remove();
215
continue;
216
}
217
sock.configureBlocking (true);
218
SSLEngine sslEng = sslCtx.createSSLEngine();
219
sslEng.setUseClientMode(false);
220
new ServerWorker(cb, sock, sslEng).start();
221
nconn ++;
222
if (nconn == maxconn) {
223
/* deregister */
224
listenerKey.cancel ();
225
listenerKey = null;
226
}
227
} else {
228
if (key.isReadable()) {
229
boolean closed = false;
230
SocketChannel chan = (SocketChannel) key.channel();
231
if (key.attachment() != null) {
232
closed = consume (chan);
233
}
234
235
if (closed) {
236
chan.close ();
237
key.cancel ();
238
if (nconn == maxconn) {
239
listenerKey = schan.register (selector, SelectionKey.OP_ACCEPT);
240
}
241
nconn --;
242
}
243
}
244
}
245
iter.remove();
246
}
247
clist.check();
248
249
synchronized (this) {
250
if (shutdown) {
251
clist.terminate ();
252
return;
253
}
254
}
255
}
256
} catch (IOException e) {
257
System.out.println ("Server exception: " + e);
258
// TODO finish
259
}
260
}
261
262
/* read all the data off the channel without looking at it
263
* return true if connection closed
264
*/
265
boolean consume (SocketChannel chan) {
266
try {
267
consumeBuffer.clear ();
268
int c = chan.read (consumeBuffer);
269
if (c == -1)
270
return true;
271
} catch (IOException e) {
272
return true;
273
}
274
return false;
275
}
276
}
277
278
static class ServerWorker extends Thread {
279
private ByteBuffer inNetBB;
280
private ByteBuffer outNetBB;
281
private ByteBuffer inAppBB;
282
private ByteBuffer outAppBB;
283
284
SSLEngine sslEng;
285
SocketChannel schan;
286
HttpCallback cb;
287
HandshakeStatus currentHSStatus;
288
boolean initialHSComplete;
289
/*
290
* All inbound data goes through this buffer.
291
*
292
* It might be nice to use a cache of ByteBuffers so we're
293
* not alloc/dealloc'ing all over the place.
294
*/
295
296
/*
297
* Application buffers, also used for handshaking
298
*/
299
private int appBBSize;
300
301
ServerWorker (HttpCallback cb, SocketChannel schan, SSLEngine sslEng) {
302
this.sslEng = sslEng;
303
this.schan = schan;
304
this.cb = cb;
305
currentHSStatus = HandshakeStatus.NEED_UNWRAP;
306
initialHSComplete = false;
307
int netBBSize = sslEng.getSession().getPacketBufferSize();
308
inNetBB = ByteBuffer.allocate(netBBSize);
309
outNetBB = ByteBuffer.allocate(netBBSize);
310
appBBSize = sslEng.getSession().getApplicationBufferSize();
311
inAppBB = ByteBuffer.allocate(appBBSize);
312
outAppBB = ByteBuffer.allocate(appBBSize);
313
}
314
315
public SSLEngine getSSLEngine() {
316
return sslEng;
317
}
318
319
public ByteBuffer outNetBB() {
320
return outNetBB;
321
}
322
323
public ByteBuffer outAppBB() {
324
return outAppBB;
325
}
326
327
public void run () {
328
try {
329
SSLEngineResult result;
330
331
while (!initialHSComplete) {
332
333
switch (currentHSStatus) {
334
335
case NEED_UNWRAP:
336
int bytes = schan.read(inNetBB);
337
338
needIO:
339
while (currentHSStatus == HandshakeStatus.NEED_UNWRAP) {
340
/*
341
* Don't need to resize requestBB, since no app data should
342
* be generated here.
343
*/
344
inNetBB.flip();
345
result = sslEng.unwrap(inNetBB, inAppBB);
346
inNetBB.compact();
347
currentHSStatus = result.getHandshakeStatus();
348
349
switch (result.getStatus()) {
350
351
case OK:
352
switch (currentHSStatus) {
353
case NOT_HANDSHAKING:
354
throw new IOException(
355
"Not handshaking during initial handshake");
356
357
case NEED_TASK:
358
Runnable task;
359
while ((task = sslEng.getDelegatedTask()) != null) {
360
task.run();
361
currentHSStatus = sslEng.getHandshakeStatus();
362
}
363
break;
364
}
365
366
break;
367
368
case BUFFER_UNDERFLOW:
369
break needIO;
370
371
default: // BUFFER_OVERFLOW/CLOSED:
372
throw new IOException("Received" + result.getStatus() +
373
"during initial handshaking");
374
}
375
}
376
377
/*
378
* Just transitioned from read to write.
379
*/
380
if (currentHSStatus != HandshakeStatus.NEED_WRAP) {
381
break;
382
}
383
384
// Fall through and fill the write buffer.
385
386
case NEED_WRAP:
387
/*
388
* The flush above guarantees the out buffer to be empty
389
*/
390
outNetBB.clear();
391
result = sslEng.wrap(inAppBB, outNetBB);
392
outNetBB.flip();
393
schan.write (outNetBB);
394
outNetBB.compact();
395
currentHSStatus = result.getHandshakeStatus();
396
397
switch (result.getStatus()) {
398
case OK:
399
400
if (currentHSStatus == HandshakeStatus.NEED_TASK) {
401
Runnable task;
402
while ((task = sslEng.getDelegatedTask()) != null) {
403
task.run();
404
currentHSStatus = sslEng.getHandshakeStatus();
405
}
406
}
407
408
break;
409
410
default: // BUFFER_OVERFLOW/BUFFER_UNDERFLOW/CLOSED:
411
throw new IOException("Received" + result.getStatus() +
412
"during initial handshaking");
413
}
414
break;
415
416
case FINISHED:
417
initialHSComplete = true;
418
break;
419
default: // NOT_HANDSHAKING/NEED_TASK
420
throw new RuntimeException("Invalid Handshaking State" +
421
currentHSStatus);
422
} // switch
423
}
424
// read the application data; using non-blocking mode
425
schan.configureBlocking(false);
426
read(schan, sslEng);
427
} catch (Exception ex) {
428
throw new RuntimeException(ex);
429
}
430
}
431
432
/* return true if the connection is closed, false otherwise */
433
434
private boolean read (SocketChannel chan, SSLEngine sslEng) {
435
HttpTransaction msg;
436
boolean res;
437
try {
438
InputStream is = new BufferedInputStream (new NioInputStream (chan, sslEng, inNetBB, inAppBB));
439
String requestline = readLine (is);
440
MessageHeader mhead = new MessageHeader (is);
441
String clen = mhead.findValue ("Content-Length");
442
String trferenc = mhead.findValue ("Transfer-Encoding");
443
String data = null;
444
if (trferenc != null && trferenc.equals ("chunked"))
445
data = new String (readChunkedData (is));
446
else if (clen != null)
447
data = new String (readNormalData (is, Integer.parseInt (clen)));
448
String[] req = requestline.split (" ");
449
if (req.length < 2) {
450
/* invalid request line */
451
return false;
452
}
453
String cmd = req[0];
454
URI uri = null;
455
try {
456
uri = new URI (req[1]);
457
msg = new HttpTransaction (this, cmd, uri, mhead, data, null, chan);
458
cb.request (msg);
459
} catch (URISyntaxException e) {
460
System.err.println ("Invalid URI: " + e);
461
msg = new HttpTransaction (this, cmd, null, null, null, null, chan);
462
msg.sendResponse (501, "Whatever");
463
}
464
res = false;
465
} catch (IOException e) {
466
res = true;
467
}
468
return res;
469
}
470
471
byte[] readNormalData (InputStream is, int len) throws IOException {
472
byte [] buf = new byte [len];
473
int c, off=0, remain=len;
474
while (remain > 0 && ((c=is.read (buf, off, remain))>0)) {
475
remain -= c;
476
off += c;
477
}
478
return buf;
479
}
480
481
private void readCRLF(InputStream is) throws IOException {
482
int cr = is.read();
483
int lf = is.read();
484
485
if (((cr & 0xff) != 0x0d) ||
486
((lf & 0xff) != 0x0a)) {
487
throw new IOException(
488
"Expected <CR><LF>: got '" + cr + "/" + lf + "'");
489
}
490
}
491
492
byte[] readChunkedData (InputStream is) throws IOException {
493
LinkedList l = new LinkedList ();
494
int total = 0;
495
for (int len=readChunkLen(is); len!=0; len=readChunkLen(is)) {
496
l.add (readNormalData(is, len));
497
total += len;
498
readCRLF(is); // CRLF at end of chunk
499
}
500
readCRLF(is); // CRLF at end of Chunked Stream.
501
byte[] buf = new byte [total];
502
Iterator i = l.iterator();
503
int x = 0;
504
while (i.hasNext()) {
505
byte[] b = (byte[])i.next();
506
System.arraycopy (b, 0, buf, x, b.length);
507
x += b.length;
508
}
509
return buf;
510
}
511
512
private int readChunkLen (InputStream is) throws IOException {
513
int c, len=0;
514
boolean done=false, readCR=false;
515
while (!done) {
516
c = is.read ();
517
if (c == '\n' && readCR) {
518
done = true;
519
} else {
520
if (c == '\r' && !readCR) {
521
readCR = true;
522
} else {
523
int x=0;
524
if (c >= 'a' && c <= 'f') {
525
x = c - 'a' + 10;
526
} else if (c >= 'A' && c <= 'F') {
527
x = c - 'A' + 10;
528
} else if (c >= '0' && c <= '9') {
529
x = c - '0';
530
}
531
len = len * 16 + x;
532
}
533
}
534
}
535
return len;
536
}
537
538
private String readLine (InputStream is) throws IOException {
539
boolean done=false, readCR=false;
540
byte[] b = new byte [512];
541
int c, l = 0;
542
543
while (!done) {
544
c = is.read ();
545
if (c == '\n' && readCR) {
546
done = true;
547
} else {
548
if (c == '\r' && !readCR) {
549
readCR = true;
550
} else {
551
b[l++] = (byte)c;
552
}
553
}
554
}
555
return new String (b);
556
}
557
558
/** close the channel associated with the current key by:
559
* 1. shutdownOutput (send a FIN)
560
* 2. mark the key so that incoming data is to be consumed and discarded
561
* 3. After a period, close the socket
562
*/
563
564
synchronized void orderlyCloseChannel (SocketChannel ch) throws IOException {
565
ch.socket().shutdownOutput();
566
}
567
568
synchronized void abortiveCloseChannel (SocketChannel ch) throws IOException {
569
Socket s = ch.socket ();
570
s.setSoLinger (true, 0);
571
ch.close();
572
}
573
}
574
575
576
/**
577
* Implements blocking reading semantics on top of a non-blocking channel
578
*/
579
580
static class NioInputStream extends InputStream {
581
SSLEngine sslEng;
582
SocketChannel channel;
583
Selector selector;
584
ByteBuffer inNetBB;
585
ByteBuffer inAppBB;
586
SelectionKey key;
587
int available;
588
byte[] one;
589
boolean closed;
590
ByteBuffer markBuf; /* reads may be satisifed from this buffer */
591
boolean marked;
592
boolean reset;
593
int readlimit;
594
595
public NioInputStream (SocketChannel chan, SSLEngine sslEng, ByteBuffer inNetBB, ByteBuffer inAppBB) throws IOException {
596
this.sslEng = sslEng;
597
this.channel = chan;
598
selector = Selector.open();
599
this.inNetBB = inNetBB;
600
this.inAppBB = inAppBB;
601
key = chan.register (selector, SelectionKey.OP_READ);
602
available = 0;
603
one = new byte[1];
604
closed = marked = reset = false;
605
}
606
607
public synchronized int read (byte[] b) throws IOException {
608
return read (b, 0, b.length);
609
}
610
611
public synchronized int read () throws IOException {
612
return read (one, 0, 1);
613
}
614
615
public synchronized int read (byte[] b, int off, int srclen) throws IOException {
616
617
int canreturn, willreturn;
618
619
if (closed)
620
return -1;
621
622
if (reset) { /* satisfy from markBuf */
623
canreturn = markBuf.remaining ();
624
willreturn = canreturn>srclen ? srclen : canreturn;
625
markBuf.get(b, off, willreturn);
626
if (canreturn == willreturn) {
627
reset = false;
628
}
629
} else { /* satisfy from channel */
630
canreturn = available();
631
if (canreturn == 0) {
632
block ();
633
canreturn = available();
634
}
635
willreturn = canreturn>srclen ? srclen : canreturn;
636
inAppBB.get(b, off, willreturn);
637
available -= willreturn;
638
639
if (marked) { /* copy into markBuf */
640
try {
641
markBuf.put (b, off, willreturn);
642
} catch (BufferOverflowException e) {
643
marked = false;
644
}
645
}
646
}
647
return willreturn;
648
}
649
650
public synchronized int available () throws IOException {
651
if (closed)
652
throw new IOException ("Stream is closed");
653
654
if (reset)
655
return markBuf.remaining();
656
657
if (available > 0)
658
return available;
659
660
inAppBB.clear ();
661
int bytes = channel.read (inNetBB);
662
663
int needed = sslEng.getSession().getApplicationBufferSize();
664
if (needed > inAppBB.remaining()) {
665
inAppBB = ByteBuffer.allocate(needed);
666
}
667
inNetBB.flip();
668
SSLEngineResult result = sslEng.unwrap(inNetBB, inAppBB);
669
inNetBB.compact();
670
available = result.bytesProduced();
671
672
if (available > 0)
673
inAppBB.flip();
674
else if (available == -1)
675
throw new IOException ("Stream is closed");
676
return available;
677
}
678
679
/**
680
* block() only called when available==0 and buf is empty
681
*/
682
private synchronized void block () throws IOException {
683
//assert available == 0;
684
int n = selector.select ();
685
//assert n == 1;
686
selector.selectedKeys().clear();
687
available ();
688
}
689
690
public void close () throws IOException {
691
if (closed)
692
return;
693
channel.close ();
694
closed = true;
695
}
696
697
public synchronized void mark (int readlimit) {
698
if (closed)
699
return;
700
this.readlimit = readlimit;
701
markBuf = ByteBuffer.allocate (readlimit);
702
marked = true;
703
reset = false;
704
}
705
706
public synchronized void reset () throws IOException {
707
if (closed )
708
return;
709
if (!marked)
710
throw new IOException ("Stream not marked");
711
marked = false;
712
reset = true;
713
markBuf.flip ();
714
}
715
}
716
717
static class NioOutputStream extends OutputStream {
718
SSLEngine sslEng;
719
SocketChannel channel;
720
ByteBuffer outNetBB;
721
ByteBuffer outAppBB;
722
SelectionKey key;
723
Selector selector;
724
boolean closed;
725
byte[] one;
726
727
public NioOutputStream (SocketChannel channel, SSLEngine sslEng, ByteBuffer outNetBB, ByteBuffer outAppBB) throws IOException {
728
this.sslEng = sslEng;
729
this.channel = channel;
730
this.outNetBB = outNetBB;
731
this.outAppBB = outAppBB;
732
selector = Selector.open ();
733
key = channel.register (selector, SelectionKey.OP_WRITE);
734
closed = false;
735
one = new byte [1];
736
}
737
738
public synchronized void write (int b) throws IOException {
739
one[0] = (byte)b;
740
write (one, 0, 1);
741
}
742
743
public synchronized void write (byte[] b) throws IOException {
744
write (b, 0, b.length);
745
}
746
747
public synchronized void write (byte[] b, int off, int len) throws IOException {
748
if (closed)
749
throw new IOException ("stream is closed");
750
751
outAppBB = ByteBuffer.allocate (len);
752
outAppBB.put (b, off, len);
753
outAppBB.flip ();
754
int n;
755
outNetBB.clear();
756
int needed = sslEng.getSession().getPacketBufferSize();
757
if (outNetBB.capacity() < needed) {
758
outNetBB = ByteBuffer.allocate(needed);
759
}
760
SSLEngineResult ret = sslEng.wrap(outAppBB, outNetBB);
761
outNetBB.flip();
762
int newLen = ret.bytesProduced();
763
while ((n = channel.write (outNetBB)) < newLen) {
764
newLen -= n;
765
if (newLen == 0)
766
return;
767
selector.select ();
768
selector.selectedKeys().clear ();
769
}
770
}
771
772
public void close () throws IOException {
773
if (closed)
774
return;
775
channel.close ();
776
closed = true;
777
}
778
}
779
780
/**
781
* Utilities for synchronization. A condition is
782
* identified by a string name, and is initialized
783
* upon first use (ie. setCondition() or waitForCondition()). Threads
784
* are blocked until some thread calls (or has called) setCondition() for the same
785
* condition.
786
* <P>
787
* A rendezvous built on a condition is also provided for synchronizing
788
* N threads.
789
*/
790
791
private static HashMap conditions = new HashMap();
792
793
/*
794
* Modifiable boolean object
795
*/
796
private static class BValue {
797
boolean v;
798
}
799
800
/*
801
* Modifiable int object
802
*/
803
private static class IValue {
804
int v;
805
IValue (int i) {
806
v =i;
807
}
808
}
809
810
811
private static BValue getCond (String condition) {
812
synchronized (conditions) {
813
BValue cond = (BValue) conditions.get (condition);
814
if (cond == null) {
815
cond = new BValue();
816
conditions.put (condition, cond);
817
}
818
return cond;
819
}
820
}
821
822
/**
823
* Set the condition to true. Any threads that are currently blocked
824
* waiting on the condition, will be unblocked and allowed to continue.
825
* Threads that subsequently call waitForCondition() will not block.
826
* If the named condition did not exist prior to the call, then it is created
827
* first.
828
*/
829
830
public static void setCondition (String condition) {
831
BValue cond = getCond (condition);
832
synchronized (cond) {
833
if (cond.v) {
834
return;
835
}
836
cond.v = true;
837
cond.notifyAll();
838
}
839
}
840
841
/**
842
* If the named condition does not exist, then it is created and initialized
843
* to false. If the condition exists or has just been created and its value
844
* is false, then the thread blocks until another thread sets the condition.
845
* If the condition exists and is already set to true, then this call returns
846
* immediately without blocking.
847
*/
848
849
public static void waitForCondition (String condition) {
850
BValue cond = getCond (condition);
851
synchronized (cond) {
852
if (!cond.v) {
853
try {
854
cond.wait();
855
} catch (InterruptedException e) {}
856
}
857
}
858
}
859
860
/* conditions must be locked when accessing this */
861
static HashMap rv = new HashMap();
862
863
/**
864
* Force N threads to rendezvous (ie. wait for each other) before proceeding.
865
* The first thread(s) to call are blocked until the last
866
* thread makes the call. Then all threads continue.
867
* <p>
868
* All threads that call with the same condition name, must use the same value
869
* for N (or the results may be not be as expected).
870
* <P>
871
* Obviously, if fewer than N threads make the rendezvous then the result
872
* will be a hang.
873
*/
874
875
public static void rendezvous (String condition, int N) {
876
BValue cond;
877
IValue iv;
878
String name = "RV_"+condition;
879
880
/* get the condition */
881
882
synchronized (conditions) {
883
cond = (BValue)conditions.get (name);
884
if (cond == null) {
885
/* we are first caller */
886
if (N < 2) {
887
throw new RuntimeException ("rendezvous must be called with N >= 2");
888
}
889
cond = new BValue ();
890
conditions.put (name, cond);
891
iv = new IValue (N-1);
892
rv.put (name, iv);
893
} else {
894
/* already initialised, just decrement the counter */
895
iv = (IValue) rv.get (name);
896
iv.v --;
897
}
898
}
899
900
if (iv.v > 0) {
901
waitForCondition (name);
902
} else {
903
setCondition (name);
904
synchronized (conditions) {
905
clearCondition (name);
906
rv.remove (name);
907
}
908
}
909
}
910
911
/**
912
* If the named condition exists and is set then remove it, so it can
913
* be re-initialized and used again. If the condition does not exist, or
914
* exists but is not set, then the call returns without doing anything.
915
* Note, some higher level synchronization
916
* may be needed between clear and the other operations.
917
*/
918
919
public static void clearCondition(String condition) {
920
BValue cond;
921
synchronized (conditions) {
922
cond = (BValue) conditions.get (condition);
923
if (cond == null) {
924
return;
925
}
926
synchronized (cond) {
927
if (cond.v) {
928
conditions.remove (condition);
929
}
930
}
931
}
932
}
933
}
934
935