// Path: blob/main/crates/polars-io/src/utils/compression.rs
use std::cmp;
use std::io::{BufRead, Cursor, Read, Write};

use polars_buffer::Buffer;
use polars_core::prelude::*;
use polars_error::{feature_gated, to_compute_err};

use crate::utils::file::{Writeable, WriteableTrait};
#[cfg(feature = "async")]
use crate::utils::stream_buf_reader::ReaderSource;
use crate::utils::sync_on_close::SyncOnCloseType;

/// Represents the compression algorithms that we have decoders for
#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)]
pub enum SupportedCompression {
    GZIP,
    ZLIB,
    ZSTD,
}

impl SupportedCompression {
    /// If the given byte slice starts with the "magic" bytes for a supported compression family, return
    /// that family, for unsupported/uncompressed slices, return None.
    /// Based on <https://en.wikipedia.org/wiki/List_of_file_signatures>.
    pub fn check(bytes: &[u8]) -> Option<Self> {
        if bytes.len() < 4 {
            // not enough bytes to perform prefix checks
            return None;
        }
        match bytes[..4] {
            [0x1f, 0x8b, _, _] => Some(Self::GZIP),
            // Different zlib compression levels without preset dictionary.
            [0x78, 0x01, _, _] => Some(Self::ZLIB),
            [0x78, 0x5e, _, _] => Some(Self::ZLIB),
            [0x78, 0x9c, _, _] => Some(Self::ZLIB),
            [0x78, 0xda, _, _] => Some(Self::ZLIB),
            [0x28, 0xb5, 0x2f, 0xfd] => Some(Self::ZSTD),
            _ => None,
        }
    }
}

/// Decompress `bytes` if compression is detected, otherwise simply return it.
/// An `out` vec must be given for ownership of the decompressed data.
///
/// Deprecated because the whole decompressed payload is materialized in `out`
/// at once, which can exhaust memory for large inputs.
#[allow(clippy::ptr_arg)]
#[deprecated(note = "may cause OOM, use CompressedReader instead")]
pub fn maybe_decompress_bytes<'a>(bytes: &'a [u8], out: &'a mut Vec<u8>) -> PolarsResult<&'a [u8]> {
    assert!(out.is_empty());

    let Some(algo) = SupportedCompression::check(bytes) else {
        // No known magic bytes: treat as uncompressed and hand the input back untouched.
        return Ok(bytes);
    };

    feature_gated!("decompress", {
        match algo {
            SupportedCompression::GZIP => {
                flate2::read::MultiGzDecoder::new(bytes)
                    .read_to_end(out)
                    .map_err(to_compute_err)?;
            },
            SupportedCompression::ZLIB => {
                flate2::read::ZlibDecoder::new(bytes)
                    .read_to_end(out)
                    .map_err(to_compute_err)?;
            },
            SupportedCompression::ZSTD => {
                zstd::Decoder::with_buffer(bytes)?.read_to_end(out)?;
            },
        }

        Ok(out)
    })
}

/// Reader that implements a streaming read trait for uncompressed, gzip, zlib and zstd
/// compression.
///
/// This allows handling decompression transparently in a streaming fashion.
pub enum CompressedReader {
    Uncompressed {
        // Full in-memory input; `offset` tracks how far `read_next_slice`/`read` have consumed it.
        slice: Buffer<u8>,
        offset: usize,
    },
    #[cfg(feature = "decompress")]
    Gzip(flate2::bufread::MultiGzDecoder<Cursor<Buffer<u8>>>),
    #[cfg(feature = "decompress")]
    Zlib(flate2::bufread::ZlibDecoder<Cursor<Buffer<u8>>>),
    #[cfg(feature = "decompress")]
    Zstd(zstd::Decoder<'static, Cursor<Buffer<u8>>>),
}

impl CompressedReader {
    /// Detects compression from the leading magic bytes of `slice` and wraps it in the matching
    /// decoder variant; unrecognized input is treated as uncompressed.
    ///
    /// # Errors
    /// Propagates the zstd decoder construction error.
    ///
    /// # Panics
    /// Panics if compression is detected but the `decompress` feature is not enabled.
    pub fn try_new(slice: Buffer<u8>) -> PolarsResult<Self> {
        let algo = SupportedCompression::check(&slice);

        Ok(match algo {
            None => CompressedReader::Uncompressed { slice, offset: 0 },
            #[cfg(feature = "decompress")]
            Some(SupportedCompression::GZIP) => {
                CompressedReader::Gzip(flate2::bufread::MultiGzDecoder::new(Cursor::new(slice)))
            },
            #[cfg(feature = "decompress")]
            Some(SupportedCompression::ZLIB) => {
                CompressedReader::Zlib(flate2::bufread::ZlibDecoder::new(Cursor::new(slice)))
            },
            #[cfg(feature = "decompress")]
            Some(SupportedCompression::ZSTD) => {
                CompressedReader::Zstd(zstd::Decoder::with_buffer(Cursor::new(slice))?)
            },
            #[cfg(not(feature = "decompress"))]
            _ => panic!("activate 'decompress' feature"),
        })
    }

