From 512fb8de0df931347aef24663ffebd9bb1a923ca Mon Sep 17 00:00:00 2001 From: mohsaka <135669458+mohsaka@users.noreply.github.com> Date: Thu, 21 May 2026 16:04:39 -0700 Subject: [PATCH] refactor: Refactor and comment cleanup of velox/dwio iceberg related files. Co-authored-by: Ping Liu --- velox/dwio/parquet/common/CMakeLists.txt | 2 +- .../dwio/parquet/writer/arrow/ColumnWriter.h | 2 +- velox/dwio/parquet/writer/arrow/FileWriter.h | 8 +- velox/dwio/parquet/writer/arrow/Properties.h | 222 ++++++++---------- 4 files changed, 110 insertions(+), 124 deletions(-) diff --git a/velox/dwio/parquet/common/CMakeLists.txt b/velox/dwio/parquet/common/CMakeLists.txt index 4f1256edd75..9b99962af58 100644 --- a/velox/dwio/parquet/common/CMakeLists.txt +++ b/velox/dwio/parquet/common/CMakeLists.txt @@ -15,9 +15,9 @@ velox_add_library( velox_dwio_parquet_common BloomFilter.cpp - XxHasher.cpp LevelComparison.cpp LevelConversion.cpp + XxHasher.cpp HEADERS BitStreamUtilsInternal.h BloomFilter.h diff --git a/velox/dwio/parquet/writer/arrow/ColumnWriter.h b/velox/dwio/parquet/writer/arrow/ColumnWriter.h index 52699ca2969..44cd717ba9f 100644 --- a/velox/dwio/parquet/writer/arrow/ColumnWriter.h +++ b/velox/dwio/parquet/writer/arrow/ColumnWriter.h @@ -270,7 +270,7 @@ class TypedColumnWriter : public ColumnWriter { int64_t validBitsOffset, const T* values) = 0; - // Estimated size of the values that are not written to a page yet + // Estimated size of the values that are not written to a page yet. virtual int64_t estimatedBufferedValueBytes() const = 0; }; diff --git a/velox/dwio/parquet/writer/arrow/FileWriter.h b/velox/dwio/parquet/writer/arrow/FileWriter.h index 5e466929769..6e384c97ffc 100644 --- a/velox/dwio/parquet/writer/arrow/FileWriter.h +++ b/velox/dwio/parquet/writer/arrow/FileWriter.h @@ -53,11 +53,11 @@ class PARQUET_EXPORT RowGroupWriter { virtual int currentColumn() const = 0; virtual void close() = 0; - /// \brief Total uncompressed bytes written by the page writer + /// \brief Total uncompressed bytes written by the page writer. virtual int64_t totalBytesWritten() const = 0; - /// \brief Total bytes still compressed but not written by the page writer + /// \brief Total bytes still compressed but not written by the page writer. virtual int64_t totalCompressedBytes() const = 0; - /// \brief Total compressed bytes written by the page writer + /// \brief Total compressed bytes written by the page writer. virtual int64_t totalCompressedBytesWritten() const = 0; virtual bool buffered() const = 0; @@ -98,7 +98,7 @@ class PARQUET_EXPORT RowGroupWriter { /// \brief Total bytes still compressed but not written by the page writer. /// It will always return 0 from the SerializedPageWriter. int64_t totalCompressedBytes() const; - /// \brief Total compressed bytes written by the page writer + /// \brief Total compressed bytes written by the page writer. int64_t totalCompressedBytesWritten() const; /// Returns whether the current RowGroupWriter is in the buffered mode and is /// created by calling ParquetFileWriter::appendBufferedRowGroup(). diff --git a/velox/dwio/parquet/writer/arrow/Properties.h b/velox/dwio/parquet/writer/arrow/Properties.h index 60cd17ff426..2b00324f92b 100644 --- a/velox/dwio/parquet/writer/arrow/Properties.h +++ b/velox/dwio/parquet/writer/arrow/Properties.h @@ -434,8 +434,8 @@ class PARQUET_EXPORT WriterProperties { /// \brief Define the encoding that is used when we don't utilise /// dictionary encoding. // - /// This either applies if dictionary encoding is disabled or if we - /// fallback because the dictionary grew too large. + /// This either applies if dictionary encoding is disabled or if we fallback + /// because the dictionary grew too large. Builder* encoding(Encoding::type encodingType) { if (encodingType == Encoding::kPlainDictionary || encodingType == Encoding::kRleDictionary) { @@ -450,8 +450,8 @@ class PARQUET_EXPORT WriterProperties { /// \brief Define the encoding that is used when we don't utilise /// dictionary encoding. // - /// This either applies if dictionary encoding is disabled or if we - /// fallback because the dictionary grew too large. + /// This either applies if dictionary encoding is disabled or if we fallback + /// because the dictionary grew too large. Builder* encoding(const std::string& path, Encoding::type encodingType) { if (encodingType == Encoding::kPlainDictionary || encodingType == Encoding::kRleDictionary) { @@ -466,8 +466,8 @@ class PARQUET_EXPORT WriterProperties { /// \brief Define the encoding that is used when we don't utilise /// dictionary encoding. // - /// This either applies if dictionary encoding is disabled or if we - /// fallback because the dictionary grew too large. + /// This either applies if dictionary encoding is disabled or if we fallback + /// because the dictionary grew too large. Builder* encoding( const std::shared_ptr& path, Encoding::type encodingType) { @@ -503,37 +503,37 @@ class PARQUET_EXPORT WriterProperties { return this->compression(path->toDotString(), codec); } - /// \brief Specify the default compression level for the compressor - /// in every column. In case a column does not have an explicitly - /// specified compression level, the default one would be used. + /// \brief Specify the default compression level for the compressor in + /// every column. In case a column does not have an explicitly specified + /// compression level, the default one would be used. /// - /// The provided compression level is compressor specific. The user - /// would have to familiarize oneself with the available levels for - /// the selected compressor. If the compressor does not allow for - /// selecting different compression levels, calling this function - /// would not have any effect. Parquet and Arrow do not validate the - /// passed compression level. If no level is selected by the user or - /// if the special std::numeric_limits::min() value is passed, - /// then Arrow selects the compression level. + /// The provided compression level is compressor specific. The user would + /// have to familiarize oneself with the available levels for the selected + /// compressor. If the compressor does not allow for selecting different + /// compression levels, calling this function would not have any effect. + /// Parquet and Arrow do not validate the passed compression level. If no + /// level is selected by the user or if the special + /// std::numeric_limits::min() value is passed, then Arrow selects the + /// compression level. /// - /// If other compressor-specific options need to be set in addition - /// to the compression level, use the codec_options method. + /// If other compressor-specific options need to be set in addition to the + /// compression level, use the codec_options method. Builder* compressionLevel(int compressionLevel) { defaultColumnProperties_.setCompressionLevel(compressionLevel); return this; } - /// \brief Specify a compression level for the compressor for the - /// column described by path. + /// \brief Specify a compression level for the compressor for the column + /// described by path. /// - /// The provided compression level is compressor specific. The user - /// would have to familiarize oneself with the available levels for - /// the selected compressor. If the compressor does not allow for - /// selecting different compression levels, calling this function - /// would not have any effect. Parquet and Arrow do not validate the - /// passed compression level. If no level is selected by the user or - /// if the special std::numeric_limits::min() value is passed, - /// then Arrow selects the compression level. + /// The provided compression level is compressor specific. The user would + /// have to familiarize oneself with the available levels for the selected + /// compressor. If the compressor does not allow for selecting different + /// compression levels, calling this function would not have any effect. + /// Parquet and Arrow do not validate the passed compression level. If no + /// level is selected by the user or if the special + /// std::numeric_limits::min() value is passed, then Arrow selects the + /// compression level. Builder* compressionLevel(const std::string& path, int compressionLevel) { if (!codecOptions_[path]) { codecOptions_[path] = std::make_shared(); @@ -542,17 +542,17 @@ class PARQUET_EXPORT WriterProperties { return this; } - /// \brief Specify a compression level for the compressor for the - /// column described by path. + /// \brief Specify a compression level for the compressor for the column + /// described by path. /// - /// The provided compression level is compressor specific. The user - /// would have to familiarize oneself with the available levels for - /// the selected compressor. If the compressor does not allow for - /// selecting different compression levels, calling this function - /// would not have any effect. Parquet and Arrow do not validate the - /// passed compression level. If no level is selected by the user or - /// if the special std::numeric_limits::min() value is passed, - /// then Arrow selects the compression level. + /// The provided compression level is compressor specific. The user would + /// have to familiarize oneself with the available levels for the selected + /// compressor. If the compressor does not allow for selecting different + /// compression levels, calling this function would not have any effect. + /// Parquet and Arrow do not validate the passed compression level. If no + /// level is selected by the user or if the special + /// std::numeric_limits::min() value is passed, then Arrow selects the + /// compression level. Builder* compressionLevel( const std::shared_ptr& path, int compressionLevel) { @@ -562,15 +562,15 @@ class PARQUET_EXPORT WriterProperties { /// \brief Specify the default codec options for the compressor in /// every column. /// - /// The codec options allow configuring the compression level as - /// well as other codec-specific options. + /// The codec options allow configuring the compression level as well + /// as other codec-specific options. Builder* codecOptions(const std::shared_ptr& codecOptions) { defaultColumnProperties_.setCodecOptions(codecOptions); return this; } - /// \brief Specify the codec options for the compressor for the - /// column described by path. + /// \brief Specify the codec options for the compressor for the column + /// described by path. Builder* codecOptions( const std::string& path, const std::shared_ptr& codecOptions) { @@ -578,8 +578,8 @@ class PARQUET_EXPORT WriterProperties { return this; } - /// \brief Specify the codec options for the compressor for the - /// column described by path. + /// \brief Specify the codec options for the compressor for the column + /// described by path. Builder* codecOptions( const std::shared_ptr& path, const std::shared_ptr& codecOptions) { @@ -646,38 +646,35 @@ class PARQUET_EXPORT WriterProperties { return this->disableStatistics(path->toDotString()); } - /// Allow decimals with 1 <= precision <= 18 to be stored as - /// integers. + /// Allow decimals with 1 <= precision <= 18 to be stored as integers. /// - /// In Parquet, DECIMAL can be stored in any of the following - /// physical types: + /// In Parquet, DECIMAL can be stored in any of the following physical + /// types: /// - Int32: For 1 <= precision <= 9. /// - Int64: For 10 <= precision <= 18. /// - Fixed_len_byte_array: Precision is limited by the array size. - /// Length n can store <= floor(log_10(2^(8*n - 1) - 1)) base-10 - /// digits. - /// - Binary: Precision is unlimited. The minimum number of bytes to - /// store + /// Length n can store <= floor(log_10(2^(8*n - 1) - 1)) base-10 digits. + /// - Binary: Precision is unlimited. The minimum number of bytes to store /// the unscaled value is used. /// /// By default, this is DISABLED and all decimal types annotate. /// Fixed_len_byte_array. /// - /// When enabled, the C++ writer will use following physical types - /// to store decimals: + /// When enabled, the C++ writer will use following physical types to store + /// decimals: /// - Int32: For 1 <= precision <= 9. /// - Int64: For 10 <= precision <= 18. /// - Fixed_len_byte_array: For precision > 18. /// - /// As a consequence, decimal columns stored in integer types are - /// more compact. + /// As a consequence, decimal columns stored in integer types are more + /// compact. Builder* enableStoreDecimalAsInteger() { storeDecimalAsInteger_ = true; return this; } - /// Disable decimal logical type with 1 <= precision <= 18 to be - /// stored as integer physical type. + /// Disable decimal logical type with 1 <= precision <= 18 to be stored as + /// integer physical type. /// /// Default disabled. Builder* disableStoreDecimalAsInteger() { @@ -685,13 +682,12 @@ class PARQUET_EXPORT WriterProperties { return this; } - /// Enable writing page index in general for all columns. Default - /// disabled. + /// Enable writing page index in general for all columns. Default disabled. /// - /// Writing statistics to the page index disables the old method of - /// writing statistics to each data page header. The page index - /// makes filtering more efficient than the page header, as it - /// gathers all the statistics for a Parquet file in a single place, + /// Writing statistics to the page index disables the old method of writing + /// statistics to each data page header. + /// The page index makes filtering more efficient than the page header, as + /// it gathers all the statistics for a Parquet file in a single place, /// avoiding scattered I/O. /// /// Please check the link below for more details: @@ -701,8 +697,7 @@ class PARQUET_EXPORT WriterProperties { return this; } - /// Disable writing page index in general for all columns. Default - /// disabled. + /// Disable writing page index in general for all columns. Default disabled. Builder* disableWritePageIndex() { defaultColumnProperties_.setPageIndexEnabled(false); return this; @@ -795,8 +790,7 @@ class PARQUET_EXPORT WriterProperties { // If empty, there is no sorting columns. std::vector sortingColumns_; - // Settings used for each column unless overridden in any of the - // maps below. + // Settings used for each column unless overridden in any of the maps below. ColumnProperties defaultColumnProperties_; std::unordered_map encodings_; std::unordered_map codecs_; @@ -1010,8 +1004,7 @@ class PARQUET_EXPORT ArrowReaderProperties { cacheOptions_(::arrow::io::CacheOptions::Defaults()), coerceInt96TimestampUnit_(::arrow::TimeUnit::NANO) {} - /// \brief Set whether to use the IO thread pool to parse columns in - /// parallel. + /// \brief Set whether to use the IO thread pool to parse columns in parallel. /// /// Default is false. void setUseThreads(bool useThreads) { @@ -1022,8 +1015,7 @@ class PARQUET_EXPORT ArrowReaderProperties { return useThreads_; } - /// \brief Set whether to read a particular column as dictionary - /// encoded. + /// \brief Set whether to read a particular column as dictionary encoded. /// /// If the file metadata contains a serialized Arrow schema, then this /// is only supported for columns with a Parquet physical type of @@ -1060,9 +1052,9 @@ class PARQUET_EXPORT ArrowReaderProperties { /// Enable read coalescing (default false). /// - /// When enabled, the Arrow reader will pre-buffer necessary regions. - /// Of the file in-memory. This is intended to improve performance on. - /// High-latency filesystems (e.g. Amazon S3). + /// When enabled, the Arrow reader will pre-buffer necessary regions + /// of the file in-memory. This is intended to improve performance on + /// high-latency filesystems (e.g. Amazon S3). void setPreBuffer(bool preBuffer) { preBuffer_ = preBuffer; } @@ -1071,8 +1063,8 @@ class PARQUET_EXPORT ArrowReaderProperties { return preBuffer_; } - /// Set options for read coalescing. This can be used to tune the. - /// Implementation for characteristics of different filesystems. + /// Set options for read coalescing. This can be used to tune the + /// implementation for characteristics of different filesystems. void setCacheOptions(::arrow::io::CacheOptions options) { cacheOptions_ = options; } @@ -1134,18 +1126,16 @@ class PARQUET_EXPORT ArrowWriterProperties { executor_(NULLPTR) {} virtual ~Builder() = default; - /// \brief Disable writing legacy int96 timestamps (default - /// disabled). + /// \brief Disable writing legacy int96 timestamps (default disabled). Builder* disableDeprecatedInt96Timestamps() { writeTimestampsAsInt96_ = false; return this; } - /// \brief Enable writing legacy int96 timestamps (default - /// disabled). + /// \brief Enable writing legacy int96 timestamps (default disabled). /// - /// May be turned on to write timestamps compatible with older - /// Parquet writers. This takes precedent over coerceTimestamps. + /// May be turned on to write timestamps compatible with older Parquet + /// writers. This takes precedent over coerceTimestamps. Builder* enableDeprecatedInt96Timestamps() { writeTimestampsAsInt96_ = true; return this; @@ -1153,8 +1143,8 @@ class PARQUET_EXPORT ArrowWriterProperties { /// \brief Coerce all timestamps to the specified time unit. /// \param unit time unit to truncate to. - /// For Parquet versions 1.0 and 2.4, nanoseconds are casted to. - /// Microseconds. + /// For Parquet versions 1.0 and 2.4, nanoseconds are casted to + /// microseconds. Builder* coerceTimestamps(::arrow::TimeUnit::type unit) { coerceTimestampsEnabled_ = true; coerceTimestampsUnit_ = unit; @@ -1169,27 +1159,25 @@ class PARQUET_EXPORT ArrowWriterProperties { return this; } - /// \brief Disallow loss of data when truncating timestamps - /// (default). + /// \brief Disallow loss of data when truncating timestamps (default). Builder* disallowTruncatedTimestamps() { truncatedTimestampsAllowed_ = false; return this; } - /// \brief EXPERIMENTAL: Write binary serialized Arrow schema to the - /// file, to enable certain read options (like "read_dictionary") to - /// be set automatically. + /// \brief EXPERIMENTAL: Write binary serialized Arrow schema to the file, + /// to enable certain read options (like "read_dictionary") to be set + /// automatically. Builder* storeSchema() { storeSchema_ = true; return this; } - /// \brief When enabled, will not preserve Arrow field names for - /// list types. + /// \brief When enabled, will not preserve Arrow field names for list types. /// - /// Instead of using the field names Arrow uses for the values array - /// of. List types (default "item"), will use "element", as is - /// specified in. The Parquet spec. + /// Instead of using the field names Arrow uses for the values array of + /// list types (default "item"), will use "element", as is specified in + /// the Parquet spec. /// /// This is enabled by default. Builder* enableCompliantNestedTypes() { @@ -1209,12 +1197,12 @@ class PARQUET_EXPORT ArrowWriterProperties { return this; } - /// \brief Set whether to use multiple threads to write columns. - /// In parallel in the buffered row group mode. + /// \brief Set whether to use multiple threads to write columns + /// in parallel in the buffered row group mode. /// - /// WARNING: If writing multiple files in parallel in the same. - /// Executor, deadlock may occur if use_threads is true. Please. - /// Disable it in this case. + /// WARNING: If writing multiple files in parallel in the same + /// executor, deadlock may occur if use_threads is true. Please + /// disable it in this case. /// /// Default is false. Builder* setUseThreads(bool useThreads) { @@ -1222,8 +1210,8 @@ class PARQUET_EXPORT ArrowWriterProperties { return this; } - /// \brief Set the executor to write columns in parallel in the. - /// Buffered row group mode. + /// \brief Set the executor to write columns in parallel in the + /// buffered row group mode. /// /// Default is nullptr and the default cpu executor will be used. Builder* setExecutor(::arrow::internal::Executor* executor) { @@ -1279,27 +1267,25 @@ class PARQUET_EXPORT ArrowWriterProperties { return storeSchema_; } - /// \brief Enable nested type naming according to the parquet - /// specification. + /// \brief Enable nested type naming according to the parquet specification. /// - /// Older versions of arrow wrote out field names for nested lists - /// based on the name of the field. According to the Parquet - /// specification they should always be "element". + /// Older versions of arrow wrote out field names for nested lists based on + /// the name of the field. According to the Parquet specification they + /// should always be "element". bool compliantNestedTypes() const { return compliantNestedTypes_; } - /// \brief The underlying engine version to use when writing Arrow - /// data. + /// \brief The underlying engine version to use when writing Arrow data. /// - /// V2 is currently the latest V1 is considered deprecated but left - /// in. Place in case there are bugs detected in V2. + /// V2 is currently the latest. V1 is considered deprecated but left in + /// place in case there are bugs detected in V2. EngineVersion engineVersion() const { return engineVersion_; } - /// \brief Returns whether the writer will use multiple threads. - /// To write columns in parallel in the buffered row group mode. + /// \brief Returns whether the writer will use multiple threads + /// to write columns in parallel in the buffered row group mode. bool useThreads() const { return useThreads_; } @@ -1339,8 +1325,8 @@ class PARQUET_EXPORT ArrowWriterProperties { ::arrow::internal::Executor* executor_; }; -/// \brief State object used for writing Arrow data directly to a -/// Parquet. Column chunk. API possibly not stable. +/// \brief State object used for writing Arrow data directly to a Parquet +/// column chunk. API possibly not stable. struct ArrowWriteContext { ArrowWriteContext(MemoryPool* memoryPool, ArrowWriterProperties* properties) : memoryPool(memoryPool), @@ -1358,8 +1344,8 @@ struct ArrowWriteContext { MemoryPool* memoryPool; const ArrowWriterProperties* properties; - // Buffer used for storing the data of an array converted to the - // physical type. As expected by parquet-cpp. + // Buffer used for storing the data of an array converted to the physical + // type. As expected by parquet-cpp. std::shared_ptr dataBuffer; // We use the shared ownership of this buffer.