Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.testing.embedded.query;

import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.Druids;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.server.metrics.LatchableEmitter;
import org.apache.druid.server.metrics.StorageMonitor;
import org.apache.druid.testing.embedded.EmbeddedBroker;
import org.apache.druid.testing.embedded.EmbeddedCoordinator;
import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
import org.apache.druid.testing.embedded.EmbeddedHistorical;
import org.apache.druid.testing.embedded.EmbeddedIndexer;
import org.apache.druid.testing.embedded.EmbeddedOverlord;
import org.apache.druid.testing.embedded.EmbeddedRouter;
import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
import org.apache.druid.testing.embedded.minio.MinIOStorageResource;
import org.apache.druid.testing.embedded.msq.EmbeddedMSQApis;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.io.IOException;

/**
 * End-to-end check of range-read coalescing. A historical with virtual storage and partial downloads enabled serves
 * segments kept in MinIO S3 deep storage, and the test verifies through the {@link StorageMonitor} read metrics that
 * fewer range requests were issued than files were loaded.
 */
class QueryVirtualStorageCoalesceTest extends EmbeddedClusterTestBase
{
  // Big enough for the entire dataset: every queried column is loaded once and eviction cannot distort the counts.
  private static final long CACHE_SIZE = HumanReadableBytes.parse("64MiB");
  private static final long MAX_SIZE = HumanReadableBytes.parse("1GiB");
  private static final long ESTIMATE_SIZE = HumanReadableBytes.parse("1MiB");

  // Template for druid.segmentCache.locations; arguments are the cache directory and its maximum size.
  private static final String SEGMENT_CACHE_LOCATIONS_FORMAT = "[{\"path\":\"%s\",\"maxSize\":\"%s\"}]";

  private final EmbeddedBroker broker = new EmbeddedBroker();
  private final EmbeddedIndexer indexer = new EmbeddedIndexer();
  private final EmbeddedOverlord overlord = new EmbeddedOverlord();
  private final EmbeddedHistorical historical = new EmbeddedHistorical();
  private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator();
  private final EmbeddedRouter router = new EmbeddedRouter();
  private final MinIOStorageResource storageResource = new MinIOStorageResource();

  /**
   * Builds the cluster under test: one server of each type on embedded Derby + ZooKeeper with MinIO attached, and
   * the common properties that partial reads depend on.
   */
  @Override
  public EmbeddedDruidCluster createCluster()
  {
    configureHistorical();
    configureSupportingServers();

    final EmbeddedDruidCluster druidCluster = EmbeddedDruidCluster
        .withEmbeddedDerbyAndZookeeper()
        .useLatchableEmitter()
        .useDefaultTimeoutForLatchableEmitter(20)
        .addResource(storageResource)
        // partial reads only engage for segments pushed unzipped (rangeable) and written in V10 format
        .addCommonProperty("druid.storage.zip", "false")
        .addCommonProperty("druid.indexer.task.buildV10", "true")
        .addCommonProperty("druid.monitoring.emissionPeriod", "PT1s")
        .addServer(coordinator)
        .addServer(overlord)
        .addServer(indexer)
        .addServer(historical)
        .addServer(broker)
        .addServer(router);
    return druidCluster;
  }

  /**
   * Applies the virtual-storage settings to the historical. The segment cache location is registered in a
   * before-start hook because the cache directory is created from the cluster's test folder.
   */
  private void configureHistorical()
  {
    final String metadataEstimate = String.valueOf(ESTIMATE_SIZE);
    final String loadThreads = String.valueOf(Runtime.getRuntime().availableProcessors());
    final String serverMaxSize = String.valueOf(MAX_SIZE);

    historical.setServerMemory(500_000_000)
              .addProperty("druid.segmentCache.virtualStorage", "true")
              .addProperty("druid.segmentCache.virtualStoragePartialDownloadsEnabled", "true")
              .addProperty("druid.segmentCache.virtualStorageMetadataReservationEstimate", metadataEstimate)
              .addProperty("druid.segmentCache.virtualStorageLoadThreads", loadThreads)
              .addBeforeStartHook(
                  (druidCluster, server) -> server.addProperty(
                      "druid.segmentCache.locations",
                      StringUtils.format(
                          SEGMENT_CACHE_LOCATIONS_FORMAT,
                          druidCluster.getTestFolder().newFolder().getAbsolutePath(),
                          CACHE_SIZE
                      )
                  )
              )
              .addProperty("druid.server.maxSize", serverMaxSize);
  }

