// Path: blob/main/contrib/llvm-project/lldb/source/Core/ThreadedCommunication.cpp
// (web viewer banner: 39587 views)
//===-- ThreadedCommunication.cpp -----------------------------------------===//1//2// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.3// See https://llvm.org/LICENSE.txt for license information.4// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception5//6//===----------------------------------------------------------------------===//78#include "lldb/Core/ThreadedCommunication.h"910#include "lldb/Host/ThreadLauncher.h"11#include "lldb/Utility/Connection.h"12#include "lldb/Utility/ConstString.h"13#include "lldb/Utility/Event.h"14#include "lldb/Utility/LLDBLog.h"15#include "lldb/Utility/Listener.h"16#include "lldb/Utility/Log.h"17#include "lldb/Utility/Status.h"1819#include "llvm/Support/Compiler.h"2021#include <algorithm>22#include <chrono>23#include <cstring>24#include <memory>25#include <shared_mutex>2627#include <cerrno>28#include <cinttypes>29#include <cstdio>3031using namespace lldb;32using namespace lldb_private;3334llvm::StringRef ThreadedCommunication::GetStaticBroadcasterClass() {35static constexpr llvm::StringLiteral class_name("lldb.communication");36return class_name;37}3839ThreadedCommunication::ThreadedCommunication(const char *name)40: Communication(), Broadcaster(nullptr, name), m_read_thread_enabled(false),41m_read_thread_did_exit(false), m_bytes(), m_bytes_mutex(),42m_synchronize_mutex(), m_callback(nullptr), m_callback_baton(nullptr) {43LLDB_LOG(GetLog(LLDBLog::Object | LLDBLog::Communication),44"{0} ThreadedCommunication::ThreadedCommunication (name = {1})",45this, name);4647SetEventName(eBroadcastBitDisconnected, "disconnected");48SetEventName(eBroadcastBitReadThreadGotBytes, "got bytes");49SetEventName(eBroadcastBitReadThreadDidExit, "read thread did exit");50SetEventName(eBroadcastBitReadThreadShouldExit, "read thread should exit");51SetEventName(eBroadcastBitPacketAvailable, "packet available");52SetEventName(eBroadcastBitNoMorePendingInput, "no more pending 
input");5354CheckInWithManager();55}5657ThreadedCommunication::~ThreadedCommunication() {58LLDB_LOG(GetLog(LLDBLog::Object | LLDBLog::Communication),59"{0} ThreadedCommunication::~ThreadedCommunication (name = {1})",60this, GetBroadcasterName());61}6263void ThreadedCommunication::Clear() {64SetReadThreadBytesReceivedCallback(nullptr, nullptr);65StopReadThread(nullptr);66Communication::Clear();67}6869ConnectionStatus ThreadedCommunication::Disconnect(Status *error_ptr) {70assert((!m_read_thread_enabled || m_read_thread_did_exit) &&71"Disconnecting while the read thread is running is racy!");72return Communication::Disconnect(error_ptr);73}7475size_t ThreadedCommunication::Read(void *dst, size_t dst_len,76const Timeout<std::micro> &timeout,77ConnectionStatus &status,78Status *error_ptr) {79Log *log = GetLog(LLDBLog::Communication);80LLDB_LOG(81log,82"this = {0}, dst = {1}, dst_len = {2}, timeout = {3}, connection = {4}",83this, dst, dst_len, timeout, m_connection_sp.get());8485if (m_read_thread_enabled) {86// We have a dedicated read thread that is getting data for us87size_t cached_bytes = GetCachedBytes(dst, dst_len);88if (cached_bytes > 0) {89status = eConnectionStatusSuccess;90return cached_bytes;91}92if (timeout && timeout->count() == 0) {93if (error_ptr)94error_ptr->SetErrorString("Timed out.");95status = eConnectionStatusTimedOut;96return 0;97}9899if (!m_connection_sp) {100if (error_ptr)101error_ptr->SetErrorString("Invalid connection.");102status = eConnectionStatusNoConnection;103return 0;104}105106// No data yet, we have to start listening.107ListenerSP listener_sp(108Listener::MakeListener("ThreadedCommunication::Read"));109listener_sp->StartListeningForEvents(110this, eBroadcastBitReadThreadGotBytes | eBroadcastBitReadThreadDidExit);111112// Re-check for data, as it might have arrived while we were setting up our113// listener.114cached_bytes = GetCachedBytes(dst, dst_len);115if (cached_bytes > 0) {116status = eConnectionStatusSuccess;117return 
cached_bytes;118}119120EventSP event_sp;121// Explicitly check for the thread exit, for the same reason.122if (m_read_thread_did_exit) {123// We've missed the event, lets just conjure one up.124event_sp = std::make_shared<Event>(eBroadcastBitReadThreadDidExit);125} else {126if (!listener_sp->GetEvent(event_sp, timeout)) {127if (error_ptr)128error_ptr->SetErrorString("Timed out.");129status = eConnectionStatusTimedOut;130return 0;131}132}133const uint32_t event_type = event_sp->GetType();134if (event_type & eBroadcastBitReadThreadGotBytes) {135return GetCachedBytes(dst, dst_len);136}137138if (event_type & eBroadcastBitReadThreadDidExit) {139// If the thread exited of its own accord, it either means it140// hit an end-of-file condition or an error.141status = m_pass_status;142if (error_ptr)143*error_ptr = std::move(m_pass_error);144145if (GetCloseOnEOF())146Disconnect(nullptr);147return 0;148}149llvm_unreachable("Got unexpected event type!");150}151152// We aren't using a read thread, just read the data synchronously in this153// thread.154return Communication::Read(dst, dst_len, timeout, status, error_ptr);155}156157bool ThreadedCommunication::StartReadThread(Status *error_ptr) {158std::lock_guard<std::mutex> lock(m_read_thread_mutex);159160if (error_ptr)161error_ptr->Clear();162163if (m_read_thread.IsJoinable())164return true;165166LLDB_LOG(GetLog(LLDBLog::Communication),167"{0} ThreadedCommunication::StartReadThread ()", this);168169const std::string thread_name =170llvm::formatv("<lldb.comm.{0}>", GetBroadcasterName());171172m_read_thread_enabled = true;173m_read_thread_did_exit = false;174auto maybe_thread = ThreadLauncher::LaunchThread(175thread_name, [this] { return ReadThread(); });176if (maybe_thread) {177m_read_thread = *maybe_thread;178} else {179if (error_ptr)180*error_ptr = Status(maybe_thread.takeError());181else {182LLDB_LOG_ERROR(GetLog(LLDBLog::Host), maybe_thread.takeError(),183"failed to launch host thread: {0}");184}185}186187if 
(!m_read_thread.IsJoinable())188m_read_thread_enabled = false;189190return m_read_thread_enabled;191}192193bool ThreadedCommunication::StopReadThread(Status *error_ptr) {194std::lock_guard<std::mutex> lock(m_read_thread_mutex);195196if (!m_read_thread.IsJoinable())197return true;198199LLDB_LOG(GetLog(LLDBLog::Communication),200"{0} ThreadedCommunication::StopReadThread ()", this);201202m_read_thread_enabled = false;203204BroadcastEvent(eBroadcastBitReadThreadShouldExit, nullptr);205206Status error = m_read_thread.Join(nullptr);207return error.Success();208}209210bool ThreadedCommunication::JoinReadThread(Status *error_ptr) {211std::lock_guard<std::mutex> lock(m_read_thread_mutex);212213if (!m_read_thread.IsJoinable())214return true;215216Status error = m_read_thread.Join(nullptr);217return error.Success();218}219220size_t ThreadedCommunication::GetCachedBytes(void *dst, size_t dst_len) {221std::lock_guard<std::recursive_mutex> guard(m_bytes_mutex);222if (!m_bytes.empty()) {223// If DST is nullptr and we have a thread, then return the number of bytes224// that are available so the caller can call again225if (dst == nullptr)226return m_bytes.size();227228const size_t len = std::min<size_t>(dst_len, m_bytes.size());229230::memcpy(dst, m_bytes.c_str(), len);231m_bytes.erase(m_bytes.begin(), m_bytes.begin() + len);232233return len;234}235return 0;236}237238void ThreadedCommunication::AppendBytesToCache(const uint8_t *bytes, size_t len,239bool broadcast,240ConnectionStatus status) {241LLDB_LOG(GetLog(LLDBLog::Communication),242"{0} ThreadedCommunication::AppendBytesToCache (src = {1}, src_len "243"= {2}, "244"broadcast = {3})",245this, bytes, (uint64_t)len, broadcast);246if ((bytes == nullptr || len == 0) &&247(status != lldb::eConnectionStatusEndOfFile))248return;249if (m_callback) {250// If the user registered a callback, then call it and do not broadcast251m_callback(m_callback_baton, bytes, len);252} else if (bytes != nullptr && len > 0) 
{253std::lock_guard<std::recursive_mutex> guard(m_bytes_mutex);254m_bytes.append((const char *)bytes, len);255if (broadcast)256BroadcastEventIfUnique(eBroadcastBitReadThreadGotBytes);257}258}259260bool ThreadedCommunication::ReadThreadIsRunning() {261return m_read_thread_enabled;262}263264lldb::thread_result_t ThreadedCommunication::ReadThread() {265Log *log = GetLog(LLDBLog::Communication);266267LLDB_LOG(log, "Communication({0}) thread starting...", this);268269uint8_t buf[1024];270271Status error;272ConnectionStatus status = eConnectionStatusSuccess;273bool done = false;274bool disconnect = false;275while (!done && m_read_thread_enabled) {276size_t bytes_read = ReadFromConnection(277buf, sizeof(buf), std::chrono::seconds(5), status, &error);278if (bytes_read > 0 || status == eConnectionStatusEndOfFile)279AppendBytesToCache(buf, bytes_read, true, status);280281switch (status) {282case eConnectionStatusSuccess:283break;284285case eConnectionStatusEndOfFile:286done = true;287disconnect = GetCloseOnEOF();288break;289case eConnectionStatusError: // Check GetError() for details290if (error.GetType() == eErrorTypePOSIX && error.GetError() == EIO) {291// EIO on a pipe is usually caused by remote shutdown292disconnect = GetCloseOnEOF();293done = true;294}295if (error.Fail())296LLDB_LOG(log, "error: {0}, status = {1}", error,297ThreadedCommunication::ConnectionStatusAsString(status));298break;299case eConnectionStatusInterrupted: // Synchronization signal from300// SynchronizeWithReadThread()301// The connection returns eConnectionStatusInterrupted only when there is302// no input pending to be read, so we can signal that.303BroadcastEvent(eBroadcastBitNoMorePendingInput);304break;305case eConnectionStatusNoConnection: // No connection306case eConnectionStatusLostConnection: // Lost connection while connected to307// a valid connection308done = true;309[[fallthrough]];310case eConnectionStatusTimedOut: // Request timed out311if (error.Fail())312LLDB_LOG(log, "error: {0}, 
status = {1}", error,313ThreadedCommunication::ConnectionStatusAsString(status));314break;315}316}317m_pass_status = status;318m_pass_error = std::move(error);319LLDB_LOG(log, "Communication({0}) thread exiting...", this);320321// Start shutting down. We need to do this in a very specific order to ensure322// we don't race with threads wanting to read/synchronize with us.323324// First, we signal our intent to exit. This ensures no new thread start325// waiting on events from us.326m_read_thread_did_exit = true;327328// Unblock any existing thread waiting for the synchronization signal.329BroadcastEvent(eBroadcastBitNoMorePendingInput);330331{332// Wait for the synchronization thread to finish...333std::lock_guard<std::mutex> guard(m_synchronize_mutex);334// ... and disconnect.335if (disconnect)336Disconnect();337}338339// Finally, unblock any readers waiting for us to exit.340BroadcastEvent(eBroadcastBitReadThreadDidExit);341return {};342}343344void ThreadedCommunication::SetReadThreadBytesReceivedCallback(345ReadThreadBytesReceived callback, void *callback_baton) {346m_callback = callback;347m_callback_baton = callback_baton;348}349350void ThreadedCommunication::SynchronizeWithReadThread() {351// Only one thread can do the synchronization dance at a time.352std::lock_guard<std::mutex> guard(m_synchronize_mutex);353354// First start listening for the synchronization event.355ListenerSP listener_sp(Listener::MakeListener(356"ThreadedCommunication::SyncronizeWithReadThread"));357listener_sp->StartListeningForEvents(this, eBroadcastBitNoMorePendingInput);358359// If the thread is not running, there is no point in synchronizing.360if (!m_read_thread_enabled || m_read_thread_did_exit)361return;362363// Notify the read thread.364m_connection_sp->InterruptRead();365366// Wait for the synchronization event.367EventSP event_sp;368listener_sp->GetEvent(event_sp, std::nullopt);369}370371void ThreadedCommunication::SetConnection(372std::unique_ptr<Connection> connection) 
{373StopReadThread(nullptr);374Communication::SetConnection(std::move(connection));375}376377378