diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageCoalesceTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageCoalesceTest.java new file mode 100644 index 000000000000..866959cd9843 --- /dev/null +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageCoalesceTest.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testing.embedded.query; + +import org.apache.druid.common.utils.IdUtils; +import org.apache.druid.java.util.common.HumanReadableBytes; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.query.Druids; +import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.scan.ScanQuery; +import org.apache.druid.server.metrics.LatchableEmitter; +import org.apache.druid.server.metrics.StorageMonitor; +import org.apache.druid.testing.embedded.EmbeddedBroker; +import org.apache.druid.testing.embedded.EmbeddedCoordinator; +import org.apache.druid.testing.embedded.EmbeddedDruidCluster; +import org.apache.druid.testing.embedded.EmbeddedHistorical; +import org.apache.druid.testing.embedded.EmbeddedIndexer; +import org.apache.druid.testing.embedded.EmbeddedOverlord; +import org.apache.druid.testing.embedded.EmbeddedRouter; +import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase; +import org.apache.druid.testing.embedded.minio.MinIOStorageResource; +import org.apache.druid.testing.embedded.msq.EmbeddedMSQApis; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.io.IOException; + +/** + * Exercises range-read coalescing end to end: on-demand partial loads over MinIO S3 deep storage, asserting the + * {@link StorageMonitor} read metrics reflect fewer requests than files loaded. + */ +class QueryVirtualStorageCoalesceTest extends EmbeddedClusterTestBase +{ + // Cache is sized to hold the whole dataset so a query's columns load once and eviction doesn't muddy the counts. + private static final long CACHE_SIZE = HumanReadableBytes.parse("64MiB"); + private static final long MAX_SIZE = HumanReadableBytes.parse("1GiB"); + private static final long ESTIMATE_SIZE = HumanReadableBytes.parse("1MiB"); + + private final EmbeddedBroker broker = new EmbeddedBroker(); + private final EmbeddedIndexer indexer = new EmbeddedIndexer(); + private final EmbeddedOverlord overlord = new EmbeddedOverlord(); + private final EmbeddedHistorical historical = new EmbeddedHistorical(); + private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator(); + private final EmbeddedRouter router = new EmbeddedRouter(); + private final MinIOStorageResource storageResource = new MinIOStorageResource(); + + @Override + public EmbeddedDruidCluster createCluster() + { + historical.setServerMemory(500_000_000) + .addProperty("druid.segmentCache.virtualStorage", "true") + .addProperty("druid.segmentCache.virtualStoragePartialDownloadsEnabled", "true") + .addProperty("druid.segmentCache.virtualStorageMetadataReservationEstimate", String.valueOf(ESTIMATE_SIZE)) + .addProperty("druid.segmentCache.virtualStorageLoadThreads", String.valueOf(Runtime.getRuntime().availableProcessors())) + .addBeforeStartHook( + (cluster, self) -> self.addProperty( + "druid.segmentCache.locations", + StringUtils.format( + "[{\"path\":\"%s\",\"maxSize\":\"%s\"}]", + cluster.getTestFolder().newFolder().getAbsolutePath(), + CACHE_SIZE + ) + ) + ) + .addProperty("druid.server.maxSize", String.valueOf(MAX_SIZE)); + + broker.setServerMemory(200_000_000); + coordinator.addProperty("druid.manager.segments.useIncrementalCache", "always"); + overlord.addProperty("druid.manager.segments.useIncrementalCache", "always") + .addProperty("druid.manager.segments.pollDuration", "PT0.1s"); + indexer.setServerMemory(400_000_000) + .addProperty("druid.worker.capacity", "4") + .addProperty("druid.processing.numThreads", "3") + .addProperty("druid.segment.handoff.pollDuration", "PT0.1s"); + + return EmbeddedDruidCluster + .withEmbeddedDerbyAndZookeeper() + .useLatchableEmitter() + .useDefaultTimeoutForLatchableEmitter(20) + .addResource(storageResource) + // segments must be pushed unzipped (rangeable) and in V10 format for partial reads to engage + .addCommonProperty("druid.storage.zip", "false") + .addCommonProperty("druid.indexer.task.buildV10", "true") + .addCommonProperty("druid.monitoring.emissionPeriod", "PT1s") + .addServer(coordinator) + .addServer(overlord) + .addServer(indexer) + .addServer(historical) + .addServer(broker) + .addServer(router); + } + + @BeforeAll + void loadData() throws IOException + { + final EmbeddedMSQApis msqApis = new EmbeddedMSQApis(cluster, overlord); + dataSource = "wiki-" + IdUtils.getRandomId(); + WikipediaVirtualStorageTable.ingestHourly(cluster, msqApis, broker, dataSource); + // waitUntilSegmentsLoad only covers the historical mount; wait for the broker schema too so column queries resolve + cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker); + } + + @Override + protected void refreshDatasourceName() + { + // set up once for all tests + } + + @Test + void testMultiColumnQueryCoalescesReads() + { + // no baseline wait here: nothing has loaded on the historical yet (partial loads only happen on query), so the + // VSF metrics don't exist until the query below drives them + final LatchableEmitter emitter = historical.latchableEmitter(); + emitter.flush(); + + // Native scan, not SQL: the broker only knows __time for a metadata-mounted partial segment, so SQL can't resolve + // data columns. A native query names its columns and the historical resolves + loads them on demand. Touching + // several columns per segment lets contiguous files coalesce into far fewer range reads. + final ScanQuery query = Druids.newScanQueryBuilder() + .dataSource(new TableDataSource(dataSource)) + .eternityInterval() + .columns("channel", "countryName", "page", "user", "added", "deleted", "delta") + .limit(100) + .build(); + final String result = cluster.callApi().onAnyBroker(b -> b.submitNativeQuery(query)); + Assertions.assertFalse(result.isEmpty()); + + emitter.waitForNextEvent(event -> event.hasMetricName(StorageMonitor.VSF_READ_COUNT)); + final long reads = emitter.getMetricEventLongSum(StorageMonitor.VSF_READ_COUNT); + final long loads = emitter.getMetricEventLongSum(StorageMonitor.VSF_LOAD_COUNT); + + Assertions.assertTrue(reads > 0, "expected some range reads, got " + reads); + // the coalescing signal: multiple files land per read, so files loaded exceeds reads issued + Assertions.assertTrue(loads > reads, "expected loads(" + loads + ") > reads(" + reads + ")"); + + // gap-fill is emitted and bounded by the wire bytes; it quantifies over-fetch from coalescing + emitter.waitForNextEvent(event -> event.hasMetricName(StorageMonitor.VSF_READ_GAP_BYTES)); + final long gapBytes = emitter.getMetricEventLongSum(StorageMonitor.VSF_READ_GAP_BYTES); + final long readBytes = emitter.getMetricEventLongSum(StorageMonitor.VSF_READ_BYTES); + Assertions.assertTrue(gapBytes >= 0, "gap-fill bytes should be non-negative, got " + gapBytes); + Assertions.assertTrue(gapBytes <= readBytes, "gap-fill(" + gapBytes + ") cannot exceed read bytes(" + readBytes + ")"); + } + +} diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageTest.java index b9d78d1fa6c7..0311c0fa8f13 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/QueryVirtualStorageTest.java @@ -20,20 +20,16 @@ package org.apache.druid.testing.embedded.query; import org.apache.druid.common.utils.IdUtils; -import org.apache.druid.data.input.impl.LocalInputSource; -import org.apache.druid.indexer.TaskState; import org.apache.druid.java.util.common.HumanReadableBytes; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.msq.indexing.report.MSQTaskReport; import org.apache.druid.msq.indexing.report.MSQTaskReportPayload; import org.apache.druid.query.DefaultQueryMetrics; -import org.apache.druid.query.DruidProcessingConfigTest; import org.apache.druid.query.QueryContexts; import org.apache.druid.query.http.ClientSqlQuery; import org.apache.druid.server.coordinator.stats.Stats; import org.apache.druid.server.metrics.LatchableEmitter; import org.apache.druid.server.metrics.StorageMonitor; -import org.apache.druid.sql.calcite.planner.Calcites; import org.apache.druid.sql.http.GetQueryReportResponse; import org.apache.druid.testing.embedded.EmbeddedBroker; import org.apache.druid.testing.embedded.EmbeddedCoordinator; @@ -44,20 +40,15 @@ import org.apache.druid.testing.embedded.EmbeddedRouter; import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase; import org.apache.druid.testing.embedded.minio.MinIOStorageResource; -import org.apache.druid.testing.embedded.msq.EmbeddedDurableShuffleStorageTest; import org.apache.druid.testing.embedded.msq.EmbeddedMSQApis; import org.hamcrest.MatcherAssert; import org.hamcrest.Matchers; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.testcontainers.shaded.com.google.common.io.ByteStreams; import javax.annotation.Nullable; -import java.io.File; import java.io.IOException; -import java.nio.file.Files; -import java.util.Collections; import java.util.Map; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; @@ -387,51 +378,10 @@ private String createTestDatasourceName() return "wiki-" + IdUtils.getRandomId(); } - /** - * Stolen from {@link EmbeddedDurableShuffleStorageTest#loadWikipediaTable()} but with hourly granularity and no - * durable shuffle location - */ private void loadWikiData() throws IOException { - final File tmpDir = cluster.getTestFolder().newFolder(); - final File wikiFile = new File(tmpDir, "wiki.gz"); - - ByteStreams.copy( - DruidProcessingConfigTest.class.getResourceAsStream("/wikipedia/wikiticker-2015-09-12-sampled.json.gz"), - Files.newOutputStream(wikiFile.toPath()) - ); - final String sql = StringUtils.format( - "SET waitUntilSegmentsLoad = TRUE;\n" - + "REPLACE INTO \"%s\" OVERWRITE ALL\n" - + "SELECT\n" - + " TIME_PARSE(\"time\") AS __time,\n" - + " channel,\n" - + " countryName,\n" - + " page,\n" - + " \"user\",\n" - + " added,\n" - + " deleted,\n" - + " delta\n" - + "FROM TABLE(\n" - + " EXTERN(\n" - + " %s,\n" - + " '{\"type\":\"json\"}',\n" - + " '[{\"name\":\"isRobot\",\"type\":\"string\"},{\"name\":\"channel\",\"type\":\"string\"},{\"name\":\"time\",\"type\":\"string\"},{\"name\":\"flags\",\"type\":\"string\"},{\"name\":\"isUnpatrolled\",\"type\":\"string\"},{\"name\":\"page\",\"type\":\"string\"},{\"name\":\"diffUrl\",\"type\":\"string\"},{\"name\":\"added\",\"type\":\"long\"},{\"name\":\"comment\",\"type\":\"string\"},{\"name\":\"commentLength\",\"type\":\"long\"},{\"name\":\"isNew\",\"type\":\"string\"},{\"name\":\"isMinor\",\"type\":\"string\"},{\"name\":\"delta\",\"type\":\"long\"},{\"name\":\"isAnonymous\",\"type\":\"string\"},{\"name\":\"user\",\"type\":\"string\"},{\"name\":\"deltaBucket\",\"type\":\"long\"},{\"name\":\"deleted\",\"type\":\"long\"},{\"name\":\"namespace\",\"type\":\"string\"},{\"name\":\"cityName\",\"type\":\"string\"},{\"name\":\"countryName\",\"type\":\"string\"},{\"name\":\"regionIsoCode\",\"type\":\"string\"},{\"name\":\"metroCode\",\"type\":\"long\"},{\"name\":\"countryIsoCode\",\"type\":\"string\"},{\"name\":\"regionName\",\"type\":\"string\"}]'\n" - + " )\n" - + " )\n" - + "PARTITIONED BY HOUR\n" - + "CLUSTERED BY channel", - dataSource, - Calcites.escapeStringLiteral( - broker.bindings() - .jsonMapper() - .writeValueAsString(new LocalInputSource(null, null, Collections.singletonList(wikiFile), null)) - ) - ); - - final MSQTaskReportPayload payload = msqApis.runTaskSqlAndGetReport(sql); - Assertions.assertEquals(TaskState.SUCCESS, payload.getStatus().getStatus()); + final MSQTaskReportPayload payload = + WikipediaVirtualStorageTable.ingestHourly(cluster, msqApis, broker, dataSource); Assertions.assertEquals(24, payload.getStatus().getSegmentLoadWaiterStatus().getTotalSegments()); - Assertions.assertNull(payload.getStatus().getErrorReport()); } } diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/WikipediaVirtualStorageTable.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/WikipediaVirtualStorageTable.java new file mode 100644 index 000000000000..64588e9f70c3 --- /dev/null +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/query/WikipediaVirtualStorageTable.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testing.embedded.query; + +import org.apache.druid.data.input.impl.LocalInputSource; +import org.apache.druid.indexer.TaskState; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.msq.indexing.report.MSQTaskReportPayload; +import org.apache.druid.query.DruidProcessingConfigTest; +import org.apache.druid.sql.calcite.planner.Calcites; +import org.apache.druid.testing.embedded.EmbeddedBroker; +import org.apache.druid.testing.embedded.EmbeddedDruidCluster; +import org.apache.druid.testing.embedded.msq.EmbeddedMSQApis; +import org.junit.jupiter.api.Assertions; +import org.testcontainers.shaded.com.google.common.io.ByteStreams; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.Collections; + +/** + * Shared wikipedia ingest for the virtual-storage query tests. Loads the bundled {@code wikiticker} sample via MSQ, + * partitioned by hour and clustered by channel, so the resulting datasource has many small segments with several + * columns each - enough for partial loads to have something to coalesce. + */ +public final class WikipediaVirtualStorageTable +{ + private WikipediaVirtualStorageTable() + { + } + + /** + * Ingest the sample into {@code dataSource} and return the task report so callers can make their own assertions + * (e.g. segment count). Fails if the ingest task does not succeed. + */ + public static MSQTaskReportPayload ingestHourly( + EmbeddedDruidCluster cluster, + EmbeddedMSQApis msqApis, + EmbeddedBroker broker, + String dataSource + ) throws IOException + { + final File wikiFile = new File(cluster.getTestFolder().newFolder(), "wiki.gz"); + ByteStreams.copy( + DruidProcessingConfigTest.class.getResourceAsStream("/wikipedia/wikiticker-2015-09-12-sampled.json.gz"), + Files.newOutputStream(wikiFile.toPath()) + ); + final String sql = StringUtils.format( + "SET waitUntilSegmentsLoad = TRUE;\n" + + "REPLACE INTO \"%s\" OVERWRITE ALL\n" + + "SELECT\n" + + " TIME_PARSE(\"time\") AS __time,\n" + + " channel,\n" + + " countryName,\n" + + " page,\n" + + " \"user\",\n" + + " added,\n" + + " deleted,\n" + + " delta\n" + + "FROM TABLE(\n" + + " EXTERN(\n" + + " %s,\n" + + " '{\"type\":\"json\"}',\n" + + " '[{\"name\":\"isRobot\",\"type\":\"string\"},{\"name\":\"channel\",\"type\":\"string\"},{\"name\":\"time\",\"type\":\"string\"},{\"name\":\"flags\",\"type\":\"string\"},{\"name\":\"isUnpatrolled\",\"type\":\"string\"},{\"name\":\"page\",\"type\":\"string\"},{\"name\":\"diffUrl\",\"type\":\"string\"},{\"name\":\"added\",\"type\":\"long\"},{\"name\":\"comment\",\"type\":\"string\"},{\"name\":\"commentLength\",\"type\":\"long\"},{\"name\":\"isNew\",\"type\":\"string\"},{\"name\":\"isMinor\",\"type\":\"string\"},{\"name\":\"delta\",\"type\":\"long\"},{\"name\":\"isAnonymous\",\"type\":\"string\"},{\"name\":\"user\",\"type\":\"string\"},{\"name\":\"deltaBucket\",\"type\":\"long\"},{\"name\":\"deleted\",\"type\":\"long\"},{\"name\":\"namespace\",\"type\":\"string\"},{\"name\":\"cityName\",\"type\":\"string\"},{\"name\":\"countryName\",\"type\":\"string\"},{\"name\":\"regionIsoCode\",\"type\":\"string\"},{\"name\":\"metroCode\",\"type\":\"long\"},{\"name\":\"countryIsoCode\",\"type\":\"string\"},{\"name\":\"regionName\",\"type\":\"string\"}]'\n" + + " )\n" + + " )\n" + + "PARTITIONED BY HOUR\n" + + "CLUSTERED BY channel", + dataSource, + Calcites.escapeStringLiteral( + broker.bindings() + .jsonMapper() + .writeValueAsString(new LocalInputSource(null, null, Collections.singletonList(wikiFile), null)) + ) + ); + + final MSQTaskReportPayload payload = msqApis.runTaskSqlAndGetReport(sql); + Assertions.assertEquals(TaskState.SUCCESS, payload.getStatus().getStatus()); + Assertions.assertNull(payload.getStatus().getErrorReport()); + return payload; + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/PartialQueryableIndex.java b/processing/src/main/java/org/apache/druid/segment/PartialQueryableIndex.java index cfe5f6f4d683..8a7aa3c6aa9f 100644 --- a/processing/src/main/java/org/apache/druid/segment/PartialQueryableIndex.java +++ b/processing/src/main/java/org/apache/druid/segment/PartialQueryableIndex.java @@ -59,6 +59,7 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.SortedSet; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; @@ -272,6 +273,124 @@ public boolean isFullyDownloaded() return fileMapper.isFullyDownloaded(); } + /** + * Logical column name -> primary segment-internal (smoosh) file name for the base table namespace, including the + * {@code __time} rename and the same column-descriptor filtering as {@link #buildColumnSuppliers}. Columns that + * resolve to a constant/synthesized column with no downloadable file (e.g. a constant {@code __time}) are omitted. + * Used by {@link PartialQueryableIndexCursorFactory} to coalesce the on-demand pre-fetch of a query's columns. + */ + public Map getBaseColumnFileNames() + { + return columnFileNames( + baseProjectionMetadata.getSchema().getTimeColumnName(), + baseProjectionMetadata.getSchema().getColumnNames(), + column -> Projections.getProjectionSegmentInternalFileName(baseProjectionMetadata.getSchema(), column) + ); + } + + /** + * Logical column name -> primary smoosh file name for an aggregate projection's namespace. Empty for an unknown + * projection. See {@link #getBaseColumnFileNames}. + */ + public Map getProjectionColumnFileNames(String projectionName) + { + final ProjectionMetadata spec = projectionSpecs.get(projectionName); + if (spec == null) { + return Map.of(); + } + return columnFileNames( + spec.getSchema().getTimeColumnName(), + spec.getSchema().getColumnNames(), + column -> Projections.getProjectionSegmentInternalFileName(spec.getSchema(), column) + ); + } + + /** + * Logical column name -> primary smoosh file name for a single cluster group's namespace. Empty for a non-clustered + * segment. See {@link #getBaseColumnFileNames}. + */ + public Map getClusterGroupColumnFileNames(TableClusterGroupSpec group) + { + if (clusteredBaseSummary == null) { + return Map.of(); + } + return columnFileNames( + clusteredBaseSummary.getTimeColumnName(), + clusteredBaseSummary.getGroupColumnNames(), + column -> Projections.getClusterGroupSegmentInternalFileName(group.getClusteringValueIds(), column) + ); + } + + /** + * Registered column name -> primary smoosh file name, built from {@link #resolveColumnFiles}. The key is the name the + * column is exposed under (the {@code __time} rename applied), which is what query callers look it up by. + */ + private Map columnFileNames( + @Nullable String timeColumnName, + List columnNames, + Function fileNameFn + ) + { + final Map returnNames = new LinkedHashMap<>(); + for (ColumnFile cf : resolveColumnFiles(timeColumnName, columnNames, fileNameFn)) { + returnNames.put(cf.registeredName(), cf.smooshName()); + } + return returnNames; + } + + /** + * Resolve a namespace's logical columns to their primary segment-internal (smoosh) files: skip columns with no + * {@link ColumnDescriptor}, and register the time column under {@code __time} when the schema names it differently. + * This is the single source of truth shared by {@link #buildColumnSuppliers} (which builds one lazy supplier per + * entry) and {@link #columnFileNames} (which the cursor factory uses to coalesce the on-demand pre-fetch), so the + * mapping a pre-fetch plans against cannot drift from the file a later {@code getColumnHolder} maps. + */ + private List resolveColumnFiles( + @Nullable String timeColumnName, + List columnNames, + Function fileNameFn + ) + { + final boolean renameTime = !ColumnHolder.TIME_COLUMN_NAME.equals(timeColumnName); + final List out = new ArrayList<>(); + for (String column : columnNames) { + final String smooshName = fileNameFn.apply(column); + if (!metadata.getColumnDescriptors().containsKey(smooshName)) { + continue; + } + final String registeredName = (column.equals(timeColumnName) && renameTime) + ? ColumnHolder.TIME_COLUMN_NAME + : column; + out.add(new ColumnFile(column, registeredName, smooshName)); + } + return out; + } + + /** + * A resolved column: its logical {@code column} name, the {@code registeredName} it is exposed under (after the + * {@code __time} rename), and its primary {@code smooshName} file. + */ + private record ColumnFile(String column, String registeredName, String smooshName) + { + } + + /** + * Plan coalesced range reads for a set of segment-internal (smoosh) file names; see + * {@link PartialSegmentFileMapperV10#planDownloadRuns}. + */ + public List planDownloadRuns(Set smooshNames) + { + return fileMapper.planDownloadRuns(smooshNames); + } + + /** + * Fetch one coalesced range read; see {@link PartialSegmentFileMapperV10#fetchRun}. + */ + public void fetchDownloadRun(PartialSegmentFileMapperV10.DownloadRun run) throws IOException + { + fileMapper.fetchRun(run); + } + @Override public Map getDimensionHandlers() { @@ -555,35 +674,24 @@ private Map> buildColumnSuppliers( Map> parentColumns ) { - final boolean renameTime = !ColumnHolder.TIME_COLUMN_NAME.equals(timeColumnName); final Map> columns = new LinkedHashMap<>(); - for (String column : columnNames) { - final String smooshName = fileNameFn.apply(column); - final ColumnDescriptor columnDescriptor = metadata.getColumnDescriptors().get(smooshName); - if (columnDescriptor == null) { - continue; - } - - final String internedColumnName = SmooshedFileMapper.STRING_INTERNER.intern(column); + for (ColumnFile cf : resolveColumnFiles(timeColumnName, columnNames, fileNameFn)) { + final ColumnDescriptor columnDescriptor = metadata.getColumnDescriptors().get(cf.smooshName()); final Supplier columnSupplier = Suppliers.memoize(() -> { try { - final ByteBuffer colBuffer = fileMapper.mapFile(smooshName); + final ByteBuffer colBuffer = fileMapper.mapFile(cf.smooshName()); final BaseColumnHolder parentColumn = - parentColumns.containsKey(column) ? parentColumns.get(column).get() : null; + parentColumns.containsKey(cf.column()) ? parentColumns.get(cf.column()).get() : null; return columnDescriptor.read(colBuffer, columnConfig, fileMapper, parentColumn); } catch (IOException e) { - throw DruidException.defensive(e, "Failed to load column[%s]", smooshName); + throw DruidException.defensive(e, "Failed to load column[%s]", cf.smooshName()); } }); - columns.put(internedColumnName, columnSupplier); - - if (column.equals(timeColumnName) && renameTime) { - columns.put(ColumnHolder.TIME_COLUMN_NAME, columns.get(column)); - columns.remove(column); - } + // register under the exposed name (the __time rename already applied by resolveColumnFiles) + columns.put(SmooshedFileMapper.STRING_INTERNER.intern(cf.registeredName()), columnSupplier); } if (timeColumnName == null) { diff --git a/processing/src/main/java/org/apache/druid/segment/PartialQueryableIndexCursorFactory.java b/processing/src/main/java/org/apache/druid/segment/PartialQueryableIndexCursorFactory.java index 3704c055ff23..f6c7cca82d90 100644 --- a/processing/src/main/java/org/apache/druid/segment/PartialQueryableIndexCursorFactory.java +++ b/processing/src/main/java/org/apache/druid/segment/PartialQueryableIndexCursorFactory.java @@ -29,6 +29,7 @@ import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.file.PartialSegmentFileMapperV10; import org.apache.druid.segment.projections.ClusterGroupQueryPlan; import org.apache.druid.segment.projections.Projections; import org.apache.druid.segment.projections.QueryableProjection; @@ -38,9 +39,13 @@ import javax.annotation.Nullable; import java.io.Closeable; +import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -113,7 +118,15 @@ public AsyncCursorHolder makeCursorHolderAsync(CursorBuildSpec spec) // query's required columns. final QueryableIndex rowSelector = matched != null ? matched.getRowSelector() : index; final String bundleName = matched != null ? matched.getName() : Projections.BASE_TABLE_PROJECTION_NAME; - final DownloadBundle bundle = new DownloadBundle(bundleName, rowSelector, requiredColumns(rowSelector, matched, spec)); + final Map columnToFile = matched != null + ? index.getProjectionColumnFileNames(matched.getName()) + : index.getBaseColumnFileNames(); + final DownloadBundle bundle = new DownloadBundle( + bundleName, + rowSelector, + requiredColumns(rowSelector, matched, spec), + columnToFile + ); return buildAsyncCursorHolder(List.of(bundle), () -> delegate.makeCursorHolderForProjection(spec, matched)); } @@ -143,7 +156,8 @@ private AsyncCursorHolder makeClusteredCursorHolderAsync(CursorBuildSpec spec) new DownloadBundle( Projections.getClusterGroupBundleName(group.getClusteringValueIds()), groupIndex, - requiredColumns(groupIndex, null, groupSpec) + requiredColumns(groupIndex, null, groupSpec), + index.getClusterGroupColumnFileNames(group) ) ); } @@ -162,16 +176,42 @@ private AsyncCursorHolder buildAsyncCursorHolder(List bundles, S { final List holdReleases = new ArrayList<>(bundles.size()); try { - final List> columnDownloads = new ArrayList<>(); + final List> downloads = new ArrayList<>(); for (DownloadBundle bundle : bundles) { final BundleHoldRelease holdRelease = new BundleHoldRelease(bundleAcquirer.acquire(bundle.bundleName())); holdReleases.add(holdRelease); - // submit one materialization task per column so a multi-threaded download executor can fan them out + + // Resolve the query's required columns to their primary smoosh files, invert to file -> columns, and coalesce + // contiguous files into runs. We submit one task per run (rather than one per column) so adjacent columns are + // fetched in a single range read, while distinct runs still download concurrently on the executor. Columns + // whose file is already resident (dropped by planDownloadRuns) or that have no downloadable file + // (constant/synthesized columns) are materialized by the trailing task below. Together the run tasks and that + // task call getColumnHolder exactly once per required column here during pre-fetch, so the column is never + // deserialized lazily when the cursor is later read on a query-processing thread (which must not block on I/O). + final Map> columnsByFile = new HashMap<>(); for (String column : bundle.requiredColumns()) { - columnDownloads.add(submitColumnDownload(bundle.rowSelector(), column, holdRelease)); + final String file = bundle.columnToFile().get(column); + if (file != null) { + columnsByFile.computeIfAbsent(file, k -> new ArrayList<>()).add(column); + } + } + final Set coveredColumns = new HashSet<>(); + for (PartialSegmentFileMapperV10.DownloadRun run : index.planDownloadRuns(columnsByFile.keySet())) { + final List runColumns = new ArrayList<>(); + for (String file : run.wantedFiles()) { + runColumns.addAll(columnsByFile.getOrDefault(file, List.of())); + } + coveredColumns.addAll(runColumns); + downloads.add(submitRunDownload(bundle.rowSelector(), run, runColumns, holdRelease)); + } + + final Set materializeOnly = new LinkedHashSet<>(bundle.requiredColumns()); + materializeOnly.removeAll(coveredColumns); + if (!materializeOnly.isEmpty()) { + downloads.add(submitColumnMaterialization(bundle.rowSelector(), materializeOnly, holdRelease)); } } - final AsyncResource> downloaded = AsyncResources.collect(columnDownloads); + final AsyncResource> downloaded = AsyncResources.collect(downloads); // Canceler runs if the awaiter closes this holder before it's ready (e.g. query cancel/timeout). Close the // collected resource to cancel every column download that hasn't begun its deep-storage read yet (queued tasks @@ -228,22 +268,54 @@ public ColumnCapabilities getColumnCapabilities(String column) } /** - * Submit one column's materialization as a download task, running its body under the bundle-hold handshake (see - * {@link BundleHoldRelease#runDownloadBody}) so the hold stays alive until this body's - * {@link QueryableIndex#getColumnHolder} (and the - * {@link org.apache.druid.segment.file.PartialSegmentFileMapperV10#mapFile} it triggers) has finished. The returned - * token (the column name) is unused, the task's effect is the side effect of materializing the column into the file - * mapper. + * Submit one coalesced {@link PartialSegmentFileMapperV10.DownloadRun} as a download task: fetch the run's span in a + * single range read, then materialize the columns it covers via {@link QueryableIndex#getColumnHolder} (which finds + * the primary file bytes already resident; any nested sub-files not in the run fault in individually). The whole body + * runs under the bundle-hold handshake (see {@link BundleHoldRelease#runDownloadBody}) so the hold stays alive until + * the last {@code mapFile()} has finished. The returned token is a completion signal and is unused. + */ + private AsyncResource submitRunDownload( + QueryableIndex rowSelector, + PartialSegmentFileMapperV10.DownloadRun run, + List columns, + BundleHoldRelease holdRelease + ) + { + return bundleAcquirer.submitDownload(() -> { + holdRelease.runDownloadBody(() -> { + try { + index.fetchDownloadRun(run); + } + catch (IOException e) { + throw DruidException.defensive(e, "Failed to fetch download run for container[%d]", run.containerIndex()); + } + for (String column : columns) { + rowSelector.getColumnHolder(column); + } + }); + return "run:" + run.containerIndex(); + }); + } + + /** + * Submit a download task that only materializes columns (no deep-storage read): the columns either have no + * downloadable file (constant/synthesized columns such as a constant {@code __time}) or their file is already + * resident. Runs under the same bundle-hold handshake as {@link #submitRunDownload}. The returned token is a + * completion signal and is unused. */ - private AsyncResource submitColumnDownload( + private AsyncResource submitColumnMaterialization( QueryableIndex rowSelector, - String column, + Set columns, BundleHoldRelease holdRelease ) { return bundleAcquirer.submitDownload(() -> { - holdRelease.runDownloadBody(() -> rowSelector.getColumnHolder(column)); - return column; + holdRelease.runDownloadBody(() -> { + for (String column : columns) { + rowSelector.getColumnHolder(column); + } + }); + return "materialize"; }); } @@ -352,9 +424,16 @@ private static Closeable releasers(List holdReleases) /** * One bundle's worth of async download work: the cache-layer {@code bundleName} to mount, the {@link QueryableIndex} - * row selector whose {@code getColumnHolder} triggers the per-column downloads, and the columns to pre-fetch. + * row selector whose {@code getColumnHolder} materializes columns, the columns to pre-fetch, and the resolution of + * those columns to their primary segment-internal (smoosh) file names ({@code columnToFile}, missing the entries that + * resolve to a constant/synthesized column with no downloadable file). */ - private record DownloadBundle(String bundleName, QueryableIndex rowSelector, Set requiredColumns) + private record DownloadBundle( + String bundleName, + QueryableIndex rowSelector, + Set requiredColumns, + Map columnToFile + ) { } diff --git a/processing/src/main/java/org/apache/druid/segment/file/PartialSegmentDownloadListener.java b/processing/src/main/java/org/apache/druid/segment/file/PartialSegmentDownloadListener.java index 111c78b402c5..520de1f96351 100644 --- a/processing/src/main/java/org/apache/druid/segment/file/PartialSegmentDownloadListener.java +++ b/processing/src/main/java/org/apache/druid/segment/file/PartialSegmentDownloadListener.java @@ -34,7 +34,7 @@ public void onBytesDownloaded(long bytes) } @Override - public void onRangeRead(long bytes, long nanos) + public void onRangeRead(long bytes, long gapFillBytes, long nanos) { // no-op } @@ -52,11 +52,15 @@ public void onRangeRead(long bytes, long nanos) /** * A single deep-storage range read completed: the actual request granularity (one read may cover many files when a - * whole container is fetched at once). Measures wire bytes and latency, so it reflects the deep-storage request count - * and cost rather than how many files became resident. + * whole container is fetched at once, or when adjacent files are coalesced into one request). Measures wire bytes and + * latency, so it reflects the deep-storage request count and cost rather than how many files became resident. * - * @param bytes the number of bytes read from deep storage in this request - * @param nanos the wall-clock time the read took, in nanoseconds + * @param bytes the number of bytes read from deep storage in this request + * @param gapFillBytes the portion of {@code bytes} that was not part of a requested file: data read through only to + * coalesce adjacent requested files into one request (unrequested files spanned, plus inter-file + * padding). Zero for a single-file read or a whole-container fetch. Quantifies coalescing + * over-fetch. + * @param nanos the wall-clock time the read took, in nanoseconds */ - void onRangeRead(long bytes, long nanos); + void onRangeRead(long bytes, long gapFillBytes, long nanos); } diff --git a/processing/src/main/java/org/apache/druid/segment/file/PartialSegmentFileMapperV10.java b/processing/src/main/java/org/apache/druid/segment/file/PartialSegmentFileMapperV10.java index ea4895a7f2e4..8209d0e2832e 100644 --- a/processing/src/main/java/org/apache/druid/segment/file/PartialSegmentFileMapperV10.java +++ b/processing/src/main/java/org/apache/druid/segment/file/PartialSegmentFileMapperV10.java @@ -25,6 +25,7 @@ import com.google.common.primitives.Ints; import org.apache.druid.collections.ResourceHolder; import org.apache.druid.error.DruidException; +import org.apache.druid.error.InvalidInput; import org.apache.druid.java.util.common.ByteBufferUtils; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.StringUtils; @@ -46,6 +47,7 @@ import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import java.util.ArrayList; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -94,6 +96,44 @@ public class PartialSegmentFileMapperV10 implements SegmentFileMapper */ public static final String METADATA_HEADER_SUFFIX = ".header"; + /** + * Coalescing thresholds for {@link #planDownloadRuns}: the largest unwanted gap that is worth fetching to merge two + * wanted ranges into one range read, and the largest single coalesced read. Used when no explicit + * {@link CoalesceConfig} is supplied — by tests, and by the restore-from-disk constructor that has no operator config + * to consult. Running historicals pass operator-configured thresholds via {@link CoalesceConfig}. + */ + public static final long DEFAULT_COALESCE_MAX_GAP_BYTES = 1024 * 1024L; + public static final long DEFAULT_COALESCE_MAX_CHUNK_BYTES = 16 * 1024 * 1024L; + + /** + * Tunables controlling how adjacent internal-file ranges are merged into a single deep-storage range read. See + * {@link #planDownloadRuns}. {@code maxGapBytes} bounds the unwanted bytes fetched between two wanted files; + * {@code maxChunkBytes} bounds the size of one coalesced read, both so a single fetch can't balloon and so a wide + * request stays split into several runs that can be downloaded concurrently rather than collapsing into one serial + * read. + */ + public record CoalesceConfig(long maxGapBytes, long maxChunkBytes) + { + public static final CoalesceConfig DEFAULT = + new CoalesceConfig(DEFAULT_COALESCE_MAX_GAP_BYTES, DEFAULT_COALESCE_MAX_CHUNK_BYTES); + + public CoalesceConfig + { + if (maxGapBytes < 0) { + throw InvalidInput.exception( + "druid.segmentCache.virtualStorageCoalesceMaxGapBytes[%d] must be non-negative", + maxGapBytes + ); + } + if (maxChunkBytes <= 0) { + throw InvalidInput.exception( + "druid.segmentCache.virtualStorageCoalesceMaxChunkBytes[%d] must be greater than zero", + maxChunkBytes + ); + } + } + } + /** * Create (or restore) a lazy mapper for the main segment file with attached external file mappers. If persisted state * exists locally from a previous session, metadata is read from disk. Otherwise, metadata is fetched from deep @@ -107,13 +147,35 @@ public static PartialSegmentFileMapperV10 create( List externals, PartialSegmentDownloadListener downloadListener ) throws IOException + { + return create( + rangeReader, + jsonMapper, + localCacheDir, + targetFilename, + externals, + downloadListener, + CoalesceConfig.DEFAULT + ); + } + + public static PartialSegmentFileMapperV10 create( + SegmentRangeReader rangeReader, + ObjectMapper jsonMapper, + File localCacheDir, + String targetFilename, + List externals, + PartialSegmentDownloadListener downloadListener, + CoalesceConfig coalesceConfig + ) throws IOException { final PartialSegmentFileMapperV10 entryPoint = createForFile( rangeReader, jsonMapper, localCacheDir, targetFilename, - downloadListener + downloadListener, + coalesceConfig ); final Map externalMappers = new HashMap<>(); @@ -121,7 +183,7 @@ public static PartialSegmentFileMapperV10 create( for (String filename : externals) { externalMappers.put( filename, - createForFile(rangeReader, jsonMapper, localCacheDir, filename, downloadListener) + createForFile(rangeReader, jsonMapper, localCacheDir, filename, downloadListener, coalesceConfig) ); } } @@ -144,6 +206,19 @@ static PartialSegmentFileMapperV10 createForFile( String targetFilename, PartialSegmentDownloadListener downloadListener ) throws IOException + { + return createForFile(rangeReader, jsonMapper, localCacheDir, targetFilename, downloadListener, CoalesceConfig.DEFAULT); + } + + @VisibleForTesting + static PartialSegmentFileMapperV10 createForFile( + SegmentRangeReader rangeReader, + ObjectMapper jsonMapper, + File localCacheDir, + String targetFilename, + PartialSegmentDownloadListener downloadListener, + CoalesceConfig coalesceConfig + ) throws IOException { FileUtils.mkdirp(localCacheDir); final File headerFile = new File(localCacheDir, targetFilename + METADATA_HEADER_SUFFIX); @@ -185,7 +260,8 @@ static PartialSegmentFileMapperV10 createForFile( targetFilename, localCacheDir, bitmapBuffer, - downloadListener + downloadListener, + coalesceConfig ); // bitmap-vs-container repair pre-pass: if the bitmap claims a file is downloaded but its container file is @@ -266,6 +342,7 @@ static PartialSegmentFileMapperV10 createForFile( private final AtomicLong downloadedBytes = new AtomicLong(0); private final AtomicBoolean closed = new AtomicBoolean(false); private final PartialSegmentDownloadListener downloadListener; + private final CoalesceConfig coalesceConfig; private PartialSegmentFileMapperV10( SegmentFileMetadata metadata, @@ -274,7 +351,8 @@ private PartialSegmentFileMapperV10( String targetFilename, File localCacheDir, MappedByteBuffer bitmapBuffer, - PartialSegmentDownloadListener downloadListener + PartialSegmentDownloadListener downloadListener, + CoalesceConfig coalesceConfig ) { this.metadata = metadata; @@ -284,6 +362,7 @@ private PartialSegmentFileMapperV10( this.localCacheDir = localCacheDir; this.bitmapBuffer = bitmapBuffer; this.downloadListener = downloadListener; + this.coalesceConfig = coalesceConfig; // build stable file name ordering for bitmap indexing this.sortedFileNames = new ArrayList<>(new TreeSet<>(metadata.getFiles().keySet())); @@ -489,6 +568,7 @@ private void downloadContainer(int containerIndex) throws IOException headerSize + containerMeta.getStartOffset(), 0, containerMeta.getSize(), + 0, // whole-container fetch: every file is intended, so there is no coalescing over-fetch StringUtils.format("container[%d]", containerIndex) ); @@ -505,15 +585,146 @@ private void downloadContainer(int containerIndex) throws IOException * Pre-download a set of internal files so that subsequent {@link #mapFile(String)} calls for these files will not * trigger individual downloads. Files that are already downloaded are skipped. Useful for batch-downloading all * files in a bundle at once (see {@link SegmentFileBuilder#startFileBundle}). + *

+ * Files that are contiguous (or near-contiguous, within {@link CoalesceConfig#maxGapBytes}) in the same container are + * fetched together in a single range read via {@link #planDownloadRuns}, reducing the number of deep-storage requests + * versus one read per file. */ public void ensureFilesAvailable(Set fileNames) throws IOException { + for (DownloadRun run : planDownloadRuns(fileNames)) { + fetchRun(run); + } + } + + /** + * Group the not-yet-downloaded files in {@code fileNames} into coalesced range reads. Files are bucketed by container, + * sorted by offset, and merged into a {@link DownloadRun} while the unwanted gap to the next wanted file is within + * {@link CoalesceConfig#maxGapBytes} and the run length stays within {@link CoalesceConfig#maxChunkBytes}. Names that + * are unknown to this mapper or already downloaded are dropped. The returned runs are independent and may be fetched + * in any order (and concurrently, one container lock per run), so callers can fan them out. + */ + public List planDownloadRuns(Set fileNames) + { + // bucket wanted, not-yet-downloaded file names by container, dropping names this mapper doesn't know + final Map> namesByContainer = new HashMap<>(); for (String name : fileNames) { + if (downloadedFiles.contains(name)) { + continue; + } final SegmentInternalFileMetadata fileMetadata = metadata.getFiles().get(name); - if (fileMetadata != null) { - ensureFileDownloaded(name, fileMetadata); + if (fileMetadata == null) { + continue; + } + namesByContainer.computeIfAbsent(fileMetadata.getContainer(), k -> new ArrayList<>()).add(name); + } + + final List runs = new ArrayList<>(); + for (Map.Entry> entry : namesByContainer.entrySet()) { + final int containerIndex = entry.getKey(); + final List names = entry.getValue(); + names.sort(Comparator.comparingLong(name -> metadata.getFiles().get(name).getStartOffset())); + + // offsets here are relative to the container; a run grows while the next file is within maxGapBytes of the + // current end and the whole run stays within maxChunkBytes + long runStartOffset = -1; + long runEndOffset = 0; + List runFiles = null; + for (String name : names) { + final SegmentInternalFileMetadata file = metadata.getFiles().get(name); + final long fileEnd = file.getStartOffset() + file.getSize(); + final boolean fits = runFiles != null + && file.getStartOffset() - runEndOffset <= coalesceConfig.maxGapBytes() + && fileEnd - runStartOffset <= coalesceConfig.maxChunkBytes(); + if (!fits) { + if (runFiles != null) { + runs.add(makeRun(containerIndex, runStartOffset, runEndOffset, runFiles)); + } + runStartOffset = file.getStartOffset(); + runFiles = new ArrayList<>(); + } + runFiles.add(name); + runEndOffset = Math.max(runEndOffset, fileEnd); + } + if (runFiles != null) { + runs.add(makeRun(containerIndex, runStartOffset, runEndOffset, runFiles)); + } + } + return runs; + } + + private DownloadRun makeRun(int containerIndex, long runStartOffset, long runEndOffset, List wantedFiles) + { + return new DownloadRun( + containerIndex, + absoluteOffset(containerIndex, runStartOffset), + runStartOffset, + runEndOffset - runStartOffset, + List.copyOf(wantedFiles) + ); + } + + /** + * Fetch a single {@link DownloadRun} in one range read and mark every internal file it covers as downloaded. The + * read spans from the first wanted file to the last, so any file whose byte range lies fully within that span is now + * resident on disk; marking all of them (not just the explicitly wanted files) avoids re-fetching gap-fill files that + * we already paid to read and keeps the local-cache byte accounting in step with what was actually written. This + * mirrors {@link #downloadContainer}, which likewise marks every file in the region it reads. Holds the container lock + * for the fetch; a concurrent {@link #ensureFileDownloaded} writing a file in the same container stays safe because + * the two writes carry byte-identical data and {@link #markDownloaded} is gated on an atomic add (so a file is never + * counted twice). + */ + public void fetchRun(DownloadRun run) throws IOException + { + containerLocks[run.containerIndex()].lock(); + try { + checkClosed(); + ensureContainerInitialized(run.containerIndex()); + // over-fetch = everything in the span that isn't a requested file (unrequested files coalesced through + padding) + long requestedBytes = 0; + for (String name : run.wantedFiles()) { + requestedBytes += metadata.getFiles().get(name).getSize(); + } + streamRangeIntoContainer( + run.containerIndex(), + run.srcAbsoluteOffset(), + run.localOffset(), + run.size(), + run.size() - requestedBytes, + StringUtils.format("run[container=%d,files=%d]", run.containerIndex(), run.wantedFiles().size()) + ); + // Files are packed contiguously and the span starts/ends on wanted-file boundaries, so every file whose range + // falls inside [localOffset, localOffset + size) is fully resident now; mark them all, including gap-fill files. + final long spanStart = run.localOffset(); + final long spanEnd = run.localOffset() + run.size(); + for (String name : containerFileNames.get(run.containerIndex())) { + final SegmentInternalFileMetadata fileMetadata = metadata.getFiles().get(name); + final long fileStart = fileMetadata.getStartOffset(); + if (fileStart >= spanStart && fileStart + fileMetadata.getSize() <= spanEnd) { + markDownloaded(name, fileMetadata.getSize()); + } } } + finally { + containerLocks[run.containerIndex()].unlock(); + } + } + + /** + * One coalesced deep-storage range read covering one or more wanted internal files in a single container. The read + * spans {@code [srcAbsoluteOffset, srcAbsoluteOffset + size)} in the segment object and is written into the + * container's local file at {@code localOffset}. {@code wantedFiles} are the files the caller explicitly requested + * (used to drive column deserialization on the query path); {@link #fetchRun} additionally marks every other file + * the span fully covers as downloaded, since those bytes are resident too. + */ + public record DownloadRun( + int containerIndex, + long srcAbsoluteOffset, + long localOffset, + long size, + List wantedFiles + ) + { } /** @@ -605,8 +816,17 @@ private void checkClosed() */ private long computeAbsoluteOffset(SegmentInternalFileMetadata fileMetadata) { - final SegmentFileContainerMetadata container = metadata.getContainers().get(fileMetadata.getContainer()); - return headerSize + container.getStartOffset() + fileMetadata.getStartOffset(); + return absoluteOffset(fileMetadata.getContainer(), fileMetadata.getStartOffset()); + } + + /** + * Absolute byte offset, within the segment object in deep storage, of a position {@code relativeOffset} bytes into + * container {@code containerIndex}: the V10 header, then the container's own start offset, then the position within + * the container. + */ + private long absoluteOffset(int containerIndex, long relativeOffset) + { + return headerSize + metadata.getContainers().get(containerIndex).getStartOffset() + relativeOffset; } private void ensureFileDownloaded(String name, SegmentInternalFileMetadata fileMetadata) throws IOException @@ -631,6 +851,7 @@ private void ensureFileDownloaded(String name, SegmentInternalFileMetadata fileM computeAbsoluteOffset(fileMetadata), fileMetadata.getStartOffset(), fileMetadata.getSize(), + 0, // single-file read: the whole read is the requested file, no over-fetch StringUtils.format("file[%s]", name) ); markDownloaded(name, fileMetadata.getSize()); @@ -792,6 +1013,7 @@ private void streamRangeIntoContainer( long absoluteOffset, long localOffset, long size, + long gapFillBytes, String what ) throws IOException { @@ -820,7 +1042,8 @@ private void streamRangeIntoContainer( } // Report the completed deep-storage range read (reached only on success). One read may cover many files; this is // the actual request granularity, so it measures wire bytes + latency rather than bytes that became resident. - downloadListener.onRangeRead(size, System.nanoTime() - startNanos); + // gapFillBytes is the over-fetch within this read (data spanned only to coalesce adjacent requested files). + downloadListener.onRangeRead(size, gapFillBytes, System.nanoTime() - startNanos); } /** diff --git a/processing/src/test/java/org/apache/druid/segment/PartialQueryableIndexCursorFactoryTest.java b/processing/src/test/java/org/apache/druid/segment/PartialQueryableIndexCursorFactoryTest.java index aac367cce191..229ee019be02 100644 --- a/processing/src/test/java/org/apache/druid/segment/PartialQueryableIndexCursorFactoryTest.java +++ b/processing/src/test/java/org/apache/druid/segment/PartialQueryableIndexCursorFactoryTest.java @@ -262,8 +262,9 @@ void testMatchedProjectionDownloadsOnlyRequestedColumns() throws IOException ); // Query asks for group-by dim1 + sum(metric1), but NOT count. The projection has dim1 / _metric1_sum / _count; - // the projection match rewrites this to physical columns {dim1, _metric1_sum}. We should see those download, - // but NOT the _count file; proves column-level pruning, not just projection-level. + // the projection match rewrites this to physical columns {dim1, _metric1_sum}. We should see those download, and + // unrelated base columns must stay untouched (cross-bundle pruning); _count, however, sits physically between the + // two requested projection columns, so the coalesced range read spans it and it is retained as collateral. final CursorBuildSpec aggSpec = CursorBuildSpec.builder() .setGroupingColumns(List.of("dim1")) .setAggregators(List.of( @@ -286,13 +287,14 @@ void testMatchedProjectionDownloadsOnlyRequestedColumns() throws IOException downloaded.contains(projPrefix + "_metric1_sum"), "expected projection _metric1_sum; got: " + downloaded ); - // The projection's _count file was NOT in the query; must not be downloaded - Assertions.assertFalse( + // The projection's _count file was NOT in the query, but it lies between the two requested columns, so the + // coalesced read spans it and marks it resident (we already paid to read those bytes). + Assertions.assertTrue( downloaded.contains(projPrefix + "_count"), - "expected projection _count NOT to be downloaded; got: " + downloaded + "expected projection _count to be retained as coalescing collateral; got: " + downloaded ); // Base __time and base metric1 are NOT touched: projection dim1 may pull base dim1 as its parent column - // (legitimate dependency), but unrelated base columns must stay untouched. + // (legitimate dependency), but unrelated base columns (a different bundle, never read) must stay untouched. Assertions.assertFalse( downloaded.contains(basePrefix + ColumnHolder.TIME_COLUMN_NAME), "expected base __time NOT to be downloaded; got: " + downloaded @@ -305,6 +307,37 @@ void testMatchedProjectionDownloadsOnlyRequestedColumns() throws IOException } } + @Test + void testAdjacentColumnsCoalesceIntoFewerRangeReads() throws IOException + { + final CountingRangeReader rangeReader = new CountingRangeReader(segmentDir); + try (IndexAndMapper opened = openIndex(rangeReader, "coalesce_columns")) { + final PartialQueryableIndex index = opened.index(); + final PartialSegmentFileMapperV10 mapper = opened.mapper(); + final PartialQueryableIndexCursorFactory factory = new PartialQueryableIndexCursorFactory( + index, + QueryableIndexTimeBoundaryInspector.create(index), + noOpAcquirer(directExec()) + ); + rangeReader.resetCount(); + + // A base full scan pre-fetches every base column (dim1, metric1, __time). They sit close together in the segment + // object, so with the default coalescing thresholds they merge into a single range read rather than one per + // column. directExec makes the async download complete synchronously. + try (AsyncCursorHolder asyncHolder = factory.makeCursorHolderAsync(CursorBuildSpec.FULL_SCAN); + CursorHolder holder = asyncHolder.release()) { + Assertions.assertNotNull(holder); + final int downloadedFiles = mapper.getDownloadedFiles().size(); + Assertions.assertTrue(downloadedFiles >= 2, "expected several base columns; got " + downloadedFiles); + Assertions.assertTrue( + rangeReader.getReadCount() < downloadedFiles, + "coalescing should issue fewer range reads (" + rangeReader.getReadCount() + ") than downloaded files (" + + downloadedFiles + ")" + ); + } + } + } + @Test void testOpeningTimeOrderedProjectionCursorTriggersNoDownload() throws IOException { diff --git a/processing/src/test/java/org/apache/druid/segment/file/PartialSegmentFileMapperV10Test.java b/processing/src/test/java/org/apache/druid/segment/file/PartialSegmentFileMapperV10Test.java index aa9e420d0d4e..14074e6e5730 100644 --- a/processing/src/test/java/org/apache/druid/segment/file/PartialSegmentFileMapperV10Test.java +++ b/processing/src/test/java/org/apache/druid/segment/file/PartialSegmentFileMapperV10Test.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.io.Files; import com.google.common.primitives.Ints; +import org.apache.druid.error.DruidException; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; @@ -145,9 +146,10 @@ void testEnsureFilesAvailable() throws IOException Set filesToLoad = Set.of("2", "5", "7"); mapper.ensureFilesAvailable(filesToLoad); - // should have downloaded exactly 3 files - Assertions.assertEquals(3, rangeReader.getReadCount()); - Assertions.assertEquals(12, mapper.getDownloadedBytes()); + // files 2,5,7 are contiguous (within maxGapBytes) in one container, so they coalesce into a single range read; + // the span covers files 2..7, all of which are resident and charged afterward (24 bytes) + Assertions.assertEquals(1, rangeReader.getReadCount()); + Assertions.assertEquals(24, mapper.getDownloadedBytes()); // accessing these files should not trigger additional downloads for (String name : filesToLoad) { @@ -155,8 +157,142 @@ void testEnsureFilesAvailable() throws IOException Assertions.assertNotNull(buf); Assertions.assertEquals(Integer.parseInt(name), buf.getInt()); } - Assertions.assertEquals(3, rangeReader.getReadCount()); + Assertions.assertEquals(1, rangeReader.getReadCount()); + } + } + + @Test + void testCoalesceConfigRejectsInvalidThresholds() + { + // negative gap and non-positive chunk are nonsensical + Assertions.assertThrows( + DruidException.class, + () -> new PartialSegmentFileMapperV10.CoalesceConfig(-1, 1 << 20) + ); + Assertions.assertThrows( + DruidException.class, + () -> new PartialSegmentFileMapperV10.CoalesceConfig(0, 0) + ); + Assertions.assertThrows( + DruidException.class, + () -> new PartialSegmentFileMapperV10.CoalesceConfig(0, -5) + ); + // boundary values are valid: zero gap (merge adjacent files only) and a one-byte chunk (coalescing effectively off) + Assertions.assertDoesNotThrow(() -> new PartialSegmentFileMapperV10.CoalesceConfig(0, 1)); + } + + @Test + void testPlanDownloadRunsSplitsWhenGapExceedsThreshold() throws IOException + { + final File segmentFile = buildTestSegment(10, CompressionStrategy.NONE); + final File cacheDir = newCacheDir("plan_split"); + final CountingRangeReader rangeReader = new CountingRangeReader(segmentFile.getParentFile()); + + // maxGap = 0: no unwanted bytes may be read through, so non-adjacent wanted files never merge + final PartialSegmentFileMapperV10.CoalesceConfig noGap = new PartialSegmentFileMapperV10.CoalesceConfig(0, 1 << 20); + try (PartialSegmentFileMapperV10 mapper = createMapper(rangeReader, cacheDir, noGap)) { + final Set alternating = Set.of("0", "2", "4", "6", "8"); + final List runs = mapper.planDownloadRuns(alternating); + + Assertions.assertEquals(5, runs.size(), "each non-adjacent wanted file should be its own run"); + for (PartialSegmentFileMapperV10.DownloadRun run : runs) { + Assertions.assertEquals(1, run.wantedFiles().size()); + } + + rangeReader.resetCount(); + mapper.ensureFilesAvailable(alternating); + Assertions.assertEquals(5, rangeReader.getReadCount(), "one range read per un-coalesced run"); + } + } + + @Test + void testPlanDownloadRunsCapsRunAtMaxChunkBytes() throws IOException + { + final File segmentFile = buildTestSegment(10, CompressionStrategy.NONE); + final File cacheDir = newCacheDir("plan_chunk"); + final CountingRangeReader rangeReader = new CountingRangeReader(segmentFile.getParentFile()); + + // each test file is a 4-byte int; maxChunk = 4 means no two files can share a run even though they are contiguous + final PartialSegmentFileMapperV10.CoalesceConfig tinyChunk = + new PartialSegmentFileMapperV10.CoalesceConfig(1 << 20, Integer.BYTES); + try (PartialSegmentFileMapperV10 mapper = createMapper(rangeReader, cacheDir, tinyChunk)) { + final Set all = mapper.getSegmentFileMetadata().getFiles().keySet(); + final List runs = mapper.planDownloadRuns(all); + + Assertions.assertEquals(all.size(), runs.size(), "maxChunkBytes should keep each file in its own run"); + } + } + + @Test + void testFetchRunMarksEveryFileTheSpanCovers() throws IOException + { + final File segmentFile = buildTestSegment(10, CompressionStrategy.NONE); + final File cacheDir = newCacheDir("plan_marks"); + final CountingRangeReader rangeReader = new CountingRangeReader(segmentFile.getParentFile()); + + try (PartialSegmentFileMapperV10 mapper = createMapper(rangeReader, cacheDir)) { + rangeReader.resetCount(); + + // with the generous default gap, files 2,5,7 coalesce into a single run that spans the gap-fill files 3,4,6 + final Set wanted = Set.of("2", "5", "7"); + final List runs = mapper.planDownloadRuns(wanted); + Assertions.assertEquals(1, runs.size()); + + mapper.ensureFilesAvailable(wanted); + Assertions.assertEquals(1, rangeReader.getReadCount(), "the three wanted files coalesce into one range read"); + // every file the span covers is resident, so the gap-fill files (3,4,6) are marked downloaded too, not just the + // wanted ones; all six files (2..7) are charged + Assertions.assertEquals(Set.of("2", "3", "4", "5", "6", "7"), mapper.getDownloadedFiles()); + Assertions.assertEquals(24, mapper.getDownloadedBytes()); + + // a gap-fill file is already resident, so accessing it triggers no additional range read + mapper.mapFile("3"); + Assertions.assertEquals(1, rangeReader.getReadCount()); + + // a file outside the span was never fetched, so it still downloads on demand + mapper.mapFile("9"); + Assertions.assertEquals(2, rangeReader.getReadCount()); + } + } + + @Test + void testOnRangeReadReportsGapFillBytes() throws IOException + { + final File segmentFile = buildTestSegment(10, CompressionStrategy.NONE); + final File cacheDir = newCacheDir("gapfill"); + final DirectoryBackedRangeReader rangeReader = new DirectoryBackedRangeReader(segmentFile.getParentFile()); + + // capture (bytes, gapFillBytes) for each range read; onRangeRead does not fire for the header fetch + final List reads = new ArrayList<>(); + final PartialSegmentDownloadListener listener = new PartialSegmentDownloadListener() + { + @Override + public void onBytesDownloaded(long bytes) + { + // not under test + } + + @Override + public void onRangeRead(long bytes, long gapFillBytes, long nanos) + { + reads.add(new long[]{bytes, gapFillBytes}); + } + }; + + try (PartialSegmentFileMapperV10 mapper = PartialSegmentFileMapperV10.createForFile( + rangeReader, + JSON_MAPPER, + cacheDir, + IndexIO.V10_FILE_NAME, + listener + )) { + mapper.ensureFilesAvailable(Set.of("2", "5", "7")); } + + // one coalesced read covers files 2..7 (24 bytes); only 2,5,7 (12 bytes) were requested, so 12 bytes are over-fetch + Assertions.assertEquals(1, reads.size()); + Assertions.assertEquals(24, reads.get(0)[0]); + Assertions.assertEquals(12, reads.get(0)[1]); } @Test @@ -631,6 +767,22 @@ private static PartialSegmentFileMapperV10 createMapper( ); } + private static PartialSegmentFileMapperV10 createMapper( + SegmentRangeReader rangeReader, + File localCacheDir, + PartialSegmentFileMapperV10.CoalesceConfig coalesceConfig + ) throws IOException + { + return PartialSegmentFileMapperV10.createForFile( + rangeReader, + JSON_MAPPER, + localCacheDir, + IndexIO.V10_FILE_NAME, + PartialSegmentDownloadListener.NOOP, + coalesceConfig + ); + } + private static PartialSegmentFileMapperV10 createMapperWithExternal( SegmentRangeReader rangeReader, File localCacheDir, diff --git a/server/src/main/java/org/apache/druid/segment/loading/PartialSegmentCacheBootstrap.java b/server/src/main/java/org/apache/druid/segment/loading/PartialSegmentCacheBootstrap.java index 8a53c1b5f139..720d00c611f6 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/PartialSegmentCacheBootstrap.java +++ b/server/src/main/java/org/apache/druid/segment/loading/PartialSegmentCacheBootstrap.java @@ -82,6 +82,7 @@ public final class PartialSegmentCacheBootstrap * @param jsonMapper used by the metadata entry's mount path to parse the header * @param storagePool thread pool the async cursor path submits on-demand column downloads to (which bounds * load concurrency itself); may be null in tests that never invoke the cursor factory + * @param coalesceConfig range-coalescing thresholds applied to on-demand downloads once the entry is mounted * @param location the storage location to reserve the metadata entry on * @return the reserved {@link PartialSegmentMetadataCacheEntry}; the caller is responsible for mounting it * @throws DruidException if the expected header file is missing or the location cannot accept the reservation @@ -96,6 +97,31 @@ public static PartialSegmentMetadataCacheEntry reserveFromDisk( @Nullable StorageLoadingThreadPool storagePool, StorageLocation location ) + { + return reserveFromDisk( + segmentId, + localCacheDir, + targetFilename, + externalFilenames, + rangeReader, + jsonMapper, + storagePool, + PartialSegmentFileMapperV10.CoalesceConfig.DEFAULT, + location + ); + } + + public static PartialSegmentMetadataCacheEntry reserveFromDisk( + SegmentId segmentId, + File localCacheDir, + String targetFilename, + List externalFilenames, + SegmentRangeReader rangeReader, + ObjectMapper jsonMapper, + @Nullable StorageLoadingThreadPool storagePool, + PartialSegmentFileMapperV10.CoalesceConfig coalesceConfig, + StorageLocation location + ) { final File headerFile = new File(localCacheDir, targetFilename + PartialSegmentFileMapperV10.METADATA_HEADER_SUFFIX); if (!headerFile.exists()) { @@ -116,7 +142,8 @@ public static PartialSegmentMetadataCacheEntry reserveFromDisk( rangeReader, jsonMapper, storagePool, - actualMetadataSize + actualMetadataSize, + coalesceConfig ); if (!location.reserveWeak(metadata)) { diff --git a/server/src/main/java/org/apache/druid/segment/loading/PartialSegmentMetadataCacheEntry.java b/server/src/main/java/org/apache/druid/segment/loading/PartialSegmentMetadataCacheEntry.java index d597b656da57..5085af04d3b9 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/PartialSegmentMetadataCacheEntry.java +++ b/server/src/main/java/org/apache/druid/segment/loading/PartialSegmentMetadataCacheEntry.java @@ -97,6 +97,7 @@ public class PartialSegmentMetadataCacheEntry implements SegmentCacheEntry, Resi private final SegmentRangeReader rangeReader; private final ObjectMapper jsonMapper; private final long reservationEstimate; + private final PartialSegmentFileMapperV10.CoalesceConfig coalesceConfig; @Nullable private final StorageLoadingThreadPool storagePool; @@ -157,6 +158,31 @@ public PartialSegmentMetadataCacheEntry( @Nullable StorageLoadingThreadPool storagePool, long reservationEstimate ) + { + this( + segmentId, + localCacheDir, + targetFilename, + externalFilenames, + rangeReader, + jsonMapper, + storagePool, + reservationEstimate, + PartialSegmentFileMapperV10.CoalesceConfig.DEFAULT + ); + } + + public PartialSegmentMetadataCacheEntry( + SegmentId segmentId, + File localCacheDir, + String targetFilename, + List externalFilenames, + SegmentRangeReader rangeReader, + ObjectMapper jsonMapper, + @Nullable StorageLoadingThreadPool storagePool, + long reservationEstimate, + PartialSegmentFileMapperV10.CoalesceConfig coalesceConfig + ) { if (reservationEstimate <= 0) { throw DruidException.defensive( @@ -175,6 +201,7 @@ public PartialSegmentMetadataCacheEntry( this.storagePool = storagePool; this.reservationEstimate = reservationEstimate; this.currentSize = reservationEstimate; + this.coalesceConfig = coalesceConfig; this.bundleAcquirer = createBundleAcquirer(); } @@ -337,7 +364,8 @@ private void doMount(StorageLocation mountLocation) throws IOException localCacheDir, targetFilename, externalFilenames, - new WeakLoadTracker(mountLocation) + new WeakLoadTracker(mountLocation), + coalesceConfig ); final long sizeToAdjust; @@ -990,9 +1018,9 @@ public void onBytesDownloaded(long bytes) } @Override - public void onRangeRead(long bytes, long nanos) + public void onRangeRead(long bytes, long gapFillBytes, long nanos) { - mountLocation.trackWeakRangeRead(bytes, nanos); + mountLocation.trackWeakRangeRead(bytes, gapFillBytes, nanos); } } } diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java index 63e87c4b1008..9fc0186f2caa 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java @@ -22,8 +22,10 @@ import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.Lists; +import org.apache.druid.segment.file.PartialSegmentFileMapperV10; import org.apache.druid.utils.RuntimeInfo; +import javax.validation.constraints.Min; import java.io.File; import java.util.Collections; import java.util.List; @@ -114,6 +116,32 @@ public class SegmentLoaderConfig @JsonProperty("virtualStoragePartialDownloadsEnabled") private boolean virtualStoragePartialDownloadsEnabled = false; + /** + * Largest unwanted gap, in bytes, that on-demand partial downloads will read through to coalesce two wanted internal + * files into a single deep-storage range read. Larger values trade extra over-fetched bytes for fewer requests. + * See {@link org.apache.druid.segment.file.PartialSegmentFileMapperV10#planDownloadRuns}. + */ + @JsonProperty("virtualStorageCoalesceMaxGapBytes") + @Min( + value = 0, + message = "druid.segmentCache.virtualStorageCoalesceMaxGapBytes must be at least 0 (it is the largest unwanted " + + "gap, in bytes, read through to merge two adjacent on-demand column reads into a single request)" + ) + private long virtualStorageCoalesceMaxGapBytes = PartialSegmentFileMapperV10.DEFAULT_COALESCE_MAX_GAP_BYTES; + + /** + * Largest size, in bytes, of a single coalesced range read for on-demand partial downloads. Bounds how big one fetch + * can grow and keeps a wide request split into several reads that can be downloaded concurrently rather than + * collapsing into one serial read. See {@link org.apache.druid.segment.file.PartialSegmentFileMapperV10#planDownloadRuns}. + */ + @JsonProperty("virtualStorageCoalesceMaxChunkBytes") + @Min( + value = 1, + message = "druid.segmentCache.virtualStorageCoalesceMaxChunkBytes must be at least 1 (it caps the size, in " + + "bytes, of a single coalesced range read; a small value effectively disables coalescing)" + ) + private long virtualStorageCoalesceMaxChunkBytes = PartialSegmentFileMapperV10.DEFAULT_COALESCE_MAX_CHUNK_BYTES; + private long combinedMaxSize = 0; public List getLocations() @@ -211,6 +239,18 @@ public boolean isVirtualStoragePartialDownloadsEnabled() return virtualStorage && virtualStoragePartialDownloadsEnabled; } + /** + * Range-coalescing thresholds for on-demand partial downloads, derived from + * {@link #virtualStorageCoalesceMaxGapBytes} and {@link #virtualStorageCoalesceMaxChunkBytes}. + */ + public PartialSegmentFileMapperV10.CoalesceConfig getVirtualStorageCoalesceConfig() + { + return new PartialSegmentFileMapperV10.CoalesceConfig( + virtualStorageCoalesceMaxGapBytes, + virtualStorageCoalesceMaxChunkBytes + ); + } + public SegmentLoaderConfig setLocations(List locations) { this.locations = Lists.newArrayList(locations); diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java index f5d2452d9c38..e4b2380a698f 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java @@ -359,6 +359,7 @@ private void addFilesToCachedSegments( rangeReader, jsonMapper, virtualStorageLoadingThreadPool, + config.getVirtualStorageCoalesceConfig(), location ); cachedSegments.add(segment); @@ -802,7 +803,8 @@ private ReservedPartial reservePartial(DataSegment dataSegment, SegmentRangeRead rangeReader, jsonMapper, virtualStorageLoadingThreadPool, - config.getVirtualStorageMetadataReservationEstimate() + config.getVirtualStorageMetadataReservationEstimate(), + config.getVirtualStorageCoalesceConfig() ) ); if (hold == null) { diff --git a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java index 460a2fa9d2c6..d8b1da38cc4e 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java +++ b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java @@ -795,13 +795,14 @@ public void trackWeakLoad(long size) } /** - * Record a single on-demand deep-storage range read: {@code bytes} pulled over the wire in {@code nanos}. One read - * may materialize several internal files (whole-container fetch), so this is the request-level signal that - * complements the per-file {@link #trackWeakLoad}. + * Record a single on-demand deep-storage range read: {@code bytes} pulled over the wire in {@code nanos}, of which + * {@code gapFillBytes} were over-fetch (data read through only to coalesce adjacent requested files). One read may + * materialize several internal files (whole-container fetch), so this is the request-level signal that complements + * the per-file {@link #trackWeakLoad}. */ - public void trackWeakRangeRead(long bytes, long nanos) + public void trackWeakRangeRead(long bytes, long gapFillBytes, long nanos) { - weakStats.getAndUpdate(s -> s.rangeRead(bytes, nanos)); + weakStats.getAndUpdate(s -> s.rangeRead(bytes, gapFillBytes, nanos)); } private void trackWeakHold(WeakCacheEntry entry) @@ -1246,6 +1247,7 @@ public static final class WeakStats implements VirtualStorageLocationStats private final AtomicLong unmountCount = new AtomicLong(0); private final AtomicLong readCount = new AtomicLong(0); private final AtomicLong readBytes = new AtomicLong(0); + private final AtomicLong readGapFillBytes = new AtomicLong(0); private final AtomicLong readTimeNanos = new AtomicLong(0); public WeakStats(AtomicLong sizeUsed, AtomicLong holdCount, AtomicLong holdBytes) @@ -1306,10 +1308,11 @@ public WeakStats reject() return this; } - public WeakStats rangeRead(long bytes, long nanos) + public WeakStats rangeRead(long bytes, long gapFillBytes, long nanos) { readCount.getAndIncrement(); readBytes.getAndAdd(bytes); + readGapFillBytes.getAndAdd(gapFillBytes); readTimeNanos.getAndAdd(nanos); return this; } @@ -1398,6 +1401,12 @@ public long getReadBytes() return readBytes.get(); } + @Override + public long getReadGapFillBytes() + { + return readGapFillBytes.get(); + } + @Override public long getReadTimeNanos() { diff --git a/server/src/main/java/org/apache/druid/segment/loading/VirtualStorageLocationStats.java b/server/src/main/java/org/apache/druid/segment/loading/VirtualStorageLocationStats.java index 035d30caffff..3e766b80a10b 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/VirtualStorageLocationStats.java +++ b/server/src/main/java/org/apache/druid/segment/loading/VirtualStorageLocationStats.java @@ -99,6 +99,13 @@ public interface VirtualStorageLocationStats */ long getReadBytes(); + /** + * Of {@link #getReadBytes()}, the bytes that were not part of a requested file: data read through only to coalesce + * adjacent requested files into one range read (unrequested files spanned plus inter-file padding). Quantifies the + * over-fetch cost of range coalescing; compare against {@link #getReadBytes()} for the over-fetch fraction. + */ + long getReadGapFillBytes(); + /** * Total wall-clock time spent in deep-storage range reads during the measurement period, in nanoseconds. Divide by * {@link #getReadCount()} for the average per-read latency. diff --git a/server/src/main/java/org/apache/druid/server/metrics/StorageMonitor.java b/server/src/main/java/org/apache/druid/server/metrics/StorageMonitor.java index 23a8bd4bbf4c..12c384a9ce98 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/StorageMonitor.java +++ b/server/src/main/java/org/apache/druid/server/metrics/StorageMonitor.java @@ -174,6 +174,13 @@ public class StorageMonitor extends AbstractMonitor */ public static final String VSF_READ_BYTES = "storage/virtual/read/bytes"; + /** + * Of {@link #VSF_READ_BYTES}, the bytes pulled that were not part of a requested file: data read through only to + * coalesce adjacent requested files into one range read (unrequested files spanned plus inter-file padding). Measures + * the over-fetch cost of range coalescing; the ratio to {@link #VSF_READ_BYTES} is the over-fetch fraction. + */ + public static final String VSF_READ_GAP_BYTES = "storage/virtual/read/gapBytes"; + /** * Total wall-clock time spent in deep-storage range reads during the measurement period, in milliseconds. Combined * with {@link #VSF_READ_COUNT} this gives average per-read latency. @@ -226,6 +233,7 @@ public boolean doMonitor(ServiceEmitter emitter) emitter.emit(builder.setMetric(VSF_REJECT_COUNT, weakStats.getRejectCount())); emitter.emit(builder.setMetric(VSF_READ_COUNT, weakStats.getReadCount())); emitter.emit(builder.setMetric(VSF_READ_BYTES, weakStats.getReadBytes())); + emitter.emit(builder.setMetric(VSF_READ_GAP_BYTES, weakStats.getReadGapFillBytes())); emitter.emit(builder.setMetric(VSF_READ_TIME, TimeUnit.NANOSECONDS.toMillis(weakStats.getReadTimeNanos()))); } } diff --git a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderConfigTest.java b/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderConfigTest.java index cda83b1ebb7c..6b763c525ec0 100644 --- a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderConfigTest.java +++ b/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderConfigTest.java @@ -19,11 +19,23 @@ package org.apache.druid.segment.loading; +import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.ProvisionException; +import org.apache.druid.guice.JsonConfigurator; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.segment.file.PartialSegmentFileMapperV10; +import org.apache.druid.utils.RuntimeInfo; import org.junit.Assert; import org.junit.Test; +import javax.validation.Validation; +import java.util.Properties; + public class SegmentLoaderConfigTest { + private static final String CONFIG_BASE = "druid.segmentCache"; + @Test public void testSetVirtualStorage() { @@ -40,4 +52,56 @@ public void testSetVirtualStorage() Assert.assertTrue(config.isVirtualStorage()); Assert.assertTrue(config.isVirtualStorageEphemeral()); } + + @Test + public void testCoalesceConfigDefaults() + { + final SegmentLoaderConfig config = bind(new Properties()); + Assert.assertEquals(PartialSegmentFileMapperV10.CoalesceConfig.DEFAULT, config.getVirtualStorageCoalesceConfig()); + } + + @Test + public void testCoalesceConfigCustomValid() + { + final Properties props = new Properties(); + props.setProperty(CONFIG_BASE + ".virtualStorageCoalesceMaxGapBytes", "2048"); + props.setProperty(CONFIG_BASE + ".virtualStorageCoalesceMaxChunkBytes", "4096"); + final PartialSegmentFileMapperV10.CoalesceConfig coalesce = bind(props).getVirtualStorageCoalesceConfig(); + Assert.assertEquals(2048L, coalesce.maxGapBytes()); + Assert.assertEquals(4096L, coalesce.maxChunkBytes()); + } + + @Test + public void testNegativeMaxGapBytesFailsAtBinding() + { + final Properties props = new Properties(); + props.setProperty(CONFIG_BASE + ".virtualStorageCoalesceMaxGapBytes", "-1"); + final ProvisionException e = Assert.assertThrows(ProvisionException.class, () -> bind(props)); + Assert.assertTrue(e.getMessage(), e.getMessage().contains("virtualStorageCoalesceMaxGapBytes")); + } + + @Test + public void testZeroMaxChunkBytesFailsAtBinding() + { + final Properties props = new Properties(); + props.setProperty(CONFIG_BASE + ".virtualStorageCoalesceMaxChunkBytes", "0"); + final ProvisionException e = Assert.assertThrows(ProvisionException.class, () -> bind(props)); + Assert.assertTrue(e.getMessage(), e.getMessage().contains("virtualStorageCoalesceMaxChunkBytes")); + } + + /** + * Bind {@code properties} into a {@link SegmentLoaderConfig} through {@link JsonConfigurator}, the same parse + JSR-303 + * validate path Druid runs at startup, so an invalid value surfaces as a {@link ProvisionException} here just as it + * would when a process boots. + */ + private static SegmentLoaderConfig bind(Properties properties) + { + final ObjectMapper mapper = new DefaultObjectMapper(); + mapper.setInjectableValues(new InjectableValues.Std().addValue(RuntimeInfo.class, new RuntimeInfo())); + final JsonConfigurator configurator = new JsonConfigurator( + mapper, + Validation.buildDefaultValidatorFactory().getValidator() + ); + return configurator.configurate(properties, CONFIG_BASE, SegmentLoaderConfig.class); + } }