GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-python/src/lazyframe/general.rs
use std::collections::HashMap;
use std::ffi::CString;
use std::num::NonZeroUsize;

use arrow::ffi::export_iterator;
use either::Either;
use parking_lot::Mutex;
use polars::io::RowIndex;
use polars::time::*;
use polars_core::prelude::*;
#[cfg(feature = "parquet")]
use polars_parquet::arrow::write::StatisticsOptions;
use polars_plan::dsl::ScanSources;
use polars_plan::plans::{AExpr, HintIR, IR, Sorted};
use polars_utils::arena::{Arena, Node};
use polars_utils::python_function::PythonObject;
use pyo3::exceptions::{PyTypeError, PyValueError};
use pyo3::prelude::*;
use pyo3::pybacked::PyBackedStr;
use pyo3::types::{PyCapsule, PyDict, PyDictMethods, PyList};

use super::{PyLazyFrame, PyOptFlags};
use crate::error::PyPolarsErr;
use crate::expr::ToExprs;
use crate::expr::datatype::PyDataTypeExpr;
use crate::expr::selector::PySelector;
use crate::interop::arrow::to_rust::pyarrow_schema_to_rust;
#[cfg(feature = "json")]
use crate::io::cloud_options::OptPyCloudOptions;
use crate::io::scan_options::PyScanOptions;
use crate::io::sink_options::PySinkOptions;
use crate::io::sink_output::PyFileSinkDestination;
use crate::lazyframe::visit::NodeTraverser;
use crate::prelude::*;
use crate::utils::{EnterPolarsExt, to_py_err};
use crate::{PyDataFrame, PyExpr, PyLazyGroupBy};

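/// Resolve a Python object (path, file, or buffer) into the first path, if
/// any, together with the `ScanSources` consumed by the scan constructors
/// below.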
fn pyobject_to_first_path_and_scan_sources(
    obj: Py<PyAny>,
) -> PyResult<(Option<PlRefPath>, ScanSources)> {
    use crate::file::{PythonScanSourceInput, get_python_scan_source_input};
    Ok(match get_python_scan_source_input(obj, false)? {
        PythonScanSourceInput::Path(path) => (
            Some(path.clone()),
            ScanSources::Paths(FromIterator::from_iter([path])),
        ),
        PythonScanSourceInput::File(file) => (None, ScanSources::Files([file.into()].into())),
        PythonScanSourceInput::Buffer(buff) => (None, ScanSources::Buffers([buff].into())),
    })
}

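/// Hand the optimized plan to a Python post-optimization callback via a
/// `NodeTraverser`, allowing the callback (currently the "cuda" engine hook)
/// to replace parts of the query plan, then swap the arenas back.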
fn post_opt_callback(
    lambda: &Py<PyAny>,
    root: Node,
    lp_arena: &mut Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
    duration_since_start: Option<std::time::Duration>,
) -> PolarsResult<()> {
    Python::attach(|py| {
        let nt = NodeTraverser::new(root, std::mem::take(lp_arena), std::mem::take(expr_arena));

        // Get a copy of the arenas.
        let arenas = nt.get_arenas();

        // Pass the node visitor which allows the python callback to replace parts of the query plan.
        // Remove "cuda" or specify better once we have multiple post-opt callbacks.
        lambda
            .call1(py, (nt, duration_since_start.map(|t| t.as_nanos() as u64)))
            .map_err(|e| polars_err!(ComputeError: "'cuda' conversion failed: {}", e))?;

        // Unpack the arenas.
        // At this point the `nt` is useless.

        std::mem::swap(lp_arena, &mut *arenas.0.lock().unwrap());
        std::mem::swap(expr_arena, &mut *arenas.1.lock().unwrap());

        Ok(())
    })
}

#[pymethods]
#[allow(clippy::should_implement_trait)]
impl PyLazyFrame {
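    /// Construct a `LazyFrame` scanning newline-delimited JSON, forwarding the
    /// Python-side options (schema, row index, cloud options, ...) to
    /// `LazyJsonLineReader`.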
    #[staticmethod]
    #[cfg(feature = "json")]
    #[allow(clippy::too_many_arguments)]
    #[pyo3(signature = (
        source, sources, infer_schema_length, schema, schema_overrides, batch_size, n_rows, low_memory, rechunk,
        row_index, ignore_errors, include_file_paths, cloud_options, credential_provider
    ))]
    fn new_from_ndjson(
        source: Option<Py<PyAny>>,
        sources: Wrap<ScanSources>,
        infer_schema_length: Option<usize>,
        schema: Option<Wrap<Schema>>,
        schema_overrides: Option<Wrap<Schema>>,
        batch_size: Option<NonZeroUsize>,
        n_rows: Option<usize>,
        low_memory: bool,
        rechunk: bool,
        row_index: Option<(String, IdxSize)>,
        ignore_errors: bool,
        include_file_paths: Option<String>,
        cloud_options: OptPyCloudOptions,
        credential_provider: Option<Py<PyAny>>,
    ) -> PyResult<Self> {
        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });

        let sources = sources.0;
        let (first_path, sources) = match source {
            None => (sources.first_path().cloned(), sources),
            Some(source) => pyobject_to_first_path_and_scan_sources(source)?,
        };

        let mut r = LazyJsonLineReader::new_with_sources(sources);

        if let Some(first_path) = first_path {
            let first_path_url = first_path.as_str();

            let cloud_options = cloud_options.extract_opt_cloud_options(
                CloudScheme::from_path(first_path_url),
                credential_provider,
            )?;

            r = r.with_cloud_options(cloud_options);
        };

        let lf = r
            .with_infer_schema_length(infer_schema_length.and_then(NonZeroUsize::new))
            .with_batch_size(batch_size)
            .with_n_rows(n_rows)
            .low_memory(low_memory)
            .with_rechunk(rechunk)
            .with_schema(schema.map(|schema| Arc::new(schema.0)))
            .with_schema_overwrite(schema_overrides.map(|x| Arc::new(x.0)))
            .with_row_index(row_index)
            .with_ignore_errors(ignore_errors)
            .with_include_file_paths(include_file_paths.map(|x| x.into()))
            .finish()
            .map_err(PyPolarsErr::from)?;

        Ok(lf.into())
    }

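    /// Construct a `LazyFrame` scanning CSV. Single-byte options (`separator`,
    /// `eol_char`) are validated here; `with_schema_modify` optionally calls
    /// back into Python to rename columns before the scan is built.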
    #[staticmethod]
    #[cfg(feature = "csv")]
    #[pyo3(signature = (source, sources, separator, has_header, ignore_errors, skip_rows, skip_lines, n_rows, cache, overwrite_dtype,
        low_memory, comment_prefix, quote_char, null_values, missing_utf8_is_empty_string,
        infer_schema_length, with_schema_modify, rechunk, skip_rows_after_header,
        encoding, row_index, try_parse_dates, eol_char, raise_if_empty, truncate_ragged_lines, decimal_comma, glob, schema,
        cloud_options, credential_provider, include_file_paths
    )
    )]
    fn new_from_csv(
        source: Option<Py<PyAny>>,
        sources: Wrap<ScanSources>,
        separator: &str,
        has_header: bool,
        ignore_errors: bool,
        skip_rows: usize,
        skip_lines: usize,
        n_rows: Option<usize>,
        cache: bool,
        overwrite_dtype: Option<Vec<(PyBackedStr, Wrap<DataType>)>>,
        low_memory: bool,
        comment_prefix: Option<&str>,
        quote_char: Option<&str>,
        null_values: Option<Wrap<NullValues>>,
        missing_utf8_is_empty_string: bool,
        infer_schema_length: Option<usize>,
        with_schema_modify: Option<Py<PyAny>>,
        rechunk: bool,
        skip_rows_after_header: usize,
        encoding: Wrap<CsvEncoding>,
        row_index: Option<(String, IdxSize)>,
        try_parse_dates: bool,
        eol_char: &str,
        raise_if_empty: bool,
        truncate_ragged_lines: bool,
        decimal_comma: bool,
        glob: bool,
        schema: Option<Wrap<Schema>>,
        cloud_options: OptPyCloudOptions,
        credential_provider: Option<Py<PyAny>>,
        include_file_paths: Option<String>,
    ) -> PyResult<Self> {
        let null_values = null_values.map(|w| w.0);
        let quote_char = quote_char.and_then(|s| s.as_bytes().first()).copied();
        let separator = separator
            .as_bytes()
            .first()
            .ok_or_else(|| polars_err!(InvalidOperation: "`separator` cannot be empty"))
            .copied()
            .map_err(PyPolarsErr::from)?;
        let eol_char = eol_char
            .as_bytes()
            .first()
            .ok_or_else(|| polars_err!(InvalidOperation: "`eol_char` cannot be empty"))
            .copied()
            .map_err(PyPolarsErr::from)?;
        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });

        let overwrite_dtype = overwrite_dtype.map(|overwrite_dtype| {
            overwrite_dtype
                .into_iter()
                .map(|(name, dtype)| Field::new((&*name).into(), dtype.0))
                .collect::<Schema>()
        });

        let sources = sources.0;
        let (first_path, sources) = match source {
            None => (sources.first_path().cloned(), sources),
            Some(source) => pyobject_to_first_path_and_scan_sources(source)?,
        };

        let mut r = LazyCsvReader::new_with_sources(sources);

        if let Some(first_path) = first_path {
            let first_path_url = first_path.as_str();
            let cloud_options = cloud_options.extract_opt_cloud_options(
                CloudScheme::from_path(first_path_url),
                credential_provider,
            )?;
            r = r.with_cloud_options(cloud_options);
        }

        let mut r = r
            .with_infer_schema_length(infer_schema_length)
            .with_separator(separator)
            .with_has_header(has_header)
            .with_ignore_errors(ignore_errors)
            .with_skip_rows(skip_rows)
            .with_skip_lines(skip_lines)
            .with_n_rows(n_rows)
            .with_cache(cache)
            .with_dtype_overwrite(overwrite_dtype.map(Arc::new))
            .with_schema(schema.map(|schema| Arc::new(schema.0)))
            .with_low_memory(low_memory)
            .with_comment_prefix(comment_prefix.map(|x| x.into()))
            .with_quote_char(quote_char)
            .with_eol_char(eol_char)
            .with_rechunk(rechunk)
            .with_skip_rows_after_header(skip_rows_after_header)
            .with_encoding(encoding.0)
            .with_row_index(row_index)
            .with_try_parse_dates(try_parse_dates)
            .with_null_values(null_values)
            .with_missing_is_null(!missing_utf8_is_empty_string)
            .with_truncate_ragged_lines(truncate_ragged_lines)
            .with_decimal_comma(decimal_comma)
            .with_glob(glob)
            .with_raise_if_empty(raise_if_empty)
            .with_include_file_paths(include_file_paths.map(|x| x.into()));

        if let Some(lambda) = with_schema_modify {
            let f = |schema: Schema| {
                let iter = schema.iter_names().map(|s| s.as_str());
                Python::attach(|py| {
                    let names = PyList::new(py, iter).unwrap();

                    let out = lambda.call1(py, (names,)).expect("python function failed");
                    let new_names = out
                        .extract::<Vec<String>>(py)
                        .expect("python function should return List[str]");
                    polars_ensure!(new_names.len() == schema.len(),
                        ShapeMismatch: "the length of the new names list must match the original column count",
                    );
                    Ok(schema
                        .iter_values()
                        .zip(new_names)
                        .map(|(dtype, name)| Field::new(name.into(), dtype.clone()))
                        .collect())
                })
            };
            r = r.with_schema_modify(f).map_err(PyPolarsErr::from)?
        }

        Ok(r.finish().map_err(PyPolarsErr::from)?.into())
    }

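    /// Construct a `LazyFrame` scanning Parquet through
    /// `DslBuilder::scan_parquet`, combining the Parquet-specific options with
    /// the unified scan arguments.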
    #[cfg(feature = "parquet")]
    #[staticmethod]
    #[pyo3(signature = (
        sources, schema, scan_options, parallel, low_memory, use_statistics
    ))]
    fn new_from_parquet(
        sources: Wrap<ScanSources>,
        schema: Option<Wrap<Schema>>,
        scan_options: PyScanOptions,
        parallel: Wrap<ParallelStrategy>,
        low_memory: bool,
        use_statistics: bool,
    ) -> PyResult<Self> {
        use crate::utils::to_py_err;

        let parallel = parallel.0;

        let options = ParquetOptions {
            schema: schema.map(|x| Arc::new(x.0)),
            parallel,
            low_memory,
            use_statistics,
        };

        let sources = sources.0;
        let first_path = sources.first_path();

        let unified_scan_args =
            scan_options.extract_unified_scan_args(first_path.and_then(|x| x.scheme()))?;

        let lf: LazyFrame = DslBuilder::scan_parquet(sources, options, unified_scan_args)
            .map_err(to_py_err)?
            .build()
            .into();

        Ok(lf.into())
    }

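    /// Construct a `LazyFrame` scanning Arrow IPC files via
    /// `LazyFrame::scan_ipc_sources`.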
    #[cfg(feature = "ipc")]
    #[staticmethod]
    #[pyo3(signature = (sources, record_batch_statistics, scan_options))]
    fn new_from_ipc(
        sources: Wrap<ScanSources>,
        record_batch_statistics: bool,
        scan_options: PyScanOptions,
    ) -> PyResult<Self> {
        let options = IpcScanOptions {
            record_batch_statistics,
            checked: Default::default(),
        };

        let sources = sources.0;
        let first_path = sources.first_path().cloned();

        let unified_scan_args =
            scan_options.extract_unified_scan_args(first_path.as_ref().and_then(|x| x.scheme()))?;

        let lf = LazyFrame::scan_ipc_sources(sources, options, unified_scan_args)
            .map_err(PyPolarsErr::from)?;
        Ok(lf.into())
    }

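    /// Construct a `LazyFrame` that scans text sources line by line; `name` is
    /// the name of the resulting column.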
    #[cfg(feature = "scan_lines")]
    #[staticmethod]
    #[pyo3(signature = (sources, scan_options, name))]
    fn new_from_scan_lines(
        sources: Wrap<ScanSources>,
        scan_options: PyScanOptions,
        name: PyBackedStr,
    ) -> PyResult<Self> {
        let sources = sources.0;
        let first_path = sources.first_path();

        let unified_scan_args =
            scan_options.extract_unified_scan_args(first_path.and_then(|x| x.scheme()))?;

        let dsl: DslPlan = DslBuilder::scan_lines(sources, unified_scan_args, (&*name).into())
            .map_err(to_py_err)?
            .build();
        let lf: LazyFrame = dsl.into();

        Ok(lf.into())
    }

    #[staticmethod]
    #[pyo3(signature = (
        dataset_object
    ))]
    fn new_from_dataset_object(dataset_object: Py<PyAny>) -> PyResult<Self> {
        let lf =
            LazyFrame::from(DslBuilder::scan_python_dataset(PythonObject(dataset_object)).build())
                .into();

        Ok(lf)
    }

    #[staticmethod]
    fn scan_from_python_function_arrow_schema(
        schema: &Bound<'_, PyList>,
        scan_fn: Py<PyAny>,
        pyarrow: bool,
        validate_schema: bool,
        is_pure: bool,
    ) -> PyResult<Self> {
        let schema = Arc::new(pyarrow_schema_to_rust(schema)?);

        Ok(LazyFrame::scan_from_python_function(
            Either::Right(schema),
            scan_fn,
            pyarrow,
            validate_schema,
            is_pure,
        )
        .into())
    }

    #[staticmethod]
    fn scan_from_python_function_pl_schema(
        schema: Vec<(PyBackedStr, Wrap<DataType>)>,
        scan_fn: Py<PyAny>,
        pyarrow: bool,
        validate_schema: bool,
        is_pure: bool,
    ) -> PyResult<Self> {
        let schema = Arc::new(Schema::from_iter(
            schema
                .into_iter()
                .map(|(name, dt)| Field::new((&*name).into(), dt.0)),
        ));
        Ok(LazyFrame::scan_from_python_function(
            Either::Right(schema),
            scan_fn,
            pyarrow,
            validate_schema,
            is_pure,
        )
        .into())
    }

    #[staticmethod]
    fn scan_from_python_function_schema_function(
        schema_fn: Py<PyAny>,
        scan_fn: Py<PyAny>,
        validate_schema: bool,
        is_pure: bool,
    ) -> PyResult<Self> {
        Ok(LazyFrame::scan_from_python_function(
            Either::Left(schema_fn),
            scan_fn,
            false,
            validate_schema,
            is_pure,
        )
        .into())
    }

    fn describe_plan(&self, py: Python) -> PyResult<String> {
        py.enter_polars(|| self.ldf.read().describe_plan())
    }

    fn describe_optimized_plan(&self, py: Python) -> PyResult<String> {
        py.enter_polars(|| self.ldf.read().describe_optimized_plan())
    }

    fn describe_plan_tree(&self, py: Python) -> PyResult<String> {
        py.enter_polars(|| self.ldf.read().describe_plan_tree())
    }

    fn describe_optimized_plan_tree(&self, py: Python) -> PyResult<String> {
        py.enter_polars(|| self.ldf.read().describe_optimized_plan_tree())
    }

    fn to_dot(&self, py: Python<'_>, optimized: bool) -> PyResult<String> {
        py.enter_polars(|| self.ldf.read().to_dot(optimized))
    }

    #[cfg(feature = "new_streaming")]
    fn to_dot_streaming_phys(&self, py: Python, optimized: bool) -> PyResult<String> {
        py.enter_polars(|| self.ldf.read().to_dot_streaming_phys(optimized))
    }

    fn sort(
        &self,
        by_column: &str,
        descending: bool,
        nulls_last: bool,
        maintain_order: bool,
        multithreaded: bool,
    ) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.sort(
            [by_column],
            SortMultipleOptions {
                descending: vec![descending],
                nulls_last: vec![nulls_last],
                multithreaded,
                maintain_order,
                limit: None,
            },
        )
        .into()
    }

    fn sort_by_exprs(
        &self,
        by: Vec<PyExpr>,
        descending: Vec<bool>,
        nulls_last: Vec<bool>,
        maintain_order: bool,
        multithreaded: bool,
    ) -> Self {
        let ldf = self.ldf.read().clone();
        let exprs = by.to_exprs();
        ldf.sort_by_exprs(
            exprs,
            SortMultipleOptions {
                descending,
                nulls_last,
                maintain_order,
                multithreaded,
                limit: None,
            },
        )
        .into()
    }

    fn top_k(&self, k: IdxSize, by: Vec<PyExpr>, reverse: Vec<bool>) -> Self {
        let ldf = self.ldf.read().clone();
        let exprs = by.to_exprs();
        ldf.top_k(
            k,
            exprs,
            SortMultipleOptions::new().with_order_descending_multi(reverse),
        )
        .into()
    }

    fn bottom_k(&self, k: IdxSize, by: Vec<PyExpr>, reverse: Vec<bool>) -> Self {
        let ldf = self.ldf.read().clone();
        let exprs = by.to_exprs();
        ldf.bottom_k(
            k,
            exprs,
            SortMultipleOptions::new().with_order_descending_multi(reverse),
        )
        .into()
    }

    fn cache(&self) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.cache().into()
    }

    #[pyo3(signature = (optflags))]
    fn with_optimizations(&self, optflags: PyOptFlags) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.with_optimizations(optflags.inner.into_inner()).into()
    }

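    /// Run the query and gather per-node timings. Returns the materialized
    /// result plus a timing `DataFrame`; an optional post-optimization
    /// callback may rewrite the plan first.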
    #[pyo3(signature = (lambda_post_opt))]
    fn profile(
        &self,
        py: Python<'_>,
        lambda_post_opt: Option<Py<PyAny>>,
    ) -> PyResult<(PyDataFrame, PyDataFrame)> {
        let (df, time_df) = py.enter_polars(|| {
            let ldf = self.ldf.read().clone();
            if let Some(lambda) = lambda_post_opt {
                ldf._profile_post_opt(|root, lp_arena, expr_arena, duration_since_start| {
                    post_opt_callback(&lambda, root, lp_arena, expr_arena, duration_since_start)
                })
            } else {
                ldf.profile()
            }
        })?;
        Ok((df.into(), time_df.into()))
    }

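    /// Execute the query on the selected engine and return the materialized
    /// `DataFrame`; an optional post-optimization callback may rewrite the
    /// plan first.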
    #[pyo3(signature = (engine, lambda_post_opt))]
    fn collect(
        &self,
        py: Python<'_>,
        engine: Wrap<Engine>,
        lambda_post_opt: Option<Py<PyAny>>,
    ) -> PyResult<PyDataFrame> {
        py.enter_polars_df(|| {
            let ldf = self.ldf.read().clone();
            if let Some(lambda) = lambda_post_opt {
                ldf._collect_post_opt(|root, lp_arena, expr_arena, _| {
                    post_opt_callback(&lambda, root, lp_arena, expr_arena, None)
                })
            } else {
                ldf.collect_with_engine(engine.0)
            }
        })
    }

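    /// Execute the query on a background (tokio blocking) thread and deliver
    /// the result, or the error, to the Python `lambda`.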
    #[cfg(feature = "async")]
    #[pyo3(signature = (engine, lambda))]
    fn collect_with_callback(
        &self,
        py: Python<'_>,
        engine: Wrap<Engine>,
        lambda: Py<PyAny>,
    ) -> PyResult<()> {
        py.enter_polars_ok(|| {
            let ldf = self.ldf.read().clone();

            // We use a tokio spawn_blocking here as it has a high blocking
            // thread pool limit.
            polars_io::pl_async::get_runtime().spawn_blocking(move || {
                let result = ldf
                    .collect_with_engine(engine.0)
                    .map(PyDataFrame::new)
                    .map_err(PyPolarsErr::from);

                Python::attach(|py| match result {
                    Ok(df) => {
                        lambda.call1(py, (df,)).map_err(|err| err.restore(py)).ok();
                    },
                    Err(err) => {
                        lambda
                            .call1(py, (PyErr::from(err),))
                            .map_err(|err| err.restore(py))
                            .ok();
                    },
                });
            });
        })
    }

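    /// Begin a batched (streaming) collection of the query; the returned
    /// `PyCollectBatches` is consumed as an iterator from Python.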
    #[cfg(feature = "async")]
    fn collect_batches(
        &self,
        py: Python<'_>,
        engine: Wrap<Engine>,
        maintain_order: bool,
        chunk_size: Option<NonZeroUsize>,
        lazy: bool,
    ) -> PyResult<PyCollectBatches> {
        py.enter_polars(|| {
            let ldf = self.ldf.read().clone();

            let collect_batches = ldf
                .clone()
                .collect_batches(engine.0, maintain_order, chunk_size, lazy)
                .map_err(PyPolarsErr::from)?;

            PyResult::Ok(PyCollectBatches {
                inner: Arc::new(Mutex::new(collect_batches)),
                ldf,
            })
        })
    }

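    /// Lazily sink the query result to a Parquet destination instead of
    /// collecting it; the `sink_*` methods below follow the same pattern for
    /// other formats.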
    #[cfg(feature = "parquet")]
    #[pyo3(signature = (
        target, sink_options, compression, compression_level, statistics, row_group_size, data_page_size,
        metadata, arrow_schema
    ))]
    fn sink_parquet(
        &self,
        py: Python<'_>,
        target: PyFileSinkDestination,
        sink_options: PySinkOptions,
        compression: &str,
        compression_level: Option<i32>,
        statistics: Wrap<StatisticsOptions>,
        row_group_size: Option<usize>,
        data_page_size: Option<usize>,
        metadata: Wrap<Option<KeyValueMetadata>>,
        arrow_schema: Option<Wrap<ArrowSchema>>,
    ) -> PyResult<PyLazyFrame> {
        let compression = parse_parquet_compression(compression, compression_level)?;

        let options = ParquetWriteOptions {
            compression,
            statistics: statistics.0,
            row_group_size,
            data_page_size,
            key_value_metadata: metadata.0,
            arrow_schema: arrow_schema.map(|x| Arc::new(x.0)),
            compat_level: None,
        };

        let target = target.extract_file_sink_destination()?;
        let unified_sink_args = sink_options.extract_unified_sink_args(target.cloud_scheme())?;

        py.enter_polars(|| {
            self.ldf
                .read()
                .clone()
                .sink(
                    target,
                    FileWriteFormat::Parquet(Arc::new(options)),
                    unified_sink_args,
                )
                .into()
        })
        .map(Into::into)
        .map_err(Into::into)
    }

    #[cfg(feature = "ipc")]
    #[pyo3(signature = (
        target, sink_options, compression, compat_level, record_batch_size, record_batch_statistics
    ))]
    fn sink_ipc(
        &self,
        py: Python<'_>,
        target: PyFileSinkDestination,
        sink_options: PySinkOptions,
        compression: Wrap<Option<IpcCompression>>,
        compat_level: PyCompatLevel,
        record_batch_size: Option<usize>,
        record_batch_statistics: bool,
    ) -> PyResult<PyLazyFrame> {
        let options = IpcWriterOptions {
            compression: compression.0,
            compat_level: compat_level.0,
            record_batch_size,
            record_batch_statistics,
        };

        let target = target.extract_file_sink_destination()?;
        let unified_sink_args = sink_options.extract_unified_sink_args(target.cloud_scheme())?;

        py.enter_polars(|| {
            self.ldf
                .read()
                .clone()
                .sink(target, FileWriteFormat::Ipc(options), unified_sink_args)
                .into()
        })
        .map(Into::into)
        .map_err(Into::into)
    }

    #[cfg(feature = "csv")]
    #[pyo3(signature = (
        target, sink_options, include_bom, compression, compression_level, check_extension,
        include_header, separator, line_terminator, quote_char, batch_size, datetime_format,
        date_format, time_format, float_scientific, float_precision, decimal_comma, null_value,
        quote_style
    ))]
    fn sink_csv(
        &self,
        py: Python<'_>,
        target: PyFileSinkDestination,
        sink_options: PySinkOptions,
        include_bom: bool,
        compression: &str,
        compression_level: Option<u32>,
        check_extension: bool,
        include_header: bool,
        separator: u8,
        line_terminator: Wrap<PlSmallStr>,
        quote_char: u8,
        batch_size: NonZeroUsize,
        datetime_format: Option<Wrap<PlSmallStr>>,
        date_format: Option<Wrap<PlSmallStr>>,
        time_format: Option<Wrap<PlSmallStr>>,
        float_scientific: Option<bool>,
        float_precision: Option<usize>,
        decimal_comma: bool,
        null_value: Option<Wrap<PlSmallStr>>,
        quote_style: Option<Wrap<QuoteStyle>>,
    ) -> PyResult<PyLazyFrame> {
        let quote_style = quote_style.map_or(QuoteStyle::default(), |wrap| wrap.0);
        let null_value = null_value
            .map(|x| x.0)
            .unwrap_or(SerializeOptions::default().null);

        let serialize_options = SerializeOptions {
            date_format: date_format.map(|x| x.0),
            time_format: time_format.map(|x| x.0),
            datetime_format: datetime_format.map(|x| x.0),
            float_scientific,
            float_precision,
            decimal_comma,
            separator,
            quote_char,
            null: null_value,
            line_terminator: line_terminator.0,
            quote_style,
        };

        let options = CsvWriterOptions {
            include_bom,
            compression: ExternalCompression::try_from(compression, compression_level)
                .map_err(PyPolarsErr::from)?,
            check_extension,
            include_header,
            batch_size,
            serialize_options: serialize_options.into(),
        };

        let target = target.extract_file_sink_destination()?;
        let unified_sink_args = sink_options.extract_unified_sink_args(target.cloud_scheme())?;

        py.enter_polars(|| {
            self.ldf
                .read()
                .clone()
                .sink(target, FileWriteFormat::Csv(options), unified_sink_args)
                .into()
        })
        .map(Into::into)
        .map_err(Into::into)
    }

    #[allow(clippy::too_many_arguments)]
    #[cfg(feature = "json")]
    #[pyo3(signature = (target, compression, compression_level, check_extension, sink_options))]
    fn sink_ndjson(
        &self,
        py: Python<'_>,
        target: PyFileSinkDestination,
        compression: &str,
        compression_level: Option<u32>,
        check_extension: bool,
        sink_options: PySinkOptions,
    ) -> PyResult<PyLazyFrame> {
        let options = NDJsonWriterOptions {
            compression: ExternalCompression::try_from(compression, compression_level)
                .map_err(PyPolarsErr::from)?,
            check_extension,
        };

        let target = target.extract_file_sink_destination()?;
        let unified_sink_args = sink_options.extract_unified_sink_args(target.cloud_scheme())?;

        py.enter_polars(|| {
            self.ldf
                .read()
                .clone()
                .sink(target, FileWriteFormat::NDJson(options), unified_sink_args)
                .into()
        })
        .map(Into::into)
        .map_err(Into::into)
    }

    #[pyo3(signature = (function, maintain_order, chunk_size))]
    pub fn sink_batches(
        &self,
        py: Python<'_>,
        function: Py<PyAny>,
        maintain_order: bool,
        chunk_size: Option<NonZeroUsize>,
    ) -> PyResult<PyLazyFrame> {
        let ldf = self.ldf.read().clone();
        py.enter_polars(|| {
            ldf.sink_batches(
                PlanCallback::new_python(PythonObject(function)),
                maintain_order,
                chunk_size,
            )
        })
        .map(Into::into)
        .map_err(Into::into)
    }

    fn filter(&self, predicate: PyExpr) -> Self {
        self.ldf.read().clone().filter(predicate.inner).into()
    }

    fn remove(&self, predicate: PyExpr) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.remove(predicate.inner).into()
    }

    fn select(&self, exprs: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.read().clone();
        let exprs = exprs.to_exprs();
        ldf.select(exprs).into()
    }

    fn select_seq(&self, exprs: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.read().clone();
        let exprs = exprs.to_exprs();
        ldf.select_seq(exprs).into()
    }

    fn group_by(&self, by: Vec<PyExpr>, maintain_order: bool) -> PyLazyGroupBy {
        let ldf = self.ldf.read().clone();
        let by = by.to_exprs();
        let lazy_gb = if maintain_order {
            ldf.group_by_stable(by)
        } else {
            ldf.group_by(by)
        };

        PyLazyGroupBy { lgb: Some(lazy_gb) }
    }

    fn rolling(
        &self,
        index_column: PyExpr,
        period: &str,
        offset: &str,
        closed: Wrap<ClosedWindow>,
        by: Vec<PyExpr>,
    ) -> PyResult<PyLazyGroupBy> {
        let closed_window = closed.0;
        let ldf = self.ldf.read().clone();
        let by = by
            .into_iter()
            .map(|pyexpr| pyexpr.inner)
            .collect::<Vec<_>>();
        let lazy_gb = ldf.rolling(
            index_column.inner,
            by,
            RollingGroupOptions {
                index_column: "".into(),
                period: Duration::try_parse(period).map_err(PyPolarsErr::from)?,
                offset: Duration::try_parse(offset).map_err(PyPolarsErr::from)?,
                closed_window,
            },
        );

        Ok(PyLazyGroupBy { lgb: Some(lazy_gb) })
    }

    fn group_by_dynamic(
        &self,
        index_column: PyExpr,
        every: &str,
        period: &str,
        offset: &str,
        label: Wrap<Label>,
        include_boundaries: bool,
        closed: Wrap<ClosedWindow>,
        group_by: Vec<PyExpr>,
        start_by: Wrap<StartBy>,
    ) -> PyResult<PyLazyGroupBy> {
        let closed_window = closed.0;
        let group_by = group_by
            .into_iter()
            .map(|pyexpr| pyexpr.inner)
            .collect::<Vec<_>>();
        let ldf = self.ldf.read().clone();
        let lazy_gb = ldf.group_by_dynamic(
            index_column.inner,
            group_by,
            DynamicGroupOptions {
                every: Duration::try_parse(every).map_err(PyPolarsErr::from)?,
                period: Duration::try_parse(period).map_err(PyPolarsErr::from)?,
                offset: Duration::try_parse(offset).map_err(PyPolarsErr::from)?,
                label: label.0,
                include_boundaries,
                closed_window,
                start_by: start_by.0,
                ..Default::default()
            },
        );

        Ok(PyLazyGroupBy { lgb: Some(lazy_gb) })
    }

    fn with_context(&self, contexts: Vec<Self>) -> Self {
        let contexts = contexts
            .into_iter()
            .map(|ldf| ldf.ldf.into_inner())
            .collect::<Vec<_>>();
        self.ldf.read().clone().with_context(contexts).into()
    }

    #[cfg(feature = "asof_join")]
    #[pyo3(signature = (other, left_on, right_on, left_by, right_by, allow_parallel, force_parallel, suffix, strategy, tolerance, tolerance_str, coalesce, allow_eq, check_sortedness))]
    fn join_asof(
        &self,
        other: Self,
        left_on: PyExpr,
        right_on: PyExpr,
        left_by: Option<Vec<PyBackedStr>>,
        right_by: Option<Vec<PyBackedStr>>,
        allow_parallel: bool,
        force_parallel: bool,
        suffix: String,
        strategy: Wrap<AsofStrategy>,
        tolerance: Option<Wrap<AnyValue<'_>>>,
        tolerance_str: Option<String>,
        coalesce: bool,
        allow_eq: bool,
        check_sortedness: bool,
    ) -> PyResult<Self> {
        let coalesce = if coalesce {
            JoinCoalesce::CoalesceColumns
        } else {
            JoinCoalesce::KeepColumns
        };
        let ldf = self.ldf.read().clone();
        let other = other.ldf.into_inner();
        let left_on = left_on.inner;
        let right_on = right_on.inner;
        Ok(ldf
            .join_builder()
            .with(other)
            .left_on([left_on])
            .right_on([right_on])
            .allow_parallel(allow_parallel)
            .force_parallel(force_parallel)
            .coalesce(coalesce)
            .how(JoinType::AsOf(Box::new(AsOfOptions {
                strategy: strategy.0,
                left_by: left_by.map(strings_to_pl_smallstr),
                right_by: right_by.map(strings_to_pl_smallstr),
                tolerance: tolerance.map(|t| {
                    let av = t.0.into_static();
                    let dtype = av.dtype();
                    Scalar::new(dtype, av)
                }),
                tolerance_str: tolerance_str.map(|s| s.into()),
                allow_eq,
                check_sortedness,
            })))
            .suffix(suffix)
            .finish()
            .into())
    }

    #[pyo3(signature = (other, left_on, right_on, allow_parallel, force_parallel, nulls_equal, how, suffix, validate, maintain_order, coalesce=None))]
    fn join(
        &self,
        other: Self,
        left_on: Vec<PyExpr>,
        right_on: Vec<PyExpr>,
        allow_parallel: bool,
        force_parallel: bool,
        nulls_equal: bool,
        how: Wrap<JoinType>,
        suffix: String,
        validate: Wrap<JoinValidation>,
        maintain_order: Wrap<MaintainOrderJoin>,
        coalesce: Option<bool>,
    ) -> PyResult<Self> {
        let coalesce = match coalesce {
            None => JoinCoalesce::JoinSpecific,
            Some(true) => JoinCoalesce::CoalesceColumns,
            Some(false) => JoinCoalesce::KeepColumns,
        };
        let ldf = self.ldf.read().clone();
        let other = other.ldf.into_inner();
        let left_on = left_on
            .into_iter()
            .map(|pyexpr| pyexpr.inner)
            .collect::<Vec<_>>();
        let right_on = right_on
            .into_iter()
            .map(|pyexpr| pyexpr.inner)
            .collect::<Vec<_>>();

        Ok(ldf
            .join_builder()
            .with(other)
            .left_on(left_on)
            .right_on(right_on)
            .allow_parallel(allow_parallel)
            .force_parallel(force_parallel)
            .join_nulls(nulls_equal)
            .how(how.0)
            .suffix(suffix)
            .validate(validate.0)
            .coalesce(coalesce)
            .maintain_order(maintain_order.0)
            .finish()
            .into())
    }

    fn join_where(&self, other: Self, predicates: Vec<PyExpr>, suffix: String) -> PyResult<Self> {
        let ldf = self.ldf.read().clone();
        let other = other.ldf.into_inner();

        let predicates = predicates.to_exprs();

        Ok(ldf
            .join_builder()
            .with(other)
            .suffix(suffix)
            .join_where(predicates)
            .into())
    }

    fn with_columns(&self, exprs: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.with_columns(exprs.to_exprs()).into()
    }

    fn with_columns_seq(&self, exprs: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.with_columns_seq(exprs.to_exprs()).into()
    }

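    /// Conform this frame to `schema`. Each policy argument is either a single
    /// policy applied to all columns or a dict keyed by column name; the
    /// nested helpers expand both forms into one policy per column.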
    fn match_to_schema<'py>(
        &self,
        schema: Wrap<Schema>,
        missing_columns: &Bound<'py, PyAny>,
        missing_struct_fields: &Bound<'py, PyAny>,
        extra_columns: Wrap<ExtraColumnsPolicy>,
        extra_struct_fields: &Bound<'py, PyAny>,
        integer_cast: &Bound<'py, PyAny>,
        float_cast: &Bound<'py, PyAny>,
    ) -> PyResult<Self> {
        fn parse_missing_columns<'py>(
            schema: &Schema,
            missing_columns: &Bound<'py, PyAny>,
        ) -> PyResult<Vec<MissingColumnsPolicyOrExpr>> {
            let mut out = Vec::with_capacity(schema.len());
            if let Ok(policy) = missing_columns.extract::<Wrap<MissingColumnsPolicyOrExpr>>() {
                out.extend(std::iter::repeat_n(policy.0, schema.len()));
            } else if let Ok(dict) = missing_columns.cast::<PyDict>() {
                out.extend(std::iter::repeat_n(
                    MissingColumnsPolicyOrExpr::Raise,
                    schema.len(),
                ));
                for (key, value) in dict.iter() {
                    let key = key.extract::<String>()?;
                    let value = value.extract::<Wrap<MissingColumnsPolicyOrExpr>>()?;
                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
                }
            } else {
                return Err(PyTypeError::new_err("Invalid value for `missing_columns`"));
            }
            Ok(out)
        }
        fn parse_missing_struct_fields<'py>(
            schema: &Schema,
            missing_struct_fields: &Bound<'py, PyAny>,
        ) -> PyResult<Vec<MissingColumnsPolicy>> {
            let mut out = Vec::with_capacity(schema.len());
            if let Ok(policy) = missing_struct_fields.extract::<Wrap<MissingColumnsPolicy>>() {
                out.extend(std::iter::repeat_n(policy.0, schema.len()));
            } else if let Ok(dict) = missing_struct_fields.cast::<PyDict>() {
                out.extend(std::iter::repeat_n(
                    MissingColumnsPolicy::Raise,
                    schema.len(),
                ));
                for (key, value) in dict.iter() {
                    let key = key.extract::<String>()?;
                    let value = value.extract::<Wrap<MissingColumnsPolicy>>()?;
                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
                }
            } else {
                return Err(PyTypeError::new_err(
                    "Invalid value for `missing_struct_fields`",
                ));
            }
            Ok(out)
        }
        fn parse_extra_struct_fields<'py>(
            schema: &Schema,
            extra_struct_fields: &Bound<'py, PyAny>,
        ) -> PyResult<Vec<ExtraColumnsPolicy>> {
            let mut out = Vec::with_capacity(schema.len());
            if let Ok(policy) = extra_struct_fields.extract::<Wrap<ExtraColumnsPolicy>>() {
                out.extend(std::iter::repeat_n(policy.0, schema.len()));
            } else if let Ok(dict) = extra_struct_fields.cast::<PyDict>() {
                out.extend(std::iter::repeat_n(ExtraColumnsPolicy::Raise, schema.len()));
                for (key, value) in dict.iter() {
                    let key = key.extract::<String>()?;
                    let value = value.extract::<Wrap<ExtraColumnsPolicy>>()?;
                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
                }
            } else {
                return Err(PyTypeError::new_err(
                    "Invalid value for `extra_struct_fields`",
                ));
            }
            Ok(out)
        }
        fn parse_cast<'py>(
            schema: &Schema,
            cast: &Bound<'py, PyAny>,
        ) -> PyResult<Vec<UpcastOrForbid>> {
            let mut out = Vec::with_capacity(schema.len());
            if let Ok(policy) = cast.extract::<Wrap<UpcastOrForbid>>() {
                out.extend(std::iter::repeat_n(policy.0, schema.len()));
            } else if let Ok(dict) = cast.cast::<PyDict>() {
                out.extend(std::iter::repeat_n(UpcastOrForbid::Forbid, schema.len()));
                for (key, value) in dict.iter() {
                    let key = key.extract::<String>()?;
                    let value = value.extract::<Wrap<UpcastOrForbid>>()?;
                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
                }
            } else {
                return Err(PyTypeError::new_err(
                    "Invalid value for `integer_cast` / `float_cast`",
                ));
            }
            Ok(out)
        }

        let missing_columns = parse_missing_columns(&schema.0, missing_columns)?;
        let missing_struct_fields = parse_missing_struct_fields(&schema.0, missing_struct_fields)?;
        let extra_struct_fields = parse_extra_struct_fields(&schema.0, extra_struct_fields)?;
        let integer_cast = parse_cast(&schema.0, integer_cast)?;
        let float_cast = parse_cast(&schema.0, float_cast)?;

        let per_column = (0..schema.0.len())
            .map(|i| MatchToSchemaPerColumn {
                missing_columns: missing_columns[i].clone(),
                missing_struct_fields: missing_struct_fields[i],
                extra_struct_fields: extra_struct_fields[i],
                integer_cast: integer_cast[i],
                float_cast: float_cast[i],
            })
            .collect();

        let ldf = self.ldf.read().clone();
        Ok(ldf
            .match_to_schema(Arc::new(schema.0), per_column, extra_columns.0)
            .into())
    }

    fn pipe_with_schema(&self, callback: Py<PyAny>) -> Self {
        let ldf = self.ldf.read().clone();
        let function = PythonObject(callback);
        ldf.pipe_with_schema(PlanCallback::new_python(function))
            .into()
    }

    fn rename(&self, existing: Vec<String>, new: Vec<String>, strict: bool) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.rename(existing, new, strict).into()
    }

    fn reverse(&self) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.reverse().into()
    }

    #[pyo3(signature = (n, fill_value=None))]
    fn shift(&self, n: PyExpr, fill_value: Option<PyExpr>) -> Self {
        let lf = self.ldf.read().clone();
        let out = match fill_value {
            Some(v) => lf.shift_and_fill(n.inner, v.inner),
            None => lf.shift(n.inner),
        };
        out.into()
    }

    fn fill_nan(&self, fill_value: PyExpr) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.fill_nan(fill_value.inner).into()
    }

    fn min(&self) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.min();
        out.into()
    }

    fn max(&self) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.max();
        out.into()
    }

    fn sum(&self) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.sum();
        out.into()
    }

    fn mean(&self) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.mean();
        out.into()
    }

    fn std(&self, ddof: u8) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.std(ddof);
        out.into()
    }

    fn var(&self, ddof: u8) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.var(ddof);
        out.into()
    }

    fn median(&self) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.median();
        out.into()
    }

    fn quantile(&self, quantile: PyExpr, interpolation: Wrap<QuantileMethod>) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.quantile(quantile.inner, interpolation.0);
        out.into()
    }

    fn explode(&self, subset: PySelector, empty_as_null: bool, keep_nulls: bool) -> Self {
        self.ldf
            .read()
            .clone()
            .explode(
                subset.inner,
                ExplodeOptions {
                    empty_as_null,
                    keep_nulls,
                },
            )
            .into()
    }

    fn null_count(&self) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.null_count().into()
    }

    #[pyo3(signature = (maintain_order, subset, keep))]
    fn unique(
        &self,
        maintain_order: bool,
        subset: Option<Vec<PyExpr>>,
        keep: Wrap<UniqueKeepStrategy>,
    ) -> Self {
        let ldf = self.ldf.read().clone();
        let subset = subset.map(|exprs| exprs.into_iter().map(|e| e.inner).collect());
        match maintain_order {
            true => ldf.unique_stable_generic(subset, keep.0),
            false => ldf.unique_generic(subset, keep.0),
        }
        .into()
    }

    fn drop_nans(&self, subset: Option<PySelector>) -> Self {
        self.ldf
            .read()
            .clone()
            .drop_nans(subset.map(|e| e.inner))
            .into()
    }

    fn drop_nulls(&self, subset: Option<PySelector>) -> Self {
        self.ldf
            .read()
            .clone()
            .drop_nulls(subset.map(|e| e.inner))
            .into()
    }

    #[pyo3(signature = (offset, len=None))]
    fn slice(&self, offset: i64, len: Option<IdxSize>) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.slice(offset, len.unwrap_or(IdxSize::MAX)).into()
    }

    fn tail(&self, n: IdxSize) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.tail(n).into()
    }

    #[cfg(feature = "pivot")]
    #[pyo3(signature = (on, on_columns, index, values, agg, maintain_order, separator))]
    fn pivot(
        &self,
        on: PySelector,
        on_columns: PyDataFrame,
        index: PySelector,
        values: PySelector,
        agg: PyExpr,
        maintain_order: bool,
        separator: String,
    ) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.pivot(
            on.inner,
            Arc::new(on_columns.df.read().clone()),
            index.inner,
            values.inner,
            agg.inner,
            maintain_order,
            separator.into(),
        )
        .into()
    }

    #[cfg(feature = "pivot")]
    #[pyo3(signature = (on, index, value_name, variable_name))]
    fn unpivot(
        &self,
        on: Option<PySelector>,
        index: PySelector,
        value_name: Option<String>,
        variable_name: Option<String>,
    ) -> Self {
        let args = UnpivotArgsDSL {
            on: on.map(|on| on.inner),
            index: index.inner,
            value_name: value_name.map(|s| s.into()),
            variable_name: variable_name.map(|s| s.into()),
        };

        let ldf = self.ldf.read().clone();
        ldf.unpivot(args).into()
    }

    #[pyo3(signature = (name, offset=None))]
    fn with_row_index(&self, name: &str, offset: Option<IdxSize>) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.with_row_index(name, offset).into()
    }

    #[pyo3(signature = (function, predicate_pushdown, projection_pushdown, slice_pushdown, streamable, schema, validate_output))]
    fn map_batches(
        &self,
        function: Py<PyAny>,
        predicate_pushdown: bool,
        projection_pushdown: bool,
        slice_pushdown: bool,
        streamable: bool,
        schema: Option<Wrap<Schema>>,
        validate_output: bool,
    ) -> Self {
        let mut opt = OptFlags::default();
        opt.set(OptFlags::PREDICATE_PUSHDOWN, predicate_pushdown);
        opt.set(OptFlags::PROJECTION_PUSHDOWN, projection_pushdown);
        opt.set(OptFlags::SLICE_PUSHDOWN, slice_pushdown);
        opt.set(OptFlags::NEW_STREAMING, streamable);

        self.ldf
            .read()
            .clone()
            .map_python(
                function.into(),
                opt,
                schema.map(|s| Arc::new(s.0)),
                validate_output,
            )
            .into()
    }

    fn drop(&self, columns: PySelector) -> Self {
        self.ldf.read().clone().drop(columns.inner).into()
    }

    fn cast(&self, dtypes: HashMap<PyBackedStr, Wrap<DataType>>, strict: bool) -> Self {
        let mut cast_map = PlHashMap::with_capacity(dtypes.len());
        cast_map.extend(dtypes.iter().map(|(k, v)| (k.as_ref(), v.0.clone())));
        self.ldf.read().clone().cast(cast_map, strict).into()
    }

    fn cast_all(&self, dtype: PyDataTypeExpr, strict: bool) -> Self {
        self.ldf.read().clone().cast_all(dtype.inner, strict).into()
    }

    fn clone(&self) -> Self {
        self.ldf.read().clone().into()
    }

    fn collect_schema<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyDict>> {
        let schema = py.enter_polars(|| self.ldf.write().collect_schema())?;

        let schema_dict = PyDict::new(py);
        schema.iter_fields().for_each(|fld| {
            schema_dict
                .set_item(fld.name().as_str(), &Wrap(fld.dtype().clone()))
                .unwrap()
        });
        Ok(schema_dict)
    }

    fn unnest(&self, columns: PySelector, separator: Option<&str>) -> Self {
        self.ldf
            .read()
            .clone()
            .unnest(columns.inner, separator.map(PlSmallStr::from_str))
            .into()
    }

    fn count(&self) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.count().into()
    }

    #[cfg(feature = "merge_sorted")]
    fn merge_sorted(&self, other: Self, key: &str) -> PyResult<Self> {
        let out = self
            .ldf
            .read()
            .clone()
            .merge_sorted(other.ldf.into_inner(), key)
            .map_err(PyPolarsErr::from)?;
        Ok(out.into())
    }

    fn _node_name(&self) -> &str {
        let plan = &self.ldf.read().logical_plan;
        plan.into()
    }

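    /// Attach a `HintIR::Sorted` hint marking `columns` as sorted.
    /// `descending` and `nulls_last` must have one entry per column or a
    /// single broadcast value.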
    fn hint_sorted(
        &self,
        columns: Vec<String>,
        descending: Vec<bool>,
        nulls_last: Vec<bool>,
    ) -> PyResult<Self> {
        if columns.len() != descending.len() && descending.len() != 1 {
            return Err(PyValueError::new_err(
                "`set_sorted` expects the same amount of `columns` as `descending` values.",
            ));
        }
        if columns.len() != nulls_last.len() && nulls_last.len() != 1 {
            return Err(PyValueError::new_err(
                "`set_sorted` expects the same amount of `columns` as `nulls_last` values.",
            ));
        }

        let mut sorted = columns
            .iter()
            .map(|c| Sorted {
                column: PlSmallStr::from_str(c.as_str()),
                descending: Some(false),
                nulls_last: Some(false),
            })
            .collect::<Vec<_>>();

        if !columns.is_empty() {
            if descending.len() != 1 {
                sorted
                    .iter_mut()
                    .zip(descending)
                    .for_each(|(s, d)| s.descending = Some(d));
            } else if descending[0] {
                sorted.iter_mut().for_each(|s| s.descending = Some(true));
            }

            if nulls_last.len() != 1 {
                sorted
                    .iter_mut()
                    .zip(nulls_last)
                    .for_each(|(s, d)| s.nulls_last = Some(d));
            } else if nulls_last[0] {
                sorted.iter_mut().for_each(|s| s.nulls_last = Some(true));
            }
        }

        let out = self
            .ldf
            .read()
            .clone()
            .hint(HintIR::Sorted(sorted.into()))
            .map_err(PyPolarsErr::from)?;
        Ok(out.into())
    }
}

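/// Python-facing handle to an in-progress batched collection. The shared
/// `Mutex<CollectBatches>` is drained both by `__next__` and by the Arrow
/// C stream export.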
#[pyclass(frozen)]
struct PyCollectBatches {
    inner: Arc<Mutex<CollectBatches>>,
    ldf: LazyFrame,
}

#[pymethods]
impl PyCollectBatches {
    fn start(&self) {
        self.inner.lock().start();
    }

    fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
        slf
    }

    fn __next__(slf: PyRef<'_, Self>, py: Python) -> PyResult<Option<PyDataFrame>> {
        let inner = Arc::clone(&slf.inner);
        py.enter_polars(|| PolarsResult::Ok(inner.lock().next().transpose()?.map(PyDataFrame::new)))
    }

    #[allow(unused_variables)]
    #[pyo3(signature = (requested_schema=None))]
    fn __arrow_c_stream__<'py>(
        &self,
        py: Python<'py>,
        requested_schema: Option<Py<PyAny>>,
    ) -> PyResult<Bound<'py, PyCapsule>> {
        let mut ldf = self.ldf.clone();
        let schema = ldf
            .collect_schema()
            .map_err(PyPolarsErr::from)?
            .to_arrow(CompatLevel::newest());

        let dtype = ArrowDataType::Struct(schema.into_iter_values().collect());

        let iter = Box::new(ArrowStreamIterator::new(self.inner.clone(), dtype.clone()));
        let field = ArrowField::new(PlSmallStr::EMPTY, dtype, false);
        let stream = export_iterator(iter, field);
        let stream_capsule_name = CString::new("arrow_array_stream").unwrap();
        PyCapsule::new(py, stream, Some(stream_capsule_name))
    }
}

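/// Iterator adapter that yields the collected batches as Arrow struct arrays
/// for export through the Arrow C stream interface.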
pub struct ArrowStreamIterator {
    inner: Arc<Mutex<CollectBatches>>,
    dtype: ArrowDataType,
}

impl ArrowStreamIterator {
    fn new(inner: Arc<Mutex<CollectBatches>>, schema: ArrowDataType) -> Self {
        Self {
            inner,
            dtype: schema,
        }
    }
}

impl Iterator for ArrowStreamIterator {
    type Item = PolarsResult<ArrayRef>;

    fn next(&mut self) -> Option<Self::Item> {
        let next = self.inner.lock().next();
        match next {
            None => None,
            Some(Err(err)) => Some(Err(err)),
            Some(Ok(df)) => {
                let height = df.height();
                let arrays = df.rechunk_into_arrow(CompatLevel::newest());
                Some(Ok(Box::new(arrow::array::StructArray::new(
                    self.dtype.clone(),
                    height,
                    arrays,
                    None,
                ))))
            },
        }
    }
}