-
Notifications
You must be signed in to change notification settings - Fork 3.4k
Python: Compute parquet stats #7831
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
+1,318
−0
Merged
Changes from 2 commits
Commits
Show all changes
54 commits
Select commit
Hold shift + click to select a range
e16d7d4
Add function to compute parquet file metadata
maxdebayser 96adb31
Addition of docstring and extra parameter to avoid reading the file
maxdebayser ce8a5df
Refactor the statistics computation entirely to use pyarrow metadata
maxdebayser 1be86e7
Merge remote-tracking branch 'iceberg/master' into compute_parquet_stats
maxdebayser e6c3f94
Appease pre-commit hooks
maxdebayser e4e0b2b
Fix temporary path
maxdebayser a5f4ef9
Merge remote-tracking branch 'iceberg/master' into compute_parquet_stats
maxdebayser ac23783
Merge remote-tracking branch 'iceberg/master' into compute_parquet_stats
maxdebayser ed27875
Make the metrics mode configurable as documented here: https://iceber…
maxdebayser de46bef
Initialize binary serializers only once
maxdebayser 5ae5b2e
Log arrow not implemented exception
maxdebayser 33218eb
Fix None comparison expression
maxdebayser 4975e99
Add map column to test data
maxdebayser 98c93ca
Moving pyarrow specific code to io.pyarrow
maxdebayser a480539
type annotation
maxdebayser a0f44d5
Refactor the stats collection using the pyarrow visitor
maxdebayser 3e738fe
Merge remote-tracking branch 'iceberg/master' into compute_parquet_stats
maxdebayser 1d5cbbf
Clean redundant code and add warning message to the log
maxdebayser f2f001e
Merge remote-tracking branch 'iceberg/master' into compute_parquet_stats
maxdebayser dc34698
Address some of the review comments
maxdebayser e233f54
Merge branch 'master' of https://github.com/apache/iceberg into compu…
maxdebayser 8dda3fa
Merge branch 'master' of https://github.com/apache/iceberg into compu…
maxdebayser 820938a
Add tests to check of the number of columns found by the statistics
maxdebayser 9e114c8
We don't want to truncate numeric data types
maxdebayser e7a6fb8
Verify match of Iceberg types with Parquet physical types
maxdebayser 8ad7f3f
Merge branch 'master' of https://github.com/apache/iceberg into compu…
maxdebayser c965a3e
Fix truncation of upper bounds
maxdebayser 1ba46d6
Merge branch 'master' of https://github.com/apache/iceberg into compu…
maxdebayser 44dbb0c
Transform asserts to ValueErrors
maxdebayser 74a3d6a
Merge branch 'master' of https://github.com/apache/iceberg into compu…
maxdebayser cdc6eb8
Merge branch 'master' of https://github.com/apache/iceberg into compu…
maxdebayser 5b4c2f2
Add review suggestions
maxdebayser ec5fcaa
Merge branch 'master' of https://github.com/apache/iceberg into compu…
maxdebayser 4ee5036
Address simple code style review comments
maxdebayser 45abc6d
Fix potential null write
maxdebayser 7ee1ef0
Merge branch 'master' of https://github.com/apache/iceberg into compu…
maxdebayser 5898f3f
Apply function name refactoring
maxdebayser e7edf0b
Move pyarrow statistics tests to a new file
maxdebayser 6f7bd98
Disable stats computation for nested types
maxdebayser 05579ff
Modularize the fill_parquet_file_metadata function
maxdebayser aae1118
Allow metrics modes to have extra whitespace but not other trailing
maxdebayser 11b5d3a
Move upper bound truncation logic to another file
maxdebayser 4332a95
Be defensive with regards to missing row group statistics
maxdebayser 09c5955
Add tests for structs
maxdebayser c131b58
Merge branch 'master' of https://github.com/apache/iceberg into compu…
maxdebayser 5e01924
Remove special treatment of UUIDType
maxdebayser 7f768eb
Merge branch 'master' of https://github.com/apache/iceberg into compu…
maxdebayser be70fd5
Merge branch 'master' of https://github.com/apache/iceberg into compu…
maxdebayser 0be438e
Rely on parquet column path rather than column order
maxdebayser 8226a01
Merge branch 'master' of https://github.com/apache/iceberg into compu…
maxdebayser 867ea80
Merge branch 'master' of https://github.com/apache/iceberg into compu…
maxdebayser ebb604a
Change mood to imperative to appease linter
maxdebayser 640f885
Merge branch 'master' of https://github.com/apache/iceberg into compu…
maxdebayser acf6d4f
Factor out the logic to obtain the current table schema
maxdebayser File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,164 @@ | ||
| # Licensed to the Apache Software Foundation (ASF) under one | ||
| # or more contributor license agreements. See the NOTICE file | ||
| # distributed with this work for additional information | ||
| # regarding copyright ownership. The ASF licenses this file | ||
| # to you under the Apache License, Version 2.0 (the | ||
| # "License"); you may not use this file except in compliance | ||
| # with the License. You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, | ||
| # software distributed under the License is distributed on an | ||
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| # KIND, either express or implied. See the License for the | ||
| # specific language governing permissions and limitations | ||
| # under the License. | ||
|
|
||
| from pyiceberg.manifest import DataFile, FileFormat | ||
| import pyarrow.parquet as pq | ||
| import pyarrow.compute as pc | ||
| import pyarrow as pa | ||
| import struct | ||
| import datetime | ||
|
|
||
| BOUND_TRUNCATED_LENGHT = 16 | ||
|
|
||
| # Serialization rules: https://iceberg.apache.org/spec/#binary-single-value-serialization | ||
| # | ||
| # Type Binary serialization | ||
| # boolean 0x00 for false, non-zero byte for true | ||
| # int Stored as 4-byte little-endian | ||
| # long Stored as 8-byte little-endian | ||
| # float Stored as 4-byte little-endian | ||
| # double Stored as 8-byte little-endian | ||
| # date Stores days from the 1970-01-01 in an 4-byte little-endian int | ||
| # time Stores microseconds from midnight in an 8-byte little-endian long | ||
| # timestamp without zone Stores microseconds from 1970-01-01 00:00:00.000000 in an 8-byte little-endian long | ||
| # timestamp with zone Stores microseconds from 1970-01-01 00:00:00.000000 UTC in an 8-byte little-endian long | ||
| # string UTF-8 bytes (without length) | ||
| # uuid 16-byte big-endian value, see example in Appendix B | ||
| # fixed(L) Binary value | ||
| # binary Binary value (without length) | ||
| # | ||
| def serialize_to_binary(scalar: pa.Scalar) -> bytes: | ||
|
maxdebayser marked this conversation as resolved.
Outdated
|
||
| value = scalar.as_py() | ||
| if isinstance(scalar, pa.BooleanScalar): | ||
| return struct.pack('?', value) | ||
|
maxdebayser marked this conversation as resolved.
Outdated
|
||
| elif isinstance(scalar, (pa.Int8Scalar, pa.UInt8Scalar)): | ||
| return struct.pack('<b', value) | ||
| elif isinstance(scalar, (pa.Int16Scalar, pa.UInt16Scalar)): | ||
| return struct.pack('<h', value) | ||
| elif isinstance(scalar, (pa.Int32Scalar, pa.UInt32Scalar)): | ||
| return struct.pack('<i', value) | ||
| elif isinstance(scalar, (pa.Int64Scalar, pa.UInt64Scalar)): | ||
| return struct.pack('<q', value) | ||
| elif isinstance(scalar, pa.FloatScalar): | ||
| return struct.pack('<f', value) | ||
| elif isinstance(scalar, pa.DoubleScalar): | ||
| return struct.pack('<d', value) | ||
| elif isinstance(scalar, pa.StringScalar): | ||
| return value.encode('utf-8') | ||
| elif isinstance(scalar, pa.BinaryScalar): | ||
| return value | ||
| elif isinstance(scalar, (pa.Date32Scalar, pa.Date64Scalar)): | ||
| epoch = datetime.date(1970, 1, 1) | ||
| days = (value - epoch).days | ||
| return struct.pack('<i', days) | ||
| elif isinstance(scalar, (pa.Time32Scalar, pa.Time64Scalar)): | ||
| microseconds = int(value.hour * 60 * 60 * 1e6 + | ||
| value.minute * 60 * 1e6 + | ||
| value.second * 1e6 + | ||
| value.microsecond) | ||
| return struct.pack('<q', microseconds) | ||
| elif isinstance(scalar, pa.TimestampScalar): | ||
| epoch = datetime.datetime(1970, 1, 1) | ||
| microseconds = int((value - epoch).total_seconds() * 1e6) | ||
| return struct.pack('<q', microseconds) | ||
| else: | ||
| raise TypeError('Unsupported type: {}'.format(type(scalar))) | ||
|
|
||
|
|
||
| def fill_parquet_file_metadata(df: DataFile, file_object: pa.NativeFile, table: pa.Table = None) -> None: | ||
| """ | ||
| Computes and fills the following fields of the DataFile object: | ||
|
|
||
| - file_format | ||
| - record_count | ||
| - file_size_in_bytes | ||
| - column_sizes | ||
| - value_counts | ||
| - null_value_counts | ||
| - nan_value_counts | ||
| - lower_bounds | ||
| - upper_bounds | ||
| - split_offsets | ||
|
|
||
| Args: | ||
| df (DataFile): A DataFile object representing the Parquet file for which metadata is to be filled. | ||
| file_object (pa.NativeFile): A pyarrow NativeFile object pointing to the location where the | ||
| Parquet file is stored. | ||
| table (pa.Table, optional): If the metadata is computed while writing a pyarrow Table to parquet | ||
| the table can be passed to compute the column statistics. If absent the table will be read | ||
| from file_object using pyarrow.parquet.read_table. | ||
| """ | ||
|
|
||
| parquet_file = pq.ParquetFile(file_object) | ||
|
maxdebayser marked this conversation as resolved.
Outdated
|
||
| metadata = parquet_file.metadata | ||
|
|
||
| column_sizes = {} | ||
| value_counts = {} | ||
|
|
||
| for r in range(metadata.num_row_groups): | ||
| for c in range(metadata.num_columns): | ||
| column_sizes[c+1] = column_sizes.get(c+1, 0) + metadata.row_group(r).column(c).total_compressed_size | ||
|
maxdebayser marked this conversation as resolved.
Outdated
|
||
| value_counts[c+1] = value_counts.get(c+1, 0) + metadata.row_group(r).column(c).num_values | ||
|
|
||
|
|
||
| # References: | ||
| # https://github.com/apache/iceberg/blob/fc381a81a1fdb8f51a0637ca27cd30673bd7aad3/parquet/src/main/java/org/apache/iceberg/parquet/ParquetUtil.java#L232 | ||
| # https://github.com/apache/parquet-mr/blob/ac29db4611f86a07cc6877b416aa4b183e09b353/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java#L184 | ||
| split_offsets = [] | ||
| for r in range(metadata.num_row_groups): | ||
| data_offset = metadata.row_group(r).column(0).data_page_offset | ||
| dictionary_offset = metadata.row_group(r).column(0).dictionary_page_offset | ||
| if metadata.row_group(r).column(0).has_dictionary_page and dictionary_offset < data_offset: | ||
| split_offsets.append(dictionary_offset) | ||
| else: | ||
| split_offsets.append(data_offset) | ||
|
|
||
| split_offsets.sort() | ||
|
|
||
| if table is None: | ||
| table = pa.parquet.read_table(file_object) | ||
|
|
||
| null_value_counts = {} | ||
| nan_value_counts = {} | ||
| lower_bounds = {} | ||
| upper_bounds = {} | ||
|
|
||
| for c in range(metadata.num_columns): | ||
| null_value_counts[c+1] = table.filter(pc.field(c).is_null(nan_is_null=False)).num_rows | ||
|
maxdebayser marked this conversation as resolved.
Outdated
|
||
| nan_value_counts[c+1] = table.filter(pc.field(c).is_null(nan_is_null=True)).num_rows - null_value_counts[c+1] | ||
|
|
||
| try: | ||
| lower = pc.min(table[c]) | ||
| upper = pc.max(table[c]) | ||
|
|
||
| lower_bounds[c+1] = serialize_to_binary(lower)[:BOUND_TRUNCATED_LENGHT] | ||
| upper_bounds[c+1] = serialize_to_binary(upper)[:BOUND_TRUNCATED_LENGHT] | ||
| except pa.lib.ArrowNotImplementedError: | ||
| # skip bound detection for types such as lists | ||
| pass | ||
|
|
||
|
|
||
| df.file_format = FileFormat.PARQUET | ||
| df.record_count = parquet_file.metadata.num_rows | ||
| df.file_size_in_bytes = file_object.size() | ||
| df.column_sizes = column_sizes | ||
| df.value_counts = value_counts | ||
| df.null_value_counts = null_value_counts | ||
| df.nan_value_counts = nan_value_counts | ||
| df.lower_bounds = lower_bounds | ||
| df.upper_bounds = upper_bounds | ||
| df.split_offsets = split_offsets | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,199 @@ | ||
| # Licensed to the Apache Software Foundation (ASF) under one | ||
| # or more contributor license agreements. See the NOTICE file | ||
| # distributed with this work for additional information | ||
| # regarding copyright ownership. The ASF licenses this file | ||
| # to you under the Apache License, Version 2.0 (the | ||
| # "License"); you may not use this file except in compliance | ||
| # with the License. You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, | ||
| # software distributed under the License is distributed on an | ||
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| # KIND, either express or implied. See the License for the | ||
| # specific language governing permissions and limitations | ||
| # under the License. | ||
|
|
||
|
|
||
| import pyarrow as pa | ||
| import pyarrow.parquet as pq | ||
| import math | ||
|
|
||
| from pyiceberg.utils.file_stats import fill_parquet_file_metadata, serialize_to_binary, BOUND_TRUNCATED_LENGHT | ||
| from pyiceberg.manifest import DataFile | ||
| import datetime | ||
| import binascii | ||
| import struct | ||
|
|
||
| def test_boolean_scalar(): | ||
| scalar1 = pa.scalar(True) | ||
| assert serialize_to_binary(scalar1) == struct.pack('?', True) | ||
|
|
||
| scalar2 = pa.scalar(False) | ||
| assert serialize_to_binary(scalar2) == struct.pack('?', False) | ||
|
|
||
| def test_int8_scalar(): | ||
| scalar = pa.scalar(123, type=pa.int8()) | ||
| assert serialize_to_binary(scalar) == struct.pack('<b', 123) | ||
|
|
||
| def test_int32_scalar(): | ||
| scalar = pa.scalar(123456789, type=pa.int32()) | ||
| assert serialize_to_binary(scalar) == struct.pack('<i', 123456789) | ||
|
|
||
| def test_int64_scalar(): | ||
| scalar = pa.scalar(1234567891011121314, type=pa.int64()) | ||
| assert serialize_to_binary(scalar) == struct.pack('<q', 1234567891011121314) | ||
|
|
||
| def test_float_scalar(): | ||
| scalar = pa.scalar(123.456, type=pa.float32()) | ||
| assert serialize_to_binary(scalar) == struct.pack('<f', 123.456) | ||
|
|
||
| def test_double_scalar(): | ||
| scalar = pa.scalar(123.456, type=pa.float64()) | ||
| assert serialize_to_binary(scalar) == struct.pack('<d', 123.456) | ||
|
|
||
| def test_string_scalar(): | ||
| scalar = pa.scalar('abc') | ||
| assert serialize_to_binary(scalar) == 'abc'.encode() | ||
|
|
||
| def test_date32_scalar(): | ||
| scalar = pa.scalar(datetime.date(1970, 1, 2), type=pa.date32()) | ||
| reference = (datetime.date(1970, 1, 2) - datetime.date(1970, 1, 1)).days | ||
| assert serialize_to_binary(scalar) == struct.pack('<i', reference) | ||
|
|
||
| def test_time32_scalar(): | ||
| scalar = pa.scalar(datetime.time(1, 2, 3, 456000), type=pa.time64('us')) | ||
| assert serialize_to_binary(scalar) == struct.pack('<q', int(1*60*60*1e6 + 2*60*1e6 + 3* 1e6 + 456000)) | ||
|
|
||
| def test_timestamp_scalar(): | ||
| scalar = pa.scalar(datetime.datetime(2023, 1, 1, 1, 2, 3), type=pa.timestamp('us')) | ||
| reference = int((datetime.datetime(2023, 1, 1, 1,2,3) - datetime.datetime(1970, 1, 1, 0,0,0)).total_seconds()*1e6) | ||
| assert serialize_to_binary(scalar) == struct.pack('<q', reference) | ||
|
|
||
| def construct_test_table() -> pa.Buffer: | ||
|
|
||
| schema = pa.schema([ | ||
| pa.field("strings", pa.string()), | ||
| pa.field('floats', pa.float64()), | ||
| pa.field('list', pa.list_(pa.int64())) | ||
| ]) | ||
|
|
||
| _strings = [ | ||
| "zzzzzzzzzzzzzzzzzzzz", | ||
| "rrrrrrrrrrrrrrrrrrrr", | ||
| None, | ||
| "aaaaaaaaaaaaaaaaaaaa" | ||
| ] | ||
|
|
||
| _floats = [ | ||
| 3.14, | ||
| math.nan, | ||
| 1.69, | ||
| 100 | ||
| ] | ||
|
|
||
| _list = [ | ||
| [ 1, 2, 3], | ||
| [ 4, 5, 6], | ||
| None, | ||
| [ 7, 8, 9] | ||
| ] | ||
|
|
||
| table = pa.Table.from_pydict({"strings": _strings, "floats": _floats, "list": _list}, schema=schema) | ||
| f = pa.BufferOutputStream() | ||
|
|
||
| pq.write_table(table, f) | ||
|
|
||
| return f.getvalue() | ||
|
|
||
| def test_record_count() -> None: | ||
|
|
||
| file_obj = pa.BufferReader(construct_test_table()) | ||
|
|
||
| datafile = DataFile() | ||
| fill_parquet_file_metadata(datafile, file_obj) | ||
|
|
||
| assert datafile.record_count == 4 | ||
|
|
||
|
|
||
| def test_file_size() -> None: | ||
|
|
||
| file_obj = pa.BufferReader(construct_test_table()) | ||
|
|
||
| datafile = DataFile() | ||
| fill_parquet_file_metadata(datafile, file_obj) | ||
|
|
||
| assert datafile.file_size_in_bytes == 1558 | ||
|
|
||
|
|
||
| def test_value_counts() -> None: | ||
|
|
||
| file_obj = pa.BufferReader(construct_test_table()) | ||
|
|
||
| datafile = DataFile() | ||
| fill_parquet_file_metadata(datafile, file_obj) | ||
|
|
||
| assert len(datafile.value_counts) == 3 | ||
| assert datafile.value_counts[1] == 4 | ||
| assert datafile.value_counts[2] == 4 | ||
| assert datafile.value_counts[3] == 10 # 3 lists with 3 items and a None value | ||
|
|
||
|
|
||
| def test_column_sizes() -> None: | ||
|
|
||
| file_obj = pa.BufferReader(construct_test_table()) | ||
|
|
||
| datafile = DataFile() | ||
| fill_parquet_file_metadata(datafile, file_obj) | ||
|
|
||
| assert len(datafile.column_sizes) == 3 | ||
| # these values are an artifact of how the write_table encodes the columns | ||
| assert datafile.column_sizes[1] == 116 | ||
| assert datafile.column_sizes[2] == 119 | ||
| assert datafile.column_sizes[3] == 151 | ||
|
|
||
|
|
||
| def test_null_and_nan_counts() -> None: | ||
|
|
||
| file_obj = pa.BufferReader(construct_test_table()) | ||
|
|
||
| datafile = DataFile() | ||
| fill_parquet_file_metadata(datafile, file_obj) | ||
|
|
||
| assert len(datafile.null_value_counts) == 3 | ||
| assert datafile.null_value_counts[1] == 1 | ||
| assert datafile.null_value_counts[2] == 0 | ||
| assert datafile.null_value_counts[3] == 1 | ||
|
|
||
| assert len(datafile.nan_value_counts) == 3 | ||
| assert datafile.nan_value_counts[1] == 0 | ||
| assert datafile.nan_value_counts[2] == 1 | ||
| assert datafile.nan_value_counts[3] == 0 | ||
|
|
||
|
|
||
| def test_bounds() -> None: | ||
|
|
||
| file_obj = pa.BufferReader(construct_test_table()) | ||
|
|
||
| datafile = DataFile() | ||
| fill_parquet_file_metadata(datafile, file_obj) | ||
|
|
||
| assert len(datafile.lower_bounds) == 2 | ||
| assert datafile.lower_bounds[1].decode() == "aaaaaaaaaaaaaaaaaaaa"[:BOUND_TRUNCATED_LENGHT] | ||
| assert datafile.lower_bounds[2] == struct.pack('<d', 1.69) | ||
|
|
||
| assert len(datafile.upper_bounds) == 2 | ||
| assert datafile.upper_bounds[1].decode() == "zzzzzzzzzzzzzzzzzzzz"[:BOUND_TRUNCATED_LENGHT] | ||
| assert datafile.upper_bounds[2] == struct.pack('<d', 100) | ||
|
|
||
|
|
||
| def test_offsets() -> None: | ||
|
|
||
| file_obj = pa.BufferReader(construct_test_table()) | ||
|
|
||
| datafile = DataFile() | ||
| fill_parquet_file_metadata(datafile, file_obj) | ||
|
|
||
| assert len(datafile.split_offsets) == 1 | ||
| assert datafile.split_offsets[0] == 4 |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.