Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-sql/src/context.rs
6939 views
1
use std::ops::Deref;
2
3
use polars_core::frame::row::Row;
4
use polars_core::prelude::*;
5
use polars_lazy::prelude::*;
6
use polars_ops::frame::JoinCoalesce;
7
use polars_plan::dsl::function_expr::StructFunction;
8
use polars_plan::prelude::*;
9
use polars_utils::format_pl_smallstr;
10
use sqlparser::ast::{
11
BinaryOperator, CreateTable, Delete, Distinct, ExcludeSelectItem, Expr as SQLExpr, FromTable,
12
FunctionArg, GroupByExpr, Ident, JoinConstraint, JoinOperator, ObjectName, ObjectType, Offset,
13
OrderBy, Query, RenameSelectItem, Select, SelectItem, SetExpr, SetOperator, SetQuantifier,
14
Statement, TableAlias, TableFactor, TableWithJoins, UnaryOperator, Value as SQLValue, Values,
15
WildcardAdditionalOptions,
16
};
17
use sqlparser::dialect::GenericDialect;
18
use sqlparser::parser::{Parser, ParserOptions};
19
20
use crate::function_registry::{DefaultFunctionRegistry, FunctionRegistry};
21
use crate::sql_expr::{
22
parse_sql_array, parse_sql_expr, resolve_compound_identifier, to_sql_interface_err,
23
};
24
use crate::table_functions::PolarsTableFunctions;
25
26
#[derive(Clone)]
27
pub struct TableInfo {
28
pub(crate) frame: LazyFrame,
29
pub(crate) name: PlSmallStr,
30
pub(crate) schema: Arc<Schema>,
31
}
32
33
struct SelectModifiers {
34
exclude: PlHashSet<String>, // SELECT * EXCLUDE
35
ilike: Option<regex::Regex>, // SELECT * ILIKE
36
rename: PlHashMap<PlSmallStr, PlSmallStr>, // SELECT * RENAME
37
replace: Vec<Expr>, // SELECT * REPLACE
38
}
39
impl SelectModifiers {
40
fn matches_ilike(&self, s: &str) -> bool {
41
match &self.ilike {
42
Some(rx) => rx.is_match(s),
43
None => true,
44
}
45
}
46
fn renamed_cols(&self) -> Vec<Expr> {
47
self.rename
48
.iter()
49
.map(|(before, after)| col(before.clone()).alias(after.clone()))
50
.collect()
51
}
52
}
53
54
/// The SQLContext is the main entry point for executing SQL queries.
55
#[derive(Clone)]
56
pub struct SQLContext {
57
pub(crate) table_map: PlHashMap<String, LazyFrame>,
58
pub(crate) function_registry: Arc<dyn FunctionRegistry>,
59
pub(crate) lp_arena: Arena<IR>,
60
pub(crate) expr_arena: Arena<AExpr>,
61
62
cte_map: PlHashMap<String, LazyFrame>,
63
table_aliases: PlHashMap<String, String>,
64
joined_aliases: PlHashMap<String, PlHashMap<String, String>>,
65
}
66
67
impl Default for SQLContext {
68
fn default() -> Self {
69
Self {
70
function_registry: Arc::new(DefaultFunctionRegistry {}),
71
table_map: Default::default(),
72
cte_map: Default::default(),
73
table_aliases: Default::default(),
74
joined_aliases: Default::default(),
75
lp_arena: Default::default(),
76
expr_arena: Default::default(),
77
}
78
}
79
}
80
81
impl SQLContext {
82
/// Create a new SQLContext.
83
/// ```rust
84
/// # use polars_sql::SQLContext;
85
/// # fn main() {
86
/// let ctx = SQLContext::new();
87
/// # }
88
/// ```
89
pub fn new() -> Self {
90
Self::default()
91
}
92
93
/// Get the names of all registered tables, in sorted order.
94
pub fn get_tables(&self) -> Vec<String> {
95
let mut tables = Vec::from_iter(self.table_map.keys().cloned());
96
tables.sort_unstable();
97
tables
98
}
99
100
/// Register a [`LazyFrame`] as a table in the SQLContext.
101
/// ```rust
102
/// # use polars_sql::SQLContext;
103
/// # use polars_core::prelude::*;
104
/// # use polars_lazy::prelude::*;
105
/// # fn main() {
106
///
107
/// let mut ctx = SQLContext::new();
108
/// let df = df! {
109
/// "a" => [1, 2, 3],
110
/// }.unwrap().lazy();
111
///
112
/// ctx.register("df", df);
113
/// # }
114
///```
115
pub fn register(&mut self, name: &str, lf: LazyFrame) {
116
self.table_map.insert(name.to_owned(), lf);
117
}
118
119
/// Unregister a [`LazyFrame`] table from the [`SQLContext`].
120
pub fn unregister(&mut self, name: &str) {
121
self.table_map.remove(&name.to_owned());
122
}
123
124
/// Execute a SQL query, returning a [`LazyFrame`].
125
/// ```rust
126
/// # use polars_sql::SQLContext;
127
/// # use polars_core::prelude::*;
128
/// # use polars_lazy::prelude::*;
129
/// # fn main() {
130
///
131
/// let mut ctx = SQLContext::new();
132
/// let df = df! {
133
/// "a" => [1, 2, 3],
134
/// }
135
/// .unwrap();
136
///
137
/// ctx.register("df", df.clone().lazy());
138
/// let sql_df = ctx.execute("SELECT * FROM df").unwrap().collect().unwrap();
139
/// assert!(sql_df.equals(&df));
140
/// # }
141
///```
142
pub fn execute(&mut self, query: &str) -> PolarsResult<LazyFrame> {
143
let mut parser = Parser::new(&GenericDialect);
144
parser = parser.with_options(ParserOptions {
145
trailing_commas: true,
146
..Default::default()
147
});
148
149
let ast = parser
150
.try_with_sql(query)
151
.map_err(to_sql_interface_err)?
152
.parse_statements()
153
.map_err(to_sql_interface_err)?;
154
155
polars_ensure!(ast.len() == 1, SQLInterface: "one (and only one) statement can be parsed at a time");
156
let res = self.execute_statement(ast.first().unwrap())?;
157
158
// Ensure the result uses the proper arenas.
159
// This will instantiate new arenas with a new version.
160
let lp_arena = std::mem::take(&mut self.lp_arena);
161
let expr_arena = std::mem::take(&mut self.expr_arena);
162
res.set_cached_arena(lp_arena, expr_arena);
163
164
// Every execution should clear the statement-level maps.
165
self.cte_map.clear();
166
self.table_aliases.clear();
167
self.joined_aliases.clear();
168
169
Ok(res)
170
}
171
172
/// add a function registry to the SQLContext
173
/// the registry provides the ability to add custom functions to the SQLContext
174
pub fn with_function_registry(mut self, function_registry: Arc<dyn FunctionRegistry>) -> Self {
175
self.function_registry = function_registry;
176
self
177
}
178
179
/// Get the function registry of the SQLContext
180
pub fn registry(&self) -> &Arc<dyn FunctionRegistry> {
181
&self.function_registry
182
}
183
184
/// Get a mutable reference to the function registry of the SQLContext
185
pub fn registry_mut(&mut self) -> &mut dyn FunctionRegistry {
186
Arc::get_mut(&mut self.function_registry).unwrap()
187
}
188
}
189
190
impl SQLContext {
191
pub(crate) fn execute_statement(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
192
let ast = stmt;
193
Ok(match ast {
194
Statement::Query(query) => self.execute_query(query)?,
195
stmt @ Statement::ShowTables { .. } => self.execute_show_tables(stmt)?,
196
stmt @ Statement::CreateTable { .. } => self.execute_create_table(stmt)?,
197
stmt @ Statement::Drop {
198
object_type: ObjectType::Table,
199
..
200
} => self.execute_drop_table(stmt)?,
201
stmt @ Statement::Explain { .. } => self.execute_explain(stmt)?,
202
stmt @ Statement::Truncate { .. } => self.execute_truncate_table(stmt)?,
203
stmt @ Statement::Delete { .. } => self.execute_delete_from_table(stmt)?,
204
_ => polars_bail!(
205
SQLInterface: "statement type is not supported:\n{:?}", ast,
206
),
207
})
208
}
209
210
/// Execute a query, first registering any `WITH` clause CTEs so the body can use them.
pub(crate) fn execute_query(&mut self, query: &Query) -> PolarsResult<LazyFrame> {
    self.register_ctes(query)
        .and_then(|()| self.execute_query_no_ctes(query))
}
214
215
/// Execute the query body, then apply its LIMIT/OFFSET (CTEs are not registered here).
pub(crate) fn execute_query_no_ctes(&mut self, query: &Query) -> PolarsResult<LazyFrame> {
    let body = &query.body;
    let frame = self.process_query(body, query)?;
    self.process_limit_offset(frame, &query.limit, &query.offset)
}
219
220
/// Resolve the schema of `frame`, using this context's plan/expression arenas.
pub(crate) fn get_frame_schema(&mut self, frame: &mut LazyFrame) -> PolarsResult<SchemaRef> {
    let (lp_arena, expr_arena) = (&mut self.lp_arena, &mut self.expr_arena);
    frame.schema_with_arenas(lp_arena, expr_arena)
}
223
224
/// Look up `name` as a registered table, then as a CTE, then as an alias of a
/// registered table (in that order), returning a clone of the first match.
pub(super) fn get_table_from_current_scope(&self, name: &str) -> Option<LazyFrame> {
    if let Some(registered) = self.table_map.get(name) {
        return Some(registered.clone());
    }
    if let Some(cte) = self.cte_map.get(name) {
        return Some(cte.clone());
    }
    self.table_aliases
        .get(name)
        .and_then(|target| self.table_map.get(target))
        .cloned()
}
234
235
fn expr_or_ordinal(
236
&mut self,
237
e: &SQLExpr,
238
exprs: &[Expr],
239
selected: Option<&[Expr]>,
240
schema: Option<&Schema>,
241
clause: &str,
242
) -> PolarsResult<Expr> {
243
match e {
244
SQLExpr::UnaryOp {
245
op: UnaryOperator::Minus,
246
expr,
247
} if matches!(**expr, SQLExpr::Value(SQLValue::Number(_, _))) => {
248
if let SQLExpr::Value(SQLValue::Number(ref idx, _)) = **expr {
249
Err(polars_err!(
250
SQLSyntax:
251
"negative ordinal values are invalid for {}; found -{}",
252
clause,
253
idx
254
))
255
} else {
256
unreachable!()
257
}
258
},
259
SQLExpr::Value(SQLValue::Number(idx, _)) => {
260
// note: sql queries are 1-indexed
261
let idx = idx.parse::<usize>().map_err(|_| {
262
polars_err!(
263
SQLSyntax:
264
"negative ordinal values are invalid for {}; found {}",
265
clause,
266
idx
267
)
268
})?;
269
// note: "selected" cols represent final projection order, so we use those for
270
// ordinal resolution. "exprs" may include cols that are subsequently dropped.
271
let cols = if let Some(cols) = selected {
272
cols
273
} else {
274
exprs
275
};
276
Ok(cols
277
.get(idx - 1)
278
.ok_or_else(|| {
279
polars_err!(
280
SQLInterface:
281
"{} ordinal value must refer to a valid column; found {}",
282
clause,
283
idx
284
)
285
})?
286
.clone())
287
},
288
SQLExpr::Value(v) => Err(polars_err!(
289
SQLSyntax:
290
"{} requires a valid expression or positive ordinal; found {}", clause, v,
291
)),
292
_ => parse_sql_expr(e, self, schema),
293
}
294
}
295
296
pub(super) fn resolve_name(&self, tbl_name: &str, column_name: &str) -> String {
297
if let Some(aliases) = self.joined_aliases.get(tbl_name) {
298
if let Some(name) = aliases.get(column_name) {
299
return name.to_string();
300
}
301
}
302
column_name.to_string()
303
}
304
305
fn process_query(&mut self, expr: &SetExpr, query: &Query) -> PolarsResult<LazyFrame> {
306
match expr {
307
SetExpr::Select(select_stmt) => self.execute_select(select_stmt, query),
308
SetExpr::Query(query) => self.execute_query_no_ctes(query),
309
SetExpr::SetOperation {
310
op: SetOperator::Union,
311
set_quantifier,
312
left,
313
right,
314
} => self.process_union(left, right, set_quantifier, query),
315
316
#[cfg(feature = "semi_anti_join")]
317
SetExpr::SetOperation {
318
op: SetOperator::Intersect | SetOperator::Except,
319
set_quantifier,
320
left,
321
right,
322
} => self.process_except_intersect(left, right, set_quantifier, query),
323
324
SetExpr::Values(Values {
325
explicit_row: _,
326
rows,
327
}) => self.process_values(rows),
328
329
SetExpr::Table(tbl) => {
330
if tbl.table_name.is_some() {
331
let table_name = tbl.table_name.as_ref().unwrap();
332
self.get_table_from_current_scope(table_name)
333
.ok_or_else(|| {
334
polars_err!(
335
SQLInterface: "no table or alias named '{}' found",
336
tbl
337
)
338
})
339
} else {
340
polars_bail!(SQLInterface: "'TABLE' requires valid table name")
341
}
342
},
343
op => {
344
polars_bail!(SQLInterface: "'{}' operation is currently unsupported", op)
345
},
346
}
347
}
348
349
#[cfg(feature = "semi_anti_join")]
350
fn process_except_intersect(
351
&mut self,
352
left: &SetExpr,
353
right: &SetExpr,
354
quantifier: &SetQuantifier,
355
query: &Query,
356
) -> PolarsResult<LazyFrame> {
357
let (join_type, op_name) = match *query.body {
358
SetExpr::SetOperation {
359
op: SetOperator::Except,
360
..
361
} => (JoinType::Anti, "EXCEPT"),
362
_ => (JoinType::Semi, "INTERSECT"),
363
};
364
let mut lf = self.process_query(left, query)?;
365
let mut rf = self.process_query(right, query)?;
366
let join = lf
367
.clone()
368
.join_builder()
369
.with(rf.clone())
370
.how(join_type)
371
.join_nulls(true);
372
373
let lf_schema = self.get_frame_schema(&mut lf)?;
374
let lf_cols: Vec<_> = lf_schema.iter_names().map(|nm| col(nm.clone())).collect();
375
let joined_tbl = match quantifier {
376
SetQuantifier::ByName => join.on(lf_cols).finish(),
377
SetQuantifier::Distinct | SetQuantifier::None => {
378
let rf_schema = self.get_frame_schema(&mut rf)?;
379
let rf_cols: Vec<_> = rf_schema.iter_names().map(|nm| col(nm.clone())).collect();
380
if lf_cols.len() != rf_cols.len() {
381
polars_bail!(SQLInterface: "{} requires equal number of columns in each table (use '{} BY NAME' to combine mismatched tables)", op_name, op_name)
382
}
383
join.left_on(lf_cols).right_on(rf_cols).finish()
384
},
385
_ => {
386
polars_bail!(SQLInterface: "'{} {}' is not supported", op_name, quantifier.to_string())
387
},
388
};
389
Ok(joined_tbl.unique(None, UniqueKeepStrategy::Any))
390
}
391
392
fn process_union(
393
&mut self,
394
left: &SetExpr,
395
right: &SetExpr,
396
quantifier: &SetQuantifier,
397
query: &Query,
398
) -> PolarsResult<LazyFrame> {
399
let mut lf = self.process_query(left, query)?;
400
let mut rf = self.process_query(right, query)?;
401
let opts = UnionArgs {
402
parallel: true,
403
to_supertypes: true,
404
..Default::default()
405
};
406
match quantifier {
407
// UNION [ALL | DISTINCT]
408
SetQuantifier::All | SetQuantifier::Distinct | SetQuantifier::None => {
409
let lf_schema = self.get_frame_schema(&mut lf)?;
410
let rf_schema = self.get_frame_schema(&mut rf)?;
411
if lf_schema.len() != rf_schema.len() {
412
polars_bail!(SQLInterface: "UNION requires equal number of columns in each table (use 'UNION BY NAME' to combine mismatched tables)")
413
}
414
let concatenated = polars_lazy::dsl::concat(vec![lf, rf], opts);
415
match quantifier {
416
SetQuantifier::Distinct | SetQuantifier::None => {
417
concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
418
},
419
_ => concatenated,
420
}
421
},
422
// UNION ALL BY NAME
423
#[cfg(feature = "diagonal_concat")]
424
SetQuantifier::AllByName => concat_lf_diagonal(vec![lf, rf], opts),
425
// UNION [DISTINCT] BY NAME
426
#[cfg(feature = "diagonal_concat")]
427
SetQuantifier::ByName | SetQuantifier::DistinctByName => {
428
let concatenated = concat_lf_diagonal(vec![lf, rf], opts);
429
concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
430
},
431
#[allow(unreachable_patterns)]
432
_ => polars_bail!(SQLInterface: "'UNION {}' is not currently supported", quantifier),
433
}
434
}
435
436
fn process_values(&mut self, values: &[Vec<SQLExpr>]) -> PolarsResult<LazyFrame> {
437
let frame_rows: Vec<Row> = values.iter().map(|row| {
438
let row_data: Result<Vec<_>, _> = row.iter().map(|expr| {
439
let expr = parse_sql_expr(expr, self, None)?;
440
match expr {
441
Expr::Literal(value) => {
442
value.to_any_value()
443
.ok_or_else(|| polars_err!(SQLInterface: "invalid literal value: {:?}", value))
444
.map(|av| av.into_static())
445
},
446
_ => polars_bail!(SQLInterface: "VALUES clause expects literals; found {}", expr),
447
}
448
}).collect();
449
row_data.map(Row::new)
450
}).collect::<Result<_, _>>()?;
451
452
Ok(DataFrame::from_rows(frame_rows.as_ref())?.lazy())
453
}
454
455
// EXPLAIN SELECT * FROM DF
456
fn execute_explain(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
457
match stmt {
458
Statement::Explain { statement, .. } => {
459
let lf = self.execute_statement(statement)?;
460
let plan = lf.describe_optimized_plan()?;
461
let plan = plan
462
.split('\n')
463
.collect::<Series>()
464
.with_name(PlSmallStr::from_static("Logical Plan"))
465
.into_column();
466
let df = DataFrame::new(vec![plan])?;
467
Ok(df.lazy())
468
},
469
_ => polars_bail!(SQLInterface: "unexpected statement type; expected EXPLAIN"),
470
}
471
}
472
473
// SHOW TABLES
474
fn execute_show_tables(&mut self, _: &Statement) -> PolarsResult<LazyFrame> {
475
let tables = Column::new("name".into(), self.get_tables());
476
let df = DataFrame::new(vec![tables])?;
477
Ok(df.lazy())
478
}
479
480
// DROP TABLE <tbl>
481
fn execute_drop_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
482
match stmt {
483
Statement::Drop { names, .. } => {
484
names.iter().for_each(|name| {
485
self.table_map.remove(&name.to_string());
486
});
487
Ok(DataFrame::empty().lazy())
488
},
489
_ => polars_bail!(SQLInterface: "unexpected statement type; expected DROP"),
490
}
491
}
492
493
// DELETE FROM <tbl> [WHERE ...]
494
fn execute_delete_from_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
495
if let Statement::Delete(Delete {
496
tables,
497
from,
498
using,
499
selection,
500
returning,
501
order_by,
502
limit,
503
}) = stmt
504
{
505
if !tables.is_empty()
506
|| using.is_some()
507
|| returning.is_some()
508
|| limit.is_some()
509
|| !order_by.is_empty()
510
{
511
let error_message = match () {
512
_ if !tables.is_empty() => "DELETE expects exactly one table name",
513
_ if using.is_some() => "DELETE does not support the USING clause",
514
_ if returning.is_some() => "DELETE does not support the RETURNING clause",
515
_ if limit.is_some() => "DELETE does not support the LIMIT clause",
516
_ if !order_by.is_empty() => "DELETE does not support the ORDER BY clause",
517
_ => unreachable!(),
518
};
519
polars_bail!(SQLInterface: error_message);
520
}
521
let from_tables = match &from {
522
FromTable::WithFromKeyword(from) => from,
523
FromTable::WithoutKeyword(from) => from,
524
};
525
if from_tables.len() > 1 {
526
polars_bail!(SQLInterface: "cannot have multiple tables in DELETE FROM (found {})", from_tables.len())
527
}
528
let tbl_expr = from_tables.first().unwrap();
529
if !tbl_expr.joins.is_empty() {
530
polars_bail!(SQLInterface: "DELETE does not support table JOINs")
531
}
532
let (_, mut lf) = self.get_table(&tbl_expr.relation)?;
533
if selection.is_none() {
534
// no WHERE clause; equivalent to TRUNCATE (drop all rows)
535
Ok(DataFrame::empty_with_schema(
536
lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
537
.unwrap()
538
.as_ref(),
539
)
540
.lazy())
541
} else {
542
// apply constraint as inverted filter (drops rows matching the selection)
543
Ok(self.process_where(lf.clone(), selection, true)?)
544
}
545
} else {
546
polars_bail!(SQLInterface: "unexpected statement type; expected DELETE")
547
}
548
}
549
550
// TRUNCATE <tbl>
551
fn execute_truncate_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
552
if let Statement::Truncate {
553
table_names,
554
partitions,
555
..
556
} = stmt
557
{
558
match partitions {
559
None => {
560
if table_names.len() != 1 {
561
polars_bail!(SQLInterface: "TRUNCATE expects exactly one table name; found {}", table_names.len())
562
}
563
let tbl = table_names[0].to_string();
564
if let Some(lf) = self.table_map.get_mut(&tbl) {
565
*lf = DataFrame::empty_with_schema(
566
lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
567
.unwrap()
568
.as_ref(),
569
)
570
.lazy();
571
Ok(lf.clone())
572
} else {
573
polars_bail!(SQLInterface: "table '{}' does not exist", tbl);
574
}
575
},
576
_ => {
577
polars_bail!(SQLInterface: "TRUNCATE does not support use of 'partitions'")
578
},
579
}
580
} else {
581
polars_bail!(SQLInterface: "unexpected statement type; expected TRUNCATE")
582
}
583
}
584
585
fn register_cte(&mut self, name: &str, lf: LazyFrame) {
586
self.cte_map.insert(name.to_owned(), lf);
587
}
588
589
fn register_ctes(&mut self, query: &Query) -> PolarsResult<()> {
590
if let Some(with) = &query.with {
591
if with.recursive {
592
polars_bail!(SQLInterface: "recursive CTEs are not supported")
593
}
594
for cte in &with.cte_tables {
595
let cte_name = cte.alias.name.value.clone();
596
let mut lf = self.execute_query(&cte.query)?;
597
lf = self.rename_columns_from_table_alias(lf, &cte.alias)?;
598
self.register_cte(&cte_name, lf);
599
}
600
}
601
Ok(())
602
}
603
604
/// execute the 'FROM' part of the query
605
fn execute_from_statement(&mut self, tbl_expr: &TableWithJoins) -> PolarsResult<LazyFrame> {
606
let (l_name, mut lf) = self.get_table(&tbl_expr.relation)?;
607
if !tbl_expr.joins.is_empty() {
608
for join in &tbl_expr.joins {
609
let (r_name, mut rf) = self.get_table(&join.relation)?;
610
if r_name.is_empty() {
611
// Require non-empty to avoid duplicate column errors from nested self-joins.
612
polars_bail!(
613
SQLInterface:
614
"cannot join on unnamed relation; please provide an alias"
615
)
616
}
617
let left_schema = self.get_frame_schema(&mut lf)?;
618
let right_schema = self.get_frame_schema(&mut rf)?;
619
620
lf = match &join.join_operator {
621
op @ (JoinOperator::FullOuter(constraint)
622
| JoinOperator::LeftOuter(constraint)
623
| JoinOperator::RightOuter(constraint)
624
| JoinOperator::Inner(constraint)
625
| JoinOperator::Anti(constraint)
626
| JoinOperator::Semi(constraint)
627
| JoinOperator::LeftAnti(constraint)
628
| JoinOperator::LeftSemi(constraint)
629
| JoinOperator::RightAnti(constraint)
630
| JoinOperator::RightSemi(constraint)) => {
631
let (lf, rf) = match op {
632
JoinOperator::RightAnti(_) | JoinOperator::RightSemi(_) => (rf, lf),
633
_ => (lf, rf),
634
};
635
self.process_join(
636
&TableInfo {
637
frame: lf,
638
name: (&l_name).into(),
639
schema: left_schema.clone(),
640
},
641
&TableInfo {
642
frame: rf,
643
name: (&r_name).into(),
644
schema: right_schema.clone(),
645
},
646
constraint,
647
match op {
648
JoinOperator::FullOuter(_) => JoinType::Full,
649
JoinOperator::LeftOuter(_) => JoinType::Left,
650
JoinOperator::RightOuter(_) => JoinType::Right,
651
JoinOperator::Inner(_) => JoinType::Inner,
652
#[cfg(feature = "semi_anti_join")]
653
JoinOperator::Anti(_)
654
| JoinOperator::LeftAnti(_)
655
| JoinOperator::RightAnti(_) => JoinType::Anti,
656
#[cfg(feature = "semi_anti_join")]
657
JoinOperator::Semi(_)
658
| JoinOperator::LeftSemi(_)
659
| JoinOperator::RightSemi(_) => JoinType::Semi,
660
join_type => polars_bail!(
661
SQLInterface:
662
"join type '{:?}' not currently supported",
663
join_type
664
),
665
},
666
)?
667
},
668
JoinOperator::CrossJoin => {
669
lf.cross_join(rf, Some(format_pl_smallstr!(":{}", r_name)))
670
},
671
join_type => {
672
polars_bail!(SQLInterface: "join type '{:?}' not currently supported", join_type)
673
},
674
};
675
676
// track join-aliased columns so we can resolve them later
677
let joined_schema = self.get_frame_schema(&mut lf)?;
678
679
self.joined_aliases.insert(
680
r_name.clone(),
681
right_schema
682
.iter_names()
683
.filter_map(|name| {
684
// col exists in both tables and is aliased in the joined result
685
let aliased_name = format!("{name}:{r_name}");
686
if left_schema.contains(name)
687
&& joined_schema.contains(aliased_name.as_str())
688
{
689
Some((name.to_string(), aliased_name))
690
} else {
691
None
692
}
693
})
694
.collect::<PlHashMap<String, String>>(),
695
);
696
}
697
};
698
Ok(lf)
699
}
700
701
/// Execute the 'SELECT' part of the query.
702
fn execute_select(&mut self, select_stmt: &Select, query: &Query) -> PolarsResult<LazyFrame> {
703
let mut lf = if select_stmt.from.is_empty() {
704
DataFrame::empty().lazy()
705
} else {
706
// Note: implicit joins need more work to support properly,
707
// explicit joins are preferred for now (ref: #16662)
708
let from = select_stmt.clone().from;
709
if from.len() > 1 {
710
polars_bail!(SQLInterface: "multiple tables in FROM clause are not currently supported (found {}); use explicit JOIN syntax instead", from.len())
711
}
712
self.execute_from_statement(from.first().unwrap())?
713
};
714
715
// Filter expression (WHERE clause)
716
let schema = self.get_frame_schema(&mut lf)?;
717
lf = self.process_where(lf, &select_stmt.selection, false)?;
718
719
// 'SELECT *' modifiers
720
let mut select_modifiers = SelectModifiers {
721
ilike: None,
722
exclude: PlHashSet::new(),
723
rename: PlHashMap::new(),
724
replace: vec![],
725
};
726
727
let projections = self.column_projections(select_stmt, &schema, &mut select_modifiers)?;
728
729
// Check for "GROUP BY ..." (after determining projections)
730
let mut group_by_keys: Vec<Expr> = Vec::new();
731
match &select_stmt.group_by {
732
// Standard "GROUP BY x, y, z" syntax (also recognising ordinal values)
733
GroupByExpr::Expressions(group_by_exprs, modifiers) => {
734
if !modifiers.is_empty() {
735
polars_bail!(SQLInterface: "GROUP BY does not support CUBE, ROLLUP, or TOTALS modifiers")
736
}
737
// translate the group expressions, allowing ordinal values
738
group_by_keys = group_by_exprs
739
.iter()
740
.map(|e| {
741
self.expr_or_ordinal(
742
e,
743
&projections,
744
None,
745
Some(schema.deref()),
746
"GROUP BY",
747
)
748
})
749
.collect::<PolarsResult<_>>()?
750
},
751
// "GROUP BY ALL" syntax; automatically adds expressions that do not contain
752
// nested agg/window funcs to the group key (also ignores literals).
753
GroupByExpr::All(modifiers) => {
754
if !modifiers.is_empty() {
755
polars_bail!(SQLInterface: "GROUP BY does not support CUBE, ROLLUP, or TOTALS modifiers")
756
}
757
projections.iter().for_each(|expr| match expr {
758
// immediately match the most common cases (col|agg|len|lit, optionally aliased).
759
Expr::Agg(_) | Expr::Len | Expr::Literal(_) => (),
760
Expr::Column(_) => group_by_keys.push(expr.clone()),
761
Expr::Alias(e, _)
762
if matches!(&**e, Expr::Agg(_) | Expr::Len | Expr::Literal(_)) => {},
763
Expr::Alias(e, _) if matches!(&**e, Expr::Column(_)) => {
764
if let Expr::Column(name) = &**e {
765
group_by_keys.push(col(name.clone()));
766
}
767
},
768
_ => {
769
// If not quick-matched, add if no nested agg/window expressions
770
if !has_expr(expr, |e| {
771
matches!(e, Expr::Agg(_))
772
|| matches!(e, Expr::Len)
773
|| matches!(e, Expr::Window { .. })
774
}) {
775
group_by_keys.push(expr.clone())
776
}
777
},
778
});
779
},
780
};
781
782
lf = if group_by_keys.is_empty() {
783
// The 'having' clause is only valid inside 'group by'
784
if select_stmt.having.is_some() {
785
polars_bail!(SQLSyntax: "HAVING clause not valid outside of GROUP BY; found:\n{:?}", select_stmt.having);
786
};
787
788
// Final/selected cols, accounting for 'SELECT *' modifiers
789
let mut retained_cols = Vec::with_capacity(projections.len());
790
let mut retained_names = Vec::with_capacity(projections.len());
791
let have_order_by = query.order_by.is_some();
792
// Initialize containing InheritsContext to handle empty projection case.
793
let mut projection_heights = ExprSqlProjectionHeightBehavior::InheritsContext;
794
795
// Note: if there is an 'order by' then we project everything (original cols
796
// and new projections) and *then* select the final cols; the retained cols
797
// are used to ensure a correct final projection. If there's no 'order by',
798
// clause then we can project the final column *expressions* directly.
799
for p in projections.iter() {
800
let name = p.to_field(schema.deref())?.name.to_string();
801
if select_modifiers.matches_ilike(&name)
802
&& !select_modifiers.exclude.contains(&name)
803
{
804
projection_heights |= ExprSqlProjectionHeightBehavior::identify_from_expr(p);
805
806
retained_cols.push(if have_order_by {
807
col(name.as_str())
808
} else {
809
p.clone()
810
});
811
retained_names.push(col(name));
812
}
813
}
814
815
// Apply the remaining modifiers and establish the final projection
816
if have_order_by {
817
// We can safely use `with_columns()` and avoid a join if:
818
// * There is already a projection that projects to the table height.
819
// * All projection heights inherit from context (e.g. all scalar literals that
820
// are to be broadcasted to table height).
821
if projection_heights.contains(ExprSqlProjectionHeightBehavior::MaintainsColumn)
822
|| projection_heights == ExprSqlProjectionHeightBehavior::InheritsContext
823
{
824
lf = lf.with_columns(projections);
825
} else {
826
// We hit this branch if the output height is not guaranteed to match the table
827
// height. E.g.:
828
//
829
// * SELECT COUNT(*) FROM df ORDER BY sort_key;
830
// * SELECT UNNEST(list_col) FROM df ORDER BY sort_key;
831
//
832
// For these cases we truncate / extend the sorting columns with NULLs to match
833
// the output height. We do this by projecting independently and then joining
834
// back the original frame on the row index.
835
const NAME: PlSmallStr = PlSmallStr::from_static("__PL_INDEX");
836
lf = lf
837
.clone()
838
.select(projections)
839
.with_row_index(NAME, None)
840
.join(
841
lf.with_row_index(NAME, None),
842
[col(NAME)],
843
[col(NAME)],
844
JoinArgs {
845
how: JoinType::Left,
846
validation: Default::default(),
847
suffix: None,
848
slice: None,
849
nulls_equal: false,
850
coalesce: Default::default(),
851
maintain_order: polars_ops::frame::MaintainOrderJoin::Left,
852
},
853
);
854
}
855
}
856
857
if !select_modifiers.replace.is_empty() {
858
lf = lf.with_columns(&select_modifiers.replace);
859
}
860
if !select_modifiers.rename.is_empty() {
861
lf = lf.with_columns(select_modifiers.renamed_cols());
862
}
863
864
lf = self.process_order_by(lf, &query.order_by, Some(&retained_cols))?;
865
866
// Note: If `have_order_by`, with_columns is already done above.
867
if projection_heights == ExprSqlProjectionHeightBehavior::InheritsContext
868
&& !have_order_by
869
{
870
// All projections need to be broadcasted to table height, so evaluate in `with_columns()`
871
lf = lf.with_columns(retained_cols).select(retained_names);
872
} else {
873
lf = lf.select(retained_cols);
874
}
875
876
if !select_modifiers.rename.is_empty() {
877
lf = lf.rename(
878
select_modifiers.rename.keys(),
879
select_modifiers.rename.values(),
880
true,
881
);
882
};
883
lf
884
} else {
885
lf = self.process_group_by(lf, &group_by_keys, &projections)?;
886
lf = self.process_order_by(lf, &query.order_by, None)?;
887
888
// Apply optional 'having' clause, post-aggregation.
889
let schema = Some(self.get_frame_schema(&mut lf)?);
890
match select_stmt.having.as_ref() {
891
Some(expr) => lf.filter(parse_sql_expr(expr, self, schema.as_deref())?),
892
None => lf,
893
}
894
};
895
896
// Apply optional DISTINCT clause.
897
lf = match &select_stmt.distinct {
898
Some(Distinct::Distinct) => lf.unique_stable(None, UniqueKeepStrategy::Any),
899
Some(Distinct::On(exprs)) => {
900
// TODO: support exprs in `unique` see https://github.com/pola-rs/polars/issues/5760
901
let schema = Some(self.get_frame_schema(&mut lf)?);
902
let cols = exprs
903
.iter()
904
.map(|e| {
905
let expr = parse_sql_expr(e, self, schema.as_deref())?;
906
if let Expr::Column(name) = expr {
907
Ok(name)
908
} else {
909
Err(polars_err!(SQLSyntax:"DISTINCT ON only supports column names"))
910
}
911
})
912
.collect::<PolarsResult<Vec<_>>>()?;
913
914
// DISTINCT ON has to apply the ORDER BY before the operation.
915
lf = self.process_order_by(lf, &query.order_by, None)?;
916
return Ok(lf.unique_stable(
917
Some(Selector::ByName {
918
names: cols.into(),
919
strict: true,
920
}),
921
UniqueKeepStrategy::First,
922
));
923
},
924
None => lf,
925
};
926
Ok(lf)
927
}
928
929
fn column_projections(
930
&mut self,
931
select_stmt: &Select,
932
schema: &SchemaRef,
933
select_modifiers: &mut SelectModifiers,
934
) -> PolarsResult<Vec<Expr>> {
935
let parsed_items: PolarsResult<Vec<Vec<Expr>>> = select_stmt
936
.projection
937
.iter()
938
.map(|select_item| match select_item {
939
SelectItem::UnnamedExpr(expr) => {
940
Ok(vec![parse_sql_expr(expr, self, Some(schema))?])
941
},
942
SelectItem::ExprWithAlias { expr, alias } => {
943
let expr = parse_sql_expr(expr, self, Some(schema))?;
944
Ok(vec![expr.alias(PlSmallStr::from_str(alias.value.as_str()))])
945
},
946
SelectItem::QualifiedWildcard(obj_name, wildcard_options) => self
947
.process_qualified_wildcard(
948
obj_name,
949
wildcard_options,
950
select_modifiers,
951
Some(schema),
952
),
953
SelectItem::Wildcard(wildcard_options) => {
954
let cols = schema
955
.iter_names()
956
.map(|name| col(name.clone()))
957
.collect::<Vec<_>>();
958
959
self.process_wildcard_additional_options(
960
cols,
961
wildcard_options,
962
select_modifiers,
963
Some(schema),
964
)
965
},
966
})
967
.collect();
968
969
let flattened_exprs: Vec<Expr> = parsed_items?
970
.into_iter()
971
.flatten()
972
.flat_map(|expr| expand_exprs(expr, schema))
973
.collect();
974
975
Ok(flattened_exprs)
976
}
977
978
fn process_where(
979
&mut self,
980
mut lf: LazyFrame,
981
expr: &Option<SQLExpr>,
982
invert_filter: bool,
983
) -> PolarsResult<LazyFrame> {
984
if let Some(expr) = expr {
985
let schema = self.get_frame_schema(&mut lf)?;
986
987
// shortcut filter evaluation if given expression is just TRUE or FALSE
988
let (all_true, all_false) = match expr {
989
SQLExpr::Value(SQLValue::Boolean(b)) => (*b, !*b),
990
SQLExpr::BinaryOp { left, op, right } => match (&**left, &**right, op) {
991
(SQLExpr::Value(a), SQLExpr::Value(b), BinaryOperator::Eq) => (a == b, a != b),
992
(SQLExpr::Value(a), SQLExpr::Value(b), BinaryOperator::NotEq) => {
993
(a != b, a == b)
994
},
995
_ => (false, false),
996
},
997
_ => (false, false),
998
};
999
if (all_true && !invert_filter) || (all_false && invert_filter) {
1000
return Ok(lf);
1001
} else if (all_false && !invert_filter) || (all_true && invert_filter) {
1002
return Ok(DataFrame::empty_with_schema(schema.as_ref()).lazy());
1003
}
1004
1005
// ...otherwise parse and apply the filter as normal
1006
let mut filter_expression = parse_sql_expr(expr, self, Some(schema).as_deref())?;
1007
if filter_expression.clone().meta().has_multiple_outputs() {
1008
filter_expression = all_horizontal([filter_expression])?;
1009
}
1010
lf = self.process_subqueries(lf, vec![&mut filter_expression]);
1011
lf = if invert_filter {
1012
lf.remove(filter_expression)
1013
} else {
1014
lf.filter(filter_expression)
1015
};
1016
}
1017
Ok(lf)
1018
}
1019
1020
pub(super) fn process_join(
1021
&mut self,
1022
tbl_left: &TableInfo,
1023
tbl_right: &TableInfo,
1024
constraint: &JoinConstraint,
1025
join_type: JoinType,
1026
) -> PolarsResult<LazyFrame> {
1027
let (left_on, right_on) = process_join_constraint(constraint, tbl_left, tbl_right)?;
1028
1029
let joined = tbl_left
1030
.frame
1031
.clone()
1032
.join_builder()
1033
.with(tbl_right.frame.clone())
1034
.left_on(left_on)
1035
.right_on(right_on)
1036
.how(join_type)
1037
.suffix(format!(":{}", tbl_right.name))
1038
.coalesce(JoinCoalesce::KeepColumns)
1039
.finish();
1040
1041
Ok(joined)
1042
}
1043
1044
fn process_subqueries(&self, lf: LazyFrame, exprs: Vec<&mut Expr>) -> LazyFrame {
1045
let mut contexts = vec![];
1046
for expr in exprs {
1047
*expr = expr.clone().map_expr(|e| match e {
1048
Expr::SubPlan(lp, names) => {
1049
contexts.push(<LazyFrame>::from((**lp).clone()));
1050
if names.len() == 1 {
1051
Expr::Column(names[0].as_str().into())
1052
} else {
1053
Expr::SubPlan(lp, names)
1054
}
1055
},
1056
e => e,
1057
})
1058
}
1059
1060
if contexts.is_empty() {
1061
lf
1062
} else {
1063
lf.with_context(contexts)
1064
}
1065
}
1066
1067
fn execute_create_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
1068
if let Statement::CreateTable(CreateTable {
1069
if_not_exists,
1070
name,
1071
query,
1072
..
1073
}) = stmt
1074
{
1075
let tbl_name = name.0.first().unwrap().value.as_str();
1076
// CREATE TABLE IF NOT EXISTS
1077
if *if_not_exists && self.table_map.contains_key(tbl_name) {
1078
polars_bail!(SQLInterface: "relation '{}' already exists", tbl_name);
1079
// CREATE OR REPLACE TABLE
1080
}
1081
if let Some(query) = query {
1082
let lf = self.execute_query(query)?;
1083
self.register(tbl_name, lf);
1084
let out = df! {
1085
"Response" => ["CREATE TABLE"]
1086
}
1087
.unwrap()
1088
.lazy();
1089
Ok(out)
1090
} else {
1091
polars_bail!(SQLInterface: "only `CREATE TABLE AS SELECT ...` is currently supported");
1092
}
1093
} else {
1094
unreachable!()
1095
}
1096
}
1097
1098
fn get_table(&mut self, relation: &TableFactor) -> PolarsResult<(String, LazyFrame)> {
1099
match relation {
1100
TableFactor::Table {
1101
name, alias, args, ..
1102
} => {
1103
if let Some(args) = args {
1104
return self.execute_table_function(name, alias, &args.args);
1105
}
1106
let tbl_name = name.0.first().unwrap().value.as_str();
1107
if let Some(lf) = self.get_table_from_current_scope(tbl_name) {
1108
match alias {
1109
Some(alias) => {
1110
self.table_aliases
1111
.insert(alias.name.value.clone(), tbl_name.to_string());
1112
Ok((alias.to_string(), lf))
1113
},
1114
None => Ok((tbl_name.to_string(), lf)),
1115
}
1116
} else {
1117
polars_bail!(SQLInterface: "relation '{}' was not found", tbl_name);
1118
}
1119
},
1120
TableFactor::Derived {
1121
lateral,
1122
subquery,
1123
alias,
1124
} => {
1125
polars_ensure!(!(*lateral), SQLInterface: "LATERAL not supported");
1126
if let Some(alias) = alias {
1127
let mut lf = self.execute_query_no_ctes(subquery)?;
1128
lf = self.rename_columns_from_table_alias(lf, alias)?;
1129
self.table_map.insert(alias.name.value.clone(), lf.clone());
1130
Ok((alias.name.value.clone(), lf))
1131
} else {
1132
polars_bail!(SQLSyntax: "derived tables must have aliases");
1133
}
1134
},
1135
TableFactor::UNNEST {
1136
alias,
1137
array_exprs,
1138
with_offset,
1139
with_offset_alias: _,
1140
..
1141
} => {
1142
if let Some(alias) = alias {
1143
let column_names: Vec<Option<PlSmallStr>> = alias
1144
.columns
1145
.iter()
1146
.map(|c| {
1147
if c.name.value.is_empty() {
1148
None
1149
} else {
1150
Some(PlSmallStr::from_str(c.name.value.as_str()))
1151
}
1152
})
1153
.collect();
1154
1155
let column_values: Vec<Series> = array_exprs
1156
.iter()
1157
.map(|arr| parse_sql_array(arr, self))
1158
.collect::<Result<_, _>>()?;
1159
1160
polars_ensure!(!column_names.is_empty(),
1161
SQLSyntax:
1162
"UNNEST table alias must also declare column names, eg: {} (a,b,c)", alias.name.to_string()
1163
);
1164
if column_names.len() != column_values.len() {
1165
let plural = if column_values.len() > 1 { "s" } else { "" };
1166
polars_bail!(
1167
SQLSyntax:
1168
"UNNEST table alias requires {} column name{}, found {}", column_values.len(), plural, column_names.len()
1169
);
1170
}
1171
let column_series: Vec<Column> = column_values
1172
.into_iter()
1173
.zip(column_names)
1174
.map(|(s, name)| {
1175
if let Some(name) = name {
1176
s.with_name(name)
1177
} else {
1178
s
1179
}
1180
})
1181
.map(Column::from)
1182
.collect();
1183
1184
let lf = DataFrame::new(column_series)?.lazy();
1185
1186
if *with_offset {
1187
// TODO: support 'WITH ORDINALITY|OFFSET' modifier.
1188
// (note that 'WITH OFFSET' is BigQuery-specific syntax, not PostgreSQL)
1189
polars_bail!(SQLInterface: "UNNEST tables do not (yet) support WITH ORDINALITY|OFFSET");
1190
}
1191
let table_name = alias.name.value.clone();
1192
self.table_map.insert(table_name.clone(), lf.clone());
1193
Ok((table_name, lf))
1194
} else {
1195
polars_bail!(SQLSyntax: "UNNEST table must have an alias");
1196
}
1197
},
1198
TableFactor::NestedJoin {
1199
table_with_joins,
1200
alias,
1201
} => {
1202
let lf = self.execute_from_statement(table_with_joins)?;
1203
match alias {
1204
Some(a) => Ok((a.name.value.clone(), lf)),
1205
None => Ok(("".to_string(), lf)),
1206
}
1207
},
1208
// Support bare table, optionally with an alias, for now
1209
_ => polars_bail!(SQLInterface: "not yet implemented: {}", relation),
1210
}
1211
}
1212
1213
fn execute_table_function(
1214
&mut self,
1215
name: &ObjectName,
1216
alias: &Option<TableAlias>,
1217
args: &[FunctionArg],
1218
) -> PolarsResult<(String, LazyFrame)> {
1219
let tbl_fn = name.0.first().unwrap().value.as_str();
1220
let read_fn = tbl_fn.parse::<PolarsTableFunctions>()?;
1221
let (tbl_name, lf) = read_fn.execute(args)?;
1222
#[allow(clippy::useless_asref)]
1223
let tbl_name = alias
1224
.as_ref()
1225
.map(|a| a.name.value.clone())
1226
.unwrap_or_else(|| tbl_name.to_str().to_string());
1227
1228
self.table_map.insert(tbl_name.clone(), lf.clone());
1229
Ok((tbl_name, lf))
1230
}
1231
1232
fn process_order_by(
1233
&mut self,
1234
mut lf: LazyFrame,
1235
order_by: &Option<OrderBy>,
1236
selected: Option<&[Expr]>,
1237
) -> PolarsResult<LazyFrame> {
1238
if order_by.as_ref().is_none_or(|ob| ob.exprs.is_empty()) {
1239
return Ok(lf);
1240
}
1241
let schema = self.get_frame_schema(&mut lf)?;
1242
let columns_iter = schema.iter_names().map(|e| col(e.clone()));
1243
1244
let order_by = order_by.as_ref().unwrap().exprs.clone();
1245
let mut descending = Vec::with_capacity(order_by.len());
1246
let mut nulls_last = Vec::with_capacity(order_by.len());
1247
let mut by: Vec<Expr> = Vec::with_capacity(order_by.len());
1248
1249
if order_by.len() == 1 // support `ORDER BY ALL` (iff there is no column named 'ALL' in the schema)
1250
&& matches!(&order_by[0].expr, SQLExpr::Identifier(ident) if ident.value.to_uppercase() == "ALL" && !schema.iter_names().any(|name| name.to_uppercase() == "ALL"))
1251
{
1252
if let Some(selected) = selected {
1253
by.extend(selected.iter().cloned());
1254
} else {
1255
by.extend(columns_iter);
1256
};
1257
let desc_order = !order_by[0].asc.unwrap_or(true);
1258
nulls_last.resize(by.len(), !order_by[0].nulls_first.unwrap_or(desc_order));
1259
descending.resize(by.len(), desc_order);
1260
} else {
1261
let columns = &columns_iter.collect::<Vec<_>>();
1262
for ob in order_by {
1263
// note: if not specified 'NULLS FIRST' is default for DESC, 'NULLS LAST' otherwise
1264
// https://www.postgresql.org/docs/current/queries-order.html
1265
let desc_order = !ob.asc.unwrap_or(true);
1266
nulls_last.push(!ob.nulls_first.unwrap_or(desc_order));
1267
descending.push(desc_order);
1268
1269
// translate order expression, allowing ordinal values
1270
by.push(self.expr_or_ordinal(
1271
&ob.expr,
1272
columns,
1273
selected,
1274
Some(&schema),
1275
"ORDER BY",
1276
)?)
1277
}
1278
}
1279
Ok(lf.sort_by_exprs(
1280
&by,
1281
SortMultipleOptions::default()
1282
.with_order_descending_multi(descending)
1283
.with_nulls_last_multi(nulls_last)
1284
.with_maintain_order(true),
1285
))
1286
}
1287
1288
fn process_group_by(
1289
&mut self,
1290
mut lf: LazyFrame,
1291
group_by_keys: &[Expr],
1292
projections: &[Expr],
1293
) -> PolarsResult<LazyFrame> {
1294
let schema_before = self.get_frame_schema(&mut lf)?;
1295
let group_by_keys_schema = expressions_to_schema(group_by_keys, &schema_before)?;
1296
1297
// Remove the group_by keys as polars adds those implicitly.
1298
let mut aggregation_projection = Vec::with_capacity(projections.len());
1299
let mut projection_overrides = PlHashMap::with_capacity(projections.len());
1300
let mut projection_aliases = PlHashSet::new();
1301
let mut group_key_aliases = PlHashSet::new();
1302
1303
for mut e in projections {
1304
// `Len` represents COUNT(*) so we treat as an aggregation here.
1305
let is_non_group_key_expr = has_expr(e, |e| {
1306
match e {
1307
Expr::Agg(_) | Expr::Len | Expr::Window { .. } => true,
1308
Expr::Function { function: func, .. }
1309
if !matches!(func, FunctionExpr::StructExpr(_)) =>
1310
{
1311
// If it's a function call containing a column NOT in the group by keys,
1312
// we treat it as an aggregation.
1313
has_expr(e, |e| match e {
1314
Expr::Column(name) => !group_by_keys_schema.contains(name),
1315
_ => false,
1316
})
1317
},
1318
_ => false,
1319
}
1320
});
1321
1322
// Note: if simple aliased expression we defer aliasing until after the group_by.
1323
if let Expr::Alias(expr, alias) = e {
1324
if e.clone().meta().is_simple_projection(Some(&schema_before)) {
1325
group_key_aliases.insert(alias.as_ref());
1326
e = expr
1327
} else if let Expr::Function {
1328
function: FunctionExpr::StructExpr(StructFunction::FieldByName(name)),
1329
..
1330
} = expr.deref()
1331
{
1332
projection_overrides
1333
.insert(alias.as_ref(), col(name.clone()).alias(alias.clone()));
1334
} else if !is_non_group_key_expr && !group_by_keys_schema.contains(alias) {
1335
projection_aliases.insert(alias.as_ref());
1336
}
1337
}
1338
let field = e.to_field(&schema_before)?;
1339
if group_by_keys_schema.get(&field.name).is_none() && is_non_group_key_expr {
1340
let mut e = e.clone();
1341
if let Expr::Agg(AggExpr::Implode(expr)) = &e {
1342
e = (**expr).clone();
1343
} else if let Expr::Alias(expr, name) = &e {
1344
if let Expr::Agg(AggExpr::Implode(expr)) = expr.as_ref() {
1345
e = (**expr).clone().alias(name.clone());
1346
}
1347
}
1348
aggregation_projection.push(e);
1349
} else if let Expr::Column(_)
1350
| Expr::Function {
1351
function: FunctionExpr::StructExpr(StructFunction::FieldByName(_)),
1352
..
1353
} = e
1354
{
1355
// Non-aggregated columns must be part of the GROUP BY clause
1356
if !group_by_keys_schema.contains(&field.name) {
1357
polars_bail!(SQLSyntax: "'{}' should participate in the GROUP BY clause or an aggregate function", &field.name);
1358
}
1359
}
1360
}
1361
let aggregated = lf.group_by(group_by_keys).agg(&aggregation_projection);
1362
let projection_schema = expressions_to_schema(projections, &schema_before)?;
1363
1364
// A final projection to get the proper order and any deferred transforms/aliases.
1365
let final_projection = projection_schema
1366
.iter_names()
1367
.zip(projections)
1368
.map(|(name, projection_expr)| {
1369
if let Some(expr) = projection_overrides.get(name.as_str()) {
1370
expr.clone()
1371
} else if group_by_keys_schema.get(name).is_some()
1372
|| projection_aliases.contains(name.as_str())
1373
|| group_key_aliases.contains(name.as_str())
1374
{
1375
projection_expr.clone()
1376
} else {
1377
col(name.clone())
1378
}
1379
})
1380
.collect::<Vec<_>>();
1381
1382
Ok(aggregated.select(&final_projection))
1383
}
1384
1385
fn process_limit_offset(
1386
&self,
1387
lf: LazyFrame,
1388
limit: &Option<SQLExpr>,
1389
offset: &Option<Offset>,
1390
) -> PolarsResult<LazyFrame> {
1391
match (offset, limit) {
1392
(
1393
Some(Offset {
1394
value: SQLExpr::Value(SQLValue::Number(offset, _)),
1395
..
1396
}),
1397
Some(SQLExpr::Value(SQLValue::Number(limit, _))),
1398
) => Ok(lf.slice(
1399
offset
1400
.parse()
1401
.map_err(|e| polars_err!(SQLInterface: "OFFSET conversion error: {}", e))?,
1402
limit
1403
.parse()
1404
.map_err(|e| polars_err!(SQLInterface: "LIMIT conversion error: {}", e))?,
1405
)),
1406
(
1407
Some(Offset {
1408
value: SQLExpr::Value(SQLValue::Number(offset, _)),
1409
..
1410
}),
1411
None,
1412
) => Ok(lf.slice(
1413
offset
1414
.parse()
1415
.map_err(|e| polars_err!(SQLInterface: "OFFSET conversion error: {}", e))?,
1416
IdxSize::MAX,
1417
)),
1418
(None, Some(SQLExpr::Value(SQLValue::Number(limit, _)))) => Ok(lf.limit(
1419
limit
1420
.parse()
1421
.map_err(|e| polars_err!(SQLInterface: "LIMIT conversion error: {}", e))?,
1422
)),
1423
(None, None) => Ok(lf),
1424
_ => polars_bail!(
1425
SQLSyntax: "non-numeric arguments for LIMIT/OFFSET are not supported",
1426
),
1427
}
1428
}
1429
1430
fn process_qualified_wildcard(
1431
&mut self,
1432
ObjectName(idents): &ObjectName,
1433
options: &WildcardAdditionalOptions,
1434
modifiers: &mut SelectModifiers,
1435
schema: Option<&Schema>,
1436
) -> PolarsResult<Vec<Expr>> {
1437
let mut new_idents = idents.clone();
1438
new_idents.push(Ident::new("*"));
1439
1440
let expr = resolve_compound_identifier(self, new_idents.deref(), schema);
1441
self.process_wildcard_additional_options(expr?, options, modifiers, schema)
1442
}
1443
1444
fn process_wildcard_additional_options(
1445
&mut self,
1446
exprs: Vec<Expr>,
1447
options: &WildcardAdditionalOptions,
1448
modifiers: &mut SelectModifiers,
1449
schema: Option<&Schema>,
1450
) -> PolarsResult<Vec<Expr>> {
1451
if options.opt_except.is_some() && options.opt_exclude.is_some() {
1452
polars_bail!(SQLInterface: "EXCLUDE and EXCEPT wildcard options cannot be used together (prefer EXCLUDE)")
1453
} else if options.opt_exclude.is_some() && options.opt_ilike.is_some() {
1454
polars_bail!(SQLInterface: "EXCLUDE and ILIKE wildcard options cannot be used together")
1455
}
1456
1457
// SELECT * EXCLUDE
1458
if let Some(items) = &options.opt_exclude {
1459
match items {
1460
ExcludeSelectItem::Single(ident) => {
1461
modifiers.exclude.insert(ident.value.clone());
1462
},
1463
ExcludeSelectItem::Multiple(idents) => {
1464
modifiers
1465
.exclude
1466
.extend(idents.iter().map(|i| i.value.clone()));
1467
},
1468
};
1469
}
1470
1471
// SELECT * EXCEPT
1472
if let Some(items) = &options.opt_except {
1473
modifiers.exclude.insert(items.first_element.value.clone());
1474
modifiers
1475
.exclude
1476
.extend(items.additional_elements.iter().map(|i| i.value.clone()));
1477
}
1478
1479
// SELECT * ILIKE
1480
if let Some(item) = &options.opt_ilike {
1481
let rx = regex::escape(item.pattern.as_str())
1482
.replace('%', ".*")
1483
.replace('_', ".");
1484
1485
modifiers.ilike = Some(
1486
polars_utils::regex_cache::compile_regex(format!("^(?is){rx}$").as_str()).unwrap(),
1487
);
1488
}
1489
1490
// SELECT * RENAME
1491
if let Some(items) = &options.opt_rename {
1492
let renames = match items {
1493
RenameSelectItem::Single(rename) => vec![rename],
1494
RenameSelectItem::Multiple(renames) => renames.iter().collect(),
1495
};
1496
for rn in renames {
1497
let (before, after) = (rn.ident.value.as_str(), rn.alias.value.as_str());
1498
let (before, after) = (PlSmallStr::from_str(before), PlSmallStr::from_str(after));
1499
if before != after {
1500
modifiers.rename.insert(before, after);
1501
}
1502
}
1503
}
1504
1505
// SELECT * REPLACE
1506
if let Some(replacements) = &options.opt_replace {
1507
for rp in &replacements.items {
1508
let replacement_expr = parse_sql_expr(&rp.expr, self, schema);
1509
modifiers
1510
.replace
1511
.push(replacement_expr?.alias(rp.column_name.value.as_str()));
1512
}
1513
}
1514
Ok(exprs)
1515
}
1516
1517
fn rename_columns_from_table_alias(
1518
&mut self,
1519
mut lf: LazyFrame,
1520
alias: &TableAlias,
1521
) -> PolarsResult<LazyFrame> {
1522
if alias.columns.is_empty() {
1523
Ok(lf)
1524
} else {
1525
let schema = self.get_frame_schema(&mut lf)?;
1526
if alias.columns.len() != schema.len() {
1527
polars_bail!(
1528
SQLSyntax: "number of columns ({}) in alias '{}' does not match the number of columns in the table/query ({})",
1529
alias.columns.len(), alias.name.value, schema.len()
1530
)
1531
} else {
1532
let existing_columns: Vec<_> = schema.iter_names().collect();
1533
let new_columns: Vec<_> =
1534
alias.columns.iter().map(|c| c.name.value.clone()).collect();
1535
Ok(lf.rename(existing_columns, new_columns, true))
1536
}
1537
}
1538
}
1539
}
1540
1541
impl SQLContext {
1542
/// Get internal table map. For internal use only.
1543
pub fn get_table_map(&self) -> PlHashMap<String, LazyFrame> {
1544
self.table_map.clone()
1545
}
1546
1547
/// Create a new SQLContext from a table map. For internal use only
1548
pub fn new_from_table_map(table_map: PlHashMap<String, LazyFrame>) -> Self {
1549
Self {
1550
table_map,
1551
..Default::default()
1552
}
1553
}
1554
}
1555
1556
fn collect_compound_identifiers(
1557
left: &[Ident],
1558
right: &[Ident],
1559
left_name: &str,
1560
right_name: &str,
1561
) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
1562
if left.len() == 2 && right.len() == 2 {
1563
let (tbl_a, col_name_a) = (left[0].value.as_str(), left[1].value.as_str());
1564
let (tbl_b, col_name_b) = (right[0].value.as_str(), right[1].value.as_str());
1565
1566
// switch left/right operands if the caller has them in reverse
1567
if left_name == tbl_b || right_name == tbl_a {
1568
Ok((vec![col(col_name_b)], vec![col(col_name_a)]))
1569
} else {
1570
Ok((vec![col(col_name_a)], vec![col(col_name_b)]))
1571
}
1572
} else {
1573
polars_bail!(SQLInterface: "collect_compound_identifiers: Expected left.len() == 2 && right.len() == 2, but found left.len() == {:?}, right.len() == {:?}", left.len(), right.len());
1574
}
1575
}
1576
1577
fn expand_exprs(expr: Expr, schema: &SchemaRef) -> Vec<Expr> {
1578
match expr {
1579
Expr::Column(nm) if is_regex_colname(nm.as_str()) => {
1580
let re = polars_utils::regex_cache::compile_regex(&nm).unwrap();
1581
schema
1582
.iter_names()
1583
.filter(|name| re.is_match(name))
1584
.map(|name| col(name.clone()))
1585
.collect::<Vec<_>>()
1586
},
1587
Expr::Selector(s) => s
1588
.into_columns(schema, &Default::default())
1589
.unwrap()
1590
.into_iter()
1591
.map(col)
1592
.collect::<Vec<_>>(),
1593
_ => vec![expr],
1594
}
1595
}
1596
1597
/// Return `true` when `nm` names a regex column pattern, i.e. it starts with `^` and
/// ends with `$`.
fn is_regex_colname(nm: &str) -> bool {
    nm.strip_prefix('^')
        .is_some_and(|rest| rest.ends_with('$'))
}
1600
1601
fn process_join_on(
1602
expression: &sqlparser::ast::Expr,
1603
tbl_left: &TableInfo,
1604
tbl_right: &TableInfo,
1605
) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
1606
match expression {
1607
SQLExpr::BinaryOp { left, op, right } => match op {
1608
BinaryOperator::And => {
1609
let (mut left_i, mut right_i) = process_join_on(left, tbl_left, tbl_right)?;
1610
let (mut left_j, mut right_j) = process_join_on(right, tbl_left, tbl_right)?;
1611
left_i.append(&mut left_j);
1612
right_i.append(&mut right_j);
1613
Ok((left_i, right_i))
1614
},
1615
BinaryOperator::Eq => match (left.as_ref(), right.as_ref()) {
1616
(SQLExpr::CompoundIdentifier(left), SQLExpr::CompoundIdentifier(right)) => {
1617
collect_compound_identifiers(left, right, &tbl_left.name, &tbl_right.name)
1618
},
1619
_ => {
1620
polars_bail!(SQLInterface: "only equi-join constraints (on identifiers) are currently supported; found lhs={:?}, rhs={:?}", left, right)
1621
},
1622
},
1623
_ => {
1624
polars_bail!(SQLInterface: "only equi-join constraints (combined with 'AND') are currently supported; found op = '{:?}'", op)
1625
},
1626
},
1627
SQLExpr::Nested(expr) => process_join_on(expr, tbl_left, tbl_right),
1628
_ => {
1629
polars_bail!(SQLInterface: "only equi-join constraints are currently supported; found expression = {:?}", expression)
1630
},
1631
}
1632
}
1633
1634
fn process_join_constraint(
1635
constraint: &JoinConstraint,
1636
tbl_left: &TableInfo,
1637
tbl_right: &TableInfo,
1638
) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
1639
match constraint {
1640
JoinConstraint::On(expr @ SQLExpr::BinaryOp { .. }) => {
1641
process_join_on(expr, tbl_left, tbl_right)
1642
},
1643
JoinConstraint::Using(idents) if !idents.is_empty() => {
1644
let using: Vec<Expr> = idents.iter().map(|id| col(id.value.as_str())).collect();
1645
Ok((using.clone(), using))
1646
},
1647
JoinConstraint::Natural => {
1648
let left_names = tbl_left.schema.iter_names().collect::<PlHashSet<_>>();
1649
let right_names = tbl_right.schema.iter_names().collect::<PlHashSet<_>>();
1650
let on: Vec<Expr> = left_names
1651
.intersection(&right_names)
1652
.map(|&name| col(name.clone()))
1653
.collect();
1654
if on.is_empty() {
1655
polars_bail!(SQLInterface: "no common columns found for NATURAL JOIN")
1656
}
1657
Ok((on.clone(), on))
1658
},
1659
_ => polars_bail!(SQLInterface: "unsupported SQL join constraint:\n{:?}", constraint),
1660
}
1661
}
1662
1663
bitflags::bitflags! {
    /// Bitfield describing how the height (row count) of a projection relates to its input.
    ///
    /// Used to help decide whether projections are executed in a `select()` or a
    /// `with_columns()` context.
    #[derive(PartialEq)]
    struct ExprSqlProjectionHeightBehavior: u8 {
        /// Keeps the height of its input column(s).
        const MaintainsColumn = 0b001;
        /// Height does not depend on the input, e.g.:
        /// * length-changing expressions: slice, explode, filter, gather etc.
        /// * aggregations: count(*), first(), sum() etc.
        const Independent = 0b010;
        /// "Inherits" the height of the surrounding context, e.g.:
        /// * scalar literals
        const InheritsContext = 0b100;
    }
}
1681
1682
impl ExprSqlProjectionHeightBehavior {
1683
fn identify_from_expr(expr: &Expr) -> Self {
1684
let mut has_column = false;
1685
let mut has_independent = false;
1686
1687
for e in expr.into_iter() {
1688
use Expr::*;
1689
1690
has_column |= matches!(e, Column(_) | Selector(_));
1691
1692
has_independent |= match e {
1693
// @TODO: This is broken now with functions.
1694
AnonymousFunction { options, .. } => {
1695
options.returns_scalar() || !options.is_length_preserving()
1696
},
1697
1698
Literal(v) => !v.is_scalar(),
1699
1700
Explode { .. } | Filter { .. } | Gather { .. } | Slice { .. } => true,
1701
1702
Agg { .. } | Len => true,
1703
1704
_ => false,
1705
}
1706
}
1707
1708
if has_independent {
1709
Self::Independent
1710
} else if has_column {
1711
Self::MaintainsColumn
1712
} else {
1713
Self::InheritsContext
1714
}
1715
}
1716
}
1717
1718