Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
PojavLauncherTeam
GitHub Repository: PojavLauncherTeam/openjdk-multiarch-jdk8u
Path: blob/aarch64-shenandoah-jdk8u272-b10/jdk/src/solaris/classes/sun/nio/ch/sctp/SctpMultiChannelImpl.java
32300 views
1
/*
2
* Copyright (c) 2009, 2013, Oracle and/or its affiliates. All rights reserved.
3
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4
*
5
* This code is free software; you can redistribute it and/or modify it
6
* under the terms of the GNU General Public License version 2 only, as
7
* published by the Free Software Foundation. Oracle designates this
8
* particular file as subject to the "Classpath" exception as provided
9
* by Oracle in the LICENSE file that accompanied this code.
10
*
11
* This code is distributed in the hope that it will be useful, but WITHOUT
12
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14
* version 2 for more details (a copy is included in the LICENSE file that
15
* accompanied this code).
16
*
17
* You should have received a copy of the GNU General Public License version
18
* 2 along with this work; if not, write to the Free Software Foundation,
19
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20
*
21
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22
* or visit www.oracle.com if you need additional information or have any
23
* questions.
24
*/
25
package sun.nio.ch.sctp;
26
27
import java.net.InetAddress;
28
import java.net.SocketAddress;
29
import java.net.SocketException;
30
import java.net.InetSocketAddress;
31
import java.io.FileDescriptor;
32
import java.io.IOException;
33
import java.util.Collections;
34
import java.util.Map.Entry;
35
import java.util.Iterator;
36
import java.util.Set;
37
import java.util.HashSet;
38
import java.util.HashMap;
39
import java.nio.ByteBuffer;
40
import java.nio.channels.SelectionKey;
41
import java.nio.channels.ClosedChannelException;
42
import java.nio.channels.NotYetBoundException;
43
import java.nio.channels.spi.SelectorProvider;
44
import com.sun.nio.sctp.AbstractNotificationHandler;
45
import com.sun.nio.sctp.Association;
46
import com.sun.nio.sctp.AssociationChangeNotification;
47
import com.sun.nio.sctp.HandlerResult;
48
import com.sun.nio.sctp.IllegalReceiveException;
49
import com.sun.nio.sctp.InvalidStreamException;
50
import com.sun.nio.sctp.IllegalUnbindException;
51
import com.sun.nio.sctp.NotificationHandler;
52
import com.sun.nio.sctp.MessageInfo;
53
import com.sun.nio.sctp.SctpChannel;
54
import com.sun.nio.sctp.SctpMultiChannel;
55
import com.sun.nio.sctp.SctpSocketOption;
56
import sun.nio.ch.DirectBuffer;
57
import sun.nio.ch.NativeThread;
58
import sun.nio.ch.IOStatus;
59
import sun.nio.ch.IOUtil;
60
import sun.nio.ch.Net;
61
import sun.nio.ch.PollArrayWrapper;
62
import sun.nio.ch.SelChImpl;
63
import sun.nio.ch.SelectionKeyImpl;
64
import sun.nio.ch.Util;
65
import static com.sun.nio.sctp.SctpStandardSocketOptions.*;
66
import static sun.nio.ch.sctp.ResultContainer.*;
67
68
/**
69
* An implementation of SctpMultiChannel
70
*/
71
public class SctpMultiChannelImpl extends SctpMultiChannel
72
implements SelChImpl
73
{
74
private final FileDescriptor fd;
75
76
private final int fdVal;
77
78
/* IDs of native threads doing send and receives, for signalling */
79
private volatile long receiverThread = 0;
80
private volatile long senderThread = 0;
81
82
/* Lock held by current receiving thread */
83
private final Object receiveLock = new Object();
84
85
/* Lock held by current sending thread */
86
private final Object sendLock = new Object();
87
88
/* Lock held by any thread that modifies the state fields declared below
89
* DO NOT invoke a blocking I/O operation while holding this lock! */
90
private final Object stateLock = new Object();
91
92
private enum ChannelState {
93
UNINITIALIZED,
94
KILLPENDING,
95
KILLED,
96
}
97
98
/* -- The following fields are protected by stateLock -- */
99
private ChannelState state = ChannelState.UNINITIALIZED;
100
101
/* Binding: Once bound the port will remain constant. */
102
int port = -1;
103
private HashSet<InetSocketAddress> localAddresses = new HashSet<InetSocketAddress>();
104
/* Has the channel been bound to the wildcard address */
105
private boolean wildcard; /* false */
106
107
/* Keeps a map of addresses to association, and visa versa */
108
private HashMap<SocketAddress, Association> addressMap =
109
new HashMap<SocketAddress, Association>();
110
private HashMap<Association, Set<SocketAddress>> associationMap =
111
new HashMap<Association, Set<SocketAddress>>();
112
113
/* -- End of fields protected by stateLock -- */
114
115
/* If an association has been shutdown mark it for removal after
116
* the user handler has been invoked */
117
private final ThreadLocal<Association> associationToRemove =
118
new ThreadLocal<Association>() {
119
@Override protected Association initialValue() {
120
return null;
121
}
122
};
123
124
/* A notification handler cannot invoke receive */
125
private final ThreadLocal<Boolean> receiveInvoked =
126
new ThreadLocal<Boolean>() {
127
@Override protected Boolean initialValue() {
128
return Boolean.FALSE;
129
}
130
};
131
132
public SctpMultiChannelImpl(SelectorProvider provider)
133
throws IOException {
134
//TODO: update provider, remove public modifier
135
super(provider);
136
this.fd = SctpNet.socket(false /*one-to-many*/);
137
this.fdVal = IOUtil.fdVal(fd);
138
}
139
140
@Override
141
public SctpMultiChannel bind(SocketAddress local, int backlog)
142
throws IOException {
143
synchronized (receiveLock) {
144
synchronized (sendLock) {
145
synchronized (stateLock) {
146
ensureOpen();
147
if (isBound())
148
SctpNet.throwAlreadyBoundException();
149
InetSocketAddress isa = (local == null) ?
150
new InetSocketAddress(0) : Net.checkAddress(local);
151
152
SecurityManager sm = System.getSecurityManager();
153
if (sm != null)
154
sm.checkListen(isa.getPort());
155
Net.bind(fd, isa.getAddress(), isa.getPort());
156
157
InetSocketAddress boundIsa = Net.localAddress(fd);
158
port = boundIsa.getPort();
159
localAddresses.add(isa);
160
if (isa.getAddress().isAnyLocalAddress())
161
wildcard = true;
162
163
SctpNet.listen(fdVal, backlog < 1 ? 50 : backlog);
164
}
165
}
166
}
167
return this;
168
}
169
170
@Override
171
public SctpMultiChannel bindAddress(InetAddress address)
172
throws IOException {
173
return bindUnbindAddress(address, true);
174
}
175
176
@Override
177
public SctpMultiChannel unbindAddress(InetAddress address)
178
throws IOException {
179
return bindUnbindAddress(address, false);
180
}
181
182
private SctpMultiChannel bindUnbindAddress(InetAddress address,
183
boolean add)
184
throws IOException {
185
if (address == null)
186
throw new IllegalArgumentException();
187
188
synchronized (receiveLock) {
189
synchronized (sendLock) {
190
synchronized (stateLock) {
191
if (!isOpen())
192
throw new ClosedChannelException();
193
if (!isBound())
194
throw new NotYetBoundException();
195
if (wildcard)
196
throw new IllegalStateException(
197
"Cannot add or remove addresses from a channel that is bound to the wildcard address");
198
if (address.isAnyLocalAddress())
199
throw new IllegalArgumentException(
200
"Cannot add or remove the wildcard address");
201
if (add) {
202
for (InetSocketAddress addr : localAddresses) {
203
if (addr.getAddress().equals(address)) {
204
SctpNet.throwAlreadyBoundException();
205
}
206
}
207
} else { /*removing */
208
/* Verify that there is more than one address
209
* and that address is already bound */
210
if (localAddresses.size() <= 1)
211
throw new IllegalUnbindException("Cannot remove address from a channel with only one address bound");
212
boolean foundAddress = false;
213
for (InetSocketAddress addr : localAddresses) {
214
if (addr.getAddress().equals(address)) {
215
foundAddress = true;
216
break;
217
}
218
}
219
if (!foundAddress )
220
throw new IllegalUnbindException("Cannot remove address from a channel that is not bound to that address");
221
}
222
223
SctpNet.bindx(fdVal, new InetAddress[]{address}, port, add);
224
225
/* Update our internal Set to reflect the addition/removal */
226
if (add)
227
localAddresses.add(new InetSocketAddress(address, port));
228
else {
229
for (InetSocketAddress addr : localAddresses) {
230
if (addr.getAddress().equals(address)) {
231
localAddresses.remove(addr);
232
break;
233
}
234
}
235
}
236
}
237
}
238
}
239
return this;
240
}
241
242
@Override
243
public Set<Association> associations()
244
throws ClosedChannelException, NotYetBoundException {
245
synchronized (stateLock) {
246
if (!isOpen())
247
throw new ClosedChannelException();
248
if (!isBound())
249
throw new NotYetBoundException();
250
251
return Collections.unmodifiableSet(associationMap.keySet());
252
}
253
}
254
255
private boolean isBound() {
256
synchronized (stateLock) {
257
return port == -1 ? false : true;
258
}
259
}
260
261
private void ensureOpen() throws IOException {
262
synchronized (stateLock) {
263
if (!isOpen())
264
throw new ClosedChannelException();
265
}
266
}
267
268
private void receiverCleanup() throws IOException {
269
synchronized (stateLock) {
270
receiverThread = 0;
271
if (state == ChannelState.KILLPENDING)
272
kill();
273
}
274
}
275
276
private void senderCleanup() throws IOException {
277
synchronized (stateLock) {
278
senderThread = 0;
279
if (state == ChannelState.KILLPENDING)
280
kill();
281
}
282
}
283
284
@Override
285
protected void implConfigureBlocking(boolean block) throws IOException {
286
IOUtil.configureBlocking(fd, block);
287
}
288
289
@Override
290
public void implCloseSelectableChannel() throws IOException {
291
synchronized (stateLock) {
292
SctpNet.preClose(fdVal);
293
294
if (receiverThread != 0)
295
NativeThread.signal(receiverThread);
296
297
if (senderThread != 0)
298
NativeThread.signal(senderThread);
299
300
if (!isRegistered())
301
kill();
302
}
303
}
304
305
@Override
306
public FileDescriptor getFD() {
307
return fd;
308
}
309
310
@Override
311
public int getFDVal() {
312
return fdVal;
313
}
314
315
/**
316
* Translates native poll revent ops into a ready operation ops
317
*/
318
private boolean translateReadyOps(int ops, int initialOps,
319
SelectionKeyImpl sk) {
320
int intOps = sk.nioInterestOps();
321
int oldOps = sk.nioReadyOps();
322
int newOps = initialOps;
323
324
if ((ops & Net.POLLNVAL) != 0) {
325
/* This should only happen if this channel is pre-closed while a
326
* selection operation is in progress
327
* ## Throw an error if this channel has not been pre-closed */
328
return false;
329
}
330
331
if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) {
332
newOps = intOps;
333
sk.nioReadyOps(newOps);
334
return (newOps & ~oldOps) != 0;
335
}
336
337
if (((ops & Net.POLLIN) != 0) &&
338
((intOps & SelectionKey.OP_READ) != 0))
339
newOps |= SelectionKey.OP_READ;
340
341
if (((ops & Net.POLLOUT) != 0) &&
342
((intOps & SelectionKey.OP_WRITE) != 0))
343
newOps |= SelectionKey.OP_WRITE;
344
345
sk.nioReadyOps(newOps);
346
return (newOps & ~oldOps) != 0;
347
}
348
349
@Override
350
public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl sk) {
351
return translateReadyOps(ops, sk.nioReadyOps(), sk);
352
}
353
354
@Override
355
public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl sk) {
356
return translateReadyOps(ops, 0, sk);
357
}
358
359
@Override
360
public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {
361
int newOps = 0;
362
if ((ops & SelectionKey.OP_READ) != 0)
363
newOps |= Net.POLLIN;
364
if ((ops & SelectionKey.OP_WRITE) != 0)
365
newOps |= Net.POLLOUT;
366
sk.selector.putEventOps(sk, newOps);
367
}
368
369
@Override
370
public void kill() throws IOException {
371
synchronized (stateLock) {
372
if (state == ChannelState.KILLED)
373
return;
374
if (state == ChannelState.UNINITIALIZED) {
375
state = ChannelState.KILLED;
376
return;
377
}
378
assert !isOpen() && !isRegistered();
379
380
/* Postpone the kill if there is a thread sending or receiving. */
381
if (receiverThread == 0 && senderThread == 0) {
382
SctpNet.close(fdVal);
383
state = ChannelState.KILLED;
384
} else {
385
state = ChannelState.KILLPENDING;
386
}
387
}
388
}
389
390
@Override
391
public <T> SctpMultiChannel setOption(SctpSocketOption<T> name,
392
T value,
393
Association association)
394
throws IOException {
395
if (name == null)
396
throw new NullPointerException();
397
if (!(supportedOptions().contains(name)))
398
throw new UnsupportedOperationException("'" + name + "' not supported");
399
400
synchronized (stateLock) {
401
if (association != null && (name.equals(SCTP_PRIMARY_ADDR) ||
402
name.equals(SCTP_SET_PEER_PRIMARY_ADDR))) {
403
checkAssociation(association);
404
}
405
if (!isOpen())
406
throw new ClosedChannelException();
407
408
int assocId = association == null ? 0 : association.associationID();
409
SctpNet.setSocketOption(fdVal, name, value, assocId);
410
}
411
return this;
412
}
413
414
@Override
415
@SuppressWarnings("unchecked")
416
public <T> T getOption(SctpSocketOption<T> name, Association association)
417
throws IOException {
418
if (name == null)
419
throw new NullPointerException();
420
if (!supportedOptions().contains(name))
421
throw new UnsupportedOperationException("'" + name + "' not supported");
422
423
synchronized (stateLock) {
424
if (association != null && (name.equals(SCTP_PRIMARY_ADDR) ||
425
name.equals(SCTP_SET_PEER_PRIMARY_ADDR))) {
426
checkAssociation(association);
427
}
428
if (!isOpen())
429
throw new ClosedChannelException();
430
431
int assocId = association == null ? 0 : association.associationID();
432
return (T)SctpNet.getSocketOption(fdVal, name, assocId);
433
}
434
}
435
436
private static class DefaultOptionsHolder {
437
static final Set<SctpSocketOption<?>> defaultOptions = defaultOptions();
438
439
private static Set<SctpSocketOption<?>> defaultOptions() {
440
HashSet<SctpSocketOption<?>> set = new HashSet<SctpSocketOption<?>>(10);
441
set.add(SCTP_DISABLE_FRAGMENTS);
442
set.add(SCTP_EXPLICIT_COMPLETE);
443
set.add(SCTP_FRAGMENT_INTERLEAVE);
444
set.add(SCTP_INIT_MAXSTREAMS);
445
set.add(SCTP_NODELAY);
446
set.add(SCTP_PRIMARY_ADDR);
447
set.add(SCTP_SET_PEER_PRIMARY_ADDR);
448
set.add(SO_SNDBUF);
449
set.add(SO_RCVBUF);
450
set.add(SO_LINGER);
451
return Collections.unmodifiableSet(set);
452
}
453
}
454
455
@Override
456
public final Set<SctpSocketOption<?>> supportedOptions() {
457
return DefaultOptionsHolder.defaultOptions;
458
}
459
460
@Override
461
public <T> MessageInfo receive(ByteBuffer buffer,
462
T attachment,
463
NotificationHandler<T> handler)
464
throws IOException {
465
if (buffer == null)
466
throw new IllegalArgumentException("buffer cannot be null");
467
468
if (buffer.isReadOnly())
469
throw new IllegalArgumentException("Read-only buffer");
470
471
if (receiveInvoked.get())
472
throw new IllegalReceiveException(
473
"cannot invoke receive from handler");
474
receiveInvoked.set(Boolean.TRUE);
475
476
try {
477
ResultContainer resultContainer = new ResultContainer();
478
do {
479
resultContainer.clear();
480
synchronized (receiveLock) {
481
ensureOpen();
482
if (!isBound())
483
throw new NotYetBoundException();
484
485
int n = 0;
486
try {
487
begin();
488
489
synchronized (stateLock) {
490
if(!isOpen())
491
return null;
492
receiverThread = NativeThread.current();
493
}
494
495
do {
496
n = receive(fdVal, buffer, resultContainer);
497
} while ((n == IOStatus.INTERRUPTED) && isOpen());
498
499
} finally {
500
receiverCleanup();
501
end((n > 0) || (n == IOStatus.UNAVAILABLE));
502
assert IOStatus.check(n);
503
}
504
505
if (!resultContainer.isNotification()) {
506
/* message or nothing */
507
if (resultContainer.hasSomething()) {
508
/* Set the association before returning */
509
MessageInfoImpl info =
510
resultContainer.getMessageInfo();
511
info.setAssociation(lookupAssociation(info.
512
associationID()));
513
SecurityManager sm = System.getSecurityManager();
514
if (sm != null) {
515
InetSocketAddress isa = (InetSocketAddress)info.address();
516
if (!addressMap.containsKey(isa)) {
517
/* must be a new association */
518
try {
519
sm.checkAccept(isa.getAddress().getHostAddress(),
520
isa.getPort());
521
} catch (SecurityException se) {
522
buffer.clear();
523
throw se;
524
}
525
}
526
}
527
528
assert info.association() != null;
529
return info;
530
} else {
531
/* Non-blocking may return null if nothing available*/
532
return null;
533
}
534
} else { /* notification */
535
synchronized (stateLock) {
536
handleNotificationInternal(
537
resultContainer);
538
}
539
}
540
} /* receiveLock */
541
} while (handler == null ? true :
542
(invokeNotificationHandler(resultContainer, handler, attachment)
543
== HandlerResult.CONTINUE));
544
} finally {
545
receiveInvoked.set(Boolean.FALSE);
546
}
547
548
return null;
549
}
550
551
private int receive(int fd,
552
ByteBuffer dst,
553
ResultContainer resultContainer)
554
throws IOException {
555
int pos = dst.position();
556
int lim = dst.limit();
557
assert (pos <= lim);
558
int rem = (pos <= lim ? lim - pos : 0);
559
if (dst instanceof DirectBuffer && rem > 0)
560
return receiveIntoNativeBuffer(fd, resultContainer, dst, rem, pos);
561
562
/* Substitute a native buffer. */
563
int newSize = Math.max(rem, 1);
564
ByteBuffer bb = Util.getTemporaryDirectBuffer(newSize);
565
try {
566
int n = receiveIntoNativeBuffer(fd, resultContainer, bb, newSize, 0);
567
bb.flip();
568
if (n > 0 && rem > 0)
569
dst.put(bb);
570
return n;
571
} finally {
572
Util.releaseTemporaryDirectBuffer(bb);
573
}
574
}
575
576
private int receiveIntoNativeBuffer(int fd,
577
ResultContainer resultContainer,
578
ByteBuffer bb,
579
int rem,
580
int pos)
581
throws IOException {
582
int n = receive0(fd, resultContainer, ((DirectBuffer)bb).address() + pos, rem);
583
if (n > 0)
584
bb.position(pos + n);
585
return n;
586
}
587
588
private InternalNotificationHandler internalNotificationHandler =
589
new InternalNotificationHandler();
590
591
private void handleNotificationInternal(ResultContainer resultContainer)
592
{
593
invokeNotificationHandler(resultContainer,
594
internalNotificationHandler, null);
595
}
596
597
private class InternalNotificationHandler
598
extends AbstractNotificationHandler<Object>
599
{
600
@Override
601
public HandlerResult handleNotification(
602
AssociationChangeNotification not, Object unused) {
603
AssociationChange sac = (AssociationChange) not;
604
605
/* Update map to reflect change in association */
606
switch (not.event()) {
607
case COMM_UP :
608
Association newAssociation = new AssociationImpl
609
(sac.assocId(), sac.maxInStreams(), sac.maxOutStreams());
610
addAssociation(newAssociation);
611
break;
612
case SHUTDOWN :
613
case COMM_LOST :
614
//case RESTART: ???
615
/* mark association for removal after user handler invoked*/
616
associationToRemove.set(lookupAssociation(sac.assocId()));
617
}
618
return HandlerResult.CONTINUE;
619
}
620
}
621
622
private <T> HandlerResult invokeNotificationHandler(
623
ResultContainer resultContainer,
624
NotificationHandler<T> handler,
625
T attachment) {
626
HandlerResult result;
627
SctpNotification notification = resultContainer.notification();
628
notification.setAssociation(lookupAssociation(notification.assocId()));
629
630
if (!(handler instanceof AbstractNotificationHandler)) {
631
result = handler.handleNotification(notification, attachment);
632
} else { /* AbstractNotificationHandler */
633
AbstractNotificationHandler<T> absHandler =
634
(AbstractNotificationHandler<T>)handler;
635
switch(resultContainer.type()) {
636
case ASSOCIATION_CHANGED :
637
result = absHandler.handleNotification(
638
resultContainer.getAssociationChanged(), attachment);
639
break;
640
case PEER_ADDRESS_CHANGED :
641
result = absHandler.handleNotification(
642
resultContainer.getPeerAddressChanged(), attachment);
643
break;
644
case SEND_FAILED :
645
result = absHandler.handleNotification(
646
resultContainer.getSendFailed(), attachment);
647
break;
648
case SHUTDOWN :
649
result = absHandler.handleNotification(
650
resultContainer.getShutdown(), attachment);
651
break;
652
default :
653
/* implementation specific handlers */
654
result = absHandler.handleNotification(
655
resultContainer.notification(), attachment);
656
}
657
}
658
659
if (!(handler instanceof InternalNotificationHandler)) {
660
/* Only remove associations after user handler
661
* has finished with them */
662
Association assoc = associationToRemove.get();
663
if (assoc != null) {
664
removeAssociation(assoc);
665
associationToRemove.set(null);
666
}
667
668
}
669
670
return result;
671
}
672
673
private Association lookupAssociation(int assocId) {
674
/* Lookup the association in our internal map */
675
synchronized (stateLock) {
676
Set<Association> assocs = associationMap.keySet();
677
for (Association a : assocs) {
678
if (a.associationID() == assocId) {
679
return a;
680
}
681
}
682
}
683
return null;
684
}
685
686
private void addAssociation(Association association) {
687
synchronized (stateLock) {
688
int assocId = association.associationID();
689
Set<SocketAddress> addresses = null;
690
691
try {
692
addresses = SctpNet.getRemoteAddresses(fdVal, assocId);
693
} catch (IOException unused) {
694
/* OK, determining connected addresses may not be possible
695
* shutdown, connection lost, etc */
696
}
697
698
associationMap.put(association, addresses);
699
if (addresses != null) {
700
for (SocketAddress addr : addresses)
701
addressMap.put(addr, association);
702
}
703
}
704
}
705
706
private void removeAssociation(Association association) {
707
synchronized (stateLock) {
708
int assocId = association.associationID();
709
Set<SocketAddress> addresses = null;
710
711
try {
712
addresses = SctpNet.getRemoteAddresses(fdVal, assocId);
713
} catch (IOException unused) {
714
/* OK, determining connected addresses may not be possible
715
* shutdown, connection lost, etc */
716
}
717
718
Set<Association> assocs = associationMap.keySet();
719
for (Association a : assocs) {
720
if (a.associationID() == assocId) {
721
associationMap.remove(a);
722
break;
723
}
724
}
725
if (addresses != null) {
726
for (SocketAddress addr : addresses)
727
addressMap.remove(addr);
728
} else {
729
/* We cannot determine the connected addresses */
730
Set<java.util.Map.Entry<SocketAddress, Association>> addrAssocs =
731
addressMap.entrySet();
732
Iterator<Entry<SocketAddress, Association>> iterator = addrAssocs.iterator();
733
while (iterator.hasNext()) {
734
Entry<SocketAddress, Association> entry = iterator.next();
735
if (entry.getValue().equals(association)) {
736
iterator.remove();
737
}
738
}
739
}
740
}
741
}
742
743
/**
744
* @throws IllegalArgumentException
745
* If the given association is not controlled by this channel
746
*
747
* @return {@code true} if, and only if, the given association is one
748
* of the current associations controlled by this channel
749
*/
750
private boolean checkAssociation(Association messageAssoc) {
751
synchronized (stateLock) {
752
for (Association association : associationMap.keySet()) {
753
if (messageAssoc.equals(association)) {
754
return true;
755
}
756
}
757
}
758
throw new IllegalArgumentException(
759
"Given Association is not controlled by this channel");
760
}
761
762
private void checkStreamNumber(Association assoc, int streamNumber) {
763
synchronized (stateLock) {
764
if (streamNumber < 0 || streamNumber >= assoc.maxOutboundStreams())
765
throw new InvalidStreamException();
766
}
767
}
768
769
/* TODO: Add support for ttl and isComplete to both 121 12M
770
* SCTP_EOR not yet supported on reference platforms
771
* TTL support limited...
772
*/
773
@Override
774
public int send(ByteBuffer buffer, MessageInfo messageInfo)
775
throws IOException {
776
if (buffer == null)
777
throw new IllegalArgumentException("buffer cannot be null");
778
779
if (messageInfo == null)
780
throw new IllegalArgumentException("messageInfo cannot be null");
781
782
synchronized (sendLock) {
783
ensureOpen();
784
785
if (!isBound())
786
bind(null, 0);
787
788
int n = 0;
789
try {
790
int assocId = -1;
791
SocketAddress address = null;
792
begin();
793
794
synchronized (stateLock) {
795
if(!isOpen())
796
return 0;
797
senderThread = NativeThread.current();
798
799
/* Determine what address or association to send to */
800
Association assoc = messageInfo.association();
801
InetSocketAddress addr = (InetSocketAddress)messageInfo.address();
802
if (assoc != null) {
803
checkAssociation(assoc);
804
checkStreamNumber(assoc, messageInfo.streamNumber());
805
assocId = assoc.associationID();
806
/* have we also got a preferred address */
807
if (addr != null) {
808
if (!assoc.equals(addressMap.get(addr)))
809
throw new IllegalArgumentException("given preferred address is not part of this association");
810
address = addr;
811
}
812
} else if (addr != null) {
813
address = addr;
814
Association association = addressMap.get(addr);
815
if (association != null) {
816
checkStreamNumber(association, messageInfo.streamNumber());
817
assocId = association.associationID();
818
819
} else { /* must be new association */
820
SecurityManager sm = System.getSecurityManager();
821
if (sm != null)
822
sm.checkConnect(addr.getAddress().getHostAddress(),
823
addr.getPort());
824
}
825
} else {
826
throw new AssertionError(
827
"Both association and address cannot be null");
828
}
829
}
830
831
do {
832
n = send(fdVal, buffer, assocId, address, messageInfo);
833
} while ((n == IOStatus.INTERRUPTED) && isOpen());
834
835
return IOStatus.normalize(n);
836
} finally {
837
senderCleanup();
838
end((n > 0) || (n == IOStatus.UNAVAILABLE));
839
assert IOStatus.check(n);
840
}
841
}
842
}
843
844
private int send(int fd,
845
ByteBuffer src,
846
int assocId,
847
SocketAddress target,
848
MessageInfo messageInfo)
849
throws IOException {
850
int streamNumber = messageInfo.streamNumber();
851
boolean unordered = messageInfo.isUnordered();
852
int ppid = messageInfo.payloadProtocolID();
853
854
if (src instanceof DirectBuffer)
855
return sendFromNativeBuffer(fd, src, target, assocId,
856
streamNumber, unordered, ppid);
857
858
/* Substitute a native buffer */
859
int pos = src.position();
860
int lim = src.limit();
861
assert (pos <= lim && streamNumber >= 0);
862
863
int rem = (pos <= lim ? lim - pos : 0);
864
ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
865
try {
866
bb.put(src);
867
bb.flip();
868
/* Do not update src until we see how many bytes were written */
869
src.position(pos);
870
871
int n = sendFromNativeBuffer(fd, bb, target, assocId,
872
streamNumber, unordered, ppid);
873
if (n > 0) {
874
/* now update src */
875
src.position(pos + n);
876
}
877
return n;
878
} finally {
879
Util.releaseTemporaryDirectBuffer(bb);
880
}
881
}
882
883
private int sendFromNativeBuffer(int fd,
884
ByteBuffer bb,
885
SocketAddress target,
886
int assocId,
887
int streamNumber,
888
boolean unordered,
889
int ppid)
890
throws IOException {
891
InetAddress addr = null; // no preferred address
892
int port = 0;
893
if (target != null) {
894
InetSocketAddress isa = Net.checkAddress(target);
895
addr = isa.getAddress();
896
port = isa.getPort();
897
}
898
int pos = bb.position();
899
int lim = bb.limit();
900
assert (pos <= lim);
901
int rem = (pos <= lim ? lim - pos : 0);
902
903
int written = send0(fd, ((DirectBuffer)bb).address() + pos, rem, addr,
904
port, assocId, streamNumber, unordered, ppid);
905
if (written > 0)
906
bb.position(pos + written);
907
return written;
908
}
909
910
@Override
911
public SctpMultiChannel shutdown(Association association)
912
throws IOException {
913
synchronized (stateLock) {
914
checkAssociation(association);
915
if (!isOpen())
916
throw new ClosedChannelException();
917
918
SctpNet.shutdown(fdVal, association.associationID());
919
}
920
return this;
921
}
922
923
@Override
924
public Set<SocketAddress> getAllLocalAddresses()
925
throws IOException {
926
synchronized (stateLock) {
927
if (!isOpen())
928
throw new ClosedChannelException();
929
if (!isBound())
930
return Collections.emptySet();
931
932
return SctpNet.getLocalAddresses(fdVal);
933
}
934
}
935
936
@Override
937
public Set<SocketAddress> getRemoteAddresses(Association association)
938
throws IOException {
939
synchronized (stateLock) {
940
checkAssociation(association);
941
if (!isOpen())
942
throw new ClosedChannelException();
943
944
try {
945
return SctpNet.getRemoteAddresses(fdVal, association.associationID());
946
} catch (SocketException se) {
947
/* a valid association should always have remote addresses */
948
Set<SocketAddress> addrs = associationMap.get(association);
949
return addrs != null ? addrs : Collections.<SocketAddress>emptySet();
950
}
951
}
952
}
953
954
@Override
955
public SctpChannel branch(Association association)
956
throws IOException {
957
synchronized (stateLock) {
958
checkAssociation(association);
959
if (!isOpen())
960
throw new ClosedChannelException();
961
962
FileDescriptor bFd = SctpNet.branch(fdVal,
963
association.associationID());
964
/* successfully branched, we can now remove it from assoc list */
965
removeAssociation(association);
966
967
return new SctpChannelImpl(provider(), bFd, association);
968
}
969
}
970
971
/* Use common native implementation shared between
972
* one-to-one and one-to-many */
973
private static int receive0(int fd,
974
ResultContainer resultContainer,
975
long address,
976
int length)
977
throws IOException{
978
return SctpChannelImpl.receive0(fd, resultContainer, address,
979
length, false /*peek */);
980
}
981
982
private static int send0(int fd,
983
long address,
984
int length,
985
InetAddress addr,
986
int port,
987
int assocId,
988
int streamNumber,
989
boolean unordered,
990
int ppid)
991
throws IOException {
992
return SctpChannelImpl.send0(fd, address, length, addr, port, assocId,
993
streamNumber, unordered, ppid);
994
}
995
996
static {
997
IOUtil.load(); /* loads nio & net native libraries */
998
java.security.AccessController.doPrivileged(
999
new java.security.PrivilegedAction<Void>() {
1000
public Void run() {
1001
System.loadLibrary("sctp");
1002
return null;
1003
}
1004
});
1005
}
1006
}
1007
1008