
fix: add assert to HashJoinExec::swap_inputs #23078

Open
haohuaijin wants to merge 4 commits into apache:main from haohuaijin:drop-dynamic-filter-when-swap-inputs


@haohuaijin haohuaijin commented Jun 22, 2026

Contributor

Which issue does this PR close?

Rationale for this change

Dynamic filters are runtime state tied to the probe side. swap_inputs changes the probe side, so preserving the old filter is unsafe and can lead to wrong column references.
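The failure mode can be illustrated with a minimal, self-contained sketch. `ToyJoin` and `ProbeColumnFilter` are hypothetical stand-ins, not DataFusion types; the only assumption is that the filter refers to a probe-side column by index.

```rust
/// Hypothetical stand-in for a dynamic filter: it points at a probe-side
/// column by index and remembers the name it was created for.
#[derive(Debug, Clone, PartialEq)]
struct ProbeColumnFilter {
    column_index: usize,
    column_name: String,
}

/// Hypothetical stand-in for a hash join with a build and a probe side.
#[derive(Debug, Clone)]
struct ToyJoin {
    build_schema: Vec<String>,
    probe_schema: Vec<String>,
    filter: Option<ProbeColumnFilter>,
}

impl ToyJoin {
    /// Swap build and probe sides while keeping the old filter untouched.
    /// This models the unsafe behavior described above.
    fn swap_keeping_filter(&self) -> ToyJoin {
        ToyJoin {
            build_schema: self.probe_schema.clone(),
            probe_schema: self.build_schema.clone(),
            filter: self.filter.clone(),
        }
    }

    /// Name of the probe-side column the filter index resolves to right now.
    fn resolved_filter_column(&self) -> Option<&str> {
        let filter = self.filter.as_ref()?;
        self.probe_schema
            .get(filter.column_index)
            .map(String::as_str)
    }
}
```

In this toy model, a filter created for index 0 of the original probe side resolves to a different column once the sides are swapped, which is the kind of wrong column reference the rationale refers to.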

What changes are included in this PR?

Remove the dynamic filter when swapping inputs in HashJoinExec:

.with_dynamic_filter(None)

Are these changes tested?

Yes, one test case is added.

Are there any user-facing changes?

no

@github-actions github-actions Bot added the physical-plan Changes to the physical-plan crate label Jun 22, 2026
@github-actions github-actions Bot added the proto Related to proto crate label Jun 22, 2026
}

#[test]
fn test_swap_inputs_clears_dynamic_filter() -> Result<()> {

Contributor

Could we add an end-to-end test that explains why this behavior is desired — that swap_inputs() should drop the dynamic filter? It’s a bit hard to infer the test goal from this unit test alone.

It should be one or more SQL queries that used to return incorrect results and that this fix makes correct.

@haohuaijin haohuaijin Jun 23, 2026

Contributor Author

I tried, but it is not easy to reproduce with SQL. The bug is triggered when serializing and deserializing the physical plan; see the test case in fc93043 for more detail. I will keep trying to reproduce it with SQL.

@2010YOUY01 2010YOUY01 Jun 23, 2026

Contributor

I'm wondering: are you using swap_inputs() directly downstream? 🤔

I think the root cause is that swap_inputs() has not been clearly specified. It implicitly assumes a bunch of preconditions, so it is only mostly safe to use internally when we strictly follow the optimizer rule order.

However, it is now a public API, so downstream usages may trigger subtle bugs if users are unaware of those preconditions.

See the optimizer rule order below. Internally, swap_inputs() is only called inside that optimizer rule, so we can ensure it is called only when dynamic_filter is None:

Arc::new(JoinSelection::new()),

I tried adding assert!(self.dynamic_filter.is_none()) at the beginning of HashJoinExec::swap_inputs(), and the internal tests still pass.

So I suggest we discuss separately what extra contract you need for downstream usage, and then extend swap_inputs() accordingly. (and also add the above assertions to prevent similar issues)
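As a rough illustration of the suggested guard, here is a self-contained sketch. `ToyHashJoin` and `ToyDynamicFilter` are hypothetical types used only for this example; the real `HashJoinExec::swap_inputs()` has a different signature and does more work.

```rust
/// Hypothetical placeholder for a dynamic filter.
#[derive(Debug, Clone, PartialEq)]
struct ToyDynamicFilter;

/// Hypothetical stand-in for a hash join node.
#[derive(Debug, Clone)]
struct ToyHashJoin {
    left: String,
    right: String,
    dynamic_filter: Option<ToyDynamicFilter>,
}

impl ToyHashJoin {
    /// Swap the two inputs.
    ///
    /// Precondition: no dynamic filter has been attached yet. The assertion
    /// turns a silent misuse into an immediate, descriptive panic.
    fn swap_inputs(&self) -> ToyHashJoin {
        assert!(
            self.dynamic_filter.is_none(),
            "swap_inputs must be called before a dynamic filter is attached"
        );
        ToyHashJoin {
            left: self.right.clone(),
            right: self.left.clone(),
            dynamic_filter: None,
        }
    }
}
```

With this shape, a caller that swaps first and builds filters afterwards is unaffected, while a caller that swaps after filter construction fails fast instead of carrying a stale filter.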

@haohuaijin haohuaijin Jun 23, 2026

Contributor Author

Yes, we call swap_inputs() directly after the dynamic filter pushdown rule (by adding a new physical optimizer rule via SessionStateBuilder::with_physical_optimizer_rule).

Contributor Author

I opened a separate issue to discuss the broader public API contract of HashJoinExec::swap_inputs() for downstream optimizer usage: #23105

For this PR, should we keep it scoped to making swap_inputs() drop the stale dynamic filter and documenting that behavior, or would you prefer this PR only add an assertion/precondition for now?

Contributor

I think for now we should only assert that the dynamic filter is None when calling swap_inputs().

If your use case can follow the existing restriction, it should be both simpler and safer: it is very natural to first decide the join order and then construct dynamic filters, rather than build some dynamic filters -> swap the join order -> build the remaining dynamic filters.

This likely means ensuring extension physical optimizer rules follow the same restriction: dynamic filter construction rules should be placed after rules that swap join input orders.

If there are cases that cannot be solved this way, we can discuss them separately.
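One way to make this ordering restriction checkable is a small guard over the list of rule names. The sketch below is an illustration only: the function name and the rule names passed to it are placeholders chosen for this example, not identifiers taken from DataFusion.

```rust
/// Check that every rule that may swap join inputs runs before the first
/// rule that constructs dynamic filters.
///
/// `rule_names` is the optimizer pipeline in execution order. The two other
/// slices classify rules by name; both classifications are supplied by the
/// caller and are assumptions of this sketch.
fn check_swap_before_dynamic_filters(
    rule_names: &[&str],
    swap_rules: &[&str],
    dynamic_filter_rules: &[&str],
) -> Result<(), String> {
    // Position of the last rule that may swap join inputs.
    let last_swap = rule_names.iter().rposition(|n| swap_rules.contains(n));
    // Position of the first rule that constructs dynamic filters.
    let first_filter = rule_names
        .iter()
        .position(|n| dynamic_filter_rules.contains(n));

    match (last_swap, first_filter) {
        (Some(s), Some(f)) if s > f => Err(format!(
            "rule `{}` may swap join inputs but runs after `{}`, which builds dynamic filters",
            rule_names[s], rule_names[f]
        )),
        _ => Ok(()),
    }
}
```

A check like this could run once when the rule list is assembled, so that a misplaced extension rule is reported at setup time rather than showing up as a wrong plan.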

Contributor Author

Applied the suggestion in 1d85fc7.

Following the existing restriction will solve the issue, but I have to copy the code below and insert my optimizer rule into it 😂.

pub fn new() -> Self {
    // NOTEs:
    // - The order of rules in this list is important, as it determines the
    //   order in which they are applied.
    // - Adding a new rule here is expensive as it will be applied to all
    //   queries, and will likely increase the optimization time. Please extend
    //   existing rules when possible, rather than adding a new rule.
    let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
        // If there is a output requirement of the query, make sure that
        // this information is not lost across different rules during optimization.
        Arc::new(OutputRequirements::new_add_mode()),
        Arc::new(AggregateStatistics::new()),
        // Statistics-based join selection will change the Auto mode to a real join implementation,
        // like collect left, or hash join, or future sort merge join, which will influence the
        // EnsureRequirements rule as it decides whether to add additional repartitioning and
        // local sorting steps to meet distribution and ordering requirements. Therefore, it
        // should run before EnsureRequirements.
        Arc::new(JoinSelection::new()),
        // The LimitedDistinctAggregation rule should be applied before EnsureRequirements,
        // as that rule may inject other operations in between the different AggregateExecs.
        // Applying the rule early means only directly-connected AggregateExecs must be examined.
        Arc::new(LimitedDistinctAggregation::new()),
        // The FilterPushdown rule tries to push down filters as far as it can.
        // For example, it will push down filtering from a `FilterExec` to `DataSourceExec`.
        // Note that this does not push down dynamic filters (such as those created by a `SortExec` operator in TopK mode),
        // those are handled by the later `FilterPushdown` rule.
        // See `FilterPushdownPhase` for more details.
        Arc::new(FilterPushdown::new()),
        // Ensures each input plan satisfies the distribution and ordering
        // requirements declared by `ExecutionPlan::required_input_distribution`
        // and `ExecutionPlan::required_input_ordering`.
        //
        // If the requirements are already satisfied, this rule leaves the plan
        // unchanged. For example, it does not add sorting when the input is a
        // file scan whose existing order already satisfies the required ordering.
        // Otherwise, this rule inserts the necessary repartitioning and sorting
        // operators.
        //
        // This used to be implemented as two separate rules: `EnforceDistribution`
        // and `EnforceSorting`. It is now a single idempotent rule that decides
        // distribution and sorting together in one bottom-up pass, so the
        // `pushdown_sorts` step no longer breaks distribution invariants set
        // earlier in the pipeline. See the module-level doc on
        // [`EnsureRequirements`](crate::ensure_requirements) for the per-phase
        // breakdown, and <https://github.com/apache/datafusion/issues/21973>
        // for the original failure mode.
        Arc::new(EnsureRequirements::new()),
        // The CombinePartialFinalAggregate rule should be applied after distribution enforcement
        Arc::new(CombinePartialFinalAggregate::new()),
        // Run once after the local sorting requirement is changed
        Arc::new(OptimizeAggregateOrder::new()),
        // WindowTopN: replaces Filter(rn<=K) → Window(ROW_NUMBER) → Sort
        // with Window(ROW_NUMBER) → PartitionedTopKExec(fetch=K).
        // Must run after EnsureRequirements (which inserts SortExec) and before
        // ProjectionPushdown (which embeds projections into FilterExec).
        Arc::new(WindowTopN::new()),
        // TODO: `try_embed_to_hash_join` in the ProjectionPushdown rule would be blocked by the CoalesceBatches, so add it before CoalesceBatches. Maybe optimize it in the future.
        Arc::new(ProjectionPushdown::new()),
        // Remove the ancillary output requirement operator since we are done with the planning
        // phase.
        Arc::new(OutputRequirements::new_remove_mode()),
        // The aggregation limiter will try to find situations where the accumulator count
        // is not tied to the cardinality, i.e. when the output of the aggregation is passed
        // into an `order by max(x) limit y`. In this case it will copy the limit value down
        // to the aggregation, allowing it to use only y number of accumulators.
        Arc::new(TopKAggregation::new()),
        // Tries to push limits down through window functions, growing as appropriate
        // This can possibly be combined with [LimitPushdown]
        // It needs to come after [EnsureRequirements] (which handles sort enforcement)
        Arc::new(LimitPushPastWindows::new()),
        // The HashJoinBuffering rule adds a BufferExec node with the configured capacity
        // in the probe side of hash joins. That way, the probe side gets eagerly polled before
        // the build side is completely finished.
        Arc::new(HashJoinBuffering::new()),
        // The LimitPushdown rule tries to push limits down as far as possible,
        // replacing operators with fetching variants, or adding limits
        // past operators that support limit pushdown.
        Arc::new(LimitPushdown::new()),
        // TopKRepartition pushes TopK (Sort with fetch) below Hash
        // repartition when the partition key is a prefix of the sort key.
        // This reduces data volume before a hash shuffle. It must run
        // after LimitPushdown so that the TopK already exists on the SortExec.
        Arc::new(TopKRepartition::new()),
        // The ProjectionPushdown rule tries to push projections towards
        // the sources in the execution plan. As a result of this process,
        // a projection can disappear if it reaches the source providers, and
        // sequential projections can merge into one. Even if these two cases
        // are not present, the load of executors such as join or union will be
        // reduced by narrowing their input tables.
        Arc::new(ProjectionPushdown::new()),
        // PushdownSort: Detect sorts that can be pushed down to data sources.
        Arc::new(PushdownSort::new()),
        Arc::new(EnsureCooperative::new()),
        // This FilterPushdown handles dynamic filters that may have references to the source ExecutionPlan.
        // Therefore, it should be run at the end of the optimization process since any changes to the plan may break the dynamic filter's references.
        // See `FilterPushdownPhase` for more details.
        Arc::new(FilterPushdown::new_post_optimization()),
        // The SanityCheckPlan rule checks whether the order and
        // distribution requirements of each node in the plan
        // is satisfied. It will also reject non-runnable query
        // plans that use pipeline-breaking operators on infinite
        // input(s). The rule generates a diagnostic error
        // message for invalid plans. It makes no changes to the
        // given query plan; i.e. it only acts as a final
        // gatekeeping rule.
        Arc::new(SanityCheckPlan::new()),
    ];
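For comparison, placing a rule relative to an existing one by name could look roughly like the sketch below. `ToyRule`, `Named`, and `insert_after` are hypothetical and model only the shape of the problem; whether the real rule list can be obtained and modified this way is an assumption, not something confirmed in this thread.

```rust
use std::sync::Arc;

/// Hypothetical minimal rule interface: only the name matters here.
trait ToyRule {
    fn name(&self) -> &str;
}

/// Hypothetical rule that carries nothing but its name.
struct Named(&'static str);

impl ToyRule for Named {
    fn name(&self) -> &str {
        self.0
    }
}

/// Insert `new_rule` directly after the first rule called `anchor`.
/// Returns an error if no rule with that name is present, so a renamed or
/// removed anchor is noticed instead of silently changing the order.
fn insert_after(
    mut rules: Vec<Arc<dyn ToyRule + Send + Sync>>,
    anchor: &str,
    new_rule: Arc<dyn ToyRule + Send + Sync>,
) -> Result<Vec<Arc<dyn ToyRule + Send + Sync>>, String> {
    let pos = rules
        .iter()
        .position(|r| r.name() == anchor)
        .ok_or_else(|| format!("anchor rule `{}` not found", anchor))?;
    rules.insert(pos + 1, new_rule);
    Ok(rules)
}
```

The trade-off is that the extension now depends on a rule name staying stable, which is itself an implicit assumption worth covering with a test.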

Contributor

Yes, I agree; adding a custom rule is quite tricky.

There are lots of implicit assumptions like 'rule A must be run after rule B', so if your extension rule might require certain properties, it's a good idea to add more tests and assertion guards to DataFusion core.

Contributor Author

> There are lots of implicit assumptions like 'rule A must be run after rule B', so if your extension rule might require certain properties, it's a good idea to add more tests and assertion guards to DataFusion core.

Good idea

@github-actions github-actions Bot removed the proto Related to proto crate label Jun 23, 2026
@haohuaijin haohuaijin changed the title from "fix: drop dynamic filter in hash-join when swap inputs" to "fix: add assert to HashJoinExec::swap_inputs" Jun 23, 2026

@2010YOUY01 2010YOUY01 left a comment

Contributor

Thanks! Should be good to go after CI passes.

Contributor Author

Thanks for your review @2010YOUY01 @Dandandan


Labels

physical-plan Changes to the physical-plan crate

Development

Successfully merging this pull request may close these issues.

HashJoinExec::swap_inputs preserves old dynamic filter after swap

3 participants