Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/dsl/options/sink.rs
6940 views
1
use std::fmt;
2
use std::hash::{Hash, Hasher};
3
use std::path::PathBuf;
4
use std::sync::Arc;
5
6
use polars_core::error::PolarsResult;
7
use polars_core::frame::DataFrame;
8
use polars_core::prelude::DataType;
9
use polars_core::scalar::Scalar;
10
use polars_io::cloud::CloudOptions;
11
use polars_io::utils::file::{DynWriteable, Writeable};
12
use polars_io::utils::sync_on_close::SyncOnCloseType;
13
use polars_utils::IdxSize;
14
use polars_utils::arena::Arena;
15
use polars_utils::pl_str::PlSmallStr;
16
use polars_utils::plpath::PlPath;
17
18
use super::{ExprIR, FileType};
19
use crate::dsl::{AExpr, Expr, SpecialEq};
20
21
/// Options that apply to all sinks.
// Defaults come from the `Default` impl: `maintain_order: true`, `mkdir: false`,
// and `SyncOnCloseType::default()` for `sync_on_close`.
#[derive(Clone, PartialEq, Eq, Debug, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct SinkOptions {
    /// Call sync when closing the file.
    pub sync_on_close: SyncOnCloseType,

    /// The output file needs to maintain order of the data that comes in.
    pub maintain_order: bool,

    /// Recursively create all the directories in the path.
    // In this file it is checked when opening a `SinkTarget::Path`
    // (`SinkTarget::open_into_writeable` / `open_into_writeable_async`).
    pub mkdir: bool,
}
35
36
impl Default for SinkOptions {
37
fn default() -> Self {
38
Self {
39
sync_on_close: Default::default(),
40
maintain_order: true,
41
mkdir: false,
42
}
43
}
44
}
45
46
/// Shared, lockable slot holding a caller-provided writer.
///
/// Opening a `SinkTarget::Dyn` moves the writer out of the slot (leaving `None`),
/// so the writer can be handed out only once.
type DynSinkTarget = SpecialEq<Arc<std::sync::Mutex<Option<Box<dyn DynWriteable>>>>>;

/// Destination a sink writes its output to.
///
/// Only the `Path` variant can be serialized; deserialization always yields `Path`.
#[derive(Clone, PartialEq, Eq)]
pub enum SinkTarget {
    /// A path, opened through `Writeable::try_new` (with optional cloud options).
    Path(PlPath),
    /// A caller-provided writer, handed out as `Writeable::Dyn` when opened.
    Dyn(DynSinkTarget),
}
53
54
impl SinkTarget {
55
pub fn open_into_writeable(
56
&self,
57
sink_options: &SinkOptions,
58
cloud_options: Option<&CloudOptions>,
59
) -> PolarsResult<Writeable> {
60
match self {
61
SinkTarget::Path(addr) => {
62
if sink_options.mkdir {
63
polars_io::utils::mkdir::mkdir_recursive(addr.as_ref())?;
64
}
65
66
polars_io::utils::file::Writeable::try_new(addr.as_ref(), cloud_options)
67
},
68
SinkTarget::Dyn(memory_writer) => Ok(Writeable::Dyn(
69
memory_writer.lock().unwrap().take().unwrap(),
70
)),
71
}
72
}
73
74
#[cfg(not(feature = "cloud"))]
75
pub async fn open_into_writeable_async(
76
&self,
77
sink_options: &SinkOptions,
78
cloud_options: Option<&CloudOptions>,
79
) -> PolarsResult<Writeable> {
80
self.open_into_writeable(sink_options, cloud_options)
81
}
82
83
#[cfg(feature = "cloud")]
84
pub async fn open_into_writeable_async(
85
&self,
86
sink_options: &SinkOptions,
87
cloud_options: Option<&CloudOptions>,
88
) -> PolarsResult<Writeable> {
89
match self {
90
SinkTarget::Path(addr) => {
91
if sink_options.mkdir {
92
polars_io::utils::mkdir::tokio_mkdir_recursive(addr.as_ref()).await?;
93
}
94
95
polars_io::utils::file::Writeable::try_new(addr.as_ref(), cloud_options)
96
},
97
SinkTarget::Dyn(memory_writer) => Ok(Writeable::Dyn(
98
memory_writer.lock().unwrap().take().unwrap(),
99
)),
100
}
101
}
102
103
pub fn to_display_string(&self) -> String {
104
match self {
105
Self::Path(p) => p.display().to_string(),
106
Self::Dyn(_) => "dynamic-target".to_string(),
107
}
108
}
109
}
110
111
impl fmt::Debug for SinkTarget {
112
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
113
f.write_str("SinkTarget::")?;
114
match self {
115
Self::Path(p) => write!(f, "Path({p:?})"),
116
Self::Dyn(_) => f.write_str("Dyn"),
117
}
118
}
119
}
120
121
impl std::hash::Hash for SinkTarget {
122
fn hash<H: Hasher>(&self, state: &mut H) {
123
std::mem::discriminant(self).hash(state);
124
match self {
125
Self::Path(p) => p.hash(state),
126
Self::Dyn(p) => Arc::as_ptr(p).hash(state),
127
}
128
}
129
}
130
131
#[cfg(feature = "serde")]
impl serde::Serialize for SinkTarget {
    /// Serializes the path of a `Path` target; `Dyn` targets cannot be serialized.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let Self::Path(path) = self else {
            return Err(serde::ser::Error::custom(
                "cannot serialize in-memory sink target",
            ));
        };
        path.serialize(serializer)
    }
}

