Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/utils/other.rs
8424 views
1
use std::io::Read;
2
#[cfg(target_os = "emscripten")]
3
use std::io::{Seek, SeekFrom};
4
5
use polars_buffer::Buffer;
6
use polars_core::prelude::*;
7
use polars_utils::mmap::MMapSemaphore;
8
9
use crate::mmap::{MmapBytesReader, ReaderBytes};
10
11
/// Get the bytes backing `reader`, preferring a zero-copy path.
///
/// Resolution order:
/// 1. If the reader is backed by a seekable file, memory-map it starting at
///    the reader's current stream position and return the mapping.
/// 2. Otherwise, if the reader can expose its bytes directly (e.g. an
///    in-memory buffer), borrow them without copying.
/// 3. Otherwise, fall back to reading everything into an owned buffer.
pub fn get_reader_bytes<R: Read + MmapBytesReader + ?Sized>(
    reader: &mut R,
) -> PolarsResult<ReaderBytes<'_>> {
    // we have a file so we can mmap
    // only seekable files are mmap-able
    if let Some((file, offset)) = reader
        .stream_position()
        .ok()
        .and_then(|offset| Some((reader.to_file()?, offset)))
    {
        let mut options = memmap::MmapOptions::new();
        // Map from the reader's current position, not the file start, so bytes
        // already consumed by the caller (e.g. a skipped header) are excluded.
        options.offset(offset);

        // Set mmap size based on seek to end when running under Emscripten
        // NOTE(review): presumably the mapping length cannot be inferred from
        // the fd on Emscripten, hence the explicit seek-to-end — TODO confirm.
        #[cfg(target_os = "emscripten")]
        {
            let mut file = file;
            let size = file.seek(SeekFrom::End(0)).unwrap();
            options.len((size - offset) as usize);
        }

        let mmap = MMapSemaphore::new_from_file_with_options(file, options)?;
        Ok(ReaderBytes::Owned(Buffer::from_owner(mmap)))
    } else {
        // we can get the bytes for free
        if reader.to_bytes().is_some() {
            // duplicate .to_bytes() is necessary to satisfy the borrow checker
            Ok(ReaderBytes::Borrowed((*reader).to_bytes().unwrap()))
        } else {
            // we have to read to an owned buffer to get the bytes.
            let mut bytes = Vec::with_capacity(1024 * 128);
            reader.read_to_end(&mut bytes)?;
            Ok(ReaderBytes::Owned(bytes.into()))
        }
    }
}
47
48
/// Build a new `ArrowSchema` containing only the fields selected by
/// `projection`, in projection order.
///
/// # Panics
/// Panics if any index in `projection` is out of bounds for `schema`.
#[cfg(any(
    feature = "ipc",
    feature = "ipc_streaming",
    feature = "parquet",
    feature = "avro"
))]
pub fn apply_projection(schema: &ArrowSchema, projection: &[usize]) -> ArrowSchema {
    projection
        .iter()
        .map(|&idx| {
            // Out-of-bounds indices are a caller bug, hence the unwrap.
            let (name, field) = schema.get_at_index(idx).unwrap();
            (name.clone(), field.clone())
        })
        .collect()
}
61
62
/// Resolve column names to their positional indices in `schema`.
///
/// # Errors
/// Fails on the first name in `columns` that is not present in the schema.
#[cfg(any(
    feature = "ipc",
    feature = "ipc_streaming",
    feature = "avro",
    feature = "parquet"
))]
pub fn columns_to_projection<T: AsRef<str>>(
    columns: &[T],
    schema: &ArrowSchema,
) -> PolarsResult<Vec<usize>> {
    // Collecting into PolarsResult short-circuits on the first lookup error,
    // matching a manual push-loop with `?`.
    columns
        .iter()
        .map(|column| schema.try_index_of(column.as_ref()))
        .collect()
}
81
82
#[cfg(debug_assertions)]
83
fn check_offsets(dfs: &[DataFrame]) {
84
dfs.windows(2).for_each(|s| {
85
let a = &s[0].columns()[0];
86
let b = &s[1].columns()[0];
87
88
let prev = a.get(a.len() - 1).unwrap().extract::<usize>().unwrap();
89
let next = b.get(0).unwrap().extract::<usize>().unwrap();
90
assert_eq!(prev + 1, next);
91
})
92
}
93
94
/// Because of threading every row starts from `0` or from `offset`.
/// We must correct that so that they are monotonically increasing.
#[cfg(any(feature = "csv", feature = "json"))]
pub(crate) fn update_row_counts2(dfs: &mut [DataFrame], offset: IdxSize) {
    if !dfs.is_empty() {
        // Running total: the row index the next non-empty frame should start at.
        let mut previous = offset;
        for df in &mut *dfs {
            if df.shape_has_zero() {
                // Empty frames contribute no rows and need no correction.
                continue;
            }
            // Record the height before mutating; it advances `previous` below.
            let n_read = df.height() as IdxSize;
            // The row-count column is presumably the first column — confirm
            // against callers.
            if let Some(s) = unsafe { df.columns_mut_retain_schema() }.get_mut(0) {
                if let Ok(v) = s.get(0) {
                    // Only shift frames that don't already start at the
                    // expected index (each thread starts at 0 or `offset`).
                    if v.extract::<usize>().unwrap() != previous as usize {
                        *s = &*s + previous;
                    }
                }
            }
            previous += n_read;
        }
    }
    // In debug builds, verify the first columns now form one contiguous
    // monotonically increasing sequence across frames.
    #[cfg(debug_assertions)]
    {
        check_offsets(dfs)
    }
}
120
121
/// Because of threading every row starts from `0` or from `offset`.
/// We must correct that so that they are monotonically increasing.
#[cfg(feature = "json")]
pub(crate) fn update_row_counts3(dfs: &mut [DataFrame], heights: &[IdxSize], offset: IdxSize) {
    assert_eq!(dfs.len(), heights.len());
    if dfs.is_empty() {
        return;
    }

    // Running total: the row index the next non-empty frame should start at.
    let mut previous = offset;
    for (df, &height) in dfs.iter_mut().zip(heights) {
        // Empty frames contribute no rows and need no correction; note that
        // their pre-computed height is intentionally NOT added to `previous`.
        if df.shape_has_zero() {
            continue;
        }

        // The row-count column is presumably the first column — confirm
        // against callers.
        if let Some(s) = unsafe { df.columns_mut_retain_schema() }.get_mut(0) {
            if let Ok(v) = s.get(0) {
                // Only shift frames that don't already start at the expected
                // index (each thread starts at 0 or `offset`).
                if v.extract::<usize>().unwrap() != previous as usize {
                    *s = &*s + previous;
                }
            }
        }
        previous += height;
    }
}
146
147
/// Overwrite entries of `schema` with the corresponding values from
/// `overwriting_schema`.
///
/// # Errors
/// Fails if `overwriting_schema` contains a key that is missing from `schema`.
#[cfg(feature = "json")]
pub fn overwrite_schema(schema: &mut Schema, overwriting_schema: &Schema) -> PolarsResult<()> {
    // Short-circuits on the first missing key, like a manual loop with `?`.
    overwriting_schema
        .iter()
        .try_for_each(|(name, value)| schema.try_get_mut(name).map(|slot| *slot = value.clone()))
}
154
155
// Shared compiled regexes — presumably cached by the `cached_regex!` macro
// (see `polars_utils::regex_cache`) and used for inferring value types from
// strings (confirm against callers).
polars_utils::regex_cache::cached_regex! {
    // Optionally signed float with `.` as decimal separator: "1.5", ".5",
    // "1.", "2e-3", "inf", "NaN". Note: bare integers like "3" do NOT match.
    pub static FLOAT_RE = r"^[-+]?((\d*\.\d+)([eE][-+]?\d+)?|inf|NaN|(\d+)[eE][-+]?\d+|\d+\.)$";
    // Same shape as FLOAT_RE but with `,` as the decimal separator.
    pub static FLOAT_RE_DECIMAL = r"^[-+]?((\d*,\d+)([eE][-+]?\d+)?|inf|NaN|(\d+)[eE][-+]?\d+|\d+,)$";
    // Optionally negative integer; no `+` sign, no separators.
    pub static INTEGER_RE = r"^-?(\d+)$";
    // Case-insensitive "true" / "false".
    pub static BOOLEAN_RE = r"^(?i:true|false)$";
}
161
162
pub fn materialize_projection(
163
with_columns: Option<&[PlSmallStr]>,
164
schema: &Schema,
165
hive_partitions: Option<&[Series]>,
166
has_row_index: bool,
167
) -> Option<Vec<usize>> {
168
match hive_partitions {
169
None => with_columns.map(|with_columns| {
170
with_columns
171
.iter()
172
.map(|name| schema.index_of(name).unwrap() - has_row_index as usize)
173
.collect()
174
}),
175
Some(part_cols) => {
176
with_columns.map(|with_columns| {
177
with_columns
178
.iter()
179
.flat_map(|name| {
180
// the hive partitions are added at the end of the schema, but we don't want to project
181
// them from the file
182
if part_cols.iter().any(|s| s.name() == name.as_str()) {
183
None
184
} else {
185
Some(schema.index_of(name).unwrap() - has_row_index as usize)
186
}
187
})
188
.collect()
189
})
190
},
191
}
192
}
193
194
/// Utility for decoding JSON that adds the response value to the error message if decoding fails.
/// This makes it much easier to debug errors from parsing network responses.
#[cfg(feature = "cloud")]
pub fn decode_json_response<T>(bytes: &[u8]) -> PolarsResult<T>
where
    T: for<'de> serde::de::Deserialize<'de>,
{
    use polars_error::to_compute_err;
    use polars_utils::error::TruncateErrorDetail;

    match serde_json::from_slice(bytes) {
        Ok(value) => Ok(value),
        Err(err) => Err(to_compute_err(err).wrap_msg(|msg| {
            // Attach the (truncated, lossily-decoded) raw response so the
            // failing payload is visible in the error.
            format!(
                "error decoding response: {}, response value: {}",
                msg,
                TruncateErrorDetail(&String::from_utf8_lossy(bytes))
            )
        })),
    }
}
216
217
#[cfg(test)]
mod tests {
    use super::FLOAT_RE;

    /// FLOAT_RE must accept every common textual float representation,
    /// including leading-dot / trailing-dot forms, exponents, inf and NaN.
    #[test]
    fn test_float_parse() {
        let valid = [
            "0.1",
            "3.0",
            "3.00001",
            "-9.9990e-003",
            "9.9990e+003",
            "9.9990E+003",
            "9.9990E+003",
            ".5",
            "2.5E-10",
            "2.5e10",
            "NaN",
            "-NaN",
            "-inf",
            "inf",
            "-7e-05",
            "7e-05",
            "+7e+05",
        ];
        for candidate in valid {
            assert!(FLOAT_RE.is_match(candidate));
        }
    }
}
242
243