Path: blob/aarch64-shenandoah-jdk8u272-b10/jdk/src/solaris/classes/sun/nio/ch/sctp/SctpChannelImpl.java
32301 views
/*1* Copyright (c) 2009, 2013, Oracle and/or its affiliates. All rights reserved.2* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.3*4* This code is free software; you can redistribute it and/or modify it5* under the terms of the GNU General Public License version 2 only, as6* published by the Free Software Foundation. Oracle designates this7* particular file as subject to the "Classpath" exception as provided8* by Oracle in the LICENSE file that accompanied this code.9*10* This code is distributed in the hope that it will be useful, but WITHOUT11* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or12* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License13* version 2 for more details (a copy is included in the LICENSE file that14* accompanied this code).15*16* You should have received a copy of the GNU General Public License version17* 2 along with this work; if not, write to the Free Software Foundation,18* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.19*20* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA21* or visit www.oracle.com if you need additional information or have any22* questions.23*/24package sun.nio.ch.sctp;2526import java.net.InetAddress;27import java.net.SocketAddress;28import java.net.SocketException;29import java.net.InetSocketAddress;30import java.io.FileDescriptor;31import java.io.IOException;32import java.util.Collections;33import java.util.Set;34import java.util.HashSet;35import java.nio.ByteBuffer;36import java.nio.channels.SelectionKey;37import java.nio.channels.ClosedChannelException;38import java.nio.channels.ConnectionPendingException;39import java.nio.channels.NoConnectionPendingException;40import java.nio.channels.AlreadyConnectedException;41import java.nio.channels.NotYetBoundException;42import java.nio.channels.NotYetConnectedException;43import java.nio.channels.spi.SelectorProvider;44import com.sun.nio.sctp.AbstractNotificationHandler;45import com.sun.nio.sctp.Association;46import com.sun.nio.sctp.AssociationChangeNotification;47import com.sun.nio.sctp.HandlerResult;48import com.sun.nio.sctp.IllegalReceiveException;49import com.sun.nio.sctp.InvalidStreamException;50import com.sun.nio.sctp.IllegalUnbindException;51import com.sun.nio.sctp.MessageInfo;52import com.sun.nio.sctp.NotificationHandler;53import com.sun.nio.sctp.SctpChannel;54import com.sun.nio.sctp.SctpSocketOption;55import sun.nio.ch.DirectBuffer;56import sun.nio.ch.IOStatus;57import sun.nio.ch.IOUtil;58import sun.nio.ch.NativeThread;59import sun.nio.ch.Net;60import sun.nio.ch.PollArrayWrapper;61import sun.nio.ch.SelChImpl;62import sun.nio.ch.SelectionKeyImpl;63import sun.nio.ch.Util;64import static com.sun.nio.sctp.SctpStandardSocketOptions.*;65import static sun.nio.ch.sctp.ResultContainer.SEND_FAILED;66import static sun.nio.ch.sctp.ResultContainer.ASSOCIATION_CHANGED;67import static sun.nio.ch.sctp.ResultContainer.PEER_ADDRESS_CHANGED;68import static sun.nio.ch.sctp.ResultContainer.SHUTDOWN;6970/**71* An implementation of an SctpChannel72*/73public class SctpChannelImpl extends SctpChannel74implements SelChImpl75{76private final FileDescriptor fd;7778private final int fdVal;7980/* IDs of native threads doing send and receivess, for signalling */81private volatile long receiverThread = 0;82private volatile long senderThread = 0;8384/* Lock held by current receiving or connecting thread */85private final Object receiveLock = new Object();8687/* Lock held by current sending or connecting thread */88private final Object sendLock = new Object();8990private final ThreadLocal<Boolean> receiveInvoked =91new ThreadLocal<Boolean>() {92@Override protected Boolean initialValue() {93return Boolean.FALSE;94}95};9697/* Lock held by any thread that modifies the state fields declared below98DO NOT invoke a blocking I/O operation while holding this lock! */99private final Object stateLock = new Object();100101private enum ChannelState {102UNINITIALIZED,103UNCONNECTED,104PENDING,105CONNECTED,106KILLPENDING,107KILLED,108}109/* -- The following fields are protected by stateLock -- */110private ChannelState state = ChannelState.UNINITIALIZED;111112/* Binding; Once bound the port will remain constant. */113int port = -1;114private HashSet<InetSocketAddress> localAddresses = new HashSet<InetSocketAddress>();115/* Has the channel been bound to the wildcard address */116private boolean wildcard; /* false */117//private InetSocketAddress remoteAddress = null;118119/* Input/Output open */120private boolean readyToConnect;121122/* Shutdown */123private boolean isShutdown;124125private Association association;126127private Set<SocketAddress> remoteAddresses = Collections.emptySet();128129/* -- End of fields protected by stateLock -- */130131/**132* Constructor for normal connecting sockets133*/134public SctpChannelImpl(SelectorProvider provider) throws IOException {135//TODO: update provider remove public modifier136super(provider);137this.fd = SctpNet.socket(true);138this.fdVal = IOUtil.fdVal(fd);139this.state = ChannelState.UNCONNECTED;140}141142/**143* Constructor for sockets obtained from server sockets144*/145public SctpChannelImpl(SelectorProvider provider, FileDescriptor fd)146throws IOException {147this(provider, fd, null);148}149150/**151* Constructor for sockets obtained from branching152*/153public SctpChannelImpl(SelectorProvider provider,154FileDescriptor fd,155Association association)156throws IOException {157super(provider);158this.fd = fd;159this.fdVal = IOUtil.fdVal(fd);160this.state = ChannelState.CONNECTED;161port = (Net.localAddress(fd)).getPort();162163if (association != null) { /* branched */164this.association = association;165} else { /* obtained from server channel */166/* Receive COMM_UP */167ByteBuffer buf = Util.getTemporaryDirectBuffer(50);168try {169receive(buf, null, null, true);170} finally {171Util.releaseTemporaryDirectBuffer(buf);172}173}174}175176/**177* Binds the channel's socket to a local address.178*/179@Override180public SctpChannel bind(SocketAddress local) throws IOException {181synchronized (receiveLock) {182synchronized (sendLock) {183synchronized (stateLock) {184ensureOpenAndUnconnected();185if (isBound())186SctpNet.throwAlreadyBoundException();187InetSocketAddress isa = (local == null) ?188new InetSocketAddress(0) : Net.checkAddress(local);189SecurityManager sm = System.getSecurityManager();190if (sm != null) {191sm.checkListen(isa.getPort());192}193Net.bind(fd, isa.getAddress(), isa.getPort());194InetSocketAddress boundIsa = Net.localAddress(fd);195port = boundIsa.getPort();196localAddresses.add(isa);197if (isa.getAddress().isAnyLocalAddress())198wildcard = true;199}200}201}202return this;203}204205@Override206public SctpChannel bindAddress(InetAddress address)207throws IOException {208bindUnbindAddress(address, true);209localAddresses.add(new InetSocketAddress(address, port));210return this;211}212213@Override214public SctpChannel unbindAddress(InetAddress address)215throws IOException {216bindUnbindAddress(address, false);217localAddresses.remove(new InetSocketAddress(address, port));218return this;219}220221private SctpChannel bindUnbindAddress(InetAddress address, boolean add)222throws IOException {223if (address == null)224throw new IllegalArgumentException();225226synchronized (receiveLock) {227synchronized (sendLock) {228synchronized (stateLock) {229if (!isOpen())230throw new ClosedChannelException();231if (!isBound())232throw new NotYetBoundException();233if (wildcard)234throw new IllegalStateException(235"Cannot add or remove addresses from a channel that is bound to the wildcard address");236if (address.isAnyLocalAddress())237throw new IllegalArgumentException(238"Cannot add or remove the wildcard address");239if (add) {240for (InetSocketAddress addr : localAddresses) {241if (addr.getAddress().equals(address)) {242SctpNet.throwAlreadyBoundException();243}244}245} else { /*removing */246/* Verify that there is more than one address247* and that address is already bound */248if (localAddresses.size() <= 1)249throw new IllegalUnbindException("Cannot remove address from a channel with only one address bound");250boolean foundAddress = false;251for (InetSocketAddress addr : localAddresses) {252if (addr.getAddress().equals(address)) {253foundAddress = true;254break;255}256}257if (!foundAddress )258throw new IllegalUnbindException("Cannot remove address from a channel that is not bound to that address");259}260261SctpNet.bindx(fdVal, new InetAddress[]{address}, port, add);262263/* Update our internal Set to reflect the addition/removal */264if (add)265localAddresses.add(new InetSocketAddress(address, port));266else {267for (InetSocketAddress addr : localAddresses) {268if (addr.getAddress().equals(address)) {269localAddresses.remove(addr);270break;271}272}273}274}275}276}277return this;278}279280private boolean isBound() {281synchronized (stateLock) {282return port == -1 ? false : true;283}284}285286private boolean isConnected() {287synchronized (stateLock) {288return (state == ChannelState.CONNECTED);289}290}291292private void ensureOpenAndUnconnected() throws IOException {293synchronized (stateLock) {294if (!isOpen())295throw new ClosedChannelException();296if (isConnected())297throw new AlreadyConnectedException();298if (state == ChannelState.PENDING)299throw new ConnectionPendingException();300}301}302303private boolean ensureReceiveOpen() throws ClosedChannelException {304synchronized (stateLock) {305if (!isOpen())306throw new ClosedChannelException();307if (!isConnected())308throw new NotYetConnectedException();309else310return true;311}312}313314private void ensureSendOpen() throws ClosedChannelException {315synchronized (stateLock) {316if (!isOpen())317throw new ClosedChannelException();318if (isShutdown)319throw new ClosedChannelException();320if (!isConnected())321throw new NotYetConnectedException();322}323}324325private void receiverCleanup() throws IOException {326synchronized (stateLock) {327receiverThread = 0;328if (state == ChannelState.KILLPENDING)329kill();330}331}332333private void senderCleanup() throws IOException {334synchronized (stateLock) {335senderThread = 0;336if (state == ChannelState.KILLPENDING)337kill();338}339}340341@Override342public Association association() throws ClosedChannelException {343synchronized (stateLock) {344if (!isOpen())345throw new ClosedChannelException();346if (!isConnected())347return null;348349return association;350}351}352353@Override354public boolean connect(SocketAddress endpoint) throws IOException {355synchronized (receiveLock) {356synchronized (sendLock) {357ensureOpenAndUnconnected();358InetSocketAddress isa = Net.checkAddress(endpoint);359SecurityManager sm = System.getSecurityManager();360if (sm != null)361sm.checkConnect(isa.getAddress().getHostAddress(),362isa.getPort());363synchronized (blockingLock()) {364int n = 0;365try {366try {367begin();368synchronized (stateLock) {369if (!isOpen()) {370return false;371}372receiverThread = NativeThread.current();373}374for (;;) {375InetAddress ia = isa.getAddress();376if (ia.isAnyLocalAddress())377ia = InetAddress.getLocalHost();378n = SctpNet.connect(fdVal, ia, isa.getPort());379if ( (n == IOStatus.INTERRUPTED)380&& isOpen())381continue;382break;383}384} finally {385receiverCleanup();386end((n > 0) || (n == IOStatus.UNAVAILABLE));387assert IOStatus.check(n);388}389} catch (IOException x) {390/* If an exception was thrown, close the channel after391* invoking end() so as to avoid bogus392* AsynchronousCloseExceptions */393close();394throw x;395}396397if (n > 0) {398synchronized (stateLock) {399/* Connection succeeded */400state = ChannelState.CONNECTED;401if (!isBound()) {402InetSocketAddress boundIsa =403Net.localAddress(fd);404port = boundIsa.getPort();405}406407/* Receive COMM_UP */408ByteBuffer buf = Util.getTemporaryDirectBuffer(50);409try {410receive(buf, null, null, true);411} finally {412Util.releaseTemporaryDirectBuffer(buf);413}414415/* cache remote addresses */416try {417remoteAddresses = getRemoteAddresses();418} catch (IOException unused) { /* swallow exception */ }419420return true;421}422} else {423synchronized (stateLock) {424/* If nonblocking and no exception then connection425* pending; disallow another invocation */426if (!isBlocking())427state = ChannelState.PENDING;428else429assert false;430}431}432}433return false;434}435}436}437438@Override439public boolean connect(SocketAddress endpoint,440int maxOutStreams,441int maxInStreams)442throws IOException {443ensureOpenAndUnconnected();444return setOption(SCTP_INIT_MAXSTREAMS, InitMaxStreams.445create(maxInStreams, maxOutStreams)).connect(endpoint);446447}448449@Override450public boolean isConnectionPending() {451synchronized (stateLock) {452return (state == ChannelState.PENDING);453}454}455456@Override457public boolean finishConnect() throws IOException {458synchronized (receiveLock) {459synchronized (sendLock) {460synchronized (stateLock) {461if (!isOpen())462throw new ClosedChannelException();463if (isConnected())464return true;465if (state != ChannelState.PENDING)466throw new NoConnectionPendingException();467}468int n = 0;469try {470try {471begin();472synchronized (blockingLock()) {473synchronized (stateLock) {474if (!isOpen()) {475return false;476}477receiverThread = NativeThread.current();478}479if (!isBlocking()) {480for (;;) {481n = checkConnect(fd, false, readyToConnect);482if ( (n == IOStatus.INTERRUPTED)483&& isOpen())484continue;485break;486}487} else {488for (;;) {489n = checkConnect(fd, true, readyToConnect);490if (n == 0) {491// Loop in case of492// spurious notifications493continue;494}495if ( (n == IOStatus.INTERRUPTED)496&& isOpen())497continue;498break;499}500}501}502} finally {503synchronized (stateLock) {504receiverThread = 0;505if (state == ChannelState.KILLPENDING) {506kill();507/* poll()/getsockopt() does not report508* error (throws exception, with n = 0)509* on Linux platform after dup2 and510* signal-wakeup. Force n to 0 so the511* end() can throw appropriate exception */512n = 0;513}514}515end((n > 0) || (n == IOStatus.UNAVAILABLE));516assert IOStatus.check(n);517}518} catch (IOException x) {519/* If an exception was thrown, close the channel after520* invoking end() so as to avoid bogus521* AsynchronousCloseExceptions */522close();523throw x;524}525526if (n > 0) {527synchronized (stateLock) {528state = ChannelState.CONNECTED;529if (!isBound()) {530InetSocketAddress boundIsa =531Net.localAddress(fd);532port = boundIsa.getPort();533}534535/* Receive COMM_UP */536ByteBuffer buf = Util.getTemporaryDirectBuffer(50);537try {538receive(buf, null, null, true);539} finally {540Util.releaseTemporaryDirectBuffer(buf);541}542543/* cache remote addresses */544try {545remoteAddresses = getRemoteAddresses();546} catch (IOException unused) { /* swallow exception */ }547548return true;549}550}551}552}553return false;554}555556@Override557protected void implConfigureBlocking(boolean block) throws IOException {558IOUtil.configureBlocking(fd, block);559}560561@Override562public void implCloseSelectableChannel() throws IOException {563synchronized (stateLock) {564SctpNet.preClose(fdVal);565566if (receiverThread != 0)567NativeThread.signal(receiverThread);568569if (senderThread != 0)570NativeThread.signal(senderThread);571572if (!isRegistered())573kill();574}575}576577@Override578public FileDescriptor getFD() {579return fd;580}581582@Override583public int getFDVal() {584return fdVal;585}586587/**588* Translates native poll revent ops into a ready operation ops589*/590private boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl sk) {591int intOps = sk.nioInterestOps();592int oldOps = sk.nioReadyOps();593int newOps = initialOps;594595if ((ops & Net.POLLNVAL) != 0) {596/* This should only happen if this channel is pre-closed while a597* selection operation is in progress598* ## Throw an error if this channel has not been pre-closed */599return false;600}601602if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) {603newOps = intOps;604sk.nioReadyOps(newOps);605/* No need to poll again in checkConnect,606* the error will be detected there */607readyToConnect = true;608return (newOps & ~oldOps) != 0;609}610611if (((ops & Net.POLLIN) != 0) &&612((intOps & SelectionKey.OP_READ) != 0) &&613isConnected())614newOps |= SelectionKey.OP_READ;615616if (((ops & Net.POLLCONN) != 0) &&617((intOps & SelectionKey.OP_CONNECT) != 0) &&618((state == ChannelState.UNCONNECTED) || (state == ChannelState.PENDING))) {619newOps |= SelectionKey.OP_CONNECT;620readyToConnect = true;621}622623if (((ops & Net.POLLOUT) != 0) &&624((intOps & SelectionKey.OP_WRITE) != 0) &&625isConnected())626newOps |= SelectionKey.OP_WRITE;627628sk.nioReadyOps(newOps);629return (newOps & ~oldOps) != 0;630}631632@Override633public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl sk) {634return translateReadyOps(ops, sk.nioReadyOps(), sk);635}636637@Override638@SuppressWarnings("all")639public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl sk) {640return translateReadyOps(ops, 0, sk);641}642643@Override644public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {645int newOps = 0;646if ((ops & SelectionKey.OP_READ) != 0)647newOps |= Net.POLLIN;648if ((ops & SelectionKey.OP_WRITE) != 0)649newOps |= Net.POLLOUT;650if ((ops & SelectionKey.OP_CONNECT) != 0)651newOps |= Net.POLLCONN;652sk.selector.putEventOps(sk, newOps);653}654655@Override656public void kill() throws IOException {657synchronized (stateLock) {658if (state == ChannelState.KILLED)659return;660if (state == ChannelState.UNINITIALIZED) {661state = ChannelState.KILLED;662return;663}664assert !isOpen() && !isRegistered();665666/* Postpone the kill if there is a waiting reader667* or writer thread. */668if (receiverThread == 0 && senderThread == 0) {669SctpNet.close(fdVal);670state = ChannelState.KILLED;671} else {672state = ChannelState.KILLPENDING;673}674}675}676677@Override678public <T> SctpChannel setOption(SctpSocketOption<T> name, T value)679throws IOException {680if (name == null)681throw new NullPointerException();682if (!supportedOptions().contains(name))683throw new UnsupportedOperationException("'" + name + "' not supported");684685synchronized (stateLock) {686if (!isOpen())687throw new ClosedChannelException();688689SctpNet.setSocketOption(fdVal, name, value, 0 /*oneToOne*/);690}691return this;692}693694@Override695@SuppressWarnings("unchecked")696public <T> T getOption(SctpSocketOption<T> name) throws IOException {697if (name == null)698throw new NullPointerException();699if (!supportedOptions().contains(name))700throw new UnsupportedOperationException("'" + name + "' not supported");701702synchronized (stateLock) {703if (!isOpen())704throw new ClosedChannelException();705706return (T)SctpNet.getSocketOption(fdVal, name, 0 /*oneToOne*/);707}708}709710private static class DefaultOptionsHolder {711static final Set<SctpSocketOption<?>> defaultOptions = defaultOptions();712713private static Set<SctpSocketOption<?>> defaultOptions() {714HashSet<SctpSocketOption<?>> set = new HashSet<SctpSocketOption<?>>(10);715set.add(SCTP_DISABLE_FRAGMENTS);716set.add(SCTP_EXPLICIT_COMPLETE);717set.add(SCTP_FRAGMENT_INTERLEAVE);718set.add(SCTP_INIT_MAXSTREAMS);719set.add(SCTP_NODELAY);720set.add(SCTP_PRIMARY_ADDR);721set.add(SCTP_SET_PEER_PRIMARY_ADDR);722set.add(SO_SNDBUF);723set.add(SO_RCVBUF);724set.add(SO_LINGER);725return Collections.unmodifiableSet(set);726}727}728729@Override730public final Set<SctpSocketOption<?>> supportedOptions() {731return DefaultOptionsHolder.defaultOptions;732}733734@Override735public <T> MessageInfo receive(ByteBuffer buffer,736T attachment,737NotificationHandler<T> handler)738throws IOException {739return receive(buffer, attachment, handler, false);740}741742private <T> MessageInfo receive(ByteBuffer buffer,743T attachment,744NotificationHandler<T> handler,745boolean fromConnect)746throws IOException {747if (buffer == null)748throw new IllegalArgumentException("buffer cannot be null");749750if (buffer.isReadOnly())751throw new IllegalArgumentException("Read-only buffer");752753if (receiveInvoked.get())754throw new IllegalReceiveException(755"cannot invoke receive from handler");756receiveInvoked.set(Boolean.TRUE);757758try {759ResultContainer resultContainer = new ResultContainer();760do {761resultContainer.clear();762synchronized (receiveLock) {763if (!ensureReceiveOpen())764return null;765766int n = 0;767try {768begin();769770synchronized (stateLock) {771if(!isOpen())772return null;773receiverThread = NativeThread.current();774}775776do {777n = receive(fdVal, buffer, resultContainer, fromConnect);778} while ((n == IOStatus.INTERRUPTED) && isOpen());779} finally {780receiverCleanup();781end((n > 0) || (n == IOStatus.UNAVAILABLE));782assert IOStatus.check(n);783}784785if (!resultContainer.isNotification()) {786/* message or nothing */787if (resultContainer.hasSomething()) {788/* Set the association before returning */789MessageInfoImpl info =790resultContainer.getMessageInfo();791synchronized (stateLock) {792assert association != null;793info.setAssociation(association);794}795return info;796} else797/* Non-blocking may return null if nothing available*/798return null;799} else { /* notification */800synchronized (stateLock) {801handleNotificationInternal(802resultContainer);803}804}805806if (fromConnect) {807/* If we reach here, then it was connect that invoked808* receive and received the COMM_UP. We have already809* handled the COMM_UP with the internal notification810* handler. Simply return. */811return null;812}813} /* receiveLock */814} while (handler == null ? true :815(invokeNotificationHandler(resultContainer, handler, attachment)816== HandlerResult.CONTINUE));817818return null;819} finally {820receiveInvoked.set(Boolean.FALSE);821}822}823824private int receive(int fd,825ByteBuffer dst,826ResultContainer resultContainer,827boolean peek)828throws IOException {829int pos = dst.position();830int lim = dst.limit();831assert (pos <= lim);832int rem = (pos <= lim ? lim - pos : 0);833if (dst instanceof DirectBuffer && rem > 0)834return receiveIntoNativeBuffer(fd, resultContainer, dst, rem, pos, peek);835836/* Substitute a native buffer */837int newSize = Math.max(rem, 1);838ByteBuffer bb = Util.getTemporaryDirectBuffer(newSize);839try {840int n = receiveIntoNativeBuffer(fd, resultContainer, bb, newSize, 0, peek);841bb.flip();842if (n > 0 && rem > 0)843dst.put(bb);844return n;845} finally {846Util.releaseTemporaryDirectBuffer(bb);847}848}849850private int receiveIntoNativeBuffer(int fd,851ResultContainer resultContainer,852ByteBuffer bb,853int rem,854int pos,855boolean peek)856throws IOException857{858int n = receive0(fd, resultContainer, ((DirectBuffer)bb).address() + pos, rem, peek);859860if (n > 0)861bb.position(pos + n);862return n;863}864865private InternalNotificationHandler internalNotificationHandler =866new InternalNotificationHandler();867868private void handleNotificationInternal(ResultContainer resultContainer)869{870invokeNotificationHandler(resultContainer,871internalNotificationHandler, null);872}873874private class InternalNotificationHandler875extends AbstractNotificationHandler<Object>876{877@Override878public HandlerResult handleNotification(879AssociationChangeNotification not, Object unused) {880if (not.event().equals(881AssociationChangeNotification.AssocChangeEvent.COMM_UP) &&882association == null) {883AssociationChange sac = (AssociationChange) not;884association = new AssociationImpl885(sac.assocId(), sac.maxInStreams(), sac.maxOutStreams());886}887return HandlerResult.CONTINUE;888}889}890891private <T> HandlerResult invokeNotificationHandler892(ResultContainer resultContainer,893NotificationHandler<T> handler,894T attachment) {895SctpNotification notification = resultContainer.notification();896synchronized (stateLock) {897notification.setAssociation(association);898}899900if (!(handler instanceof AbstractNotificationHandler)) {901return handler.handleNotification(notification, attachment);902}903904/* AbstractNotificationHandler */905AbstractNotificationHandler<T> absHandler =906(AbstractNotificationHandler<T>)handler;907switch(resultContainer.type()) {908case ASSOCIATION_CHANGED :909return absHandler.handleNotification(910resultContainer.getAssociationChanged(), attachment);911case PEER_ADDRESS_CHANGED :912return absHandler.handleNotification(913resultContainer.getPeerAddressChanged(), attachment);914case SEND_FAILED :915return absHandler.handleNotification(916resultContainer.getSendFailed(), attachment);917case SHUTDOWN :918return absHandler.handleNotification(919resultContainer.getShutdown(), attachment);920default :921/* implementation specific handlers */922return absHandler.handleNotification(923resultContainer.notification(), attachment);924}925}926927private void checkAssociation(Association sendAssociation) {928synchronized (stateLock) {929if (sendAssociation != null && !sendAssociation.equals(association)) {930throw new IllegalArgumentException(931"Cannot send to another association");932}933}934}935936private void checkStreamNumber(int streamNumber) {937synchronized (stateLock) {938if (association != null) {939if (streamNumber < 0 ||940streamNumber >= association.maxOutboundStreams())941throw new InvalidStreamException();942}943}944}945946/* TODO: Add support for ttl and isComplete to both 121 12M947* SCTP_EOR not yet supported on reference platforms948* TTL support limited...949*/950@Override951public int send(ByteBuffer buffer, MessageInfo messageInfo)952throws IOException {953if (buffer == null)954throw new IllegalArgumentException("buffer cannot be null");955956if (messageInfo == null)957throw new IllegalArgumentException("messageInfo cannot be null");958959checkAssociation(messageInfo.association());960checkStreamNumber(messageInfo.streamNumber());961962synchronized (sendLock) {963ensureSendOpen();964965int n = 0;966try {967begin();968969synchronized (stateLock) {970if(!isOpen())971return 0;972senderThread = NativeThread.current();973}974975do {976n = send(fdVal, buffer, messageInfo);977} while ((n == IOStatus.INTERRUPTED) && isOpen());978979return IOStatus.normalize(n);980} finally {981senderCleanup();982end((n > 0) || (n == IOStatus.UNAVAILABLE));983assert IOStatus.check(n);984}985}986}987988private int send(int fd, ByteBuffer src, MessageInfo messageInfo)989throws IOException {990int streamNumber = messageInfo.streamNumber();991SocketAddress target = messageInfo.address();992boolean unordered = messageInfo.isUnordered();993int ppid = messageInfo.payloadProtocolID();994995if (src instanceof DirectBuffer)996return sendFromNativeBuffer(fd, src, target, streamNumber,997unordered, ppid);998999/* Substitute a native buffer */1000int pos = src.position();1001int lim = src.limit();1002assert (pos <= lim && streamNumber >= 0);10031004int rem = (pos <= lim ? lim - pos : 0);1005ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);1006try {1007bb.put(src);1008bb.flip();1009/* Do not update src until we see how many bytes were written */1010src.position(pos);10111012int n = sendFromNativeBuffer(fd, bb, target, streamNumber,1013unordered, ppid);1014if (n > 0) {1015/* now update src */1016src.position(pos + n);1017}1018return n;1019} finally {1020Util.releaseTemporaryDirectBuffer(bb);1021}1022}10231024private int sendFromNativeBuffer(int fd,1025ByteBuffer bb,1026SocketAddress target,1027int streamNumber,1028boolean unordered,1029int ppid)1030throws IOException {1031InetAddress addr = null; // no preferred address1032int port = 0;1033if (target != null) {1034InetSocketAddress isa = Net.checkAddress(target);1035addr = isa.getAddress();1036port = isa.getPort();1037}10381039int pos = bb.position();1040int lim = bb.limit();1041assert (pos <= lim);1042int rem = (pos <= lim ? lim - pos : 0);10431044int written = send0(fd, ((DirectBuffer)bb).address() + pos, rem, addr,1045port, -1 /*121*/, streamNumber, unordered, ppid);1046if (written > 0)1047bb.position(pos + written);1048return written;1049}10501051@Override1052public SctpChannel shutdown() throws IOException {1053synchronized(stateLock) {1054if (isShutdown)1055return this;10561057ensureSendOpen();1058SctpNet.shutdown(fdVal, -1);1059if (senderThread != 0)1060NativeThread.signal(senderThread);1061isShutdown = true;1062}1063return this;1064}10651066@Override1067public Set<SocketAddress> getAllLocalAddresses()1068throws IOException {1069synchronized (stateLock) {1070if (!isOpen())1071throw new ClosedChannelException();1072if (!isBound())1073return Collections.emptySet();10741075return SctpNet.getLocalAddresses(fdVal);1076}1077}10781079@Override1080public Set<SocketAddress> getRemoteAddresses()1081throws IOException {1082synchronized (stateLock) {1083if (!isOpen())1084throw new ClosedChannelException();1085if (!isConnected() || isShutdown)1086return Collections.emptySet();10871088try {1089return SctpNet.getRemoteAddresses(fdVal, 0/*unused*/);1090} catch (SocketException unused) {1091/* an open connected channel should always have remote addresses */1092return remoteAddresses;1093}1094}1095}10961097/* Native */1098private static native void initIDs();10991100static native int receive0(int fd, ResultContainer resultContainer,1101long address, int length, boolean peek) throws IOException;11021103static native int send0(int fd, long address, int length,1104InetAddress addr, int port, int assocId, int streamNumber,1105boolean unordered, int ppid) throws IOException;11061107private static native int checkConnect(FileDescriptor fd, boolean block,1108boolean ready) throws IOException;11091110static {1111IOUtil.load(); /* loads nio & net native libraries */1112java.security.AccessController.doPrivileged(1113new java.security.PrivilegedAction<Void>() {1114public Void run() {1115System.loadLibrary("sctp");1116return null;1117}1118});1119initIDs();1120}1121}112211231124