Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/io_sinks/ipc.rs
6939 views
1
use std::cmp::Reverse;
2
use std::io::BufWriter;
3
use std::pin::Pin;
4
use std::sync::{Arc, Mutex};
5
6
use polars_core::schema::{SchemaExt, SchemaRef};
7
use polars_core::utils::arrow;
8
use polars_core::utils::arrow::array::Array;
9
use polars_core::utils::arrow::io::ipc::write::{
10
DictionaryTracker, EncodedData, WriteOptions, commit_encoded_arrays, default_ipc_fields,
11
encode_array, encode_new_dictionaries,
12
};
13
use polars_error::PolarsResult;
14
use polars_io::SerWriter;
15
use polars_io::cloud::CloudOptions;
16
use polars_io::ipc::{IpcWriter, IpcWriterOptions};
17
use polars_plan::dsl::{SinkOptions, SinkTarget};
18
use polars_utils::priority::Priority;
19
20
use super::{
21
DEFAULT_SINK_DISTRIBUTOR_BUFFER_SIZE, DEFAULT_SINK_LINEARIZER_BUFFER_SIZE, SinkInputPort,
22
SinkNode, buffer_and_distribute_columns_task,
23
};
24
use crate::async_executor::spawn;
25
use crate::async_primitives::connector::{Receiver, Sender, connector};
26
use crate::async_primitives::distributor_channel::distributor_channel;
27
use crate::async_primitives::linearizer::Linearizer;
28
use crate::execute::StreamingExecutionState;
29
use crate::nodes::io_sinks::phase::PhaseOutcome;
30
use crate::nodes::{JoinHandle, TaskPriority};
31
32
/// Streaming sink node that writes the incoming morsel stream to an Arrow IPC
/// file at `target`.
///
/// The node is split across several tasks (buffer → encode → collect → IO);
/// the struct itself only holds the configuration plus the handles that wire
/// those tasks together.
pub struct IpcSinkNode {
    /// Destination of the written file (path / cloud object).
    target: SinkTarget,

    /// Schema of the morsels this sink receives; also used to build the IPC
    /// field metadata for encoding.
    input_schema: SchemaRef,
    /// IPC-specific options (compression, compat level, chunk size).
    write_options: IpcWriterOptions,
    /// Generic sink options (e.g. `maintain_order`, `sync_on_close`).
    sink_options: SinkOptions,
    /// Credentials/config for cloud targets; `None` for local files.
    cloud_options: Option<CloudOptions>,

    /// Collect-task → IO-task channel, carrying (encoded dictionaries,
    /// encoded record batch). Created in `initialize`, moved out by
    /// `spawn_sink`; dropping it signals the IO task to finish.
    io_tx: Option<Sender<(Vec<EncodedData>, EncodedData)>>,
    /// Handle to the tokio IO task started in `initialize`; awaited in
    /// `finalize`. Aborts the task if the node is dropped early.
    io_task: Option<tokio_util::task::AbortOnDropHandle<PolarsResult<()>>>,
}
43
44
impl IpcSinkNode {
45
pub fn new(
46
input_schema: SchemaRef,
47
target: SinkTarget,
48
sink_options: SinkOptions,
49
write_options: IpcWriterOptions,
50
cloud_options: Option<CloudOptions>,
51
) -> Self {
52
Self {
53
target,
54
55
input_schema,
56
write_options,
57
sink_options,
58
cloud_options,
59
60
io_tx: None,
61
io_task: None,
62
}
63
}
64
}
65
66
impl SinkNode for IpcSinkNode {
    /// Display name of this node in the streaming engine.
    fn name(&self) -> &str {
        "ipc-sink"
    }

    /// The sink input is consumed serially; parallelism happens internally
    /// via the distributor / encoder tasks set up in `spawn_sink`.
    fn is_sink_input_parallel(&self) -> bool {
        false
    }
    /// Whether output row order must match input order, per the sink options.
    fn do_maintain_order(&self) -> bool {
        self.sink_options.maintain_order
    }

    /// Open the target and start the background IO task that performs the
    /// actual file writes. Must be called before `spawn_sink` (which takes
    /// `io_tx`) and `finalize` (which awaits `io_task`).
    fn initialize(&mut self, _state: &StreamingExecutionState) -> PolarsResult<()> {
        // Collect task -> IO task
        let (io_tx, mut io_rx) = connector::<(Vec<EncodedData>, EncodedData)>();

        // IO task.
        //
        // Task that will actually do write to the target file.
        let target = self.target.clone();
        let sink_options = self.sink_options.clone();
        let write_options = self.write_options;
        let cloud_options = self.cloud_options.clone();
        let input_schema = self.input_schema.clone();
        let io_task = polars_io::pl_async::get_runtime().spawn(async move {
            let mut file = target
                .open_into_writeable_async(&sink_options, cloud_options.as_ref())
                .await?;
            // Buffer writes; the batched IPC writer emits the file header here
            // and record batches as they arrive.
            let writer = BufWriter::new(&mut *file);
            let mut writer = IpcWriter::new(writer)
                .with_compression(write_options.compression)
                .with_compat_level(write_options.compat_level)
                .with_parallel(false)
                .batched(&input_schema)?;

            // Runs until the sender side (`io_tx`) is dropped in `finalize`
            // (or by the collect task finishing), which ends the loop.
            while let Ok((dicts, record_batch)) = io_rx.recv().await {
                // @TODO: At the moment this is a sync write, this is not ideal because we can only
                // have so many blocking threads in the tokio threadpool.
                writer.write_encoded(dicts.as_slice(), &record_batch)?;
            }

            // Write the IPC footer and flush before syncing/closing the file.
            writer.finish()?;
            drop(writer);

            file.sync_on_close(sink_options.sync_on_close)?;
            file.close()?;

            PolarsResult::Ok(())
        });

        self.io_tx = Some(io_tx);
        // AbortOnDropHandle: if this node is dropped without `finalize`, the
        // IO task is cancelled instead of leaking.
        self.io_task = Some(tokio_util::task::AbortOnDropHandle::new(io_task));

        Ok(())
    }