#[cfg(feature = "serde")]
impl<'de> serde::Deserialize<'de> for SinkTarget {
    /// Always produces a `Path` target, the only serializable variant.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        PlPath::deserialize(deserializer).map(Self::Path)
    }
}
155
156
#[cfg(feature = "dsl-schema")]
impl schemars::JsonSchema for SinkTarget {
    fn schema_name() -> String {
        String::from("SinkTarget")
    }

    fn schema_id() -> std::borrow::Cow<'static, str> {
        let id: &'static str = concat!(module_path!(), "::", "SinkTarget");
        id.into()
    }

    /// A sink target is described in the schema as a path.
    fn json_schema(generator: &mut schemars::r#gen::SchemaGenerator) -> schemars::schema::Schema {
        <PathBuf as schemars::JsonSchema>::json_schema(generator)
    }
}
170
171
// Sink that writes all output to a single target (a path or a caller-provided
// writer). Plain `//` comments are used so the derived JSON schema is unchanged.
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct FileSinkType {
    // Where the output is written.
    pub target: SinkTarget,
    pub file_type: FileType,
    pub sink_options: SinkOptions,
    // Presumably passed along when opening `target` — confirm at the call site.
    pub cloud_options: Option<polars_io::cloud::CloudOptions>,
}

/// IR-level sink description; counterpart of [`SinkType`], with partitioned sinks
/// carrying a [`PartitionSinkTypeIR`] (expressions as `ExprIR`).
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[derive(Clone, Debug, PartialEq)]
pub enum SinkTypeIR {
    /// In-memory sink; `maintain_order` always reports `true` for it.
    Memory,
    /// Single-target file sink.
    File(FileSinkType),
    /// Partitioned sink. Skipped by serde when `serde` is enabled without `ir_serde`.
    #[cfg_attr(all(feature = "serde", not(feature = "ir_serde")), serde(skip))]
    Partition(PartitionSinkTypeIR),
}
189
190
/// One partition key handed to a partition target callback.
#[cfg_attr(feature = "python", pyo3::pyclass)]
#[derive(Clone)]
pub struct PartitionTargetContextKey {
    /// Key name (exposed to Python as `name`).
    pub name: PlSmallStr,
    /// Key value; exposed to Python as `raw_value` and, cast to a string and
    /// percent-encoded, as `str_value`.
    pub raw_value: Scalar,
}

