Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -570,12 +570,25 @@ public PqVariant getVariant(int projectedIndex) {
private static final byte[] VARIANT_NULL_VALUE = new byte[] { 0x00 };

private PqVariant createVariant(TopLevelFieldMap.FieldDesc.Variant desc) {
// Variant bytes are decoded lazily here, so a malformed `metadata`/`value`
// surfaces as a decode error from deep inside the variant codec. Enrich it
// (and the local projection checks) with the originating file name, matching
// the other accessors in this view.
try {
return createVariantInternal(desc);
}
catch (RuntimeException e) {
throw ExceptionContext.addFileContext(currentFileName, e);
}
}

private PqVariant createVariantInternal(TopLevelFieldMap.FieldDesc.Variant desc) {
if (isVariantNull(desc)) {
return null;
}
if (desc.metadataCol() < 0) {
throw new IllegalStateException(prefix()
+ "Variant column '" + desc.schema().name() + "' requires its 'metadata' child in the projection");
throw new IllegalStateException(
"Variant column '" + desc.schema().name() + "' requires its 'metadata' child in the projection");
}
int metaIdx = cachedValueIndex[desc.metadataCol()];
byte[] metadataBytes = batchIndex.getBinary(desc.metadataCol(), metaIdx);
Expand All @@ -597,8 +610,8 @@ private PqVariant createVariant(TopLevelFieldMap.FieldDesc.Variant desc) {
// Unshredded: the raw value bytes ARE the canonical value bytes.
int valueCol = desc.valueCol();
if (valueCol < 0) {
throw new IllegalStateException(prefix()
+ "Variant column '" + desc.schema().name() + "' requires its 'value' child in the projection");
throw new IllegalStateException(
"Variant column '" + desc.schema().name() + "' requires its 'value' child in the projection");
}
int valIdx = cachedValueIndex[valueCol];
byte[] value = batchIndex.getBinary(valueCol, valIdx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
import dev.hardwood.row.VariantType;

/// [PqVariantObject] implementation. Caches the parsed [ObjectLayout] so
/// repeated field accesses don't re-walk the header; field-id-to-index lookup
/// uses binary search over the object's id array (ids are guaranteed sorted
/// ascending per the Variant spec).
/// repeated field accesses don't re-walk the header. Field lookup resolves the
/// name to its dictionary id once, then scans the object's id array linearly:
/// the ids are ordered by field name (not numerically), so a numeric binary
/// search would be incorrect unless the metadata dictionary is itself sorted.
/// See [#indexOf(String)].
final class PqVariantObjectImpl implements PqVariantObject {

private final VariantMetadata metadata;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,23 @@
/// encoding. All multi-byte integers in the encoding are unsigned little-endian
/// unless otherwise noted.
///
/// @see <a href="https://github.com/apache/parquet-format/blob/master/VariantEncoding.md">Parquet Variant Encoding</a>
/// @see <a href="https://parquet.apache.org/docs/file-format/types/variantencoding/">Parquet Variant Encoding</a>
public final class VariantBinary {

private VariantBinary() {}

// ==================== Metadata header byte ====================
// bit 0-3: version (must be 1)
// bit 4: sorted_strings flag
// bit 5-6: offset_size_minus_one (0..3 → 1..4 bytes)
// bit 7: unused
// bit 5: unused

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

noice noice, i was confused as it's not specified in the docs, but found the correction here apache/parquet-format#574

// bit 6-7: offset_size_minus_one (0..3 → 1..4 bytes)
// Bit layout per the Variant spec's "Metadata encoding" section:
// https://parquet.apache.org/docs/file-format/types/variantencoding/#metadata-encoding

public static final int METADATA_VERSION = 1;
public static final int METADATA_VERSION_MASK = 0x0F;
public static final int METADATA_SORTED_MASK = 0x10;
public static final int METADATA_OFFSET_SIZE_SHIFT = 5;
public static final int METADATA_OFFSET_SIZE_SHIFT = 6;
public static final int METADATA_OFFSET_SIZE_MASK = 0x03;

// ==================== Value header byte — basic_type ====================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
/// field id or by name.
///
/// Layout:
/// - 1-byte header: version (bit 0-3) | sorted_strings (bit 4) | offset_size_minus_one (bit 5-6)
/// - 1-byte header: version (bit 0-3) | sorted_strings (bit 4) | offset_size_minus_one (bit 6-7)
/// - `offset_size` bytes: dictionary_size (unsigned LE)
/// - `(dictionary_size + 1) * offset_size` bytes: offsets into the strings section (unsigned LE)
/// - `offset_size` bytes of the last offset define the total byte length
Expand Down Expand Up @@ -48,6 +48,11 @@ public VariantMetadata(byte[] buf) {
throw new IllegalArgumentException("Variant metadata buffer truncated before dictionary_size");
}
this.dictionarySize = VariantBinary.readUnsignedLE(buf, headerEnd, offsetSize);
// Reject an out-of-range dictionary_size before it feeds later arithmetic.
if (dictionarySize < 0) {
throw new IllegalArgumentException(
"Variant metadata dictionary_size is not a valid unsigned int: " + dictionarySize);
}
this.offsetsStart = headerEnd + offsetSize;
int offsetsLen = (dictionarySize + 1) * offsetSize;
if (buf.length < offsetsStart + offsetsLen) {
Expand Down
43 changes: 43 additions & 0 deletions core/src/test/java/dev/hardwood/VariantLogicalTypeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
import dev.hardwood.reader.ParquetFileReader;
import dev.hardwood.reader.RowReader;
import dev.hardwood.row.PqVariant;
import dev.hardwood.row.PqVariantObject;
import dev.hardwood.row.VariantType;
import dev.hardwood.schema.FileSchema;
import dev.hardwood.schema.SchemaNode;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/// End-to-end recognition of the Parquet `VARIANT` logical-type annotation.
/// Reads the generated `variant_test.parquet` fixture and asserts that:
Expand All @@ -36,6 +38,10 @@
class VariantLogicalTypeTest {

private static final Path FILE = Paths.get("src/test/resources/variant_test.parquet");
private static final Path FILE_OFFSET_SIZE2 =
Paths.get("src/test/resources/variant_metadata_offset_size2.parquet");
private static final Path FILE_BAD_METADATA =
Paths.get("src/test/resources/variant_negative_dict_size.parquet");

@Test
void schemaReportsVariantLogicalType() throws IOException {
Expand Down Expand Up @@ -82,4 +88,41 @@ void rowReaderSurfacesVariantBytes() throws IOException {
assertThat(rowReader.hasNext()).isFalse();
}
}

/// Reads a real fixture whose Variant metadata dictionary is large enough to
/// require `offset_size = 2`, then resolves the object's two fields through
/// the VARIANT row API.
@Test
void readsVariantWhoseMetadataUsesOffsetSizeTwo() throws IOException {
String k0 = "a".repeat(160);
String k1 = "b".repeat(160);
try (ParquetFileReader fileReader = ParquetFileReader.open(InputFile.of(FILE_OFFSET_SIZE2));
RowReader rowReader = fileReader.rowReader()) {
rowReader.next();
PqVariant v = rowReader.getVariant("var");
assertThat(v).isNotNull();
assertThat(v.type()).isEqualTo(VariantType.OBJECT);

PqVariantObject obj = v.asObject();
assertThat(obj.getFieldCount()).isEqualTo(2);
assertThat(obj.getInt(k0)).isEqualTo(5);
assertThat(obj.getBoolean(k1)).isTrue();

assertThat(rowReader.hasNext()).isFalse();
}
}

/// A malformed-variant decode error raised while reading is enriched with the
/// originating file name, so it is attributable like other read-path errors.
@Test
void malformedVariantMetadataErrorCarriesFileName() throws IOException {
try (ParquetFileReader fileReader = ParquetFileReader.open(InputFile.of(FILE_BAD_METADATA));
RowReader rowReader = fileReader.rowReader()) {
rowReader.next();
assertThatThrownBy(() -> rowReader.getVariant("var"))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("[variant_negative_dict_size.parquet] "
+ "Variant metadata dictionary_size is not a valid unsigned int: -1");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,16 @@ void objectPrimitiveDictionary() throws IOException {
assertThat(metadata.findField("missing_field")).isEqualTo(-1);
}

@Test
void negativeDictionarySizeRejected() {
// Header 0xC1: version=1 (bits 0-3), offset_size=4 (bits 6-7 = 11).
// The 4-byte dictionary_size 0xFFFFFFFF reads back as a negative int.
byte[] bytes = { (byte) 0xC1, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF };
assertThatThrownBy(() -> new VariantMetadata(bytes))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Variant metadata dictionary_size is not a valid unsigned int: -1");
}

@Test
void truncatedBufferRejected() {
byte[] bytes = { 0x01 }; // header only, no dictionary size bytes
Expand Down
Binary file not shown.
Binary file not shown.
98 changes: 98 additions & 0 deletions tools/simple-datagen.py
Original file line number Diff line number Diff line change
Expand Up @@ -2431,6 +2431,104 @@ def _schema_elem(name, type_val=None, rep=None, num_children=None, ct=None, logi
print(" - 4 rows: BOOLEAN_TRUE, BOOLEAN_FALSE, INT32(42), short string 'hi'")
print(" - `var` is a VARIANT-annotated group of {metadata, value} binaries")

# ============================================================================
# Variant logical type — metadata with offset_size = 2
# ============================================================================
#
# Regression fixture for the metadata offset-size parse: a Variant whose
# dictionary string section exceeds 255 bytes, so the writer must use
# offset_size = 2, encoded in bits 6-7 of the metadata header. Reading that
# field from the wrong bits corrupts the whole dictionary. The `var` value is an
# object with two long-named fields so the read path actually resolves the keys.

def _v_le(value, width):
return bytes((value >> (8 * i)) & 0xFF for i in range(width))

# Two 160-byte keys → 320-byte string section (> 255), forcing offset_size = 2.
# Keys are lexicographically ordered to match the sorted-strings flag.
_v_k0 = b'a' * 160
_v_k1 = b'b' * 160

# metadata header 0x51 = version 1 | sorted (bit 4) | offset_size_minus_one=1 (bits 6-7)
_md_offsize2 = (
bytes([0x51])
+ _v_le(2, 2) # dictionary_size = 2 (2-byte LE)
+ _v_le(0, 2) + _v_le(len(_v_k0), 2) + _v_le(len(_v_k0) + len(_v_k1), 2) # offset table
+ _v_k0 + _v_k1 # string data
)

# value: object { k0: int8(5), k1: boolean true }
# header 0x02 (OBJECT, idSize=1, offsetSize=1, small); numElements=2;
# field_ids=[0,1]; field_offsets=[0,2,3]; values int8(5)=[0x0C,0x05], true=[0x04]
_val_object = bytes([
0x02, # object header
0x02, # numElements = 2
0x00, 0x01, # field_ids: id0, id1
0x00, 0x02, 0x03, # field_offsets: 0, 2, 3
0x0C, 0x05, # field 0 value: INT8 = 5
0x04, # field 1 value: BOOLEAN_TRUE
])

variant_offsize2_schema = pa.schema([
('id', pa.int32(), False),
('var', pa.struct([
pa.field('metadata', pa.binary(), False),
pa.field('value', pa.binary(), False),
]), True),
])

variant_offsize2_table = pa.table({
'id': [1],
'var': [{'metadata': _md_offsize2, 'value': _val_object}],
}, schema=variant_offsize2_schema)

pq.write_table(
variant_offsize2_table,
'core/src/test/resources/variant_metadata_offset_size2.parquet',
compression='NONE',
use_dictionary=False,
data_page_version='1.0',
)
annotate_group_as_variant('core/src/test/resources/variant_metadata_offset_size2.parquet', 'var')

print("\nGenerated variant_metadata_offset_size2.parquet:")
print(" - `var` is a VARIANT object whose metadata uses offset_size=2 (320-byte dict)")
print(" - fields: ('a'*160)=INT8 5, ('b'*160)=BOOLEAN_TRUE")

# ============================================================================
# Variant logical type — malformed metadata (negative dictionary_size)
# ============================================================================
#
# A VARIANT column whose `metadata` is deliberately corrupt: header 0xC1 selects
# offset_size=4, and the 4-byte dictionary_size 0xFFFFFFFF reads back as a
# negative int. Decoding is rejected when the row is read, exercising the
# file-name enrichment on the variant read path.

variant_bad_meta_schema = pa.schema([
('id', pa.int32(), False),
('var', pa.struct([
pa.field('metadata', pa.binary(), False),
pa.field('value', pa.binary(), False),
]), True),
])

variant_bad_meta_table = pa.table({
'id': [1],
'var': [{'metadata': bytes([0xC1, 0xFF, 0xFF, 0xFF, 0xFF]), 'value': bytes([0x00])}],
}, schema=variant_bad_meta_schema)

pq.write_table(
variant_bad_meta_table,
'core/src/test/resources/variant_negative_dict_size.parquet',
compression='NONE',
use_dictionary=False,
data_page_version='1.0',
)
annotate_group_as_variant('core/src/test/resources/variant_negative_dict_size.parquet', 'var')

print("\nGenerated variant_negative_dict_size.parquet:")
print(" - `var` metadata is corrupt (4-byte dictionary_size reads as negative)")

# ============================================================================
# Variant logical type — shredded (typed_value: int64)
# ============================================================================
Expand Down