Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
PojavLauncherTeam
GitHub Repository: PojavLauncherTeam/openjdk-multiarch-jdk8u
Path: blob/aarch64-shenandoah-jdk8u272-b10/jdk/src/solaris/classes/sun/nio/ch/sctp/SctpChannelImpl.java
32301 views
1
/*
2
* Copyright (c) 2009, 2013, Oracle and/or its affiliates. All rights reserved.
3
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4
*
5
* This code is free software; you can redistribute it and/or modify it
6
* under the terms of the GNU General Public License version 2 only, as
7
* published by the Free Software Foundation. Oracle designates this
8
* particular file as subject to the "Classpath" exception as provided
9
* by Oracle in the LICENSE file that accompanied this code.
10
*
11
* This code is distributed in the hope that it will be useful, but WITHOUT
12
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14
* version 2 for more details (a copy is included in the LICENSE file that
15
* accompanied this code).
16
*
17
* You should have received a copy of the GNU General Public License version
18
* 2 along with this work; if not, write to the Free Software Foundation,
19
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20
*
21
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22
* or visit www.oracle.com if you need additional information or have any
23
* questions.
24
*/
25
package sun.nio.ch.sctp;
26
27
import java.net.InetAddress;
28
import java.net.SocketAddress;
29
import java.net.SocketException;
30
import java.net.InetSocketAddress;
31
import java.io.FileDescriptor;
32
import java.io.IOException;
33
import java.util.Collections;
34
import java.util.Set;
35
import java.util.HashSet;
36
import java.nio.ByteBuffer;
37
import java.nio.channels.SelectionKey;
38
import java.nio.channels.ClosedChannelException;
39
import java.nio.channels.ConnectionPendingException;
40
import java.nio.channels.NoConnectionPendingException;
41
import java.nio.channels.AlreadyConnectedException;
42
import java.nio.channels.NotYetBoundException;
43
import java.nio.channels.NotYetConnectedException;
44
import java.nio.channels.spi.SelectorProvider;
45
import com.sun.nio.sctp.AbstractNotificationHandler;
46
import com.sun.nio.sctp.Association;
47
import com.sun.nio.sctp.AssociationChangeNotification;
48
import com.sun.nio.sctp.HandlerResult;
49
import com.sun.nio.sctp.IllegalReceiveException;
50
import com.sun.nio.sctp.InvalidStreamException;
51
import com.sun.nio.sctp.IllegalUnbindException;
52
import com.sun.nio.sctp.MessageInfo;
53
import com.sun.nio.sctp.NotificationHandler;
54
import com.sun.nio.sctp.SctpChannel;
55
import com.sun.nio.sctp.SctpSocketOption;
56
import sun.nio.ch.DirectBuffer;
57
import sun.nio.ch.IOStatus;
58
import sun.nio.ch.IOUtil;
59
import sun.nio.ch.NativeThread;
60
import sun.nio.ch.Net;
61
import sun.nio.ch.PollArrayWrapper;
62
import sun.nio.ch.SelChImpl;
63
import sun.nio.ch.SelectionKeyImpl;
64
import sun.nio.ch.Util;
65
import static com.sun.nio.sctp.SctpStandardSocketOptions.*;
66
import static sun.nio.ch.sctp.ResultContainer.SEND_FAILED;
67
import static sun.nio.ch.sctp.ResultContainer.ASSOCIATION_CHANGED;
68
import static sun.nio.ch.sctp.ResultContainer.PEER_ADDRESS_CHANGED;
69
import static sun.nio.ch.sctp.ResultContainer.SHUTDOWN;
70
71
/**
72
* An implementation of an SctpChannel
73
*/
74
public class SctpChannelImpl extends SctpChannel
75
implements SelChImpl
76
{
77
private final FileDescriptor fd;
78
79
private final int fdVal;
80
81
/* IDs of native threads doing send and receivess, for signalling */
82
private volatile long receiverThread = 0;
83
private volatile long senderThread = 0;
84
85
/* Lock held by current receiving or connecting thread */
86
private final Object receiveLock = new Object();
87
88
/* Lock held by current sending or connecting thread */
89
private final Object sendLock = new Object();
90
91
private final ThreadLocal<Boolean> receiveInvoked =
92
new ThreadLocal<Boolean>() {
93
@Override protected Boolean initialValue() {
94
return Boolean.FALSE;
95
}
96
};
97
98
/* Lock held by any thread that modifies the state fields declared below
99
DO NOT invoke a blocking I/O operation while holding this lock! */
100
private final Object stateLock = new Object();
101
102
private enum ChannelState {
103
UNINITIALIZED,
104
UNCONNECTED,
105
PENDING,
106
CONNECTED,
107
KILLPENDING,
108
KILLED,
109
}
110
/* -- The following fields are protected by stateLock -- */
111
private ChannelState state = ChannelState.UNINITIALIZED;
112
113
/* Binding; Once bound the port will remain constant. */
114
int port = -1;
115
private HashSet<InetSocketAddress> localAddresses = new HashSet<InetSocketAddress>();
116
/* Has the channel been bound to the wildcard address */
117
private boolean wildcard; /* false */
118
//private InetSocketAddress remoteAddress = null;
119
120
/* Input/Output open */
121
private boolean readyToConnect;
122
123
/* Shutdown */
124
private boolean isShutdown;
125
126
private Association association;
127
128
private Set<SocketAddress> remoteAddresses = Collections.emptySet();
129
130
/* -- End of fields protected by stateLock -- */
131
132
/**
133
* Constructor for normal connecting sockets
134
*/
135
public SctpChannelImpl(SelectorProvider provider) throws IOException {
136
//TODO: update provider remove public modifier
137
super(provider);
138
this.fd = SctpNet.socket(true);
139
this.fdVal = IOUtil.fdVal(fd);
140
this.state = ChannelState.UNCONNECTED;
141
}
142
143
/**
144
* Constructor for sockets obtained from server sockets
145
*/
146
public SctpChannelImpl(SelectorProvider provider, FileDescriptor fd)
147
throws IOException {
148
this(provider, fd, null);
149
}
150
151
/**
152
* Constructor for sockets obtained from branching
153
*/
154
public SctpChannelImpl(SelectorProvider provider,
155
FileDescriptor fd,
156
Association association)
157
throws IOException {
158
super(provider);
159
this.fd = fd;
160
this.fdVal = IOUtil.fdVal(fd);
161
this.state = ChannelState.CONNECTED;
162
port = (Net.localAddress(fd)).getPort();
163
164
if (association != null) { /* branched */
165
this.association = association;
166
} else { /* obtained from server channel */
167
/* Receive COMM_UP */
168
ByteBuffer buf = Util.getTemporaryDirectBuffer(50);
169
try {
170
receive(buf, null, null, true);
171
} finally {
172
Util.releaseTemporaryDirectBuffer(buf);
173
}
174
}
175
}
176
177
/**
178
* Binds the channel's socket to a local address.
179
*/
180
@Override
181
public SctpChannel bind(SocketAddress local) throws IOException {
182
synchronized (receiveLock) {
183
synchronized (sendLock) {
184
synchronized (stateLock) {
185
ensureOpenAndUnconnected();
186
if (isBound())
187
SctpNet.throwAlreadyBoundException();
188
InetSocketAddress isa = (local == null) ?
189
new InetSocketAddress(0) : Net.checkAddress(local);
190
SecurityManager sm = System.getSecurityManager();
191
if (sm != null) {
192
sm.checkListen(isa.getPort());
193
}
194
Net.bind(fd, isa.getAddress(), isa.getPort());
195
InetSocketAddress boundIsa = Net.localAddress(fd);
196
port = boundIsa.getPort();
197
localAddresses.add(isa);
198
if (isa.getAddress().isAnyLocalAddress())
199
wildcard = true;
200
}
201
}
202
}
203
return this;
204
}
205
206
@Override
207
public SctpChannel bindAddress(InetAddress address)
208
throws IOException {
209
bindUnbindAddress(address, true);
210
localAddresses.add(new InetSocketAddress(address, port));
211
return this;
212
}
213
214
@Override
215
public SctpChannel unbindAddress(InetAddress address)
216
throws IOException {
217
bindUnbindAddress(address, false);
218
localAddresses.remove(new InetSocketAddress(address, port));
219
return this;
220
}
221
222
private SctpChannel bindUnbindAddress(InetAddress address, boolean add)
223
throws IOException {
224
if (address == null)
225
throw new IllegalArgumentException();
226
227
synchronized (receiveLock) {
228
synchronized (sendLock) {
229
synchronized (stateLock) {
230
if (!isOpen())
231
throw new ClosedChannelException();
232
if (!isBound())
233
throw new NotYetBoundException();
234
if (wildcard)
235
throw new IllegalStateException(
236
"Cannot add or remove addresses from a channel that is bound to the wildcard address");
237
if (address.isAnyLocalAddress())
238
throw new IllegalArgumentException(
239
"Cannot add or remove the wildcard address");
240
if (add) {
241
for (InetSocketAddress addr : localAddresses) {
242
if (addr.getAddress().equals(address)) {
243
SctpNet.throwAlreadyBoundException();
244
}
245
}
246
} else { /*removing */
247
/* Verify that there is more than one address
248
* and that address is already bound */
249
if (localAddresses.size() <= 1)
250
throw new IllegalUnbindException("Cannot remove address from a channel with only one address bound");
251
boolean foundAddress = false;
252
for (InetSocketAddress addr : localAddresses) {
253
if (addr.getAddress().equals(address)) {
254
foundAddress = true;
255
break;
256
}
257
}
258
if (!foundAddress )
259
throw new IllegalUnbindException("Cannot remove address from a channel that is not bound to that address");
260
}
261
262
SctpNet.bindx(fdVal, new InetAddress[]{address}, port, add);
263
264
/* Update our internal Set to reflect the addition/removal */
265
if (add)
266
localAddresses.add(new InetSocketAddress(address, port));
267
else {
268
for (InetSocketAddress addr : localAddresses) {
269
if (addr.getAddress().equals(address)) {
270
localAddresses.remove(addr);
271
break;
272
}
273
}
274
}
275
}
276
}
277
}
278
return this;
279
}
280
281
private boolean isBound() {
282
synchronized (stateLock) {
283
return port == -1 ? false : true;
284
}
285
}
286
287
private boolean isConnected() {
288
synchronized (stateLock) {
289
return (state == ChannelState.CONNECTED);
290
}
291
}
292
293
private void ensureOpenAndUnconnected() throws IOException {
294
synchronized (stateLock) {
295
if (!isOpen())
296
throw new ClosedChannelException();
297
if (isConnected())
298
throw new AlreadyConnectedException();
299
if (state == ChannelState.PENDING)
300
throw new ConnectionPendingException();
301
}
302
}
303
304
private boolean ensureReceiveOpen() throws ClosedChannelException {
305
synchronized (stateLock) {
306
if (!isOpen())
307
throw new ClosedChannelException();
308
if (!isConnected())
309
throw new NotYetConnectedException();
310
else
311
return true;
312
}
313
}
314
315
private void ensureSendOpen() throws ClosedChannelException {
316
synchronized (stateLock) {
317
if (!isOpen())
318
throw new ClosedChannelException();
319
if (isShutdown)
320
throw new ClosedChannelException();
321
if (!isConnected())
322
throw new NotYetConnectedException();
323
}
324
}
325
326
private void receiverCleanup() throws IOException {
327
synchronized (stateLock) {
328
receiverThread = 0;
329
if (state == ChannelState.KILLPENDING)
330
kill();
331
}
332
}
333
334
private void senderCleanup() throws IOException {
335
synchronized (stateLock) {
336
senderThread = 0;
337
if (state == ChannelState.KILLPENDING)
338
kill();
339
}
340
}
341
342
@Override
343
public Association association() throws ClosedChannelException {
344
synchronized (stateLock) {
345
if (!isOpen())
346
throw new ClosedChannelException();
347
if (!isConnected())
348
return null;
349
350
return association;
351
}
352
}
353
354
@Override
355
public boolean connect(SocketAddress endpoint) throws IOException {
356
synchronized (receiveLock) {
357
synchronized (sendLock) {
358
ensureOpenAndUnconnected();
359
InetSocketAddress isa = Net.checkAddress(endpoint);
360
SecurityManager sm = System.getSecurityManager();
361
if (sm != null)
362
sm.checkConnect(isa.getAddress().getHostAddress(),
363
isa.getPort());
364
synchronized (blockingLock()) {
365
int n = 0;
366
try {
367
try {
368
begin();
369
synchronized (stateLock) {
370
if (!isOpen()) {
371
return false;
372
}
373
receiverThread = NativeThread.current();
374
}
375
for (;;) {
376
InetAddress ia = isa.getAddress();
377
if (ia.isAnyLocalAddress())
378
ia = InetAddress.getLocalHost();
379
n = SctpNet.connect(fdVal, ia, isa.getPort());
380
if ( (n == IOStatus.INTERRUPTED)
381
&& isOpen())
382
continue;
383
break;
384
}
385
} finally {
386
receiverCleanup();
387
end((n > 0) || (n == IOStatus.UNAVAILABLE));
388
assert IOStatus.check(n);
389
}
390
} catch (IOException x) {
391
/* If an exception was thrown, close the channel after
392
* invoking end() so as to avoid bogus
393
* AsynchronousCloseExceptions */
394
close();
395
throw x;
396
}
397
398
if (n > 0) {
399
synchronized (stateLock) {
400
/* Connection succeeded */
401
state = ChannelState.CONNECTED;
402
if (!isBound()) {
403
InetSocketAddress boundIsa =
404
Net.localAddress(fd);
405
port = boundIsa.getPort();
406
}
407
408
/* Receive COMM_UP */
409
ByteBuffer buf = Util.getTemporaryDirectBuffer(50);
410
try {
411
receive(buf, null, null, true);
412
} finally {
413
Util.releaseTemporaryDirectBuffer(buf);
414
}
415
416
/* cache remote addresses */
417
try {
418
remoteAddresses = getRemoteAddresses();
419
} catch (IOException unused) { /* swallow exception */ }
420
421
return true;
422
}
423
} else {
424
synchronized (stateLock) {
425
/* If nonblocking and no exception then connection
426
* pending; disallow another invocation */
427
if (!isBlocking())
428
state = ChannelState.PENDING;
429
else
430
assert false;
431
}
432
}
433
}
434
return false;
435
}
436
}
437
}
438
439
@Override
440
public boolean connect(SocketAddress endpoint,
441
int maxOutStreams,
442
int maxInStreams)
443
throws IOException {
444
ensureOpenAndUnconnected();
445
return setOption(SCTP_INIT_MAXSTREAMS, InitMaxStreams.
446
create(maxInStreams, maxOutStreams)).connect(endpoint);
447
448
}
449
450
@Override
451
public boolean isConnectionPending() {
452
synchronized (stateLock) {
453
return (state == ChannelState.PENDING);
454
}
455
}
456
457
@Override
458
public boolean finishConnect() throws IOException {
459
synchronized (receiveLock) {
460
synchronized (sendLock) {
461
synchronized (stateLock) {
462
if (!isOpen())
463
throw new ClosedChannelException();
464
if (isConnected())
465
return true;
466
if (state != ChannelState.PENDING)
467
throw new NoConnectionPendingException();
468
}
469
int n = 0;
470
try {
471
try {
472
begin();
473
synchronized (blockingLock()) {
474
synchronized (stateLock) {
475
if (!isOpen()) {
476
return false;
477
}
478
receiverThread = NativeThread.current();
479
}
480
if (!isBlocking()) {
481
for (;;) {
482
n = checkConnect(fd, false, readyToConnect);
483
if ( (n == IOStatus.INTERRUPTED)
484
&& isOpen())
485
continue;
486
break;
487
}
488
} else {
489
for (;;) {
490
n = checkConnect(fd, true, readyToConnect);
491
if (n == 0) {
492
// Loop in case of
493
// spurious notifications
494
continue;
495
}
496
if ( (n == IOStatus.INTERRUPTED)
497
&& isOpen())
498
continue;
499
break;
500
}
501
}
502
}
503
} finally {
504
synchronized (stateLock) {
505
receiverThread = 0;
506
if (state == ChannelState.KILLPENDING) {
507
kill();
508
/* poll()/getsockopt() does not report
509
* error (throws exception, with n = 0)
510
* on Linux platform after dup2 and
511
* signal-wakeup. Force n to 0 so the
512
* end() can throw appropriate exception */
513
n = 0;
514
}
515
}
516
end((n > 0) || (n == IOStatus.UNAVAILABLE));
517
assert IOStatus.check(n);
518
}
519
} catch (IOException x) {
520
/* If an exception was thrown, close the channel after
521
* invoking end() so as to avoid bogus
522
* AsynchronousCloseExceptions */
523
close();
524
throw x;
525
}
526
527
if (n > 0) {
528
synchronized (stateLock) {
529
state = ChannelState.CONNECTED;
530
if (!isBound()) {
531
InetSocketAddress boundIsa =
532
Net.localAddress(fd);
533
port = boundIsa.getPort();
534
}
535
536
/* Receive COMM_UP */
537
ByteBuffer buf = Util.getTemporaryDirectBuffer(50);
538
try {
539
receive(buf, null, null, true);
540
} finally {
541
Util.releaseTemporaryDirectBuffer(buf);
542
}
543
544
/* cache remote addresses */
545
try {
546
remoteAddresses = getRemoteAddresses();
547
} catch (IOException unused) { /* swallow exception */ }
548
549
return true;
550
}
551
}
552
}
553
}
554
return false;
555
}
556
557
@Override
558
protected void implConfigureBlocking(boolean block) throws IOException {
559
IOUtil.configureBlocking(fd, block);
560
}
561
562
@Override
563
public void implCloseSelectableChannel() throws IOException {
564
synchronized (stateLock) {
565
SctpNet.preClose(fdVal);
566
567
if (receiverThread != 0)
568
NativeThread.signal(receiverThread);
569
570
if (senderThread != 0)
571
NativeThread.signal(senderThread);
572
573
if (!isRegistered())
574
kill();
575
}
576
}
577
578
@Override
579
public FileDescriptor getFD() {
580
return fd;
581
}
582
583
@Override
584
public int getFDVal() {
585
return fdVal;
586
}
587
588
/**
589
* Translates native poll revent ops into a ready operation ops
590
*/
591
private boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl sk) {
592
int intOps = sk.nioInterestOps();
593
int oldOps = sk.nioReadyOps();
594
int newOps = initialOps;
595
596
if ((ops & Net.POLLNVAL) != 0) {
597
/* This should only happen if this channel is pre-closed while a
598
* selection operation is in progress
599
* ## Throw an error if this channel has not been pre-closed */
600
return false;
601
}
602
603
if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) {
604
newOps = intOps;
605
sk.nioReadyOps(newOps);
606
/* No need to poll again in checkConnect,
607
* the error will be detected there */
608
readyToConnect = true;
609
return (newOps & ~oldOps) != 0;
610
}
611
612
if (((ops & Net.POLLIN) != 0) &&
613
((intOps & SelectionKey.OP_READ) != 0) &&
614
isConnected())
615
newOps |= SelectionKey.OP_READ;
616
617
if (((ops & Net.POLLCONN) != 0) &&
618
((intOps & SelectionKey.OP_CONNECT) != 0) &&
619
((state == ChannelState.UNCONNECTED) || (state == ChannelState.PENDING))) {
620
newOps |= SelectionKey.OP_CONNECT;
621
readyToConnect = true;
622
}
623
624
if (((ops & Net.POLLOUT) != 0) &&
625
((intOps & SelectionKey.OP_WRITE) != 0) &&
626
isConnected())
627
newOps |= SelectionKey.OP_WRITE;
628
629
sk.nioReadyOps(newOps);
630
return (newOps & ~oldOps) != 0;
631
}
632
633
@Override
634
public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl sk) {
635
return translateReadyOps(ops, sk.nioReadyOps(), sk);
636
}
637
638
@Override
639
@SuppressWarnings("all")
640
public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl sk) {
641
return translateReadyOps(ops, 0, sk);
642
}
643
644
@Override
645
public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {
646
int newOps = 0;
647
if ((ops & SelectionKey.OP_READ) != 0)
648
newOps |= Net.POLLIN;
649
if ((ops & SelectionKey.OP_WRITE) != 0)
650
newOps |= Net.POLLOUT;
651
if ((ops & SelectionKey.OP_CONNECT) != 0)
652
newOps |= Net.POLLCONN;
653
sk.selector.putEventOps(sk, newOps);
654
}
655
656
@Override
657
public void kill() throws IOException {
658
synchronized (stateLock) {
659
if (state == ChannelState.KILLED)
660
return;
661
if (state == ChannelState.UNINITIALIZED) {
662
state = ChannelState.KILLED;
663
return;
664
}
665
assert !isOpen() && !isRegistered();
666
667
/* Postpone the kill if there is a waiting reader
668
* or writer thread. */
669
if (receiverThread == 0 && senderThread == 0) {
670
SctpNet.close(fdVal);
671
state = ChannelState.KILLED;
672
} else {
673
state = ChannelState.KILLPENDING;
674
}
675
}
676
}
677
678
@Override
679
public <T> SctpChannel setOption(SctpSocketOption<T> name, T value)
680
throws IOException {
681
if (name == null)
682
throw new NullPointerException();
683
if (!supportedOptions().contains(name))
684
throw new UnsupportedOperationException("'" + name + "' not supported");
685
686
synchronized (stateLock) {
687
if (!isOpen())
688
throw new ClosedChannelException();
689
690
SctpNet.setSocketOption(fdVal, name, value, 0 /*oneToOne*/);
691
}
692
return this;
693
}
694
695
@Override
696
@SuppressWarnings("unchecked")
697
public <T> T getOption(SctpSocketOption<T> name) throws IOException {
698
if (name == null)
699
throw new NullPointerException();
700
if (!supportedOptions().contains(name))
701
throw new UnsupportedOperationException("'" + name + "' not supported");
702
703
synchronized (stateLock) {
704
if (!isOpen())
705
throw new ClosedChannelException();
706
707
return (T)SctpNet.getSocketOption(fdVal, name, 0 /*oneToOne*/);
708
}
709
}
710
711
private static class DefaultOptionsHolder {
712
static final Set<SctpSocketOption<?>> defaultOptions = defaultOptions();
713
714
private static Set<SctpSocketOption<?>> defaultOptions() {
715
HashSet<SctpSocketOption<?>> set = new HashSet<SctpSocketOption<?>>(10);
716
set.add(SCTP_DISABLE_FRAGMENTS);
717
set.add(SCTP_EXPLICIT_COMPLETE);
718
set.add(SCTP_FRAGMENT_INTERLEAVE);
719
set.add(SCTP_INIT_MAXSTREAMS);
720
set.add(SCTP_NODELAY);
721
set.add(SCTP_PRIMARY_ADDR);
722
set.add(SCTP_SET_PEER_PRIMARY_ADDR);
723
set.add(SO_SNDBUF);
724
set.add(SO_RCVBUF);
725
set.add(SO_LINGER);
726
return Collections.unmodifiableSet(set);
727
}
728
}
729
730
@Override
731
public final Set<SctpSocketOption<?>> supportedOptions() {
732
return DefaultOptionsHolder.defaultOptions;
733
}
734
735
@Override
736
public <T> MessageInfo receive(ByteBuffer buffer,
737
T attachment,
738
NotificationHandler<T> handler)
739
throws IOException {
740
return receive(buffer, attachment, handler, false);
741
}
742
743
private <T> MessageInfo receive(ByteBuffer buffer,
744
T attachment,
745
NotificationHandler<T> handler,
746
boolean fromConnect)
747
throws IOException {
748
if (buffer == null)
749
throw new IllegalArgumentException("buffer cannot be null");
750
751
if (buffer.isReadOnly())
752
throw new IllegalArgumentException("Read-only buffer");
753
754
if (receiveInvoked.get())
755
throw new IllegalReceiveException(
756
"cannot invoke receive from handler");
757
receiveInvoked.set(Boolean.TRUE);
758
759
try {
760
ResultContainer resultContainer = new ResultContainer();
761
do {
762
resultContainer.clear();
763
synchronized (receiveLock) {
764
if (!ensureReceiveOpen())
765
return null;
766
767
int n = 0;
768
try {
769
begin();
770
771
synchronized (stateLock) {
772
if(!isOpen())
773
return null;
774
receiverThread = NativeThread.current();
775
}
776
777
do {
778
n = receive(fdVal, buffer, resultContainer, fromConnect);
779
} while ((n == IOStatus.INTERRUPTED) && isOpen());
780
} finally {
781
receiverCleanup();
782
end((n > 0) || (n == IOStatus.UNAVAILABLE));
783
assert IOStatus.check(n);
784
}
785
786
if (!resultContainer.isNotification()) {
787
/* message or nothing */
788
if (resultContainer.hasSomething()) {
789
/* Set the association before returning */
790
MessageInfoImpl info =
791
resultContainer.getMessageInfo();
792
synchronized (stateLock) {
793
assert association != null;
794
info.setAssociation(association);
795
}
796
return info;
797
} else
798
/* Non-blocking may return null if nothing available*/
799
return null;
800
} else { /* notification */
801
synchronized (stateLock) {
802
handleNotificationInternal(
803
resultContainer);
804
}
805
}
806
807
if (fromConnect) {
808
/* If we reach here, then it was connect that invoked
809
* receive and received the COMM_UP. We have already
810
* handled the COMM_UP with the internal notification
811
* handler. Simply return. */
812
return null;
813
}
814
} /* receiveLock */
815
} while (handler == null ? true :
816
(invokeNotificationHandler(resultContainer, handler, attachment)
817
== HandlerResult.CONTINUE));
818
819
return null;
820
} finally {
821
receiveInvoked.set(Boolean.FALSE);
822
}
823
}
824
825
private int receive(int fd,
826
ByteBuffer dst,
827
ResultContainer resultContainer,
828
boolean peek)
829
throws IOException {
830
int pos = dst.position();
831
int lim = dst.limit();
832
assert (pos <= lim);
833
int rem = (pos <= lim ? lim - pos : 0);
834
if (dst instanceof DirectBuffer && rem > 0)
835
return receiveIntoNativeBuffer(fd, resultContainer, dst, rem, pos, peek);
836
837
/* Substitute a native buffer */
838
int newSize = Math.max(rem, 1);
839
ByteBuffer bb = Util.getTemporaryDirectBuffer(newSize);
840
try {
841
int n = receiveIntoNativeBuffer(fd, resultContainer, bb, newSize, 0, peek);
842
bb.flip();
843
if (n > 0 && rem > 0)
844
dst.put(bb);
845
return n;
846
} finally {
847
Util.releaseTemporaryDirectBuffer(bb);
848
}
849
}
850
851
private int receiveIntoNativeBuffer(int fd,
852
ResultContainer resultContainer,
853
ByteBuffer bb,
854
int rem,
855
int pos,
856
boolean peek)
857
throws IOException
858
{
859
int n = receive0(fd, resultContainer, ((DirectBuffer)bb).address() + pos, rem, peek);
860
861
if (n > 0)
862
bb.position(pos + n);
863
return n;
864
}
865
866
private InternalNotificationHandler internalNotificationHandler =
867
new InternalNotificationHandler();
868
869
private void handleNotificationInternal(ResultContainer resultContainer)
870
{
871
invokeNotificationHandler(resultContainer,
872
internalNotificationHandler, null);
873
}
874
875
private class InternalNotificationHandler
876
extends AbstractNotificationHandler<Object>
877
{
878
@Override
879
public HandlerResult handleNotification(
880
AssociationChangeNotification not, Object unused) {
881
if (not.event().equals(
882
AssociationChangeNotification.AssocChangeEvent.COMM_UP) &&
883
association == null) {
884
AssociationChange sac = (AssociationChange) not;
885
association = new AssociationImpl
886
(sac.assocId(), sac.maxInStreams(), sac.maxOutStreams());
887
}
888
return HandlerResult.CONTINUE;
889
}
890
}
891
892
private <T> HandlerResult invokeNotificationHandler
893
(ResultContainer resultContainer,
894
NotificationHandler<T> handler,
895
T attachment) {
896
SctpNotification notification = resultContainer.notification();
897
synchronized (stateLock) {
898
notification.setAssociation(association);
899
}
900
901
if (!(handler instanceof AbstractNotificationHandler)) {
902
return handler.handleNotification(notification, attachment);
903
}
904
905
/* AbstractNotificationHandler */
906
AbstractNotificationHandler<T> absHandler =
907
(AbstractNotificationHandler<T>)handler;
908
switch(resultContainer.type()) {
909
case ASSOCIATION_CHANGED :
910
return absHandler.handleNotification(
911
resultContainer.getAssociationChanged(), attachment);
912
case PEER_ADDRESS_CHANGED :
913
return absHandler.handleNotification(
914
resultContainer.getPeerAddressChanged(), attachment);
915
case SEND_FAILED :
916
return absHandler.handleNotification(
917
resultContainer.getSendFailed(), attachment);
918
case SHUTDOWN :
919
return absHandler.handleNotification(
920
resultContainer.getShutdown(), attachment);
921
default :
922
/* implementation specific handlers */
923
return absHandler.handleNotification(
924
resultContainer.notification(), attachment);
925
}
926
}
927
928
private void checkAssociation(Association sendAssociation) {
929
synchronized (stateLock) {
930
if (sendAssociation != null && !sendAssociation.equals(association)) {
931
throw new IllegalArgumentException(
932
"Cannot send to another association");
933
}
934
}
935
}
936
937
private void checkStreamNumber(int streamNumber) {
938
synchronized (stateLock) {
939
if (association != null) {
940
if (streamNumber < 0 ||
941
streamNumber >= association.maxOutboundStreams())
942
throw new InvalidStreamException();
943
}
944
}
945
}
946
947
/* TODO: Add support for ttl and isComplete to both 121 12M
948
* SCTP_EOR not yet supported on reference platforms
949
* TTL support limited...
950
*/
951
@Override
952
public int send(ByteBuffer buffer, MessageInfo messageInfo)
953
throws IOException {
954
if (buffer == null)
955
throw new IllegalArgumentException("buffer cannot be null");
956
957
if (messageInfo == null)
958
throw new IllegalArgumentException("messageInfo cannot be null");
959
960
checkAssociation(messageInfo.association());
961
checkStreamNumber(messageInfo.streamNumber());
962
963
synchronized (sendLock) {
964
ensureSendOpen();
965
966
int n = 0;
967
try {
968
begin();
969
970
synchronized (stateLock) {
971
if(!isOpen())
972
return 0;
973
senderThread = NativeThread.current();
974
}
975
976
do {
977
n = send(fdVal, buffer, messageInfo);
978
} while ((n == IOStatus.INTERRUPTED) && isOpen());
979
980
return IOStatus.normalize(n);
981
} finally {
982
senderCleanup();
983
end((n > 0) || (n == IOStatus.UNAVAILABLE));
984
assert IOStatus.check(n);
985
}
986
}
987
}
988
989
private int send(int fd, ByteBuffer src, MessageInfo messageInfo)
990
throws IOException {
991
int streamNumber = messageInfo.streamNumber();
992
SocketAddress target = messageInfo.address();
993
boolean unordered = messageInfo.isUnordered();
994
int ppid = messageInfo.payloadProtocolID();
995
996
if (src instanceof DirectBuffer)
997
return sendFromNativeBuffer(fd, src, target, streamNumber,
998
unordered, ppid);
999
1000
/* Substitute a native buffer */
1001
int pos = src.position();
1002
int lim = src.limit();
1003
assert (pos <= lim && streamNumber >= 0);
1004
1005
int rem = (pos <= lim ? lim - pos : 0);
1006
ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
1007
try {
1008
bb.put(src);
1009
bb.flip();
1010
/* Do not update src until we see how many bytes were written */
1011
src.position(pos);
1012
1013
int n = sendFromNativeBuffer(fd, bb, target, streamNumber,
1014
unordered, ppid);
1015
if (n > 0) {
1016
/* now update src */
1017
src.position(pos + n);
1018
}
1019
return n;
1020
} finally {
1021
Util.releaseTemporaryDirectBuffer(bb);
1022
}
1023
}
1024
1025
private int sendFromNativeBuffer(int fd,
1026
ByteBuffer bb,
1027
SocketAddress target,
1028
int streamNumber,
1029
boolean unordered,
1030
int ppid)
1031
throws IOException {
1032
InetAddress addr = null; // no preferred address
1033
int port = 0;
1034
if (target != null) {
1035
InetSocketAddress isa = Net.checkAddress(target);
1036
addr = isa.getAddress();
1037
port = isa.getPort();
1038
}
1039
1040
int pos = bb.position();
1041
int lim = bb.limit();
1042
assert (pos <= lim);
1043
int rem = (pos <= lim ? lim - pos : 0);
1044
1045
int written = send0(fd, ((DirectBuffer)bb).address() + pos, rem, addr,
1046
port, -1 /*121*/, streamNumber, unordered, ppid);
1047
if (written > 0)
1048
bb.position(pos + written);
1049
return written;
1050
}
1051
1052
@Override
1053
public SctpChannel shutdown() throws IOException {
1054
synchronized(stateLock) {
1055
if (isShutdown)
1056
return this;
1057
1058
ensureSendOpen();
1059
SctpNet.shutdown(fdVal, -1);
1060
if (senderThread != 0)
1061
NativeThread.signal(senderThread);
1062
isShutdown = true;
1063
}
1064
return this;
1065
}
1066
1067
@Override
1068
public Set<SocketAddress> getAllLocalAddresses()
1069
throws IOException {
1070
synchronized (stateLock) {
1071
if (!isOpen())
1072
throw new ClosedChannelException();
1073
if (!isBound())
1074
return Collections.emptySet();
1075
1076
return SctpNet.getLocalAddresses(fdVal);
1077
}
1078
}
1079
1080
@Override
1081
public Set<SocketAddress> getRemoteAddresses()
1082
throws IOException {
1083
synchronized (stateLock) {
1084
if (!isOpen())
1085
throw new ClosedChannelException();
1086
if (!isConnected() || isShutdown)
1087
return Collections.emptySet();
1088
1089
try {
1090
return SctpNet.getRemoteAddresses(fdVal, 0/*unused*/);
1091
} catch (SocketException unused) {
1092
/* an open connected channel should always have remote addresses */
1093
return remoteAddresses;
1094
}
1095
}
1096
}
1097
1098
/* Native */
1099
private static native void initIDs();
1100
1101
static native int receive0(int fd, ResultContainer resultContainer,
1102
long address, int length, boolean peek) throws IOException;
1103
1104
static native int send0(int fd, long address, int length,
1105
InetAddress addr, int port, int assocId, int streamNumber,
1106
boolean unordered, int ppid) throws IOException;
1107
1108
private static native int checkConnect(FileDescriptor fd, boolean block,
1109
boolean ready) throws IOException;
1110
1111
static {
1112
IOUtil.load(); /* loads nio & net native libraries */
1113
java.security.AccessController.doPrivileged(
1114
new java.security.PrivilegedAction<Void>() {
1115
public Void run() {
1116
System.loadLibrary("sctp");
1117
return null;
1118
}
1119
});
1120
initIDs();
1121
}
1122
}
1123
1124