diff --git a/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSourceReader.java b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSourceReader.java index 672a126f7c46..6acbb095f3bb 100644 --- a/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSourceReader.java +++ b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSourceReader.java @@ -95,6 +95,13 @@ private static class DeltaInputSourceIterator implements CloseableIterator> filteredColumnarBatchIterators; + // Keep a reference to the current file's batch iterator so we drain ALL + // its batches before advancing to the next file. + // Bug fix for https://github.com/apache/druid/issues/18606: + // the original code used a local variable for filteredBatchIterator which + // was discarded on return, causing only the first batch (1024 rows) of each + // file to be read. + private io.delta.kernel.utils.CloseableIterator currentFileIterator = null; private io.delta.kernel.utils.CloseableIterator currentBatch = null; private final InputRowSchema inputRowSchema; @@ -111,20 +118,34 @@ public DeltaInputSourceIterator( public boolean hasNext() { while (currentBatch == null || !currentBatch.hasNext()) { - if (!filteredColumnarBatchIterators.hasNext()) { - return false; // No more batches or records to read! - } - - final io.delta.kernel.utils.CloseableIterator filteredBatchIterator = - filteredColumnarBatchIterators.next(); - - while (filteredBatchIterator.hasNext()) { - final FilteredColumnarBatch nextBatch = filteredBatchIterator.next(); + // Drain remaining batches from the current file before moving to the next. + while (currentFileIterator != null && currentFileIterator.hasNext()) { + final FilteredColumnarBatch nextBatch = currentFileIterator.next(); currentBatch = nextBatch.getRows(); if (currentBatch.hasNext()) { return true; } } + + // Advance to the next file. + if (!filteredColumnarBatchIterators.hasNext()) { + return false; + } + // Close the drained file iterator before overwriting it. Each iterator from + // Scan.transformPhysicalData() owns an underlying Parquet reader/file handle; + // not closing it here would leak a handle per completed file on multi-file + // tables (only the last and the never-started iterators are closed in close()). + // hasNext() cannot throw checked exceptions, so wrap like the rest of this + // extension (see DeltaInputSource). + if (currentFileIterator != null) { + try { + currentFileIterator.close(); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + currentFileIterator = filteredColumnarBatchIterators.next(); } return true; } @@ -146,8 +167,10 @@ public void close() throws IOException if (currentBatch != null) { currentBatch.close(); } - - if (filteredColumnarBatchIterators.hasNext()) { + if (currentFileIterator != null) { + currentFileIterator.close(); + } + while (filteredColumnarBatchIterators.hasNext()) { filteredColumnarBatchIterators.next().close(); } } diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputSourceTest.java b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputSourceTest.java index cbbcaefb3ceb..6a689f31df59 100644 --- a/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputSourceTest.java +++ b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputSourceTest.java @@ -439,6 +439,46 @@ private static List readAllRows(InputSourceReader reader) throws IOExc return rows; } + /** + * Regression test for https://github.com/apache/druid/issues/18606. + * + * {@link DeltaInputSourceReader.DeltaInputSourceIterator} used a local variable for the + * per-file {@code CloseableIterator}. When {@code hasNext()} returned + * after the first non-empty batch of a file, that iterator went out of scope. The next + * {@code hasNext()} call advanced to the next file, skipping all remaining batches of the + * current file. With the Delta kernel default batch size of 1024 rows this produced exactly + * {@code 1024 * numFiles} rows regardless of actual file size. + * + * Test table: 2 Parquet files x 2000 rows = 4000 rows total. + * Without the fix: 1024 x 2 = 2048 rows. + * With the fix: 4000 rows. + */ + public static class BatchDrainRegressionTests + { + @Test + public void testAllRowsReturnedWhenFileExceedsOneBatch() throws IOException + { + final DeltaInputSource deltaInputSource = new DeltaInputSource( + LargeRowGroupDeltaTable.DELTA_TABLE_PATH, + null, + null, + null + ); + final InputSourceReader inputSourceReader = deltaInputSource.reader( + LargeRowGroupDeltaTable.SCHEMA, + null, + null + ); + final List rows = readAllRows(inputSourceReader); + Assert.assertEquals( + "Expected all rows to be read. " + + "If this fails with " + (1024 * 2) + " rows, the per-file batch drain bug (GH-18606) has regressed.", + LargeRowGroupDeltaTable.EXPECTED_ROW_COUNT, + rows.size() + ); + } + } + private static void validateRows( final List> expectedRows, final List actualReadRows, diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/LargeRowGroupDeltaTable.java b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/LargeRowGroupDeltaTable.java new file mode 100644 index 000000000000..c9a1966af153 --- /dev/null +++ b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/LargeRowGroupDeltaTable.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.delta.input; + +import com.google.common.collect.ImmutableList; +import org.apache.druid.data.input.ColumnsFilter; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.LongDimensionSchema; +import org.apache.druid.data.input.impl.StringDimensionSchema; +import org.apache.druid.data.input.impl.TimestampSpec; + +/** + * Descriptor for a Delta table with 2 Parquet files × 2000 rows = 4000 rows total. + * + * Each file has more than 1024 rows, ensuring the Delta kernel reads more than one + * batch per file. Used as a regression test for GH-18606 where + * {@link DeltaInputSourceReader} only returned the first 1024 rows per file. + * + * Generated by src/test/resources/create_delta_table.py (large-row-group-table). + */ +public class LargeRowGroupDeltaTable +{ + public static final String DELTA_TABLE_PATH = + "src/test/resources/large-row-group-table"; + + public static final int EXPECTED_ROW_COUNT = 4000; + + public static final InputRowSchema SCHEMA = new InputRowSchema( + new TimestampSpec("id", "posix", null), + new DimensionsSpec(ImmutableList.of( + new LongDimensionSchema("id"), + new StringDimensionSchema("name") + )), + ColumnsFilter.all() + ); +} diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/large-row-group-table/.part-00000-42349806-104f-42a0-a6fe-5397f37d29d8-c000.snappy.parquet.crc b/extensions-contrib/druid-deltalake-extensions/src/test/resources/large-row-group-table/.part-00000-42349806-104f-42a0-a6fe-5397f37d29d8-c000.snappy.parquet.crc new file mode 100644 index 000000000000..aa967daecef7 Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/large-row-group-table/.part-00000-42349806-104f-42a0-a6fe-5397f37d29d8-c000.snappy.parquet.crc differ diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/large-row-group-table/.part-00001-42b5d278-2c32-4094-af60-5eaca2f7ba03-c000.snappy.parquet.crc b/extensions-contrib/druid-deltalake-extensions/src/test/resources/large-row-group-table/.part-00001-42b5d278-2c32-4094-af60-5eaca2f7ba03-c000.snappy.parquet.crc new file mode 100644 index 000000000000..a46c186eed7b Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/large-row-group-table/.part-00001-42b5d278-2c32-4094-af60-5eaca2f7ba03-c000.snappy.parquet.crc differ diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/large-row-group-table/_delta_log/.00000000000000000000.json.crc b/extensions-contrib/druid-deltalake-extensions/src/test/resources/large-row-group-table/_delta_log/.00000000000000000000.json.crc new file mode 100644 index 000000000000..7e765da1cd48 Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/large-row-group-table/_delta_log/.00000000000000000000.json.crc differ diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/large-row-group-table/_delta_log/00000000000000000000.json b/extensions-contrib/druid-deltalake-extensions/src/test/resources/large-row-group-table/_delta_log/00000000000000000000.json new file mode 100644 index 000000000000..9e92f4a42fee --- /dev/null +++ b/extensions-contrib/druid-deltalake-extensions/src/test/resources/large-row-group-table/_delta_log/00000000000000000000.json @@ -0,0 +1,5 @@ +{"commitInfo":{"timestamp":1781690365208,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]"},"isolationLevel":"Serializable","isBlindAppend":false,"operationMetrics":{"numFiles":"2","numOutputRows":"4000","numOutputBytes":"36263"},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.2.0","txnId":"f2a1da56-9880-474d-80cb-520430c4d221"}} +{"metaData":{"id":"c1c6ec87-61f6-4ca9-8b67-2edd4a2e6acb","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1781690363342}} +{"protocol":{"minReaderVersion":1,"minWriterVersion":2}} +{"add":{"path":"part-00000-42349806-104f-42a0-a6fe-5397f37d29d8-c000.snappy.parquet","partitionValues":{},"size":18078,"modificationTime":1781690364771,"dataChange":true,"stats":"{\"numRecords\":2002,\"minValues\":{\"id\":1,\"name\":\"name_1\"},\"maxValues\":{\"id\":3995,\"name\":\"name_995\"},\"nullCount\":{\"id\":0,\"name\":0}}"}} +{"add":{"path":"part-00001-42b5d278-2c32-4094-af60-5eaca2f7ba03-c000.snappy.parquet","partitionValues":{},"size":18185,"modificationTime":1781690364771,"dataChange":true,"stats":"{\"numRecords\":1998,\"minValues\":{\"id\":0,\"name\":\"name_0\"},\"maxValues\":{\"id\":3999,\"name\":\"name_999\"},\"nullCount\":{\"id\":0,\"name\":0}}"}} diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/large-row-group-table/part-00000-42349806-104f-42a0-a6fe-5397f37d29d8-c000.snappy.parquet b/extensions-contrib/druid-deltalake-extensions/src/test/resources/large-row-group-table/part-00000-42349806-104f-42a0-a6fe-5397f37d29d8-c000.snappy.parquet new file mode 100644 index 000000000000..ba8302a2741f Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/large-row-group-table/part-00000-42349806-104f-42a0-a6fe-5397f37d29d8-c000.snappy.parquet differ diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/large-row-group-table/part-00001-42b5d278-2c32-4094-af60-5eaca2f7ba03-c000.snappy.parquet b/extensions-contrib/druid-deltalake-extensions/src/test/resources/large-row-group-table/part-00001-42b5d278-2c32-4094-af60-5eaca2f7ba03-c000.snappy.parquet new file mode 100644 index 000000000000..c0d53dde4076 Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/large-row-group-table/part-00001-42b5d278-2c32-4094-af60-5eaca2f7ba03-c000.snappy.parquet differ