Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-sql/src/context.rs
8382 views
1
use std::ops::Deref;
2
use std::sync::RwLock;
3
4
use polars_core::frame::row::Row;
5
use polars_core::prelude::*;
6
use polars_lazy::prelude::*;
7
use polars_ops::frame::JoinCoalesce;
8
use polars_plan::dsl::function_expr::StructFunction;
9
use polars_plan::prelude::*;
10
use polars_utils::aliases::{PlHashSet, PlIndexSet};
11
use polars_utils::format_pl_smallstr;
12
use sqlparser::ast::{
13
BinaryOperator, CreateTable, CreateTableLikeKind, Delete, Distinct, ExcludeSelectItem,
14
Expr as SQLExpr, Fetch, FromTable, FunctionArg, GroupByExpr, Ident, JoinConstraint,
15
JoinOperator, LimitClause, NamedWindowDefinition, NamedWindowExpr, ObjectName, ObjectType,
16
OrderBy, OrderByKind, Query, RenameSelectItem, Select, SelectItem,
17
SelectItemQualifiedWildcardKind, SetExpr, SetOperator, SetQuantifier, Statement, TableAlias,
18
TableFactor, TableWithJoins, Truncate, UnaryOperator, Value as SQLValue, ValueWithSpan, Values,
19
Visit, WildcardAdditionalOptions, WindowSpec,
20
};
21
use sqlparser::dialect::GenericDialect;
22
use sqlparser::parser::{Parser, ParserOptions};
23
24
use crate::function_registry::{DefaultFunctionRegistry, FunctionRegistry};
25
use crate::sql_expr::{
26
parse_sql_array, parse_sql_expr, resolve_compound_identifier, to_sql_interface_err,
27
};
28
use crate::sql_visitors::{
29
QualifyExpression, TableIdentifierCollector, check_for_ambiguous_column_refs,
30
expr_has_window_functions, expr_refers_to_table,
31
};
32
use crate::table_functions::PolarsTableFunctions;
33
use crate::types::map_sql_dtype_to_polars;
34
35
fn clear_lf(lf: LazyFrame) -> LazyFrame {
36
let cb = PlanCallback::new(move |(_, schemas): (Vec<DslPlan>, Vec<SchemaRef>)| {
37
let schema = &schemas[0];
38
Ok(DataFrame::empty_with_schema(schema).lazy().logical_plan)
39
});
40
lf.pipe_with_schema(cb)
41
}
42
43
#[derive(Clone)]
44
pub struct TableInfo {
45
pub(crate) frame: LazyFrame,
46
pub(crate) name: PlSmallStr,
47
pub(crate) schema: Arc<Schema>,
48
}
49
50
struct SelectModifiers {
51
exclude: PlHashSet<String>, // SELECT * EXCLUDE
52
ilike: Option<regex::Regex>, // SELECT * ILIKE
53
rename: PlHashMap<PlSmallStr, PlSmallStr>, // SELECT * RENAME
54
replace: Vec<Expr>, // SELECT * REPLACE
55
}
56
impl SelectModifiers {
57
fn matches_ilike(&self, s: &str) -> bool {
58
match &self.ilike {
59
Some(rx) => rx.is_match(s),
60
None => true,
61
}
62
}
63
fn renamed_cols(&self) -> Vec<Expr> {
64
self.rename
65
.iter()
66
.map(|(before, after)| col(before.clone()).alias(after.clone()))
67
.collect()
68
}
69
}
70
71
/// For SELECT projection items; helps simplify any required disambiguation.
72
enum ProjectionItem {
73
QualifiedExprs(PlSmallStr, Vec<Expr>),
74
Exprs(Vec<Expr>),
75
}
76
77
/// Extract the output column name from an expression (if it has one).
78
fn expr_output_name(expr: &Expr) -> Option<&PlSmallStr> {
79
match expr {
80
Expr::Column(name) | Expr::Alias(_, name) => Some(name),
81
_ => None,
82
}
83
}
84
85
/// Disambiguate qualified wildcard columns that conflict with each other or other projections.
86
fn disambiguate_projection_cols(
87
items: Vec<ProjectionItem>,
88
schema: &Schema,
89
) -> PolarsResult<Vec<Expr>> {
90
// Establish qualified wildcard names (with counts), and other expression names
91
let mut qualified_wildcard_names: PlHashMap<PlSmallStr, usize> = PlHashMap::new();
92
let mut other_names: PlHashSet<PlSmallStr> = PlHashSet::new();
93
for item in &items {
94
match item {
95
ProjectionItem::QualifiedExprs(_, exprs) => {
96
for expr in exprs {
97
if let Some(name) = expr_output_name(expr) {
98
*qualified_wildcard_names.entry(name.clone()).or_insert(0) += 1;
99
}
100
}
101
},
102
ProjectionItem::Exprs(exprs) => {
103
for expr in exprs {
104
if let Some(name) = expr_output_name(expr) {
105
other_names.insert(name.clone());
106
}
107
}
108
},
109
}
110
}
111
112
// Names requiring disambiguation (duplicates across wildcards, eg: `tbl1.*`,`tbl2.*`)
113
let needs_suffix: PlHashSet<PlSmallStr> = qualified_wildcard_names
114
.into_iter()
115
.filter(|(name, count)| *count > 1 || other_names.contains(name))
116
.map(|(name, _)| name)
117
.collect();
118
119
// Output, applying suffixes where needed
120
let mut result: Vec<Expr> = Vec::new();
121
for item in items {
122
match item {
123
ProjectionItem::QualifiedExprs(tbl_name, exprs) if !needs_suffix.is_empty() => {
124
for expr in exprs {
125
if let Some(name) = expr_output_name(&expr) {
126
if needs_suffix.contains(name) {
127
let suffixed = format_pl_smallstr!("{}:{}", name, tbl_name);
128
if schema.contains(suffixed.as_str()) {
129
result.push(col(suffixed));
130
continue;
131
}
132
if other_names.contains(name) {
133
polars_bail!(
134
SQLInterface:
135
"column '{}' is duplicated in the SELECT (explicitly, and via the `*` wildcard)", name
136
);
137
}
138
}
139
}
140
result.push(expr);
141
}
142
},
143
ProjectionItem::QualifiedExprs(_, exprs) | ProjectionItem::Exprs(exprs) => {
144
result.extend(exprs);
145
},
146
}
147
}
148
Ok(result)
149
}
150
151
/// The SQLContext is the main entry point for executing SQL queries.
152
#[derive(Clone)]
153
pub struct SQLContext {
154
pub(crate) table_map: Arc<RwLock<PlHashMap<String, LazyFrame>>>,
155
pub(crate) function_registry: Arc<dyn FunctionRegistry>,
156
pub(crate) lp_arena: Arena<IR>,
157
pub(crate) expr_arena: Arena<AExpr>,
158
159
cte_map: PlHashMap<String, LazyFrame>,
160
table_aliases: PlHashMap<String, String>,
161
joined_aliases: PlHashMap<String, PlHashMap<String, String>>,
162
pub(crate) named_windows: PlHashMap<String, WindowSpec>,
163
}
164
165
impl Default for SQLContext {
166
fn default() -> Self {
167
Self {
168
function_registry: Arc::new(DefaultFunctionRegistry {}),
169
table_map: Default::default(),
170
cte_map: Default::default(),
171
table_aliases: Default::default(),
172
joined_aliases: Default::default(),
173
named_windows: Default::default(),
174
lp_arena: Default::default(),
175
expr_arena: Default::default(),
176
}
177
}
178
}
179
180
impl SQLContext {
181
/// Create a new SQLContext.
182
/// ```rust
183
/// # use polars_sql::SQLContext;
184
/// # fn main() {
185
/// let ctx = SQLContext::new();
186
/// # }
187
/// ```
188
pub fn new() -> Self {
189
Self::default()
190
}
191
192
/// Get the names of all registered tables, in sorted order.
193
pub fn get_tables(&self) -> Vec<String> {
194
let mut tables = Vec::from_iter(self.table_map.read().unwrap().keys().cloned());
195
tables.sort_unstable();
196
tables
197
}
198
199
/// Register a [`LazyFrame`] as a table in the SQLContext.
200
/// ```rust
201
/// # use polars_sql::SQLContext;
202
/// # use polars_core::prelude::*;
203
/// # use polars_lazy::prelude::*;
204
/// # fn main() {
205
///
206
/// let mut ctx = SQLContext::new();
207
/// let df = df! {
208
/// "a" => [1, 2, 3],
209
/// }.unwrap().lazy();
210
///
211
/// ctx.register("df", df);
212
/// # }
213
///```
214
pub fn register(&self, name: &str, lf: LazyFrame) {
215
self.table_map.write().unwrap().insert(name.to_owned(), lf);
216
}
217
218
/// Unregister a [`LazyFrame`] table from the [`SQLContext`].
219
pub fn unregister(&self, name: &str) {
220
self.table_map.write().unwrap().remove(&name.to_owned());
221
}
222
223
/// Execute a SQL query, returning a [`LazyFrame`].
224
/// ```rust
225
/// # use polars_sql::SQLContext;
226
/// # use polars_core::prelude::*;
227
/// # use polars_lazy::prelude::*;
228
/// # fn main() {
229
///
230
/// let mut ctx = SQLContext::new();
231
/// let df = df! {
232
/// "a" => [1, 2, 3],
233
/// }
234
/// .unwrap();
235
///
236
/// ctx.register("df", df.clone().lazy());
237
/// let sql_df = ctx.execute("SELECT * FROM df").unwrap().collect().unwrap();
238
/// assert!(sql_df.equals(&df));
239
/// # }
240
///```
241
pub fn execute(&mut self, query: &str) -> PolarsResult<LazyFrame> {
242
let mut parser = Parser::new(&GenericDialect);
243
parser = parser.with_options(ParserOptions {
244
trailing_commas: true,
245
..Default::default()
246
});
247
248
let ast = parser
249
.try_with_sql(query)
250
.map_err(to_sql_interface_err)?
251
.parse_statements()
252
.map_err(to_sql_interface_err)?;
253
254
polars_ensure!(ast.len() == 1, SQLInterface: "one (and only one) statement can be parsed at a time");
255
let res = self.execute_statement(ast.first().unwrap())?;
256
257
// Ensure the result uses the proper arenas.
258
// This will instantiate new arenas with a new version.
259
let lp_arena = std::mem::take(&mut self.lp_arena);
260
let expr_arena = std::mem::take(&mut self.expr_arena);
261
res.set_cached_arena(lp_arena, expr_arena);
262
263
// Every execution should clear the statement-level maps.
264
self.cte_map.clear();
265
self.table_aliases.clear();
266
self.joined_aliases.clear();
267
self.named_windows.clear();
268
269
Ok(res)
270
}
271
272
/// Add a function registry to the SQLContext.
273
/// The registry provides the ability to add custom functions to the SQLContext.
274
pub fn with_function_registry(mut self, function_registry: Arc<dyn FunctionRegistry>) -> Self {
275
self.function_registry = function_registry;
276
self
277
}
278
279
/// Get the function registry of the SQLContext
280
pub fn registry(&self) -> &Arc<dyn FunctionRegistry> {
281
&self.function_registry
282
}
283
284
/// Get a mutable reference to the function registry of the SQLContext
285
pub fn registry_mut(&mut self) -> &mut dyn FunctionRegistry {
286
Arc::get_mut(&mut self.function_registry).unwrap()
287
}
288
}
289
290
impl SQLContext {
291
fn isolated(&self) -> Self {
292
Self {
293
// Deep clone to isolate
294
table_map: Arc::new(RwLock::new(self.table_map.read().unwrap().clone())),
295
named_windows: self.named_windows.clone(),
296
cte_map: self.cte_map.clone(),
297
298
..Default::default()
299
}
300
}
301
302
pub(crate) fn execute_statement(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
303
let ast = stmt;
304
Ok(match ast {
305
Statement::Query(query) => self.execute_query(query)?,
306
stmt @ Statement::ShowTables { .. } => self.execute_show_tables(stmt)?,
307
stmt @ Statement::CreateTable { .. } => self.execute_create_table(stmt)?,
308
stmt @ Statement::Drop {
309
object_type: ObjectType::Table,
310
..
311
} => self.execute_drop_table(stmt)?,
312
stmt @ Statement::Explain { .. } => self.execute_explain(stmt)?,
313
stmt @ Statement::Truncate { .. } => self.execute_truncate_table(stmt)?,
314
stmt @ Statement::Delete { .. } => self.execute_delete_from_table(stmt)?,
315
_ => polars_bail!(
316
SQLInterface: "statement type is not supported:\n{:?}", ast,
317
),
318
})
319
}
320
321
/// Register the query's CTEs (if any), then execute the query itself.
pub(crate) fn execute_query(&mut self, query: &Query) -> PolarsResult<LazyFrame> {
    self.register_ctes(query)
        .and_then(|()| self.execute_query_no_ctes(query))
}
325
326
/// Validate and execute `query` without (re-)registering its CTEs, then apply
/// its LIMIT/OFFSET/FETCH clauses to the result.
pub(crate) fn execute_query_no_ctes(&mut self, query: &Query) -> PolarsResult<LazyFrame> {
    self.validate_query(query)?;

    let body = self.process_query(&query.body, query)?;
    self.process_limit_offset(body, &query.limit_clause, &query.fetch)
}
332
333
pub(crate) fn get_frame_schema(&mut self, frame: &mut LazyFrame) -> PolarsResult<SchemaRef> {
334
frame.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
335
}
336
337
/// Resolve `name` to a frame in the current scope, or `None` if nothing matches.
///
/// Resolution is a multi-stage fallback:
/// * table name → cte name
/// * table alias → cte alias
pub(super) fn get_table_from_current_scope(&self, name: &str) -> Option<LazyFrame> {
    // Look `key` up among the registered tables first, then among the CTEs.
    // The read guard is released at the end of the `let` statement; it is never
    // held while the lock is acquired again (recursive `RwLock` reads may
    // deadlock or panic).
    let lookup = |key: &str| -> Option<LazyFrame> {
        let registered = self.table_map.read().unwrap().get(key).cloned();
        registered.or_else(|| self.cte_map.get(key).cloned())
    };

    lookup(name).or_else(|| {
        self.table_aliases
            .get(name)
            .and_then(|alias| lookup(alias.as_str()))
    })
}
358
359
/// Execute a query in an isolated context. This prevents subqueries from mutating
360
/// arenas and other context state. Returns both the LazyFrame *and* its associated
361
/// Schema (so that the correct arenas are used when determining schema).
362
pub(crate) fn execute_isolated<F>(&mut self, query: F) -> PolarsResult<LazyFrame>
363
where
364
F: FnOnce(&mut Self) -> PolarsResult<LazyFrame>,
365
{
366
let mut ctx = self.isolated();
367
368
// Execute query with clean state (eg: nested/subquery)
369
let lf = query(&mut ctx)?;
370
371
// Save state
372
lf.set_cached_arena(ctx.lp_arena, ctx.expr_arena);
373
374
Ok(lf)
375
}
376
377
fn expr_or_ordinal(
378
&mut self,
379
e: &SQLExpr,
380
exprs: &[Expr],
381
selected: Option<&[Expr]>,
382
schema: Option<&Schema>,
383
clause: &str,
384
) -> PolarsResult<Expr> {
385
match e {
386
SQLExpr::UnaryOp {
387
op: UnaryOperator::Minus,
388
expr,
389
} if matches!(
390
**expr,
391
SQLExpr::Value(ValueWithSpan {
392
value: SQLValue::Number(_, _),
393
..
394
})
395
) =>
396
{
397
if let SQLExpr::Value(ValueWithSpan {
398
value: SQLValue::Number(ref idx, _),
399
..
400
}) = **expr
401
{
402
Err(polars_err!(
403
SQLSyntax:
404
"negative ordinal values are invalid for {}; found -{}",
405
clause,
406
idx
407
))
408
} else {
409
unreachable!()
410
}
411
},
412
SQLExpr::Value(ValueWithSpan {
413
value: SQLValue::Number(idx, _),
414
..
415
}) => {
416
// note: sql queries are 1-indexed
417
let idx = idx.parse::<usize>().map_err(|_| {
418
polars_err!(
419
SQLSyntax:
420
"negative ordinal values are invalid for {}; found {}",
421
clause,
422
idx
423
)
424
})?;
425
// note: "selected" cols represent final projection order, so we use those for
426
// ordinal resolution. "exprs" may include cols that are subsequently dropped.
427
let cols = if let Some(cols) = selected {
428
cols
429
} else {
430
exprs
431
};
432
Ok(cols
433
.get(idx - 1)
434
.ok_or_else(|| {
435
polars_err!(
436
SQLInterface:
437
"{} ordinal value must refer to a valid column; found {}",
438
clause,
439
idx
440
)
441
})?
442
.clone())
443
},
444
SQLExpr::Value(v) => Err(polars_err!(
445
SQLSyntax:
446
"{} requires a valid expression or positive ordinal; found {}", clause, v,
447
)),
448
_ => {
449
// Handle qualified cross-aliasing in ORDER BY clauses
450
// (eg: `SELECT a AS b, -b AS a ... ORDER BY self.a`)
451
let mut expr = parse_sql_expr(e, self, schema)?;
452
if matches!(e, SQLExpr::CompoundIdentifier(_)) {
453
if let Some(schema) = schema {
454
expr = expr.map_expr(|ex| match &ex {
455
Expr::Column(name) => {
456
let prefixed = format!("__POLARS_ORIG_{}", name.as_str());
457
if schema.contains(prefixed.as_str()) {
458
col(prefixed)
459
} else {
460
ex
461
}
462
},
463
_ => ex,
464
});
465
}
466
}
467
Ok(expr)
468
},
469
}
470
}
471
472
pub(super) fn resolve_name(&self, tbl_name: &str, column_name: &str) -> String {
473
if let Some(aliases) = self.joined_aliases.get(tbl_name) {
474
if let Some(name) = aliases.get(column_name) {
475
return name.to_string();
476
}
477
}
478
column_name.to_string()
479
}
480
481
fn process_query(&mut self, expr: &SetExpr, query: &Query) -> PolarsResult<LazyFrame> {
482
match expr {
483
SetExpr::Select(select_stmt) => self.execute_select(select_stmt, query),
484
SetExpr::Query(nested_query) => {
485
let lf = self.execute_query_no_ctes(nested_query)?;
486
self.process_order_by(lf, &query.order_by, None)
487
},
488
SetExpr::SetOperation {
489
op: SetOperator::Union,
490
set_quantifier,
491
left,
492
right,
493
} => self.process_union(left, right, set_quantifier, query),
494
495
#[cfg(feature = "semi_anti_join")]
496
SetExpr::SetOperation {
497
op: SetOperator::Intersect | SetOperator::Except,
498
set_quantifier,
499
left,
500
right,
501
} => self.process_except_intersect(left, right, *set_quantifier, query),
502
503
SetExpr::Values(Values {
504
explicit_row: _,
505
rows,
506
value_keyword: _,
507
}) => self.process_values(rows),
508
509
SetExpr::Table(tbl) => {
510
if let Some(table_name) = tbl.table_name.as_ref() {
511
self.get_table_from_current_scope(table_name)
512
.ok_or_else(|| {
513
polars_err!(
514
SQLInterface: "no table or alias named '{}' found",
515
tbl
516
)
517
})
518
} else {
519
polars_bail!(SQLInterface: "'TABLE' requires valid table name")
520
}
521
},
522
op => {
523
polars_bail!(SQLInterface: "'{}' operation is currently unsupported", op)
524
},
525
}
526
}
527
528
/// Execute an `EXCEPT` (anti-join) or `INTERSECT` (semi-join) set operation
/// between `left` and `right`, de-duplicate the result, and apply the
/// query's ORDER BY.
///
/// With `BY NAME` the join keys are the left-hand column names; otherwise
/// columns are matched positionally, which requires both sides to have the
/// same number of columns.
#[cfg(feature = "semi_anti_join")]
fn process_except_intersect(
    &mut self,
    left: &SetExpr,
    right: &SetExpr,
    quantifier: SetQuantifier,
    query: &Query,
) -> PolarsResult<LazyFrame> {
    // NOTE(review): the operator is read from `query.body`, not from the set
    // operation that owns `left`/`right`; for nested set operations these may
    // differ — TODO confirm.
    let (join_type, op_name) = match *query.body {
        SetExpr::SetOperation {
            op: SetOperator::Except,
            ..
        } => (JoinType::Anti, "EXCEPT"),
        _ => (JoinType::Semi, "INTERSECT"),
    };

    // Note: each side of the EXCEPT/INTERSECT operation should execute
    // in isolation to prevent context state leakage between them
    let lf = self.execute_isolated(|ctx| ctx.process_query(left, query))?;
    let rf = self.execute_isolated(|ctx| ctx.process_query(right, query))?;

    let cb = PlanCallback::new(
        move |(mut plans, schemas): (Vec<DslPlan>, Vec<SchemaRef>)| {
            // the last plan is the right-hand side; `schemas` is indexed [left, right]
            let rf = LazyFrame::from_logical_plan(plans.pop().unwrap(), Default::default());
            let lf = LazyFrame::from_logical_plan(plans.pop().unwrap(), Default::default());

            let lf_cols: Vec<_> = schemas[0].iter_names_cloned().map(col).collect();
            let join = lf
                .join_builder()
                .with(rf)
                .how(join_type.clone())
                .join_nulls(true);

            let joined_tbl = match quantifier {
                SetQuantifier::ByName => join.on(lf_cols).finish(),
                SetQuantifier::Distinct | SetQuantifier::None => {
                    polars_ensure!(schemas[0].len() == schemas[1].len(), SQLInterface: "{} requires equal number of columns in each table (use '{} BY NAME' to combine mismatched tables)", op_name, op_name);
                    // right-hand keys are only needed for positional matching
                    let rf_cols: Vec<_> = schemas[1].iter_names_cloned().map(col).collect();
                    join.left_on(lf_cols).right_on(rf_cols).finish()
                },
                _ => {
                    polars_bail!(SQLInterface: "'{} {}' is not supported", op_name, quantifier.to_string())
                },
            };
            Ok(joined_tbl
                .unique(None, UniqueKeepStrategy::Any)
                .logical_plan)
        },
    );
    let lf = lf.pipe_with_schemas(vec![rf], cb);
    self.process_order_by(lf, &query.order_by, None)
}
579
580
fn process_union(
581
&mut self,
582
left: &SetExpr,
583
right: &SetExpr,
584
quantifier: &SetQuantifier,
585
query: &Query,
586
) -> PolarsResult<LazyFrame> {
587
let quantifier = *quantifier;
588
589
// Note: each side of the UNION operation should execute
590
// in isolation to prevent context state leakage between them
591
let lf = self.execute_isolated(|ctx| ctx.process_query(left, query))?;
592
let rf = self.execute_isolated(|ctx| ctx.process_query(right, query))?;
593
594
let cb = PlanCallback::new(
595
move |(mut plans, schemas): (Vec<DslPlan>, Vec<SchemaRef>)| {
596
let mut rf = LazyFrame::from(plans.pop().unwrap());
597
let lf = LazyFrame::from(plans.pop().unwrap());
598
599
let opts = UnionArgs {
600
parallel: true,
601
to_supertypes: true,
602
maintain_order: false,
603
..Default::default()
604
};
605
let out = match quantifier {
606
// UNION [ALL | DISTINCT]
607
SetQuantifier::All | SetQuantifier::Distinct | SetQuantifier::None => {
608
let lf_schema = &schemas[0];
609
let rf_schema = &schemas[1];
610
if lf_schema.len() != rf_schema.len() {
611
polars_bail!(SQLInterface: "UNION requires equal number of columns in each table (use 'UNION BY NAME' to combine mismatched tables)")
612
}
613
// rename `rf` columns to match `lf` if they differ; SQL behaves
614
// positionally on UNION ops (unless using the "BY NAME" qualifier)
615
if lf_schema.iter_names().ne(rf_schema.iter_names()) {
616
rf = rf.rename(rf_schema.iter_names(), lf_schema.iter_names(), true);
617
}
618
let concatenated = concat(vec![lf, rf], opts);
619
match quantifier {
620
SetQuantifier::Distinct | SetQuantifier::None => {
621
concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
622
},
623
_ => concatenated,
624
}
625
},
626
// UNION ALL BY NAME
627
#[cfg(feature = "diagonal_concat")]
628
SetQuantifier::AllByName => concat_lf_diagonal(vec![lf, rf], opts),
629
// UNION [DISTINCT] BY NAME
630
#[cfg(feature = "diagonal_concat")]
631
SetQuantifier::ByName | SetQuantifier::DistinctByName => {
632
let concatenated = concat_lf_diagonal(vec![lf, rf], opts);
633
concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
634
},
635
#[allow(unreachable_patterns)]
636
_ => {
637
polars_bail!(SQLInterface: "'UNION {}' is not currently supported", quantifier)
638
},
639
};
640
641
out.map(|lf| lf.logical_plan)
642
},
643
);
644
645
let lf = lf.pipe_with_schemas(vec![rf], cb);
646
self.process_order_by(lf, &query.order_by, None)
647
}
648
649
/// Process UNNEST as a lateral operation when it contains column references
650
/// (handles `CROSS JOIN UNNEST(col) AS name` by exploding the referenced col).
651
fn process_unnest_lateral(
652
&self,
653
lf: LazyFrame,
654
alias: &Option<TableAlias>,
655
array_exprs: &[SQLExpr],
656
with_offset: bool,
657
) -> PolarsResult<LazyFrame> {
658
let alias = alias
659
.as_ref()
660
.ok_or_else(|| polars_err!(SQLSyntax: "UNNEST table must have an alias"))?;
661
polars_ensure!(!with_offset, SQLInterface: "UNNEST tables do not (yet) support WITH ORDINALITY|OFFSET");
662
663
let (mut explode_cols, mut rename_from, mut rename_to) = (
664
Vec::with_capacity(array_exprs.len()),
665
Vec::with_capacity(array_exprs.len()),
666
Vec::with_capacity(array_exprs.len()),
667
);
668
let is_single_col = array_exprs.len() == 1;
669
670
for (i, arr_expr) in array_exprs.iter().enumerate() {
671
let col_name = match arr_expr {
672
SQLExpr::Identifier(ident) => PlSmallStr::from_str(&ident.value),
673
SQLExpr::CompoundIdentifier(parts) => {
674
PlSmallStr::from_str(&parts.last().unwrap().value)
675
},
676
SQLExpr::Array(_) => polars_bail!(
677
SQLInterface: "CROSS JOIN UNNEST with both literal arrays and column references is not supported"
678
),
679
other => polars_bail!(
680
SQLSyntax: "UNNEST expects column references or array literals, found {:?}", other
681
),
682
};
683
// alias: column name from "AS t(col)", or table alias
684
if let Some(name) = alias
685
.columns
686
.get(i)
687
.map(|c| c.name.value.as_str())
688
.or_else(|| is_single_col.then_some(alias.name.value.as_str()))
689
.filter(|name| !name.is_empty() && *name != col_name.as_str())
690
{
691
rename_from.push(col_name.clone());
692
rename_to.push(PlSmallStr::from_str(name));
693
}
694
explode_cols.push(col_name);
695
}
696
697
let mut lf = lf.explode(
698
Selector::ByName {
699
names: Arc::from(explode_cols),
700
strict: true,
701
},
702
ExplodeOptions {
703
empty_as_null: true,
704
keep_nulls: true,
705
},
706
);
707
if !rename_from.is_empty() {
708
lf = lf.rename(rename_from, rename_to, true);
709
}
710
Ok(lf)
711
}
712
713
fn process_values(&mut self, values: &[Vec<SQLExpr>]) -> PolarsResult<LazyFrame> {
714
let frame_rows: Vec<Row> = values.iter().map(|row| {
715
let row_data: Result<Vec<_>, _> = row.iter().map(|expr| {
716
let expr = parse_sql_expr(expr, self, None)?;
717
match expr {
718
Expr::Literal(value) => {
719
value.to_any_value()
720
.ok_or_else(|| polars_err!(SQLInterface: "invalid literal value: {:?}", value))
721
.map(|av| av.into_static())
722
},
723
_ => polars_bail!(SQLInterface: "VALUES clause expects literals; found {}", expr),
724
}
725
}).collect();
726
row_data.map(Row::new)
727
}).collect::<Result<_, _>>()?;
728
729
Ok(DataFrame::from_rows(frame_rows.as_ref())?.lazy())
730
}
731
732
// EXPLAIN SELECT * FROM DF
733
fn execute_explain(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
734
match stmt {
735
Statement::Explain { statement, .. } => {
736
let lf = self.execute_statement(statement)?;
737
let plan = lf.describe_optimized_plan()?;
738
let plan = plan
739
.split('\n')
740
.collect::<Series>()
741
.with_name(PlSmallStr::from_static("Logical Plan"))
742
.into_column();
743
let df = DataFrame::new_infer_height(vec![plan])?;
744
Ok(df.lazy())
745
},
746
_ => polars_bail!(SQLInterface: "unexpected statement type; expected EXPLAIN"),
747
}
748
}
749
750
// SHOW TABLES
751
fn execute_show_tables(&mut self, _: &Statement) -> PolarsResult<LazyFrame> {
752
let tables = Column::new("name".into(), self.get_tables());
753
let df = DataFrame::new_infer_height(vec![tables])?;
754
Ok(df.lazy())
755
}
756
757
// DROP TABLE <tbl>
758
fn execute_drop_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
759
match stmt {
760
Statement::Drop { names, .. } => {
761
names.iter().for_each(|name| {
762
self.table_map.write().unwrap().remove(&name.to_string());
763
});
764
Ok(DataFrame::empty().lazy())
765
},
766
_ => polars_bail!(SQLInterface: "unexpected statement type; expected DROP"),
767
}
768
}
769
770
// DELETE FROM <tbl> [WHERE ...]
771
fn execute_delete_from_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
772
if let Statement::Delete(Delete {
773
tables,
774
from,
775
using,
776
selection,
777
returning,
778
order_by,
779
limit,
780
delete_token: _,
781
}) = stmt
782
{
783
if !tables.is_empty()
784
|| using.is_some()
785
|| returning.is_some()
786
|| limit.is_some()
787
|| !order_by.is_empty()
788
{
789
let error_message = match () {
790
_ if !tables.is_empty() => "DELETE expects exactly one table name",
791
_ if using.is_some() => "DELETE does not support the USING clause",
792
_ if returning.is_some() => "DELETE does not support the RETURNING clause",
793
_ if limit.is_some() => "DELETE does not support the LIMIT clause",
794
_ if !order_by.is_empty() => "DELETE does not support the ORDER BY clause",
795
_ => unreachable!(),
796
};
797
polars_bail!(SQLInterface: error_message);
798
}
799
let from_tables = match &from {
800
FromTable::WithFromKeyword(from) => from,
801
FromTable::WithoutKeyword(from) => from,
802
};
803
if from_tables.len() > 1 {
804
polars_bail!(SQLInterface: "cannot have multiple tables in DELETE FROM (found {})", from_tables.len())
805
}
806
let tbl_expr = from_tables.first().unwrap();
807
if !tbl_expr.joins.is_empty() {
808
polars_bail!(SQLInterface: "DELETE does not support table JOINs")
809
}
810
let (_, lf) = self.get_table(&tbl_expr.relation)?;
811
if selection.is_none() {
812
// no WHERE clause; equivalent to TRUNCATE (drop all rows)
813
Ok(clear_lf(lf))
814
} else {
815
// apply constraint as inverted filter (drops rows matching the selection)
816
Ok(self.process_where(lf.clone(), selection, true, None)?)
817
}
818
} else {
819
polars_bail!(SQLInterface: "unexpected statement type; expected DELETE")
820
}
821
}
822
823
// TRUNCATE <tbl>
824
fn execute_truncate_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
825
if let Statement::Truncate(Truncate {
826
table_names,
827
partitions,
828
..
829
}) = stmt
830
{
831
match partitions {
832
None => {
833
if table_names.len() != 1 {
834
polars_bail!(SQLInterface: "TRUNCATE expects exactly one table name; found {}", table_names.len())
835
}
836
let tbl = table_names[0].name.to_string();
837
if let Some(lf) = self.table_map.write().unwrap().get_mut(&tbl) {
838
*lf = clear_lf(lf.clone());
839
Ok(lf.clone())
840
} else {
841
polars_bail!(SQLInterface: "table '{}' does not exist", tbl);
842
}
843
},
844
_ => {
845
polars_bail!(SQLInterface: "TRUNCATE does not support use of 'partitions'")
846
},
847
}
848
} else {
849
polars_bail!(SQLInterface: "unexpected statement type; expected TRUNCATE")
850
}
851
}
852
853
fn register_cte(&mut self, name: &str, lf: LazyFrame) {
854
self.cte_map.insert(name.to_owned(), lf);
855
}
856
857
fn register_ctes(&mut self, query: &Query) -> PolarsResult<()> {
858
if let Some(with) = &query.with {
859
if with.recursive {
860
polars_bail!(SQLInterface: "recursive CTEs are not supported")
861
}
862
for cte in &with.cte_tables {
863
let cte_name = cte.alias.name.value.clone();
864
let mut lf = self.execute_query(&cte.query)?;
865
lf = self.rename_columns_from_table_alias(lf, &cte.alias)?;
866
self.register_cte(&cte_name, lf);
867
}
868
}
869
Ok(())
870
}
871
872
fn register_named_windows(
873
&mut self,
874
named_windows: &[NamedWindowDefinition],
875
) -> PolarsResult<()> {
876
for NamedWindowDefinition(name, expr) in named_windows {
877
let spec = match expr {
878
NamedWindowExpr::NamedWindow(ref_name) => self
879
.named_windows
880
.get(&ref_name.value)
881
.ok_or_else(|| {
882
polars_err!(
883
SQLInterface:
884
"named window '{}' references undefined window '{}'",
885
name.value, ref_name.value
886
)
887
})?
888
.clone(),
889
NamedWindowExpr::WindowSpec(spec) => spec.clone(),
890
};
891
self.named_windows.insert(name.value.clone(), spec);
892
}
893
Ok(())
894
}
895
896
/// execute the 'FROM' part of the query
897
fn execute_from_statement(&mut self, tbl_expr: &TableWithJoins) -> PolarsResult<LazyFrame> {
898
let (l_name, mut lf) = self.get_table(&tbl_expr.relation)?;
899
if !tbl_expr.joins.is_empty() {
900
for join in &tbl_expr.joins {
901
// Handle "CROSS JOIN UNNEST(col)" as a lateral join op
902
if let (
903
JoinOperator::CrossJoin(JoinConstraint::None),
904
TableFactor::UNNEST {
905
alias,
906
array_exprs,
907
with_offset,
908
..
909
},
910
) = (&join.join_operator, &join.relation)
911
{
912
if array_exprs.iter().any(|e| !matches!(e, SQLExpr::Array(_))) {
913
lf = self.process_unnest_lateral(lf, alias, array_exprs, *with_offset)?;
914
continue;
915
}
916
}
917
918
let (r_name, mut rf) = self.get_table(&join.relation)?;
919
if r_name.is_empty() {
920
// Require non-empty to avoid duplicate column errors from nested self-joins.
921
polars_bail!(
922
SQLInterface:
923
"cannot JOIN on unnamed relation; please provide an alias"
924
)
925
}
926
let left_schema = self.get_frame_schema(&mut lf)?;
927
let right_schema = self.get_frame_schema(&mut rf)?;
928
929
lf = match &join.join_operator {
930
op @ (JoinOperator::Join(constraint) // note: bare "join" is inner
931
| JoinOperator::FullOuter(constraint)
932
| JoinOperator::Left(constraint)
933
| JoinOperator::LeftOuter(constraint)
934
| JoinOperator::Right(constraint)
935
| JoinOperator::RightOuter(constraint)
936
| JoinOperator::Inner(constraint)
937
| JoinOperator::Anti(constraint)
938
| JoinOperator::Semi(constraint)
939
| JoinOperator::LeftAnti(constraint)
940
| JoinOperator::LeftSemi(constraint)
941
| JoinOperator::RightAnti(constraint)
942
| JoinOperator::RightSemi(constraint)) => {
943
let (lf, rf) = match op {
944
JoinOperator::RightAnti(_) | JoinOperator::RightSemi(_) => (rf, lf),
945
_ => (lf, rf),
946
};
947
self.process_join(
948
&TableInfo {
949
frame: lf,
950
name: (&l_name).into(),
951
schema: left_schema.clone(),
952
},
953
&TableInfo {
954
frame: rf,
955
name: (&r_name).into(),
956
schema: right_schema.clone(),
957
},
958
constraint,
959
match op {
960
JoinOperator::Join(_) | JoinOperator::Inner(_) => JoinType::Inner,
961
JoinOperator::Left(_) | JoinOperator::LeftOuter(_) => {
962
JoinType::Left
963
},
964
JoinOperator::Right(_) | JoinOperator::RightOuter(_) => {
965
JoinType::Right
966
},
967
JoinOperator::FullOuter(_) => JoinType::Full,
968
#[cfg(feature = "semi_anti_join")]
969
JoinOperator::Anti(_)
970
| JoinOperator::LeftAnti(_)
971
| JoinOperator::RightAnti(_) => JoinType::Anti,
972
#[cfg(feature = "semi_anti_join")]
973
JoinOperator::Semi(_)
974
| JoinOperator::LeftSemi(_)
975
| JoinOperator::RightSemi(_) => JoinType::Semi,
976
join_type => polars_bail!(
977
SQLInterface:
978
"join type '{:?}' not currently supported",
979
join_type
980
),
981
},
982
)?
983
},
984
JoinOperator::CrossJoin(JoinConstraint::None) => {
985
lf.cross_join(rf, Some(format_pl_smallstr!(":{}", r_name)))
986
},
987
JoinOperator::CrossJoin(constraint) => {
988
polars_bail!(
989
SQLInterface:
990
"CROSS JOIN does not support {:?} constraint; consider INNER JOIN instead",
991
constraint
992
)
993
},
994
join_type => {
995
polars_bail!(SQLInterface: "join type '{:?}' not currently supported", join_type)
996
},
997
};
998
999
// track join-aliased columns so we can resolve/check them later
1000
let joined_schema = self.get_frame_schema(&mut lf)?;
1001
1002
self.joined_aliases.insert(
1003
r_name.clone(),
1004
right_schema
1005
.iter_names()
1006
.filter_map(|name| {
1007
// col exists in both tables and is aliased in the joined result
1008
let aliased_name = format!("{name}:{r_name}");
1009
if left_schema.contains(name)
1010
&& joined_schema.contains(aliased_name.as_str())
1011
{
1012
Some((name.to_string(), aliased_name))
1013
} else {
1014
None
1015
}
1016
})
1017
.collect::<PlHashMap<String, String>>(),
1018
);
1019
}
1020
};
1021
Ok(lf)
1022
}
1023
1024
/// Check that the SELECT statement only contains supported clauses.
1025
fn validate_select(&self, select_stmt: &Select) -> PolarsResult<()> {
1026
// Destructure "Select" exhaustively; that way if/when new fields are added in
1027
// future sqlparser versions, we'll get a compilation error and can handle them
1028
let Select {
1029
// Supported clauses
1030
distinct: _,
1031
from: _,
1032
group_by: _,
1033
having: _,
1034
named_window: _,
1035
projection: _,
1036
qualify: _,
1037
selection: _,
1038
1039
// Metadata/token fields (can ignore)
1040
flavor: _,
1041
select_token: _,
1042
top_before_distinct: _,
1043
window_before_qualify: _,
1044
1045
// Unsupported clauses
1046
ref cluster_by,
1047
ref connect_by,
1048
ref distribute_by,
1049
ref exclude,
1050
ref into,
1051
ref lateral_views,
1052
ref prewhere,
1053
ref sort_by,
1054
ref top,
1055
ref value_table_mode,
1056
} = *select_stmt;
1057
1058
// Raise specific error messages for unsupported attributes
1059
polars_ensure!(cluster_by.is_empty(), SQLInterface: "`CLUSTER BY` clause is not supported");
1060
polars_ensure!(connect_by.is_none(), SQLInterface: "`CONNECT BY` clause is not supported");
1061
polars_ensure!(distribute_by.is_empty(), SQLInterface: "`DISTRIBUTE BY` clause is not supported");
1062
polars_ensure!(exclude.is_none(), SQLInterface: "`EXCLUDE` clause is not supported");
1063
polars_ensure!(into.is_none(), SQLInterface: "`SELECT INTO` clause is not supported");
1064
polars_ensure!(lateral_views.is_empty(), SQLInterface: "`LATERAL VIEW` clause is not supported");
1065
polars_ensure!(prewhere.is_none(), SQLInterface: "`PREWHERE` clause is not supported");
1066
polars_ensure!(sort_by.is_empty(), SQLInterface: "`SORT BY` clause is not supported; use `ORDER BY` instead");
1067
polars_ensure!(top.is_none(), SQLInterface: "`TOP` clause is not supported; use `LIMIT` instead");
1068
polars_ensure!(value_table_mode.is_none(), SQLInterface: "`SELECT AS VALUE/STRUCT` is not supported");
1069
1070
Ok(())
1071
}
1072
1073
/// Check that the QUERY only contains supported clauses.
1074
fn validate_query(&self, query: &Query) -> PolarsResult<()> {
1075
// As with "Select" validation (above) destructure "Query" exhaustively
1076
let Query {
1077
// Supported clauses
1078
with: _,
1079
body: _,
1080
order_by: _,
1081
limit_clause: _,
1082
fetch,
1083
1084
// Unsupported clauses
1085
for_clause,
1086
format_clause,
1087
locks,
1088
pipe_operators,
1089
settings,
1090
} = query;
1091
1092
// Raise specific error messages for unsupported attributes
1093
polars_ensure!(for_clause.is_none(), SQLInterface: "`FOR` clause is not supported");
1094
polars_ensure!(format_clause.is_none(), SQLInterface: "`FORMAT` clause is not supported");
1095
polars_ensure!(locks.is_empty(), SQLInterface: "`FOR UPDATE/SHARE` locking clause is not supported");
1096
polars_ensure!(pipe_operators.is_empty(), SQLInterface: "pipe operators are not supported");
1097
polars_ensure!(settings.is_none(), SQLInterface: "`SETTINGS` clause is not supported");
1098
1099
// Validate FETCH clause options (if present)
1100
if let Some(Fetch {
1101
quantity: _, // supported
1102
percent,
1103
with_ties,
1104
}) = fetch
1105
{
1106
polars_ensure!(!percent, SQLInterface: "`FETCH` with `PERCENT` is not supported");
1107
polars_ensure!(!with_ties, SQLInterface: "`FETCH` with `WITH TIES` is not supported");
1108
}
1109
Ok(())
1110
}
1111
1112
/// Execute the 'SELECT' part of the query.
1113
fn execute_select(&mut self, select_stmt: &Select, query: &Query) -> PolarsResult<LazyFrame> {
1114
// Check that the statement doesn't contain unsupported SELECT clauses
1115
self.validate_select(select_stmt)?;
1116
1117
// Parse named windows first, as they may be referenced in the SELECT clause
1118
self.register_named_windows(&select_stmt.named_window)?;
1119
1120
// Get `FROM` table/data
1121
let (mut lf, base_table_name) = if select_stmt.from.is_empty() {
1122
(DataFrame::empty().lazy(), None)
1123
} else {
1124
// Note: implicit joins need more work to support properly,
1125
// explicit joins are preferred for now (ref: #16662)
1126
let from = select_stmt.clone().from;
1127
if from.len() > 1 {
1128
polars_bail!(SQLInterface: "multiple tables in FROM clause are not currently supported (found {}); use explicit JOIN syntax instead", from.len())
1129
}
1130
let tbl_expr = from.first().unwrap();
1131
let lf = self.execute_from_statement(tbl_expr)?;
1132
let base_name = get_table_name(&tbl_expr.relation);
1133
(lf, base_name)
1134
};
1135
1136
// Check for ambiguous column references in SELECT and WHERE (if there were joins)
1137
if let Some(ref base_name) = base_table_name {
1138
if !self.joined_aliases.is_empty() {
1139
// Extract USING columns from joins (these are coalesced and not ambiguous)
1140
let using_cols: PlHashSet<String> = select_stmt
1141
.from
1142
.first()
1143
.into_iter()
1144
.flat_map(|t| t.joins.iter())
1145
.filter_map(|join| get_using_cols(&join.join_operator))
1146
.flatten()
1147
.collect();
1148
1149
// Check SELECT and WHERE expressions for ambiguous column references
1150
let check_expr = |e| {
1151
check_for_ambiguous_column_refs(e, &self.joined_aliases, base_name, &using_cols)
1152
};
1153
for item in &select_stmt.projection {
1154
match item {
1155
SelectItem::UnnamedExpr(e) | SelectItem::ExprWithAlias { expr: e, .. } => {
1156
check_expr(e)?
1157
},
1158
_ => {},
1159
}
1160
}
1161
if let Some(ref where_expr) = select_stmt.selection {
1162
check_expr(where_expr)?;
1163
}
1164
}
1165
}
1166
1167
// Apply `WHERE` constraint
1168
let mut schema = self.get_frame_schema(&mut lf)?;
1169
lf = self.process_where(lf, &select_stmt.selection, false, Some(schema.clone()))?;
1170
1171
// Determine projections
1172
let mut select_modifiers = SelectModifiers {
1173
ilike: None,
1174
exclude: PlHashSet::new(),
1175
rename: PlHashMap::new(),
1176
replace: vec![],
1177
};
1178
1179
// Collect window function cols if QUALIFY is present (we check at the
1180
// SQL level because empty OVER() clauses don't create Expr::Over)
1181
let window_fn_columns = if select_stmt.qualify.is_some() {
1182
select_stmt
1183
.projection
1184
.iter()
1185
.filter_map(|item| match item {
1186
SelectItem::ExprWithAlias { expr, alias }
1187
if expr_has_window_functions(expr) =>
1188
{
1189
Some(alias.value.clone())
1190
},
1191
_ => None,
1192
})
1193
.collect::<PlHashSet<_>>()
1194
} else {
1195
PlHashSet::new()
1196
};
1197
1198
let mut projections =
1199
self.column_projections(select_stmt, &schema, &mut select_modifiers)?;
1200
1201
// Apply `UNNEST` expressions
1202
let mut explode_names = Vec::new();
1203
let mut explode_exprs = Vec::new();
1204
let mut explode_lookup = PlHashMap::new();
1205
1206
for expr in &projections {
1207
for e in expr {
1208
if let Expr::Explode { input, .. } = e {
1209
match input.as_ref() {
1210
Expr::Column(name) => explode_names.push(name.clone()),
1211
other_expr => {
1212
// Note: skip aggregate expressions; those are handled in the GROUP BY phase
1213
if !has_expr(other_expr, |e| matches!(e, Expr::Agg(_) | Expr::Len)) {
1214
let temp_name =
1215
format_pl_smallstr!("__POLARS_UNNEST_{}", explode_exprs.len());
1216
explode_exprs.push(other_expr.clone().alias(temp_name.as_str()));
1217
explode_lookup.insert(other_expr.clone(), temp_name.clone());
1218
explode_names.push(temp_name);
1219
}
1220
},
1221
}
1222
}
1223
}
1224
}
1225
if !explode_names.is_empty() {
1226
if !explode_exprs.is_empty() {
1227
lf = lf.with_columns(explode_exprs);
1228
}
1229
lf = lf.explode(
1230
Selector::ByName {
1231
names: Arc::from(explode_names),
1232
strict: true,
1233
},
1234
ExplodeOptions {
1235
empty_as_null: true,
1236
keep_nulls: true,
1237
},
1238
);
1239
projections = projections
1240
.into_iter()
1241
.map(|p| {
1242
// Update "projections" with column refs to the now-exploded expressions
1243
p.map_expr(|e| match e {
1244
Expr::Explode { input, .. } => explode_lookup
1245
.get(input.as_ref())
1246
.map(|name| Expr::Column(name.clone()))
1247
.unwrap_or_else(|| input.as_ref().clone()),
1248
_ => e,
1249
})
1250
})
1251
.collect();
1252
1253
schema = self.get_frame_schema(&mut lf)?;
1254
}
1255
1256
// Check for "GROUP BY ..." (after determining projections)
1257
let mut group_by_keys: Vec<Expr> = Vec::new();
1258
match &select_stmt.group_by {
1259
// Standard "GROUP BY x, y, z" syntax (also recognising ordinal values)
1260
GroupByExpr::Expressions(group_by_exprs, modifiers) => {
1261
if !modifiers.is_empty() {
1262
polars_bail!(SQLInterface: "GROUP BY does not support CUBE, ROLLUP, or TOTALS modifiers")
1263
}
1264
// Translate the group expressions, resolving ordinal values and SELECT aliases
1265
group_by_keys = group_by_exprs
1266
.iter()
1267
.map(|e| match e {
1268
SQLExpr::Identifier(ident) => {
1269
resolve_select_alias(&ident.value, &projections, &schema).map_or_else(
1270
|| {
1271
self.expr_or_ordinal(
1272
e,
1273
&projections,
1274
None,
1275
Some(&schema),
1276
"GROUP BY",
1277
)
1278
},
1279
Ok,
1280
)
1281
},
1282
_ => self.expr_or_ordinal(e, &projections, None, Some(&schema), "GROUP BY"),
1283
})
1284
.collect::<PolarsResult<_>>()?
1285
},
1286
// "GROUP BY ALL" syntax; automatically adds expressions that do not contain
1287
// nested agg/window funcs to the group key (also ignores literals).
1288
GroupByExpr::All(modifiers) => {
1289
if !modifiers.is_empty() {
1290
polars_bail!(SQLInterface: "GROUP BY does not support CUBE, ROLLUP, or TOTALS modifiers")
1291
}
1292
projections.iter().for_each(|expr| match expr {
1293
// immediately match the most common cases (col|agg|len|lit, optionally aliased).
1294
Expr::Agg(_) | Expr::Len | Expr::Literal(_) => (),
1295
Expr::Column(_) => group_by_keys.push(expr.clone()),
1296
Expr::Alias(e, _)
1297
if matches!(&**e, Expr::Agg(_) | Expr::Len | Expr::Literal(_)) => {},
1298
Expr::Alias(e, _) if matches!(&**e, Expr::Column(_)) => {
1299
if let Expr::Column(name) = &**e {
1300
group_by_keys.push(col(name.clone()));
1301
}
1302
},
1303
_ => {
1304
// If not quick-matched, add if no nested agg/window expressions
1305
if !has_expr(expr, |e| {
1306
matches!(e, Expr::Agg(_))
1307
|| matches!(e, Expr::Len)
1308
|| matches!(e, Expr::Over { .. })
1309
|| {
1310
#[cfg(feature = "dynamic_group_by")]
1311
{
1312
matches!(e, Expr::Rolling { .. })
1313
}
1314
#[cfg(not(feature = "dynamic_group_by"))]
1315
{
1316
false
1317
}
1318
}
1319
}) {
1320
group_by_keys.push(expr.clone())
1321
}
1322
},
1323
});
1324
},
1325
};
1326
1327
lf = if group_by_keys.is_empty() {
1328
// The 'having' clause is only valid inside 'group by'
1329
if select_stmt.having.is_some() {
1330
polars_bail!(SQLSyntax: "HAVING clause not valid outside of GROUP BY; found:\n{:?}", select_stmt.having);
1331
};
1332
1333
// Final/selected cols, accounting for 'SELECT *' modifiers
1334
let mut retained_cols = Vec::with_capacity(projections.len());
1335
let mut retained_names = Vec::with_capacity(projections.len());
1336
let have_order_by = query.order_by.is_some();
1337
1338
// Initialize containing InheritsContext to handle empty projection case.
1339
let mut projection_heights = ExprSqlProjectionHeightBehavior::InheritsContext;
1340
1341
// Note: if there is an 'order by' then we project everything (original cols
1342
// and new projections) and *then* select the final cols; the retained cols
1343
// are used to ensure a correct final projection. If there's no 'order by',
1344
// clause then we can project the final column *expressions* directly.
1345
for p in projections.iter() {
1346
let name = p.to_field(schema.deref())?.name.to_string();
1347
if select_modifiers.matches_ilike(&name)
1348
&& !select_modifiers.exclude.contains(&name)
1349
{
1350
projection_heights |= ExprSqlProjectionHeightBehavior::identify_from_expr(p);
1351
1352
retained_cols.push(if have_order_by {
1353
col(name.as_str())
1354
} else {
1355
p.clone()
1356
});
1357
retained_names.push(col(name));
1358
}
1359
}
1360
1361
// Apply the remaining modifiers and establish the final projection
1362
if have_order_by {
1363
// We can safely use `with_columns()` and avoid a join if:
1364
// * There is already a projection that projects to the table height.
1365
// * All projection heights inherit from context (e.g. all scalar literals that
1366
// are to be broadcasted to table height).
1367
if projection_heights.contains(ExprSqlProjectionHeightBehavior::MaintainsColumn)
1368
|| projection_heights == ExprSqlProjectionHeightBehavior::InheritsContext
1369
{
1370
lf = lf.with_columns(projections);
1371
} else {
1372
// We hit this branch if the output height is not guaranteed to match the table
1373
// height. E.g.:
1374
//
1375
// * SELECT COUNT(*) FROM df ORDER BY sort_key;
1376
//
1377
// For these cases we truncate / extend the sorting columns with NULLs to match
1378
// the output height. We do this by projecting independently and then joining
1379
// back the original frame on the row index.
1380
const NAME: PlSmallStr = PlSmallStr::from_static("__PL_INDEX");
1381
lf = lf
1382
.clone()
1383
.select(projections)
1384
.with_row_index(NAME, None)
1385
.join(
1386
lf.with_row_index(NAME, None),
1387
[col(NAME)],
1388
[col(NAME)],
1389
JoinArgs {
1390
how: JoinType::Left,
1391
validation: Default::default(),
1392
suffix: None,
1393
slice: None,
1394
nulls_equal: false,
1395
coalesce: Default::default(),
1396
maintain_order: polars_ops::frame::MaintainOrderJoin::Left,
1397
build_side: None,
1398
},
1399
);
1400
}
1401
}
1402
if !select_modifiers.replace.is_empty() {
1403
lf = lf.with_columns(&select_modifiers.replace);
1404
}
1405
if !select_modifiers.rename.is_empty() {
1406
lf = lf.with_columns(select_modifiers.renamed_cols());
1407
}
1408
lf = self.process_order_by(lf, &query.order_by, Some(&retained_cols))?;
1409
1410
// Note: If `have_order_by`, with_columns is already done above.
1411
if projection_heights == ExprSqlProjectionHeightBehavior::InheritsContext
1412
&& !have_order_by
1413
{
1414
// All projections need to be broadcasted to table height, so evaluate in `with_columns()`
1415
lf = lf.with_columns(retained_cols).select(retained_names);
1416
} else {
1417
lf = lf.select(retained_cols);
1418
}
1419
if !select_modifiers.rename.is_empty() {
1420
lf = lf.rename(
1421
select_modifiers.rename.keys(),
1422
select_modifiers.rename.values(),
1423
true,
1424
);
1425
};
1426
lf
1427
} else {
1428
let having = select_stmt
1429
.having
1430
.as_ref()
1431
.map(|expr| parse_sql_expr(expr, self, Some(&schema)))
1432
.transpose()?;
1433
lf = self.process_group_by(lf, &group_by_keys, &projections, having)?;
1434
lf = self.process_order_by(lf, &query.order_by, None)?;
1435
1436
// Drop any extra columns (eg: added to maintain ORDER BY access to original cols)
1437
let output_cols: Vec<_> = projections
1438
.iter()
1439
.map(|p| p.to_field(&schema))
1440
.collect::<PolarsResult<Vec<_>>>()?
1441
.into_iter()
1442
.map(|f| col(f.name))
1443
.collect();
1444
1445
lf.select(&output_cols)
1446
};
1447
1448
// Apply optional QUALIFY clause (filters on window functions).
1449
lf = self.process_qualify(lf, &select_stmt.qualify, &window_fn_columns)?;
1450
1451
// Apply optional DISTINCT clause.
1452
lf = match &select_stmt.distinct {
1453
Some(Distinct::Distinct) => lf.unique_stable(None, UniqueKeepStrategy::Any),
1454
Some(Distinct::On(exprs)) => {
1455
// TODO: support exprs in `unique` see https://github.com/pola-rs/polars/issues/5760
1456
let schema = Some(self.get_frame_schema(&mut lf)?);
1457
let cols = exprs
1458
.iter()
1459
.map(|e| {
1460
let expr = parse_sql_expr(e, self, schema.as_deref())?;
1461
if let Expr::Column(name) = expr {
1462
Ok(name)
1463
} else {
1464
Err(polars_err!(SQLSyntax:"DISTINCT ON only supports column names"))
1465
}
1466
})
1467
.collect::<PolarsResult<Vec<_>>>()?;
1468
1469
// DISTINCT ON has to apply the ORDER BY before the operation.
1470
lf = self.process_order_by(lf, &query.order_by, None)?;
1471
return Ok(lf.unique_stable(
1472
Some(Selector::ByName {
1473
names: cols.into(),
1474
strict: true,
1475
}),
1476
UniqueKeepStrategy::First,
1477
));
1478
},
1479
None => lf,
1480
};
1481
Ok(lf)
1482
}
1483
1484
fn column_projections(
1485
&mut self,
1486
select_stmt: &Select,
1487
schema: &SchemaRef,
1488
select_modifiers: &mut SelectModifiers,
1489
) -> PolarsResult<Vec<Expr>> {
1490
let mut items: Vec<ProjectionItem> = Vec::with_capacity(select_stmt.projection.len());
1491
let mut has_qualified_wildcard = false;
1492
1493
for select_item in &select_stmt.projection {
1494
match select_item {
1495
SelectItem::UnnamedExpr(expr) => {
1496
items.push(ProjectionItem::Exprs(vec![parse_sql_expr(
1497
expr,
1498
self,
1499
Some(schema),
1500
)?]));
1501
},
1502
SelectItem::ExprWithAlias { expr, alias } => {
1503
let expr = parse_sql_expr(expr, self, Some(schema))?;
1504
items.push(ProjectionItem::Exprs(vec![
1505
expr.alias(PlSmallStr::from_str(alias.value.as_str())),
1506
]));
1507
},
1508
SelectItem::QualifiedWildcard(kind, wildcard_options) => match kind {
1509
SelectItemQualifiedWildcardKind::ObjectName(obj_name) => {
1510
let tbl_name = obj_name
1511
.0
1512
.last()
1513
.and_then(|p| p.as_ident())
1514
.map(|i| PlSmallStr::from_str(&i.value))
1515
.unwrap_or_default();
1516
let exprs = self.process_qualified_wildcard(
1517
obj_name,
1518
wildcard_options,
1519
select_modifiers,
1520
Some(schema),
1521
)?;
1522
items.push(ProjectionItem::QualifiedExprs(tbl_name, exprs));
1523
has_qualified_wildcard = true;
1524
},
1525
SelectItemQualifiedWildcardKind::Expr(_) => {
1526
polars_bail!(SQLSyntax: "qualified wildcard on expressions not yet supported: {:?}", select_item)
1527
},
1528
},
1529
SelectItem::Wildcard(wildcard_options) => {
1530
let cols = schema
1531
.iter_names()
1532
.map(|name| col(name.clone()))
1533
.collect::<Vec<_>>();
1534
1535
items.push(ProjectionItem::Exprs(
1536
self.process_wildcard_additional_options(
1537
cols,
1538
wildcard_options,
1539
select_modifiers,
1540
Some(schema),
1541
)?,
1542
));
1543
},
1544
}
1545
}
1546
1547
// Disambiguate qualified wildcards (if any) and flatten expressions
1548
let exprs = if has_qualified_wildcard {
1549
disambiguate_projection_cols(items, schema)?
1550
} else {
1551
items
1552
.into_iter()
1553
.flat_map(|item| match item {
1554
ProjectionItem::Exprs(exprs) | ProjectionItem::QualifiedExprs(_, exprs) => {
1555
exprs
1556
},
1557
})
1558
.collect()
1559
};
1560
let flattened_exprs = exprs
1561
.into_iter()
1562
.flat_map(|expr| expand_exprs(expr, schema))
1563
.collect();
1564
1565
Ok(flattened_exprs)
1566
}
1567
1568
fn process_where(
1569
&mut self,
1570
mut lf: LazyFrame,
1571
expr: &Option<SQLExpr>,
1572
invert_filter: bool,
1573
schema: Option<SchemaRef>,
1574
) -> PolarsResult<LazyFrame> {
1575
if let Some(expr) = expr {
1576
let schema = match schema {
1577
None => self.get_frame_schema(&mut lf)?,
1578
Some(s) => s,
1579
};
1580
1581
// shortcut filter evaluation if given expression is just TRUE or FALSE
1582
let (all_true, all_false) = match expr {
1583
SQLExpr::Value(ValueWithSpan {
1584
value: SQLValue::Boolean(b),
1585
..
1586
}) => (*b, !*b),
1587
SQLExpr::BinaryOp { left, op, right } => match (&**left, &**right, op) {
1588
(SQLExpr::Value(a), SQLExpr::Value(b), BinaryOperator::Eq) => {
1589
(a.value == b.value, a.value != b.value)
1590
},
1591
(SQLExpr::Value(a), SQLExpr::Value(b), BinaryOperator::NotEq) => {
1592
(a.value != b.value, a.value == b.value)
1593
},
1594
_ => (false, false),
1595
},
1596
_ => (false, false),
1597
};
1598
if (all_true && !invert_filter) || (all_false && invert_filter) {
1599
return Ok(lf);
1600
} else if (all_false && !invert_filter) || (all_true && invert_filter) {
1601
return Ok(clear_lf(lf));
1602
}
1603
1604
// ...otherwise parse and apply the filter as normal
1605
let mut filter_expression = parse_sql_expr(expr, self, Some(schema).as_deref())?;
1606
if filter_expression.clone().meta().has_multiple_outputs() {
1607
filter_expression = all_horizontal([filter_expression])?;
1608
}
1609
lf = self.process_subqueries(lf, vec![&mut filter_expression]);
1610
lf = if invert_filter {
1611
lf.remove(filter_expression)
1612
} else {
1613
lf.filter(filter_expression)
1614
};
1615
}
1616
Ok(lf)
1617
}
1618
1619
pub(super) fn process_join(
1620
&mut self,
1621
tbl_left: &TableInfo,
1622
tbl_right: &TableInfo,
1623
constraint: &JoinConstraint,
1624
join_type: JoinType,
1625
) -> PolarsResult<LazyFrame> {
1626
let (left_on, right_on) = process_join_constraint(constraint, tbl_left, tbl_right, self)?;
1627
let coalesce_type = match constraint {
1628
// "NATURAL" joins should coalesce; otherwise we disambiguate
1629
JoinConstraint::Natural => JoinCoalesce::CoalesceColumns,
1630
_ => JoinCoalesce::KeepColumns,
1631
};
1632
let joined = tbl_left
1633
.frame
1634
.clone()
1635
.join_builder()
1636
.with(tbl_right.frame.clone())
1637
.left_on(left_on)
1638
.right_on(right_on)
1639
.how(join_type)
1640
.suffix(format!(":{}", tbl_right.name))
1641
.coalesce(coalesce_type)
1642
.finish();
1643
1644
Ok(joined)
1645
}
1646
1647
fn process_qualify(
1648
&mut self,
1649
mut lf: LazyFrame,
1650
qualify_expr: &Option<SQLExpr>,
1651
window_fn_columns: &PlHashSet<String>,
1652
) -> PolarsResult<LazyFrame> {
1653
if let Some(expr) = qualify_expr {
1654
// Check the QUALIFY expression to identify window functions
1655
// and collect column refs (for looking up aliases from SELECT)
1656
let (has_window_fns, column_refs) = QualifyExpression::analyze(expr);
1657
let references_window_alias = column_refs.iter().any(|c| window_fn_columns.contains(c));
1658
if !has_window_fns && !references_window_alias {
1659
polars_bail!(
1660
SQLSyntax:
1661
"QUALIFY clause must reference window functions either explicitly or via SELECT aliases"
1662
);
1663
}
1664
let schema = self.get_frame_schema(&mut lf)?;
1665
let mut filter_expression = parse_sql_expr(expr, self, Some(&schema))?;
1666
if filter_expression.clone().meta().has_multiple_outputs() {
1667
filter_expression = all_horizontal([filter_expression])?;
1668
}
1669
lf = self.process_subqueries(lf, vec![&mut filter_expression]);
1670
lf = lf.filter(filter_expression);
1671
}
1672
Ok(lf)
1673
}
1674
1675
fn process_subqueries(&self, lf: LazyFrame, exprs: Vec<&mut Expr>) -> LazyFrame {
1676
let mut subplans = vec![];
1677
1678
for e in exprs {
1679
*e = e.clone().map_expr(|e| {
1680
if let Expr::SubPlan(lp, names) = e {
1681
assert_eq!(
1682
names.len(),
1683
1,
1684
"multiple columns in subqueries not yet supported"
1685
);
1686
1687
let select_expr = names[0].1.clone();
1688
let cb =
1689
PlanCallback::new(move |(plans, schemas): (Vec<DslPlan>, Vec<SchemaRef>)| {
1690
let schema = &schemas[0];
1691
polars_ensure!(schema.len() == 1, SQLSyntax: "SQL subquery returns more than one column");
1692
Ok(LazyFrame::from(plans.into_iter().next().unwrap()).select([select_expr.clone()]).logical_plan)
1693
});
1694
subplans.push(LazyFrame::from((**lp).clone()).pipe_with_schema(cb));
1695
Expr::Column(names[0].0.clone()).first()
1696
} else {
1697
e
1698
}
1699
});
1700
}
1701
1702
if subplans.is_empty() {
1703
lf
1704
} else {
1705
subplans.insert(0, lf);
1706
concat_lf_horizontal(
1707
subplans,
1708
HConcatOptions {
1709
broadcast_unit_length: true,
1710
..Default::default()
1711
},
1712
)
1713
.unwrap()
1714
}
1715
}
1716
1717
fn execute_create_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
1718
if let Statement::CreateTable(CreateTable {
1719
if_not_exists,
1720
name,
1721
query,
1722
columns,
1723
like,
1724
..
1725
}) = stmt
1726
{
1727
let tbl_name = name.0.first().unwrap().as_ident().unwrap().value.as_str();
1728
if *if_not_exists && self.table_map.read().unwrap().contains_key(tbl_name) {
1729
polars_bail!(SQLInterface: "relation '{}' already exists", tbl_name);
1730
}
1731
let lf = match (query, columns.is_empty(), like) {
1732
(Some(query), true, None) => {
1733
// ----------------------------------------------------
1734
// CREATE TABLE [IF NOT EXISTS] <name> AS <query>
1735
// ----------------------------------------------------
1736
self.execute_query(query)?
1737
},
1738
(None, false, None) => {
1739
// ----------------------------------------------------
1740
// CREATE TABLE [IF NOT EXISTS] <name> (<coldef>, ...)
1741
// ----------------------------------------------------
1742
let mut schema = Schema::with_capacity(columns.len());
1743
for col in columns {
1744
let col_name = col.name.value.as_str();
1745
let dtype = map_sql_dtype_to_polars(&col.data_type)?;
1746
schema.insert_at_index(schema.len(), col_name.into(), dtype)?;
1747
}
1748
DataFrame::empty_with_schema(&schema).lazy()
1749
},
1750
(None, true, Some(like_kind)) => {
1751
// ----------------------------------------------------
1752
// CREATE TABLE [IF NOT EXISTS] <name> LIKE <table>
1753
// ----------------------------------------------------
1754
let like_name = match like_kind {
1755
CreateTableLikeKind::Plain(like)
1756
| CreateTableLikeKind::Parenthesized(like) => &like.name,
1757
};
1758
let like_table = like_name
1759
.0
1760
.first()
1761
.unwrap()
1762
.as_ident()
1763
.unwrap()
1764
.value
1765
.as_str();
1766
if let Some(table) = self.table_map.read().unwrap().get(like_table).cloned() {
1767
clear_lf(table)
1768
} else {
1769
polars_bail!(SQLInterface: "table given in LIKE does not exist: {}", like_table)
1770
}
1771
},
1772
// No valid options provided
1773
(None, true, None) => {
1774
polars_bail!(SQLInterface: "CREATE TABLE expected a query, column definitions, or LIKE clause")
1775
},
1776
// Mutually exclusive options
1777
_ => {
1778
polars_bail!(
1779
SQLInterface: "CREATE TABLE received mutually exclusive options:\nquery = {:?}\ncolumns = {:?}\nlike = {:?}",
1780
query,
1781
columns,
1782
like,
1783
)
1784
},
1785
};
1786
self.register(tbl_name, lf);
1787
1788
let df_created = df! { "Response" => [format!("CREATE TABLE {}", name.0.first().unwrap().as_ident().unwrap().value)] };
1789
Ok(df_created.unwrap().lazy())
1790
} else {
1791
unreachable!()
1792
}
1793
}
1794
1795
fn get_table(&mut self, relation: &TableFactor) -> PolarsResult<(String, LazyFrame)> {
1796
match relation {
1797
TableFactor::Table {
1798
name, alias, args, ..
1799
} => {
1800
if let Some(args) = args {
1801
return self.execute_table_function(name, alias, &args.args);
1802
}
1803
let tbl_name = name.0.first().unwrap().as_ident().unwrap().value.as_str();
1804
if let Some(lf) = self.get_table_from_current_scope(tbl_name) {
1805
match alias {
1806
Some(alias) => {
1807
self.table_aliases
1808
.insert(alias.name.value.clone(), tbl_name.to_string());
1809
Ok((alias.name.value.clone(), lf))
1810
},
1811
None => Ok((tbl_name.to_string(), lf)),
1812
}
1813
} else {
1814
polars_bail!(SQLInterface: "relation '{}' was not found", tbl_name);
1815
}
1816
},
1817
TableFactor::Derived {
1818
lateral,
1819
subquery,
1820
alias,
1821
} => {
1822
polars_ensure!(!(*lateral), SQLInterface: "LATERAL not supported");
1823
if let Some(alias) = alias {
1824
let mut lf = self.execute_query_no_ctes(subquery)?;
1825
lf = self.rename_columns_from_table_alias(lf, alias)?;
1826
self.table_map
1827
.write()
1828
.unwrap()
1829
.insert(alias.name.value.clone(), lf.clone());
1830
Ok((alias.name.value.clone(), lf))
1831
} else {
1832
let lf = self.execute_query_no_ctes(subquery)?;
1833
Ok(("".to_string(), lf))
1834
}
1835
},
1836
TableFactor::UNNEST {
1837
alias,
1838
array_exprs,
1839
with_offset,
1840
with_offset_alias: _,
1841
..
1842
} => {
1843
if let Some(alias) = alias {
1844
let column_names: Vec<Option<PlSmallStr>> = alias
1845
.columns
1846
.iter()
1847
.map(|c| {
1848
if c.name.value.is_empty() {
1849
None
1850
} else {
1851
Some(PlSmallStr::from_str(c.name.value.as_str()))
1852
}
1853
})
1854
.collect();
1855
1856
let column_values: Vec<Series> = array_exprs
1857
.iter()
1858
.map(|arr| parse_sql_array(arr, self))
1859
.collect::<Result<_, _>>()?;
1860
1861
polars_ensure!(!column_names.is_empty(),
1862
SQLSyntax:
1863
"UNNEST table alias must also declare column names, eg: {} (a,b,c)", alias.name.to_string()
1864
);
1865
if column_names.len() != column_values.len() {
1866
let plural = if column_values.len() > 1 { "s" } else { "" };
1867
polars_bail!(
1868
SQLSyntax:
1869
"UNNEST table alias requires {} column name{}, found {}", column_values.len(), plural, column_names.len()
1870
);
1871
}
1872
let column_series: Vec<Column> = column_values
1873
.into_iter()
1874
.zip(column_names)
1875
.map(|(s, name)| {
1876
if let Some(name) = name {
1877
s.with_name(name)
1878
} else {
1879
s
1880
}
1881
})
1882
.map(Column::from)
1883
.collect();
1884
1885
let lf = DataFrame::new_infer_height(column_series)?.lazy();
1886
1887
if *with_offset {
1888
// TODO: support 'WITH ORDINALITY|OFFSET' modifier.
1889
polars_bail!(SQLInterface: "UNNEST tables do not (yet) support WITH ORDINALITY|OFFSET");
1890
}
1891
let table_name = alias.name.value.clone();
1892
self.table_map
1893
.write()
1894
.unwrap()
1895
.insert(table_name.clone(), lf.clone());
1896
Ok((table_name, lf))
1897
} else {
1898
polars_bail!(SQLSyntax: "UNNEST table must have an alias");
1899
}
1900
},
1901
TableFactor::NestedJoin {
1902
table_with_joins,
1903
alias,
1904
} => {
1905
let lf = self.execute_from_statement(table_with_joins)?;
1906
match alias {
1907
Some(a) => Ok((a.name.value.clone(), lf)),
1908
None => Ok(("".to_string(), lf)),
1909
}
1910
},
1911
// Support bare table, optionally with an alias, for now
1912
_ => polars_bail!(SQLInterface: "not yet implemented: {}", relation),
1913
}
1914
}
1915
1916
fn execute_table_function(
1917
&mut self,
1918
name: &ObjectName,
1919
alias: &Option<TableAlias>,
1920
args: &[FunctionArg],
1921
) -> PolarsResult<(String, LazyFrame)> {
1922
let tbl_fn = name.0.first().unwrap().as_ident().unwrap().value.as_str();
1923
let read_fn = tbl_fn.parse::<PolarsTableFunctions>()?;
1924
let (tbl_name, lf) = read_fn.execute(args)?;
1925
#[allow(clippy::useless_asref)]
1926
let tbl_name = alias
1927
.as_ref()
1928
.map(|a| a.name.value.clone())
1929
.unwrap_or_else(|| tbl_name.to_string());
1930
1931
self.table_map
1932
.write()
1933
.unwrap()
1934
.insert(tbl_name.clone(), lf.clone());
1935
Ok((tbl_name, lf))
1936
}
1937
1938
fn process_order_by(
1939
&mut self,
1940
mut lf: LazyFrame,
1941
order_by: &Option<OrderBy>,
1942
selected: Option<&[Expr]>,
1943
) -> PolarsResult<LazyFrame> {
1944
if order_by.as_ref().is_none_or(|ob| match &ob.kind {
1945
OrderByKind::Expressions(exprs) => exprs.is_empty(),
1946
OrderByKind::All(_) => false,
1947
}) {
1948
return Ok(lf);
1949
}
1950
let schema = self.get_frame_schema(&mut lf)?;
1951
let columns_iter = schema.iter_names().map(|e| col(e.clone()));
1952
let (order_by, order_by_all, n_order_cols) = match &order_by.as_ref().unwrap().kind {
1953
OrderByKind::Expressions(exprs) => {
1954
// TODO: will look at making an upstream PR that allows us to more easily
1955
// create a GenericDialect variant supporting "OrderByKind::All" instead
1956
if exprs.len() == 1
1957
&& matches!(&exprs[0].expr, SQLExpr::Identifier(ident)
1958
if ident.value.to_uppercase() == "ALL"
1959
&& !schema.iter_names().any(|name| name.to_uppercase() == "ALL"))
1960
{
1961
// Treat as ORDER BY ALL
1962
let n_cols = if let Some(selected) = selected {
1963
selected.len()
1964
} else {
1965
schema.len()
1966
};
1967
(vec![], Some(&exprs[0].options), n_cols)
1968
} else {
1969
(exprs.clone(), None, exprs.len())
1970
}
1971
},
1972
OrderByKind::All(opts) => {
1973
let n_cols = if let Some(selected) = selected {
1974
selected.len()
1975
} else {
1976
schema.len()
1977
};
1978
(vec![], Some(opts), n_cols)
1979
},
1980
};
1981
let mut descending = Vec::with_capacity(n_order_cols);
1982
let mut nulls_last = Vec::with_capacity(n_order_cols);
1983
let mut by: Vec<Expr> = Vec::with_capacity(n_order_cols);
1984
1985
if let Some(opts) = order_by_all {
1986
if let Some(selected) = selected {
1987
by.extend(selected.iter().cloned());
1988
} else {
1989
by.extend(columns_iter);
1990
};
1991
let desc_order = !opts.asc.unwrap_or(true);
1992
nulls_last.resize(by.len(), !opts.nulls_first.unwrap_or(desc_order));
1993
descending.resize(by.len(), desc_order);
1994
} else {
1995
let columns = &columns_iter.collect::<Vec<_>>();
1996
for ob in order_by {
1997
// note: if not specified 'NULLS FIRST' is default for DESC, 'NULLS LAST' otherwise
1998
// https://www.postgresql.org/docs/current/queries-order.html
1999
let desc_order = !ob.options.asc.unwrap_or(true);
2000
nulls_last.push(!ob.options.nulls_first.unwrap_or(desc_order));
2001
descending.push(desc_order);
2002
2003
// translate order expression, allowing ordinal values
2004
by.push(self.expr_or_ordinal(
2005
&ob.expr,
2006
columns,
2007
selected,
2008
Some(&schema),
2009
"ORDER BY",
2010
)?)
2011
}
2012
}
2013
Ok(lf.sort_by_exprs(
2014
&by,
2015
SortMultipleOptions::default()
2016
.with_order_descending_multi(descending)
2017
.with_nulls_last_multi(nulls_last),
2018
))
2019
}
2020
2021
/// Apply SQL `GROUP BY` semantics to `lf`: aggregate by `group_by_keys`, apply the
/// optional `HAVING` predicate, then project the result according to `projections`.
///
/// Projections that are (or contain) aggregations are evaluated inside the
/// `group_by(..).agg(..)` call; the remaining projections are re-selected from the
/// aggregated frame afterwards. Aggregates that appear only in `HAVING` are computed
/// as temporary `__POLARS_HAVING_{n}` columns, which the final select does not keep.
///
/// Group key columns missing from the projection are appended to the output, and
/// cross-aliased keys are additionally exposed as `__POLARS_ORIG_{name}`, so that a
/// subsequent ORDER BY can still resolve them.
///
/// Returns a `SQLSyntax` error if a plain column (or struct field) projection is
/// neither a group key nor part of an aggregation.
fn process_group_by(
2022
&mut self,
2023
mut lf: LazyFrame,
2024
group_by_keys: &[Expr],
2025
projections: &[Expr],
2026
having: Option<Expr>,
2027
) -> PolarsResult<LazyFrame> {
2028
let schema_before = self.get_frame_schema(&mut lf)?;
2029
let group_by_keys_schema =
2030
expressions_to_schema(group_by_keys, &schema_before, |duplicate_name: &str| {
2031
format!("group_by keys contained duplicate output name '{duplicate_name}'")
2032
})?;
2033
2034
// Note: remove the `group_by` keys as Polars adds those implicitly.
2035
let mut aliased_aggregations: PlHashMap<PlSmallStr, PlSmallStr> = PlHashMap::new();
2036
let mut aggregation_projection = Vec::with_capacity(projections.len());
2037
let mut projection_overrides = PlHashMap::with_capacity(projections.len());
2038
let mut projection_aliases = PlHashSet::new();
2039
let mut group_key_aliases = PlHashSet::new();
2040
2041
// Pre-compute group key data (stripped expression + output name) to avoid repeated work.
2042
// We check both expression AND output name match to avoid cross-aliasing issues.
2043
let group_key_data: Vec<_> = group_by_keys
2044
.iter()
2045
.map(|gk| {
2046
(
2047
strip_outer_alias(gk),
2048
gk.to_field(&schema_before).ok().map(|f| f.name),
2049
)
2050
})
2051
.collect();
2052
2053
// One flag per projection: true when it equals a group key in both expression and name.
let projection_matches_group_key: Vec<bool> = projections
2054
.iter()
2055
.map(|p| {
2056
let p_stripped = strip_outer_alias(p);
2057
let p_name = p.to_field(&schema_before).ok().map(|f| f.name);
2058
group_key_data
2059
.iter()
2060
.any(|(gk_stripped, gk_name)| *gk_stripped == p_stripped && *gk_name == p_name)
2061
})
2062
.collect();
2063
2064
// Classify each projection and collect the expressions to run inside `agg`.
for (e, &matches_group_key) in projections.iter().zip(&projection_matches_group_key) {
2065
// `Len` represents COUNT(*) so we treat as an aggregation here.
2066
let is_non_group_key_expr = !matches_group_key
2067
&& has_expr(e, |e| {
2068
match e {
2069
Expr::Agg(_) | Expr::Len | Expr::Over { .. } => true,
2070
#[cfg(feature = "dynamic_group_by")]
2071
Expr::Rolling { .. } => true,
2072
Expr::Function { function: func, .. }
2073
if !matches!(func, FunctionExpr::StructExpr(_)) =>
2074
{
2075
// If it's a function call containing a column NOT in the group by keys,
2076
// we treat it as an aggregation.
2077
has_expr(e, |e| match e {
2078
Expr::Column(name) => !group_by_keys_schema.contains(name),
2079
_ => false,
2080
})
2081
},
2082
_ => false,
2083
}
2084
});
2085
2086
// Note: if simple aliased expression we defer aliasing until after the group_by.
2087
// Use `e_inner` to track the potentially unwrapped expression for field lookup.
2088
let mut e_inner = e;
2089
if let Expr::Alias(expr, alias) = e {
2090
if e.clone().meta().is_simple_projection(Some(&schema_before)) {
2091
group_key_aliases.insert(alias.as_ref());
2092
e_inner = expr
2093
} else if let Expr::Function {
2094
function: FunctionExpr::StructExpr(StructFunction::FieldByName(name)),
2095
..
2096
} = expr.deref()
2097
{
2098
projection_overrides
2099
.insert(alias.as_ref(), col(name.clone()).alias(alias.clone()));
2100
} else if !is_non_group_key_expr && !group_by_keys_schema.contains(alias) {
2101
projection_aliases.insert(alias.as_ref());
2102
}
2103
}
2104
let field = e_inner.to_field(&schema_before)?;
2105
if is_non_group_key_expr {
2106
let mut e = e.clone();
2107
// Unwrap an outer `Implode` (optionally under an alias) before aggregating.
if let Expr::Agg(AggExpr::Implode(expr)) = &e {
2108
e = (**expr).clone();
2109
} else if let Expr::Alias(expr, name) = &e {
2110
if let Expr::Agg(AggExpr::Implode(expr)) = expr.as_ref() {
2111
e = (**expr).clone().alias(name.clone());
2112
}
2113
}
2114
// If aggregation colname conflicts with a group key,
2115
// alias it to avoid duplicate/mis-tracked columns
2116
if group_by_keys_schema.get(&field.name).is_some() {
2117
let alias_name = format!("__POLARS_AGG_{}", field.name);
2118
e = e.alias(alias_name.as_str());
2119
aliased_aggregations.insert(field.name.clone(), alias_name.as_str().into());
2120
}
2121
aggregation_projection.push(e);
2122
} else if !matches_group_key {
2123
// Non-aggregated columns must be part of the GROUP BY clause
2124
if let Expr::Column(_)
2125
| Expr::Function {
2126
function: FunctionExpr::StructExpr(StructFunction::FieldByName(_)),
2127
..
2128
} = e_inner
2129
{
2130
if !group_by_keys_schema.contains(&field.name) {
2131
polars_bail!(SQLSyntax: "'{}' should participate in the GROUP BY clause or an aggregate function", &field.name);
2132
}
2133
}
2134
}
2135
}
2136
2137
// Process HAVING clause: identify aggregate expressions, reusing those already
2138
// in projections, or compute as temporary columns and then post-filter/discard
2139
let having_filter = if let Some(having_expr) = having {
2140
// Lookup from aggregate expression to the output column that already computes it.
let mut agg_to_name: Vec<(Expr, PlSmallStr)> = aggregation_projection
2141
.iter()
2142
.filter_map(|p| match p {
2143
Expr::Alias(inner, name) if matches!(**inner, Expr::Agg(_) | Expr::Len) => {
2144
Some((inner.as_ref().clone(), name.clone()))
2145
},
2146
e @ (Expr::Agg(_) | Expr::Len) => Some((
2147
e.clone(),
2148
e.to_field(&schema_before)
2149
.map(|f| f.name)
2150
.unwrap_or_default(),
2151
)),
2152
_ => None,
2153
})
2154
.collect();
2155
2156
let mut n_having_aggs = 0;
2157
// Replace each aggregate in the HAVING expression with a reference to its column.
let updated_having = having_expr.map_expr(|e| {
2158
if !matches!(&e, Expr::Agg(_) | Expr::Len) {
2159
return e;
2160
}
2161
let name = agg_to_name
2162
.iter()
2163
.find_map(|(expr, n)| (*expr == e).then(|| n.clone()))
2164
.unwrap_or_else(|| {
2165
let n = format_pl_smallstr!("__POLARS_HAVING_{n_having_aggs}");
2166
aggregation_projection.push(e.clone().alias(n.clone()));
2167
agg_to_name.push((e.clone(), n.clone()));
2168
n_having_aggs += 1;
2169
n
2170
});
2171
col(name)
2172
});
2173
Some(updated_having)
2174
} else {
2175
None
2176
};
2177
2178
// Apply HAVING filter after aggregation
2179
let mut aggregated = lf.group_by(group_by_keys).agg(&aggregation_projection);
2180
if let Some(filter_expr) = having_filter {
2181
aggregated = aggregated.filter(filter_expr);
2182
}
2183
2184
let projection_schema =
2185
expressions_to_schema(projections, &schema_before, |duplicate_name: &str| {
2186
format!("group_by aggregations contained duplicate output name '{duplicate_name}'")
2187
})?;
2188
2189
// A final projection to get the proper order and any deferred transforms/aliases
2190
// (will also drop any temporary columns created for the HAVING post-filter).
2191
let final_projection = projection_schema
2192
.iter_names()
2193
.zip(projections.iter().zip(&projection_matches_group_key))
2194
.map(|(name, (projection_expr, &matches_group_key))| {
2195
if let Some(expr) = projection_overrides.get(name.as_str()) {
2196
expr.clone()
2197
} else if let Some(aliased_name) = aliased_aggregations.get(name) {
2198
col(aliased_name.clone()).alias(name.clone())
2199
} else if group_by_keys_schema.get(name).is_some() && matches_group_key {
2200
col(name.clone())
2201
} else if group_by_keys_schema.get(name).is_some()
2202
|| projection_aliases.contains(name.as_str())
2203
|| group_key_aliases.contains(name.as_str())
2204
{
2205
if has_expr(projection_expr, |e| {
2206
matches!(e, Expr::Agg(_) | Expr::Len | Expr::Over { .. })
2207
}) {
2208
col(name.clone())
2209
} else {
2210
projection_expr.clone()
2211
}
2212
} else {
2213
col(name.clone())
2214
}
2215
})
2216
.collect::<Vec<_>>();
2217
2218
// Include original GROUP BY columns for ORDER BY access (if aliased).
2219
let mut output_projection = final_projection;
2220
for key_name in group_by_keys_schema.iter_names() {
2221
if !projection_schema.contains(key_name) {
2222
// Original col name not in output - add for ORDER BY access
2223
output_projection.push(col(key_name.clone()));
2224
} else if group_by_keys.iter().any(|k| is_simple_col_ref(k, key_name)) {
2225
// Original col name in output - check if cross-aliased
2226
let is_cross_aliased = projections.iter().any(|p| {
2227
p.to_field(&schema_before).is_ok_and(|f| f.name == key_name)
2228
&& !is_simple_col_ref(p, key_name)
2229
});
2230
if is_cross_aliased {
2231
// Add original name under a prefixed alias for subsequent ORDER BY resolution
2232
let internal_name = format_pl_smallstr!("__POLARS_ORIG_{}", key_name);
2233
output_projection.push(col(key_name.clone()).alias(internal_name));
2234
}
2235
}
2236
}
2237
Ok(aggregated.select(&output_projection))
2238
}
2239
2240
fn process_limit_offset(
2241
&self,
2242
lf: LazyFrame,
2243
limit_clause: &Option<LimitClause>,
2244
fetch: &Option<Fetch>,
2245
) -> PolarsResult<LazyFrame> {
2246
// Extract limit and offset from LimitClause
2247
let (limit, offset) = match limit_clause {
2248
Some(LimitClause::LimitOffset {
2249
limit,
2250
offset,
2251
limit_by,
2252
}) => {
2253
if !limit_by.is_empty() {
2254
// TODO: might be able to support as an aggregate `top_k_by` operation?
2255
// (https://clickhouse.com/docs/sql-reference/statements/select/limit-by)
2256
polars_bail!(SQLSyntax: "`LIMIT <n> BY <exprs>` clause is not supported");
2257
}
2258
(limit.as_ref(), offset.as_ref().map(|o| &o.value))
2259
},
2260
Some(LimitClause::OffsetCommaLimit { offset, limit }) => (Some(limit), Some(offset)),
2261
None => (None, None),
2262
};
2263
2264
// Handle FETCH clause (alternative to LIMIT, mutually exclusive)
2265
let limit = match (fetch, limit) {
2266
(Some(fetch), None) => fetch.quantity.as_ref(),
2267
(Some(_), Some(_)) => {
2268
polars_bail!(SQLSyntax: "cannot use both `LIMIT` and `FETCH` in the same query")
2269
},
2270
(None, limit) => limit,
2271
};
2272
2273
// Apply limit and/or offset
2274
match (offset, limit) {
2275
(
2276
Some(SQLExpr::Value(ValueWithSpan {
2277
value: SQLValue::Number(offset, _),
2278
..
2279
})),
2280
Some(SQLExpr::Value(ValueWithSpan {
2281
value: SQLValue::Number(limit, _),
2282
..
2283
})),
2284
) => Ok(lf.slice(
2285
offset
2286
.parse()
2287
.map_err(|e| polars_err!(SQLInterface: "OFFSET conversion error: {}", e))?,
2288
limit.parse().map_err(
2289
|e| polars_err!(SQLInterface: "LIMIT/FETCH conversion error: {}", e),
2290
)?,
2291
)),
2292
(
2293
Some(SQLExpr::Value(ValueWithSpan {
2294
value: SQLValue::Number(offset, _),
2295
..
2296
})),
2297
None,
2298
) => Ok(lf.slice(
2299
offset
2300
.parse()
2301
.map_err(|e| polars_err!(SQLInterface: "OFFSET conversion error: {}", e))?,
2302
IdxSize::MAX,
2303
)),
2304
(
2305
None,
2306
Some(SQLExpr::Value(ValueWithSpan {
2307
value: SQLValue::Number(limit, _),
2308
..
2309
})),
2310
) => {
2311
Ok(lf.limit(limit.parse().map_err(
2312
|e| polars_err!(SQLInterface: "LIMIT/FETCH conversion error: {}", e),
2313
)?))
2314
},
2315
(None, None) => Ok(lf),
2316
_ => polars_bail!(
2317
SQLSyntax: "non-numeric arguments for LIMIT/OFFSET/FETCH are not supported",
2318
),
2319
}
2320
}
2321
2322
fn process_qualified_wildcard(
2323
&mut self,
2324
ObjectName(idents): &ObjectName,
2325
options: &WildcardAdditionalOptions,
2326
modifiers: &mut SelectModifiers,
2327
schema: Option<&Schema>,
2328
) -> PolarsResult<Vec<Expr>> {
2329
let mut idents_with_wildcard: Vec<Ident> = idents
2330
.iter()
2331
.filter_map(|p| p.as_ident().cloned())
2332
.collect();
2333
idents_with_wildcard.push(Ident::new("*"));
2334
2335
let exprs = resolve_compound_identifier(self, &idents_with_wildcard, schema)?;
2336
self.process_wildcard_additional_options(exprs, options, modifiers, schema)
2337
}
2338
2339
fn process_wildcard_additional_options(
2340
&mut self,
2341
exprs: Vec<Expr>,
2342
options: &WildcardAdditionalOptions,
2343
modifiers: &mut SelectModifiers,
2344
schema: Option<&Schema>,
2345
) -> PolarsResult<Vec<Expr>> {
2346
if options.opt_except.is_some() && options.opt_exclude.is_some() {
2347
polars_bail!(SQLInterface: "EXCLUDE and EXCEPT wildcard options cannot be used together (prefer EXCLUDE)")
2348
} else if options.opt_exclude.is_some() && options.opt_ilike.is_some() {
2349
polars_bail!(SQLInterface: "EXCLUDE and ILIKE wildcard options cannot be used together")
2350
}
2351
2352
// SELECT * EXCLUDE
2353
if let Some(items) = &options.opt_exclude {
2354
match items {
2355
ExcludeSelectItem::Single(ident) => {
2356
modifiers.exclude.insert(ident.value.clone());
2357
},
2358
ExcludeSelectItem::Multiple(idents) => {
2359
modifiers
2360
.exclude
2361
.extend(idents.iter().map(|i| i.value.clone()));
2362
},
2363
};
2364
}
2365
2366
// SELECT * EXCEPT
2367
if let Some(items) = &options.opt_except {
2368
modifiers.exclude.insert(items.first_element.value.clone());
2369
modifiers
2370
.exclude
2371
.extend(items.additional_elements.iter().map(|i| i.value.clone()));
2372
}
2373
2374
// SELECT * ILIKE
2375
if let Some(item) = &options.opt_ilike {
2376
let rx = regex::escape(item.pattern.as_str())
2377
.replace('%', ".*")
2378
.replace('_', ".");
2379
2380
modifiers.ilike = Some(
2381
polars_utils::regex_cache::compile_regex(format!("^(?is){rx}$").as_str()).unwrap(),
2382
);
2383
}
2384
2385
// SELECT * RENAME
2386
if let Some(items) = &options.opt_rename {
2387
let renames = match items {
2388
RenameSelectItem::Single(rename) => std::slice::from_ref(rename),
2389
RenameSelectItem::Multiple(renames) => renames.as_slice(),
2390
};
2391
for rn in renames {
2392
let before = PlSmallStr::from_str(rn.ident.value.as_str());
2393
let after = PlSmallStr::from_str(rn.alias.value.as_str());
2394
if before != after {
2395
modifiers.rename.insert(before, after);
2396
}
2397
}
2398
}
2399
2400
// SELECT * REPLACE
2401
if let Some(replacements) = &options.opt_replace {
2402
for rp in &replacements.items {
2403
let replacement_expr = parse_sql_expr(&rp.expr, self, schema);
2404
modifiers
2405
.replace
2406
.push(replacement_expr?.alias(rp.column_name.value.as_str()));
2407
}
2408
}
2409
Ok(exprs)
2410
}
2411
2412
fn rename_columns_from_table_alias(
2413
&mut self,
2414
mut lf: LazyFrame,
2415
alias: &TableAlias,
2416
) -> PolarsResult<LazyFrame> {
2417
if alias.columns.is_empty() {
2418
Ok(lf)
2419
} else {
2420
let schema = self.get_frame_schema(&mut lf)?;
2421
if alias.columns.len() != schema.len() {
2422
polars_bail!(
2423
SQLSyntax: "number of columns ({}) in alias '{}' does not match the number of columns in the table/query ({})",
2424
alias.columns.len(), alias.name.value, schema.len()
2425
)
2426
} else {
2427
let existing_columns: Vec<_> = schema.iter_names().collect();
2428
let new_columns: Vec<_> =
2429
alias.columns.iter().map(|c| c.name.value.clone()).collect();
2430
Ok(lf.rename(existing_columns, new_columns, true))
2431
}
2432
}
2433
}
2434
}
2435
2436
impl SQLContext {
2437
/// Create a new SQLContext from a table map. For internal use only
2438
pub fn new_from_table_map(table_map: PlHashMap<String, LazyFrame>) -> Self {
2439
Self {
2440
table_map: Arc::new(RwLock::new(table_map)),
2441
..Default::default()
2442
}
2443
}
2444
}
2445
2446
fn expand_exprs(expr: Expr, schema: &SchemaRef) -> Vec<Expr> {
2447
match expr {
2448
Expr::Column(nm) if is_regex_colname(nm.as_str()) => {
2449
let re = polars_utils::regex_cache::compile_regex(&nm).unwrap();
2450
schema
2451
.iter_names()
2452
.filter(|name| re.is_match(name))
2453
.map(|name| col(name.clone()))
2454
.collect::<Vec<_>>()
2455
},
2456
Expr::Selector(s) => s
2457
.into_columns(schema, &Default::default())
2458
.unwrap()
2459
.into_iter()
2460
.map(col)
2461
.collect::<Vec<_>>(),
2462
_ => vec![expr],
2463
}
2464
}
2465
2466
/// Returns true when the column name is anchored like a regex, i.e. it starts
/// with `^` and ends with `$`.
fn is_regex_colname(nm: &str) -> bool {
    // both anchors are ASCII, so checking the first and last bytes is sufficient
    matches!(nm.as_bytes(), [b'^', .., b'$'])
}
2469
2470
/// Extract column names from a USING clause in a JoinOperator (if present).
2471
fn get_using_cols(op: &JoinOperator) -> Option<impl Iterator<Item = String> + '_> {
2472
use JoinOperator::*;
2473
match op {
2474
Join(JoinConstraint::Using(cols))
2475
| Inner(JoinConstraint::Using(cols))
2476
| Left(JoinConstraint::Using(cols))
2477
| LeftOuter(JoinConstraint::Using(cols))
2478
| Right(JoinConstraint::Using(cols))
2479
| RightOuter(JoinConstraint::Using(cols))
2480
| FullOuter(JoinConstraint::Using(cols))
2481
| Semi(JoinConstraint::Using(cols))
2482
| Anti(JoinConstraint::Using(cols))
2483
| LeftSemi(JoinConstraint::Using(cols))
2484
| LeftAnti(JoinConstraint::Using(cols))
2485
| RightSemi(JoinConstraint::Using(cols))
2486
| RightAnti(JoinConstraint::Using(cols)) => Some(cols.iter().filter_map(|c| {
2487
c.0.first()
2488
.and_then(|p| p.as_ident())
2489
.map(|i| i.value.clone())
2490
})),
2491
_ => None,
2492
}
2493
}
2494
2495
/// Extract the table name (or alias) from a TableFactor.
2496
fn get_table_name(factor: &TableFactor) -> Option<String> {
2497
match factor {
2498
TableFactor::Table { name, alias, .. } => {
2499
alias.as_ref().map(|a| a.name.value.clone()).or_else(|| {
2500
name.0
2501
.last()
2502
.and_then(|p| p.as_ident())
2503
.map(|i| i.value.clone())
2504
})
2505
},
2506
TableFactor::Derived { alias, .. }
2507
| TableFactor::NestedJoin { alias, .. }
2508
| TableFactor::TableFunction { alias, .. } => alias.as_ref().map(|a| a.name.value.clone()),
2509
_ => None,
2510
}
2511
}
2512
2513
/// Check if an expression is a simple column reference (with optional alias) to the given name.
2514
fn is_simple_col_ref(expr: &Expr, col_name: &PlSmallStr) -> bool {
2515
match expr {
2516
Expr::Column(n) => n == col_name,
2517
Expr::Alias(inner, _) => matches!(inner.as_ref(), Expr::Column(n) if n == col_name),
2518
_ => false,
2519
}
2520
}
2521
2522
/// Strip the outer alias from an expression (if present) for expression equality comparison.
2523
fn strip_outer_alias(expr: &Expr) -> Expr {
2524
if let Expr::Alias(inner, _) = expr {
2525
inner.as_ref().clone()
2526
} else {
2527
expr.clone()
2528
}
2529
}
2530
2531
/// Resolve a SELECT alias to its underlying expression (for use in GROUP BY).
2532
///
2533
/// Returns the expression WITH alias if the name matches a projection alias and is NOT a column
2534
/// that exists in the schema; otherwise returns `None` to use the default/standard resolution.
2535
fn resolve_select_alias(name: &str, projections: &[Expr], schema: &Schema) -> Option<Expr> {
2536
// Original columns take precedence over SELECT aliases
2537
if schema.contains(name) {
2538
return None;
2539
}
2540
// Find a projection with this alias and return its expression (preserving the alias)
2541
projections.iter().find_map(|p| match p {
2542
Expr::Alias(inner, alias) if alias.as_str() == name => {
2543
Some(inner.as_ref().clone().alias(alias.clone()))
2544
},
2545
_ => None,
2546
})
2547
}
2548
2549
/// Check if all columns referred to in a Polars expression exist in the given Schema.
2550
fn expr_cols_all_in_schema(expr: &Expr, schema: &Schema) -> bool {
2551
let mut found_cols = false;
2552
let mut all_in_schema = true;
2553
for e in expr.into_iter() {
2554
if let Expr::Column(name) = e {
2555
found_cols = true;
2556
if !schema.contains(name.as_str()) {
2557
all_in_schema = false;
2558
break;
2559
}
2560
}
2561
}
2562
found_cols && all_in_schema
2563
}
2564
2565
/// Determine which parsed join expressions actually belong in `left_om` and which in `right_on`.
2566
///
2567
/// This needs to be handled carefully because in SQL joins you can write "join on" constraints
2568
/// either way round, and in joins with more than two tables you can also join against an earlier
2569
/// table (e.g.: you could be joining `df1` to `df2` to `df3`, but the final join condition where
2570
/// we join `df2` to `df3` could refer to `df1.a = df3.b`; this takes a little more work to
2571
/// resolve as our native `join` function operates on only two tables at a time.
2572
fn determine_left_right_join_on(
2573
ctx: &mut SQLContext,
2574
expr_left: &SQLExpr,
2575
expr_right: &SQLExpr,
2576
tbl_left: &TableInfo,
2577
tbl_right: &TableInfo,
2578
join_schema: &Schema,
2579
) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
2580
// parse, removing any aliases that may have been added by `resolve_column`
2581
// (called inside `parse_sql_expr`) as we need the actual/underlying col
2582
let left_on = match parse_sql_expr(expr_left, ctx, Some(join_schema))? {
2583
Expr::Alias(inner, _) => Arc::unwrap_or_clone(inner),
2584
e => e,
2585
};
2586
let right_on = match parse_sql_expr(expr_right, ctx, Some(join_schema))? {
2587
Expr::Alias(inner, _) => Arc::unwrap_or_clone(inner),
2588
e => e,
2589
};
2590
2591
// ------------------------------------------------------------------
2592
// simple/typical case: can fully resolve SQL-level table references
2593
// ------------------------------------------------------------------
2594
let left_refs = (
2595
expr_refers_to_table(expr_left, &tbl_left.name),
2596
expr_refers_to_table(expr_left, &tbl_right.name),
2597
);
2598
let right_refs = (
2599
expr_refers_to_table(expr_right, &tbl_left.name),
2600
expr_refers_to_table(expr_right, &tbl_right.name),
2601
);
2602
// if the SQL-level references unambiguously indicate table ownership, we're done
2603
match (left_refs, right_refs) {
2604
// standard: left expr → left table, right expr → right table
2605
((true, false), (false, true)) => return Ok((vec![left_on], vec![right_on])),
2606
// reversed: left expr → right table, right expr → left table
2607
((false, true), (true, false)) => return Ok((vec![right_on], vec![left_on])),
2608
// unsupported: one side references *both* tables
2609
((true, true), _) | (_, (true, true)) if tbl_left.name != tbl_right.name => {
2610
polars_bail!(
2611
SQLInterface: "unsupported join condition: {} side references both '{}' and '{}'",
2612
if left_refs.0 && left_refs.1 {
2613
"left"
2614
} else {
2615
"right"
2616
}, tbl_left.name, tbl_right.name
2617
)
2618
},
2619
// fall through to the more involved col/ref resolution
2620
_ => {},
2621
}
2622
2623
// ------------------------------------------------------------------
2624
// more involved: additionally employ schema-based column resolution
2625
// (applies to unqualified columns and/or chained joins)
2626
// ------------------------------------------------------------------
2627
let left_on_cols_in = (
2628
expr_cols_all_in_schema(&left_on, &tbl_left.schema),
2629
expr_cols_all_in_schema(&left_on, &tbl_right.schema),
2630
);
2631
let right_on_cols_in = (
2632
expr_cols_all_in_schema(&right_on, &tbl_left.schema),
2633
expr_cols_all_in_schema(&right_on, &tbl_right.schema),
2634
);
2635
match (left_on_cols_in, right_on_cols_in) {
2636
// each expression's columns exist in exactly one schema
2637
((true, false), (false, true)) => Ok((vec![left_on], vec![right_on])),
2638
((false, true), (true, false)) => Ok((vec![right_on], vec![left_on])),
2639
// one expression in both, other only in one; prefer the unique one
2640
((true, true), (true, false)) => Ok((vec![right_on], vec![left_on])),
2641
((true, true), (false, true)) => Ok((vec![left_on], vec![right_on])),
2642
((true, false), (true, true)) => Ok((vec![left_on], vec![right_on])),
2643
((false, true), (true, true)) => Ok((vec![right_on], vec![left_on])),
2644
// pass through as-is
2645
_ => Ok((vec![left_on], vec![right_on])),
2646
}
2647
}
2648
2649
fn process_join_on(
2650
ctx: &mut SQLContext,
2651
sql_expr: &SQLExpr,
2652
tbl_left: &TableInfo,
2653
tbl_right: &TableInfo,
2654
) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
2655
match sql_expr {
2656
SQLExpr::BinaryOp { left, op, right } => match op {
2657
BinaryOperator::And => {
2658
let (mut left_i, mut right_i) = process_join_on(ctx, left, tbl_left, tbl_right)?;
2659
let (mut left_j, mut right_j) = process_join_on(ctx, right, tbl_left, tbl_right)?;
2660
left_i.append(&mut left_j);
2661
right_i.append(&mut right_j);
2662
Ok((left_i, right_i))
2663
},
2664
BinaryOperator::Eq => {
2665
// establish unified schema with cols from both tables; needed for multi/chained
2666
// joins where suffixed intermediary/joined cols aren't in an existing schema.
2667
let mut join_schema =
2668
Schema::with_capacity(tbl_left.schema.len() + tbl_right.schema.len());
2669
for (name, dtype) in tbl_left.schema.iter() {
2670
join_schema.insert_at_index(join_schema.len(), name.clone(), dtype.clone())?;
2671
}
2672
for (name, dtype) in tbl_right.schema.iter() {
2673
if !join_schema.contains(name) {
2674
join_schema.insert_at_index(
2675
join_schema.len(),
2676
name.clone(),
2677
dtype.clone(),
2678
)?;
2679
}
2680
}
2681
determine_left_right_join_on(ctx, left, right, tbl_left, tbl_right, &join_schema)
2682
},
2683
_ => polars_bail!(
2684
// TODO: should be able to support more operators later (via `join_where`?)
2685
SQLInterface: "only equi-join constraints (combined with 'AND') are currently supported; found op = '{:?}'", op
2686
),
2687
},
2688
SQLExpr::Nested(expr) => process_join_on(ctx, expr, tbl_left, tbl_right),
2689
_ => polars_bail!(
2690
SQLInterface: "only equi-join constraints are currently supported; found expression = {:?}", sql_expr
2691
),
2692
}
2693
}
2694
2695
fn process_join_constraint(
2696
constraint: &JoinConstraint,
2697
tbl_left: &TableInfo,
2698
tbl_right: &TableInfo,
2699
ctx: &mut SQLContext,
2700
) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
2701
match constraint {
2702
JoinConstraint::On(expr @ SQLExpr::BinaryOp { .. }) => {
2703
process_join_on(ctx, expr, tbl_left, tbl_right)
2704
},
2705
JoinConstraint::Using(idents) if !idents.is_empty() => {
2706
let using: Vec<Expr> = idents
2707
.iter()
2708
.map(|ObjectName(parts)| {
2709
if parts.len() != 1 {
2710
polars_bail!(SQLSyntax: "JOIN \"USING\" clause expects simple column names, not qualified names");
2711
}
2712
match parts[0].as_ident() {
2713
Some(ident) => Ok(col(ident.value.as_str())),
2714
None => polars_bail!(SQLSyntax: "JOIN \"USING\" clause expects identifiers, not functions"),
2715
}
2716
})
2717
.collect::<PolarsResult<Vec<_>>>()?;
2718
Ok((using.clone(), using))
2719
},
2720
JoinConstraint::Natural => {
2721
let left_names = tbl_left.schema.iter_names().collect::<PlHashSet<_>>();
2722
let right_names = tbl_right.schema.iter_names().collect::<PlHashSet<_>>();
2723
let on: Vec<Expr> = left_names
2724
.intersection(&right_names)
2725
.map(|&name| col(name.clone()))
2726
.collect();
2727
if on.is_empty() {
2728
polars_bail!(SQLInterface: "no common columns found for NATURAL JOIN")
2729
}
2730
Ok((on.clone(), on))
2731
},
2732
_ => polars_bail!(SQLInterface: "unsupported SQL join constraint:\n{:?}", constraint),
2733
}
2734
}
2735
2736
/// Extract table identifiers referenced in a SQL query; uses a visitor to
2737
/// collect all table names that appear in FROM clauses, JOINs, TABLE refs
2738
/// in set operations, and subqueries.
2739
pub fn extract_table_identifiers(
2740
query: &str,
2741
include_schema: bool,
2742
unique: bool,
2743
) -> PolarsResult<Vec<String>> {
2744
let mut parser = Parser::new(&GenericDialect);
2745
parser = parser.with_options(ParserOptions {
2746
trailing_commas: true,
2747
..Default::default()
2748
});
2749
let ast = parser
2750
.try_with_sql(query)
2751
.map_err(to_sql_interface_err)?
2752
.parse_statements()
2753
.map_err(to_sql_interface_err)?;
2754
2755
let mut collector = TableIdentifierCollector {
2756
include_schema,
2757
..Default::default()
2758
};
2759
for stmt in &ast {
2760
let _ = stmt.visit(&mut collector);
2761
}
2762
Ok(if unique {
2763
collector
2764
.tables
2765
.into_iter()
2766
.collect::<PlIndexSet<_>>()
2767
.into_iter()
2768
.collect()
2769
} else {
2770
collector.tables
2771
})
2772
}
2773
2774
// Height-behavior flags for SQL projections; a single expression is classified into
// exactly one of these by `ExprSqlProjectionHeightBehavior::identify_from_expr`.
bitflags::bitflags! {
2775
/// Bitfield indicating whether there exists a projection with the specified height behavior.
2776
///
2777
/// Used to help determine whether to execute projections in `select()` or `with_columns()`
2778
/// context.
2779
#[derive(PartialEq)]
2780
struct ExprSqlProjectionHeightBehavior: u8 {
2781
/// Maintains the height of input column(s)
2782
const MaintainsColumn = 1 << 0;
2783
/// Height is independent of input, e.g.:
2784
/// * expressions that change length: e.g. slice, explode, filter, gather etc.
2785
/// * aggregations: count(*), first(), sum() etc.
2786
const Independent = 1 << 1;
2787
/// "Inherits" the height of the context, e.g.:
2788
/// * Scalar literals
2789
const InheritsContext = 1 << 2;
2790
}
2791
}
2792
2793
impl ExprSqlProjectionHeightBehavior {
2794
fn identify_from_expr(expr: &Expr) -> Self {
2795
let mut has_column = false;
2796
let mut has_independent = false;
2797
2798
for e in expr.into_iter() {
2799
use Expr::*;
2800
has_column |= matches!(e, Column(_) | Selector(_));
2801
has_independent |= match e {
2802
// @TODO: This is broken now with functions.
2803
AnonymousFunction { options, .. } => {
2804
options.returns_scalar() || !options.is_length_preserving()
2805
},
2806
Literal(v) => !v.is_scalar(),
2807
Explode { .. } | Filter { .. } | Gather { .. } | Slice { .. } => true,
2808
Agg { .. } | Len => true,
2809
_ => false,
2810
}
2811
}
2812
if has_independent {
2813
Self::Independent
2814
} else if has_column {
2815
Self::MaintainsColumn
2816
} else {
2817
Self::InheritsContext
2818
}
2819
}
2820
}
2821
2822