[10125] arrow-flight decode path optimizations#10206
Conversation
e152490 to
080628d
Compare
|
benchmarks locally look very promising |
|
@Jefffrey (sorry for the ping) could you run the arrow-flight benchmarks command on this when you get a chance? |
9f81b71 to
65c9b89
Compare
65c9b89 to
316b8bd
Compare
|
run benchmark flight |
|
🤖 Arrow criterion benchmark running (GKE) | trigger CPU Details (lscpu)Comparing rich-T-kid/arrow-flight-decode-opt-impl (316b8bd) to d2f1611 (merge-base) diff File an issue against this benchmark runner |
|
🤖 Arrow criterion benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagebase (merge-base)
branch
File an issue against this benchmark runner |
|
Nice, decode path shows a large performance boost. The more data is being passed around, the larger the performance gains. This is difficult because of the lack of control we have over the byte offset that is handed to us by tonic. looking at it seems that the math for checking if the buffer is aligned just comes down to From my understanding arrow-ipc adds padding within the IPC body to account for alignment. The issue occurs when the initial buffer returned by Tonic starts at a misaligned byte offset. For example, if the buffer starts at byte offset 253, it isn't aligned to a 32 or 64-byte boundary. Because Arrow IPC's internal padding is relative to the start of the buffer, a misaligned starting offset means all the sub-buffers inside it are also misaligned, the padding does nothing to fix this. The result is that every sub-buffer has to be copied into a fresh, correctly aligned allocation anyway. This would avoid N buffer copies in exchange for 1 large buffer copy at the start. for future readers who are wondering why alignment is important: |
| .map_err(|e| { | ||
| FlightError::DecodeError(format!("Error decoding ipc RecordBatch: {e}")) | ||
| })?; | ||
| let data_buffer = if data.data_body.as_ptr() as usize % 64 != 0 { |
|
|
|
@gabotechs this PR is ready for review 🚀. |
gabotechs
left a comment
There was a problem hiding this comment.
Cool! just have on comment, but otherwise LGTM
| &Buffer::from(data.data_body.as_ref()), | ||
| &Buffer::from(data.data_body.clone()), |
There was a problem hiding this comment.
Nice! I this is a sneaky one, but indeed this is avoiding a full clone
| /// Only set for trusted senders, invalid data may cause undefined behavior. | ||
| /// Can improve performance by skipping validation | ||
| pub fn with_skip_validation(mut self, skip_validation: bool) -> Self { | ||
| self.skip_validation = skip_validation; | ||
| self | ||
| } |
There was a problem hiding this comment.
Rather than exposing this as a plan bool flag, I think we should be requiring an UnsafeFlag here.
By requiring an UnsafeFlag, we force consumers to explicitly have an unsafe block in their codebase, making sure they are aware that what they are doing is not safe, and that they are responsible for ensuring memory safety there.
There was a problem hiding this comment.
Makes sense to me! pushed up a revision
|
@alamb could you take a look when you get a chance 🚀 We may also want to run the benchmarks again to see how much the buffer re-alignment impacted performance. |
b13b3da to
6379781
Compare
6379781 to
a2c436b
Compare
|
run benchmarks flight |
|
run benchmarks ipc_reader |
|
🤖 Arrow criterion benchmark running (GKE) | trigger CPU Details (lscpu)Comparing rich-T-kid/arrow-flight-decode-opt-impl (a2c436b) to d2f1611 (merge-base) diff File an issue against this benchmark runner |
|
🤖 Arrow criterion benchmark running (GKE) | trigger CPU Details (lscpu)Comparing rich-T-kid/arrow-flight-decode-opt-impl (a2c436b) to d2f1611 (merge-base) diff File an issue against this benchmark runner |
|
🤖 Arrow criterion benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagebase (merge-base)
branch
File an issue against this benchmark runner |
|
🤖 Arrow criterion benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagebase (merge-base)
branch
File an issue against this benchmark runner |
|
🤔 The pipelines seem to still be showing this issue: |
|
that error has been present since the first commit. the first commit only pre-allocated vectors, propagated the flag down to arrow-ipc and changed |
bf08213 to
ef64f12
Compare
ef64f12 to
4053b0a
Compare
|
The alignment issue was coming from the |
|
With f2fcc0c, buffers are only copied when they're actually misaligned. Out of ~18 million total calls during benchmarking, only ~46k required reallocation, meaning 99.75% of the time we skip the copy entirely, while still correctly handling the cases where alignment is off. |
|
This should be good to go ( for real this time 😅). we may want to run the benchmarks again |
alamb
left a comment
There was a problem hiding this comment.
Thanks @Rich-T-kid -- the code looks good to me (I had only some minor nits)
The biggest comment is baout the breaking API change -- I had a suggestion. let me know what you think
| skip_validation: UnsafeFlag, | ||
| ) -> Result<RecordBatch, ArrowError> { | ||
| RecordBatchDecoder::try_new(buf, batch, schema, dictionaries_by_id, metadata)? | ||
| let decoder = RecordBatchDecoder::try_new(buf, batch, schema, dictionaries_by_id, metadata)? |
There was a problem hiding this comment.
nit: we can probably make this more concise by not adding a let decoder = ...
| dictionaries_by_id: &HashMap<i64, ArrayRef>, | ||
| projection: Option<&[usize]>, | ||
| metadata: &MetadataVersion, | ||
| skip_validation: UnsafeFlag, |
There was a problem hiding this comment.
Since this is a public API it means we can't release this until the next major (breaking) API release
https://docs.rs/arrow-ipc/latest/arrow_ipc/reader/fn.read_record_batch.html
do we really need to make this change? Perhaps we should just direct people to use RecordBatchDecoder directly if they want more control over the decoding process rather than changing this signature.
However, not much of that strucutre seems to be public at the moment: https://docs.rs/arrow-ipc/latest/arrow_ipc/reader/struct.RecordBatchDecoder.html
There was a problem hiding this comment.
So maybe we could (as a separate PR) deprecate read_record_batch and make all the corresponding methods on RecordBatchDecoder pub 🤔
Then this PR would simply add the with_disable_validation 🤔
There was a problem hiding this comment.
I think it makes sense to make RecordBatchDecoder public. flightdata_to_record_batch/read_record_batch is a wrapper around the RecordBatchDecoder.read_record_batch() function.
to avoid making this a breaking PR change I can just call RecordBatchDecoder.read_record_batch() directly in flightDataDecoder::extract_message().
76cc53b to
2d22728
Compare
|
@alamb pushed up a revision. |

Which issue does this PR close?
Rationale for this change
see #10137 & #10125
What changes are included in this PR?
from_slice_ref()which would allocate another buffer and then copy bytes into it. instead replaced with.clone()which just is O(1)Are these changes tested?
N/A
Are there any user-facing changes?
Users now have the option to skip data validation