Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-python/src/dataframe/general.rs
7889 views
1
use std::hash::BuildHasher;
2
3
use arrow::bitmap::MutableBitmap;
4
use either::Either;
5
use parking_lot::RwLock;
6
use polars::prelude::*;
7
use polars_ffi::version_0::SeriesExport;
8
use pyo3::exceptions::PyIndexError;
9
use pyo3::prelude::*;
10
use pyo3::pybacked::PyBackedStr;
11
use pyo3::types::{PyList, PyType};
12
13
use self::row_encode::{_get_rows_encoded_ca, _get_rows_encoded_ca_unordered};
14
use super::PyDataFrame;
15
use crate::PyLazyFrame;
16
use crate::conversion::Wrap;
17
use crate::error::PyPolarsErr;
18
use crate::prelude::strings_to_pl_smallstr;
19
use crate::py_modules::polars;
20
use crate::series::{PySeries, ToPySeries, ToSeries};
21
use crate::utils::EnterPolarsExt;
22
23
#[pymethods]
24
impl PyDataFrame {
25
#[new]
26
pub fn __init__(columns: Vec<PySeries>) -> PyResult<Self> {
27
let columns = columns.to_series();
28
// @scalar-opt
29
let columns = columns.into_iter().map(|s| s.into()).collect();
30
let df = DataFrame::new(columns).map_err(PyPolarsErr::from)?;
31
Ok(PyDataFrame::new(df))
32
}
33
34
pub fn estimated_size(&self) -> usize {
35
self.df.read().estimated_size()
36
}
37
38
pub fn dtype_strings(&self) -> Vec<String> {
39
self.df
40
.read()
41
.get_columns()
42
.iter()
43
.map(|s| format!("{}", s.dtype()))
44
.collect()
45
}
46
47
/// Apply the `+` operator between the wrapped frame and the series `s`.
pub fn add(&self, py: Python<'_>, s: &PySeries) -> PyResult<Self> {
    py.enter_polars_df(|| {
        let lhs = self.df.read();
        let rhs = s.series.read();
        &*lhs + &*rhs
    })
}
50
51
/// Apply the `-` operator between the wrapped frame and the series `s`.
pub fn sub(&self, py: Python<'_>, s: &PySeries) -> PyResult<Self> {
    py.enter_polars_df(|| {
        let lhs = self.df.read();
        let rhs = s.series.read();
        &*lhs - &*rhs
    })
}
54
55
/// Apply the `*` operator between the wrapped frame and the series `s`.
pub fn mul(&self, py: Python<'_>, s: &PySeries) -> PyResult<Self> {
    py.enter_polars_df(|| {
        let lhs = self.df.read();
        let rhs = s.series.read();
        &*lhs * &*rhs
    })
}
58
59
/// Apply the `/` operator between the wrapped frame and the series `s`.
pub fn div(&self, py: Python<'_>, s: &PySeries) -> PyResult<Self> {
    py.enter_polars_df(|| {
        let lhs = self.df.read();
        let rhs = s.series.read();
        &*lhs / &*rhs
    })
}
62
63
/// Apply the `%` operator between the wrapped frame and the series `s`.
pub fn rem(&self, py: Python<'_>, s: &PySeries) -> PyResult<Self> {
    py.enter_polars_df(|| {
        let lhs = self.df.read();
        let rhs = s.series.read();
        &*lhs % &*rhs
    })
}
66
67
/// Apply the `+` operator between the wrapped frame and the frame wrapped by `s`.
pub fn add_df(&self, py: Python<'_>, s: &Self) -> PyResult<Self> {
    py.enter_polars_df(|| {
        let lhs = self.df.read();
        let rhs = s.df.read();
        &*lhs + &*rhs
    })
}
70
71
/// Apply the `-` operator between the wrapped frame and the frame wrapped by `s`.
pub fn sub_df(&self, py: Python<'_>, s: &Self) -> PyResult<Self> {
    py.enter_polars_df(|| {
        let lhs = self.df.read();
        let rhs = s.df.read();
        &*lhs - &*rhs
    })
}
74
75
/// Apply the `*` operator between the wrapped frame and the frame wrapped by `s`.
pub fn mul_df(&self, py: Python<'_>, s: &Self) -> PyResult<Self> {
    py.enter_polars_df(|| {
        let lhs = self.df.read();
        let rhs = s.df.read();
        &*lhs * &*rhs
    })
}
78
79
/// Apply the `/` operator between the wrapped frame and the frame wrapped by `s`.
pub fn div_df(&self, py: Python<'_>, s: &Self) -> PyResult<Self> {
    py.enter_polars_df(|| {
        let lhs = self.df.read();
        let rhs = s.df.read();
        &*lhs / &*rhs
    })
}
82
83
/// Apply the `%` operator between the wrapped frame and the frame wrapped by `s`.
pub fn rem_df(&self, py: Python<'_>, s: &Self) -> PyResult<Self> {
    py.enter_polars_df(|| {
        let lhs = self.df.read();
        let rhs = s.df.read();
        &*lhs % &*rhs
    })
}
86
87
#[pyo3(signature = (n, with_replacement, shuffle, seed=None))]
88
pub fn sample_n(
89
&self,
90
py: Python<'_>,
91
n: &PySeries,
92
with_replacement: bool,
93
shuffle: bool,
94
seed: Option<u64>,
95
) -> PyResult<Self> {
96
py.enter_polars_df(|| {
97
self.df
98
.read()
99
.sample_n(&n.series.read(), with_replacement, shuffle, seed)
100
})
101
}
102
103
#[pyo3(signature = (frac, with_replacement, shuffle, seed=None))]
104
pub fn sample_frac(
105
&self,
106
py: Python<'_>,
107
frac: &PySeries,
108
with_replacement: bool,
109
shuffle: bool,
110
seed: Option<u64>,
111
) -> PyResult<Self> {
112
py.enter_polars_df(|| {
113
self.df
114
.read()
115
.sample_frac(&frac.series.read(), with_replacement, shuffle, seed)
116
})
117
}
118
119
/// Return a clone of the wrapped frame after calling `as_single_chunk_par` on the clone.
///
/// The frame stored in `self` is only read, never modified.
pub fn rechunk(&self, py: Python) -> PyResult<Self> {
    py.enter_polars_df(|| {
        let mut rechunked = self.df.read().clone();
        rechunked.as_single_chunk_par();
        Ok(rechunked)
    })
}
126
127
/// Format `DataFrame` as String
128
pub fn as_str(&self) -> String {
129
format!("{:?}", self.df.read())
130
}
131
132
/// Return a copy of every column, converted with `to_pyseries`.
pub fn get_columns(&self) -> Vec<PySeries> {
    // Copy the columns out first so the read guard is released before converting.
    let columns = {
        let df = self.df.read();
        df.get_columns().to_vec()
    };
    columns.to_pyseries()
}
136
137
/// Get column names
138
pub fn columns(&self) -> Vec<String> {
139
self.df
140
.read()
141
.get_columns()
142
.iter()
143
.map(|s| s.name().to_string())
144
.collect()
145
}
146
147
/// set column names
148
pub fn set_column_names(&self, names: Vec<PyBackedStr>) -> PyResult<()> {
149
self.df
150
.write()
151
.set_column_names(names.iter().map(|x| &**x))
152
.map_err(PyPolarsErr::from)?;
153
Ok(())
154
}
155
156
/// Get datatypes
157
pub fn dtypes<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyList>> {
158
let df = self.df.read();
159
let iter = df
160
.iter()
161
.map(|s| Wrap(s.dtype().clone()).into_pyobject(py).unwrap());
162
PyList::new(py, iter)
163
}
164
165
pub fn n_chunks(&self) -> usize {
166
self.df.read().first_col_n_chunks()
167
}
168
169
pub fn shape(&self) -> (usize, usize) {
170
self.df.read().shape()
171
}
172
173
pub fn height(&self) -> usize {
174
self.df.read().height()
175
}
176
177
pub fn width(&self) -> usize {
178
self.df.read().width()
179
}
180
181
pub fn is_empty(&self) -> bool {
182
self.df.read().is_empty()
183
}
184
185
/// Return the result of `DataFrame::hstack` applied to the wrapped frame and `columns`.
///
/// The frame stored in `self` is only read.
pub fn hstack(&self, py: Python<'_>, columns: Vec<PySeries>) -> PyResult<Self> {
    // @scalar-opt
    let extra: Vec<_> = columns.to_series().into_iter().map(Into::into).collect();
    py.enter_polars_df(|| {
        let df = self.df.read();
        df.hstack(&extra)
    })
}
191
192
pub fn hstack_mut(&self, py: Python<'_>, columns: Vec<PySeries>) -> PyResult<()> {
193
let columns = columns.to_series();
194
// @scalar-opt
195
let columns = columns.into_iter().map(Into::into).collect::<Vec<_>>();
196
py.enter_polars(|| self.df.write().hstack_mut(&columns).map(drop))?;
197
Ok(())
198
}
199
200
/// Return the result of `DataFrame::vstack` applied to the wrapped frame and `other`.
pub fn vstack(&self, py: Python<'_>, other: &PyDataFrame) -> PyResult<Self> {
    py.enter_polars_df(|| {
        let top = self.df.read();
        let bottom = other.df.read();
        top.vstack(&bottom)
    })
}
203
204
pub fn vstack_mut(&self, py: Python<'_>, other: &PyDataFrame) -> PyResult<()> {
205
py.enter_polars(|| {
206
// Prevent self-vstack deadlocks.
207
let other = other.df.read().clone();
208
self.df.write().vstack_mut(&other)?;
209
PolarsResult::Ok(())
210
})?;
211
Ok(())
212
}
213
214
/// Call `DataFrame::extend` with `other` on the wrapped frame, mutating it in place.
pub fn extend(&self, py: Python<'_>, other: &PyDataFrame) -> PyResult<()> {
    py.enter_polars(|| {
        // Clone `other` and release its read guard before taking the write lock, so
        // extending a frame with itself cannot deadlock.
        let snapshot = other.df.read().clone();
        let mut df = self.df.write();
        df.extend(&snapshot)
    })?;
    Ok(())
}
222
223
pub fn drop_in_place(&self, name: &str) -> PyResult<PySeries> {
224
let s = self
225
.df
226
.write()
227
.drop_in_place(name)
228
.map_err(PyPolarsErr::from)?;
229
let s = s.take_materialized_series();
230
Ok(PySeries::from(s))
231
}
232
233
pub fn to_series(&self, index: isize) -> PyResult<PySeries> {
234
let df = &self.df.read();
235
236
let index_adjusted = if index < 0 {
237
df.width().checked_sub(index.unsigned_abs())
238
} else {
239
Some(usize::try_from(index).unwrap())
240
};
241
242
let s = index_adjusted.and_then(|i| df.select_at_idx(i));
243
match s {
244
Some(s) => Ok(PySeries::new(s.as_materialized_series().clone())),
245
None => Err(PyIndexError::new_err(
246
polars_err!(oob = index, df.width()).to_string(),
247
)),
248
}
249
}
250
251
pub fn get_column_index(&self, name: &str) -> PyResult<usize> {
252
Ok(self
253
.df
254
.read()
255
.try_get_column_index(name)
256
.map_err(PyPolarsErr::from)?)
257
}
258
259
pub fn get_column(&self, name: &str) -> PyResult<PySeries> {
260
let series = self
261
.df
262
.read()
263
.column(name)
264
.map(|s| PySeries::new(s.as_materialized_series().clone()))
265
.map_err(PyPolarsErr::from)?;
266
Ok(series)
267
}
268
269
/// Return the result of `DataFrame::select` for the given column names.
pub fn select(&self, py: Python<'_>, columns: Vec<PyBackedStr>) -> PyResult<Self> {
    py.enter_polars_df(|| {
        let df = self.df.read();
        df.select(columns.iter().map(|name| &**name))
    })
}
272
273
/// Build an unnamed `IdxCa` from `indices` and pass it to `DataFrame::take`.
pub fn gather(&self, py: Python<'_>, indices: Wrap<Vec<IdxSize>>) -> PyResult<Self> {
    let idx = IdxCa::from_vec("".into(), indices.0);
    py.enter_polars_df(|| {
        let df = self.df.read();
        df.take(&idx)
    })
}
278
279
/// Pass the index view of the `indices` series (obtained via `idx()`) to `DataFrame::take`.
///
/// # Errors
/// Returns the error from `idx()` if the series cannot be viewed as indices, or the error
/// reported by `take`.
pub fn gather_with_series(&self, py: Python<'_>, indices: &PySeries) -> PyResult<Self> {
    // Keep the guard alive for the whole call; `idx` borrows from it.
    let series = indices.series.read();
    let idx = series.idx().map_err(PyPolarsErr::from)?;
    py.enter_polars_df(|| {
        let df = self.df.read();
        df.take(idx)
    })
}
284
285
/// Call `DataFrame::replace` on the wrapped frame with the series taken out of `new_col`.
///
/// # Errors
/// Returns the error reported by `DataFrame::replace`.
pub fn replace(&self, column: &str, new_col: PySeries) -> PyResult<()> {
    let mut df = self.df.write();
    let replacement = new_col.series.into_inner();
    df.replace(column, replacement).map_err(PyPolarsErr::from)?;
    Ok(())
}
292
293
/// Call `DataFrame::replace_column` on the wrapped frame with the series taken out of
/// `new_column`.
///
/// # Errors
/// Returns the error reported by `DataFrame::replace_column`.
pub fn replace_column(&self, index: usize, new_column: PySeries) -> PyResult<()> {
    let mut df = self.df.write();
    let replacement = new_column.series.into_inner();
    df.replace_column(index, replacement)
        .map_err(PyPolarsErr::from)?;
    Ok(())
}
300
301
/// Call `DataFrame::insert_column` on the wrapped frame with the series taken out of
/// `column`.
///
/// # Errors
/// Returns the error reported by `DataFrame::insert_column`.
pub fn insert_column(&self, index: usize, column: PySeries) -> PyResult<()> {
    let mut df = self.df.write();
    let inserted = column.series.into_inner();
    df.insert_column(index, inserted)
        .map_err(PyPolarsErr::from)?;
    Ok(())
}
308
309
#[pyo3(signature = (offset, length))]
310
pub fn slice(&self, py: Python<'_>, offset: i64, length: Option<usize>) -> PyResult<Self> {
311
py.enter_polars_df(|| {
312
let df = self.df.read();
313
Ok(df.slice(offset, length.unwrap_or_else(|| df.height())))
314
})
315
}
316
317
/// Return `DataFrame::head(Some(n))` of the wrapped frame.
pub fn head(&self, py: Python<'_>, n: usize) -> PyResult<Self> {
    py.enter_polars_df(|| {
        let df = self.df.read();
        Ok(df.head(Some(n)))
    })
}
320
321
/// Return `DataFrame::tail(Some(n))` of the wrapped frame.
pub fn tail(&self, py: Python<'_>, n: usize) -> PyResult<Self> {
    py.enter_polars_df(|| {
        let df = self.df.read();
        Ok(df.tail(Some(n)))
    })
}
324
325
/// Return the result of `DataFrame::is_unique` as a `PySeries`.
pub fn is_unique(&self, py: Python) -> PyResult<PySeries> {
    py.enter_polars_series(|| {
        let df = self.df.read();
        df.is_unique()
    })
}
328
329
/// Return the result of `DataFrame::is_duplicated` as a `PySeries`.
pub fn is_duplicated(&self, py: Python) -> PyResult<PySeries> {
    py.enter_polars_series(|| {
        let df = self.df.read();
        df.is_duplicated()
    })
}
332
333
pub fn equals(&self, py: Python<'_>, other: &PyDataFrame, null_equal: bool) -> PyResult<bool> {
334
if null_equal {
335
py.enter_polars_ok(|| self.df.read().equals_missing(&other.df.read()))
336
} else {
337
py.enter_polars_ok(|| self.df.read().equals(&other.df.read()))
338
}
339
}
340
341
#[pyo3(signature = (name, offset=None))]
342
pub fn with_row_index(
343
&self,
344
py: Python<'_>,
345
name: &str,
346
offset: Option<IdxSize>,
347
) -> PyResult<Self> {
348
py.enter_polars_df(|| self.df.read().with_row_index(name.into(), offset))
349
}
350
351
pub fn _to_metadata(&self) -> Self {
352
Self {
353
df: RwLock::new(self.df.read()._to_metadata()),
354
}
355
}
356
357
pub fn group_by_map_groups(
358
&self,
359
py: Python<'_>,
360
by: Vec<PyBackedStr>,
361
lambda: Py<PyAny>,
362
maintain_order: bool,
363
) -> PyResult<Self> {
364
py.enter_polars_df(|| {
365
let df = self.df.read().clone(); // Clone so we can't deadlock on re-entrance from lambda.
366
let gb = if maintain_order {
367
df.group_by_stable(by.iter().map(|x| &**x))
368
} else {
369
df.group_by(by.iter().map(|x| &**x))
370
}?;
371
372
let function = move |df: DataFrame| {
373
Python::attach(|py| {
374
let pypolars = polars(py).bind(py);
375
let pydf = PyDataFrame::new(df);
376
let python_df_wrapper =
377
pypolars.getattr("wrap_df").unwrap().call1((pydf,)).unwrap();
378
379
// Call the lambda and get a python-side DataFrame wrapper.
380
let result_df_wrapper = match lambda.call1(py, (python_df_wrapper,)) {
381
Ok(pyobj) => pyobj,
382
Err(e) => panic!("UDF failed: {}", e.value(py)),
383
};
384
let py_pydf = result_df_wrapper.getattr(py, "_df").expect(
385
"Could not get DataFrame attribute '_df'. Make sure that you return a DataFrame object.",
386
);
387
388
let pydf = py_pydf.extract::<PyDataFrame>(py).unwrap();
389
Ok(pydf.df.into_inner())
390
})
391
};
392
393
gb.apply(function)
394
})
395
}
396
397
#[allow(clippy::should_implement_trait)]
398
pub fn clone(&self) -> Self {
399
Clone::clone(self)
400
}
401
402
/// Build `UnpivotArgsIR` from the frame's current column names plus the `on`, `index`,
/// `value_name` and `variable_name` arguments, then call `unpivot2` on the wrapped frame.
#[cfg(feature = "pivot")]
#[pyo3(signature = (on, index, value_name=None, variable_name=None))]
pub fn unpivot(
    &self,
    py: Python<'_>,
    on: Option<Vec<PyBackedStr>>,
    index: Vec<PyBackedStr>,
    value_name: Option<&str>,
    variable_name: Option<&str>,
) -> PyResult<Self> {
    use polars_ops::unpivot::UnpivotDF;

    let all_names = self.df.read().get_column_names_owned();
    let args = UnpivotArgsIR::new(
        all_names,
        on.map(strings_to_pl_smallstr),
        strings_to_pl_smallstr(index),
        value_name.map(Into::into),
        variable_name.map(Into::into),
    );

    py.enter_polars_df(|| {
        let df = self.df.read();
        df.unpivot2(args)
    })
}
423
424
pub fn partition_by(
425
&self,
426
py: Python<'_>,
427
by: Vec<String>,
428
maintain_order: bool,
429
include_key: bool,
430
) -> PyResult<Vec<Self>> {
431
let out = py.enter_polars(|| {
432
if maintain_order {
433
self.df.read().partition_by_stable(by, include_key)
434
} else {
435
self.df.read().partition_by(by, include_key)
436
}
437
})?;
438
439
Ok(out.into_iter().map(PyDataFrame::from).collect())
440
}
441
442
/// Clone the wrapped frame and convert the clone into a `PyLazyFrame` via `lazy()`.
pub fn lazy(&self) -> PyLazyFrame {
    let snapshot = self.df.read().clone();
    snapshot.lazy().into()
}
445
446
#[pyo3(signature = (columns, separator, drop_first, drop_nulls))]
447
pub fn to_dummies(
448
&self,
449
py: Python<'_>,
450
columns: Option<Vec<String>>,
451
separator: Option<&str>,
452
drop_first: bool,
453
drop_nulls: bool,
454
) -> PyResult<Self> {
455
py.enter_polars_df(|| match columns {
456
Some(cols) => self.df.read().columns_to_dummies(
457
cols.iter().map(|x| x as &str).collect(),
458
separator,
459
drop_first,
460
drop_nulls,
461
),
462
None => self.df.read().to_dummies(separator, drop_first, drop_nulls),
463
})
464
}
465
466
/// Return the frame produced by `DataFrame::null_count`.
pub fn null_count(&self, py: Python) -> PyResult<Self> {
    py.enter_polars_df(|| {
        let df = self.df.read();
        Ok(df.null_count())
    })
}
469
470
/// Call `DataFrame::shrink_to_fit` on the wrapped frame, in place.
pub fn shrink_to_fit(&self, py: Python) -> PyResult<()> {
    py.enter_polars_ok(|| {
        let mut df = self.df.write();
        df.shrink_to_fit()
    })
}
473
474
pub fn hash_rows(
475
&self,
476
py: Python<'_>,
477
k0: u64,
478
k1: u64,
479
k2: u64,
480
k3: u64,
481
) -> PyResult<PySeries> {
482
// TODO: don't expose all these seeds.
483
let seed = PlFixedStateQuality::default().hash_one((k0, k1, k2, k3));
484
let hb = PlSeedableRandomStateQuality::seed_from_u64(seed);
485
py.enter_polars_series(|| self.df.write().hash_rows(Some(hb)))
486
}
487
488
#[pyo3(signature = (keep_names_as, column_names))]
489
pub fn transpose(
490
&self,
491
py: Python<'_>,
492
keep_names_as: Option<&str>,
493
column_names: &Bound<PyAny>,
494
) -> PyResult<Self> {
495
let new_col_names = if let Ok(name) = column_names.extract::<Vec<String>>() {
496
Some(Either::Right(name))
497
} else if let Ok(name) = column_names.extract::<String>() {
498
Some(Either::Left(name))
499
} else {
500
None
501
};
502
py.enter_polars_df(|| self.df.write().transpose(keep_names_as, new_col_names))
503
}
504
505
pub fn upsample(
506
&self,
507
py: Python<'_>,
508
by: Vec<String>,
509
index_column: &str,
510
every: &str,
511
stable: bool,
512
) -> PyResult<Self> {
513
let every = Duration::try_parse(every).map_err(PyPolarsErr::from)?;
514
py.enter_polars_df(|| {
515
if stable {
516
self.df.read().upsample_stable(by, index_column, every)
517
} else {
518
self.df.read().upsample(by, index_column, every)
519
}
520
})
521
}
522
523
pub fn to_struct(
524
&self,
525
py: Python<'_>,
526
name: &str,
527
invalid_indices: Vec<usize>,
528
) -> PyResult<PySeries> {
529
py.enter_polars_series(|| {
530
let mut ca = self.df.read().clone().into_struct(name.into());
531
532
if !invalid_indices.is_empty() {
533
let mut validity = MutableBitmap::with_capacity(ca.len());
534
validity.extend_constant(ca.len(), true);
535
for i in invalid_indices {
536
validity.set(i, false);
537
}
538
ca.rechunk_mut();
539
Ok(ca.with_outer_validity(Some(validity.freeze())))
540
} else {
541
Ok(ca)
542
}
543
})
544
}
545
546
/// Return the frame produced by `DataFrame::clear`; the wrapped frame is only read.
pub fn clear(&self, py: Python) -> PyResult<Self> {
    py.enter_polars_df(|| {
        let df = self.df.read();
        Ok(df.clear())
    })
}
549
550
/// Export the columns via polars-ffi
551
/// # Safety
552
/// Needs a preallocated *mut SeriesExport that has allocated space for n_columns.
553
pub unsafe fn _export_columns(&self, location: usize) {
554
use polars_ffi::version_0::export_column;
555
556
let df = self.df.read();
557
let cols = df.get_columns();
558
559
let location = location as *mut SeriesExport;
560
561
for (i, col) in cols.iter().enumerate() {
562
let e = export_column(col);
563
// SAFETY:
564
// Caller should ensure address is allocated.
565
// Be careful not to drop `e` here as that should be dropped by the ffi consumer
566
unsafe { core::ptr::write(location.add(i), e) };
567
}
568
}
569
570
/// Import [`Self`] via polars-ffi
571
/// # Safety
572
/// [`location`] should be an address that contains [`width`] properly initialized
573
/// [`SeriesExport`]s
574
#[classmethod]
575
pub unsafe fn _import_columns(
576
_cls: &Bound<PyType>,
577
location: usize,
578
width: usize,
579
) -> PyResult<Self> {
580
use polars_ffi::version_0::import_df;
581
582
let location = location as *mut SeriesExport;
583
584
let df = unsafe { import_df(location, width) }.map_err(PyPolarsErr::from)?;
585
Ok(PyDataFrame::from(df))
586
}
587
588
/// Internal utility function to allow direct access to the row encoding from python.
589
#[pyo3(signature = (opts))]
590
fn _row_encode(&self, py: Python<'_>, opts: Vec<(bool, bool, bool)>) -> PyResult<PySeries> {
591
py.enter_polars_series(|| {
592
let name = PlSmallStr::from_static("row_enc");
593
let is_unordered = opts.first().is_some_and(|(_, _, v)| *v);
594
595
let ca = if is_unordered {
596
_get_rows_encoded_ca_unordered(name, self.df.read().get_columns())
597
} else {
598
let descending = opts.iter().map(|(v, _, _)| *v).collect::<Vec<_>>();
599
let nulls_last = opts.iter().map(|(_, v, _)| *v).collect::<Vec<_>>();
600
601
_get_rows_encoded_ca(
602
name,
603
self.df.read().get_columns(),
604
descending.as_slice(),
605
nulls_last.as_slice(),
606
)
607
}?;
608
609
Ok(ca)
610
})
611
}
612
}
613
614