Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/utils/file.rs
8422 views
1
use std::io;
2
#[cfg(feature = "cloud")]
3
use std::num::NonZeroUsize;
4
use std::ops::{Deref, DerefMut};
5
use std::sync::Arc;
6
7
#[cfg(feature = "cloud")]
8
pub use async_writeable::{AsyncDynWriteable, AsyncWriteable};
9
use polars_error::{PolarsResult, feature_gated, polars_err};
10
use polars_utils::create_file;
11
use polars_utils::file::close_file;
12
use polars_utils::mmap::ensure_not_mapped;
13
use polars_utils::pl_path::{PlRefPath, format_file_uri};
14
15
use super::sync_on_close::SyncOnCloseType;
16
use crate::cloud::CloudOptions;
17
use crate::metrics::IOMetrics;
18
use crate::resolve_homedir;
19
20
/// A blocking [`std::io::Write`] sink that can additionally be synced and explicitly closed.
///
/// This is the object stored in `Writeable::Dyn`, used for in-memory targets and arbitrary file
/// descriptors.
// TODO document precise contract.
pub trait WriteableTrait: std::io::Write {
    /// Closes the writer, returning any error raised while doing so.
    fn close(&mut self) -> std::io::Result<()>;

    /// Counterpart of `std::fs::File::sync_all` for this writer.
    fn sync_all(&self) -> std::io::Result<()>;

