Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
google
GitHub Repository: google/crosvm
Path: blob/main/devices/src/virtio/vhost_user_backend/net/sys/linux.rs
5394 views
1
// Copyright 2022 The ChromiumOS Authors
2
// Use of this source code is governed by a BSD-style license that can be
3
// found in the LICENSE file.
4
5
use std::net::Ipv4Addr;
6
use std::str::FromStr;
7
use std::thread;
8
9
use anyhow::anyhow;
10
use anyhow::bail;
11
use anyhow::Context;
12
use argh::FromArgs;
13
use base::error;
14
use base::info;
15
use base::validate_raw_descriptor;
16
use base::warn;
17
use base::RawDescriptor;
18
use cros_async::EventAsync;
19
use cros_async::Executor;
20
use cros_async::IntoAsync;
21
use cros_async::IoSource;
22
use futures::channel::oneshot;
23
use futures::select_biased;
24
use futures::FutureExt;
25
use hypervisor::ProtectionType;
26
use net_util::sys::linux::Tap;
27
use net_util::MacAddress;
28
use net_util::TapT;
29
use virtio_sys::virtio_net;
30
use vm_memory::GuestMemory;
31
use vmm_vhost::VHOST_USER_F_PROTOCOL_FEATURES;
32
33
use crate::virtio;
34
use crate::virtio::net::process_mrg_rx;
35
use crate::virtio::net::process_rx;
36
use crate::virtio::net::validate_and_configure_tap;
37
use crate::virtio::net::NetError;
38
use crate::virtio::net::PendingBuffer;
39
use crate::virtio::vhost_user_backend::connection::sys::VhostUserListener;
40
use crate::virtio::vhost_user_backend::connection::VhostUserConnectionTrait;
41
use crate::virtio::vhost_user_backend::handler::VhostUserDevice;
42
use crate::virtio::vhost_user_backend::net::run_ctrl_queue;
43
use crate::virtio::vhost_user_backend::net::run_tx_queue;
44
use crate::virtio::vhost_user_backend::net::NetBackend;
45
use crate::virtio::vhost_user_backend::net::NET_EXECUTOR;
46
use crate::virtio::Queue;
47
48
struct TapConfig {
49
host_ip: Ipv4Addr,
50
netmask: Ipv4Addr,
51
mac: MacAddress,
52
}
53
54
impl FromStr for TapConfig {
55
type Err = anyhow::Error;
56
57
fn from_str(arg: &str) -> Result<Self, Self::Err> {
58
let args: Vec<&str> = arg.split(',').collect();
59
if args.len() != 3 {
60
bail!("TAP config must consist of 3 parts but {}", args.len());
61
}
62
63
let host_ip: Ipv4Addr = args[0]
64
.parse()
65
.map_err(|e| anyhow!("invalid IP address: {}", e))?;
66
let netmask: Ipv4Addr = args[1]
67
.parse()
68
.map_err(|e| anyhow!("invalid net mask: {}", e))?;
69
let mac: MacAddress = args[2]
70
.parse()
71
.map_err(|e| anyhow!("invalid MAC address: {}", e))?;
72
73
Ok(Self {
74
host_ip,
75
netmask,
76
mac,
77
})
78
}
79
}
80
81
impl<T: 'static> NetBackend<T>
82
where
83
T: TapT + IntoAsync,
84
{
85
fn new_from_config(config: &TapConfig, mrg_rxbuf: bool) -> anyhow::Result<Self> {
86
// Create a tap device.
87
let tap = T::new(true /* vnet_hdr */, false /* multi_queue */)
88
.context("failed to create tap device")?;
89
tap.set_ip_addr(config.host_ip)
90
.context("failed to set IP address")?;
91
tap.set_netmask(config.netmask)
92
.context("failed to set netmask")?;
93
tap.set_mac_address(config.mac)
94
.context("failed to set MAC address")?;
95
96
Self::new(tap, mrg_rxbuf)
97
}
98
99
fn new_from_name(name: &str, mrg_rxbuf: bool) -> anyhow::Result<Self> {
100
let tap = T::new_with_name(name.as_bytes(), true, false).map_err(NetError::TapOpen)?;
101
Self::new(tap, mrg_rxbuf)
102
}
103
104
pub fn new_from_tap_fd(tap_fd: RawDescriptor, mrg_rxbuf: bool) -> anyhow::Result<Self> {
105
let tap_fd = validate_raw_descriptor(tap_fd).context("failed to validate tap fd")?;
106
// SAFETY:
107
// Safe because we ensure that we get a unique handle to the fd.
108
let tap = unsafe { T::from_raw_descriptor(tap_fd).context("failed to create tap device")? };
109
110
Self::new(tap, mrg_rxbuf)
111
}
112
113
pub fn new(tap: T, mrg_rxbuf: bool) -> anyhow::Result<Self> {
114
let vq_pairs = Self::max_vq_pairs();
115
validate_and_configure_tap(&tap, vq_pairs as u16)
116
.context("failed to validate and configure tap")?;
117
118
let mut avail_features = virtio::base_features(ProtectionType::Unprotected)
119
| 1 << virtio_net::VIRTIO_NET_F_GUEST_CSUM
120
| 1 << virtio_net::VIRTIO_NET_F_CSUM
121
| 1 << virtio_net::VIRTIO_NET_F_CTRL_VQ
122
| 1 << virtio_net::VIRTIO_NET_F_CTRL_GUEST_OFFLOADS
123
| 1 << virtio_net::VIRTIO_NET_F_GUEST_TSO4
124
| 1 << virtio_net::VIRTIO_NET_F_GUEST_UFO
125
| 1 << virtio_net::VIRTIO_NET_F_HOST_TSO4
126
| 1 << virtio_net::VIRTIO_NET_F_HOST_UFO
127
| 1 << virtio_net::VIRTIO_NET_F_MTU
128
| 1 << VHOST_USER_F_PROTOCOL_FEATURES;
129
130
if mrg_rxbuf {
131
avail_features |= 1 << virtio_net::VIRTIO_NET_F_MRG_RXBUF;
132
}
133
134
let mtu = tap.mtu()?;
135
136
Ok(Self {
137
tap,
138
avail_features,
139
acked_features: 0,
140
mtu,
141
workers: Default::default(),
142
})
143
}
144
}
145
146
async fn run_rx_queue<T: TapT>(
147
mut queue: Queue,
148
mut tap: IoSource<T>,
149
kick_evt: EventAsync,
150
mut stop_rx: oneshot::Receiver<()>,
151
mrg_rxbuf: bool,
152
) -> Queue {
153
let mut pending_buffer = if mrg_rxbuf {
154
Some(PendingBuffer::new())
155
} else {
156
None
157
};
158
loop {
159
let pending_length = pending_buffer
160
.as_ref()
161
.map_or(0, |pending_buffer| pending_buffer.length);
162
if pending_length == 0 {
163
select_biased! {
164
// `tap.wait_readable()` requires an immutable reference to `tap`, but `process_rx`
165
// requires a mutable reference to `tap`, so this future needs to be recreated on
166
// every iteration. If more arms are added that doesn't break out of the loop, then
167
// this future could be recreated too many times.
168
rx = tap.wait_readable().fuse() => {
169
if let Err(e) = rx {
170
error!("Failed to wait for tap device to become readable: {}", e);
171
break;
172
}
173
}
174
_ = stop_rx => {
175
break;
176
}
177
}
178
}
179
let res = match pending_buffer.as_mut() {
180
Some(pending_buffer) => process_mrg_rx(&mut queue, tap.as_source_mut(), pending_buffer),
181
None => process_rx(&mut queue, tap.as_source_mut()),
182
};
183
184
match res {
185
Ok(()) => {}
186
Err(NetError::RxDescriptorsExhausted) => {
187
select_biased! {
188
kick_evt = kick_evt.next_val().fuse() => {
189
if let Err(e) = kick_evt {
190
error!("Failed to read kick event for rx queue: {}", e);
191
break;
192
}
193
},
194
_ = stop_rx => {
195
break;
196
}
197
};
198
}
199
Err(e) => {
200
error!("Failed to process rx queue: {}", e);
201
break;
202
}
203
}
204
}
205
queue
206
}
207
208
/// Platform specific impl of VhostUserDevice::start_queue.
209
pub(in crate::virtio::vhost_user_backend::net) fn start_queue<T: 'static + IntoAsync + TapT>(
210
backend: &mut NetBackend<T>,
211
idx: usize,
212
queue: virtio::Queue,
213
_mem: GuestMemory,
214
) -> anyhow::Result<()> {
215
if backend.workers[idx].is_some() {
216
warn!("Starting new queue handler without stopping old handler");
217
backend.stop_queue(idx)?;
218
}
219
220
NET_EXECUTOR.with(|ex| {
221
// Safe because the executor is initialized in main() below.
222
let ex = ex.get().expect("Executor not initialized");
223
224
let kick_evt = queue
225
.event()
226
.try_clone()
227
.context("failed to clone queue event")?;
228
let kick_evt =
229
EventAsync::new(kick_evt, ex).context("failed to create EventAsync for kick_evt")?;
230
let tap = backend
231
.tap
232
.try_clone()
233
.context("failed to clone tap device")?;
234
let worker_tuple = match idx {
235
0 => {
236
let tap = ex
237
.async_from(tap)
238
.context("failed to create async tap device")?;
239
240
let mrg_rxbuf =
241
(backend.acked_features & 1 << virtio_net::VIRTIO_NET_F_MRG_RXBUF) != 0;
242
let (stop_tx, stop_rx) = futures::channel::oneshot::channel();
243
(
244
ex.spawn_local(run_rx_queue(queue, tap, kick_evt, stop_rx, mrg_rxbuf)),
245
stop_tx,
246
)
247
}
248
1 => {
249
let (stop_tx, stop_rx) = futures::channel::oneshot::channel();
250
(
251
ex.spawn_local(run_tx_queue(queue, tap, kick_evt, stop_rx)),
252
stop_tx,
253
)
254
}
255
2 => {
256
let (stop_tx, stop_rx) = futures::channel::oneshot::channel();
257
(
258
ex.spawn_local(run_ctrl_queue(
259
queue,
260
tap,
261
kick_evt,
262
backend.acked_features,
263
1, /* vq_pairs */
264
stop_rx,
265
)),
266
stop_tx,
267
)
268
}
269
_ => bail!("attempted to start unknown queue: {}", idx),
270
};
271
272
backend.workers[idx] = Some(worker_tuple);
273
Ok(())
274
})
275
}
276
277
#[derive(FromArgs)]
278
#[argh(subcommand, name = "net")]
279
/// Net device
280
pub struct Options {
281
#[argh(option, arg_name = "SOCKET_PATH,IP_ADDR,NET_MASK,MAC_ADDR")]
282
/// TAP device config. (e.g. "path/to/sock,10.0.2.2,255.255.255.0,12:34:56:78:9a:bc")
283
device: Vec<String>,
284
#[argh(option, arg_name = "SOCKET_PATH,TAP_FD")]
285
/// TAP FD with a socket path"
286
tap_fd: Vec<String>,
287
#[argh(option, arg_name = "SOCKET_PATH,TAP_NAME")]
288
/// TAP NAME with a socket path
289
tap_name: Vec<String>,
290
#[argh(switch, arg_name = "MRG_RXBUF")]
291
/// whether enable MRG_RXBUF feature.
292
mrg_rxbuf: bool,
293
}
294
295
enum Connection {
296
Socket(String),
297
}
298
299
fn new_backend_from_device_arg(
300
arg: &str,
301
mrg_rxbuf: bool,
302
) -> anyhow::Result<(String, NetBackend<Tap>)> {
303
let pos = match arg.find(',') {
304
Some(p) => p,
305
None => {
306
bail!("device must take comma-separated argument");
307
}
308
};
309
let conn = &arg[0..pos];
310
let cfg = &arg[pos + 1..]
311
.parse::<TapConfig>()
312
.context("failed to parse tap config")?;
313
let backend = NetBackend::<Tap>::new_from_config(cfg, mrg_rxbuf)
314
.context("failed to create NetBackend")?;
315
Ok((conn.to_string(), backend))
316
}
317
318
fn new_backend_from_tap_name(
319
arg: &str,
320
mrg_rxbuf: bool,
321
) -> anyhow::Result<(String, NetBackend<Tap>)> {
322
let pos = match arg.find(',') {
323
Some(p) => p,
324
None => {
325
bail!("device must take comma-separated argument");
326
}
327
};
328
let conn = &arg[0..pos];
329
let tap_name = &arg[pos + 1..];
330
331
let backend = NetBackend::<Tap>::new_from_name(tap_name, mrg_rxbuf)
332
.context("failed to create NetBackend")?;
333
Ok((conn.to_string(), backend))
334
}
335
336
fn new_backend_from_tapfd_arg(
337
arg: &str,
338
mrg_rxbuf: bool,
339
) -> anyhow::Result<(String, NetBackend<Tap>)> {
340
let pos = match arg.find(',') {
341
Some(p) => p,
342
None => {
343
bail!("'tap-fd' flag must take comma-separated argument");
344
}
345
};
346
let conn = &arg[0..pos];
347
let tap_fd = &arg[pos + 1..]
348
.parse::<i32>()
349
.context("failed to parse tap-fd")?;
350
let backend = NetBackend::<Tap>::new_from_tap_fd(*tap_fd, mrg_rxbuf)
351
.context("failed to create NetBackend")?;
352
Ok((conn.to_string(), backend))
353
}
354
355
/// Starts a vhost-user net device.
356
/// Returns an error if the given `args` is invalid or the device fails to run.
357
pub fn start_device(opts: Options) -> anyhow::Result<()> {
358
let num_devices = opts.device.len() + opts.tap_fd.len() + opts.tap_name.len();
359
360
if num_devices == 0 {
361
bail!("no device option was passed");
362
}
363
364
let mut devices: Vec<(Connection, NetBackend<Tap>)> = Vec::with_capacity(num_devices);
365
366
// vhost-user
367
for arg in opts.device.iter() {
368
devices.push(
369
new_backend_from_device_arg(arg, opts.mrg_rxbuf)
370
.map(|(s, backend)| (Connection::Socket(s), backend))?,
371
);
372
}
373
374
for arg in opts.tap_name.iter() {
375
devices.push(
376
new_backend_from_tap_name(arg, opts.mrg_rxbuf)
377
.map(|(s, backend)| (Connection::Socket(s), backend))?,
378
);
379
}
380
for arg in opts.tap_fd.iter() {
381
devices.push(
382
new_backend_from_tapfd_arg(arg, opts.mrg_rxbuf)
383
.map(|(s, backend)| (Connection::Socket(s), backend))?,
384
);
385
}
386
387
let mut threads = Vec::with_capacity(num_devices);
388
389
for (conn, backend) in devices {
390
let ex = Executor::new().context("failed to create executor")?;
391
392
match conn {
393
Connection::Socket(socket) => {
394
threads.push(thread::spawn(move || {
395
NET_EXECUTOR.with(|thread_ex| {
396
let _ = thread_ex.set(ex.clone());
397
});
398
let listener = VhostUserListener::new(&socket)?;
399
// run_until() returns an Result<Result<..>> which the ? operator lets us
400
// flatten.
401
ex.run_until(listener.run_backend(backend, &ex))?
402
}));
403
}
404
};
405
}
406
407
info!("vhost-user net device ready, loop threads started.");
408
for t in threads {
409
match t.join() {
410
Ok(r) => r?,
411
Err(e) => bail!("thread panicked: {:?}", e),
412
}
413
}
414
Ok(())
415
}
416
417