diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index dd2c872ede50..b6b3ab00b64b 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -73,7 +73,7 @@ hashbrown = { version = "0.17", default-features = false } twox-hash = { version = "2.0", default-features = false, features = ["xxhash64"] } paste = { version = "1.0" } half = { version = "2.1", default-features = false, features = ["num-traits"] } -crc32fast = { version = "1.4.2", optional = true, default-features = false } +crc32fast = { version = "1.4.2", default-features = false } simdutf8 = { workspace = true , optional = true } ring = { version = "0.17", default-features = false, features = ["std"], optional = true } @@ -120,8 +120,8 @@ async = ["futures", "tokio"] object_store = ["dep:object_store", "async"] # Group Zstd dependencies zstd = ["dep:zstd"] -# Verify 32-bit CRC checksum when decoding parquet pages -crc = ["dep:crc32fast"] +# Verify 32-bit CRC checksum when decoding parquet pages (crc32fast is always available) +crc = [] # Enable SIMD UTF-8 validation simdutf8 = ["dep:simdutf8"] # Enable Parquet modular encryption support diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs b/parquet/src/arrow/arrow_writer/byte_array.rs index 145431c26465..9ee2dd3fb8f4 100644 --- a/parquet/src/arrow/arrow_writer/byte_array.rs +++ b/parquet/src/arrow/arrow_writer/byte_array.rs @@ -136,7 +136,9 @@ impl FallbackEncoder { .encoding(descr.path()) .unwrap_or_else(|| match props.writer_version() { WriterVersion::PARQUET_1_0 => Encoding::PLAIN, - WriterVersion::PARQUET_2_0 => Encoding::DELTA_BYTE_ARRAY, + WriterVersion::PARQUET_2_0 | WriterVersion::PARQUET_3_0 => { + Encoding::DELTA_BYTE_ARRAY + } }); let encoder = match encoding { diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index aa9cef16c5ad..4873698eac9c 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -875,7 +875,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { fn create_level_encoder(max_level: i16, props: &WriterProperties) -> LevelEncoder { match props.writer_version() { WriterVersion::PARQUET_1_0 => LevelEncoder::v1_streaming(max_level), - WriterVersion::PARQUET_2_0 => LevelEncoder::v2_streaming(max_level), + WriterVersion::PARQUET_2_0 | WriterVersion::PARQUET_3_0 => { + LevelEncoder::v2_streaming(max_level) + } } } @@ -1368,7 +1370,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { CompressedPage::new(data_page, uncompressed_size) } - WriterVersion::PARQUET_2_0 => { + WriterVersion::PARQUET_2_0 | WriterVersion::PARQUET_3_0 => { let mut rep_levels_byte_len = 0; let mut def_levels_byte_len = 0; let mut buffer = vec![]; diff --git a/parquet/src/file/metadata/footer_tail.rs b/parquet/src/file/metadata/footer_tail.rs index c33bc7a25c5a..de50652ef1cc 100644 --- a/parquet/src/file/metadata/footer_tail.rs +++ b/parquet/src/file/metadata/footer_tail.rs @@ -16,20 +16,62 @@ // under the License. use crate::errors::{ParquetError, Result}; -use crate::file::{FOOTER_SIZE, PARQUET_MAGIC, PARQUET_MAGIC_ENCR_FOOTER}; +use crate::file::{ + FOOTER_SIZE, PARQUET_MAGIC, PARQUET_MAGIC_ENCR_FOOTER, PARQUET_MAGIC_PARX, PARX_FOOTER_SIZE, + PARX_KNOWN_FEATURE_FLAGS, +}; -/// Parsed Parquet footer tail (last 8 bytes of a Parquet file) -/// -/// There are 8 bytes at the end of the Parquet footer with the following layout: -/// * 4 bytes for the metadata length -/// * 4 bytes for the magic bytes 'PAR1' or 'PARE' (encrypted footer) +/// Additional metadata stored in the footer of PARX-format files. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ParxInfo { + feature_flags: u32, + expected_crc: u32, + /// The 8 bytes [length(4)][flags(4)] used in CRC computation. + crc_suffix_bytes: [u8; 8], +} + +impl ParxInfo { + /// Returns the feature flags from the PARX footer. + pub fn feature_flags(&self) -> u32 { + self.feature_flags + } + + /// Validates the CRC32 of the given metadata bytes against the expected CRC stored in the footer. + /// + /// CRC covers: `[metadata_bytes][length(4)][flags(4)]` + pub fn validate_crc(&self, metadata_bytes: &[u8]) -> Result<()> { + let mut hasher = crc32fast::Hasher::new(); + hasher.update(metadata_bytes); + hasher.update(&self.crc_suffix_bytes); + let actual_crc = hasher.finalize(); + if actual_crc != self.expected_crc { + return Err(general_err!( + "PARX footer CRC32 mismatch: expected {:#010x}, got {:#010x}", + self.expected_crc, + actual_crc + )); + } + Ok(()) + } +} + +/// Parsed Parquet footer tail. /// +/// PAR1/PARE: last 8 bytes of a Parquet file: /// ```text /// +-----+------------------+ /// | len | 'PAR1' or 'PARE' | /// +-----+------------------+ /// ``` /// +/// PARX: last 16 bytes of a PARX file: +/// ```text +/// +-----+-------+-----+--------+ +/// | len | flags | crc | 'PARX' | +/// +-----+-------+-----+--------+ +/// 4 4 4 4 +/// ``` +/// /// # Examples /// ``` /// # use parquet::file::metadata::FooterTail; @@ -49,14 +91,15 @@ use crate::file::{FOOTER_SIZE, PARQUET_MAGIC, PARQUET_MAGIC_ENCR_FOOTER}; /// assert_eq!(footer_tail.is_encrypted_footer(), true); /// ``` /// -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct FooterTail { metadata_length: usize, encrypted_footer: bool, + parx_info: Option, } impl FooterTail { - /// Try to decode the footer tail from the given 8 bytes + /// Try to decode the footer tail from the given 8 bytes (PAR1/PARE format only). pub fn try_new(slice: &[u8; FOOTER_SIZE]) -> Result { let magic = &slice[4..]; let encrypted_footer = if magic == PARQUET_MAGIC_ENCR_FOOTER { @@ -66,25 +109,87 @@ impl FooterTail { } else { return Err(general_err!("Invalid Parquet file. Corrupt footer")); }; - // get the metadata length from the footer let metadata_len = u32::from_le_bytes(slice[..4].try_into().unwrap()); Ok(FooterTail { - // u32 won't be larger than usize in most cases metadata_length: metadata_len.try_into()?, encrypted_footer, + parx_info: None, }) } - /// The length of the footer metadata in bytes + /// Try to decode the footer tail from a 16-byte PARX footer. + /// + /// Layout: `[len(4)][flags(4)][crc(4)]['PARX'(4)]` + pub fn try_new_parx(slice: &[u8; PARX_FOOTER_SIZE]) -> Result { + let magic = &slice[12..16]; + if magic != PARQUET_MAGIC_PARX { + return Err(general_err!( + "Invalid PARX footer: expected 'PARX' magic, got {:?}", + magic + )); + } + + let metadata_len = u32::from_le_bytes(slice[0..4].try_into().unwrap()); + let feature_flags = u32::from_le_bytes(slice[4..8].try_into().unwrap()); + let expected_crc = u32::from_le_bytes(slice[8..12].try_into().unwrap()); + + // Flags are checked before CRC: future flag values may alter how the CRC is computed, + // so we must know which flags are active before validating the checksum. + let unknown_flags = feature_flags & !PARX_KNOWN_FEATURE_FLAGS; + if unknown_flags != 0 { + let bits: Vec = (0..32) + .filter(|&i| unknown_flags & (1u32 << i) != 0) + .map(|i| format!("{:#010x}", 1u32 << i)) + .collect(); + return Err(general_err!( + "PARX footer contains unknown feature flags: {}", + bits.join(", ") + )); + } + + let encrypted_footer = + feature_flags & crate::file::PARX_FEATURE_FLAG_ENCRYPTED_FOOTER != 0; + + let mut crc_suffix_bytes = [0u8; 8]; + crc_suffix_bytes.copy_from_slice(&slice[0..8]); + + Ok(FooterTail { + metadata_length: metadata_len.try_into()?, + encrypted_footer, + parx_info: Some(ParxInfo { + feature_flags, + expected_crc, + crc_suffix_bytes, + }), + }) + } + + /// The length of the footer metadata in bytes. pub fn metadata_length(&self) -> usize { self.metadata_length } - /// Whether the footer metadata is encrypted + /// Whether the footer metadata is encrypted. pub fn is_encrypted_footer(&self) -> bool { self.encrypted_footer } + + /// The size of the fixed footer tail in bytes (not including variable-length metadata). + /// + /// Returns [`PARX_FOOTER_SIZE`] for PARX files, [`FOOTER_SIZE`] otherwise. + pub fn fixed_footer_size(&self) -> usize { + if self.parx_info.is_some() { + PARX_FOOTER_SIZE + } else { + FOOTER_SIZE + } + } + + /// Returns PARX-specific footer info, if this is a PARX file. + pub fn parx_info(&self) -> Option<&ParxInfo> { + self.parx_info.as_ref() + } } impl TryFrom<[u8; FOOTER_SIZE]> for FooterTail { @@ -99,13 +204,31 @@ impl TryFrom<&[u8]> for FooterTail { type Error = ParquetError; fn try_from(value: &[u8]) -> Result { - if value.len() != FOOTER_SIZE { + let len = value.len(); + if len < FOOTER_SIZE { return Err(general_err!( - "Invalid footer length {}, expected {FOOTER_SIZE}", - value.len() + "Invalid footer length {}, minimum {}", + len, + FOOTER_SIZE )); } - let slice: &[u8; FOOTER_SIZE] = value.try_into().unwrap(); - Self::try_new(slice) + + let magic = &value[len - 4..len]; + + if magic == PARQUET_MAGIC || magic == PARQUET_MAGIC_ENCR_FOOTER { + let slice: &[u8; FOOTER_SIZE] = value[len - FOOTER_SIZE..len].try_into().unwrap(); + return Self::try_new(slice); + } + + if magic == PARQUET_MAGIC_PARX { + if len < PARX_FOOTER_SIZE { + return Err(ParquetError::NeedMoreData(PARX_FOOTER_SIZE)); + } + let slice: &[u8; PARX_FOOTER_SIZE] = + value[len - PARX_FOOTER_SIZE..len].try_into().unwrap(); + return Self::try_new_parx(slice); + } + + Err(general_err!("Invalid Parquet file. Corrupt footer")) } } diff --git a/parquet/src/file/metadata/push_decoder.rs b/parquet/src/file/metadata/push_decoder.rs index 7e4beb5ad9c2..9ec5f4151dbc 100644 --- a/parquet/src/file/metadata/push_decoder.rs +++ b/parquet/src/file/metadata/push_decoder.rs @@ -19,7 +19,7 @@ use crate::DecodeResult; #[cfg(feature = "encryption")] use crate::encryption::decrypt::FileDecryptionProperties; use crate::errors::{ParquetError, Result}; -use crate::file::FOOTER_SIZE; +use crate::file::PARX_FOOTER_SIZE; use crate::file::metadata::parser::{MetadataParser, parse_column_index, parse_offset_index}; use crate::file::metadata::{FooterTail, PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions}; use crate::file::page_index::index_reader::acc_range; @@ -367,11 +367,12 @@ impl ParquetMetaDataPushDecoder { /// decoded metadata or an error if not enough data is available. pub fn try_decode(&mut self) -> Result> { let file_len = self.buffers.file_len(); - let footer_len = FOOTER_SIZE as u64; + // We always request PARX_FOOTER_SIZE bytes so we can detect PARX magic without + // a second round-trip. For PAR1/PARE files the extra bytes are unused. + let footer_len = PARX_FOOTER_SIZE as u64; loop { match std::mem::replace(&mut self.state, DecodeState::Intermediate) { DecodeState::ReadingFooter => { - // need to have the last 8 bytes of the file to decode the metadata let footer_start = file_len.saturating_sub(footer_len); let footer_range = footer_start..file_len; @@ -387,8 +388,9 @@ impl ParquetMetaDataPushDecoder { } DecodeState::ReadingMetadata(footer_tail) => { + let footer_fixed_size: u64 = footer_tail.fixed_footer_size() as u64; let metadata_len: u64 = footer_tail.metadata_length() as u64; - let metadata_start = file_len - footer_len - metadata_len; + let metadata_start = file_len - footer_fixed_size - metadata_len; let metadata_end = metadata_start + metadata_len; let metadata_range = metadata_start..metadata_end; @@ -397,8 +399,14 @@ impl ParquetMetaDataPushDecoder { return Ok(needs_range(metadata_range)); } + let metadata_bytes = self.get_bytes(&metadata_range)?; + + if let Some(parx_info) = footer_tail.parx_info() { + parx_info.validate_crc(&metadata_bytes)?; + } + let metadata = self.metadata_parser.decode_metadata( - &self.get_bytes(&metadata_range)?, + &metadata_bytes, footer_tail.is_encrypted_footer(), )?; // Note: ReadingPageIndex first checks if page indexes are needed diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs index 844ae747c7c9..0bc811825cf1 100644 --- a/parquet/src/file/metadata/reader.rs +++ b/parquet/src/file/metadata/reader.rs @@ -18,7 +18,7 @@ #[cfg(feature = "encryption")] use crate::encryption::decrypt::FileDecryptionProperties; use crate::errors::{ParquetError, Result}; -use crate::file::FOOTER_SIZE; +use crate::file::PARX_FOOTER_SIZE; use crate::file::metadata::parser::decode_metadata; use crate::file::metadata::thrift::parquet_schema_from_bytes; use crate::file::metadata::{ @@ -533,20 +533,20 @@ impl ParquetMetaDataReader { // One-shot parse of footer. // Side effect: this will set `self.metadata_size` fn parse_metadata(&mut self, chunk_reader: &R) -> Result { - // check file is large enough to hold footer let file_size = chunk_reader.len(); - if file_size < (FOOTER_SIZE as u64) { - return Err(ParquetError::NeedMoreData(FOOTER_SIZE)); + if file_size < PARX_FOOTER_SIZE as u64 { + return Err(ParquetError::NeedMoreData(PARX_FOOTER_SIZE)); } - let mut footer = [0_u8; FOOTER_SIZE]; + let mut footer = [0_u8; PARX_FOOTER_SIZE]; chunk_reader - .get_read(file_size - FOOTER_SIZE as u64)? + .get_read(file_size - PARX_FOOTER_SIZE as u64)? .read_exact(&mut footer)?; - let footer = FooterTail::try_new(&footer)?; + let footer = FooterTail::try_from(&footer[..])?; let metadata_len = footer.metadata_length(); - let footer_metadata_len = FOOTER_SIZE + metadata_len; + let footer_fixed_size = footer.fixed_footer_size(); + let footer_metadata_len = footer_fixed_size + metadata_len; self.metadata_size = Some(footer_metadata_len); if footer_metadata_len as u64 > file_size { @@ -558,23 +558,23 @@ impl ParquetMetaDataReader { self.decode_footer_metadata(bytes, file_size, footer) } - /// Size of the serialized thrift metadata plus the 8 byte footer. Only set if + /// Size of the serialized thrift metadata plus the footer tail. Only set if /// `self.parse_metadata` is called. pub fn metadata_size(&self) -> Option { self.metadata_size } /// Return the number of bytes to read in the initial pass. If `prefetch_size` has - /// been provided, then return that value if it is larger than the size of the Parquet - /// file footer (8 bytes). Otherwise returns `8`. + /// been provided, then return that value if it is larger than [`PARX_FOOTER_SIZE`]. + /// Otherwise returns [`PARX_FOOTER_SIZE`]. #[cfg(all(feature = "async", feature = "arrow"))] fn get_prefetch_size(&self) -> usize { if let Some(prefetch) = self.prefetch_hint { - if prefetch > FOOTER_SIZE { + if prefetch > PARX_FOOTER_SIZE { return prefetch; } } - FOOTER_SIZE + PARX_FOOTER_SIZE } #[cfg(all(feature = "async", feature = "arrow"))] @@ -585,7 +585,7 @@ impl ParquetMetaDataReader { ) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> { let prefetch = self.get_prefetch_size() as u64; - if file_size < FOOTER_SIZE as u64 { + if file_size < PARX_FOOTER_SIZE as u64 { return Err(eof_err!("file size of {} is less than footer", file_size)); } @@ -607,32 +607,30 @@ impl ParquetMetaDataReader { )); } - let mut footer = [0; FOOTER_SIZE]; - footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]); - - let footer = FooterTail::try_new(&footer)?; + let footer = FooterTail::try_from(&suffix[suffix_len - PARX_FOOTER_SIZE..suffix_len])?; let length = footer.metadata_length(); + let footer_fixed_size = footer.fixed_footer_size(); - if file_size < (length + FOOTER_SIZE) as u64 { + if file_size < (length + footer_fixed_size) as u64 { return Err(eof_err!( "file size of {} is less than footer + metadata {}", file_size, - length + FOOTER_SIZE + length + footer_fixed_size )); } // Did not fetch the entire file metadata in the initial read, need to make a second request - if length > suffix_len - FOOTER_SIZE { - let metadata_start = file_size - (length + FOOTER_SIZE) as u64; + if length > suffix_len - footer_fixed_size { + let metadata_start = file_size - (length + footer_fixed_size) as u64; let meta = fetch - .fetch(metadata_start..(file_size - FOOTER_SIZE as u64)) + .fetch(metadata_start..(file_size - footer_fixed_size as u64)) .await?; Ok((self.decode_footer_metadata(meta, file_size, footer)?, None)) } else { - let metadata_start = (file_size - (length + FOOTER_SIZE) as u64 - footer_start) + let metadata_start = (file_size - (length + footer_fixed_size) as u64 - footer_start) .try_into() .expect("metadata length should never be larger than u32"); - let slice = suffix.slice(metadata_start..suffix_len - FOOTER_SIZE); + let slice = suffix.slice(metadata_start..suffix_len - footer_fixed_size); Ok(( self.decode_footer_metadata(slice, file_size, footer)?, Some((footer_start as usize, suffix.slice(..metadata_start))), @@ -650,26 +648,25 @@ impl ParquetMetaDataReader { let suffix = fetch.fetch_suffix(prefetch as _).await?; let suffix_len = suffix.len(); - if suffix_len < FOOTER_SIZE { + if suffix_len < PARX_FOOTER_SIZE { return Err(eof_err!( "footer metadata requires {} bytes, but could only read {}", - FOOTER_SIZE, + PARX_FOOTER_SIZE, suffix_len )); } - let mut footer = [0; FOOTER_SIZE]; - footer.copy_from_slice(&suffix[suffix_len - FOOTER_SIZE..suffix_len]); - - let footer = FooterTail::try_new(&footer)?; + let footer = + FooterTail::try_from(&suffix[suffix_len - PARX_FOOTER_SIZE..suffix_len])?; let length = footer.metadata_length(); + let footer_fixed_size = footer.fixed_footer_size(); // fake file size as we are only parsing the footer metadata here // (cant be parsing page indexes without the full file size) - let file_size = (length + FOOTER_SIZE) as u64; + let file_size = (length + footer_fixed_size) as u64; // Did not fetch the entire file metadata in the initial read, need to make a second request - let metadata_offset = length + FOOTER_SIZE; - if length > suffix_len - FOOTER_SIZE { + let metadata_offset = length + footer_fixed_size; + if length > suffix_len - footer_fixed_size { let meta = fetch.fetch_suffix(metadata_offset).await?; if meta.len() < metadata_offset { @@ -685,7 +682,7 @@ impl ParquetMetaDataReader { Ok((self.decode_footer_metadata(meta, file_size, footer)?, None)) } else { let metadata_start = suffix_len - metadata_offset; - let slice = suffix.slice(metadata_start..suffix_len - FOOTER_SIZE); + let slice = suffix.slice(metadata_start..suffix_len - footer_fixed_size); Ok(( self.decode_footer_metadata(slice, file_size, footer)?, Some((0, suffix.slice(..metadata_start))), @@ -712,14 +709,19 @@ impl ParquetMetaDataReader { file_size: u64, footer_tail: FooterTail, ) -> Result { + if let Some(parx_info) = footer_tail.parx_info() { + parx_info.validate_crc(&buf)?; + } + + let footer_fixed_size = footer_tail.fixed_footer_size() as u64; // The push decoder expects the metadata to be at the end of the file // (... data ...) + (metadata) + (footer) // so we need to provide the starting offset of the metadata // within the file. - let ending_offset = file_size.checked_sub(FOOTER_SIZE as u64).ok_or_else(|| { + let ending_offset = file_size.checked_sub(footer_fixed_size).ok_or_else(|| { general_err!( "file size {file_size} is smaller than footer size {}", - FOOTER_SIZE + footer_fixed_size ) })?; @@ -727,7 +729,7 @@ impl ParquetMetaDataReader { general_err!( "file size {file_size} is smaller than buffer size {} + footer size {}", buf.len(), - FOOTER_SIZE + footer_fixed_size ) })?; @@ -853,7 +855,7 @@ mod tests { let err = ParquetMetaDataReader::new() .parse_metadata(&test_file) .unwrap_err(); - assert!(matches!(err, ParquetError::NeedMoreData(FOOTER_SIZE))); + assert!(matches!(err, ParquetError::NeedMoreData(PARX_FOOTER_SIZE))); } #[test] diff --git a/parquet/src/file/metadata/writer.rs b/parquet/src/file/metadata/writer.rs index cd5d617f9372..d1787277b062 100644 --- a/parquet/src/file/metadata/writer.rs +++ b/parquet/src/file/metadata/writer.rs @@ -19,6 +19,7 @@ use crate::file::metadata::thrift::FileMeta; use crate::file::metadata::{ ColumnChunkMetaData, ParquetColumnIndex, ParquetOffsetIndex, RowGroupMetaData, }; +use crate::file::{PARQUET_MAGIC_PARX, PARX_FEATURE_FLAG_PATH_IN_SCHEMA_OMITTED}; use crate::schema::types::{SchemaDescPtr, SchemaDescriptor}; use crate::{ basic::ColumnOrder, @@ -32,6 +33,7 @@ use crate::{ }, file::column_crypto_metadata::ColumnCryptoMetaData, file::metadata::thrift::encryption::{AesGcmV1, EncryptionAlgorithm, FileCryptoMetaData}, + file::PARX_FEATURE_FLAG_ENCRYPTED_FOOTER, }; use crate::{errors::Result, file::page_index::column_index::ColumnIndexMetaData}; @@ -49,6 +51,27 @@ use crate::{ use std::io::Write; use std::sync::Arc; +fn write_parx_footer( + buf: &mut impl Write, + metadata_bytes: &[u8], + flags: u32, +) -> Result<()> { + let len_bytes = (metadata_bytes.len() as u32).to_le_bytes(); + let flag_bytes = flags.to_le_bytes(); + + let mut hasher = crc32fast::Hasher::new(); + hasher.update(metadata_bytes); + hasher.update(&len_bytes); + hasher.update(&flag_bytes); + let crc = hasher.finalize(); + + buf.write_all(&len_bytes)?; + buf.write_all(&flag_bytes)?; + buf.write_all(&crc.to_le_bytes())?; + buf.write_all(&PARQUET_MAGIC_PARX)?; + Ok(()) +} + /// Writes `crate::file::metadata` structures to a thrift encoded byte stream /// /// See [`ParquetMetaDataWriter`] for background and example. @@ -63,6 +86,7 @@ pub(crate) struct ThriftMetadataWriter<'a, W: Write> { object_writer: MetadataObjectWriter, writer_version: i32, write_path_in_schema: bool, + use_parx: bool, } impl<'a, W: Write> ThriftMetadataWriter<'a, W> { @@ -265,15 +289,23 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> { // Write file metadata let start_pos = self.buf.bytes_written(); - self.object_writer - .write_file_metadata(&file_meta, &mut self.buf)?; - let end_pos = self.buf.bytes_written(); - // Write footer - let metadata_len = (end_pos - start_pos) as u32; - - self.buf.write_all(&metadata_len.to_le_bytes())?; - self.buf.write_all(self.object_writer.get_file_magic())?; + if self.use_parx { + let mut temp_buf = TrackedWrite::new(Vec::::new()); + self.object_writer + .write_file_metadata(&file_meta, &mut temp_buf)?; + let metadata_bytes = temp_buf.into_inner()?; + self.buf.write_all(&metadata_bytes)?; + let flags = self.object_writer.get_parx_feature_flags(self.write_path_in_schema); + write_parx_footer(&mut self.buf, &metadata_bytes, flags)?; + } else { + self.object_writer + .write_file_metadata(&file_meta, &mut self.buf)?; + let end_pos = self.buf.bytes_written(); + let metadata_len = (end_pos - start_pos) as u32; + self.buf.write_all(&metadata_len.to_le_bytes())?; + self.buf.write_all(self.object_writer.get_file_magic())?; + } // If row group metadata was encrypted, we replace the encrypted row groups with // unencrypted metadata before it is returned to users. This allows the metadata @@ -308,9 +340,15 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> { object_writer: Default::default(), writer_version, write_path_in_schema, + use_parx: false, } } + pub fn with_parx(mut self, use_parx: bool) -> Self { + self.use_parx = use_parx; + self + } + pub fn with_column_indexes( mut self, column_indexes: Vec>>, @@ -607,6 +645,14 @@ impl MetadataObjectWriter { pub fn get_file_magic(&self) -> &[u8; 4] { get_file_magic() } + + fn get_parx_feature_flags(&self, write_path_in_schema: bool) -> u32 { + if !write_path_in_schema { + PARX_FEATURE_FLAG_PATH_IN_SCHEMA_OMITTED + } else { + 0 + } + } } /// Implementations of [`MetadataObjectWriter`] methods that rely on encryption being enabled @@ -729,6 +775,17 @@ impl MetadataObjectWriter { ) } + fn get_parx_feature_flags(&self, write_path_in_schema: bool) -> u32 { + let mut flags = 0u32; + if matches!(&self.file_encryptor, Some(e) if e.properties().encrypt_footer()) { + flags |= PARX_FEATURE_FLAG_ENCRYPTED_FOOTER; + } + if !write_path_in_schema { + flags |= PARX_FEATURE_FLAG_PATH_IN_SCHEMA_OMITTED; + } + flags + } + fn write_thrift_object_with_encryption( object: &impl WriteThrift, mut sink: impl Write, diff --git a/parquet/src/file/mod.rs b/parquet/src/file/mod.rs index 09036cd7d7b9..7bbd63b28b86 100644 --- a/parquet/src/file/mod.rs +++ b/parquet/src/file/mod.rs @@ -107,7 +107,19 @@ pub mod serialized_reader; pub mod statistics; pub mod writer; -/// The length of the parquet footer in bytes +/// The length of the parquet footer in bytes (PAR1/PARE format) pub const FOOTER_SIZE: usize = 8; +/// The length of the PARX footer fixed tail in bytes (not including variable-length metadata) +pub const PARX_FOOTER_SIZE: usize = 16; const PARQUET_MAGIC: [u8; 4] = [b'P', b'A', b'R', b'1']; const PARQUET_MAGIC_ENCR_FOOTER: [u8; 4] = [b'P', b'A', b'R', b'E']; +pub(crate) const PARQUET_MAGIC_PARX: [u8; 4] = [b'P', b'A', b'R', b'X']; +/// Feature flag: footer uses modular encryption (equivalent to PARE) +pub(crate) const PARX_FEATURE_FLAG_ENCRYPTED_FOOTER: u32 = 0x0001; +/// Feature flag: column `path_in_schema` fields are omitted from the schema (write_path_in_schema=false) +pub(crate) const PARX_FEATURE_FLAG_PATH_IN_SCHEMA_OMITTED: u32 = 0x0002; +pub(crate) const PARX_KNOWN_FEATURE_FLAGS: u32 = + PARX_FEATURE_FLAG_ENCRYPTED_FOOTER | PARX_FEATURE_FLAG_PATH_IN_SCHEMA_OMITTED; +/// Mask of feature flags that represent structural changes requiring PARX format. +/// Excludes `ENCRYPTED_FOOTER` because encryption can be expressed without PARX via the PARE format. +pub(crate) const PARX_STRUCTURAL_FEATURE_FLAGS: u32 = PARX_FEATURE_FLAG_PATH_IN_SCHEMA_OMITTED; diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs index 19191f601846..f3658048850a 100644 --- a/parquet/src/file/properties.rs +++ b/parquet/src/file/properties.rs @@ -22,6 +22,7 @@ use crate::compression::{CodecOptions, CodecOptionsBuilder}; use crate::encryption::encrypt::FileEncryptionProperties; use crate::errors::{ParquetError, Result}; use crate::file::metadata::{KeyValue, SortingColumn}; +use crate::file::{PARX_FEATURE_FLAG_PATH_IN_SCHEMA_OMITTED, PARX_STRUCTURAL_FEATURE_FLAGS}; use crate::schema::types::ColumnPath; use std::str::FromStr; use std::{collections::HashMap, sync::Arc}; @@ -141,6 +142,10 @@ pub enum WriterVersion { PARQUET_1_0, /// Parquet format version 2.0 PARQUET_2_0, + /// Parquet format version 3.0 + /// + /// Automatically enables the PARX file format. + PARQUET_3_0, } impl WriterVersion { @@ -149,6 +154,7 @@ impl WriterVersion { match self { WriterVersion::PARQUET_1_0 => 1, WriterVersion::PARQUET_2_0 => 2, + WriterVersion::PARQUET_3_0 => 3, } } } @@ -160,6 +166,7 @@ impl FromStr for WriterVersion { match s { "PARQUET_1_0" | "parquet_1_0" => Ok(WriterVersion::PARQUET_1_0), "PARQUET_2_0" | "parquet_2_0" => Ok(WriterVersion::PARQUET_2_0), + "PARQUET_3_0" | "parquet_3_0" => Ok(WriterVersion::PARQUET_3_0), _ => Err(format!("Invalid writer version: {s}")), } } @@ -256,6 +263,7 @@ pub struct WriterProperties { coerce_types: bool, content_defined_chunking: Option, write_path_in_schema: bool, + use_parx_format: bool, #[cfg(feature = "encryption")] pub(crate) file_encryption_properties: Option>, } @@ -449,6 +457,19 @@ impl WriterProperties { self.write_path_in_schema } + /// Returns whether the PARX file format is enabled. + /// + /// This value is resolved at [`WriterPropertiesBuilder::build`] time. It defaults to `true` + /// when writer version is 3 or greater, or when [`write_path_in_schema`] is `false` (both + /// are PARX-only features). It can be overridden with + /// [`WriterPropertiesBuilder::with_parx_format`], but setting it to `false` while + /// `write_path_in_schema` is also `false` is a build-time panic. + /// + /// [`write_path_in_schema`]: WriterProperties::write_path_in_schema + pub fn use_parx_format(&self) -> bool { + self.use_parx_format + } + /// EXPERIMENTAL: Returns content-defined chunking options, or `None` if CDC is disabled. /// /// For more details see [`WriterPropertiesBuilder::set_content_defined_chunking`] @@ -605,6 +626,7 @@ pub struct WriterPropertiesBuilder { coerce_types: bool, content_defined_chunking: Option, write_path_in_schema: bool, + use_parx_format: bool, #[cfg(feature = "encryption")] file_encryption_properties: Option>, } @@ -630,6 +652,7 @@ impl Default for WriterPropertiesBuilder { coerce_types: DEFAULT_COERCE_TYPES, content_defined_chunking: None, write_path_in_schema: DEFAULT_WRITE_PATH_IN_SCHEMA, + use_parx_format: false, #[cfg(feature = "encryption")] file_encryption_properties: None, } @@ -667,6 +690,23 @@ impl WriterPropertiesBuilder { props.resolve_bloom_filter_ndv(default_ndv); } + // Compute which structural PARX flags (those that cannot be expressed in PAR1/PARE) + // the current settings would require. Add new PARX-only features here as they are + // introduced; the conflict check below catches them automatically. + let mut required_structural_flags = 0u32; + if !self.write_path_in_schema { + required_structural_flags |= PARX_FEATURE_FLAG_PATH_IN_SCHEMA_OMITTED; + } + + if !self.use_parx_format && (required_structural_flags & PARX_STRUCTURAL_FEATURE_FLAGS) != 0 { + panic!( + "Invalid WriterProperties: PARX format is disabled but PARX-only features are \ + in use (structural flags: {:#010x}). Either enable PARX format or disable the \ + conflicting features.", + required_structural_flags & PARX_STRUCTURAL_FEATURE_FLAGS + ); + } + WriterProperties { data_page_row_count_limit: self.data_page_row_count_limit, write_batch_size: self.write_batch_size, @@ -685,6 +725,7 @@ impl WriterPropertiesBuilder { coerce_types: self.coerce_types, content_defined_chunking: self.content_defined_chunking, write_path_in_schema: self.write_path_in_schema, + use_parx_format: self.use_parx_format, #[cfg(feature = "encryption")] file_encryption_properties: self.file_encryption_properties, } @@ -693,6 +734,16 @@ impl WriterPropertiesBuilder { // ---------------------------------------------------------------------- // Writer properties related to a file + /// Enables or disables PARX format writing (defaults to `false`). + /// + /// When enabled, files use the `PARX` magic number instead of `PAR1`/`PARE`. + /// The PARX footer includes feature flags, spec version, and a CRC32 checksum. + /// Requires the `parx` feature to be enabled at compile time. + pub fn with_parx_format(mut self, use_parx_format: bool) -> Self { + self.use_parx_format = use_parx_format; + self + } + /// Sets the `WriterVersion` written into the parquet metadata (defaults to [`PARQUET_1_0`] /// via [`DEFAULT_WRITER_VERSION`]) /// @@ -700,6 +751,10 @@ impl WriterPropertiesBuilder { /// /// [`PARQUET_1_0`]: [WriterVersion::PARQUET_1_0] pub fn set_writer_version(mut self, value: WriterVersion) -> Self { + if value.as_num() >= 3 { + self.use_parx_format = true; + self.write_path_in_schema = false; + } self.writer_version = value; self } @@ -933,6 +988,9 @@ impl WriterPropertiesBuilder { /// /// [GH-563]: https://github.com/apache/parquet-format/issues/563 pub fn set_write_path_in_schema(mut self, write_path_in_schema: bool) -> Self { + if !write_path_in_schema { + self.use_parx_format = true; + } self.write_path_in_schema = write_path_in_schema; self } @@ -1348,6 +1406,7 @@ impl From for WriterPropertiesBuilder { coerce_types: props.coerce_types, content_defined_chunking: props.content_defined_chunking, write_path_in_schema: props.write_path_in_schema, + use_parx_format: props.use_parx_format, #[cfg(feature = "encryption")] file_encryption_properties: props.file_encryption_properties, } @@ -2169,6 +2228,17 @@ mod tests { ); } + #[test] + #[should_panic(expected = "PARX format is disabled but PARX-only features are in use")] + fn test_writer_properties_panic_on_parx_disabled_with_path_in_schema_omitted() { + // set_write_path_in_schema(false) eagerly enables PARX; calling with_parx_format(false) + // after tries to override that, and build() catches the resulting conflict. + WriterProperties::builder() + .set_write_path_in_schema(false) + .with_parx_format(false) + .build(); + } + #[test] #[should_panic( expected = "data_page_v2_compression_ratio_threshold must be a positive finite number" diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index 8ec16ba36739..8089623765a2 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -310,7 +310,11 @@ impl SerializedFileWriter { /// Writes magic bytes at the beginning of the file. #[cfg(not(feature = "encryption"))] - fn start_file(_properties: &WriterPropertiesPtr, buf: &mut TrackedWrite) -> Result<()> { + fn start_file(properties: &WriterPropertiesPtr, buf: &mut TrackedWrite) -> Result<()> { + if properties.use_parx_format() { + buf.write_all(&crate::file::PARQUET_MAGIC_PARX)?; + return Ok(()); + } buf.write_all(get_file_magic())?; Ok(()) } @@ -318,8 +322,11 @@ impl SerializedFileWriter { /// Writes magic bytes at the beginning of the file. #[cfg(feature = "encryption")] fn start_file(properties: &WriterPropertiesPtr, buf: &mut TrackedWrite) -> Result<()> { + if properties.use_parx_format() { + buf.write_all(&crate::file::PARQUET_MAGIC_PARX)?; + return Ok(()); + } let magic = get_file_magic(properties.file_encryption_properties.as_ref()); - buf.write_all(magic)?; Ok(()) } @@ -346,12 +353,16 @@ impl SerializedFileWriter { let offset_indexes = std::mem::take(&mut self.offset_indexes); let write_path_in_schema = self.props.write_path_in_schema(); + let writer_version_num = { + let v = self.props.writer_version().as_num(); + if !write_path_in_schema && v < 3 { 3 } else { v } + }; let mut encoder = ThriftMetadataWriter::new( &mut self.buf, &self.descr, row_groups, Some(self.props.created_by().to_string()), - self.props.writer_version().as_num(), + writer_version_num, write_path_in_schema, ); @@ -360,6 +371,8 @@ impl SerializedFileWriter { encoder = encoder.with_file_encryptor(self.file_encryptor.clone()); } + encoder = encoder.with_parx(self.props.use_parx_format()); + if let Some(key_value_metadata) = key_value_metadata { encoder = encoder.with_key_value_metadata(key_value_metadata) } @@ -1090,12 +1103,20 @@ mod tests { reader::{FileReader, SerializedFileReader, SerializedPageReader}, statistics::Statistics, }; + use crate::file::{PARX_FEATURE_FLAG_PATH_IN_SCHEMA_OMITTED, PARX_FOOTER_SIZE}; use crate::record::{Row, RowAccessor}; use crate::schema::parser::parse_message_type; use crate::schema::types; use crate::schema::types::{ColumnDescriptor, ColumnPath}; use crate::util::test_common::file_util::get_test_file; use crate::util::test_common::rand_gen::RandGen; + #[cfg(feature = "encryption")] + use crate::{ + encryption::decrypt::FileDecryptionProperties, + encryption::encrypt::FileEncryptionProperties, + file::metadata::ParquetMetaDataReader, + file::PARX_FEATURE_FLAG_ENCRYPTED_FOOTER, + }; #[test] fn test_row_group_writer_error_not_all_columns_written() { @@ -2536,4 +2557,238 @@ mod tests { } writer.close().unwrap(); } + + fn make_test_schema() -> Arc { + Arc::new( + types::Type::group_type_builder("schema") + .with_fields(vec![Arc::new( + types::Type::primitive_type_builder("col1", Type::INT32) + .with_repetition(Repetition::REQUIRED) + .build() + .unwrap(), + )]) + .build() + .unwrap(), + ) + } + + fn write_parx_file(schema: Arc, props: WriterProperties) -> Vec { + let buf = Vec::::new(); + let mut writer = SerializedFileWriter::new(buf, schema, Arc::new(props)).unwrap(); + let mut row_group = writer.next_row_group().unwrap(); + let mut col_writer = row_group.next_column().unwrap().unwrap(); + col_writer + .typed::() + .write_batch(&[1, 2, 3], None, None) + .unwrap(); + col_writer.close().unwrap(); + row_group.close().unwrap(); + writer.into_inner().unwrap() + } + + #[test] + fn test_parx_unencrypted_roundtrip() { + + let schema = make_test_schema(); + let props = WriterProperties::builder() + .with_parx_format(true) + .build(); + let data = write_parx_file(schema, props); + + assert_eq!(&data[..4], b"PARX", "Expected PARX magic at start"); + assert_eq!( + &data[data.len() - 4..], + b"PARX", + "Expected PARX magic at end" + ); + + let reader = SerializedFileReader::new(Bytes::from(data)).unwrap(); + let meta = reader.metadata(); + assert_eq!(meta.num_row_groups(), 1); + assert_eq!(meta.row_group(0).num_rows(), 3); + + let row_group_reader = reader.get_row_group(0).unwrap(); + let col_reader = row_group_reader.get_column_reader(0).unwrap(); + let mut typed = get_typed_column_reader::(col_reader); + let mut values = Vec::new(); + let (num_records, _, _) = typed.read_records(3, None, None, &mut values).unwrap(); + assert_eq!(num_records, 3); + assert_eq!(values, vec![1, 2, 3]); + } + + #[test] + fn test_parx_unknown_feature_flag_rejected() { + let schema = make_test_schema(); + let props = WriterProperties::builder() + .with_parx_format(true) + .build(); + let mut data = write_parx_file(schema, props); + + // Set bit 31 (the maximum bit) as an unknown feature flag. + // Using the highest bit makes this test resilient to new low-order flags being added. + // flags are at: [len - PARX_FOOTER_SIZE + 4 .. + 8] (little-endian u32) + let flags_offset = data.len() - PARX_FOOTER_SIZE + 4; + data[flags_offset + 3] |= 0x80; // sets bit 31 = 0x80000000 + + let err = SerializedFileReader::new(Bytes::from(data)).err().unwrap(); + let msg = err.to_string(); + assert!( + msg.contains("unknown feature flags"), + "Expected unknown feature flag error, got: {msg}" + ); + assert!( + msg.contains("0x80000000"), + "Expected flags listed in error message, got: {msg}" + ); + } + + #[test] + fn test_parx_crc_corruption_detected() { + let schema = make_test_schema(); + let props = WriterProperties::builder() + .with_parx_format(true) + .build(); + let data = write_parx_file(schema, props); + let n = data.len(); + let metadata_end = n - PARX_FOOTER_SIZE; + // Derive metadata_start from the length stored in the footer (bytes 0..4 of the tail). + let metadata_len = + u32::from_le_bytes(data[metadata_end..metadata_end + 4].try_into().unwrap()) as usize; + let metadata_start = metadata_end - metadata_len; + + // Layout of bytes covered by the CRC: + // [metadata_start .. metadata_end] — thrift metadata + // [metadata_end .. metadata_end + 8] — crc_suffix: len(4)+flags(4) + // [metadata_end+8 .. metadata_end + 12] — stored CRC (mutating it changes expected_crc) + // The end PARX magic [metadata_end+12..n] is not covered by the CRC. + for offset in metadata_start..(metadata_end + 12) { + let in_len_range = offset >= metadata_end && offset < metadata_end + 4; + let in_flags_range = offset >= metadata_end + 4 && offset < metadata_end + 8; + let is_flags_byte_0 = offset == metadata_end + 4; + // For flags byte 0: flip bit 0 (a known flag) — keeps flags within known values + // while still corrupting the CRC input, so the CRC error fires. + // For other flag bytes: any non-zero flip sets unknown flag bits, so the + // unknown-flags check fires before CRC (flags are checked first because future + // flags may influence CRC computation). + let mask: u8 = if is_flags_byte_0 { 0x01 } else { 0xFF }; + + let mut corrupted = data.clone(); + corrupted[offset] ^= mask; + let err = SerializedFileReader::new(Bytes::from(corrupted)) + .err() + .unwrap_or_else(|| panic!("Expected error at offset {offset}, got Ok")); + + // Length bytes govern how many bytes are read as metadata; a corrupted length + // may exceed the file size and cause an EOF before CRC is checked. + // Flag bytes 1-3 trigger the unknown-flags check before CRC. + // All other bytes in the CRC range must produce a specific CRC32 error. + let expect_crc = !in_len_range && (!in_flags_range || is_flags_byte_0); + if expect_crc { + let msg = err.to_string(); + assert!( + msg.contains("CRC32"), + "Expected CRC32 error at offset {offset}, got: {msg}" + ); + } + } + } + + #[test] + fn test_parx_enabled_by_writer_version_3() { + let schema = make_test_schema(); + let props = WriterProperties::builder() + .set_writer_version(WriterVersion::PARQUET_3_0) + .build(); + assert!(props.use_parx_format()); + let data = write_parx_file(schema, props); + assert_eq!(&data[..4], b"PARX"); + assert_eq!(&data[data.len() - 4..], b"PARX"); + } + + #[test] + fn test_parx_path_in_schema_omitted_flag() { + // write_path_in_schema=false should auto-enable PARX and set PATH_IN_SCHEMA_OMITTED + let schema = make_test_schema(); + let props = WriterProperties::builder() + .set_write_path_in_schema(false) + .build(); + assert!(props.use_parx_format()); + assert!(!props.write_path_in_schema()); + let data = write_parx_file(schema, props); + + assert_eq!(&data[..4], b"PARX", "Expected PARX magic at start"); + let flags_offset = data.len() - PARX_FOOTER_SIZE + 4; + let flags = + u32::from_le_bytes(data[flags_offset..flags_offset + 4].try_into().unwrap()); + assert_ne!( + flags & PARX_FEATURE_FLAG_PATH_IN_SCHEMA_OMITTED, + 0, + "Expected PATH_IN_SCHEMA_OMITTED flag to be set, got flags={flags:#010x}" + ); + + // Read back and verify data is accessible + let reader = SerializedFileReader::new(Bytes::from(data)).unwrap(); + assert_eq!(reader.metadata().num_row_groups(), 1); + } + + #[test] + fn test_parx_disabled_on_v3_with_explicit_write_path() { + // v3 sets parx=true and write_path_in_schema=false by default, + // but both can be overridden explicitly to produce a PAR1 file. + let schema = make_test_schema(); + let props = WriterProperties::builder() + .set_writer_version(WriterVersion::PARQUET_3_0) + .with_parx_format(false) + .set_write_path_in_schema(true) + .build(); + assert!(!props.use_parx_format()); + assert!(props.write_path_in_schema()); + let data = write_parx_file(schema, props); + + assert_eq!(&data[..4], b"PAR1", "Expected PAR1 magic, not PARX"); + assert_eq!(&data[data.len() - 4..], b"PAR1"); + } + + #[test] + #[cfg(feature = "encryption")] + fn test_parx_encrypted_footer_roundtrip() { + let footer_key = b"0123456789012345".to_vec(); + let encryption_properties = FileEncryptionProperties::builder(footer_key.clone()) + .build() + .unwrap(); + let schema = make_test_schema(); + let props = WriterProperties::builder() + .with_parx_format(true) + .with_file_encryption_properties(encryption_properties) + .build(); + let data = write_parx_file(schema, props); + + assert_eq!(&data[..4], b"PARX", "Expected PARX magic at start"); + assert_eq!( + &data[data.len() - 4..], + b"PARX", + "Expected PARX magic at end" + ); + + // Verify the encrypted footer feature flag is set + let flags_offset = data.len() - PARX_FOOTER_SIZE + 4; + let flags = + u32::from_le_bytes(data[flags_offset..flags_offset + 4].try_into().unwrap()); + assert_ne!( + flags & PARX_FEATURE_FLAG_ENCRYPTED_FOOTER, + 0, + "Expected encrypted footer flag to be set, got flags={flags:#010x}" + ); + + // Decrypt and parse metadata, verify row group count and row count + let decryption_properties = FileDecryptionProperties::builder(footer_key) + .build() + .unwrap(); + let meta = ParquetMetaDataReader::new() + .with_decryption_properties(Some(decryption_properties)) + .parse_and_finish(&Bytes::from(data)) + .unwrap(); + assert_eq!(meta.num_row_groups(), 1); + assert_eq!(meta.row_group(0).num_rows(), 3); + } }