Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-python/src/file.rs
7884 views
1
use std::borrow::Cow;
2
#[cfg(target_family = "unix")]
3
use std::fs;
4
use std::fs::File;
5
use std::io;
6
use std::io::{Cursor, ErrorKind, Read, Seek, SeekFrom, Write};
7
#[cfg(target_family = "unix")]
8
use std::os::fd::{FromRawFd, RawFd};
9
use std::path::PathBuf;
10
use std::sync::Arc;
11
12
use polars::io::mmap::MmapBytesReader;
13
use polars::prelude::PlPath;
14
use polars::prelude::file::{Writeable, WriteableTrait};
15
use polars_error::polars_err;
16
use polars_utils::create_file;
17
use polars_utils::mmap::MemSlice;
18
use pyo3::IntoPyObjectExt;
19
use pyo3::exceptions::PyTypeError;
20
use pyo3::prelude::*;
21
use pyo3::types::{PyBytes, PyString, PyStringMethods};
22
23
use crate::error::PyPolarsErr;
24
use crate::prelude::resolve_homedir;
25
26
pub(crate) struct PyFileLikeObject {
27
inner: Py<PyAny>,
28
/// The object expects a string instead of a bytes for `write`.
29
expects_str: bool,
30
/// The object has a flush method.
31
has_flush: bool,
32
}
33
34
impl WriteableTrait for PyFileLikeObject {
35
fn close(&mut self) -> io::Result<()> {
36
Ok(())
37
}
38
39
fn sync_all(&self) -> std::io::Result<()> {
40
self.flush()
41
}
42
43
fn sync_data(&self) -> std::io::Result<()> {
44
self.flush()
45
}
46
}
47
48
impl Clone for PyFileLikeObject {
49
fn clone(&self) -> Self {
50
Python::attach(|py| Self {
51
inner: self.inner.clone_ref(py),
52
expects_str: self.expects_str,
53
has_flush: self.has_flush,
54
})
55
}
56
}
57
58
/// Wraps a `PyObject`, and implements read, seek, and write for it.
59
impl PyFileLikeObject {
60
/// Creates an instance of a `PyFileLikeObject` from a `PyObject`.
61
/// To assert the object has the required methods,
62
/// instantiate it with `PyFileLikeObject::require`
63
pub(crate) fn new(object: Py<PyAny>, expects_str: bool, has_flush: bool) -> Self {
64
PyFileLikeObject {
65
inner: object,
66
expects_str,
67
has_flush,
68
}
69
}
70
71
pub(crate) fn to_memslice(&self) -> MemSlice {
72
Python::attach(|py| {
73
let bytes = self
74
.inner
75
.call_method(py, "read", (), None)
76
.expect("no read method found");
77
78
if let Ok(b) = bytes.downcast_bound::<PyBytes>(py) {
79
return MemSlice::from_arc(b.as_bytes(), Arc::new(bytes.clone_ref(py)));
80
}
81
82
if let Ok(b) = bytes.downcast_bound::<PyString>(py) {
83
return match b.to_cow().expect("PyString is not valid UTF-8") {
84
Cow::Borrowed(v) => {
85
MemSlice::from_arc(v.as_bytes(), Arc::new(bytes.clone_ref(py)))
86
},
87
Cow::Owned(v) => MemSlice::from_vec(v.into_bytes()),
88
};
89
}
90
91
panic!("Expecting to be able to downcast into bytes from read result.");
92
})
93
}
94
95
/// Validates that the underlying
96
/// python object has a `read`, `write`, and `seek` methods in respect to parameters.
97
/// Will return a `TypeError` if object does not have `read`, `seek`, and `write` methods.
98
pub(crate) fn ensure_requirements(
99
object: &Bound<PyAny>,
100
read: bool,
101
write: bool,
102
seek: bool,
103
) -> PyResult<()> {
104
if read && object.getattr("read").is_err() {
105
return Err(PyErr::new::<PyTypeError, _>(
106
"Object does not have a .read() method.",
107
));
108
}
109
110
if seek && object.getattr("seek").is_err() {
111
return Err(PyErr::new::<PyTypeError, _>(
112
"Object does not have a .seek() method.",
113
));
114
}
115
116
if write && object.getattr("write").is_err() {
117
return Err(PyErr::new::<PyTypeError, _>(
118
"Object does not have a .write() method.",
119
));
120
}
121
122
Ok(())
123
}
124
125
pub fn flush(&self) -> std::io::Result<()> {
126
if self.has_flush {
127
Python::attach(|py| {
128
self.inner
129
.call_method(py, "flush", (), None)
130
.map_err(pyerr_to_io_err)
131
})?;
132
}
133
134
Ok(())
135
}
136
}
137
138
/// Extracts a string repr from, and returns an IO error to send back to rust.
139
fn pyerr_to_io_err(e: PyErr) -> io::Error {
140
Python::attach(|py| {
141
let e_as_object: Py<PyAny> = e.into_py_any(py).unwrap();
142
143
match e_as_object.call_method(py, "__str__", (), None) {
144
Ok(repr) => match repr.extract::<String>(py) {
145
Ok(s) => io::Error::other(s),
146
Err(_e) => io::Error::other("An unknown error has occurred"),
147
},
148
Err(_) => io::Error::other("Err doesn't have __str__"),
149
}
150
})
151
}
152
153
impl Read for PyFileLikeObject {
154
fn read(&mut self, mut buf: &mut [u8]) -> Result<usize, io::Error> {
155
Python::attach(|py| {
156
let bytes = self
157
.inner
158
.call_method(py, "read", (buf.len(),), None)
159
.map_err(pyerr_to_io_err)?;
160
161
let opt_bytes = bytes.downcast_bound::<PyBytes>(py);
162
163
if let Ok(bytes) = opt_bytes {
164
buf.write_all(bytes.as_bytes())?;
165
166
bytes.len().map_err(pyerr_to_io_err)
167
} else if let Ok(s) = bytes.downcast_bound::<PyString>(py) {
168
let s = s.to_cow().map_err(pyerr_to_io_err)?;
169
buf.write_all(s.as_bytes())?;
170
Ok(s.len())
171
} else {
172
Err(io::Error::new(
173
ErrorKind::InvalidInput,
174
polars_err!(InvalidOperation: "could not read from input"),
175
))
176
}
177
})
178
}
179
}
180
181
impl Write for PyFileLikeObject {
182
fn write(&mut self, buf: &[u8]) -> Result<usize, io::Error> {
183
// Note on the .extract() method:
184
// In case of a PyString object, it returns the number of chars,
185
// so we need to take extra steps if the underlying string is not all ASCII.
186
// In case of a ByBytes object, it returns the number of bytes.
187
let expects_str = self.expects_str;
188
let expects_str_and_is_ascii = expects_str && buf.is_ascii();
189
190
Python::attach(|py| {
191
let n_bytes = if expects_str_and_is_ascii {
192
let number_chars_written = unsafe {
193
self.inner.call_method(
194
py,
195
"write",
196
(PyString::new(py, std::str::from_utf8_unchecked(buf)),),
197
None,
198
)
199
}
200
.map_err(pyerr_to_io_err)?;
201
number_chars_written.extract(py).map_err(pyerr_to_io_err)?
202
} else if expects_str {
203
let number_chars_written = self
204
.inner
205
.call_method(
206
py,
207
"write",
208
(PyString::new(
209
py,
210
std::str::from_utf8(buf).map_err(io::Error::other)?,
211
),),
212
None,
213
)
214
.map_err(pyerr_to_io_err)?;
215
let n_chars: usize = number_chars_written.extract(py).map_err(pyerr_to_io_err)?;
216
// calculate n_bytes
217
if n_chars > 0 {
218
std::str::from_utf8(buf)
219
.map(|str| {
220
str.char_indices()
221
.nth(n_chars - 1)
222
.map(|(i, ch)| i + ch.len_utf8())
223
.unwrap()
224
})
225
.expect("unable to parse buffer as utf-8")
226
} else {
227
0
228
}
229
} else {
230
let number_bytes_written = self
231
.inner
232
.call_method(py, "write", (PyBytes::new(py, buf),), None)
233
.map_err(pyerr_to_io_err)?;
234
number_bytes_written.extract(py).map_err(pyerr_to_io_err)?
235
};
236
Ok(n_bytes)
237
})
238
}
239
240
fn flush(&mut self) -> Result<(), io::Error> {
241
Self::flush(self)
242
}
243
}
244
245
impl Seek for PyFileLikeObject {
246
fn seek(&mut self, pos: SeekFrom) -> Result<u64, io::Error> {
247
Python::attach(|py| {
248
let (whence, offset) = match pos {
249
SeekFrom::Start(i) => (0, i as i64),
250
SeekFrom::Current(i) => (1, i),
251
SeekFrom::End(i) => (2, i),
252
};
253
254
let new_position = self
255
.inner
256
.call_method(py, "seek", (offset, whence), None)
257
.map_err(pyerr_to_io_err)?;
258
259
new_position.extract(py).map_err(pyerr_to_io_err)
260
})
261
}
262
}
263
264
/// Anything that can be read, written and seeked like a file, and moved to or shared
/// between threads.
pub(crate) trait FileLike: Read + Seek + Write + Send + Sync {}

