Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions cpp/src/parquet/arrow/arrow_reader_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3412,6 +3412,75 @@ TEST(ArrowReadWrite, FixedSizeList) {
CheckSimpleRoundtrip(table, 2, props_store_schema);
}

TEST(ArrowReadWrite, FixedSizeListNull) {
using ::arrow::field;
using ::arrow::fixed_size_list;

auto type = fixed_size_list(::arrow::int16(), /*size=*/3);

const char* json = R"([
null,
[1, 2, 3],
null,
[4, 5, 6],
null])";
auto array = ::arrow::ArrayFromJSON(type, json);
auto table = ::arrow::Table::Make(::arrow::schema({field("root", type)}), {array});
auto props_store_schema = ArrowWriterProperties::Builder().store_schema()->build();
CheckSimpleRoundtrip(table, 2, props_store_schema);
}

TEST(ArrowReadWrite, FixedSizeListAllNull) {
using ::arrow::field;
using ::arrow::fixed_size_list;

auto type = fixed_size_list(::arrow::int16(), /*size=*/3);

const char* json = R"([
null,
null,
null])";
auto array = ::arrow::ArrayFromJSON(type, json);
auto table = ::arrow::Table::Make(::arrow::schema({field("root", type)}), {array});
auto props_store_schema = ArrowWriterProperties::Builder().store_schema()->build();
CheckSimpleRoundtrip(table, 2, props_store_schema);
}

TEST(ArrowReadWrite, NestedFixedSizeList) {
using ::arrow::field;
using ::arrow::fixed_size_list;

auto type = fixed_size_list(fixed_size_list(::arrow::int16(), 2), /*size=*/2);

const char* json = R"([
[[1, 2], [3, 4]],
null,
[[5, 6], null],
[null, [7, 8]]])";
auto array = ::arrow::ArrayFromJSON(type, json);
auto table = ::arrow::Table::Make(::arrow::schema({field("root", type)}), {array});
auto props_store_schema = ArrowWriterProperties::Builder().store_schema()->build();
CheckSimpleRoundtrip(table, 2, props_store_schema);
}

TEST(ArrowReadWrite, ListOfFixedSizeList) {
using ::arrow::field;
using ::arrow::fixed_size_list;
using ::arrow::list;

auto type = list(fixed_size_list(::arrow::int16(), 2));

const char* json = R"([
[[1, 2], null, [3, 4]],
null,
[null, [5, 6]],
[]])";
auto array = ::arrow::ArrayFromJSON(type, json);
auto table = ::arrow::Table::Make(::arrow::schema({field("root", type)}), {array});
auto props_store_schema = ArrowWriterProperties::Builder().store_schema()->build();
CheckSimpleRoundtrip(table, 2, props_store_schema);
}

TEST(ArrowReadWrite, ListOfStructOfList2) {
using ::arrow::field;
using ::arrow::list;
Expand Down
75 changes: 64 additions & 11 deletions cpp/src/parquet/arrow/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@

#include <algorithm>
#include <cstring>
#include <iterator>
#include <memory>
#include <random>
#include <span>
#include <unordered_set>
#include <utility>
#include <vector>

#include "arrow/array.h"
#include "arrow/array.h" // IWYU pragma: keep
#include "arrow/array/concatenate.h"
#include "arrow/buffer.h"
#include "arrow/extension_type.h"
#include "arrow/io/memory.h"
Expand All @@ -35,6 +37,7 @@
#include "arrow/type.h"
#include "arrow/type_traits.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/bit_run_reader.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/future.h"
#include "arrow/util/iterator.h"
Expand All @@ -45,13 +48,10 @@
#include "arrow/util/type_traits.h"

#include "parquet/arrow/reader_internal.h"
#include "parquet/bloom_filter.h"
#include "parquet/bloom_filter_reader.h"
#include "parquet/column_reader.h"
#include "parquet/exception.h"
#include "parquet/file_reader.h"
#include "parquet/metadata.h"
#include "parquet/page_index.h"
#include "parquet/properties.h"
#include "parquet/schema.h"

Expand Down Expand Up @@ -725,13 +725,66 @@ class PARQUET_NO_EXPORT FixedSizeListReader : public ListReader<int32_t> {
DCHECK_EQ(data->buffers.size(), 2);
DCHECK_EQ(field()->type()->id(), ::arrow::Type::FIXED_SIZE_LIST);
const auto& type = checked_cast<::arrow::FixedSizeListType&>(*field()->type());
const int32_t* offsets = reinterpret_cast<const int32_t*>(data->buffers[1]->data());
for (int x = 1; x <= data->length; x++) {
int32_t size = offsets[x] - offsets[x - 1];
if (size != type.list_size()) {
return Status::Invalid("Expected all lists to be of size=", type.list_size(),
" but index ", x, " had size=", size);
const auto* offsets = reinterpret_cast<const int32_t*>(data->buffers[1]->data());
const int32_t list_size = type.list_size();
const uint8_t* valid_bits =
data->buffers[0] != nullptr ? data->buffers[0]->data() : nullptr;
auto validate_offsets = [&](int64_t start, int64_t length,
int32_t expected_size) -> Status {
std::span<const int32_t> run_offsets(offsets + start,
static_cast<size_t>(length + 1));
const auto first_invalid_offset = std::ranges::adjacent_find(
run_offsets,
[&](int32_t left, int32_t right) { return right - left != expected_size; });
if (first_invalid_offset != run_offsets.end()) {
const int64_t x =
start + std::ranges::distance(run_offsets.begin(), first_invalid_offset);
return Status::Invalid("Expected offset at index ", x + 1, " to be ",
offsets[x] + expected_size, " but got ", offsets[x + 1]);
}
return Status::OK();
};
if (valid_bits != nullptr) {
bool needs_padding = false;
::arrow::ArrayVector child_arrays;

auto append_child_run = [&](int64_t start, int64_t length, bool valid) -> Status {
const int64_t child_length = length * list_size;
if (!valid) {
ARROW_ASSIGN_OR_RAISE(
auto null_array,
::arrow::MakeArrayOfNull(type.value_type(), child_length, ctx_->pool));
child_arrays.push_back(std::move(null_array));
return Status::OK();
}
child_arrays.push_back(
::arrow::MakeArray(data->child_data[0]->Slice(offsets[start], child_length)));
return Status::OK();
};

auto visit_run = [&](int64_t start, int64_t length, bool valid) -> Status {
RETURN_NOT_OK(validate_offsets(start, length, valid ? list_size : 0));
if (valid && !needs_padding) {
return Status::OK();
}
if (!needs_padding) {
needs_padding = true;
if (start > 0) {
RETURN_NOT_OK(append_child_run(/*start=*/0, start, /*valid=*/true));
}
}
return append_child_run(start, length, valid);
};

RETURN_NOT_OK(::arrow::internal::VisitBitRuns(valid_bits, data->offset,
data->length, visit_run));
if (needs_padding) {
ARROW_ASSIGN_OR_RAISE(auto child_array_with_padding,
::arrow::Concatenate(child_arrays, ctx_->pool));
data->child_data[0] = child_array_with_padding->data();
}
} else {
RETURN_NOT_OK(validate_offsets(/*start=*/0, data->length, list_size));
}
data->buffers.resize(1);
std::shared_ptr<Array> result = ::arrow::MakeArray(data);
Expand Down
Loading