Path: blob/main/crates/polars-stream/src/nodes/io_sinks/writers/ipc/initialization.rs
8446 views
use std::sync::Arc;12use arrow::io::ipc::IpcField;3use polars_core::prelude::{CategoricalMapping, CompatLevel, DataType};4use polars_core::schema::Schema;5use polars_core::series::ToArrowConverter;6use polars_core::series::categorical_to_arrow::CategoricalToArrowConverter;7use polars_core::utils::arrow;89pub fn build_ipc_write_components(10file_schema: &Schema,11compat_level: CompatLevel,12) -> (Vec<ToArrowConverter>, Vec<IpcField>, Arc<[usize]>) {13let arrow_converters: Vec<ToArrowConverter> = file_schema14.iter_values()15.map(|dtype| {16let mut categorical_converter = CategoricalToArrowConverter {17converters: Default::default(),18persist_remap: true,19output_keys_only: true,20};21categorical_converter.initialize(dtype);22ToArrowConverter {23compat_level,24categorical_converter,25}26})27.collect();2829let dictionary_id_offsets: Arc<[usize]> =30dictionary_id_offsets_iter(&arrow_converters).collect();3132let ipc_fields: Vec<IpcField> = file_schema33.iter_values()34.zip(&arrow_converters)35.zip(dictionary_id_offsets.iter().copied())36.map(|((dtype, arrow_converter), dictionary_id_offset)| {37IpcFieldConverter {38get_dictionary_id: |mapping: &Arc<CategoricalMapping>| {39let converter_key: usize = Arc::as_ptr(mapping) as *const () as _;40let converter_index: usize = arrow_converter41.categorical_converter42.converters43.get_index_of(&converter_key)44.unwrap();4546i64::try_from(dictionary_id_offset + converter_index).unwrap()47},48}49.dtype_to_ipc_field(dtype)50})51.collect();5253(arrow_converters, ipc_fields, dictionary_id_offsets)54}5556/// Cumulative sum, excluding the current element.57///58/// Indicates total number of dictionaries in the columns to the left of the current one.59fn dictionary_id_offsets_iter(60arrow_converters: &[ToArrowConverter],61) -> impl Iterator<Item = usize> {62arrow_converters63.iter()64.scan(0, |acc: &mut usize, arrow_converter| {65let out = *acc;66*acc += arrow_converter.categorical_converter.converters.len();67Some(out)68})69}7071struct IpcFieldConverter<F>72where73F: Fn(&Arc<CategoricalMapping>) -> i64,74{75get_dictionary_id: F,76}7778impl<F> IpcFieldConverter<F>79where80F: Fn(&Arc<CategoricalMapping>) -> i64,81{82fn dtype_to_ipc_field(&self, dtype: &DataType) -> IpcField {83use DataType::*;8485match dtype {86#[cfg(feature = "dtype-categorical")]87Categorical(_, mapping) | Enum(_, mapping) => IpcField {88fields: vec![self.dtype_to_ipc_field(&DataType::String)],89dictionary_id: Some((self.get_dictionary_id)(mapping)),90},91List(inner) => IpcField {92fields: vec![self.dtype_to_ipc_field(inner)],93dictionary_id: None,94},95#[cfg(feature = "dtype-array")]96Array(inner, _width) => IpcField {97fields: vec![self.dtype_to_ipc_field(inner)],98dictionary_id: None,99},100Struct(fields) => IpcField {101fields: fields102.iter()103.map(|x| self.dtype_to_ipc_field(x.dtype()))104.collect(),105dictionary_id: None,106},107#[cfg(feature = "dtype-extension")]108Extension(_, storage) => self.dtype_to_ipc_field(storage.as_ref()),109_ => {110assert!(!dtype.is_nested());111IpcField {112fields: vec![],113dictionary_id: None,114}115},116}117}118}119120121