Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions arrow-flight/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ arrow-array = { workspace = true }
arrow-buffer = { workspace = true }
# Cast is needed to work around https://github.com/apache/arrow-rs/issues/3389
arrow-cast = { workspace = true }
arrow-data = { workspace = true, optional = true }
arrow-data = { workspace = true }
Comment thread
Rich-T-kid marked this conversation as resolved.
arrow-ipc = { workspace = true }
arrow-ord = { workspace = true, optional = true }
arrow-row = { workspace = true, optional = true }
Expand Down Expand Up @@ -62,7 +62,7 @@ all-features = true

[features]
default = []
flight-sql = ["dep:arrow-arith", "dep:arrow-data", "dep:arrow-ord", "dep:arrow-row", "dep:arrow-select", "dep:arrow-string", "dep:once_cell", "dep:paste"]
flight-sql = ["dep:arrow-arith", "dep:arrow-ord", "dep:arrow-row", "dep:arrow-select", "dep:arrow-string", "dep:once_cell", "dep:paste"]
# TODO: Remove in the next release
flight-sql-experimental = ["flight-sql"]
tls-aws-lc= ["tonic/tls-aws-lc"]
Expand Down
14 changes: 14 additions & 0 deletions arrow-flight/src/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
use crate::{FlightData, trailers::LazyTrailers, utils::flight_data_to_arrow_batch};
use arrow_array::{ArrayRef, RecordBatch};
use arrow_buffer::Buffer;
use arrow_data::UnsafeFlag;
//use arrow_ipc::reader;
Comment thread
Rich-T-kid marked this conversation as resolved.
Outdated
use arrow_schema::{Schema, SchemaRef};
use bytes::Bytes;
use futures::{Stream, StreamExt, ready, stream::BoxStream};
Expand Down Expand Up @@ -228,6 +230,8 @@ pub struct FlightDataDecoder {
state: Option<FlightStreamState>,
/// Seen the end of the inner stream?
done: bool,
/// Skip validation of decoded arrays (UTF-8, offset bounds, null counts).
skip_validation: UnsafeFlag,
}

impl Debug for FlightDataDecoder {
Expand All @@ -236,6 +240,7 @@ impl Debug for FlightDataDecoder {
.field("response", &"<stream>")
.field("state", &self.state)
.field("done", &self.done)
.field("skip_validation", &self.skip_validation)
.finish()
}
}
Expand All @@ -250,9 +255,17 @@ impl FlightDataDecoder {
state: None,
response: response.boxed(),
done: false,
skip_validation: UnsafeFlag::new(),
}
}

/// # Safety
Comment thread
Rich-T-kid marked this conversation as resolved.
/// Invalid data may cause undefined behavior. Only use for trusted senders.
pub unsafe fn with_skip_validation(mut self) -> Self {
unsafe { self.skip_validation.set(true) };
self
}

/// Returns the current schema for this stream
pub fn schema(&self) -> Option<&SchemaRef> {
self.state.as_ref().map(|state| &state.schema)
Expand Down Expand Up @@ -323,6 +336,7 @@ impl FlightDataDecoder {
&data,
Arc::clone(&state.schema),
&state.dictionaries_by_field,
self.skip_validation.clone(),
)
.map_err(|e| {
FlightError::DecodeError(format!("Error decoding ipc RecordBatch: {e}"))
Expand Down
2 changes: 2 additions & 0 deletions arrow-flight/src/sql/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
//! A FlightSQL Client [`FlightSqlServiceClient`]
use arrow_buffer::Buffer;
use arrow_data::UnsafeFlag;
use arrow_ipc::MessageHeader;
use arrow_ipc::convert::fb_to_schema;
use arrow_ipc::reader::read_record_batch;
Expand Down Expand Up @@ -651,6 +652,7 @@ pub fn arrow_data_from_flight_data(
&dictionaries_by_field,
None,
&ipc_message.version(),
UnsafeFlag::new(),
)?;
Ok(ArrowFlightData::RecordBatch(record_batch))
}
Expand Down
17 changes: 15 additions & 2 deletions arrow-flight/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use std::sync::Arc;

use arrow_array::{ArrayRef, RecordBatch};
use arrow_buffer::Buffer;
use arrow_data::UnsafeFlag;
use arrow_ipc::convert::fb_to_schema;
use arrow_ipc::writer::CompressionContext;
use arrow_ipc::{reader, root_as_message, writer, writer::IpcWriteOptions};
Expand All @@ -45,7 +46,12 @@ pub fn flight_data_to_batches(flight_data: &[FlightData]) -> Result<Vec<RecordBa
let mut batches = vec![];
let dictionaries_by_id = HashMap::new();
for datum in flight_data[1..].iter() {
let batch = flight_data_to_arrow_batch(datum, schema.clone(), &dictionaries_by_id)?;
let batch = flight_data_to_arrow_batch(
datum,
schema.clone(),
&dictionaries_by_id,
UnsafeFlag::new(),
)?;
batches.push(batch);
}
Ok(batches)
Expand All @@ -56,6 +62,7 @@ pub fn flight_data_to_arrow_batch(
data: &FlightData,
schema: SchemaRef,
dictionaries_by_id: &HashMap<i64, ArrayRef>,
skip_validation: UnsafeFlag,
) -> Result<RecordBatch, ArrowError> {
// check that the data_header is a record batch message
let message = arrow_ipc::root_as_message(&data.data_header[..])
Expand All @@ -69,13 +76,19 @@ pub fn flight_data_to_arrow_batch(
)
})
.map(|batch| {
let buf = if data.data_body.as_ptr() as usize % 64 == 0 {
Buffer::from(data.data_body.clone())
} else {
Buffer::from(data.data_body.as_ref())
};
reader::read_record_batch(
&Buffer::from(data.data_body.as_ref()),
&buf,
batch,
schema,
dictionaries_by_id,
None,
&message.version(),
skip_validation,
)
})?
}
Expand Down
1 change: 1 addition & 0 deletions arrow-integration-testing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ logging = ["tracing-subscriber"]