    /// Returns true when reading goes through a decompressor (i.e. this is not the
    /// `Uncompressed` variant).
    pub fn is_compressed(&self) -> bool {
        !matches!(&self, CompressedReader::Uncompressed { .. })
    }

    pub const fn initial_read_size() -> usize {
        // We don't want to read too much at the beginning to keep decompression to a minimum if for
        // example only the schema is needed or a slice op is used. Keep in sync with
        // `ideal_read_size` so that `initial_read_size * N * 4 == ideal_read_size`.
        32 * 1024
    }

    pub const fn ideal_read_size() -> usize {
        // Somewhat conservative guess for L2 size, which performs the best on most machines and is
        // nearly always core exclusive. The loss of going larger and accidentally hitting L3 is not
        // recouped by amortizing the block processing cost even further.
        //
        // It's possible that callers use or need a larger `read_size` if for example a single row
        // doesn't fit in the 512KB.
        512 * 1024
    }

    /// If possible returns the total number of bytes that will be produced by reading from the
    /// start to finish.
    pub fn total_len_estimate(&self) -> usize {
        // Rough heuristics for typical compression ratios — estimates only, not guarantees.
        const ESTIMATED_DEFLATE_RATIO: usize = 3;
        const ESTIMATED_ZSTD_RATIO: usize = 5;

        match self {
            // Uncompressed: the length is exact.
            CompressedReader::Uncompressed { slice, .. } => slice.len(),
            #[cfg(feature = "decompress")]
            CompressedReader::Gzip(reader) => {
                reader.get_ref().get_ref().len() * ESTIMATED_DEFLATE_RATIO
            },
            #[cfg(feature = "decompress")]
            CompressedReader::Zlib(reader) => {
                reader.get_ref().get_ref().len() * ESTIMATED_DEFLATE_RATIO
            },
            #[cfg(feature = "decompress")]
            CompressedReader::Zstd(reader) => {
                reader.get_ref().get_ref().len() * ESTIMATED_ZSTD_RATIO
            },
        }
    }

