From 3995864578960fe4b3dd459f4d44fed303e31095 Mon Sep 17 00:00:00 2001 From: linfeng <33561138+lyne7-sc@users.noreply.github.com> Date: Fri, 26 Jun 2026 17:23:56 +0800 Subject: [PATCH 1/5] test: add paimon metadata columns suite --- .../AuronPaimonV2IntegrationSuite.scala | 49 +++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/thirdparty/auron-paimon/src/test/scala/org/apache/auron/paimon/AuronPaimonV2IntegrationSuite.scala b/thirdparty/auron-paimon/src/test/scala/org/apache/auron/paimon/AuronPaimonV2IntegrationSuite.scala index b0508382d..30457362b 100644 --- a/thirdparty/auron-paimon/src/test/scala/org/apache/auron/paimon/AuronPaimonV2IntegrationSuite.scala +++ b/thirdparty/auron-paimon/src/test/scala/org/apache/auron/paimon/AuronPaimonV2IntegrationSuite.scala @@ -209,6 +209,43 @@ class AuronPaimonV2IntegrationSuite } } + test("paimon v2 native scan supports file-level metadata columns") { + withTable("paimon.db.t_metadata") { + sql("create table paimon.db.t_metadata (id int, v string) using paimon") + sql("insert into paimon.db.t_metadata values (1, 'a')") + + checkSparkAnswerAndNativePaimonScan( + "select id, __paimon_file_path, __paimon_bucket from paimon.db.t_metadata") + } + } + + test("paimon v2 native scan supports metadata columns with table partitions") { + withTable("paimon.db.t_metadata_part") { + sql(""" + |create table paimon.db.t_metadata_part (id int, v string, p string) + |using paimon + |partitioned by (p) + |""".stripMargin) + sql("insert into paimon.db.t_metadata_part values (1, 'a', 'p1'), (2, 'b', 'p2')") + + checkSparkAnswerAndNativePaimonScan( + "select p, __paimon_file_path, id, __paimon_bucket from paimon.db.t_metadata_part") + } + } + + test("paimon v2 native scan falls back for unsupported metadata columns") { + withTable("paimon.db.t_metadata_unsupported") { + sql("create table paimon.db.t_metadata_unsupported (id int, v string) using paimon") + sql("insert into paimon.db.t_metadata_unsupported values (1, 'a')") + + val df = sql("select id, __paimon_row_index from paimon.db.t_metadata_unsupported") + val plan = df.queryExecution.executedPlan.toString() + + assert(!plan.contains("NativePaimonV2TableScan")) + assert(df.collect().length === 1) + } + } + private def assertNativePaimonScanApplied(df: DataFrame): Unit = { val plan = df.queryExecution.executedPlan.toString() assert( @@ -216,6 +253,18 @@ class AuronPaimonV2IntegrationSuite s"plan should use native paimon scan:\n$plan") } + private def checkSparkAnswerAndNativePaimonScan(sqlText: String): DataFrame = { + var expected: Seq[Row] = Nil + withSQLConf("spark.auron.enable.paimon.scan" -> "false") { + expected = sql(sqlText).collect().toSeq + } + + val df = sql(sqlText) + checkAnswer(df, expected) + assertNativePaimonScanApplied(df) + df + } + private def executedNativeScan(df: DataFrame): NativePaimonV2TableScanExec = { val nativeScan = df.queryExecution.executedPlan.collectFirst { case scan: NativePaimonV2TableScanExec => scan From b05f5a61d2e485cb72b5d48df0e683f13f24e45c Mon Sep 17 00:00:00 2001 From: linfeng <33561138+lyne7-sc@users.noreply.github.com> Date: Fri, 26 Jun 2026 17:28:56 +0800 Subject: [PATCH 2/5] support paimon file-level metadata --- .../sql/auron/paimon/PaimonScanSupport.scala | 93 ++++++++++++++----- 1 file changed, 68 insertions(+), 25 deletions(-) diff --git a/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/auron/paimon/PaimonScanSupport.scala b/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/auron/paimon/PaimonScanSupport.scala index f5c23d3a2..64252e9ab 100644 --- a/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/auron/paimon/PaimonScanSupport.scala +++ b/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/auron/paimon/PaimonScanSupport.scala @@ -16,11 +16,14 @@ */ package org.apache.spark.sql.auron.paimon +import java.util.Locale + import scala.collection.JavaConverters._ import scala.util.control.NonFatal import org.apache.commons.lang3.reflect.MethodUtils import org.apache.paimon.spark.DataConverter +import org.apache.paimon.spark.schema.PaimonMetadataColumn import org.apache.paimon.table.FileStoreTable import org.apache.paimon.table.source.{DataSplit, Split} import org.apache.paimon.types.RowType @@ -34,6 +37,7 @@ import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.hive.auron.paimon.PaimonUtil import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StructType +import org.apache.spark.unsafe.types.UTF8String final case class PaimonFile(filePath: String, fileSize: Long, partitionValues: InternalRow) @@ -49,6 +53,10 @@ object PaimonScanSupport extends Logging { private val PaimonBaseScanClassName = "org.apache.paimon.spark.PaimonBaseScan" private val PaimonInputPartitionClassName = "org.apache.paimon.spark.PaimonInputPartition" + private val PaimonMetadataColumnPrefix = "__paimon_" + private val PaimonFilePathColumn = PaimonMetadataColumn.FILE_PATH_COLUMN + private val PaimonBucketColumn = PaimonMetadataColumn.BUCKET_COLUMN + private val PaimonMetadataColumns = PaimonMetadataColumn.SUPPORTED_METADATA_COLUMNS.toSet // Planning a Paimon scan performs split-planning I/O (reading metadata files). The conversion // pipeline calls plan() twice on the same exec (once in isSupported, once in convert), so we @@ -107,10 +115,21 @@ object PaimonScanSupport extends Logging { logDebug("Skip native Paimon scan: unsupported column data type in read schema.") return None } + val unsupportedMetadataColumns = readSchema.fields.filter { f => + isPaimonMetadataColumn(f.name) && !isSupportedMetadataColumn(f.name) + } + if (unsupportedMetadataColumns.nonEmpty) { + logDebug( + s"Skip native Paimon scan: unsupported metadata columns " + + s"${unsupportedMetadataColumns.map(_.name).mkString(", ")}.") + return None + } val partitionKeys = table.schema().partitionKeys().asScala.toSet - val partitionFields = readSchema.fields.filter(f => containsName(partitionKeys, f.name)) - val fileFields = readSchema.fields.filterNot(f => containsName(partitionKeys, f.name)) + def isPartitionValueField(name: String): Boolean = + containsName(partitionKeys, name) || isSupportedMetadataColumn(name) + val partitionFields = readSchema.fields.filter(f => isPartitionValueField(f.name)) + val fileFields = readSchema.fields.filterNot(f => isPartitionValueField(f.name)) val partitionSchema = StructType(partitionFields) val fileSchema = StructType(fileFields) @@ -151,17 +170,19 @@ object PaimonScanSupport extends Logging { val partitionConverter = new RowDataToObjectArrayConverter(partitionRowType) val files = splits.flatMap { split => - val partitionValues = if (partitionSchema.isEmpty) { - InternalRow.empty - } else { - toPartitionRow( - partitionConverter.convert(split.partition()), - partitionRowType, - partitionSchema, - table.schema().partitionKeys().asScala.toSeq) - } split.dataFiles().asScala.map { dataFile => val filePath = s"${split.bucketPath()}/${dataFile.fileName()}" + val partitionValues = if (partitionSchema.isEmpty) { + InternalRow.empty + } else { + toPartitionRow( + partitionConverter.convert(split.partition()), + partitionRowType, + partitionSchema, + table.schema().partitionKeys().asScala.toSeq, + filePath, + split.bucket()) + } PaimonFile(filePath, dataFile.fileSize(), partitionValues) } } @@ -174,28 +195,50 @@ object PaimonScanSupport extends Logging { names.exists(n => resolver(n, target)) } - // Build a Spark InternalRow for partition values matching partitionSchema's data types. - // Partition values from Paimon are returned in the table's partition-key order; we reorder - // them to match partitionSchema and convert each value into the Spark catalyst representation - // expected for the field's type via Paimon's own DataConverter. This handles dates (epoch - // days), timestamps (micros), decimals and binary correctly, rather than the lossy - // String round-trip a Cast(Literal(value.toString), dataType) would perform. + private def isFilePathMetadataColumn(name: String): Boolean = { + SQLConf.get.resolver(name, PaimonFilePathColumn) + } + + private def isBucketMetadataColumn(name: String): Boolean = { + SQLConf.get.resolver(name, PaimonBucketColumn) + } + + private def isSupportedMetadataColumn(name: String): Boolean = { + isFilePathMetadataColumn(name) || isBucketMetadataColumn(name) + } + + private def isPaimonMetadataColumn(name: String): Boolean = { + containsName(PaimonMetadataColumns, name) || + name.toLowerCase(Locale.ROOT).startsWith(PaimonMetadataColumnPrefix) + } + + // Build constants for partition columns and supported file-level metadata columns in + // partitionSchema order. Paimon's DataConverter preserves Catalyst representations for typed + // partition values such as dates, timestamps, decimals and binary. private def toPartitionRow( paimonValues: Array[AnyRef], partitionRowType: RowType, partitionSchema: StructType, - partitionKeys: Seq[String]): InternalRow = { + partitionKeys: Seq[String], + filePath: String, + bucket: Int): InternalRow = { val resolver = SQLConf.get.resolver val indexByName = partitionKeys.zipWithIndex.toMap InternalRow.fromSeq(partitionSchema.fields.map { field => - val idx = indexByName - .find { case (k, _) => resolver(k, field.name) } - .map(_._2) - .getOrElse(-1) - if (idx >= 0 && idx < paimonValues.length && paimonValues(idx) != null) { - DataConverter.fromPaimon(paimonValues(idx), partitionRowType.getTypeAt(idx)) + if (isFilePathMetadataColumn(field.name)) { + UTF8String.fromString(filePath) + } else if (isBucketMetadataColumn(field.name)) { + bucket } else { - null + val idx = indexByName + .find { case (k, _) => resolver(k, field.name) } + .map(_._2) + .getOrElse(-1) + if (idx >= 0 && idx < paimonValues.length && paimonValues(idx) != null) { + DataConverter.fromPaimon(paimonValues(idx), partitionRowType.getTypeAt(idx)) + } else { + null + } } }) } From ea31cda3d45d06c3311745787091f3c4b6d857f9 Mon Sep 17 00:00:00 2001 From: linfeng Date: Sun, 28 Jun 2026 22:01:59 +0800 Subject: [PATCH 3/5] apply suggestions --- .../sql/auron/paimon/PaimonScanSupport.scala | 60 +++++++++++++------ .../AuronPaimonV2IntegrationSuite.scala | 29 ++++++++- 2 files changed, 68 insertions(+), 21 deletions(-) diff --git a/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/auron/paimon/PaimonScanSupport.scala b/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/auron/paimon/PaimonScanSupport.scala index 64252e9ab..30c73a8c2 100644 --- a/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/auron/paimon/PaimonScanSupport.scala +++ b/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/auron/paimon/PaimonScanSupport.scala @@ -115,8 +115,11 @@ object PaimonScanSupport extends Logging { logDebug("Skip native Paimon scan: unsupported column data type in read schema.") return None } + val physicalColumnSet = table.schema().fieldNames().asScala.toSet + def isPhysicalColumn(name: String): Boolean = containsName(physicalColumnSet, name) val unsupportedMetadataColumns = readSchema.fields.filter { f => - isPaimonMetadataColumn(f.name) && !isSupportedMetadataColumn(f.name) + !isPhysicalColumn(f.name) && isPaimonMetadataColumn(f.name) && !isSupportedMetadataColumn( + f.name) } if (unsupportedMetadataColumns.nonEmpty) { logDebug( @@ -125,9 +128,10 @@ object PaimonScanSupport extends Logging { return None } - val partitionKeys = table.schema().partitionKeys().asScala.toSet + val partitionKeySet = table.schema().partitionKeys().asScala.toSet def isPartitionValueField(name: String): Boolean = - containsName(partitionKeys, name) || isSupportedMetadataColumn(name) + containsName(partitionKeySet, name) || + (!isPhysicalColumn(name) && isSupportedMetadataColumn(name)) val partitionFields = readSchema.fields.filter(f => isPartitionValueField(f.name)) val fileFields = readSchema.fields.filterNot(f => isPartitionValueField(f.name)) val partitionSchema = StructType(partitionFields) @@ -168,20 +172,28 @@ object PaimonScanSupport extends Logging { val partitionRowType = table.schema().logicalPartitionType() val partitionConverter = new RowDataToObjectArrayConverter(partitionRowType) + val partitionKeys = table.schema().partitionKeys().asScala.toSeq + val filePathMetadataIndex = partitionSchema.fields.indexWhere { field => + isFilePathMetadataColumn(field.name) + } val files = splits.flatMap { split => + val partitionValueTemplate = if (partitionSchema.isEmpty) { + Array.empty[Any] + } else { + toPartitionValueTemplate( + partitionConverter.convert(split.partition()), + partitionRowType, + partitionSchema, + partitionKeys, + split.bucket()) + } split.dataFiles().asScala.map { dataFile => val filePath = s"${split.bucketPath()}/${dataFile.fileName()}" val partitionValues = if (partitionSchema.isEmpty) { InternalRow.empty } else { - toPartitionRow( - partitionConverter.convert(split.partition()), - partitionRowType, - partitionSchema, - table.schema().partitionKeys().asScala.toSeq, - filePath, - split.bucket()) + toPartitionRow(partitionValueTemplate, filePathMetadataIndex, filePath) } PaimonFile(filePath, dataFile.fileSize(), partitionValues) } @@ -209,24 +221,23 @@ object PaimonScanSupport extends Logging { private def isPaimonMetadataColumn(name: String): Boolean = { containsName(PaimonMetadataColumns, name) || - name.toLowerCase(Locale.ROOT).startsWith(PaimonMetadataColumnPrefix) + name.toLowerCase(Locale.ROOT).startsWith(PaimonMetadataColumnPrefix) } - // Build constants for partition columns and supported file-level metadata columns in + // Build split-invariant constants for partition columns and supported metadata columns in // partitionSchema order. Paimon's DataConverter preserves Catalyst representations for typed // partition values such as dates, timestamps, decimals and binary. - private def toPartitionRow( + private def toPartitionValueTemplate( paimonValues: Array[AnyRef], partitionRowType: RowType, partitionSchema: StructType, partitionKeys: Seq[String], - filePath: String, - bucket: Int): InternalRow = { + bucket: Int): Array[Any] = { val resolver = SQLConf.get.resolver val indexByName = partitionKeys.zipWithIndex.toMap - InternalRow.fromSeq(partitionSchema.fields.map { field => + partitionSchema.fields.map { field => if (isFilePathMetadataColumn(field.name)) { - UTF8String.fromString(filePath) + null } else if (isBucketMetadataColumn(field.name)) { bucket } else { @@ -240,7 +251,20 @@ object PaimonScanSupport extends Logging { null } } - }) + } + } + + private def toPartitionRow( + partitionValueTemplate: Array[Any], + filePathMetadataIndex: Int, + filePath: String): InternalRow = { + if (filePathMetadataIndex < 0) { + InternalRow.fromSeq(partitionValueTemplate) + } else { + val partitionValues = partitionValueTemplate.clone() + partitionValues(filePathMetadataIndex) = UTF8String.fromString(filePath) + InternalRow.fromSeq(partitionValues) + } } private def collectSplits(partitions: Seq[InputPartition]): Option[Seq[DataSplit]] = { diff --git a/thirdparty/auron-paimon/src/test/scala/org/apache/auron/paimon/AuronPaimonV2IntegrationSuite.scala b/thirdparty/auron-paimon/src/test/scala/org/apache/auron/paimon/AuronPaimonV2IntegrationSuite.scala index 30457362b..4bad3d596 100644 --- a/thirdparty/auron-paimon/src/test/scala/org/apache/auron/paimon/AuronPaimonV2IntegrationSuite.scala +++ b/thirdparty/auron-paimon/src/test/scala/org/apache/auron/paimon/AuronPaimonV2IntegrationSuite.scala @@ -219,6 +219,30 @@ class AuronPaimonV2IntegrationSuite } } + test("paimon v2 native scan supports metadata-only projection") { + withTable("paimon.db.t_metadata_only") { + sql("create table paimon.db.t_metadata_only (id int, v string) using paimon") + sql("insert into paimon.db.t_metadata_only values (1, 'a'), (2, 'b')") + + checkSparkAnswerAndNativePaimonScan( + "select __paimon_file_path from paimon.db.t_metadata_only") + } + } + + test("paimon v2 native scan reads physical columns that share metadata names") { + withTable("paimon.db.t_metadata_name_collision") { + sql(""" + |create table paimon.db.t_metadata_name_collision + |(`__paimon_bucket` int, id int) + |using paimon + |""".stripMargin) + sql("insert into paimon.db.t_metadata_name_collision values (10, 1), (20, 2)") + + checkSparkAnswerAndNativePaimonScan( + "select `__paimon_bucket`, id from paimon.db.t_metadata_name_collision") + } + } + test("paimon v2 native scan supports metadata columns with table partitions") { withTable("paimon.db.t_metadata_part") { sql(""" @@ -245,7 +269,7 @@ class AuronPaimonV2IntegrationSuite assert(df.collect().length === 1) } } - + private def assertNativePaimonScanApplied(df: DataFrame): Unit = { val plan = df.queryExecution.executedPlan.toString() assert( @@ -253,7 +277,7 @@ class AuronPaimonV2IntegrationSuite s"plan should use native paimon scan:\n$plan") } - private def checkSparkAnswerAndNativePaimonScan(sqlText: String): DataFrame = { + private def checkSparkAnswerAndNativePaimonScan(sqlText: String): Unit = { var expected: Seq[Row] = Nil withSQLConf("spark.auron.enable.paimon.scan" -> "false") { expected = sql(sqlText).collect().toSeq @@ -262,7 +286,6 @@ class AuronPaimonV2IntegrationSuite val df = sql(sqlText) checkAnswer(df, expected) assertNativePaimonScanApplied(df) - df } private def executedNativeScan(df: DataFrame): NativePaimonV2TableScanExec = { From fcb091446cc8b220644de15a6553f70915f906e1 Mon Sep 17 00:00:00 2001 From: linfeng <33561138+lyne7-sc@users.noreply.github.com> Date: Mon, 29 Jun 2026 22:28:47 +0800 Subject: [PATCH 4/5] test: add tests for paimon v2 scan exec --- .../AuronPaimonV2IntegrationSuite.scala | 140 +++++++++++++++++- 1 file changed, 133 insertions(+), 7 deletions(-) diff --git a/thirdparty/auron-paimon/src/test/scala/org/apache/auron/paimon/AuronPaimonV2IntegrationSuite.scala b/thirdparty/auron-paimon/src/test/scala/org/apache/auron/paimon/AuronPaimonV2IntegrationSuite.scala index 4bad3d596..9b32ba144 100644 --- a/thirdparty/auron-paimon/src/test/scala/org/apache/auron/paimon/AuronPaimonV2IntegrationSuite.scala +++ b/thirdparty/auron-paimon/src/test/scala/org/apache/auron/paimon/AuronPaimonV2IntegrationSuite.scala @@ -22,9 +22,13 @@ import java.util.concurrent.TimeUnit import scala.collection.JavaConverters._ +import org.apache.paimon.spark.PaimonInputPartition +import org.apache.paimon.table.source.DataSplit import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent} import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.auron.paimon.PaimonScanSupport import org.apache.spark.sql.execution.auron.plan.NativePaimonV2TableScanExec +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates class AuronPaimonV2IntegrationSuite @@ -223,9 +227,70 @@ class AuronPaimonV2IntegrationSuite withTable("paimon.db.t_metadata_only") { sql("create table paimon.db.t_metadata_only (id int, v string) using paimon") sql("insert into paimon.db.t_metadata_only values (1, 'a'), (2, 'b')") + sql("insert into paimon.db.t_metadata_only values (3, 'c'), (4, 'd')") - checkSparkAnswerAndNativePaimonScan( - "select __paimon_file_path from paimon.db.t_metadata_only") + withSQLConf( + "spark.sql.files.maxPartitionBytes" -> "256", + "spark.sql.files.openCostInBytes" -> "1") { + checkSparkAnswerAndNativePaimonScan( + "select __paimon_file_path from paimon.db.t_metadata_only") + } + } + } + + test( + "paimon v2 native scan planning supports metadata across multiple data files in one split") { + withTable("paimon.db.t_metadata_multi_file_split") { + sql(""" + |create table paimon.db.t_metadata_multi_file_split (id int, v string) + |using paimon + |tblproperties ( + | 'source.split.target-size' = '1 gb', + | 'source.split.open-file-cost' = '1 b' + |) + |""".stripMargin) + withSQLConf("spark.sql.shuffle.partitions" -> "2") { + spark + .range(0, 20) + .repartition(2) + .selectExpr("cast(id as int) as id", "cast(id as string) as v") + .writeTo("paimon.db.t_metadata_multi_file_split") + .append() + } + + val sqlText = + "select id, v, __paimon_file_path, __paimon_bucket " + + "from paimon.db.t_metadata_multi_file_split" + withSQLConf("spark.sql.files.minPartitionNum" -> "1") { + var expected: Seq[Row] = Nil + withSQLConf("spark.auron.enable.paimon.scan" -> "false") { + expected = sql(sqlText).collect().toSeq + } + assert( + expected.map(_.getString(2)).distinct.size > 1, + s"expected rows from multiple data files, got $expected") + + val df = sql(sqlText) + checkAnswer(df, expected) + val plan = df.queryExecution.sparkPlan + val batchScan = plan.collectFirst { case scan: BatchScanExec => scan } + assert(batchScan.nonEmpty, s"expected BatchScanExec in spark plan:\n$plan") + val splits = paimonDataSplits(batchScan.get) + assert( + splits.exists(_.dataFiles().size() > 1), + s"expected at least one Paimon split with multiple data files, got " + + s"${splits.map(_.dataFiles().size()).mkString("[", ", ", "]")}") + + val scanPlan = PaimonScanSupport.plan(batchScan.get) + assert(scanPlan.nonEmpty, s"expected native Paimon scan plan for:\n$batchScan") + val filePathIndex = scanPlan.get.partitionSchema.fieldIndex("__paimon_file_path") + val plannedFilePaths = scanPlan.get.files.map { file => + file.partitionValues.getUTF8String(filePathIndex).toString + } + assert( + plannedFilePaths.distinct.size > 1, + s"expected per-file metadata paths in native scan plan, got $plannedFilePaths") + } } } @@ -243,6 +308,26 @@ class AuronPaimonV2IntegrationSuite } } + test("paimon v2 native scan reads partition columns that share metadata names") { + withTable("paimon.db.t_metadata_partition_name_collision") { + sql(""" + |create table paimon.db.t_metadata_partition_name_collision + |(`__paimon_bucket` int, `__paimon_file_path` string, id int) + |using paimon + |partitioned by (`__paimon_bucket`, `__paimon_file_path`) + |""".stripMargin) + sql(""" + |insert into paimon.db.t_metadata_partition_name_collision values + |(10, 'path-a', 1), + |(20, 'path-b', 2) + |""".stripMargin) + + checkSparkAnswerAndNativePaimonScan( + "select `__paimon_bucket`, `__paimon_file_path`, id " + + "from paimon.db.t_metadata_partition_name_collision") + } + } + test("paimon v2 native scan supports metadata columns with table partitions") { withTable("paimon.db.t_metadata_part") { sql(""" @@ -250,13 +335,43 @@ class AuronPaimonV2IntegrationSuite |using paimon |partitioned by (p) |""".stripMargin) - sql("insert into paimon.db.t_metadata_part values (1, 'a', 'p1'), (2, 'b', 'p2')") + sql("insert into paimon.db.t_metadata_part values (1, 'a', 'p1'), (2, 'b', '50%')") checkSparkAnswerAndNativePaimonScan( "select p, __paimon_file_path, id, __paimon_bucket from paimon.db.t_metadata_part") } } + test("paimon v2 native scan supports non-zero bucket metadata columns") { + withTable("paimon.db.t_metadata_bucketed") { + sql(""" + |create table paimon.db.t_metadata_bucketed (id int, v string) + |using paimon + |tblproperties ( + | 'primary-key' = 'id', + | 'bucket' = '2', + | 'full-compaction.delta-commits' = '1' + |) + |""".stripMargin) + sql( + "insert into paimon.db.t_metadata_bucketed " + + "select cast(id as int), cast(id as string) from range(0, 100)") + + var expected: Seq[Row] = Nil + withSQLConf("spark.auron.enable.paimon.scan" -> "false") { + expected = + sql("select id, __paimon_bucket from paimon.db.t_metadata_bucketed").collect().toSeq + } + assert( + expected.exists(_.getInt(1) != 0), + s"expected at least one non-zero Paimon bucket, got $expected") + + val df = sql("select id, __paimon_bucket from paimon.db.t_metadata_bucketed") + checkAnswer(df, expected) + assertNativePaimonScanApplied(df) + } + } + test("paimon v2 native scan falls back for unsupported metadata columns") { withTable("paimon.db.t_metadata_unsupported") { sql("create table paimon.db.t_metadata_unsupported (id int, v string) using paimon") @@ -289,10 +404,21 @@ class AuronPaimonV2IntegrationSuite } private def executedNativeScan(df: DataFrame): NativePaimonV2TableScanExec = { - val nativeScan = df.queryExecution.executedPlan.collectFirst { - case scan: NativePaimonV2TableScanExec => scan - } - assert(nativeScan.nonEmpty, "expected NativePaimonV2TableScanExec in executed plan") + val plan = df.queryExecution.executedPlan + val nativeScan = plan.collectFirst { case scan: NativePaimonV2TableScanExec => scan } + assert(nativeScan.nonEmpty, s"expected NativePaimonV2TableScanExec in executed plan:\n$plan") nativeScan.get } + + private def paimonDataSplits(batchScan: BatchScanExec): Seq[DataSplit] = { + batchScan.scan.toBatch.planInputPartitions().toSeq.flatMap { partition => + assert( + partition.isInstanceOf[PaimonInputPartition], + s"expected Paimon input partition, got $partition") + partition + .asInstanceOf[PaimonInputPartition] + .splits + .collect { case split: DataSplit => split } + } + } } From 78641670f129961e22d4f10c2ff1180b2195cc51 Mon Sep 17 00:00:00 2001 From: linfeng <33561138+lyne7-sc@users.noreply.github.com> Date: Mon, 29 Jun 2026 22:30:54 +0800 Subject: [PATCH 5/5] fix paimon metadata column handling --- .../sql/auron/paimon/PaimonScanSupport.scala | 52 +++++++++++-------- 1 file changed, 31 insertions(+), 21 deletions(-) diff --git a/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/auron/paimon/PaimonScanSupport.scala b/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/auron/paimon/PaimonScanSupport.scala index 30c73a8c2..fddc3df5d 100644 --- a/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/auron/paimon/PaimonScanSupport.scala +++ b/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/auron/paimon/PaimonScanSupport.scala @@ -22,6 +22,7 @@ import scala.collection.JavaConverters._ import scala.util.control.NonFatal import org.apache.commons.lang3.reflect.MethodUtils +import org.apache.paimon.fs.Path import org.apache.paimon.spark.DataConverter import org.apache.paimon.spark.schema.PaimonMetadataColumn import org.apache.paimon.table.FileStoreTable @@ -128,7 +129,8 @@ object PaimonScanSupport extends Logging { return None } - val partitionKeySet = table.schema().partitionKeys().asScala.toSet + val partitionKeys = table.schema().partitionKeys().asScala.toSeq + val partitionKeySet = partitionKeys.toSet def isPartitionValueField(name: String): Boolean = containsName(partitionKeySet, name) || (!isPhysicalColumn(name) && isSupportedMetadataColumn(name)) @@ -172,9 +174,16 @@ object PaimonScanSupport extends Logging { val partitionRowType = table.schema().logicalPartitionType() val partitionConverter = new RowDataToObjectArrayConverter(partitionRowType) - val partitionKeys = table.schema().partitionKeys().asScala.toSeq + val partitionKeyIndexByName = partitionKeys.zipWithIndex.toMap + val resolver = SQLConf.get.resolver + val partitionFieldIndexes = partitionSchema.fields.map { field => + partitionKeyIndexByName + .find { case (k, _) => resolver(k, field.name) } + .map(_._2) + .getOrElse(-1) + } val filePathMetadataIndex = partitionSchema.fields.indexWhere { field => - isFilePathMetadataColumn(field.name) + !isPhysicalColumn(field.name) && isFilePathMetadataColumn(field.name) } val files = splits.flatMap { split => @@ -185,17 +194,20 @@ object PaimonScanSupport extends Logging { partitionConverter.convert(split.partition()), partitionRowType, partitionSchema, - partitionKeys, + partitionFieldIndexes, split.bucket()) } split.dataFiles().asScala.map { dataFile => - val filePath = s"${split.bucketPath()}/${dataFile.fileName()}" + val rawFilePath = dataFile + .externalPath() + .orElse(s"${split.bucketPath()}/${dataFile.fileName()}") + val metadataFilePath = new Path(rawFilePath).toUri.toString val partitionValues = if (partitionSchema.isEmpty) { InternalRow.empty } else { - toPartitionRow(partitionValueTemplate, filePathMetadataIndex, filePath) + toPartitionRow(partitionValueTemplate, filePathMetadataIndex, metadataFilePath) } - PaimonFile(filePath, dataFile.fileSize(), partitionValues) + PaimonFile(rawFilePath, dataFile.fileSize(), partitionValues) } } @@ -231,25 +243,23 @@ object PaimonScanSupport extends Logging { paimonValues: Array[AnyRef], partitionRowType: RowType, partitionSchema: StructType, - partitionKeys: Seq[String], + partitionFieldIndexes: Array[Int], bucket: Int): Array[Any] = { - val resolver = SQLConf.get.resolver - val indexByName = partitionKeys.zipWithIndex.toMap - partitionSchema.fields.map { field => - if (isFilePathMetadataColumn(field.name)) { + partitionSchema.fields.zip(partitionFieldIndexes).map { case (field, partitionColumnIndex) => + if (partitionColumnIndex >= 0 && partitionColumnIndex < paimonValues.length) { + if (paimonValues(partitionColumnIndex) == null) { + null + } else { + DataConverter.fromPaimon( + paimonValues(partitionColumnIndex), + partitionRowType.getTypeAt(partitionColumnIndex)) + } + } else if (isFilePathMetadataColumn(field.name)) { null } else if (isBucketMetadataColumn(field.name)) { bucket } else { - val idx = indexByName - .find { case (k, _) => resolver(k, field.name) } - .map(_._2) - .getOrElse(-1) - if (idx >= 0 && idx < paimonValues.length && paimonValues(idx) != null) { - DataConverter.fromPaimon(paimonValues(idx), partitionRowType.getTypeAt(idx)) - } else { - null - } + null } } }