Path: blob/main/crates/polars-io/src/catalog/unity/utils.rs
6939 views
use bytes::Bytes;1use polars_error::{PolarsResult, to_compute_err};2use polars_utils::error::TruncateErrorDetail;3use reqwest::RequestBuilder;45/// Performs the request and attaches the response body to any error messages.6pub(super) async fn do_request(request: reqwest::RequestBuilder) -> PolarsResult<bytes::Bytes> {7let resp = request.send().await.map_err(to_compute_err)?;8let opt_err = resp.error_for_status_ref().map(|_| ());9let resp_bytes = resp.bytes().await.map_err(to_compute_err)?;1011opt_err.map_err(|e| {12to_compute_err(e).wrap_msg(|e| {13let body = String::from_utf8_lossy(&resp_bytes);1415format!(16"error: {}, response body: {}",17e,18TruncateErrorDetail(&body)19)20})21})?;2223Ok(resp_bytes)24}2526/// Support for traversing paginated response values that look like:27/// ```text28/// {29/// $key_name: [$T, $T, ...],30/// next_page_token: "token" or null,31/// }32/// ```33#[macro_export]34macro_rules! impl_page_walk {35($S:ty, $T:ty, key_name = $key_name:tt) => {36impl $S {37pub async fn next(&mut self) -> PolarsResult<Option<Vec<$T>>> {38return self39.040.next(|bytes| {41let Response {42$key_name: out,43next_page_token,44} = decode_json_response(bytes)?;4546Ok((out, next_page_token))47})48.await;4950#[derive(serde::Deserialize)]51struct Response {52#[serde(default = "Vec::new")]53$key_name: Vec<$T>,54#[serde(default)]55next_page_token: Option<String>,56}57}5859pub async fn read_all_pages(mut self) -> PolarsResult<Vec<$T>> {60let Some(mut out) = self.next().await? else {61return Ok(vec![]);62};6364while let Some(v) = self.next().await? {65out.extend(v);66}6768Ok(out)69}70}71};72}7374pub(crate) struct PageWalker {75request: RequestBuilder,76next_page_token: Option<String>,77has_run: bool,78}7980impl PageWalker {81pub(crate) fn new(request: RequestBuilder) -> Self {82Self {83request,84next_page_token: None,85has_run: false,86}87}8889pub(crate) async fn next<F, T>(&mut self, deserializer: F) -> PolarsResult<Option<T>>90where91F: Fn(&[u8]) -> PolarsResult<(T, Option<String>)>,92{93let Some(resp_bytes) = self.next_bytes().await? else {94return Ok(None);95};9697let (value, next_page_token) = deserializer(&resp_bytes)?;98self.next_page_token = next_page_token;99100Ok(Some(value))101}102103pub(crate) async fn next_bytes(&mut self) -> PolarsResult<Option<Bytes>> {104if self.has_run && self.next_page_token.is_none() {105return Ok(None);106}107108self.has_run = true;109110let request = self.request.try_clone().unwrap();111112let request = if let Some(page_token) = self.next_page_token.take() {113request.query(&[("page_token", page_token)])114} else {115request116};117118do_request(request).await.map(Some)119}120}121122123