    /// Reads exactly `read_size` bytes if possible from the internal readers and creates a new
    /// [`Buffer`] with the content `concat(prev_leftover, new_bytes)`.
    ///
    /// Returns the new slice and the number of bytes read, which will be 0 when eof is reached and
    /// this function is called again.
    ///
    /// If the underlying reader is uncompressed the operation is a cheap zero-copy
    /// [`Buffer::sliced`] operation.
    ///
    /// By handling slice concatenation at this level we can implement zero-copy reading *and* make
    /// the interface easier to use.
    ///
    /// It's a logic bug if `prev_leftover` is neither empty nor the last slice returned by this
    /// function.
    pub fn read_next_slice(
        &mut self,
        prev_leftover: &Buffer<u8>,
        read_size: usize,
    ) -> std::io::Result<(Buffer<u8>, usize)> {
        // Assuming that callers of this function correctly handle re-trying, by continuously growing
        // prev_leftover if it doesn't contain a single row, this abstraction supports arbitrarily
        // sized rows.
        let prev_len = prev_leftover.len();

        // Only the compressed paths need an owned output buffer; the uncompressed path below
        // returns a zero-copy reslice and never touches `buf`.
        let mut buf = Vec::new();
        if self.is_compressed() {
            // Saturating ops guard against overflow for huge `read_size`; the estimate-based cap
            // avoids over-reserving far beyond what the stream can produce.
            let reserve_size = cmp::min(
                prev_len.saturating_add(read_size),
                self.total_len_estimate().saturating_mul(2),
            );
            buf.reserve_exact(reserve_size);
            buf.extend_from_slice(prev_leftover);
        }

        // Shared tail for the decoder arms: trim to the bytes actually produced and wrap in a Buffer.
        let new_slice_from_read =
            |bytes_read: usize, mut buf: Vec<u8>| -> std::io::Result<(Buffer<u8>, usize)> {
                buf.truncate(prev_len + bytes_read);
                Ok((Buffer::from_vec(buf), bytes_read))
            };

        match self {
            CompressedReader::Uncompressed { slice, offset, .. } => {
                let bytes_read = cmp::min(read_size, slice.len() - *offset);
                // Per the documented contract, `prev_leftover` is the tail of `slice` ending at
                // `offset`, so the concatenation is expressible as a single zero-copy reslice.
                let new_slice = slice
                    .clone()
                    .sliced(*offset - prev_len..*offset + bytes_read);
                *offset += bytes_read;
                Ok((new_slice, bytes_read))
            },
            #[cfg(feature = "decompress")]
            CompressedReader::Gzip(decoder) => {
                new_slice_from_read(decoder.take(read_size as u64).read_to_end(&mut buf)?, buf)
            },
            #[cfg(feature = "decompress")]
            CompressedReader::Zlib(decoder) => {
                new_slice_from_read(decoder.take(read_size as u64).read_to_end(&mut buf)?, buf)
            },
            #[cfg(feature = "decompress")]
            CompressedReader::Zstd(decoder) => {
                new_slice_from_read(decoder.take(read_size as u64).read_to_end(&mut buf)?, buf)
            },
        }
    }
}

/// This implementation is meant for compatibility. Use [`Self::read_next_slice`] for best
/// performance.
impl Read for CompressedReader {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        match self {
            CompressedReader::Uncompressed { slice, offset, .. } => {
                // Plain memcpy out of the in-memory slice, advancing the cursor.
                let bytes_read = cmp::min(buf.len(), slice.len() - *offset);
                buf[..bytes_read].copy_from_slice(&slice[*offset..(*offset + bytes_read)]);
                *offset += bytes_read;
                Ok(bytes_read)
            },
            // Compressed variants simply delegate to the wrapped decoder.
            #[cfg(feature = "decompress")]
            CompressedReader::Gzip(decoder) => decoder.read(buf),
            #[cfg(feature = "decompress")]
            CompressedReader::Zlib(decoder) => decoder.read(buf),
            #[cfg(feature = "decompress")]
            CompressedReader::Zstd(decoder) => decoder.read(buf),
        }
    }
}

/// A byte source that abstracts over in-memory buffers and streaming
/// readers, with optional transparent decompression and buffering.
///
/// Implements `BufRead`, allowing uniform access regardless of whether
/// the underlying data is an in-memory slice, a raw stream, or a
/// compressed stream (gzip/zlib/zstd).
///
/// This is the generic successor to [`CompressedReader`], which only
/// supports in-memory (`Buffer<u8>`) sources.
#[cfg(feature = "async")]
pub enum ByteSourceReader<R: BufRead> {
    UncompressedMemory {
        // Full in-memory input; `offset` tracks consumption, mirroring
        // `CompressedReader::Uncompressed`.
        slice: Buffer<u8>,
        offset: usize,
    },
    UncompressedStream(R),
    #[cfg(feature = "decompress")]
    Gzip(flate2::bufread::MultiGzDecoder<R>),
    #[cfg(feature = "decompress")]
    Zlib(flate2::bufread::ZlibDecoder<R>),
    #[cfg(feature = "decompress")]
    Zstd(zstd::Decoder<'static, R>),
}

#[cfg(feature = "async")]
impl<R: BufRead> ByteSourceReader<R> {
    /// Wraps `reader` in the decoder matching `compression`; `None` means the stream is used
    /// as-is. Note that unlike [`CompressedReader::try_new`], the compression kind is supplied by
    /// the caller rather than sniffed from magic bytes.
    ///
    /// # Errors
    /// Propagates the zstd decoder construction error.
    ///
    /// # Panics
    /// Panics if a compression is requested but the `decompress` feature is not enabled.
    pub fn try_new(reader: R, compression: Option<SupportedCompression>) -> PolarsResult<Self> {
        Ok(match compression {
            None => Self::UncompressedStream(reader),
            #[cfg(feature = "decompress")]
            Some(SupportedCompression::GZIP) => {
                Self::Gzip(flate2::bufread::MultiGzDecoder::new(reader))
            },
            #[cfg(feature = "decompress")]
            Some(SupportedCompression::ZLIB) => {
                Self::Zlib(flate2::bufread::ZlibDecoder::new(reader))
            },
            #[cfg(feature = "decompress")]
            Some(SupportedCompression::ZSTD) => Self::Zstd(zstd::Decoder::with_buffer(reader)?),
            #[cfg(not(feature = "decompress"))]
            _ => panic!("activate 'decompress' feature"),
        })
    }

