diff --git a/embedded-tests/pom.xml b/embedded-tests/pom.xml index bad770bf572f..f16440109f95 100644 --- a/embedded-tests/pom.xml +++ b/embedded-tests/pom.xml @@ -311,6 +311,19 @@ + + org.apache.druid.extensions.contrib + druid-deltalake-extensions + ${project.parent.version} + test + + + + org.apache.hadoop + hadoop-common + + + org.apache.druid.extensions.contrib kafka-emitter diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/deltalake/DeltaLakeInputSourceIngestionTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/deltalake/DeltaLakeInputSourceIngestionTest.java new file mode 100644 index 000000000000..10aa9c7783ae --- /dev/null +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/deltalake/DeltaLakeInputSourceIngestionTest.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.testing.embedded.deltalake; + +import org.apache.druid.delta.common.DeltaLakeDruidModule; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.query.http.SqlTaskStatus; +import org.apache.druid.testing.embedded.EmbeddedBroker; +import org.apache.druid.testing.embedded.EmbeddedCoordinator; +import org.apache.druid.testing.embedded.EmbeddedDruidCluster; +import org.apache.druid.testing.embedded.EmbeddedHistorical; +import org.apache.druid.testing.embedded.EmbeddedIndexer; +import org.apache.druid.testing.embedded.EmbeddedOverlord; +import org.apache.druid.testing.embedded.indexing.Resources; +import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase; +import org.apache.druid.testing.embedded.msq.EmbeddedMSQApis; +import org.junit.jupiter.api.Test; + +import java.io.File; + +/** + * End-to-end ingestion test for the Delta Lake input source running inside an embedded Druid cluster. + * Modeled on the Iceberg embedded test ({@code IcebergRestCatalogIngestionTest}): it ingests an external + * table with an MSQ {@code INSERT ... EXTERN(...)} SQL statement and verifies the result over the cluster. + * Unlike Iceberg, Delta needs no catalog or testcontainer since it reads directly from a filesystem path. + * + *

The table has two Parquet files of 2002 and 1998 rows (4000 rows total). Because each file exceeds the + * Delta kernel's default batch size of 1024 rows, this exercises the per-file batch-drain path in + * {@code DeltaInputSourceReader} that regressed in + * GH-18606, where the reader returned only the + * first 1024 rows of each file ({@code 1024 * 2 = 2048} instead of 4000). This is the integration-level + * counterpart of the unit regression test {@code DeltaInputSourceTest.BatchDrainRegressionTests}. + */ +public class DeltaLakeInputSourceIngestionTest extends EmbeddedClusterTestBase +{ + /** + * Delta table with 2 Parquet files (2002 + 1998 rows) = 4000 rows total, columns {@code id} (long) and + * {@code name} (string). Copied from the {@code druid-deltalake-extensions} test resources + * ({@code large-row-group-table}). + */ + private static final String DELTA_TABLE_RESOURCE = "delta/large-row-group-table"; + private static final int EXPECTED_ROW_COUNT = 4000; + + private final EmbeddedOverlord overlord = new EmbeddedOverlord(); + private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator(); + private final EmbeddedIndexer indexer = new EmbeddedIndexer() + .setServerMemory(300_000_000L) + .addProperty("druid.worker.capacity", "2"); + private final EmbeddedBroker broker = new EmbeddedBroker(); + + @Override + protected EmbeddedDruidCluster createCluster() + { + return EmbeddedDruidCluster + .withEmbeddedDerbyAndZookeeper() + .useLatchableEmitter() + .addExtension(DeltaLakeDruidModule.class) + .addServer(overlord) + .addServer(coordinator) + .addServer(indexer) + .addServer(broker) + .addServer(new EmbeddedHistorical()); + } + + @Test + public void testIngestAllRowsFromDeltaTable() + { + final EmbeddedMSQApis msqApis = new EmbeddedMSQApis(cluster, overlord); + + final File deltaTableDir = Resources.getFileForResource(DELTA_TABLE_RESOURCE); + // Escape backslashes so a Windows path stays valid inside the JSON input source spec (quotes are not escaped).
+ final String tablePath = StringUtils.replace(deltaTableDir.getAbsolutePath(), "\\", "\\\\"); + + final String sql = StringUtils.format( + "INSERT INTO %s\n" + + "SELECT\n" + // The 'id' column holds POSIX seconds; convert to a millisecond timestamp. + + " MILLIS_TO_TIMESTAMP(\"id\" * 1000) AS __time,\n" + + " \"name\"\n" + + "FROM TABLE(\n" + + " EXTERN(\n" + + " '{\"type\":\"delta\",\"tablePath\":\"%s\"}',\n" + // EXTERN requires a non-null inputFormat, but the Delta input source reads Parquet via the + // Delta kernel and ignores it (DeltaInputSource.needsFormat() == false). This value is unused. + + " '{\"type\":\"json\"}',\n" + + " '[{\"type\":\"long\",\"name\":\"id\"},{\"type\":\"string\",\"name\":\"name\"}]'\n" + + " )\n" + + ")\n" + + "PARTITIONED BY DAY", + dataSource, + tablePath + ); + + final SqlTaskStatus taskStatus = msqApis.submitTaskSql(sql); + cluster.callApi().waitForTaskToSucceed(taskStatus.getTaskId(), overlord); + cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker); + + // The core regression assertion: all 4000 rows must be ingested. Before the GH-18606 fix this + // returned 1024 * 2 = 2048 because only the first batch of each Parquet file was read. + cluster.callApi().verifySqlQuery( + "SELECT COUNT(*) FROM %s", + dataSource, + String.valueOf(EXPECTED_ROW_COUNT) + ); + + // Secondary sanity check on timestamp parsing and value range: the 'id' column (used as the POSIX + // timestamp) has a documented min of 0 and max of 3999, which should bound __time after ingestion. + // Completeness across both files is guaranteed by the COUNT(*) assertion above, not by these bounds.
+ cluster.callApi().verifySqlQuery( + "SELECT MIN(__time), MAX(__time) FROM %s", + dataSource, + "1970-01-01T00:00:00.000Z,1970-01-01T01:06:39.000Z" + ); + } +} diff --git a/embedded-tests/src/test/resources/delta/large-row-group-table/.part-00000-42349806-104f-42a0-a6fe-5397f37d29d8-c000.snappy.parquet.crc b/embedded-tests/src/test/resources/delta/large-row-group-table/.part-00000-42349806-104f-42a0-a6fe-5397f37d29d8-c000.snappy.parquet.crc new file mode 100644 index 000000000000..aa967daecef7 Binary files /dev/null and b/embedded-tests/src/test/resources/delta/large-row-group-table/.part-00000-42349806-104f-42a0-a6fe-5397f37d29d8-c000.snappy.parquet.crc differ diff --git a/embedded-tests/src/test/resources/delta/large-row-group-table/.part-00001-42b5d278-2c32-4094-af60-5eaca2f7ba03-c000.snappy.parquet.crc b/embedded-tests/src/test/resources/delta/large-row-group-table/.part-00001-42b5d278-2c32-4094-af60-5eaca2f7ba03-c000.snappy.parquet.crc new file mode 100644 index 000000000000..a46c186eed7b Binary files /dev/null and b/embedded-tests/src/test/resources/delta/large-row-group-table/.part-00001-42b5d278-2c32-4094-af60-5eaca2f7ba03-c000.snappy.parquet.crc differ diff --git a/embedded-tests/src/test/resources/delta/large-row-group-table/_delta_log/.00000000000000000000.json.crc b/embedded-tests/src/test/resources/delta/large-row-group-table/_delta_log/.00000000000000000000.json.crc new file mode 100644 index 000000000000..7e765da1cd48 Binary files /dev/null and b/embedded-tests/src/test/resources/delta/large-row-group-table/_delta_log/.00000000000000000000.json.crc differ diff --git a/embedded-tests/src/test/resources/delta/large-row-group-table/_delta_log/00000000000000000000.json b/embedded-tests/src/test/resources/delta/large-row-group-table/_delta_log/00000000000000000000.json new file mode 100644 index 000000000000..9e92f4a42fee --- /dev/null +++ b/embedded-tests/src/test/resources/delta/large-row-group-table/_delta_log/00000000000000000000.json @@ -0,0 +1,5 @@ 
+{"commitInfo":{"timestamp":1781690365208,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]"},"isolationLevel":"Serializable","isBlindAppend":false,"operationMetrics":{"numFiles":"2","numOutputRows":"4000","numOutputBytes":"36263"},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.2.0","txnId":"f2a1da56-9880-474d-80cb-520430c4d221"}} +{"metaData":{"id":"c1c6ec87-61f6-4ca9-8b67-2edd4a2e6acb","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1781690363342}} +{"protocol":{"minReaderVersion":1,"minWriterVersion":2}} +{"add":{"path":"part-00000-42349806-104f-42a0-a6fe-5397f37d29d8-c000.snappy.parquet","partitionValues":{},"size":18078,"modificationTime":1781690364771,"dataChange":true,"stats":"{\"numRecords\":2002,\"minValues\":{\"id\":1,\"name\":\"name_1\"},\"maxValues\":{\"id\":3995,\"name\":\"name_995\"},\"nullCount\":{\"id\":0,\"name\":0}}"}} +{"add":{"path":"part-00001-42b5d278-2c32-4094-af60-5eaca2f7ba03-c000.snappy.parquet","partitionValues":{},"size":18185,"modificationTime":1781690364771,"dataChange":true,"stats":"{\"numRecords\":1998,\"minValues\":{\"id\":0,\"name\":\"name_0\"},\"maxValues\":{\"id\":3999,\"name\":\"name_999\"},\"nullCount\":{\"id\":0,\"name\":0}}"}} diff --git a/embedded-tests/src/test/resources/delta/large-row-group-table/part-00000-42349806-104f-42a0-a6fe-5397f37d29d8-c000.snappy.parquet b/embedded-tests/src/test/resources/delta/large-row-group-table/part-00000-42349806-104f-42a0-a6fe-5397f37d29d8-c000.snappy.parquet new file mode 100644 index 000000000000..ba8302a2741f Binary files /dev/null and b/embedded-tests/src/test/resources/delta/large-row-group-table/part-00000-42349806-104f-42a0-a6fe-5397f37d29d8-c000.snappy.parquet differ diff --git 
a/embedded-tests/src/test/resources/delta/large-row-group-table/part-00001-42b5d278-2c32-4094-af60-5eaca2f7ba03-c000.snappy.parquet b/embedded-tests/src/test/resources/delta/large-row-group-table/part-00001-42b5d278-2c32-4094-af60-5eaca2f7ba03-c000.snappy.parquet new file mode 100644 index 000000000000..c0d53dde4076 Binary files /dev/null and b/embedded-tests/src/test/resources/delta/large-row-group-table/part-00001-42b5d278-2c32-4094-af60-5eaca2f7ba03-c000.snappy.parquet differ