Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
google
GitHub Repository: google/crosvm
Path: blob/main/fuse/src/worker.rs
5394 views
1
// Copyright 2020 The ChromiumOS Authors
2
// Use of this source code is governed by a BSD-style license that can be
3
// found in the LICENSE file.
4
5
use std::fs::File;
6
use std::io;
7
use std::io::BufRead;
8
use std::io::BufReader;
9
use std::io::Cursor;
10
use std::io::Read;
11
use std::io::Write;
12
use std::mem::size_of;
13
use std::os::unix::fs::FileExt;
14
use std::os::unix::io::AsRawFd;
15
use std::sync::Arc;
16
17
use base::Protection;
18
19
use crate::filesystem::FileSystem;
20
use crate::filesystem::ZeroCopyReader;
21
use crate::filesystem::ZeroCopyWriter;
22
use crate::server::Mapper;
23
use crate::server::Reader;
24
use crate::server::Server;
25
use crate::server::Writer;
26
use crate::sys;
27
use crate::Error;
28
use crate::Result;
29
30
struct DevFuseReader {
31
// File representing /dev/fuse for reading, with sufficient buffer to accommodate a FUSE read
32
// transaction.
33
reader: BufReader<File>,
34
}
35
36
impl DevFuseReader {
37
pub fn new(reader: BufReader<File>) -> Self {
38
DevFuseReader { reader }
39
}
40
41
fn drain(&mut self) {
42
self.reader.consume(self.reader.buffer().len());
43
}
44
}
45
46
impl Read for DevFuseReader {
47
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
48
self.reader.read(buf)
49
}
50
}
51
52
impl Reader for DevFuseReader {}
53
54
impl ZeroCopyReader for DevFuseReader {
55
fn read_to(&mut self, f: &mut File, count: usize, off: u64) -> io::Result<usize> {
56
let buf = self.reader.fill_buf()?;
57
let end = std::cmp::min(count, buf.len());
58
let written = f.write_at(&buf[..end], off)?;
59
self.reader.consume(written);
60
Ok(written)
61
}
62
}
63
64
struct DevFuseWriter {
65
// File representing /dev/fuse for writing.
66
dev_fuse: File,
67
68
// An internal buffer to allow generating data and header out of order, such that they can be
69
// flushed at once. This is wrapped by a cursor for tracking the current written position.
70
write_buf: Cursor<Vec<u8>>,
71
}
72
73
impl DevFuseWriter {
74
pub fn new(dev_fuse: File, write_buf: Cursor<Vec<u8>>) -> Self {
75
debug_assert_eq!(write_buf.position(), 0);
76
77
DevFuseWriter {
78
dev_fuse,
79
write_buf,
80
}
81
}
82
}
83
84
impl Write for DevFuseWriter {
85
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
86
self.write_buf.write(buf)
87
}
88
89
fn flush(&mut self) -> io::Result<()> {
90
self.dev_fuse.write_all(&self.write_buf.get_ref()[..])?;
91
self.write_buf.set_position(0);
92
self.write_buf.get_mut().clear();
93
Ok(())
94
}
95
}
96
97
impl Writer for DevFuseWriter {
98
type ClosureWriter = Self;
99
100
fn write_at<F>(&mut self, offset: usize, f: F) -> io::Result<usize>
101
where
102
F: Fn(&mut Self) -> io::Result<usize>,
103
{
104
// Restore the cursor for idempotent.
105
let original = self.write_buf.position();
106
self.write_buf.set_position(offset as u64);
107
let r = f(self);
108
self.write_buf.set_position(original);
109
r
110
}
111
112
fn has_sufficient_buffer(&self, size: u32) -> bool {
113
(self.write_buf.position() as usize + size as usize) < self.write_buf.get_ref().capacity()
114
}
115
}
116
117
impl ZeroCopyWriter for DevFuseWriter {
118
fn write_from(&mut self, f: &mut File, count: usize, off: u64) -> io::Result<usize> {
119
let pos = self.write_buf.position() as usize;
120
let end = pos + count;
121
let buf = self.write_buf.get_mut();
122
123
let old_end = buf.len();
124
buf.resize(end, 0);
125
let read = f.read_at(&mut buf[pos..end], off)?;
126
127
let new_end = pos + read;
128
debug_assert!(new_end >= old_end);
129
buf.truncate(new_end);
130
self.write_buf.set_position(new_end as u64);
131
Ok(read)
132
}
133
}
134
135
struct DevFuseMapper;
136
137
impl DevFuseMapper {
138
fn new() -> Self {
139
Self {}
140
}
141
}
142
143
impl Mapper for DevFuseMapper {
144
fn map(
145
&self,
146
_mem_offset: u64,
147
_size: usize,
148
_fd: &dyn AsRawFd,
149
_file_offset: u64,
150
_prot: Protection,
151
) -> io::Result<()> {
152
Err(io::Error::from_raw_os_error(libc::EOPNOTSUPP))
153
}
154
155
fn unmap(&self, _offset: u64, _size: u64) -> io::Result<()> {
156
Err(io::Error::from_raw_os_error(libc::EOPNOTSUPP))
157
}
158
}
159
160
/// Start the FUSE message handling loop. Returns when an error happens.
161
///
162
/// # Arguments
163
///
164
/// * `dev_fuse` - A `File` object of /dev/fuse
165
/// * `input_buffer_size` - Maximum bytes of the buffer when reads from /dev/fuse.
166
/// * `output_buffer_size` - Maximum bytes of the buffer when writes to /dev/fuse. Must be large
167
/// enough (usually equal) to `n` in `MountOption::MaxRead(n)`.
168
///
169
/// [deprecated(note="Please migrate to the `FuseConfig` builder API"]
170
pub fn start_message_loop<F: FileSystem + Sync>(
171
dev_fuse: File,
172
input_buffer_size: u32,
173
output_buffer_size: u32,
174
fs: F,
175
) -> Result<()> {
176
let server = Server::new(fs);
177
do_start_message_loop(dev_fuse, input_buffer_size, output_buffer_size, &server)
178
}
179
180
fn do_start_message_loop<F: FileSystem + Sync>(
181
dev_fuse: File,
182
input_buffer_size: u32,
183
output_buffer_size: u32,
184
server: &Server<F>,
185
) -> Result<()> {
186
let mut dev_fuse_reader = {
187
let rfile = dev_fuse.try_clone().map_err(Error::EndpointSetup)?;
188
let buf_reader = BufReader::with_capacity(
189
input_buffer_size as usize + size_of::<sys::InHeader>() + size_of::<sys::WriteIn>(),
190
rfile,
191
);
192
DevFuseReader::new(buf_reader)
193
};
194
let mut dev_fuse_writer = {
195
let wfile = dev_fuse;
196
let write_buf = Cursor::new(Vec::with_capacity(output_buffer_size as usize));
197
DevFuseWriter::new(wfile, write_buf)
198
};
199
let dev_fuse_mapper = DevFuseMapper::new();
200
loop {
201
server.handle_message(&mut dev_fuse_reader, &mut dev_fuse_writer, &dev_fuse_mapper)?;
202
203
// Since we're reusing the buffer to avoid repeated allocation, drain the possible
204
// residual from the buffer.
205
dev_fuse_reader.drain();
206
}
207
}
208
209
// TODO: Remove worker and this namespace from public
210
pub mod internal {
211
use crossbeam_utils::thread;
212
213
use super::*;
214
215
/// Start the FUSE message handling loops in multiple threads. Returns when an error happens.
216
///
217
/// # Arguments
218
///
219
/// * `dev_fuse` - A `File` object of /dev/fuse
220
/// * `input_buffer_size` - Maximum bytes of the buffer when reads from /dev/fuse.
221
/// * `output_buffer_size` - Maximum bytes of the buffer when writes to /dev/fuse.
222
///
223
/// [deprecated(note="Please migrate to the `FuseConfig` builder API"]
224
pub fn start_message_loop_mt<F: FileSystem + Sync + Send>(
225
dev_fuse: File,
226
input_buffer_size: u32,
227
output_buffer_size: u32,
228
thread_numbers: usize,
229
fs: F,
230
) -> Result<()> {
231
let result = thread::scope(|s| {
232
let server = Arc::new(Server::new(fs));
233
for _ in 0..thread_numbers {
234
let dev_fuse = dev_fuse
235
.try_clone()
236
.map_err(Error::EndpointSetup)
237
.expect("Failed to clone /dev/fuse FD");
238
let server = server.clone();
239
s.spawn(move |_| {
240
do_start_message_loop(dev_fuse, input_buffer_size, output_buffer_size, &server)
241
});
242
}
243
});
244
245
unreachable!("Threads exited or crashed unexpectedly: {:?}", result);
246
}
247
}
248
249