/// Information passed to [`PartitionTargetCallback::call`] so the callback can
/// choose the output target.
#[cfg_attr(feature = "python", pyo3::pyclass)]
pub struct PartitionTargetContext {
    // NOTE(review): the three indices are filled in by the partitioning sink, not
    // in this file. Presumably: running file index, partition index, and file
    // index within the partition — confirm.
    pub file_idx: usize,
    pub part_idx: usize,
    pub in_part_idx: usize,
    /// Partition key values for this output.
    pub keys: Vec<PartitionTargetContextKey>,
    // NOTE(review): presumably the path relative to the sink's base path, with
    // `full_path` being the complete location — confirm.
    pub file_path: String,
    pub full_path: PlPath,
}
206
207
#[cfg(feature = "python")]
#[pyo3::pymethods]
impl PartitionTargetContext {
    /// `file_idx` as seen from Python.
    #[getter]
    pub fn file_idx(&self) -> usize {
        let Self { file_idx, .. } = self;
        *file_idx
    }

    /// `part_idx` as seen from Python.
    #[getter]
    pub fn part_idx(&self) -> usize {
        let Self { part_idx, .. } = self;
        *part_idx
    }

    /// `in_part_idx` as seen from Python.
    #[getter]
    pub fn in_part_idx(&self) -> usize {
        let Self { in_part_idx, .. } = self;
        *in_part_idx
    }

    /// A copy of the partition keys.
    #[getter]
    pub fn keys(&self) -> Vec<PartitionTargetContextKey> {
        self.keys.to_vec()
    }

    /// `file_path` as a string slice.
    #[getter]
    pub fn file_path(&self) -> &str {
        &self.file_path
    }

    /// `full_path` as a string slice.
    #[getter]
    pub fn full_path(&self) -> &str {
        self.full_path.to_str()
    }
}
235
#[cfg(feature = "python")]
#[pyo3::pymethods]
impl PartitionTargetContextKey {
    /// Name of the partition key.
    #[getter]
    pub fn name(&self) -> &str {
        self.name.as_str()
    }

    /// The key value strictly cast to a string and percent-encoded with
    /// `URL_ENCODE_CHAR_SET`; a null value becomes `"null"`.
    ///
    /// Raises `RuntimeError` if the strict cast to a string fails.
    #[getter]
    pub fn str_value(&self) -> pyo3::PyResult<String> {
        let series = self.raw_value.clone().into_series(PlSmallStr::EMPTY);
        let as_strings = series
            .strict_cast(&DataType::String)
            .map_err(|err| pyo3::exceptions::PyRuntimeError::new_err(err.to_string()))?;
        // The cast above guarantees a string column, so `str()` cannot fail here.
        let first: &str = as_strings.str().unwrap().get(0).unwrap_or("null");
        let encoded = percent_encoding::percent_encode(
            first.as_bytes(),
            polars_io::utils::URL_ENCODE_CHAR_SET,
        );
        Ok(encoded.to_string())
    }

