Skip to content

[AURON #2366] fix: Handle Paimon metadata columns in V2 native scan#2367

Open
lyne7-sc wants to merge 2 commits into
apache:masterfrom
lyne7-sc:fix/paimon_meta
Open

[AURON #2366] fix: Handle Paimon metadata columns in V2 native scan#2367
lyne7-sc wants to merge 2 commits into
apache:masterfrom
lyne7-sc:fix/paimon_meta

Conversation

@lyne7-sc

Copy link
Copy Markdown
Contributor

Which issue does this PR close?

Closes #2366

Rationale for this change

Paimon metadata columns are produced by the Paimon scan layer rather than stored as physical columns in data files. The Paimon V2 native scan was passing these columns to the native Parquet/ORC reader as file columns, which can return incorrect values.

For example:

create table paimon.db.t_metadata (id int, v string) using paimon;
insert into paimon.db.t_metadata values (1, 'a');
select id, __paimon_file_path from paimon.db.t_metadata;

The native path returned null for __paimon_file_path, while Spark/Paimon's scan path returns the actual file path.

What changes are included in this PR?

  • Recognize Paimon metadata columns using PaimonMetadataColumn.
  • Materialize supported file-level metadata columns (__paimon_file_path, __paimon_bucket) as per-file constants.
  • Keep unsupported Paimon metadata columns on Spark/Paimon's scan path instead of reading them from Parquet/ORC files.
  • Cover metadata columns both with and without table partition columns.

Are there any user-facing changes?

No API changes. This is a correctness fix for Paimon V2 native scan.

How was this patch tested?

Adds Paimon V2 integration tests

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot was unable to review this pull request because the user who requested the review has reached their quota limit.

@SteNicholas SteNicholas left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lyne7-sc, thanks for the fix! The overall approach is sound: materialize __paimon_file_path/__paimon_bucket as per-file constants via partitionSchema, and fall back to Spark for unsupported metadata columns. The functional Test Paimon 1.2 CI job (which runs the new integration tests) is green.


private def isPaimonMetadataColumn(name: String): Boolean = {
containsName(PaimonMetadataColumns, name) ||
name.toLowerCase(Locale.ROOT).startsWith(PaimonMetadataColumnPrefix)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Style / CI blocker. spotless scalafmt rejects this line — the || continuation should be indented 4 spaces, not 6. This is one of the two violations turning the Style job red (-······name.toLowerCase+····name.toLowerCase). mvn spotless:apply fixes it:

  private def isPaimonMetadataColumn(name: String): Boolean = {
    containsName(PaimonMetadataColumns, name) ||
    name.toLowerCase(Locale.ROOT).startsWith(PaimonMetadataColumnPrefix)
  }

assert(df.collect().length === 1)
}
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Style / CI blocker. This blank line has trailing whitespace (8 spaces), which spotless rejects (-········+). It is the second cause of the failing Style job. mvn spotless:apply removes it.

}
split.dataFiles().asScala.map { dataFile =>
val filePath = s"${split.bucketPath()}/${dataFile.fileName()}"
val partitionValues = if (partitionSchema.isEmpty) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Efficiency: partitionValues is now computed inside split.dataFiles().map, so partitionConverter.convert(split.partition()), indexByName, and the per-field DataConverter.fromPaimon conversions all run once per data file — even though everything except __paimon_file_path is constant across the files of a split (split.partition() and split.bucket() are split-level). For a split with N data files this rebuilds the whole partition row N times. Consider computing the split-invariant portion once per split and only filling the per-file file_path slot inside the loop.

def isPartitionValueField(name: String): Boolean =
containsName(partitionKeys, name) || isSupportedMetadataColumn(name)
val partitionFields = readSchema.fields.filter(f => isPartitionValueField(f.name))
val fileFields = readSchema.fields.filterNot(f => isPartitionValueField(f.name))

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Coverage gap worth a test: when only metadata columns are projected from a non-partitioned table (e.g. select __paimon_file_path from t), every field is classified as a partition/metadata constant, so fileFields is empty and fileSchema is empty. The native Parquet/ORC scan is then asked to read zero data columns but must still emit one row per record so the constant columns get the right cardinality. All three new tests also select id, so the empty-fileSchema path is never exercised (and there's no existing partition-only/count(*) test either). Please add a metadata-only projection test on a multi-row non-partitioned table to confirm the row count is correct on this path.

val partitionFields = readSchema.fields.filter(f => containsName(partitionKeys, f.name))
val fileFields = readSchema.fields.filterNot(f => containsName(partitionKeys, f.name))
def isPartitionValueField(name: String): Boolean =
containsName(partitionKeys, name) || isSupportedMetadataColumn(name)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor / edge case: classification here is purely by name (resolver against __paimon_file_path/__paimon_bucket). Paimon's schema validation reserves only the _KEY_ prefix and the core system field names — not the __paimon_ prefix — so a user could in principle define a real physical column named __paimon_bucket. It would then be treated as a per-file constant and return split.bucket() instead of the stored value (a silent wrong result rather than a fallback). Very unlikely in practice, but flagging it.

s"plan should use native paimon scan:\n$plan")
}

private def checkSparkAnswerAndNativePaimonScan(sqlText: String): DataFrame = {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor test cleanup: the DataFrame return value is ignored by both callers (and the third metadata test doesn't use this helper), so Unit would be clearer. Also var expected: Seq[Row] = Nil reassigned inside withSQLConf can be a val, since withSQLConf returns its block value:

val expected = withSQLConf("spark.auron.enable.paimon.scan" -> "false") {
  sql(sqlText).collect().toSeq
}

@SteNicholas SteNicholas self-assigned this Jun 28, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Paimon V2 native scan does not handle metadata columns correctly

3 participants