Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
nviennot
GitHub Repository: nviennot/core-to-core-latency
Path: blob/main/src/bench/msg_passing.rs
228 views
1
use cache_padded::CachePadded;
2
use core_affinity::CoreId;
3
use std::sync::Barrier;
4
use std::sync::atomic::{Ordering, AtomicU64};
5
use quanta::Clock;
6
7
use super::Count;
8
use crate::utils;
9
10
/// Shared state for the one-way message-passing latency benchmark,
/// used by the two threads (one sender, one receiver) spawned in `run`.
pub struct Bench {
    // Two-party rendezvous (constructed with `Barrier::new(2)`): keeps the
    // sender and receiver threads in lock-step between samples.
    barrier: Barrier,
    // One slot per iteration. The sender stores a non-zero send timestamp
    // into each slot in order; the receiver spins until it observes it.
    // `CachePadded` keeps adjacent slots on separate cache lines so the
    // spinning on one slot does not contend with the store into the next.
    clocks: Vec<CachePadded<AtomicU64>>,
}
impl Bench {
16
pub fn new(num_iterations: u32) -> Self {
17
let clocks = (0..num_iterations as usize).map(|_| Default::default()).collect();
18
Self {
19
barrier: Barrier::new(2),
20
clocks,
21
}
22
}
23
}
24
25
impl super::Bench for Bench {
    // This test is not symmetric. We are doing one-way message passing.
    fn is_symmetric(&self) -> bool { false }

    /// Runs the one-way message-passing benchmark between `send_core` and
    /// `recv_core`, returning one averaged per-message latency (in ns) per
    /// sample: `num_samples` results, each averaging `num_iterations`
    /// send->receive handoffs through the `clocks` slots.
    fn run(
        &self,
        (recv_core, send_core): (CoreId, CoreId),
        clock: &Clock,
        num_iterations: Count,
        num_samples: Count,
    ) -> Vec<f64> {
        // Cost of `num_iterations` clock reads; subtracted from each sample
        // below so the measurement excludes the timer overhead itself.
        let clock_read_overhead_sum = utils::clock_read_overhead_sum(clock, num_iterations);

        // A shared time reference
        let start_time = clock.raw();
        let state = self;

        crossbeam_utils::thread::scope(|s| {
            let receiver = s.spawn(|_| {
                core_affinity::set_for_current(recv_core);
                let mut results = Vec::with_capacity(num_samples as usize);

                // Initial rendezvous with the sender before sampling starts.
                state.barrier.wait();

                for _ in 0..num_samples as usize {
                    // Sum of raw (cycle-domain) latencies over one sample.
                    let mut latency: u64 = 0;

                    // Phase 1: both threads enter the iteration loop together.
                    state.barrier.wait();
                    for v in &state.clocks {
                        // RDTSC is compensated below
                        // Spin until the sender publishes a non-zero send time,
                        // then timestamp the receipt relative to start_time.
                        let send_time = wait_for_non_zero_value(v, Ordering::Relaxed);
                        let recv_time = clock.raw().saturating_sub(start_time);
                        latency += recv_time.saturating_sub(send_time);
                    }
                    // Phase 2: sample done; sender may now reset the slots.
                    state.barrier.wait();

                    // Convert the accumulated raw delta to nanoseconds and
                    // subtract the clock-read overhead measured up front.
                    let total_latency = clock.delta(0, latency).saturating_sub(clock_read_overhead_sum).as_nanos();
                    // Average per iteration, as f64 nanoseconds.
                    results.push(total_latency as f64 / num_iterations as f64);
                }

                results
            });

            let sender = s.spawn(|_| {
                core_affinity::set_for_current(send_core);

                // Initial rendezvous with the receiver.
                state.barrier.wait();

                for _ in 0..num_samples as usize {
                    // Phase 1: start the iteration loop in step with the receiver.
                    state.barrier.wait();
                    for v in &state.clocks {
                        // Stall a bit to make sure the receiver is ready and we're not getting ahead of ourselves
                        // We could also put a state.barrier().wait(), but it's unclear whether it's a good
                        // idea due to additional generated traffic.
                        utils::delay_cycles(10000);

                        // max(1) to make sure the value is non-zero, which is what the receiver is waiting on
                        let send_time = clock.raw().saturating_sub(start_time).max(1);
                        v.store(send_time, Ordering::Relaxed);
                    }

                    // Phase 2: receiver has consumed every slot; safe to zero
                    // them again for the next sample.
                    state.barrier.wait();
                    for v in &state.clocks {
                        v.store(0, Ordering::Relaxed);
                    }
                }
            });

            sender.join().unwrap();
            receiver.join().unwrap()
        }).unwrap()
    }
}
/// Spins (hot, deliberately without any pause hint — this is the latency
/// being measured) until `atomic_value` holds a non-zero value, and returns
/// that value.
fn wait_for_non_zero_value(atomic_value: &AtomicU64, ordering: Ordering) -> u64 {
    loop {
        let value = atomic_value.load(ordering);
        if value != 0 {
            return value;
        }
    }
}