    /// The key value converted to a Python object by the registered converter.
    #[getter]
    pub fn raw_value(&self) -> pyo3::PyObject {
        let converter = polars_core::chunked_array::object::registry::get_pyobject_converter();
        let converted = (converter.as_ref())(self.raw_value.as_any_value());
        let py_object = converted.downcast::<pyo3::PyObject>().unwrap();
        *py_object
    }
}
263
264
/// Callback that chooses the output target for a partitioned sink.
///
/// It receives a [`PartitionTargetContext`] and returns a
/// [`PartitionTargetCallbackResult`] (a string or a dynamic writer). Stored as
/// `file_path_cb` on partition sinks. NOTE(review): presumably invoked once per
/// output file — confirm.
#[derive(Clone, Debug, PartialEq)]
pub enum PartitionTargetCallback {
    /// Native Rust closure.
    Rust(
        SpecialEq<
            Arc<
                dyn Fn(PartitionTargetContext) -> PolarsResult<PartitionTargetCallbackResult>
                    + Send
                    + Sync,
            >,
        >,
    ),
    /// Python callable; its return value is converted via the Python convert
    /// registry in [`PartitionTargetCallback::call`].
    #[cfg(feature = "python")]
    Python(polars_utils::python_function::PythonFunction),
}
278
279
/// Apparently a record of one file written by a sink.
///
/// NOTE(review): not constructed or read in this file. The field meanings below
/// are inferred from names (several mirror `PartitionTargetContext`) — confirm
/// against the producer.
#[cfg_attr(feature = "python", pyo3::pyclass)]
pub struct SinkWritten {
    pub file_idx: usize,
    pub part_idx: usize,
    pub in_part_idx: usize,
    pub keys: Vec<PartitionTargetContextKey>,
    pub file_path: PathBuf,
    pub full_path: PathBuf,
    /// Presumably the number of rows written to this file.
    pub num_rows: usize,
    /// Presumably the size of the written file (bytes?) — confirm.
    pub file_size: usize,
    /// Optional collected data; purpose not visible here — TODO confirm.
    pub gathered: Option<DataFrame>,
}

/// Collection of [`SinkWritten`] records.
///
/// NOTE(review): not used in this file; presumably handed to a finish callback.
#[cfg_attr(feature = "python", pyo3::pyclass)]
pub struct SinkFinishContext {
    pub written: Vec<SinkWritten>,
}
296
297
/// Callback that receives a `DataFrame` through [`SinkFinishCallback::call`];
/// stored as `finish_callback` on partition sinks.
#[derive(Clone, Debug, PartialEq)]
pub enum SinkFinishCallback {
    /// Native Rust closure.
    Rust(SpecialEq<Arc<dyn Fn(DataFrame) -> PolarsResult<()> + Send + Sync>>),
    /// Python callable; the `DataFrame` is converted to a Python object first.
    #[cfg(feature = "python")]
    Python(polars_utils::python_function::PythonFunction),
}
303
304
impl SinkFinishCallback {
305
pub fn call(&self, df: DataFrame) -> PolarsResult<()> {
306
match self {
307
Self::Rust(f) => f(df),
308
#[cfg(feature = "python")]
309
Self::Python(f) => pyo3::Python::with_gil(|py| {
310
let converter =
311
polars_utils::python_convert_registry::get_python_convert_registry();
312
let df = (converter.to_py.df)(Box::new(df) as Box<dyn std::any::Any>)?;
313
f.call1(py, (df,))?;
314
PolarsResult::Ok(())
315
}),
316
}
317
}
318
}
319
320
/// Value returned by a [`PartitionTargetCallback`].
#[derive(Clone)]
pub enum PartitionTargetCallbackResult {
    /// A target given as a string (presumably a path) — confirm with the consumer.
    Str(String),
    /// A caller-provided writer slot, same representation as `SinkTarget::Dyn`.
    Dyn(DynSinkTarget),
}
325
326
impl PartitionTargetCallback {
327
pub fn call(&self, ctx: PartitionTargetContext) -> PolarsResult<PartitionTargetCallbackResult> {
328
match self {
329
Self::Rust(f) => f(ctx),
330
#[cfg(feature = "python")]
331
Self::Python(f) => pyo3::Python::with_gil(|py| {
332
let partition_target = f.call1(py, (ctx,))?;
333
let converter =
334
polars_utils::python_convert_registry::get_python_convert_registry();
335
let partition_target =
336
(converter.from_py.partition_target_cb_result)(partition_target)?;
337
let partition_target = partition_target
338
.downcast_ref::<PartitionTargetCallbackResult>()
339
.unwrap()
340
.clone();
341
PolarsResult::Ok(partition_target)
342
}),
343
}
344
}
345
}
346
347
#[cfg(feature = "serde")]
impl serde::Serialize for SinkFinishCallback {
    /// Only a Python callback (with the `python` feature) can be serialized; a Rust
    /// closure yields a `cannot serialize ...` error.
    fn serialize<S>(&self, _serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        match self {
            #[cfg(feature = "python")]
            Self::Python(py_fn) => py_fn.serialize(_serializer),
            Self::Rust(_) => Err(<S::Error as serde::ser::Error>::custom(format!(
                "cannot serialize {self:?}"
            ))),
        }
    }
}
363
364
#[cfg(feature = "serde")]
impl<'de> serde::Deserialize<'de> for SinkFinishCallback {
    /// Deserialize a finish callback.
    ///
    /// Only Python callbacks have a serialized form. With the `python` feature the
    /// payload is read as a `PythonFunction`; without it, deserialization always
    /// fails with an error naming this type.
    fn deserialize<D>(_deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        #[cfg(feature = "python")]
        {
            Ok(Self::Python(
                polars_utils::python_function::PythonFunction::deserialize(_deserializer)?,
            ))
        }
        #[cfg(not(feature = "python"))]
        {
            use serde::de::Error;
            Err(D::Error::custom("cannot deserialize SinkFinishCallback"))
        }
    }
}
385
386
#[cfg(feature = "dsl-schema")]
impl schemars::JsonSchema for SinkFinishCallback {
    // Must differ from `PartitionTargetCallback`'s schema name: schemars uses this
    // name as the key in the schema's definitions. A shared name would make
    // references to one type resolve to the other type's definition.
    fn schema_name() -> String {
        "SinkFinishCallback".to_owned()
    }

