Path: blob/main/crates/polars-io/src/cloud/cloud_writer/writer.rs
8472 views
use std::num::NonZeroUsize;1use std::sync::Arc;23use bytes::Bytes;4use polars_error::PolarsResult;56use crate::cloud::PolarsObjectStore;7use crate::cloud::cloud_writer::bufferer::BytesBufferer;8use crate::cloud::cloud_writer::internal_writer::{InternalCloudWriter, InternalCloudWriterState};9use crate::metrics::{IOMetrics, OptIOMetrics};1011pub struct CloudWriter {12writer: InternalCloudWriter,13bufferer: BytesBufferer,14}1516impl CloudWriter {17pub fn new(18store: PolarsObjectStore,19path: object_store::path::Path,20upload_chunk_size: usize,21max_concurrency: NonZeroUsize,22io_metrics: Option<Arc<IOMetrics>>,23) -> Self {24let bufferer = BytesBufferer::new(upload_chunk_size);2526Self {27writer: InternalCloudWriter {28store,29path,30max_concurrency,31io_metrics: OptIOMetrics(io_metrics),32state: InternalCloudWriterState::NotStarted,33},34bufferer,35}36}3738pub async fn start(&mut self) -> PolarsResult<()> {39self.writer.start().await40}4142pub async fn write_all_owned(&mut self, mut bytes: Bytes) -> PolarsResult<()> {43while !bytes.is_empty() {44self.bufferer.push_owned(&mut bytes);4546if let Some(payload) = self.bufferer.flush_full_chunk() {47self.writer.put(payload).await?;48}49}5051Ok(())52}5354pub(super) fn fill_buffer_from_slice(&mut self, bytes: &mut &[u8]) -> bool {55self.bufferer.push_slice(bytes);56self.bufferer.is_full()57}5859pub(super) async fn flush_full_chunk(&mut self) -> PolarsResult<()> {60if let Some(payload) = self.bufferer.flush_full_chunk() {61self.writer.put(payload).await?;62}6364Ok(())65}6667pub(super) async fn flush(&mut self) -> PolarsResult<()> {68if let Some(payload) = self.bufferer.flush() {69self.writer.put(payload).await?;70}7172assert!(self.bufferer.is_empty());7374Ok(())75}7677pub(super) fn has_buffered_bytes(&self) -> bool {78!self.bufferer.is_empty()79}8081pub async fn finish(&mut self) -> PolarsResult<()> {82if let Some(payload) = self.bufferer.flush() {83self.writer.put(payload).await?;84}8586assert!(self.bufferer.is_empty());8788self.writer.finish().await89}90}919293