Path: blob/main/crates/polars-stream/src/nodes/io_sinks/ipc.rs
6939 views
use std::cmp::Reverse;
use std::io::BufWriter;
use std::pin::Pin;
use std::sync::{Arc, Mutex};

use polars_core::schema::{SchemaExt, SchemaRef};
use polars_core::utils::arrow;
use polars_core::utils::arrow::array::Array;
use polars_core::utils::arrow::io::ipc::write::{
    DictionaryTracker, EncodedData, WriteOptions, commit_encoded_arrays, default_ipc_fields,
    encode_array, encode_new_dictionaries,
};
use polars_error::PolarsResult;
use polars_io::SerWriter;
use polars_io::cloud::CloudOptions;
use polars_io::ipc::{IpcWriter, IpcWriterOptions};
use polars_plan::dsl::{SinkOptions, SinkTarget};
use polars_utils::priority::Priority;

use super::{
    DEFAULT_SINK_DISTRIBUTOR_BUFFER_SIZE, DEFAULT_SINK_LINEARIZER_BUFFER_SIZE, SinkInputPort,
    SinkNode, buffer_and_distribute_columns_task,
};
use crate::async_executor::spawn;
use crate::async_primitives::connector::{Receiver, Sender, connector};
use crate::async_primitives::distributor_channel::distributor_channel;
use crate::async_primitives::linearizer::Linearizer;
use crate::execute::StreamingExecutionState;
use crate::nodes::io_sinks::phase::PhaseOutcome;
use crate::nodes::{JoinHandle, TaskPriority};

/// Streaming sink node that writes incoming morsels to an Arrow IPC (Feather) target.
///
/// The write is organized as a pipeline of tasks:
/// buffer/distribute -> per-column encode (parallel) -> collect/linearize -> IO task.
/// The IO task is created in [`SinkNode::initialize`] and joined in [`SinkNode::finalize`].
pub struct IpcSinkNode {
    // Where the IPC file is written (local file or cloud object).
    target: SinkTarget,

    // Schema of the incoming stream; also used to build the IPC fields.
    input_schema: SchemaRef,
    // IPC-specific options (compression, compat level, chunk size).
    write_options: IpcWriterOptions,
    // Generic sink options (ordering, sync-on-close behavior).
    sink_options: SinkOptions,
    // Credentials / configuration for cloud targets, if any.
    cloud_options: Option<CloudOptions>,

    // Collect task -> IO task channel. Each message is the encoded dictionaries
    // plus one encoded record batch. `None` until `initialize`, taken by `spawn_sink`.
    io_tx: Option<Sender<(Vec<EncodedData>, EncodedData)>>,
    // Handle to the IO task; AbortOnDropHandle aborts the task if this node is
    // dropped without being finalized. `None` until `initialize`, taken by `finalize`.
    io_task: Option<tokio_util::task::AbortOnDropHandle<PolarsResult<()>>>,
}

impl IpcSinkNode {
    /// Create a new, uninitialized IPC sink node.
    ///
    /// The IO task and channel are set up later in [`SinkNode::initialize`].
    pub fn new(
        input_schema: SchemaRef,
        target: SinkTarget,
        sink_options: SinkOptions,
        write_options: IpcWriterOptions,
        cloud_options: Option<CloudOptions>,
    ) -> Self {
        Self {
            target,

            input_schema,
            write_options,
            sink_options,
            cloud_options,

            io_tx: None,
            io_task: None,
        }
    }
}

impl SinkNode for IpcSinkNode {
    fn name(&self) -> &str {
        "ipc-sink"
    }

    // The sink input is consumed by a single buffer task (see `spawn_sink`);
    // parallelism happens afterwards in the per-column encode tasks.
    fn is_sink_input_parallel(&self) -> bool {
        false
    }
    fn do_maintain_order(&self) -> bool
    {
        self.sink_options.maintain_order
    }

    /// Open the target and spawn the long-lived IO task that performs the actual writes.
    ///
    /// Stores the sender half of the collect->IO channel in `self.io_tx` and the task
    /// handle in `self.io_task`, to be consumed by `spawn_sink` / `finalize` respectively.
    fn initialize(&mut self, _state: &StreamingExecutionState) -> PolarsResult<()> {
        // Collect task -> IO task
        let (io_tx, mut io_rx) = connector::<(Vec<EncodedData>, EncodedData)>();

        // IO task.
        //
        // Task that will actually do write to the target file.
        let target = self.target.clone();
        let sink_options = self.sink_options.clone();
        let write_options = self.write_options;
        let cloud_options = self.cloud_options.clone();
        let input_schema = self.input_schema.clone();
        let io_task = polars_io::pl_async::get_runtime().spawn(async move {
            let mut file = target
                .open_into_writeable_async(&sink_options, cloud_options.as_ref())
                .await?;
            let writer = BufWriter::new(&mut *file);
            // Parallelism is handled by this node's own encode tasks, so the
            // writer itself runs with parallel encoding disabled.
            let mut writer = IpcWriter::new(writer)
                .with_compression(write_options.compression)
                .with_compat_level(write_options.compat_level)
                .with_parallel(false)
                .batched(&input_schema)?;

            // Drain until the sender side is dropped (in `spawn_sink`'s collect
            // task or in `finalize`), which is the shutdown signal.
            while let Ok((dicts, record_batch)) = io_rx.recv().await {
                // @TODO: At the moment this is a sync write, this is not ideal because we can only
                // have so many blocking threads in the tokio threadpool.
                writer.write_encoded(dicts.as_slice(), &record_batch)?;
            }

            // Write the IPC footer, then release the borrow on `file` before closing it.
            writer.finish()?;
            drop(writer);

            file.sync_on_close(sink_options.sync_on_close)?;
            file.close()?;

            PolarsResult::Ok(())
        });

        self.io_tx = Some(io_tx);
        self.io_task = Some(tokio_util::task::AbortOnDropHandle::new(io_task));

        Ok(())
    }

