Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/cloud/adaptors.rs
6939 views
1
//! Interface with the object_store crate: defines [`BlockingCloudWriter`], a synchronous `std::io::Write` adaptor over an async object-store `BufWriter`.
2
3
use std::sync::Arc;
4
5
use object_store::ObjectStore;
6
use object_store::buffered::BufWriter;
7
use object_store::path::Path;
8
use polars_error::PolarsResult;
9
use polars_utils::file::WriteClose;
10
use tokio::io::AsyncWriteExt;
11
12
use super::{CloudOptions, object_path_from_str};
13
use crate::pl_async::{get_runtime, get_upload_chunk_size};
14
15
/// Builds an owned copy of `e` that has the same [`std::io::ErrorKind`] and display message.
///
/// `std::io::Error` is not `Clone`. This lets the writer keep a failure in its state while still
/// handing the original error back to the caller. The copy carries only the kind and the
/// rendered message, not the original's source chain.
fn clone_io_err(e: &std::io::Error) -> std::io::Error {
    let (kind, message) = (e.kind(), e.to_string());
    std::io::Error::new(kind, message)
}
18
19
/// Adaptor which wraps the interface of [ObjectStore::BufWriter] exposing a synchronous interface
20
/// which implements `std::io::Write`.
21
///
22
/// This allows it to be used in sync code which would otherwise write to a simple File or byte stream,
23
/// such as with `polars::prelude::CsvWriter`.
24
///
25
/// [ObjectStore::BufWriter]: https://docs.rs/object_store/latest/object_store/buffered/struct.BufWriter.html
26
pub struct BlockingCloudWriter {
27
state: std::io::Result<BufWriter>,
28
}
29
30
impl BlockingCloudWriter {
31
/// Construct a new BlockingCloudWriter, re-using the given `object_store`
32
///
33
/// Creates a new (current-thread) Tokio runtime
34
/// which bridges the sync writing process with the async ObjectStore multipart uploading.
35
/// TODO: Naming?
36
pub fn new_with_object_store(
37
object_store: Arc<dyn ObjectStore>,
38
path: Path,
39
) -> PolarsResult<Self> {
40
let writer = BufWriter::with_capacity(object_store, path, get_upload_chunk_size());
41
Ok(BlockingCloudWriter { state: Ok(writer) })
42
}
43
44
/// Constructs a new BlockingCloudWriter from a path and an optional set of CloudOptions.
45
///
46
/// Wrapper around `BlockingCloudWriter::new_with_object_store` that is useful if you only have a single write task.
47
/// TODO: Naming?
48
pub async fn new(uri: &str, cloud_options: Option<&CloudOptions>) -> PolarsResult<Self> {
49
if let Some(local_path) = uri.strip_prefix("file://") {
50
// Local paths must be created first, otherwise object store will not write anything.
51
if !matches!(std::fs::exists(local_path), Ok(true)) {
52
panic!("[BlockingCloudWriter] Expected local file to be created: {local_path}");
53
}
54
}
55
56
let (cloud_location, object_store) =
57
crate::cloud::build_object_store(uri, cloud_options, false).await?;
58
Self::new_with_object_store(
59
object_store.to_dyn_object_store().await,
60
object_path_from_str(&cloud_location.prefix)?,
61
)
62
}
63
64
/// Returns the underlying [`object_store::buffered::BufWriter`]
65
pub fn try_into_inner(mut self) -> std::io::Result<BufWriter> {
66
// We can't just return self.state:
67
// * cannot move out of type `adaptors::BlockingCloudWriter`, which implements the `Drop` trait
68
std::mem::replace(&mut self.state, Err(std::io::Error::other("")))
69
}
70
71
/// Closes the writer, or returns the existing error if it exists. After this function is called
72
/// the writer is guaranteed to be in an error state.
73
pub fn close(&mut self) -> std::io::Result<()> {
74
match self.try_with_writer(|writer| get_runtime().block_in_place_on(writer.shutdown())) {
75
Ok(_) => {
76
self.state = Err(std::io::Error::other("closed"));
77
Ok(())
78
},
79
Err(e) => Err(e),
80
}
81
}
82
83
fn try_with_writer<F, O>(&mut self, func: F) -> std::io::Result<O>
84
where
85
F: Fn(&mut BufWriter) -> std::io::Result<O>,
86
{
87
let writer: &mut BufWriter = self.state.as_mut().map_err(|e| clone_io_err(e))?;
88
match func(writer) {
89
Ok(v) => Ok(v),
90
Err(e) => {
91
self.state = Err(clone_io_err(&e));
92
Err(e)
93
},
94
}
95
}
96
}
97
98
impl std::io::Write for BlockingCloudWriter {
99
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
100
// SAFETY:
101
// We extend the lifetime for the duration of this function. This is safe as we block the
102
// async runtime here
103
let buf = unsafe { std::mem::transmute::<&[u8], &'static [u8]>(buf) };
104
105
self.try_with_writer(|writer| {
106
get_runtime()
107
.block_in_place_on(async { writer.write_all(buf).await.map(|_t| buf.len()) })
108
})
109
}
110
111
fn flush(&mut self) -> std::io::Result<()> {
112
self.try_with_writer(|writer| get_runtime().block_in_place_on(writer.flush()))
113
}
114
}
115
116
impl WriteClose for BlockingCloudWriter {
117
fn close(mut self: Box<Self>) -> std::io::Result<()> {
118
BlockingCloudWriter::close(self.as_mut())
119
}
120
}
121
122
impl Drop for BlockingCloudWriter {
123
fn drop(&mut self) {
124
if self.state.is_err() {
125
return;
126
}
127
128
// Note: We should not hit here - the writer should instead be explicitly closed.
129
// But we still have this here as a safety measure to prevent silently dropping errors.
130
match self.close() {
131
Ok(()) => {},
132
e @ Err(_) => {
133
if std::thread::panicking() {
134
eprintln!("ERROR: CloudWriter errored on close: {e:?}")
135
} else {
136
e.unwrap()
137
}
138
},
139
}
140
}
141
}
142
143
#[cfg(test)]
mod tests {

