Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/avro/write.rs
6939 views
1
use std::io::Write;
2
3
pub use Compression as AvroCompression;
4
pub use arrow::io::avro::avro_schema::file::Compression;
5
use arrow::io::avro::avro_schema::{self};
6
use arrow::io::avro::write;
7
use polars_core::error::to_compute_err;
8
use polars_core::prelude::*;
9
10
use crate::shared::{SerWriter, schema_to_arrow_checked};
11
12
/// Write a [`DataFrame`] to [Apache Avro] format
13
///
14
/// [Apache Avro]: https://avro.apache.org
15
///
16
/// # Example
17
///
18
/// ```
19
/// use polars_core::prelude::*;
20
/// use polars_io::avro::AvroWriter;
21
/// use std::fs::File;
22
/// use polars_io::SerWriter;
23
///
24
/// fn example(df: &mut DataFrame) -> PolarsResult<()> {
25
/// let mut file = File::create("file.avro").expect("could not create file");
26
///
27
/// AvroWriter::new(&mut file)
28
/// .finish(df)
29
/// }
30
/// ```
31
#[must_use]
32
pub struct AvroWriter<W> {
33
writer: W,
34
compression: Option<AvroCompression>,
35
name: String,
36
}
37
38
impl<W> AvroWriter<W>
39
where
40
W: Write,
41
{
42
/// Set the compression used. Defaults to None.
43
pub fn with_compression(mut self, compression: Option<AvroCompression>) -> Self {
44
self.compression = compression;
45
self
46
}
47
48
pub fn with_name(mut self, name: String) -> Self {
49
self.name = name;
50
self
51
}
52
}
53
54
impl<W> SerWriter<W> for AvroWriter<W>
55
where
56
W: Write,
57
{
58
fn new(writer: W) -> Self {
59
Self {
60
writer,
61
compression: None,
62
name: "".to_string(),
63
}
64
}
65
66
fn finish(&mut self, df: &mut DataFrame) -> PolarsResult<()> {
67
let schema = schema_to_arrow_checked(df.schema(), CompatLevel::oldest(), "avro")?;
68
let record = write::to_record(&schema, self.name.clone())?;
69
70
let mut data = vec![];
71
let mut compressed_block = avro_schema::file::CompressedBlock::default();
72
for chunk in df.iter_chunks(CompatLevel::oldest(), true) {
73
let mut serializers = chunk
74
.iter()
75
.zip(record.fields.iter())
76
.map(|(array, field)| write::new_serializer(array.as_ref(), &field.schema))
77
.collect::<Vec<_>>();
78
79
let mut block =
80
avro_schema::file::Block::new(chunk.arrays()[0].len(), std::mem::take(&mut data));
81
write::serialize(&mut serializers, &mut block);
82
let _was_compressed =
83
avro_schema::write::compress(&mut block, &mut compressed_block, self.compression)
84
.map_err(to_compute_err)?;
85
86
avro_schema::write::write_metadata(&mut self.writer, record.clone(), self.compression)
87
.map_err(to_compute_err)?;
88
89
avro_schema::write::write_block(&mut self.writer, &compressed_block)
90
.map_err(to_compute_err)?;
91
// reuse block for next iteration.
92
data = block.data;
93
data.clear();
94
95
// reuse block for next iteration
96
compressed_block.data.clear();
97
compressed_block.number_of_rows = 0
98
}
99
100
Ok(())
101
}
102
}
103
104