    fn schema_id() -> std::borrow::Cow<'static, str> {
        std::borrow::Cow::Borrowed(concat!(module_path!(), "::", "SinkFinishCallback"))
    }

    /// Callbacks appear in the schema as opaque byte sequences.
    fn json_schema(generator: &mut schemars::r#gen::SchemaGenerator) -> schemars::schema::Schema {
        Vec::<u8>::json_schema(generator)
    }
}
400
401
#[cfg(feature = "serde")]
impl<'de> serde::Deserialize<'de> for PartitionTargetCallback {
    /// Deserialize a partition target callback.
    ///
    /// Only Python callbacks have a serialized form. With the `python` feature the
    /// payload is read as a `PythonFunction`; without it, deserialization always
    /// fails with an error naming this type.
    fn deserialize<D>(_deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        #[cfg(feature = "python")]
        {
            Ok(Self::Python(
                polars_utils::python_function::PythonFunction::deserialize(_deserializer)?,
            ))
        }
        #[cfg(not(feature = "python"))]
        {
            use serde::de::Error;
            Err(D::Error::custom("cannot deserialize PartitionTargetCallback"))
        }
    }
}
422
423
#[cfg(feature = "serde")]
impl serde::Serialize for PartitionTargetCallback {
    /// Only a Python callback (with the `python` feature) can be serialized; a Rust
    /// closure yields a `cannot serialize ...` error.
    fn serialize<S>(&self, _serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        match self {
            #[cfg(feature = "python")]
            Self::Python(py_fn) => py_fn.serialize(_serializer),
            Self::Rust(_) => Err(<S::Error as serde::ser::Error>::custom(format!(
                "cannot serialize {self:?}"
            ))),
        }
    }
}
439
440
#[cfg(feature = "dsl-schema")]
impl schemars::JsonSchema for PartitionTargetCallback {
    fn schema_name() -> String {
        String::from("PartitionTargetCallback")
    }

    fn schema_id() -> std::borrow::Cow<'static, str> {
        let id: &'static str = concat!(module_path!(), "::", "PartitionTargetCallback");
        id.into()
    }

    /// Callbacks appear in the schema as opaque byte sequences.
    fn json_schema(generator: &mut schemars::r#gen::SchemaGenerator) -> schemars::schema::Schema {
        <Vec<u8> as schemars::JsonSchema>::json_schema(generator)
    }
}
454
455
// One sort key at the DSL level, used by `PartitionSinkType::per_partition_sort_by`.
// Plain `//` comments keep the derived JSON schema unchanged.
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
#[derive(Clone, Debug, PartialEq)]
pub struct SortColumn {
    pub expr: Expr,
    // Sort this key in descending order.
    pub descending: bool,
    // Place nulls after non-null values.
    pub nulls_last: bool,
}

