Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@
*/
package org.apache.spark.sql.auron.paimon

import java.util.Locale

import scala.collection.JavaConverters._
import scala.util.control.NonFatal

import org.apache.commons.lang3.reflect.MethodUtils
import org.apache.paimon.fs.Path
import org.apache.paimon.spark.DataConverter
import org.apache.paimon.spark.schema.PaimonMetadataColumn
import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.table.source.{DataSplit, Split}
import org.apache.paimon.types.RowType
Expand All @@ -34,6 +38,7 @@ import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.hive.auron.paimon.PaimonUtil
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String

final case class PaimonFile(filePath: String, fileSize: Long, partitionValues: InternalRow)

Expand All @@ -49,6 +54,10 @@ object PaimonScanSupport extends Logging {

private val PaimonBaseScanClassName = "org.apache.paimon.spark.PaimonBaseScan"
private val PaimonInputPartitionClassName = "org.apache.paimon.spark.PaimonInputPartition"
private val PaimonMetadataColumnPrefix = "__paimon_"
private val PaimonFilePathColumn = PaimonMetadataColumn.FILE_PATH_COLUMN
private val PaimonBucketColumn = PaimonMetadataColumn.BUCKET_COLUMN
private val PaimonMetadataColumns = PaimonMetadataColumn.SUPPORTED_METADATA_COLUMNS.toSet

// Planning a Paimon scan performs split-planning I/O (reading metadata files). The conversion
// pipeline calls plan() twice on the same exec (once in isSupported, once in convert), so we
Expand Down Expand Up @@ -107,10 +116,26 @@ object PaimonScanSupport extends Logging {
logDebug("Skip native Paimon scan: unsupported column data type in read schema.")
return None
}
val physicalColumnSet = table.schema().fieldNames().asScala.toSet
def isPhysicalColumn(name: String): Boolean = containsName(physicalColumnSet, name)
val unsupportedMetadataColumns = readSchema.fields.filter { f =>
!isPhysicalColumn(f.name) && isPaimonMetadataColumn(f.name) && !isSupportedMetadataColumn(
f.name)
}
if (unsupportedMetadataColumns.nonEmpty) {
logDebug(
s"Skip native Paimon scan: unsupported metadata columns " +
s"${unsupportedMetadataColumns.map(_.name).mkString(", ")}.")
return None
}

val partitionKeys = table.schema().partitionKeys().asScala.toSet
val partitionFields = readSchema.fields.filter(f => containsName(partitionKeys, f.name))
val fileFields = readSchema.fields.filterNot(f => containsName(partitionKeys, f.name))
val partitionKeys = table.schema().partitionKeys().asScala.toSeq
val partitionKeySet = partitionKeys.toSet
def isPartitionValueField(name: String): Boolean =
containsName(partitionKeySet, name) ||
(!isPhysicalColumn(name) && isSupportedMetadataColumn(name))
val partitionFields = readSchema.fields.filter(f => isPartitionValueField(f.name))
val fileFields = readSchema.fields.filterNot(f => isPartitionValueField(f.name))
Comment thread
SteNicholas marked this conversation as resolved.
val partitionSchema = StructType(partitionFields)
val fileSchema = StructType(fileFields)
Comment thread
SteNicholas marked this conversation as resolved.

Expand Down Expand Up @@ -149,20 +174,40 @@ object PaimonScanSupport extends Logging {

val partitionRowType = table.schema().logicalPartitionType()
val partitionConverter = new RowDataToObjectArrayConverter(partitionRowType)
val partitionKeyIndexByName = partitionKeys.zipWithIndex.toMap
val resolver = SQLConf.get.resolver
val partitionFieldIndexes = partitionSchema.fields.map { field =>
partitionKeyIndexByName
.find { case (k, _) => resolver(k, field.name) }
.map(_._2)
.getOrElse(-1)
}
val filePathMetadataIndex = partitionSchema.fields.indexWhere { field =>
!isPhysicalColumn(field.name) && isFilePathMetadataColumn(field.name)
}

val files = splits.flatMap { split =>
val partitionValues = if (partitionSchema.isEmpty) {
InternalRow.empty
val partitionValueTemplate = if (partitionSchema.isEmpty) {
Array.empty[Any]
} else {
toPartitionRow(
toPartitionValueTemplate(
partitionConverter.convert(split.partition()),
partitionRowType,
partitionSchema,
table.schema().partitionKeys().asScala.toSeq)
partitionFieldIndexes,
split.bucket())
}
split.dataFiles().asScala.map { dataFile =>
val filePath = s"${split.bucketPath()}/${dataFile.fileName()}"
PaimonFile(filePath, dataFile.fileSize(), partitionValues)
val rawFilePath = dataFile
.externalPath()
.orElse(s"${split.bucketPath()}/${dataFile.fileName()}")
val metadataFilePath = new Path(rawFilePath).toUri.toString
val partitionValues = if (partitionSchema.isEmpty) {
Comment thread
SteNicholas marked this conversation as resolved.
InternalRow.empty
} else {
toPartitionRow(partitionValueTemplate, filePathMetadataIndex, metadataFilePath)
}
PaimonFile(rawFilePath, dataFile.fileSize(), partitionValues)
}
}

Expand All @@ -174,30 +219,62 @@ object PaimonScanSupport extends Logging {
names.exists(n => resolver(n, target))
}

// Build a Spark InternalRow for partition values matching partitionSchema's data types.
// Partition values from Paimon are returned in the table's partition-key order; we reorder
// them to match partitionSchema and convert each value into the Spark catalyst representation
// expected for the field's type via Paimon's own DataConverter. This handles dates (epoch
// days), timestamps (micros), decimals and binary correctly, rather than the lossy
// String round-trip a Cast(Literal(value.toString), dataType) would perform.
private def toPartitionRow(
private def isFilePathMetadataColumn(name: String): Boolean = {
SQLConf.get.resolver(name, PaimonFilePathColumn)
}

private def isBucketMetadataColumn(name: String): Boolean = {
SQLConf.get.resolver(name, PaimonBucketColumn)
}

private def isSupportedMetadataColumn(name: String): Boolean = {
isFilePathMetadataColumn(name) || isBucketMetadataColumn(name)
}

private def isPaimonMetadataColumn(name: String): Boolean = {
containsName(PaimonMetadataColumns, name) ||
name.toLowerCase(Locale.ROOT).startsWith(PaimonMetadataColumnPrefix)
}

// Build split-invariant constants for partition columns and supported metadata columns in
// partitionSchema order. Paimon's DataConverter preserves Catalyst representations for typed
// partition values such as dates, timestamps, decimals and binary.
private def toPartitionValueTemplate(
paimonValues: Array[AnyRef],
partitionRowType: RowType,
partitionSchema: StructType,
partitionKeys: Seq[String]): InternalRow = {
val resolver = SQLConf.get.resolver
val indexByName = partitionKeys.zipWithIndex.toMap
InternalRow.fromSeq(partitionSchema.fields.map { field =>
val idx = indexByName
.find { case (k, _) => resolver(k, field.name) }
.map(_._2)
.getOrElse(-1)
if (idx >= 0 && idx < paimonValues.length && paimonValues(idx) != null) {
DataConverter.fromPaimon(paimonValues(idx), partitionRowType.getTypeAt(idx))
partitionFieldIndexes: Array[Int],
bucket: Int): Array[Any] = {
partitionSchema.fields.zip(partitionFieldIndexes).map { case (field, partitionColumnIndex) =>
if (partitionColumnIndex >= 0 && partitionColumnIndex < paimonValues.length) {
if (paimonValues(partitionColumnIndex) == null) {
null
} else {
DataConverter.fromPaimon(
paimonValues(partitionColumnIndex),
partitionRowType.getTypeAt(partitionColumnIndex))
}
} else if (isFilePathMetadataColumn(field.name)) {
null
} else if (isBucketMetadataColumn(field.name)) {
Comment thread
SteNicholas marked this conversation as resolved.
bucket
} else {
null
}
})
}
}

private def toPartitionRow(
partitionValueTemplate: Array[Any],
filePathMetadataIndex: Int,
filePath: String): InternalRow = {
if (filePathMetadataIndex < 0) {
InternalRow.fromSeq(partitionValueTemplate)
} else {
val partitionValues = partitionValueTemplate.clone()
partitionValues(filePathMetadataIndex) = UTF8String.fromString(filePath)
InternalRow.fromSeq(partitionValues)
}
}

private def collectSplits(partitions: Seq[InputPartition]): Option[Seq[DataSplit]] = {
Expand Down
Loading
Loading