Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-core/src/serde/df.rs
6940 views
1
use std::sync::Arc;
2
3
use arrow::datatypes::Metadata;
4
use arrow::io::ipc::read::{StreamReader, StreamState, read_stream_metadata};
5
use arrow::io::ipc::write::WriteOptions;
6
use polars_error::{PolarsResult, polars_err, to_compute_err};
7
use polars_utils::format_pl_smallstr;
8
use polars_utils::pl_serialize::deserialize_map_bytes;
9
use polars_utils::pl_str::PlSmallStr;
10
use serde::de::Error;
11
use serde::*;
12
13
use crate::chunked_array::flags::StatisticsFlags;
14
use crate::config;
15
use crate::frame::chunk_df_for_writing;
16
use crate::prelude::{CompatLevel, DataFrame, SchemaExt};
17
use crate::utils::accumulate_dataframes_vertical_unchecked;
18
19
/// Custom schema-metadata key under which `DataFrame::serialize_into_writer` stores the
/// per-column statistics flags (a JSON array of `u32` bit patterns, one entry per column),
/// and from which `DataFrame::deserialize_from_reader` restores them.
const FLAGS_KEY: PlSmallStr = PlSmallStr::from_static("_PL_FLAGS");
20
21
impl DataFrame {
22
pub fn serialize_into_writer(&mut self, writer: &mut dyn std::io::Write) -> PolarsResult<()> {
23
let schema = self.schema();
24
25
if schema.iter_values().any(|x| x.is_object()) {
26
return Err(polars_err!(
27
ComputeError:
28
"serializing data of type Object is not supported",
29
));
30
}
31
32
let mut ipc_writer =
33
arrow::io::ipc::write::StreamWriter::new(writer, WriteOptions { compression: None });
34
35
ipc_writer.set_custom_schema_metadata(Arc::new(Metadata::from_iter(
36
self.get_columns().iter().map(|c| {
37
(
38
format_pl_smallstr!("{}{}", FLAGS_KEY, c.name()),
39
PlSmallStr::from(c.get_flags().bits().to_string()),
40
)
41
}),
42
)));
43
44
ipc_writer.set_custom_schema_metadata(Arc::new(Metadata::from([(
45
FLAGS_KEY,
46
serde_json::to_string(
47
&self
48
.iter()
49
.map(|s| s.get_flags().bits())
50
.collect::<Vec<u32>>(),
51
)
52
.map_err(to_compute_err)?
53
.into(),
54
)])));
55
56
ipc_writer.start(&schema.to_arrow(CompatLevel::newest()), None)?;
57
58
for batch in chunk_df_for_writing(self, 512 * 512)?.iter_chunks(CompatLevel::newest(), true)
59
{
60
ipc_writer.write(&batch, None)?;
61
}
62
63
ipc_writer.finish()?;
64
65
Ok(())
66
}
67
68
pub fn serialize_to_bytes(&mut self) -> PolarsResult<Vec<u8>> {
69
let mut buf = vec![];
70
self.serialize_into_writer(&mut buf)?;
71
72
Ok(buf)
73
}
74
75
pub fn deserialize_from_reader(reader: &mut dyn std::io::Read) -> PolarsResult<Self> {
76
let mut md = read_stream_metadata(reader)?;
77
78
let custom_metadata = md.custom_schema_metadata.take();
79
80
let reader = StreamReader::new(reader, md, None);
81
let dfs = reader
82
.into_iter()
83
.map_while(|batch| match batch {
84
Ok(StreamState::Some(batch)) => Some(Ok(DataFrame::from(batch))),
85
Ok(StreamState::Waiting) => None,
86
Err(e) => Some(Err(e)),
87
})
88
.collect::<PolarsResult<Vec<DataFrame>>>()?;
89
90
if dfs.is_empty() {
91
return Ok(DataFrame::empty());
92
}
93
let mut df = accumulate_dataframes_vertical_unchecked(dfs);
94
95
// Set custom metadata (fallible)
96
(|| {
97
let custom_metadata = custom_metadata?;
98
let flags = custom_metadata.get(&FLAGS_KEY)?;
99
100
let flags: PolarsResult<Vec<u32>> = serde_json::from_str(flags).map_err(to_compute_err);
101
102
let verbose = config::verbose();
103
104
if let Err(e) = &flags {
105
if verbose {
106
eprintln!("DataFrame::read_ipc: Error parsing metadata flags: {e}");
107
}
108
}
109
110
let flags = flags.ok()?;
111
112
if flags.len() != df.width() {
113
if verbose {
114
eprintln!(
115
"DataFrame::read_ipc: Metadata flags width mismatch: {} != {}",
116
flags.len(),
117
df.width()
118
);
119
}
120
121
return None;
122
}
123
124
let mut n_set = 0;
125
126
for (c, v) in unsafe { df.get_columns_mut() }.iter_mut().zip(flags) {
127
if let Some(flags) = StatisticsFlags::from_bits(v) {
128
n_set += c.set_flags(flags) as usize;
129
}
130
}
131
132
if verbose {
133
eprintln!(
134
"DataFrame::read_ipc: Loaded metadata for {} / {} columns",
135
n_set,
136
df.width()
137
);
138
}
139
140
Some(())
141
})();
142
143
Ok(df)
144
}
145
}
146
147
impl Serialize for DataFrame {
148
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
149
where
150
S: Serializer,
151
{
152
use serde::ser::Error;
153
154
let mut bytes = vec![];
155
self.clone()
156
.serialize_into_writer(&mut bytes)
157
.map_err(S::Error::custom)?;
158
159
serializer.serialize_bytes(bytes.as_slice())
160
}
161
}
162
163
impl<'de> Deserialize<'de> for DataFrame {
164
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
165
where
166
D: Deserializer<'de>,
167
{
168
deserialize_map_bytes(deserializer, |b| {
169
let v = &mut b.as_ref();
170
Self::deserialize_from_reader(v)
171
})?
172
.map_err(D::Error::custom)
173
}
174
}
175
176
#[cfg(feature = "dsl-schema")]
impl schemars::JsonSchema for DataFrame {
    /// Schema name exposed to generated JSON schemas.
    fn schema_name() -> String {
        String::from("DataFrame")
    }

    /// Stable identifier: this module's path followed by `::DataFrame`.
    fn schema_id() -> std::borrow::Cow<'static, str> {
        std::borrow::Cow::from(concat!(module_path!(), "::DataFrame"))
    }

    /// A serialized `DataFrame` is a byte string (see `serialize_bytes` in the `Serialize`
    /// impl), so its schema is that of `Vec<u8>`.
    fn json_schema(generator: &mut schemars::r#gen::SchemaGenerator) -> schemars::schema::Schema {
        <Vec<u8> as schemars::JsonSchema>::json_schema(generator)
    }
}
190
191