    /// Returns true when reading goes through a decompressor (i.e. this is neither of the
    /// uncompressed variants).
    pub fn is_compressed(&self) -> bool {
        !matches!(
            &self,
            Self::UncompressedMemory { .. } | Self::UncompressedStream(_)
        )
    }

    pub const fn initial_read_size() -> usize {
        // We don't want to read too much at the beginning to keep decompression to a minimum if for
        // example only the schema is needed or a slice op is used. Keep in sync with
        // `ideal_read_size` so that `initial_read_size * N * 4 == ideal_read_size`.
        32 * 1024
    }

    pub const fn ideal_read_size() -> usize {
        // Somewhat conservative guess for L2 size, which performs the best on most machines and is
        // nearly always core exclusive. The loss of going larger and accidentally hitting L3 is not
        // recouped by amortizing the block processing cost even further.
        //
        // It's possible that callers use or need a larger `read_size` if for example a single row
        // doesn't fit in the 512KB.
        512 * 1024
    }

    /// Reads exactly `read_size` bytes if possible from the internal readers and creates a new
    /// [`Buffer`] with the content `concat(prev_leftover, new_bytes)`.
    ///
    /// Returns the new slice and the number of bytes read, which will be 0 when eof is reached and
    /// this function is called again.
    ///
    /// If the underlying reader is uncompressed the operation is a cheap zero-copy
    /// [`Buffer::sliced`] operation.
    ///
    /// By handling slice concatenation at this level we can implement zero-copy reading *and* make
    /// the interface easier to use.
    ///
    /// It's a logic bug if `prev_leftover` is neither empty nor the last slice returned by this
    /// function.
    pub fn read_next_slice(
        &mut self,
        prev_leftover: &Buffer<u8>,
        read_size: usize,
        uncompressed_size_hint: Option<usize>,
    ) -> std::io::Result<(Buffer<u8>, usize)> {
        // Assuming that callers of this function correctly handle re-trying, by continuously growing
        // prev_leftover if it doesn't contain a single row, this abstraction supports arbitrarily
        // sized rows.
        let prev_len = prev_leftover.len();

        let reader: &mut dyn Read = match self {
            // Zero-copy fast-path — no allocation required
            Self::UncompressedMemory { slice, offset } => {
                let bytes_read = cmp::min(read_size, slice.len() - *offset);
                // As documented above, `prev_leftover` is the tail of `slice` ending at `offset`,
                // so the concatenation is a single reslice.
                let new_slice = slice
                    .clone()
                    .sliced(*offset - prev_len..*offset + bytes_read);
                *offset += bytes_read;
                return Ok((new_slice, bytes_read));
            },
            // All remaining variants are handled uniformly through `dyn Read` below.
            Self::UncompressedStream(reader) => reader,
            #[cfg(feature = "decompress")]
            Self::Gzip(reader) => reader,
            #[cfg(feature = "decompress")]
            Self::Zlib(reader) => reader,
            #[cfg(feature = "decompress")]
            Self::Zstd(reader) => reader,
        };

        let mut buf = Vec::new();

        // Cap the reserve_size, for the scenario where read_size == usize::MAX
        let max_reserve_size = uncompressed_size_hint.unwrap_or(4 * 1024 * 1024);
        let reserve_size = cmp::min(prev_len.saturating_add(read_size), max_reserve_size);
        buf.reserve_exact(reserve_size);
        buf.extend_from_slice(prev_leftover);

        let bytes_read = reader.take(read_size as u64).read_to_end(&mut buf)?;
        buf.truncate(prev_len + bytes_read);
        Ok((Buffer::from_vec(buf), bytes_read))
    }
}

#[cfg(feature = "async")]
impl ByteSourceReader<ReaderSource> {
    /// Builds a reader over an in-memory buffer. Uncompressed data takes the zero-copy
    /// `UncompressedMemory` path; compressed data is routed through [`Self::try_new`] via a
    /// cursor-backed [`ReaderSource`].
    pub fn from_memory(
        slice: Buffer<u8>,
        compression: Option<SupportedCompression>,
    ) -> PolarsResult<Self> {
        match compression {
            None => Ok(Self::UncompressedMemory { slice, offset: 0 }),
            _ => Self::try_new(ReaderSource::Memory(Cursor::new(slice)), compression),
        }
    }
}

/// Constructor for `WriteableTrait` compressed encoders.
///
/// The encoders are wrapped in `Option` so that [`WriteableTrait::close`] can take ownership to
/// call the consuming `finish()`; after `close` the option is `None` and further use panics.
pub enum CompressedWriter {
    #[cfg(feature = "decompress")]
    Gzip(Option<flate2::write::GzEncoder<Writeable>>),
    #[cfg(feature = "decompress")]
    Zstd(Option<zstd::Encoder<'static, Writeable>>),
}

impl CompressedWriter {
    /// Wraps `writer` in a gzip encoder. When `level` is `None`, flate2's default compression
    /// level is used.
    pub fn gzip(writer: Writeable, level: Option<u32>) -> Self {
        feature_gated!("decompress", {
            Self::Gzip(Some(flate2::write::GzEncoder::new(
                writer,
                level.map(flate2::Compression::new).unwrap_or_default(),
            )))
        })
    }