/// IR counterpart of [`SortColumn`], holding an `ExprIR`.
#[cfg_attr(feature = "ir_serde", derive(serde::Serialize, serde::Deserialize))]
#[derive(Clone, Debug, PartialEq)]
pub struct SortColumnIR {
    pub expr: ExprIR,
    pub descending: bool,
    pub nulls_last: bool,
}
471
472
// DSL-level description of a partitioned sink (IR counterpart: `PartitionSinkTypeIR`).
// Plain `//` comments keep the derived JSON schema unchanged.
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
#[derive(Clone, Debug, PartialEq)]
pub struct PartitionSinkType {
    // Root output location. NOTE(review): presumably all partition files are
    // written beneath it — confirm against the partition sink implementation.
    pub base_path: Arc<PlPath>,
    // Optional callback choosing the target for each output.
    pub file_path_cb: Option<PartitionTargetCallback>,
    pub file_type: FileType,
    pub sink_options: SinkOptions,
    // How rows are split into partitions.
    pub variant: PartitionVariant,
    pub cloud_options: Option<polars_io::cloud::CloudOptions>,
    // Optional sort keys applied within each partition.
    pub per_partition_sort_by: Option<Vec<SortColumn>>,
    // Optional callback that receives a `DataFrame` via `SinkFinishCallback::call`.
    pub finish_callback: Option<SinkFinishCallback>,
}

/// IR counterpart of [`PartitionSinkType`]: the partition variant and per-partition
/// sort keys hold `ExprIR` instead of `Expr`.
#[cfg_attr(feature = "ir_serde", derive(serde::Serialize, serde::Deserialize))]
#[derive(Clone, Debug, PartialEq)]
pub struct PartitionSinkTypeIR {
    pub base_path: Arc<PlPath>,
    pub file_path_cb: Option<PartitionTargetCallback>,
    pub file_type: FileType,
    pub sink_options: SinkOptions,
    pub variant: PartitionVariantIR,
    pub cloud_options: Option<polars_io::cloud::CloudOptions>,
    pub per_partition_sort_by: Option<Vec<SortColumnIR>>,
    pub finish_callback: Option<SinkFinishCallback>,
}

// DSL-level sink description; counterpart of `SinkTypeIR`.
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
#[derive(Clone, Debug, PartialEq)]
pub enum SinkType {
    Memory,
    File(FileSinkType),
    Partition(PartitionSinkType),
}
507
508
// How a partitioned sink splits its input (DSL level).
// Plain `//` comments keep the derived JSON schema unchanged.
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum PartitionVariant {
    // NOTE(review): presumably a maximum number of rows per output — confirm.
    MaxSize(IdxSize),
    // Partition by the values of `key_exprs`. NOTE(review): presumably assumes the
    // input is already grouped by key, and `include_key` controls whether key
    // columns are written — confirm.
    Parted {
        key_exprs: Vec<Expr>,
        include_key: bool,
    },
    // Partition by the values of `key_exprs`; `include_key` as for `Parted`.
    ByKey {
        key_exprs: Vec<Expr>,
        include_key: bool,
    },
}