  /**
   * Configures the broker, coordinator, overlord and indexer with their memory, capacity and poll-duration
   * properties.
   */
  private void configureSupportingServers()
  {
    broker.setServerMemory(200_000_000);

    coordinator.addProperty("druid.manager.segments.useIncrementalCache", "always");

    overlord.addProperty("druid.manager.segments.useIncrementalCache", "always")
            .addProperty("druid.manager.segments.pollDuration", "PT0.1s");

    indexer.setServerMemory(400_000_000)
           .addProperty("druid.worker.capacity", "4")
           .addProperty("druid.processing.numThreads", "3")
           .addProperty("druid.segment.handoff.pollDuration", "PT0.1s");
  }

  /**
   * Ingests the wikipedia sample once for the whole class into a randomly named datasource.
   */
  @BeforeAll
  void loadData() throws IOException
  {
    dataSource = "wiki-" + IdUtils.getRandomId();
    final EmbeddedMSQApis msqApis = new EmbeddedMSQApis(cluster, overlord);
    WikipediaVirtualStorageTable.ingestHourly(cluster, msqApis, broker, dataSource);
    // waitUntilSegmentsLoad covers just the historical mount; also wait on the broker schema so column queries resolve
    cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
  }

  @Override
  protected void refreshDatasourceName()
  {
    // intentionally empty: the datasource is created once in loadData() and shared by all tests
  }

