Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-core/src/serde/df.rs
8424 views
1
use std::io::{Read, Seek};
2
use std::sync::Arc;
3
4
use arrow::datatypes::Metadata;
5
use arrow::io::ipc::read::{StreamReader, StreamState, read_stream_metadata};
6
use arrow::io::ipc::write::WriteOptions;
7
use polars_error::{PolarsResult, polars_err, to_compute_err};
8
use polars_utils::format_pl_smallstr;
9
use polars_utils::pl_serialize::deserialize_map_bytes;
10
use polars_utils::pl_str::PlSmallStr;
11
use serde::de::Error;
12
use serde::*;
13
14
use crate::chunked_array::flags::StatisticsFlags;
15
use crate::config;
16
use crate::frame::chunk_df_for_writing;
17
use crate::prelude::{CompatLevel, DataFrame, SchemaExt};
18
use crate::schema::Schema;
19
use crate::utils::accumulate_dataframes_vertical_unchecked;
20
21
/// Custom IPC schema metadata key that `DataFrame::deserialize_from_reader`
/// reads to restore per-column statistics flags, stored as a JSON array of
/// `u32` flag bits (one entry per column, in column order).
const FLAGS_KEY: PlSmallStr = PlSmallStr::from_static("_PL_FLAGS");
22
23
impl DataFrame {
24
pub fn serialize_into_writer(&mut self, writer: &mut dyn std::io::Write) -> PolarsResult<()> {
25
let schema = self.schema();
26
27
if schema.iter_values().any(|x| x.is_object()) {
28
return Err(polars_err!(
29
ComputeError:
30
"serializing data of type Object is not supported",
31
));
32
}
33
34
let mut ipc_writer =
35
arrow::io::ipc::write::StreamWriter::new(writer, WriteOptions { compression: None });
36
37
ipc_writer.set_custom_schema_metadata(Arc::new(Metadata::from_iter(
38
self.columns().iter().map(|c| {
39
(
40
format_pl_smallstr!("{}{}", FLAGS_KEY, c.name()),
41
PlSmallStr::from(c.get_flags().bits().to_string()),
42
)
43
}),
44
)));
45
46
ipc_writer.set_custom_schema_metadata(Arc::new(Metadata::from([(
47
FLAGS_KEY,
48
serde_json::to_string(
49
&self
50
.columns()
51
.iter()
52
.map(|s| s.get_flags().bits())
53
.collect::<Vec<u32>>(),
54
)
55
.map_err(to_compute_err)?
56
.into(),
57
)])));
58
59
ipc_writer.start(&schema.to_arrow(CompatLevel::newest()), None)?;
60
61
for batch in chunk_df_for_writing(self, 512 * 512)?.iter_chunks(CompatLevel::newest(), true)
62
{
63
ipc_writer.write(&batch, None)?;
64
}
65
66
ipc_writer.finish()?;
67
68
Ok(())
69
}
70
71
pub fn serialize_to_bytes(&mut self) -> PolarsResult<Vec<u8>> {
72
let mut buf = vec![];
73
self.serialize_into_writer(&mut buf)?;
74
75
Ok(buf)
76
}
77
78
pub fn deserialize_from_reader<T: Read + Seek>(reader: &mut T) -> PolarsResult<Self> {
79
let mut md = read_stream_metadata(reader)?;
80
let pl_schema = Schema::from_arrow_schema(&md.schema);
81
82
let custom_metadata = md.custom_schema_metadata.take();
83
84
let reader = StreamReader::new(reader, md, None);
85
let dfs = reader
86
.into_iter()
87
.map_while(|batch| match batch {
88
Ok(StreamState::Some(batch)) => Some(Ok(DataFrame::from(batch))),
89
Ok(StreamState::Waiting) => None,
90
Err(e) => Some(Err(e)),
91
})
92
.collect::<PolarsResult<Vec<DataFrame>>>()?;
93
94
if dfs.is_empty() {
95
return Ok(DataFrame::empty_with_schema(&pl_schema));
96
}
97
98
let mut df = accumulate_dataframes_vertical_unchecked(dfs);
99
100
// Set custom metadata (fallible)
101
(|| {
102
let custom_metadata = custom_metadata?;
103
let flags = custom_metadata.get(&FLAGS_KEY)?;
104
105
let flags: PolarsResult<Vec<u32>> = serde_json::from_str(flags).map_err(to_compute_err);
106
107
let verbose = config::verbose();
108
109
if let Err(e) = &flags {
110
if verbose {
111
eprintln!("DataFrame::read_ipc: Error parsing metadata flags: {e}");
112
}
113
}
114
115
let flags = flags.ok()?;
116
117
if flags.len() != df.width() {
118
if verbose {
119
eprintln!(
120
"DataFrame::read_ipc: Metadata flags width mismatch: {} != {}",
121
flags.len(),
122
df.width()
123
);
124
}
125
126
return None;
127
}
128
129
let mut n_set = 0;
130
131
for (c, v) in unsafe { df.columns_mut_retain_schema() }
132
.iter_mut()
133
.zip(flags)
134
{
135
if let Some(flags) = StatisticsFlags::from_bits(v) {
136
n_set += c.set_flags(flags) as usize;
137
}
138
}
139
140
if verbose {
141
eprintln!(
142
"DataFrame::read_ipc: Loaded metadata for {} / {} columns",
143
n_set,
144
df.width()
145
);
146
}
147
148
Some(())
149
})();
150
151
Ok(df)
152
}
153
}
154
155
impl Serialize for DataFrame {
156
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
157
where
158
S: Serializer,
159
{
160
use serde::ser::Error;
161
162
let mut bytes = vec![];
163
self.clone()
164
.serialize_into_writer(&mut bytes)
165
.map_err(S::Error::custom)?;
166
167
serializer.serialize_bytes(bytes.as_slice())
168
}
169
}
170
171
impl<'de> Deserialize<'de> for DataFrame {
172
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
173
where
174
D: Deserializer<'de>,
175
{
176
deserialize_map_bytes(deserializer, |b| {
177
let v = &mut b.as_ref();
178
let mut reader = std::io::Cursor::new(v);
179
Self::deserialize_from_reader(&mut reader)
180
})?
181
.map_err(D::Error::custom)
182
}
183
}
184
185
#[cfg(feature = "dsl-schema")]
impl schemars::JsonSchema for DataFrame {
    // `Serialize for DataFrame` emits the frame via `serialize_bytes`, so its
    // JSON schema is described as that of a `Vec<u8>`.

    fn schema_name() -> std::borrow::Cow<'static, str> {
        std::borrow::Cow::Borrowed("DataFrame")
    }

    fn schema_id() -> std::borrow::Cow<'static, str> {
        concat!(module_path!(), "::", "DataFrame").into()
    }

    fn json_schema(generator: &mut schemars::SchemaGenerator) -> schemars::Schema {
        <Vec<u8> as schemars::JsonSchema>::json_schema(generator)
    }
}
199
200