#[cfg(feature = "timezones")]
use polars_core::datatypes::time_zone::parse_time_zone;
use polars_core::prelude::*;
use polars_core::utils::accumulate_dataframes_vertical_unchecked;
use polars_ops::prelude::*;
use polars_ops::series::SeriesMethods;
use crate::prelude::*;
/// Upsampling of a [`DataFrame`] onto a regular time grid.
///
/// The index column is expanded into a range running from its first to its last
/// non-null value (both ends included) in steps of `every`. The original rows are
/// then left-joined onto that range, so newly created rows hold nulls in every
/// column other than the index and the `by` keys.
///
/// Supported index dtypes are `Datetime`, `Date` and the integer types `UInt32`,
/// `UInt64`, `Int32` and `Int64`. The index must be sorted, within each group when
/// `by` is non-empty. `every` is validated against the index dtype before any work
/// is done.
pub trait PolarsUpsample {
    /// Upsample on `time_column` at interval `every`, separately for each group of
    /// the `by` columns (or over the whole frame when `by` is empty).
    ///
    /// Groups are formed with `DataFrame::group_by`; see
    /// [`PolarsUpsample::upsample_stable`] for the `group_by_stable` variant.
    fn upsample<I>(&self, by: I, time_column: &str, every: Duration) -> PolarsResult<DataFrame>
    where
        I: IntoVec<PlSmallStr>;

    /// Same as [`PolarsUpsample::upsample`], but groups are formed with
    /// `DataFrame::group_by_stable`.
    fn upsample_stable<I>(
        &self,
        by: I,
        time_column: &str,
        every: Duration,
    ) -> PolarsResult<DataFrame>
    where
        I: IntoVec<PlSmallStr>;
}
impl PolarsUpsample for DataFrame {
    /// Validates `every` against the dtype of `time_column`, then upsamples using
    /// unstable (`group_by`) grouping.
    fn upsample<I: IntoVec<PlSmallStr>>(
        &self,
        by: I,
        time_column: &str,
        every: Duration,
    ) -> PolarsResult<DataFrame> {
        let index_dtype = self.column(time_column)?.dtype();
        ensure_duration_matches_dtype(every, index_dtype, "every")?;
        upsample_impl(self, by.into_vec(), time_column, every, false)
    }

