Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions docs/development/extensions-contrib/openlineage-emitter.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ All configuration parameters are under `druid.request.logging`.
| `druid.request.logging.transportType` | Where to send events. `CONSOLE` logs JSON to the Druid log; `HTTP` POSTs to an OpenLineage API endpoint. | no | `CONSOLE` |
| `druid.request.logging.transportUrl` | OpenLineage API endpoint URL. Required when `transportType=HTTP`. | no | — |
| `druid.request.logging.excludedNativeQueryTypes` | Native query types to exclude from lineage emission. Internal broker queries like segment metadata lookups produce noisy, low-value events. | no | `["segmentMetadata", "dataSourceMetadata", "timeBoundary"]` |
| `druid.request.logging.columnLineageEnabled` | Emit per-column lineage (the `schema` and `druid_columnUsage` dataset facets) for native queries. Set to `false` to emit table-level (datasource) lineage only, reducing event size. | no | `true` |
| `druid.request.logging.emitQueueCapacity` | Maximum number of events buffered in the async HTTP emit queue. Events are dropped (with a warning) when the queue is full. Only applies when `transportType=HTTP`. | no | `1000` |
| `druid.request.logging.emitThreadCount` | Number of background threads used to POST events to the HTTP endpoint. Only applies when `transportType=HTTP`. | no | `1` |
| `druid.request.logging.trustStorePath` | Path to the TrustStore file for HTTPS transport. Only applies when `transportType=HTTP`. | no | — |
Expand Down Expand Up @@ -96,3 +97,24 @@ Each emitted event follows the [OpenLineage spec](https://openlineage.io/spec/2-
|---|---|
| `jobType` | `processingType=BATCH`, `integration=DRUID`, `jobType=QUERY`. Standard OpenLineage facet. |
| `sql` | Raw SQL text. Present on SQL queries only. Standard OpenLineage facet. |

### Input dataset facets

For supported native query types (`scan`, `groupBy`, `topN`, `timeseries`), each input dataset carries column-level lineage describing which columns the query referenced and how. Columns are attributed to their base tables across joins (per side), sub-queries, unions, and datasource wrappers. Query types that are not supported, a bare `SELECT *` (a scan with no explicit columns), and any column that cannot be attributed to a base table (for example lookup or inline-data columns) fall back to table-level lineage only. This can be disabled entirely with `columnLineageEnabled=false`.

| Facet | Description |
|---|---|
| `schema` | Standard OpenLineage `SchemaDatasetFacet` listing the referenced input column names (names only, sorted). |
| `druid_columnUsage` | Druid-specific facet mapping each referenced column to the roles in which it was used. |

Column usage roles (`druid_columnUsage`):

| Role | Meaning |
|---|---|
| `PROJECTION` | Column is selected/projected (OpenLineage `IDENTITY`/`TRANSFORMATION`). |
| `GROUP_BY` | Column is a grouping dimension. |
| `AGGREGATION` | Column feeds an aggregator (e.g. `SUM`, `COUNT`). |
| `FILTER` | Column appears in a filter / `WHERE`. |
| `JOIN` | Column is used as a join key. |

Virtual/expression columns are expanded to their underlying base columns, carrying the consuming role. `__time` appears only when explicitly referenced by a query part (not for the implicit interval). Output datasets (MSQ `INSERT`/`REPLACE`) do not carry column-level facets.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.BaseQuery;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryColumnUsageAnalyzer;
import org.apache.druid.server.RequestLogLine;
import org.apache.druid.server.log.RequestLogger;
import org.apache.druid.sql.calcite.parser.DruidSqlParser;
Expand All @@ -48,6 +50,7 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -85,6 +88,12 @@ public class OpenLineageRequestLogger implements RequestLogger
+ "/src/main/resources/openlineage-schema/";
private static final String CONTEXT_FACET_SCHEMA_URL = CUSTOM_SCHEMA_BASE + "DruidQueryContextRunFacet.json";
private static final String STATS_FACET_SCHEMA_URL = CUSTOM_SCHEMA_BASE + "DruidQueryStatisticsRunFacet.json";
// Standard OpenLineage SchemaDatasetFacet listing the input columns referenced by the query (names only).
private static final String SCHEMA_FACET_SCHEMA_URL =
"https://openlineage.io/spec/facets/1-1-1/SchemaDatasetFacet.json";
// Custom dataset facet describing how each referenced column was used (filter, group-by, aggregation, ...).
private static final String COLUMN_USAGE_FACET_SCHEMA_URL =
CUSTOM_SCHEMA_BASE + "DruidColumnUsageDatasetFacet.json";
static final int SQL_FACET_MAX_LENGTH = 64 * 1024;
static final int DEFAULT_EMIT_QUEUE_CAPACITY = 1000;
static final int DEFAULT_EMIT_THREAD_COUNT = 1;
Expand All @@ -105,6 +114,7 @@ public class OpenLineageRequestLogger implements RequestLogger
@Nullable
private final String transportUrl;
private final Set<String> excludedNativeQueryTypes;
private final boolean columnLineageEnabled;
@Nullable
private final HttpClient httpClient;
@Nullable
Expand All @@ -120,7 +130,7 @@ public OpenLineageRequestLogger(
)
{
this(jsonMapper, namespace, transportType, transportUrl, excludedNativeQueryTypes,
DEFAULT_EMIT_QUEUE_CAPACITY, DEFAULT_EMIT_THREAD_COUNT, null);
true, DEFAULT_EMIT_QUEUE_CAPACITY, DEFAULT_EMIT_THREAD_COUNT, null);
}

