Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import scala.util.Random
import org.apache.spark.sql.execution.WholeStageCodegenExec
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.execution.auron.plan.NativeAggBase
import org.apache.spark.sql.functions.{collect_list, monotonically_increasing_id, rand, randn, spark_partition_id, sum}
import org.apache.spark.sql.functions.{collect_list, last, monotonically_increasing_id, rand, randn, spark_partition_id, sum}
import org.apache.spark.sql.internal.SQLConf

class AuronDataFrameAggregateSuite extends DataFrameAggregateSuite with SparkQueryTestsBase {
Expand Down Expand Up @@ -75,4 +75,31 @@ class AuronDataFrameAggregateSuite extends DataFrameAggregateSuite with SparkQue
rand(Random.nextLong()),
randn(Random.nextLong())).foreach(assertNoExceptions)
}

testAuron("native last / last(ignoreNulls) aggregate") {
// The grouped aggregate is reliably offloaded to NativeAggBase, and the data
// is deterministic by construction (no intra-group ordering dependence):
// k=1 -> all values 10 => last=10, last(ignoreNulls)=10
// k=2 -> all values null => last=null, last(ignoreNulls)=null
// k=3 -> single row 30 => last=30, last(ignoreNulls)=30
val df = Seq[(Int, Option[Int])](
(1, Some(10)),
(1, Some(10)),
(2, None),
(2, None),
(3, Some(30)))
.toDF("k", "v")

val aggDF = df
.groupBy("k")
.agg(last($"v").as("last_v"), last($"v", ignoreNulls = true).as("last_v_ign"))

checkAnswer(aggDF, Seq(Row(1, 10, 10), Row(2, null, null), Row(3, 30, 30)))

// the aggregate must be offloaded to the native engine
assert(getExecutedPlan(aggDF).exists {
case _: NativeAggBase => true
case _ => false
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import scala.util.Random
import org.apache.spark.sql.execution.WholeStageCodegenExec
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.execution.auron.plan.NativeAggBase
import org.apache.spark.sql.functions.{collect_list, monotonically_increasing_id, rand, randn, spark_partition_id, sum}
import org.apache.spark.sql.functions.{collect_list, last, monotonically_increasing_id, rand, randn, spark_partition_id, sum}
import org.apache.spark.sql.internal.SQLConf

class AuronDataFrameAggregateSuite extends DataFrameAggregateSuite with SparkQueryTestsBase {
Expand Down Expand Up @@ -75,4 +75,31 @@ class AuronDataFrameAggregateSuite extends DataFrameAggregateSuite with SparkQue
rand(Random.nextLong()),
randn(Random.nextLong())).foreach(assertNoExceptions)
}

testAuron("native last / last(ignoreNulls) aggregate") {
// The grouped aggregate is reliably offloaded to NativeAggBase, and the data
// is deterministic by construction (no intra-group ordering dependence):
// k=1 -> all values 10 => last=10, last(ignoreNulls)=10
// k=2 -> all values null => last=null, last(ignoreNulls)=null
// k=3 -> single row 30 => last=30, last(ignoreNulls)=30
val df = Seq[(Int, Option[Int])](
(1, Some(10)),
(1, Some(10)),
(2, None),
(2, None),
(3, Some(30)))
.toDF("k", "v")

val aggDF = df
.groupBy("k")
.agg(last($"v").as("last_v"), last($"v", ignoreNulls = true).as("last_v_ign"))

checkAnswer(aggDF, Seq(Row(1, 10, 10), Row(2, null, null), Row(3, 30, 30)))

// the aggregate must be offloaded to the native engine
assert(getExecutedPlan(aggDF).exists {
case _: NativeAggBase => true
case _ => false
})
}
}
2 changes: 2 additions & 0 deletions native-engine/auron-planner/proto/auron.proto
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ enum AggFunction {
FIRST = 7;
FIRST_IGNORES_NULL = 8;
BLOOM_FILTER = 9;
LAST = 10;
LAST_IGNORES_NULL = 11;
BRICKHOUSE_COLLECT = 1000;
BRICKHOUSE_COMBINE_UNIQUE = 1001;
UDAF = 1002;
Expand Down
2 changes: 2 additions & 0 deletions native-engine/auron-planner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ impl From<protobuf::AggFunction> for AggFunction {
protobuf::AggFunction::CollectSet => AggFunction::CollectSet,
protobuf::AggFunction::First => AggFunction::First,
protobuf::AggFunction::FirstIgnoresNull => AggFunction::FirstIgnoresNull,
protobuf::AggFunction::Last => AggFunction::Last,
protobuf::AggFunction::LastIgnoresNull => AggFunction::LastIgnoresNull,
protobuf::AggFunction::BloomFilter => AggFunction::BloomFilter,
protobuf::AggFunction::BrickhouseCollect => AggFunction::BrickhouseCollect,
protobuf::AggFunction::BrickhouseCombineUnique => AggFunction::BrickhouseCombineUnique,
Expand Down
6 changes: 6 additions & 0 deletions native-engine/auron-planner/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,12 @@ impl PhysicalPlanner {
protobuf::AggFunction::FirstIgnoresNull => {
WindowFunction::Agg(AggFunction::FirstIgnoresNull)
}
protobuf::AggFunction::Last => {
WindowFunction::Agg(AggFunction::Last)
}
protobuf::AggFunction::LastIgnoresNull => {
WindowFunction::Agg(AggFunction::LastIgnoresNull)
}
protobuf::AggFunction::BloomFilter => {
WindowFunction::Agg(AggFunction::BloomFilter)
}
Expand Down
10 changes: 10 additions & 0 deletions native-engine/datafusion-ext-plans/src/agg/agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ use crate::agg::{
count::AggCount,
first::AggFirst,
first_ignores_null::AggFirstIgnoresNull,
last::AggLast,
last_ignores_null::AggLastIgnoresNull,
maxmin::{AggMax, AggMin},
spark_udaf_wrapper::SparkUDAFWrapper,
sum::AggSum,
Expand Down Expand Up @@ -212,6 +214,14 @@ pub fn create_agg(
let dt = children[0].data_type(input_schema)?;
Arc::new(AggFirstIgnoresNull::try_new(children[0].clone(), dt)?)
}
AggFunction::Last => {
let dt = children[0].data_type(input_schema)?;
Arc::new(AggLast::try_new(children[0].clone(), dt)?)
}
AggFunction::LastIgnoresNull => {
let dt = children[0].data_type(input_schema)?;
Arc::new(AggLastIgnoresNull::try_new(children[0].clone(), dt)?)
}
AggFunction::BloomFilter => {
let dt = children[0].data_type(input_schema)?;
let empty_batch = RecordBatch::new_empty(Arc::new(Schema::empty()));
Expand Down
Loading
Loading