// pola-rs/polars
// crates/polars-python/src/lazyframe/general.rs

use std::collections::HashMap;
use std::num::NonZeroUsize;

use either::Either;
use polars::io::RowIndex;
use polars::time::*;
use polars_core::prelude::*;
#[cfg(feature = "parquet")]
use polars_parquet::arrow::write::StatisticsOptions;
use polars_plan::dsl::ScanSources;
use polars_plan::plans::{AExpr, HintIR, IR, Sorted};
use polars_utils::arena::{Arena, Node};
use polars_utils::python_function::PythonObject;
use pyo3::exceptions::{PyTypeError, PyValueError};
use pyo3::prelude::*;
use pyo3::pybacked::PyBackedStr;
use pyo3::types::{PyDict, PyDictMethods, PyList};

use super::{PyLazyFrame, PyOptFlags};
use crate::error::PyPolarsErr;
use crate::expr::ToExprs;
use crate::expr::datatype::PyDataTypeExpr;
use crate::expr::selector::PySelector;
use crate::interop::arrow::to_rust::pyarrow_schema_to_rust;
use crate::io::scan_options::PyScanOptions;
use crate::io::sink_options::PySinkOptions;
use crate::io::sink_output::PyFileSinkDestination;
use crate::lazyframe::visit::NodeTraverser;
use crate::prelude::*;
use crate::utils::{EnterPolarsExt, to_py_err};
use crate::{PyDataFrame, PyExpr, PyLazyGroupBy};

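/// Resolve a Python object (a path, an open file, or an in-memory buffer) into
/// `ScanSources`, additionally returning the first path (if any) so that cloud
/// options can later be derived from it.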
fn pyobject_to_first_path_and_scan_sources(
    obj: Py<PyAny>,
) -> PyResult<(Option<PlPath>, ScanSources)> {
    use crate::file::{PythonScanSourceInput, get_python_scan_source_input};
    Ok(match get_python_scan_source_input(obj, false)? {
        PythonScanSourceInput::Path(path) => (
            Some(path.clone()),
            ScanSources::Paths(FromIterator::from_iter([path])),
        ),
        PythonScanSourceInput::File(file) => (None, ScanSources::Files([file.into()].into())),
        PythonScanSourceInput::Buffer(buff) => (None, ScanSources::Buffers([buff].into())),
    })
}

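/// Run a Python post-optimization callback (e.g. the "cuda" GPU-engine hook) over
/// the optimized plan. The IR and expression arenas are moved into a `NodeTraverser`
/// handed to Python, which may rewrite parts of the plan; the (possibly modified)
/// arenas are swapped back in before returning.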
fn post_opt_callback(
    lambda: &Py<PyAny>,
    root: Node,
    lp_arena: &mut Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
    duration_since_start: Option<std::time::Duration>,
) -> PolarsResult<()> {
    Python::attach(|py| {
        let nt = NodeTraverser::new(root, std::mem::take(lp_arena), std::mem::take(expr_arena));

        // Get a copy of the arenas.
        let arenas = nt.get_arenas();

        // Pass the node visitor, which allows the Python callback to replace parts of the query plan.
        // Remove "cuda" or specify better once we have multiple post-opt callbacks.
        lambda
            .call1(py, (nt, duration_since_start.map(|t| t.as_nanos() as u64)))
            .map_err(|e| polars_err!(ComputeError: "'cuda' conversion failed: {}", e))?;

        // Unpack the arenas; `nt` is no longer usable after this point.
        std::mem::swap(lp_arena, &mut *arenas.0.lock().unwrap());
        std::mem::swap(expr_arena, &mut *arenas.1.lock().unwrap());

        Ok(())
    })
}

#[pymethods]
#[allow(clippy::should_implement_trait)]
impl PyLazyFrame {
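    /// Build a `LazyFrame` that scans newline-delimited JSON (NDJSON) sources,
    /// wiring up row-index, schema, and (optionally) cloud options.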
    #[staticmethod]
    #[cfg(feature = "json")]
    #[allow(clippy::too_many_arguments)]
    #[pyo3(signature = (
        source, sources, infer_schema_length, schema, schema_overrides, batch_size, n_rows, low_memory, rechunk,
        row_index, ignore_errors, include_file_paths, cloud_options, credential_provider, retries, file_cache_ttl
    ))]
    fn new_from_ndjson(
        source: Option<Py<PyAny>>,
        sources: Wrap<ScanSources>,
        infer_schema_length: Option<usize>,
        schema: Option<Wrap<Schema>>,
        schema_overrides: Option<Wrap<Schema>>,
        batch_size: Option<NonZeroUsize>,
        n_rows: Option<usize>,
        low_memory: bool,
        rechunk: bool,
        row_index: Option<(String, IdxSize)>,
        ignore_errors: bool,
        include_file_paths: Option<String>,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<Py<PyAny>>,
        retries: usize,
        file_cache_ttl: Option<u64>,
    ) -> PyResult<Self> {
        use cloud::credential_provider::PlCredentialProvider;
        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });

        let sources = sources.0;
        let (first_path, sources) = match source {
            None => (sources.first_path().map(|p| p.into_owned()), sources),
            Some(source) => pyobject_to_first_path_and_scan_sources(source)?,
        };

        let mut r = LazyJsonLineReader::new_with_sources(sources);

        #[cfg(feature = "cloud")]
        if let Some(first_path) = first_path {
            let first_path_url = first_path.to_str();

            let mut cloud_options = parse_cloud_options(
                CloudScheme::from_uri(first_path_url),
                cloud_options.unwrap_or_default(),
            )?;
            cloud_options = cloud_options
                .with_max_retries(retries)
                .with_credential_provider(
                    credential_provider.map(PlCredentialProvider::from_python_builder),
                );

            if let Some(file_cache_ttl) = file_cache_ttl {
                cloud_options.file_cache_ttl = file_cache_ttl;
            }

            r = r.with_cloud_options(Some(cloud_options));
        };

        let lf = r
            .with_infer_schema_length(infer_schema_length.and_then(NonZeroUsize::new))
            .with_batch_size(batch_size)
            .with_n_rows(n_rows)
            .low_memory(low_memory)
            .with_rechunk(rechunk)
            .with_schema(schema.map(|schema| Arc::new(schema.0)))
            .with_schema_overwrite(schema_overrides.map(|x| Arc::new(x.0)))
            .with_row_index(row_index)
            .with_ignore_errors(ignore_errors)
            .with_include_file_paths(include_file_paths.map(|x| x.into()))
            .finish()
            .map_err(PyPolarsErr::from)?;

        Ok(lf.into())
    }

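    /// Build a `LazyFrame` that scans CSV sources. When `with_schema_modify` is
    /// given, it calls back into Python to rename the inferred columns before the
    /// scan is finalized.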
    #[staticmethod]
    #[cfg(feature = "csv")]
    #[pyo3(signature = (source, sources, separator, has_header, ignore_errors, skip_rows, skip_lines, n_rows, cache, overwrite_dtype,
        low_memory, comment_prefix, quote_char, null_values, missing_utf8_is_empty_string,
        infer_schema_length, with_schema_modify, rechunk, skip_rows_after_header,
        encoding, row_index, try_parse_dates, eol_char, raise_if_empty, truncate_ragged_lines, decimal_comma, glob, schema,
        cloud_options, credential_provider, retries, file_cache_ttl, include_file_paths
    )
    )]
    fn new_from_csv(
        source: Option<Py<PyAny>>,
        sources: Wrap<ScanSources>,
        separator: &str,
        has_header: bool,
        ignore_errors: bool,
        skip_rows: usize,
        skip_lines: usize,
        n_rows: Option<usize>,
        cache: bool,
        overwrite_dtype: Option<Vec<(PyBackedStr, Wrap<DataType>)>>,
        low_memory: bool,
        comment_prefix: Option<&str>,
        quote_char: Option<&str>,
        null_values: Option<Wrap<NullValues>>,
        missing_utf8_is_empty_string: bool,
        infer_schema_length: Option<usize>,
        with_schema_modify: Option<Py<PyAny>>,
        rechunk: bool,
        skip_rows_after_header: usize,
        encoding: Wrap<CsvEncoding>,
        row_index: Option<(String, IdxSize)>,
        try_parse_dates: bool,
        eol_char: &str,
        raise_if_empty: bool,
        truncate_ragged_lines: bool,
        decimal_comma: bool,
        glob: bool,
        schema: Option<Wrap<Schema>>,
        cloud_options: Option<Vec<(String, String)>>,
        credential_provider: Option<Py<PyAny>>,
        retries: usize,
        file_cache_ttl: Option<u64>,
        include_file_paths: Option<String>,
    ) -> PyResult<Self> {
        #[cfg(feature = "cloud")]
        use cloud::credential_provider::PlCredentialProvider;

        let null_values = null_values.map(|w| w.0);
        let quote_char = quote_char.and_then(|s| s.as_bytes().first()).copied();
        let separator = separator
            .as_bytes()
            .first()
            .ok_or_else(|| polars_err!(InvalidOperation: "`separator` cannot be empty"))
            .copied()
            .map_err(PyPolarsErr::from)?;
        let eol_char = eol_char
            .as_bytes()
            .first()
            .ok_or_else(|| polars_err!(InvalidOperation: "`eol_char` cannot be empty"))
            .copied()
            .map_err(PyPolarsErr::from)?;
        let row_index = row_index.map(|(name, offset)| RowIndex {
            name: name.into(),
            offset,
        });

        let overwrite_dtype = overwrite_dtype.map(|overwrite_dtype| {
            overwrite_dtype
                .into_iter()
                .map(|(name, dtype)| Field::new((&*name).into(), dtype.0))
                .collect::<Schema>()
        });

        let sources = sources.0;
        let (first_path, sources) = match source {
            None => (sources.first_path().map(|p| p.into_owned()), sources),
            Some(source) => pyobject_to_first_path_and_scan_sources(source)?,
        };

        let mut r = LazyCsvReader::new_with_sources(sources);

        #[cfg(feature = "cloud")]
        if let Some(first_path) = first_path {
            let first_path_url = first_path.to_str();

            let mut cloud_options = parse_cloud_options(
                CloudScheme::from_uri(first_path_url),
                cloud_options.unwrap_or_default(),
            )?;
            if let Some(file_cache_ttl) = file_cache_ttl {
                cloud_options.file_cache_ttl = file_cache_ttl;
            }
            cloud_options = cloud_options
                .with_max_retries(retries)
                .with_credential_provider(
                    credential_provider.map(PlCredentialProvider::from_python_builder),
                );
            r = r.with_cloud_options(Some(cloud_options));
        }

        let mut r = r
            .with_infer_schema_length(infer_schema_length)
            .with_separator(separator)
            .with_has_header(has_header)
            .with_ignore_errors(ignore_errors)
            .with_skip_rows(skip_rows)
            .with_skip_lines(skip_lines)
            .with_n_rows(n_rows)
            .with_cache(cache)
            .with_dtype_overwrite(overwrite_dtype.map(Arc::new))
            .with_schema(schema.map(|schema| Arc::new(schema.0)))
            .with_low_memory(low_memory)
            .with_comment_prefix(comment_prefix.map(|x| x.into()))
            .with_quote_char(quote_char)
            .with_eol_char(eol_char)
            .with_rechunk(rechunk)
            .with_skip_rows_after_header(skip_rows_after_header)
            .with_encoding(encoding.0)
            .with_row_index(row_index)
            .with_try_parse_dates(try_parse_dates)
            .with_null_values(null_values)
            .with_missing_is_null(!missing_utf8_is_empty_string)
            .with_truncate_ragged_lines(truncate_ragged_lines)
            .with_decimal_comma(decimal_comma)
            .with_glob(glob)
            .with_raise_if_empty(raise_if_empty)
            .with_include_file_paths(include_file_paths.map(|x| x.into()));

        if let Some(lambda) = with_schema_modify {
            let f = |schema: Schema| {
                let iter = schema.iter_names().map(|s| s.as_str());
                Python::attach(|py| {
                    let names = PyList::new(py, iter).unwrap();

                    let out = lambda.call1(py, (names,)).expect("python function failed");
                    let new_names = out
                        .extract::<Vec<String>>(py)
                        .expect("python function should return List[str]");
                    polars_ensure!(new_names.len() == schema.len(),
                        ShapeMismatch: "The length of the new names list should be equal to the original column length",
                    );
                    Ok(schema
                        .iter_values()
                        .zip(new_names)
                        .map(|(dtype, name)| Field::new(name.into(), dtype.clone()))
                        .collect())
                })
            };
            r = r.with_schema_modify(f).map_err(PyPolarsErr::from)?
        }

        Ok(r.finish().map_err(PyPolarsErr::from)?.into())
    }

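    /// Build a `LazyFrame` that scans Parquet sources via the DSL builder.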
#[cfg(feature = "parquet")]
311
#[staticmethod]
312
#[pyo3(signature = (
313
sources, schema, scan_options, parallel, low_memory, use_statistics
314
))]
315
fn new_from_parquet(
316
sources: Wrap<ScanSources>,
317
schema: Option<Wrap<Schema>>,
318
scan_options: PyScanOptions,
319
parallel: Wrap<ParallelStrategy>,
320
low_memory: bool,
321
use_statistics: bool,
322
) -> PyResult<Self> {
323
use crate::utils::to_py_err;
324
325
let parallel = parallel.0;
326
327
let options = ParquetOptions {
328
schema: schema.map(|x| Arc::new(x.0)),
329
parallel,
330
low_memory,
331
use_statistics,
332
};
333
334
let sources = sources.0;
335
let first_path = sources.first_path().map(|p| p.into_owned());
336
337
let unified_scan_args = scan_options.extract_unified_scan_args(
338
first_path
339
.as_ref()
340
.and_then(|p| CloudScheme::from_uri(p.to_str())),
341
)?;
342
343
let lf: LazyFrame = DslBuilder::scan_parquet(sources, options, unified_scan_args)
344
.map_err(to_py_err)?
345
.build()
346
.into();
347
348
Ok(lf.into())
349
}
350
351
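    /// Build a `LazyFrame` that scans Arrow IPC sources.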
#[cfg(feature = "ipc")]
352
#[staticmethod]
353
#[pyo3(signature = (sources, scan_options, file_cache_ttl))]
354
fn new_from_ipc(
355
sources: Wrap<ScanSources>,
356
scan_options: PyScanOptions,
357
file_cache_ttl: Option<u64>,
358
) -> PyResult<Self> {
359
let options = IpcScanOptions;
360
361
let sources = sources.0;
362
let first_path = sources.first_path().map(|p| p.into_owned());
363
364
let mut unified_scan_args = scan_options.extract_unified_scan_args(
365
first_path
366
.as_ref()
367
.map(|p| p.as_ref())
368
.and_then(|x| CloudScheme::from_uri(x.to_str())),
369
)?;
370
371
if let Some(file_cache_ttl) = file_cache_ttl {
372
unified_scan_args
373
.cloud_options
374
.get_or_insert_default()
375
.file_cache_ttl = file_cache_ttl;
376
}
377
378
let lf = LazyFrame::scan_ipc_sources(sources, options, unified_scan_args)
379
.map_err(PyPolarsErr::from)?;
380
Ok(lf.into())
381
}
382
383
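    /// Build a `LazyFrame` that scans plain-text sources line by line into a
    /// column named `name` (assuming the `scan_lines` DSL semantics).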
#[cfg(feature = "scan_lines")]
384
#[staticmethod]
385
#[pyo3(signature = (sources, scan_options, name, file_cache_ttl))]
386
fn new_from_scan_lines(
387
sources: Wrap<ScanSources>,
388
scan_options: PyScanOptions,
389
name: PyBackedStr,
390
file_cache_ttl: Option<u64>,
391
) -> PyResult<Self> {
392
let sources = sources.0;
393
let first_path = sources.first_path().map(|p| p.into_owned());
394
395
let mut unified_scan_args = scan_options.extract_unified_scan_args(
396
first_path
397
.as_ref()
398
.map(|p| p.as_ref())
399
.and_then(|x| CloudScheme::from_uri(x.to_str())),
400
)?;
401
402
if let Some(file_cache_ttl) = file_cache_ttl {
403
unified_scan_args
404
.cloud_options
405
.get_or_insert_default()
406
.file_cache_ttl = file_cache_ttl;
407
}
408
409
let dsl: DslPlan = DslBuilder::scan_lines(sources, unified_scan_args, (&*name).into())
410
.map_err(to_py_err)?
411
.build();
412
let lf: LazyFrame = dsl.into();
413
414
Ok(lf.into())
415
}
416
417
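    /// Build a `LazyFrame` backed by a Python dataset object, via
    /// `DslBuilder::scan_python_dataset`.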
    #[staticmethod]
    #[pyo3(signature = (
        dataset_object
    ))]
    fn new_from_dataset_object(dataset_object: Py<PyAny>) -> PyResult<Self> {
        let lf =
            LazyFrame::from(DslBuilder::scan_python_dataset(PythonObject(dataset_object)).build())
                .into();

        Ok(lf)
    }

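    /// The three `scan_from_python_function_*` constructors below build a
    /// `LazyFrame` from a Python scan callback; they differ only in how the schema
    /// is supplied: a pyarrow schema, a list of `(name, dtype)` pairs, or a
    /// schema-returning Python function.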
    #[staticmethod]
    fn scan_from_python_function_arrow_schema(
        schema: &Bound<'_, PyList>,
        scan_fn: Py<PyAny>,
        pyarrow: bool,
        validate_schema: bool,
        is_pure: bool,
    ) -> PyResult<Self> {
        let schema = Arc::new(pyarrow_schema_to_rust(schema)?);

        Ok(LazyFrame::scan_from_python_function(
            Either::Right(schema),
            scan_fn,
            pyarrow,
            validate_schema,
            is_pure,
        )
        .into())
    }

    #[staticmethod]
    fn scan_from_python_function_pl_schema(
        schema: Vec<(PyBackedStr, Wrap<DataType>)>,
        scan_fn: Py<PyAny>,
        pyarrow: bool,
        validate_schema: bool,
        is_pure: bool,
    ) -> PyResult<Self> {
        let schema = Arc::new(Schema::from_iter(
            schema
                .into_iter()
                .map(|(name, dt)| Field::new((&*name).into(), dt.0)),
        ));
        Ok(LazyFrame::scan_from_python_function(
            Either::Right(schema),
            scan_fn,
            pyarrow,
            validate_schema,
            is_pure,
        )
        .into())
    }

    #[staticmethod]
    fn scan_from_python_function_schema_function(
        schema_fn: Py<PyAny>,
        scan_fn: Py<PyAny>,
        validate_schema: bool,
        is_pure: bool,
    ) -> PyResult<Self> {
        Ok(LazyFrame::scan_from_python_function(
            Either::Left(schema_fn),
            scan_fn,
            false,
            validate_schema,
            is_pure,
        )
        .into())
    }

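    /// Plan-inspection helpers: render the logical plan (optionally optimized) as
    /// text, as a tree, or as a Graphviz dot graph.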
    fn describe_plan(&self, py: Python) -> PyResult<String> {
        py.enter_polars(|| self.ldf.read().describe_plan())
    }

    fn describe_optimized_plan(&self, py: Python) -> PyResult<String> {
        py.enter_polars(|| self.ldf.read().describe_optimized_plan())
    }

    fn describe_plan_tree(&self, py: Python) -> PyResult<String> {
        py.enter_polars(|| self.ldf.read().describe_plan_tree())
    }

    fn describe_optimized_plan_tree(&self, py: Python) -> PyResult<String> {
        py.enter_polars(|| self.ldf.read().describe_optimized_plan_tree())
    }

    fn to_dot(&self, py: Python<'_>, optimized: bool) -> PyResult<String> {
        py.enter_polars(|| self.ldf.read().to_dot(optimized))
    }

    #[cfg(feature = "new_streaming")]
    fn to_dot_streaming_phys(&self, py: Python, optimized: bool) -> PyResult<String> {
        py.enter_polars(|| self.ldf.read().to_dot_streaming_phys(optimized))
    }

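    /// Sort by a single column; `sort_by_exprs`, `top_k`, and `bottom_k` below
    /// share the same `SortMultipleOptions` machinery.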
    fn sort(
        &self,
        by_column: &str,
        descending: bool,
        nulls_last: bool,
        maintain_order: bool,
        multithreaded: bool,
    ) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.sort(
            [by_column],
            SortMultipleOptions {
                descending: vec![descending],
                nulls_last: vec![nulls_last],
                multithreaded,
                maintain_order,
                limit: None,
            },
        )
        .into()
    }

    fn sort_by_exprs(
        &self,
        by: Vec<PyExpr>,
        descending: Vec<bool>,
        nulls_last: Vec<bool>,
        maintain_order: bool,
        multithreaded: bool,
    ) -> Self {
        let ldf = self.ldf.read().clone();
        let exprs = by.to_exprs();
        ldf.sort_by_exprs(
            exprs,
            SortMultipleOptions {
                descending,
                nulls_last,
                maintain_order,
                multithreaded,
                limit: None,
            },
        )
        .into()
    }

    fn top_k(&self, k: IdxSize, by: Vec<PyExpr>, reverse: Vec<bool>) -> Self {
        let ldf = self.ldf.read().clone();
        let exprs = by.to_exprs();
        ldf.top_k(
            k,
            exprs,
            SortMultipleOptions::new().with_order_descending_multi(reverse),
        )
        .into()
    }

    fn bottom_k(&self, k: IdxSize, by: Vec<PyExpr>, reverse: Vec<bool>) -> Self {
        let ldf = self.ldf.read().clone();
        let exprs = by.to_exprs();
        ldf.bottom_k(
            k,
            exprs,
            SortMultipleOptions::new().with_order_descending_multi(reverse),
        )
        .into()
    }

    fn cache(&self) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.cache().into()
    }

    #[pyo3(signature = (optflags))]
    fn with_optimizations(&self, optflags: PyOptFlags) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.with_optimizations(optflags.inner.into_inner()).into()
    }

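    /// Like `collect`, but additionally returns a DataFrame of per-node timings.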
    #[pyo3(signature = (lambda_post_opt))]
    fn profile(
        &self,
        py: Python<'_>,
        lambda_post_opt: Option<Py<PyAny>>,
    ) -> PyResult<(PyDataFrame, PyDataFrame)> {
        let (df, time_df) = py.enter_polars(|| {
            let ldf = self.ldf.read().clone();
            if let Some(lambda) = lambda_post_opt {
                ldf._profile_post_opt(|root, lp_arena, expr_arena, duration_since_start| {
                    post_opt_callback(&lambda, root, lp_arena, expr_arena, duration_since_start)
                })
            } else {
                ldf.profile()
            }
        })?;
        Ok((df.into(), time_df.into()))
    }

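    /// Materialize the plan into a DataFrame. When `lambda_post_opt` is provided it
    /// runs as a post-optimization callback (see `post_opt_callback`); otherwise the
    /// selected engine collects directly.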
    #[pyo3(signature = (engine, lambda_post_opt))]
    fn collect(
        &self,
        py: Python<'_>,
        engine: Wrap<Engine>,
        lambda_post_opt: Option<Py<PyAny>>,
    ) -> PyResult<PyDataFrame> {
        py.enter_polars_df(|| {
            let ldf = self.ldf.read().clone();
            if let Some(lambda) = lambda_post_opt {
                ldf._collect_post_opt(|root, lp_arena, expr_arena, _| {
                    post_opt_callback(&lambda, root, lp_arena, expr_arena, None)
                })
            } else {
                ldf.collect_with_engine(engine.0)
            }
        })
    }

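    /// Collect asynchronously on the Polars thread pool, delivering the resulting
    /// DataFrame (or the error) to the Python `lambda` callback.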
    #[pyo3(signature = (engine, lambda))]
    fn collect_with_callback(
        &self,
        py: Python<'_>,
        engine: Wrap<Engine>,
        lambda: Py<PyAny>,
    ) -> PyResult<()> {
        py.enter_polars_ok(|| {
            let ldf = self.ldf.read().clone();

            polars_core::POOL.spawn(move || {
                let result = ldf
                    .collect_with_engine(engine.0)
                    .map(PyDataFrame::new)
                    .map_err(PyPolarsErr::from);

                Python::attach(|py| match result {
                    Ok(df) => {
                        lambda.call1(py, (df,)).map_err(|err| err.restore(py)).ok();
                    },
                    Err(err) => {
                        lambda
                            .call1(py, (PyErr::from(err),))
                            .map_err(|err| err.restore(py))
                            .ok();
                    },
                });
            });
        })
    }

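    /// Stream the query result into a Parquet file or cloud destination.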
#[cfg(feature = "parquet")]
662
#[pyo3(signature = (
663
target, sink_options, compression, compression_level, statistics, row_group_size, data_page_size,
664
metadata, field_overwrites,
665
))]
666
fn sink_parquet(
667
&self,
668
py: Python<'_>,
669
target: PyFileSinkDestination,
670
sink_options: PySinkOptions,
671
compression: &str,
672
compression_level: Option<i32>,
673
statistics: Wrap<StatisticsOptions>,
674
row_group_size: Option<usize>,
675
data_page_size: Option<usize>,
676
metadata: Wrap<Option<KeyValueMetadata>>,
677
field_overwrites: Vec<Wrap<ParquetFieldOverwrites>>,
678
) -> PyResult<PyLazyFrame> {
679
let compression = parse_parquet_compression(compression, compression_level)?;
680
681
let options = ParquetWriteOptions {
682
compression,
683
statistics: statistics.0,
684
row_group_size,
685
data_page_size,
686
key_value_metadata: metadata.0,
687
field_overwrites: field_overwrites.into_iter().map(|f| f.0).collect(),
688
};
689
690
let target = target.extract_file_sink_destination()?;
691
let unified_sink_args = sink_options.extract_unified_sink_args(target.cloud_scheme())?;
692
693
py.enter_polars(|| {
694
self.ldf
695
.read()
696
.clone()
697
.sink(target, FileType::Parquet(options), unified_sink_args)
698
.into()
699
})
700
.map(Into::into)
701
.map_err(Into::into)
702
}
703
704
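    /// Stream the query result into an Arrow IPC file.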
#[cfg(feature = "ipc")]
705
#[pyo3(signature = (
706
target, sink_options, compression, compat_level
707
))]
708
fn sink_ipc(
709
&self,
710
py: Python<'_>,
711
target: PyFileSinkDestination,
712
sink_options: PySinkOptions,
713
compression: Wrap<Option<IpcCompression>>,
714
compat_level: PyCompatLevel,
715
) -> PyResult<PyLazyFrame> {
716
let options = IpcWriterOptions {
717
compression: compression.0,
718
compat_level: compat_level.0,
719
..Default::default()
720
};
721
722
let target = target.extract_file_sink_destination()?;
723
let unified_sink_args = sink_options.extract_unified_sink_args(target.cloud_scheme())?;
724
725
py.enter_polars(|| {
726
self.ldf
727
.read()
728
.clone()
729
.sink(target, FileType::Ipc(options), unified_sink_args)
730
.into()
731
})
732
.map(Into::into)
733
.map_err(Into::into)
734
}
735
736
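    /// Stream the query result into a CSV file.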
#[cfg(feature = "csv")]
737
#[pyo3(signature = (
738
target, sink_options, include_bom, include_header, separator, line_terminator, quote_char, batch_size,
739
datetime_format, date_format, time_format, float_scientific, float_precision, decimal_comma, null_value,
740
quote_style
741
))]
742
fn sink_csv(
743
&self,
744
py: Python<'_>,
745
target: PyFileSinkDestination,
746
sink_options: PySinkOptions,
747
include_bom: bool,
748
include_header: bool,
749
separator: u8,
750
line_terminator: String,
751
quote_char: u8,
752
batch_size: NonZeroUsize,
753
datetime_format: Option<String>,
754
date_format: Option<String>,
755
time_format: Option<String>,
756
float_scientific: Option<bool>,
757
float_precision: Option<usize>,
758
decimal_comma: bool,
759
null_value: Option<String>,
760
quote_style: Option<Wrap<QuoteStyle>>,
761
) -> PyResult<PyLazyFrame> {
762
let quote_style = quote_style.map_or(QuoteStyle::default(), |wrap| wrap.0);
763
let null_value = null_value.unwrap_or(SerializeOptions::default().null);
764
765
let serialize_options = SerializeOptions {
766
date_format,
767
time_format,
768
datetime_format,
769
float_scientific,
770
float_precision,
771
decimal_comma,
772
separator,
773
quote_char,
774
null: null_value,
775
line_terminator,
776
quote_style,
777
};
778
779
let options = CsvWriterOptions {
780
include_bom,
781
include_header,
782
batch_size,
783
serialize_options,
784
};
785
786
let target = target.extract_file_sink_destination()?;
787
let unified_sink_args = sink_options.extract_unified_sink_args(target.cloud_scheme())?;
788
789
py.enter_polars(|| {
790
self.ldf
791
.read()
792
.clone()
793
.sink(target, FileType::Csv(options), unified_sink_args)
794
.into()
795
})
796
.map(Into::into)
797
.map_err(Into::into)
798
}
799
800
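    /// Stream the query result as JSON (presumably newline-delimited, matching the
    /// NDJSON scan above).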
    #[allow(clippy::too_many_arguments)]
    #[cfg(feature = "json")]
    #[pyo3(signature = (target, sink_options))]
    fn sink_json(
        &self,
        py: Python<'_>,
        target: PyFileSinkDestination,
        sink_options: PySinkOptions,
    ) -> PyResult<PyLazyFrame> {
        let options = JsonWriterOptions {};

        let target = target.extract_file_sink_destination()?;
        let unified_sink_args = sink_options.extract_unified_sink_args(target.cloud_scheme())?;

        py.enter_polars(|| {
            self.ldf
                .read()
                .clone()
                .sink(target, FileType::Json(options), unified_sink_args)
                .into()
        })
        .map(Into::into)
        .map_err(Into::into)
    }

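    /// Stream the query result in chunks to a Python callback instead of a file.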
    #[pyo3(signature = (function, maintain_order, chunk_size))]
    pub fn sink_batches(
        &self,
        py: Python<'_>,
        function: Py<PyAny>,
        maintain_order: bool,
        chunk_size: Option<NonZeroUsize>,
    ) -> PyResult<PyLazyFrame> {
        let ldf = self.ldf.read().clone();
        py.enter_polars(|| {
            ldf.sink_batches(
                PlanCallback::new_python(PythonObject(function)),
                maintain_order,
                chunk_size,
            )
        })
        .map(Into::into)
        .map_err(Into::into)
    }

    fn filter(&self, predicate: PyExpr) -> Self {
        self.ldf.read().clone().filter(predicate.inner).into()
    }

    fn remove(&self, predicate: PyExpr) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.remove(predicate.inner).into()
    }

    fn select(&self, exprs: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.read().clone();
        let exprs = exprs.to_exprs();
        ldf.select(exprs).into()
    }

    fn select_seq(&self, exprs: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.read().clone();
        let exprs = exprs.to_exprs();
        ldf.select_seq(exprs).into()
    }

    fn group_by(&self, by: Vec<PyExpr>, maintain_order: bool) -> PyLazyGroupBy {
        let ldf = self.ldf.read().clone();
        let by = by.to_exprs();
        let lazy_gb = if maintain_order {
            ldf.group_by_stable(by)
        } else {
            ldf.group_by(by)
        };

        PyLazyGroupBy { lgb: Some(lazy_gb) }
    }

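    /// Temporal group-bys: `rolling` groups over a sliding window relative to
    /// `index_column`, while `group_by_dynamic` below uses fixed-interval windows.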
    fn rolling(
        &self,
        index_column: PyExpr,
        period: &str,
        offset: &str,
        closed: Wrap<ClosedWindow>,
        by: Vec<PyExpr>,
    ) -> PyResult<PyLazyGroupBy> {
        let closed_window = closed.0;
        let ldf = self.ldf.read().clone();
        let by = by
            .into_iter()
            .map(|pyexpr| pyexpr.inner)
            .collect::<Vec<_>>();
        let lazy_gb = ldf.rolling(
            index_column.inner,
            by,
            RollingGroupOptions {
                index_column: "".into(),
                period: Duration::try_parse(period).map_err(PyPolarsErr::from)?,
                offset: Duration::try_parse(offset).map_err(PyPolarsErr::from)?,
                closed_window,
            },
        );

        Ok(PyLazyGroupBy { lgb: Some(lazy_gb) })
    }

    fn group_by_dynamic(
        &self,
        index_column: PyExpr,
        every: &str,
        period: &str,
        offset: &str,
        label: Wrap<Label>,
        include_boundaries: bool,
        closed: Wrap<ClosedWindow>,
        group_by: Vec<PyExpr>,
        start_by: Wrap<StartBy>,
    ) -> PyResult<PyLazyGroupBy> {
        let closed_window = closed.0;
        let group_by = group_by
            .into_iter()
            .map(|pyexpr| pyexpr.inner)
            .collect::<Vec<_>>();
        let ldf = self.ldf.read().clone();
        let lazy_gb = ldf.group_by_dynamic(
            index_column.inner,
            group_by,
            DynamicGroupOptions {
                every: Duration::try_parse(every).map_err(PyPolarsErr::from)?,
                period: Duration::try_parse(period).map_err(PyPolarsErr::from)?,
                offset: Duration::try_parse(offset).map_err(PyPolarsErr::from)?,
                label: label.0,
                include_boundaries,
                closed_window,
                start_by: start_by.0,
                ..Default::default()
            },
        );

        Ok(PyLazyGroupBy { lgb: Some(lazy_gb) })
    }

    fn with_context(&self, contexts: Vec<Self>) -> Self {
        let contexts = contexts
            .into_iter()
            .map(|ldf| ldf.ldf.into_inner())
            .collect::<Vec<_>>();
        self.ldf.read().clone().with_context(contexts).into()
    }

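    /// As-of join: match each left row to the nearest right key according to
    /// `strategy`, optionally bounded by `tolerance`/`tolerance_str` and pre-grouped
    /// by `left_by`/`right_by`.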
#[cfg(feature = "asof_join")]
951
#[pyo3(signature = (other, left_on, right_on, left_by, right_by, allow_parallel, force_parallel, suffix, strategy, tolerance, tolerance_str, coalesce, allow_eq, check_sortedness))]
952
fn join_asof(
953
&self,
954
other: Self,
955
left_on: PyExpr,
956
right_on: PyExpr,
957
left_by: Option<Vec<PyBackedStr>>,
958
right_by: Option<Vec<PyBackedStr>>,
959
allow_parallel: bool,
960
force_parallel: bool,
961
suffix: String,
962
strategy: Wrap<AsofStrategy>,
963
tolerance: Option<Wrap<AnyValue<'_>>>,
964
tolerance_str: Option<String>,
965
coalesce: bool,
966
allow_eq: bool,
967
check_sortedness: bool,
968
) -> PyResult<Self> {
969
let coalesce = if coalesce {
970
JoinCoalesce::CoalesceColumns
971
} else {
972
JoinCoalesce::KeepColumns
973
};
974
let ldf = self.ldf.read().clone();
975
let other = other.ldf.into_inner();
976
let left_on = left_on.inner;
977
let right_on = right_on.inner;
978
Ok(ldf
979
.join_builder()
980
.with(other)
981
.left_on([left_on])
982
.right_on([right_on])
983
.allow_parallel(allow_parallel)
984
.force_parallel(force_parallel)
985
.coalesce(coalesce)
986
.how(JoinType::AsOf(Box::new(AsOfOptions {
987
strategy: strategy.0,
988
left_by: left_by.map(strings_to_pl_smallstr),
989
right_by: right_by.map(strings_to_pl_smallstr),
990
tolerance: tolerance.map(|t| {
991
let av = t.0.into_static();
992
let dtype = av.dtype();
993
Scalar::new(dtype, av)
994
}),
995
tolerance_str: tolerance_str.map(|s| s.into()),
996
allow_eq,
997
check_sortedness,
998
})))
999
.suffix(suffix)
1000
.finish()
1001
.into())
1002
}
1003
1004
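    /// Equi-join through the join builder: coalescing, null equality, validation,
    /// and output-order behavior are all forwarded to it.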
    #[pyo3(signature = (other, left_on, right_on, allow_parallel, force_parallel, nulls_equal, how, suffix, validate, maintain_order, coalesce=None))]
    fn join(
        &self,
        other: Self,
        left_on: Vec<PyExpr>,
        right_on: Vec<PyExpr>,
        allow_parallel: bool,
        force_parallel: bool,
        nulls_equal: bool,
        how: Wrap<JoinType>,
        suffix: String,
        validate: Wrap<JoinValidation>,
        maintain_order: Wrap<MaintainOrderJoin>,
        coalesce: Option<bool>,
    ) -> PyResult<Self> {
        let coalesce = match coalesce {
            None => JoinCoalesce::JoinSpecific,
            Some(true) => JoinCoalesce::CoalesceColumns,
            Some(false) => JoinCoalesce::KeepColumns,
        };
        let ldf = self.ldf.read().clone();
        let other = other.ldf.into_inner();
        let left_on = left_on
            .into_iter()
            .map(|pyexpr| pyexpr.inner)
            .collect::<Vec<_>>();
        let right_on = right_on
            .into_iter()
            .map(|pyexpr| pyexpr.inner)
            .collect::<Vec<_>>();

        Ok(ldf
            .join_builder()
            .with(other)
            .left_on(left_on)
            .right_on(right_on)
            .allow_parallel(allow_parallel)
            .force_parallel(force_parallel)
            .join_nulls(nulls_equal)
            .how(how.0)
            .suffix(suffix)
            .validate(validate.0)
            .coalesce(coalesce)
            .maintain_order(maintain_order.0)
            .finish()
            .into())
    }

    fn join_where(&self, other: Self, predicates: Vec<PyExpr>, suffix: String) -> PyResult<Self> {
        let ldf = self.ldf.read().clone();
        let other = other.ldf.into_inner();

        let predicates = predicates.to_exprs();

        Ok(ldf
            .join_builder()
            .with(other)
            .suffix(suffix)
            .join_where(predicates)
            .into())
    }

    fn with_columns(&self, exprs: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.with_columns(exprs.to_exprs()).into()
    }

    fn with_columns_seq(&self, exprs: Vec<PyExpr>) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.with_columns_seq(exprs.to_exprs()).into()
    }

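    /// Conform this LazyFrame to `schema`. Each per-column policy argument accepts
    /// either a single policy (broadcast to every column) or a dict mapping column
    /// names to policies; the nested parsers below expand both forms into one policy
    /// per schema column. A hypothetical Python-side call might look like:
    ///
    /// ```text
    /// lf.match_to_schema(schema, missing_columns={"a": "insert"}, extra_columns="ignore")
    /// ```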
    fn match_to_schema<'py>(
        &self,
        schema: Wrap<Schema>,
        missing_columns: &Bound<'py, PyAny>,
        missing_struct_fields: &Bound<'py, PyAny>,
        extra_columns: Wrap<ExtraColumnsPolicy>,
        extra_struct_fields: &Bound<'py, PyAny>,
        integer_cast: &Bound<'py, PyAny>,
        float_cast: &Bound<'py, PyAny>,
    ) -> PyResult<Self> {
        fn parse_missing_columns<'py>(
            schema: &Schema,
            missing_columns: &Bound<'py, PyAny>,
        ) -> PyResult<Vec<MissingColumnsPolicyOrExpr>> {
            let mut out = Vec::with_capacity(schema.len());
            if let Ok(policy) = missing_columns.extract::<Wrap<MissingColumnsPolicyOrExpr>>() {
                out.extend(std::iter::repeat_n(policy.0, schema.len()));
            } else if let Ok(dict) = missing_columns.downcast::<PyDict>() {
                out.extend(std::iter::repeat_n(
                    MissingColumnsPolicyOrExpr::Raise,
                    schema.len(),
                ));
                for (key, value) in dict.iter() {
                    let key = key.extract::<String>()?;
                    let value = value.extract::<Wrap<MissingColumnsPolicyOrExpr>>()?;
                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
                }
            } else {
                return Err(PyTypeError::new_err("Invalid value for `missing_columns`"));
            }
            Ok(out)
        }
        fn parse_missing_struct_fields<'py>(
            schema: &Schema,
            missing_struct_fields: &Bound<'py, PyAny>,
        ) -> PyResult<Vec<MissingColumnsPolicy>> {
            let mut out = Vec::with_capacity(schema.len());
            if let Ok(policy) = missing_struct_fields.extract::<Wrap<MissingColumnsPolicy>>() {
                out.extend(std::iter::repeat_n(policy.0, schema.len()));
            } else if let Ok(dict) = missing_struct_fields.downcast::<PyDict>() {
                out.extend(std::iter::repeat_n(
                    MissingColumnsPolicy::Raise,
                    schema.len(),
                ));
                for (key, value) in dict.iter() {
                    let key = key.extract::<String>()?;
                    let value = value.extract::<Wrap<MissingColumnsPolicy>>()?;
                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
                }
            } else {
                return Err(PyTypeError::new_err(
                    "Invalid value for `missing_struct_fields`",
                ));
            }
            Ok(out)
        }
        fn parse_extra_struct_fields<'py>(
            schema: &Schema,
            extra_struct_fields: &Bound<'py, PyAny>,
        ) -> PyResult<Vec<ExtraColumnsPolicy>> {
            let mut out = Vec::with_capacity(schema.len());
            if let Ok(policy) = extra_struct_fields.extract::<Wrap<ExtraColumnsPolicy>>() {
                out.extend(std::iter::repeat_n(policy.0, schema.len()));
            } else if let Ok(dict) = extra_struct_fields.downcast::<PyDict>() {
                out.extend(std::iter::repeat_n(ExtraColumnsPolicy::Raise, schema.len()));
                for (key, value) in dict.iter() {
                    let key = key.extract::<String>()?;
                    let value = value.extract::<Wrap<ExtraColumnsPolicy>>()?;
                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
                }
            } else {
                return Err(PyTypeError::new_err(
                    "Invalid value for `extra_struct_fields`",
                ));
            }
            Ok(out)
        }
        fn parse_cast<'py>(
            schema: &Schema,
            cast: &Bound<'py, PyAny>,
        ) -> PyResult<Vec<UpcastOrForbid>> {
            let mut out = Vec::with_capacity(schema.len());
            if let Ok(policy) = cast.extract::<Wrap<UpcastOrForbid>>() {
                out.extend(std::iter::repeat_n(policy.0, schema.len()));
            } else if let Ok(dict) = cast.downcast::<PyDict>() {
                out.extend(std::iter::repeat_n(UpcastOrForbid::Forbid, schema.len()));
                for (key, value) in dict.iter() {
                    let key = key.extract::<String>()?;
                    let value = value.extract::<Wrap<UpcastOrForbid>>()?;
                    out[schema.try_index_of(&key).map_err(to_py_err)?] = value.0;
                }
            } else {
                return Err(PyTypeError::new_err(
                    "Invalid value for `integer_cast` / `float_cast`",
                ));
            }
            Ok(out)
        }

        let missing_columns = parse_missing_columns(&schema.0, missing_columns)?;
        let missing_struct_fields = parse_missing_struct_fields(&schema.0, missing_struct_fields)?;
        let extra_struct_fields = parse_extra_struct_fields(&schema.0, extra_struct_fields)?;
        let integer_cast = parse_cast(&schema.0, integer_cast)?;
        let float_cast = parse_cast(&schema.0, float_cast)?;

        let per_column = (0..schema.0.len())
            .map(|i| MatchToSchemaPerColumn {
                missing_columns: missing_columns[i].clone(),
                missing_struct_fields: missing_struct_fields[i],
                extra_struct_fields: extra_struct_fields[i],
                integer_cast: integer_cast[i],
                float_cast: float_cast[i],
            })
            .collect();

        let ldf = self.ldf.read().clone();
        Ok(ldf
            .match_to_schema(Arc::new(schema.0), per_column, extra_columns.0)
            .into())
    }

    fn pipe_with_schema(&self, callback: Py<PyAny>) -> Self {
        let ldf = self.ldf.read().clone();
        let function = PythonObject(callback);
        ldf.pipe_with_schema(PlanCallback::new_python(function))
            .into()
    }

    fn rename(&self, existing: Vec<String>, new: Vec<String>, strict: bool) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.rename(existing, new, strict).into()
    }

    fn reverse(&self) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.reverse().into()
    }

    #[pyo3(signature = (n, fill_value=None))]
    fn shift(&self, n: PyExpr, fill_value: Option<PyExpr>) -> Self {
        let lf = self.ldf.read().clone();
        let out = match fill_value {
            Some(v) => lf.shift_and_fill(n.inner, v.inner),
            None => lf.shift(n.inner),
        };
        out.into()
    }

    fn fill_nan(&self, fill_value: PyExpr) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.fill_nan(fill_value.inner).into()
    }

    fn min(&self) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.min();
        out.into()
    }

    fn max(&self) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.max();
        out.into()
    }

    fn sum(&self) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.sum();
        out.into()
    }

    fn mean(&self) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.mean();
        out.into()
    }

    fn std(&self, ddof: u8) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.std(ddof);
        out.into()
    }

    fn var(&self, ddof: u8) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.var(ddof);
        out.into()
    }

    fn median(&self) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.median();
        out.into()
    }

    fn quantile(&self, quantile: PyExpr, interpolation: Wrap<QuantileMethod>) -> Self {
        let ldf = self.ldf.read().clone();
        let out = ldf.quantile(quantile.inner, interpolation.0);
        out.into()
    }

    fn explode(&self, subset: PySelector, empty_as_null: bool, keep_nulls: bool) -> Self {
        self.ldf
            .read()
            .clone()
            .explode(
                subset.inner,
                ExplodeOptions {
                    empty_as_null,
                    keep_nulls,
                },
            )
            .into()
    }

    fn null_count(&self) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.null_count().into()
    }

    #[pyo3(signature = (maintain_order, subset, keep))]
    fn unique(
        &self,
        maintain_order: bool,
        subset: Option<Vec<PyExpr>>,
        keep: Wrap<UniqueKeepStrategy>,
    ) -> Self {
        let ldf = self.ldf.read().clone();
        let subset = subset.map(|exprs| exprs.into_iter().map(|e| e.inner).collect());
        match maintain_order {
            true => ldf.unique_stable_generic(subset, keep.0),
            false => ldf.unique_generic(subset, keep.0),
        }
        .into()
    }

    fn drop_nans(&self, subset: Option<PySelector>) -> Self {
        self.ldf
            .read()
            .clone()
            .drop_nans(subset.map(|e| e.inner))
            .into()
    }

    fn drop_nulls(&self, subset: Option<PySelector>) -> Self {
        self.ldf
            .read()
            .clone()
            .drop_nulls(subset.map(|e| e.inner))
            .into()
    }

    #[pyo3(signature = (offset, len=None))]
    fn slice(&self, offset: i64, len: Option<IdxSize>) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.slice(offset, len.unwrap_or(IdxSize::MAX)).into()
    }

    fn tail(&self, n: IdxSize) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.tail(n).into()
    }

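    /// Lazy wide/long reshaping: `pivot` spreads `values` over new columns whose
    /// headers come from the pre-materialized `on_columns` DataFrame; `unpivot`
    /// melts columns back into variable/value pairs.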
#[cfg(feature = "pivot")]
1340
#[pyo3(signature = (on, on_columns, index, values, agg, maintain_order, separator))]
1341
fn pivot(
1342
&self,
1343
on: PySelector,
1344
on_columns: PyDataFrame,
1345
index: PySelector,
1346
values: PySelector,
1347
agg: PyExpr,
1348
maintain_order: bool,
1349
separator: String,
1350
) -> Self {
1351
let ldf = self.ldf.read().clone();
1352
ldf.pivot(
1353
on.inner,
1354
Arc::new(on_columns.df.read().clone()),
1355
index.inner,
1356
values.inner,
1357
agg.inner,
1358
maintain_order,
1359
separator.into(),
1360
)
1361
.into()
1362
}
1363
1364
#[cfg(feature = "pivot")]
1365
#[pyo3(signature = (on, index, value_name, variable_name))]
1366
fn unpivot(
1367
&self,
1368
on: Option<PySelector>,
1369
index: PySelector,
1370
value_name: Option<String>,
1371
variable_name: Option<String>,
1372
) -> Self {
1373
let args = UnpivotArgsDSL {
1374
on: on.map(|on| on.inner),
1375
index: index.inner,
1376
value_name: value_name.map(|s| s.into()),
1377
variable_name: variable_name.map(|s| s.into()),
1378
};
1379
1380
let ldf = self.ldf.read().clone();
1381
ldf.unpivot(args).into()
1382
}
1383
1384
#[pyo3(signature = (name, offset=None))]
1385
fn with_row_index(&self, name: &str, offset: Option<IdxSize>) -> Self {
1386
let ldf = self.ldf.read().clone();
1387
ldf.with_row_index(name, offset).into()
1388
}
1389
1390
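    /// Apply a Python function to whole DataFrame batches. The pushdown flags
    /// control which optimizations are allowed to cross this opaque node.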
    #[pyo3(signature = (function, predicate_pushdown, projection_pushdown, slice_pushdown, streamable, schema, validate_output))]
    fn map_batches(
        &self,
        function: Py<PyAny>,
        predicate_pushdown: bool,
        projection_pushdown: bool,
        slice_pushdown: bool,
        streamable: bool,
        schema: Option<Wrap<Schema>>,
        validate_output: bool,
    ) -> Self {
        let mut opt = OptFlags::default();
        opt.set(OptFlags::PREDICATE_PUSHDOWN, predicate_pushdown);
        opt.set(OptFlags::PROJECTION_PUSHDOWN, projection_pushdown);
        opt.set(OptFlags::SLICE_PUSHDOWN, slice_pushdown);
        opt.set(OptFlags::NEW_STREAMING, streamable);

        self.ldf
            .read()
            .clone()
            .map_python(
                function.into(),
                opt,
                schema.map(|s| Arc::new(s.0)),
                validate_output,
            )
            .into()
    }

    fn drop(&self, columns: PySelector) -> Self {
        self.ldf.read().clone().drop(columns.inner).into()
    }

    fn cast(&self, dtypes: HashMap<PyBackedStr, Wrap<DataType>>, strict: bool) -> Self {
        let mut cast_map = PlHashMap::with_capacity(dtypes.len());
        cast_map.extend(dtypes.iter().map(|(k, v)| (k.as_ref(), v.0.clone())));
        self.ldf.read().clone().cast(cast_map, strict).into()
    }

    fn cast_all(&self, dtype: PyDataTypeExpr, strict: bool) -> Self {
        self.ldf.read().clone().cast_all(dtype.inner, strict).into()
    }

    fn clone(&self) -> Self {
        self.ldf.read().clone().into()
    }

    fn collect_schema<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyDict>> {
        let schema = py.enter_polars(|| self.ldf.write().collect_schema())?;

        let schema_dict = PyDict::new(py);
        schema.iter_fields().for_each(|fld| {
            schema_dict
                .set_item(fld.name().as_str(), &Wrap(fld.dtype().clone()))
                .unwrap()
        });
        Ok(schema_dict)
    }

    fn unnest(&self, columns: PySelector, separator: Option<&str>) -> Self {
        self.ldf
            .read()
            .clone()
            .unnest(columns.inner, separator.map(PlSmallStr::from_str))
            .into()
    }

    fn count(&self) -> Self {
        let ldf = self.ldf.read().clone();
        ldf.count().into()
    }

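    /// Merge two LazyFrames that are each pre-sorted by `key`, preserving order.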
#[cfg(feature = "merge_sorted")]
1463
fn merge_sorted(&self, other: Self, key: &str) -> PyResult<Self> {
1464
let out = self
1465
.ldf
1466
.read()
1467
.clone()
1468
.merge_sorted(other.ldf.into_inner(), key)
1469
.map_err(PyPolarsErr::from)?;
1470
Ok(out.into())
1471
}
1472
1473
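    /// Attach a sortedness hint to the plan (exposed as `set_sorted` on the Python
    /// side, per the error messages below). `descending` and `nulls_last` must each
    /// either match `columns` in length or hold a single value that is broadcast to
    /// every column.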
    fn hint_sorted(
        &self,
        columns: Vec<String>,
        descending: Vec<bool>,
        nulls_last: Vec<bool>,
    ) -> PyResult<Self> {
        if columns.len() != descending.len() && descending.len() != 1 {
            return Err(PyValueError::new_err(
                "`set_sorted` expects the same number of `columns` as `descending` values.",
            ));
        }
        if columns.len() != nulls_last.len() && nulls_last.len() != 1 {
            return Err(PyValueError::new_err(
                "`set_sorted` expects the same number of `columns` as `nulls_last` values.",
            ));
        }

        let mut sorted = columns
            .iter()
            .map(|c| Sorted {
                column: PlSmallStr::from_str(c.as_str()),
                descending: Some(false),
                nulls_last: Some(false),
            })
            .collect::<Vec<_>>();

        if !columns.is_empty() {
            if descending.len() != 1 {
                sorted
                    .iter_mut()
                    .zip(descending)
                    .for_each(|(s, d)| s.descending = Some(d));
            } else if descending[0] {
                sorted.iter_mut().for_each(|s| s.descending = Some(true));
            }

            if nulls_last.len() != 1 {
                sorted
                    .iter_mut()
                    .zip(nulls_last)
                    .for_each(|(s, d)| s.nulls_last = Some(d));
            } else if nulls_last[0] {
                sorted.iter_mut().for_each(|s| s.nulls_last = Some(true));
            }
        }

        let out = self
            .ldf
            .read()
            .clone()
            .hint(HintIR::Sorted(sorted.into()))
            .map_err(PyPolarsErr::from)?;
        Ok(out.into())
    }
}

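/// Parse `ParquetFieldOverwrites` from a Python dict with the optional keys
/// "name", "children", "field_id", "metadata", and "required". "children" may be a
/// list of overwrites (struct fields) or a single overwrite (list-like element type).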
#[cfg(feature = "parquet")]
1530
impl<'py> FromPyObject<'py> for Wrap<polars_io::parquet::write::ParquetFieldOverwrites> {
1531
fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {
1532
use polars_io::parquet::write::ParquetFieldOverwrites;
1533
1534
let parsed = ob.extract::<pyo3::Bound<'_, PyDict>>()?;
1535
1536
let name = PyDictMethods::get_item(&parsed, "name")?
1537
.map(|v| PyResult::Ok(v.extract::<String>()?.into()))
1538
.transpose()?;
1539
let children = PyDictMethods::get_item(&parsed, "children")?.map_or(
1540
PyResult::Ok(ChildFieldOverwrites::None),
1541
|v| {
1542
Ok(
1543
if let Ok(overwrites) = v.extract::<Vec<Wrap<ParquetFieldOverwrites>>>() {
1544
ChildFieldOverwrites::Struct(overwrites.into_iter().map(|v| v.0).collect())
1545
} else {
1546
ChildFieldOverwrites::ListLike(Box::new(
1547
v.extract::<Wrap<ParquetFieldOverwrites>>()?.0,
1548
))
1549
},
1550
)
1551
},
1552
)?;
1553
1554
let field_id = PyDictMethods::get_item(&parsed, "field_id")?
1555
.map(|v| v.extract::<i32>())
1556
.transpose()?;
1557
1558
let metadata = PyDictMethods::get_item(&parsed, "metadata")?
1559
.map(|v| v.extract::<Vec<(String, Option<String>)>>())
1560
.transpose()?;
1561
let metadata = metadata.map(|v| {
1562
v.into_iter()
1563
.map(|v| MetadataKeyValue {
1564
key: v.0.into(),
1565
value: v.1.map(|v| v.into()),
1566
})
1567
.collect()
1568
});
1569
1570
let required = PyDictMethods::get_item(&parsed, "required")?
1571
.map(|v| v.extract::<bool>())
1572
.transpose()?;
1573
1574
Ok(Wrap(ParquetFieldOverwrites {
1575
name,
1576
children,
1577
field_id,
1578
metadata,
1579
required,
1580
}))
1581
}
1582
}
1583
1584