Path: blob/main/crates/polars-stream/src/nodes/io_sinks/json.rs
use std::cmp::Reverse;
use std::pin::Pin;

use polars_error::PolarsResult;
use polars_io::cloud::CloudOptions;
use polars_io::json::BatchedWriter;
use polars_plan::dsl::{SinkOptions, SinkTarget};
use polars_utils::priority::Priority;

use super::{SinkInputPort, SinkNode};
use crate::async_executor::spawn;
use crate::async_primitives::connector::{Receiver, Sender, connector};
use crate::async_primitives::linearizer::Linearizer;
use crate::execute::StreamingExecutionState;
use crate::morsel::MorselSeq;
use crate::nodes::io_sinks::parallelize_receive_task;
use crate::nodes::io_sinks::phase::PhaseOutcome;
use crate::nodes::{JoinHandle, TaskPriority};

type IOSend = Linearizer<Priority<Reverse<MorselSeq>, Vec<u8>>>;

pub struct NDJsonSinkNode {
    target: SinkTarget,
    sink_options: SinkOptions,
    cloud_options: Option<CloudOptions>,

    io_tx: Option<Sender<IOSend>>,
    io_task: Option<tokio_util::task::AbortOnDropHandle<PolarsResult<()>>>,
}
impl NDJsonSinkNode {
    pub fn new(
        target: SinkTarget,
        sink_options: SinkOptions,
        cloud_options: Option<CloudOptions>,
    ) -> Self {
        Self {
            target,
            sink_options,
            cloud_options,

            io_tx: None,
            io_task: None,
        }
    }
}

impl SinkNode for NDJsonSinkNode {
    fn name(&self) -> &str {
        "ndjson-sink"
    }

    fn is_sink_input_parallel(&self) -> bool {
        true
    }
    fn do_maintain_order(&self) -> bool {
        self.sink_options.maintain_order
    }

    fn initialize(&mut self, _state: &StreamingExecutionState) -> PolarsResult<()> {
        let (io_tx, mut io_rx) = connector::<Linearizer<Priority<Reverse<MorselSeq>, Vec<u8>>>>();

        // IO task.
        //
        // Task that will actually do write to the target file.
        let sink_options = self.sink_options.clone();
        let cloud_options = self.cloud_options.clone();
        let target = self.target.clone();
        let io_task = polars_io::pl_async::get_runtime().spawn(async move {
            use tokio::io::AsyncWriteExt;

            let mut file = target
                .open_into_writeable_async(&sink_options, cloud_options.as_ref())
                .await?
                .try_into_async_writeable()?;

            while let Ok(mut lin_rx) = io_rx.recv().await {
                while let Some(Priority(_, buffer)) = lin_rx.get().await {
                    file.write_all(&buffer).await?;
                }
            }

            file.sync_on_close(sink_options.sync_on_close).await?;
            file.close().await?;

            PolarsResult::Ok(())
        });

        self.io_tx = Some(io_tx);
        self.io_task = Some(tokio_util::task::AbortOnDropHandle::new(io_task));

        Ok(())
    }

    fn spawn_sink(
        &mut self,
        recv_port_rx: Receiver<(PhaseOutcome, SinkInputPort)>,
        state: &StreamingExecutionState,
        join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
    ) {
        let io_tx = self
            .io_tx
            .take()
            .expect("not initialized / spawn called more than once");
        let pass_rxs = parallelize_receive_task(
            join_handles,
            recv_port_rx,
            state.num_pipelines,
            self.sink_options.maintain_order,
            io_tx,
        );

        // 16MB
        const DEFAULT_ALLOCATION_SIZE: usize = 1 << 24;

        // Encode task.
        //
        // Task encodes the columns into their corresponding JSON encoding.
        join_handles.extend(pass_rxs.into_iter().map(|mut pass_rx| {
            spawn(TaskPriority::High, async move {
                // Amortize the allocations over time. If we see that we need to do way larger
                // allocations, we adjust to that over time.
                let mut allocation_size = DEFAULT_ALLOCATION_SIZE;

                while let Ok((mut rx, mut lin_tx)) = pass_rx.recv().await {
                    while let Ok(morsel) = rx.recv().await {
                        let (df, seq, _, consume_token) = morsel.into_inner();

                        let mut buffer = Vec::with_capacity(allocation_size);
                        let mut writer = BatchedWriter::new(&mut buffer);

                        writer.write_batch(&df)?;

                        allocation_size = allocation_size.max(buffer.len());
                        if lin_tx.insert(Priority(Reverse(seq), buffer)).await.is_err() {
                            return Ok(());
                        }
                        drop(consume_token); // Keep the consume_token until here to increase the
                        // backpressure.
                    }
                }

                PolarsResult::Ok(())
            })
        }));
    }

    fn finalize(
        &mut self,
        _state: &StreamingExecutionState,
    ) -> Option<Pin<Box<dyn Future<Output = PolarsResult<()>> + Send>>> {
        // If we were never spawned, we need to make sure that the `tx` is taken. This signals to
        // the IO task that it is done and prevents deadlocks.
        drop(self.io_tx.take());

        let io_task = self
            .io_task
            .take()
            .expect("not initialized / finish called more than once");

        // Wait for the IO task to complete.
        Some(Box::pin(async move {
            io_task
                .await
                .unwrap_or_else(|e| Err(std::io::Error::from(e).into()))
        }))
    }
}
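The encode tasks run in parallel, so encoded buffers can reach the linearizer out of order; tagging each buffer with `Priority(Reverse(seq), buffer)` is what lets the IO task drain them in ascending `MorselSeq` order. Below is a minimal standalone sketch of that ordering idea using only the standard library's `BinaryHeap` (a max-heap). It is not the crate's `Linearizer`, and the sequence numbers and NDJSON chunks are made up for illustration.

use std::cmp::Reverse;
use std::collections::BinaryHeap;

fn main() {
    // Encoded NDJSON chunks arriving out of order from parallel encode tasks:
    // (sequence number, encoded bytes).
    let out_of_order: Vec<(u64, Vec<u8>)> = vec![
        (2, b"{\"x\":3}\n".to_vec()),
        (0, b"{\"x\":1}\n".to_vec()),
        (1, b"{\"x\":2}\n".to_vec()),
    ];

    // `BinaryHeap` is a max-heap; wrapping the key in `Reverse` flips the
    // comparison so the smallest sequence number pops first, the same trick
    // `Priority<Reverse<MorselSeq>, Vec<u8>>` relies on above.
    let mut heap: BinaryHeap<(Reverse<u64>, Vec<u8>)> = out_of_order
        .into_iter()
        .map(|(seq, buf)| (Reverse(seq), buf))
        .collect();

    let mut output = Vec::new();
    while let Some((Reverse(seq), buf)) = heap.pop() {
        println!("writing chunk {seq}");
        output.extend_from_slice(&buf);
    }

    // The concatenated NDJSON is now in ascending sequence order.
    assert_eq!(output, b"{\"x\":1}\n{\"x\":2}\n{\"x\":3}\n".to_vec());
}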