Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-arrow/src/io/ipc/append/mod.rs
6940 views
1
//! A struct adapter of Read+Seek+Write to append to IPC files
2
// read header and convert to writer information
3
// seek to first byte of header - 1
4
// write new batch
5
// write new footer
6
use std::io::{Read, Seek, SeekFrom, Write};
7
8
use polars_error::{PolarsResult, polars_bail, polars_err};
9
10
use super::endianness::is_native_little_endian;
11
use super::read::{self, FileMetadata};
12
use super::write::common::DictionaryTracker;
13
use super::write::writer::*;
14
use super::write::*;
15
16
impl<R: Read + Seek + Write> FileWriter<R> {
17
/// Creates a new [`FileWriter`] from an existing file, seeking to the last message
18
/// and appending new messages afterwards. Users call `finish` to write the footer (with both)
19
/// the existing and appended messages on it.
20
/// # Error
21
/// This function errors iff:
22
/// * the file's endianness is not the native endianness (not yet supported)
23
/// * the file is not a valid Arrow IPC file
24
pub fn try_from_file(
25
mut writer: R,
26
metadata: FileMetadata,
27
options: WriteOptions,
28
) -> PolarsResult<FileWriter<R>> {
29
if metadata.ipc_schema.is_little_endian != is_native_little_endian() {
30
polars_bail!(ComputeError: "appending to a file of a non-native endianness is not supported")
31
}
32
33
let dictionaries =
34
read::read_file_dictionaries(&mut writer, &metadata, &mut Default::default())?;
35
36
let last_block = metadata.blocks.last().ok_or_else(|| {
37
polars_err!(oos = "an Arrow IPC file must have at least 1 message (the schema message)")
38
})?;
39
let offset: u64 = last_block
40
.offset
41
.try_into()
42
.map_err(|_| polars_err!(oos = "the block's offset must be a positive number"))?;
43
let meta_data_length: u64 = last_block
44
.meta_data_length
45
.try_into()
46
.map_err(|_| polars_err!(oos = "the block's offset must be a positive number"))?;
47
let body_length: u64 = last_block
48
.body_length
49
.try_into()
50
.map_err(|_| polars_err!(oos = "the block's body length must be a positive number"))?;
51
let offset: u64 = offset + meta_data_length + body_length;
52
53
writer.seek(SeekFrom::Start(offset))?;
54
55
Ok(FileWriter {
56
writer,
57
options,
58
schema: metadata.schema,
59
ipc_fields: metadata.ipc_schema.fields,
60
block_offsets: offset as usize,
61
dictionary_blocks: metadata.dictionaries.unwrap_or_default(),
62
record_blocks: metadata.blocks,
63
state: State::Started, // file already exists, so we are ready
64
dictionary_tracker: DictionaryTracker {
65
dictionaries,
66
cannot_replace: true,
67
},
68
encoded_message: Default::default(),
69
custom_schema_metadata: None,
70
})
71
}
72
}
73
74