Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-python/src/file.rs
8353 views
1
use std::borrow::Cow;
2
#[cfg(target_family = "unix")]
3
use std::fs;
4
use std::fs::File;
5
use std::io;
6
use std::io::{Cursor, ErrorKind, Read, Seek, SeekFrom, Write};
7
#[cfg(target_family = "unix")]
8
use std::os::fd::{FromRawFd, RawFd};
9
use std::path::PathBuf;
10
11
use polars::io::mmap::MmapBytesReader;
12
use polars::prelude::PlRefPath;
13
use polars::prelude::file::{Writeable, WriteableTrait};
14
use polars_buffer::{Buffer, SharedStorage};
15
use polars_error::polars_err;
16
use polars_utils::create_file;
17
use pyo3::IntoPyObjectExt;
18
use pyo3::exceptions::PyTypeError;
19
use pyo3::prelude::*;
20
use pyo3::types::{PyBytes, PyString, PyStringMethods};
21
22
use crate::error::PyPolarsErr;
23
use crate::prelude::resolve_homedir;
24
use crate::utils::to_py_err;
25
26
pub(crate) struct PyFileLikeObject {
27
inner: Py<PyAny>,
28
/// The object expects a string instead of a bytes for `write`.
29
expects_str: bool,
30
/// The object has a flush method.
31
has_flush: bool,
32
}
33
34
impl WriteableTrait for PyFileLikeObject {
35
fn close(&mut self) -> io::Result<()> {
36
Ok(())
37
}
38
39
fn sync_all(&self) -> std::io::Result<()> {
40
self.flush()
41
}
42
43
fn sync_data(&self) -> std::io::Result<()> {
44
self.flush()
45
}
46
}
47
48
impl Clone for PyFileLikeObject {
49
fn clone(&self) -> Self {
50
Python::attach(|py| Self {
51
inner: self.inner.clone_ref(py),
52
expects_str: self.expects_str,
53
has_flush: self.has_flush,
54
})
55
}
56
}
57
58
/// Wraps a `PyObject`, and implements read, seek, and write for it.
59
impl PyFileLikeObject {
60
/// Creates an instance of a `PyFileLikeObject` from a `PyObject`.
61
/// To assert the object has the required methods,
62
/// instantiate it with `PyFileLikeObject::require`
63
pub(crate) fn new(object: Py<PyAny>, expects_str: bool, has_flush: bool) -> Self {
64
PyFileLikeObject {
65
inner: object,
66
expects_str,
67
has_flush,
68
}
69
}
70
71
pub(crate) fn to_buffer(&self) -> Buffer<u8> {
72
Python::attach(|py| {
73
let bytes = self
74
.inner
75
.call_method(py, "read", (), None)
76
.expect("no read method found");
77
78
if let Ok(b) = bytes.cast_bound::<PyBytes>(py) {
79
// SAFETY: we keep the underlying python object alive.
80
let slice = b.as_bytes();
81
let owner = bytes.clone_ref(py);
82
let ss = unsafe { SharedStorage::from_slice_with_owner(slice, owner) };
83
return Buffer::from_storage(ss);
84
}
85
86
if let Ok(b) = bytes.cast_bound::<PyString>(py) {
87
return match b.to_cow().expect("PyString is not valid UTF-8") {
88
Cow::Borrowed(v) => {
89
// SAFETY: we keep the underlying python object alive.
90
let slice = v.as_bytes();
91
let owner = bytes.clone_ref(py);
92
let ss = unsafe { SharedStorage::from_slice_with_owner(slice, owner) };
93
return Buffer::from_storage(ss);
94
},
95
Cow::Owned(v) => Buffer::from_vec(v.into_bytes()),
96
};
97
}
98
99
panic!("Expecting to be able to downcast into bytes from read result.");
100
})
101
}
102
103
/// Validates that the underlying
104
/// python object has a `read`, `write`, and `seek` methods in respect to parameters.
105
/// Will return a `TypeError` if object does not have `read`, `seek`, and `write` methods.
106
pub(crate) fn ensure_requirements(
107
object: &Bound<PyAny>,
108
read: bool,
109
write: bool,
110
seek: bool,
111
) -> PyResult<()> {
112
if read && object.getattr("read").is_err() {
113
return Err(PyErr::new::<PyTypeError, _>(
114
"Object does not have a .read() method.",
115
));
116
}
117
118
if seek && object.getattr("seek").is_err() {
119
return Err(PyErr::new::<PyTypeError, _>(
120
"Object does not have a .seek() method.",
121
));
122
}
123
124
if write && object.getattr("write").is_err() {
125
return Err(PyErr::new::<PyTypeError, _>(
126
"Object does not have a .write() method.",
127
));
128
}
129
130
Ok(())
131
}
132
133
pub fn flush(&self) -> std::io::Result<()> {
134
if self.has_flush {
135
Python::attach(|py| {
136
self.inner
137
.call_method(py, "flush", (), None)
138
.map_err(pyerr_to_io_err)
139
})?;
140
}
141
142
Ok(())
143
}
144
}
145
146
/// Extracts a string repr from, and returns an IO error to send back to rust.
147
fn pyerr_to_io_err(e: PyErr) -> io::Error {
148
Python::attach(|py| {
149
let e_as_object: Py<PyAny> = e.into_py_any(py).unwrap();
150
151
match e_as_object.call_method(py, "__str__", (), None) {
152
Ok(repr) => match repr.extract::<String>(py) {
153
Ok(s) => io::Error::other(s),
154
Err(_e) => io::Error::other("An unknown error has occurred"),
155
},
156
Err(_) => io::Error::other("Err doesn't have __str__"),
157
}
158
})
159
}
160
161
impl Read for PyFileLikeObject {
162
fn read(&mut self, mut buf: &mut [u8]) -> Result<usize, io::Error> {
163
Python::attach(|py| {
164
let bytes = self
165
.inner
166
.call_method(py, "read", (buf.len(),), None)
167
.map_err(pyerr_to_io_err)?;
168
169
let opt_bytes = bytes.cast_bound::<PyBytes>(py);
170
171
if let Ok(bytes) = opt_bytes {
172
buf.write_all(bytes.as_bytes())?;
173
174
bytes.len().map_err(pyerr_to_io_err)
175
} else if let Ok(s) = bytes.cast_bound::<PyString>(py) {
176
let s = s.to_cow().map_err(pyerr_to_io_err)?;
177
buf.write_all(s.as_bytes())?;
178
Ok(s.len())
179
} else {
180
Err(io::Error::new(
181
ErrorKind::InvalidInput,
182
polars_err!(InvalidOperation: "could not read from input"),
183
))
184
}
185
})
186
}
187
}
188
189
impl Write for PyFileLikeObject {
190
fn write(&mut self, buf: &[u8]) -> Result<usize, io::Error> {
191
// Note on the .extract() method:
192
// In case of a PyString object, it returns the number of chars,
193
// so we need to take extra steps if the underlying string is not all ASCII.
194
// In case of a ByBytes object, it returns the number of bytes.
195
let expects_str = self.expects_str;
196
let expects_str_and_is_ascii = expects_str && buf.is_ascii();
197
198
Python::attach(|py| {
199
let n_bytes = if expects_str_and_is_ascii {
200
let number_chars_written = unsafe {
201
self.inner.call_method(
202
py,
203
"write",
204
(PyString::new(py, std::str::from_utf8_unchecked(buf)),),
205
None,
206
)
207
}
208
.map_err(pyerr_to_io_err)?;
209
number_chars_written.extract(py).map_err(pyerr_to_io_err)?
210
} else if expects_str {
211
let number_chars_written = self
212
.inner
213
.call_method(
214
py,
215
"write",
216
(PyString::new(
217
py,
218
std::str::from_utf8(buf).map_err(io::Error::other)?,
219
),),
220
None,
221
)
222
.map_err(pyerr_to_io_err)?;
223
let n_chars: usize = number_chars_written.extract(py).map_err(pyerr_to_io_err)?;
224
// calculate n_bytes
225
if n_chars > 0 {
226
std::str::from_utf8(buf)
227
.map(|str| {
228
str.char_indices()
229
.nth(n_chars - 1)
230
.map(|(i, ch)| i + ch.len_utf8())
231
.unwrap()
232
})
233
.expect("unable to parse buffer as utf-8")
234
} else {
235
0
236
}
237
} else {
238
let number_bytes_written = self
239
.inner
240
.call_method(py, "write", (PyBytes::new(py, buf),), None)
241
.map_err(pyerr_to_io_err)?;
242
number_bytes_written.extract(py).map_err(pyerr_to_io_err)?
243
};
244
Ok(n_bytes)
245
})
246
}
247
248
fn flush(&mut self) -> Result<(), io::Error> {
249
Self::flush(self)
250
}
251
}
252
253
impl Seek for PyFileLikeObject {
254
fn seek(&mut self, pos: SeekFrom) -> Result<u64, io::Error> {
255
Python::attach(|py| {
256
let (whence, offset) = match pos {
257
SeekFrom::Start(i) => (0, i as i64),
258
SeekFrom::Current(i) => (1, i),
259
SeekFrom::End(i) => (2, i),
260
};
261
262
let new_position = self
263
.inner
264
.call_method(py, "seek", (offset, whence), None)
265
.map_err(pyerr_to_io_err)?;
266
267
new_position.extract(py).map_err(pyerr_to_io_err)
268
})
269
}
270
}
271
272
pub(crate) trait FileLike: Read + Write + Seek + Sync + Send {}
273
274
impl FileLike for File {}
275
impl FileLike for PyFileLikeObject {}
276
impl MmapBytesReader for PyFileLikeObject {}
277
278
pub(crate) enum EitherRustPythonFile {
279
Py(PyFileLikeObject),
280
Rust(std::fs::File),
281
}
282
283
impl EitherRustPythonFile {
284
pub(crate) fn into_dyn(self) -> Box<dyn FileLike> {
285
match self {
286
EitherRustPythonFile::Py(f) => Box::new(f),
287
EitherRustPythonFile::Rust(f) => Box::new(f),
288
}
289
}
290
291
fn into_scan_source_input(self) -> PythonScanSourceInput {
292
match self {
293
EitherRustPythonFile::Py(f) => PythonScanSourceInput::Buffer(f.to_buffer()),
294
EitherRustPythonFile::Rust(f) => PythonScanSourceInput::File(f),
295
}
296
}
297
298
pub(crate) fn into_writeable(self) -> Writeable {
299
match self {
300
Self::Py(f) => Writeable::Dyn(Box::new(f)),
301
Self::Rust(f) => Writeable::Local(f),
302
}
303
}
304
}
305
306
pub(crate) enum PythonScanSourceInput {
307
Buffer(Buffer<u8>),
308
Path(PlRefPath),
309
File(std::fs::File),
310
}
311
312
pub(crate) fn try_get_pyfile(
313
py: Python<'_>,
314
py_f: Bound<'_, PyAny>,
315
write: bool,
316
) -> PyResult<(EitherRustPythonFile, Option<PathBuf>)> {
317
let io = py.import("io")?;
318
let is_utf8_encoding = |py_f: &Bound<PyAny>| -> PyResult<bool> {
319
let encoding = py_f.getattr("encoding")?;
320
let encoding = encoding.extract::<Cow<str>>()?;
321
Ok(encoding.eq_ignore_ascii_case("utf-8") || encoding.eq_ignore_ascii_case("utf8"))
322
};
323
324
#[cfg(target_family = "unix")]
325
if let Some(fd) = (py_f.is_exact_instance(&io.getattr("FileIO").unwrap())
326
|| (py_f.is_exact_instance(&io.getattr("BufferedReader").unwrap())
327
|| py_f.is_exact_instance(&io.getattr("BufferedWriter").unwrap())
328
|| py_f.is_exact_instance(&io.getattr("BufferedRandom").unwrap())
329
|| py_f.is_exact_instance(&io.getattr("BufferedRWPair").unwrap())
330
|| (py_f.is_exact_instance(&io.getattr("TextIOWrapper").unwrap())
331
&& is_utf8_encoding(&py_f)?))
332
&& if write {
333
// invalidate read buffer
334
py_f.call_method0("flush").is_ok()
335
} else {
336
// flush write buffer
337
py_f.call_method1("seek", (0, 1)).is_ok()
338
})
339
.then(|| {
340
py_f.getattr("fileno")
341
.and_then(|fileno| fileno.call0())
342
.and_then(|fileno| fileno.extract::<libc::c_int>())
343
.ok()
344
})
345
.flatten()
346
.map(|fileno| unsafe {
347
// `File::from_raw_fd()` takes the ownership of the file descriptor.
348
// When the File is dropped, it closes the file descriptor.
349
// This is undesired - the Python file object will become invalid.
350
// Therefore, we duplicate the file descriptor here.
351
// Closing the duplicated file descriptor will not close
352
// the original file descriptor;
353
// and the status, e.g. stream position, is still shared with
354
// the original file descriptor.
355
// We use `F_DUPFD_CLOEXEC` here instead of `dup()`
356
// because it also sets the `O_CLOEXEC` flag on the duplicated file descriptor,
357
// which `dup()` clears.
358
// `open()` in both Rust and Python automatically set `O_CLOEXEC` flag;
359
// it prevents leaking file descriptors across processes,
360
// and we want to be consistent with them.
361
// `F_DUPFD_CLOEXEC` is defined in POSIX.1-2008
362
// and is present on all alive UNIX(-like) systems.
363
libc::fcntl(fileno, libc::F_DUPFD_CLOEXEC, 0)
364
})
365
.filter(|fileno| *fileno != -1)
366
.map(|fileno| fileno as RawFd)
367
{
368
return Ok((
369
EitherRustPythonFile::Rust(unsafe { File::from_raw_fd(fd).into() }),
370
// This works on Linux and BSD with procfs mounted,
371
// otherwise it fails silently.
372
fs::canonicalize(format!("/proc/self/fd/{fd}")).ok(),
373
));
374
}
375
376
// Unwrap TextIOWrapper
377
// Allow subclasses to allow things like pytest.capture.CaptureIO
378
let py_f = if py_f
379
.is_instance(&io.getattr("TextIOWrapper").unwrap())
380
.unwrap_or_default()
381
{
382
if !is_utf8_encoding(&py_f)? {
383
return Err(PyPolarsErr::from(
384
polars_err!(InvalidOperation: "file encoding is not UTF-8"),
385
)
386
.into());
387
}
388
// XXX: we have to clear buffer here.
389
// Is there a better solution?
390
if write {
391
py_f.call_method0("flush")?;
392
} else {
393
py_f.call_method1("seek", (0, 1))?;
394
}
395
py_f.getattr("buffer")?
396
} else {
397
py_f
398
};
399
PyFileLikeObject::ensure_requirements(&py_f, !write, write, !write)?;
400
let expects_str = py_f.is_instance(&io.getattr("TextIOBase").unwrap())?;
401
let has_flush = py_f
402
.getattr_opt("flush")?
403
.is_some_and(|flush| flush.is_callable());
404
let f = PyFileLikeObject::new(py_f.unbind(), expects_str, has_flush);
405
Ok((EitherRustPythonFile::Py(f), None))
406
}
407
408
pub(crate) fn get_python_scan_source_input(
409
py_f: Py<PyAny>,
410
write: bool,
411
) -> PyResult<PythonScanSourceInput> {
412
Python::attach(|py| {
413
let py_f = py_f.into_bound(py);
414
415
// CPython has some internal tricks that means much of the time
416
// BytesIO.getvalue() involves no memory copying, unlike
417
// BytesIO.read(). So we want to handle BytesIO specially in order
418
// to save memory.
419
let py_f = read_if_bytesio(py_f);
420
421
// If the pyobject is a `bytes` class
422
if let Ok(b) = py_f.cast::<PyBytes>() {
423
// SAFETY: we keep the underlying python object alive.
424
let slice = b.as_bytes();
425
let owner = b.clone().unbind();
426
let ss = unsafe { SharedStorage::from_slice_with_owner(slice, owner) };
427
let buffer = Buffer::from_storage(ss);
428
return Ok(PythonScanSourceInput::Buffer(buffer));
429
}
430
431
if let Ok(s) = py_f.extract::<Cow<str>>() {
432
let file_path = PlRefPath::try_from_path(resolve_homedir(s.as_ref()).as_ref())
433
.map_err(to_py_err)?;
434
435
Ok(PythonScanSourceInput::Path(file_path))
436
} else {
437
Ok(try_get_pyfile(py, py_f, write)?.0.into_scan_source_input())
438
}
439
})
440
}
441
442
fn get_either_buffer_or_path(
443
py_f: Py<PyAny>,
444
write: bool,
445
) -> PyResult<(EitherRustPythonFile, Option<PathBuf>)> {
446
Python::attach(|py| {
447
let py_f = py_f.into_bound(py);
448
if let Ok(s) = py_f.extract::<Cow<str>>() {
449
let file_path = resolve_homedir(s.as_ref());
450
let f = if write {
451
create_file(&file_path).map_err(PyPolarsErr::from)?
452
} else {
453
polars_utils::open_file(&file_path).map_err(PyPolarsErr::from)?
454
};
455
Ok((
456
EitherRustPythonFile::Rust(f.into()),
457
Some(file_path.into_owned()),
458
))
459
} else {
460
try_get_pyfile(py, py_f, write)
461
}
462
})
463
}
464
465
///
466
/// # Arguments
467
/// * `write` - open for writing; will truncate existing file and create new file if not.
468
pub(crate) fn get_either_file(py_f: Py<PyAny>, write: bool) -> PyResult<EitherRustPythonFile> {
469
Ok(get_either_buffer_or_path(py_f, write)?.0)
470
}
471
472
pub(crate) fn get_file_like(f: Py<PyAny>, truncate: bool) -> PyResult<Box<dyn FileLike>> {
473
Ok(get_either_file(f, truncate)?.into_dyn())
474
}
475
476
/// If the give file-like is a BytesIO, read its contents in a memory-efficient
477
/// way.
478
fn read_if_bytesio(py_f: Bound<PyAny>) -> Bound<PyAny> {
479
let bytes_io = py_f.py().import("io").unwrap().getattr("BytesIO").unwrap();
480
if py_f.is_instance(&bytes_io).unwrap() {
481
// Note that BytesIO has some memory optimizations ensuring that much of
482
// the time getvalue() doesn't need to copy the underlying data:
483
let Ok(bytes) = py_f.call_method0("getvalue") else {
484
return py_f;
485
};
486
return bytes;
487
}
488
py_f
489
}
490
491
/// Create reader from PyBytes or a file-like object.
492
pub(crate) fn get_mmap_bytes_reader(py_f: &Bound<PyAny>) -> PyResult<Box<dyn MmapBytesReader>> {
493
get_mmap_bytes_reader_and_path(py_f).map(|t| t.0)
494
}
495
496
pub(crate) fn get_mmap_bytes_reader_and_path(
497
py_f: &Bound<PyAny>,
498
) -> PyResult<(Box<dyn MmapBytesReader>, Option<PathBuf>)> {
499
let py_f = read_if_bytesio(py_f.clone());
500
501
// bytes object
502
if let Ok(bytes) = py_f.cast::<PyBytes>() {
503
// SAFETY: we keep the underlying python object alive.
504
let slice = bytes.as_bytes();
505
let owner = bytes.clone().unbind();
506
let ss = unsafe { SharedStorage::from_slice_with_owner(slice, owner) };
507
Ok((Box::new(Cursor::new(Buffer::from_storage(ss))), None))
508
}
509
// string so read file
510
else {
511
match get_either_buffer_or_path(py_f.to_owned().unbind(), false)? {
512
(EitherRustPythonFile::Rust(f), path) => Ok((Box::new(f), path)),
513
(EitherRustPythonFile::Py(f), path) => Ok((Box::new(Cursor::new(f.to_buffer())), path)),
514
}
515
}
516
}
517
518