
feat: add column-level lineage to the OpenLineage #19643

Open
mshahid6 wants to merge 2 commits into apache:master from mshahid6:add-column-lineage


@mshahid6 mshahid6 commented Jun 30, 2026

Contributor

Fixes #19314

Description

Builds on #19107 (the OpenLineage request-logger extension) to add column-level lineage for native queries. Each input dataset now carries which columns the query referenced and how they were used.

Two facets are attached per input dataset:

  • schema — the standard OpenLineage SchemaDatasetFacet listing the referenced input column names (names only, sorted).
  • druid_columnUsage — a Druid-specific dataset facet mapping each referenced column to the role(s) it was used in: PROJECTION, GROUP_BY, AGGREGATION, FILTER, JOIN.
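
The relationship between the two facets can be sketched as a single column-to-roles map from which both are derived. This is a minimal illustration, not the PR's code: the class name `ColumnUsageSketch` and its helper methods are hypothetical, and only the role names and the "names only, sorted" rule come from the description above.

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in; not a class from the PR.
class ColumnUsageSketch
{
  // Roles listed in the PR description.
  enum ColumnUsage { PROJECTION, GROUP_BY, AGGREGATION, FILTER, JOIN }

  // Record that `column` was used as `usage`, merging with roles already recorded for it.
  static void addRole(Map<String, EnumSet<ColumnUsage>> roles, String column, ColumnUsage usage)
  {
    roles.computeIfAbsent(column, k -> EnumSet.noneOf(ColumnUsage.class)).add(usage);
  }

  // Column names only, in sorted order, as the schema facet is described.
  static List<String> schemaFieldNames(Map<String, EnumSet<ColumnUsage>> roles)
  {
    return new ArrayList<>(new TreeMap<>(roles).keySet());
  }

  public static void main(String[] args)
  {
    Map<String, EnumSet<ColumnUsage>> roles = new TreeMap<>();
    addRole(roles, "page", ColumnUsage.PROJECTION);
    addRole(roles, "page", ColumnUsage.JOIN);
    addRole(roles, "countryName", ColumnUsage.FILTER);
    addRole(roles, "channel", ColumnUsage.PROJECTION);
    System.out.println(schemaFieldNames(roles)); // input to the schema facet
    System.out.println(roles);                   // input to the druid_columnUsage facet
  }
}
```

Using an `EnumSet` per column means a column used in several roles, such as `page` in the join example, carries all of them without duplicates.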

Validated in docker:

| SQL | Native type | Emitted column usage |
| --- | --- | --- |
| SELECT page, "user" FROM wikipedia WHERE countryName = '…' LIMIT 10 | scan | page=PROJECTION, user=PROJECTION, countryName=FILTER |
| SELECT countryName, SUM(added) FROM wikipedia WHERE channel = '…' GROUP BY countryName | groupBy | countryName=GROUP_BY, added=AGGREGATION, channel=FILTER |
| SELECT page, SUM(added) s FROM wikipedia GROUP BY page ORDER BY s DESC LIMIT 5 | topN | page=GROUP_BY, added=AGGREGATION |
| SELECT SUM(added) FROM wikipedia WHERE isRobot = 'false' | timeseries | added=AGGREGATION, isRobot=FILTER |
| SELECT w1.page, w2.channel FROM wikipedia w1 JOIN wikipedia w2 ON w1.page = w2.page WHERE w1.countryName = '…' | scan (join) | page=[PROJECTION, JOIN], channel=PROJECTION, countryName=FILTER (right-side w2.channel correctly un-prefixed) |
| SELECT countryName FROM (SELECT countryName, SUM(added) s FROM wikipedia GROUP BY countryName) GROUP BY countryName | groupBy | countryName=GROUP_BY (no fabricated sub-query-output columns) |
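
The un-prefixing noted for the join row can be illustrated roughly as follows. This sketch assumes that right-side columns of a join appear in the native query with a prefix such as `j0.`; the prefix value, the class `JoinPrefixSketch`, and the method `splitByJoinSide` are assumptions for illustration and are not taken from the PR.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical stand-in; not a class from the PR.
class JoinPrefixSketch
{
  // Splits referenced column names into left-side names (kept as-is) and
  // right-side names with the assumed join prefix removed.
  static Map<String, Set<String>> splitByJoinSide(Set<String> referenced, String rightPrefix)
  {
    Set<String> left = new TreeSet<>();
    Set<String> right = new TreeSet<>();
    for (String column : referenced) {
      if (column.startsWith(rightPrefix) && column.length() > rightPrefix.length()) {
        right.add(column.substring(rightPrefix.length()));
      } else {
        left.add(column);
      }
    }
    Map<String, Set<String>> result = new TreeMap<>();
    result.put("left", left);
    result.put("right", right);
    return result;
  }

  public static void main(String[] args)
  {
    // "j0." is an assumed prefix used only for this example.
    Set<String> referenced = new TreeSet<>(Set.of("page", "countryName", "j0.page", "j0.channel"));
    System.out.println(splitByJoinSide(referenced, "j0."));
  }
}
```

Because both sides of the self-join read `wikipedia`, the stripped right-side names and the left-side names end up attributed to the same input dataset.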

Example emitted facets for the join query's input dataset:

"facets": {
  "schema": {
    "_schemaURL": "https://openlineage.io/spec/facets/1-1-1/SchemaDatasetFacet.json",
    "fields": [{"name": "channel"}, {"name": "countryName"}, {"name": "page"}]
  },
  "druid_columnUsage": {
    "_schemaURL": ".../DruidColumnUsageDatasetFacet.json",
    "fields": {
      "channel":     {"usages": ["PROJECTION"]},
      "countryName": {"usages": ["FILTER"]},
      "page":        {"usages": ["PROJECTION", "JOIN"]}
    }
  }
}
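
As a rough sketch of how facet bodies of this shape could be assembled before serialization, the following builds plain nested maps that a JSON library could write out. The class `FacetShapeSketch` and its methods are hypothetical; the `_schemaURL` of the Druid-specific facet is elided in the example above, so it is left out here rather than guessed.

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical stand-in; not a class from the PR.
class FacetShapeSketch
{
  enum ColumnUsage { PROJECTION, GROUP_BY, AGGREGATION, FILTER, JOIN }

  // {"_schemaURL": ..., "fields": [{"name": ...}, ...]} with names sorted.
  static Map<String, Object> schemaFacet(Map<String, EnumSet<ColumnUsage>> roles)
  {
    List<Map<String, String>> fields = new ArrayList<>();
    for (String name : new TreeSet<>(roles.keySet())) {
      fields.add(Map.of("name", name));
    }
    Map<String, Object> facet = new LinkedHashMap<>();
    facet.put("_schemaURL", "https://openlineage.io/spec/facets/1-1-1/SchemaDatasetFacet.json");
    facet.put("fields", fields);
    return facet;
  }

  // {"fields": {column: {"usages": [...]}}}; the facet's _schemaURL is omitted on purpose.
  static Map<String, Object> columnUsageFacet(Map<String, EnumSet<ColumnUsage>> roles)
  {
    Map<String, Object> fields = new TreeMap<>();
    roles.forEach((column, usages) -> {
      List<String> names = new ArrayList<>();
      for (ColumnUsage usage : usages) {
        names.add(usage.name());
      }
      fields.put(column, Map.of("usages", names));
    });
    Map<String, Object> facet = new LinkedHashMap<>();
    facet.put("fields", fields);
    return facet;
  }
}
```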

Release note

The OpenLineage emitter now emits column-level lineage for native queries as schema and druid_columnUsage facets on input datasets. This can be disabled with druid.request.logging.columnLineageEnabled=false.
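
The semantics of the flag can be sketched as below. This assumes the feature is on unless the property is explicitly set to false, which the release note implies but does not state; Druid binds runtime properties through its own configuration mechanism, so `java.util.Properties` and the class `ColumnLineageFlagSketch` are stand-ins for illustration only.

```java
import java.util.Properties;

// Hypothetical stand-in; not a class from the PR.
class ColumnLineageFlagSketch
{
  static final String KEY = "druid.request.logging.columnLineageEnabled";

  // Assumed default: enabled when the property is absent.
  static boolean isColumnLineageEnabled(Properties props)
  {
    return Boolean.parseBoolean(props.getProperty(KEY, "true"));
  }

  public static void main(String[] args)
  {
    Properties props = new Properties();
    System.out.println(isColumnLineageEnabled(props)); // property absent
    props.setProperty(KEY, "false");
    System.out.println(isColumnLineageEnabled(props)); // property set to false
  }
}
```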


Key changed/added classes in this PR
  • OpenLineageRequestLogger
  • OpenLineageRequestLoggerProvider
  • DruidColumnUsageDatasetFacet.json

This PR has:

  • been self-reviewed.
  • using the concurrency checklist (Remove this item if the PR doesn't have any relation to concurrency.)
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@FrankChen021 FrankChen021 left a comment

Member


| Severity | Findings |
| --- | --- |
| P0 | 0 |
| P1 | 0 |
| P2 | 3 |
| P3 | 0 |
| Total | 3 |

Found 3 issues.

Reviewed 5 of 5 changed files.


This is an automated review by Codex GPT-5.5


jtuglu1 commented Jul 1, 2026

Contributor

Drive-by comment: if we can, I'd like to expose this column extraction logic within Druid itself. A few reasons for this:

  1. Other extensions/use-cases can benefit from extracting the queried columns from the plan (this can be useful in column statistics plan optimization, other emitters, etc.).
  2. Having one implementation for extracting this query information ensures that our OL emitter is never out of sync with the OSS parser/planner. Now that column statistics are becoming available to the planner/optimizer, knowing which columns are queried will be important, and similar logic will be needed in the core files. We don't need to overcomplicate the initial implementation, but it would be great to have a unified way of extracting this information.

@jtuglu1 jtuglu1 requested review from clintropolis and gianm July 2, 2026 18:23
Map<String, EnumSet<ColumnUsage>> roles
)
{
if (dataSource instanceof TableDataSource) {

jtuglu1 commented Jul 2, 2026

Contributor

@gianm @clintropolis would like to get your thoughts here on the column extraction logic – do we want to make use of any Calcite functionality here?

@FrankChen021 FrankChen021 left a comment

Member


| Severity | Findings |
| --- | --- |
| P0 | 0 |
| P1 | 0 |
| P2 | 2 |
| P3 | 0 |
| Total | 2 |

Reviewed 8 of 8 changed files.

Found 2 issues in the updated lineage implementation. The prior datasource-filter and filtered-aggregator concerns look addressed.


This is an automated review by Codex GPT-5.5

Map<String, EnumSet<ColumnUsage>> baseRoles = copyRoles(roles);
// The unnest output column is synthetic (not a base column); drop it and instead record the
// underlying column(s) being unnested as projected from the base.
baseRoles.remove(unnestColumn.getOutputName());

Member


[P2] Preserve unnest output roles

For UnnestDataSource, roles collected on the synthetic unnest output are discarded here and the source columns are re-added only as PROJECTION; getUnnestFilter() is also ignored. Queries such as SELECT d3, COUNT(*) ... WHERE d3 = 'a' GROUP BY d3 will report the underlying array column as projected instead of GROUP_BY and FILTER. Preserve the removed role set and merge unnest-filter required columns as FILTER when mapping to unnestColumn.requiredColumns().
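
One way to read this suggestion is sketched below with stand-in types rather than Druid's `UnnestDataSource`. The class `UnnestRolesSketch`, the method `toBaseRoles`, and the column name `dim3` are hypothetical. It also assumes an unnest filter that references the synthetic output column should count as FILTER on the underlying columns, which is an interpretation of the comment, not confirmed behavior.

```java
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical stand-in; not a class from the PR.
class UnnestRolesSketch
{
  enum ColumnUsage { PROJECTION, GROUP_BY, AGGREGATION, FILTER, JOIN }

  static Map<String, EnumSet<ColumnUsage>> toBaseRoles(
      Map<String, EnumSet<ColumnUsage>> roles,
      String unnestOutputName,
      Set<String> unnestRequiredColumns,
      Set<String> unnestFilterColumns
  )
  {
    Map<String, EnumSet<ColumnUsage>> baseRoles = new TreeMap<>();
    roles.forEach((column, usages) -> baseRoles.put(column, EnumSet.copyOf(usages)));

    // Keep the roles collected on the synthetic output instead of discarding them.
    EnumSet<ColumnUsage> outputRoles = baseRoles.remove(unnestOutputName);
    if (outputRoles == null) {
      outputRoles = EnumSet.noneOf(ColumnUsage.class);
    }
    // Columns required by the unnest filter are recorded as FILTER.
    for (String filterColumn : unnestFilterColumns) {
      if (filterColumn.equals(unnestOutputName)) {
        outputRoles.add(ColumnUsage.FILTER);
      } else {
        merge(baseRoles, filterColumn, EnumSet.of(ColumnUsage.FILTER));
      }
    }
    // Fall back to PROJECTION only when nothing else was recorded.
    if (outputRoles.isEmpty()) {
      outputRoles.add(ColumnUsage.PROJECTION);
    }
    for (String required : unnestRequiredColumns) {
      merge(baseRoles, required, outputRoles);
    }
    return baseRoles;
  }

  static void merge(Map<String, EnumSet<ColumnUsage>> roles, String column, EnumSet<ColumnUsage> usages)
  {
    roles.computeIfAbsent(column, k -> EnumSet.noneOf(ColumnUsage.class)).addAll(usages);
  }
}
```

With roles `{d3=[GROUP_BY, FILTER]}` and `d3` unnested from a hypothetical base column `dim3`, this reports `dim3` as GROUP_BY and FILTER rather than PROJECTION.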


emit(buildRunEvent(queryId, queryType, requestLogLine, inputs, null));
Map<String, Map<String, EnumSet<QueryColumnUsageAnalyzer.ColumnUsage>>> columnsByTable =
columnLineageEnabled ? extractColumnsByTable(requestLogLine.getQuery()) : null;

Member


[P2] Handle top-level UnionQuery inputs

A native top-level UnionQuery has getDataSource() deliberately throwing, but logNativeQuery always calls getDataSource().getTableNames() before the new QueryColumnUsageAnalyzer can handle UnionQuery. SQL plans with queryType union will fail request logging and emit no OpenLineage event. Special-case UnionQuery input table extraction, or derive inputs from the analyzer or branch datasources, and add a logger-level UnionQuery test.
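
The special-casing suggested here might look roughly like the following, using stand-in types instead of Druid's query classes. `QueryLike`, `TableQuery`, `UnionLike`, and `inputTables` are hypothetical names; the only behavior taken from the comment is that asking a union for its single datasource throws, so table names have to be gathered from its branches.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-ins; not classes from the PR or from Druid.
class UnionInputsSketch
{
  interface QueryLike
  {
    Set<String> tableNames();
  }

  static final class TableQuery implements QueryLike
  {
    private final Set<String> tables;

    TableQuery(String... tables)
    {
      this.tables = new TreeSet<>(List.of(tables));
    }

    public Set<String> tableNames()
    {
      return tables;
    }
  }

  static final class UnionLike implements QueryLike
  {
    final List<QueryLike> branches;

    UnionLike(QueryLike... branches)
    {
      this.branches = List.of(branches);
    }

    // Mirrors the described behavior: a union has no single datasource to ask.
    public Set<String> tableNames()
    {
      throw new UnsupportedOperationException("union has no single datasource");
    }
  }

  // Collects input tables without ever calling tableNames() on a union.
  static Set<String> inputTables(QueryLike query)
  {
    Set<String> tables = new TreeSet<>();
    if (query instanceof UnionLike) {
      for (QueryLike branch : ((UnionLike) query).branches) {
        tables.addAll(inputTables(branch));
      }
    } else {
      tables.addAll(query.tableNames());
    }
    return tables;
  }
}
```

A logger-level test along the lines the reviewer asks for would pass a union at the top level and assert that an event is still emitted with every branch's tables as inputs.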


Development

Successfully merging this pull request may close these issues.

Add facets to OpenLineage

4 participants