perf: hash join probe loop has iterator overhead and per-row Growable extends#7098
perf: hash join probe loop has iterator overhead and per-row Growable extends#7098QLiangong wants to merge 2 commits into
Conversation
Greptile SummaryThis PR optimizes
Confidence Score: 1/5Not safe to merge: the batch code path produces silently incorrect join output whenever matches span more than one build-side RecordBatch. The new batch path decouples the build-side row order (grouped by RecordBatch index) from the probe-side row order (original match order), causing row misalignment in the most common multi-batch build scenarios. The join would return wrong data without any error. src/daft-local-execution/src/join/inner_join.rs — specifically the batch path from line 88 onward. Important Files Changed
Flowchart%%{init: {'theme': 'neutral'}}%%
flowchart TD
A[probe_inner called] --> B[Collect all matches into flat Vec]
B --> C{matches_len >= 1024 AND fanout >= 3.0?}
C -- YES: batch path --> D[Split into probe_indices and per_table_indices by rb_idx]
D --> E[take each build table by its bucket indices]
E --> F[concat build tables in rb_idx order]
F --> G[take probe table by probe_indices in match order]
G --> H[MISALIGNED rows]
H --> I[build_final_table]
C -- NO: growable path --> J[GrowableRecordBatch.extend per match]
J --> K[build_side_growable.build]
K --> L[take probe table by probe_side_idxs]
L --> M[Rows correctly aligned]
M --> I
I --> N[MicroPartition output]
Reviews (1): Last reviewed commit: "probe_inner" | Re-trigger Greptile |
| let (left_table, right_table) = if use_batch { | ||
| let build_tables = probe_state.get_record_batches(); | ||
| let mut probe_indices = Vec::with_capacity(matches_len); | ||
| let mut per_table_indices = vec![Vec::new(); build_tables.len()]; | ||
|
|
||
| for (probe_idx, rb_idx, row_idx) in matches { | ||
| probe_indices.push(probe_idx as u64); | ||
| per_table_indices[rb_idx as usize].push(row_idx); | ||
| } | ||
|
|
||
| let (left_table, right_table) = if params.build_on_left { | ||
| (build_side_table, probe_side_table) | ||
| let mut build_side_tables = Vec::new(); | ||
| for (rb_idx, indices) in per_table_indices.into_iter().enumerate() { | ||
| if !indices.is_empty() { | ||
| let idx_arr = UInt64Array::from_vec("", indices); | ||
| let taken = build_tables[rb_idx].take(&idx_arr)?; | ||
| build_side_tables.push(taken); | ||
| } | ||
| } | ||
|
|
||
| let build_side_table = if build_side_tables.is_empty() { | ||
| daft_recordbatch::RecordBatch::empty(Some(params.output_schema.clone())) | ||
| } else if build_side_tables.len() == 1 { | ||
| build_side_tables.pop().unwrap() | ||
| } else { | ||
| daft_recordbatch::RecordBatch::concat(&build_side_tables)? | ||
| }; |
There was a problem hiding this comment.
Build/probe row alignment broken for multi-table build sides
In the batch path, probe_indices is filled in the original match order (interleaved across rb_idx values), but the build-side rows are concatenated in table-index order (rb_idx=0 first, then rb_idx=1, etc.). When the build side spans more than one RecordBatch, the two sides no longer correspond row-for-row and the join emits silently wrong results.
Concrete failure: suppose matches = [(probe=5, rb=1, row=10), (probe=3, rb=0, row=7), (probe=8, rb=1, row=2)]. probe_indices = [5, 3, 8]; the concatenated build side is [table[0][7], table[1][10], table[1][2]]. The final join pairs probe[5] with table[0][7] instead of table[1][10], and probe[3] with table[1][10] instead of table[0][7].
The fix is to sort matches by rb_idx before splitting into probe_indices and per_table_indices, so that probe_indices is accumulated in the same grouped order that the build-side concat produces.
| let build_side_table = if build_side_tables.is_empty() { | ||
| daft_recordbatch::RecordBatch::empty(Some(params.output_schema.clone())) |
There was a problem hiding this comment.
Wrong schema used for the empty build-side fallback
RecordBatch::empty(Some(params.output_schema.clone())) creates a table whose schema is the full output schema, but this table is used as the build-side fragment before build_final_table tries to extract common_join_keys and left_non_join_columns from it. The empty table should be created with the build-side schema: left_schema when params.build_on_left is true, right_schema otherwise.
| let mut matches: Vec<(u32, u64, u64)> = Vec::with_capacity(input_table.len()); | ||
| for (probe_idx, inner_iter) in idx_iter.enumerate() { | ||
| if let Some(inner_iter) = inner_iter { | ||
| for (build_rb_idx, build_row_idx) in inner_iter { | ||
| build_side_growable.extend( | ||
| build_rb_idx as usize, | ||
| build_row_idx as usize, | ||
| 1, | ||
| ); | ||
| probe_side_idxs.push(probe_row_idx as u64); | ||
| for (rb_idx, row_idx) in inner_iter { | ||
| matches.push((probe_idx as u32, rb_idx as u64, row_idx)); |
There was a problem hiding this comment.
probe_idx silently truncates on large input tables
idx_iter.enumerate() yields a usize counter cast to u32 before being stored in matches. On a 64-bit platform, any input table with more than u32::MAX (~4 billion) rows will silently produce wrong probe-side indices. Storing probe_idx as u64 would be consistent with row_idx and avoids the overflow.
srilman
left a comment
There was a problem hiding this comment.
Overall, this does make sense to me but I have a couple of thoughts:
- Do you happen to have any performance numbers? Would be helpful to see how much of a benefit
- In addition, do you happen to have any memory profiles, even a rough measurement of RSS usage over the timeline of a query? I believe we went with the growable approach because it is less likely to make large memory allocations which could cause OOMs
- Is there much benefit to maintaining the growable path for cases where there are only 1024 matches? that seems so small that the performance difference would be negligible
cc @colin-ho on this, I believe we tried something similar before and noticed that there were a lot of OOMs. i think this approach is better but do you see anything potentially problematic
| // build_rb_idx for the take-based path. The probe-side hot loop | ||
| // is a tight `Vec::push`, which the compiler lowers to the same | ||
| // code as a `flat_map(...).collect()` form. | ||
| let mut matches: Vec<(u32, u64, u64)> = Vec::with_capacity(input_table.len()); |
There was a problem hiding this comment.
Rather than putting everything into matches and then splitting by build_rb_idx, why not just do the fanout here? That way we can cut the memory usage by half
Changes Made
This PR introduces a fan-out-aware vectorized
RecordBatch::takepathin
probe_inner(src/daft-local-execution/src/join/inner_join.rs)to replace the per-row
GrowableRecordBatch::extendvirtual call whenfan-out is high enough to make it pay off. Only
inner_join.rsismodified; no public API changes.
Implementation
Materialize matches up front.
probe_innernow walksprobe_indices()once and collects all matches into a localVec<(probe_idx, build_rb_idx, build_row_idx)>, preallocated withVec::with_capacity(input_table.len()). This gives us the totalmatch count for capacity hints and lets the take path bucket the
matches by build table.
Fan-out heuristic. Two module-level constants gate the new path:
TAKE_BATCH_MIN_MATCHES = 1024— minimum total matches beforethe take path is considered; below this, setup cost dominates.
TAKE_BATCH_MIN_FANOUT = 3.0— minimummatches.len() / probe_rowsratio before the take path ispreferred; below this,
take + concatoverhead exceeds theper-row
extendcost.Both are placeholder values pending a Rust-level micro-benchmark
over
(probe_rows, fan_out, num_build_partitions), modeled onsrc/daft-recordbatch/src/ops/bench_agg.rs.Take-based build path (new, used when both thresholds met):
build_rb_idxinto oneVec<u64>per buildRecordBatch.UInt64Arrayand callRecordBatch::take.RecordBatch::concatthe per-table results into the finalbuild side.
RecordBatch::takeoverthe collected
probe_idxcolumn.This replaces N virtual
GrowableRecordBatch::extend(_, _, 1)calls with one vectorized arrow kernel call per build table.
Growable build path (preserved, used when below thresholds):
same algorithm as the original
probe_inner, but now consumes thematerialized match vector and preallocates the
GrowableRecordBatchand
probe_side_idxswithmatches_len.max(DEFAULT_GROWABLE_SIZE)capacity instead of the previous fixed
DEFAULT_GROWABLE_SIZE = 20.This removes a few reallocations on join-heavy queries even when
the take path is not selected.
build_final_tablehelper. Both paths produce(left_table, right_table)and share the same column-rearrangementstep (compute
common_join_keys/left_non_join_columns/right_non_join_columns, runget_columns_by_name, thenunion).Extracting this into a helper removes ~25 lines of duplication
between the two paths.
Related Issues
Closes #7076