    /// Spawn the buffer, encode, and collect tasks that feed the IO task.
    ///
    /// Pipeline: buffer task chunks incoming data and distributes (seq, col_idx, column)
    /// triples to `num_pipelines` encode tasks; encoded columns are linearized back in
    /// sequence order and reassembled into full record batches before being sent to the
    /// IO task spawned in `initialize`.
    fn spawn_sink(
        &mut self,
        recv_port_rx: Receiver<(PhaseOutcome, SinkInputPort)>,
        state: &StreamingExecutionState,
        join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
    ) {
        // Buffer task -> Encode tasks
        let (dist_tx, dist_rxs) =
            distributor_channel(state.num_pipelines, *DEFAULT_SINK_DISTRIBUTOR_BUFFER_SIZE);
        // Encode tasks -> Collect task
        let (mut lin_rx, lin_txs) =
            Linearizer::new(state.num_pipelines, *DEFAULT_SINK_LINEARIZER_BUFFER_SIZE);
        // Collect task -> IO task
        let mut io_tx = self
            .io_tx
            .take()
            .expect("not initialized / spawn called more than once");

        let options = WriteOptions {
            compression: self.write_options.compression.map(Into::into),
        };

        let chunk_size = self.write_options.chunk_size;

        let ipc_fields = self
            .input_schema
            .iter_fields()
            .map(|f| f.to_arrow(self.write_options.compat_level))
            .collect::<Vec<_>>();
        let ipc_fields = default_ipc_fields(ipc_fields.iter());

        // Buffer task.
        join_handles.push(buffer_and_distribute_columns_task(
            recv_port_rx,
            dist_tx,
            chunk_size as usize,
            self.input_schema.clone(),
            Arc::new(Mutex::new(None)),
        ));

        // Encoding tasks.
        //
        // Task encodes the buffered record batch and sends it to be written to the file.
        join_handles.extend(
            dist_rxs
                .into_iter()
                .zip(lin_txs)
                .map(|(mut dist_rx, mut lin_tx)| {
                    let write_options = self.write_options;
                    spawn(TaskPriority::High, async move {
                        while let Ok((seq, col_idx, column)) = dist_rx.recv().await {
                            let mut variadic_buffer_counts = Vec::new();
                            let mut buffers = Vec::new();
                            let mut arrow_data = Vec::new();
                            let mut nodes = Vec::new();
                            let mut offset = 0;

                            // We want to rechunk for two reasons:
                            // 1. the IPC writer expects aligned column chunks
                            // 2. the IPC writer turns chunks / record batches into chunks in the file,
                            //    so we want to respect the given `chunk_size`.
                            //
                            // This also properly sets the inner types of the record batches, which is
                            // important for dictionary and nested type encoding.
                            let array = column.rechunk_to_arrow(write_options.compat_level);

                            // Encode array.
                            encode_array(
                                &array,
                                &options,
                                &mut variadic_buffer_counts,
                                &mut buffers,
                                &mut arrow_data,
                                &mut nodes,
                                &mut offset,
                            );

                            // Send the encoded data to the IO task.
                            // `Reverse(seq)` makes the Linearizer release messages in
                            // ascending sequence order, preserving row order.
                            let msg = Priority(
                                Reverse(seq),
                                (
                                    col_idx,
                                    array,
                                    variadic_buffer_counts,
                                    buffers,
                                    arrow_data,
                                    nodes,
                                    offset,
                                ),
                            );
                            if lin_tx.insert(msg).await.is_err() {
                                // Downstream hung up; stop gracefully.
                                return Ok(());
                            }
                        }

                        PolarsResult::Ok(())
                    })
                }),
        );

        // Collect Task.
        //
        // Collects all the encoded data and packs it together for the IO task to write it.
        let input_schema = self.input_schema.clone();
        join_handles.push(spawn(TaskPriority::High, async move {
            // Tracks which dictionaries were already written so that only new
            // dictionaries / deltas are emitted per batch.
            let mut dictionary_tracker = DictionaryTracker {
                dictionaries: Default::default(),
                cannot_replace: false,
            };

            // One column's encoded output, as produced by an encode task.
            struct CurrentColumn {
                array: Box<dyn Array>,
                variadic_buffer_counts: Vec<i64>,
                buffers: Vec<arrow::io::ipc::format::ipc::Buffer>,
                arrow_data: Vec<u8>,
                nodes: Vec<arrow::io::ipc::format::ipc::FieldNode>,
                offset: i64,
            }
            // The record batch currently being assembled from its columns.
            struct Current {
                seq: usize,
                height: usize,
                num_columns_seen: usize,
                // One slot per schema column; filled as columns arrive (any order
                // within a batch — the linearizer only orders across batches).
                columns: Vec<Option<CurrentColumn>>,
                encoded_dictionaries: Vec<EncodedData>,
            }

            let mut current = Current {
                seq: 0,
                height: 0,
                num_columns_seen: 0,
                columns: (0..input_schema.len()).map(|_| None).collect(),
                encoded_dictionaries: Vec::new(),
            };

            // Linearize from all the Encoder tasks.
            while let Some(Priority(
                Reverse(seq),
                (i, array, variadic_buffer_counts, buffers, arrow_data, nodes, offset),
            )) = lin_rx.get().await
            {
                // First column of a new batch: record its sequence number and height.
                if current.num_columns_seen == 0 {
                    current.seq = seq;
                    current.height = array.len();
                }

                // Invariants: all columns of a batch share one seq and height,
                // and each column slot is filled exactly once.
                debug_assert_eq!(current.seq, seq);
                debug_assert_eq!(current.height, array.len());
                debug_assert!(current.columns[i].is_none());
                current.columns[i] = Some(CurrentColumn {
                    array,
                    variadic_buffer_counts,
                    buffers,
                    arrow_data,
                    nodes,
                    offset,
                });
                current.num_columns_seen += 1;

                // Batch complete: splice all per-column encodings into one record batch.
                if current.num_columns_seen == input_schema.len() {
                    // @Optimize: Keep track of these sizes so we can correctly preallocate
                    // them.
                    let mut variadic_buffer_counts = Vec::new();
                    let mut buffers = Vec::new();
                    let mut arrow_data = Vec::new();
                    let mut nodes = Vec::new();
                    let mut offset = 0;

                    for (i, column) in current.columns.iter_mut().enumerate() {
                        let column = column.take().unwrap();

                        // @Optimize: It would be nice to do this on the Encode Tasks, but it is
                        // difficult to centralize the dictionary tracker like that.
                        //
                        // If there are dictionaries, we might need to emit the original dictionary
                        // definitions or dictionary deltas. We have precomputed which columns contain
                        // dictionaries and only check those columns.
                        encode_new_dictionaries(
                            &ipc_fields[i],
                            column.array.as_ref(),
                            &options,
                            &mut dictionary_tracker,
                            &mut current.encoded_dictionaries,
                        )?;

                        variadic_buffer_counts.extend(column.variadic_buffer_counts);
                        buffers.extend(column.buffers.into_iter().map(|mut b| {
                            // @NOTE: We need to offset all the buffers by the prefix sum of the
                            // column offsets.
                            b.offset += offset;
                            b
                        }));
                        arrow_data.extend(column.arrow_data);
                        nodes.extend(column.nodes);

                        offset += column.offset;
                    }

                    let mut encoded_data = EncodedData {
                        ipc_message: Vec::new(),
                        arrow_data,
                    };
                    commit_encoded_arrays(
                        current.height,
                        &options,
                        variadic_buffer_counts,
                        buffers,
                        nodes,
                        &mut encoded_data,
                    );

                    // Hand the finished batch (plus any new dictionaries) to the IO task.
                    // An error means the IO task stopped; exit gracefully.
                    if io_tx
                        .send((
                            std::mem::take(&mut current.encoded_dictionaries),
                            encoded_data,
                        ))
                        .await
                        .is_err()
                    {
                        return Ok(());
                    }
                    current.num_columns_seen = 0;
                }
            }

            Ok(())
        }));
    }

    /// Signal completion to the IO task and return a future that awaits it.
    fn finalize(
        &mut self,
        _state: &StreamingExecutionState,
    ) -> Option<Pin<Box<dyn Future<Output = PolarsResult<()>> + Send>>> {
        // If we were never spawned, we need to make sure that the `tx` is taken. This signals to
        // the IO task that it is done and prevents deadlocks.
        drop(self.io_tx.take());

        let io_task = self
            .io_task
            .take()
            .expect("not initialized / finish called more than once");

        // Wait for the IO task to complete.
        // A JoinError (panic/cancellation) is surfaced as an I/O error.
        Some(Box::pin(async move {
            io_task
                .await
                .unwrap_or_else(|e| Err(std::io::Error::from(e).into()))
        }))
    }
}