Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/dsl/serializable_plan.rs
7884 views
1
use polars_utils::unique_id::UniqueId;
2
use recursive::recursive;
3
use serde::{Deserialize, Serialize};
4
use slotmap::{SecondaryMap, SlotMap, new_key_type};
5
6
use super::*;
7
8
// Key types for the slotmap arenas of a serialized DSL plan. Plan nodes refer
// to each other and to DataFrames through these keys instead of through `Arc`
// pointers.
new_key_type! {
    /// A key type for identifying DataFrame nodes in a serialized DSL plan.
    pub(crate) struct DataFrameKey;

    /// A key type for identifying DslPlan nodes in a serialized DSL plan.
    pub(crate) struct DslPlanKey;
}
15
16
/// A representation of DslPlan that does not contain any `Arc` pointers, and
/// instead uses indices to refer to DataFrames and other DslPlan nodes.
///
/// This data structure mirrors the `DslPlan` enum, but uses `DataFrameKey` and
/// `DslPlanKey` to refer to DataFrames and other DslPlan nodes, respectively.
/// We do it like this because serde does not support keeping global state
/// during (de)serialization. Instead, we do a manual conversion to a
/// serde-compatible representation, and then let serde handle the rest.
#[derive(Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub(crate) struct SerializableDslPlan {
    /// Key (into `dsl_plans`) of the node the whole plan is rebuilt from.
    pub(crate) root: DslPlanKey,
    /// Arena of every DataFrame referenced by the plan, addressed by `DataFrameKey`.
    pub(crate) dataframes: SlotMap<DataFrameKey, DataFrameSerdeWrap>,
    /// Arena of the plan nodes that are referenced by key, addressed by `DslPlanKey`.
    pub(crate) dsl_plans: SlotMap<DslPlanKey, SerializableDslPlanNode>,
}
31
32
/// One node of a [`SerializableDslPlan`].
///
/// Each variant corresponds to the `DslPlan` variant of the same name. Inputs
/// that are `Arc<DslPlan>` in `DslPlan` are stored as a [`DslPlanKey`], and
/// DataFrames are stored as a [`DataFrameKey`]. The children of `Union`,
/// `HConcat`, `ExtContext::contexts` and `SinkMultiple` are nested inline as
/// nodes rather than referenced by key.
///
/// The variant order and field layout are part of the serialized format, so
/// they should not be reordered casually.
// NOTE(review): the serde derives here are unconditional while the containing
// struct uses `cfg_attr(feature = "serde", ...)` — presumably this module is
// only compiled with the `serde` feature; confirm.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) enum SerializableDslPlanNode {
    #[cfg(feature = "python")]
    PythonScan {
        options: crate::dsl::python_dsl::PythonOptionsDsl,
    },
    Filter {
        input: DslPlanKey,
        predicate: Expr,
    },
    Cache {
        input: DslPlanKey,
        id: UniqueId,
    },
    // The `cached_ir` field of `DslPlan::Scan` is not stored; it is reset to
    // its default when the plan is rebuilt.
    Scan {
        sources: ScanSources,
        unified_scan_args: Box<UnifiedScanArgs>,
        scan_type: Box<FileScanDsl>,
    },
    DataFrameScan {
        df: DataFrameKey,
        schema: SchemaRef,
    },
    Select {
        expr: Vec<Expr>,
        input: DslPlanKey,
        options: ProjectionOptions,
    },
    GroupBy {
        input: DslPlanKey,
        keys: Vec<Expr>,
        aggs: Vec<Expr>,
        predicates: Vec<Expr>,
        maintain_order: bool,
        options: Arc<GroupbyOptions>,
        apply: Option<(PlanCallback<DataFrame, DataFrame>, SchemaRef)>,
    },
    Join {
        input_left: DslPlanKey,
        input_right: DslPlanKey,
        left_on: Vec<Expr>,
        right_on: Vec<Expr>,
        predicates: Vec<Expr>,
        options: Arc<JoinOptions>,
    },
    HStack {
        input: DslPlanKey,
        exprs: Vec<Expr>,
        options: ProjectionOptions,
    },
    MatchToSchema {
        input: DslPlanKey,
        match_schema: SchemaRef,
        per_column: Arc<[MatchToSchemaPerColumn]>,
        extra_columns: ExtraColumnsPolicy,
    },
    PipeWithSchema {
        input: Vec<DslPlanKey>,
        callback: PlanCallback<(Vec<DslPlan>, Vec<SchemaRef>), DslPlan>,
    },
    #[cfg(feature = "pivot")]
    Pivot {
        input: DslPlanKey,
        on: Selector,
        on_columns: DataFrameKey,
        index: Selector,
        values: Selector,
        agg: Expr,
        maintain_order: bool,
        separator: PlSmallStr,
    },
    Distinct {
        input: DslPlanKey,
        options: DistinctOptionsDSL,
    },
    Sort {
        input: DslPlanKey,
        by_column: Vec<Expr>,
        slice: Option<(i64, usize)>,
        sort_options: SortMultipleOptions,
    },
    Slice {
        input: DslPlanKey,
        offset: i64,
        len: IdxSize,
    },
    MapFunction {
        input: DslPlanKey,
        function: DslFunction,
    },
    Union {
        inputs: Vec<SerializableDslPlanNode>,
        args: UnionArgs,
    },
    HConcat {
        inputs: Vec<SerializableDslPlanNode>,
        options: HConcatOptions,
    },
    ExtContext {
        input: DslPlanKey,
        contexts: Vec<SerializableDslPlanNode>,
    },
    Sink {
        input: DslPlanKey,
        payload: SinkType,
    },
    SinkMultiple {
        inputs: Vec<SerializableDslPlanNode>,
    },
    #[cfg(feature = "merge_sorted")]
    MergeSorted {
        input_left: DslPlanKey,
        input_right: DslPlanKey,
        key: PlSmallStr,
    },
    // The serializer in this file never emits this variant (a `DslPlan::IR`
    // is replaced by its inner `dsl` plan). When read back, it resolves to
    // the plan stored at `dsl`, and `version` is ignored.
    IR {
        dsl: DslPlanKey,
        version: u32,
    },
}
152
153
/// Working state used while converting a `DslPlan` into a `SerializableDslPlan`.
///
/// The two arenas hold the values that end up in the serialized plan. The two
/// `*_keys_table` maps remember, by pointer address, which DataFrame or
/// DslPlan already received a key, so a value reached through several
/// references is stored only once.
#[derive(Debug, Default)]
struct SerializeArenas {
    // DataFrames collected so far.
    dataframes: SlotMap<DataFrameKey, DataFrameSerdeWrap>,
    // Address of an already-registered DataFrame -> its key in `dataframes`.
    dataframes_keys_table: PlIndexMap<*const DataFrame, DataFrameKey>,
    // Plan nodes collected so far.
    dsl_plans: SlotMap<DslPlanKey, SerializableDslPlanNode>,
    // Address of an already-registered DslPlan -> its key in `dsl_plans`.
    dsl_plans_keys_table: PlIndexMap<*const DslPlan, DslPlanKey>,
}
160
161
impl From<&DslPlan> for SerializableDslPlan {
162
fn from(plan: &DslPlan) -> Self {
163
let mut arenas = SerializeArenas::default();
164
let root_dsl_plan = convert_dsl_plan_to_serializable_plan(plan, &mut arenas);
165
166
let root_key = arenas.dsl_plans.insert(root_dsl_plan);
167
SerializableDslPlan {
168
root: root_key,
169
dataframes: arenas.dataframes,
170
dsl_plans: arenas.dsl_plans,
171
}
172
}
173
}
174
175
#[recursive]
176
fn convert_dsl_plan_to_serializable_plan(
177
plan: &DslPlan,
178
arenas: &mut SerializeArenas,
179
) -> SerializableDslPlanNode {
180
use {DslPlan as DP, SerializableDslPlanNode as SP};
181
182
match plan {
183
#[cfg(feature = "python")]
184
DP::PythonScan { options } => SP::PythonScan {
185
options: options.clone(),
186
},
187
DP::Filter { input, predicate } => SP::Filter {
188
input: dsl_plan_key(input, arenas),
189
predicate: predicate.clone(),
190
},
191
DP::Cache { input, id } => SP::Cache {
192
input: dsl_plan_key(input, arenas),
193
id: *id,
194
},
195
DP::Scan {
196
sources,
197
unified_scan_args,
198
scan_type,
199
cached_ir: _,
200
} => SP::Scan {
201
sources: sources.clone(),
202
unified_scan_args: unified_scan_args.clone(),
203
scan_type: scan_type.clone(),
204
},
205
DP::DataFrameScan { df, schema } => SP::DataFrameScan {
206
df: dataframe_key(df, arenas),
207
schema: schema.clone(),
208
},
209
DP::Select {
210
expr,
211
input,
212
options,
213
} => SP::Select {
214
expr: expr.clone(),
215
input: dsl_plan_key(input, arenas),
216
options: *options,
217
},
218
DP::GroupBy {
219
input,
220
keys,
221
aggs,
222
predicates,
223
maintain_order,
224
options,
225
apply,
226
} => SP::GroupBy {
227
input: dsl_plan_key(input, arenas),
228
keys: keys.clone(),
229
aggs: aggs.clone(),
230
predicates: predicates.clone(),
231
maintain_order: *maintain_order,
232
options: options.clone(),
233
apply: apply.clone(),
234
},
235
DP::Join {
236
input_left,
237
input_right,
238
left_on,
239
right_on,
240
predicates,
241
options,
242
} => SP::Join {
243
input_left: dsl_plan_key(input_left, arenas),
244
input_right: dsl_plan_key(input_right, arenas),
245
left_on: left_on.clone(),
246
right_on: right_on.clone(),
247
predicates: predicates.clone(),
248
options: options.clone(),
249
},
250
DP::HStack {
251
input,
252
exprs,
253
options,
254
} => SP::HStack {
255
input: dsl_plan_key(input, arenas),
256
exprs: exprs.clone(),
257
options: *options,
258
},
259
DP::MatchToSchema {
260
input,
261
match_schema,
262
per_column,
263
extra_columns,
264
} => SP::MatchToSchema {
265
input: dsl_plan_key(input, arenas),
266
match_schema: match_schema.clone(),
267
per_column: per_column.clone(),
268
extra_columns: *extra_columns,
269
},
270
DP::PipeWithSchema { input, callback } => SP::PipeWithSchema {
271
input: input
272
.iter()
273
.map(|plan| dsl_plan_key_from_ref(plan, arenas))
274
.collect(),
275
callback: callback.clone(),
276
},
277
#[cfg(feature = "pivot")]
278
DP::Pivot {
279
input,
280
on,
281
on_columns,
282
index,
283
values,
284
agg,
285
maintain_order,
286
separator,
287
} => SP::Pivot {
288
input: dsl_plan_key(input, arenas),
289
on: on.clone(),
290
on_columns: dataframe_key(on_columns, arenas),
291
index: index.clone(),
292
values: values.clone(),
293
agg: agg.clone(),
294
maintain_order: *maintain_order,
295
separator: separator.clone(),
296
},
297
DP::Distinct { input, options } => SP::Distinct {
298
input: dsl_plan_key(input, arenas),
299
options: options.clone(),
300
},
301
DP::Sort {
302
input,
303
by_column,
304
slice,
305
sort_options,
306
} => SP::Sort {
307
input: dsl_plan_key(input, arenas),
308
by_column: by_column.clone(),
309
slice: *slice,
310
sort_options: sort_options.clone(),
311
},
312
DP::Slice { input, offset, len } => SP::Slice {
313
input: dsl_plan_key(input, arenas),
314
offset: *offset,
315
len: *len,
316
},
317
DP::MapFunction { input, function } => SP::MapFunction {
318
input: dsl_plan_key(input, arenas),
319
function: function.clone(),
320
},
321
DP::Union { inputs, args } => SP::Union {
322
inputs: inputs
323
.iter()
324
.map(|p| convert_dsl_plan_to_serializable_plan(p, arenas))
325
.collect(),
326
args: *args,
327
},
328
DP::HConcat { inputs, options } => SP::HConcat {
329
inputs: inputs
330
.iter()
331
.map(|p| convert_dsl_plan_to_serializable_plan(p, arenas))
332
.collect(),
333
options: *options,
334
},
335
DP::ExtContext { input, contexts } => SP::ExtContext {
336
input: dsl_plan_key(input, arenas),
337
contexts: contexts
338
.iter()
339
.map(|p| convert_dsl_plan_to_serializable_plan(p, arenas))
340
.collect(),
341
},
342
DP::Sink { input, payload } => SP::Sink {
343
input: dsl_plan_key(input, arenas),
344
payload: payload.clone(),
345
},
346
DP::SinkMultiple { inputs } => SP::SinkMultiple {
347
inputs: inputs
348
.iter()
349
.map(|p| convert_dsl_plan_to_serializable_plan(p, arenas))
350
.collect(),
351
},
352
#[cfg(feature = "merge_sorted")]
353
DP::MergeSorted {
354
input_left,
355
input_right,
356
key,
357
} => SP::MergeSorted {
358
input_left: dsl_plan_key(input_left, arenas),
359
input_right: dsl_plan_key(input_right, arenas),
360
key: key.clone(),
361
},
362
DP::IR {
363
dsl,
364
version: _,
365
node: _,
366
} => convert_dsl_plan_to_serializable_plan(dsl.as_ref(), arenas),
367
}
368
}
369
370
fn dataframe_key(df: &Arc<DataFrame>, arenas: &mut SerializeArenas) -> DataFrameKey {
371
let ptr = Arc::as_ptr(df);
372
if let Some(key) = arenas.dataframes_keys_table.get(&ptr) {
373
*key
374
} else {
375
let key = arenas.dataframes.insert(DataFrameSerdeWrap(df.clone()));
376
arenas.dataframes_keys_table.insert(ptr, key);
377
key
378
}
379
}
380
381
fn dsl_plan_key_from_ref(plan: &DslPlan, arenas: &mut SerializeArenas) -> DslPlanKey {
382
let ptr = plan as *const _;
383
if let Some(key) = arenas.dsl_plans_keys_table.get(&ptr) {
384
*key
385
} else {
386
let ser_plan = convert_dsl_plan_to_serializable_plan(plan, arenas);
387
let key = arenas.dsl_plans.insert(ser_plan);
388
arenas.dsl_plans_keys_table.insert(ptr, key);
389
key
390
}
391
}
392
393
fn dsl_plan_key(plan: &Arc<DslPlan>, arenas: &mut SerializeArenas) -> DslPlanKey {
394
let ref_plan = Arc::as_ref(plan);
395
dsl_plan_key_from_ref(ref_plan, arenas)
396
}
397
398
/// Caches used while rebuilding a `DslPlan` from a `SerializableDslPlan`.
///
/// Each map is filled lazily, keyed by the key used in the serialized plan,
/// so every later reference to the same key receives a clone of the same `Arc`.
#[derive(Debug, Default)]
struct DeserializeArenas {
    // DataFrames already looked up, by serialized key.
    dataframes: SecondaryMap<DataFrameKey, DataFrameSerdeWrap>,
    // Plans already rebuilt, by serialized key.
    dsl_plans: SecondaryMap<DslPlanKey, Arc<DslPlan>>,
}
403
404
impl TryFrom<&SerializableDslPlan> for DslPlan {
405
type Error = PolarsError;
406
407
fn try_from(ser_dsl_plan: &SerializableDslPlan) -> Result<Self, Self::Error> {
408
let mut arenas = DeserializeArenas::default();
409
let root = ser_dsl_plan
410
.dsl_plans
411
.get(ser_dsl_plan.root)
412
.ok_or(polars_err!(ComputeError: "Could not find root DslPlan in serialized plan"))?;
413
try_convert_serializable_plan_to_dsl_plan(root, ser_dsl_plan, &mut arenas)
414
}
415
}
416
417
#[recursive]
418
fn try_convert_serializable_plan_to_dsl_plan(
419
node: &SerializableDslPlanNode,
420
ser_dsl_plan: &SerializableDslPlan,
421
arenas: &mut DeserializeArenas,
422
) -> Result<DslPlan, PolarsError> {
423
use {DslPlan as DP, SerializableDslPlanNode as SP};
424
425
match node {
426
#[cfg(feature = "python")]
427
SP::PythonScan { options } => Ok(DP::PythonScan {
428
options: options.clone(),
429
}),
430
SP::Filter { input, predicate } => Ok(DP::Filter {
431
input: get_dsl_plan(*input, ser_dsl_plan, arenas)?,
432
predicate: predicate.clone(),
433
}),
434
SP::Cache { input, id } => Ok(DP::Cache {
435
input: get_dsl_plan(*input, ser_dsl_plan, arenas)?,
436
id: *id,
437
}),
438
SP::Scan {
439
sources,
440
unified_scan_args,
441
scan_type,
442
} => Ok(DP::Scan {
443
sources: sources.clone(),
444
unified_scan_args: unified_scan_args.clone(),
445
scan_type: scan_type.clone(),
446
cached_ir: Default::default(),
447
}),
448
SP::DataFrameScan { df, schema } => Ok(DP::DataFrameScan {
449
df: get_dataframe(*df, ser_dsl_plan, arenas)?,
450
schema: schema.clone(),
451
}),
452
SP::Select {
453
expr,
454
input,
455
options,
456
} => Ok(DP::Select {
457
expr: expr.clone(),
458
input: get_dsl_plan(*input, ser_dsl_plan, arenas)?,
459
options: *options,
460
}),
461
SP::GroupBy {
462
input,
463
keys,
464
aggs,
465
predicates,
466
maintain_order,
467
options,
468
apply,
469
} => Ok(DP::GroupBy {
470
input: get_dsl_plan(*input, ser_dsl_plan, arenas)?,
471
keys: keys.clone(),
472
aggs: aggs.clone(),
473
predicates: predicates.clone(),
474
maintain_order: *maintain_order,
475
options: options.clone(),
476
apply: apply.clone(),
477
}),
478
SP::Join {
479
input_left,
480
input_right,
481
left_on,
482
right_on,
483
predicates,
484
options,
485
} => Ok(DP::Join {
486
input_left: get_dsl_plan(*input_left, ser_dsl_plan, arenas)?,
487
input_right: get_dsl_plan(*input_right, ser_dsl_plan, arenas)?,
488
left_on: left_on.clone(),
489
right_on: right_on.clone(),
490
predicates: predicates.clone(),
491
options: options.clone(),
492
}),
493
SP::HStack {
494
input,
495
exprs,
496
options,
497
} => Ok(DP::HStack {
498
input: get_dsl_plan(*input, ser_dsl_plan, arenas)?,
499
exprs: exprs.clone(),
500
options: *options,
501
}),
502
SP::MatchToSchema {
503
input,
504
match_schema,
505
per_column,
506
extra_columns,
507
} => Ok(DP::MatchToSchema {
508
input: get_dsl_plan(*input, ser_dsl_plan, arenas)?,
509
match_schema: match_schema.clone(),
510
per_column: per_column.clone(),
511
extra_columns: *extra_columns,
512
}),
513
SP::PipeWithSchema { input, callback } => Ok(DP::PipeWithSchema {
514
input: Arc::from(
515
input
516
.iter()
517
.map(|key| get_dsl_plan(*key, ser_dsl_plan, arenas).map(Arc::unwrap_or_clone))
518
.collect::<PolarsResult<Vec<_>>>()?,
519
),
520
callback: callback.clone(),
521
}),
522
#[cfg(feature = "pivot")]
523
SP::Pivot {
524
input,
525
on,
526
on_columns,
527
index,
528
values,
529
agg,
530
maintain_order,
531
separator,
532
} => Ok(DP::Pivot {
533
input: get_dsl_plan(*input, ser_dsl_plan, arenas)?,
534
on: on.clone(),
535
on_columns: get_dataframe(*on_columns, ser_dsl_plan, arenas)?,
536
index: index.clone(),
537
values: values.clone(),
538
agg: agg.clone(),
539
maintain_order: *maintain_order,
540
separator: separator.clone(),
541
}),
542
SP::Distinct { input, options } => Ok(DP::Distinct {
543
input: get_dsl_plan(*input, ser_dsl_plan, arenas)?,
544
options: options.clone(),
545
}),
546
SP::Sort {
547
input,
548
by_column,
549
slice,
550
sort_options,
551
} => Ok(DP::Sort {
552
input: get_dsl_plan(*input, ser_dsl_plan, arenas)?,
553
by_column: by_column.clone(),
554
slice: *slice,
555
sort_options: sort_options.clone(),
556
}),
557
SP::Slice { input, offset, len } => Ok(DP::Slice {
558
input: get_dsl_plan(*input, ser_dsl_plan, arenas)?,
559
offset: *offset,
560
len: *len,
561
}),
562
SP::MapFunction { input, function } => Ok(DP::MapFunction {
563
input: get_dsl_plan(*input, ser_dsl_plan, arenas)?,
564
function: function.clone(),
565
}),
566
SP::Union { inputs, args } => Ok(DP::Union {
567
inputs: inputs
568
.iter()
569
.map(|node| try_convert_serializable_plan_to_dsl_plan(node, ser_dsl_plan, arenas))
570
.collect::<Result<Vec<_>, _>>()?,
571
args: *args,
572
}),
573
SP::HConcat { inputs, options } => Ok(DP::HConcat {
574
inputs: inputs
575
.iter()
576
.map(|node| try_convert_serializable_plan_to_dsl_plan(node, ser_dsl_plan, arenas))
577
.collect::<Result<Vec<_>, _>>()?,
578
options: *options,
579
}),
580
SP::ExtContext { input, contexts } => Ok(DP::ExtContext {
581
input: get_dsl_plan(*input, ser_dsl_plan, arenas)?,
582
contexts: contexts
583
.iter()
584
.map(|node| try_convert_serializable_plan_to_dsl_plan(node, ser_dsl_plan, arenas))
585
.collect::<Result<Vec<_>, _>>()?,
586
}),
587
SP::Sink { input, payload } => Ok(DP::Sink {
588
input: get_dsl_plan(*input, ser_dsl_plan, arenas)?,
589
payload: payload.clone(),
590
}),
591
SP::SinkMultiple { inputs } => Ok(DP::SinkMultiple {
592
inputs: inputs
593
.iter()
594
.map(|node| try_convert_serializable_plan_to_dsl_plan(node, ser_dsl_plan, arenas))
595
.collect::<Result<Vec<_>, _>>()?,
596
}),
597
#[cfg(feature = "merge_sorted")]
598
SP::MergeSorted {
599
input_left,
600
input_right,
601
key,
602
} => Ok(DP::MergeSorted {
603
input_left: get_dsl_plan(*input_left, ser_dsl_plan, arenas)?,
604
input_right: get_dsl_plan(*input_right, ser_dsl_plan, arenas)?,
605
key: key.clone(),
606
}),
607
SP::IR {
608
dsl: dsl_key,
609
version: _,
610
} => get_dsl_plan(*dsl_key, ser_dsl_plan, arenas).map(Arc::unwrap_or_clone),
611
}
612
}
613
614
fn get_dataframe(
615
key: DataFrameKey,
616
ser_dsl_plan: &SerializableDslPlan,
617
arenas: &mut DeserializeArenas,
618
) -> Result<Arc<DataFrame>, PolarsError> {
619
if let Some(df) = arenas.dataframes.get(key) {
620
Ok(df.0.clone())
621
} else {
622
let df = ser_dsl_plan.dataframes.get(key).ok_or(polars_err!(
623
ComputeError: "Could not find DataFrame at index {:?} in serialized plan", key
624
))?;
625
arenas.dataframes.insert(key, df.clone());
626
Ok(df.0.clone())
627
}
628
}
629
630
fn get_dsl_plan(
631
key: DslPlanKey,
632
ser_dsl_plan: &SerializableDslPlan,
633
arenas: &mut DeserializeArenas,
634
) -> Result<Arc<DslPlan>, PolarsError> {
635
if let Some(dsl_plan) = arenas.dsl_plans.get(key) {
636
Ok(dsl_plan.clone())
637
} else {
638
let node = ser_dsl_plan.dsl_plans.get(key).ok_or(polars_err!(
639
ComputeError: "Could not find DslPlan node at index {:?} in serialized plan", key
640
))?;
641
let dsl_plan = try_convert_serializable_plan_to_dsl_plan(node, ser_dsl_plan, arenas)?;
642
let arc_dsl_plan = Arc::new(dsl_plan);
643
arenas.dsl_plans.insert(key, arc_dsl_plan.clone());
644
Ok(arc_dsl_plan)
645
}
646
}
647
648
/// Serialization wrapper that splits large serialized byte values into chunks.
///
/// Wraps a shared `DataFrame`; cloning the wrapper clones the `Arc`, not the
/// data.
#[derive(Debug, Clone)]
pub(crate) struct DataFrameSerdeWrap(Arc<DataFrame>);
651
652
#[cfg(feature = "serde")]
mod _serde_impl {
    use std::sync::Arc;

