Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@
*/
package org.apache.spark.sql.auron.paimon

import java.util.Locale

import scala.collection.JavaConverters._
import scala.util.control.NonFatal

import org.apache.commons.lang3.reflect.MethodUtils
import org.apache.paimon.spark.DataConverter
import org.apache.paimon.spark.schema.PaimonMetadataColumn
import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.table.source.{DataSplit, Split}
import org.apache.paimon.types.RowType
Expand All @@ -34,6 +37,7 @@ import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.hive.auron.paimon.PaimonUtil
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String

final case class PaimonFile(filePath: String, fileSize: Long, partitionValues: InternalRow)

Expand All @@ -49,6 +53,10 @@ object PaimonScanSupport extends Logging {

private val PaimonBaseScanClassName = "org.apache.paimon.spark.PaimonBaseScan"
private val PaimonInputPartitionClassName = "org.apache.paimon.spark.PaimonInputPartition"
private val PaimonMetadataColumnPrefix = "__paimon_"
private val PaimonFilePathColumn = PaimonMetadataColumn.FILE_PATH_COLUMN
private val PaimonBucketColumn = PaimonMetadataColumn.BUCKET_COLUMN
private val PaimonMetadataColumns = PaimonMetadataColumn.SUPPORTED_METADATA_COLUMNS.toSet

// Planning a Paimon scan performs split-planning I/O (reading metadata files). The conversion
// pipeline calls plan() twice on the same exec (once in isSupported, once in convert), so we
Expand Down Expand Up @@ -107,10 +115,25 @@ object PaimonScanSupport extends Logging {
logDebug("Skip native Paimon scan: unsupported column data type in read schema.")
return None
}
val physicalColumnSet = table.schema().fieldNames().asScala.toSet
def isPhysicalColumn(name: String): Boolean = containsName(physicalColumnSet, name)
val unsupportedMetadataColumns = readSchema.fields.filter { f =>
!isPhysicalColumn(f.name) && isPaimonMetadataColumn(f.name) && !isSupportedMetadataColumn(
f.name)
}
if (unsupportedMetadataColumns.nonEmpty) {
logDebug(
s"Skip native Paimon scan: unsupported metadata columns " +
s"${unsupportedMetadataColumns.map(_.name).mkString(", ")}.")
return None
}

val partitionKeys = table.schema().partitionKeys().asScala.toSet
val partitionFields = readSchema.fields.filter(f => containsName(partitionKeys, f.name))
val fileFields = readSchema.fields.filterNot(f => containsName(partitionKeys, f.name))
val partitionKeySet = table.schema().partitionKeys().asScala.toSet
def isPartitionValueField(name: String): Boolean =
containsName(partitionKeySet, name) ||
(!isPhysicalColumn(name) && isSupportedMetadataColumn(name))
val partitionFields = readSchema.fields.filter(f => isPartitionValueField(f.name))
val fileFields = readSchema.fields.filterNot(f => isPartitionValueField(f.name))
Comment thread
SteNicholas marked this conversation as resolved.
val partitionSchema = StructType(partitionFields)
val fileSchema = StructType(fileFields)
Comment thread
SteNicholas marked this conversation as resolved.

Expand Down Expand Up @@ -149,19 +172,29 @@ object PaimonScanSupport extends Logging {

val partitionRowType = table.schema().logicalPartitionType()
val partitionConverter = new RowDataToObjectArrayConverter(partitionRowType)
val partitionKeys = table.schema().partitionKeys().asScala.toSeq
val filePathMetadataIndex = partitionSchema.fields.indexWhere { field =>
isFilePathMetadataColumn(field.name)
}

val files = splits.flatMap { split =>
val partitionValues = if (partitionSchema.isEmpty) {
InternalRow.empty
val partitionValueTemplate = if (partitionSchema.isEmpty) {
Array.empty[Any]
} else {
toPartitionRow(
toPartitionValueTemplate(
partitionConverter.convert(split.partition()),
partitionRowType,
partitionSchema,
table.schema().partitionKeys().asScala.toSeq)
partitionKeys,
split.bucket())
}
split.dataFiles().asScala.map { dataFile =>
val filePath = s"${split.bucketPath()}/${dataFile.fileName()}"
Comment thread
SteNicholas marked this conversation as resolved.
Outdated
val partitionValues = if (partitionSchema.isEmpty) {
Comment thread
SteNicholas marked this conversation as resolved.
InternalRow.empty
} else {
toPartitionRow(partitionValueTemplate, filePathMetadataIndex, filePath)
}
PaimonFile(filePath, dataFile.fileSize(), partitionValues)
}
}
Expand All @@ -174,30 +207,64 @@ object PaimonScanSupport extends Logging {
names.exists(n => resolver(n, target))
}

// Build a Spark InternalRow for partition values matching partitionSchema's data types.
// Partition values from Paimon are returned in the table's partition-key order; we reorder
// them to match partitionSchema and convert each value into the Spark catalyst representation
// expected for the field's type via Paimon's own DataConverter. This handles dates (epoch
// days), timestamps (micros), decimals and binary correctly, rather than the lossy
// String round-trip a Cast(Literal(value.toString), dataType) would perform.
private def toPartitionRow(
private def isFilePathMetadataColumn(name: String): Boolean = {
SQLConf.get.resolver(name, PaimonFilePathColumn)
}

private def isBucketMetadataColumn(name: String): Boolean = {
SQLConf.get.resolver(name, PaimonBucketColumn)
}

private def isSupportedMetadataColumn(name: String): Boolean = {
isFilePathMetadataColumn(name) || isBucketMetadataColumn(name)
}

private def isPaimonMetadataColumn(name: String): Boolean = {
containsName(PaimonMetadataColumns, name) ||
name.toLowerCase(Locale.ROOT).startsWith(PaimonMetadataColumnPrefix)
}

// Build split-invariant constants for partition columns and supported metadata columns in
// partitionSchema order. Paimon's DataConverter preserves Catalyst representations for typed
// partition values such as dates, timestamps, decimals and binary.
private def toPartitionValueTemplate(
paimonValues: Array[AnyRef],
partitionRowType: RowType,
partitionSchema: StructType,
partitionKeys: Seq[String]): InternalRow = {
partitionKeys: Seq[String],
bucket: Int): Array[Any] = {
val resolver = SQLConf.get.resolver
val indexByName = partitionKeys.zipWithIndex.toMap
InternalRow.fromSeq(partitionSchema.fields.map { field =>
val idx = indexByName
.find { case (k, _) => resolver(k, field.name) }
.map(_._2)
.getOrElse(-1)
if (idx >= 0 && idx < paimonValues.length && paimonValues(idx) != null) {
DataConverter.fromPaimon(paimonValues(idx), partitionRowType.getTypeAt(idx))
} else {
partitionSchema.fields.map { field =>
if (isFilePathMetadataColumn(field.name)) {
null
} else if (isBucketMetadataColumn(field.name)) {
Comment thread
SteNicholas marked this conversation as resolved.
bucket
} else {
val idx = indexByName
.find { case (k, _) => resolver(k, field.name) }
.map(_._2)
.getOrElse(-1)
if (idx >= 0 && idx < paimonValues.length && paimonValues(idx) != null) {
DataConverter.fromPaimon(paimonValues(idx), partitionRowType.getTypeAt(idx))
} else {
null
}
}
})
}
}

private def toPartitionRow(
partitionValueTemplate: Array[Any],
filePathMetadataIndex: Int,
filePath: String): InternalRow = {
if (filePathMetadataIndex < 0) {
InternalRow.fromSeq(partitionValueTemplate)
} else {
val partitionValues = partitionValueTemplate.clone()
partitionValues(filePathMetadataIndex) = UTF8String.fromString(filePath)
InternalRow.fromSeq(partitionValues)
}
}

private def collectSplits(partitions: Seq[InputPartition]): Option[Seq[DataSplit]] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,13 +209,85 @@ class AuronPaimonV2IntegrationSuite
}
}

test("paimon v2 native scan supports file-level metadata columns") {
withTable("paimon.db.t_metadata") {
sql("create table paimon.db.t_metadata (id int, v string) using paimon")
sql("insert into paimon.db.t_metadata values (1, 'a')")

checkSparkAnswerAndNativePaimonScan(
"select id, __paimon_file_path, __paimon_bucket from paimon.db.t_metadata")
Comment thread
SteNicholas marked this conversation as resolved.
}
}

test("paimon v2 native scan supports metadata-only projection") {
withTable("paimon.db.t_metadata_only") {
sql("create table paimon.db.t_metadata_only (id int, v string) using paimon")
sql("insert into paimon.db.t_metadata_only values (1, 'a'), (2, 'b')")

checkSparkAnswerAndNativePaimonScan(
"select __paimon_file_path from paimon.db.t_metadata_only")
}
}

test("paimon v2 native scan reads physical columns that share metadata names") {
withTable("paimon.db.t_metadata_name_collision") {
sql("""
|create table paimon.db.t_metadata_name_collision
|(`__paimon_bucket` int, id int)
|using paimon
|""".stripMargin)
sql("insert into paimon.db.t_metadata_name_collision values (10, 1), (20, 2)")

checkSparkAnswerAndNativePaimonScan(
"select `__paimon_bucket`, id from paimon.db.t_metadata_name_collision")
}
}

test("paimon v2 native scan supports metadata columns with table partitions") {
withTable("paimon.db.t_metadata_part") {
sql("""
|create table paimon.db.t_metadata_part (id int, v string, p string)
|using paimon
|partitioned by (p)
|""".stripMargin)
sql("insert into paimon.db.t_metadata_part values (1, 'a', 'p1'), (2, 'b', 'p2')")

checkSparkAnswerAndNativePaimonScan(
"select p, __paimon_file_path, id, __paimon_bucket from paimon.db.t_metadata_part")
}
}

test("paimon v2 native scan falls back for unsupported metadata columns") {
withTable("paimon.db.t_metadata_unsupported") {
sql("create table paimon.db.t_metadata_unsupported (id int, v string) using paimon")
sql("insert into paimon.db.t_metadata_unsupported values (1, 'a')")

val df = sql("select id, __paimon_row_index from paimon.db.t_metadata_unsupported")
val plan = df.queryExecution.executedPlan.toString()

assert(!plan.contains("NativePaimonV2TableScan"))
assert(df.collect().length === 1)
}
}

private def assertNativePaimonScanApplied(df: DataFrame): Unit = {
val plan = df.queryExecution.executedPlan.toString()
assert(
plan.contains("NativePaimonV2TableScan"),
s"plan should use native paimon scan:\n$plan")
}

private def checkSparkAnswerAndNativePaimonScan(sqlText: String): Unit = {
var expected: Seq[Row] = Nil
withSQLConf("spark.auron.enable.paimon.scan" -> "false") {
expected = sql(sqlText).collect().toSeq
}

val df = sql(sqlText)
checkAnswer(df, expected)
assertNativePaimonScanApplied(df)
}

private def executedNativeScan(df: DataFrame): NativePaimonV2TableScanExec = {
val nativeScan = df.queryExecution.executedPlan.collectFirst {
case scan: NativePaimonV2TableScanExec => scan
Expand Down
Loading