/// Native files satisfy every supertrait directly.
impl FileLike for File {}
267
impl FileLike for PyFileLikeObject {}
268
impl MmapBytesReader for PyFileLikeObject {}
269
270
pub(crate) enum EitherRustPythonFile {
271
Py(PyFileLikeObject),
272
Rust(std::fs::File),
273
}
274
275
impl EitherRustPythonFile {
276
pub(crate) fn into_dyn(self) -> Box<dyn FileLike> {
277
match self {
278
EitherRustPythonFile::Py(f) => Box::new(f),
279
EitherRustPythonFile::Rust(f) => Box::new(f),
280
}
281
}
282
283
fn into_scan_source_input(self) -> PythonScanSourceInput {
284
match self {
285
EitherRustPythonFile::Py(f) => PythonScanSourceInput::Buffer(f.to_memslice()),
286
EitherRustPythonFile::Rust(f) => PythonScanSourceInput::File(f),
287
}
288
}
289
290
pub(crate) fn into_writeable(self) -> Writeable {
291
match self {
292
Self::Py(f) => Writeable::Dyn(Box::new(f)),
293
Self::Rust(f) => Writeable::Local(f),
294
}
295
}
296
}
297
298
pub(crate) enum PythonScanSourceInput {
299
Buffer(MemSlice),
300
Path(PlPath),
301
File(std::fs::File),
302
}
303
304
pub(crate) fn try_get_pyfile(
305
py: Python<'_>,
306
py_f: Bound<'_, PyAny>,
307
write: bool,
308
) -> PyResult<(EitherRustPythonFile, Option<PathBuf>)> {
309
let io = py.import("io")?;
310
let is_utf8_encoding = |py_f: &Bound<PyAny>| -> PyResult<bool> {
311
let encoding = py_f.getattr("encoding")?;
312
let encoding = encoding.extract::<Cow<str>>()?;
313
Ok(encoding.eq_ignore_ascii_case("utf-8") || encoding.eq_ignore_ascii_case("utf8"))
314
};
315
316
#[cfg(target_family = "unix")]
317
if let Some(fd) = (py_f.is_exact_instance(&io.getattr("FileIO").unwrap())
318
|| (py_f.is_exact_instance(&io.getattr("BufferedReader").unwrap())
319
|| py_f.is_exact_instance(&io.getattr("BufferedWriter").unwrap())
320
|| py_f.is_exact_instance(&io.getattr("BufferedRandom").unwrap())
321
|| py_f.is_exact_instance(&io.getattr("BufferedRWPair").unwrap())
322
|| (py_f.is_exact_instance(&io.getattr("TextIOWrapper").unwrap())
323
&& is_utf8_encoding(&py_f)?))
324
&& if write {
325
// invalidate read buffer
326
py_f.call_method0("flush").is_ok()
327
} else {
328
// flush write buffer
329
py_f.call_method1("seek", (0, 1)).is_ok()
330
})
331
.then(|| {
332
py_f.getattr("fileno")
333
.and_then(|fileno| fileno.call0())
334
.and_then(|fileno| fileno.extract::<libc::c_int>())
335
.ok()
336
})
337
.flatten()
338
.map(|fileno| unsafe {
339
// `File::from_raw_fd()` takes the ownership of the file descriptor.
340
// When the File is dropped, it closes the file descriptor.
341
// This is undesired - the Python file object will become invalid.
342
// Therefore, we duplicate the file descriptor here.
343
// Closing the duplicated file descriptor will not close
344
// the original file descriptor;
345
// and the status, e.g. stream position, is still shared with
346
// the original file descriptor.
347
// We use `F_DUPFD_CLOEXEC` here instead of `dup()`
348
// because it also sets the `O_CLOEXEC` flag on the duplicated file descriptor,
349
// which `dup()` clears.
350
// `open()` in both Rust and Python automatically set `O_CLOEXEC` flag;
351
// it prevents leaking file descriptors across processes,
352
// and we want to be consistent with them.
353
// `F_DUPFD_CLOEXEC` is defined in POSIX.1-2008
354
// and is present on all alive UNIX(-like) systems.
355
libc::fcntl(fileno, libc::F_DUPFD_CLOEXEC, 0)
356
})
357
.filter(|fileno| *fileno != -1)
358
.map(|fileno| fileno as RawFd)
359
{
360
return Ok((
361
EitherRustPythonFile::Rust(unsafe { File::from_raw_fd(fd).into() }),
362
// This works on Linux and BSD with procfs mounted,
363
// otherwise it fails silently.
364
fs::canonicalize(format!("/proc/self/fd/{fd}")).ok(),
365
));
366
}
367
368
// Unwrap TextIOWrapper
369
// Allow subclasses to allow things like pytest.capture.CaptureIO
370
let py_f = if py_f
371
.is_instance(&io.getattr("TextIOWrapper").unwrap())
372
.unwrap_or_default()
373
{
374
if !is_utf8_encoding(&py_f)? {
375
return Err(PyPolarsErr::from(
376
polars_err!(InvalidOperation: "file encoding is not UTF-8"),
377
)
378
.into());
379
}
380
// XXX: we have to clear buffer here.
381
// Is there a better solution?
382
if write {
383
py_f.call_method0("flush")?;
384
} else {
385
py_f.call_method1("seek", (0, 1))?;
386
}
387
py_f.getattr("buffer")?
388
} else {
389
py_f
390
};
391
PyFileLikeObject::ensure_requirements(&py_f, !write, write, !write)?;
392
let expects_str = py_f.is_instance(&io.getattr("TextIOBase").unwrap())?;
393
let has_flush = py_f
394
.getattr_opt("flush")?
395
.is_some_and(|flush| flush.is_callable());
396
let f = PyFileLikeObject::new(py_f.unbind(), expects_str, has_flush);
397
Ok((EitherRustPythonFile::Py(f), None))
398
}
399
400
pub(crate) fn get_python_scan_source_input(
401
py_f: Py<PyAny>,
402
write: bool,
403
) -> PyResult<PythonScanSourceInput> {
404
Python::attach(|py| {
405
let py_f = py_f.into_bound(py);
406
407
// CPython has some internal tricks that means much of the time
408
// BytesIO.getvalue() involves no memory copying, unlike
409
// BytesIO.read(). So we want to handle BytesIO specially in order
410
// to save memory.
411
let py_f = read_if_bytesio(py_f);
412
413
// If the pyobject is a `bytes` class
414
if let Ok(b) = py_f.downcast::<PyBytes>() {
415
return Ok(PythonScanSourceInput::Buffer(MemSlice::from_arc(
416
b.as_bytes(),
417
// We want to specifically keep alive the PyBytes object.
418
Arc::new(b.clone().unbind()),
419
)));
420
}
421
422
if let Ok(s) = py_f.extract::<Cow<str>>() {
423
let mut file_path = PlPath::new(&s);
424
if let Some(p) = file_path.as_ref().as_local_path() {
425
if p.starts_with("~/") {
426
file_path = PlPath::Local(resolve_homedir(&p).into());
427
}
428
}
429
Ok(PythonScanSourceInput::Path(file_path))
430
} else {
431
Ok(try_get_pyfile(py, py_f, write)?.0.into_scan_source_input())
432
}
433
})
434
}
435
436
fn get_either_buffer_or_path(
437
py_f: Py<PyAny>,
438
write: bool,
439
) -> PyResult<(EitherRustPythonFile, Option<PathBuf>)> {
440
Python::attach(|py| {
441
let py_f = py_f.into_bound(py);
442
if let Ok(s) = py_f.extract::<Cow<str>>() {
443
let file_path = resolve_homedir(&&*s);
444
let f = if write {
445
create_file(&file_path).map_err(PyPolarsErr::from)?
446
} else {
447
polars_utils::open_file(&file_path).map_err(PyPolarsErr::from)?
448
};
449
Ok((EitherRustPythonFile::Rust(f.into()), Some(file_path)))
450
} else {
451
try_get_pyfile(py, py_f, write)
452
}
453
})
454
}
455
456
///
457
/// # Arguments
458
/// * `write` - open for writing; will truncate existing file and create new file if not.
459
pub(crate) fn get_either_file(py_f: Py<PyAny>, write: bool) -> PyResult<EitherRustPythonFile> {
460
Ok(get_either_buffer_or_path(py_f, write)?.0)
461
}
462
463
pub(crate) fn get_file_like(f: Py<PyAny>, truncate: bool) -> PyResult<Box<dyn FileLike>> {
464
Ok(get_either_file(f, truncate)?.into_dyn())
465
}
466
467
/// If the give file-like is a BytesIO, read its contents in a memory-efficient
468
/// way.
469
fn read_if_bytesio(py_f: Bound<PyAny>) -> Bound<PyAny> {
470
let bytes_io = py_f.py().import("io").unwrap().getattr("BytesIO").unwrap();
471
if py_f.is_instance(&bytes_io).unwrap() {
472
// Note that BytesIO has some memory optimizations ensuring that much of
473
// the time getvalue() doesn't need to copy the underlying data:
474
let Ok(bytes) = py_f.call_method0("getvalue") else {
475
return py_f;
476
};
477
return bytes;
478
}
479
py_f
480
}
481
482
/// Create reader from PyBytes or a file-like object.
483
pub(crate) fn get_mmap_bytes_reader(py_f: &Bound<PyAny>) -> PyResult<Box<dyn MmapBytesReader>> {
484
get_mmap_bytes_reader_and_path(py_f).map(|t| t.0)
485
}
486
487
pub(crate) fn get_mmap_bytes_reader_and_path(
488
py_f: &Bound<PyAny>,
489
) -> PyResult<(Box<dyn MmapBytesReader>, Option<PathBuf>)> {
490
let py_f = read_if_bytesio(py_f.clone());
491
492
// bytes object
493
if let Ok(bytes) = py_f.downcast::<PyBytes>() {
494
Ok((
495
Box::new(Cursor::new(MemSlice::from_arc(
496
bytes.as_bytes(),
497
Arc::new(py_f.clone().unbind()),
498
))),
499
None,
500
))
501
}
502
// string so read file
503
else {
504
match get_either_buffer_or_path(py_f.to_owned().unbind(), false)? {
505
(EitherRustPythonFile::Rust(f), path) => Ok((Box::new(f), path)),
506
(EitherRustPythonFile::Py(f), path) => {
507
Ok((Box::new(Cursor::new(f.to_memslice())), path))
508
},
509
}
510
}
511
}
512
513