// Path: crates/polars-python/src/dataframe/io.rs
// (page header residue: 7889 views)
use std::io::BufWriter;1use std::num::NonZeroUsize;2use std::sync::Arc;34use polars::io::RowIndex;5#[cfg(feature = "avro")]6use polars::io::avro::AvroCompression;7use polars::prelude::*;8use pyo3::prelude::*;9use pyo3::pybacked::PyBackedStr;1011use super::PyDataFrame;12use crate::conversion::Wrap;13use crate::file::{get_file_like, get_mmap_bytes_reader, get_mmap_bytes_reader_and_path};14use crate::prelude::PyCompatLevel;15use crate::utils::EnterPolarsExt;1617#[pymethods]18impl PyDataFrame {19#[staticmethod]20#[cfg(feature = "csv")]21#[pyo3(signature = (22py_f, infer_schema_length, chunk_size, has_header, ignore_errors, n_rows,23skip_rows, skip_lines, projection, separator, rechunk, columns, encoding, n_threads, path,24overwrite_dtype, overwrite_dtype_slice, low_memory, comment_prefix, quote_char,25null_values, missing_utf8_is_empty_string, try_parse_dates, skip_rows_after_header,26row_index, eol_char, raise_if_empty, truncate_ragged_lines, decimal_comma, schema)27)]28pub fn read_csv(29py: Python<'_>,30py_f: Bound<PyAny>,31infer_schema_length: Option<usize>,32chunk_size: usize,33has_header: bool,34ignore_errors: bool,35n_rows: Option<usize>,36skip_rows: usize,37skip_lines: usize,38projection: Option<Vec<usize>>,39separator: &str,40rechunk: bool,41columns: Option<Vec<String>>,42encoding: Wrap<CsvEncoding>,43n_threads: Option<usize>,44path: Option<String>,45overwrite_dtype: Option<Vec<(PyBackedStr, Wrap<DataType>)>>,46overwrite_dtype_slice: Option<Vec<Wrap<DataType>>>,47low_memory: bool,48comment_prefix: Option<&str>,49quote_char: Option<&str>,50null_values: Option<Wrap<NullValues>>,51missing_utf8_is_empty_string: bool,52try_parse_dates: bool,53skip_rows_after_header: usize,54row_index: Option<(String, IdxSize)>,55eol_char: &str,56raise_if_empty: bool,57truncate_ragged_lines: bool,58decimal_comma: bool,59schema: Option<Wrap<Schema>>,60) -> PyResult<Self> {61let null_values = null_values.map(|w| w.0);62let eol_char = eol_char.as_bytes()[0];63let row_index = 
row_index.map(|(name, offset)| RowIndex {64name: name.into(),65offset,66});67let quote_char = quote_char.and_then(|s| s.as_bytes().first().copied());6869let overwrite_dtype = overwrite_dtype.map(|overwrite_dtype| {70overwrite_dtype71.iter()72.map(|(name, dtype)| {73let dtype = dtype.0.clone();74Field::new((&**name).into(), dtype)75})76.collect::<Schema>()77});7879let overwrite_dtype_slice = overwrite_dtype_slice.map(|overwrite_dtype| {80overwrite_dtype81.iter()82.map(|dt| dt.0.clone())83.collect::<Vec<_>>()84});8586let mmap_bytes_r = get_mmap_bytes_reader(&py_f)?;87py.enter_polars_df(move || {88CsvReadOptions::default()89.with_path(path)90.with_infer_schema_length(infer_schema_length)91.with_has_header(has_header)92.with_n_rows(n_rows)93.with_skip_rows(skip_rows)94.with_skip_lines(skip_lines)95.with_ignore_errors(ignore_errors)96.with_projection(projection.map(Arc::new))97.with_rechunk(rechunk)98.with_chunk_size(chunk_size)99.with_columns(columns.map(|x| x.into_iter().map(|x| x.into()).collect()))100.with_n_threads(n_threads)101.with_schema_overwrite(overwrite_dtype.map(Arc::new))102.with_dtype_overwrite(overwrite_dtype_slice.map(Arc::new))103.with_schema(schema.map(|schema| Arc::new(schema.0)))104.with_low_memory(low_memory)105.with_skip_rows_after_header(skip_rows_after_header)106.with_row_index(row_index)107.with_raise_if_empty(raise_if_empty)108.with_parse_options(109CsvParseOptions::default()110.with_separator(separator.as_bytes()[0])111.with_encoding(encoding.0)112.with_missing_is_null(!missing_utf8_is_empty_string)113.with_comment_prefix(comment_prefix)114.with_null_values(null_values)115.with_try_parse_dates(try_parse_dates)116.with_quote_char(quote_char)117.with_eol_char(eol_char)118.with_truncate_ragged_lines(truncate_ragged_lines)119.with_decimal_comma(decimal_comma),120)121.into_reader_with_file_handle(mmap_bytes_r)122.finish()123})124}125126#[staticmethod]127#[cfg(feature = "json")]128#[pyo3(signature = (py_f, infer_schema_length, schema, 
schema_overrides))]129pub fn read_json(130py: Python<'_>,131py_f: Bound<PyAny>,132infer_schema_length: Option<usize>,133schema: Option<Wrap<Schema>>,134schema_overrides: Option<Wrap<Schema>>,135) -> PyResult<Self> {136assert!(infer_schema_length != Some(0));137let mmap_bytes_r = get_mmap_bytes_reader(&py_f)?;138139py.enter_polars_df(move || {140let mut builder = JsonReader::new(mmap_bytes_r)141.with_json_format(JsonFormat::Json)142.infer_schema_len(infer_schema_length.and_then(NonZeroUsize::new));143144if let Some(schema) = schema {145builder = builder.with_schema(Arc::new(schema.0));146}147148if let Some(schema) = schema_overrides.as_ref() {149builder = builder.with_schema_overwrite(&schema.0);150}151152builder.finish()153})154}155156#[staticmethod]157#[cfg(feature = "ipc")]158#[pyo3(signature = (py_f, columns, projection, n_rows, row_index, memory_map))]159pub fn read_ipc(160py: Python<'_>,161py_f: Bound<PyAny>,162columns: Option<Vec<String>>,163projection: Option<Vec<usize>>,164n_rows: Option<usize>,165row_index: Option<(String, IdxSize)>,166memory_map: bool,167) -> PyResult<Self> {168let row_index = row_index.map(|(name, offset)| RowIndex {169name: name.into(),170offset,171});172let (mmap_bytes_r, mmap_path) = get_mmap_bytes_reader_and_path(&py_f)?;173174let mmap_path = if memory_map { mmap_path } else { None };175py.enter_polars_df(move || {176IpcReader::new(mmap_bytes_r)177.with_projection(projection)178.with_columns(columns)179.with_n_rows(n_rows)180.with_row_index(row_index)181.memory_mapped(mmap_path)182.finish()183})184}185186#[staticmethod]187#[cfg(feature = "ipc_streaming")]188#[pyo3(signature = (py_f, columns, projection, n_rows, row_index, rechunk))]189pub fn read_ipc_stream(190py: Python<'_>,191py_f: Bound<PyAny>,192columns: Option<Vec<String>>,193projection: Option<Vec<usize>>,194n_rows: Option<usize>,195row_index: Option<(String, IdxSize)>,196rechunk: bool,197) -> PyResult<Self> {198let row_index = row_index.map(|(name, offset)| RowIndex {199name: 
name.into(),200offset,201});202let mmap_bytes_r = get_mmap_bytes_reader(&py_f)?;203py.enter_polars_df(move || {204IpcStreamReader::new(mmap_bytes_r)205.with_projection(projection)206.with_columns(columns)207.with_n_rows(n_rows)208.with_row_index(row_index)209.set_rechunk(rechunk)210.finish()211})212}213214#[staticmethod]215#[cfg(feature = "avro")]216#[pyo3(signature = (py_f, columns, projection, n_rows))]217pub fn read_avro(218py: Python<'_>,219py_f: Py<PyAny>,220columns: Option<Vec<String>>,221projection: Option<Vec<usize>>,222n_rows: Option<usize>,223) -> PyResult<Self> {224use polars::io::avro::AvroReader;225226let file = get_file_like(py_f, false)?;227py.enter_polars_df(move || {228AvroReader::new(file)229.with_projection(projection)230.with_columns(columns)231.with_n_rows(n_rows)232.finish()233})234}235236#[cfg(feature = "json")]237pub fn write_json(&self, py: Python<'_>, py_f: Py<PyAny>) -> PyResult<()> {238let file = BufWriter::new(get_file_like(py_f, true)?);239py.enter_polars(|| {240// TODO: Cloud support241242JsonWriter::new(file)243.with_json_format(JsonFormat::Json)244.finish(&mut self.df.write())245})246}247248#[cfg(feature = "ipc_streaming")]249pub fn write_ipc_stream(250&self,251py: Python<'_>,252py_f: Py<PyAny>,253compression: Wrap<Option<IpcCompression>>,254compat_level: PyCompatLevel,255) -> PyResult<()> {256let mut buf = get_file_like(py_f, true)?;257py.enter_polars(|| {258IpcStreamWriter::new(&mut buf)259.with_compression(compression.0)260.with_compat_level(compat_level.0)261.finish(&mut self.df.write())262})263}264265#[cfg(feature = "avro")]266#[pyo3(signature = (py_f, compression, name))]267pub fn write_avro(268&self,269py: Python<'_>,270py_f: Py<PyAny>,271compression: Wrap<Option<AvroCompression>>,272name: String,273) -> PyResult<()> {274use polars::io::avro::AvroWriter;275let mut buf = get_file_like(py_f, true)?;276py.enter_polars(|| {277AvroWriter::new(&mut buf)278.with_compression(compression.0)279.with_name(name)280.finish(&mut 
self.df.write())281})282}283}284285286