Path: blob/main/crates/polars-python/src/cloud_server.rs
7884 views
use polars_core::error::{PolarsResult, polars_err};1use polars_expr::state::ExecutionState;2use polars_mem_engine::create_physical_plan;3use polars_plan::plans::{AExpr, IR, IRPlan};4use polars_plan::prelude::{Arena, Node};5use polars_utils::pl_serialize;6use pyo3::intern;7use pyo3::prelude::{PyAnyMethods, PyModule, Python, *};8use pyo3::types::IntoPyDict;910use crate::PyDataFrame;11use crate::error::PyPolarsErr;12use crate::lazyframe::visit::NodeTraverser;13use crate::utils::EnterPolarsExt;1415/// Take a serialized `IRPlan` and execute it on the GPU engine.16///17/// This is done as a Python function because the `NodeTraverser` class created for this purpose18/// must exactly match the one expected by the `cudf_polars` package.19#[pyfunction]20pub fn _execute_ir_plan_with_gpu(ir_plan_ser: Vec<u8>, py: Python) -> PyResult<PyDataFrame> {21// Deserialize into IRPlan.22let mut ir_plan: IRPlan =23pl_serialize::deserialize_from_reader::<_, _, false>(ir_plan_ser.as_slice())24.map_err(PyPolarsErr::from)?;2526// Edit for use with GPU engine.27gpu_post_opt(28py,29ir_plan.lp_top,30&mut ir_plan.lp_arena,31&mut ir_plan.expr_arena,32)33.map_err(PyPolarsErr::from)?;3435// Convert to physical plan.36let mut physical_plan = create_physical_plan(37ir_plan.lp_top,38&mut ir_plan.lp_arena,39&mut ir_plan.expr_arena,40None,41)42.map_err(PyPolarsErr::from)?;4344// Execute the plan.45let mut state = ExecutionState::new();46py.enter_polars_df(|| physical_plan.execute(&mut state))47}4849/// Prepare the IR for execution by the Polars GPU engine.50fn gpu_post_opt(51py: Python<'_>,52root: Node,53lp_arena: &mut Arena<IR>,54expr_arena: &mut Arena<AExpr>,55) -> PolarsResult<()> {56// Get cuDF Python function.57let cudf = PyModule::import(py, intern!(py, "cudf_polars")).unwrap();58let lambda = cudf.getattr(intern!(py, "execute_with_cudf")).unwrap();5960// Define cuDF config.61let polars = PyModule::import(py, intern!(py, "polars")).unwrap();62let engine = polars.getattr(intern!(py, "GPUEngine")).unwrap();63let kwargs = [("raise_on_fail", true)].into_py_dict(py).unwrap();64let engine = engine.call((), Some(&kwargs)).unwrap();6566// Define node traverser.67let nt = NodeTraverser::new(root, std::mem::take(lp_arena), std::mem::take(expr_arena));6869// Get a copy of the arenas.70let arenas = nt.get_arenas();7172// Pass the node visitor which allows the Python callback to replace parts of the query plan.73// Remove "cuda" or specify better once we have multiple post-opt callbacks.74let kwargs = [("config", engine)].into_py_dict(py).unwrap();75lambda76.call((nt,), Some(&kwargs))77.map_err(|e| polars_err!(ComputeError: "'cuda' conversion failed: {}", e))?;7879// Unpack the arena's.80// At this point the `nt` is useless.81std::mem::swap(lp_arena, &mut *arenas.0.lock().unwrap());82std::mem::swap(expr_arena, &mut *arenas.1.lock().unwrap());8384Ok(())85}868788