GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/catalog/unity/schema.rs
use polars_core::prelude::{DataType, Field};
use polars_core::schema::{Schema, SchemaRef};
use polars_error::{PolarsResult, polars_bail, polars_err, to_compute_err};
use polars_utils::error::TruncateErrorDetail;
use polars_utils::format_pl_smallstr;
use polars_utils::pl_str::PlSmallStr;

use super::models::{ColumnInfo, ColumnTypeJson, ColumnTypeJsonType, TableInfo};
use crate::utils::decode_json_response;

/// Returns `(schema, hive_schema)`
pub fn table_info_to_schemas(
    table_info: &TableInfo,
) -> PolarsResult<(Option<SchemaRef>, Option<SchemaRef>)> {
    let Some(columns) = table_info.columns.as_deref() else {
        return Ok((None, None));
    };

    let mut schema = Schema::default();
    let mut hive_schema = Schema::default();

    for (i, col) in columns.iter().enumerate() {
        if let Some(position) = col.position {
            if usize::try_from(position).unwrap() != i {
                polars_bail!(
                    ComputeError:
                    "not yet supported: position was not ordered"
                )
            }
        }

        let field = column_info_to_field(col)?;

        if let Some(i) = col.partition_index {
            if usize::try_from(i).unwrap() != hive_schema.len() {
                polars_bail!(
                    ComputeError:
                    "not yet supported: partition_index was not ordered"
                )
            }

            hive_schema.extend([field]);
        } else {
            schema.extend([field])
        }
    }

    Ok((
        Some(schema.into()),
        Some(hive_schema)
            .filter(|x| !x.is_empty())
            .map(|x| x.into()),
    ))
}

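// A minimal sketch of `table_info_to_schemas` (illustrative only): columns carrying a
// `partition_index` form the hive schema, the remaining columns form the data schema, and an
// empty hive schema is collapsed to `None` via the `Some(..).filter(..).map(..)` idiom above.
//
//     columns: [a: bigint, b: string (partition_index = 0)]
//       -> (Some(Schema { a: Int64 }), Some(Schema { b: String }))
//     columns: [a: bigint]
//       -> (Some(Schema { a: Int64 }), None)
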
pub fn column_info_to_field(column_info: &ColumnInfo) -> PolarsResult<Field> {
    Ok(Field::new(
        column_info.name.clone(),
        parse_type_json_str(&column_info.type_json)?,
    ))
}

/// e.g.
/// ```json
/// {"name":"Int64","type":"long","nullable":true}
/// {"name":"List","type":{"type":"array","elementType":"long","containsNull":true},"nullable":true}
/// ```
pub fn parse_type_json_str(type_json: &str) -> PolarsResult<DataType> {
    let decoded: ColumnTypeJson = decode_json_response(type_json.as_bytes())?;

    parse_type_json(&decoded).map_err(|e| {
        e.wrap_msg(|e| {
            format!(
                "error parsing type response: {}, type_json: {}",
                e,
                TruncateErrorDetail(type_json)
            )
        })
    })
}

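// Illustrative sketch for `parse_type_json_str`: the two payloads in the doc comment above
// decode to `DataType::Int64` and `DataType::List(Box::new(DataType::Int64))` respectively.
//
//     let dtype = parse_type_json_str(r#"{"name":"Int64","type":"long","nullable":true}"#)?;
//     assert_eq!(dtype, DataType::Int64);
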
/// We prefer this over `type_text`, as `type_text` cannot be trusted for consistency (e.g. we may
/// expect `decimal(int,int)` but instead get `decimal`, or expect `struct<...>` but instead get
/// `struct`).
pub fn parse_type_json(type_json: &ColumnTypeJson) -> PolarsResult<DataType> {
    use ColumnTypeJsonType::*;

    let out = match &type_json.type_ {
        TypeName(name) => match name.as_str() {
            "array" => {
                let inner_json: &ColumnTypeJsonType =
                    type_json.element_type.as_ref().ok_or_else(|| {
                        polars_err!(
                            ComputeError:
                            "missing elementType in response for array type"
                        )
                    })?;

                let inner_dtype = parse_type_json_type(inner_json)?;

                DataType::List(Box::new(inner_dtype))
            },

            "struct" => {
                let fields_json: &[ColumnTypeJson] =
                    type_json.fields.as_deref().ok_or_else(|| {
                        polars_err!(
                            ComputeError:
                            "missing fields in response for struct type"
                        )
                    })?;

                let fields = fields_json
                    .iter()
                    .map(|x| {
                        let name = x.name.clone().ok_or_else(|| {
                            polars_err!(
                                ComputeError:
                                "missing name in fields response for struct type"
                            )
                        })?;
                        let dtype = parse_type_json(x)?;

                        Ok(Field::new(name, dtype))
                    })
                    .collect::<PolarsResult<Vec<_>>>()?;

                DataType::Struct(fields)
            },

            "map" => {
                let key_type = type_json.key_type.as_ref().ok_or_else(|| {
                    polars_err!(
                        ComputeError:
                        "missing keyType in response for map type"
                    )
                })?;

                let value_type = type_json.value_type.as_ref().ok_or_else(|| {
                    polars_err!(
                        ComputeError:
                        "missing valueType in response for map type"
                    )
                })?;

                DataType::List(Box::new(DataType::Struct(vec![
                    Field::new(
                        PlSmallStr::from_static("key"),
                        parse_type_json_type(key_type)?,
                    ),
                    Field::new(
                        PlSmallStr::from_static("value"),
                        parse_type_json_type(value_type)?,
                    ),
                ])))
            },

            name => parse_type_text(name)?,
        },

        TypeJson(type_json) => parse_type_json(type_json.as_ref())?,
    };

    Ok(out)
}

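// Illustrative sketch for `parse_type_json`: a `map` type is represented on the Polars side
// as `List(Struct([key, value]))`, mirroring the "map" branch above. A payload shaped like
// (field names assumed by analogy with the array example on `parse_type_json_str`)
//
//     {"type":"map","keyType":"string","valueType":"long","valueContainsNull":true}
//
// would parse to
//
//     DataType::List(Box::new(DataType::Struct(vec![
//         Field::new(PlSmallStr::from_static("key"), DataType::String),
//         Field::new(PlSmallStr::from_static("value"), DataType::Int64),
//     ])))
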
fn parse_type_json_type(type_json_type: &ColumnTypeJsonType) -> PolarsResult<DataType> {
    use ColumnTypeJsonType::*;

    match type_json_type {
        TypeName(name) => parse_type_text(name),
        TypeJson(type_json) => parse_type_json(type_json.as_ref()),
    }
}

/// Parses the string variant of the `type` field within a `type_json`. This can be understood as
/// the leaf / non-nested datatypes of the field.
///
/// References:
/// * https://spark.apache.org/docs/latest/sql-ref-datatypes.html
/// * https://docs.databricks.com/api/workspace/tables/get
/// * https://docs.databricks.com/en/sql/language-manual/sql-ref-datatypes.html
///
/// Notes:
/// * `type_precision` and `type_scale` in the API response are defined as supplementary data to
///   the `type_text`, but from testing they aren't actually used - e.g. a decimal type would have a
///   `type_text` of `decimal(18, 2)`
fn parse_type_text(type_text: &str) -> PolarsResult<DataType> {
    use DataType::*;
    use polars_core::prelude::TimeUnit;

    let dtype = match type_text {
        "boolean" => Boolean,

        "tinyint" | "byte" => Int8,
        "smallint" | "short" => Int16,
        "int" | "integer" => Int32,
        "bigint" | "long" => Int64,

        "float" | "real" => Float32,
        "double" => Float64,

        "date" => Date,
        "timestamp" | "timestamp_ntz" | "timestamp_ltz" => Datetime(TimeUnit::Microseconds, None),

        "string" => String,
        "binary" => Binary,

        "null" | "void" => Null,

        v => {
            if v.starts_with("decimal") {
                // e.g. decimal(38,18)
                (|| {
                    let (precision, scale) = v
                        .get(7..)?
                        .strip_prefix('(')?
                        .strip_suffix(')')?
                        .split_once(',')?;
                    let precision: usize = precision.parse().ok()?;
                    let scale: usize = scale.parse().ok()?;

                    Some(DataType::Decimal(Some(precision), Some(scale)))
                })()
                .ok_or_else(|| {
                    polars_err!(
                        ComputeError:
                        "type format did not match decimal(int,int): {}",
                        v
                    )
                })?
            } else {
                polars_bail!(
                    ComputeError:
                    "parse_type_text unknown type name: {}",
                    v
                )
            }
        },
    };

    Ok(dtype)
}

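// Illustrative sketch for `parse_type_text`, including the decimal branch above:
//
//     assert_eq!(parse_type_text("bigint")?, DataType::Int64);
//     assert_eq!(parse_type_text("decimal(38,18)")?, DataType::Decimal(Some(38), Some(18)));
//     assert!(parse_type_text("not_a_type").is_err());
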
// Conversion functions to API format. Mainly used for constructing the request to create tables.

pub fn schema_to_column_info_list(schema: &Schema) -> PolarsResult<Vec<ColumnInfo>> {
    schema
        .iter()
        .enumerate()
        .map(|(i, (name, dtype))| {
            let name = name.clone();
            let type_text = dtype_to_type_text(dtype)?;
            let type_name = dtype_to_type_name(dtype)?;
            let type_json = serde_json::to_string(&field_to_type_json(name.clone(), dtype)?)
                .map_err(to_compute_err)?;

            Ok(ColumnInfo {
                name,
                type_name,
                type_text,
                type_json,
                position: Some(i.try_into().unwrap()),
                comment: None,
                partition_index: None,
            })
        })
        .collect::<PolarsResult<_>>()
}

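// Illustrative sketch for `schema_to_column_info_list`, using only helpers defined in this
// file (the integer type of `position` is left to inference):
//
//     let mut schema = Schema::default();
//     schema.extend([Field::new(PlSmallStr::from_static("a"), DataType::Int64)]);
//     let cols = schema_to_column_info_list(&schema)?;
//     assert_eq!(cols[0].type_text.as_str(), "bigint");
//     assert_eq!(cols[0].type_name.as_str(), "LONG");
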
/// Creates the `type_text` field of the API. Opposite of [`parse_type_text`]
fn dtype_to_type_text(dtype: &DataType) -> PolarsResult<PlSmallStr> {
    use DataType::*;
    use polars_core::prelude::TimeUnit;

    macro_rules! S {
        ($e:expr) => {
            PlSmallStr::from_static($e)
        };
    }

    let out = match dtype {
        Boolean => S!("boolean"),

        Int8 => S!("tinyint"),
        Int16 => S!("smallint"),
        Int32 => S!("int"),
        Int64 => S!("bigint"),

        Float32 => S!("float"),
        Float64 => S!("double"),

        Date => S!("date"),
        Datetime(TimeUnit::Microseconds, None) => S!("timestamp_ntz"),

        String => S!("string"),
        Binary => S!("binary"),

        Null => S!("null"),

        Decimal(precision, scale) => {
            let precision = precision.unwrap_or(38);
            let scale = scale.unwrap_or(0);

            format_pl_smallstr!("decimal({},{})", precision, scale)
        },

        List(inner) => {
            if let Some((key_type, value_type)) = get_list_map_type(inner) {
                format_pl_smallstr!(
                    "map<{},{}>",
                    dtype_to_type_text(key_type)?,
                    dtype_to_type_text(value_type)?
                )
            } else {
                format_pl_smallstr!("array<{}>", dtype_to_type_text(inner)?)
            }
        },

        Struct(fields) => {
            // Yes, it's possible to construct column names containing the brackets. This won't
            // affect us as we parse using `type_json` rather than this field.
            let mut out = std::string::String::from("struct<");

            for Field { name, dtype } in fields {
                out.push_str(name);
                out.push(':');
                out.push_str(&dtype_to_type_text(dtype)?);
                out.push(',');
            }

            if out.ends_with(',') {
                out.truncate(out.len() - 1);
            }

            out.push('>');

            out.into()
        },

        v => polars_bail!(
            ComputeError:
            "dtype_to_type_text unsupported type: {}",
            v
        ),
    };

    Ok(out)
}

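// Illustrative sketch of the `type_text` strings produced by `dtype_to_type_text`:
//
//     assert_eq!(dtype_to_type_text(&DataType::Int64)?.as_str(), "bigint");
//     let arr = DataType::List(Box::new(DataType::String));
//     assert_eq!(dtype_to_type_text(&arr)?.as_str(), "array<string>");
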
/// Creates the `type_name` field. From testing, this is not always the same as the `type_text`
/// field.
fn dtype_to_type_name(dtype: &DataType) -> PolarsResult<PlSmallStr> {
    use DataType::*;
    use polars_core::prelude::TimeUnit;

    macro_rules! S {
        ($e:expr) => {
            PlSmallStr::from_static($e)
        };
    }

    let out = match dtype {
        Boolean => S!("BOOLEAN"),

        Int8 => S!("BYTE"),
        Int16 => S!("SHORT"),
        Int32 => S!("INT"),
        Int64 => S!("LONG"),

        Float32 => S!("FLOAT"),
        Float64 => S!("DOUBLE"),

        Date => S!("DATE"),
        Datetime(TimeUnit::Microseconds, None) => S!("TIMESTAMP_NTZ"),
        String => S!("STRING"),
        Binary => S!("BINARY"),

        Null => S!("NULL"),

        Decimal(..) => S!("DECIMAL"),

        List(inner) => {
            if get_list_map_type(inner).is_some() {
                S!("MAP")
            } else {
                S!("ARRAY")
            }
        },

        Struct(..) => S!("STRUCT"),

        v => polars_bail!(
            ComputeError:
            "dtype_to_type_name unsupported type: {}",
            v
        ),
    };

    Ok(out)
}

/// Creates the `type_json` field.
fn field_to_type_json(name: PlSmallStr, dtype: &DataType) -> PolarsResult<ColumnTypeJson> {
    Ok(ColumnTypeJson {
        name: Some(name),
        type_: dtype_to_type_json(dtype)?,
        nullable: Some(true),
        // We set this to Some(_) so that the output matches the one generated by Databricks.
        metadata: Some(Default::default()),

        ..Default::default()
    })
}

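// Illustrative sketch for `field_to_type_json` (serde field names assumed from the doc
// examples on `parse_type_json_str`): a field `a: bigint` serializes to roughly
//
//     {"name":"a","type":"long","nullable":true,"metadata":{}}
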
fn dtype_to_type_json(dtype: &DataType) -> PolarsResult<ColumnTypeJsonType> {
415
use DataType::*;
416
use polars_core::prelude::TimeUnit;
417
418
macro_rules! S {
419
($e:expr) => {
420
ColumnTypeJsonType::from_static_type_name($e)
421
};
422
}
423
424
let out = match dtype {
425
Boolean => S!("boolean"),
426
427
Int8 => S!("byte"),
428
Int16 => S!("short"),
429
Int32 => S!("integer"),
430
Int64 => S!("long"),
431
432
Float32 => S!("float"),
433
Float64 => S!("double"),
434
435
Date => S!("date"),
436
Datetime(TimeUnit::Microseconds, None) => S!("timestamp_ntz"),
437
438
String => S!("string"),
439
Binary => S!("binary"),
440
441
Null => S!("null"),
442
443
Decimal(..) => ColumnTypeJsonType::TypeName(dtype_to_type_text(dtype)?),
444
445
List(inner) => {
446
let out = if let Some((key_type, value_type)) = get_list_map_type(inner) {
447
ColumnTypeJson {
448
type_: ColumnTypeJsonType::from_static_type_name("map"),
449
key_type: Some(dtype_to_type_json(key_type)?),
450
value_type: Some(dtype_to_type_json(value_type)?),
451
value_contains_null: Some(true),
452
453
..Default::default()
454
}
455
} else {
456
ColumnTypeJson {
457
type_: ColumnTypeJsonType::from_static_type_name("array"),
458
element_type: Some(dtype_to_type_json(inner)?),
459
contains_null: Some(true),
460
461
..Default::default()
462
}
463
};
464
465
ColumnTypeJsonType::TypeJson(Box::new(out))
466
},
467
468
Struct(fields) => {
469
let out = ColumnTypeJson {
470
type_: ColumnTypeJsonType::from_static_type_name("struct"),
471
fields: Some(
472
fields
473
.iter()
474
.map(|Field { name, dtype }| field_to_type_json(name.clone(), dtype))
475
.collect::<PolarsResult<_>>()?,
476
),
477
478
..Default::default()
479
};
480
481
ColumnTypeJsonType::TypeJson(Box::new(out))
482
},
483
484
v => polars_bail!(
485
ComputeError:
486
"dtype_to_type_text unsupported type: {}",
487
v
488
),
489
};
490
491
Ok(out)
492
}
493
494
/// Tries to interpret the List type as a `map` field, which is essentially
/// List(Struct(("key", <dtype>), ("value", <dtype>))).
///
/// Returns `Option<(key_type, value_type)>`
fn get_list_map_type(list_inner_dtype: &DataType) -> Option<(&DataType, &DataType)> {
    let DataType::Struct(fields) = list_inner_dtype else {
        return None;
    };

    let [fld1, fld2] = fields.as_slice() else {
        return None;
    };

    if !(fld1.name == "key" && fld2.name == "value") {
        return None;
    }

    Some((fld1.dtype(), fld2.dtype()))
}

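// A minimal test sketch (illustrative; assumes only the helpers defined above, the
// `DataType`/`Field` types imported at the top of this file, and that decimal support is
// compiled in).
#[cfg(test)]
mod sketch_tests {
    use super::*;

    #[test]
    fn leaf_type_round_trip() -> PolarsResult<()> {
        // Leaf names parse to dtypes, and dtypes print back to the same `type_text`.
        let dtype = parse_type_text("bigint")?;
        assert_eq!(dtype, DataType::Int64);
        assert_eq!(dtype_to_type_text(&dtype)?.as_str(), "bigint");

        // The decimal branch keeps precision and scale.
        assert_eq!(
            parse_type_text("decimal(38,18)")?,
            DataType::Decimal(Some(38), Some(18))
        );
        Ok(())
    }

    #[test]
    fn map_is_list_of_key_value_struct() {
        // A Struct([key, value]) list inner dtype is what `get_list_map_type` treats as a map.
        let inner = DataType::Struct(vec![
            Field::new(PlSmallStr::from_static("key"), DataType::String),
            Field::new(PlSmallStr::from_static("value"), DataType::Int64),
        ]);
        assert_eq!(
            get_list_map_type(&inner),
            Some((&DataType::String, &DataType::Int64))
        );
    }
}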