Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-core/src/lib.rs
8409 views
1
#![cfg_attr(docsrs, feature(doc_cfg))]
2
#![cfg_attr(feature = "simd", feature(portable_simd))]
3
#![allow(ambiguous_glob_reexports)]
4
#![cfg_attr(
5
feature = "allow_unused",
6
allow(unused, dead_code, irrefutable_let_patterns)
7
)] // May be caused by some feature
8
// combinations
9
#![cfg_attr(feature = "nightly", allow(clippy::non_canonical_partial_ord_impl))] // remove once stable
10
extern crate core;
11
12
#[macro_use]
13
pub mod utils;
14
pub mod chunked_array;
15
pub mod config;
16
pub mod datatypes;
17
pub mod error;
18
pub mod fmt;
19
pub mod frame;
20
pub mod functions;
21
pub mod hashing;
22
mod named_from;
23
pub mod prelude;
24
#[cfg(feature = "random")]
25
pub mod random;
26
pub mod scalar;
27
pub mod schema;
28
#[cfg(feature = "serde")]
29
pub mod serde;
30
pub mod series;
31
pub mod testing;
32
#[cfg(test)]
33
mod tests;
34
35
use std::cell::{Cell, RefCell};
36
use std::sync::{LazyLock, Mutex};
37
38
pub use datatypes::SchemaExtPl;
39
pub use hashing::IdBuildHasher;
40
use rayon::{ThreadPool, ThreadPoolBuilder};
41
42
// A secret ID used to limit deserialization of raw pointers to those
43
// generated by this instance of Polars.
44
pub static PROCESS_ID: LazyLock<u128> = LazyLock::new(|| {
45
let mut bytes = [0u8; 16];
46
getrandom::fill(&mut bytes).unwrap();
47
u128::from_le_bytes(bytes)
48
});
49
50
/// Unit struct that serves as the entry point for scheduling work on the
/// thread pool; all behavior lives in the methods of its `impl` block.
pub struct POOL;
51
52
// Thread locals to allow disabling threading for specific threads.
53
#[cfg(any(target_os = "emscripten", not(target_family = "wasm")))]
54
thread_local! {
55
pub static ALLOW_RAYON_THREADS: Cell<bool> = const { Cell::new(true) };
56
static NOOP_POOL: RefCell<ThreadPool> = RefCell::new(
57
ThreadPoolBuilder::new()
58
.use_current_thread()
59
.num_threads(1)
60
.build()
61
.expect("could not create no-op thread pool")
62
);
63
}
64
65
impl POOL {
66
pub fn install<OP, R>(&self, op: OP) -> R
67
where
68
OP: FnOnce() -> R + Send,
69
R: Send,
70
{
71
#[cfg(not(any(target_os = "emscripten", not(target_family = "wasm"))))]
72
{
73
op()
74
}
75
76
#[cfg(any(target_os = "emscripten", not(target_family = "wasm")))]
77
{
78
self.with(|p| p.install(op))
79
}
80
}
81
82
pub fn join<A, B, RA, RB>(&self, oper_a: A, oper_b: B) -> (RA, RB)
83
where
84
A: FnOnce() -> RA + Send,
85
B: FnOnce() -> RB + Send,
86
RA: Send,
87
RB: Send,
88
{
89
self.install(|| rayon::join(oper_a, oper_b))
90
}
91
92
pub fn scope<'scope, OP, R>(&self, op: OP) -> R
93
where
94
OP: FnOnce(&rayon::Scope<'scope>) -> R + Send,
95
R: Send,
96
{
97
self.install(|| rayon::scope(op))
98
}
99
100
pub fn spawn<OP>(&self, op: OP)
101
where
102
OP: FnOnce() + Send + 'static,
103
{
104
#[cfg(not(any(target_os = "emscripten", not(target_family = "wasm"))))]
105
{
106
rayon::spawn(op)
107
}
108
109
#[cfg(any(target_os = "emscripten", not(target_family = "wasm")))]
110
{
111
self.with(|p| {
112
p.spawn(op);
113
if p.current_num_threads() == 1 {
114
p.yield_now();
115
}
116
})
117
}
118
}
119
120
pub fn spawn_fifo<OP>(&self, op: OP)
121
where
122
OP: FnOnce() + Send + 'static,
123
{
124
#[cfg(not(any(target_os = "emscripten", not(target_family = "wasm"))))]
125
{
126
rayon::spawn_fifo(op)
127
}
128
129
#[cfg(any(target_os = "emscripten", not(target_family = "wasm")))]
130
{
131
self.with(|p| {
132
p.spawn_fifo(op);
133
if p.current_num_threads() == 1 {
134
p.yield_now();
135
}
136
})
137
}
138
}
139
140
pub fn current_thread_has_pending_tasks(&self) -> Option<bool> {
141
#[cfg(not(any(target_os = "emscripten", not(target_family = "wasm"))))]
142
{
143
None
144
}
145
146
#[cfg(any(target_os = "emscripten", not(target_family = "wasm")))]
147
{
148
self.with(|p| p.current_thread_has_pending_tasks())
149
}
150
}
151
152
pub fn current_thread_index(&self) -> Option<usize> {
153
#[cfg(not(any(target_os = "emscripten", not(target_family = "wasm"))))]
154
{
155
rayon::current_thread_index()
156
}
157
158
#[cfg(any(target_os = "emscripten", not(target_family = "wasm")))]
159
{
160
self.with(|p| p.current_thread_index())
161
}
162
}
163
164
pub fn current_num_threads(&self) -> usize {
165
#[cfg(not(any(target_os = "emscripten", not(target_family = "wasm"))))]
166
{
167
rayon::current_num_threads()
168
}
169
170
#[cfg(any(target_os = "emscripten", not(target_family = "wasm")))]
171
{
172
self.with(|p| p.current_num_threads())
173
}
174
}
175
176
#[cfg(any(target_os = "emscripten", not(target_family = "wasm")))]
177
pub fn with<OP, R>(&self, op: OP) -> R
178
where
179
OP: FnOnce(&ThreadPool) -> R + Send,
180
R: Send,
181
{
182
if ALLOW_RAYON_THREADS.get() || THREAD_POOL.current_thread_index().is_some() {
183
op(&THREAD_POOL)
184
} else {
185
NOOP_POOL.with(|v| op(&v.borrow()))
186
}
187
}
188
}
189
190
// this is re-exported in utils for polars child crates
191
#[cfg(not(target_family = "wasm"))] // only use this on non wasm targets
192
pub static THREAD_POOL: LazyLock<ThreadPool> = LazyLock::new(|| {
193
let thread_name = std::env::var("POLARS_THREAD_NAME").unwrap_or_else(|_| "polars".to_string());
194
ThreadPoolBuilder::new()
195
.num_threads(
196
std::env::var("POLARS_MAX_THREADS")
197
.map(|s| s.parse::<usize>().expect("integer"))
198
.unwrap_or_else(|_| {
199
std::thread::available_parallelism()
200
.unwrap_or(std::num::NonZeroUsize::new(1).unwrap())
201
.get()
202
}),
203
)
204
.thread_name(move |i| format!("{thread_name}-{i}"))
205
.build()
206
.expect("could not spawn threads")
207
});
208
209
/// Global rayon thread pool for emscripten, built lazily on first access:
/// a single thread, configured with `use_current_thread()`.
#[cfg(all(target_os = "emscripten", target_family = "wasm"))] // Use 1 rayon thread on emscripten
pub static THREAD_POOL: LazyLock<ThreadPool> = LazyLock::new(|| {
    let builder = ThreadPoolBuilder::new()
        .num_threads(1)
        .use_current_thread();
    builder.build().expect("could not create pool")
});
217
218
/// Utility for the tests to ensure a single thread can execute.
pub static SINGLE_LOCK: LazyLock<Mutex<()>> = LazyLock::new(Mutex::default);

/// Default length for a `.head()` call
pub(crate) const HEAD_DEFAULT_LENGTH: usize = 10;

/// Default length for a `.tail()` call
pub(crate) const TAIL_DEFAULT_LENGTH: usize = 10;

// NOTE(review): the name suggests an upper bound on series length below which
// hashing is treated as cheap; confirm against the usage sites.
pub const CHEAP_SERIES_HASH_LIMIT: usize = 1000;
226
227