Path: blob/main/devices/src/virtio/vhost_user_backend/net/sys/linux.rs
5394 views
// Copyright 2022 The ChromiumOS Authors1// Use of this source code is governed by a BSD-style license that can be2// found in the LICENSE file.34use std::net::Ipv4Addr;5use std::str::FromStr;6use std::thread;78use anyhow::anyhow;9use anyhow::bail;10use anyhow::Context;11use argh::FromArgs;12use base::error;13use base::info;14use base::validate_raw_descriptor;15use base::warn;16use base::RawDescriptor;17use cros_async::EventAsync;18use cros_async::Executor;19use cros_async::IntoAsync;20use cros_async::IoSource;21use futures::channel::oneshot;22use futures::select_biased;23use futures::FutureExt;24use hypervisor::ProtectionType;25use net_util::sys::linux::Tap;26use net_util::MacAddress;27use net_util::TapT;28use virtio_sys::virtio_net;29use vm_memory::GuestMemory;30use vmm_vhost::VHOST_USER_F_PROTOCOL_FEATURES;3132use crate::virtio;33use crate::virtio::net::process_mrg_rx;34use crate::virtio::net::process_rx;35use crate::virtio::net::validate_and_configure_tap;36use crate::virtio::net::NetError;37use crate::virtio::net::PendingBuffer;38use crate::virtio::vhost_user_backend::connection::sys::VhostUserListener;39use crate::virtio::vhost_user_backend::connection::VhostUserConnectionTrait;40use crate::virtio::vhost_user_backend::handler::VhostUserDevice;41use crate::virtio::vhost_user_backend::net::run_ctrl_queue;42use crate::virtio::vhost_user_backend::net::run_tx_queue;43use crate::virtio::vhost_user_backend::net::NetBackend;44use crate::virtio::vhost_user_backend::net::NET_EXECUTOR;45use crate::virtio::Queue;4647struct TapConfig {48host_ip: Ipv4Addr,49netmask: Ipv4Addr,50mac: MacAddress,51}5253impl FromStr for TapConfig {54type Err = anyhow::Error;5556fn from_str(arg: &str) -> Result<Self, Self::Err> {57let args: Vec<&str> = arg.split(',').collect();58if args.len() != 3 {59bail!("TAP config must consist of 3 parts but {}", args.len());60}6162let host_ip: Ipv4Addr = args[0]63.parse()64.map_err(|e| anyhow!("invalid IP address: {}", e))?;65let netmask: Ipv4Addr = args[1]66.parse()67.map_err(|e| anyhow!("invalid net mask: {}", e))?;68let mac: MacAddress = args[2]69.parse()70.map_err(|e| anyhow!("invalid MAC address: {}", e))?;7172Ok(Self {73host_ip,74netmask,75mac,76})77}78}7980impl<T: 'static> NetBackend<T>81where82T: TapT + IntoAsync,83{84fn new_from_config(config: &TapConfig, mrg_rxbuf: bool) -> anyhow::Result<Self> {85// Create a tap device.86let tap = T::new(true /* vnet_hdr */, false /* multi_queue */)87.context("failed to create tap device")?;88tap.set_ip_addr(config.host_ip)89.context("failed to set IP address")?;90tap.set_netmask(config.netmask)91.context("failed to set netmask")?;92tap.set_mac_address(config.mac)93.context("failed to set MAC address")?;9495Self::new(tap, mrg_rxbuf)96}9798fn new_from_name(name: &str, mrg_rxbuf: bool) -> anyhow::Result<Self> {99let tap = T::new_with_name(name.as_bytes(), true, false).map_err(NetError::TapOpen)?;100Self::new(tap, mrg_rxbuf)101}102103pub fn new_from_tap_fd(tap_fd: RawDescriptor, mrg_rxbuf: bool) -> anyhow::Result<Self> {104let tap_fd = validate_raw_descriptor(tap_fd).context("failed to validate tap fd")?;105// SAFETY:106// Safe because we ensure that we get a unique handle to the fd.107let tap = unsafe { T::from_raw_descriptor(tap_fd).context("failed to create tap device")? };108109Self::new(tap, mrg_rxbuf)110}111112pub fn new(tap: T, mrg_rxbuf: bool) -> anyhow::Result<Self> {113let vq_pairs = Self::max_vq_pairs();114validate_and_configure_tap(&tap, vq_pairs as u16)115.context("failed to validate and configure tap")?;116117let mut avail_features = virtio::base_features(ProtectionType::Unprotected)118| 1 << virtio_net::VIRTIO_NET_F_GUEST_CSUM119| 1 << virtio_net::VIRTIO_NET_F_CSUM120| 1 << virtio_net::VIRTIO_NET_F_CTRL_VQ121| 1 << virtio_net::VIRTIO_NET_F_CTRL_GUEST_OFFLOADS122| 1 << virtio_net::VIRTIO_NET_F_GUEST_TSO4123| 1 << virtio_net::VIRTIO_NET_F_GUEST_UFO124| 1 << virtio_net::VIRTIO_NET_F_HOST_TSO4125| 1 << virtio_net::VIRTIO_NET_F_HOST_UFO126| 1 << virtio_net::VIRTIO_NET_F_MTU127| 1 << VHOST_USER_F_PROTOCOL_FEATURES;128129if mrg_rxbuf {130avail_features |= 1 << virtio_net::VIRTIO_NET_F_MRG_RXBUF;131}132133let mtu = tap.mtu()?;134135Ok(Self {136tap,137avail_features,138acked_features: 0,139mtu,140workers: Default::default(),141})142}143}144145async fn run_rx_queue<T: TapT>(146mut queue: Queue,147mut tap: IoSource<T>,148kick_evt: EventAsync,149mut stop_rx: oneshot::Receiver<()>,150mrg_rxbuf: bool,151) -> Queue {152let mut pending_buffer = if mrg_rxbuf {153Some(PendingBuffer::new())154} else {155None156};157loop {158let pending_length = pending_buffer159.as_ref()160.map_or(0, |pending_buffer| pending_buffer.length);161if pending_length == 0 {162select_biased! {163// `tap.wait_readable()` requires an immutable reference to `tap`, but `process_rx`164// requires a mutable reference to `tap`, so this future needs to be recreated on165// every iteration. If more arms are added that doesn't break out of the loop, then166// this future could be recreated too many times.167rx = tap.wait_readable().fuse() => {168if let Err(e) = rx {169error!("Failed to wait for tap device to become readable: {}", e);170break;171}172}173_ = stop_rx => {174break;175}176}177}178let res = match pending_buffer.as_mut() {179Some(pending_buffer) => process_mrg_rx(&mut queue, tap.as_source_mut(), pending_buffer),180None => process_rx(&mut queue, tap.as_source_mut()),181};182183match res {184Ok(()) => {}185Err(NetError::RxDescriptorsExhausted) => {186select_biased! {187kick_evt = kick_evt.next_val().fuse() => {188if let Err(e) = kick_evt {189error!("Failed to read kick event for rx queue: {}", e);190break;191}192},193_ = stop_rx => {194break;195}196};197}198Err(e) => {199error!("Failed to process rx queue: {}", e);200break;201}202}203}204queue205}206207/// Platform specific impl of VhostUserDevice::start_queue.208pub(in crate::virtio::vhost_user_backend::net) fn start_queue<T: 'static + IntoAsync + TapT>(209backend: &mut NetBackend<T>,210idx: usize,211queue: virtio::Queue,212_mem: GuestMemory,213) -> anyhow::Result<()> {214if backend.workers[idx].is_some() {215warn!("Starting new queue handler without stopping old handler");216backend.stop_queue(idx)?;217}218219NET_EXECUTOR.with(|ex| {220// Safe because the executor is initialized in main() below.221let ex = ex.get().expect("Executor not initialized");222223let kick_evt = queue224.event()225.try_clone()226.context("failed to clone queue event")?;227let kick_evt =228EventAsync::new(kick_evt, ex).context("failed to create EventAsync for kick_evt")?;229let tap = backend230.tap231.try_clone()232.context("failed to clone tap device")?;233let worker_tuple = match idx {2340 => {235let tap = ex236.async_from(tap)237.context("failed to create async tap device")?;238239let mrg_rxbuf =240(backend.acked_features & 1 << virtio_net::VIRTIO_NET_F_MRG_RXBUF) != 0;241let (stop_tx, stop_rx) = futures::channel::oneshot::channel();242(243ex.spawn_local(run_rx_queue(queue, tap, kick_evt, stop_rx, mrg_rxbuf)),244stop_tx,245)246}2471 => {248let (stop_tx, stop_rx) = futures::channel::oneshot::channel();249(250ex.spawn_local(run_tx_queue(queue, tap, kick_evt, stop_rx)),251stop_tx,252)253}2542 => {255let (stop_tx, stop_rx) = futures::channel::oneshot::channel();256(257ex.spawn_local(run_ctrl_queue(258queue,259tap,260kick_evt,261backend.acked_features,2621, /* vq_pairs */263stop_rx,264)),265stop_tx,266)267}268_ => bail!("attempted to start unknown queue: {}", idx),269};270271backend.workers[idx] = Some(worker_tuple);272Ok(())273})274}275276#[derive(FromArgs)]277#[argh(subcommand, name = "net")]278/// Net device279pub struct Options {280#[argh(option, arg_name = "SOCKET_PATH,IP_ADDR,NET_MASK,MAC_ADDR")]281/// TAP device config. (e.g. "path/to/sock,10.0.2.2,255.255.255.0,12:34:56:78:9a:bc")282device: Vec<String>,283#[argh(option, arg_name = "SOCKET_PATH,TAP_FD")]284/// TAP FD with a socket path"285tap_fd: Vec<String>,286#[argh(option, arg_name = "SOCKET_PATH,TAP_NAME")]287/// TAP NAME with a socket path288tap_name: Vec<String>,289#[argh(switch, arg_name = "MRG_RXBUF")]290/// whether enable MRG_RXBUF feature.291mrg_rxbuf: bool,292}293294enum Connection {295Socket(String),296}297298fn new_backend_from_device_arg(299arg: &str,300mrg_rxbuf: bool,301) -> anyhow::Result<(String, NetBackend<Tap>)> {302let pos = match arg.find(',') {303Some(p) => p,304None => {305bail!("device must take comma-separated argument");306}307};308let conn = &arg[0..pos];309let cfg = &arg[pos + 1..]310.parse::<TapConfig>()311.context("failed to parse tap config")?;312let backend = NetBackend::<Tap>::new_from_config(cfg, mrg_rxbuf)313.context("failed to create NetBackend")?;314Ok((conn.to_string(), backend))315}316317fn new_backend_from_tap_name(318arg: &str,319mrg_rxbuf: bool,320) -> anyhow::Result<(String, NetBackend<Tap>)> {321let pos = match arg.find(',') {322Some(p) => p,323None => {324bail!("device must take comma-separated argument");325}326};327let conn = &arg[0..pos];328let tap_name = &arg[pos + 1..];329330let backend = NetBackend::<Tap>::new_from_name(tap_name, mrg_rxbuf)331.context("failed to create NetBackend")?;332Ok((conn.to_string(), backend))333}334335fn new_backend_from_tapfd_arg(336arg: &str,337mrg_rxbuf: bool,338) -> anyhow::Result<(String, NetBackend<Tap>)> {339let pos = match arg.find(',') {340Some(p) => p,341None => {342bail!("'tap-fd' flag must take comma-separated argument");343}344};345let conn = &arg[0..pos];346let tap_fd = &arg[pos + 1..]347.parse::<i32>()348.context("failed to parse tap-fd")?;349let backend = NetBackend::<Tap>::new_from_tap_fd(*tap_fd, mrg_rxbuf)350.context("failed to create NetBackend")?;351Ok((conn.to_string(), backend))352}353354/// Starts a vhost-user net device.355/// Returns an error if the given `args` is invalid or the device fails to run.356pub fn start_device(opts: Options) -> anyhow::Result<()> {357let num_devices = opts.device.len() + opts.tap_fd.len() + opts.tap_name.len();358359if num_devices == 0 {360bail!("no device option was passed");361}362363let mut devices: Vec<(Connection, NetBackend<Tap>)> = Vec::with_capacity(num_devices);364365// vhost-user366for arg in opts.device.iter() {367devices.push(368new_backend_from_device_arg(arg, opts.mrg_rxbuf)369.map(|(s, backend)| (Connection::Socket(s), backend))?,370);371}372373for arg in opts.tap_name.iter() {374devices.push(375new_backend_from_tap_name(arg, opts.mrg_rxbuf)376.map(|(s, backend)| (Connection::Socket(s), backend))?,377);378}379for arg in opts.tap_fd.iter() {380devices.push(381new_backend_from_tapfd_arg(arg, opts.mrg_rxbuf)382.map(|(s, backend)| (Connection::Socket(s), backend))?,383);384}385386let mut threads = Vec::with_capacity(num_devices);387388for (conn, backend) in devices {389let ex = Executor::new().context("failed to create executor")?;390391match conn {392Connection::Socket(socket) => {393threads.push(thread::spawn(move || {394NET_EXECUTOR.with(|thread_ex| {395let _ = thread_ex.set(ex.clone());396});397let listener = VhostUserListener::new(&socket)?;398// run_until() returns an Result<Result<..>> which the ? operator lets us399// flatten.400ex.run_until(listener.run_backend(backend, &ex))?401}));402}403};404}405406info!("vhost-user net device ready, loop threads started.");407for t in threads {408match t.join() {409Ok(r) => r?,410Err(e) => bail!("thread panicked: {:?}", e),411}412}413Ok(())414}415416417