Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/catalog/unity/utils.rs
6939 views
1
use bytes::Bytes;
2
use polars_error::{PolarsResult, to_compute_err};
3
use polars_utils::error::TruncateErrorDetail;
4
use reqwest::RequestBuilder;
5
6
/// Performs the request and attaches the response body to any error messages.
7
pub(super) async fn do_request(request: reqwest::RequestBuilder) -> PolarsResult<bytes::Bytes> {
8
let resp = request.send().await.map_err(to_compute_err)?;
9
let opt_err = resp.error_for_status_ref().map(|_| ());
10
let resp_bytes = resp.bytes().await.map_err(to_compute_err)?;
11
12
opt_err.map_err(|e| {
13
to_compute_err(e).wrap_msg(|e| {
14
let body = String::from_utf8_lossy(&resp_bytes);
15
16
format!(
17
"error: {}, response body: {}",
18
e,
19
TruncateErrorDetail(&body)
20
)
21
})
22
})?;
23
24
Ok(resp_bytes)
25
}
26
27
/// Support for traversing paginated response values that look like:
28
/// ```text
29
/// {
30
/// $key_name: [$T, $T, ...],
31
/// next_page_token: "token" or null,
32
/// }
33
/// ```
34
#[macro_export]
35
macro_rules! impl_page_walk {
36
($S:ty, $T:ty, key_name = $key_name:tt) => {
37
impl $S {
38
pub async fn next(&mut self) -> PolarsResult<Option<Vec<$T>>> {
39
return self
40
.0
41
.next(|bytes| {
42
let Response {
43
$key_name: out,
44
next_page_token,
45
} = decode_json_response(bytes)?;
46
47
Ok((out, next_page_token))
48
})
49
.await;
50
51
#[derive(serde::Deserialize)]
52
struct Response {
53
#[serde(default = "Vec::new")]
54
$key_name: Vec<$T>,
55
#[serde(default)]
56
next_page_token: Option<String>,
57
}
58
}
59
60
pub async fn read_all_pages(mut self) -> PolarsResult<Vec<$T>> {
61
let Some(mut out) = self.next().await? else {
62
return Ok(vec![]);
63
};
64
65
while let Some(v) = self.next().await? {
66
out.extend(v);
67
}
68
69
Ok(out)
70
}
71
}
72
};
73
}
74
75
pub(crate) struct PageWalker {
76
request: RequestBuilder,
77
next_page_token: Option<String>,
78
has_run: bool,
79
}
80
81
impl PageWalker {
82
pub(crate) fn new(request: RequestBuilder) -> Self {
83
Self {
84
request,
85
next_page_token: None,
86
has_run: false,
87
}
88
}
89
90
pub(crate) async fn next<F, T>(&mut self, deserializer: F) -> PolarsResult<Option<T>>
91
where
92
F: Fn(&[u8]) -> PolarsResult<(T, Option<String>)>,
93
{
94
let Some(resp_bytes) = self.next_bytes().await? else {
95
return Ok(None);
96
};
97
98
let (value, next_page_token) = deserializer(&resp_bytes)?;
99
self.next_page_token = next_page_token;
100
101
Ok(Some(value))
102
}
103
104
pub(crate) async fn next_bytes(&mut self) -> PolarsResult<Option<Bytes>> {
105
if self.has_run && self.next_page_token.is_none() {
106
return Ok(None);
107
}
108
109
self.has_run = true;
110
111
let request = self.request.try_clone().unwrap();
112
113
let request = if let Some(page_token) = self.next_page_token.take() {
114
request.query(&[("page_token", page_token)])
115
} else {
116
request
117
};
118
119
do_request(request).await.map(Some)
120
}
121
}
122
123