/// IR counterpart of [`PartitionVariant`], holding `ExprIR` keys.
///
/// Does not derive `Hash`; it is hashed through `traverse_and_hash` (with the
/// `cse` feature) so key expressions can be resolved in the expression arena.
#[cfg_attr(feature = "ir_serde", derive(serde::Serialize, serde::Deserialize))]
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum PartitionVariantIR {
    MaxSize(IdxSize),
    Parted {
        key_exprs: Vec<ExprIR>,
        include_key: bool,
    },
    ByKey {
        key_exprs: Vec<ExprIR>,
        include_key: bool,
    },
}
536
537
#[cfg(feature = "cse")]
impl SinkTypeIR {
    /// Hash this sink into `state` (compiled only with the `cse` feature).
    ///
    /// The variant tag is hashed first. File sinks use their derived `Hash`;
    /// partition sinks resolve their expressions through `expr_arena`.
    pub(crate) fn traverse_and_hash<H: Hasher>(&self, expr_arena: &Arena<AExpr>, state: &mut H) {
        let tag = std::mem::discriminant(self);
        tag.hash(state);
        match self {
            Self::File(file_sink) => file_sink.hash(state),
            Self::Partition(partition_sink) => partition_sink.traverse_and_hash(expr_arena, state),
            // The tag alone identifies an in-memory sink.
            Self::Memory => (),
        }
    }
}
548
549
impl SinkTypeIR {
550
pub fn maintain_order(&self) -> bool {
551
match self {
552
SinkTypeIR::Memory => true,
553
SinkTypeIR::File(s) => s.sink_options.maintain_order,
554
SinkTypeIR::Partition(s) => s.sink_options.maintain_order,
555
}
556
}
557
}
558
559
#[cfg(feature = "cse")]
impl PartitionSinkTypeIR {
    /// Hash this partition sink into `state`, resolving expressions through
    /// `expr_arena` (compiled only with the `cse` feature).
    ///
    /// `base_path` is included so that partition sinks writing to different
    /// locations do not hash alike. This mirrors `FileSinkType`, whose derived
    /// `Hash` covers its `target`. The callbacks (`file_path_cb`,
    /// `finish_callback`) have no `Hash` impl and are not hashed; equality must
    /// tell sinks that differ only in callbacks apart.
    pub(crate) fn traverse_and_hash<H: Hasher>(&self, expr_arena: &Arena<AExpr>, state: &mut H) {
        self.base_path.hash(state);
        self.file_type.hash(state);
        self.sink_options.hash(state);
        self.variant.traverse_and_hash(expr_arena, state);
        self.cloud_options.hash(state);
        // Hash Some/None explicitly, then the length, so different sort lists
        // cannot run together.
        std::mem::discriminant(&self.per_partition_sort_by).hash(state);
        if let Some(sort_columns) = &self.per_partition_sort_by {
            sort_columns.len().hash(state);
            for sort_column in sort_columns {
                sort_column.traverse_and_hash(expr_arena, state);
            }
        }
    }
}
575
576
#[cfg(feature = "cse")]
impl SortColumnIR {
    /// Hash the sort expression (resolved through `expr_arena`), then the
    /// direction and null-placement flags.
    pub(crate) fn traverse_and_hash<H: Hasher>(&self, expr_arena: &Arena<AExpr>, state: &mut H) {
        let Self {
            expr,
            descending,
            nulls_last,
        } = self;
        expr.traverse_and_hash(expr_arena, state);
        descending.hash(state);
        nulls_last.hash(state);
    }
}
584
585
impl PartitionVariantIR {
586
#[cfg(feature = "cse")]
587
pub(crate) fn traverse_and_hash<H: Hasher>(&self, expr_arena: &Arena<AExpr>, state: &mut H) {
588
std::mem::discriminant(self).hash(state);
589
match self {
590
Self::MaxSize(size) => size.hash(state),
591
Self::Parted {
592
key_exprs,
593
include_key,
594
}
595
| Self::ByKey {
596
key_exprs,
597
include_key,
598
} => {
599
include_key.hash(state);
600
for key_expr in key_exprs.as_slice() {
601
key_expr.traverse_and_hash(expr_arena, state);
602
}
603
},
604
}
605
}
606
}
607
608
/// Path and file type for a file sink.
///
/// NOTE(review): not referenced anywhere else in this file; confirm whether it is
/// still used or can be removed.
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[derive(Clone, Debug)]
pub struct FileSinkOptions {
    pub path: Arc<PathBuf>,
    pub file_type: FileType,
}
614
615