    use polars_core::df;
    use polars_core::prelude::DataFrame;

    /// Small frame with an integer column and a nullable string column.
    fn example_dataframe() -> DataFrame {
        df!(
            "foo" => &[1, 2, 3],
            "bar" => &[None, Some("bak"), Some("baz")],
        )
        .unwrap()
    }

    #[test]
    #[cfg(feature = "csv")]
    fn csv_to_local_objectstore_cloudwriter() {
        use super::*;
        use crate::csv::write::CsvWriter;
        use crate::prelude::SerWriter;

        let mut df = example_dataframe();

        let object_store: Arc<dyn ObjectStore> = Arc::new(
            object_store::local::LocalFileSystem::new_with_prefix(std::env::temp_dir())
                .expect("Could not initialize connection"),
        );

        let path: object_store::path::Path = "cloud_writer_example.csv".into();

        let mut cloud_writer =
            BlockingCloudWriter::new_with_object_store(object_store, path).unwrap();
        CsvWriter::new(&mut cloud_writer)
            .finish(&mut df)
            .expect("Could not write DataFrame as CSV to remote location");

        // Close explicitly so upload errors surface here rather than via the `Drop` fallback.
        cloud_writer.close().unwrap();
    }

    // Skip this test on Windows since it does not have a convenient /tmp/ location.
    #[cfg_attr(target_os = "windows", ignore)]
    #[cfg(feature = "csv")]
    #[test]
    fn cloudwriter_from_cloudlocation_test() {
        use super::*;
        use crate::SerReader;
        use crate::csv::write::CsvWriter;
        use crate::prelude::{CsvReadOptions, SerWriter};

        let mut df = example_dataframe();

        let path = "/tmp/cloud_writer_example2.csv";

        // `BlockingCloudWriter::new` requires `file://` targets to exist beforehand.
        std::fs::File::create(path).unwrap();

        let mut cloud_writer = get_runtime()
            .block_on(BlockingCloudWriter::new(
                format!("file://{path}").as_str(),
                None,
            ))
            .unwrap();

        CsvWriter::new(&mut cloud_writer)
            .finish(&mut df)
            .expect("Could not write DataFrame as CSV to remote location");

        cloud_writer.close().unwrap();

        assert_eq!(
            CsvReadOptions::default()
                .try_into_reader_with_file_path(Some(path.into()))
                .unwrap()
                .finish()
                .unwrap(),
            df
        );
    }
}
219
220