Path: blob/main/cranelift/filetests/src/concurrent.rs
//! Run tests concurrently.
//!
//! This module provides the `ConcurrentRunner` struct which uses a pool of threads to run tests
//! concurrently.

use crate::runone;
use cranelift_codegen::dbg::LOG_FILENAME_PREFIX;
use cranelift_codegen::timing;
use log::error;
use std::panic::catch_unwind;
use std::path::{Path, PathBuf};
use std::sync::mpsc::{Receiver, Sender, channel};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

/// Request sent to worker threads; contains the job id and path.
struct Request(usize, PathBuf);

/// Reply from a worker thread.
pub enum Reply {
    Starting {
        jobid: usize,
    },
    Done {
        jobid: usize,
        result: anyhow::Result<Duration>,
    },
    Tick,
}

/// Manage threads that run test jobs concurrently.
pub struct ConcurrentRunner {
    /// Channel for sending requests to the worker threads.
    /// The workers are sharing the receiver with an `Arc<Mutex<Receiver>>`.
    /// This is `None` when shutting down.
    request_tx: Option<Sender<Request>>,

    /// Channel for receiving replies from the workers.
    /// Workers have their own `Sender`.
    reply_rx: Receiver<Reply>,

    handles: Vec<thread::JoinHandle<timing::PassTimes>>,
}

impl ConcurrentRunner {
    /// Create a new `ConcurrentRunner` with threads spun up.
    pub fn new() -> Self {
        let (request_tx, request_rx) = channel();
        let request_mutex = Arc::new(Mutex::new(request_rx));
        let (reply_tx, reply_rx) = channel();

        heartbeat_thread(reply_tx.clone());

        let num_threads = std::env::var("CRANELIFT_FILETESTS_THREADS")
            .ok()
            .map(|s| {
                use std::str::FromStr;
                let n = usize::from_str(&s).unwrap();
                assert!(n > 0);
                n
            })
            .unwrap_or_else(|| num_cpus::get());
        let handles = (0..num_threads)
            .map(|num| worker_thread(num, request_mutex.clone(), reply_tx.clone()))
            .collect();

        Self {
            request_tx: Some(request_tx),
            reply_rx,
            handles,
        }
    }

    /// Shut down the worker threads in an orderly fashion. They will finish any queued jobs first.
    pub fn shutdown(&mut self) {
        self.request_tx = None;
    }

    /// Join all the worker threads.
    /// Transfer pass timings from the worker threads to the current thread.
    pub fn join(&mut self) -> timing::PassTimes {
        assert!(self.request_tx.is_none(), "must shutdown before join");
        let mut pass_times = timing::PassTimes::default();
        for h in self.handles.drain(..) {
            match h.join() {
                Ok(t) => pass_times.add(&t),
                Err(e) => println!("worker panicked: {e:?}"),
            }
        }
        pass_times
    }

    /// Add a new job to the queues.
    pub fn put(&mut self, jobid: usize, path: &Path) {
        self.request_tx
            .as_ref()
            .expect("cannot push after shutdown")
            .send(Request(jobid, path.to_owned()))
            .expect("all the worker threads are gone");
    }

    /// Get a job reply without blocking.
    pub fn try_get(&mut self) -> Option<Reply> {
        self.reply_rx.try_recv().ok()
    }

    /// Get a job reply, blocking until one is available.
    pub fn get(&mut self) -> Option<Reply> {
        self.reply_rx.recv().ok()
    }
}

/// Spawn a heartbeat thread which sends ticks down the reply channel every second.
/// This lets us implement timeouts without the not-yet-stable `recv_timeout`.
fn heartbeat_thread(replies: Sender<Reply>) -> thread::JoinHandle<()> {
    thread::Builder::new()
        .name("heartbeat".to_string())
        .spawn(move || {
            file_per_thread_logger::initialize(LOG_FILENAME_PREFIX);
            while replies.send(Reply::Tick).is_ok() {
                thread::sleep(Duration::from_secs(1));
            }
        })
        .unwrap()
}

/// Spawn a worker thread running tests.
fn worker_thread(
    thread_num: usize,
    requests: Arc<Mutex<Receiver<Request>>>,
    replies: Sender<Reply>,
) -> thread::JoinHandle<timing::PassTimes> {
    thread::Builder::new()
        .name(format!("worker #{thread_num}"))
        .spawn(move || {
            file_per_thread_logger::initialize(LOG_FILENAME_PREFIX);
            loop {
                // Lock the mutex only long enough to extract a request.
                let Request(jobid, path) = match requests.lock().unwrap().recv() {
                    Err(..) => break, // TX end shut down; exit the thread.
                    Ok(req) => req,
                };

                // Tell them we're starting this job.
                // The receiver should always be present for this as long as we have jobs.
                replies.send(Reply::Starting { jobid }).unwrap();

                let result = catch_unwind(|| runone::run(path.as_path(), None, None))
                    .unwrap_or_else(|e| {
                        // The test panicked, leaving us a `Box<Any>`.
                        // Panics are usually strings.
                        if let Some(msg) = e.downcast_ref::<String>() {
                            anyhow::bail!("panicked in worker #{}: {}", thread_num, msg)
                        } else if let Some(msg) = e.downcast_ref::<&'static str>() {
                            anyhow::bail!("panicked in worker #{}: {}", thread_num, msg)
                        } else {
                            anyhow::bail!("panicked in worker #{}", thread_num)
                        }
                    });

                if let Err(ref msg) = result {
                    error!("FAIL: {msg}");
                }

                replies.send(Reply::Done { jobid, result }).unwrap();
            }

            // Timing is accumulated independently per thread.
            // Timings from this worker thread will be aggregated by `ConcurrentRunner::join()`.
            timing::take_current()
        })
        .unwrap()
}
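A minimal usage sketch, assuming a hypothetical `run_suite` driver and a panic-on-failure policy (neither is part of this file): jobs are queued with `put`, `shutdown` is called once everything has been queued, replies are drained until every job has reported `Done`, and `join` collects the accumulated pass timings.

#[cfg(test)]
mod example {
    use super::*;

    /// Hypothetical driver: run every test file in `suite_paths` and return pass timings.
    /// Illustrative only; the real filetests driver may handle replies differently.
    fn run_suite(suite_paths: &[PathBuf]) -> timing::PassTimes {
        let mut runner = ConcurrentRunner::new();
        for (jobid, path) in suite_paths.iter().enumerate() {
            runner.put(jobid, path);
        }
        // No more jobs will be queued; workers exit once the queue drains.
        runner.shutdown();

        let mut remaining = suite_paths.len();
        while remaining > 0 {
            match runner.get() {
                // Heartbeat ticks and "job started" notices need no action here.
                Some(Reply::Tick) | Some(Reply::Starting { .. }) => {}
                Some(Reply::Done { jobid, result }) => {
                    remaining -= 1;
                    // Panic-on-failure is an assumption made for this sketch.
                    if let Err(e) = result {
                        panic!("job #{jobid} failed: {e:?}");
                    }
                }
                // All senders (workers and the heartbeat) have hung up.
                None => break,
            }
        }
        runner.join()
    }
}

A driver that wants per-job timeouts can count `Reply::Tick` messages received between a job's `Starting` and `Done` replies, which is exactly what the heartbeat thread exists to enable.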