Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/ipc/write.rs
6939 views
1
use std::io::Write;
2
3
use arrow::datatypes::Metadata;
4
use arrow::io::ipc::write::{self, EncodedData, WriteOptions};
5
use polars_core::prelude::*;
6
#[cfg(feature = "serde")]
7
use serde::{Deserialize, Serialize};
8
9
use crate::prelude::*;
10
use crate::shared::schema_to_arrow_checked;
11
12
/// Options controlling how a `DataFrame` is written in Arrow IPC format.
///
/// NOTE(review): `chunk_size` is carried here but is not consumed by
/// [`IpcWriterOptions::to_writer`] in this file — confirm where it is applied.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct IpcWriterOptions {
    /// Data page compression; `None` writes uncompressed batches.
    pub compression: Option<IpcCompression>,
    /// Compatibility level (Polars' flavor of Arrow) to serialize with.
    pub compat_level: CompatLevel,
    /// Size of each written chunk.
    pub chunk_size: IdxSize,
}
23
24
impl Default for IpcWriterOptions {
25
fn default() -> Self {
26
Self {
27
compression: None,
28
compat_level: CompatLevel::newest(),
29
chunk_size: 1 << 18,
30
}
31
}
32
}
33
34
impl IpcWriterOptions {
35
pub fn to_writer<W: Write>(&self, writer: W) -> IpcWriter<W> {
36
IpcWriter::new(writer).with_compression(self.compression)
37
}
38
}
39
40
/// Write a DataFrame to Arrow's IPC format
///
/// # Example
///
/// ```
/// use polars_core::prelude::*;
/// use polars_io::ipc::IpcWriter;
/// use std::fs::File;
/// use polars_io::SerWriter;
///
/// fn example(df: &mut DataFrame) -> PolarsResult<()> {
///     let mut file = File::create("file.ipc").expect("could not create file");
///
///     let mut writer = IpcWriter::new(&mut file);
///
///     let custom_metadata = [
///         ("first_name".into(), "John".into()),
///         ("last_name".into(), "Doe".into()),
///     ]
///     .into_iter()
///     .collect();
///     writer.set_custom_schema_metadata(Arc::new(custom_metadata));
///     writer.finish(df)
/// }
///
/// ```
#[must_use]
pub struct IpcWriter<W> {
    /// Destination the serialized IPC bytes are written to.
    pub(super) writer: W,
    /// Compression applied to record batches; `None` = uncompressed.
    pub(super) compression: Option<IpcCompression>,
    /// Polars' flavor of arrow. This might be temporary.
    pub(super) compat_level: CompatLevel,
    /// When true, `finish` aligns the DataFrame's chunks in parallel.
    pub(super) parallel: bool,
    /// Custom metadata to embed in the schema message, if set.
    pub(super) custom_schema_metadata: Option<Arc<Metadata>>,
}
75
76
impl<W: Write> IpcWriter<W> {
77
/// Set the compression used. Defaults to None.
78
pub fn with_compression(mut self, compression: Option<IpcCompression>) -> Self {
79
self.compression = compression;
80
self
81
}
82
83
pub fn with_compat_level(mut self, compat_level: CompatLevel) -> Self {
84
self.compat_level = compat_level;
85
self
86
}
87
88
pub fn with_parallel(mut self, parallel: bool) -> Self {
89
self.parallel = parallel;
90
self
91
}
92
93
pub fn batched(self, schema: &Schema) -> PolarsResult<BatchedWriter<W>> {
94
let schema = schema_to_arrow_checked(schema, self.compat_level, "ipc")?;
95
let mut writer = write::FileWriter::new(
96
self.writer,
97
Arc::new(schema),
98
None,
99
WriteOptions {
100
compression: self.compression.map(|c| c.into()),
101
},
102
);
103
writer.start()?;
104
105
Ok(BatchedWriter {
106
writer,
107
compat_level: self.compat_level,
108
})
109
}
110
111
/// Sets custom schema metadata. Must be called before `start` is called
112
pub fn set_custom_schema_metadata(&mut self, custom_metadata: Arc<Metadata>) {
113
self.custom_schema_metadata = Some(custom_metadata);
114
}
115
}
116
117
impl<W> SerWriter<W> for IpcWriter<W>
118
where
119
W: Write,
120
{
121
fn new(writer: W) -> Self {
122
IpcWriter {
123
writer,
124
compression: None,
125
compat_level: CompatLevel::newest(),
126
parallel: true,
127
custom_schema_metadata: None,
128
}
129
}
130
131
fn finish(&mut self, df: &mut DataFrame) -> PolarsResult<()> {
132
let schema = schema_to_arrow_checked(df.schema(), self.compat_level, "ipc")?;
133
let mut ipc_writer = write::FileWriter::try_new(
134
&mut self.writer,
135
Arc::new(schema),
136
None,
137
WriteOptions {
138
compression: self.compression.map(|c| c.into()),
139
},
140
)?;
141
if let Some(custom_metadata) = &self.custom_schema_metadata {
142
ipc_writer.set_custom_schema_metadata(Arc::clone(custom_metadata));
143
}
144
145
if self.parallel {
146
df.align_chunks_par();
147
} else {
148
df.align_chunks();
149
}
150
let iter = df.iter_chunks(self.compat_level, true);
151
152
for batch in iter {
153
ipc_writer.write(&batch, None)?
154
}
155
ipc_writer.finish()?;
156
Ok(())
157
}
158
}
159
160
/// Incremental IPC file writer, created via [`IpcWriter::batched`].
pub struct BatchedWriter<W: Write> {
    // Underlying arrow file writer; `start` was already called by `batched`.
    writer: write::FileWriter<W>,
    // Compat level used when converting DataFrame chunks to record batches.
    compat_level: CompatLevel,
}
164
165
impl<W: Write> BatchedWriter<W> {
166
/// Write a batch to the ipc writer.
167
///
168
/// # Panics
169
/// The caller must ensure the chunks in the given [`DataFrame`] are aligned.
170
pub fn write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> {
171
let iter = df.iter_chunks(self.compat_level, true);
172
for batch in iter {
173
self.writer.write(&batch, None)?
174
}
175
Ok(())
176
}
177
178
/// Write a encoded data to the ipc writer.
179
///
180
/// # Panics
181
/// The caller must ensure the chunks in the given [`DataFrame`] are aligned.
182
pub fn write_encoded(
183
&mut self,
184
dictionaries: &[EncodedData],
185
message: &EncodedData,
186
) -> PolarsResult<()> {
187
self.writer.write_encoded(dictionaries, message)?;
188
Ok(())
189
}
190
191
/// Writes the footer of the IPC file.
192
pub fn finish(&mut self) -> PolarsResult<()> {
193
self.writer.finish()?;
194
Ok(())
195
}
196
}
197
198
/// Compression codec applied to IPC record batches.
#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub enum IpcCompression {
    /// LZ4 (framed)
    LZ4,
    /// ZSTD
    #[default]
    ZSTD,
}
209
210
impl From<IpcCompression> for write::Compression {
211
fn from(value: IpcCompression) -> Self {
212
match value {
213
IpcCompression::LZ4 => write::Compression::LZ4,
214
IpcCompression::ZSTD => write::Compression::ZSTD,
215
}
216
}
217
}
218
219