    /// Wire up the in-engine pipeline:
    ///   buffer task -> N encoder tasks -> collect task -> IO task.
    /// Columns are encoded in parallel per (seq, column) pair, then
    /// re-linearized by sequence number so record batches are committed in
    /// order.
    fn spawn_sink(
        &mut self,
        recv_port_rx: Receiver<(PhaseOutcome, SinkInputPort)>,
        state: &StreamingExecutionState,
        join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
    ) {
        // Buffer task -> Encode tasks
        let (dist_tx, dist_rxs) =
            distributor_channel(state.num_pipelines, *DEFAULT_SINK_DISTRIBUTOR_BUFFER_SIZE);
        // Encode tasks -> Collect task
        let (mut lin_rx, lin_txs) =
            Linearizer::new(state.num_pipelines, *DEFAULT_SINK_LINEARIZER_BUFFER_SIZE);
        // Collect task -> IO task
        let mut io_tx = self
            .io_tx
            .take()
            .expect("not initialized / spawn called more than once");

        let options = WriteOptions {
            compression: self.write_options.compression.map(Into::into),
        };

        let chunk_size = self.write_options.chunk_size;

        // Arrow-level field metadata, needed for dictionary encoding below.
        let ipc_fields = self
            .input_schema
            .iter_fields()
            .map(|f| f.to_arrow(self.write_options.compat_level))
            .collect::<Vec<_>>();
        let ipc_fields = default_ipc_fields(ipc_fields.iter());

        // Buffer task.
        join_handles.push(buffer_and_distribute_columns_task(
            recv_port_rx,
            dist_tx,
            chunk_size as usize,
            self.input_schema.clone(),
            Arc::new(Mutex::new(None)),
        ));

        // Encoding tasks.
        //
        // Task encodes the buffered record batch and sends it to be written to the file.
        join_handles.extend(
            dist_rxs
                .into_iter()
                .zip(lin_txs)
                .map(|(mut dist_rx, mut lin_tx)| {
                    let write_options = self.write_options;
                    spawn(TaskPriority::High, async move {
                        // Each message is one column of one chunk: `seq` is the
                        // chunk sequence number, `col_idx` the column position.
                        while let Ok((seq, col_idx, column)) = dist_rx.recv().await {
                            let mut variadic_buffer_counts = Vec::new();
                            let mut buffers = Vec::new();
                            let mut arrow_data = Vec::new();
                            let mut nodes = Vec::new();
                            let mut offset = 0;

                            // We want to rechunk for two reasons:
                            // 1. the IPC writer expects aligned column chunks
                            // 2. the IPC writer turns chunks / record batches into chunks in the file,
                            //    so we want to respect the given `chunk_size`.
                            //
                            // This also properly sets the inner types of the record batches, which is
                            // important for dictionary and nested type encoding.
                            let array = column.rechunk_to_arrow(write_options.compat_level);

                            // Encode array.
                            encode_array(
                                &array,
                                &options,
                                &mut variadic_buffer_counts,
                                &mut buffers,
                                &mut arrow_data,
                                &mut nodes,
                                &mut offset,
                            );

                            // Send the encoded data to the IO task.
                            // `Reverse(seq)` makes the linearizer release
                            // messages in ascending sequence order.
                            let msg = Priority(
                                Reverse(seq),
                                (
                                    col_idx,
                                    array,
                                    variadic_buffer_counts,
                                    buffers,
                                    arrow_data,
                                    nodes,
                                    offset,
                                ),
                            );
                            // Receiver gone means the pipeline is shutting
                            // down; stop quietly rather than erroring.
                            if lin_tx.insert(msg).await.is_err() {
                                return Ok(());
                            }
                        }

                        PolarsResult::Ok(())
                    })
                }),
        );

        // Collect Task.
        //
        // Collects all the encoded data and packs it together for the IO task to write it.
        let input_schema = self.input_schema.clone();
        join_handles.push(spawn(TaskPriority::High, async move {
            // Tracks which dictionaries were already written so only new
            // definitions / deltas are emitted per batch.
            let mut dictionary_tracker = DictionaryTracker {
                dictionaries: Default::default(),
                cannot_replace: false,
            };

            // Encoded pieces of a single column, as produced by an encoder task.
            struct CurrentColumn {
                array: Box<dyn Array>,
                variadic_buffer_counts: Vec<i64>,
                buffers: Vec<arrow::io::ipc::format::ipc::Buffer>,
                arrow_data: Vec<u8>,
                nodes: Vec<arrow::io::ipc::format::ipc::FieldNode>,
                offset: i64,
            }
            // In-progress record batch: one slot per schema column, flushed to
            // the IO task once all columns of the current `seq` have arrived.
            struct Current {
                seq: usize,
                height: usize,
                num_columns_seen: usize,
                columns: Vec<Option<CurrentColumn>>,
                encoded_dictionaries: Vec<EncodedData>,
            }

            let mut current = Current {
                seq: 0,
                height: 0,
                num_columns_seen: 0,
                columns: (0..input_schema.len()).map(|_| None).collect(),
                encoded_dictionaries: Vec::new(),
            };

            // Linearize from all the Encoder tasks.
            while let Some(Priority(
                Reverse(seq),
                (i, array, variadic_buffer_counts, buffers, arrow_data, nodes, offset),
            )) = lin_rx.get().await
            {
                // First column of a new batch establishes its seq and height.
                if current.num_columns_seen == 0 {
                    current.seq = seq;
                    current.height = array.len();
                }

                // Invariant: the linearizer delivers all columns of one seq
                // before any column of the next, and each column exactly once.
                debug_assert_eq!(current.seq, seq);
                debug_assert_eq!(current.height, array.len());
                debug_assert!(current.columns[i].is_none());
                current.columns[i] = Some(CurrentColumn {
                    array,
                    variadic_buffer_counts,
                    buffers,
                    arrow_data,
                    nodes,
                    offset,
                });
                current.num_columns_seen += 1;

                // Batch complete: merge the per-column encodings into one
                // record batch message.
                if current.num_columns_seen == input_schema.len() {
                    // @Optimize: Keep track of these sizes so we can correctly preallocate
                    // them.
                    let mut variadic_buffer_counts = Vec::new();
                    let mut buffers = Vec::new();
                    let mut arrow_data = Vec::new();
                    let mut nodes = Vec::new();
                    let mut offset = 0;

                    for (i, column) in current.columns.iter_mut().enumerate() {
                        let column = column.take().unwrap();

                        // @Optimize: It would be nice to do this on the Encode Tasks, but it is
                        // difficult to centralize the dictionary tracker like that.
                        //
                        // If there are dictionaries, we might need to emit the original dictionary
                        // definitions or dictionary deltas. We have precomputed which columns contain
                        // dictionaries and only check those columns.
                        encode_new_dictionaries(
                            &ipc_fields[i],
                            column.array.as_ref(),
                            &options,
                            &mut dictionary_tracker,
                            &mut current.encoded_dictionaries,
                        )?;

                        variadic_buffer_counts.extend(column.variadic_buffer_counts);
                        buffers.extend(column.buffers.into_iter().map(|mut b| {
                            // @NOTE: We need to offset all the buffers by the prefix sum of the
                            // column offsets.
                            b.offset += offset;
                            b
                        }));
                        arrow_data.extend(column.arrow_data);
                        nodes.extend(column.nodes);

                        offset += column.offset;
                    }

                    // `ipc_message` is filled in by `commit_encoded_arrays`.
                    let mut encoded_data = EncodedData {
                        ipc_message: Vec::new(),
                        arrow_data,
                    };
                    commit_encoded_arrays(
                        current.height,
                        &options,
                        variadic_buffer_counts,
                        buffers,
                        nodes,
                        &mut encoded_data,
                    );

                    // IO task gone means shutdown; stop quietly.
                    if io_tx
                        .send((
                            std::mem::take(&mut current.encoded_dictionaries),
                            encoded_data,
                        ))
                        .await
                        .is_err()
                    {
                        return Ok(());
                    }
                    // Reset for the next batch; `columns` slots are already
                    // `None` again from `take()` above.
                    current.num_columns_seen = 0;
                }
            }

            Ok(())
        }));
    }

    /// Signal the IO task that no more data is coming and return a future
    /// that resolves when the file has been fully written and closed.
    fn finalize(
        &mut self,
        _state: &StreamingExecutionState,
    ) -> Option<Pin<Box<dyn Future<Output = PolarsResult<()>> + Send>>> {
        // If we were never spawned, we need to make sure that the `tx` is taken. This signals to
        // the IO task that it is done and prevents deadlocks.
        drop(self.io_tx.take());

        let io_task = self
            .io_task
            .take()
            .expect("not initialized / finish called more than once");

        // Wait for the IO task to complete.
        // A tokio `JoinError` (panic/cancellation) is mapped into an io error
        // so it surfaces as a regular `PolarsResult` failure.
        Some(Box::pin(async move {
            io_task
                .await
                .unwrap_or_else(|e| Err(std::io::Error::from(e).into()))
        }))
    }
}
371
372