Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/io_sinks/json.rs
6939 views
1
use std::cmp::Reverse;
2
use std::pin::Pin;
3
4
use polars_error::PolarsResult;
5
use polars_io::cloud::CloudOptions;
6
use polars_io::json::BatchedWriter;
7
use polars_plan::dsl::{SinkOptions, SinkTarget};
8
use polars_utils::priority::Priority;
9
10
use super::{SinkInputPort, SinkNode};
11
use crate::async_executor::spawn;
12
use crate::async_primitives::connector::{Receiver, Sender, connector};
13
use crate::async_primitives::linearizer::Linearizer;
14
use crate::execute::StreamingExecutionState;
15
use crate::morsel::MorselSeq;
16
use crate::nodes::io_sinks::parallelize_receive_task;
17
use crate::nodes::io_sinks::phase::PhaseOutcome;
18
use crate::nodes::{JoinHandle, TaskPriority};
19
20
/// Payload sent over the `io_tx` connector to the IO task: a linearizer of encoded byte
/// buffers, each tagged with a `Reverse<MorselSeq>` priority.
type IOSend = Linearizer<Priority<Reverse<MorselSeq>, Vec<u8>>>;
21
22
pub struct NDJsonSinkNode {
23
target: SinkTarget,
24
sink_options: SinkOptions,
25
cloud_options: Option<CloudOptions>,
26
27
io_tx: Option<Sender<IOSend>>,
28
io_task: Option<tokio_util::task::AbortOnDropHandle<PolarsResult<()>>>,
29
}
30
impl NDJsonSinkNode {
31
pub fn new(
32
target: SinkTarget,
33
sink_options: SinkOptions,
34
cloud_options: Option<CloudOptions>,
35
) -> Self {
36
Self {
37
target,
38
sink_options,
39
cloud_options,
40
41
io_tx: None,
42
io_task: None,
43
}
44
}
45
}
46
47
impl SinkNode for NDJsonSinkNode {
48
fn name(&self) -> &str {
49
"ndjson-sink"
50
}
51
52
fn is_sink_input_parallel(&self) -> bool {
53
true
54
}
55
fn do_maintain_order(&self) -> bool {
56
self.sink_options.maintain_order
57
}
58
59
fn initialize(&mut self, _state: &StreamingExecutionState) -> PolarsResult<()> {
60
let (io_tx, mut io_rx) = connector::<Linearizer<Priority<Reverse<MorselSeq>, Vec<u8>>>>();
61
62
// IO task.
63
//
64
// Task that will actually do write to the target file.
65
let sink_options = self.sink_options.clone();
66
let cloud_options = self.cloud_options.clone();
67
let target = self.target.clone();
68
let io_task = polars_io::pl_async::get_runtime().spawn(async move {
69
use tokio::io::AsyncWriteExt;
70
71
let mut file = target
72
.open_into_writeable_async(&sink_options, cloud_options.as_ref())
73
.await?
74
.try_into_async_writeable()?;
75
76
while let Ok(mut lin_rx) = io_rx.recv().await {
77
while let Some(Priority(_, buffer)) = lin_rx.get().await {
78
file.write_all(&buffer).await?;
79
}
80
}
81
82
file.sync_on_close(sink_options.sync_on_close).await?;
83
file.close().await?;
84
85
PolarsResult::Ok(())
86
});
87
88
self.io_tx = Some(io_tx);
89
self.io_task = Some(tokio_util::task::AbortOnDropHandle::new(io_task));
90
91
Ok(())
92
}
93
94
fn spawn_sink(
95
&mut self,
96
recv_port_rx: Receiver<(PhaseOutcome, SinkInputPort)>,
97
state: &StreamingExecutionState,
98
join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
99
) {
100
let io_tx = self
101
.io_tx
102
.take()
103
.expect("not initialized / spawn called more than once");
104
let pass_rxs = parallelize_receive_task(
105
join_handles,
106
recv_port_rx,
107
state.num_pipelines,
108
self.sink_options.maintain_order,
109
io_tx,
110
);
111
112
// 16MB
113
const DEFAULT_ALLOCATION_SIZE: usize = 1 << 24;
114
115
// Encode task.
116
//
117
// Task encodes the columns into their corresponding JSON encoding.
118
join_handles.extend(pass_rxs.into_iter().map(|mut pass_rx| {
119
spawn(TaskPriority::High, async move {
120
// Amortize the allocations over time. If we see that we need to do way larger
121
// allocations, we adjust to that over time.
122
let mut allocation_size = DEFAULT_ALLOCATION_SIZE;
123
124
while let Ok((mut rx, mut lin_tx)) = pass_rx.recv().await {
125
while let Ok(morsel) = rx.recv().await {
126
let (df, seq, _, consume_token) = morsel.into_inner();
127
128
let mut buffer = Vec::with_capacity(allocation_size);
129
let mut writer = BatchedWriter::new(&mut buffer);
130
131
writer.write_batch(&df)?;
132
133
allocation_size = allocation_size.max(buffer.len());
134
if lin_tx.insert(Priority(Reverse(seq), buffer)).await.is_err() {
135
return Ok(());
136
}
137
drop(consume_token); // Keep the consume_token until here to increase the
138
// backpressure.
139
}
140
}
141
142
PolarsResult::Ok(())
143
})
144
}));
145
}
146
147
fn finalize(
148
&mut self,
149
_state: &StreamingExecutionState,
150
) -> Option<Pin<Box<dyn Future<Output = PolarsResult<()>> + Send>>> {
151
// If we were never spawned, we need to make sure that the `tx` is taken. This signals to
152
// the IO task that it is done and prevents deadlocks.
153
drop(self.io_tx.take());
154
155
let io_task = self
156
.io_task
157
.take()
158
.expect("not initialized / finish called more than once");
159
160
// Wait for the IO task to complete.
161
Some(Box::pin(async move {
162
io_task
163
.await
164
.unwrap_or_else(|e| Err(std::io::Error::from(e).into()))
165
}))
166
}
167
}
168
169