#[cfg(feature = "timezones")]
use polars_core::datatypes::time_zone::parse_time_zone;
use polars_core::prelude::*;
use polars_ops::prelude::*;
use polars_ops::series::SeriesMethods;
use crate::prelude::*;
/// Upsampling of a frame along a time-like column.
///
/// Both methods place the values of `time_column` on a regular grid that is
/// `every` apart, running from the first to the last non-null value
/// (both ends included), and left-join the original rows onto that grid.
pub trait PolarsUpsample {
    /// Upsample `time_column` to a regular grid with spacing `every`.
    ///
    /// # Arguments
    ///
    /// * `by` - Columns to group by; every group is upsampled on its own.
    ///   When empty, the frame is upsampled as a whole.
    /// * `time_column` - Name of the column that is upsampled. The
    ///   implementation for `DataFrame` accepts `Date`, `Datetime`, `UInt32`,
    ///   `UInt64`, `Int32` and `Int64` columns.
    /// * `every` - Distance between two consecutive grid points.
    ///
    /// Groups are formed with the non-stable group-by; use
    /// [`PolarsUpsample::upsample_stable`] for the stable variant.
    ///
    /// # Errors
    ///
    /// Returns an error when `time_column` does not exist, when `every` is
    /// rejected for the column's dtype, when the dtype is not supported, or
    /// when the column holds only nulls.
    fn upsample<I>(&self, by: I, time_column: &str, every: Duration) -> PolarsResult<DataFrame>
    where
        I: IntoVec<PlSmallStr>;

    /// Same as [`PolarsUpsample::upsample`], but groups are formed with the
    /// stable group-by.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as [`PolarsUpsample::upsample`].
    fn upsample_stable<I>(
        &self,
        by: I,
        time_column: &str,
        every: Duration,
    ) -> PolarsResult<DataFrame>
    where
        I: IntoVec<PlSmallStr>;
}
impl PolarsUpsample for DataFrame {
    /// Upsamples `time_column` per group of `by`, using the non-stable group-by.
    ///
    /// `every` is validated against the dtype of `time_column` before any
    /// work is done; a missing column is reported as an error.
    fn upsample<I>(&self, by: I, time_column: &str, every: Duration) -> PolarsResult<DataFrame>
    where
        I: IntoVec<PlSmallStr>,
    {
        let group_keys = by.into_vec();
        ensure_duration_matches_dtype(every, self.column(time_column)?.dtype(), "every")?;
        upsample_impl(self, group_keys, time_column, every, false)
    }

    /// Upsamples `time_column` per group of `by`, using the stable group-by.
    ///
    /// `every` is validated against the dtype of `time_column` before any
    /// work is done; a missing column is reported as an error.
    fn upsample_stable<I>(
        &self,
        by: I,
        time_column: &str,
        every: Duration,
    ) -> PolarsResult<DataFrame>
    where
        I: IntoVec<PlSmallStr>,
    {
        let group_keys = by.into_vec();
        ensure_duration_matches_dtype(every, self.column(time_column)?.dtype(), "every")?;
        upsample_impl(self, group_keys, time_column, every, true)
    }
}
fn upsample_impl(
source: &DataFrame,
by: Vec<PlSmallStr>,
index_column: &str,
every: Duration,
stable: bool,
) -> PolarsResult<DataFrame> {
let s = source.column(index_column)?;
let time_type = s.dtype();
if matches!(time_type, DataType::Date) {
let mut df = source.clone();
df.apply(index_column, |s| {
s.cast(&DataType::Datetime(TimeUnit::Microseconds, None))
.unwrap()
})
.unwrap();
let mut out = upsample_impl(&df, by, index_column, every, stable)?;
out.apply(index_column, |s| s.cast(time_type).unwrap())
.unwrap();
Ok(out)
} else if matches!(
time_type,
DataType::UInt32 | DataType::UInt64 | DataType::Int32
) {
let mut df = source.clone();
df.apply(index_column, |s| {
s.cast(&DataType::Int64)
.unwrap()
.cast(&DataType::Datetime(TimeUnit::Nanoseconds, None))
.unwrap()
})
.unwrap();
let mut out = upsample_impl(&df, by, index_column, every, stable)?;
out.apply(index_column, |s| s.cast(time_type).unwrap())
.unwrap();
Ok(out)
} else if matches!(time_type, DataType::Int64) {
let mut df = source.clone();
df.apply(index_column, |s| {
s.cast(&DataType::Datetime(TimeUnit::Nanoseconds, None))
.unwrap()
})
.unwrap();
let mut out = upsample_impl(&df, by, index_column, every, stable)?;
out.apply(index_column, |s| s.cast(time_type).unwrap())
.unwrap();
Ok(out)
} else if by.is_empty() {
let index_column = source.column(index_column)?;
upsample_single_impl(source, index_column.as_materialized_series(), every)
} else {
let gb = if stable {
source.group_by_stable(by)
} else {
source.group_by(by)
};
gb?.apply(|df| {
let index_column = df.column(index_column)?;
upsample_single_impl(&df, index_column.as_materialized_series(), every)
})
}
}
fn upsample_single_impl(
source: &DataFrame,
index_column: &Series,
every: Duration,
) -> PolarsResult<DataFrame> {
index_column.ensure_sorted_arg("upsample")?;
let index_col_name = index_column.name();
use DataType::*;
match index_column.dtype() {
Datetime(tu, tz) => {
let s = index_column.cast(&Int64).unwrap();
let ca = s.i64().unwrap();
let first = ca.iter().flatten().next();
let last = ca.iter().flatten().next_back();
match (first, last) {
(Some(first), Some(last)) => {
let tz = match tz {
#[cfg(feature = "timezones")]
Some(tz) => Some(parse_time_zone(tz)?),
_ => None,
};
let range = datetime_range_impl(
index_col_name.clone(),
first,
last,
every,
ClosedWindow::Both,
*tu,
tz.as_ref(),
)?
.into_series()
.into_frame();
range.join(
source,
[index_col_name.clone()],
[index_col_name.clone()],
JoinArgs::new(JoinType::Left),
None,
)
},
_ => polars_bail!(
ComputeError: "cannot determine upsample boundaries: all elements are null"
),
}
},
dt => polars_bail!(
ComputeError: "upsample not allowed for index column of dtype {}", dt,
),
}
}