Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ hashbrown = { version = "0.17", default-features = false }
twox-hash = { version = "2.0", default-features = false, features = ["xxhash64"] }
paste = { version = "1.0" }
half = { version = "2.1", default-features = false, features = ["num-traits"] }
crc32fast = { version = "1.4.2", optional = true, default-features = false }
crc32fast = { version = "1.4.2", default-features = false }
simdutf8 = { workspace = true , optional = true }
ring = { version = "0.17", default-features = false, features = ["std"], optional = true }

Expand Down Expand Up @@ -120,8 +120,8 @@ async = ["futures", "tokio"]
object_store = ["dep:object_store", "async"]
# Group Zstd dependencies
zstd = ["dep:zstd"]
# Verify 32-bit CRC checksum when decoding parquet pages
crc = ["dep:crc32fast"]
# Verify 32-bit CRC checksum when decoding parquet pages (crc32fast is always available)
crc = []
# Enable SIMD UTF-8 validation
simdutf8 = ["dep:simdutf8"]
# Enable Parquet modular encryption support
Expand Down
4 changes: 3 additions & 1 deletion parquet/src/arrow/arrow_writer/byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,9 @@ impl FallbackEncoder {
.encoding(descr.path())
.unwrap_or_else(|| match props.writer_version() {
WriterVersion::PARQUET_1_0 => Encoding::PLAIN,
WriterVersion::PARQUET_2_0 => Encoding::DELTA_BYTE_ARRAY,
WriterVersion::PARQUET_2_0 | WriterVersion::PARQUET_3_0 => {
Encoding::DELTA_BYTE_ARRAY
}
});

let encoder = match encoding {
Expand Down
6 changes: 4 additions & 2 deletions parquet/src/column/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -875,7 +875,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
fn create_level_encoder(max_level: i16, props: &WriterProperties) -> LevelEncoder {
match props.writer_version() {
WriterVersion::PARQUET_1_0 => LevelEncoder::v1_streaming(max_level),
WriterVersion::PARQUET_2_0 => LevelEncoder::v2_streaming(max_level),
WriterVersion::PARQUET_2_0 | WriterVersion::PARQUET_3_0 => {
LevelEncoder::v2_streaming(max_level)
}
}
}

Expand Down Expand Up @@ -1368,7 +1370,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {

CompressedPage::new(data_page, uncompressed_size)
}
WriterVersion::PARQUET_2_0 => {
WriterVersion::PARQUET_2_0 | WriterVersion::PARQUET_3_0 => {
let mut rep_levels_byte_len = 0;
let mut def_levels_byte_len = 0;
let mut buffer = vec![];
Expand Down
157 changes: 140 additions & 17 deletions parquet/src/file/metadata/footer_tail.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,62 @@
// under the License.

use crate::errors::{ParquetError, Result};
use crate::file::{FOOTER_SIZE, PARQUET_MAGIC, PARQUET_MAGIC_ENCR_FOOTER};
use crate::file::{
FOOTER_SIZE, PARQUET_MAGIC, PARQUET_MAGIC_ENCR_FOOTER, PARQUET_MAGIC_PARX, PARX_FOOTER_SIZE,
PARX_KNOWN_FEATURE_FLAGS,
};

/// Parsed Parquet footer tail (last 8 bytes of a Parquet file)
///
/// There are 8 bytes at the end of the Parquet footer with the following layout:
/// * 4 bytes for the metadata length
/// * 4 bytes for the magic bytes 'PAR1' or 'PARE' (encrypted footer)
/// Additional metadata stored in the footer of PARX-format files.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParxInfo {
feature_flags: u32,
expected_crc: u32,
/// The 8 bytes [length(4)][flags(4)] used in CRC computation.
crc_suffix_bytes: [u8; 8],
}

impl ParxInfo {
/// Returns the feature flags from the PARX footer.
pub fn feature_flags(&self) -> u32 {
self.feature_flags
}

/// Validates the CRC32 of the given metadata bytes against the expected CRC stored in the footer.
///
/// CRC covers: `[metadata_bytes][length(4)][flags(4)]`
pub fn validate_crc(&self, metadata_bytes: &[u8]) -> Result<()> {
let mut hasher = crc32fast::Hasher::new();
hasher.update(metadata_bytes);
hasher.update(&self.crc_suffix_bytes);
let actual_crc = hasher.finalize();
if actual_crc != self.expected_crc {
return Err(general_err!(
"PARX footer CRC32 mismatch: expected {:#010x}, got {:#010x}",
self.expected_crc,
actual_crc
));
}
Ok(())
}
}

/// Parsed Parquet footer tail.
///
/// PAR1/PARE: last 8 bytes of a Parquet file:
/// ```text
/// +-----+------------------+
/// | len | 'PAR1' or 'PARE' |
/// +-----+------------------+
/// ```
///
/// PARX: last 16 bytes of a PARX file:
/// ```text
/// +-----+-------+-----+--------+
/// | len | flags | crc | 'PARX' |
/// +-----+-------+-----+--------+
/// 4 4 4 4
/// ```
///
/// # Examples
/// ```
/// # use parquet::file::metadata::FooterTail;
Expand All @@ -49,14 +91,15 @@ use crate::file::{FOOTER_SIZE, PARQUET_MAGIC, PARQUET_MAGIC_ENCR_FOOTER};
/// assert_eq!(footer_tail.is_encrypted_footer(), true);
/// ```
///
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FooterTail {
metadata_length: usize,
encrypted_footer: bool,
parx_info: Option<ParxInfo>,
}

impl FooterTail {
/// Try to decode the footer tail from the given 8 bytes
/// Try to decode the footer tail from the given 8 bytes (PAR1/PARE format only).
pub fn try_new(slice: &[u8; FOOTER_SIZE]) -> Result<FooterTail> {
let magic = &slice[4..];
let encrypted_footer = if magic == PARQUET_MAGIC_ENCR_FOOTER {
Expand All @@ -66,25 +109,87 @@ impl FooterTail {
} else {
return Err(general_err!("Invalid Parquet file. Corrupt footer"));
};
// get the metadata length from the footer
let metadata_len = u32::from_le_bytes(slice[..4].try_into().unwrap());

Ok(FooterTail {
// u32 won't be larger than usize in most cases
metadata_length: metadata_len.try_into()?,
encrypted_footer,
parx_info: None,
})
}

/// The length of the footer metadata in bytes
/// Try to decode the footer tail from a 16-byte PARX footer.
///
/// Layout: `[len(4)][flags(4)][crc(4)]['PARX'(4)]`
pub fn try_new_parx(slice: &[u8; PARX_FOOTER_SIZE]) -> Result<FooterTail> {
let magic = &slice[12..16];
if magic != PARQUET_MAGIC_PARX {
return Err(general_err!(
"Invalid PARX footer: expected 'PARX' magic, got {:?}",
magic
));
}

let metadata_len = u32::from_le_bytes(slice[0..4].try_into().unwrap());
let feature_flags = u32::from_le_bytes(slice[4..8].try_into().unwrap());
let expected_crc = u32::from_le_bytes(slice[8..12].try_into().unwrap());

// Flags are checked before CRC: future flag values may alter how the CRC is computed,
// so we must know which flags are active before validating the checksum.
let unknown_flags = feature_flags & !PARX_KNOWN_FEATURE_FLAGS;
if unknown_flags != 0 {
let bits: Vec<String> = (0..32)
.filter(|&i| unknown_flags & (1u32 << i) != 0)
.map(|i| format!("{:#010x}", 1u32 << i))
.collect();
return Err(general_err!(
"PARX footer contains unknown feature flags: {}",
bits.join(", ")
));
}

let encrypted_footer =
feature_flags & crate::file::PARX_FEATURE_FLAG_ENCRYPTED_FOOTER != 0;

let mut crc_suffix_bytes = [0u8; 8];
crc_suffix_bytes.copy_from_slice(&slice[0..8]);

Ok(FooterTail {
metadata_length: metadata_len.try_into()?,
encrypted_footer,
parx_info: Some(ParxInfo {
feature_flags,
expected_crc,
crc_suffix_bytes,
}),
})
}

/// The length of the footer metadata in bytes.
pub fn metadata_length(&self) -> usize {
self.metadata_length
}

/// Whether the footer metadata is encrypted
/// Whether the footer metadata is encrypted.
pub fn is_encrypted_footer(&self) -> bool {
self.encrypted_footer
}

/// The size of the fixed footer tail in bytes (not including variable-length metadata).
///
/// Returns [`PARX_FOOTER_SIZE`] for PARX files, [`FOOTER_SIZE`] otherwise.
pub fn fixed_footer_size(&self) -> usize {
if self.parx_info.is_some() {
PARX_FOOTER_SIZE
} else {
FOOTER_SIZE
}
}

/// Returns PARX-specific footer info, if this is a PARX file.
pub fn parx_info(&self) -> Option<&ParxInfo> {
self.parx_info.as_ref()
}
}

impl TryFrom<[u8; FOOTER_SIZE]> for FooterTail {
Expand All @@ -99,13 +204,31 @@ impl TryFrom<&[u8]> for FooterTail {
type Error = ParquetError;

fn try_from(value: &[u8]) -> Result<Self> {
if value.len() != FOOTER_SIZE {
let len = value.len();
if len < FOOTER_SIZE {
return Err(general_err!(
"Invalid footer length {}, expected {FOOTER_SIZE}",
value.len()
"Invalid footer length {}, minimum {}",
len,
FOOTER_SIZE
));
}
let slice: &[u8; FOOTER_SIZE] = value.try_into().unwrap();
Self::try_new(slice)

let magic = &value[len - 4..len];

if magic == PARQUET_MAGIC || magic == PARQUET_MAGIC_ENCR_FOOTER {
let slice: &[u8; FOOTER_SIZE] = value[len - FOOTER_SIZE..len].try_into().unwrap();
return Self::try_new(slice);
}

if magic == PARQUET_MAGIC_PARX {
if len < PARX_FOOTER_SIZE {
return Err(ParquetError::NeedMoreData(PARX_FOOTER_SIZE));
}
let slice: &[u8; PARX_FOOTER_SIZE] =
value[len - PARX_FOOTER_SIZE..len].try_into().unwrap();
return Self::try_new_parx(slice);
}

Err(general_err!("Invalid Parquet file. Corrupt footer"))
}
}
18 changes: 13 additions & 5 deletions parquet/src/file/metadata/push_decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::DecodeResult;
#[cfg(feature = "encryption")]
use crate::encryption::decrypt::FileDecryptionProperties;
use crate::errors::{ParquetError, Result};
use crate::file::FOOTER_SIZE;
use crate::file::PARX_FOOTER_SIZE;
use crate::file::metadata::parser::{MetadataParser, parse_column_index, parse_offset_index};
use crate::file::metadata::{FooterTail, PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions};
use crate::file::page_index::index_reader::acc_range;
Expand Down Expand Up @@ -367,11 +367,12 @@ impl ParquetMetaDataPushDecoder {
/// decoded metadata or an error if not enough data is available.
pub fn try_decode(&mut self) -> Result<DecodeResult<ParquetMetaData>> {
let file_len = self.buffers.file_len();
let footer_len = FOOTER_SIZE as u64;
// We always request PARX_FOOTER_SIZE bytes so we can detect PARX magic without
// a second round-trip. For PAR1/PARE files the extra bytes are unused.
let footer_len = PARX_FOOTER_SIZE as u64;
loop {
match std::mem::replace(&mut self.state, DecodeState::Intermediate) {
DecodeState::ReadingFooter => {
// need to have the last 8 bytes of the file to decode the metadata
let footer_start = file_len.saturating_sub(footer_len);
let footer_range = footer_start..file_len;

Expand All @@ -387,8 +388,9 @@ impl ParquetMetaDataPushDecoder {
}

DecodeState::ReadingMetadata(footer_tail) => {
let footer_fixed_size: u64 = footer_tail.fixed_footer_size() as u64;
let metadata_len: u64 = footer_tail.metadata_length() as u64;
let metadata_start = file_len - footer_len - metadata_len;
let metadata_start = file_len - footer_fixed_size - metadata_len;
let metadata_end = metadata_start + metadata_len;
let metadata_range = metadata_start..metadata_end;

Expand All @@ -397,8 +399,14 @@ impl ParquetMetaDataPushDecoder {
return Ok(needs_range(metadata_range));
}

let metadata_bytes = self.get_bytes(&metadata_range)?;

if let Some(parx_info) = footer_tail.parx_info() {
parx_info.validate_crc(&metadata_bytes)?;
}

let metadata = self.metadata_parser.decode_metadata(
&self.get_bytes(&metadata_range)?,
&metadata_bytes,
footer_tail.is_encrypted_footer(),
)?;
// Note: ReadingPageIndex first checks if page indexes are needed
Expand Down
Loading
Loading