use std::io::{Read, Write};
use std::path::PathBuf;
use arrow::datatypes::Metadata;
use arrow::io::ipc::read::{StreamMetadata, StreamState};
use arrow::io::ipc::write::WriteOptions;
use arrow::io::ipc::{read, write};
use polars_core::frame::chunk_df_for_writing;
use polars_core::prelude::*;
use crate::prelude::*;
use crate::shared::{ArrowReader, finish_reader};
#[must_use]
/// Reader for the Arrow IPC *streaming* format.
///
/// Configure via the `with_*` builder methods, then call
/// [`SerReader::finish`] to materialize a `DataFrame`.
pub struct IpcStreamReader<R> {
    // Underlying byte source the IPC stream is decoded from.
    reader: R,
    // If true, the resulting DataFrame is rechunked into contiguous memory
    // after reading (defaults to true in `SerReader::new`).
    rechunk: bool,
    // Optional limit on the number of rows to read.
    n_rows: Option<usize>,
    // Column selection by index; takes effect in `finish`.
    projection: Option<Vec<usize>>,
    // Column selection by name; converted to `projection` in `finish`.
    columns: Option<Vec<String>>,
    // Optional row-index (row count) column to prepend to the output.
    row_index: Option<RowIndex>,
    // Cached stream metadata so the header is only read once.
    metadata: Option<StreamMetadata>,
}
impl<R: Read> IpcStreamReader<R> {
    /// Get the schema of the IPC stream, converted to a polars [`Schema`].
    ///
    /// Reads (and caches) the stream header on first use.
    pub fn schema(&mut self) -> PolarsResult<Schema> {
        Ok(Schema::from_arrow_schema(&self.metadata()?.schema))
    }

    /// Get the Arrow schema of the IPC stream.
    pub fn arrow_schema(&mut self) -> PolarsResult<ArrowSchema> {
        Ok(self.metadata()?.schema)
    }

    /// Custom metadata embedded in the stream's schema message, if any.
    pub fn custom_metadata(&mut self) -> PolarsResult<Option<Arc<Metadata>>> {
        Ok(self.metadata()?.custom_schema_metadata.map(Arc::new))
    }

    /// Stop reading after `num_rows` rows (`None` reads everything).
    pub fn with_n_rows(mut self, num_rows: Option<usize>) -> Self {
        self.n_rows = num_rows;
        self
    }

    /// Select columns by name; translated to a positional projection in `finish`.
    pub fn with_columns(mut self, columns: Option<Vec<String>>) -> Self {
        self.columns = columns;
        self
    }

    /// Prepend a row-index column to the output.
    pub fn with_row_index(mut self, row_index: Option<RowIndex>) -> Self {
        self.row_index = row_index;
        self
    }

    /// Select columns by index.
    pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
        self.projection = projection;
        self
    }

    /// Read the stream metadata, caching it so the underlying reader is
    /// only consumed on the first call.
    fn metadata(&mut self) -> PolarsResult<StreamMetadata> {
        match &self.metadata {
            Some(md) => Ok(md.clone()),
            None => {
                let md = read::read_stream_metadata(&mut self.reader)?;
                // `Some(..)` rather than `Option::from(..)`: the direct,
                // idiomatic constructor.
                self.metadata = Some(md.clone());
                Ok(md)
            },
        }
    }
}
impl<R> ArrowReader for read::StreamReader<R>
where
    R: Read,
{
    /// Pull the next record batch from the stream.
    ///
    /// Returns `Ok(None)` when the stream is exhausted or when the decoder
    /// reports it is still waiting on data; `Ok(Some(batch))` when a batch
    /// was decoded; propagates decoding errors.
    fn next_record_batch(&mut self) -> PolarsResult<Option<RecordBatch>> {
        match self.next() {
            None => Ok(None),
            Some(Err(err)) => Err(err),
            Some(Ok(StreamState::Waiting)) => Ok(None),
            Some(Ok(StreamState::Some(chunk))) => Ok(Some(chunk)),
        }
    }
}
impl<R> SerReader<R> for IpcStreamReader<R>
where
    R: Read,
{
    /// Construct a reader over `reader` with default settings:
    /// rechunk enabled, no row limit, no column selection.
    fn new(reader: R) -> Self {
        Self {
            reader,
            rechunk: true,
            n_rows: None,
            columns: None,
            projection: None,
            row_index: None,
            metadata: None,
        }
    }

    /// Toggle rechunking of the resulting `DataFrame` after reading.
    fn set_rechunk(mut self, rechunk: bool) -> Self {
        self.rechunk = rechunk;
        self
    }

    /// Consume the reader and materialize the IPC stream into a `DataFrame`.
    fn finish(mut self) -> PolarsResult<DataFrame> {
        let rechunk = self.rechunk;
        let metadata = self.metadata()?;

        // A by-name column selection takes precedence: translate it into a
        // positional projection before reading.
        if let Some(columns) = self.columns.take() {
            self.projection = Some(columns_to_projection(&columns, &metadata.schema)?);
        }

        // The output schema reflects the projection, when one is set.
        let schema = match &self.projection {
            Some(projection) => apply_projection(&metadata.schema, projection),
            None => metadata.schema.clone(),
        };

        let ipc_reader =
            read::StreamReader::new(&mut self.reader, metadata.clone(), self.projection);
        finish_reader(
            ipc_reader,
            rechunk,
            self.n_rows,
            None,
            &schema,
            self.row_index,
        )
    }
}
#[must_use]
/// Writer for the Arrow IPC *streaming* format.
///
/// Configure via `with_compression` / `with_compat_level`, then call
/// [`SerWriter::finish`] to serialize a `DataFrame`.
pub struct IpcStreamWriter<W> {
    // Underlying byte sink the IPC stream is written to.
    writer: W,
    // Optional compression applied to IPC messages (defaults to `None`).
    compression: Option<IpcCompression>,
    // Arrow compatibility level (defaults to `CompatLevel::oldest()` in
    // `SerWriter::new`).
    compat_level: CompatLevel,
    // Optional custom metadata embedded in the stream's schema message.
    custom_schema_metadata: Option<Arc<Metadata>>,
}
use arrow::record_batch::RecordBatch;
use crate::RowIndex;
impl<W> IpcStreamWriter<W> {
    /// Set the compression applied to IPC messages; `None` writes
    /// uncompressed data.
    pub fn with_compression(self, compression: Option<IpcCompression>) -> Self {
        Self { compression, ..self }
    }

