Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/utils/file.rs
6939 views
1
use std::io;
2
use std::ops::{Deref, DerefMut};
3
use std::path::Path;
4
5
#[cfg(feature = "cloud")]
6
pub use async_writeable::AsyncWriteable;
7
use polars_core::config;
8
use polars_error::{PolarsError, PolarsResult, feature_gated};
9
use polars_utils::create_file;
10
use polars_utils::file::{ClosableFile, WriteClose};
11
use polars_utils::mmap::ensure_not_mapped;
12
use polars_utils::plpath::{CloudScheme, PlPathRef};
13
14
use super::sync_on_close::SyncOnCloseType;
15
use crate::cloud::CloudOptions;
16
use crate::resolve_homedir;
17
18
pub trait DynWriteable: io::Write + Send {
19
// Needed because trait upcasting is only stable in 1.86.
20
fn as_dyn_write(&self) -> &(dyn io::Write + Send + 'static);
21
fn as_mut_dyn_write(&mut self) -> &mut (dyn io::Write + Send + 'static);
22
23
fn close(self: Box<Self>) -> io::Result<()>;
24
fn sync_on_close(&mut self, sync_on_close: SyncOnCloseType) -> io::Result<()>;
25
}
26
27
impl DynWriteable for ClosableFile {
28
fn as_dyn_write(&self) -> &(dyn io::Write + Send + 'static) {
29
self as _
30
}
31
fn as_mut_dyn_write(&mut self) -> &mut (dyn io::Write + Send + 'static) {
32
self as _
33
}
34
fn close(self: Box<Self>) -> io::Result<()> {
35
ClosableFile::close(*self)
36
}
37
fn sync_on_close(&mut self, sync_on_close: SyncOnCloseType) -> io::Result<()> {
38
super::sync_on_close::sync_on_close(sync_on_close, self.as_mut())
39
}
40
}
41
42
/// Holds a non-async writeable file, abstracted over local files or cloud files.
43
///
44
/// This implements `DerefMut` to a trait object implementing [`std::io::Write`].
45
///
46
/// Also see: `Writeable::try_into_async_writeable` and `AsyncWriteable`.
47
#[allow(clippy::large_enum_variant)] // It will be boxed
48
pub enum Writeable {
49
/// An abstract implementation for writable.
50
///
51
/// This is used to implement writing to in-memory and arbitrary file descriptors.
52
Dyn(Box<dyn DynWriteable>),
53
Local(std::fs::File),
54
#[cfg(feature = "cloud")]
55
Cloud(crate::cloud::BlockingCloudWriter),
56
}
57
58
impl Writeable {
59
pub fn try_new(
60
addr: PlPathRef,
61
#[cfg_attr(not(feature = "cloud"), allow(unused))] cloud_options: Option<&CloudOptions>,
62
) -> PolarsResult<Self> {
63
let verbose = config::verbose();
64
65
match addr {
66
PlPathRef::Cloud(p) => {
67
feature_gated!("cloud", {
68
use crate::cloud::BlockingCloudWriter;
69
70
if verbose {
71
eprintln!("Writeable: try_new: cloud: {p}")
72
}
73
74
if p.scheme() == CloudScheme::File {
75
create_file(Path::new(p.strip_scheme()))?;
76
}
77
78
let writer = crate::pl_async::get_runtime().block_in_place_on(
79
BlockingCloudWriter::new(&p.to_string(), cloud_options),
80
)?;
81
Ok(Self::Cloud(writer))
82
})
83
},
84
PlPathRef::Local(path) if config::force_async() => {
85
feature_gated!("cloud", {
86
use crate::cloud::BlockingCloudWriter;
87
88
let path = resolve_homedir(&path);
89
90
if verbose {
91
eprintln!("Writeable: try_new: forced async: {}", path.display())
92
}
93
94
create_file(&path)?;
95
let path = std::fs::canonicalize(&path)?;
96
97
ensure_not_mapped(&path.metadata()?)?;
98
99
let path = format!(
100
"file://{}",
101
if cfg!(target_family = "windows") {
102
path.to_str().unwrap().strip_prefix(r#"\\?\"#).unwrap()
103
} else {
104
path.to_str().unwrap()
105
}
106
);
107
108
if verbose {
109
eprintln!("Writeable: try_new: forced async converted path: {path}")
110
}
111
112
let writer = crate::pl_async::get_runtime()
113
.block_in_place_on(BlockingCloudWriter::new(&path, cloud_options))?;
114
Ok(Self::Cloud(writer))
115
})
116
},
117
PlPathRef::Local(path) => {
118
let path = resolve_homedir(&path);
119
create_file(&path)?;
120
121
// Note: `canonicalize` does not work on some systems.
122
123
if verbose {
124
eprintln!(
125
"Writeable: try_new: local: {} (canonicalize: {:?})",
126
path.display(),
127
std::fs::canonicalize(&path)
128
)
129
}
130
131
Ok(Self::Local(polars_utils::open_file_write(&path)?))
132
},
133
}
134
}
135
136
/// This returns `Result<>` - if a write was performed before calling this,
137
/// `CloudWriter` can be in an Err(_) state.
138
#[cfg(feature = "cloud")]
139
pub fn try_into_async_writeable(self) -> PolarsResult<AsyncWriteable> {
140
use self::async_writeable::AsyncDynWriteable;
141
142
match self {
143
Self::Dyn(v) => Ok(AsyncWriteable::Dyn(AsyncDynWriteable(v))),
144
Self::Local(v) => Ok(AsyncWriteable::Local(tokio::fs::File::from_std(v))),
145
// Moves the `BufWriter` out of the `BlockingCloudWriter` wrapper, as
146
// `BlockingCloudWriter` has a `Drop` impl that we don't want.
147
Self::Cloud(v) => v
148
.try_into_inner()
149
.map(AsyncWriteable::Cloud)
150
.map_err(PolarsError::from),
151
}
152
}
153
154
pub fn sync_on_close(&mut self, sync_on_close: SyncOnCloseType) -> std::io::Result<()> {
155
match self {
156
Writeable::Dyn(d) => d.sync_on_close(sync_on_close),
157
Writeable::Local(file) => {
158
crate::utils::sync_on_close::sync_on_close(sync_on_close, file)
159
},
160
#[cfg(feature = "cloud")]
161
Writeable::Cloud(_) => Ok(()),
162
}
163
}
164
165
pub fn close(self) -> std::io::Result<()> {
166
match self {
167
Self::Dyn(v) => v.close(),
168
Self::Local(v) => ClosableFile::from(v).close(),
169
#[cfg(feature = "cloud")]
170
Self::Cloud(mut v) => v.close(),
171
}
172
}
173
}
174
175
impl Deref for Writeable {
176
type Target = dyn io::Write + Send;
177
178
fn deref(&self) -> &Self::Target {
179
match self {
180
Self::Dyn(v) => v.as_dyn_write(),
181
Self::Local(v) => v,
182
#[cfg(feature = "cloud")]
183
Self::Cloud(v) => v,
184
}
185
}
186
}
187
188
impl DerefMut for Writeable {
189
fn deref_mut(&mut self) -> &mut Self::Target {
190
match self {
191
Self::Dyn(v) => v.as_mut_dyn_write(),
192
Self::Local(v) => v,
193
#[cfg(feature = "cloud")]
194
Self::Cloud(v) => v,
195
}
196
}
197
}
198
199
/// Note: Prefer using [`Writeable`] / [`Writeable::try_new`] where possible.
200
///
201
/// Open a path for writing. Supports cloud paths.
202
pub fn try_get_writeable(
203
addr: PlPathRef<'_>,
204
cloud_options: Option<&CloudOptions>,
205
) -> PolarsResult<Box<dyn WriteClose + Send>> {
206
Writeable::try_new(addr, cloud_options).map(|x| match x {
207
Writeable::Dyn(_) => unreachable!(),
208
Writeable::Local(v) => Box::new(ClosableFile::from(v)) as Box<dyn WriteClose + Send>,
209
#[cfg(feature = "cloud")]
210
Writeable::Cloud(v) => Box::new(v) as Box<dyn WriteClose + Send>,
211
})
212
}
213
214
#[cfg(feature = "cloud")]
mod async_writeable {
    use std::io;
    use std::ops::{Deref, DerefMut};
    use std::pin::Pin;
    use std::task::{Context, Poll};

    use polars_error::{PolarsError, PolarsResult};
    use polars_utils::file::ClosableFile;
    use polars_utils::plpath::PlPathRef;
    use tokio::io::AsyncWriteExt;
    use tokio::task;

    use super::{DynWriteable, Writeable};
    use crate::cloud::CloudOptions;
    use crate::utils::sync_on_close::SyncOnCloseType;

    /// Turns an abstract [`io::Write`] ([`DynWriteable`]) into an abstract
    /// [`tokio::io::AsyncWrite`].
    ///
    /// Each poll runs the blocking operation to completion inside [`task::block_in_place`], so
    /// polls always return `Poll::Ready`. The Tokio docs state that `block_in_place` panics on a
    /// current-thread runtime, so this must be driven from a multi-threaded runtime.
    pub struct AsyncDynWriteable(pub Box<dyn DynWriteable>);

    impl tokio::io::AsyncWrite for AsyncDynWriteable {
        fn poll_write(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<io::Result<usize>> {
            let result = task::block_in_place(|| self.get_mut().0.write(buf));
            Poll::Ready(result)
        }

        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            let result = task::block_in_place(|| self.get_mut().0.flush());
            Poll::Ready(result)
        }

        /// Only flushes; the wrapped writer is closed by [`AsyncWriteable::close`].
        fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            self.poll_flush(cx)
        }
    }

    /// Holds an async writeable file, abstracted over local files or cloud files.
    ///
    /// This implements `DerefMut` to a trait object implementing [`tokio::io::AsyncWrite`].
    ///
    /// Note: It is important that you do not call `shutdown()` on the deref'ed `AsyncWrite` object.
    /// You should instead call the [`AsyncWriteable::close`] at the end.
    pub enum AsyncWriteable {
        /// A blocking type-erased writer adapted to `AsyncWrite`.
        Dyn(AsyncDynWriteable),
        /// A local file.
        Local(tokio::fs::File),
        /// An object-store buffered writer.
        Cloud(object_store::buffered::BufWriter),
    }

    impl AsyncWriteable {
        /// Opens `addr` for writing; see `Writeable::try_new` for how paths are dispatched.
        ///
        /// Note: this currently delegates to the blocking `Writeable::try_new`.
        pub async fn try_new(
            addr: PlPathRef<'_>,
            cloud_options: Option<&CloudOptions>,
        ) -> PolarsResult<Self> {
            // TODO: Native async impl
            Writeable::try_new(addr, cloud_options).and_then(|x| x.try_into_async_writeable())
        }

        /// Applies the requested [`SyncOnCloseType`] to the underlying sink.
        ///
        /// This is a no-op for cloud writers.
        pub async fn sync_on_close(
            &mut self,
            sync_on_close: SyncOnCloseType,
        ) -> std::io::Result<()> {
            match self {
                Self::Dyn(d) => task::block_in_place(|| d.0.sync_on_close(sync_on_close)),
                Self::Local(file) => {
                    crate::utils::sync_on_close::tokio_sync_on_close(sync_on_close, file).await
                },
                Self::Cloud(_) => Ok(()),
            }
        }

        /// Flushes and closes the writer, surfacing any pending write or close error.
        pub async fn close(self) -> PolarsResult<()> {
            match self {
                Self::Dyn(mut v) => {
                    v.shutdown().await.map_err(PolarsError::from)?;
                    Ok(task::block_in_place(|| v.0.close())?)
                },
                Self::Local(mut v) => {
                    // `tokio::fs::File` completes writes in the background and only reports a
                    // failed write on a later operation. `into_std` waits for in-flight writes
                    // but drops such an error, so flush first to surface it.
                    v.flush().await?;
                    let f = v.into_std().await;
                    ClosableFile::from(f).close().map_err(PolarsError::from)
                },
                Self::Cloud(mut v) => v.shutdown().await.map_err(PolarsError::from),
            }
        }
    }

    impl Deref for AsyncWriteable {
        type Target = dyn tokio::io::AsyncWrite + Send + Unpin;

        fn deref(&self) -> &Self::Target {
            match self {
                Self::Dyn(v) => v,
                Self::Local(v) => v,
                Self::Cloud(v) => v,
            }
        }
    }

    impl DerefMut for AsyncWriteable {
        fn deref_mut(&mut self) -> &mut Self::Target {
            match self {
                Self::Dyn(v) => v,
                Self::Local(v) => v,
                Self::Cloud(v) => v,
            }
        }
    }
}
327
328