Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/avro/read.rs
6939 views
1
use std::io::{Read, Seek};
2
3
use arrow::io::avro::{self, read};
4
use arrow::record_batch::RecordBatch;
5
use polars_core::error::to_compute_err;
6
use polars_core::prelude::*;
7
8
use crate::prelude::*;
9
use crate::shared::{ArrowReader, finish_reader};
10
11
/// Reader that loads an [Apache Avro] file into a [`DataFrame`].
///
/// The reader is configured builder-style (`with_n_rows`, `with_projection`,
/// `with_columns`, `set_rechunk`) and consumed by `finish`, which performs the
/// actual read.
///
/// [Apache Avro]: https://avro.apache.org
///
/// # Examples
///
/// ```
/// use std::fs::File;
/// use polars_core::prelude::*;
/// use polars_io::avro::AvroReader;
/// use polars_io::SerReader;
///
/// fn example() -> PolarsResult<DataFrame> {
///     let file = File::open("file.avro").expect("file not found");
///
///     AvroReader::new(file)
///         .finish()
/// }
/// ```
#[must_use]
pub struct AvroReader<R> {
    /// Underlying source the Avro bytes are read from.
    reader: R,
    /// Rechunk flag forwarded to the shared reader routine when finishing.
    rechunk: bool,
    /// Optional row limit; `None` means no limit was requested.
    n_rows: Option<usize>,
    /// Optional column selection by name; resolved to indices when finishing.
    columns: Option<Vec<String>>,
    /// Optional column selection by zero-based index.
    projection: Option<Vec<usize>>,
}
37
38
impl<R: Read + Seek> AvroReader<R> {
39
/// Get schema of the Avro File
40
pub fn schema(&mut self) -> PolarsResult<Schema> {
41
let schema = self.arrow_schema()?;
42
Ok(Schema::from_arrow_schema(&schema))
43
}
44
45
/// Get arrow schema of the avro File, this is faster than a polars schema.
46
pub fn arrow_schema(&mut self) -> PolarsResult<ArrowSchema> {
47
let metadata =
48
avro::avro_schema::read::read_metadata(&mut self.reader).map_err(to_compute_err)?;
49
let schema = read::infer_schema(&metadata.record)?;
50
Ok(schema)
51
}
52
53
/// Stop reading when `n` rows are read.
54
pub fn with_n_rows(mut self, num_rows: Option<usize>) -> Self {
55
self.n_rows = num_rows;
56
self
57
}
58
59
/// Set the reader's column projection. This counts from 0, meaning that
60
/// `vec![0, 4]` would select the 1st and 5th column.
61
pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
62
self.projection = projection;
63
self
64
}
65
66
/// Columns to select/ project
67
pub fn with_columns(mut self, columns: Option<Vec<String>>) -> Self {
68
self.columns = columns;
69
self
70
}
71
}
72
73
impl<R> ArrowReader for read::Reader<R>
74
where
75
R: Read + Seek,
76
{
77
fn next_record_batch(&mut self) -> PolarsResult<Option<RecordBatch>> {
78
self.next().map_or(Ok(None), |v| v.map(Some))
79
}
80
}
81
82
impl<R> SerReader<R> for AvroReader<R>
83
where
84
R: Read + Seek,
85
{
86
fn new(reader: R) -> Self {
87
AvroReader {
88
reader,
89
rechunk: true,
90
n_rows: None,
91
columns: None,
92
projection: None,
93
}
94
}
95
96
fn set_rechunk(mut self, rechunk: bool) -> Self {
97
self.rechunk = rechunk;
98
self
99
}
100
101
fn finish(mut self) -> PolarsResult<DataFrame> {
102
let rechunk = self.rechunk;
103
let metadata =
104
avro::avro_schema::read::read_metadata(&mut self.reader).map_err(to_compute_err)?;
105
let schema = read::infer_schema(&metadata.record)?;
106
107
if let Some(columns) = &self.columns {
108
self.projection = Some(columns_to_projection(columns, &schema)?);
109
}
110
111
let (projection, projected_schema) = if let Some(projection) = self.projection {
112
let mut prj = vec![false; schema.len()];
113
for &index in projection.iter() {
114
prj[index] = true;
115
}
116
(Some(prj), apply_projection(&schema, &projection))
117
} else {
118
(None, schema.clone())
119
};
120
121
let avro_reader = avro::read::Reader::new(&mut self.reader, metadata, schema, projection);
122
123
finish_reader(
124
avro_reader,
125
rechunk,
126
self.n_rows,
127
None,
128
&projected_schema,
129
None,
130
)
131
}
132
}
133
134