    /// Set the Arrow compatibility level used when serializing.
    pub fn with_compat_level(self, compat_level: CompatLevel) -> Self {
        Self { compat_level, ..self }
    }

    /// Attach custom metadata to be embedded in the schema message.
    pub fn set_custom_schema_metadata(&mut self, custom_metadata: Arc<Metadata>) {
        // Any previously set metadata is discarded.
        let _prev = self.custom_schema_metadata.replace(custom_metadata);
    }
}
impl<W> SerWriter<W> for IpcStreamWriter<W>
where
    W: Write,
{
    /// Construct a writer over `writer` with default settings:
    /// no compression, oldest compatibility level, no custom metadata.
    fn new(writer: W) -> Self {
        Self {
            writer,
            compression: None,
            compat_level: CompatLevel::oldest(),
            custom_schema_metadata: None,
        }
    }

    /// Serialize `df` to the underlying writer as an Arrow IPC stream.
    ///
    /// Writes the schema message (with any custom metadata), then the data
    /// in chunks, then the end-of-stream marker.
    fn finish(&mut self, df: &mut DataFrame) -> PolarsResult<()> {
        let options = WriteOptions {
            compression: self.compression.map(Into::into),
        };
        let mut stream_writer = write::StreamWriter::new(&mut self.writer, options);

        if let Some(custom_metadata) = self.custom_schema_metadata.as_ref() {
            stream_writer.set_custom_schema_metadata(Arc::clone(custom_metadata));
        }

        stream_writer.start(&df.schema().to_arrow(self.compat_level), None)?;

        // Split the frame into equally-sized chunks before serializing.
        let chunked = chunk_df_for_writing(df, 512 * 512)?;
        for batch in chunked.iter_chunks(self.compat_level, true) {
            stream_writer.write(&batch, None)?;
        }

        stream_writer.finish()?;
        Ok(())
    }
}
/// Options describing how an IPC stream file should be written.
pub struct IpcStreamWriterOption {
    // Optional compression applied to IPC messages (defaults to `None`).
    compression: Option<IpcCompression>,
    // File extension to use when writing (defaults to `.ipc`).
    extension: PathBuf,
}
impl IpcStreamWriterOption {
    /// Create options with defaults: no compression, `.ipc` extension.
    pub fn new() -> Self {
        Self {
            compression: None,
            extension: PathBuf::from(".ipc"),
        }
    }

    /// Set the compression applied to IPC messages.
    pub fn with_compression(self, compression: Option<IpcCompression>) -> Self {
        Self { compression, ..self }
    }

    /// Set the file extension used when writing.
    pub fn with_extension(self, extension: PathBuf) -> Self {
        Self { extension, ..self }
    }
}
impl Default for IpcStreamWriterOption {
fn default() -> Self {
Self::new()
}
}