use arrow::legacy::time_zone::Tz;
use polars_core::prelude::arity::broadcast_try_binary_elementwise;
use polars_core::prelude::*;
use polars_core::series::IsSorted;
use crate::Duration;
fn apply_offsets_to_datetime(
datetime: &Logical<DatetimeType, Int64Type>,
offsets: &StringChunked,
time_zone: Option<&Tz>,
) -> PolarsResult<Int64Chunked> {
match offsets.len() {
1 => match offsets.get(0) {
Some(offset) => {
let offset = &Duration::parse(offset);
if offset.is_constant_duration(datetime.time_zone().as_ref()) {
let mut duration = match datetime.time_unit() {
TimeUnit::Milliseconds => offset.duration_ms(),
TimeUnit::Microseconds => offset.duration_us(),
TimeUnit::Nanoseconds => offset.duration_ns(),
};
if offset.negative() {
duration = -duration;
}
Ok(datetime.phys.clone().wrapping_add_scalar(duration))
} else {
let offset_fn = match datetime.time_unit() {
TimeUnit::Milliseconds => Duration::add_ms,
TimeUnit::Microseconds => Duration::add_us,
TimeUnit::Nanoseconds => Duration::add_ns,
};
datetime
.phys
.try_apply_nonnull_values_generic(|v| offset_fn(offset, v, time_zone))
}
},
_ => Ok(datetime.phys.apply(|_| None)),
},
_ => {
let offset_fn = match datetime.time_unit() {
TimeUnit::Milliseconds => Duration::add_ms,
TimeUnit::Microseconds => Duration::add_us,
TimeUnit::Nanoseconds => Duration::add_ns,
};
broadcast_try_binary_elementwise(
datetime.physical(),
offsets,
|timestamp_opt, offset_opt| match (timestamp_opt, offset_opt) {
(Some(timestamp), Some(offset)) => {
offset_fn(&Duration::try_parse(offset)?, timestamp, time_zone).map(Some)
},
_ => Ok(None),
},
)
},
}
}
pub fn impl_offset_by(ts: &Series, offsets: &Series) -> PolarsResult<Series> {
let offsets = offsets.str()?;
polars_ensure!(
ts.len() == offsets.len() || offsets.len() == 1 || ts.len() == 1,
length_mismatch = "dt.offset_by",
ts.len(),
offsets.len()
);
let dtype = ts.dtype();
let tz = match dtype {
DataType::Date => None,
DataType::Datetime(_, tz) => tz.clone(),
_ => polars_bail!(InvalidOperation: "expected Date or Datetime, got {}", dtype),
};
let preserve_sortedness = match offsets.len() {
1 => match offsets.get(0) {
Some(offset) => {
let offset = Duration::try_parse(offset)?;
offset.is_constant_duration(tz.as_ref())
},
None => false,
},
_ => false,
};
let out = match dtype {
DataType::Date => {
let ts = ts
.cast(&DataType::Datetime(TimeUnit::Microseconds, None))
.unwrap();
let datetime = ts.datetime().unwrap();
let out = apply_offsets_to_datetime(datetime, offsets, None)?;
out.cast(&DataType::Datetime(TimeUnit::Microseconds, None))
.unwrap()
.cast(&DataType::Date)
},
DataType::Datetime(tu, tz) => {
let datetime = ts.datetime().unwrap();
let out = match tz {
#[cfg(feature = "timezones")]
Some(tz) => {
apply_offsets_to_datetime(datetime, offsets, tz.parse::<Tz>().ok().as_ref())?
},
_ => apply_offsets_to_datetime(datetime, offsets, None)?,
};
out.cast(&DataType::Datetime(*tu, tz.clone()))
},
dt => polars_bail!(
ComputeError: "cannot use 'offset_by' on Series of datatype {}", dt,
),
};
if preserve_sortedness {
out.map(|mut out| {
out.set_sorted_flag(ts.is_sorted_flag());
out
})
} else {
out.map(|mut out| {
out.set_sorted_flag(IsSorted::Not);
out
})
}
}