Path: blob/main/crates/polars-io/src/catalog/unity/schema.rs
8430 views
use polars_core::prelude::{DataType, Field};1use polars_core::schema::{Schema, SchemaRef};2use polars_error::{PolarsResult, polars_bail, polars_err, to_compute_err};3use polars_utils::error::TruncateErrorDetail;4use polars_utils::format_pl_smallstr;5use polars_utils::pl_str::PlSmallStr;67use super::models::{ColumnInfo, ColumnTypeJson, ColumnTypeJsonType, TableInfo};8use crate::utils::decode_json_response;910/// Returns `(schema, hive_schema)`11pub fn table_info_to_schemas(12table_info: &TableInfo,13) -> PolarsResult<(Option<SchemaRef>, Option<SchemaRef>)> {14let Some(columns) = table_info.columns.as_deref() else {15return Ok((None, None));16};1718let mut schema = Schema::default();19let mut hive_schema = Schema::default();2021for (i, col) in columns.iter().enumerate() {22if let Some(position) = col.position {23if usize::try_from(position).unwrap() != i {24polars_bail!(25ComputeError:26"not yet supported: position was not ordered"27)28}29}3031let field = column_info_to_field(col)?;3233if let Some(i) = col.partition_index {34if usize::try_from(i).unwrap() != hive_schema.len() {35polars_bail!(36ComputeError:37"not yet supported: partition_index was not ordered"38)39}4041hive_schema.extend([field]);42} else {43schema.extend([field])44}45}4647Ok((48Some(schema.into()),49Some(hive_schema)50.filter(|x| !x.is_empty())51.map(|x| x.into()),52))53}5455pub fn column_info_to_field(column_info: &ColumnInfo) -> PolarsResult<Field> {56Ok(Field::new(57column_info.name.clone(),58parse_type_json_str(&column_info.type_json)?,59))60}6162/// e.g.63/// ```json64/// {"name":"Int64","type":"long","nullable":true}65/// {"name":"List","type":{"type":"array","elementType":"long","containsNull":true},"nullable":true}66/// ```67pub fn parse_type_json_str(type_json: &str) -> PolarsResult<DataType> {68let decoded: ColumnTypeJson = decode_json_response(type_json.as_bytes())?;6970parse_type_json(&decoded).map_err(|e| {71e.wrap_msg(|e| {72format!(73"error parsing type response: {}, type_json: {}",74e,75TruncateErrorDetail(type_json)76)77})78})79}8081/// We prefer this as `type_text` cannot be trusted for consistency (e.g. we may expect `decimal(int,int)`82/// but instead get `decimal`, or `struct<...>` and instead get `struct`).83pub fn parse_type_json(type_json: &ColumnTypeJson) -> PolarsResult<DataType> {84use ColumnTypeJsonType::*;8586let out = match &type_json.type_ {87TypeName(name) => match name.as_str() {88"array" => {89let inner_json: &ColumnTypeJsonType =90type_json.element_type.as_ref().ok_or_else(|| {91polars_err!(92ComputeError:93"missing elementType in response for array type"94)95})?;9697let inner_dtype = parse_type_json_type(inner_json)?;9899DataType::List(Box::new(inner_dtype))100},101102"struct" => {103let fields_json: &[ColumnTypeJson] =104type_json.fields.as_deref().ok_or_else(|| {105polars_err!(106ComputeError:107"missing elementType in response for array type"108)109})?;110111let fields = fields_json112.iter()113.map(|x| {114let name = x.name.clone().ok_or_else(|| {115polars_err!(116ComputeError:117"missing name in fields response for struct type"118)119})?;120let dtype = parse_type_json(x)?;121122Ok(Field::new(name, dtype))123})124.collect::<PolarsResult<Vec<_>>>()?;125126DataType::Struct(fields)127},128129"map" => {130let key_type = type_json.key_type.as_ref().ok_or_else(|| {131polars_err!(132ComputeError:133"missing keyType in response for map type"134)135})?;136137let value_type = type_json.value_type.as_ref().ok_or_else(|| {138polars_err!(139ComputeError:140"missing valueType in response for map type"141)142})?;143144DataType::List(Box::new(DataType::Struct(vec![145Field::new(146PlSmallStr::from_static("key"),147parse_type_json_type(key_type)?,148),149Field::new(150PlSmallStr::from_static("value"),151parse_type_json_type(value_type)?,152),153])))154},155156name => parse_type_text(name)?,157},158159TypeJson(type_json) => parse_type_json(type_json.as_ref())?,160};161162Ok(out)163}164165fn parse_type_json_type(type_json_type: &ColumnTypeJsonType) -> PolarsResult<DataType> {166use ColumnTypeJsonType::*;167168match type_json_type {169TypeName(name) => parse_type_text(name),170TypeJson(type_json) => parse_type_json(type_json.as_ref()),171}172}173174/// Parses the string variant of the `type` field within a `type_json`. This can be understood as175/// the leaf / non-nested datatypes of the field.176///177/// References:178/// * https://spark.apache.org/docs/latest/sql-ref-datatypes.html179/// * https://docs.databricks.com/api/workspace/tables/get180/// * https://docs.databricks.com/en/sql/language-manual/sql-ref-datatypes.html181///182/// Notes:183/// * `type_precision` and `type_scale` in the API response are defined as supplementary data to184/// the `type_text`, but from testing they aren't actually used - e.g. a decimal type would have a185/// `type_text` of `decimal(18, 2)`186fn parse_type_text(type_text: &str) -> PolarsResult<DataType> {187use DataType::*;188use polars_core::prelude::TimeUnit;189190let dtype = match type_text {191"boolean" => Boolean,192193"tinyint" | "byte" => Int8,194"smallint" | "short" => Int16,195"int" | "integer" => Int32,196"bigint" | "long" => Int64,197198"float" | "real" => Float32,199"double" => Float64,200201"date" => Date,202"timestamp" | "timestamp_ntz" | "timestamp_ltz" => Datetime(TimeUnit::Microseconds, None),203204"string" => String,205"binary" => Binary,206207"null" | "void" => Null,208209v => {210if v.starts_with("decimal") {211// e.g. decimal(38,18)212(|| {213let (precision, scale) = v214.get(7..)?215.strip_prefix('(')?216.strip_suffix(')')?217.split_once(',')?;218let precision: usize = precision.parse().ok()?;219let scale: usize = scale.parse().ok()?;220221Some(DataType::Decimal(precision, scale))222})()223.ok_or_else(|| {224polars_err!(225ComputeError:226"type format did not match decimal(int,int): {}",227v228)229})?230} else {231polars_bail!(232ComputeError:233"parse_type_text unknown type name: {}",234v235)236}237},238};239240Ok(dtype)241}242243// Conversion functions to API format. Mainly used for constructing the request to create tables.244245pub fn schema_to_column_info_list(schema: &Schema) -> PolarsResult<Vec<ColumnInfo>> {246schema247.iter()248.enumerate()249.map(|(i, (name, dtype))| {250let name = name.clone();251let type_text = dtype_to_type_text(dtype)?;252let type_name = dtype_to_type_name(dtype)?;253let type_json = serde_json::to_string(&field_to_type_json(name.clone(), dtype)?)254.map_err(to_compute_err)?;255256Ok(ColumnInfo {257name,258type_name,259type_text,260type_json,261position: Some(i.try_into().unwrap()),262comment: None,263partition_index: None,264})265})266.collect::<PolarsResult<_>>()267}268269/// Creates the `type_text` field of the API. Opposite of [`parse_type_text`]270fn dtype_to_type_text(dtype: &DataType) -> PolarsResult<PlSmallStr> {271use DataType::*;272use polars_core::prelude::TimeUnit;273274macro_rules! S {275($e:expr) => {276PlSmallStr::from_static($e)277};278}279280let out = match dtype {281Boolean => S!("boolean"),282283Int8 => S!("tinyint"),284Int16 => S!("smallint"),285Int32 => S!("int"),286Int64 => S!("bigint"),287288Float32 => S!("float"),289Float64 => S!("double"),290291Date => S!("date"),292Datetime(TimeUnit::Microseconds, None) => S!("timestamp_ntz"),293294String => S!("string"),295Binary => S!("binary"),296297Null => S!("null"),298299Decimal(precision, scale) => {300format_pl_smallstr!("decimal({},{})", precision, scale)301},302303List(inner) => {304if let Some((key_type, value_type)) = get_list_map_type(inner) {305format_pl_smallstr!(306"map<{},{}>",307dtype_to_type_text(key_type)?,308dtype_to_type_text(value_type)?309)310} else {311format_pl_smallstr!("array<{}>", dtype_to_type_text(inner)?)312}313},314315Struct(fields) => {316// Yes, it's possible to construct column names containing the brackets. This won't317// affect us as we parse using `type_json` rather than this field.318let mut out = std::string::String::from("struct<");319320for Field { name, dtype } in fields {321out.push_str(name);322out.push(':');323out.push_str(&dtype_to_type_text(dtype)?);324out.push(',');325}326327if out.ends_with(',') {328out.truncate(out.len() - 1);329}330331out.push('>');332333out.into()334},335336v => polars_bail!(337ComputeError:338"dtype_to_type_text unsupported type: {}",339v340),341};342343Ok(out)344}345346/// Creates the `type_name` field, from testing this wasn't exactly the same as the `type_text` field.347fn dtype_to_type_name(dtype: &DataType) -> PolarsResult<PlSmallStr> {348use DataType::*;349use polars_core::prelude::TimeUnit;350351macro_rules! S {352($e:expr) => {353PlSmallStr::from_static($e)354};355}356357let out = match dtype {358Boolean => S!("BOOLEAN"),359360Int8 => S!("BYTE"),361Int16 => S!("SHORT"),362Int32 => S!("INT"),363Int64 => S!("LONG"),364365Float32 => S!("FLOAT"),366Float64 => S!("DOUBLE"),367368Date => S!("DATE"),369Datetime(TimeUnit::Microseconds, None) => S!("TIMESTAMP_NTZ"),370String => S!("STRING"),371Binary => S!("BINARY"),372373Null => S!("NULL"),374375Decimal(..) => S!("DECIMAL"),376377List(inner) => {378if get_list_map_type(inner).is_some() {379S!("MAP")380} else {381S!("ARRAY")382}383},384385Struct(..) => S!("STRUCT"),386387v => polars_bail!(388ComputeError:389"dtype_to_type_text unsupported type: {}",390v391),392};393394Ok(out)395}396397/// Creates the `type_json` field.398fn field_to_type_json(name: PlSmallStr, dtype: &DataType) -> PolarsResult<ColumnTypeJson> {399Ok(ColumnTypeJson {400name: Some(name),401type_: dtype_to_type_json(dtype)?,402nullable: Some(true),403// We set this to Some(_) so that the output matches the one generated by Databricks.404metadata: Some(Default::default()),405406..Default::default()407})408}409410fn dtype_to_type_json(dtype: &DataType) -> PolarsResult<ColumnTypeJsonType> {411use DataType::*;412use polars_core::prelude::TimeUnit;413414macro_rules! S {415($e:expr) => {416ColumnTypeJsonType::from_static_type_name($e)417};418}419420let out = match dtype {421Boolean => S!("boolean"),422423Int8 => S!("byte"),424Int16 => S!("short"),425Int32 => S!("integer"),426Int64 => S!("long"),427428Float32 => S!("float"),429Float64 => S!("double"),430431Date => S!("date"),432Datetime(TimeUnit::Microseconds, None) => S!("timestamp_ntz"),433434String => S!("string"),435Binary => S!("binary"),436437Null => S!("null"),438439Decimal(..) => ColumnTypeJsonType::TypeName(dtype_to_type_text(dtype)?),440441List(inner) => {442let out = if let Some((key_type, value_type)) = get_list_map_type(inner) {443ColumnTypeJson {444type_: ColumnTypeJsonType::from_static_type_name("map"),445key_type: Some(dtype_to_type_json(key_type)?),446value_type: Some(dtype_to_type_json(value_type)?),447value_contains_null: Some(true),448449..Default::default()450}451} else {452ColumnTypeJson {453type_: ColumnTypeJsonType::from_static_type_name("array"),454element_type: Some(dtype_to_type_json(inner)?),455contains_null: Some(true),456457..Default::default()458}459};460461ColumnTypeJsonType::TypeJson(Box::new(out))462},463464Struct(fields) => {465let out = ColumnTypeJson {466type_: ColumnTypeJsonType::from_static_type_name("struct"),467fields: Some(468fields469.iter()470.map(|Field { name, dtype }| field_to_type_json(name.clone(), dtype))471.collect::<PolarsResult<_>>()?,472),473474..Default::default()475};476477ColumnTypeJsonType::TypeJson(Box::new(out))478},479480v => polars_bail!(481ComputeError:482"dtype_to_type_text unsupported type: {}",483v484),485};486487Ok(out)488}489490/// Tries to interpret the List type as a `map` field, which is essentially491/// List(Struct(("key", <dtype>), ("value", <dtyoe>))).492///493/// Returns `Option<(key_type, value_type)>`494fn get_list_map_type(list_inner_dtype: &DataType) -> Option<(&DataType, &DataType)> {495let DataType::Struct(fields) = list_inner_dtype else {496return None;497};498499let [fld1, fld2] = fields.as_slice() else {500return None;501};502503if !(fld1.name == "key" && fld2.name == "value") {504return None;505}506507Some((fld1.dtype(), fld2.dtype()))508}509510511