  @Test
  void testMultiColumnQueryCoalescesReads()
  {
    // No baseline wait: partial loads happen only on query, so the historical has loaded nothing yet and the VSF
    // metrics do not exist until the query below produces them.
    final LatchableEmitter metrics = historical.latchableEmitter();
    metrics.flush();

    // A native scan is used instead of SQL because the broker only knows __time for a metadata-mounted partial
    // segment and SQL therefore cannot resolve data columns. The native query names its columns, and the historical
    // resolves and loads them on demand. Reading several columns per segment lets contiguous files coalesce into far
    // fewer range reads.
    final ScanQuery scan = Druids.newScanQueryBuilder()
                                 .dataSource(new TableDataSource(dataSource))
                                 .eternityInterval()
                                 .columns("channel", "countryName", "page", "user", "added", "deleted", "delta")
                                 .limit(100)
                                 .build();
    final String response = cluster.callApi().onAnyBroker(client -> client.submitNativeQuery(scan));
    Assertions.assertFalse(response.isEmpty());

    metrics.waitForNextEvent(matcher -> matcher.hasMetricName(StorageMonitor.VSF_READ_COUNT));
    final long reads = metrics.getMetricEventLongSum(StorageMonitor.VSF_READ_COUNT);
    final long loads = metrics.getMetricEventLongSum(StorageMonitor.VSF_LOAD_COUNT);

    final boolean sawReads = reads > 0;
    Assertions.assertTrue(sawReads, "expected some range reads, got " + reads);
    // coalescing signal: several files arrive per read, so the number of loaded files exceeds the reads issued
    final boolean coalesced = loads > reads;
    Assertions.assertTrue(coalesced, "expected loads(" + loads + ") > reads(" + reads + ")");

    // gap-fill bytes measure the over-fetch caused by coalescing; they are emitted and bounded by the wire bytes
    metrics.waitForNextEvent(matcher -> matcher.hasMetricName(StorageMonitor.VSF_READ_GAP_BYTES));
    final long gapBytes = metrics.getMetricEventLongSum(StorageMonitor.VSF_READ_GAP_BYTES);
    final long readBytes = metrics.getMetricEventLongSum(StorageMonitor.VSF_READ_BYTES);
    Assertions.assertTrue(gapBytes >= 0, "gap-fill bytes should be non-negative, got " + gapBytes);
    Assertions.assertTrue(
        gapBytes <= readBytes,
        "gap-fill(" + gapBytes + ") cannot exceed read bytes(" + readBytes + ")"
    );
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,16 @@
package org.apache.druid.testing.embedded.query;

import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.data.input.impl.LocalInputSource;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.msq.indexing.report.MSQTaskReport;
import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
import org.apache.druid.query.DefaultQueryMetrics;
import org.apache.druid.query.DruidProcessingConfigTest;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.http.ClientSqlQuery;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.server.metrics.LatchableEmitter;
import org.apache.druid.server.metrics.StorageMonitor;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.http.GetQueryReportResponse;
import org.apache.druid.testing.embedded.EmbeddedBroker;
import org.apache.druid.testing.embedded.EmbeddedCoordinator;
Expand All @@ -44,20 +40,15 @@
import org.apache.druid.testing.embedded.EmbeddedRouter;
import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
import org.apache.druid.testing.embedded.minio.MinIOStorageResource;
import org.apache.druid.testing.embedded.msq.EmbeddedDurableShuffleStorageTest;
import org.apache.druid.testing.embedded.msq.EmbeddedMSQApis;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.shaded.com.google.common.io.ByteStreams;

import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
Expand Down Expand Up @@ -387,51 +378,10 @@ private String createTestDatasourceName()
return "wiki-" + IdUtils.getRandomId();
}

/**
 * Loads the sampled wikipedia data into {@code dataSource} by delegating to
 * {@link WikipediaVirtualStorageTable#ingestHourly}, then checks the one thing specific to this test: that the
 * hourly partitioning produced 24 segments.
 *
 * The shared helper already asserts that the ingest task succeeded without an error report, so those checks are
 * not repeated here.
 *
 * @throws IOException if the shared helper fails to stage or submit the sample data
 */
private void loadWikiData() throws IOException
{
  final MSQTaskReportPayload payload =
      WikipediaVirtualStorageTable.ingestHourly(cluster, msqApis, broker, dataSource);
  Assertions.assertEquals(24, payload.getStatus().getSegmentLoadWaiterStatus().getTotalSegments());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.testing.embedded.query;

import org.apache.druid.data.input.impl.LocalInputSource;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
import org.apache.druid.query.DruidProcessingConfigTest;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.testing.embedded.EmbeddedBroker;
import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
import org.apache.druid.testing.embedded.msq.EmbeddedMSQApis;
import org.junit.jupiter.api.Assertions;
import org.testcontainers.shaded.com.google.common.io.ByteStreams;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.util.Collections;

/**
 * Shared wikipedia ingest for the virtual-storage query tests. Loads the bundled {@code wikiticker} sample via MSQ,
 * partitioned by hour and clustered by channel, so the resulting datasource has many small segments with several
 * columns each - enough for partial loads to have something to coalesce.
 */
public final class WikipediaVirtualStorageTable
{
  /** Classpath location of the gzipped, sampled wikiticker JSON used as ingest input. */
  private static final String SAMPLE_RESOURCE = "/wikipedia/wikiticker-2015-09-12-sampled.json.gz";

  private WikipediaVirtualStorageTable()
  {
    // static helper, not instantiable
  }

  /**
   * Ingest the sample into {@code dataSource} and return the task report so callers can make their own assertions
   * (e.g. segment count). Fails if the ingest task does not succeed or reports an error.
   *
   * @param cluster    cluster whose test folder receives the staged copy of the sample file
   * @param msqApis    MSQ client used to run the {@code REPLACE} task and fetch its report
   * @param broker     broker whose JSON mapper serializes the input source embedded in the SQL
   * @param dataSource name of the datasource to (over)write
   * @return the report payload of the finished ingest task
   * @throws IOException if the sample resource is missing from the classpath or cannot be staged
   */
  public static MSQTaskReportPayload ingestHourly(
      EmbeddedDruidCluster cluster,
      EmbeddedMSQApis msqApis,
      EmbeddedBroker broker,
      String dataSource
  ) throws IOException
  {
    final File wikiFile = new File(cluster.getTestFolder().newFolder(), "wiki.gz");
    stageSampleFile(wikiFile);

    final String sql = StringUtils.format(
        "SET waitUntilSegmentsLoad = TRUE;\n"
        + "REPLACE INTO \"%s\" OVERWRITE ALL\n"
        + "SELECT\n"
        + " TIME_PARSE(\"time\") AS __time,\n"
        + " channel,\n"
        + " countryName,\n"
        + " page,\n"
        + " \"user\",\n"
        + " added,\n"
        + " deleted,\n"
        + " delta\n"
        + "FROM TABLE(\n"
        + " EXTERN(\n"
        + " %s,\n"
        + " '{\"type\":\"json\"}',\n"
        + " '[{\"name\":\"isRobot\",\"type\":\"string\"},{\"name\":\"channel\",\"type\":\"string\"},{\"name\":\"time\",\"type\":\"string\"},{\"name\":\"flags\",\"type\":\"string\"},{\"name\":\"isUnpatrolled\",\"type\":\"string\"},{\"name\":\"page\",\"type\":\"string\"},{\"name\":\"diffUrl\",\"type\":\"string\"},{\"name\":\"added\",\"type\":\"long\"},{\"name\":\"comment\",\"type\":\"string\"},{\"name\":\"commentLength\",\"type\":\"long\"},{\"name\":\"isNew\",\"type\":\"string\"},{\"name\":\"isMinor\",\"type\":\"string\"},{\"name\":\"delta\",\"type\":\"long\"},{\"name\":\"isAnonymous\",\"type\":\"string\"},{\"name\":\"user\",\"type\":\"string\"},{\"name\":\"deltaBucket\",\"type\":\"long\"},{\"name\":\"deleted\",\"type\":\"long\"},{\"name\":\"namespace\",\"type\":\"string\"},{\"name\":\"cityName\",\"type\":\"string\"},{\"name\":\"countryName\",\"type\":\"string\"},{\"name\":\"regionIsoCode\",\"type\":\"string\"},{\"name\":\"metroCode\",\"type\":\"long\"},{\"name\":\"countryIsoCode\",\"type\":\"string\"},{\"name\":\"regionName\",\"type\":\"string\"}]'\n"
        + " )\n"
        + " )\n"
        + "PARTITIONED BY HOUR\n"
        + "CLUSTERED BY channel",
        dataSource,
        Calcites.escapeStringLiteral(
            broker.bindings()
                  .jsonMapper()
                  .writeValueAsString(new LocalInputSource(null, null, Collections.singletonList(wikiFile), null))
        )
    );

    final MSQTaskReportPayload payload = msqApis.runTaskSqlAndGetReport(sql);
    Assertions.assertEquals(TaskState.SUCCESS, payload.getStatus().getStatus());
    Assertions.assertNull(payload.getStatus().getErrorReport());
    return payload;
  }

  /**
   * Copies the bundled sample from the classpath to {@code target}. Both streams are closed here because
   * {@code ByteStreams.copy} leaves closing to the caller.
   */
  private static void stageSampleFile(File target) throws IOException
  {
    try (InputStream in = DruidProcessingConfigTest.class.getResourceAsStream(SAMPLE_RESOURCE)) {
      if (in == null) {
        // fail with a clear message instead of an NPE from inside the copy
        throw new IOException(StringUtils.format("Test resource[%s] not found on classpath", SAMPLE_RESOURCE));
      }
      try (OutputStream out = Files.newOutputStream(target.toPath())) {
        ByteStreams.copy(in, out);
      }
    }
  }
}
Loading
Loading