Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-python/src/functions/io.rs
7889 views
1
use std::io::BufReader;
2
3
#[cfg(any(feature = "ipc", feature = "parquet"))]
4
use polars::prelude::ArrowSchema;
5
use polars::prelude::CloudScheme;
6
use polars_io::cloud::CloudOptions;
7
use pyo3::prelude::*;
8
use pyo3::types::PyDict;
9
10
use crate::conversion::Wrap;
11
use crate::error::PyPolarsErr;
12
use crate::file::{EitherRustPythonFile, get_either_file};
13
14
// Read the schema of an Arrow IPC file and return it as a Python dict.
//
// `py_f` is resolved through `get_either_file` into either a Rust-side file
// handle or a Python file-like object. The IPC file metadata is parsed from
// it and every schema field is written into a fresh dict as
// `field name -> dtype` (see `fields_to_pydict`).
//
// Failures while resolving the input or filling the dict are returned as
// Python exceptions; metadata parse errors are routed through `PyPolarsErr`.
//
// NOTE(review): the `false` passed to `get_either_file` is presumably a
// "not opened for writing" flag -- confirm against its definition.
#[cfg(feature = "ipc")]
#[pyfunction]
pub fn read_ipc_schema(py: Python<'_>, py_f: Py<PyAny>) -> PyResult<Bound<'_, PyDict>> {
    use arrow::io::ipc::read::read_file_metadata;

    // Both arms yield the same result type, so the error conversion is
    // applied once to the whole `match`.
    let file_metadata = match get_either_file(py_f, false)? {
        // Rust-side handle: wrap it in a `BufReader` before parsing.
        EitherRustPythonFile::Rust(file) => {
            let mut buffered = BufReader::new(file);
            read_file_metadata(&mut buffered)
        },
        // Python file-like object: handed to the parser as-is.
        EitherRustPythonFile::Py(mut file) => read_file_metadata(&mut file),
    }
    .map_err(PyPolarsErr::from)?;

    let schema_dict = PyDict::new(py);
    fields_to_pydict(&file_metadata.schema, &schema_dict)?;
    Ok(schema_dict)
}
29
30
// Read the custom key/value metadata of a Parquet file into a Python dict.
//
// `py_f` is classified by `get_python_scan_source_input` as an in-memory
// buffer, an already-open file, or a path. For paths, `storage_options`,
// `credential_provider` and `retries` are turned into cloud options through
// `parse_cloud_options`; they are not consulted for buffers and open files.
//
// Returns a dict holding one `key -> value` string entry per item produced by
// `read_custom_key_value_metadata`. Failures are returned as Python
// exceptions; Polars errors are routed through `PyPolarsErr`.
#[cfg(feature = "parquet")]
#[pyfunction]
pub fn read_parquet_metadata(
    py: Python,
    py_f: Py<PyAny>,
    storage_options: Option<Vec<(String, String)>>,
    credential_provider: Option<Py<PyAny>>,
    retries: usize,
) -> PyResult<Bound<PyDict>> {
    use std::io::Cursor;

    use polars_error::feature_gated;
    use polars_io::pl_async::get_runtime;
    use polars_parquet::read::read_metadata;
    use polars_parquet::read::schema::read_custom_key_value_metadata;
    use polars_utils::plpath::PlPath;

    use crate::file::{PythonScanSourceInput, get_python_scan_source_input};

    let file_metadata = match get_python_scan_source_input(py_f, false)? {
        // In-memory bytes: read through a seekable cursor.
        PythonScanSourceInput::Buffer(bytes) => {
            let mut cursor = Cursor::new(bytes);
            read_metadata(&mut cursor).map_err(PyPolarsErr::from)?
        },
        // Already-open file handle: read through a buffered reader.
        PythonScanSourceInput::File(file) => {
            let mut reader = BufReader::new(file);
            read_metadata(&mut reader).map_err(PyPolarsErr::from)?
        },
        PythonScanSourceInput::Path(path) => {
            // The cloud options are parsed before the local/cloud split, so
            // invalid options are reported for local paths as well.
            let scheme = CloudScheme::from_uri(path.to_str());
            let cloud_options =
                parse_cloud_options(scheme, storage_options, credential_provider, retries)?;

            match path {
                PlPath::Local(local) => {
                    let file = polars_utils::open_file(&local).map_err(PyPolarsErr::from)?;
                    let mut reader = BufReader::new(file);
                    read_metadata(&mut reader).map_err(PyPolarsErr::from)?
                },
                // The payload is ignored on purpose: `path` itself is still
                // needed below to build the object-store reader.
                PlPath::Cloud(_) => {
                    use polars::prelude::ParquetObjectStore;
                    use polars_error::PolarsResult;

                    // NOTE(review): `feature_gated!` presumably only compiles
                    // this block when the "cloud" feature is enabled -- confirm
                    // against the macro definition.
                    feature_gated!("cloud", {
                        // NOTE(review): `py.detach` presumably runs the closure
                        // detached from the interpreter while the runtime
                        // blocks on the fetch -- confirm against the pyo3 docs.
                        py.detach(|| {
                            get_runtime().block_on(async {
                                let mut store_reader = ParquetObjectStore::from_uri(
                                    path.as_ref(),
                                    cloud_options.as_ref(),
                                    None,
                                )
                                .await?;
                                let shared_metadata = store_reader.get_metadata().await?;
                                // Hand an owned copy of the metadata out of
                                // the async block; the explicit `PolarsResult`
                                // pins the error type used by the `?`s above.
                                PolarsResult::Ok((**shared_metadata).clone())
                            })
                        })
                    })
                    .map_err(PyPolarsErr::from)?
                },
            }
        },
    };

    let custom_entries = read_custom_key_value_metadata(file_metadata.key_value_metadata());
    let metadata_dict = PyDict::new(py);
    for (entry_key, entry_value) in custom_entries {
        metadata_dict.set_item(entry_key.as_str(), entry_value.as_str())?;
    }
    Ok(metadata_dict)
}
99
100
/// Populate `dict` with one `field name -> dtype` entry per field of `schema`.
///
/// Each Arrow field is converted with `DataType::from_arrow_field`, wrapped in
/// `Wrap`, and inserted under the field's name.
///
/// # Errors
///
/// Stops at, and returns, the first error reported by `set_item`.
#[cfg(any(feature = "ipc", feature = "parquet"))]
fn fields_to_pydict(schema: &ArrowSchema, dict: &Bound<'_, PyDict>) -> PyResult<()> {
    schema.iter_values().try_for_each(|field| {
        let dtype = Wrap(polars::prelude::DataType::from_arrow_field(field));
        dict.set_item(field.name.as_str(), &dtype)
    })
}
108
109
// Return the text currently held by the system clipboard.
//
// A clipboard handle is opened with `arboard::Clipboard::new` and queried
// with `get_text`. A failure in either step is reported as
// `PyPolarsErr::Other` carrying the error's display text.
#[cfg(feature = "clipboard")]
#[pyfunction]
pub fn read_clipboard_string() -> PyResult<String> {
    // Opening and reading fail with the same error type, so one mapping at
    // the end of the chain covers both steps.
    let text = arboard::Clipboard::new()
        .and_then(|mut clipboard| clipboard.get_text())
        .map_err(|e| PyPolarsErr::Other(e.to_string()))?;
    Ok(text)
}
120
121
// Replace the contents of the system clipboard with the text `s`.
//
// A clipboard handle is opened with `arboard::Clipboard::new` and written
// with `set_text`. A failure in either step is reported as
// `PyPolarsErr::Other` carrying the error's display text.
#[cfg(feature = "clipboard")]
#[pyfunction]
pub fn write_clipboard_string(s: &str) -> PyResult<()> {
    // Opening and writing fail with the same error type, so one mapping at
    // the end of the chain covers both steps.
    arboard::Clipboard::new()
        .and_then(|mut clipboard| clipboard.set_text(s))
        .map_err(|e| PyPolarsErr::Other(e.to_string()))?;
    Ok(())
}
132
133
pub fn parse_cloud_options(
134
cloud_scheme: Option<CloudScheme>,
135
storage_options: Option<Vec<(String, String)>>,
136
credential_provider: Option<Py<PyAny>>,
137
retries: usize,
138
) -> PyResult<Option<CloudOptions>> {
139
let result: Option<CloudOptions> = {
140
#[cfg(feature = "cloud")]
141
{
142
use polars_io::cloud::credential_provider::PlCredentialProvider;
143
144
use crate::prelude::parse_cloud_options;
145
146
let cloud_options =
147
parse_cloud_options(cloud_scheme, storage_options.unwrap_or_default())?;
148
149
Some(
150
cloud_options
151
.with_max_retries(retries)
152
.with_credential_provider(
153
credential_provider.map(PlCredentialProvider::from_python_builder),
154
),
155
)
156
}
157
158
#[cfg(not(feature = "cloud"))]
159
{
160
None
161
}
162
};
163
Ok(result)
164
}
165
166