Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/cloud/cloud_writer/writer.rs
8472 views
1
use std::num::NonZeroUsize;
2
use std::sync::Arc;
3
4
use bytes::Bytes;
5
use polars_error::PolarsResult;
6
7
use crate::cloud::PolarsObjectStore;
8
use crate::cloud::cloud_writer::bufferer::BytesBufferer;
9
use crate::cloud::cloud_writer::internal_writer::{InternalCloudWriter, InternalCloudWriterState};
10
use crate::metrics::{IOMetrics, OptIOMetrics};
11
12
pub struct CloudWriter {
13
writer: InternalCloudWriter,
14
bufferer: BytesBufferer,
15
}
16
17
impl CloudWriter {
18
pub fn new(
19
store: PolarsObjectStore,
20
path: object_store::path::Path,
21
upload_chunk_size: usize,
22
max_concurrency: NonZeroUsize,
23
io_metrics: Option<Arc<IOMetrics>>,
24
) -> Self {
25
let bufferer = BytesBufferer::new(upload_chunk_size);
26
27
Self {
28
writer: InternalCloudWriter {
29
store,
30
path,
31
max_concurrency,
32
io_metrics: OptIOMetrics(io_metrics),
33
state: InternalCloudWriterState::NotStarted,
34
},
35
bufferer,
36
}
37
}
38
39
pub async fn start(&mut self) -> PolarsResult<()> {
40
self.writer.start().await
41
}
42
43
pub async fn write_all_owned(&mut self, mut bytes: Bytes) -> PolarsResult<()> {
44
while !bytes.is_empty() {
45
self.bufferer.push_owned(&mut bytes);
46
47
if let Some(payload) = self.bufferer.flush_full_chunk() {
48
self.writer.put(payload).await?;
49
}
50
}
51
52
Ok(())
53
}
54
55
pub(super) fn fill_buffer_from_slice(&mut self, bytes: &mut &[u8]) -> bool {
56
self.bufferer.push_slice(bytes);
57
self.bufferer.is_full()
58
}
59
60
pub(super) async fn flush_full_chunk(&mut self) -> PolarsResult<()> {
61
if let Some(payload) = self.bufferer.flush_full_chunk() {
62
self.writer.put(payload).await?;
63
}
64
65
Ok(())
66
}
67
68
pub(super) async fn flush(&mut self) -> PolarsResult<()> {
69
if let Some(payload) = self.bufferer.flush() {
70
self.writer.put(payload).await?;
71
}
72
73
assert!(self.bufferer.is_empty());
74
75
Ok(())
76
}
77
78
pub(super) fn has_buffered_bytes(&self) -> bool {
79
!self.bufferer.is_empty()
80
}
81
82
pub async fn finish(&mut self) -> PolarsResult<()> {
83
if let Some(payload) = self.bufferer.flush() {
84
self.writer.put(payload).await?;
85
}
86
87
assert!(self.bufferer.is_empty());
88
89
self.writer.finish().await
90
}
91
}
92
93