Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/io_sinks/writers/ndjson/mod.rs
8485 views
1
use polars_core::config;
2
use polars_core::schema::SchemaRef;
3
use polars_error::PolarsResult;
4
use polars_io::ndjson::NDJsonWriterOptions;
5
use polars_io::pl_async;
6
use polars_utils::IdxSize;
7
use polars_utils::index::NonZeroIdxSize;
8
9
use crate::async_executor::{self, TaskPriority};
10
use crate::async_primitives::connector;
11
use crate::morsel::get_ideal_morsel_size;
12
use crate::nodes::io_sinks::components::sink_morsel::{SinkMorsel, SinkMorselPermit};
13
use crate::nodes::io_sinks::components::size::{
14
NonZeroRowCountAndSize, RowCountAndSize, TakeableRowsProvider,
15
};
16
use crate::nodes::io_sinks::writers::interface::{
17
FileOpenTaskHandle, FileWriterStarter, ideal_sink_morsel_size_env,
18
};
19
use crate::utils::tokio_handle_ext;
20
21
mod io_writer;
22
mod morsel_serializer;
23
24
pub struct NDJsonWriterStarter {
25
pub options: NDJsonWriterOptions,
26
pub schema: SchemaRef,
27
pub initialized_state: std::sync::Mutex<Option<InitializedState>>,
28
}
29
30
#[derive(Clone)]
31
pub struct InitializedState {
32
pub ideal_morsel_size: NonZeroRowCountAndSize,
33
pub base_allocation_size: usize,
34
}
35
36
impl NDJsonWriterStarter {
37
fn initialized_state(&self) -> InitializedState {
38
let mut initialized_state = self.initialized_state.lock().unwrap();
39
40
if initialized_state.is_none() {
41
let (env_num_rows, env_num_bytes) = ideal_sink_morsel_size_env();
42
43
let ideal_morsel_size = RowCountAndSize {
44
num_rows: env_num_rows
45
.unwrap_or(get_ideal_morsel_size().try_into().unwrap_or(IdxSize::MAX)),
46
num_bytes: env_num_bytes.unwrap_or(8 * 1024 * 1024),
47
};
48
49
let serialized_row_size_estimate = u64::saturating_mul(self.schema.len() as _, 50);
50
51
let base_allocation_size: usize = u64::min(
52
64 * 1024 * 1024,
53
u64::min(
54
ideal_morsel_size.num_bytes.saturating_mul(3),
55
u64::saturating_mul(
56
serialized_row_size_estimate,
57
ideal_morsel_size.num_rows as _,
58
),
59
),
60
) as _;
61
62
if config::verbose() {
63
eprintln!("[NDJsonWriterStarter]: base_allocation_size: {base_allocation_size}")
64
}
65
66
let ideal_morsel_size = NonZeroRowCountAndSize::new(ideal_morsel_size).unwrap();
67
68
*initialized_state = Some(InitializedState {
69
ideal_morsel_size,
70
base_allocation_size,
71
})
72
}
73
74
initialized_state.clone().unwrap()
75
}
76
}
77
78
impl FileWriterStarter for NDJsonWriterStarter {
79
fn writer_name(&self) -> &str {
80
"ndjson"
81
}
82
83
fn takeable_rows_provider(&self) -> TakeableRowsProvider {
84
TakeableRowsProvider {
85
max_size: self.initialized_state().ideal_morsel_size,
86
byte_size_min_rows: NonZeroIdxSize::new(256).unwrap(),
87
allow_non_max_size: true,
88
}
89
}
90
91
fn start_file_writer(
92
&self,
93
morsel_rx: connector::Receiver<SinkMorsel>,
94
file: FileOpenTaskHandle,
95
num_pipelines: std::num::NonZeroUsize,
96
) -> PolarsResult<async_executor::JoinHandle<PolarsResult<()>>> {
97
let (filled_serializer_tx, filled_serializer_rx) = tokio::sync::mpsc::channel::<(
98
async_executor::AbortOnDropHandle<PolarsResult<morsel_serializer::MorselSerializer>>,
99
SinkMorselPermit,
100
)>(num_pipelines.get());
101
102
let max_serializers = num_pipelines.get();
103
let (reuse_serializer_tx, reuse_serializer_rx) =
104
tokio::sync::mpsc::channel::<morsel_serializer::MorselSerializer>(max_serializers);
105
106
let io_handle = tokio_handle_ext::AbortOnDropHandle(
107
pl_async::get_runtime().spawn(
108
io_writer::IOWriter {
109
file,
110
filled_serializer_rx,
111
reuse_serializer_tx,
112
options: self.options,
113
}
114
.run(),
115
),
116
);
117
118
let base_allocation_size = self.initialized_state().base_allocation_size;
119
120
let serializer_handle = async_executor::spawn(
121
TaskPriority::High,
122
morsel_serializer::MorselSerializerPipeline {
123
morsel_rx,
124
filled_serializer_tx,
125
reuse_serializer_rx,
126
max_serializers,
127
base_allocation_size,
128
}
129
.run(),
130
);
131
132
Ok(async_executor::spawn(TaskPriority::Low, async move {
133
io_handle.await.unwrap()?;
134
serializer_handle.await;
135
Ok(())
136
}))
137
}
138
}
139
140