From 080628dc76f8087fbec6657a1feea7e0c588d597 Mon Sep 17 00:00:00 2001 From: Richard Date: Tue, 23 Jun 2026 15:33:48 -0400 Subject: [PATCH 01/10] allow arrow-flight users to skip validation in arrow-ipc decoding --- arrow-flight/src/decode.rs | 12 +++++++++++ arrow-flight/src/sql/client.rs | 1 + arrow-flight/src/utils.rs | 6 ++++-- .../integration_test.rs | 2 +- .../integration_test.rs | 1 + arrow-ipc/profile.json.gz | Bin 0 -> 17355 bytes arrow-ipc/src/reader.rs | 19 ++++++++++++------ 7 files changed, 32 insertions(+), 9 deletions(-) create mode 100644 arrow-ipc/profile.json.gz diff --git a/arrow-flight/src/decode.rs b/arrow-flight/src/decode.rs index 8c518ac9d454..93f66af9baba 100644 --- a/arrow-flight/src/decode.rs +++ b/arrow-flight/src/decode.rs @@ -228,6 +228,8 @@ pub struct FlightDataDecoder { state: Option, /// Seen the end of the inner stream? done: bool, + /// Skip validation of decoded arrays (UTF-8, offset bounds, null counts). + skip_validation: bool, } impl Debug for FlightDataDecoder { @@ -236,6 +238,7 @@ impl Debug for FlightDataDecoder { .field("response", &"") .field("state", &self.state) .field("done", &self.done) + .field("skip_validation", &self.skip_validation) .finish() } } @@ -250,9 +253,17 @@ impl FlightDataDecoder { state: None, response: response.boxed(), done: false, + skip_validation: false, } } + /// Only set for trusted senders, invalid data may cause undefined behavior. + /// Can improve performance by skipping validation + pub fn with_skip_validation(mut self, skip_validation: bool) -> Self { + self.skip_validation = skip_validation; + self + } + /// Returns the current schema for this stream pub fn schema(&self) -> Option<&SchemaRef> { self.state.as_ref().map(|state| &state.schema) @@ -323,6 +334,7 @@ impl FlightDataDecoder { &data, Arc::clone(&state.schema), &state.dictionaries_by_field, + self.skip_validation, ) .map_err(|e| { FlightError::DecodeError(format!("Error decoding ipc RecordBatch: {e}")) diff --git a/arrow-flight/src/sql/client.rs b/arrow-flight/src/sql/client.rs index 5476d4ede9a4..951b3457fd22 100644 --- a/arrow-flight/src/sql/client.rs +++ b/arrow-flight/src/sql/client.rs @@ -651,6 +651,7 @@ pub fn arrow_data_from_flight_data( &dictionaries_by_field, None, &ipc_message.version(), + false, )?; Ok(ArrowFlightData::RecordBatch(record_batch)) } diff --git a/arrow-flight/src/utils.rs b/arrow-flight/src/utils.rs index 6effb5f86aaf..e0852ae51638 100644 --- a/arrow-flight/src/utils.rs +++ b/arrow-flight/src/utils.rs @@ -45,7 +45,7 @@ pub fn flight_data_to_batches(flight_data: &[FlightData]) -> Result, + skip_validation: bool, ) -> Result { // check that the data_header is a record batch message let message = arrow_ipc::root_as_message(&data.data_header[..]) @@ -70,12 +71,13 @@ pub fn flight_data_to_arrow_batch( }) .map(|batch| { reader::read_record_batch( - &Buffer::from(data.data_body.as_ref()), + &Buffer::from(data.data_body.clone()), batch, schema, dictionaries_by_id, None, &message.version(), + skip_validation, ) })? } diff --git a/arrow-integration-testing/src/flight_client_scenarios/integration_test.rs b/arrow-integration-testing/src/flight_client_scenarios/integration_test.rs index 05ca5627ecd8..d35b28cfa5e5 100644 --- a/arrow-integration-testing/src/flight_client_scenarios/integration_test.rs +++ b/arrow-integration-testing/src/flight_client_scenarios/integration_test.rs @@ -250,7 +250,7 @@ async fn consume_flight_location( assert_eq!(metadata, data.app_metadata); let actual_batch = - flight_data_to_arrow_batch(&data, actual_schema.clone(), &dictionaries_by_id) + flight_data_to_arrow_batch(&data, actual_schema.clone(), &dictionaries_by_id, false) .expect("Unable to convert flight data to Arrow batch"); assert_eq!(actual_schema, actual_batch.schema()); diff --git a/arrow-integration-testing/src/flight_server_scenarios/integration_test.rs b/arrow-integration-testing/src/flight_server_scenarios/integration_test.rs index ae316886381a..8769b50e77ce 100644 --- a/arrow-integration-testing/src/flight_server_scenarios/integration_test.rs +++ b/arrow-integration-testing/src/flight_server_scenarios/integration_test.rs @@ -335,6 +335,7 @@ async fn record_batch_from_message( dictionaries_by_id, None, &message.version(), + false, ); arrow_batch_result diff --git a/arrow-ipc/profile.json.gz b/arrow-ipc/profile.json.gz new file mode 100644 index 0000000000000000000000000000000000000000..1a68528f9054bd41a341d41c426f4eebce3bfc09 GIT binary patch literal 17355 zcmaI-bC4&&(gq5TZQJ&5Y}?-Pj&0jJHg?P%+qP}nwmo;wIqw(o-G6R(baqu#WyMp~ z+0~tuPZ35!K>_{e__lYnGqtcbVX$>N!#Av2gHdk)yfV|f@2A@jA_tw;$v~}f>l4_!h_2OiXRP%_AiVFKV zUyd&(?UyI@KAe`^-1?t6%ExzIpZ3=MrmEUG`Y#4A7~QHv7o$u+Cl@CzpA4TqEnhsu z>(|}xEAJ{d+`jW&CcC$7)jm90+S*=EYsG0RrA?NGk8Ryzj zc3YbWe&5#u(|X?T_>&Bn``6{WZ}08P^_T%`{@tHfzggQmx^UG6Tz}PLdZU;>xw3WW zZa(y1Ue>&vpP71iuzd*h!@y7Lwd{L+zBK1-BQzE^muC#Nf7w16F1eLo+nKVqw_a{{ zWpt)?p1yT^-xH!JTm%D>uO`>acdwUI++2eDWBVUBSM3)+qwj00{e_wxBRx{FCZ*NR}iPf*S+qyhAzlErnj%wy(&L`rL&tE(pbKGZ?%LsbahD@wq zZTyLEPSO`9$2GxKh7{NZ;J*(0=Uy!Fuy} z>9@J@dH9x*r~O!+!f|a@Ignxd*kAKj3UufMbj)#o%{%V7c;)n9GhFWzj<0v(qazZp##O%F*-xx8HRF!WBxBj;19r+yCQ zI9T{GPXZbSh7dFK>{&RcpHXos=a>yyh47S@;h6+J@qAxAtMl8e#Gt&d^7>qqqQ77K zxO*!veK=ZZ+LCkf(qqw9@l9T(@8lKWnJyzKE-tQSzSnfdAM9H}4yKzZ>4F+o;Y%=vCkiX>ZTsRPG$b29FsLQ-v9Ve@f zfZVTK}Fn@+h`Ykqqzk5qd$eK|FK z?6-NZQ@b|i^7$%5j#;mke@4$_Ls^RR`$nRXr7Zch++Ts<##H)yKaZmJ&z-B;$PCd7Rh(?kiRS)V+O+g=Y)Is>U!=7cm2JN65JXQHen3(E zCJ3O$c3Jef!FYwDh_;y34-c#u2*=08)$)9n2(B1S{&%Xols^b@h0Uh<7zoCF@xxMI?q6gn>gxUgCE`^+ zVd8xHwFVMZFX2~u*cW;35nkMPy7wV0ZHkyJ-^{dI+BXe4ark(5^s2u~(@99xE|v{8 zs-8^mFS!$0pQ?AdDz=bxFQd*CeYkj9nwfp3xdt-w(;>*X=6`r!-l@D5HH36tGJMc! zp^Z)cPTBH_A#wfXib>OrnOVs*=#P{*sy{`ac8$H2#OZZ0nOcrbHupF*#nxvRfU zq(*p48o;r*_)vX4sxUFl7?Y>*Qb{mRNhr7?kHwmR{;PI$spi()q1pKg)Zn3EBw+u9 z>4n0}v)BaWtp4zkEY(tT;#pSwXMfeAWOx-`Q4IC(c*HiMoKU z-g4sM;vox<576y#bc1^O&bMZ0dt>k+YgTt%U$voksoAlf{4|RdU2)azp4x^f`j4g6 z1ZP0@!X!o4&lFE#K^~M&%Dvciz8ChiudZS|faGN5)MsYpWOdK|^3+Sf8b2Lu%`96T zha_1>{X-Z);7qKx@HGsHH2I#N0kpQffWdGzw#TDu*Gn{XpJGOX=R}?UuZtJ`{ugF7 zYk@nCo(*ci&wJ?KyR_>$=fhjX*Lx@67VB3Rb7;h~4w7@If_S_-)>?ko+9CUTp=#to zjwvO*derrU5~ynuKFsC&)X^gp^#q|+r!d#dB4`hh0^6}TnkM9S{Vn`+P<^j+N4@E!5 zwF~w-HeIgi?CHVRJ(9CkiwRd`Sf>Rfa!vuHiEOHm73c2P-D82@zTa=QK@NNxUENg- zeiv6Ea=Lou^>*+I-RWKO8UTySE1n-CWn5mz0Gej3>)<`MCeSO>%S`zN$QdioT)hz9 zfATmf?TZCQS~$uuc-%J;*zkpi9d&VgB<|Vxg9w}8&q*|LS2q0Tvw%&XW7kFpby9D zwfOw6M&OUijNkAz;_@L9Q?m88wHolvR6{td=!VIPejcOY1@3X8W2R?hgJiT)NeQV1 zl9KXgT;SxB;q?iW(?X+0wS5PgB9aA)M0wm3K?5_~#lo|^`3@hK2~_PA#TVJA7F2^- zq8h>^@GjEvf^*>^;jHPwLj0RM>q@bj+eab253ow{mqerD?(i3X_WR3(f zTPQImMc5OY0QQntSWq#MO$^Np8h+;+l#r9V;XtA;B&CWB_|4j*z|lzgs7MbSD=XVn zCUN^g#F5*>yqGj1R%HG&bln%`qV(stl2;WaQ-Y4kV=@DxH#uIDWR)>uGoToMq&-do zE=j^?VW!JPlLN$y?OG4u&{5oBaZySW(8n+Mw(R(kMIf!y0|n7KJgVN1qtp}Yw!e1r#;qan zncQf(1iUFQpgvV;D~7LDf&QG}jey(XQjqw=xNBo;wv2Af70D6ewf?9p#fxjs@jU4!u(NH#p1a zlP7$)8Acs&xRdvlC8k7BKYTR56KFUZKr;%0foy0t27K+~T}^CNu?gU@Fz7WjuT@0z zYvTXO;X@J4A628_~4k$$jO zRWnqF*6-5eL3Y7Af4tHmy~I#FXXo@5oV?s?65VVT{X--A*2H+f&s|8_9Cu^Y?rOiH z9Z3sLipnG*9|NNh(H9l27!c2WjORo)+Y{aqlO+LL9g@r>L&0*Rpe_g?@`a-2LIVx=MfuNebB3&>FtuY?ul4M1=28)9URlrK>N-$P_8g?yLtyrLU&)@J$|9!y= zQdeSEC~gfc`+UiZ@D-K-HwU-@ zAR{2yT&N;r(}Kt zVE5T7>~dT}bP4sBi+_FJIGw-EyzLCM`RMo@RCw~1_;v-ILEtxW+Rg5$ z|Ey$zXYYW}N5q?$sz$wm(CrAbj%3&X&jNG>bm7t{xE!b6>TVxVn;-#7Y>8MOGdtvb z5GxzjkJabHjn(VDt$mOhOy2jIxWlrg0)xkx~@ znp<8^Ju}OsxuG+6B0CxUrMkSenrcaRI+_TnwjO{{imf6~Er6WGOLka!0EbUD)WN!;gIk>wV8=6@*zAm*xlD89G z2q)2js^f8|vuz)uy(JZPb6kxGh`BVk{ZqdeA;vknT9&gdMHSPxTM!&Yoe4Ip9GPU? zgZ+T*azRtV8G#|Sc*P`q@oGDEZpHm<@YHn}w^Kks8qNZ}-v;*8+pDyrOP!=&d?y)> zYBHImT0+)hWJ*Io-LFy)Wzu5qSSq7qbjsHOOx9Zi8FEPOYMdFcZv4B;~25Mcbgpu zf%l!NvGrgm-f;0oiaNuz2)6cUkQZ&roBEV;*{3{+hWm70#{G(Y5?$mivK^1=R&)vW zRz3aP3e)cN)JTWQ^{aUs!`oC{-j@wXd^M*x~e~|D1P? zgzR<>HX-pg!&Kh*VzZ3rb=Wz;6itsB9L=D%CU&Hr(Q6V zT%n!2xzDz#zD4$M8#NtCpq1*_%-eXDD||RP+rmQ&R>pkIj6QV=02VE0BWdaFwM}BSIPisSCWU#Q9le)7AZcU zig_>Y*6NgPUz0S06auL!Vve-ypuF}BU*%w6qeFerbmNeQJF$1yrL$bajZg|Ke5_d{ z2S{NS8sxEx)p5^M#+&Q7-13mO$vN+kw3>GeV`peMWQsMS*O7jT=hZMxo1tO!0Lb^( zx%V57Q-JGkB>sU^dg}8!BvBd9Pj(C>_p~(byJKcp`kcgNr5ms791O$VI`L-MaFu0L zrq*|_?kpCcY@E)k<<5HVm=rl4#M~EcM7(DwzwEE88vfjIGu#P?knf?hBC)R1yUV+z z0cZ=*Q%G|ji;pJ1GnF5kQOTz2W}QE5>k23uh2-iZl6O%ecMb7u?rygB zy#H~p_qEuvN9?p{^5!ptf!$)!+E)0Y56D&dgy$9VrImU1r1W-`A$aOHt5c0?3jcl~ zOum&m`sNDyelA)xbY6ZVO{brek^-i=?l*<4zC%}C%4>@!AImU>(MhQmf*6}W3gc@E zDO)Z{Gi|DXeJgMPu7%6}+-iiaICi!J52FlNgn#R{f#>aO;+-(Mr>~lfRKjv$kOb8K z!$?tBi1-yln!J6zD6z&8#1$p7p$uy?l88tW6#C3;JW~Ch3~#+5DzRkCjd46kwp`7I zbTJo0^L!Sb<#b8FHh~Jci7z)mdc5_{j3uC;@r#{ufeIGe{{PL*`Xj z);ri3z0xi6s9m>tlf;SEeV|+^)Jx-U>(wN!rwF20STLT8SD<~q5$xsuO%q+L2fKdl zgAwg3xjidx@6_fB`aL`X`((Ze<47;nzUFZbW0D)l=sDTgS~z)1U?o8B9y^!V+At+2 z{}`#O2AJYU_L?8nDM9!WX_6fyo5;fI@9gwLIZKK#AlV~zB&XSY?V;^iB$zn5qu`SW zj7V{l1m2p24ykw!5@CSgt-~`uA?5CL>tT(^(XtFyiI$PT%J~#gYE^swxe zt#3=m>-^a!8=mf0aIg*A=)r9TNKj{pqcX}~JAa>o^L|AiA}0(!Z>}`}A}OS&{omZSGIlyWwF8^shFc?PLq_OVK#10v)Zs{5(~_OG#4WHaXs;9@ zJX+(i;0>5=Ce#^Z)ZR!N5rH=oWIEAjFM4dnIqWTh9!o&KM4lhv68Nr{>}Q9B)6=lN zE)x}@=5oA?k7`ode(8C=o-tQ;n`x}B^{Ys2l|(eZAKcQP1zU+F7h_$CrN#Vk0tMaf zWu7!YvL$kFUY9}Ab5DD&`w}K^SF)-fb@#J3$3Ezfc#Y+KTQt8Y6-D%A50@{G5-@*exgl`GyQS*N(LIf=u2w+Q(OcVC-mSVYaSm% zd>*5<5a8Y<^_;u|ZSA}78K2lSIgZJG1Daj0GrrDG%fy=#Y`^hQm-QT`p9S#F z@g0TfAU}@dabN!Nok)D{7oTHrtgVh9*rJAOEGi!T8JaA@XHepvsAk=T~!+ksiTQO)+m9KmWV%{FK7$e_wSFVsR4x-=01+_7<-Pf;2%S zI>j`OFv;UyW1A>r(m&&VwJSVBWtk+>3{sm8(mOj>UaSB3`d9!?Kg3Bdxwq+OMP8o&V> z!(&xTN=8Emm79etPvfmGQHdZ24jknJRXMYBl_tnXFRq{{jbZnMf|?q~a_%DmV@+YU z$_tBIn0T=%)ANI*PfB#?o3n(o@V-GQ=J*p}BbXIYj$<;Q9KDxra{(WjDGcFTL^ldt zK}0Tc5M>$Rm0^e-6mtT;GMZon5m-+)5Lba{Scs%Zw-6CVYOPmLg$pSJN#G>!5~(HB zC$^8aIts5ikig{xm$yV)n(GZTR3DvaZ3@Q;@ zf};zgE0LO|ql$xOMdeXlL~Z&fE!_)#j)Jd&M}LGvwT4cH3QGs2L7%2JYk;6>`C-jV z6kLX-MLtWC>TwqqUiUlqL%R z0qF6O{-?9UR~=%?oqM!p28rr8rZRZ?tC+nlNid#DKv7VBSU?ai;Q0H8|Ab4`Gu+ z2xYxTx!l&sJPH*{QQ+1T$tb`O)I9Q+UwF%4G+M~w;3TLXqqy^ks@ir~^+qNaIG38hCsrL} zv?SqE;XN_^!FPt7-O_yY5)fCOmzZL( z7H!~UqH#Eb2@sN3udx-lFuAYF#TP%Cd+AP7e+J%SABJzS2!()n|yj7rHqpnv<2F9RC(vlUqCmur%Hj|w(#ktXK>Ym`DIjCVEQw^Mlm~$TBd;rXQZ-I&lIXrPo?8ALz@^Y9S=ex@ zYDAbv$|h6$ zL3x#ku2~3Vtu&OO?$ED|dgK)k6nivF6i12w$w?G}8g2ZhN2)ptSN#_d5?>SsicdWR zQDN+YCv<}pe*?zW^Ik&_PANZ>CsO7dVpAdp@-UGeDTY)CVw;ad@6VFExdT@wG|XZv zJHx=A7qSKS$I+W#cn2y#00%6aNtBOC1MmBPc{H$WcCqhS$s>oZ|174}Km%+1+SYyU zR*Go?@T^lFnRv-#G>N<}`+jCxjWj@4x1&$7z6Tm}1jN zjO+8peLlvifR^HLg#YVKwM>N8#e|axrok3BfF`{oD=9&Pr~gD|E)ZOPnV9Q_WbAV& z+Lu-q2zp9@bgUDspN?@5gpO4~DilDLAXN}&o@CtV}4$IJ%~uj*L7_@;`$df(B$@d{A&3IJuG$s*fYY{gD1i2G}9& zvXYv4%pn0`Nu;dC1(~I5#$<6Ys^uV$G!3o6Xw@|FxxZ9LnKT5Qgqa{pV2IHx%6b(g z%HmZG;zpRHk(}i%Wi5qkDoe0Sxk|;#LSV;dfD%3ZAF~(!EF>>P=c0$CtKwH7VqYQF z=qHM=L#|=TfBS&d5^~M(o)(p`sxJyYTB2U;W8d^%v&>^F#GQC@cAa zLy4nLB9P+g03@A4P{JF)%F3QV?lG(-AXqRT9tsM%QR9Rk;ri|TxAgRh0;*8ktm=@U z5r82sfB83(BBAv`6rzian<`qVuyxokMhVG?Qvf4?GqLoQgNVyBJ1~>URjFUfQv8tx z#a>v6ILMGLf&j%yBWZOcnU`e}6oR#YaA`)WI8v7*oT(Ex{~FQvY!E(T(ib_Wndu;T z1N08)6O7_%t-yhDXa?uu&Bm4+V2Eq3+*>~Bye31PdYNJ2y#Ox5)E$jU5AYUDlyX)ZzGh|nVc1#RIJ#D?M{ z_V9nj;1(edr6%oR$As204xQrdIUqD=#fF;rKCnJe>}5fLqD)1JlT6jgO$$lwP6Pb_ z)n(1ut#PMwly(2cu4Bi4=geP4o{U-FX@UQ*;6G<~pPI~*tDL=kg$f`A`!Yv**=YdJ zNeRRE;&Ggu9%GB`#7Djgvmdsx-+=h9h_wxhg(tdoFCVU!Xz)V2jQJf41N1 z2n9IML7o71Ai1L)`ttUL$Tu1#1&cxA{Lh^9|4O_(`}Z&=1Mh5u;tXrz5!(m%{BM>2 z|37h0tf+vB>(N8d{=^o7ne_J0c>3e#1CMKhO3plXSsbVK3*w=@deCyF``gZ1Kt+j_ z{c|9?w;l_~ZFzvAR7?cXREe)_T9DpBAY4 zFopPY+WYl;&J%0X@uKafMS~arW!E}&$Ccq{jdp2gFNGHh8Kf2Amt*c^s7N$n4&cV^ zywwHoW%m5$`OL)~?*-W{w#DoJtmJneyZI~!Kh5pE19*2ZI&W2<1A5HG!T*`v|7`D` zU1NBMGa^pn;C*AnE%*g_Pa?wa|MZ7#ZoD=j!Uk@Ov_F|DY|R+IxY}{5y1TvjypYZR zkA#(D#@O{4V;Xr?g`4vem)K^o@{dU`GWGQQiA#trAQY5zoU6O3r1=r2z{5gew?2V z-uQlO8HVPrH*dDD8{BShK0b?-e>^z?yZ|D~_I$h8lUr4`Z@W)V-A%*GTfX;io11$_ z#pezrtp|w-Y(ouIPw+*rSC=oki}I`(WC7_oFYD8GfXl&}i!I&FN%JP0uI`RbT=jM~ z^-26wyUh>F6&*{S=!a(YIC;5Qo#cALlfJ1o;_bBp{LiPCX3N!XOV0rKIvy2S z)>Hughep(8#gq-BwsG#4+WE3nA+O3b2;x>QJ0aOYIWXdU9i@6dP>V!KR?bq!brcU) zNs(m*ePBt#N#mOx%l!;!A9Ri4sFx z2cO_hJOz*9Ow5Ie;zX>Jx=5yT{$yDP0PE@P2HawFMTa-X*~5<3$Em{BI>g<;NLAb* z3IQRZ0bzbRyb2)CZMIY1fr-){td1xZb9xBDWeEb929c;q0P3`;@nBZE7)q{5Bp`6c z|H6PkUu>jjqe=2_h{TMQKS23)yRfH1%G(j%b=b1d5TI#&N%F{s?HFwXLq~u${7^I^ zFkvXUj-Q6eB2+l3D@sruCmJ20s286Luk84t0i?9LoTDZ*`5`)16ck;{`!e7T&!4w$_mCAGS!sJIm#2 zI#R(ALLxH)+ATQfb0HHtLwX@I7p2`Tkj&j2-=sE9F$0LkC}|#5IB)kz z&AE~+m|fO-i*C<6o#z`|@CssW{^b5S&rd=YO70I+Mm|xL(4KoKLL0`F1+*qRL~B>kZ_aZaHk z=BY6r%cepvL<2EFmkyf_5wsplvJC1=Vkrm~<)RqlQXGppi2DGjloIb{V#Tj05AZUf;^2in*%Ea-&zr!KgvBpT7~h8qcS~)m_ze1YhVP3f>41ny9uM5x}isUYq=aDn6g_ zkxYOUdL#vT0d5`w@*UaJG&ieyQ) z;?D{QfD^ia+?$1zosylG(r|o0Jq-VF2bXaGApmFEK0Y+an*;)Us5y)U3RQxa)`4~J zWI+FM#|ilom>9&taF1+Y<#`lp-Q?qMmjxZgcTlp+7fp0EXNPL_aq0Lv_$P8izJDkE90ld#3$1s ziw7^NAH)Jbt-k-K)y6iH%&P62L(VUo-?E7SJvNuI{yB^z5n1^ zAYfbqfvfJlje1%w{Dq*18j!P(aACLKv7VWLLykV#?0PmU{x$v zS9DDbH&@0S(t3>;6XI^+q=>##!UA|ca z(v2!LjVN|;NxWh5?oFj)gWxPtLz5$fScFQ02KcIZT0VLN0Ts83bKZ&cjsP-Ai9|%) z@@z`Vt0>*uOg0XCIlhXxYUTcLxa>Gs{8@94l(n0XieK38jqW!7 zT1+-9VWw3+|A5T%3YEHC?+Fl(!mS;ajYLRXWE=J`Hat~r+DmXC`0Lp}j4K3UI~_O) zV@4hig&WomHcMlYGI&a9A8wK*Sgz!rIBNVJTtNi|Ola7mjG#CetaipBKadF8Fh&Gj z{z6%@tY(z#L?l_9{yF4EvvISKIhICL-9SZZLed_qr0=r@MUC_;iAW3aZ6=cvWtxnL z3|W|xL#{2sKTh!97TBzV(~mP+8&@EhkV*zEH75ZgA6ga*$@2$_VpWpa`Jvlp$08&q z3*gj^IjTY;l}09{gE#Es=WID+bB`sqB}|jIiWS@--fpM3IsZ3Fa3)=dlA|{-q8ceQ zuep>RW`)kMQH}S)3njXcih6)_V6M9N;g(}(O}}GyS|namnv6QUpyx>`ojzC!-jLOr zWQLH-0zHFRe*ZTG{EMEr_Gl4SbQeBJU7}|Ni5YrI`3c-QbW9u+w{Q0rPT?xIbwevJ zT4jK?+Jf6$IuTHQQyc}k9D58omj%;YFc^phT8;|wB$9xEUA&U>6jhK}*jzl?F#3uM zkz;>TJ+&i8`}o&3|C%p_QpcJXh^Bs><#WIZ{CaF6_=+=8nTRN(F&r|+UQ<*)T|fJS zgHSVYyQK&>QB158tg*j#Mme=>YaHhd9pTV?2rjC%oKwAu0e<0X`3>$KpQsPvn&v^gv572+k(1)E3Wb;#+A+{YJ62-j9bQN zcEC+i5qB-x2uk9vhLVkmxcVYyO;+ziy7)V`Di+N-(2kNA^@G#DBC#j?y!05X^>WBgYVdM2)XozR zWHndw8njQZtU%EN!LS5Ip zw!%kfwjZi$CQi)$&`mo0n%lT*69=c6HkPqhNuufN;kSl(uRVEv?(Z)*Z;loxAMfto z-O3TF;2UFxdMw22e{q!W?j3DD?tLXDS%4{z;r;3J%GOS*HK88wdTdI{1CBiYi?MHB zkT^5MrV(RAP5@zRXQD`OKCW+Jtp|}~T61B1M&)(hdEbPB=Qy|IF?;x6jLy2tm-4#c zveyRmE9HJ2Ccy6j7syHPj#KnF@5%M|Rj7drpN*Y5R@T6tGZEz$R zHo}xc%A|>4DPu|GxGN2NyKr@_LEuNnvQGGcDFV3nTJMa6bL z%u%FN%r7a;*_vNcD$MnyET{w_6KKOqIW9jbYH4-%AYILV|A+lv--^Y2%nE|6zG)jQ zayQq!nWE#Utw^}ArgtM`l+lKG#vjP)wGdMCSvTJV`(`r6jd7BqYnywO;$%C2og$*q zaLO1;N16)Al-}!mKhWKQC`%I7-d;Fd5jS-pSPyBexe+HN4b7_*Teh;uJ$svJ+;#TZ zfmq@{c2J|6x@USaiEnKz6FJ4Pbicri7W7O{36YykVb3oeXO?@QnOHp9|7sYSZ{c!N1-VeqVK6EdTxr zHgKRz`jySa&WtqAl;WTTpr>RVl^)uTmT|iwvHHw@_1l#xDmUv zxWv%S7OxrE0Ky&Yh&o^K(yVA50h{QeEk%}(yf+cij})<-Z$uLY?fCOY*1U2RX9^&x zX2jyo0B_TPTPo&Wt!s@Y*p^3f`T6AIwYW)dr*6k?-tO7Ik(9tX+t)_SI?!M{vy}C? z$plT|(*WUo?lOxs;yE8egW+hYy?W<(;JVw{=Vut)My}aJUFb0kFX7cFrY&@6kx}Sj zHm_xIY@^>fHe=s1z-H0N39Zzd0#DCJM-j#q*?c)|kcQKGrh>hO_s#&S+6`=9`anG(>MiJ<#-(-rqnz z$E>hTE|-uNih^4>V-gjeV}IhQS*@3;6~~~B3dy8no559ah~_3)=fY{5OR@jY%hnxF zq}%kyhamj~57S2=9Hx|z!m;s0WdG2?e^}Lq4#7afXq{;^%z{f*9@60K|3{_w@!_bb zYA>(VGMAy>N~tAJ3w=yni2rEya@*|AK3n#T93=Z^b(pi;X`U8V@`T!Xc$&vbw&QWv zG{V(jgN^w)qv3>S%rEDpZ=h9BZ+GWL}4SqSvj!ti?z-S*@kF`V30A{S!x+tWp!TK_r)M;krHd~gak00Kcbl|wLD9#=r9J$k#P z7~1NHSIOnH&v0X;C=C}wk~)%esc(-Jrzwe7em2FxoUFIa7X_LwMQSDHez494L)Vp` zu({~T129^v(-M!P>C6mG>y*>nYdA^~QT6Fw5?GOO1mCXcx=f;Cid{QrJ@5L(ty%3jJ7?HLhMGQE;Mv!s|Aufe>k=MM@?8zI7>>d`0dh&sB_xbe649mjF zjt}oH&r->|R)LmW<{36P6a$gtzJ@~MAr)73;zIb!>vcu_`QfTXpd4o@feI48c$@06 zwKT^2*El@9cIF^V+A__vPYq{X5kgz5y)Q z4Wr43UFY#cZrLReWo#X#FMRT63I2+#8_L*}n9CvIkKU2hB5aDHxX@d*Ph^CJQ)HAx zU1^iEXgB@KY@JvqVAC#mVn-OW33Ncp25VsmJym7bFo&WfScZH;qEwQDxT1JF+vMfo z)iBnB%_DBw6?DX2&h`pn!hwYTiFcXoSXj|>vF+`YYO@QUTg0!0 za}~Xc0yO?6?)9-1i2E_!ieI=$F6E9`^EZfOxL;)C5{YEH@*(&~Ur5`rE7d1SEE>6Q zTj?dH92DLP{U%Ejy%byKkUxvWTk8TUYTWD_H^~4%x=+O-C2u)qP=*p8o$fSB&MkGO z>?M{B=-@{8vs*)%5(ht(%!X2UF_`ev?=V-znvSF02d7vl;biHy$^46!dEW5v3{H33 z*h1+H2PHlsXxgkKm*O!;qsoBWHuj{%fL@m>B}ChvJlYUGe)sv$G~J(tM(->f&d&WcyYZ zUX#b>#na=Y!Z(-R?0egmczat1fhptP&BeWGRz(PWin__zi`)w1R0!5KHmJ2nszh4jkS~d%Bm1X~-aVf5Gm|)DGecHf*Tn z!x2y(Az>BTL7= zugU8CC6jgn_3!sPPlx;YWwV?bd(yJlsl(rW9dG|-^LOJ(ep`u_cD8rx{yFqCb6zJT zl;^#n?SlyYX0_Gl-3^cBv-kKPIloCc`#5y{`-9vs_Oy?Hg4|uQcnA&brP`KvyBps4 zw;|^pBg@cX3Jxdq`zy=l5aIRq_wcv5u;f<`56Atm4S)NVK0tf9roYI4)_uKubkRF8 z@#K;Dc(?qOe{&)f7Od9}wRH-27C!h~ujQyf|6M_+c-x7*%Tw|~&*%Bna<4b?+WtA{ z_v*`c*7)o*i}tqnk~_$?t`14%uaK;LsiRDEFq3lpa`rG(;m;2FNBq|#g zS3}#%$3S{1NeXEk8Fb;px3M`B+>M_|D?!nW?01xnj%Vsv*L%>kx=v8lG&qF7#FVBn zCIm60`Z}&I8T+2s_8!hHjAPA0dEAM)1U_sl8o%llr0MF8JoAwq9>vzYfp$H*NbvUU zfL8fIgC^H3l@8HDTQ6PG-C@jW;w2b`!>&R}jgdP`t&V^E5A8(>6WZSADbxOyky6L+ z<^bFGUv1CkLOq0N?vF=wsSLIpY$&h36E}`?*4TXj=jI|#gfmRe=p6>l@l+1U0IQR! z!mP7}x$!w{V=+4QnB8>{2ht#ENRZ1lr)o#r@2)0wLTMF#h0ekA;B!p(1Z?+o8}eSo z|6vhX{Q<&VKto}gAbfN4L;d@FdQ0&kU6~dTz^i&@d%D^3!T)}`UZQhVA$hqBZL|X; zDp2&fIw=(_T=FbC7K>R?Q^$|M4mc<9GRc{)j(>X~AWT{2@BEzJ?i0awGD5uaIEr>t z?vE0DW>Yr&WOn)e?keo#rfA8pvCr{YL&4f!_hH9f6_gdU;Z4D|_Oo3SpYHp&prXt= zyaiXZt(~NdoF_>hM5s3A3XBEzywpUL_TBt*v9Oly0w>m-Vez&55>IOgx@ysn0K$C+CR~fI5XO*HyNlAr~@dMr6dFBU~S#?>Nm2ufDm# zs%$tXiT+gXm(Hp?a{-i@sNMC8w-BV1YEf_}Q~z-<&)N?uJg@FA|_4 z8_Nxe$J@u}ins37;3?|z?2oc2KqwHydv9KQ)T{vg1UY!tLaat=#Z}R*xP4Iuj2P}i zNNLNDb}Al$S26_D+JQ49IUbvFFCfXO6?Ep2KWc^s*K}!>(1o_Rk4Z#M#*aMIxFLZS z+m`2UyorGn{Qm^00#^NC*V=1Sdms)R5CxuALONmRZ!011hkbzKqsBvSZ;x8T;z1U_ zpDXX-rS~*^S0NeuY6kiMJ|;acCf_}d%FVT$qpW?IM=#9#Zg*IRdFXdo2mD-rdDh{N zu?|TkAjB0*Vt7j<22v%0`;~wgpIbb}D4u2$YqIwUhd9U_e1|z;J3WUr zd?-`c&kml+44%se*06Yz1$;*8-`}mS{>hS^}sI_z?-x(qiZM*`p@TT z`;J5({_Zi}ct4xnr>FVmhmU1njWx%4g!}HR_u=mj@^Ea4wcTVo;oZmE9vAU`1vmVA z_j`jJItKo-?>|(2_+@(^TiY*_-0!l{!}k7f{=Bw@=Sn>vj!XM$tTFjX*=b+x4}bR= zC9p}awyl^y3Sf_%x!0O`cuFFqJ7r=pwFgeE2r79<8oS4d2S;&isVdLxYUZClj)O6Q z4gCVrnIQPCHP@bfX$?SHpQU$)H_8KMxlf3D9*f;olHbc-zx2VrS6NPXP~mtpUSsQJ zzG(Mk#fSBkf5Cm!c1K=(nTP4Qx8)cm<9?uD|6m)DV6vsx{2?QXzo74-0B8;&edOj99FMuemi)^ X_c!kl)uLvh``7;uvX%pgR-XX?Xexkw literal 0 HcmV?d00001 diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs index 6d1e799d43c9..7b359cab0860 100644 --- a/arrow-ipc/src/reader.rs +++ b/arrow-ipc/src/reader.rs @@ -557,7 +557,7 @@ impl<'a> RecordBatchDecoder<'a> { let schema = Arc::clone(&self.schema); if let Some(projection) = self.projection { - let mut arrays = vec![]; + let mut arrays = Vec::with_capacity(projection.len()); // project fields for (idx, field) in schema.fields().iter().enumerate() { // A projected field can appear more than once, so collect all matching positions. @@ -597,7 +597,7 @@ impl<'a> RecordBatchDecoder<'a> { RecordBatch::try_new_with_options(schema, columns, &options) } } else { - let mut children = vec![]; + let mut children = Vec::with_capacity(schema.fields().len()); // keep track of index as lists require more than one node for field in schema.fields() { let child = self.create_array(field, &mut variadic_counts)?; @@ -771,11 +771,18 @@ pub fn read_record_batch( dictionaries_by_id: &HashMap, projection: Option<&[usize]>, metadata: &MetadataVersion, + skip_validation: bool, ) -> Result { - RecordBatchDecoder::try_new(buf, batch, schema, dictionaries_by_id, metadata)? - .with_projection(projection) - .with_require_alignment(false) - .read_record_batch() + let mut decoder = + RecordBatchDecoder::try_new(buf, batch, schema, dictionaries_by_id, metadata)? + .with_projection(projection) + .with_require_alignment(false); + if skip_validation { + let mut flag = UnsafeFlag::new(); + unsafe { flag.set(true) }; + decoder = decoder.with_skip_validation(flag); + } + decoder.read_record_batch() } /// Read the dictionary from the buffer and provided metadata, From c9f66a1f9638bb0c320b7bc3955f302ee833097e Mon Sep 17 00:00:00 2001 From: Richard Date: Tue, 23 Jun 2026 16:28:57 -0400 Subject: [PATCH 02/10] avoid extracting header bytes twice --- arrow-flight/src/decode.rs | 33 +++++++++++++++++++++++---------- 1 file changed, 23 insertions(+), 10 deletions(-) diff --git a/arrow-flight/src/decode.rs b/arrow-flight/src/decode.rs index 93f66af9baba..a5b77f7f0807 100644 --- a/arrow-flight/src/decode.rs +++ b/arrow-flight/src/decode.rs @@ -15,9 +15,10 @@ // specific language governing permissions and limitations // under the License. -use crate::{FlightData, trailers::LazyTrailers, utils::flight_data_to_arrow_batch}; +use crate::{FlightData, trailers::LazyTrailers}; use arrow_array::{ArrayRef, RecordBatch}; use arrow_buffer::Buffer; +use arrow_ipc::reader; use arrow_schema::{Schema, SchemaRef}; use bytes::Bytes; use futures::{Stream, StreamExt, ready, stream::BoxStream}; @@ -330,15 +331,27 @@ impl FlightDataDecoder { )); }; - let batch = flight_data_to_arrow_batch( - &data, - Arc::clone(&state.schema), - &state.dictionaries_by_field, - self.skip_validation, - ) - .map_err(|e| { - FlightError::DecodeError(format!("Error decoding ipc RecordBatch: {e}")) - })?; + let batch = message + .header_as_record_batch() + .ok_or_else(|| { + FlightError::DecodeError( + "Unable to convert flight data header to a record batch".to_string(), + ) + }) + .and_then(|record_batch| { + reader::read_record_batch( + &Buffer::from(data.data_body.clone()), + record_batch, + Arc::clone(&state.schema), + &state.dictionaries_by_field, + None, + &message.version(), + self.skip_validation, + ) + .map_err(|e| { + FlightError::DecodeError(format!("Error decoding ipc RecordBatch: {e}")) + }) + })?; Ok(Some(DecodedFlightData::new_record_batch(data, batch))) } From 316b8bd03e4e56e6fddf3753477a02931809f866 Mon Sep 17 00:00:00 2001 From: Richard Date: Tue, 23 Jun 2026 21:40:54 -0400 Subject: [PATCH 03/10] pre-allocate vectors --- arrow-ipc/profile.json.gz | Bin 17355 -> 0 bytes arrow-ipc/src/reader.rs | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) delete mode 100644 arrow-ipc/profile.json.gz diff --git a/arrow-ipc/profile.json.gz b/arrow-ipc/profile.json.gz deleted file mode 100644 index 1a68528f9054bd41a341d41c426f4eebce3bfc09..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 17355 zcmaI-bC4&&(gq5TZQJ&5Y}?-Pj&0jJHg?P%+qP}nwmo;wIqw(o-G6R(baqu#WyMp~ z+0~tuPZ35!K>_{e__lYnGqtcbVX$>N!#Av2gHdk)yfV|f@2A@jA_tw;$v~}f>l4_!h_2OiXRP%_AiVFKV zUyd&(?UyI@KAe`^-1?t6%ExzIpZ3=MrmEUG`Y#4A7~QHv7o$u+Cl@CzpA4TqEnhsu z>(|}xEAJ{d+`jW&CcC$7)jm90+S*=EYsG0RrA?NGk8Ryzj zc3YbWe&5#u(|X?T_>&Bn``6{WZ}08P^_T%`{@tHfzggQmx^UG6Tz}PLdZU;>xw3WW zZa(y1Ue>&vpP71iuzd*h!@y7Lwd{L+zBK1-BQzE^muC#Nf7w16F1eLo+nKVqw_a{{ zWpt)?p1yT^-xH!JTm%D>uO`>acdwUI++2eDWBVUBSM3)+qwj00{e_wxBRx{FCZ*NR}iPf*S+qyhAzlErnj%wy(&L`rL&tE(pbKGZ?%LsbahD@wq zZTyLEPSO`9$2GxKh7{NZ;J*(0=Uy!Fuy} z>9@J@dH9x*r~O!+!f|a@Ignxd*kAKj3UufMbj)#o%{%V7c;)n9GhFWzj<0v(qazZp##O%F*-xx8HRF!WBxBj;19r+yCQ zI9T{GPXZbSh7dFK>{&RcpHXos=a>yyh47S@;h6+J@qAxAtMl8e#Gt&d^7>qqqQ77K zxO*!veK=ZZ+LCkf(qqw9@l9T(@8lKWnJyzKE-tQSzSnfdAM9H}4yKzZ>4F+o;Y%=vCkiX>ZTsRPG$b29FsLQ-v9Ve@f zfZVTK}Fn@+h`Ykqqzk5qd$eK|FK z?6-NZQ@b|i^7$%5j#;mke@4$_Ls^RR`$nRXr7Zch++Ts<##H)yKaZmJ&z-B;$PCd7Rh(?kiRS)V+O+g=Y)Is>U!=7cm2JN65JXQHen3(E zCJ3O$c3Jef!FYwDh_;y34-c#u2*=08)$)9n2(B1S{&%Xols^b@h0Uh<7zoCF@xxMI?q6gn>gxUgCE`^+ zVd8xHwFVMZFX2~u*cW;35nkMPy7wV0ZHkyJ-^{dI+BXe4ark(5^s2u~(@99xE|v{8 zs-8^mFS!$0pQ?AdDz=bxFQd*CeYkj9nwfp3xdt-w(;>*X=6`r!-l@D5HH36tGJMc! zp^Z)cPTBH_A#wfXib>OrnOVs*=#P{*sy{`ac8$H2#OZZ0nOcrbHupF*#nxvRfU zq(*p48o;r*_)vX4sxUFl7?Y>*Qb{mRNhr7?kHwmR{;PI$spi()q1pKg)Zn3EBw+u9 z>4n0}v)BaWtp4zkEY(tT;#pSwXMfeAWOx-`Q4IC(c*HiMoKU z-g4sM;vox<576y#bc1^O&bMZ0dt>k+YgTt%U$voksoAlf{4|RdU2)azp4x^f`j4g6 z1ZP0@!X!o4&lFE#K^~M&%Dvciz8ChiudZS|faGN5)MsYpWOdK|^3+Sf8b2Lu%`96T zha_1>{X-Z);7qKx@HGsHH2I#N0kpQffWdGzw#TDu*Gn{XpJGOX=R}?UuZtJ`{ugF7 zYk@nCo(*ci&wJ?KyR_>$=fhjX*Lx@67VB3Rb7;h~4w7@If_S_-)>?ko+9CUTp=#to zjwvO*derrU5~ynuKFsC&)X^gp^#q|+r!d#dB4`hh0^6}TnkM9S{Vn`+P<^j+N4@E!5 zwF~w-HeIgi?CHVRJ(9CkiwRd`Sf>Rfa!vuHiEOHm73c2P-D82@zTa=QK@NNxUENg- zeiv6Ea=Lou^>*+I-RWKO8UTySE1n-CWn5mz0Gej3>)<`MCeSO>%S`zN$QdioT)hz9 zfATmf?TZCQS~$uuc-%J;*zkpi9d&VgB<|Vxg9w}8&q*|LS2q0Tvw%&XW7kFpby9D zwfOw6M&OUijNkAz;_@L9Q?m88wHolvR6{td=!VIPejcOY1@3X8W2R?hgJiT)NeQV1 zl9KXgT;SxB;q?iW(?X+0wS5PgB9aA)M0wm3K?5_~#lo|^`3@hK2~_PA#TVJA7F2^- zq8h>^@GjEvf^*>^;jHPwLj0RM>q@bj+eab253ow{mqerD?(i3X_WR3(f zTPQImMc5OY0QQntSWq#MO$^Np8h+;+l#r9V;XtA;B&CWB_|4j*z|lzgs7MbSD=XVn zCUN^g#F5*>yqGj1R%HG&bln%`qV(stl2;WaQ-Y4kV=@DxH#uIDWR)>uGoToMq&-do zE=j^?VW!JPlLN$y?OG4u&{5oBaZySW(8n+Mw(R(kMIf!y0|n7KJgVN1qtp}Yw!e1r#;qan zncQf(1iUFQpgvV;D~7LDf&QG}jey(XQjqw=xNBo;wv2Af70D6ewf?9p#fxjs@jU4!u(NH#p1a zlP7$)8Acs&xRdvlC8k7BKYTR56KFUZKr;%0foy0t27K+~T}^CNu?gU@Fz7WjuT@0z zYvTXO;X@J4A628_~4k$$jO zRWnqF*6-5eL3Y7Af4tHmy~I#FXXo@5oV?s?65VVT{X--A*2H+f&s|8_9Cu^Y?rOiH z9Z3sLipnG*9|NNh(H9l27!c2WjORo)+Y{aqlO+LL9g@r>L&0*Rpe_g?@`a-2LIVx=MfuNebB3&>FtuY?ul4M1=28)9URlrK>N-$P_8g?yLtyrLU&)@J$|9!y= zQdeSEC~gfc`+UiZ@D-K-HwU-@ zAR{2yT&N;r(}Kt zVE5T7>~dT}bP4sBi+_FJIGw-EyzLCM`RMo@RCw~1_;v-ILEtxW+Rg5$ z|Ey$zXYYW}N5q?$sz$wm(CrAbj%3&X&jNG>bm7t{xE!b6>TVxVn;-#7Y>8MOGdtvb z5GxzjkJabHjn(VDt$mOhOy2jIxWlrg0)xkx~@ znp<8^Ju}OsxuG+6B0CxUrMkSenrcaRI+_TnwjO{{imf6~Er6WGOLka!0EbUD)WN!;gIk>wV8=6@*zAm*xlD89G z2q)2js^f8|vuz)uy(JZPb6kxGh`BVk{ZqdeA;vknT9&gdMHSPxTM!&Yoe4Ip9GPU? zgZ+T*azRtV8G#|Sc*P`q@oGDEZpHm<@YHn}w^Kks8qNZ}-v;*8+pDyrOP!=&d?y)> zYBHImT0+)hWJ*Io-LFy)Wzu5qSSq7qbjsHOOx9Zi8FEPOYMdFcZv4B;~25Mcbgpu zf%l!NvGrgm-f;0oiaNuz2)6cUkQZ&roBEV;*{3{+hWm70#{G(Y5?$mivK^1=R&)vW zRz3aP3e)cN)JTWQ^{aUs!`oC{-j@wXd^M*x~e~|D1P? zgzR<>HX-pg!&Kh*VzZ3rb=Wz;6itsB9L=D%CU&Hr(Q6V zT%n!2xzDz#zD4$M8#NtCpq1*_%-eXDD||RP+rmQ&R>pkIj6QV=02VE0BWdaFwM}BSIPisSCWU#Q9le)7AZcU zig_>Y*6NgPUz0S06auL!Vve-ypuF}BU*%w6qeFerbmNeQJF$1yrL$bajZg|Ke5_d{ z2S{NS8sxEx)p5^M#+&Q7-13mO$vN+kw3>GeV`peMWQsMS*O7jT=hZMxo1tO!0Lb^( zx%V57Q-JGkB>sU^dg}8!BvBd9Pj(C>_p~(byJKcp`kcgNr5ms791O$VI`L-MaFu0L zrq*|_?kpCcY@E)k<<5HVm=rl4#M~EcM7(DwzwEE88vfjIGu#P?knf?hBC)R1yUV+z z0cZ=*Q%G|ji;pJ1GnF5kQOTz2W}QE5>k23uh2-iZl6O%ecMb7u?rygB zy#H~p_qEuvN9?p{^5!ptf!$)!+E)0Y56D&dgy$9VrImU1r1W-`A$aOHt5c0?3jcl~ zOum&m`sNDyelA)xbY6ZVO{brek^-i=?l*<4zC%}C%4>@!AImU>(MhQmf*6}W3gc@E zDO)Z{Gi|DXeJgMPu7%6}+-iiaICi!J52FlNgn#R{f#>aO;+-(Mr>~lfRKjv$kOb8K z!$?tBi1-yln!J6zD6z&8#1$p7p$uy?l88tW6#C3;JW~Ch3~#+5DzRkCjd46kwp`7I zbTJo0^L!Sb<#b8FHh~Jci7z)mdc5_{j3uC;@r#{ufeIGe{{PL*`Xj z);ri3z0xi6s9m>tlf;SEeV|+^)Jx-U>(wN!rwF20STLT8SD<~q5$xsuO%q+L2fKdl zgAwg3xjidx@6_fB`aL`X`((Ze<47;nzUFZbW0D)l=sDTgS~z)1U?o8B9y^!V+At+2 z{}`#O2AJYU_L?8nDM9!WX_6fyo5;fI@9gwLIZKK#AlV~zB&XSY?V;^iB$zn5qu`SW zj7V{l1m2p24ykw!5@CSgt-~`uA?5CL>tT(^(XtFyiI$PT%J~#gYE^swxe zt#3=m>-^a!8=mf0aIg*A=)r9TNKj{pqcX}~JAa>o^L|AiA}0(!Z>}`}A}OS&{omZSGIlyWwF8^shFc?PLq_OVK#10v)Zs{5(~_OG#4WHaXs;9@ zJX+(i;0>5=Ce#^Z)ZR!N5rH=oWIEAjFM4dnIqWTh9!o&KM4lhv68Nr{>}Q9B)6=lN zE)x}@=5oA?k7`ode(8C=o-tQ;n`x}B^{Ys2l|(eZAKcQP1zU+F7h_$CrN#Vk0tMaf zWu7!YvL$kFUY9}Ab5DD&`w}K^SF)-fb@#J3$3Ezfc#Y+KTQt8Y6-D%A50@{G5-@*exgl`GyQS*N(LIf=u2w+Q(OcVC-mSVYaSm% zd>*5<5a8Y<^_;u|ZSA}78K2lSIgZJG1Daj0GrrDG%fy=#Y`^hQm-QT`p9S#F z@g0TfAU}@dabN!Nok)D{7oTHrtgVh9*rJAOEGi!T8JaA@XHepvsAk=T~!+ksiTQO)+m9KmWV%{FK7$e_wSFVsR4x-=01+_7<-Pf;2%S zI>j`OFv;UyW1A>r(m&&VwJSVBWtk+>3{sm8(mOj>UaSB3`d9!?Kg3Bdxwq+OMP8o&V> z!(&xTN=8Emm79etPvfmGQHdZ24jknJRXMYBl_tnXFRq{{jbZnMf|?q~a_%DmV@+YU z$_tBIn0T=%)ANI*PfB#?o3n(o@V-GQ=J*p}BbXIYj$<;Q9KDxra{(WjDGcFTL^ldt zK}0Tc5M>$Rm0^e-6mtT;GMZon5m-+)5Lba{Scs%Zw-6CVYOPmLg$pSJN#G>!5~(HB zC$^8aIts5ikig{xm$yV)n(GZTR3DvaZ3@Q;@ zf};zgE0LO|ql$xOMdeXlL~Z&fE!_)#j)Jd&M}LGvwT4cH3QGs2L7%2JYk;6>`C-jV z6kLX-MLtWC>TwqqUiUlqL%R z0qF6O{-?9UR~=%?oqM!p28rr8rZRZ?tC+nlNid#DKv7VBSU?ai;Q0H8|Ab4`Gu+ z2xYxTx!l&sJPH*{QQ+1T$tb`O)I9Q+UwF%4G+M~w;3TLXqqy^ks@ir~^+qNaIG38hCsrL} zv?SqE;XN_^!FPt7-O_yY5)fCOmzZL( z7H!~UqH#Eb2@sN3udx-lFuAYF#TP%Cd+AP7e+J%SABJzS2!()n|yj7rHqpnv<2F9RC(vlUqCmur%Hj|w(#ktXK>Ym`DIjCVEQw^Mlm~$TBd;rXQZ-I&lIXrPo?8ALz@^Y9S=ex@ zYDAbv$|h6$ zL3x#ku2~3Vtu&OO?$ED|dgK)k6nivF6i12w$w?G}8g2ZhN2)ptSN#_d5?>SsicdWR zQDN+YCv<}pe*?zW^Ik&_PANZ>CsO7dVpAdp@-UGeDTY)CVw;ad@6VFExdT@wG|XZv zJHx=A7qSKS$I+W#cn2y#00%6aNtBOC1MmBPc{H$WcCqhS$s>oZ|174}Km%+1+SYyU zR*Go?@T^lFnRv-#G>N<}`+jCxjWj@4x1&$7z6Tm}1jN zjO+8peLlvifR^HLg#YVKwM>N8#e|axrok3BfF`{oD=9&Pr~gD|E)ZOPnV9Q_WbAV& z+Lu-q2zp9@bgUDspN?@5gpO4~DilDLAXN}&o@CtV}4$IJ%~uj*L7_@;`$df(B$@d{A&3IJuG$s*fYY{gD1i2G}9& zvXYv4%pn0`Nu;dC1(~I5#$<6Ys^uV$G!3o6Xw@|FxxZ9LnKT5Qgqa{pV2IHx%6b(g z%HmZG;zpRHk(}i%Wi5qkDoe0Sxk|;#LSV;dfD%3ZAF~(!EF>>P=c0$CtKwH7VqYQF z=qHM=L#|=TfBS&d5^~M(o)(p`sxJyYTB2U;W8d^%v&>^F#GQC@cAa zLy4nLB9P+g03@A4P{JF)%F3QV?lG(-AXqRT9tsM%QR9Rk;ri|TxAgRh0;*8ktm=@U z5r82sfB83(BBAv`6rzian<`qVuyxokMhVG?Qvf4?GqLoQgNVyBJ1~>URjFUfQv8tx z#a>v6ILMGLf&j%yBWZOcnU`e}6oR#YaA`)WI8v7*oT(Ex{~FQvY!E(T(ib_Wndu;T z1N08)6O7_%t-yhDXa?uu&Bm4+V2Eq3+*>~Bye31PdYNJ2y#Ox5)E$jU5AYUDlyX)ZzGh|nVc1#RIJ#D?M{ z_V9nj;1(edr6%oR$As204xQrdIUqD=#fF;rKCnJe>}5fLqD)1JlT6jgO$$lwP6Pb_ z)n(1ut#PMwly(2cu4Bi4=geP4o{U-FX@UQ*;6G<~pPI~*tDL=kg$f`A`!Yv**=YdJ zNeRRE;&Ggu9%GB`#7Djgvmdsx-+=h9h_wxhg(tdoFCVU!Xz)V2jQJf41N1 z2n9IML7o71Ai1L)`ttUL$Tu1#1&cxA{Lh^9|4O_(`}Z&=1Mh5u;tXrz5!(m%{BM>2 z|37h0tf+vB>(N8d{=^o7ne_J0c>3e#1CMKhO3plXSsbVK3*w=@deCyF``gZ1Kt+j_ z{c|9?w;l_~ZFzvAR7?cXREe)_T9DpBAY4 zFopPY+WYl;&J%0X@uKafMS~arW!E}&$Ccq{jdp2gFNGHh8Kf2Amt*c^s7N$n4&cV^ zywwHoW%m5$`OL)~?*-W{w#DoJtmJneyZI~!Kh5pE19*2ZI&W2<1A5HG!T*`v|7`D` zU1NBMGa^pn;C*AnE%*g_Pa?wa|MZ7#ZoD=j!Uk@Ov_F|DY|R+IxY}{5y1TvjypYZR zkA#(D#@O{4V;Xr?g`4vem)K^o@{dU`GWGQQiA#trAQY5zoU6O3r1=r2z{5gew?2V z-uQlO8HVPrH*dDD8{BShK0b?-e>^z?yZ|D~_I$h8lUr4`Z@W)V-A%*GTfX;io11$_ z#pezrtp|w-Y(ouIPw+*rSC=oki}I`(WC7_oFYD8GfXl&}i!I&FN%JP0uI`RbT=jM~ z^-26wyUh>F6&*{S=!a(YIC;5Qo#cALlfJ1o;_bBp{LiPCX3N!XOV0rKIvy2S z)>Hughep(8#gq-BwsG#4+WE3nA+O3b2;x>QJ0aOYIWXdU9i@6dP>V!KR?bq!brcU) zNs(m*ePBt#N#mOx%l!;!A9Ri4sFx z2cO_hJOz*9Ow5Ie;zX>Jx=5yT{$yDP0PE@P2HawFMTa-X*~5<3$Em{BI>g<;NLAb* z3IQRZ0bzbRyb2)CZMIY1fr-){td1xZb9xBDWeEb929c;q0P3`;@nBZE7)q{5Bp`6c z|H6PkUu>jjqe=2_h{TMQKS23)yRfH1%G(j%b=b1d5TI#&N%F{s?HFwXLq~u${7^I^ zFkvXUj-Q6eB2+l3D@sruCmJ20s286Luk84t0i?9LoTDZ*`5`)16ck;{`!e7T&!4w$_mCAGS!sJIm#2 zI#R(ALLxH)+ATQfb0HHtLwX@I7p2`Tkj&j2-=sE9F$0LkC}|#5IB)kz z&AE~+m|fO-i*C<6o#z`|@CssW{^b5S&rd=YO70I+Mm|xL(4KoKLL0`F1+*qRL~B>kZ_aZaHk z=BY6r%cepvL<2EFmkyf_5wsplvJC1=Vkrm~<)RqlQXGppi2DGjloIb{V#Tj05AZUf;^2in*%Ea-&zr!KgvBpT7~h8qcS~)m_ze1YhVP3f>41ny9uM5x}isUYq=aDn6g_ zkxYOUdL#vT0d5`w@*UaJG&ieyQ) z;?D{QfD^ia+?$1zosylG(r|o0Jq-VF2bXaGApmFEK0Y+an*;)Us5y)U3RQxa)`4~J zWI+FM#|ilom>9&taF1+Y<#`lp-Q?qMmjxZgcTlp+7fp0EXNPL_aq0Lv_$P8izJDkE90ld#3$1s ziw7^NAH)Jbt-k-K)y6iH%&P62L(VUo-?E7SJvNuI{yB^z5n1^ zAYfbqfvfJlje1%w{Dq*18j!P(aACLKv7VWLLykV#?0PmU{x$v zS9DDbH&@0S(t3>;6XI^+q=>##!UA|ca z(v2!LjVN|;NxWh5?oFj)gWxPtLz5$fScFQ02KcIZT0VLN0Ts83bKZ&cjsP-Ai9|%) z@@z`Vt0>*uOg0XCIlhXxYUTcLxa>Gs{8@94l(n0XieK38jqW!7 zT1+-9VWw3+|A5T%3YEHC?+Fl(!mS;ajYLRXWE=J`Hat~r+DmXC`0Lp}j4K3UI~_O) zV@4hig&WomHcMlYGI&a9A8wK*Sgz!rIBNVJTtNi|Ola7mjG#CetaipBKadF8Fh&Gj z{z6%@tY(z#L?l_9{yF4EvvISKIhICL-9SZZLed_qr0=r@MUC_;iAW3aZ6=cvWtxnL z3|W|xL#{2sKTh!97TBzV(~mP+8&@EhkV*zEH75ZgA6ga*$@2$_VpWpa`Jvlp$08&q z3*gj^IjTY;l}09{gE#Es=WID+bB`sqB}|jIiWS@--fpM3IsZ3Fa3)=dlA|{-q8ceQ zuep>RW`)kMQH}S)3njXcih6)_V6M9N;g(}(O}}GyS|namnv6QUpyx>`ojzC!-jLOr zWQLH-0zHFRe*ZTG{EMEr_Gl4SbQeBJU7}|Ni5YrI`3c-QbW9u+w{Q0rPT?xIbwevJ zT4jK?+Jf6$IuTHQQyc}k9D58omj%;YFc^phT8;|wB$9xEUA&U>6jhK}*jzl?F#3uM zkz;>TJ+&i8`}o&3|C%p_QpcJXh^Bs><#WIZ{CaF6_=+=8nTRN(F&r|+UQ<*)T|fJS zgHSVYyQK&>QB158tg*j#Mme=>YaHhd9pTV?2rjC%oKwAu0e<0X`3>$KpQsPvn&v^gv572+k(1)E3Wb;#+A+{YJ62-j9bQN zcEC+i5qB-x2uk9vhLVkmxcVYyO;+ziy7)V`Di+N-(2kNA^@G#DBC#j?y!05X^>WBgYVdM2)XozR zWHndw8njQZtU%EN!LS5Ip zw!%kfwjZi$CQi)$&`mo0n%lT*69=c6HkPqhNuufN;kSl(uRVEv?(Z)*Z;loxAMfto z-O3TF;2UFxdMw22e{q!W?j3DD?tLXDS%4{z;r;3J%GOS*HK88wdTdI{1CBiYi?MHB zkT^5MrV(RAP5@zRXQD`OKCW+Jtp|}~T61B1M&)(hdEbPB=Qy|IF?;x6jLy2tm-4#c zveyRmE9HJ2Ccy6j7syHPj#KnF@5%M|Rj7drpN*Y5R@T6tGZEz$R zHo}xc%A|>4DPu|GxGN2NyKr@_LEuNnvQGGcDFV3nTJMa6bL z%u%FN%r7a;*_vNcD$MnyET{w_6KKOqIW9jbYH4-%AYILV|A+lv--^Y2%nE|6zG)jQ zayQq!nWE#Utw^}ArgtM`l+lKG#vjP)wGdMCSvTJV`(`r6jd7BqYnywO;$%C2og$*q zaLO1;N16)Al-}!mKhWKQC`%I7-d;Fd5jS-pSPyBexe+HN4b7_*Teh;uJ$svJ+;#TZ zfmq@{c2J|6x@USaiEnKz6FJ4Pbicri7W7O{36YykVb3oeXO?@QnOHp9|7sYSZ{c!N1-VeqVK6EdTxr zHgKRz`jySa&WtqAl;WTTpr>RVl^)uTmT|iwvHHw@_1l#xDmUv zxWv%S7OxrE0Ky&Yh&o^K(yVA50h{QeEk%}(yf+cij})<-Z$uLY?fCOY*1U2RX9^&x zX2jyo0B_TPTPo&Wt!s@Y*p^3f`T6AIwYW)dr*6k?-tO7Ik(9tX+t)_SI?!M{vy}C? z$plT|(*WUo?lOxs;yE8egW+hYy?W<(;JVw{=Vut)My}aJUFb0kFX7cFrY&@6kx}Sj zHm_xIY@^>fHe=s1z-H0N39Zzd0#DCJM-j#q*?c)|kcQKGrh>hO_s#&S+6`=9`anG(>MiJ<#-(-rqnz z$E>hTE|-uNih^4>V-gjeV}IhQS*@3;6~~~B3dy8no559ah~_3)=fY{5OR@jY%hnxF zq}%kyhamj~57S2=9Hx|z!m;s0WdG2?e^}Lq4#7afXq{;^%z{f*9@60K|3{_w@!_bb zYA>(VGMAy>N~tAJ3w=yni2rEya@*|AK3n#T93=Z^b(pi;X`U8V@`T!Xc$&vbw&QWv zG{V(jgN^w)qv3>S%rEDpZ=h9BZ+GWL}4SqSvj!ti?z-S*@kF`V30A{S!x+tWp!TK_r)M;krHd~gak00Kcbl|wLD9#=r9J$k#P z7~1NHSIOnH&v0X;C=C}wk~)%esc(-Jrzwe7em2FxoUFIa7X_LwMQSDHez494L)Vp` zu({~T129^v(-M!P>C6mG>y*>nYdA^~QT6Fw5?GOO1mCXcx=f;Cid{QrJ@5L(ty%3jJ7?HLhMGQE;Mv!s|Aufe>k=MM@?8zI7>>d`0dh&sB_xbe649mjF zjt}oH&r->|R)LmW<{36P6a$gtzJ@~MAr)73;zIb!>vcu_`QfTXpd4o@feI48c$@06 zwKT^2*El@9cIF^V+A__vPYq{X5kgz5y)Q z4Wr43UFY#cZrLReWo#X#FMRT63I2+#8_L*}n9CvIkKU2hB5aDHxX@d*Ph^CJQ)HAx zU1^iEXgB@KY@JvqVAC#mVn-OW33Ncp25VsmJym7bFo&WfScZH;qEwQDxT1JF+vMfo z)iBnB%_DBw6?DX2&h`pn!hwYTiFcXoSXj|>vF+`YYO@QUTg0!0 za}~Xc0yO?6?)9-1i2E_!ieI=$F6E9`^EZfOxL;)C5{YEH@*(&~Ur5`rE7d1SEE>6Q zTj?dH92DLP{U%Ejy%byKkUxvWTk8TUYTWD_H^~4%x=+O-C2u)qP=*p8o$fSB&MkGO z>?M{B=-@{8vs*)%5(ht(%!X2UF_`ev?=V-znvSF02d7vl;biHy$^46!dEW5v3{H33 z*h1+H2PHlsXxgkKm*O!;qsoBWHuj{%fL@m>B}ChvJlYUGe)sv$G~J(tM(->f&d&WcyYZ zUX#b>#na=Y!Z(-R?0egmczat1fhptP&BeWGRz(PWin__zi`)w1R0!5KHmJ2nszh4jkS~d%Bm1X~-aVf5Gm|)DGecHf*Tn z!x2y(Az>BTL7= zugU8CC6jgn_3!sPPlx;YWwV?bd(yJlsl(rW9dG|-^LOJ(ep`u_cD8rx{yFqCb6zJT zl;^#n?SlyYX0_Gl-3^cBv-kKPIloCc`#5y{`-9vs_Oy?Hg4|uQcnA&brP`KvyBps4 zw;|^pBg@cX3Jxdq`zy=l5aIRq_wcv5u;f<`56Atm4S)NVK0tf9roYI4)_uKubkRF8 z@#K;Dc(?qOe{&)f7Od9}wRH-27C!h~ujQyf|6M_+c-x7*%Tw|~&*%Bna<4b?+WtA{ z_v*`c*7)o*i}tqnk~_$?t`14%uaK;LsiRDEFq3lpa`rG(;m;2FNBq|#g zS3}#%$3S{1NeXEk8Fb;px3M`B+>M_|D?!nW?01xnj%Vsv*L%>kx=v8lG&qF7#FVBn zCIm60`Z}&I8T+2s_8!hHjAPA0dEAM)1U_sl8o%llr0MF8JoAwq9>vzYfp$H*NbvUU zfL8fIgC^H3l@8HDTQ6PG-C@jW;w2b`!>&R}jgdP`t&V^E5A8(>6WZSADbxOyky6L+ z<^bFGUv1CkLOq0N?vF=wsSLIpY$&h36E}`?*4TXj=jI|#gfmRe=p6>l@l+1U0IQR! z!mP7}x$!w{V=+4QnB8>{2ht#ENRZ1lr)o#r@2)0wLTMF#h0ekA;B!p(1Z?+o8}eSo z|6vhX{Q<&VKto}gAbfN4L;d@FdQ0&kU6~dTz^i&@d%D^3!T)}`UZQhVA$hqBZL|X; zDp2&fIw=(_T=FbC7K>R?Q^$|M4mc<9GRc{)j(>X~AWT{2@BEzJ?i0awGD5uaIEr>t z?vE0DW>Yr&WOn)e?keo#rfA8pvCr{YL&4f!_hH9f6_gdU;Z4D|_Oo3SpYHp&prXt= zyaiXZt(~NdoF_>hM5s3A3XBEzywpUL_TBt*v9Oly0w>m-Vez&55>IOgx@ysn0K$C+CR~fI5XO*HyNlAr~@dMr6dFBU~S#?>Nm2ufDm# zs%$tXiT+gXm(Hp?a{-i@sNMC8w-BV1YEf_}Q~z-<&)N?uJg@FA|_4 z8_Nxe$J@u}ins37;3?|z?2oc2KqwHydv9KQ)T{vg1UY!tLaat=#Z}R*xP4Iuj2P}i zNNLNDb}Al$S26_D+JQ49IUbvFFCfXO6?Ep2KWc^s*K}!>(1o_Rk4Z#M#*aMIxFLZS z+m`2UyorGn{Qm^00#^NC*V=1Sdms)R5CxuALONmRZ!011hkbzKqsBvSZ;x8T;z1U_ zpDXX-rS~*^S0NeuY6kiMJ|;acCf_}d%FVT$qpW?IM=#9#Zg*IRdFXdo2mD-rdDh{N zu?|TkAjB0*Vt7j<22v%0`;~wgpIbb}D4u2$YqIwUhd9U_e1|z;J3WUr zd?-`c&kml+44%se*06Yz1$;*8-`}mS{>hS^}sI_z?-x(qiZM*`p@TT z`;J5({_Zi}ct4xnr>FVmhmU1njWx%4g!}HR_u=mj@^Ea4wcTVo;oZmE9vAU`1vmVA z_j`jJItKo-?>|(2_+@(^TiY*_-0!l{!}k7f{=Bw@=Sn>vj!XM$tTFjX*=b+x4}bR= zC9p}awyl^y3Sf_%x!0O`cuFFqJ7r=pwFgeE2r79<8oS4d2S;&isVdLxYUZClj)O6Q z4gCVrnIQPCHP@bfX$?SHpQU$)H_8KMxlf3D9*f;olHbc-zx2VrS6NPXP~mtpUSsQJ zzG(Mk#fSBkf5Cm!c1K=(nTP4Qx8)cm<9?uD|6m)DV6vsx{2?QXzo74-0B8;&edOj99FMuemi)^ X_c!kl)uLvh``7;uvX%pgR-XX?Xexkw diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs index 7b359cab0860..6ddbe1c47527 100644 --- a/arrow-ipc/src/reader.rs +++ b/arrow-ipc/src/reader.rs @@ -146,7 +146,7 @@ impl RecordBatchDecoder<'_> { let null_buffer = self.next_buffer()?; // read the arrays for each field - let mut struct_arrays = vec![]; + let mut struct_arrays = Vec::with_capacity(struct_fields.len()); // TODO investigate whether just knowing the number of buffers could // still work for struct_field in struct_fields { From 0335ff403f40a76a64bbaef7c0b559a5728fbe73 Mon Sep 17 00:00:00 2001 From: Richard Date: Tue, 23 Jun 2026 23:16:23 -0400 Subject: [PATCH 04/10] re-align buffers if tonic passes up a mis-aligned buffer --- arrow-flight/src/decode.rs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/arrow-flight/src/decode.rs b/arrow-flight/src/decode.rs index a5b77f7f0807..c7dff7965d8d 100644 --- a/arrow-flight/src/decode.rs +++ b/arrow-flight/src/decode.rs @@ -17,7 +17,7 @@ use crate::{FlightData, trailers::LazyTrailers}; use arrow_array::{ArrayRef, RecordBatch}; -use arrow_buffer::Buffer; +use arrow_buffer::{Buffer, MutableBuffer}; use arrow_ipc::reader; use arrow_schema::{Schema, SchemaRef}; use bytes::Bytes; @@ -331,6 +331,14 @@ impl FlightDataDecoder { )); }; + let data_buffer = if data.data_body.as_ptr() as usize % 64 != 0 { + let mut buf = MutableBuffer::with_capacity(data.data_body.len()); + buf.extend_from_slice(&data.data_body); + Buffer::from(buf) + } else { + Buffer::from(data.data_body.clone()) + }; + let batch = message .header_as_record_batch() .ok_or_else(|| { @@ -340,7 +348,7 @@ impl FlightDataDecoder { }) .and_then(|record_batch| { reader::read_record_batch( - &Buffer::from(data.data_body.clone()), + &data_buffer, record_batch, Arc::clone(&state.schema), &state.dictionaries_by_field, From a2c436b70ab2d180cfd59ff24e22b1f2693c5f5c Mon Sep 17 00:00:00 2001 From: Richard Date: Wed, 24 Jun 2026 10:05:01 -0400 Subject: [PATCH 05/10] make use of unsafe code clear to callers --- Cargo.lock | 1 + arrow-flight/Cargo.toml | 4 ++-- arrow-flight/src/decode.rs | 15 ++++++++------- arrow-flight/src/sql/client.rs | 3 ++- arrow-flight/src/utils.rs | 10 ++++++++-- arrow-integration-testing/Cargo.toml | 1 + .../flight_client_scenarios/integration_test.rs | 10 +++++++--- .../flight_server_scenarios/integration_test.rs | 3 ++- arrow-ipc/src/reader.rs | 15 +++++---------- 9 files changed, 36 insertions(+), 26 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6a6bb511037b..14f251e2d02a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -388,6 +388,7 @@ version = "59.0.0" dependencies = [ "arrow", "arrow-buffer", + "arrow-data", "arrow-flight", "arrow-integration-test", "clap", diff --git a/arrow-flight/Cargo.toml b/arrow-flight/Cargo.toml index 46fcd0810315..c4541fdb6012 100644 --- a/arrow-flight/Cargo.toml +++ b/arrow-flight/Cargo.toml @@ -32,7 +32,7 @@ arrow-array = { workspace = true } arrow-buffer = { workspace = true } # Cast is needed to work around https://github.com/apache/arrow-rs/issues/3389 arrow-cast = { workspace = true } -arrow-data = { workspace = true, optional = true } +arrow-data = { workspace = true } arrow-ipc = { workspace = true } arrow-ord = { workspace = true, optional = true } arrow-row = { workspace = true, optional = true } @@ -62,7 +62,7 @@ all-features = true [features] default = [] -flight-sql = ["dep:arrow-arith", "dep:arrow-data", "dep:arrow-ord", "dep:arrow-row", "dep:arrow-select", "dep:arrow-string", "dep:once_cell", "dep:paste"] +flight-sql = ["dep:arrow-arith", "dep:arrow-ord", "dep:arrow-row", "dep:arrow-select", "dep:arrow-string", "dep:once_cell", "dep:paste"] # TODO: Remove in the next release flight-sql-experimental = ["flight-sql"] tls-aws-lc= ["tonic/tls-aws-lc"] diff --git a/arrow-flight/src/decode.rs b/arrow-flight/src/decode.rs index c7dff7965d8d..bcd529b261a3 100644 --- a/arrow-flight/src/decode.rs +++ b/arrow-flight/src/decode.rs @@ -18,6 +18,7 @@ use crate::{FlightData, trailers::LazyTrailers}; use arrow_array::{ArrayRef, RecordBatch}; use arrow_buffer::{Buffer, MutableBuffer}; +use arrow_data::UnsafeFlag; use arrow_ipc::reader; use arrow_schema::{Schema, SchemaRef}; use bytes::Bytes; @@ -230,7 +231,7 @@ pub struct FlightDataDecoder { /// Seen the end of the inner stream? done: bool, /// Skip validation of decoded arrays (UTF-8, offset bounds, null counts). - skip_validation: bool, + skip_validation: UnsafeFlag, } impl Debug for FlightDataDecoder { @@ -254,14 +255,14 @@ impl FlightDataDecoder { state: None, response: response.boxed(), done: false, - skip_validation: false, + skip_validation: UnsafeFlag::new(), } } - /// Only set for trusted senders, invalid data may cause undefined behavior. - /// Can improve performance by skipping validation - pub fn with_skip_validation(mut self, skip_validation: bool) -> Self { - self.skip_validation = skip_validation; + /// # Safety + /// Invalid data may cause undefined behavior. Only use for trusted senders. + pub unsafe fn with_skip_validation(mut self) -> Self { + unsafe { self.skip_validation.set(true) }; self } @@ -354,7 +355,7 @@ impl FlightDataDecoder { &state.dictionaries_by_field, None, &message.version(), - self.skip_validation, + self.skip_validation.clone(), ) .map_err(|e| { FlightError::DecodeError(format!("Error decoding ipc RecordBatch: {e}")) diff --git a/arrow-flight/src/sql/client.rs b/arrow-flight/src/sql/client.rs index 951b3457fd22..aa9bc47a833d 100644 --- a/arrow-flight/src/sql/client.rs +++ b/arrow-flight/src/sql/client.rs @@ -18,6 +18,7 @@ //! A FlightSQL Client [`FlightSqlServiceClient`] use arrow_buffer::Buffer; +use arrow_data::UnsafeFlag; use arrow_ipc::MessageHeader; use arrow_ipc::convert::fb_to_schema; use arrow_ipc::reader::read_record_batch; @@ -651,7 +652,7 @@ pub fn arrow_data_from_flight_data( &dictionaries_by_field, None, &ipc_message.version(), - false, + UnsafeFlag::new(), )?; Ok(ArrowFlightData::RecordBatch(record_batch)) } diff --git a/arrow-flight/src/utils.rs b/arrow-flight/src/utils.rs index e0852ae51638..49314e5409bc 100644 --- a/arrow-flight/src/utils.rs +++ b/arrow-flight/src/utils.rs @@ -23,6 +23,7 @@ use std::sync::Arc; use arrow_array::{ArrayRef, RecordBatch}; use arrow_buffer::Buffer; +use arrow_data::UnsafeFlag; use arrow_ipc::convert::fb_to_schema; use arrow_ipc::writer::CompressionContext; use arrow_ipc::{reader, root_as_message, writer, writer::IpcWriteOptions}; @@ -45,7 +46,12 @@ pub fn flight_data_to_batches(flight_data: &[FlightData]) -> Result, - skip_validation: bool, + skip_validation: UnsafeFlag, ) -> Result { // check that the data_header is a record batch message let message = arrow_ipc::root_as_message(&data.data_header[..]) diff --git a/arrow-integration-testing/Cargo.toml b/arrow-integration-testing/Cargo.toml index ae13d32b57a9..cb488f5ff791 100644 --- a/arrow-integration-testing/Cargo.toml +++ b/arrow-integration-testing/Cargo.toml @@ -35,6 +35,7 @@ logging = ["tracing-subscriber"] [dependencies] arrow = { path = "../arrow", default-features = false, features = ["test_utils", "ipc", "ipc_compression", "json", "ffi"] } +arrow-data = { workspace = true } arrow-flight = { path = "../arrow-flight", default-features = false } arrow-integration-test = { path = "../arrow-integration-test", default-features = false } clap = { version = "4", default-features = false, features = ["std", "derive", "help", "error-context", "usage"] } diff --git a/arrow-integration-testing/src/flight_client_scenarios/integration_test.rs b/arrow-integration-testing/src/flight_client_scenarios/integration_test.rs index d35b28cfa5e5..b1afc8aa3dd6 100644 --- a/arrow-integration-testing/src/flight_client_scenarios/integration_test.rs +++ b/arrow-integration-testing/src/flight_client_scenarios/integration_test.rs @@ -249,9 +249,13 @@ async fn consume_flight_location( let metadata = counter.to_string().into_bytes(); assert_eq!(metadata, data.app_metadata); - let actual_batch = - flight_data_to_arrow_batch(&data, actual_schema.clone(), &dictionaries_by_id, false) - .expect("Unable to convert flight data to Arrow batch"); + let actual_batch = flight_data_to_arrow_batch( + &data, + actual_schema.clone(), + &dictionaries_by_id, + arrow_data::UnsafeFlag::new(), + ) + .expect("Unable to convert flight data to Arrow batch"); assert_eq!(actual_schema, actual_batch.schema()); assert_eq!(expected_batch.num_columns(), actual_batch.num_columns()); diff --git a/arrow-integration-testing/src/flight_server_scenarios/integration_test.rs b/arrow-integration-testing/src/flight_server_scenarios/integration_test.rs index 8769b50e77ce..b75d52a0f937 100644 --- a/arrow-integration-testing/src/flight_server_scenarios/integration_test.rs +++ b/arrow-integration-testing/src/flight_server_scenarios/integration_test.rs @@ -30,6 +30,7 @@ use arrow::{ ipc::{self, reader, writer}, record_batch::RecordBatch, }; +use arrow_data::UnsafeFlag; use arrow_flight::{ Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightEndpoint, FlightInfo, HandshakeRequest, HandshakeResponse, IpcMessage, PollInfo, PutResult, SchemaAsIpc, @@ -335,7 +336,7 @@ async fn record_batch_from_message( dictionaries_by_id, None, &message.version(), - false, + UnsafeFlag::new(), ); arrow_batch_result diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs index 6ddbe1c47527..e4eece2679ac 100644 --- a/arrow-ipc/src/reader.rs +++ b/arrow-ipc/src/reader.rs @@ -771,17 +771,12 @@ pub fn read_record_batch( dictionaries_by_id: &HashMap, projection: Option<&[usize]>, metadata: &MetadataVersion, - skip_validation: bool, + skip_validation: UnsafeFlag, ) -> Result { - let mut decoder = - RecordBatchDecoder::try_new(buf, batch, schema, dictionaries_by_id, metadata)? - .with_projection(projection) - .with_require_alignment(false); - if skip_validation { - let mut flag = UnsafeFlag::new(); - unsafe { flag.set(true) }; - decoder = decoder.with_skip_validation(flag); - } + let decoder = RecordBatchDecoder::try_new(buf, batch, schema, dictionaries_by_id, metadata)? + .with_projection(projection) + .with_require_alignment(false) + .with_skip_validation(skip_validation); decoder.read_record_batch() } From 1878721674ecce121e0f3de361714a83b09302af Mon Sep 17 00:00:00 2001 From: Richard Date: Wed, 24 Jun 2026 11:17:07 -0400 Subject: [PATCH 06/10] resolve FFI issues --- arrow-flight/src/decode.rs | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/arrow-flight/src/decode.rs b/arrow-flight/src/decode.rs index bcd529b261a3..bf099f888ca5 100644 --- a/arrow-flight/src/decode.rs +++ b/arrow-flight/src/decode.rs @@ -17,7 +17,7 @@ use crate::{FlightData, trailers::LazyTrailers}; use arrow_array::{ArrayRef, RecordBatch}; -use arrow_buffer::{Buffer, MutableBuffer}; +use arrow_buffer::Buffer; use arrow_data::UnsafeFlag; use arrow_ipc::reader; use arrow_schema::{Schema, SchemaRef}; @@ -332,14 +332,6 @@ impl FlightDataDecoder { )); }; - let data_buffer = if data.data_body.as_ptr() as usize % 64 != 0 { - let mut buf = MutableBuffer::with_capacity(data.data_body.len()); - buf.extend_from_slice(&data.data_body); - Buffer::from(buf) - } else { - Buffer::from(data.data_body.clone()) - }; - let batch = message .header_as_record_batch() .ok_or_else(|| { @@ -349,7 +341,7 @@ impl FlightDataDecoder { }) .and_then(|record_batch| { reader::read_record_batch( - &data_buffer, + &Buffer::from(data.data_body.clone()), record_batch, Arc::clone(&state.schema), &state.dictionaries_by_field, From 4053b0a437ac4fe4918cd67f94f4e946e777982b Mon Sep 17 00:00:00 2001 From: Richard Date: Wed, 24 Jun 2026 11:42:59 -0400 Subject: [PATCH 07/10] revert clone() to as_ref() --- arrow-flight/src/decode.rs | 34 +++++++++++----------------------- arrow-flight/src/utils.rs | 2 +- 2 files changed, 12 insertions(+), 24 deletions(-) diff --git a/arrow-flight/src/decode.rs b/arrow-flight/src/decode.rs index bf099f888ca5..32d6eab46180 100644 --- a/arrow-flight/src/decode.rs +++ b/arrow-flight/src/decode.rs @@ -15,11 +15,11 @@ // specific language governing permissions and limitations // under the License. -use crate::{FlightData, trailers::LazyTrailers}; +use crate::{FlightData, trailers::LazyTrailers, utils::flight_data_to_arrow_batch}; use arrow_array::{ArrayRef, RecordBatch}; use arrow_buffer::Buffer; use arrow_data::UnsafeFlag; -use arrow_ipc::reader; +//use arrow_ipc::reader; use arrow_schema::{Schema, SchemaRef}; use bytes::Bytes; use futures::{Stream, StreamExt, ready, stream::BoxStream}; @@ -332,27 +332,15 @@ impl FlightDataDecoder { )); }; - let batch = message - .header_as_record_batch() - .ok_or_else(|| { - FlightError::DecodeError( - "Unable to convert flight data header to a record batch".to_string(), - ) - }) - .and_then(|record_batch| { - reader::read_record_batch( - &Buffer::from(data.data_body.clone()), - record_batch, - Arc::clone(&state.schema), - &state.dictionaries_by_field, - None, - &message.version(), - self.skip_validation.clone(), - ) - .map_err(|e| { - FlightError::DecodeError(format!("Error decoding ipc RecordBatch: {e}")) - }) - })?; + let batch = flight_data_to_arrow_batch( + &data, + Arc::clone(&state.schema), + &state.dictionaries_by_field, + self.skip_validation.clone(), + ) + .map_err(|e| { + FlightError::DecodeError(format!("Error decoding ipc RecordBatch: {e}")) + })?; Ok(Some(DecodedFlightData::new_record_batch(data, batch))) } diff --git a/arrow-flight/src/utils.rs b/arrow-flight/src/utils.rs index 49314e5409bc..51c4035b6223 100644 --- a/arrow-flight/src/utils.rs +++ b/arrow-flight/src/utils.rs @@ -77,7 +77,7 @@ pub fn flight_data_to_arrow_batch( }) .map(|batch| { reader::read_record_batch( - &Buffer::from(data.data_body.clone()), + &Buffer::from(data.data_body.as_ref()), batch, schema, dictionaries_by_id, From f2fcc0cb8df1a9361d608dbbd6578c5ea6706aaf Mon Sep 17 00:00:00 2001 From: Richard Date: Wed, 24 Jun 2026 13:33:18 -0400 Subject: [PATCH 08/10] re-introduce .clone() under if condition --- arrow-flight/src/utils.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/arrow-flight/src/utils.rs b/arrow-flight/src/utils.rs index 51c4035b6223..25c94b3f2fe3 100644 --- a/arrow-flight/src/utils.rs +++ b/arrow-flight/src/utils.rs @@ -76,8 +76,13 @@ pub fn flight_data_to_arrow_batch( ) }) .map(|batch| { + let buf = if data.data_body.as_ptr() as usize % 64 == 0 { + Buffer::from(data.data_body.clone()) + } else { + Buffer::from(data.data_body.as_ref()) + }; reader::read_record_batch( - &Buffer::from(data.data_body.as_ref()), + &buf, batch, schema, dictionaries_by_id, From 2d22728eeb86728f53342c030932135fb7589e89 Mon Sep 17 00:00:00 2001 From: Richard Date: Wed, 24 Jun 2026 16:05:36 -0400 Subject: [PATCH 09/10] avoid public API changes --- arrow-flight/src/decode.rs | 27 ++++++++++++------- arrow-flight/src/utils.rs | 10 +------ .../integration_test.rs | 10 +++---- .../integration_test.rs | 2 -- arrow-ipc/src/reader.rs | 20 ++++++++------ 5 files changed, 34 insertions(+), 35 deletions(-) diff --git a/arrow-flight/src/decode.rs b/arrow-flight/src/decode.rs index 32d6eab46180..cde416062b54 100644 --- a/arrow-flight/src/decode.rs +++ b/arrow-flight/src/decode.rs @@ -15,11 +15,10 @@ // specific language governing permissions and limitations // under the License. -use crate::{FlightData, trailers::LazyTrailers, utils::flight_data_to_arrow_batch}; +use crate::{FlightData, trailers::LazyTrailers}; use arrow_array::{ArrayRef, RecordBatch}; use arrow_buffer::Buffer; use arrow_data::UnsafeFlag; -//use arrow_ipc::reader; use arrow_schema::{Schema, SchemaRef}; use bytes::Bytes; use futures::{Stream, StreamExt, ready, stream::BoxStream}; @@ -332,15 +331,25 @@ impl FlightDataDecoder { )); }; - let batch = flight_data_to_arrow_batch( - &data, + let record_batch = message.header_as_record_batch().ok_or_else(|| { + FlightError::DecodeError( + "Unable to convert flight data header to a record batch".to_string(), + ) + })?; + let buf = if data.data_body.as_ptr() as usize % 64 == 0 { + Buffer::from(data.data_body.clone()) + } else { + Buffer::from(data.data_body.as_ref()) + }; + let batch = arrow_ipc::reader::RecordBatchDecoder::try_new( + &buf, + record_batch, Arc::clone(&state.schema), &state.dictionaries_by_field, - self.skip_validation.clone(), - ) - .map_err(|e| { - FlightError::DecodeError(format!("Error decoding ipc RecordBatch: {e}")) - })?; + &message.version(), + )? + .with_skip_validation(self.skip_validation.clone()) + .read_record_batch()?; Ok(Some(DecodedFlightData::new_record_batch(data, batch))) } diff --git a/arrow-flight/src/utils.rs b/arrow-flight/src/utils.rs index 25c94b3f2fe3..0e38e7ed77aa 100644 --- a/arrow-flight/src/utils.rs +++ b/arrow-flight/src/utils.rs @@ -23,7 +23,6 @@ use std::sync::Arc; use arrow_array::{ArrayRef, RecordBatch}; use arrow_buffer::Buffer; -use arrow_data::UnsafeFlag; use arrow_ipc::convert::fb_to_schema; use arrow_ipc::writer::CompressionContext; use arrow_ipc::{reader, root_as_message, writer, writer::IpcWriteOptions}; @@ -46,12 +45,7 @@ pub fn flight_data_to_batches(flight_data: &[FlightData]) -> Result, - skip_validation: UnsafeFlag, ) -> Result { // check that the data_header is a record batch message let message = arrow_ipc::root_as_message(&data.data_header[..]) @@ -88,7 +81,6 @@ pub fn flight_data_to_arrow_batch( dictionaries_by_id, None, &message.version(), - skip_validation, ) })? } diff --git a/arrow-integration-testing/src/flight_client_scenarios/integration_test.rs b/arrow-integration-testing/src/flight_client_scenarios/integration_test.rs index b1afc8aa3dd6..05ca5627ecd8 100644 --- a/arrow-integration-testing/src/flight_client_scenarios/integration_test.rs +++ b/arrow-integration-testing/src/flight_client_scenarios/integration_test.rs @@ -249,13 +249,9 @@ async fn consume_flight_location( let metadata = counter.to_string().into_bytes(); assert_eq!(metadata, data.app_metadata); - let actual_batch = flight_data_to_arrow_batch( - &data, - actual_schema.clone(), - &dictionaries_by_id, - arrow_data::UnsafeFlag::new(), - ) - .expect("Unable to convert flight data to Arrow batch"); + let actual_batch = + flight_data_to_arrow_batch(&data, actual_schema.clone(), &dictionaries_by_id) + .expect("Unable to convert flight data to Arrow batch"); assert_eq!(actual_schema, actual_batch.schema()); assert_eq!(expected_batch.num_columns(), actual_batch.num_columns()); diff --git a/arrow-integration-testing/src/flight_server_scenarios/integration_test.rs b/arrow-integration-testing/src/flight_server_scenarios/integration_test.rs index b75d52a0f937..ae316886381a 100644 --- a/arrow-integration-testing/src/flight_server_scenarios/integration_test.rs +++ b/arrow-integration-testing/src/flight_server_scenarios/integration_test.rs @@ -30,7 +30,6 @@ use arrow::{ ipc::{self, reader, writer}, record_batch::RecordBatch, }; -use arrow_data::UnsafeFlag; use arrow_flight::{ Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightEndpoint, FlightInfo, HandshakeRequest, HandshakeResponse, IpcMessage, PollInfo, PutResult, SchemaAsIpc, @@ -336,7 +335,6 @@ async fn record_batch_from_message( dictionaries_by_id, None, &message.version(), - UnsafeFlag::new(), ); arrow_batch_result diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs index e4eece2679ac..dd7557784e53 100644 --- a/arrow-ipc/src/reader.rs +++ b/arrow-ipc/src/reader.rs @@ -474,7 +474,7 @@ pub struct RecordBatchDecoder<'a> { impl<'a> RecordBatchDecoder<'a> { /// Create a reader for decoding arrays from an encoded [`RecordBatch`] - fn try_new( + pub fn try_new( buf: &'a Buffer, batch: crate::RecordBatch<'a>, schema: SchemaRef, @@ -530,6 +530,11 @@ impl<'a> RecordBatchDecoder<'a> { /// Specifies if validation should be skipped when reading data (defaults to `false`) /// + /// When enabled, the following checks are bypassed: + /// - Offset bounds (e.g. list/string offsets pointing past the end of their value buffer) + /// - UTF-8 validity of string columns (`Utf8` / `LargeUtf8`) + /// - Null count consistency and buffer length checks + /// /// Note this API is somewhat "funky" as it allows the caller to skip validation /// without having to use `unsafe` code. If this is ever made public /// it should be made clearer that this is a potentially unsafe by @@ -538,14 +543,15 @@ impl<'a> RecordBatchDecoder<'a> { /// # Safety /// /// Relies on the caller only passing a flag with `true` value if they are - /// certain that the data is valid - pub(crate) fn with_skip_validation(mut self, skip_validation: UnsafeFlag) -> Self { + /// certain that the data is valid. Invalid data that bypasses these checks + /// may cause undefined behavior when the arrays are later accessed. + pub fn with_skip_validation(mut self, skip_validation: UnsafeFlag) -> Self { self.skip_validation = skip_validation; self } /// Read the record batch, consuming the reader - fn read_record_batch(mut self) -> Result { + pub fn read_record_batch(mut self) -> Result { let mut variadic_counts: VecDeque = self .batch .variadicBufferCounts() @@ -771,13 +777,11 @@ pub fn read_record_batch( dictionaries_by_id: &HashMap, projection: Option<&[usize]>, metadata: &MetadataVersion, - skip_validation: UnsafeFlag, ) -> Result { - let decoder = RecordBatchDecoder::try_new(buf, batch, schema, dictionaries_by_id, metadata)? + RecordBatchDecoder::try_new(buf, batch, schema, dictionaries_by_id, metadata)? .with_projection(projection) .with_require_alignment(false) - .with_skip_validation(skip_validation); - decoder.read_record_batch() + .read_record_batch() } /// Read the dictionary from the buffer and provided metadata, From e7b39949019281f4e906f896b1fd8b4c4642056f Mon Sep 17 00:00:00 2001 From: Richard Date: Wed, 24 Jun 2026 16:19:21 -0400 Subject: [PATCH 10/10] fix clippy errors --- arrow-flight/src/decode.rs | 11 ++++++++--- arrow-flight/src/sql/client.rs | 2 -- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/arrow-flight/src/decode.rs b/arrow-flight/src/decode.rs index cde416062b54..6d5ebb04c1d1 100644 --- a/arrow-flight/src/decode.rs +++ b/arrow-flight/src/decode.rs @@ -347,9 +347,14 @@ impl FlightDataDecoder { Arc::clone(&state.schema), &state.dictionaries_by_field, &message.version(), - )? - .with_skip_validation(self.skip_validation.clone()) - .read_record_batch()?; + ) + .and_then(|d| { + d.with_skip_validation(self.skip_validation.clone()) + .read_record_batch() + }) + .map_err(|e| { + FlightError::DecodeError(format!("Error decoding ipc RecordBatch: {e}")) + })?; Ok(Some(DecodedFlightData::new_record_batch(data, batch))) } diff --git a/arrow-flight/src/sql/client.rs b/arrow-flight/src/sql/client.rs index aa9bc47a833d..5476d4ede9a4 100644 --- a/arrow-flight/src/sql/client.rs +++ b/arrow-flight/src/sql/client.rs @@ -18,7 +18,6 @@ //! A FlightSQL Client [`FlightSqlServiceClient`] use arrow_buffer::Buffer; -use arrow_data::UnsafeFlag; use arrow_ipc::MessageHeader; use arrow_ipc::convert::fb_to_schema; use arrow_ipc::reader::read_record_batch; @@ -652,7 +651,6 @@ pub fn arrow_data_from_flight_data( &dictionaries_by_field, None, &ipc_message.version(), - UnsafeFlag::new(), )?; Ok(ArrowFlightData::RecordBatch(record_batch)) }