Path: blob/main/crates/polars-io/src/catalog/unity/schema.rs
use polars_core::prelude::{DataType, Field};
use polars_core::schema::{Schema, SchemaRef};
use polars_error::{PolarsResult, polars_bail, polars_err, to_compute_err};
use polars_utils::error::TruncateErrorDetail;
use polars_utils::format_pl_smallstr;
use polars_utils::pl_str::PlSmallStr;

use super::models::{ColumnInfo, ColumnTypeJson, ColumnTypeJsonType, TableInfo};
use crate::utils::decode_json_response;

/// Returns `(schema, hive_schema)`
pub fn table_info_to_schemas(
    table_info: &TableInfo,
) -> PolarsResult<(Option<SchemaRef>, Option<SchemaRef>)> {
    let Some(columns) = table_info.columns.as_deref() else {
        return Ok((None, None));
    };

    let mut schema = Schema::default();
    let mut hive_schema = Schema::default();

    for (i, col) in columns.iter().enumerate() {
        if let Some(position) = col.position {
            if usize::try_from(position).unwrap() != i {
                polars_bail!(
                    ComputeError:
                    "not yet supported: position was not ordered"
                )
            }
        }

        let field = column_info_to_field(col)?;

        if let Some(i) = col.partition_index {
            if usize::try_from(i).unwrap() != hive_schema.len() {
                polars_bail!(
                    ComputeError:
                    "not yet supported: partition_index was not ordered"
                )
            }

            hive_schema.extend([field]);
        } else {
            schema.extend([field])
        }
    }

    Ok((
        Some(schema.into()),
        Some(hive_schema)
            .filter(|x| !x.is_empty())
            .map(|x| x.into()),
    ))
}

pub fn column_info_to_field(column_info: &ColumnInfo) -> PolarsResult<Field> {
    Ok(Field::new(
        column_info.name.clone(),
        parse_type_json_str(&column_info.type_json)?,
    ))
}

/// e.g.
/// ```json
/// {"name":"Int64","type":"long","nullable":true}
/// {"name":"List","type":{"type":"array","elementType":"long","containsNull":true},"nullable":true}
/// ```
pub fn parse_type_json_str(type_json: &str) -> PolarsResult<DataType> {
    let decoded: ColumnTypeJson = decode_json_response(type_json.as_bytes())?;

    parse_type_json(&decoded).map_err(|e| {
        e.wrap_msg(|e| {
            format!(
                "error parsing type response: {}, type_json: {}",
                e,
                TruncateErrorDetail(type_json)
            )
        })
    })
}

/// We prefer this over `type_text`, as `type_text` cannot be trusted for consistency (e.g. we may
/// expect `decimal(int,int)` but instead get `decimal`, or expect `struct<...>` and instead get
/// `struct`).
pub fn parse_type_json(type_json: &ColumnTypeJson) -> PolarsResult<DataType> {
    use ColumnTypeJsonType::*;

    let out = match &type_json.type_ {
        TypeName(name) => match name.as_str() {
            "array" => {
                let inner_json: &ColumnTypeJsonType =
                    type_json.element_type.as_ref().ok_or_else(|| {
                        polars_err!(
                            ComputeError:
                            "missing elementType in response for array type"
                        )
                    })?;

                let inner_dtype = parse_type_json_type(inner_json)?;

                DataType::List(Box::new(inner_dtype))
            },

            "struct" => {
                let fields_json: &[ColumnTypeJson] =
                    type_json.fields.as_deref().ok_or_else(|| {
                        polars_err!(
                            ComputeError:
                            "missing fields in response for struct type"
                        )
                    })?;

                let fields = fields_json
                    .iter()
                    .map(|x| {
                        let name = x.name.clone().ok_or_else(|| {
                            polars_err!(
                                ComputeError:
                                "missing name in fields response for struct type"
                            )
                        })?;
                        let dtype = parse_type_json(x)?;

                        Ok(Field::new(name, dtype))
                    })
                    .collect::<PolarsResult<Vec<_>>>()?;

                DataType::Struct(fields)
            },

            "map" => {
                let key_type = type_json.key_type.as_ref().ok_or_else(|| {
                    polars_err!(
                        ComputeError:
                        "missing keyType in response for map type"
                    )
                })?;

                let value_type = type_json.value_type.as_ref().ok_or_else(|| {
                    polars_err!(
                        ComputeError:
                        "missing valueType in response for map type"
                    )
                })?;

                DataType::List(Box::new(DataType::Struct(vec![
                    Field::new(
                        PlSmallStr::from_static("key"),
                        parse_type_json_type(key_type)?,
                    ),
                    Field::new(
                        PlSmallStr::from_static("value"),
                        parse_type_json_type(value_type)?,
                    ),
                ])))
            },

            name => parse_type_text(name)?,
        },

        TypeJson(type_json) => parse_type_json(type_json.as_ref())?,
    };

    Ok(out)
}

