Path: blob/master/runtime/compiler/net/ServerStream.hpp
6000 views
/*******************************************************************************1* Copyright (c) 2018, 2021 IBM Corp. and others2*3* This program and the accompanying materials are made available under4* the terms of the Eclipse Public License 2.0 which accompanies this5* distribution and is available at https://www.eclipse.org/legal/epl-2.0/6* or the Apache License, Version 2.0 which accompanies this distribution and7* is available at https://www.apache.org/licenses/LICENSE-2.0.8*9* This Source Code may also be made available under the following10* Secondary Licenses when the conditions for such availability set11* forth in the Eclipse Public License, v. 2.0 are satisfied: GNU12* General Public License, version 2 with the GNU Classpath13* Exception [1] and GNU General Public License, version 2 with the14* OpenJDK Assembly Exception [2].15*16* [1] https://www.gnu.org/software/classpath/license.html17* [2] http://openjdk.java.net/legal/assembly-exception.html18*19* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 OR LicenseRef-GPL-2.0 WITH Assembly-exception20*******************************************************************************/2122#ifndef SERVER_STREAM_H23#define SERVER_STREAM_H2425#include "net/RawTypeConvert.hpp"26#include "net/CommunicationStream.hpp"27#include "env/VerboseLog.hpp"28#include "control/CompilationThread.hpp" // for TR::compInfoPT->getCompThreadId()29#include "control/Options.hpp"30#include "runtime/JITClientSession.hpp"3132class SSLOutputStream;33class SSLInputStream;3435namespace JITServer36{3738/**39@class ServerStream40@brief Implementation of the communication API for a server receiving JIT compilations requests4142Typical usage:431) Define a compilation handler by extending JITServer::BaseCompileDispatcher and providing44an implementation for abstract method "compile(JITServer::ServerStream *stream)"45This method will be called when a new connection request has been received at the server.462) Create a dedicated thread that will listen for incoming connection requests473) In this thread, instantiate a CompileDispatcher from a class defined in step (1)48E.g.: J9CompileDispatcher handler(jitConfig);494) Call "TR_Listener::serveRemoteCompilationRequests(&handler);"50which will wait for a connection, accept the connection, create a ServerStream and call51handler->compile(stream) for further processing, e.g. add the stream52to a compilation queue535) On a different thread, extract the compilation entry from the queue (which contains54the stream) and wait for a compilation request:55auto req = stream->readCompileRequest<....>();566) At this point the server could query the client with:57stream->write(MessageType type, T... args);58auto recv = stream->read<....>();597) When compilation is completed successfully, the server responds with finishCompilation(T... args).60When compilation is aborted, the sever responds with writeError(uint32_t statusCode).61*/62class ServerStream : public CommunicationStream63{64public:65/**66@brief Constructor of ServerStream class6768@param connfd socket descriptor for the communication channel69@param ssl BIO for the SSL enabled stream70@param timeout timeout value (ms) to be set for connfd71*/72explicit ServerStream(int connfd, BIO *ssl);73virtual ~ServerStream()74{75_numConnectionsClosed++;76_pClientSessionData = NULL;77}7879/**80@brief Send a message to the client8182@param [in] type Message type to be sent83@param [in] args Variable number of additional parameters to be sent84*/85template <typename ...Args>86void write(MessageType type, Args... args)87{88if (isReadingClassUnload() &&89isClassUnloadingAttempted() &&90TR::compInfoPT->compilationCanBeInterrupted() &&91(MessageType::compilationFailure != type) &&92(MessageType::compilationCode != type))93{94if (TR::Options::getVerboseOption(TR_VerboseJITServer))95TR_VerboseLog::writeLineLocked(TR_Vlog_JITServer, "compThreadID=%d MessageType[%u] %s: throw TR::CompilationInterrupted",96TR::compInfoPT->getCompThreadId(), type, messageNames[type]);9798throw TR::CompilationInterrupted();99}100101_sMsg.setType(type);102setArgsRaw<Args...>(_sMsg, args...);103writeMessage(_sMsg);104}105106/**107@brief Read a message from the client108109The read operation is blocking, subject to a timeout.110If the message received is `compilationInterrupted` then an exception of type StreamInterrupted is thrown.111If the message received is `connectionTerminate` then an exception of type StreamConnectionTerminate is thrown.112If the server detects an incompatibility with the client then a StreamMessageTypeMissmatch113exception is thrown.114Otherwise, the arguments sent by the client are returned as a tuple115116@return Returns a tuple of arguments sent by the client117*/118template <typename ...T>119std::tuple<T...> read()120{121readMessage(_cMsg);122switch (_cMsg.type())123{124case MessageType::compilationInterrupted:125{126// If we are inside an uninterruptible operation on the server,127// but the corresponding operation hasn't yet started on the client,128// ignore the request to interrupt a compilation129if (TR::compInfoPT->compilationCanBeInterrupted())130throw StreamInterrupted();131}132case MessageType::connectionTerminate:133{134throw StreamConnectionTerminate();135}136default:137{138// We are expecting the response type (_cMsg.type()) to be the same as the request type (_sMsg.type())139if (_cMsg.type() != _sMsg.type())140throw StreamMessageTypeMismatch(_sMsg.type(), _cMsg.type());141}142}143return getArgsRaw<T...>(_cMsg);144}145146/**147@brief Function to read the compilation request from a client148149This is the first type of message received after a connection is established.150The number and position of parameters in the template must match the151the one sent by the client. In order to ensure this, the client will embed152version information in the first message it sends after a connection is established.153The server will check whether its version matches the client's version and throw154`StreamVersionIncompatible` if it doesn't.155156Exceptions thrown: StreamConnectionTerminate, StreamClientSessionTerminate, StreamVersionIncompatible, StreamMessageTypeMismatch157158@return Returns a tuple with information sent by the client159*/160template <typename... T>161std::tuple<T...> readCompileRequest()162{163readMessage(_cMsg);164if (_cMsg.fullVersion() != 0 && _cMsg.fullVersion() != getJITServerFullVersion())165{166throw StreamVersionIncompatible(getJITServerFullVersion(), _cMsg.fullVersion());167}168169switch (_cMsg.type())170{171case MessageType::connectionTerminate:172{173throw StreamConnectionTerminate();174}175case MessageType::clientSessionTerminate:176{177uint64_t clientId = std::get<0>(getRecvData<uint64_t>());178throw StreamClientSessionTerminate(clientId);179}180case MessageType::compilationRequest:181{182return getArgsRaw<T...>(_cMsg);183}184default:185{186throw StreamMessageTypeMismatch(MessageType::compilationRequest, _cMsg.type());187}188}189}190191/**192@brief Extract the data from the received message and return it193*/194template <typename... T>195std::tuple<T...> getRecvData()196{197return getArgsRaw<T...>(_cMsg);198}199200/**201@brief Function invoked by server when compilation is completed successfully202203This should be the last message sent by a server as a response to a compilation request.204It includes a variable number of parameters with compilation artifacts (including the compiled body).205*/206template <typename... T>207void finishCompilation(T... args)208{209try210{211write(MessageType::compilationCode, args...);212}213catch (std::exception &e)214{215if (TR::Options::getVerboseOption(TR_VerboseJITServer))216TR_VerboseLog::writeLineLocked(TR_Vlog_JITServer, "Could not finish compilation: %s", e.what());217}218}219220/**221@brief Function invoked by server when compilation is aborted222*/223void writeError(uint32_t statusCode, uint64_t otherData = -1)224{225try226{227if (TR::Options::getVerboseOption(TR_VerboseJITServer))228TR_VerboseLog::writeLineLocked(TR_Vlog_JITServer, "compThreadID=%d MessageType::compilationFailure: statusCode %u",229TR::compInfoPT->getCompThreadId(), statusCode);230write(MessageType::compilationFailure, statusCode, otherData);231}232catch (std::exception &e)233{234if (TR::Options::getVerboseOption(TR_VerboseJITServer))235TR_VerboseLog::writeLineLocked(TR_Vlog_JITServer, "Could not write error code: %s", e.what());236}237}238239void setClientId(uint64_t clientId)240{241_clientId = clientId;242}243uint64_t getClientId() const244{245return _clientId;246}247248void setClientData(ClientSessionData *pClientData)249{250_pClientSessionData = pClientData;251}252253volatile bool isReadingClassUnload()254{255return (_pClientSessionData) ? _pClientSessionData->isReadingClassUnload() : false;256}257258volatile bool isClassUnloadingAttempted()259{260return (_pClientSessionData) ? _pClientSessionData->isClassUnloadingAttempted() : false;261}262263// Statistics264static int getNumConnectionsOpened() { return _numConnectionsOpened; }265static int getNumConnectionsClosed() { return _numConnectionsClosed; }266267private:268static int _numConnectionsOpened;269static int _numConnectionsClosed;270uint64_t _clientId; // UID of client connected to this communication stream271ClientSessionData *_pClientSessionData;272};273274}275276#endif // SERVER_STREAM_H277278279