Path: blob/main/crates/polars-plan/src/plans/conversion/dsl_to_ir/scans.rs
use std::sync::LazyLock;

use arrow::buffer::Buffer;
use either::Either;
use polars_io::RowIndex;
#[cfg(feature = "cloud")]
use polars_io::pl_async::get_runtime;
use polars_io::prelude::*;
use polars_io::utils::compression::maybe_decompress_bytes;

use super::*;

pub(super) fn dsl_to_ir(
    sources: ScanSources,
    mut unified_scan_args_box: Box<UnifiedScanArgs>,
    scan_type: Box<FileScanDsl>,
    cached_ir: Arc<Mutex<Option<IR>>>,
    ctxt: &mut DslConversionContext,
) -> PolarsResult<IR> {
    // Note that the first metadata can still end up being `None` later if the files were
    // filtered out by predicate pushdown.
    let mut cached_ir = cached_ir.lock().unwrap();

    if cached_ir.is_none() {
        let unified_scan_args = unified_scan_args_box.as_mut();

        if let Some(hive_schema) = unified_scan_args.hive_options.schema.as_deref() {
            match unified_scan_args.hive_options.enabled {
                // Enable hive_partitioning if it is unspecified but a non-empty hive_schema is given.
                None if !hive_schema.is_empty() => {
                    unified_scan_args.hive_options.enabled = Some(true)
                },
                // hive_partitioning was explicitly disabled.
                Some(false) => polars_bail!(
                    ComputeError:
                    "a hive schema was given but hive_partitioning was disabled"
                ),
                Some(true) | None => {},
            }
        }

        let sources_before_expansion = &sources;

        let sources = match &*scan_type {
            #[cfg(feature = "parquet")]
            FileScanDsl::Parquet { .. } => {
                sources.expand_paths_with_hive_update(unified_scan_args)?
            },
            #[cfg(feature = "ipc")]
            FileScanDsl::Ipc { .. } => sources.expand_paths_with_hive_update(unified_scan_args)?,
            #[cfg(feature = "csv")]
            FileScanDsl::Csv { .. } => sources.expand_paths(unified_scan_args)?,
            #[cfg(feature = "json")]
            FileScanDsl::NDJson { .. } => sources.expand_paths(unified_scan_args)?,
            #[cfg(feature = "python")]
            FileScanDsl::PythonDataset { .. } => {
                // There are a lot of places that short-circuit if the paths are empty,
                // so we just give a dummy path here.
                ScanSources::Paths(Buffer::from_iter([PlPath::from_str("dummy")]))
            },
            #[cfg(feature = "scan_lines")]
            FileScanDsl::Lines { .. } => sources.expand_paths(unified_scan_args)?,
            FileScanDsl::Anonymous { .. } => sources.clone(),
        };
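
        // From here on, `sources` holds the expanded source list: globs and directories have
        // been resolved to concrete file paths (anonymous scans keep their original sources,
        // and the Python dataset uses the dummy path above).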

        // For cloud we must deduplicate files. Serialization/deserialization leads to `Arc`s
        // losing their sharing.
        let (mut file_info, scan_type_ir) = ctxt.cache_file_info.get_or_insert(
            &scan_type,
            &sources,
            sources_before_expansion,
            unified_scan_args,
            ctxt.verbose,
        )?;

        if unified_scan_args.hive_options.enabled.is_none() {
            // We expect this to be `Some(_)` after this point. If it hasn't been auto-enabled,
            // we explicitly set it to disabled.
            unified_scan_args.hive_options.enabled = Some(false);
        }

        let hive_parts = if unified_scan_args.hive_options.enabled.unwrap()
            && file_info.reader_schema.is_some()
        {
            let paths = sources
                .as_paths()
                .ok_or_else(|| polars_err!(nyi = "Hive-partitioning of in-memory buffers"))?;

            #[allow(unused_assignments)]
            let mut owned = None;

            hive_partitions_from_paths(
                paths,
                unified_scan_args.hive_options.hive_start_idx,
                unified_scan_args.hive_options.schema.clone(),
                match file_info.reader_schema.as_ref().unwrap() {
                    Either::Left(v) => {
                        owned = Some(Schema::from_arrow_schema(v.as_ref()));
                        owned.as_ref().unwrap()
                    },
                    Either::Right(v) => v.as_ref(),
                },
                unified_scan_args.hive_options.try_parse_dates,
            )?
        } else {
            None
        };

        if let Some(ref hive_parts) = hive_parts {
            let hive_schema = hive_parts.schema();
            file_info.update_schema_with_hive_schema(hive_schema.clone());
        } else if let Some(hive_schema) = unified_scan_args.hive_options.schema.clone() {
            // We hit here if we are passed the `hive_schema` to `scan_parquet` but end up with
            // an empty file list during path expansion. In this case we still want to return an
            // empty DataFrame with this schema.
            file_info.update_schema_with_hive_schema(hive_schema);
        }

        if let Some(ref file_path_col) = unified_scan_args.include_file_paths {
            let schema: &mut Schema = Arc::make_mut(&mut file_info.schema);

            if schema.contains(file_path_col) {
                polars_bail!(
                    Duplicate: r#"column name for file paths "{}" conflicts with column name from file"#,
                    file_path_col
                );
            }

            schema.insert_at_index(schema.len(), file_path_col.clone(), DataType::String)?;
        }

        unified_scan_args.projection = if file_info.reader_schema.is_some() {
            maybe_init_projection_excluding_hive(
                file_info.reader_schema.as_ref().unwrap(),
                hive_parts.as_ref().map(|h| h.schema()),
            )
        } else {
            None
        };

        if let Some(row_index) = &unified_scan_args.row_index {
            let schema = Arc::make_mut(&mut file_info.schema);
            *schema = schema
                .new_inserting_at_index(0, row_index.name.clone(), IDX_DTYPE)
                .unwrap();
        }

        let ir = if sources.is_empty() && !matches!(&(*scan_type), FileScanDsl::Anonymous { .. })
        {
            IR::DataFrameScan {
                df: Arc::new(DataFrame::empty_with_schema(&file_info.schema)),
                schema: file_info.schema,
                output_schema: None,
            }
        } else {
            let unified_scan_args = unified_scan_args_box;

            IR::Scan {
                sources,
                file_info,
                hive_parts,
                predicate: None,
                predicate_file_skip_applied: None,
                scan_type: Box::new(scan_type_ir),
                output_schema: None,
                unified_scan_args,
            }
        };

        cached_ir.replace(ir);
    }

    Ok(cached_ir.clone().unwrap())
}

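/// Inserts the row-index column `name` at position 0 of `schema`.
///
/// Fails with a `Duplicate` error if the schema already contains a column with that name, so a
/// user-chosen row-index name can never silently shadow a file column.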
pub(super) fn insert_row_index_to_schema(
    schema: &mut Schema,
    name: PlSmallStr,
) -> PolarsResult<()> {
    if schema.contains(&name) {
        polars_bail!(
            Duplicate:
            "cannot add row_index with name '{}': \
            column already exists in file.",
            name,
        )
    }

    schema.insert_at_index(0, name, IDX_DTYPE)?;

    Ok(())
}

#[cfg(any(feature = "parquet", feature = "ipc"))]
fn prepare_output_schema(
    mut schema: Schema,
    row_index: Option<&RowIndex>,
) -> PolarsResult<SchemaRef> {
    if let Some(rc) = row_index {
        insert_row_index_to_schema(&mut schema, rc.name.clone())?;
    }
    Ok(Arc::new(schema))
}

#[cfg(any(feature = "json", feature = "csv"))]
fn prepare_schemas(
    mut schema: Schema,
    row_index: Option<&RowIndex>,
) -> PolarsResult<(SchemaRef, SchemaRef)> {
    Ok(if let Some(rc) = row_index {
        let reader_schema = schema.clone();
        insert_row_index_to_schema(&mut schema, rc.name.clone())?;
        (Arc::new(reader_schema), Arc::new(schema))
    } else {
        let schema = Arc::new(schema);
        (schema.clone(), schema)
    })
}

#[cfg(feature = "parquet")]
pub(super) fn parquet_file_info(
    first_scan_source: ScanSourceRef<'_>,
    row_index: Option<&RowIndex>,
    #[allow(unused)] cloud_options: Option<&polars_io::cloud::CloudOptions>,
    n_sources: usize,
) -> PolarsResult<(FileInfo, Option<FileMetadataRef>)> {
    use polars_core::error::feature_gated;

    let (reader_schema, num_rows, metadata) = {
        if first_scan_source.is_cloud_url() {
            let first_path = first_scan_source.as_path().unwrap();
            feature_gated!("cloud", {
                get_runtime().block_in_place_on(async {
                    let mut reader =
                        ParquetObjectStore::from_uri(first_path, cloud_options, None).await?;

                    PolarsResult::Ok((
                        reader.schema().await?,
                        reader.num_rows().await?,
                        reader.get_metadata().await?.clone(),
                    ))
                })?
            })
        } else {
            let memslice = first_scan_source.to_memslice()?;
            let mut reader = ParquetReader::new(std::io::Cursor::new(memslice));
            (
                reader.schema()?,
                reader.num_rows()?,
                reader.get_metadata()?.clone(),
            )
        }
    };

    let schema =
        prepare_output_schema(Schema::from_arrow_schema(reader_schema.as_ref()), row_index)?;

    let known_size = if n_sources == 1 { Some(num_rows) } else { None };

    let file_info = FileInfo::new(
        schema,
        Some(Either::Left(reader_schema)),
        (known_size, num_rows * n_sources),
    );

    Ok((file_info, Some(metadata)))
}

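/// Maximum number of scans for which file metadata is retained in the file-info cache.
///
/// Read once from the `POLARS_MAX_CACHED_METADATA_SCANS` environment variable; it defaults to 8
/// when unset, and `0` disables the limit. A hedged sketch of the mapping (values illustrative):
///
/// ```ignore
/// // unset                                  -> 8
/// // POLARS_MAX_CACHED_METADATA_SCANS=16    -> 16
/// // POLARS_MAX_CACHED_METADATA_SCANS=0     -> usize::MAX (no limit)
/// let cap = max_metadata_scan_cached();
/// ```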
pub fn max_metadata_scan_cached() -> usize {
    static MAX_SCANS_METADATA_CACHED: LazyLock<usize> = LazyLock::new(|| {
        let value = std::env::var("POLARS_MAX_CACHED_METADATA_SCANS").map_or(8, |v| {
            v.parse::<usize>()
                .expect("invalid `POLARS_MAX_CACHED_METADATA_SCANS` value")
        });
        if value == 0 {
            return usize::MAX;
        }
        value
    });
    *MAX_SCANS_METADATA_CACHED
}

// TODO! return metadata arced
#[cfg(feature = "ipc")]
pub(super) fn ipc_file_info(
    first_scan_source: ScanSourceRef<'_>,
    row_index: Option<&RowIndex>,
    cloud_options: Option<&polars_io::cloud::CloudOptions>,
) -> PolarsResult<(FileInfo, arrow::io::ipc::read::FileMetadata)> {
    use polars_core::error::feature_gated;
    use polars_utils::plpath::PlPathRef;

    let metadata = match first_scan_source {
        ScanSourceRef::Path(path) => match path {
            PlPathRef::Cloud(_) => {
                feature_gated!("cloud", {
                    get_runtime().block_on(async {
                        polars_io::ipc::IpcReaderAsync::from_uri(path, cloud_options)
                            .await?
                            .metadata()
                            .await
                    })?
                })
            },
            PlPathRef::Local(path) => arrow::io::ipc::read::read_file_metadata(
                &mut std::io::BufReader::new(polars_utils::open_file(path)?),
            )?,
        },
        ScanSourceRef::File(file) => {
            arrow::io::ipc::read::read_file_metadata(&mut std::io::BufReader::new(file))?
        },
        ScanSourceRef::Buffer(buff) => {
            arrow::io::ipc::read::read_file_metadata(&mut std::io::Cursor::new(buff))?
        },
    };

    let file_info = FileInfo::new(
        prepare_output_schema(
            Schema::from_arrow_schema(metadata.schema.as_ref()),
            row_index,
        )?,
        Some(Either::Left(Arc::clone(&metadata.schema))),
        (None, usize::MAX),
    );

    Ok((file_info, metadata))
}

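/// Builds the `FileInfo` for a (possibly multi-file) CSV scan.
///
/// Schemas are inferred per file in parallel and merged pairwise; a user-supplied
/// `csv_options.schema` short-circuits the merge. A hedged sketch of the merge rule
/// (file names and dtypes illustrative):
///
/// ```ignore
/// // a.csv infers {x: Int64}, b.csv infers {x: Float64}
/// // -> merged reader schema {x: Float64} (the supertype of both)
/// ```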
#[cfg(feature = "csv")]
pub fn csv_file_info(
    sources: &ScanSources,
    _first_scan_source: ScanSourceRef<'_>,
    row_index: Option<&RowIndex>,
    csv_options: &mut CsvReadOptions,
    cloud_options: Option<&polars_io::cloud::CloudOptions>,
) -> PolarsResult<FileInfo> {
    use std::io::{Read, Seek};

    use polars_core::error::feature_gated;
    use polars_core::{POOL, config};
    use polars_io::csv::read::schema_inference::SchemaInferenceResult;
    use polars_io::utils::get_reader_bytes;
    use rayon::iter::{IntoParallelIterator, ParallelIterator};

    // Holding `_first_scan_source` should guarantee that `sources` is not empty.
    debug_assert!(!sources.is_empty());

    // TODO:
    // * See if we can do better than scanning all files if there is a row limit
    // * See if we can do this without downloading the entire file

    // prints the error message if paths is empty.
    let run_async = sources.is_cloud_url() || (sources.is_paths() && config::force_async());

    let cache_entries = {
        if run_async {
            feature_gated!("cloud", {
                Some(polars_io::file_cache::init_entries_from_uri_list(
                    sources
                        .as_paths()
                        .unwrap()
                        .iter()
                        .map(|path| Arc::from(path.to_str())),
                    cloud_options,
                )?)
            })
        } else {
            None
        }
    };

    let infer_schema_func = |i| {
        let source = sources.at(i);
        let memslice = source.to_memslice_possibly_async(run_async, cache_entries.as_ref(), i)?;
        let owned = &mut vec![];
        let mut reader = std::io::Cursor::new(maybe_decompress_bytes(&memslice, owned)?);
        if reader.read(&mut [0; 4])? < 2 && csv_options.raise_if_empty {
            polars_bail!(NoData: "empty CSV")
        }
        reader.rewind()?;

        let reader_bytes = get_reader_bytes(&mut reader).expect("could not mmap file");

        // this needs a way to estimate bytes/rows.
        SchemaInferenceResult::try_from_reader_bytes_and_options(&reader_bytes, csv_options)
    };

    let merge_func = |a: PolarsResult<SchemaInferenceResult>,
                      b: PolarsResult<SchemaInferenceResult>| {
        match (a, b) {
            (Err(e), _) | (_, Err(e)) => Err(e),
            (Ok(a), Ok(b)) => {
                let merged_schema = if csv_options.schema.is_some() {
                    csv_options.schema.clone().unwrap()
                } else {
                    let schema_a = a.get_inferred_schema();
                    let schema_b = b.get_inferred_schema();

                    match (schema_a.is_empty(), schema_b.is_empty()) {
                        (true, _) => schema_b,
                        (_, true) => schema_a,
                        _ => {
                            let mut s = Arc::unwrap_or_clone(schema_a);
                            s.to_supertype(&schema_b)?;
                            Arc::new(s)
                        },
                    }
                };

                Ok(a.with_inferred_schema(merged_schema))
            },
        }
    };

    let si_results = POOL.join(
        || infer_schema_func(0),
        || {
            (1..sources.len())
                .into_par_iter()
                .map(infer_schema_func)
                .reduce(|| Ok(Default::default()), merge_func)
        },
    );

    let si_result = merge_func(si_results.0, si_results.1)?;

    csv_options.update_with_inference_result(&si_result);

    let mut schema = csv_options
        .schema
        .clone()
        .unwrap_or_else(|| si_result.get_inferred_schema());

    let reader_schema = if let Some(rc) = row_index {
        let reader_schema = schema.clone();
        let mut output_schema = (*reader_schema).clone();
        insert_row_index_to_schema(&mut output_schema, rc.name.clone())?;
        schema = Arc::new(output_schema);
        reader_schema
    } else {
        schema.clone()
    };

    let estimated_n_rows = si_result.get_estimated_n_rows();

    Ok(FileInfo::new(
        schema,
        Some(Either::Right(reader_schema)),
        (None, estimated_n_rows),
    ))
}

#[cfg(feature = "json")]
pub fn ndjson_file_info(
    sources: &ScanSources,
    first_scan_source: ScanSourceRef<'_>,
    row_index: Option<&RowIndex>,
    ndjson_options: &NDJsonReadOptions,
    cloud_options: Option<&polars_io::cloud::CloudOptions>,
) -> PolarsResult<FileInfo> {
    use polars_core::config;
    use polars_core::error::feature_gated;

    let run_async = sources.is_cloud_url() || (sources.is_paths() && config::force_async());

    let cache_entries = {
        if run_async {
            feature_gated!("cloud", {
                Some(polars_io::file_cache::init_entries_from_uri_list(
                    sources
                        .as_paths()
                        .unwrap()
                        .iter()
                        .map(|path| Arc::from(path.to_str())),
                    cloud_options,
                )?)
            })
        } else {
            None
        }
    };

    let owned = &mut vec![];

    let mut schema = if let Some(schema) = ndjson_options.schema.clone() {
        schema
    } else {
        let memslice =
            first_scan_source.to_memslice_possibly_async(run_async, cache_entries.as_ref(), 0)?;
        let mut reader = std::io::Cursor::new(maybe_decompress_bytes(&memslice, owned)?);

        Arc::new(polars_io::ndjson::infer_schema(
            &mut reader,
            ndjson_options.infer_schema_length,
        )?)
    };

    if let Some(overwriting_schema) = &ndjson_options.schema_overwrite {
        overwrite_schema(Arc::make_mut(&mut schema), overwriting_schema)?;
    }

    let mut reader_schema = schema.clone();

    if row_index.is_some() {
        (schema, reader_schema) = prepare_schemas(Arc::unwrap_or_clone(schema), row_index)?
    }

    Ok(FileInfo::new(
        schema,
        Some(Either::Right(reader_schema)),
        (None, usize::MAX),
    ))
}

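// `CachedSourceKey` keys the per-conversion file-info cache below: Parquet/IPC scans are keyed
// on the first path plus any user-provided schema (only the first file is inspected for the
// schema), while CSV/NDJSON scans are keyed on the full path list together with their
// schema/schema_overwrite options.
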
// Add flags that influence metadata/schema here
#[derive(Eq, Hash, PartialEq)]
enum CachedSourceKey {
    ParquetIpc {
        first_path: PlPath,
        schema_overwrite: Option<SchemaRef>,
    },
    CsvJson {
        paths: Buffer<PlPath>,
        schema: Option<SchemaRef>,
        schema_overwrite: Option<SchemaRef>,
    },
}

#[derive(Default)]
pub(super) struct SourcesToFileInfo {
    inner: PlHashMap<CachedSourceKey, (FileInfo, FileScanIR)>,
}

impl SourcesToFileInfo {
    fn infer_or_parse(
        &mut self,
        scan_type: FileScanDsl,
        sources: &ScanSources,
        sources_before_expansion: &ScanSources,
        unified_scan_args: &mut UnifiedScanArgs,
    ) -> PolarsResult<(FileInfo, FileScanIR)> {
        let require_first_source = |failed_operation_name: &'static str, hint: &'static str| {
            sources.first_or_empty_expand_err(
                failed_operation_name,
                sources_before_expansion,
                unified_scan_args.glob,
                hint,
            )
        };
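        // `require_first_source` turns an empty expanded source list into a scan-specific error
        // (e.g. a glob that matched nothing), optionally with a hint on how to recover.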

        let n_sources = sources.len();
        let cloud_options = unified_scan_args.cloud_options.as_ref();

        Ok(match scan_type {
            #[cfg(feature = "parquet")]
            FileScanDsl::Parquet { options } => {
                if let Some(schema) = &options.schema {
                    // We were passed a schema, so we don't have to call `parquet_file_info`,
                    // but this does mean we don't have `row_estimation` and `first_metadata`.

                    (
                        FileInfo {
                            schema: schema.clone(),
                            reader_schema: Some(either::Either::Left(Arc::new(
                                schema.to_arrow(CompatLevel::newest()),
                            ))),
                            row_estimation: (None, usize::MAX),
                        },
                        FileScanIR::Parquet {
                            options,
                            metadata: None,
                        },
                    )
                } else {
                    (|| {
                        let first_scan_source = require_first_source(
                            "failed to retrieve first file schema (parquet)",
                            "\
                            passing a schema can allow \
                            this scan to succeed with an empty DataFrame.",
                        )?;

                        if verbose() {
                            eprintln!(
                                "sourcing parquet scan file schema from: '{}'",
                                first_scan_source.to_include_path_name()
                            )
                        }

                        let (mut file_info, mut metadata) = scans::parquet_file_info(
                            first_scan_source,
                            unified_scan_args.row_index.as_ref(),
                            cloud_options,
                            n_sources,
                        )?;

                        if let Some((total, deleted)) = unified_scan_args.row_count {
                            let size = (total - deleted) as usize;
                            file_info.row_estimation = (Some(size), size);
                        }

                        if self.inner.len() > max_metadata_scan_cached() {
                            _ = metadata.take();
                        }

                        PolarsResult::Ok((file_info, FileScanIR::Parquet { options, metadata }))
                    })()
                    .map_err(|e| e.context(failed_here!(parquet scan)))?
                }
            },
            #[cfg(feature = "ipc")]
            FileScanDsl::Ipc { options } => (|| {
                let first_scan_source =
                    require_first_source("failed to retrieve first file schema (ipc)", "")?;

                if verbose() {
                    eprintln!(
                        "sourcing ipc scan file schema from: '{}'",
                        first_scan_source.to_include_path_name()
                    )
                }

                let (file_info, md) = scans::ipc_file_info(
                    first_scan_source,
                    unified_scan_args.row_index.as_ref(),
                    cloud_options,
                )?;

                PolarsResult::Ok((
                    file_info,
                    FileScanIR::Ipc {
                        options,
                        metadata: Some(Arc::new(md)),
                    },
                ))
            })()
            .map_err(|e| e.context(failed_here!(ipc scan)))?,
            #[cfg(feature = "csv")]
            FileScanDsl::Csv { mut options } => {
                (|| {
                    // TODO: This is a hack. We conditionally set `allow_missing_columns` to
                    // mimic existing behavior, but this should be taken from a user-provided
                    // parameter instead.
                    if options.schema.is_some() && options.has_header {
                        unified_scan_args.missing_columns_policy = MissingColumnsPolicy::Insert;
                    }

                    let file_info = if let Some(schema) = options.schema.clone() {
                        FileInfo {
                            schema: schema.clone(),
                            reader_schema: Some(either::Either::Right(schema)),
                            row_estimation: (None, usize::MAX),
                        }
                    } else {
                        let first_scan_source =
                            require_first_source("failed to retrieve file schemas (csv)", "")?;

                        if verbose() {
                            eprintln!(
                                "sourcing csv scan file schema from: '{}'",
                                first_scan_source.to_include_path_name()
                            )
                        }

                        scans::csv_file_info(
                            sources,
                            first_scan_source,
                            unified_scan_args.row_index.as_ref(),
                            &mut options,
                            cloud_options,
                        )?
                    };

                    PolarsResult::Ok((file_info, FileScanIR::Csv { options }))
                })()
                .map_err(|e| e.context(failed_here!(csv scan)))?
            },
            #[cfg(feature = "json")]
            FileScanDsl::NDJson { options } => (|| {
                let file_info = if let Some(schema) = options.schema.clone() {
                    FileInfo {
                        schema: schema.clone(),
                        reader_schema: Some(either::Either::Right(schema)),
                        row_estimation: (None, usize::MAX),
                    }
                } else {
                    let first_scan_source =
                        require_first_source("failed to retrieve first file schema (ndjson)", "")?;

                    if verbose() {
                        eprintln!(
                            "sourcing ndjson scan file schema from: '{}'",
                            first_scan_source.to_include_path_name()
                        )
                    }

                    scans::ndjson_file_info(
                        sources,
                        first_scan_source,
                        unified_scan_args.row_index.as_ref(),
                        &options,
                        cloud_options,
                    )?
                };

                PolarsResult::Ok((file_info, FileScanIR::NDJson { options }))
            })()
            .map_err(|e| e.context(failed_here!(ndjson scan)))?,
            #[cfg(feature = "python")]
            FileScanDsl::PythonDataset { dataset_object } => (|| {
                if crate::dsl::DATASET_PROVIDER_VTABLE.get().is_none() {
                    polars_bail!(ComputeError: "DATASET_PROVIDER_VTABLE (python) not initialized")
                }

                let mut schema = dataset_object.schema()?;
                let reader_schema = schema.clone();

                if let Some(row_index) = &unified_scan_args.row_index {
                    insert_row_index_to_schema(Arc::make_mut(&mut schema), row_index.name.clone())?;
                }

                PolarsResult::Ok((
                    FileInfo {
                        schema,
                        reader_schema: Some(either::Either::Right(reader_schema)),
                        row_estimation: (None, usize::MAX),
                    },
                    FileScanIR::PythonDataset {
                        dataset_object,
                        cached_ir: Default::default(),
                    },
                ))
            })()
            .map_err(|e| e.context(failed_here!(python dataset scan)))?,
            #[cfg(feature = "scan_lines")]
            FileScanDsl::Lines { name } => {
                // The schema of a `Lines` scan is fixed to a single String column, so there is
                // nothing to infer and no `row_estimation` or `first_metadata`.
                let schema = Arc::new(Schema::from_iter([(name.clone(), DataType::String)]));

                (
                    FileInfo {
                        schema: schema.clone(),
                        reader_schema: Some(either::Either::Right(schema.clone())),
                        row_estimation: (None, usize::MAX),
                    },
                    FileScanIR::Lines { name },
                )
            },
            FileScanDsl::Anonymous {
                file_info,
                options,
                function,
            } => (file_info, FileScanIR::Anonymous { options, function }),
        })
    }

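    /// Returns the `FileInfo` and `FileScanIR` for this scan, reusing a cached entry when the
    /// same sources and schema options were already resolved during this conversion.
    ///
    /// Only path-backed, non-empty sources are cached; everything else falls through to
    /// `infer_or_parse` directly.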
    pub(super) fn get_or_insert(
        &mut self,
        scan_type: &FileScanDsl,
        sources: &ScanSources,
        sources_before_expansion: &ScanSources,
        unified_scan_args: &mut UnifiedScanArgs,
        verbose: bool,
    ) -> PolarsResult<(FileInfo, FileScanIR)> {
        // Only cache non-empty paths. Others are directly parsed.
        let paths = match sources {
            ScanSources::Paths(paths) if !paths.is_empty() => paths.clone(),

            _ => {
                return self.infer_or_parse(
                    scan_type.clone(),
                    sources,
                    sources_before_expansion,
                    unified_scan_args,
                );
            },
        };

        let (k, v): (CachedSourceKey, Option<&(FileInfo, FileScanIR)>) = match scan_type {
            #[cfg(feature = "parquet")]
            FileScanDsl::Parquet { options } => {
                let key = CachedSourceKey::ParquetIpc {
                    first_path: paths[0].clone(),
                    schema_overwrite: options.schema.clone(),
                };

                let v = self.inner.get(&key);
                (key, v)
            },
            #[cfg(feature = "ipc")]
            FileScanDsl::Ipc { options: _ } => {
                let key = CachedSourceKey::ParquetIpc {
                    first_path: paths[0].clone(),
                    schema_overwrite: None,
                };

                let v = self.inner.get(&key);
                (key, v)
            },
            #[cfg(feature = "csv")]
            FileScanDsl::Csv { options } => {
                let key = CachedSourceKey::CsvJson {
                    paths: paths.clone(),
                    schema: options.schema.clone(),
                    schema_overwrite: options.schema_overwrite.clone(),
                };
                let v = self.inner.get(&key);
                (key, v)
            },
            #[cfg(feature = "json")]
            FileScanDsl::NDJson { options } => {
                let key = CachedSourceKey::CsvJson {
                    paths: paths.clone(),
                    schema: options.schema.clone(),
                    schema_overwrite: options.schema_overwrite.clone(),
                };
                let v = self.inner.get(&key);
                (key, v)
            },
            _ => {
                return self.infer_or_parse(
                    scan_type.clone(),
                    sources,
                    sources_before_expansion,
                    unified_scan_args,
                );
            },
        };

        if let Some(out) = v {
            if verbose {
                eprintln!("FILE_INFO CACHE HIT")
            }
            Ok(out.clone())
        } else {
            let v = self.infer_or_parse(
                scan_type.clone(),
                sources,
                sources_before_expansion,
                unified_scan_args,
            )?;
            self.inner.insert(k, v.clone());
            Ok(v)
        }
    }
}
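
// A hedged sanity test for the row-index helper above. It is a minimal sketch: it only relies
// on `Schema::from_iter`, `PlSmallStr::from_static`, `Schema::len` and
// `insert_row_index_to_schema` as already used in this module; the column names are illustrative.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn row_index_is_prepended_and_rejects_duplicates() {
        let mut schema = Schema::from_iter([(PlSmallStr::from_static("a"), DataType::String)]);

        // Adding the row-index column under a fresh name succeeds and grows the schema.
        insert_row_index_to_schema(&mut schema, PlSmallStr::from_static("row_nr")).unwrap();
        assert_eq!(schema.len(), 2);

        // Re-using the same name, or colliding with an existing column, is a duplicate error.
        let dup = insert_row_index_to_schema(&mut schema, PlSmallStr::from_static("row_nr"));
        assert!(dup.is_err());
        let clash = insert_row_index_to_schema(&mut schema, PlSmallStr::from_static("a"));
        assert!(clash.is_err());
    }
}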