Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
microsoft
GitHub Repository: microsoft/vscode
Path: blob/main/cli/src/util/io.rs
3314 views
1
/*---------------------------------------------------------------------------------------------
2
* Copyright (c) Microsoft Corporation. All rights reserved.
3
* Licensed under the MIT License. See License.txt in the project root for license information.
4
*--------------------------------------------------------------------------------------------*/
5
use std::{
6
fs::File,
7
io::{self, BufRead, Seek},
8
task::Poll,
9
time::Duration,
10
};
11
12
use tokio::{
13
io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
14
sync::mpsc,
15
time::sleep,
16
};
17
18
use super::ring_buffer::RingBuffer;
19
20
pub trait ReportCopyProgress {
21
fn report_progress(&mut self, bytes_so_far: u64, total_bytes: u64);
22
}
23
24
/// Type that doesn't emit anything for download progress.
25
pub struct SilentCopyProgress();
26
27
impl ReportCopyProgress for SilentCopyProgress {
28
fn report_progress(&mut self, _bytes_so_far: u64, _total_bytes: u64) {}
29
}
30
31
/// Copies from the reader to the writer, reporting progress to the provided
32
/// reporter every so often.
33
pub async fn copy_async_progress<T, R, W>(
34
mut reporter: T,
35
reader: &mut R,
36
writer: &mut W,
37
total_bytes: u64,
38
) -> io::Result<u64>
39
where
40
R: AsyncRead + Unpin,
41
W: AsyncWrite + Unpin,
42
T: ReportCopyProgress,
43
{
44
let mut buf = vec![0; 8 * 1024];
45
let mut bytes_so_far = 0;
46
let mut bytes_last_reported = 0;
47
let report_granularity = std::cmp::min(total_bytes / 10, 2 * 1024 * 1024);
48
49
reporter.report_progress(0, total_bytes);
50
51
loop {
52
let read_buf = match reader.read(&mut buf).await {
53
Ok(0) => break,
54
Ok(n) => &buf[..n],
55
Err(e) => return Err(e),
56
};
57
58
writer.write_all(read_buf).await?;
59
60
bytes_so_far += read_buf.len() as u64;
61
if bytes_so_far - bytes_last_reported > report_granularity {
62
bytes_last_reported = bytes_so_far;
63
reporter.report_progress(bytes_so_far, total_bytes);
64
}
65
}
66
67
reporter.report_progress(bytes_so_far, total_bytes);
68
69
Ok(bytes_so_far)
70
}
71
72
/// Helper used when converting Future interfaces to poll-based interfaces.
73
/// Stores excess data that can be reused on future polls.
74
#[derive(Default)]
75
pub(crate) struct ReadBuffer(Option<(Vec<u8>, usize)>);
76
77
impl ReadBuffer {
78
/// Removes any data stored in the read buffer
79
pub fn take_data(&mut self) -> Option<(Vec<u8>, usize)> {
80
self.0.take()
81
}
82
83
/// Writes as many bytes as possible to the readbuf, stashing any extra.
84
pub fn put_data(
85
&mut self,
86
target: &mut tokio::io::ReadBuf<'_>,
87
bytes: Vec<u8>,
88
start: usize,
89
) -> Poll<std::io::Result<()>> {
90
if bytes.is_empty() {
91
self.0 = None;
92
// should not return Ok(), since if nothing is written to the target
93
// it signals EOF. Instead wait for more data from the source.
94
return Poll::Pending;
95
}
96
97
if target.remaining() >= bytes.len() - start {
98
target.put_slice(&bytes[start..]);
99
self.0 = None;
100
} else {
101
let end = start + target.remaining();
102
target.put_slice(&bytes[start..end]);
103
self.0 = Some((bytes, end));
104
}
105
106
Poll::Ready(Ok(()))
107
}
108
}
109
110
#[derive(Debug)]
111
pub enum TailEvent {
112
/// A new line was read from the file. The line includes its trailing newline character.
113
Line(String),
114
/// The file appears to have been rewritten (size shrunk)
115
Reset,
116
/// An error was encountered with the file.
117
Err(io::Error),
118
}
119
120
/// Simple, naive implementation of `tail -f -n <n> <path>`. Uses polling, so
121
/// it's not the fastest, but simple and working for easy cases.
122
pub fn tailf(file: File, n: usize) -> mpsc::UnboundedReceiver<TailEvent> {
123
let (tx, rx) = mpsc::unbounded_channel();
124
let mut last_len = match file.metadata() {
125
Ok(m) => m.len(),
126
Err(e) => {
127
tx.send(TailEvent::Err(e)).ok();
128
return rx;
129
}
130
};
131
132
let mut reader = io::BufReader::new(file);
133
let mut pos = 0;
134
135
// Read the initial "n" lines back from the request. initial_lines
136
// is a small ring buffer.
137
let mut initial_lines = RingBuffer::new(n);
138
loop {
139
let mut line = String::new();
140
let bytes_read = match reader.read_line(&mut line) {
141
Ok(0) => break,
142
Ok(n) => n,
143
Err(e) => {
144
tx.send(TailEvent::Err(e)).ok();
145
return rx;
146
}
147
};
148
149
if !line.ends_with('\n') {
150
// EOF
151
break;
152
}
153
154
pos += bytes_read as u64;
155
initial_lines.push(line);
156
}
157
158
for line in initial_lines.into_iter() {
159
tx.send(TailEvent::Line(line)).ok();
160
}
161
162
// now spawn the poll process to keep reading new lines
163
tokio::spawn(async move {
164
let poll_interval = Duration::from_millis(500);
165
166
loop {
167
tokio::select! {
168
_ = sleep(poll_interval) => {},
169
_ = tx.closed() => return
170
}
171
172
match reader.get_ref().metadata() {
173
Err(e) => {
174
tx.send(TailEvent::Err(e)).ok();
175
return;
176
}
177
Ok(m) => {
178
if m.len() == last_len {
179
continue;
180
}
181
182
if m.len() < last_len {
183
tx.send(TailEvent::Reset).ok();
184
pos = 0;
185
}
186
187
last_len = m.len();
188
}
189
}
190
191
if let Err(e) = reader.seek(io::SeekFrom::Start(pos)) {
192
tx.send(TailEvent::Err(e)).ok();
193
return;
194
}
195
196
loop {
197
let mut line = String::new();
198
let n = match reader.read_line(&mut line) {
199
Ok(0) => break,
200
Ok(n) => n,
201
Err(e) => {
202
tx.send(TailEvent::Err(e)).ok();
203
return;
204
}
205
};
206
207
if n == 0 || !line.ends_with('\n') {
208
break;
209
}
210
211
pos += n as u64;
212
if tx.send(TailEvent::Line(line)).is_err() {
213
return;
214
}
215
}
216
}
217
});
218
219
rx
220
}
221
222
#[cfg(test)]
223
mod tests {
224
use rand::Rng;
225
use std::{fs::OpenOptions, io::Write};
226
227
use super::*;
228
229
#[tokio::test]
230
async fn test_tailf_empty() {
231
let dir = tempfile::tempdir().unwrap();
232
let file_path = dir.path().join("tmp");
233
234
let read_file = OpenOptions::new()
235
.write(true)
236
.read(true)
237
.create(true)
238
.truncate(true)
239
.open(&file_path)
240
.unwrap();
241
242
let mut rx = tailf(read_file, 32);
243
assert!(rx.try_recv().is_err());
244
245
let mut append_file = OpenOptions::new().append(true).open(&file_path).unwrap();
246
writeln!(&mut append_file, "some line").unwrap();
247
248
let recv = rx.recv().await;
249
if let Some(TailEvent::Line(l)) = recv {
250
assert_eq!("some line\n".to_string(), l);
251
} else {
252
unreachable!("expect a line event, got {:?}", recv)
253
}
254
255
write!(&mut append_file, "partial ").unwrap();
256
writeln!(&mut append_file, "line").unwrap();
257
258
let recv = rx.recv().await;
259
if let Some(TailEvent::Line(l)) = recv {
260
assert_eq!("partial line\n".to_string(), l);
261
} else {
262
unreachable!("expect a line event, got {:?}", recv)
263
}
264
}
265
266
#[tokio::test]
267
async fn test_tailf_resets() {
268
let dir = tempfile::tempdir().unwrap();
269
let file_path = dir.path().join("tmp");
270
271
let mut read_file = OpenOptions::new()
272
.write(true)
273
.read(true)
274
.create(true)
275
.truncate(true)
276
.open(&file_path)
277
.unwrap();
278
279
writeln!(&mut read_file, "some existing content").unwrap();
280
let mut rx = tailf(read_file, 0);
281
assert!(rx.try_recv().is_err());
282
283
let mut append_file = File::create(&file_path).unwrap(); // truncates
284
writeln!(&mut append_file, "some line").unwrap();
285
286
let recv = rx.recv().await;
287
if let Some(TailEvent::Reset) = recv {
288
// ok
289
} else {
290
unreachable!("expect a reset event, got {:?}", recv)
291
}
292
293
let recv = rx.recv().await;
294
if let Some(TailEvent::Line(l)) = recv {
295
assert_eq!("some line\n".to_string(), l);
296
} else {
297
unreachable!("expect a line event, got {:?}", recv)
298
}
299
}
300
301
#[tokio::test]
302
async fn test_tailf_with_data() {
303
let dir = tempfile::tempdir().unwrap();
304
let file_path = dir.path().join("tmp");
305
306
let mut read_file = OpenOptions::new()
307
.write(true)
308
.read(true)
309
.create(true)
310
.truncate(true)
311
.open(&file_path)
312
.unwrap();
313
let mut rng = rand::thread_rng();
314
315
let mut written = vec![];
316
let base_line = "Elit ipsum cillum ex cillum. Adipisicing consequat cupidatat do proident ut in sunt Lorem ipsum tempor. Eiusmod ipsum Lorem labore exercitation sunt pariatur excepteur fugiat cillum velit cillum enim. Nisi Lorem cupidatat ad enim velit officia eiusmod esse tempor aliquip. Deserunt pariatur tempor in duis culpa esse sit nulla irure ullamco ipsum voluptate non laboris. Occaecat officia nulla officia mollit do aliquip reprehenderit ad incididunt.";
317
for i in 0..100 {
318
let line = format!("{}: {}", i, &base_line[..rng.gen_range(0..base_line.len())]);
319
writeln!(&mut read_file, "{line}").unwrap();
320
written.push(line);
321
}
322
write!(&mut read_file, "partial line").unwrap();
323
read_file.seek(io::SeekFrom::Start(0)).unwrap();
324
325
let last_n = 32;
326
let mut rx = tailf(read_file, last_n);
327
for i in 0..last_n {
328
let recv = rx.try_recv().unwrap();
329
if let TailEvent::Line(l) = recv {
330
let mut expected = written[written.len() - last_n + i].to_string();
331
expected.push('\n');
332
assert_eq!(expected, l);
333
} else {
334
unreachable!("expect a line event, got {:?}", recv)
335
}
336
}
337
338
assert!(rx.try_recv().is_err());
339
340
let mut append_file = OpenOptions::new().append(true).open(&file_path).unwrap();
341
writeln!(append_file, " is now complete").unwrap();
342
343
let recv = rx.recv().await;
344
if let Some(TailEvent::Line(l)) = recv {
345
assert_eq!("partial line is now complete\n".to_string(), l);
346
} else {
347
unreachable!("expect a line event, got {:?}", recv)
348
}
349
}
350
}
351
352