From 3335d71d2c936ca4be406c08aadc646b354449ca Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Mon, 22 Jun 2026 18:21:19 +0800 Subject: [PATCH 1/2] fix: lock spills before new spiller --- .../datafusion-ext-plans/src/shuffle/sort_repartitioner.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/native-engine/datafusion-ext-plans/src/shuffle/sort_repartitioner.rs b/native-engine/datafusion-ext-plans/src/shuffle/sort_repartitioner.rs index b76ed6314..621eda92d 100644 --- a/native-engine/datafusion-ext-plans/src/shuffle/sort_repartitioner.rs +++ b/native-engine/datafusion-ext-plans/src/shuffle/sort_repartitioner.rs @@ -98,6 +98,7 @@ impl MemConsumer for SortShuffleRepartitioner { async fn spill(&self) -> Result<()> { let data = self.data.lock().await.drain(); let spill_metrics = self.exec_ctx.spill_metrics().clone(); + let mut spills = self.spills.lock().await; let spill = tokio::task::spawn_blocking(move || { let mut spill = try_new_spill(&spill_metrics)?; let offsets = data.write(spill.get_buf_writer())?; @@ -105,8 +106,7 @@ impl MemConsumer for SortShuffleRepartitioner { }) .await .expect("tokio spawn_blocking error")?; - - self.spills.lock().await.push(spill); + spills.push(spill); self.update_mem_used(0).await?; Ok(()) } From 90f5392a4bafb5ddff54978c74d9446526e0d406 Mon Sep 17 00:00:00 2001 From: Zhen Wang <643348094@qq.com> Date: Tue, 23 Jun 2026 14:41:01 +0800 Subject: [PATCH 2/2] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- .../datafusion-ext-plans/src/shuffle/sort_repartitioner.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/native-engine/datafusion-ext-plans/src/shuffle/sort_repartitioner.rs b/native-engine/datafusion-ext-plans/src/shuffle/sort_repartitioner.rs index 621eda92d..c32c91e0a 100644 --- a/native-engine/datafusion-ext-plans/src/shuffle/sort_repartitioner.rs +++ b/native-engine/datafusion-ext-plans/src/shuffle/sort_repartitioner.rs @@ -107,6 +107,7 @@ impl MemConsumer for SortShuffleRepartitioner { .await .expect("tokio spawn_blocking error")?; spills.push(spill); + drop(spills); self.update_mem_used(0).await?; Ok(()) }