Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
google
GitHub Repository: google/crosvm
Path: blob/main/devices/src/utils/event_loop.rs
5394 views
1
// Copyright 2018 The ChromiumOS Authors
2
// Use of this source code is governed by a BSD-style license that can be
3
// found in the LICENSE file.
4
5
use std::collections::BTreeMap;
6
use std::mem::drop;
7
use std::sync::Arc;
8
use std::sync::Weak;
9
use std::thread;
10
11
use base::error;
12
use base::warn;
13
use base::AsRawDescriptor;
14
use base::Descriptor;
15
use base::Event;
16
use base::EventType;
17
use base::WaitContext;
18
use sync::Mutex;
19
20
use super::error::Error;
21
use super::error::Result;
22
23
/// A fail handle will do the clean up when we cannot recover from some error.
24
pub trait FailHandle: Send + Sync {
25
/// Fail the code.
26
fn fail(&self);
27
/// Returns true if already failed.
28
fn failed(&self) -> bool;
29
}
30
31
impl FailHandle for Option<Arc<dyn FailHandle>> {
32
fn fail(&self) {
33
match self {
34
Some(handle) => handle.fail(),
35
None => error!("event loop trying to fail without a fail handle"),
36
}
37
}
38
39
fn failed(&self) -> bool {
40
match self {
41
Some(handle) => handle.failed(),
42
None => false,
43
}
44
}
45
}
46
47
/// EventLoop is an event loop blocked on a set of fds. When a monitered events is triggered,
48
/// event loop will invoke the mapped handler.
49
pub struct EventLoop {
50
fail_handle: Option<Arc<dyn FailHandle>>,
51
poll_ctx: Arc<WaitContext<Descriptor>>,
52
handlers: Arc<Mutex<BTreeMap<Descriptor, Weak<dyn EventHandler>>>>,
53
stop_evt: Event,
54
}
55
56
/// Interface for event handler.
57
pub trait EventHandler: Send + Sync {
58
fn on_event(&self) -> anyhow::Result<()>;
59
}
60
61
impl EventLoop {
62
/// Start an event loop. An optional fail handle could be passed to the event loop.
63
pub fn start(
64
name: String,
65
fail_handle: Option<Arc<dyn FailHandle>>,
66
) -> Result<(EventLoop, thread::JoinHandle<()>)> {
67
let (self_stop_evt, stop_evt) = Event::new()
68
.and_then(|e| Ok((e.try_clone()?, e)))
69
.map_err(Error::CreateEvent)?;
70
71
let fd_callbacks: Arc<Mutex<BTreeMap<Descriptor, Weak<dyn EventHandler>>>> =
72
Arc::new(Mutex::new(BTreeMap::new()));
73
let poll_ctx: WaitContext<Descriptor> = WaitContext::new()
74
.and_then(|pc| {
75
pc.add(&stop_evt, Descriptor(stop_evt.as_raw_descriptor()))
76
.and(Ok(pc))
77
})
78
.map_err(Error::CreateWaitContext)?;
79
80
let poll_ctx = Arc::new(poll_ctx);
81
let event_loop = EventLoop {
82
fail_handle: fail_handle.clone(),
83
poll_ctx: poll_ctx.clone(),
84
handlers: fd_callbacks.clone(),
85
stop_evt: self_stop_evt,
86
};
87
88
let handle = thread::Builder::new()
89
.name(name)
90
.spawn(move || {
91
loop {
92
if fail_handle.failed() {
93
error!("event loop already failed");
94
return;
95
}
96
let events = match poll_ctx.wait() {
97
Ok(events) => events,
98
Err(e) => {
99
error!("cannot wait on events {:?}", e);
100
fail_handle.fail();
101
return;
102
}
103
};
104
for event in &events {
105
let fd = event.token.as_raw_descriptor();
106
if fd == stop_evt.as_raw_descriptor() {
107
return;
108
}
109
110
let mut locked = fd_callbacks.lock();
111
let weak_handler = match locked.get(&Descriptor(fd)) {
112
Some(cb) => cb.clone(),
113
None => {
114
warn!("callback for fd {} already removed", fd);
115
continue;
116
}
117
};
118
119
// If the file descriptor is hung up, remove it after calling the handler
120
// one final time.
121
let mut remove = event.is_hungup;
122
123
if let Some(handler) = weak_handler.upgrade() {
124
// Drop lock before triggering the event.
125
drop(locked);
126
if let Err(e) = handler.on_event() {
127
error!("removing event handler due to error: {:#}", e);
128
remove = true;
129
}
130
locked = fd_callbacks.lock();
131
} else {
132
// If the handler is already gone, we remove the fd.
133
remove = true;
134
}
135
136
if remove {
137
let _ = poll_ctx.delete(&event.token);
138
let _ = locked.remove(&Descriptor(fd));
139
}
140
}
141
}
142
})
143
.map_err(Error::StartThread)?;
144
145
Ok((event_loop, handle))
146
}
147
148
/// Add a new event to event loop. The event handler will be invoked when `event` happens on
149
/// `descriptor`.
150
///
151
/// If the same `descriptor` is added multiple times, the old handler will be replaced.
152
/// EventLoop will not keep `handler` alive, if handler is dropped when `event` is triggered,
153
/// the event will be removed.
154
pub fn add_event(
155
&self,
156
descriptor: &dyn AsRawDescriptor,
157
event_type: EventType,
158
handler: Weak<dyn EventHandler>,
159
) -> Result<()> {
160
if self.fail_handle.failed() {
161
return Err(Error::EventLoopAlreadyFailed);
162
}
163
self.handlers
164
.lock()
165
.insert(Descriptor(descriptor.as_raw_descriptor()), handler);
166
self.poll_ctx
167
.add_for_event(
168
descriptor,
169
event_type,
170
Descriptor(descriptor.as_raw_descriptor()),
171
)
172
.map_err(Error::WaitContextAddDescriptor)
173
}
174
175
/// Removes event for this `descriptor`. This function is safe to call even when the
176
/// `descriptor` is not actively being polled because it's been paused.
177
///
178
/// EventLoop does not guarantee all events for `descriptor` is handled.
179
pub fn remove_event_for_descriptor(&self, descriptor: &dyn AsRawDescriptor) -> Result<()> {
180
if self.fail_handle.failed() {
181
return Err(Error::EventLoopAlreadyFailed);
182
}
183
self.poll_ctx
184
.delete(descriptor)
185
.map_err(Error::WaitContextDeleteDescriptor)?;
186
self.handlers
187
.lock()
188
.remove(&Descriptor(descriptor.as_raw_descriptor()));
189
Ok(())
190
}
191
192
/// Pauses polling on the given `descriptor`. It keeps a reference to the `descriptor` and its
193
/// handler so it can be resumed by calling `resume_event_for_descriptor()`.
194
pub fn pause_event_for_descriptor(&self, descriptor: &dyn AsRawDescriptor) -> Result<()> {
195
if self.fail_handle.failed() {
196
return Err(Error::EventLoopAlreadyFailed);
197
}
198
self.poll_ctx
199
.delete(descriptor)
200
.map_err(Error::WaitContextDeleteDescriptor)?;
201
Ok(())
202
}
203
204
/// Resumes polling on the given `descriptor` with the previously-provided handler. If
205
/// `descriptor` was not paused beforehand, this function does nothing. If `descriptor` does
206
/// not exist in the event loop, it returns an error.
207
/// `event_type` does not need to match the previously registered event type.
208
pub fn resume_event_for_descriptor(
209
&self,
210
descriptor: &dyn AsRawDescriptor,
211
event_type: EventType,
212
) -> Result<()> {
213
let handler = self
214
.handlers
215
.lock()
216
.get(&Descriptor(descriptor.as_raw_descriptor()))
217
.ok_or(Error::EventLoopMissingHandler)?
218
.clone();
219
self.add_event(descriptor, event_type, handler)
220
}
221
222
/// Stops this event loop asynchronously. Previous events might not be handled.
223
pub fn stop(&self) {
224
match self.stop_evt.signal() {
225
Ok(_) => {}
226
Err(_) => {
227
error!("fail to send event loop stop event, it might already stopped");
228
}
229
}
230
}
231
}
232
233
#[cfg(test)]
234
mod tests {
235
use std::sync::Arc;
236
use std::sync::Condvar;
237
use std::sync::Mutex;
238
239
use base::Event;
240
241
use super::*;
242
243
struct EventLoopTestHandler {
244
val: Mutex<u8>,
245
cvar: Condvar,
246
evt: Event,
247
}
248
249
impl EventHandler for EventLoopTestHandler {
250
fn on_event(&self) -> anyhow::Result<()> {
251
self.evt.wait().unwrap();
252
*self.val.lock().unwrap() += 1;
253
self.cvar.notify_one();
254
Ok(())
255
}
256
}
257
258
#[test]
259
fn event_loop_test() {
260
let (l, j) = EventLoop::start("test".to_string(), None).unwrap();
261
let (self_evt, evt) = match Event::new().and_then(|e| Ok((e.try_clone()?, e))) {
262
Ok(v) => v,
263
Err(e) => {
264
error!("failed creating Event pair: {:?}", e);
265
return;
266
}
267
};
268
let h = Arc::new(EventLoopTestHandler {
269
val: Mutex::new(0),
270
cvar: Condvar::new(),
271
evt,
272
});
273
let t: Arc<dyn EventHandler> = h.clone();
274
l.add_event(&h.evt, EventType::Read, Arc::downgrade(&t))
275
.unwrap();
276
self_evt.signal().unwrap();
277
{
278
let mut val = h.val.lock().unwrap();
279
while *val < 1 {
280
val = h.cvar.wait(val).unwrap();
281
}
282
}
283
l.stop();
284
j.join().unwrap();
285
assert_eq!(*(h.val.lock().unwrap()), 1);
286
}
287
}
288
289