public OpenLineageRequestLogger(
Expand All @@ -129,6 +139,7 @@ public OpenLineageRequestLogger(
OpenLineageRequestLoggerProvider.TransportType transportType,
@Nullable String transportUrl,
Set<String> excludedNativeQueryTypes,
boolean columnLineageEnabled,
int emitQueueCapacity,
int emitThreadCount,
@Nullable HttpClient httpClient
Expand All @@ -139,6 +150,7 @@ public OpenLineageRequestLogger(
this.transportType = transportType;
this.transportUrl = transportUrl;
this.excludedNativeQueryTypes = excludedNativeQueryTypes;
this.columnLineageEnabled = columnLineageEnabled;
if (transportType == OpenLineageRequestLoggerProvider.TransportType.HTTP && transportUrl == null) {
throw new IllegalStateException(
"druid.request.logging.transportUrl must be set when transportType=HTTP"
Expand Down Expand Up @@ -222,7 +234,9 @@ public void logNativeQuery(RequestLogLine requestLogLine) throws IOException
queryId = UNKNOWN_QUERY_ID;
}

emit(buildRunEvent(queryId, queryType, requestLogLine, inputs, null));
Map<String, Map<String, EnumSet<QueryColumnUsageAnalyzer.ColumnUsage>>> columnsByTable =
columnLineageEnabled ? extractColumnsByTable(requestLogLine.getQuery()) : null;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Handle top-level UnionQuery inputs

A native top-level UnionQuery has getDataSource() deliberately throwing, but logNativeQuery always calls getDataSource().getTableNames() before the new QueryColumnUsageAnalyzer can handle UnionQuery. SQL plans with queryType union will fail request logging and emit no OpenLineage event. Special-case UnionQuery input table extraction, or derive inputs from the analyzer or branch datasources, and add a logger-level UnionQuery test.

emit(buildRunEvent(queryId, queryType, requestLogLine, inputs, columnsByTable, null));
}

/**
Expand Down Expand Up @@ -254,7 +268,7 @@ public void logSqlQuery(RequestLogLine requestLogLine) throws IOException
queryId = UNKNOWN_QUERY_ID;
}

emit(buildRunEvent(queryId, "msq", requestLogLine, List.of(), outputTable));
emit(buildRunEvent(queryId, "msq", requestLogLine, List.of(), null, outputTable));
}

/**
Expand Down Expand Up @@ -309,11 +323,35 @@ static String extractMsqOutputDatasource(String sql)
}
}

/**
* Resolves the per-base-table column-usage map for a native query by delegating to the core
* {@link QueryColumnUsageAnalyzer}, then used to attach the {@code schema} and {@code druid_columnUsage}
* dataset facets to input datasets. Returns {@code null} (yielding table-level lineage only) when no
* base-table columns can be determined, or on any error -- lineage extraction must never break the
* request-logging path, so it never fabricates or mis-attributes columns.
*/
@Nullable
private Map<String, Map<String, EnumSet<QueryColumnUsageAnalyzer.ColumnUsage>>> extractColumnsByTable(Query<?> query)
{
try {
Map<String, Map<String, EnumSet<QueryColumnUsageAnalyzer.ColumnUsage>>> result =
QueryColumnUsageAnalyzer.analyze(query);
return result.isEmpty() ? null : result;
}
// StackOverflowError (an Error, not an Exception) is caught too so that a pathologically deep
// query plan degrades to table-level lineage rather than breaking the request-logging path.
catch (Exception | StackOverflowError e) {
log.debug(e, "Failed to extract column lineage; falling back to table-level lineage");
return null;
}
}

private ObjectNode buildRunEvent(
String queryId,
String queryType,
RequestLogLine requestLogLine,
List<String> inputs,
@Nullable Map<String, Map<String, EnumSet<QueryColumnUsageAnalyzer.ColumnUsage>>> columnsByTable,
@Nullable String output
)
{
Expand All @@ -327,8 +365,8 @@ private ObjectNode buildRunEvent(
event.put("schemaURL", SCHEMA_URL);
event.set("run", buildRun(queryId, queryType, requestLogLine, stats, success));
event.set("job", buildJob(queryId, requestLogLine.getSql()));
event.set("inputs", buildDatasets(inputs));
event.set("outputs", buildDatasets(output != null ? List.of(output) : List.of()));
event.set("inputs", buildDatasets(inputs, columnsByTable));
event.set("outputs", buildDatasets(output != null ? List.of(output) : List.of(), null));
return event;
}

Expand Down Expand Up @@ -434,19 +472,58 @@ private ObjectNode createFacet(@Nullable String schemaUrl)
return facet;
}

private ArrayNode buildDatasets(List<String> tableNames)
private ArrayNode buildDatasets(
List<String> tableNames,
@Nullable Map<String, Map<String, EnumSet<QueryColumnUsageAnalyzer.ColumnUsage>>> columnsByTable
)
{
ArrayNode array = jsonMapper.createArrayNode();
for (String name : tableNames) {
ObjectNode node = jsonMapper.createObjectNode();
node.put("namespace", namespace);
node.put("name", name);
node.set("facets", jsonMapper.createObjectNode());
ObjectNode facets = jsonMapper.createObjectNode();
Map<String, EnumSet<QueryColumnUsageAnalyzer.ColumnUsage>> columns = columnsByTable == null ? null : columnsByTable.get(name);
if (columns != null && !columns.isEmpty()) {
addColumnFacets(facets, columns);
}
node.set("facets", facets);
array.add(node);
}
return array;
}

/**
* Attaches the standard OpenLineage {@code schema} facet (referenced column names, sorted) and the
* custom {@code druid_columnUsage} facet (column to usage roles) to an input dataset's facets.
*/
private void addColumnFacets(ObjectNode facets, Map<String, EnumSet<QueryColumnUsageAnalyzer.ColumnUsage>> columns)
{
ObjectNode schemaFacet = createFacet(SCHEMA_FACET_SCHEMA_URL);
ArrayNode fields = jsonMapper.createArrayNode();
ObjectNode usageFields = jsonMapper.createObjectNode();
// columns is a TreeMap, so iteration (and the emitted JSON) is deterministically sorted by name.
for (Map.Entry<String, EnumSet<QueryColumnUsageAnalyzer.ColumnUsage>> entry : columns.entrySet()) {
ObjectNode field = jsonMapper.createObjectNode();
field.put("name", entry.getKey());
fields.add(field);

ArrayNode usages = jsonMapper.createArrayNode();
// EnumSet iterates in enum declaration order, so usage lists are deterministic too.
for (QueryColumnUsageAnalyzer.ColumnUsage role : entry.getValue()) {
usages.add(role.name());
}
ObjectNode usageEntry = jsonMapper.createObjectNode();
usageEntry.set("usages", usages);
usageFields.set(entry.getKey(), usageEntry);
}
schemaFacet.set("fields", fields);
facets.set("schema", schemaFacet);

ObjectNode usageFacet = createFacet(COLUMN_USAGE_FACET_SCHEMA_URL);
usageFacet.set("fields", usageFields);
facets.set("druid_columnUsage", usageFacet);
}

protected void emit(ObjectNode event)
{
Expand Down Expand Up @@ -571,5 +648,4 @@ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ public enum TransportType
"timeBoundary"
);

// When false, only table-level (datasource) lineage is emitted; the per-column schema and
// druid_columnUsage facets are omitted. Lets operators trade column detail for smaller events.
@JsonProperty
private boolean columnLineageEnabled = true;

@JsonProperty
private int emitQueueCapacity = OpenLineageRequestLogger.DEFAULT_EMIT_QUEUE_CAPACITY;

Expand Down Expand Up @@ -115,6 +120,7 @@ public RequestLogger get()
transportType,
transportUrl,
excludedNativeQueryTypes,
columnLineageEnabled,
emitQueueCapacity,
emitThreadCount,
httpClient
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"$id": "https://raw.githubusercontent.com/apache/druid/master/extensions-contrib/openlineage-emitter/src/main/resources/openlineage-schema/DruidColumnUsageDatasetFacet.json",
"title": "DruidColumnUsageDatasetFacet",
"description": "Druid-specific dataset facet capturing how each input column was referenced by the query. Roles align with OpenLineage transformation subtypes where applicable (GROUP_BY, AGGREGATION, FILTER, JOIN); PROJECTION corresponds to OpenLineage IDENTITY/TRANSFORMATION (a selected column).",
"type": "object",
"allOf": [
{"$ref": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/definitions/DatasetFacet"}
],
"properties": {
"fields": {
"type": "object",
"description": "Map of referenced column name to how it was used in this query.",
"additionalProperties": {
"type": "object",
"properties": {
"usages": {
"type": "array",
"description": "The roles in which the column was referenced by the query.",
"items": {
"type": "string",
"enum": ["PROJECTION", "GROUP_BY", "AGGREGATION", "FILTER", "JOIN"]
}
}
},
"required": ["usages"]
}
}
},
"required": ["fields"]
}
Loading
Loading