    /// Counterpart of `std::fs::File::sync_data` for this writer.
    fn sync_data(&self) -> std::io::Result<()>;
}
26
27
/// Holds a non-async writeable file, abstracted over local files or cloud files.
28
///
29
/// This implements `DerefMut` to a trait object implementing [`std::io::Write`].
30
///
31
/// Also see: `Writeable::try_into_async_writeable` and `AsyncWriteable`.
32
#[allow(clippy::large_enum_variant)] // It will be boxed
33
pub enum Writeable {
34
/// An abstract implementation for writable.
35
///
36
/// This is used to implement writing to in-memory and arbitrary file descriptors.
37
Dyn(Box<dyn WriteableTrait + Send>),
38
Local(std::fs::File),
39
#[cfg(feature = "cloud")]
40
Cloud(crate::cloud::cloud_writer::CloudWriterIoTraitWrap),
41
}
42
43
impl Writeable {
44
pub fn try_new(
45
path: PlRefPath,
46
#[cfg_attr(not(feature = "cloud"), expect(unused))] cloud_options: Option<&CloudOptions>,
47
#[cfg_attr(not(feature = "cloud"), expect(unused))] cloud_upload_chunk_size: usize,
48
#[cfg_attr(not(feature = "cloud"), expect(unused))] cloud_upload_concurrency: usize,
49
io_metrics: Option<Arc<IOMetrics>>,
50
) -> PolarsResult<Self> {
51
Ok(if path.has_scheme() {
52
feature_gated!("cloud", {
53
use crate::cloud::cloud_writer::CloudWriterIoTraitWrap;
54
use crate::pl_async::get_runtime;
55
56
let writer = get_runtime().block_in_place_on(new_cloud_writer(
57
path,
58
cloud_options,
59
cloud_upload_chunk_size,
60
cloud_upload_concurrency.try_into().unwrap(),
61
io_metrics,
62
))?;
63
64
Self::Cloud(CloudWriterIoTraitWrap::from(writer))
65
})
66
} else if polars_config::config().force_async() {
67
feature_gated!("cloud", {
68
let path = resolve_homedir(path.as_std_path());
69
create_file(&path)?;
70
let path = std::fs::canonicalize(&path)?;
71
72
ensure_not_mapped(&path.metadata()?)?;
73
74
let path = path.to_str().ok_or_else(|| polars_err!(non_utf8_path))?;
75
let path = format_file_uri(path);
76
77
use crate::cloud::cloud_writer::CloudWriterIoTraitWrap;
78
use crate::pl_async::get_runtime;
79
80
let writer = get_runtime().block_in_place_on(new_cloud_writer(
81
path,
82
cloud_options,
83
cloud_upload_chunk_size,
84
cloud_upload_concurrency.try_into().unwrap(),
85
io_metrics,
86
))?;
87
88
Self::Cloud(CloudWriterIoTraitWrap::from(writer))
89
})
90
} else {
91
let path = resolve_homedir(path.as_std_path());
92
create_file(&path)?;
93
94
Self::Local(polars_utils::open_file_write(&path)?)
95
})
96
}
97
98
/// This returns `Result<>` - if a write was performed before calling this,
99
/// `CloudWriter` can be in an Err(_) state.
100
#[cfg(feature = "cloud")]
101
pub fn try_into_async_writeable(self) -> PolarsResult<AsyncWriteable> {
102
use self::async_writeable::AsyncDynWriteable;
103
104
match self {
105
Self::Dyn(v) => Ok(AsyncWriteable::Dyn(AsyncDynWriteable(v))),
106
Self::Local(v) => Ok(AsyncWriteable::Local(tokio::fs::File::from_std(v))),
107
Self::Cloud(v) => Ok(AsyncWriteable::Cloud(v)),
108
}
109
}
110
111
pub fn as_buffered(&mut self) -> BufferedWriteable<'_> {
112
match self {
113
Writeable::Dyn(v) => BufferedWriteable::BufWriter(std::io::BufWriter::new(v.as_mut())),
114
Writeable::Local(v) => BufferedWriteable::BufWriter(std::io::BufWriter::new(v)),
115
#[cfg(feature = "cloud")]
116
Writeable::Cloud(v) => BufferedWriteable::Direct(v as _),
117
}
118
}
119
120
pub fn sync_all(&self) -> io::Result<()> {
121
match self {
122
Self::Dyn(v) => v.sync_all(),
123
Self::Local(v) => v.sync_all(),
124
#[cfg(feature = "cloud")]
125
Self::Cloud(v) => v.sync_all(),
126
}
127
}
128
129
pub fn sync_data(&self) -> io::Result<()> {
130
match self {
131
Self::Dyn(v) => v.sync_data(),
132
Self::Local(v) => v.sync_data(),
133
#[cfg(feature = "cloud")]
134
Self::Cloud(v) => v.sync_data(),
135
}
136
}
137
138
pub fn close(self, sync: SyncOnCloseType) -> std::io::Result<()> {
139
match sync {
140
SyncOnCloseType::All => self.sync_all()?,
141
SyncOnCloseType::Data => self.sync_data()?,
142
SyncOnCloseType::None => {},
143
}
144
145
match self {
146
Self::Dyn(mut v) => v.close(),
147
Self::Local(v) => close_file(v),
148
#[cfg(feature = "cloud")]
149
Self::Cloud(mut v) => v.close(),
150
}
151
}
152
}
153
154
impl io::Write for Writeable {
155
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
156
match self {
157
Self::Dyn(v) => v.write(buf),
158
Self::Local(v) => v.write(buf),
159
#[cfg(feature = "cloud")]
160
Self::Cloud(v) => v.write(buf),
161
}
162
}
163
164
fn flush(&mut self) -> io::Result<()> {
165
self.sync_all()
166
}
167
}
168
169
impl Deref for Writeable {
170
type Target = dyn io::Write + Send;
171
172
fn deref(&self) -> &Self::Target {
173
match self {
174
Self::Dyn(v) => v,
175
Self::Local(v) => v,
176
#[cfg(feature = "cloud")]
177
Self::Cloud(v) => v,
178
}
179
}
180
}
181
182
impl DerefMut for Writeable {
183
fn deref_mut(&mut self) -> &mut Self::Target {
184
match self {
185
Self::Dyn(v) => v,
186
Self::Local(v) => v,
187
#[cfg(feature = "cloud")]
188
Self::Cloud(v) => v,
189
}
190
}
191
}
192
193
/// A borrowed writer that is wrapped in a [`std::io::BufWriter`] only when that helps.
///
/// Writers that already buffer internally are used as-is (`Direct`), which avoids double
/// buffering. Both variants dereference to a `dyn io::Write + Send` trait object.
pub enum BufferedWriteable<'a> {
    /// Goes through a `BufWriter`.
    ///
    /// Dropping it flushes pending bytes but discards any flush error, so call `flush()`
    /// explicitly.
    BufWriter(std::io::BufWriter<&'a mut (dyn std::io::Write + Send)>),
    /// Used directly, with no extra buffering layer.
    Direct(&'a mut (dyn std::io::Write + Send)),
}

impl<'a> Deref for BufferedWriteable<'a> {
    type Target = dyn io::Write + Send + 'a;

    fn deref(&self) -> &Self::Target {
        let writer: &Self::Target = match self {
            Self::Direct(direct) => direct,
            Self::BufWriter(buffered) => buffered,
        };
        writer
    }
}

impl DerefMut for BufferedWriteable<'_> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        let writer: &mut Self::Target = match self {
            Self::Direct(direct) => direct,
            Self::BufWriter(buffered) => buffered,
        };
        writer
    }
}
218
219
/// Builds an object store for `path`, creates a `CloudWriter` on it, and starts the writer
/// before returning it.
///
/// # Errors
///
/// Returns an error if building the object store, parsing the object path from the location
/// prefix, or starting the writer fails.
#[cfg(feature = "cloud")]
async fn new_cloud_writer(
    path: PlRefPath,
    cloud_options: Option<&CloudOptions>,
    cloud_upload_chunk_size: usize,
    cloud_upload_concurrency: NonZeroUsize,
    io_metrics: Option<Arc<IOMetrics>>,
) -> PolarsResult<crate::cloud::cloud_writer::CloudWriter> {
    use crate::cloud::cloud_writer::CloudWriter;
    use crate::cloud::{build_object_store, object_path_from_str};

    let (location, store) = build_object_store(path, cloud_options, false).await?;
    let object_path = object_path_from_str(&location.prefix)?;

    let mut writer = CloudWriter::new(
        store,
        object_path,
        cloud_upload_chunk_size,
        cloud_upload_concurrency,
        io_metrics,
    );
    writer.start().await?;

    Ok(writer)
}
245
246
#[cfg(feature = "cloud")]
mod async_writeable {
    use std::io;
    use std::ops::{Deref, DerefMut};
    use std::pin::Pin;
    use std::sync::Arc;
    use std::task::{Context, Poll};

