Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-arrow/src/io/ipc/append/mod.rs
8424 views
1
//! A struct adapter of Read+Seek+Write to append to IPC files
2
// read header and convert to writer information
3
// seek to first byte of header - 1
4
// write new batch
5
// write new footer
6
use std::io::{Read, Seek, SeekFrom, Write};
7
8
use polars_error::{PolarsResult, polars_bail, polars_err};
9
10
use super::endianness::is_native_little_endian;
11
use super::read::{self, FileMetadata};
12
use super::write::common::DictionaryTracker;
13
use super::write::writer::*;
14
use super::write::*;
15
16
impl<R: Read + Seek + Write> FileWriter<R> {
17
/// Creates a new [`FileWriter`] from an existing file, seeking to the last message
18
/// and appending new messages afterwards. Users call `finish` to write the footer (with both)
19
/// the existing and appended messages on it.
20
/// # Error
21
/// This function errors iff:
22
/// * the file's endianness is not the native endianness (not yet supported)
23
/// * the file is not a valid Arrow IPC file
24
pub fn try_from_file(
25
mut writer: R,
26
metadata: FileMetadata,
27
options: WriteOptions,
28
) -> PolarsResult<FileWriter<R>> {
29
if metadata.ipc_schema.is_little_endian != is_native_little_endian() {
30
polars_bail!(ComputeError: "appending to a file of a non-native endianness is not supported")
31
}
32
33
let dictionaries = read::read_file_dictionaries(
34
&mut writer,
35
&metadata,
36
&mut Default::default(),
37
Default::default(),
38
)?;
39
40
let last_block = metadata.blocks.last().ok_or_else(|| {
41
polars_err!(oos = "an Arrow IPC file must have at least 1 message (the schema message)")
42
})?;
43
let offset: u64 = last_block
44
.offset
45
.try_into()
46
.map_err(|_| polars_err!(oos = "the block's offset must be a positive number"))?;
47
let meta_data_length: u64 = last_block
48
.meta_data_length
49
.try_into()
50
.map_err(|_| polars_err!(oos = "the block's offset must be a positive number"))?;
51
let body_length: u64 = last_block
52
.body_length
53
.try_into()
54
.map_err(|_| polars_err!(oos = "the block's body length must be a positive number"))?;
55
let offset: u64 = offset + meta_data_length + body_length;
56
57
writer.seek(SeekFrom::Start(offset))?;
58
59
Ok(FileWriter {
60
writer,
61
options,
62
schema: metadata.schema,
63
ipc_fields: metadata.ipc_schema.fields,
64
block_offsets: offset as usize,
65
dictionary_blocks: metadata.dictionaries.unwrap_or_default(),
66
record_blocks: metadata.blocks,
67
state: State::Started, // file already exists, so we are ready
68
dictionary_tracker: DictionaryTracker {
69
dictionaries,
70
cannot_replace: true,
71
},
72
encoded_message: Default::default(),
73
custom_schema_metadata: None,
74
})
75
}
76
}
77
78