GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/partition.rs
//! Functionality for writing a DataFrame partitioned into multiple files.

use polars_core::POOL;
use polars_core::prelude::*;
use polars_core::series::IsSorted;
use polars_utils::plpath::PlPathRef;
use rayon::prelude::*;

use crate::cloud::CloudOptions;
use crate::parquet::write::ParquetWriteOptions;
#[cfg(feature = "ipc")]
use crate::prelude::IpcWriterOptions;
use crate::prelude::URL_ENCODE_CHAR_SET;
use crate::utils::file::try_get_writeable;
use crate::{SerWriter, WriteDataFrameToFile};

impl WriteDataFrameToFile for ParquetWriteOptions {
    fn write_df_to_file(
        &self,
        df: &mut DataFrame,
        addr: PlPathRef<'_>,
        cloud_options: Option<&CloudOptions>,
    ) -> PolarsResult<()> {
        let f = try_get_writeable(addr, cloud_options)?;
        self.to_writer(f).finish(df)?;
        Ok(())
    }
}

#[cfg(feature = "ipc")]
impl WriteDataFrameToFile for IpcWriterOptions {
    fn write_df_to_file(
        &self,
        df: &mut DataFrame,
        addr: PlPathRef<'_>,
        cloud_options: Option<&CloudOptions>,
    ) -> PolarsResult<()> {
        let f = try_get_writeable(addr, cloud_options)?;
        self.to_writer(f).finish(df)?;
        Ok(())
    }
}

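// Usage sketch (illustrative, not part of the upstream file): the impls above
// write a single DataFrame to one destination. A minimal example, assuming
// default `ParquetWriteOptions` and assuming `PlPath::new` as the path
// constructor (an assumption about the surrounding API, not a verified call):
//
//     let mut df = df!("a" => [1, 2, 3], "b" => ["x", "y", "z"])?;
//     let options = ParquetWriteOptions::default();
//     let path = PlPath::new("out/frame.parquet");
//     options.write_df_to_file(&mut df, path.as_ref(), None)?;
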
/// Write a partitioned parquet dataset. This functionality is unstable.
pub fn write_partitioned_dataset(
    df: &mut DataFrame,
    addr: PlPathRef<'_>,
    partition_by: Vec<PlSmallStr>,
    file_write_options: &(dyn WriteDataFrameToFile + Send + Sync),
    cloud_options: Option<&CloudOptions>,
    chunk_size: usize,
) -> PolarsResult<()> {
    // Ensure we have a single chunk as the gather will otherwise rechunk per group.
    df.as_single_chunk_par();

    // Note: When adding support for formats other than Parquet, avoid writing the partitioned
    // columns into the file. We write them for parquet because they are encoded efficiently with
    // RLE, and it also gives us a way to get the hive schema from the parquet file for free.
    let get_hive_path_part = {
        let schema = &df.schema();

        let partition_by_col_idx = partition_by
            .iter()
            .map(|x| {
                let Some(i) = schema.index_of(x.as_str()) else {
                    polars_bail!(col_not_found = x)
                };
                Ok(i)
            })
            .collect::<PolarsResult<Vec<_>>>()?;

        move |df: &DataFrame| {
            let cols = df.get_columns();

            partition_by_col_idx
                .iter()
                .map(|&i| {
                    let s = &cols[i].slice(0, 1).cast(&DataType::String).unwrap();

                    format!(
                        "{}={}",
                        s.name(),
                        percent_encoding::percent_encode(
                            s.str()
                                .unwrap()
                                .get(0)
                                .unwrap_or("__HIVE_DEFAULT_PARTITION__")
                                .as_bytes(),
                            URL_ENCODE_CHAR_SET
                        )
                    )
                })
                .collect::<Vec<_>>()
                .join("/")
        }
    };
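    // The closure above yields one `key=value` segment per partitioning column,
    // percent-encoded and joined with '/', e.g. `year=2024/month=1`; a null
    // partition value falls back to `__HIVE_DEFAULT_PARTITION__`.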

    let base_path = addr;
    let groups = df.group_by(partition_by)?.take_groups();

    let init_part_base_dir = |part_df: &DataFrame| {
        let path_part = get_hive_path_part(part_df);
        let dir = base_path.join(path_part);

        if let Some(dir) = dir.as_ref().as_local_path() {
            std::fs::create_dir_all(dir)?;
        }

        PolarsResult::Ok(dir)
    };

    fn get_path_for_index(i: usize) -> String {
        // Use a fixed-width file name so that it sorts properly.
        format!("{i:08x}.parquet")
    }
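    // e.g. get_path_for_index(0) == "00000000.parquet",
    //      get_path_for_index(26) == "0000001a.parquet" (zero-padded hex).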

    let get_n_files_and_rows_per_file = |part_df: &DataFrame| {
        let n_files = (part_df.estimated_size() / chunk_size).clamp(1, 0xffff_ffff);
        let rows_per_file = (df.height() / n_files).saturating_add(1);
        (n_files, rows_per_file)
    };
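    // Worked example: a partition whose estimated size is 8 * chunk_size yields
    // n_files = 8, and each file is sliced to df.height() / 8 + 1 rows (the
    // quotient uses the height of the full frame, as written above).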

    let write_part = |mut df: DataFrame, addr: PlPathRef| {
        file_write_options.write_df_to_file(&mut df, addr, cloud_options)?;
        PolarsResult::Ok(())
    };

    // This is sqrt(N) of the actual limit - we chunk the input both at the groups
    // proxy level and within every group.
    const MAX_OPEN_FILES: usize = 8;

    let finish_part_df = |df: DataFrame| {
        let dir_path = init_part_base_dir(&df)?;
        let (n_files, rows_per_file) = get_n_files_and_rows_per_file(&df);

        if n_files == 1 {
            write_part(df, dir_path.as_ref().join(get_path_for_index(0)).as_ref())
        } else {
            (0..df.height())
                .step_by(rows_per_file)
                .enumerate()
                .collect::<Vec<_>>()
                .chunks(MAX_OPEN_FILES)
                .map(|chunk| {
                    chunk
                        .into_par_iter()
                        .map(|&(idx, slice_start)| {
                            let df = df.slice(slice_start as i64, rows_per_file);
                            write_part(df, dir_path.as_ref().join(get_path_for_index(idx)).as_ref())
                        })
                        .reduce(
                            || PolarsResult::Ok(()),
                            |a, b| if a.is_err() { a } else { b },
                        )
                })
                .collect::<PolarsResult<Vec<()>>>()?;
            Ok(())
        }
    };
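    // `finish_part_df` writes one partition directory: a single file when
    // n_files == 1, otherwise fixed-size row slices written in parallel, at
    // most MAX_OPEN_FILES at a time; the `reduce` propagates an error if any
    // write in a batch failed.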

    POOL.install(|| match groups.as_ref() {
        GroupsType::Idx(idx) => idx
            .all()
            .chunks(MAX_OPEN_FILES)
            .map(|chunk| {
                chunk
                    .par_iter()
                    .map(|group| {
                        let df = unsafe {
                            df._take_unchecked_slice_sorted(group, true, IsSorted::Ascending)
                        };
                        finish_part_df(df)
                    })
                    .reduce(
                        || PolarsResult::Ok(()),
                        |a, b| if a.is_err() { a } else { b },
                    )
            })
            .collect::<PolarsResult<Vec<()>>>(),
        GroupsType::Slice { groups, .. } => groups
            .chunks(MAX_OPEN_FILES)
            .map(|chunk| {
                chunk
                    .into_par_iter()
                    .map(|&[offset, len]| {
                        let df = df.slice(offset as i64, len as usize);
                        finish_part_df(df)
                    })
                    .reduce(
                        || PolarsResult::Ok(()),
                        |a, b| if a.is_err() { a } else { b },
                    )
            })
            .collect::<PolarsResult<Vec<()>>>(),
    })?;

    Ok(())
}
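
// Usage sketch (illustrative, not part of the upstream file): writing a frame
// partitioned by two columns into a local hive-style directory tree. The path
// constructor (`PlPath::new`) and the default `ParquetWriteOptions` are
// assumptions about the surrounding API, not verified calls; the function
// signature itself matches `write_partitioned_dataset` above.
//
//     let mut df = df!(
//         "year" => [2024, 2024, 2025],
//         "month" => [1, 2, 1],
//         "value" => [1.0, 2.0, 3.0],
//     )?;
//     let root = PlPath::new("data/events");
//     write_partitioned_dataset(
//         &mut df,
//         root.as_ref(),
//         vec!["year".into(), "month".into()],
//         &ParquetWriteOptions::default(),
//         None,
//         4 * 1024 * 1024, // target roughly 4 MiB per file
//     )?;
//
// This would produce directories such as `data/events/year=2024/month=1/`,
// each containing parquet files named `00000000.parquet`, `00000001.parquet`,
// ... (one or more per partition, depending on its estimated size).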