Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/io_sinks/writers/ipc/initialization.rs
8446 views
1
use std::sync::Arc;
2
3
use arrow::io::ipc::IpcField;
4
use polars_core::prelude::{CategoricalMapping, CompatLevel, DataType};
5
use polars_core::schema::Schema;
6
use polars_core::series::ToArrowConverter;
7
use polars_core::series::categorical_to_arrow::CategoricalToArrowConverter;
8
use polars_core::utils::arrow;
9
10
pub fn build_ipc_write_components(
11
file_schema: &Schema,
12
compat_level: CompatLevel,
13
) -> (Vec<ToArrowConverter>, Vec<IpcField>, Arc<[usize]>) {
14
let arrow_converters: Vec<ToArrowConverter> = file_schema
15
.iter_values()
16
.map(|dtype| {
17
let mut categorical_converter = CategoricalToArrowConverter {
18
converters: Default::default(),
19
persist_remap: true,
20
output_keys_only: true,
21
};
22
categorical_converter.initialize(dtype);
23
ToArrowConverter {
24
compat_level,
25
categorical_converter,
26
}
27
})
28
.collect();
29
30
let dictionary_id_offsets: Arc<[usize]> =
31
dictionary_id_offsets_iter(&arrow_converters).collect();
32
33
let ipc_fields: Vec<IpcField> = file_schema
34
.iter_values()
35
.zip(&arrow_converters)
36
.zip(dictionary_id_offsets.iter().copied())
37
.map(|((dtype, arrow_converter), dictionary_id_offset)| {
38
IpcFieldConverter {
39
get_dictionary_id: |mapping: &Arc<CategoricalMapping>| {
40
let converter_key: usize = Arc::as_ptr(mapping) as *const () as _;
41
let converter_index: usize = arrow_converter
42
.categorical_converter
43
.converters
44
.get_index_of(&converter_key)
45
.unwrap();
46
47
i64::try_from(dictionary_id_offset + converter_index).unwrap()
48
},
49
}
50
.dtype_to_ipc_field(dtype)
51
})
52
.collect();
53
54
(arrow_converters, ipc_fields, dictionary_id_offsets)
55
}
56
57
/// Cumulative sum, excluding the current element.
58
///
59
/// Indicates total number of dictionaries in the columns to the left of the current one.
60
fn dictionary_id_offsets_iter(
61
arrow_converters: &[ToArrowConverter],
62
) -> impl Iterator<Item = usize> {
63
arrow_converters
64
.iter()
65
.scan(0, |acc: &mut usize, arrow_converter| {
66
let out = *acc;
67
*acc += arrow_converter.categorical_converter.converters.len();
68
Some(out)
69
})
70
}
71
72
struct IpcFieldConverter<F>
73
where
74
F: Fn(&Arc<CategoricalMapping>) -> i64,
75
{
76
get_dictionary_id: F,
77
}
78
79
impl<F> IpcFieldConverter<F>
80
where
81
F: Fn(&Arc<CategoricalMapping>) -> i64,
82
{
83
fn dtype_to_ipc_field(&self, dtype: &DataType) -> IpcField {
84
use DataType::*;
85
86
match dtype {
87
#[cfg(feature = "dtype-categorical")]
88
Categorical(_, mapping) | Enum(_, mapping) => IpcField {
89
fields: vec![self.dtype_to_ipc_field(&DataType::String)],
90
dictionary_id: Some((self.get_dictionary_id)(mapping)),
91
},
92
List(inner) => IpcField {
93
fields: vec![self.dtype_to_ipc_field(inner)],
94
dictionary_id: None,
95
},
96
#[cfg(feature = "dtype-array")]
97
Array(inner, _width) => IpcField {
98
fields: vec![self.dtype_to_ipc_field(inner)],
99
dictionary_id: None,
100
},
101
Struct(fields) => IpcField {
102
fields: fields
103
.iter()
104
.map(|x| self.dtype_to_ipc_field(x.dtype()))
105
.collect(),
106
dictionary_id: None,
107
},
108
#[cfg(feature = "dtype-extension")]
109
Extension(_, storage) => self.dtype_to_ipc_field(storage.as_ref()),
110
_ => {
111
assert!(!dtype.is_nested());
112
IpcField {
113
fields: vec![],
114
dictionary_id: None,
115
}
116
},
117
}
118
}
119
}
120
121