    use polars_core::frame::DataFrame;
    use polars_utils::chunked_bytes_cursor::FixedSizeChunkedBytesCursor;
    use serde::de::Error;
    use serde::{Deserialize, Serialize};

    use super::DataFrameSerdeWrap;

    /// Environment variable overriding the maximum length of one byte chunk.
    const MAX_BYTE_SLICE_LEN_ENV: &str = "POLARS_SERIALIZE_LAZYFRAME_MAX_BYTE_SLICE_LEN";

    /// Returns the maximum length of a single serialized byte chunk.
    ///
    /// Defaults to `u32::MAX` when the environment variable cannot be read.
    ///
    /// # Errors
    ///
    /// Returns a message if the variable is set but is not a positive
    /// integer. Zero is rejected because `slice::chunks` panics on a chunk
    /// size of zero.
    fn max_byte_slice_len() -> Result<usize, String> {
        match std::env::var(MAX_BYTE_SLICE_LEN_ENV) {
            Ok(raw) => match raw.parse::<usize>() {
                Ok(len) if len > 0 => Ok(len),
                _ => Err(format!(
                    "invalid value {raw:?} for {MAX_BYTE_SLICE_LEN_ENV}: expected a positive integer"
                )),
            },
            Err(_) => Ok(usize::try_from(u32::MAX).unwrap()), // Limit for rmp_serde
        }
    }