fn parse_type_json_type(type_json_type: &ColumnTypeJsonType) -> PolarsResult<DataType> {
    use ColumnTypeJsonType::*;

    match type_json_type {
        TypeName(name) => parse_type_text(name),
        TypeJson(type_json) => parse_type_json(type_json.as_ref()),
    }
}

/// Parses the string variant of the `type` field within a `type_json`. This can be understood as
/// the leaf / non-nested datatypes of the field.
///
/// References:
/// * https://spark.apache.org/docs/latest/sql-ref-datatypes.html
/// * https://docs.databricks.com/api/workspace/tables/get
/// * https://docs.databricks.com/en/sql/language-manual/sql-ref-datatypes.html
///
/// Notes:
/// * `type_precision` and `type_scale` in the API response are defined as supplementary data to
///   the `type_text`, but from testing they aren't actually used - e.g. a decimal type would have
///   a `type_text` of `decimal(18, 2)`.
fn parse_type_text(type_text: &str) -> PolarsResult<DataType> {
    use DataType::*;
    use polars_core::prelude::TimeUnit;

    let dtype = match type_text {
        "boolean" => Boolean,

        "tinyint" | "byte" => Int8,
        "smallint" | "short" => Int16,
        "int" | "integer" => Int32,
        "bigint" | "long" => Int64,

        "float" | "real" => Float32,
        "double" => Float64,

        "date" => Date,
        "timestamp" | "timestamp_ntz" | "timestamp_ltz" => Datetime(TimeUnit::Microseconds, None),

        "string" => String,
        "binary" => Binary,

        "null" | "void" => Null,

        v => {
            if v.starts_with("decimal") {
                // e.g. decimal(38,18)
                (|| {
                    let (precision, scale) = v
                        .get(7..)?
                        .strip_prefix('(')?
                        .strip_suffix(')')?
                        .split_once(',')?;
                    let precision: usize = precision.parse().ok()?;
                    let scale: usize = scale.parse().ok()?;

                    Some(DataType::Decimal(Some(precision), Some(scale)))
                })()
                .ok_or_else(|| {
                    polars_err!(
                        ComputeError:
                        "type format did not match decimal(int,int): {}",
                        v
                    )
                })?
            } else {
                polars_bail!(
                    ComputeError:
                    "parse_type_text unknown type name: {}",
                    v
                )
            }
        },
    };

    Ok(dtype)
}

// Conversion functions to API format. Mainly used for constructing the request to create tables.

pub fn schema_to_column_info_list(schema: &Schema) -> PolarsResult<Vec<ColumnInfo>> {
    schema
        .iter()
        .enumerate()
        .map(|(i, (name, dtype))| {
            let name = name.clone();
            let type_text = dtype_to_type_text(dtype)?;
            let type_name = dtype_to_type_name(dtype)?;
            let type_json = serde_json::to_string(&field_to_type_json(name.clone(), dtype)?)
                .map_err(to_compute_err)?;

            Ok(ColumnInfo {
                name,
                type_name,
                type_text,
                type_json,
                position: Some(i.try_into().unwrap()),
                comment: None,
                partition_index: None,
            })
        })
        .collect::<PolarsResult<_>>()
}

/// Creates the `type_text` field of the API. Opposite of [`parse_type_text`].
fn dtype_to_type_text(dtype: &DataType) -> PolarsResult<PlSmallStr> {
    use DataType::*;
    use polars_core::prelude::TimeUnit;

    macro_rules! S {
        ($e:expr) => {
            PlSmallStr::from_static($e)
        };
    }

    let out = match dtype {
        Boolean => S!("boolean"),

        Int8 => S!("tinyint"),
        Int16 => S!("smallint"),
        Int32 => S!("int"),
        Int64 => S!("bigint"),

        Float32 => S!("float"),
        Float64 => S!("double"),

        Date => S!("date"),
        Datetime(TimeUnit::Microseconds, None) => S!("timestamp_ntz"),

        String => S!("string"),
        Binary => S!("binary"),

        Null => S!("null"),

        Decimal(precision, scale) => {
            let precision = precision.unwrap_or(38);
            let scale = scale.unwrap_or(0);

            format_pl_smallstr!("decimal({},{})", precision, scale)
        },

        List(inner) => {
            if let Some((key_type, value_type)) = get_list_map_type(inner) {
                format_pl_smallstr!(
                    "map<{},{}>",
                    dtype_to_type_text(key_type)?,
                    dtype_to_type_text(value_type)?
                )
            } else {
                format_pl_smallstr!("array<{}>", dtype_to_type_text(inner)?)
            }
        },

        Struct(fields) => {
            // Yes, it's possible to construct column names containing the brackets. This won't
            // affect us as we parse using `type_json` rather than this field.
            let mut out = std::string::String::from("struct<");

            for Field { name, dtype } in fields {
                out.push_str(name);
                out.push(':');
                out.push_str(&dtype_to_type_text(dtype)?);
                out.push(',');
            }

            if out.ends_with(',') {
                out.truncate(out.len() - 1);
            }

            out.push('>');

            out.into()
        },

        v => polars_bail!(
            ComputeError:
            "dtype_to_type_text unsupported type: {}",
            v
        ),
    };

    Ok(out)
}