    /// Validates `every` against the dtype of `time_column`, then upsamples using
    /// stable (`group_by_stable`) grouping.
    fn upsample_stable<I: IntoVec<PlSmallStr>>(
        &self,
        by: I,
        time_column: &str,
        every: Duration,
    ) -> PolarsResult<DataFrame> {
        let index_dtype = self.column(time_column)?.dtype();
        ensure_duration_matches_dtype(every, index_dtype, "every")?;
        upsample_impl(self, by.into_vec(), time_column, every, true)
    }
}
fn upsample_impl(
source: &DataFrame,
by: Vec<PlSmallStr>,
index_column: &str,
every: Duration,
stable: bool,
) -> PolarsResult<DataFrame> {
let s = source.column(index_column)?;
let original_type = s.dtype();
let needs_cast = matches!(
original_type,
DataType::Date | DataType::UInt32 | DataType::UInt64 | DataType::Int32 | DataType::Int64
);
let mut df = source.clone();
if needs_cast {
df.try_apply(index_column, |s| match s.dtype() {
#[cfg(feature = "dtype-date")]
DataType::Date => s.cast(&DataType::Datetime(TimeUnit::Microseconds, None)),
DataType::UInt32 | DataType::UInt64 | DataType::Int32 => s
.cast(&DataType::Int64)?
.cast(&DataType::Datetime(TimeUnit::Nanoseconds, None)),
DataType::Int64 => s.cast(&DataType::Datetime(TimeUnit::Nanoseconds, None)),
_ => Ok(s.clone()),
})?;
}
let mut out = upsample_core(&df, by, index_column, every, stable)?;
if needs_cast {
out.try_apply(index_column, |s| s.cast(original_type))?;
}
Ok(out)
}
/// Upsamples `source` on a `Datetime` index column, optionally per group.
///
/// With an empty `by`, the whole frame is upsampled at once by
/// [`upsample_single_impl`]. Otherwise:
/// 1. The frame is split on the `by` columns, using `group_by_stable` when `stable`
///    and `group_by` otherwise.
/// 2. Each group's non-key columns are upsampled independently.
/// 3. The group's key values are repeated over the resulting rows.
/// 4. All groups are concatenated vertically, with columns restored to `source`'s
///    order and schema.
///
/// If `index_column` is itself one of the `by` columns, groups are passed through
/// without upsampling. A zero-row `source` with a non-empty `by` yields an empty
/// frame with `source`'s schema.
///
/// # Errors
/// Propagates errors from selecting or grouping the `by` columns and from
/// [`upsample_single_impl`] (e.g. an unsorted or all-null index within a group).
fn upsample_core(
    source: &DataFrame,
    by: Vec<PlSmallStr>,
    index_column: &str,
    every: Duration,
    stable: bool,
) -> PolarsResult<DataFrame> {
    if by.is_empty() {
        let index_column = source.column(index_column)?;
        return upsample_single_impl(source, index_column.as_materialized_series(), every);
    }

    let source_schema = source.schema();

    let group_keys_df = source.select(by)?;
    let group_keys_schema = group_keys_df.schema();

    let groups = if stable {
        group_keys_df.group_by_stable(group_keys_schema.iter_names_cloned())
    } else {
        group_keys_df.group_by(group_keys_schema.iter_names_cloned())
    }?
    .into_groups();

    // SAFETY: every selected name is taken from `source`'s own schema, so each one
    // exists in `source` and the selection contains no duplicates.
    let non_group_keys_df = unsafe {
        source.select_unchecked(
            source_schema
                .iter_names()
                .filter(|name| !group_keys_schema.contains(name.as_str())),
        )?
    };

    // `None` when the index column is itself a group key; groups are then not upsampled.
    let upsample_index_col_idx: Option<usize> = non_group_keys_df.schema().index_of(index_column);

    let dfs: Vec<DataFrame> = groups
        .iter()
        .map(|g| {
            let first_idx = g.first();
            // SAFETY: the group indices were computed on `group_keys_df`, which was
            // selected from `source` and so has the same height as `non_group_keys_df`.
            let mut non_group_keys_df = unsafe { non_group_keys_df.gather_group_unchecked(&g) };
            if let Some(i) = upsample_index_col_idx {
                non_group_keys_df = upsample_single_impl(
                    &non_group_keys_df,
                    non_group_keys_df.columns()[i].as_materialized_series(),
                    every,
                )?
            }
            let mut out = non_group_keys_df;
            // Repeat this group's key values over all of its (upsampled) rows.
            let group_keys_df = group_keys_df.new_from_index(first_idx as usize, out.height());
            // SAFETY: `group_keys_df` has exactly `out.height()` rows, and its column
            // names are the group keys, which were excluded from `out`, so the heights
            // agree and the column names stay unique.
            let out_cols = unsafe { out.columns_mut() };
            out_cols.reserve(group_keys_df.width());
            out_cols.extend(group_keys_df.into_columns());
            Ok(out)
        })
        .collect::<PolarsResult<_>>()?;

    // A zero-row `source` produces no groups. `accumulate_dataframes_vertical_unchecked`
    // needs at least one frame (it panics on an empty input), so return an empty frame
    // carrying the source schema instead.
    if dfs.is_empty() {
        return Ok(source.clear());
    }

    // SAFETY: every frame in `dfs` has the same column layout and contains all of
    // `source`'s columns (non-key columns plus the repeated keys), so selecting
    // `source`'s names in order is valid.
    Ok(unsafe {
        accumulate_dataframes_vertical_unchecked(dfs)
            .select_unchecked(source_schema.iter_names())?
            .with_schema(source_schema.clone())
    })
}
/// Upsamples a single, ungrouped frame on `index_column`.
///
/// Builds a `Datetime` range from the first to the last non-null index value, with
/// both ends included and step `every`. The range uses the index column's time unit
/// and time zone. `source` is then left-joined onto the range on the index column's
/// name, so new rows hold nulls in all other columns.
///
/// # Errors
/// - `index_column` is not sorted;
/// - `index_column` is not `Datetime` (callers map `Date` and integer indices to
///   `Datetime` beforehand);
/// - every value of `index_column` is null, so no boundaries exist;
/// - the time zone cannot be parsed, or building the range or the join fails.
fn upsample_single_impl(
    source: &DataFrame,
    index_column: &Series,
    every: Duration,
) -> PolarsResult<DataFrame> {
    index_column.ensure_sorted_arg("upsample")?;
    let index_col_name = index_column.name();
    use DataType::*;
    match index_column.dtype() {
        #[cfg(any(feature = "dtype-date", feature = "dtype-datetime"))]
        Datetime(tu, tz) => {
            // Read the raw i64 timestamps to find the range boundaries; propagate
            // rather than panic if the cast or downcast ever fails.
            let s = index_column.cast(&Int64)?;
            let ca = s.i64()?;
            // The index was checked to be sorted above, so the first and last non-null
            // values bound the range.
            let first = ca.iter().flatten().next();
            let last = ca.iter().flatten().next_back();
            match (first, last) {
                (Some(first), Some(last)) => {
                    let tz = match tz {
                        #[cfg(feature = "timezones")]
                        Some(tz) => Some(parse_time_zone(tz)?),
                        _ => None,
                    };
                    let range = datetime_range_impl(
                        index_col_name.clone(),
                        first,
                        last,
                        every,
                        ClosedWindow::Both,
                        *tu,
                        tz.as_ref(),
                    )?
                    .into_series()
                    .into_frame();
                    range.join(
                        source,
                        [index_col_name.clone()],
                        [index_col_name.clone()],
                        JoinArgs::new(JoinType::Left),
                        None,
                    )
                },
                _ => polars_bail!(
                    ComputeError: "cannot determine upsample boundaries: all elements are null"
                ),
            }
        },
        dt => polars_bail!(
            ComputeError: "upsample not allowed for index column of dtype {}", dt,
        ),
    }
}