    impl Serialize for DataFrameSerdeWrap {
        /// Serializes the DataFrame to bytes and emits them as a sequence of
        /// chunks, each at most `max_byte_slice_len()` bytes long.
        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
        where
            S: serde::Serializer,
        {
            use serde::ser::Error;

            // Validate the configuration up front so a bad value surfaces as a
            // serialization error instead of a panic.
            let chunk_len = max_byte_slice_len().map_err(S::Error::custom)?;

            let mut bytes: Vec<u8> = vec![];
            self.0
                .as_ref()
                .clone()
                .serialize_into_writer(&mut bytes)
                .map_err(S::Error::custom)?;

            serializer.collect_seq(bytes.chunks(chunk_len))
        }
    }

    impl<'de> Deserialize<'de> for DataFrameSerdeWrap {
        /// Reads the sequence of byte chunks and decodes the DataFrame from it.
        ///
        /// A single chunk is read directly; any other number of chunks goes
        /// through `FixedSizeChunkedBytesCursor`. Chunk layouts the cursor
        /// rejects are reported as a deserialization error.
        fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
        where
            D: serde::Deserializer<'de>,
        {
            let bytes: Vec<Vec<u8>> = Vec::deserialize(deserializer)?;

            let result = match bytes.as_slice() {
                [v] => DataFrame::deserialize_from_reader(&mut std::io::Cursor::new(v.as_slice())),
                chunks => {
                    // The input is external data: report a bad layout as an
                    // error rather than panicking.
                    let mut cursor = FixedSizeChunkedBytesCursor::try_new(chunks).map_err(|e| {
                        D::Error::custom(format!("invalid chunked DataFrame bytes: {e:?}"))
                    })?;
                    DataFrame::deserialize_from_reader(&mut cursor)
                },
            };

            result
                .map(|x| DataFrameSerdeWrap(Arc::new(x)))
                .map_err(D::Error::custom)
        }
    }
}
710
711
#[cfg(test)]
mod tests {
    use super::*;

