Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/io_sinks2/writers/mod.rs
7884 views
1
use std::sync::Arc;
2
3
use polars_core::schema::SchemaRef;
4
use polars_error::PolarsResult;
5
use polars_io::utils::sync_on_close::SyncOnCloseType;
6
use polars_plan::dsl::FileType;
7
use polars_utils::IdxSize;
8
9
use crate::nodes::io_sinks2::writers::interface::FileWriterStarter;
10
11
pub mod interface;
12
#[cfg(feature = "ipc")]
13
pub mod ipc;
14
#[cfg(feature = "parquet")]
15
pub mod parquet;
16
17
pub fn create_file_writer_starter(
18
file_format: &Arc<FileType>,
19
file_schema: &SchemaRef,
20
pipeline_depth: usize,
21
sync_on_close: SyncOnCloseType,
22
) -> PolarsResult<Arc<dyn FileWriterStarter>> {
23
Ok(match file_format.as_ref() {
24
#[cfg(feature = "parquet")]
25
FileType::Parquet(options) => {
26
use polars_core::prelude::CompatLevel;
27
use polars_io::schema_to_arrow_checked;
28
29
use crate::nodes::io_sinks2::writers::parquet::ParquetWriterStarter;
30
31
let arrow_schema = Arc::new(schema_to_arrow_checked(
32
file_schema.as_ref(),
33
CompatLevel::newest(),
34
"",
35
)?);
36
37
Arc::new(ParquetWriterStarter {
38
options: options.clone(),
39
arrow_schema,
40
initialized_state: Default::default(),
41
pipeline_depth,
42
sync_on_close,
43
row_group_size: options
44
.row_group_size
45
.map(|x| IdxSize::try_from(x).unwrap()),
46
}) as _
47
},
48
#[cfg(feature = "ipc")]
49
FileType::Ipc(options) => {
50
use crate::nodes::io_sinks2::writers::ipc::IpcWriterStarter;
51
52
Arc::new(IpcWriterStarter {
53
options: *options,
54
schema: file_schema.clone(),
55
pipeline_depth,
56
sync_on_close,
57
}) as _
58
},
59
_ => unimplemented!(),
60
})
61
}
62
63