Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-json/src/ndjson/write.rs
6939 views
1
//! APIs to serialize and write to [NDJSON](http://ndjson.org/).
2
use std::io::Write;
3
4
use arrow::array::Array;
5
pub use fallible_streaming_iterator::FallibleStreamingIterator;
6
use polars_error::{PolarsError, PolarsResult};
7
8
use super::super::json::write::new_serializer;
9
10
/// Appends the NDJSON representation of `array` to `buffer`.
///
/// Each element of `array` becomes one JSON value terminated by `\n`, so a
/// zero-length array appends nothing.
fn serialize(array: &dyn Array, buffer: &mut Vec<u8>) {
    let mut rows = new_serializer(array, 0, usize::MAX);
    for _ in 0..array.len() {
        // The row serializer yields exactly one item per element, so it cannot
        // run dry before `array.len()` iterations.
        let row = rows.next().unwrap();
        buffer.extend_from_slice(row);
        buffer.push(b'\n');
    }
}
17
18
/// [`FallibleStreamingIterator`] that serializes an [`Array`] to bytes of valid NDJSON
19
/// where every line is an element of the array.
20
/// # Implementation
21
/// Advancing this iterator CPU-bounded
22
#[derive(Debug, Clone)]
23
pub struct Serializer<A, I>
24
where
25
A: AsRef<dyn Array>,
26
I: Iterator<Item = PolarsResult<A>>,
27
{
28
arrays: I,
29
buffer: Vec<u8>,
30
}
31
32
impl<A, I> Serializer<A, I>
33
where
34
A: AsRef<dyn Array>,
35
I: Iterator<Item = PolarsResult<A>>,
36
{
37
/// Creates a new [`Serializer`].
38
pub fn new(arrays: I, buffer: Vec<u8>) -> Self {
39
Self { arrays, buffer }
40
}
41
}
42
43
impl<A, I> FallibleStreamingIterator for Serializer<A, I>
44
where
45
A: AsRef<dyn Array>,
46
I: Iterator<Item = PolarsResult<A>>,
47
{
48
type Item = [u8];
49
50
type Error = PolarsError;
51
52
fn advance(&mut self) -> PolarsResult<()> {
53
self.buffer.clear();
54
self.arrays
55
.next()
56
.map(|maybe_array| maybe_array.map(|array| serialize(array.as_ref(), &mut self.buffer)))
57
.transpose()?;
58
Ok(())
59
}
60
61
fn get(&self) -> Option<&Self::Item> {
62
if !self.buffer.is_empty() {
63
Some(&self.buffer)
64
} else {
65
None
66
}
67
}
68
}
69
70
/// An iterator adapter that receives an implementer of [`Write`] and
71
/// an implementer of [`FallibleStreamingIterator`] (such as [`Serializer`])
72
/// and writes a valid NDJSON
73
/// # Implementation
74
/// Advancing this iterator mixes CPU-bounded (serializing arrays) tasks and IO-bounded (write to the writer).
75
pub struct FileWriter<W, I>
76
where
77
W: Write,
78
I: FallibleStreamingIterator<Item = [u8], Error = PolarsError>,
79
{
80
writer: W,
81
iterator: I,
82
}
83
84
impl<W, I> FileWriter<W, I>
85
where
86
W: Write,
87
I: FallibleStreamingIterator<Item = [u8], Error = PolarsError>,
88
{
89
/// Creates a new [`FileWriter`].
90
pub fn new(writer: W, iterator: I) -> Self {
91
Self { writer, iterator }
92
}
93
94
/// Returns the inner content of this iterator
95
///
96
/// There are two use-cases for this function:
97
/// * to continue writing to its writer
98
/// * to reuse an internal buffer of its iterator
99
pub fn into_inner(self) -> (W, I) {
100
(self.writer, self.iterator)
101
}
102
}
103
104
impl<W, I> Iterator for FileWriter<W, I>
105
where
106
W: Write,
107
I: FallibleStreamingIterator<Item = [u8], Error = PolarsError>,
108
{
109
type Item = PolarsResult<()>;
110
111
fn next(&mut self) -> Option<Self::Item> {
112
let item = self.iterator.next().transpose()?;
113
Some(item.and_then(|x| {
114
self.writer.write_all(x)?;
115
Ok(())
116
}))
117
}
118
}
119
120