/// Creates the `type_name` field. From testing, this isn't exactly the same as the `type_text`
/// field.
fn dtype_to_type_name(dtype: &DataType) -> PolarsResult<PlSmallStr> {
    use DataType::*;
    use polars_core::prelude::TimeUnit;

    macro_rules! S {
        ($e:expr) => {
            PlSmallStr::from_static($e)
        };
    }

    let out = match dtype {
        Boolean => S!("BOOLEAN"),

        Int8 => S!("BYTE"),
        Int16 => S!("SHORT"),
        Int32 => S!("INT"),
        Int64 => S!("LONG"),

        Float32 => S!("FLOAT"),
        Float64 => S!("DOUBLE"),

        Date => S!("DATE"),
        Datetime(TimeUnit::Microseconds, None) => S!("TIMESTAMP_NTZ"),
        String => S!("STRING"),
        Binary => S!("BINARY"),

        Null => S!("NULL"),

        Decimal(..) => S!("DECIMAL"),

        List(inner) => {
            if get_list_map_type(inner).is_some() {
                S!("MAP")
            } else {
                S!("ARRAY")
            }
        },

        Struct(..) => S!("STRUCT"),

        v => polars_bail!(
            ComputeError:
            "dtype_to_type_name unsupported type: {}",
            v
        ),
    };

    Ok(out)
}

/// Creates the `type_json` field.
fn field_to_type_json(name: PlSmallStr, dtype: &DataType) -> PolarsResult<ColumnTypeJson> {
    Ok(ColumnTypeJson {
        name: Some(name),
        type_: dtype_to_type_json(dtype)?,
        nullable: Some(true),
        // We set this to Some(_) so that the output matches the one generated by Databricks.
        metadata: Some(Default::default()),

        ..Default::default()
    })
}

fn dtype_to_type_json(dtype: &DataType) -> PolarsResult<ColumnTypeJsonType> {
    use DataType::*;
    use polars_core::prelude::TimeUnit;

    macro_rules! S {
        ($e:expr) => {
            ColumnTypeJsonType::from_static_type_name($e)
        };
    }

    let out = match dtype {
        Boolean => S!("boolean"),

        Int8 => S!("byte"),
        Int16 => S!("short"),
        Int32 => S!("integer"),
        Int64 => S!("long"),

        Float32 => S!("float"),
        Float64 => S!("double"),

        Date => S!("date"),
        Datetime(TimeUnit::Microseconds, None) => S!("timestamp_ntz"),

        String => S!("string"),
        Binary => S!("binary"),

        Null => S!("null"),

        Decimal(..) => ColumnTypeJsonType::TypeName(dtype_to_type_text(dtype)?),

        List(inner) => {
            let out = if let Some((key_type, value_type)) = get_list_map_type(inner) {
                ColumnTypeJson {
                    type_: ColumnTypeJsonType::from_static_type_name("map"),
                    key_type: Some(dtype_to_type_json(key_type)?),
                    value_type: Some(dtype_to_type_json(value_type)?),
                    value_contains_null: Some(true),

                    ..Default::default()
                }
            } else {
                ColumnTypeJson {
                    type_: ColumnTypeJsonType::from_static_type_name("array"),
                    element_type: Some(dtype_to_type_json(inner)?),
                    contains_null: Some(true),

                    ..Default::default()
                }
            };

            ColumnTypeJsonType::TypeJson(Box::new(out))
        },

        Struct(fields) => {
            let out = ColumnTypeJson {
                type_: ColumnTypeJsonType::from_static_type_name("struct"),
                fields: Some(
                    fields
                        .iter()
                        .map(|Field { name, dtype }| field_to_type_json(name.clone(), dtype))
                        .collect::<PolarsResult<_>>()?,
                ),

                ..Default::default()
            };

            ColumnTypeJsonType::TypeJson(Box::new(out))
        },

        v => polars_bail!(
            ComputeError:
            "dtype_to_type_json unsupported type: {}",
            v
        ),
    };

    Ok(out)
}

/// Tries to interpret the List type as a `map` field, which is essentially
/// List(Struct(("key", <dtype>), ("value", <dtype>))).
///
/// Returns `Option<(key_type, value_type)>`
fn get_list_map_type(list_inner_dtype: &DataType) -> Option<(&DataType, &DataType)> {
    let DataType::Struct(fields) = list_inner_dtype else {
        return None;
    };

    let [fld1, fld2] = fields.as_slice() else {
        return None;
    };

    if !(fld1.name == "key" && fld2.name == "value") {
        return None;
    }

    Some((fld1.dtype(), fld2.dtype()))
}