[dependencies]
arrow = { path = "../arrow", default-features = false, features = ["test_utils", "ipc", "ipc_compression", "json", "ffi"] }
arrow-data = { workspace = true }
arrow-flight = { path = "../arrow-flight", default-features = false }
arrow-integration-test = { path = "../arrow-integration-test", default-features = false }
clap = { version = "4", default-features = false, features = ["std", "derive", "help", "error-context", "usage"] }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,9 +249,13 @@ async fn consume_flight_location(
let metadata = counter.to_string().into_bytes();
assert_eq!(metadata, data.app_metadata);

let actual_batch =
flight_data_to_arrow_batch(&data, actual_schema.clone(), &dictionaries_by_id)
.expect("Unable to convert flight data to Arrow batch");
let actual_batch = flight_data_to_arrow_batch(
&data,
actual_schema.clone(),
&dictionaries_by_id,
arrow_data::UnsafeFlag::new(),
)
.expect("Unable to convert flight data to Arrow batch");

assert_eq!(actual_schema, actual_batch.schema());
assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use arrow::{
ipc::{self, reader, writer},
record_batch::RecordBatch,
};
use arrow_data::UnsafeFlag;
use arrow_flight::{
Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightEndpoint, FlightInfo,
HandshakeRequest, HandshakeResponse, IpcMessage, PollInfo, PutResult, SchemaAsIpc,
Expand Down Expand Up @@ -335,6 +336,7 @@ async fn record_batch_from_message(
dictionaries_by_id,
None,
&message.version(),
UnsafeFlag::new(),
);

arrow_batch_result
Expand Down
12 changes: 7 additions & 5 deletions arrow-ipc/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ impl RecordBatchDecoder<'_> {
let null_buffer = self.next_buffer()?;

// read the arrays for each field
let mut struct_arrays = vec![];
let mut struct_arrays = Vec::with_capacity(struct_fields.len());
// TODO investigate whether just knowing the number of buffers could
// still work
for struct_field in struct_fields {
Expand Down Expand Up @@ -557,7 +557,7 @@ impl<'a> RecordBatchDecoder<'a> {

let schema = Arc::clone(&self.schema);
if let Some(projection) = self.projection {
let mut arrays = vec![];
let mut arrays = Vec::with_capacity(projection.len());
// project fields
for (idx, field) in schema.fields().iter().enumerate() {
// A projected field can appear more than once, so collect all matching positions.
Expand Down Expand Up @@ -597,7 +597,7 @@ impl<'a> RecordBatchDecoder<'a> {
RecordBatch::try_new_with_options(schema, columns, &options)
}
} else {
let mut children = vec![];
let mut children = Vec::with_capacity(schema.fields().len());
// keep track of index as lists require more than one node
for field in schema.fields() {
let child = self.create_array(field, &mut variadic_counts)?;
Expand Down Expand Up @@ -771,11 +771,13 @@ pub fn read_record_batch(
dictionaries_by_id: &HashMap<i64, ArrayRef>,
projection: Option<&[usize]>,
metadata: &MetadataVersion,
skip_validation: UnsafeFlag,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is a public API it means we can't release this until the next major (breaking) API release

https://docs.rs/arrow-ipc/latest/arrow_ipc/reader/fn.read_record_batch.html

do we really need to make this change? Perhaps we should just direct people to use RecordBatchDecoder directly if they want more control over the decoding process rather than changing this signature.

However, not much of that strucutre seems to be public at the moment: https://docs.rs/arrow-ipc/latest/arrow_ipc/reader/struct.RecordBatchDecoder.html

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So maybe we could (as a separate PR) deprecate read_record_batch and make all the corresponding methods on RecordBatchDecoder pub 🤔

Then this PR would simply add the with_disable_validation 🤔

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it makes sense to make RecordBatchDecoder public. flightdata_to_record_batch/read_record_batch is a wrapper around the RecordBatchDecoder.read_record_batch() function.

to avoid making this a breaking PR change I can just call RecordBatchDecoder.read_record_batch() directly in flightDataDecoder::extract_message().

) -> Result<RecordBatch, ArrowError> {
RecordBatchDecoder::try_new(buf, batch, schema, dictionaries_by_id, metadata)?
let decoder = RecordBatchDecoder::try_new(buf, batch, schema, dictionaries_by_id, metadata)?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we can probably make this more concise by not adding a let decoder = ...

.with_projection(projection)
.with_require_alignment(false)
.read_record_batch()
.with_skip_validation(skip_validation);
decoder.read_record_batch()
}

/// Read the dictionary from the buffer and provided metadata,
Expand Down
Loading