Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-python/src/dataframe/construction.rs
7889 views
1
use polars::frame::row::{Row, rows_to_schema_supertypes, rows_to_supertypes};
2
use polars::prelude::*;
3
use pyo3::prelude::*;
4
use pyo3::types::{PyDict, PyMapping, PyString};
5
6
use super::PyDataFrame;
7
use crate::conversion::any_value::py_object_to_any_value;
8
use crate::conversion::{Wrap, vec_extract_wrapped};
9
use crate::error::PyPolarsErr;
10
use crate::interop;
11
use crate::utils::EnterPolarsExt;
12
13
#[pymethods]
14
impl PyDataFrame {
15
#[staticmethod]
16
#[pyo3(signature = (data, schema=None, infer_schema_length=None))]
17
pub fn from_rows(
18
py: Python<'_>,
19
data: Vec<Wrap<Row>>,
20
schema: Option<Wrap<Schema>>,
21
infer_schema_length: Option<usize>,
22
) -> PyResult<Self> {
23
let data = vec_extract_wrapped(data);
24
let schema = schema.map(|wrap| wrap.0);
25
py.enter_polars(move || finish_from_rows(data, schema, None, infer_schema_length))
26
}
27
28
#[staticmethod]
29
#[pyo3(signature = (data, schema=None, schema_overrides=None, strict=true, infer_schema_length=None))]
30
pub fn from_dicts(
31
py: Python<'_>,
32
data: &Bound<PyAny>,
33
schema: Option<Wrap<Schema>>,
34
schema_overrides: Option<Wrap<Schema>>,
35
strict: bool,
36
infer_schema_length: Option<usize>,
37
) -> PyResult<Self> {
38
let schema = schema.map(|wrap| wrap.0);
39
let schema_overrides = schema_overrides.map(|wrap| wrap.0);
40
41
// determine row extraction strategy from the first item:
42
// PyDict (faster), or PyMapping (more generic, slower)
43
let from_mapping = data.len()? > 0 && {
44
let mut iter = data.try_iter()?;
45
loop {
46
match iter.next() {
47
Some(Ok(item)) if !item.is_none() => break !item.is_instance_of::<PyDict>(),
48
Some(Err(e)) => return Err(e),
49
Some(_) => continue,
50
None => break false,
51
}
52
}
53
};
54
55
// read (or infer) field names, then extract row values
56
let names = get_schema_names(data, schema.as_ref(), infer_schema_length, from_mapping)?;
57
let rows = if from_mapping {
58
mappings_to_rows(data, &names, strict)?
59
} else {
60
dicts_to_rows(data, &names, strict)?
61
};
62
63
let schema = schema.or_else(|| {
64
Some(columns_names_to_empty_schema(
65
names.iter().map(String::as_str),
66
))
67
});
68
py.enter_polars(move || {
69
finish_from_rows(rows, schema, schema_overrides, infer_schema_length)
70
})
71
}
72
73
#[staticmethod]
74
pub fn from_arrow_record_batches(
75
py: Python<'_>,
76
rb: Vec<Bound<PyAny>>,
77
schema: Bound<PyAny>,
78
) -> PyResult<Self> {
79
let df = interop::arrow::to_rust::to_rust_df(py, &rb, schema)?;
80
Ok(Self::from(df))
81
}
82
}
83
84
fn finish_from_rows(
85
rows: Vec<Row>,
86
schema: Option<Schema>,
87
schema_overrides: Option<Schema>,
88
infer_schema_length: Option<usize>,
89
) -> PyResult<PyDataFrame> {
90
let schema = if let Some(mut schema) = schema {
91
resolve_schema_overrides(&mut schema, schema_overrides);
92
update_schema_from_rows(&mut schema, &rows, infer_schema_length)?;
93
schema
94
} else {
95
rows_to_schema_supertypes(&rows, infer_schema_length).map_err(PyPolarsErr::from)?
96
};
97
98
let df = DataFrame::from_rows_and_schema(&rows, &schema).map_err(PyPolarsErr::from)?;
99
Ok(df.into())
100
}
101
102
fn update_schema_from_rows(
103
schema: &mut Schema,
104
rows: &[Row],
105
infer_schema_length: Option<usize>,
106
) -> PyResult<()> {
107
let schema_is_complete = schema.iter_values().all(|dtype| dtype.is_known());
108
if schema_is_complete {
109
return Ok(());
110
}
111
112
// TODO: Only infer dtypes for columns with an unknown dtype
113
let inferred_dtypes =
114
rows_to_supertypes(rows, infer_schema_length).map_err(PyPolarsErr::from)?;
115
let inferred_dtypes_slice = inferred_dtypes.as_slice();
116
117
for (i, dtype) in schema.iter_values_mut().enumerate() {
118
if !dtype.is_known() {
119
*dtype = inferred_dtypes_slice.get(i).ok_or_else(|| {
120
polars_err!(SchemaMismatch: "the number of columns in the schema does not match the data")
121
})
122
.map_err(PyPolarsErr::from)?
123
.clone();
124
}
125
}
126
Ok(())
127
}
128
129
/// Override the data type of certain schema fields.
130
///
131
/// Overrides for nonexistent columns are ignored.
132
fn resolve_schema_overrides(schema: &mut Schema, schema_overrides: Option<Schema>) {
133
if let Some(overrides) = schema_overrides {
134
for (name, dtype) in overrides.into_iter() {
135
schema.set_dtype(name.as_str(), dtype);
136
}
137
}
138
}
139
140
fn columns_names_to_empty_schema<'a, I>(column_names: I) -> Schema
141
where
142
I: IntoIterator<Item = &'a str>,
143
{
144
let fields = column_names
145
.into_iter()
146
.map(|c| Field::new(c.into(), DataType::Unknown(Default::default())));
147
Schema::from_iter(fields)
148
}
149
150
fn dicts_to_rows(
151
data: &Bound<'_, PyAny>,
152
names: &[String],
153
strict: bool,
154
) -> PyResult<Vec<Row<'static>>> {
155
let py = data.py();
156
let mut rows = Vec::with_capacity(data.len()?);
157
let null_row = Row::new(vec![AnyValue::Null; names.len()]);
158
159
// pre-convert keys/names so we don't repeatedly create them in the loop
160
let py_keys: Vec<Py<PyString>> = names.iter().map(|k| PyString::new(py, k).into()).collect();
161
162
for d in data.try_iter()? {
163
let d = d?;
164
if d.is_none() {
165
rows.push(null_row.clone())
166
} else {
167
let d = d.downcast::<PyDict>()?;
168
let mut row = Vec::with_capacity(names.len());
169
for k in &py_keys {
170
let val = match d.get_item(k)? {
171
None => AnyValue::Null,
172
Some(py_val) => py_object_to_any_value(&py_val.as_borrowed(), strict, true)?,
173
};
174
row.push(val)
175
}
176
rows.push(Row(row))
177
}
178
}
179
Ok(rows)
180
}
181
182
fn mappings_to_rows(
183
data: &Bound<'_, PyAny>,
184
names: &[String],
185
strict: bool,
186
) -> PyResult<Vec<Row<'static>>> {
187
let py = data.py();
188
let mut rows = Vec::with_capacity(data.len()?);
189
let null_row = Row::new(vec![AnyValue::Null; names.len()]);
190
191
// pre-convert keys/names so we don't repeatedly create them in the loop
192
let py_keys: Vec<Py<PyString>> = names.iter().map(|k| PyString::new(py, k).into()).collect();
193
194
for d in data.try_iter()? {
195
let d = d?;
196
if d.is_none() {
197
rows.push(null_row.clone())
198
} else {
199
let d = d.downcast::<PyMapping>()?;
200
let mut row = Vec::with_capacity(names.len());
201
for k in &py_keys {
202
let py_val = d.get_item(k)?;
203
let val = if py_val.is_none() {
204
AnyValue::Null
205
} else {
206
py_object_to_any_value(&py_val, strict, true)?
207
};
208
row.push(val)
209
}
210
rows.push(Row(row))
211
}
212
}
213
Ok(rows)
214
}
215
216
/// Either read the given schema, or infer the schema names from the data.
217
fn get_schema_names(
218
data: &Bound<PyAny>,
219
schema: Option<&Schema>,
220
infer_schema_length: Option<usize>,
221
from_mapping: bool,
222
) -> PyResult<Vec<String>> {
223
if let Some(schema) = schema {
224
Ok(schema.iter_names().map(|n| n.to_string()).collect())
225
} else {
226
let data_len = data.len()?;
227
let infer_schema_length = infer_schema_length
228
.map(|n| std::cmp::max(1, n))
229
.unwrap_or(data_len);
230
231
if from_mapping {
232
infer_schema_names_from_mapping_data(data, infer_schema_length)
233
} else {
234
infer_schema_names_from_dict_data(data, infer_schema_length)
235
}
236
}
237
}
238
239
/// Infer schema names from an iterable of dictionaries.
240
///
241
/// The resulting schema order is determined by the order
242
/// in which the names are encountered in the data.
243
fn infer_schema_names_from_dict_data(
244
data: &Bound<PyAny>,
245
infer_schema_length: usize,
246
) -> PyResult<Vec<String>> {
247
let mut names = PlIndexSet::new();
248
for d in data.try_iter()?.take(infer_schema_length) {
249
let d = d?;
250
if !d.is_none() {
251
let d = d.downcast::<PyDict>()?;
252
let keys = d.keys().iter();
253
for name in keys {
254
let name = name.extract::<String>()?;
255
names.insert(name);
256
}
257
}
258
}
259
Ok(names.into_iter().collect())
260
}
261
262
/// Infer schema names from an iterable of mapping objects.
263
///
264
/// The resulting schema order is determined by the order
265
/// in which the names are encountered in the data.
266
fn infer_schema_names_from_mapping_data(
267
data: &Bound<PyAny>,
268
infer_schema_length: usize,
269
) -> PyResult<Vec<String>> {
270
let mut names = PlIndexSet::new();
271
for d in data.try_iter()?.take(infer_schema_length) {
272
let d = d?;
273
if !d.is_none() {
274
let d = d.downcast::<PyMapping>()?;
275
let keys = d.keys()?;
276
for name in keys {
277
let name = name.extract::<String>()?;
278
names.insert(name);
279
}
280
}
281
}
282
Ok(names.into_iter().collect())
283
}
284
285