Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/scan_lines.rs
7884 views
1
use arrow::array::BinaryViewArrayGenericBuilder;
2
use arrow::datatypes::ArrowDataType;
3
use polars_core::prelude::DataType;
4
use polars_core::series::Series;
5
use polars_error::{PolarsResult, polars_bail};
6
use polars_utils::pl_str::PlSmallStr;
7
8
const EOL_CHAR: u8 = b'\n';
9
10
/// Counts the number of lines contained in `full_bytes`.
///
/// Every `EOL_CHAR` byte terminates one line. If the input is non-empty and does
/// not end with `EOL_CHAR`, the unterminated tail counts as one additional line.
/// Empty input yields `0`.
pub fn count_lines(full_bytes: &[u8]) -> usize {
    // Summing 0/1 per byte keeps the hot loop branch-free.
    let n_terminated: usize = full_bytes
        .iter()
        .map(|&c| usize::from(c == EOL_CHAR))
        .sum();

    match full_bytes.last() {
        // Trailing bytes after the final EOL (or no EOL at all) form one more line.
        Some(&c) if c != EOL_CHAR => n_terminated + 1,
        _ => n_terminated,
    }
}
21
22
pub fn split_lines_to_rows(bytes: &[u8]) -> PolarsResult<Series> {
23
split_lines_to_rows_impl(bytes, u32::MAX as usize)
24
}
25
26
fn split_lines_to_rows_impl(bytes: &[u8], max_buffer_size: usize) -> PolarsResult<Series> {
27
if bytes.is_empty() {
28
return Ok(Series::new_empty(PlSmallStr::EMPTY, &DataType::String));
29
};
30
31
let first_line_len = bytes.split(|c| *c == EOL_CHAR).next().unwrap().len();
32
let last_line_len = bytes.rsplit(|c| *c == EOL_CHAR).next().unwrap().len();
33
34
let n_lines_estimate = bytes
35
.len()
36
.div_ceil(first_line_len.min(last_line_len).max(1));
37
38
use arrow::array::builder::StaticArrayBuilder;
39
40
let mut builder: BinaryViewArrayGenericBuilder<[u8]> =
41
BinaryViewArrayGenericBuilder::new(ArrowDataType::BinaryView);
42
builder.reserve(n_lines_estimate);
43
44
for line_bytes in bytes
45
.strip_suffix(&[EOL_CHAR])
46
.unwrap_or(bytes)
47
.split(|c| *c == EOL_CHAR)
48
{
49
if line_bytes.len() > max_buffer_size {
50
polars_bail!(
51
ComputeError:
52
"line byte length {} exceeds max buffer size {}",
53
line_bytes.len(), max_buffer_size,
54
)
55
}
56
57
builder.push_value_ignore_validity(line_bytes);
58
}
59
60
let arr = builder.freeze();
61
62
// Performs UTF-8 validation.
63
let arr = arr.to_utf8view()?;
64
65
Ok(unsafe {
66
Series::_try_from_arrow_unchecked(
67
PlSmallStr::EMPTY,
68
vec![arr.boxed()],
69
&ArrowDataType::Utf8View,
70
)?
71
})
72
}
73
74
#[cfg(test)]
mod tests {
    use arrow::buffer::Buffer;
    use polars_error::PolarsError;

    use crate::scan_lines::split_lines_to_rows_impl;

    /// Leading, interior and trailing empty lines are kept, 20-byte lines end up in a
    /// single shared data buffer, and a limit of 19 bytes rejects the input.
    #[test]
    fn test_split_lines_to_rows_impl() {
        // Written with explicit escapes so the fixture does not depend on source layout.
        let data: &'static [u8] = b"\nAAAAABBBBBCCCCCDDDDD\n\nEEEEEFFFFFGGGGGHHHHH\n\n";

        let out = split_lines_to_rows_impl(data, 20).unwrap();
        let out = out.str().unwrap();

        assert_eq!(
            out.iter().collect::<Vec<_>>().as_slice(),
            &[
                Some(""),
                Some("AAAAABBBBBCCCCCDDDDD"),
                Some(""),
                Some("EEEEEFFFFFGGGGGHHHHH"),
                Some(""),
            ]
        );

        let v: Vec<&[Buffer<u8>]> = out
            .downcast_iter()
            .map(|array| array.data_buffers().as_ref())
            .collect();

        // Both long lines are expected back-to-back in one data buffer.
        assert_eq!(
            v.as_slice(),
            &[&[Buffer::from_static(
                b"AAAAABBBBBCCCCCDDDDDEEEEEFFFFFGGGGGHHHHH"
            )]]
        );

        // One byte below the longest line must be rejected with a `ComputeError`.
        let PolarsError::ComputeError(err_str) = split_lines_to_rows_impl(data, 19).unwrap_err()
        else {
            unreachable!()
        };

        assert_eq!(&*err_str, "line byte length 20 exceeds max buffer size 19");
    }

    /// Lines of at most 12 bytes, the last one unterminated: the result is expected to
    /// carry no data buffers at all.
    #[test]
    fn test_split_lines_to_rows_impl_all_inline() {
        let data: Vec<u8> = [
            b"AAAABBBBCCCC\n".as_slice(),
            b" \n".as_slice(),
            b"DDDDEEEEFFFF\n".as_slice(),
            b" ".as_slice(),
        ]
        .concat();

        let out = split_lines_to_rows_impl(&data, 12).unwrap();
        let out = out.str().unwrap();

        assert_eq!(
            out.iter().collect::<Vec<_>>().as_slice(),
            &[
                Some("AAAABBBBCCCC"),
                Some(" "),
                Some("DDDDEEEEFFFF"),
                Some(" "),
            ]
        );

        let v: Vec<&[Buffer<u8>]> = out
            .downcast_iter()
            .map(|array| array.data_buffers().as_ref())
            .collect();

        assert_eq!(v.as_slice(), &[&[][..]]);
    }
}
155
156