    /// Round-trips a self-join over one shared DataFrame scan through the
    /// versioned (de)serializer and compares the debug output of both plans.
    #[test]
    fn test_dsl_plan_serialization() {
        let name = || "a".into();
        let df = Arc::new(
            DataFrame::new(vec![Column::new(name(), Series::new(name(), &[1, 2, 3]))]).unwrap(),
        );
        // Both join inputs point at the same scan node.
        let scan = Arc::new(DslPlan::DataFrameScan {
            df: df.clone(),
            schema: df.schema().clone(),
        });
        let options = Arc::new(JoinOptions {
            allow_parallel: true,
            force_parallel: false,
            ..Default::default()
        });
        let plan = DslPlan::Join {
            input_left: Arc::clone(&scan),
            input_right: scan,
            left_on: vec![Expr::Column(name())],
            right_on: vec![Expr::Column(name())],
            predicates: Default::default(),
            options,
        };

        let mut bytes: Vec<u8> = Vec::new();
        plan.serialize_versioned(&mut bytes, Default::default())
            .unwrap();

        let mut reader: &[u8] = &bytes;
        let roundtripped = DslPlan::deserialize_versioned(&mut reader).unwrap();

        assert_eq!(format!("{plan:?}"), format!("{roundtripped:?}"));
    }
}
746
747