    /// Wraps `writer` in a zstd encoder. When `level` is `None`, level 3 is used.
    ///
    /// # Errors
    /// Propagates the zstd encoder construction error.
    pub fn zstd(writer: Writeable, level: Option<u32>) -> std::io::Result<Self> {
        feature_gated!("decompress", {
            zstd::Encoder::new(writer, level.unwrap_or(3) as i32)
                .map(Some)
                .map(Self::Zstd)
        })
    }
}

impl Write for CompressedWriter {
    // Delegates to the wrapped encoder.
    //
    // # Panics
    // Panics if called after `close` (the encoder has been taken out of the `Option`).
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        feature_gated!("decompress", {
            match self {
                Self::Gzip(encoder) => encoder.as_mut().unwrap().write(buf),
                Self::Zstd(encoder) => encoder.as_mut().unwrap().write(buf),
            }
        })
    }

    // Delegates to the wrapped encoder. Same post-`close` panic caveat as `write`.
    fn flush(&mut self) -> std::io::Result<()> {
        feature_gated!("decompress", {
            match self {
                Self::Gzip(encoder) => encoder.as_mut().unwrap().flush(),
                Self::Zstd(encoder) => encoder.as_mut().unwrap().flush(),
            }
        })
    }
}

impl WriteableTrait for CompressedWriter {
    // Finalizes the compressed stream (`finish()` consumes the encoder, hence `take()`), then
    // closes the inner writer with full sync. Must only be called once: a second call panics on
    // the `unwrap` of the emptied `Option`.
    fn close(&mut self) -> std::io::Result<()> {
        feature_gated!("decompress", {
            let writer = match self {
                Self::Gzip(encoder) => encoder.take().unwrap().finish()?,
                Self::Zstd(encoder) => encoder.take().unwrap().finish()?,
            };

            writer.close(SyncOnCloseType::All)
        })
    }

    // Forwards the sync to the inner `Writeable` without finishing the compressed stream.
    fn sync_all(&self) -> std::io::Result<()> {
        feature_gated!("decompress", {
            match self {
                Self::Gzip(encoder) => encoder.as_ref().unwrap().get_ref().sync_all(),
                Self::Zstd(encoder) => encoder.as_ref().unwrap().get_ref().sync_all(),
            }
        })
    }

    // Forwards the sync to the inner `Writeable` without finishing the compressed stream.
    fn sync_data(&self) -> std::io::Result<()> {
        feature_gated!("decompress", {
            match self {
                Self::Gzip(encoder) => encoder.as_ref().unwrap().get_ref().sync_data(),
                Self::Zstd(encoder) => encoder.as_ref().unwrap().get_ref().sync_data(),
            }
        })
    }
}