From 8a7f9ea4969aa3441df4d4c026e68cbc363446e5 Mon Sep 17 00:00:00 2001 From: Maryam Shahid Date: Tue, 30 Jun 2026 11:54:05 -0700 Subject: [PATCH 1/2] Add column-level lineage --- .../extensions-contrib/openlineage-emitter.md | 22 ++ .../openlineage/OpenLineageRequestLogger.java | 356 +++++++++++++++++- .../OpenLineageRequestLoggerProvider.java | 6 + .../DruidColumnUsageDatasetFacet.json | 31 ++ .../OpenLineageRequestLoggerTest.java | 348 +++++++++++++++++ 5 files changed, 756 insertions(+), 7 deletions(-) create mode 100644 extensions-contrib/openlineage-emitter/src/main/resources/openlineage-schema/DruidColumnUsageDatasetFacet.json diff --git a/docs/development/extensions-contrib/openlineage-emitter.md b/docs/development/extensions-contrib/openlineage-emitter.md index 6e613cf6320b..571ea2993de2 100644 --- a/docs/development/extensions-contrib/openlineage-emitter.md +++ b/docs/development/extensions-contrib/openlineage-emitter.md @@ -45,6 +45,7 @@ All configuration parameters are under `druid.request.logging`. | `druid.request.logging.transportType` | Where to send events. `CONSOLE` logs JSON to the Druid log; `HTTP` POSTs to an OpenLineage API endpoint. | no | `CONSOLE` | | `druid.request.logging.transportUrl` | OpenLineage API endpoint URL. Required when `transportType=HTTP`. | no | — | | `druid.request.logging.excludedNativeQueryTypes` | Native query types to exclude from lineage emission. Internal broker queries like segment metadata lookups produce noisy, low-value events. | no | `["segmentMetadata", "dataSourceMetadata", "timeBoundary"]` | +| `druid.request.logging.columnLineageEnabled` | Emit per-column lineage (the `schema` and `druid_columnUsage` dataset facets) for native queries. Set to `false` to emit table-level (datasource) lineage only, reducing event size. | no | `true` | | `druid.request.logging.emitQueueCapacity` | Maximum number of events buffered in the async HTTP emit queue. Events are dropped (with a warning) when the queue is full. 
Only applies when `transportType=HTTP`. | no | `1000` | | `druid.request.logging.emitThreadCount` | Number of background threads used to POST events to the HTTP endpoint. Only applies when `transportType=HTTP`. | no | `1` | | `druid.request.logging.trustStorePath` | Path to the TrustStore file for HTTPS transport. Only applies when `transportType=HTTP`. | no | — | @@ -96,3 +97,24 @@ Each emitted event follows the [OpenLineage spec](https://openlineage.io/spec/2- |---|---| | `jobType` | `processingType=BATCH`, `integration=DRUID`, `jobType=QUERY`. Standard OpenLineage facet. | | `sql` | Raw SQL text. Present on SQL queries only. Standard OpenLineage facet. | + +### Input dataset facets + +For supported native query types (`scan`, `groupBy`, `topN`, `timeseries`), each input dataset carries column-level lineage describing which columns the query referenced and how. Columns are attributed to their base tables across joins (per side), sub-queries, unions, and datasource wrappers. Query types that are not supported, a bare `SELECT *` (a scan with neither explicit columns nor a filter), and any column that cannot be attributed to a base table (for example lookup or inline-data columns) fall back to table-level lineage only; a `SELECT *` that carries a filter still reports its filter columns. This can be disabled entirely with `columnLineageEnabled=false`. + +| Facet | Description | +|---|---| +| `schema` | Standard OpenLineage `SchemaDatasetFacet` listing the referenced input column names (names only, sorted). | +| `druid_columnUsage` | Druid-specific facet mapping each referenced column to the roles in which it was used. | + +Column usage roles (`druid_columnUsage`): + +| Role | Meaning | +|---|---| +| `PROJECTION` | Column is selected/projected (OpenLineage `IDENTITY`/`TRANSFORMATION`). | +| `GROUP_BY` | Column is a grouping dimension. | +| `AGGREGATION` | Column feeds an aggregator (e.g. `SUM`, `COUNT`). | +| `FILTER` | Column appears in a filter / `WHERE`. | +| `JOIN` | Column is used as a join key. 
| + +Virtual/expression columns are expanded to their underlying base columns, carrying the consuming role. `__time` appears only when explicitly referenced by a query part (not for the implicit interval). Output datasets (MSQ `INSERT`/`REPLACE`) do not carry column-level facets. diff --git a/extensions-contrib/openlineage-emitter/src/main/java/org/apache/druid/extensions/openlineage/OpenLineageRequestLogger.java b/extensions-contrib/openlineage-emitter/src/main/java/org/apache/druid/extensions/openlineage/OpenLineageRequestLogger.java index 1a3739b1c04f..a4ba15c87703 100644 --- a/extensions-contrib/openlineage-emitter/src/main/java/org/apache/druid/extensions/openlineage/OpenLineageRequestLogger.java +++ b/extensions-contrib/openlineage-emitter/src/main/java/org/apache/druid/extensions/openlineage/OpenLineageRequestLogger.java @@ -31,6 +31,28 @@ import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.BaseQuery; +import org.apache.druid.query.DataSource; +import org.apache.druid.query.FilteredDataSource; +import org.apache.druid.query.JoinDataSource; +import org.apache.druid.query.Query; +import org.apache.druid.query.QueryDataSource; +import org.apache.druid.query.RestrictedDataSource; +import org.apache.druid.query.TableDataSource; +import org.apache.druid.query.UnionDataSource; +import org.apache.druid.query.UnnestDataSource; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.filter.DimFilter; +import org.apache.druid.query.groupby.GroupByQuery; +import org.apache.druid.query.planning.JoinDataSourceAnalysis; +import org.apache.druid.query.planning.PreJoinableClause; +import org.apache.druid.query.scan.ScanQuery; +import org.apache.druid.query.timeseries.TimeseriesQuery; +import org.apache.druid.query.topn.TopNQuery; +import org.apache.druid.query.union.UnionQuery; +import 
org.apache.druid.segment.VirtualColumn; +import org.apache.druid.segment.VirtualColumns; +import org.apache.druid.segment.join.JoinPrefixUtils; import org.apache.druid.server.RequestLogLine; import org.apache.druid.server.log.RequestLogger; import org.apache.druid.sql.calcite.parser.DruidSqlParser; @@ -48,10 +70,15 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; @@ -85,6 +112,12 @@ public class OpenLineageRequestLogger implements RequestLogger + "/src/main/resources/openlineage-schema/"; private static final String CONTEXT_FACET_SCHEMA_URL = CUSTOM_SCHEMA_BASE + "DruidQueryContextRunFacet.json"; private static final String STATS_FACET_SCHEMA_URL = CUSTOM_SCHEMA_BASE + "DruidQueryStatisticsRunFacet.json"; + // Standard OpenLineage SchemaDatasetFacet listing the input columns referenced by the query (names only). + private static final String SCHEMA_FACET_SCHEMA_URL = + "https://openlineage.io/spec/facets/1-1-1/SchemaDatasetFacet.json"; + // Custom dataset facet describing how each referenced column was used (filter, group-by, aggregation, ...). 
+ private static final String COLUMN_USAGE_FACET_SCHEMA_URL = + CUSTOM_SCHEMA_BASE + "DruidColumnUsageDatasetFacet.json"; static final int SQL_FACET_MAX_LENGTH = 64 * 1024; static final int DEFAULT_EMIT_QUEUE_CAPACITY = 1000; static final int DEFAULT_EMIT_THREAD_COUNT = 1; @@ -105,6 +138,7 @@ public class OpenLineageRequestLogger implements RequestLogger @Nullable private final String transportUrl; private final Set excludedNativeQueryTypes; + private final boolean columnLineageEnabled; @Nullable private final HttpClient httpClient; @Nullable @@ -120,7 +154,7 @@ public OpenLineageRequestLogger( ) { this(jsonMapper, namespace, transportType, transportUrl, excludedNativeQueryTypes, - DEFAULT_EMIT_QUEUE_CAPACITY, DEFAULT_EMIT_THREAD_COUNT, null); + true, DEFAULT_EMIT_QUEUE_CAPACITY, DEFAULT_EMIT_THREAD_COUNT, null); } public OpenLineageRequestLogger( @@ -129,6 +163,7 @@ public OpenLineageRequestLogger( OpenLineageRequestLoggerProvider.TransportType transportType, @Nullable String transportUrl, Set excludedNativeQueryTypes, + boolean columnLineageEnabled, int emitQueueCapacity, int emitThreadCount, @Nullable HttpClient httpClient @@ -139,6 +174,7 @@ public OpenLineageRequestLogger( this.transportType = transportType; this.transportUrl = transportUrl; this.excludedNativeQueryTypes = excludedNativeQueryTypes; + this.columnLineageEnabled = columnLineageEnabled; if (transportType == OpenLineageRequestLoggerProvider.TransportType.HTTP && transportUrl == null) { throw new IllegalStateException( "druid.request.logging.transportUrl must be set when transportType=HTTP" @@ -222,7 +258,9 @@ public void logNativeQuery(RequestLogLine requestLogLine) throws IOException queryId = UNKNOWN_QUERY_ID; } - emit(buildRunEvent(queryId, queryType, requestLogLine, inputs, null)); + Map>> columnsByTable = + columnLineageEnabled ? 
extractColumnsByTable(requestLogLine.getQuery()) : null; + emit(buildRunEvent(queryId, queryType, requestLogLine, inputs, columnsByTable, null)); } /** @@ -254,7 +292,7 @@ public void logSqlQuery(RequestLogLine requestLogLine) throws IOException queryId = UNKNOWN_QUERY_ID; } - emit(buildRunEvent(queryId, "msq", requestLogLine, List.of(), outputTable)); + emit(buildRunEvent(queryId, "msq", requestLogLine, List.of(), null, outputTable)); } /** @@ -309,11 +347,275 @@ static String extractMsqOutputDatasource(String sql) } } + /** + * Column usage roles for the custom {@code druid_columnUsage} dataset facet. Names align with + * OpenLineage transformation subtypes where they exist (GROUP_BY, AGGREGATION, FILTER, JOIN); + * PROJECTION corresponds to OpenLineage IDENTITY/TRANSFORMATION (a selected/projected column). + */ + enum ColumnRole + { + PROJECTION, GROUP_BY, AGGREGATION, FILTER, JOIN + } + + /** + * Resolves the per-base-table column-usage map for a native query, used to attach the {@code schema} + * and {@code druid_columnUsage} dataset facets to input datasets. Handles all datasource shapes: + * tables, joins (attributing columns per side via the planner's join prefixes), unions, sub-queries + * (recursing into the sub-query's own columns), and datasource wrappers (restricted/filtered/unnest). + * Columns that have no base table (lookups, inline data) are dropped rather than fabricated. + * + *

Returns {@code null} (yielding table-level lineage only) when no base-table columns can be + * determined, or on any error -- it never fabricates or mis-attributes columns. + */ + @Nullable + private Map>> extractColumnsByTable(Query query) + { + try { + Map>> result = new TreeMap<>(); + collectInto(result, query); + return result.isEmpty() ? null : result; + } + // StackOverflowError (an Error, not an Exception) is caught too so that a pathologically deep + // query plan degrades to table-level lineage rather than breaking the request-logging path. + catch (Exception | StackOverflowError e) { + log.debug(e, "Failed to extract column lineage; falling back to table-level lineage"); + return null; + } + } + + /** + * Walks {@code query}'s column-bearing parts and attributes the referenced columns to the base + * tables of its datasource. {@link UnionQuery} (whose {@code getDataSource()} is undefined) is + * handled by recursing into each of its branches. + */ + private void collectInto(Map>> result, Query query) + { + if (query instanceof UnionQuery) { + for (DataSource branch : ((UnionQuery) query).getDataSources()) { + attribute(result, branch, Collections.emptyMap()); + } + return; + } + attribute(result, query.getDataSource(), collectColumnRoles(query)); + } + + /** + * Attributes {@code roles} (column to roles, expressed in {@code dataSource}'s output namespace) to + * the underlying base tables, recursing through the datasource tree. Sub-queries are recursed into + * via {@link #collectInto} (their columns come from their own parts, not the outer references). + */ + private void attribute( + Map>> result, + DataSource dataSource, + Map> roles + ) + { + if (dataSource instanceof TableDataSource) { + // Also covers GlobalTableDataSource (a TableDataSource subclass); it is a real named table, so + // handling it here is intentional. Keep this branch ahead of the wrapper branches below. 
+ String table = ((TableDataSource) dataSource).getName(); + for (Map.Entry> entry : roles.entrySet()) { + addRoles(result, table, entry.getKey(), entry.getValue()); + } + } else if (dataSource instanceof RestrictedDataSource) { + attribute(result, ((RestrictedDataSource) dataSource).getBase(), roles); + } else if (dataSource instanceof FilteredDataSource) { + attribute(result, ((FilteredDataSource) dataSource).getBase(), roles); + } else if (dataSource instanceof UnnestDataSource) { + UnnestDataSource unnest = (UnnestDataSource) dataSource; + VirtualColumn unnestColumn = unnest.getVirtualColumn(); + Map> baseRoles = new TreeMap<>(roles); + // The unnest output column is synthetic (not a base column); drop it and instead record the + // underlying column(s) being unnested as projected from the base. + baseRoles.remove(unnestColumn.getOutputName()); + for (String required : unnestColumn.requiredColumns()) { + baseRoles.computeIfAbsent(required, k -> EnumSet.noneOf(ColumnRole.class)).add(ColumnRole.PROJECTION); + } + attribute(result, unnest.getBase(), baseRoles); + } else if (dataSource instanceof UnionDataSource) { + // Union members share the same output schema, so the referenced columns apply to each. + for (DataSource member : dataSource.getChildren()) { + attribute(result, member, roles); + } + } else if (dataSource instanceof QueryDataSource) { + // The outer references are this sub-query's OUTPUT columns, not base columns; the sub-query's + // own base-table columns are captured by recursing into its parts. + collectInto(result, ((QueryDataSource) dataSource).getQuery()); + } else if (dataSource instanceof JoinDataSource) { + attributeJoin(result, (JoinDataSource) dataSource, roles); + } + // LookupDataSource, InlineDataSource and any other shape have no base table: drop (never fabricate). 
+ } + + /** + * Splits {@code roles} (plus the join-condition columns, tagged {@link ColumnRole#JOIN}) across the + * base datasource and each joinable clause using the planner's join prefixes, then recurses. Clauses + * are matched longest-prefix-first; right-side columns arrive already prefixed and are un-prefixed + * before attribution to the clause's datasource (which may itself be a table, join, or sub-query). + */ + private void attributeJoin( + Map>> result, + JoinDataSource join, + Map> roles + ) + { + JoinDataSourceAnalysis analysis = JoinDataSourceAnalysis.constructAnalysis(join); + DataSource base = analysis.getBaseDataSource(); + List clauses = new ArrayList<>(analysis.getPreJoinableClauses()); + // Longest prefix first so that, e.g., "j0.x" matches clause "j0." rather than a shorter prefix. + clauses.sort((a, b) -> Integer.compare(b.getPrefix().length(), a.getPrefix().length())); + + Map> all = new TreeMap<>(roles); + for (PreJoinableClause clause : clauses) { + for (String column : clause.getCondition().getRequiredColumns()) { + all.computeIfAbsent(column, k -> EnumSet.noneOf(ColumnRole.class)).add(ColumnRole.JOIN); + } + } + + Map>> partitioned = new LinkedHashMap<>(); + for (Map.Entry> entry : all.entrySet()) { + String column = entry.getKey(); + DataSource target = base; + String resolved = column; + for (PreJoinableClause clause : clauses) { + if (JoinPrefixUtils.isPrefixedBy(column, clause.getPrefix())) { + target = clause.getDataSource(); + resolved = JoinPrefixUtils.unprefix(column, clause.getPrefix()); + break; + } + } + partitioned.computeIfAbsent(target, k -> new TreeMap<>()) + .computeIfAbsent(resolved, k -> EnumSet.noneOf(ColumnRole.class)) + .addAll(entry.getValue()); + } + for (Map.Entry>> entry : partitioned.entrySet()) { + attribute(result, entry.getKey(), entry.getValue()); + } + } + + private static void addRoles( + Map>> result, + String table, + String column, + EnumSet roles + ) + { + result.computeIfAbsent(table, k -> new 
TreeMap<>()) + .computeIfAbsent(column, k -> EnumSet.noneOf(ColumnRole.class)) + .addAll(roles); + } + + /** + * Walks the query's column-bearing parts (projected columns, dimensions, aggregator inputs, filter + * columns) and records each referenced base column with the role(s) it was used in. Virtual columns + * are expanded transitively to their underlying base columns, carrying the consuming role. Only + * explicitly-referenced columns are captured (notably {@code __time} is included only when it + * actually appears in a part, never its implicit interval usage). Returns an empty map for + * unsupported query types and for a bare {@code SELECT *} (a Scan with neither explicit columns nor + * a filter); a {@code SELECT *} that carries a filter still contributes its filter columns. + */ + private Map> collectColumnRoles(Query query) + { + Map> roles = new TreeMap<>(); + if (query instanceof ScanQuery) { + ScanQuery scan = (ScanQuery) query; + VirtualColumns vcs = scan.getVirtualColumns(); + if (scan.getColumns() != null) { + for (String column : scan.getColumns()) { + addColumn(roles, vcs, column, ColumnRole.PROJECTION); + } + } + addFilterColumns(roles, vcs, scan.getFilter()); + } else if (query instanceof GroupByQuery) { + GroupByQuery groupBy = (GroupByQuery) query; + VirtualColumns vcs = groupBy.getVirtualColumns(); + for (DimensionSpec dimension : groupBy.getDimensions()) { + addColumn(roles, vcs, dimension.getDimension(), ColumnRole.GROUP_BY); + } + addAggregatorColumns(roles, vcs, groupBy.getAggregatorSpecs()); + addFilterColumns(roles, vcs, groupBy.getDimFilter()); + } else if (query instanceof TopNQuery) { + TopNQuery topN = (TopNQuery) query; + VirtualColumns vcs = topN.getVirtualColumns(); + addColumn(roles, vcs, topN.getDimensionSpec().getDimension(), ColumnRole.GROUP_BY); + addAggregatorColumns(roles, vcs, topN.getAggregatorSpecs()); + addFilterColumns(roles, vcs, topN.getFilter()); + } else if (query instanceof TimeseriesQuery) { + TimeseriesQuery 
timeseries = (TimeseriesQuery) query; + VirtualColumns vcs = timeseries.getVirtualColumns(); + addAggregatorColumns(roles, vcs, timeseries.getAggregatorSpecs()); + addFilterColumns(roles, vcs, timeseries.getFilter()); + } else { + // Unsupported query type: emit table-level lineage only. + return Collections.emptyMap(); + } + return roles; + } + + private void addAggregatorColumns( + Map> roles, + VirtualColumns vcs, + List aggregators + ) + { + for (AggregatorFactory aggregator : aggregators) { + for (String column : aggregator.requiredFields()) { + addColumn(roles, vcs, column, ColumnRole.AGGREGATION); + } + } + } + + private void addFilterColumns( + Map> roles, + VirtualColumns vcs, + @Nullable DimFilter filter + ) + { + if (filter != null) { + for (String column : filter.getRequiredColumns()) { + addColumn(roles, vcs, column, ColumnRole.FILTER); + } + } + } + + private void addColumn(Map> roles, VirtualColumns vcs, String column, ColumnRole role) + { + expandColumn(roles, vcs, column, role, new HashSet<>()); + } + + /** + * Adds {@code column} with {@code role}, expanding virtual columns transitively to their underlying + * base columns. The {@code visited} set guards against virtual columns that reference one another. 
+ */ + private void expandColumn( + Map> roles, + VirtualColumns vcs, + @Nullable String column, + ColumnRole role, + Set visited + ) + { + if (column == null || !visited.add(column)) { + return; + } + if (vcs != null && vcs.exists(column)) { + VirtualColumn virtualColumn = vcs.getVirtualColumn(column); + if (virtualColumn != null) { + for (String required : virtualColumn.requiredColumns()) { + expandColumn(roles, vcs, required, role, visited); + } + return; + } + } + roles.computeIfAbsent(column, k -> EnumSet.noneOf(ColumnRole.class)).add(role); + } + private ObjectNode buildRunEvent( String queryId, String queryType, RequestLogLine requestLogLine, List inputs, + @Nullable Map>> columnsByTable, @Nullable String output ) { @@ -327,8 +629,8 @@ private ObjectNode buildRunEvent( event.put("schemaURL", SCHEMA_URL); event.set("run", buildRun(queryId, queryType, requestLogLine, stats, success)); event.set("job", buildJob(queryId, requestLogLine.getSql())); - event.set("inputs", buildDatasets(inputs)); - event.set("outputs", buildDatasets(output != null ? List.of(output) : List.of())); + event.set("inputs", buildDatasets(inputs, columnsByTable)); + event.set("outputs", buildDatasets(output != null ? List.of(output) : List.of(), null)); return event; } @@ -434,19 +736,59 @@ private ObjectNode createFacet(@Nullable String schemaUrl) return facet; } - private ArrayNode buildDatasets(List tableNames) + private ArrayNode buildDatasets( + List tableNames, + @Nullable Map>> columnsByTable + ) { ArrayNode array = jsonMapper.createArrayNode(); for (String name : tableNames) { ObjectNode node = jsonMapper.createObjectNode(); node.put("namespace", namespace); node.put("name", name); - node.set("facets", jsonMapper.createObjectNode()); + ObjectNode facets = jsonMapper.createObjectNode(); + Map> columns = columnsByTable == null ? 
null : columnsByTable.get(name); + if (columns != null && !columns.isEmpty()) { + addColumnFacets(facets, columns); + } + node.set("facets", facets); array.add(node); } return array; } + /** + * Attaches the standard OpenLineage {@code schema} facet (referenced column names, sorted) and the + * custom {@code druid_columnUsage} facet (column to usage roles) to an input dataset's facets. + */ + private void addColumnFacets(ObjectNode facets, Map> columns) + { + ObjectNode schemaFacet = createFacet(SCHEMA_FACET_SCHEMA_URL); + ArrayNode fields = jsonMapper.createArrayNode(); + ObjectNode usageFields = jsonMapper.createObjectNode(); + // columns is a TreeMap, so iteration (and the emitted JSON) is deterministically sorted by name. + for (Map.Entry> entry : columns.entrySet()) { + ObjectNode field = jsonMapper.createObjectNode(); + field.put("name", entry.getKey()); + fields.add(field); + + ArrayNode usages = jsonMapper.createArrayNode(); + // EnumSet iterates in enum declaration order, so usage lists are deterministic too. 
+ for (ColumnRole role : entry.getValue()) { + usages.add(role.name()); + } + ObjectNode usageEntry = jsonMapper.createObjectNode(); + usageEntry.set("usages", usages); + usageFields.set(entry.getKey(), usageEntry); + } + schemaFacet.set("fields", fields); + facets.set("schema", schemaFacet); + + ObjectNode usageFacet = createFacet(COLUMN_USAGE_FACET_SCHEMA_URL); + usageFacet.set("fields", usageFields); + facets.set("druid_columnUsage", usageFacet); + } + protected void emit(ObjectNode event) { diff --git a/extensions-contrib/openlineage-emitter/src/main/java/org/apache/druid/extensions/openlineage/OpenLineageRequestLoggerProvider.java b/extensions-contrib/openlineage-emitter/src/main/java/org/apache/druid/extensions/openlineage/OpenLineageRequestLoggerProvider.java index 30555d3ff34e..b3c72921412b 100644 --- a/extensions-contrib/openlineage-emitter/src/main/java/org/apache/druid/extensions/openlineage/OpenLineageRequestLoggerProvider.java +++ b/extensions-contrib/openlineage-emitter/src/main/java/org/apache/druid/extensions/openlineage/OpenLineageRequestLoggerProvider.java @@ -82,6 +82,11 @@ public enum TransportType "timeBoundary" ); + // When false, only table-level (datasource) lineage is emitted; the per-column schema and + // druid_columnUsage facets are omitted. Lets operators trade column detail for smaller events. 
+ @JsonProperty + private boolean columnLineageEnabled = true; + @JsonProperty private int emitQueueCapacity = OpenLineageRequestLogger.DEFAULT_EMIT_QUEUE_CAPACITY; @@ -115,6 +120,7 @@ public RequestLogger get() transportType, transportUrl, excludedNativeQueryTypes, + columnLineageEnabled, emitQueueCapacity, emitThreadCount, httpClient diff --git a/extensions-contrib/openlineage-emitter/src/main/resources/openlineage-schema/DruidColumnUsageDatasetFacet.json b/extensions-contrib/openlineage-emitter/src/main/resources/openlineage-schema/DruidColumnUsageDatasetFacet.json new file mode 100644 index 000000000000..d7580988dde3 --- /dev/null +++ b/extensions-contrib/openlineage-emitter/src/main/resources/openlineage-schema/DruidColumnUsageDatasetFacet.json @@ -0,0 +1,31 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": "https://raw.githubusercontent.com/apache/druid/master/extensions-contrib/openlineage-emitter/src/main/resources/openlineage-schema/DruidColumnUsageDatasetFacet.json", + "title": "DruidColumnUsageDatasetFacet", + "description": "Druid-specific dataset facet capturing how each input column was referenced by the query. 
Roles align with OpenLineage transformation subtypes where applicable (GROUP_BY, AGGREGATION, FILTER, JOIN); PROJECTION corresponds to OpenLineage IDENTITY/TRANSFORMATION (a selected column).", + "type": "object", + "allOf": [ + {"$ref": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/DatasetFacet"} + ], + "properties": { + "fields": { + "type": "object", + "description": "Map of referenced column name to how it was used in this query.", + "additionalProperties": { + "type": "object", + "properties": { + "usages": { + "type": "array", + "description": "The roles in which the column was referenced by the query.", + "items": { + "type": "string", + "enum": ["PROJECTION", "GROUP_BY", "AGGREGATION", "FILTER", "JOIN"] + } + } + }, + "required": ["usages"] + } + } + }, + "required": ["fields"] +} diff --git a/extensions-contrib/openlineage-emitter/src/test/java/org/apache/druid/extensions/openlineage/OpenLineageRequestLoggerTest.java b/extensions-contrib/openlineage-emitter/src/test/java/org/apache/druid/extensions/openlineage/OpenLineageRequestLoggerTest.java index 0a5c2d1bd57a..cb31100ff163 100644 --- a/extensions-contrib/openlineage-emitter/src/test/java/org/apache/druid/extensions/openlineage/OpenLineageRequestLoggerTest.java +++ b/extensions-contrib/openlineage-emitter/src/test/java/org/apache/druid/extensions/openlineage/OpenLineageRequestLoggerTest.java @@ -23,18 +23,35 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.query.BaseQuery; import 
org.apache.druid.query.DataSource; +import org.apache.druid.query.Druids; +import org.apache.druid.query.JoinDataSource; +import org.apache.druid.query.LookupDataSource; import org.apache.druid.query.Query; +import org.apache.druid.query.QueryDataSource; import org.apache.druid.query.TableDataSource; import org.apache.druid.query.UnionDataSource; +import org.apache.druid.query.aggregation.LongSumAggregatorFactory; +import org.apache.druid.query.dimension.DefaultDimensionSpec; import org.apache.druid.query.filter.DimFilter; +import org.apache.druid.query.filter.SelectorDimFilter; +import org.apache.druid.query.groupby.GroupByQuery; +import org.apache.druid.query.scan.ScanQuery; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; import org.apache.druid.query.spec.QuerySegmentSpec; +import org.apache.druid.query.topn.TopNQueryBuilder; +import org.apache.druid.segment.VirtualColumns; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.join.JoinType; +import org.apache.druid.segment.virtual.ExpressionVirtualColumn; import org.apache.druid.server.QueryStats; import org.apache.druid.server.RequestLogLine; import org.joda.time.DateTime; @@ -527,6 +544,337 @@ public void testStopWithHttpTransport() Assertions.assertDoesNotThrow(httpLogger::stop); } + // --- Column-level lineage tests (BDCE-559) --- + + @Test + public void testScanColumnLineage() throws IOException + { + ScanQuery query = Druids.newScanQueryBuilder() + .dataSource("events") + .intervals(everyInterval()) + .columns("userId", "page") + .filters(new SelectorDimFilter("country", "US", null)) + .context(ImmutableMap.of("queryId", "scan-1")) + .build(); + logger.logNativeQuery(nativeLine(query, ImmutableMap.of("success", true))); + + JsonNode input = inputByName(capturedEvents.get(0), "events"); + Assertions.assertEquals(ImmutableList.of("country", "page", "userId"), schemaFieldNames(input)); + Assertions.assertEquals(ImmutableList.of("PROJECTION"), usagesOf(input, 
"userId")); + Assertions.assertEquals(ImmutableList.of("PROJECTION"), usagesOf(input, "page")); + Assertions.assertEquals(ImmutableList.of("FILTER"), usagesOf(input, "country")); + } + + @Test + public void testGroupByColumnLineage() throws IOException + { + GroupByQuery query = GroupByQuery.builder() + .setDataSource("sales") + .setQuerySegmentSpec(everyInterval()) + .setGranularity(Granularities.ALL) + .setDimensions(new DefaultDimensionSpec("country", "country")) + .setAggregatorSpecs(new LongSumAggregatorFactory("total", "amount")) + .setDimFilter(new SelectorDimFilter("status", "active", null)) + .setContext(ImmutableMap.of("queryId", "gb-1")) + .build(); + logger.logNativeQuery(nativeLine(query, ImmutableMap.of("success", true))); + + JsonNode input = inputByName(capturedEvents.get(0), "sales"); + Assertions.assertEquals(ImmutableList.of("amount", "country", "status"), schemaFieldNames(input)); + Assertions.assertEquals(ImmutableList.of("AGGREGATION"), usagesOf(input, "amount")); + Assertions.assertEquals(ImmutableList.of("GROUP_BY"), usagesOf(input, "country")); + Assertions.assertEquals(ImmutableList.of("FILTER"), usagesOf(input, "status")); + } + + @Test + public void testVirtualColumnExpandsToBaseColumns() throws IOException + { + GroupByQuery query = GroupByQuery.builder() + .setDataSource("sales") + .setQuerySegmentSpec(everyInterval()) + .setGranularity(Granularities.ALL) + .setVirtualColumns(VirtualColumns.create(ImmutableList.of( + new ExpressionVirtualColumn("v0", "\"base\" * 2", ColumnType.LONG, ExprMacroTable.nil()) + ))) + .setDimensions(new DefaultDimensionSpec("v0", "v0")) + .setContext(ImmutableMap.of("queryId", "vc-1")) + .build(); + logger.logNativeQuery(nativeLine(query, ImmutableMap.of("success", true))); + + JsonNode input = inputByName(capturedEvents.get(0), "sales"); + // The virtual column "v0" is expanded to its underlying base column "base"; "v0" is not emitted. 
+ Assertions.assertEquals(ImmutableList.of("base"), schemaFieldNames(input)); + Assertions.assertEquals(ImmutableList.of("GROUP_BY"), usagesOf(input, "base")); + } + + @Test + public void testSelectStarEmitsNoColumnFacets() throws IOException + { + // Scan with no explicit columns (SELECT *) -> table-level lineage only, no column facets. + ScanQuery query = Druids.newScanQueryBuilder() + .dataSource("events") + .intervals(everyInterval()) + .context(ImmutableMap.of("queryId", "scan-star")) + .build(); + logger.logNativeQuery(nativeLine(query, ImmutableMap.of("success", true))); + + JsonNode input = inputByName(capturedEvents.get(0), "events"); + Assertions.assertNotNull(input); + Assertions.assertNull(input.get("facets").get("schema")); + Assertions.assertNull(input.get("facets").get("druid_columnUsage")); + } + + @Test + public void testUnionReplicatesColumnsToMembers() throws IOException + { + // Union members share the same output schema, so referenced columns are attributed to each member. 
+ GroupByQuery query = GroupByQuery.builder() + .setDataSource(new UnionDataSource(ImmutableList.of( + new TableDataSource("sales_a"), + new TableDataSource("sales_b") + ))) + .setQuerySegmentSpec(everyInterval()) + .setGranularity(Granularities.ALL) + .setDimensions(new DefaultDimensionSpec("country", "country")) + .setContext(ImmutableMap.of("queryId", "union-1")) + .build(); + logger.logNativeQuery(nativeLine(query, ImmutableMap.of("success", true))); + + JsonNode event = capturedEvents.get(0); + Set names = inputNames(event); + Assertions.assertEquals(2, names.size()); + Assertions.assertTrue(names.contains("sales_a") && names.contains("sales_b")); + for (String table : ImmutableList.of("sales_a", "sales_b")) { + JsonNode input = inputByName(event, table); + Assertions.assertEquals(ImmutableList.of("country"), schemaFieldNames(input), table); + Assertions.assertEquals(ImmutableList.of("GROUP_BY"), usagesOf(input, "country")); + } + } + + @Test + public void testJoinColumnLineageAttributedPerSide() throws IOException + { + JoinDataSource join = JoinDataSource.create( + new TableDataSource("sales"), + new TableDataSource("users"), + "j0.", + "\"country\" == \"j0.country\"", + JoinType.INNER, + null, + ExprMacroTable.nil(), + null, + null + ); + ScanQuery query = Druids.newScanQueryBuilder() + .dataSource(join) + .intervals(everyInterval()) + .columns("country", "j0.age") + .context(ImmutableMap.of("queryId", "join-1")) + .build(); + logger.logNativeQuery(nativeLine(query, ImmutableMap.of("success", true))); + + JsonNode event = capturedEvents.get(0); + JsonNode sales = inputByName(event, "sales"); + JsonNode users = inputByName(event, "users"); + + // Left-side column "country" is both projected and used as a join key. + Assertions.assertEquals(ImmutableList.of("country"), schemaFieldNames(sales)); + Assertions.assertEquals(ImmutableList.of("PROJECTION", "JOIN"), usagesOf(sales, "country")); + // Right-side columns are un-prefixed and attributed to "users". 
+ Assertions.assertEquals(ImmutableList.of("age", "country"), schemaFieldNames(users)); + Assertions.assertEquals(ImmutableList.of("PROJECTION"), usagesOf(users, "age")); + Assertions.assertEquals(ImmutableList.of("JOIN"), usagesOf(users, "country")); + } + + @Test + public void testSubQueryColumnLineageRecursesToBaseTable() throws IOException + { + GroupByQuery inner = GroupByQuery.builder() + .setDataSource("sales") + .setQuerySegmentSpec(everyInterval()) + .setGranularity(Granularities.ALL) + .setDimensions(new DefaultDimensionSpec("country", "country")) + .setAggregatorSpecs(new LongSumAggregatorFactory("total", "amount")) + .build(); + GroupByQuery outer = GroupByQuery.builder() + .setDataSource(new QueryDataSource(inner)) + .setQuerySegmentSpec(everyInterval()) + .setGranularity(Granularities.ALL) + .setDimensions(new DefaultDimensionSpec("country", "country")) + .setAggregatorSpecs(new LongSumAggregatorFactory("grand", "total")) + .setContext(ImmutableMap.of("queryId", "subq-1")) + .build(); + logger.logNativeQuery(nativeLine(outer, ImmutableMap.of("success", true))); + + JsonNode sales = inputByName(capturedEvents.get(0), "sales"); + // Base columns come from the sub-query's own parts; the outer references to sub-query outputs + // ("total", "grand") must not be fabricated as base columns. 
+ Assertions.assertEquals(ImmutableList.of("amount", "country"), schemaFieldNames(sales)); + Assertions.assertEquals(ImmutableList.of("AGGREGATION"), usagesOf(sales, "amount")); + Assertions.assertEquals(ImmutableList.of("GROUP_BY"), usagesOf(sales, "country")); + } + + @Test + public void testTopNColumnLineage() throws IOException + { + Query query = new TopNQueryBuilder() + .dataSource("sales") + .intervals(everyInterval()) + .granularity(Granularities.ALL) + .dimension(new DefaultDimensionSpec("country", "country")) + .metric("total") + .threshold(10) + .aggregators(new LongSumAggregatorFactory("total", "amount")) + .filters(new SelectorDimFilter("status", "active", null)) + .context(ImmutableMap.of("queryId", "topn-1")) + .build(); + logger.logNativeQuery(nativeLine(query, ImmutableMap.of("success", true))); + + JsonNode sales = inputByName(capturedEvents.get(0), "sales"); + Assertions.assertEquals(ImmutableList.of("amount", "country", "status"), schemaFieldNames(sales)); + Assertions.assertEquals(ImmutableList.of("GROUP_BY"), usagesOf(sales, "country")); + Assertions.assertEquals(ImmutableList.of("AGGREGATION"), usagesOf(sales, "amount")); + Assertions.assertEquals(ImmutableList.of("FILTER"), usagesOf(sales, "status")); + } + + @Test + public void testTimeseriesColumnLineage() throws IOException + { + Query query = Druids.newTimeseriesQueryBuilder() + .dataSource("sales") + .intervals(everyInterval()) + .granularity(Granularities.ALL) + .aggregators(new LongSumAggregatorFactory("total", "amount")) + .filters(new SelectorDimFilter("status", "active", null)) + .context(ImmutableMap.of("queryId", "ts-1")) + .build(); + logger.logNativeQuery(nativeLine(query, ImmutableMap.of("success", true))); + + JsonNode sales = inputByName(capturedEvents.get(0), "sales"); + // Timeseries has no dimensions, so no GROUP_BY role. 
+ Assertions.assertEquals(ImmutableList.of("amount", "status"), schemaFieldNames(sales)); + Assertions.assertEquals(ImmutableList.of("AGGREGATION"), usagesOf(sales, "amount")); + Assertions.assertEquals(ImmutableList.of("FILTER"), usagesOf(sales, "status")); + } + + @Test + public void testColumnUsedInMultipleRoles() throws IOException + { + GroupByQuery query = GroupByQuery.builder() + .setDataSource("sales") + .setQuerySegmentSpec(everyInterval()) + .setGranularity(Granularities.ALL) + .setDimensions(new DefaultDimensionSpec("country", "country")) + .setDimFilter(new SelectorDimFilter("country", "US", null)) + .setContext(ImmutableMap.of("queryId", "multirole-1")) + .build(); + logger.logNativeQuery(nativeLine(query, ImmutableMap.of("success", true))); + + JsonNode sales = inputByName(capturedEvents.get(0), "sales"); + // "country" is both a grouping dimension and a filter; roles merge in enum declaration order. + Assertions.assertEquals(ImmutableList.of("GROUP_BY", "FILTER"), usagesOf(sales, "country")); + } + + @Test + public void testLookupJoinDropsRightColumnsNotFabricated() throws IOException + { + JoinDataSource join = JoinDataSource.create( + new TableDataSource("sales"), + new LookupDataSource("country_lookup"), + "j0.", + "\"country\" == \"j0.k\"", + JoinType.LEFT, + null, + ExprMacroTable.nil(), + null, + null + ); + ScanQuery query = Druids.newScanQueryBuilder() + .dataSource(join) + .intervals(everyInterval()) + .columns("country", "j0.v") + .context(ImmutableMap.of("queryId", "lookupjoin-1")) + .build(); + logger.logNativeQuery(nativeLine(query, ImmutableMap.of("success", true))); + + JsonNode event = capturedEvents.get(0); + // The lookup has no base table -> it contributes no input dataset and no fabricated columns. 
+ Assertions.assertEquals(Collections.singleton("sales"), inputNames(event)); + JsonNode sales = inputByName(event, "sales"); + Assertions.assertEquals(ImmutableList.of("country"), schemaFieldNames(sales)); + // "j0.v" (lookup value) and "j0.k" (lookup key) are dropped, not attributed to "sales". + Assertions.assertEquals(ImmutableList.of("PROJECTION", "JOIN"), usagesOf(sales, "country")); + } + + @Test + public void testColumnLineageDisabledEmitsTableLevelOnly() throws IOException + { + OpenLineageRequestLogger disabled = new OpenLineageRequestLogger( + MAPPER, + NAMESPACE, + OpenLineageRequestLoggerProvider.TransportType.CONSOLE, + null, + DEFAULT_EXCLUDED_NATIVE_QUERY_TYPES, + false, + OpenLineageRequestLogger.DEFAULT_EMIT_QUEUE_CAPACITY, + OpenLineageRequestLogger.DEFAULT_EMIT_THREAD_COUNT, + null + ) + { + @Override + protected void emit(ObjectNode event) + { + capturedEvents.add(event); + } + }; + GroupByQuery query = GroupByQuery.builder() + .setDataSource("sales") + .setQuerySegmentSpec(everyInterval()) + .setGranularity(Granularities.ALL) + .setDimensions(new DefaultDimensionSpec("country", "country")) + .setContext(ImmutableMap.of("queryId", "disabled-1")) + .build(); + disabled.logNativeQuery(nativeLine(query, ImmutableMap.of("success", true))); + + JsonNode sales = inputByName(capturedEvents.get(0), "sales"); + Assertions.assertNull(sales.get("facets").get("schema"), "column lineage disabled -> no schema facet"); + Assertions.assertNull(sales.get("facets").get("druid_columnUsage")); + } + + private static QuerySegmentSpec everyInterval() + { + return new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2024-01-01/2024-01-02"))); + } + + private static JsonNode inputByName(JsonNode event, String name) + { + for (JsonNode input : event.get("inputs")) { + if (input.get("name").asText().equals(name)) { + return input; + } + } + return null; + } + + private static List schemaFieldNames(JsonNode dataset) + { + List names = new 
ArrayList<>(); + for (JsonNode field : dataset.get("facets").get("schema").get("fields")) { + names.add(field.get("name").asText()); + } + return names; + } + + private static List usagesOf(JsonNode dataset, String column) + { + List usages = new ArrayList<>(); + JsonNode entry = dataset.get("facets").get("druid_columnUsage").get("fields").get(column); + for (JsonNode usage : entry.get("usages")) { + usages.add(usage.asText()); + } + return usages; + } + private static RequestLogLine sqlLine(String sql, Map context, Map stats) { return RequestLogLine.forSql(sql, context, TIMESTAMP, REMOTE_ADDR, new QueryStats(stats)); From d8592d190a871d54d7cc5d126efb5bc8db078adb Mon Sep 17 00:00:00 2001 From: Maryam Shahid Date: Thu, 2 Jul 2026 11:19:25 -0700 Subject: [PATCH 2/2] updates from comments --- .../openlineage/OpenLineageRequestLogger.java | 298 +---------- .../OpenLineageRequestLoggerTest.java | 74 ++- .../druid/query/QueryColumnUsageAnalyzer.java | 365 +++++++++++++ .../apache/druid/query/union/UnionQuery.java | 12 + .../query/QueryColumnUsageAnalyzerTest.java | 490 ++++++++++++++++++ 5 files changed, 956 insertions(+), 283 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/query/QueryColumnUsageAnalyzer.java create mode 100644 processing/src/test/java/org/apache/druid/query/QueryColumnUsageAnalyzerTest.java diff --git a/extensions-contrib/openlineage-emitter/src/main/java/org/apache/druid/extensions/openlineage/OpenLineageRequestLogger.java b/extensions-contrib/openlineage-emitter/src/main/java/org/apache/druid/extensions/openlineage/OpenLineageRequestLogger.java index a4ba15c87703..e50c555fbab2 100644 --- a/extensions-contrib/openlineage-emitter/src/main/java/org/apache/druid/extensions/openlineage/OpenLineageRequestLogger.java +++ b/extensions-contrib/openlineage-emitter/src/main/java/org/apache/druid/extensions/openlineage/OpenLineageRequestLogger.java @@ -31,28 +31,8 @@ import org.apache.druid.java.util.common.lifecycle.LifecycleStop; 
import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.BaseQuery; -import org.apache.druid.query.DataSource; -import org.apache.druid.query.FilteredDataSource; -import org.apache.druid.query.JoinDataSource; import org.apache.druid.query.Query; -import org.apache.druid.query.QueryDataSource; -import org.apache.druid.query.RestrictedDataSource; -import org.apache.druid.query.TableDataSource; -import org.apache.druid.query.UnionDataSource; -import org.apache.druid.query.UnnestDataSource; -import org.apache.druid.query.aggregation.AggregatorFactory; -import org.apache.druid.query.dimension.DimensionSpec; -import org.apache.druid.query.filter.DimFilter; -import org.apache.druid.query.groupby.GroupByQuery; -import org.apache.druid.query.planning.JoinDataSourceAnalysis; -import org.apache.druid.query.planning.PreJoinableClause; -import org.apache.druid.query.scan.ScanQuery; -import org.apache.druid.query.timeseries.TimeseriesQuery; -import org.apache.druid.query.topn.TopNQuery; -import org.apache.druid.query.union.UnionQuery; -import org.apache.druid.segment.VirtualColumn; -import org.apache.druid.segment.VirtualColumns; -import org.apache.druid.segment.join.JoinPrefixUtils; +import org.apache.druid.query.QueryColumnUsageAnalyzer; import org.apache.druid.server.RequestLogLine; import org.apache.druid.server.log.RequestLogger; import org.apache.druid.sql.calcite.parser.DruidSqlParser; @@ -70,15 +50,11 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Collections; import java.util.EnumSet; -import java.util.HashSet; -import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; @@ -258,7 +234,7 @@ public void logNativeQuery(RequestLogLine requestLogLine) throws 
IOException queryId = UNKNOWN_QUERY_ID; } - Map>> columnsByTable = + Map>> columnsByTable = columnLineageEnabled ? extractColumnsByTable(requestLogLine.getQuery()) : null; emit(buildRunEvent(queryId, queryType, requestLogLine, inputs, columnsByTable, null)); } @@ -348,31 +324,18 @@ static String extractMsqOutputDatasource(String sql) } /** - * Column usage roles for the custom {@code druid_columnUsage} dataset facet. Names align with - * OpenLineage transformation subtypes where they exist (GROUP_BY, AGGREGATION, FILTER, JOIN); - * PROJECTION corresponds to OpenLineage IDENTITY/TRANSFORMATION (a selected/projected column). - */ - enum ColumnRole - { - PROJECTION, GROUP_BY, AGGREGATION, FILTER, JOIN - } - - /** - * Resolves the per-base-table column-usage map for a native query, used to attach the {@code schema} - * and {@code druid_columnUsage} dataset facets to input datasets. Handles all datasource shapes: - * tables, joins (attributing columns per side via the planner's join prefixes), unions, sub-queries - * (recursing into the sub-query's own columns), and datasource wrappers (restricted/filtered/unnest). - * Columns that have no base table (lookups, inline data) are dropped rather than fabricated. - * - *
<p>
Returns {@code null} (yielding table-level lineage only) when no base-table columns can be - * determined, or on any error -- it never fabricates or mis-attributes columns. + * Resolves the per-base-table column-usage map for a native query by delegating to the core + * {@link QueryColumnUsageAnalyzer}, then used to attach the {@code schema} and {@code druid_columnUsage} + * dataset facets to input datasets. Returns {@code null} (yielding table-level lineage only) when no + * base-table columns can be determined, or on any error -- lineage extraction must never break the + * request-logging path, so it never fabricates or mis-attributes columns. */ @Nullable - private Map>> extractColumnsByTable(Query query) + private Map>> extractColumnsByTable(Query query) { try { - Map>> result = new TreeMap<>(); - collectInto(result, query); + Map>> result = + QueryColumnUsageAnalyzer.analyze(query); return result.isEmpty() ? null : result; } // StackOverflowError (an Error, not an Exception) is caught too so that a pathologically deep @@ -383,239 +346,12 @@ private Map>> extractColumnsByTable(Quer } } - /** - * Walks {@code query}'s column-bearing parts and attributes the referenced columns to the base - * tables of its datasource. {@link UnionQuery} (whose {@code getDataSource()} is undefined) is - * handled by recursing into each of its branches. - */ - private void collectInto(Map>> result, Query query) - { - if (query instanceof UnionQuery) { - for (DataSource branch : ((UnionQuery) query).getDataSources()) { - attribute(result, branch, Collections.emptyMap()); - } - return; - } - attribute(result, query.getDataSource(), collectColumnRoles(query)); - } - - /** - * Attributes {@code roles} (column to roles, expressed in {@code dataSource}'s output namespace) to - * the underlying base tables, recursing through the datasource tree. Sub-queries are recursed into - * via {@link #collectInto} (their columns come from their own parts, not the outer references). 
- */ - private void attribute( - Map>> result, - DataSource dataSource, - Map> roles - ) - { - if (dataSource instanceof TableDataSource) { - // Also covers GlobalTableDataSource (a TableDataSource subclass); it is a real named table, so - // handling it here is intentional. Keep this branch ahead of the wrapper branches below. - String table = ((TableDataSource) dataSource).getName(); - for (Map.Entry> entry : roles.entrySet()) { - addRoles(result, table, entry.getKey(), entry.getValue()); - } - } else if (dataSource instanceof RestrictedDataSource) { - attribute(result, ((RestrictedDataSource) dataSource).getBase(), roles); - } else if (dataSource instanceof FilteredDataSource) { - attribute(result, ((FilteredDataSource) dataSource).getBase(), roles); - } else if (dataSource instanceof UnnestDataSource) { - UnnestDataSource unnest = (UnnestDataSource) dataSource; - VirtualColumn unnestColumn = unnest.getVirtualColumn(); - Map> baseRoles = new TreeMap<>(roles); - // The unnest output column is synthetic (not a base column); drop it and instead record the - // underlying column(s) being unnested as projected from the base. - baseRoles.remove(unnestColumn.getOutputName()); - for (String required : unnestColumn.requiredColumns()) { - baseRoles.computeIfAbsent(required, k -> EnumSet.noneOf(ColumnRole.class)).add(ColumnRole.PROJECTION); - } - attribute(result, unnest.getBase(), baseRoles); - } else if (dataSource instanceof UnionDataSource) { - // Union members share the same output schema, so the referenced columns apply to each. - for (DataSource member : dataSource.getChildren()) { - attribute(result, member, roles); - } - } else if (dataSource instanceof QueryDataSource) { - // The outer references are this sub-query's OUTPUT columns, not base columns; the sub-query's - // own base-table columns are captured by recursing into its parts. 
- collectInto(result, ((QueryDataSource) dataSource).getQuery()); - } else if (dataSource instanceof JoinDataSource) { - attributeJoin(result, (JoinDataSource) dataSource, roles); - } - // LookupDataSource, InlineDataSource and any other shape have no base table: drop (never fabricate). - } - - /** - * Splits {@code roles} (plus the join-condition columns, tagged {@link ColumnRole#JOIN}) across the - * base datasource and each joinable clause using the planner's join prefixes, then recurses. Clauses - * are matched longest-prefix-first; right-side columns arrive already prefixed and are un-prefixed - * before attribution to the clause's datasource (which may itself be a table, join, or sub-query). - */ - private void attributeJoin( - Map>> result, - JoinDataSource join, - Map> roles - ) - { - JoinDataSourceAnalysis analysis = JoinDataSourceAnalysis.constructAnalysis(join); - DataSource base = analysis.getBaseDataSource(); - List clauses = new ArrayList<>(analysis.getPreJoinableClauses()); - // Longest prefix first so that, e.g., "j0.x" matches clause "j0." rather than a shorter prefix. 
- clauses.sort((a, b) -> Integer.compare(b.getPrefix().length(), a.getPrefix().length())); - - Map> all = new TreeMap<>(roles); - for (PreJoinableClause clause : clauses) { - for (String column : clause.getCondition().getRequiredColumns()) { - all.computeIfAbsent(column, k -> EnumSet.noneOf(ColumnRole.class)).add(ColumnRole.JOIN); - } - } - - Map>> partitioned = new LinkedHashMap<>(); - for (Map.Entry> entry : all.entrySet()) { - String column = entry.getKey(); - DataSource target = base; - String resolved = column; - for (PreJoinableClause clause : clauses) { - if (JoinPrefixUtils.isPrefixedBy(column, clause.getPrefix())) { - target = clause.getDataSource(); - resolved = JoinPrefixUtils.unprefix(column, clause.getPrefix()); - break; - } - } - partitioned.computeIfAbsent(target, k -> new TreeMap<>()) - .computeIfAbsent(resolved, k -> EnumSet.noneOf(ColumnRole.class)) - .addAll(entry.getValue()); - } - for (Map.Entry>> entry : partitioned.entrySet()) { - attribute(result, entry.getKey(), entry.getValue()); - } - } - - private static void addRoles( - Map>> result, - String table, - String column, - EnumSet roles - ) - { - result.computeIfAbsent(table, k -> new TreeMap<>()) - .computeIfAbsent(column, k -> EnumSet.noneOf(ColumnRole.class)) - .addAll(roles); - } - - /** - * Walks the query's column-bearing parts (projected columns, dimensions, aggregator inputs, filter - * columns) and records each referenced base column with the role(s) it was used in. Virtual columns - * are expanded transitively to their underlying base columns, carrying the consuming role. Only - * explicitly-referenced columns are captured (notably {@code __time} is included only when it - * actually appears in a part, never its implicit interval usage). Returns an empty map for - * unsupported query types and for a bare {@code SELECT *} (a Scan with neither explicit columns nor - * a filter); a {@code SELECT *} that carries a filter still contributes its filter columns. 
- */ - private Map> collectColumnRoles(Query query) - { - Map> roles = new TreeMap<>(); - if (query instanceof ScanQuery) { - ScanQuery scan = (ScanQuery) query; - VirtualColumns vcs = scan.getVirtualColumns(); - if (scan.getColumns() != null) { - for (String column : scan.getColumns()) { - addColumn(roles, vcs, column, ColumnRole.PROJECTION); - } - } - addFilterColumns(roles, vcs, scan.getFilter()); - } else if (query instanceof GroupByQuery) { - GroupByQuery groupBy = (GroupByQuery) query; - VirtualColumns vcs = groupBy.getVirtualColumns(); - for (DimensionSpec dimension : groupBy.getDimensions()) { - addColumn(roles, vcs, dimension.getDimension(), ColumnRole.GROUP_BY); - } - addAggregatorColumns(roles, vcs, groupBy.getAggregatorSpecs()); - addFilterColumns(roles, vcs, groupBy.getDimFilter()); - } else if (query instanceof TopNQuery) { - TopNQuery topN = (TopNQuery) query; - VirtualColumns vcs = topN.getVirtualColumns(); - addColumn(roles, vcs, topN.getDimensionSpec().getDimension(), ColumnRole.GROUP_BY); - addAggregatorColumns(roles, vcs, topN.getAggregatorSpecs()); - addFilterColumns(roles, vcs, topN.getFilter()); - } else if (query instanceof TimeseriesQuery) { - TimeseriesQuery timeseries = (TimeseriesQuery) query; - VirtualColumns vcs = timeseries.getVirtualColumns(); - addAggregatorColumns(roles, vcs, timeseries.getAggregatorSpecs()); - addFilterColumns(roles, vcs, timeseries.getFilter()); - } else { - // Unsupported query type: emit table-level lineage only. 
- return Collections.emptyMap(); - } - return roles; - } - - private void addAggregatorColumns( - Map> roles, - VirtualColumns vcs, - List aggregators - ) - { - for (AggregatorFactory aggregator : aggregators) { - for (String column : aggregator.requiredFields()) { - addColumn(roles, vcs, column, ColumnRole.AGGREGATION); - } - } - } - - private void addFilterColumns( - Map> roles, - VirtualColumns vcs, - @Nullable DimFilter filter - ) - { - if (filter != null) { - for (String column : filter.getRequiredColumns()) { - addColumn(roles, vcs, column, ColumnRole.FILTER); - } - } - } - - private void addColumn(Map> roles, VirtualColumns vcs, String column, ColumnRole role) - { - expandColumn(roles, vcs, column, role, new HashSet<>()); - } - - /** - * Adds {@code column} with {@code role}, expanding virtual columns transitively to their underlying - * base columns. The {@code visited} set guards against virtual columns that reference one another. - */ - private void expandColumn( - Map> roles, - VirtualColumns vcs, - @Nullable String column, - ColumnRole role, - Set visited - ) - { - if (column == null || !visited.add(column)) { - return; - } - if (vcs != null && vcs.exists(column)) { - VirtualColumn virtualColumn = vcs.getVirtualColumn(column); - if (virtualColumn != null) { - for (String required : virtualColumn.requiredColumns()) { - expandColumn(roles, vcs, required, role, visited); - } - return; - } - } - roles.computeIfAbsent(column, k -> EnumSet.noneOf(ColumnRole.class)).add(role); - } - private ObjectNode buildRunEvent( String queryId, String queryType, RequestLogLine requestLogLine, List inputs, - @Nullable Map>> columnsByTable, + @Nullable Map>> columnsByTable, @Nullable String output ) { @@ -738,7 +474,7 @@ private ObjectNode createFacet(@Nullable String schemaUrl) private ArrayNode buildDatasets( List tableNames, - @Nullable Map>> columnsByTable + @Nullable Map>> columnsByTable ) { ArrayNode array = jsonMapper.createArrayNode(); @@ -747,7 +483,7 @@ private 
ArrayNode buildDatasets( node.put("namespace", namespace); node.put("name", name); ObjectNode facets = jsonMapper.createObjectNode(); - Map> columns = columnsByTable == null ? null : columnsByTable.get(name); + Map> columns = columnsByTable == null ? null : columnsByTable.get(name); if (columns != null && !columns.isEmpty()) { addColumnFacets(facets, columns); } @@ -761,20 +497,20 @@ private ArrayNode buildDatasets( * Attaches the standard OpenLineage {@code schema} facet (referenced column names, sorted) and the * custom {@code druid_columnUsage} facet (column to usage roles) to an input dataset's facets. */ - private void addColumnFacets(ObjectNode facets, Map> columns) + private void addColumnFacets(ObjectNode facets, Map> columns) { ObjectNode schemaFacet = createFacet(SCHEMA_FACET_SCHEMA_URL); ArrayNode fields = jsonMapper.createArrayNode(); ObjectNode usageFields = jsonMapper.createObjectNode(); // columns is a TreeMap, so iteration (and the emitted JSON) is deterministically sorted by name. - for (Map.Entry> entry : columns.entrySet()) { + for (Map.Entry> entry : columns.entrySet()) { ObjectNode field = jsonMapper.createObjectNode(); field.put("name", entry.getKey()); fields.add(field); ArrayNode usages = jsonMapper.createArrayNode(); // EnumSet iterates in enum declaration order, so usage lists are deterministic too. 
- for (ColumnRole role : entry.getValue()) { + for (QueryColumnUsageAnalyzer.ColumnUsage role : entry.getValue()) { usages.add(role.name()); } ObjectNode usageEntry = jsonMapper.createObjectNode(); @@ -789,7 +525,6 @@ private void addColumnFacets(ObjectNode facets, Map> facets.set("druid_columnUsage", usageFacet); } - protected void emit(ObjectNode event) { try { @@ -913,5 +648,4 @@ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) } } } - } diff --git a/extensions-contrib/openlineage-emitter/src/test/java/org/apache/druid/extensions/openlineage/OpenLineageRequestLoggerTest.java b/extensions-contrib/openlineage-emitter/src/test/java/org/apache/druid/extensions/openlineage/OpenLineageRequestLoggerTest.java index cb31100ff163..57c4d55ce847 100644 --- a/extensions-contrib/openlineage-emitter/src/test/java/org/apache/druid/extensions/openlineage/OpenLineageRequestLoggerTest.java +++ b/extensions-contrib/openlineage-emitter/src/test/java/org/apache/druid/extensions/openlineage/OpenLineageRequestLoggerTest.java @@ -33,12 +33,14 @@ import org.apache.druid.query.BaseQuery; import org.apache.druid.query.DataSource; import org.apache.druid.query.Druids; +import org.apache.druid.query.FilteredDataSource; import org.apache.druid.query.JoinDataSource; import org.apache.druid.query.LookupDataSource; import org.apache.druid.query.Query; import org.apache.druid.query.QueryDataSource; import org.apache.druid.query.TableDataSource; import org.apache.druid.query.UnionDataSource; +import org.apache.druid.query.aggregation.FilteredAggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.query.dimension.DefaultDimensionSpec; import org.apache.druid.query.filter.DimFilter; @@ -67,7 +69,6 @@ import java.util.Map; import java.util.Set; - public class OpenLineageRequestLoggerTest { private static final ObjectMapper MAPPER = new DefaultObjectMapper(); @@ -841,6 +842,77 @@ protected void emit(ObjectNode event) 
Assertions.assertNull(sales.get("facets").get("druid_columnUsage")); } + @Test + public void testFilteredAggregatorSplitsAggregationAndFilterRoles() throws IOException + { + // SUM(added) FILTER (WHERE status = 'active'): "added" is the aggregation input, "status" the filter. + Query query = Druids.newTimeseriesQueryBuilder() + .dataSource("sales") + .intervals(everyInterval()) + .granularity(Granularities.ALL) + .aggregators(new FilteredAggregatorFactory( + new LongSumAggregatorFactory("s", "added"), + new SelectorDimFilter("status", "active", null) + )) + .context(ImmutableMap.of("queryId", "filtagg-1")) + .build(); + logger.logNativeQuery(nativeLine(query, ImmutableMap.of("success", true))); + + JsonNode sales = inputByName(capturedEvents.get(0), "sales"); + Assertions.assertEquals(ImmutableList.of("added", "status"), schemaFieldNames(sales)); + Assertions.assertEquals(ImmutableList.of("AGGREGATION"), usagesOf(sales, "added")); + Assertions.assertEquals(ImmutableList.of("FILTER"), usagesOf(sales, "status")); + } + + @Test + public void testJoinBaseTableFilterColumns() throws IOException + { + JoinDataSource join = JoinDataSource.create( + new TableDataSource("sales"), + new TableDataSource("users"), + "j0.", + "\"country\" == \"j0.country\"", + JoinType.INNER, + new SelectorDimFilter("status", "active", null), + ExprMacroTable.nil(), + null, + null + ); + ScanQuery query = Druids.newScanQueryBuilder() + .dataSource(join) + .intervals(everyInterval()) + .columns("country") + .context(ImmutableMap.of("queryId", "joinfilter-1")) + .build(); + logger.logNativeQuery(nativeLine(query, ImmutableMap.of("success", true))); + + JsonNode sales = inputByName(capturedEvents.get(0), "sales"); + // The join's base-table (left) filter on "status" is captured as FILTER on the base table. 
+ Assertions.assertEquals(ImmutableList.of("FILTER"), usagesOf(sales, "status")); + Assertions.assertEquals(ImmutableList.of("PROJECTION", "JOIN"), usagesOf(sales, "country")); + } + + @Test + public void testFilteredDataSourceCapturesFilterColumns() throws IOException + { + FilteredDataSource filtered = FilteredDataSource.create( + new TableDataSource("sales"), + new SelectorDimFilter("status", "active", null) + ); + ScanQuery query = Druids.newScanQueryBuilder() + .dataSource(filtered) + .intervals(everyInterval()) + .columns("country") + .context(ImmutableMap.of("queryId", "filtds-1")) + .build(); + logger.logNativeQuery(nativeLine(query, ImmutableMap.of("success", true))); + + JsonNode sales = inputByName(capturedEvents.get(0), "sales"); + Assertions.assertEquals(ImmutableList.of("country", "status"), schemaFieldNames(sales)); + Assertions.assertEquals(ImmutableList.of("PROJECTION"), usagesOf(sales, "country")); + Assertions.assertEquals(ImmutableList.of("FILTER"), usagesOf(sales, "status")); + } + private static QuerySegmentSpec everyInterval() { return new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2024-01-01/2024-01-02"))); diff --git a/processing/src/main/java/org/apache/druid/query/QueryColumnUsageAnalyzer.java b/processing/src/main/java/org/apache/druid/query/QueryColumnUsageAnalyzer.java new file mode 100644 index 000000000000..e78a991ab6c7 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/QueryColumnUsageAnalyzer.java @@ -0,0 +1,365 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query; + +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.FilteredAggregatorFactory; +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.filter.DimFilter; +import org.apache.druid.query.groupby.GroupByQuery; +import org.apache.druid.query.planning.JoinDataSourceAnalysis; +import org.apache.druid.query.planning.PreJoinableClause; +import org.apache.druid.query.scan.ScanQuery; +import org.apache.druid.query.timeseries.TimeseriesQuery; +import org.apache.druid.query.topn.TopNQuery; +import org.apache.druid.query.union.UnionQuery; +import org.apache.druid.segment.VirtualColumn; +import org.apache.druid.segment.VirtualColumns; +import org.apache.druid.segment.join.JoinPrefixUtils; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; + +/** + * Analyzes a native {@link Query} to determine, per base table, which columns it references and in + * what role (projection, filter, group-by, aggregation, join key). Columns are read from the query + * plan -- not by parsing SQL -- and attributed to base tables across joins (using the planner's join + * prefixes), unions, sub-queries, and datasource wrappers (restricted/filtered/unnest). 
Columns with + * no base table (for example lookups or inline data) are dropped rather than fabricated. + * + *
<p>
This is intended as a single, planner-aligned source of truth for "which columns does this query + * read, and how" so that lineage emitters, column-statistics/optimizer work, and other consumers do + * not each re-derive it and drift from the parser/planner. + * + *
<p>
It is complementary to {@link Query#getRequiredColumns()}: that method returns a flat, + * un-attributed set, may be {@code null}, unconditionally injects {@code __time}, does not expand + * virtual columns to their base columns, and (by contract) excludes join-condition and pushed-down + * filter columns. This analyzer attributes per base table, assigns usage roles, expands virtual + * columns, and includes join-condition / {@code leftFilter} / filtered-aggregator predicate columns. + */ +public final class QueryColumnUsageAnalyzer +{ + private QueryColumnUsageAnalyzer() + { + } + + /** + * The role in which a column is referenced by a query. Names align with OpenLineage transformation + * subtypes where they exist (GROUP_BY, AGGREGATION, FILTER, JOIN); PROJECTION denotes a + * selected/projected column. + */ + public enum ColumnUsage + { + PROJECTION, + GROUP_BY, + AGGREGATION, + FILTER, + JOIN + } + + /** + * Returns a map of base-table name to (column name to the set of roles it was used in) for the given + * query. Returns an empty map when no base-table columns can be determined (unsupported query type, + * {@code SELECT *} with no filter, or a datasource with no base tables). Never returns {@code null}. + * + *
<p>
Result maps are sorted by table and column, and role sets iterate in enum-declaration order, so + * the output is deterministic. This method reads the query structure only and performs no I/O. The + * returned maps and role sets are freshly allocated, mutable, and owned by the caller. + */ + public static Map>> analyze(Query query) + { + Map>> result = new TreeMap<>(); + collectInto(result, query); + return result; + } + + /** + * Walks {@code query}'s column-bearing parts and attributes the referenced columns to the base + * tables of its datasource. {@link UnionQuery} (whose {@code getDataSource()} is undefined) is + * handled by recursing into each of its branch queries -- {@code getDataSources()} would discard the + * branches' dimensions/aggregators/filters, so the full branch queries are used instead. + */ + private static void collectInto(Map>> result, Query query) + { + if (query instanceof UnionQuery) { + for (Query branch : ((UnionQuery) query).getQueries()) { + collectInto(result, branch); + } + return; + } + attribute(result, query.getDataSource(), collectColumnRoles(query)); + } + + /** + * Attributes {@code roles} (column to roles, expressed in {@code dataSource}'s output namespace) to + * the underlying base tables, recursing through the datasource tree. Sub-queries are recursed into + * via {@link #collectInto} (their columns come from their own parts, not the outer references). + */ + private static void attribute( + Map>> result, + DataSource dataSource, + Map> roles + ) + { + if (dataSource instanceof TableDataSource) { + // Also covers GlobalTableDataSource (a TableDataSource subclass); it is a real named table, so + // handling it here is intentional. Keep this branch ahead of the wrapper branches below. 
+ String table = ((TableDataSource) dataSource).getName(); + for (Map.Entry> entry : roles.entrySet()) { + addRoles(result, table, entry.getKey(), entry.getValue()); + } + } else if (dataSource instanceof RestrictedDataSource) { + attribute(result, ((RestrictedDataSource) dataSource).getBase(), roles); + } else if (dataSource instanceof FilteredDataSource) { + // FilteredDataSource carries a pushed-down filter over its base; its columns are FILTER usages. + FilteredDataSource filtered = (FilteredDataSource) dataSource; + Map> baseRoles = copyRoles(roles); + addFilterColumns(baseRoles, null, filtered.getFilter()); + attribute(result, filtered.getBase(), baseRoles); + } else if (dataSource instanceof UnnestDataSource) { + UnnestDataSource unnest = (UnnestDataSource) dataSource; + VirtualColumn unnestColumn = unnest.getVirtualColumn(); + Map> baseRoles = copyRoles(roles); + // The unnest output column is synthetic (not a base column); drop it and instead record the + // underlying column(s) being unnested as projected from the base. + baseRoles.remove(unnestColumn.getOutputName()); + for (String required : unnestColumn.requiredColumns()) { + baseRoles.computeIfAbsent(required, k -> EnumSet.noneOf(ColumnUsage.class)).add(ColumnUsage.PROJECTION); + } + attribute(result, unnest.getBase(), baseRoles); + } else if (dataSource instanceof UnionDataSource) { + // Union members share the same output schema, so the referenced columns apply to each. + for (DataSource member : dataSource.getChildren()) { + attribute(result, member, roles); + } + } else if (dataSource instanceof QueryDataSource) { + // The outer references are this sub-query's OUTPUT columns, not base columns; the sub-query's + // own base-table columns are captured by recursing into its parts. 
+ collectInto(result, ((QueryDataSource) dataSource).getQuery()); + } else if (dataSource instanceof JoinDataSource) { + attributeJoin(result, (JoinDataSource) dataSource, roles); + } + // LookupDataSource, InlineDataSource and any other shape have no base table: drop (never fabricate). + } + + /** + * Splits {@code roles} (plus join-condition and base-table-filter columns) across the base + * datasource and each joinable clause using the planner's join prefixes, then recurses. Clauses are + * matched longest-prefix-first; right-side columns arrive already prefixed and are un-prefixed before + * attribution to the clause's datasource (which may itself be a table, join, or sub-query). + */ + private static void attributeJoin( + Map>> result, + JoinDataSource join, + Map> roles + ) + { + JoinDataSourceAnalysis analysis = JoinDataSourceAnalysis.constructAnalysis(join); + DataSource base = analysis.getBaseDataSource(); + List clauses = new ArrayList<>(analysis.getPreJoinableClauses()); + // Longest prefix first so that, e.g., "j0.x" matches clause "j0." rather than a shorter prefix. + clauses.sort((a, b) -> Integer.compare(b.getPrefix().length(), a.getPrefix().length())); + + Map> all = copyRoles(roles); + for (PreJoinableClause clause : clauses) { + for (String column : clause.getCondition().getRequiredColumns()) { + all.computeIfAbsent(column, k -> EnumSet.noneOf(ColumnUsage.class)).add(ColumnUsage.JOIN); + } + } + // A predicate pushed onto the join's base table (leftFilter) also references base-table columns. 
+ analysis.getJoinBaseTableFilter().ifPresent(baseFilter -> { + for (String column : baseFilter.getRequiredColumns()) { + all.computeIfAbsent(column, k -> EnumSet.noneOf(ColumnUsage.class)).add(ColumnUsage.FILTER); + } + }); + + Map>> partitioned = new LinkedHashMap<>(); + for (Map.Entry> entry : all.entrySet()) { + String column = entry.getKey(); + DataSource target = base; + String resolved = column; + for (PreJoinableClause clause : clauses) { + if (JoinPrefixUtils.isPrefixedBy(column, clause.getPrefix())) { + target = clause.getDataSource(); + resolved = JoinPrefixUtils.unprefix(column, clause.getPrefix()); + break; + } + } + partitioned.computeIfAbsent(target, k -> new TreeMap<>()) + .computeIfAbsent(resolved, k -> EnumSet.noneOf(ColumnUsage.class)) + .addAll(entry.getValue()); + } + for (Map.Entry>> entry : partitioned.entrySet()) { + attribute(result, entry.getKey(), entry.getValue()); + } + } + + private static void addRoles( + Map>> result, + String table, + String column, + EnumSet roles + ) + { + result.computeIfAbsent(table, k -> new TreeMap<>()) + .computeIfAbsent(column, k -> EnumSet.noneOf(ColumnUsage.class)) + .addAll(roles); + } + + /** + * Deep-copies a column-to-roles map so a wrapper/join branch can add its own roles without mutating + * the shared {@link EnumSet} values of the map it was handed (the same {@code roles} instance is + * passed to every {@link UnionDataSource} member, so in-place mutation would leak across members). + */ + private static Map> copyRoles(Map> roles) + { + Map> copy = new TreeMap<>(); + for (Map.Entry> entry : roles.entrySet()) { + copy.put(entry.getKey(), EnumSet.copyOf(entry.getValue())); + } + return copy; + } + + /** + * Walks the query's column-bearing parts (projected columns, dimensions, aggregator inputs, filter + * columns) and records each referenced base column with the role(s) it was used in. Virtual columns + * are expanded transitively to their underlying base columns, carrying the consuming role. 
Only + * explicitly-referenced columns are captured (notably {@code __time} is included only when it + * actually appears in a part, never its implicit interval usage). Returns an empty map for + * unsupported query types and for a bare {@code SELECT *} (a Scan with neither explicit columns nor + * a filter); a {@code SELECT *} that carries a filter still contributes its filter columns. + */ + private static Map> collectColumnRoles(Query query) + { + Map> roles = new TreeMap<>(); + if (query instanceof ScanQuery) { + ScanQuery scan = (ScanQuery) query; + VirtualColumns vcs = scan.getVirtualColumns(); + if (scan.getColumns() != null) { + for (String column : scan.getColumns()) { + addColumn(roles, vcs, column, ColumnUsage.PROJECTION); + } + } + addFilterColumns(roles, vcs, scan.getFilter()); + } else if (query instanceof GroupByQuery) { + GroupByQuery groupBy = (GroupByQuery) query; + VirtualColumns vcs = groupBy.getVirtualColumns(); + for (DimensionSpec dimension : groupBy.getDimensions()) { + addColumn(roles, vcs, dimension.getDimension(), ColumnUsage.GROUP_BY); + } + addAggregatorColumns(roles, vcs, groupBy.getAggregatorSpecs()); + addFilterColumns(roles, vcs, groupBy.getDimFilter()); + } else if (query instanceof TopNQuery) { + TopNQuery topN = (TopNQuery) query; + VirtualColumns vcs = topN.getVirtualColumns(); + addColumn(roles, vcs, topN.getDimensionSpec().getDimension(), ColumnUsage.GROUP_BY); + addAggregatorColumns(roles, vcs, topN.getAggregatorSpecs()); + addFilterColumns(roles, vcs, topN.getFilter()); + } else if (query instanceof TimeseriesQuery) { + TimeseriesQuery timeseries = (TimeseriesQuery) query; + VirtualColumns vcs = timeseries.getVirtualColumns(); + addAggregatorColumns(roles, vcs, timeseries.getAggregatorSpecs()); + addFilterColumns(roles, vcs, timeseries.getFilter()); + } else { + // Unsupported query type: no column-level information. 
+ return Collections.emptyMap(); + } + return roles; + } + + private static void addAggregatorColumns( + Map> roles, + VirtualColumns vcs, + List aggregators + ) + { + for (AggregatorFactory aggregator : aggregators) { + if (aggregator instanceof FilteredAggregatorFactory) { + // e.g. SUM(x) FILTER (WHERE y = ...): requiredFields() mixes the aggregation input (x) with + // the filter predicate (y). Tag the delegate's inputs AGGREGATION and the predicate FILTER. + FilteredAggregatorFactory filtered = (FilteredAggregatorFactory) aggregator; + addAggregatorColumns(roles, vcs, Collections.singletonList(filtered.getAggregator())); + addFilterColumns(roles, vcs, filtered.getFilter()); + } else { + for (String column : aggregator.requiredFields()) { + addColumn(roles, vcs, column, ColumnUsage.AGGREGATION); + } + } + } + } + + private static void addFilterColumns( + Map> roles, + @Nullable VirtualColumns vcs, + @Nullable DimFilter filter + ) + { + if (filter != null) { + for (String column : filter.getRequiredColumns()) { + addColumn(roles, vcs, column, ColumnUsage.FILTER); + } + } + } + + private static void addColumn( + Map> roles, + @Nullable VirtualColumns vcs, + String column, + ColumnUsage role + ) + { + expandColumn(roles, vcs, column, role, new HashSet<>()); + } + + /** + * Adds {@code column} with {@code role}, expanding virtual columns transitively to their underlying + * base columns. The {@code visited} set guards against virtual columns that reference one another. 
+ */ + private static void expandColumn( + Map> roles, + @Nullable VirtualColumns vcs, + @Nullable String column, + ColumnUsage role, + Set visited + ) + { + if (column == null || !visited.add(column)) { + return; + } + if (vcs != null && vcs.exists(column)) { + VirtualColumn virtualColumn = vcs.getVirtualColumn(column); + if (virtualColumn != null) { + for (String required : virtualColumn.requiredColumns()) { + expandColumn(roles, vcs, required, role, visited); + } + return; + } + } + roles.computeIfAbsent(column, k -> EnumSet.noneOf(ColumnUsage.class)).add(role); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/union/UnionQuery.java b/processing/src/main/java/org/apache/druid/query/union/UnionQuery.java index 99745744f42c..2ab3e663b1c6 100644 --- a/processing/src/main/java/org/apache/druid/query/union/UnionQuery.java +++ b/processing/src/main/java/org/apache/druid/query/union/UnionQuery.java @@ -20,6 +20,7 @@ package org.apache.druid.query.union; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; @@ -87,6 +88,17 @@ public List getDataSources() return dataSources; } + /** + * The branch queries unioned together. Unlike {@link #getDataSources()} (which returns each branch's + * datasource), this exposes the full branch queries, so callers that need the branches' column-bearing + * parts (dimensions, aggregators, filters) can inspect them. 
+ */ + @JsonIgnore + public List> getQueries() + { + return Collections.unmodifiableList(queries); + } + @Override public boolean hasFilters() { diff --git a/processing/src/test/java/org/apache/druid/query/QueryColumnUsageAnalyzerTest.java b/processing/src/test/java/org/apache/druid/query/QueryColumnUsageAnalyzerTest.java new file mode 100644 index 000000000000..bc09726801a6 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/QueryColumnUsageAnalyzerTest.java @@ -0,0 +1,490 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query; + +import com.google.common.collect.ImmutableList; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.math.expr.ExprMacroTable; +import org.apache.druid.query.QueryColumnUsageAnalyzer.ColumnUsage; +import org.apache.druid.query.aggregation.FilteredAggregatorFactory; +import org.apache.druid.query.aggregation.LongSumAggregatorFactory; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.query.filter.SelectorDimFilter; +import org.apache.druid.query.groupby.GroupByQuery; +import org.apache.druid.query.policy.NoRestrictionPolicy; +import org.apache.druid.query.scan.ScanQuery; +import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; +import org.apache.druid.query.spec.QuerySegmentSpec; +import org.apache.druid.query.topn.TopNQueryBuilder; +import org.apache.druid.query.union.UnionQuery; +import org.apache.druid.segment.VirtualColumns; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.join.JoinType; +import org.apache.druid.segment.virtual.ExpressionVirtualColumn; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; + +public class QueryColumnUsageAnalyzerTest +{ + @Test + public void testScan() + { + ScanQuery query = Druids.newScanQueryBuilder() + .dataSource("events") + .intervals(everyInterval()) + .columns("userId", "page") + .filters(new SelectorDimFilter("country", "US", null)) + .build(); + Map> cols = QueryColumnUsageAnalyzer.analyze(query).get("events"); + Assertions.assertEquals(EnumSet.of(ColumnUsage.PROJECTION), cols.get("userId")); + Assertions.assertEquals(EnumSet.of(ColumnUsage.PROJECTION), cols.get("page")); + 
Assertions.assertEquals(EnumSet.of(ColumnUsage.FILTER), cols.get("country")); + } + + @Test + public void testGroupBy() + { + GroupByQuery query = GroupByQuery.builder() + .setDataSource("sales") + .setQuerySegmentSpec(everyInterval()) + .setGranularity(Granularities.ALL) + .setDimensions(new DefaultDimensionSpec("country", "country")) + .setAggregatorSpecs(new LongSumAggregatorFactory("total", "amount")) + .setDimFilter(new SelectorDimFilter("status", "active", null)) + .build(); + Map> cols = QueryColumnUsageAnalyzer.analyze(query).get("sales"); + Assertions.assertEquals(EnumSet.of(ColumnUsage.GROUP_BY), cols.get("country")); + Assertions.assertEquals(EnumSet.of(ColumnUsage.AGGREGATION), cols.get("amount")); + Assertions.assertEquals(EnumSet.of(ColumnUsage.FILTER), cols.get("status")); + } + + @Test + public void testTopN() + { + Query query = new TopNQueryBuilder() + .dataSource("sales") + .intervals(everyInterval()) + .granularity(Granularities.ALL) + .dimension(new DefaultDimensionSpec("country", "country")) + .metric("total") + .threshold(10) + .aggregators(new LongSumAggregatorFactory("total", "amount")) + .filters(new SelectorDimFilter("status", "active", null)) + .build(); + Map> cols = QueryColumnUsageAnalyzer.analyze(query).get("sales"); + Assertions.assertEquals(EnumSet.of(ColumnUsage.GROUP_BY), cols.get("country")); + Assertions.assertEquals(EnumSet.of(ColumnUsage.AGGREGATION), cols.get("amount")); + Assertions.assertEquals(EnumSet.of(ColumnUsage.FILTER), cols.get("status")); + } + + @Test + public void testTimeseries() + { + Query query = Druids.newTimeseriesQueryBuilder() + .dataSource("sales") + .intervals(everyInterval()) + .granularity(Granularities.ALL) + .aggregators(new LongSumAggregatorFactory("total", "amount")) + .filters(new SelectorDimFilter("status", "active", null)) + .build(); + Map> cols = QueryColumnUsageAnalyzer.analyze(query).get("sales"); + Assertions.assertEquals(EnumSet.of(ColumnUsage.AGGREGATION), cols.get("amount")); + 
Assertions.assertEquals(EnumSet.of(ColumnUsage.FILTER), cols.get("status")); + } + + @Test + public void testVirtualColumnExpandsToBaseColumns() + { + GroupByQuery query = GroupByQuery.builder() + .setDataSource("sales") + .setQuerySegmentSpec(everyInterval()) + .setGranularity(Granularities.ALL) + .setVirtualColumns(VirtualColumns.create(ImmutableList.of( + new ExpressionVirtualColumn("v0", "\"base\" * 2", ColumnType.LONG, ExprMacroTable.nil()) + ))) + .setDimensions(new DefaultDimensionSpec("v0", "v0")) + .build(); + Map> cols = QueryColumnUsageAnalyzer.analyze(query).get("sales"); + Assertions.assertEquals(Collections.singleton("base"), cols.keySet()); + Assertions.assertEquals(EnumSet.of(ColumnUsage.GROUP_BY), cols.get("base")); + } + + @Test + public void testColumnUsedInMultipleRoles() + { + GroupByQuery query = GroupByQuery.builder() + .setDataSource("sales") + .setQuerySegmentSpec(everyInterval()) + .setGranularity(Granularities.ALL) + .setDimensions(new DefaultDimensionSpec("country", "country")) + .setDimFilter(new SelectorDimFilter("country", "US", null)) + .build(); + Map> cols = QueryColumnUsageAnalyzer.analyze(query).get("sales"); + Assertions.assertEquals(EnumSet.of(ColumnUsage.GROUP_BY, ColumnUsage.FILTER), cols.get("country")); + } + + @Test + public void testJoinAttributedPerSide() + { + JoinDataSource join = JoinDataSource.create( + new TableDataSource("sales"), + new TableDataSource("users"), + "j0.", + "\"country\" == \"j0.country\"", + JoinType.INNER, + null, + ExprMacroTable.nil(), + null, + null + ); + ScanQuery query = Druids.newScanQueryBuilder() + .dataSource(join) + .intervals(everyInterval()) + .columns("country", "j0.age") + .build(); + Map>> result = QueryColumnUsageAnalyzer.analyze(query); + Assertions.assertEquals(EnumSet.of(ColumnUsage.PROJECTION, ColumnUsage.JOIN), result.get("sales").get("country")); + Assertions.assertEquals(EnumSet.of(ColumnUsage.PROJECTION), result.get("users").get("age")); + 
Assertions.assertEquals(EnumSet.of(ColumnUsage.JOIN), result.get("users").get("country")); + } + + @Test + public void testJoinBaseTableFilter() + { + JoinDataSource join = JoinDataSource.create( + new TableDataSource("sales"), + new TableDataSource("users"), + "j0.", + "\"country\" == \"j0.country\"", + JoinType.INNER, + new SelectorDimFilter("status", "active", null), + ExprMacroTable.nil(), + null, + null + ); + ScanQuery query = Druids.newScanQueryBuilder() + .dataSource(join) + .intervals(everyInterval()) + .columns("country") + .build(); + Map> sales = QueryColumnUsageAnalyzer.analyze(query).get("sales"); + Assertions.assertEquals(EnumSet.of(ColumnUsage.FILTER), sales.get("status")); + Assertions.assertEquals(EnumSet.of(ColumnUsage.PROJECTION, ColumnUsage.JOIN), sales.get("country")); + } + + @Test + public void testLookupJoinDropsRightColumns() + { + JoinDataSource join = JoinDataSource.create( + new TableDataSource("sales"), + new LookupDataSource("country_lookup"), + "j0.", + "\"country\" == \"j0.k\"", + JoinType.LEFT, + null, + ExprMacroTable.nil(), + null, + null + ); + ScanQuery query = Druids.newScanQueryBuilder() + .dataSource(join) + .intervals(everyInterval()) + .columns("country", "j0.v") + .build(); + Map>> result = QueryColumnUsageAnalyzer.analyze(query); + // Lookup has no base table -> no lookup entry, and "j0.v"/"j0.k" are not fabricated onto "sales". 
+ Assertions.assertEquals(Collections.singleton("sales"), result.keySet()); + Assertions.assertEquals(Collections.singleton("country"), result.get("sales").keySet()); + Assertions.assertEquals(EnumSet.of(ColumnUsage.PROJECTION, ColumnUsage.JOIN), result.get("sales").get("country")); + } + + @Test + public void testSubQueryRecursesToBaseTable() + { + GroupByQuery inner = GroupByQuery.builder() + .setDataSource("sales") + .setQuerySegmentSpec(everyInterval()) + .setGranularity(Granularities.ALL) + .setDimensions(new DefaultDimensionSpec("country", "country")) + .setAggregatorSpecs(new LongSumAggregatorFactory("total", "amount")) + .build(); + GroupByQuery outer = GroupByQuery.builder() + .setDataSource(new QueryDataSource(inner)) + .setQuerySegmentSpec(everyInterval()) + .setGranularity(Granularities.ALL) + .setDimensions(new DefaultDimensionSpec("country", "country")) + .setAggregatorSpecs(new LongSumAggregatorFactory("grand", "total")) + .build(); + Map> cols = QueryColumnUsageAnalyzer.analyze(outer).get("sales"); + // Base columns come from the sub-query; the outer references to sub-query outputs are not fabricated. 
+ Assertions.assertEquals(EnumSet.of(ColumnUsage.GROUP_BY), cols.get("country")); + Assertions.assertEquals(EnumSet.of(ColumnUsage.AGGREGATION), cols.get("amount")); + Assertions.assertFalse(cols.containsKey("total")); + Assertions.assertFalse(cols.containsKey("grand")); + } + + @Test + public void testUnionReplicatesToMembers() + { + GroupByQuery query = GroupByQuery.builder() + .setDataSource(new UnionDataSource(ImmutableList.of( + new TableDataSource("sales_a"), + new TableDataSource("sales_b") + ))) + .setQuerySegmentSpec(everyInterval()) + .setGranularity(Granularities.ALL) + .setDimensions(new DefaultDimensionSpec("country", "country")) + .build(); + Map>> result = QueryColumnUsageAnalyzer.analyze(query); + Assertions.assertEquals(EnumSet.of(ColumnUsage.GROUP_BY), result.get("sales_a").get("country")); + Assertions.assertEquals(EnumSet.of(ColumnUsage.GROUP_BY), result.get("sales_b").get("country")); + } + + @Test + public void testFilteredAggregatorSplitsRoles() + { + Query query = Druids.newTimeseriesQueryBuilder() + .dataSource("sales") + .intervals(everyInterval()) + .granularity(Granularities.ALL) + .aggregators(new FilteredAggregatorFactory( + new LongSumAggregatorFactory("s", "added"), + new SelectorDimFilter("status", "active", null) + )) + .build(); + Map> cols = QueryColumnUsageAnalyzer.analyze(query).get("sales"); + Assertions.assertEquals(EnumSet.of(ColumnUsage.AGGREGATION), cols.get("added")); + Assertions.assertEquals(EnumSet.of(ColumnUsage.FILTER), cols.get("status")); + } + + @Test + public void testFilteredDataSourceCapturesFilterColumns() + { + FilteredDataSource filtered = FilteredDataSource.create( + new TableDataSource("sales"), + new SelectorDimFilter("status", "active", null) + ); + ScanQuery query = Druids.newScanQueryBuilder() + .dataSource(filtered) + .intervals(everyInterval()) + .columns("country") + .build(); + Map> cols = QueryColumnUsageAnalyzer.analyze(query).get("sales"); + 
Assertions.assertEquals(EnumSet.of(ColumnUsage.PROJECTION), cols.get("country")); + Assertions.assertEquals(EnumSet.of(ColumnUsage.FILTER), cols.get("status")); + } + + @Test + public void testSelectStarYieldsEmpty() + { + ScanQuery query = Druids.newScanQueryBuilder() + .dataSource("events") + .intervals(everyInterval()) + .build(); + Assertions.assertTrue(QueryColumnUsageAnalyzer.analyze(query).isEmpty()); + } + + @Test + public void testUnnestReplacesSyntheticColumnWithSource() + { + UnnestDataSource unnest = UnnestDataSource.create( + new TableDataSource("sales"), + new ExpressionVirtualColumn("unnestOut", "\"tags\"", ColumnType.STRING, ExprMacroTable.nil()), + null + ); + ScanQuery query = Druids.newScanQueryBuilder() + .dataSource(unnest) + .intervals(everyInterval()) + .columns("unnestOut") + .build(); + Map> cols = QueryColumnUsageAnalyzer.analyze(query).get("sales"); + // The synthetic unnest output is dropped; the underlying source column is projected instead. + Assertions.assertEquals(Collections.singleton("tags"), cols.keySet()); + Assertions.assertEquals(EnumSet.of(ColumnUsage.PROJECTION), cols.get("tags")); + } + + @Test + public void testNestedJoinAttributesSecondPrefix() + { + JoinDataSource inner = JoinDataSource.create( + new TableDataSource("sales_a"), + new TableDataSource("sales_b"), + "j0.", + "\"a_id\" == \"j0.b_id\"", + JoinType.INNER, + null, + ExprMacroTable.nil(), + null, + null + ); + JoinDataSource outer = JoinDataSource.create( + inner, + new TableDataSource("sales_c"), + "_j0.", + "\"a_id\" == \"_j0.c_id\"", + JoinType.INNER, + null, + ExprMacroTable.nil(), + null, + null + ); + ScanQuery query = Druids.newScanQueryBuilder() + .dataSource(outer) + .intervals(everyInterval()) + .columns("a_col", "j0.b_col", "_j0.c_col") + .build(); + Map>> result = QueryColumnUsageAnalyzer.analyze(query); + // "_j0." (longer) must win over "j0." so the second-join column lands on sales_c, not sales_b. 
+ Assertions.assertEquals(EnumSet.of(ColumnUsage.PROJECTION), result.get("sales_c").get("c_col")); + Assertions.assertEquals(EnumSet.of(ColumnUsage.PROJECTION), result.get("sales_b").get("b_col")); + Assertions.assertEquals(EnumSet.of(ColumnUsage.JOIN), result.get("sales_c").get("c_id")); + Assertions.assertEquals(EnumSet.of(ColumnUsage.JOIN), result.get("sales_b").get("b_id")); + Assertions.assertEquals(EnumSet.of(ColumnUsage.PROJECTION), result.get("sales_a").get("a_col")); + Assertions.assertEquals(EnumSet.of(ColumnUsage.JOIN), result.get("sales_a").get("a_id")); + } + + @Test + public void testJoinKeyAlsoAggregatedMergesRoles() + { + JoinDataSource join = JoinDataSource.create( + new TableDataSource("sales"), + new TableDataSource("users"), + "j0.", + "\"amount\" == \"j0.amount\"", + JoinType.INNER, + null, + ExprMacroTable.nil(), + null, + null + ); + GroupByQuery query = GroupByQuery.builder() + .setDataSource(join) + .setQuerySegmentSpec(everyInterval()) + .setGranularity(Granularities.ALL) + .setDimensions(new DefaultDimensionSpec("country", "country")) + .setAggregatorSpecs(new LongSumAggregatorFactory("total", "j0.amount")) + .build(); + // The right-side "amount" is both a join key and an aggregation input: roles must merge, not clobber. 
+ Assertions.assertEquals( + EnumSet.of(ColumnUsage.AGGREGATION, ColumnUsage.JOIN), + QueryColumnUsageAnalyzer.analyze(query).get("users").get("amount") + ); + } + + @Test + public void testRestrictedDataSourceAttributesToBase() + { + RestrictedDataSource restricted = RestrictedDataSource.create( + new TableDataSource("sales"), + NoRestrictionPolicy.instance() + ); + ScanQuery query = Druids.newScanQueryBuilder() + .dataSource(restricted) + .intervals(everyInterval()) + .columns("country") + .build(); + Assertions.assertEquals( + EnumSet.of(ColumnUsage.PROJECTION), + QueryColumnUsageAnalyzer.analyze(query).get("sales").get("country") + ); + } + + @Test + public void testChainedVirtualColumnsExpandTransitively() + { + GroupByQuery query = GroupByQuery.builder() + .setDataSource("sales") + .setQuerySegmentSpec(everyInterval()) + .setGranularity(Granularities.ALL) + .setVirtualColumns(VirtualColumns.create(ImmutableList.of( + new ExpressionVirtualColumn("v0", "\"base\" * 2", ColumnType.LONG, ExprMacroTable.nil()), + new ExpressionVirtualColumn("v1", "\"v0\" + 1", ColumnType.LONG, ExprMacroTable.nil()) + ))) + .setDimensions(new DefaultDimensionSpec("v1", "v1")) + .build(); + Map> cols = QueryColumnUsageAnalyzer.analyze(query).get("sales"); + // v1 -> v0 -> base: only the real base column survives, carrying the consuming role. 
+ Assertions.assertEquals(Collections.singleton("base"), cols.keySet()); + Assertions.assertEquals(EnumSet.of(ColumnUsage.GROUP_BY), cols.get("base")); + } + + @Test + public void testUnionQueryRecursesEachBranch() + { + GroupByQuery branchA = GroupByQuery.builder() + .setDataSource("sales_a") + .setQuerySegmentSpec(everyInterval()) + .setGranularity(Granularities.ALL) + .setDimensions(new DefaultDimensionSpec("country", "country")) + .build(); + GroupByQuery branchB = GroupByQuery.builder() + .setDataSource("sales_b") + .setQuerySegmentSpec(everyInterval()) + .setGranularity(Granularities.ALL) + .setAggregatorSpecs(new LongSumAggregatorFactory("total", "amount")) + .build(); + UnionQuery union = new UnionQuery(List.of(branchA, branchB)); + Map>> result = QueryColumnUsageAnalyzer.analyze(union); + // Each branch's own column-bearing parts must be captured (not discarded via getDataSources()). + Assertions.assertEquals(EnumSet.of(ColumnUsage.GROUP_BY), result.get("sales_a").get("country")); + Assertions.assertEquals(EnumSet.of(ColumnUsage.AGGREGATION), result.get("sales_b").get("amount")); + } + + @Test + public void testInlineDataSourceDropped() + { + RowSignature signature = RowSignature.builder() + .add("id", ColumnType.LONG) + .add("name", ColumnType.STRING) + .build(); + InlineDataSource inline = InlineDataSource.fromIterable( + ImmutableList.of(new Object[]{1L, "alice"}), + signature + ); + ScanQuery query = Druids.newScanQueryBuilder() + .dataSource(inline) + .intervals(everyInterval()) + .columns("name") + .build(); + // Inline data has no base table; it is dropped, never fabricated as a pseudo-table. 
+ Assertions.assertTrue(QueryColumnUsageAnalyzer.analyze(query).isEmpty()); + } + + @Test + public void testUnsupportedQueryTypeYieldsEmpty() + { + Query query = Druids.newTimeBoundaryQueryBuilder().dataSource("events").build(); + Map>> result = QueryColumnUsageAnalyzer.analyze(query); + Assertions.assertNotNull(result); + Assertions.assertTrue(result.isEmpty()); + } + + private static QuerySegmentSpec everyInterval() + { + return new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2024-01-01/2024-01-02"))); + } +}