    use bytes::Bytes;
    use polars_error::{PolarsError, PolarsResult};
    use polars_utils::file::close_file;
    use polars_utils::pl_path::PlRefPath;
    use tokio::io::AsyncWriteExt;
    use tokio::task;

    use super::{Writeable, WriteableTrait};
    use crate::cloud::CloudOptions;
    use crate::metrics::IOMetrics;
    use crate::utils::sync_on_close::SyncOnCloseType;

    /// Adapts a blocking [`WriteableTrait`] object to [`tokio::io::AsyncWrite`].
    ///
    /// Each poll runs the blocking call inside [`task::block_in_place`] and reports its result
    /// immediately, so these polls never return `Poll::Pending`.
    pub struct AsyncDynWriteable(pub Box<dyn WriteableTrait + Send>);

    impl tokio::io::AsyncWrite for AsyncDynWriteable {
        fn poll_write(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<io::Result<usize>> {
            let this = self.get_mut();
            Poll::Ready(task::block_in_place(|| this.0.write(buf)))
        }

        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            let this = self.get_mut();
            Poll::Ready(task::block_in_place(|| this.0.flush()))
        }

        /// Only flushes; the inner writer itself is closed by `AsyncWriteable::close`.
        fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            self.poll_flush(cx)
        }
    }

    /// An async writer over a local file, a cloud object, or an adapted [`WriteableTrait`]
    /// object.
    ///
    /// It dereferences, mutably as well, to a `dyn tokio::io::AsyncWrite` trait object.
    ///
    /// Note: Do not call `shutdown()` on the dereferenced `AsyncWrite` object. Always finish
    /// with [`AsyncWriteable::close`] instead.
    pub enum AsyncWriteable {
        /// A blocking writer driven through [`AsyncDynWriteable`].
        Dyn(AsyncDynWriteable),
        /// A local file.
        Local(tokio::fs::File),
        /// A writer that uploads to cloud object storage.
        Cloud(crate::cloud::cloud_writer::CloudWriterIoTraitWrap),
    }

    impl AsyncWriteable {
        /// Async counterpart of [`Writeable::try_new`], taking the same arguments.
        ///
        /// The writer is currently opened through the blocking [`Writeable::try_new`] and then
        /// converted with [`Writeable::try_into_async_writeable`].
        pub async fn try_new(
            path: PlRefPath,
            cloud_options: Option<&CloudOptions>,
            cloud_upload_chunk_size: usize,
            cloud_upload_concurrency: usize,
            io_metrics: Option<Arc<IOMetrics>>,
        ) -> PolarsResult<Self> {
            // TODO: Native async impl
            let blocking = Writeable::try_new(
                path,
                cloud_options,
                cloud_upload_chunk_size,
                cloud_upload_concurrency,
                io_metrics,
            )?;
            blocking.try_into_async_writeable()
        }

        /// Writes all of `src`.
        ///
        /// For cloud writers, the contents are moved out with `std::mem::take`, which leaves
        /// `T::default()` in `src`. For every other writer type, `src` is left unmodified.
        pub async fn write_all_owned<T>(&mut self, src: &mut T) -> io::Result<()>
        where
            T: AsRef<[u8]> + Default + Drop, // `Drop` is to exclude `&[u8]` slices.
            Bytes: From<T>,
        {
            if let Self::Cloud(cloud) = self {
                let owned = Bytes::from(std::mem::take(src));
                return cloud.write_all_owned(owned).await;
            }

            self.write_all(src.as_ref()).await
        }

        /// Syncs data and metadata.
        ///
        /// For cloud writers this does nothing and returns `Ok(())`.
        pub async fn sync_all(&mut self) -> io::Result<()> {
            match self {
                Self::Cloud(_) => Ok(()),
                Self::Local(file) => file.sync_all().await,
                Self::Dyn(dyn_writer) => {
                    let inner = &*dyn_writer.0;
                    task::block_in_place(|| inner.sync_all())
                },
            }
        }

        /// Syncs data.
        ///
        /// For cloud writers this does nothing and returns `Ok(())`.
        pub async fn sync_data(&mut self) -> io::Result<()> {
            match self {
                Self::Cloud(_) => Ok(()),
                Self::Local(file) => file.sync_data().await,
                Self::Dyn(dyn_writer) => {
                    let inner = &*dyn_writer.0;
                    task::block_in_place(|| inner.sync_data())
                },
            }
        }

        /// Optionally syncs according to `sync`, then shuts down and closes the writer.
        pub async fn close(mut self, sync: SyncOnCloseType) -> PolarsResult<()> {
            match sync {
                SyncOnCloseType::None => {},
                SyncOnCloseType::Data => self.sync_data().await?,
                SyncOnCloseType::All => self.sync_all().await?,
            }

            match self {
                Self::Dyn(mut dyn_writer) => {
                    dyn_writer.shutdown().await.map_err(PolarsError::from)?;
                    task::block_in_place(|| dyn_writer.0.close()).map_err(PolarsError::from)
                },
                Self::Local(file) => {
                    let std_file = file.into_std().await;
                    close_file(std_file).map_err(PolarsError::from)
                },
                Self::Cloud(mut cloud) => cloud.shutdown().await.map_err(PolarsError::from),
            }
        }
    }

    impl Deref for AsyncWriteable {
        type Target = dyn tokio::io::AsyncWrite + Send + Unpin;

        fn deref(&self) -> &Self::Target {
            let writer: &Self::Target = match self {
                Self::Cloud(cloud) => cloud,
                Self::Local(file) => file,
                Self::Dyn(dyn_writer) => dyn_writer,
            };
            writer
        }
    }

    impl DerefMut for AsyncWriteable {
        fn deref_mut(&mut self) -> &mut Self::Target {
            let writer: &mut Self::Target = match self {
                Self::Cloud(cloud) => cloud,
                Self::Local(file) => file,
                Self::Dyn(dyn_writer) => dyn_writer,
            };
            writer
        }
    }
}
394
395