Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-python/src/dataframe/io.rs
7889 views
1
use std::io::BufWriter;
2
use std::num::NonZeroUsize;
3
use std::sync::Arc;
4
5
use polars::io::RowIndex;
6
#[cfg(feature = "avro")]
7
use polars::io::avro::AvroCompression;
8
use polars::prelude::*;
9
use pyo3::prelude::*;
10
use pyo3::pybacked::PyBackedStr;
11
12
use super::PyDataFrame;
13
use crate::conversion::Wrap;
14
use crate::file::{get_file_like, get_mmap_bytes_reader, get_mmap_bytes_reader_and_path};
15
use crate::prelude::PyCompatLevel;
16
use crate::utils::EnterPolarsExt;
17
18
#[pymethods]
19
impl PyDataFrame {
20
/// Read CSV data from a Python path or file-like object into a dataframe.
///
/// `separator`, `eol_char` and `quote_char` are single-byte settings: only the
/// first byte of each string is forwarded to the parser.
///
/// # Errors
///
/// Raises `ValueError` when `separator` or `eol_char` is an empty string.
/// Errors from opening `py_f` and from the CSV reader are propagated.
#[staticmethod]
#[cfg(feature = "csv")]
#[pyo3(signature = (
    py_f, infer_schema_length, chunk_size, has_header, ignore_errors, n_rows,
    skip_rows, skip_lines, projection, separator, rechunk, columns, encoding, n_threads, path,
    overwrite_dtype, overwrite_dtype_slice, low_memory, comment_prefix, quote_char,
    null_values, missing_utf8_is_empty_string, try_parse_dates, skip_rows_after_header,
    row_index, eol_char, raise_if_empty, truncate_ragged_lines, decimal_comma, schema)
)]
pub fn read_csv(
    py: Python<'_>,
    py_f: Bound<PyAny>,
    infer_schema_length: Option<usize>,
    chunk_size: usize,
    has_header: bool,
    ignore_errors: bool,
    n_rows: Option<usize>,
    skip_rows: usize,
    skip_lines: usize,
    projection: Option<Vec<usize>>,
    separator: &str,
    rechunk: bool,
    columns: Option<Vec<String>>,
    encoding: Wrap<CsvEncoding>,
    n_threads: Option<usize>,
    path: Option<String>,
    overwrite_dtype: Option<Vec<(PyBackedStr, Wrap<DataType>)>>,
    overwrite_dtype_slice: Option<Vec<Wrap<DataType>>>,
    low_memory: bool,
    comment_prefix: Option<&str>,
    quote_char: Option<&str>,
    null_values: Option<Wrap<NullValues>>,
    missing_utf8_is_empty_string: bool,
    try_parse_dates: bool,
    skip_rows_after_header: usize,
    row_index: Option<(String, IdxSize)>,
    eol_char: &str,
    raise_if_empty: bool,
    truncate_ragged_lines: bool,
    decimal_comma: bool,
    schema: Option<Wrap<Schema>>,
) -> PyResult<Self> {
    // Mandatory single-byte settings: reject an empty string with a Python
    // exception instead of panicking on an out-of-bounds index.
    let first_byte = |arg: &str, value: &str| -> PyResult<u8> {
        value.as_bytes().first().copied().ok_or_else(|| {
            pyo3::exceptions::PyValueError::new_err(format!(
                "`{arg}` must not be an empty string"
            ))
        })
    };
    let separator = first_byte("separator", separator)?;
    let eol_char = first_byte("eol_char", eol_char)?;

    let null_values = null_values.map(|w| w.0);
    let row_index = row_index.map(|(name, offset)| RowIndex {
        name: name.into(),
        offset,
    });
    // The quote character is optional, so an empty string simply disables it.
    let quote_char = quote_char.and_then(|s| s.as_bytes().first().copied());

    // Consume the override lists so the dtypes are moved rather than cloned.
    let overwrite_dtype = overwrite_dtype.map(|fields| {
        fields
            .into_iter()
            .map(|(name, dtype)| Field::new((&*name).into(), dtype.0))
            .collect::<Schema>()
    });

    let overwrite_dtype_slice = overwrite_dtype_slice
        .map(|dtypes| dtypes.into_iter().map(|dt| dt.0).collect::<Vec<_>>());

    let mmap_bytes_r = get_mmap_bytes_reader(&py_f)?;
    py.enter_polars_df(move || {
        CsvReadOptions::default()
            .with_path(path)
            .with_infer_schema_length(infer_schema_length)
            .with_has_header(has_header)
            .with_n_rows(n_rows)
            .with_skip_rows(skip_rows)
            .with_skip_lines(skip_lines)
            .with_ignore_errors(ignore_errors)
            .with_projection(projection.map(Arc::new))
            .with_rechunk(rechunk)
            .with_chunk_size(chunk_size)
            .with_columns(columns.map(|x| x.into_iter().map(|x| x.into()).collect()))
            .with_n_threads(n_threads)
            .with_schema_overwrite(overwrite_dtype.map(Arc::new))
            .with_dtype_overwrite(overwrite_dtype_slice.map(Arc::new))
            .with_schema(schema.map(|schema| Arc::new(schema.0)))
            .with_low_memory(low_memory)
            .with_skip_rows_after_header(skip_rows_after_header)
            .with_row_index(row_index)
            .with_raise_if_empty(raise_if_empty)
            .with_parse_options(
                CsvParseOptions::default()
                    .with_separator(separator)
                    .with_encoding(encoding.0)
                    .with_missing_is_null(!missing_utf8_is_empty_string)
                    .with_comment_prefix(comment_prefix)
                    .with_null_values(null_values)
                    .with_try_parse_dates(try_parse_dates)
                    .with_quote_char(quote_char)
                    .with_eol_char(eol_char)
                    .with_truncate_ragged_lines(truncate_ragged_lines)
                    .with_decimal_comma(decimal_comma),
            )
            .into_reader_with_file_handle(mmap_bytes_r)
            .finish()
    })
}
126
127
/// Read a JSON document from a Python path or file-like object into a dataframe.
///
/// `schema`, when given, is handed to the reader as the full schema;
/// `schema_overrides`, when given, is applied as a schema overwrite.
///
/// # Errors
///
/// Raises `ValueError` when `infer_schema_length` is `Some(0)`. Errors from
/// opening `py_f` and from the JSON reader are propagated.
#[staticmethod]
#[cfg(feature = "json")]
#[pyo3(signature = (py_f, infer_schema_length, schema, schema_overrides))]
pub fn read_json(
    py: Python<'_>,
    py_f: Bound<PyAny>,
    infer_schema_length: Option<usize>,
    schema: Option<Wrap<Schema>>,
    schema_overrides: Option<Wrap<Schema>>,
) -> PyResult<Self> {
    // A zero inference length is a caller error; report it as a Python
    // exception rather than aborting with a panic.
    if infer_schema_length == Some(0) {
        return Err(pyo3::exceptions::PyValueError::new_err(
            "`infer_schema_length` must be greater than zero",
        ));
    }
    let mmap_bytes_r = get_mmap_bytes_reader(&py_f)?;

    py.enter_polars_df(move || {
        let mut builder = JsonReader::new(mmap_bytes_r)
            .with_json_format(JsonFormat::Json)
            .infer_schema_len(infer_schema_length.and_then(NonZeroUsize::new));

        if let Some(schema) = schema {
            builder = builder.with_schema(Arc::new(schema.0));
        }

        if let Some(schema) = schema_overrides.as_ref() {
            builder = builder.with_schema_overwrite(&schema.0);
        }

        builder.finish()
    })
}
156
157
/// Read an Arrow IPC file from a Python path or file-like object into a dataframe.
///
/// The source path is forwarded to the reader for memory mapping only when
/// `memory_map` is true; otherwise the reader receives `None`.
#[staticmethod]
#[cfg(feature = "ipc")]
#[pyo3(signature = (py_f, columns, projection, n_rows, row_index, memory_map))]
pub fn read_ipc(
    py: Python<'_>,
    py_f: Bound<PyAny>,
    columns: Option<Vec<String>>,
    projection: Option<Vec<usize>>,
    n_rows: Option<usize>,
    row_index: Option<(String, IdxSize)>,
    memory_map: bool,
) -> PyResult<Self> {
    let (source, source_path) = get_mmap_bytes_reader_and_path(&py_f)?;
    // Drop the path unless the caller asked for memory mapping.
    let mmap_path = source_path.filter(|_| memory_map);
    let row_index = row_index.map(|(name, offset)| RowIndex {
        offset,
        name: name.into(),
    });

    py.enter_polars_df(move || {
        let reader = IpcReader::new(source)
            .with_projection(projection)
            .with_columns(columns)
            .with_n_rows(n_rows)
            .with_row_index(row_index)
            .memory_mapped(mmap_path);
        reader.finish()
    })
}
186
187
/// Read an Arrow IPC stream from a Python path or file-like object into a dataframe.
///
/// `columns`, `projection`, `n_rows`, `row_index` and `rechunk` are forwarded
/// unchanged to the stream reader.
#[staticmethod]
#[cfg(feature = "ipc_streaming")]
#[pyo3(signature = (py_f, columns, projection, n_rows, row_index, rechunk))]
pub fn read_ipc_stream(
    py: Python<'_>,
    py_f: Bound<PyAny>,
    columns: Option<Vec<String>>,
    projection: Option<Vec<usize>>,
    n_rows: Option<usize>,
    row_index: Option<(String, IdxSize)>,
    rechunk: bool,
) -> PyResult<Self> {
    let source = get_mmap_bytes_reader(&py_f)?;
    let row_index = row_index.map(|(name, offset)| RowIndex {
        offset,
        name: name.into(),
    });

    py.enter_polars_df(move || {
        let reader = IpcStreamReader::new(source)
            .with_projection(projection)
            .with_columns(columns)
            .with_n_rows(n_rows)
            .with_row_index(row_index)
            .set_rechunk(rechunk);
        reader.finish()
    })
}
214
215
/// Read an Avro file from a Python path or file-like object into a dataframe.
///
/// `columns`, `projection` and `n_rows` are forwarded unchanged to the Avro reader.
#[staticmethod]
#[cfg(feature = "avro")]
#[pyo3(signature = (py_f, columns, projection, n_rows))]
pub fn read_avro(
    py: Python<'_>,
    py_f: Py<PyAny>,
    columns: Option<Vec<String>>,
    projection: Option<Vec<usize>>,
    n_rows: Option<usize>,
) -> PyResult<Self> {
    use polars::io::avro::AvroReader;

    // Second argument is `false` here; the write paths in this file pass `true`.
    let source = get_file_like(py_f, false)?;
    py.enter_polars_df(move || {
        let reader = AvroReader::new(source)
            .with_projection(projection)
            .with_columns(columns)
            .with_n_rows(n_rows);
        reader.finish()
    })
}
236
237
/// Serialize this dataframe as JSON into a Python path or file-like object.
///
/// # Errors
///
/// Propagates errors from opening `py_f`, from the JSON writer, and from
/// writing out the final buffered bytes.
#[cfg(feature = "json")]
pub fn write_json(&self, py: Python<'_>, py_f: Py<PyAny>) -> PyResult<()> {
    let mut file = BufWriter::new(get_file_like(py_f, true)?);
    py.enter_polars(|| {
        // TODO: Cloud support

        JsonWriter::new(&mut file)
            .with_json_format(JsonFormat::Json)
            .finish(&mut self.df.write())?;

        // `BufWriter`'s `Drop` discards errors from the final buffer write, so
        // drain the buffer explicitly and report any failure to the caller.
        file.into_inner().map_err(|err| err.into_error())?;
        PolarsResult::Ok(())
    })
}
248
249
/// Serialize this dataframe as an Arrow IPC stream into a Python path or
/// file-like object, using the given compression and compatibility level.
#[cfg(feature = "ipc_streaming")]
pub fn write_ipc_stream(
    &self,
    py: Python<'_>,
    py_f: Py<PyAny>,
    compression: Wrap<Option<IpcCompression>>,
    compat_level: PyCompatLevel,
) -> PyResult<()> {
    let mut sink = get_file_like(py_f, true)?;
    py.enter_polars(|| {
        // Hold the dataframe's write guard for the duration of the write.
        let mut df = self.df.write();
        IpcStreamWriter::new(&mut sink)
            .with_compression(compression.0)
            .with_compat_level(compat_level.0)
            .finish(&mut df)
    })
}
265
266
/// Serialize this dataframe as Avro into a Python path or file-like object,
/// using the given compression and passing `name` to the Avro writer.
#[cfg(feature = "avro")]
#[pyo3(signature = (py_f, compression, name))]
pub fn write_avro(
    &self,
    py: Python<'_>,
    py_f: Py<PyAny>,
    compression: Wrap<Option<AvroCompression>>,
    name: String,
) -> PyResult<()> {
    use polars::io::avro::AvroWriter;

    let mut sink = get_file_like(py_f, true)?;
    py.enter_polars(|| {
        // Hold the dataframe's write guard for the duration of the write.
        let mut df = self.df.write();
        AvroWriter::new(&mut sink)
            .with_compression(compression.0)
            .with_name(name)
            .finish(&mut df)
    })
}
284
}
285
286