From 85f89dce2cbd4823b75cba59731e487fe7b3b744 Mon Sep 17 00:00:00 2001 From: Zefeng Yin Date: Wed, 20 May 2026 16:59:09 +0800 Subject: [PATCH 01/17] fix --- src/core/algorithm/flat/CMakeLists.txt | 7 +++++++ src/core/algorithm/flat_sparse/CMakeLists.txt | 9 +++++++++ src/core/algorithm/hnsw/CMakeLists.txt | 6 ++++++ src/core/algorithm/hnsw_rabitq/CMakeLists.txt | 6 ++++++ src/core/algorithm/hnsw_sparse/CMakeLists.txt | 6 ++++++ src/core/algorithm/ivf/CMakeLists.txt | 6 ++++++ src/core/algorithm/vamana/CMakeLists.txt | 6 ++++++ src/core/metric/CMakeLists.txt | 6 ++++++ src/core/mixed_reducer/CMakeLists.txt | 6 ++++++ src/core/quantizer/CMakeLists.txt | 6 ++++++ src/core/utility/CMakeLists.txt | 6 ++++++ 11 files changed, 70 insertions(+) diff --git a/src/core/algorithm/flat/CMakeLists.txt b/src/core/algorithm/flat/CMakeLists.txt index 4564d8ef0..60814960e 100644 --- a/src/core/algorithm/flat/CMakeLists.txt +++ b/src/core/algorithm/flat/CMakeLists.txt @@ -1,11 +1,18 @@ include(${PROJECT_ROOT_DIR}/cmake/bazel.cmake) include(${PROJECT_ROOT_DIR}/cmake/option.cmake) #message(STATUS "PROJECT_ROOT_DIR = ${PROJECT_ROOT_DIR}") + +if(NOT APPLE) + set(CORE_KNN_FLAT_LDFLAGS + "-Wl,--exclude-libs,libparquet.a:libarrow.a:libarrow_bundled_dependencies.a") +endif() + cc_library( NAME core_knn_flat STATIC SHARED STRICT ALWAYS_LINK SRCS *.cc LIBS core_framework INCS . ${PROJECT_ROOT_DIR}/src/core ${PROJECT_ROOT_DIR}/src/core/algorithm ${PROJECT_ROOT_DIR}/src/core/framework + LDFLAGS "${CORE_KNN_FLAT_LDFLAGS}" VERSION "${PROXIMA_ZVEC_VERSION}" ) diff --git a/src/core/algorithm/flat_sparse/CMakeLists.txt b/src/core/algorithm/flat_sparse/CMakeLists.txt index e27d2d3ee..44766138d 100644 --- a/src/core/algorithm/flat_sparse/CMakeLists.txt +++ b/src/core/algorithm/flat_sparse/CMakeLists.txt @@ -1,11 +1,20 @@ include(${PROJECT_ROOT_DIR}/cmake/bazel.cmake) include(${PROJECT_ROOT_DIR}/cmake/option.cmake) +# --exclude-libs is GNU ld / LLVM lld only; Apple ld does not support it. +# On macOS (Mach-O), symbol interposition works differently and the +# Arrow/Parquet double-free issue does not apply. +if(NOT APPLE) + set(CORE_KNN_FLAT_SPARSE_LDFLAGS + "-Wl,--exclude-libs,libparquet.a:libarrow.a:libarrow_bundled_dependencies.a") +endif() + cc_library( NAME core_knn_flat_sparse STATIC SHARED STRICT ALWAYS_LINK SRCS *.cc LIBS core_framework INCS . ${PROJECT_ROOT_DIR}/src/core ${PROJECT_ROOT_DIR}/src/core/algorithm + LDFLAGS "${CORE_KNN_FLAT_SPARSE_LDFLAGS}" VERSION "${PROXIMA_ZVEC_VERSION}" ) diff --git a/src/core/algorithm/hnsw/CMakeLists.txt b/src/core/algorithm/hnsw/CMakeLists.txt index f4a105402..cfd1147f4 100644 --- a/src/core/algorithm/hnsw/CMakeLists.txt +++ b/src/core/algorithm/hnsw/CMakeLists.txt @@ -1,11 +1,17 @@ include(${PROJECT_ROOT_DIR}/cmake/bazel.cmake) include(${PROJECT_ROOT_DIR}/cmake/option.cmake) +if(NOT APPLE) + set(CORE_KNN_HNSW_LDFLAGS + "-Wl,--exclude-libs,libparquet.a:libarrow.a:libarrow_bundled_dependencies.a") +endif() + cc_library( NAME core_knn_hnsw STATIC SHARED STRICT ALWAYS_LINK SRCS *.cc LIBS core_framework sparsehash INCS . ${PROJECT_ROOT_DIR}/src/core ${PROJECT_ROOT_DIR}/src/core/algorithm + LDFLAGS "${CORE_KNN_HNSW_LDFLAGS}" VERSION "${PROXIMA_ZVEC_VERSION}" ) diff --git a/src/core/algorithm/hnsw_rabitq/CMakeLists.txt b/src/core/algorithm/hnsw_rabitq/CMakeLists.txt index ed547dc76..09ce72f55 100644 --- a/src/core/algorithm/hnsw_rabitq/CMakeLists.txt +++ b/src/core/algorithm/hnsw_rabitq/CMakeLists.txt @@ -11,11 +11,17 @@ if(AUTO_DETECT_ARCH) endforeach() endif() +if(NOT APPLE) + set(CORE_KNN_HNSW_RABITQ_LDFLAGS + "-Wl,--exclude-libs,libparquet.a:libarrow.a:libarrow_bundled_dependencies.a") +endif() + cc_library( NAME core_knn_hnsw_rabitq STATIC SHARED STRICT ALWAYS_LINK SRCS *.cc LIBS core_framework rabitqlib sparsehash INCS . ${PROJECT_ROOT_DIR}/src ${PROJECT_ROOT_DIR}/src/core ${PROJECT_ROOT_DIR}/src/core/algorithm + LDFLAGS "${CORE_KNN_HNSW_RABITQ_LDFLAGS}" VERSION "${PROXIMA_ZVEC_VERSION}" ) \ No newline at end of file diff --git a/src/core/algorithm/hnsw_sparse/CMakeLists.txt b/src/core/algorithm/hnsw_sparse/CMakeLists.txt index fe26d10e1..15295b485 100644 --- a/src/core/algorithm/hnsw_sparse/CMakeLists.txt +++ b/src/core/algorithm/hnsw_sparse/CMakeLists.txt @@ -1,11 +1,17 @@ include(${PROJECT_ROOT_DIR}/cmake/bazel.cmake) include(${PROJECT_ROOT_DIR}/cmake/option.cmake) +if(NOT APPLE) + set(CORE_KNN_HNSW_SPARSE_LDFLAGS + "-Wl,--exclude-libs,libparquet.a:libarrow.a:libarrow_bundled_dependencies.a") +endif() + cc_library( NAME core_knn_hnsw_sparse STATIC SHARED STRICT ALWAYS_LINK SRCS *.cc LIBS core_framework sparsehash INCS . ${PROJECT_ROOT_DIR}/src/core ${PROJECT_ROOT_DIR}/src/core/algorithm + LDFLAGS "${CORE_KNN_HNSW_SPARSE_LDFLAGS}" VERSION "${PROXIMA_ZVEC_VERSION}" ) diff --git a/src/core/algorithm/ivf/CMakeLists.txt b/src/core/algorithm/ivf/CMakeLists.txt index ffcf30949..8e3872f31 100644 --- a/src/core/algorithm/ivf/CMakeLists.txt +++ b/src/core/algorithm/ivf/CMakeLists.txt @@ -1,10 +1,16 @@ include(${PROJECT_ROOT_DIR}/cmake/bazel.cmake) include(${PROJECT_ROOT_DIR}/cmake/option.cmake) +if(NOT APPLE) + set(CORE_KNN_IVF_LDFLAGS + "-Wl,--exclude-libs,libparquet.a:libarrow.a:libarrow_bundled_dependencies.a") +endif() + cc_library( NAME core_knn_ivf STATIC SHARED STRICT ALWAYS_LINK SRCS *.cc LIBS zvec_ailego core_framework core_knn_cluster INCS . ${PROJECT_ROOT_DIR}/src/core ${PROJECT_ROOT_DIR}/src/core/algorithm + LDFLAGS "${CORE_KNN_IVF_LDFLAGS}" VERSION "${PROXIMA_ZVEC_VERSION}" ) diff --git a/src/core/algorithm/vamana/CMakeLists.txt b/src/core/algorithm/vamana/CMakeLists.txt index 8e5bbda1e..b2feaf9c1 100644 --- a/src/core/algorithm/vamana/CMakeLists.txt +++ b/src/core/algorithm/vamana/CMakeLists.txt @@ -1,11 +1,17 @@ include(${PROJECT_ROOT_DIR}/cmake/bazel.cmake) include(${PROJECT_ROOT_DIR}/cmake/option.cmake) +if(NOT APPLE) + set(CORE_KNN_VAMANA_LDFLAGS + "-Wl,--exclude-libs,libparquet.a:libarrow.a:libarrow_bundled_dependencies.a") +endif() + cc_library( NAME core_knn_vamana STATIC SHARED STRICT ALWAYS_LINK SRCS *.cc LIBS core_framework core_knn_hnsw sparsehash INCS . ${PROJECT_ROOT_DIR}/src/core ${PROJECT_ROOT_DIR}/src/core/algorithm + LDFLAGS "${CORE_KNN_VAMANA_LDFLAGS}" VERSION "${PROXIMA_ZVEC_VERSION}" ) diff --git a/src/core/metric/CMakeLists.txt b/src/core/metric/CMakeLists.txt index 55dfc901e..2918b909b 100644 --- a/src/core/metric/CMakeLists.txt +++ b/src/core/metric/CMakeLists.txt @@ -1,11 +1,17 @@ include(${PROJECT_ROOT_DIR}/cmake/bazel.cmake) include(${PROJECT_ROOT_DIR}/cmake/option.cmake) +if(NOT APPLE) + set(CORE_METRIC_LDFLAGS + "-Wl,--exclude-libs,libparquet.a:libarrow.a:libarrow_bundled_dependencies.a") +endif() + cc_library( NAME core_metric STATIC SHARED STRICT ALWAYS_LINK SRCS *.cc LIBS zvec_ailego zvec_turbo core_framework INCS . ${PROJECT_ROOT_DIR}/src/core + LDFLAGS "${CORE_METRIC_LDFLAGS}" VERSION "${PROXIMA_ZVEC_VERSION}" ) diff --git a/src/core/mixed_reducer/CMakeLists.txt b/src/core/mixed_reducer/CMakeLists.txt index e9566456e..e7204f0f7 100644 --- a/src/core/mixed_reducer/CMakeLists.txt +++ b/src/core/mixed_reducer/CMakeLists.txt @@ -1,10 +1,16 @@ include(${PROJECT_ROOT_DIR}/cmake/bazel.cmake) include(${PROJECT_ROOT_DIR}/cmake/option.cmake) +if(NOT APPLE) + set(CORE_MIX_REDUCER_LDFLAGS + "-Wl,--exclude-libs,libparquet.a:libarrow.a:libarrow_bundled_dependencies.a") +endif() + cc_library( NAME core_mix_reducer STATIC SHARED STRICT ALWAYS_LINK SRCS *.cc LIBS zvec_ailego core_framework INCS . ${PROJECT_ROOT_DIR}/src/core + LDFLAGS "${CORE_MIX_REDUCER_LDFLAGS}" VERSION "${PROXIMA_ZVEC_VERSION}" ) diff --git a/src/core/quantizer/CMakeLists.txt b/src/core/quantizer/CMakeLists.txt index 21a03e449..80b4f612a 100644 --- a/src/core/quantizer/CMakeLists.txt +++ b/src/core/quantizer/CMakeLists.txt @@ -1,11 +1,17 @@ include(${PROJECT_ROOT_DIR}/cmake/bazel.cmake) include(${PROJECT_ROOT_DIR}/cmake/option.cmake) +if(NOT APPLE) + set(CORE_QUANTIZER_LDFLAGS + "-Wl,--exclude-libs,libparquet.a:libarrow.a:libarrow_bundled_dependencies.a") +endif() + cc_library( NAME core_quantizer STATIC SHARED STRICT ALWAYS_LINK SRCS *.cc LIBS zvec_ailego core_framework INCS . ${PROJECT_ROOT_DIR}/src/core + LDFLAGS "${CORE_QUANTIZER_LDFLAGS}" VERSION "${PROXIMA_ZVEC_VERSION}" ) diff --git a/src/core/utility/CMakeLists.txt b/src/core/utility/CMakeLists.txt index 99cf87ca2..7c3adf702 100644 --- a/src/core/utility/CMakeLists.txt +++ b/src/core/utility/CMakeLists.txt @@ -1,11 +1,17 @@ include(${PROJECT_ROOT_DIR}/cmake/bazel.cmake) include(${PROJECT_ROOT_DIR}/cmake/option.cmake) +if(NOT APPLE) + set(CORE_UTILITY_LDFLAGS + "-Wl,--exclude-libs,libparquet.a:libarrow.a:libarrow_bundled_dependencies.a") +endif() + cc_library( NAME core_utility STATIC SHARED STRICT ALWAYS_LINK SRCS *.cc LIBS zvec_ailego core_framework INCS . ${PROJECT_ROOT_DIR}/src/core + LDFLAGS "${CORE_UTILITY_LDFLAGS}" VERSION "${PROXIMA_ZVEC_VERSION}" ) From 7da12289a9c01d0ea22ad2c33a9421c1b7bbaf52 Mon Sep 17 00:00:00 2001 From: Zefeng Yin Date: Mon, 1 Jun 2026 21:29:24 +0800 Subject: [PATCH 02/17] buffer read storage --- src/core/interface/indexes/ivf_index.cc | 15 +- src/core/utility/buffer_read_storage.cc | 409 ++++++++++++++++++++++++ src/core/utility/utility_params.h | 8 + 3 files changed, 424 insertions(+), 8 deletions(-) create mode 100644 src/core/utility/buffer_read_storage.cc diff --git a/src/core/interface/indexes/ivf_index.cc b/src/core/interface/indexes/ivf_index.cc index 1b91eebea..5fb71faec 100644 --- a/src/core/interface/indexes/ivf_index.cc +++ b/src/core/interface/indexes/ivf_index.cc @@ -84,20 +84,19 @@ int IVFIndex::Open(const std::string &file_path, break; } case StorageOptions::StorageType::kBufferPool: { - // NOTE: IVF index is dumped via FileDumper (plain binary file), which is - // not compatible with BufferStorage's IndexFormat layout (header/footer - // chain). Until IVF gains a BufferStorage-aware dump path, fall back to - // MMapFileReadStorage so the freshly-dumped file can be reopened. - storage_ = core::IndexFactory::CreateStorage("MMapFileReadStorage"); + // IVF index is dumped via FileDumper (FileDumper container layout). + // BufferReadStorage parses that layout through IndexUnpacker (same as + // MMapFileReadStorage) but serves reads through a VecBufferPool, so the + // freshly-dumped file can be reopened with buffer-pool memory control. + storage_ = core::IndexFactory::CreateStorage("BufferReadStorage"); if (storage_ == nullptr) { - LOG_ERROR( - "Failed to create MMapFileReadStorage (IVF buffer-pool fallback)"); + LOG_ERROR("Failed to create BufferReadStorage (IVF buffer-pool)"); return core::IndexError_Runtime; } int ret = storage_->init(storage_params); if (ret != 0) { LOG_ERROR( - "Failed to init MMapFileReadStorage (IVF buffer-pool fallback), " + "Failed to init BufferReadStorage (IVF buffer-pool), " "path: %s, err: %s", file_path_.c_str(), core::IndexError::What(ret)); return ret; diff --git a/src/core/utility/buffer_read_storage.cc b/src/core/utility/buffer_read_storage.cc new file mode 100644 index 000000000..b1d68b69e --- /dev/null +++ b/src/core/utility/buffer_read_storage.cc @@ -0,0 +1,409 @@ +// Copyright 2025-present the zvec project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// BufferReadStorage is a read-only IndexStorage that mirrors the structure of +// MMapFileReadStorage (it parses the FileDumper container layout through +// IndexUnpacker and exposes segment-based access), but instead of mmap-ing the +// file it reads through a VecBufferPool. This lets IVF / DiskANN(Vamana) +// indexes -- which are dumped via FileDumper -- benefit from the buffer-pool's +// paged cache + LRU eviction + memory-budget control, while keeping the same +// Segment interface that those indexes already consume. +#include +#include +#include +#include +#include +#include +#include +#include "utility_params.h" + +namespace zvec { +namespace core { + +/*! Buffer Read Storage (backed by VecBufferPool) + */ +class BufferReadStorage : public IndexStorage { + public: + /*! Buffer Read Storage Segment + * + * Each segment keeps the owning VecBufferPool / VecBufferPoolHandle alive + * (shared_ptr) so that pages it reads remain valid for the segment's + * lifetime. Reads go through the pool's paged cache: + * - fetch() -> read_range into the caller's buffer + * - read(const void**) -> read_range into a per-segment buffer (stable + * pointer, never pins a page) + * - read(MemoryBlock&) -> single page: zero-copy pin tied to the + * MemoryBlock lifecycle; cross page: owned copy + * - read(SegmentData*) -> read_range into the per-segment buffer + */ + class Segment : public IndexStorage::Segment, + public std::enable_shared_from_this { + public: + //! Index Storage Pointer + typedef std::shared_ptr Pointer; + + //! Constructor + Segment(const std::shared_ptr &pool, + const std::shared_ptr &handle, + size_t index_offset, const IndexUnpacker::SegmentMeta &segment) + : data_offset_(index_offset + segment.data_offset()), + data_size_(segment.data_size()), + padding_size_(segment.padding_size()), + region_size_(segment.data_size() + segment.padding_size()), + data_crc_(segment.data_crc()), + pool_(pool), + handle_(handle) {} + + //! Constructor (clone) + Segment(const Segment &rhs) + : data_offset_(rhs.data_offset_), + data_size_(rhs.data_size_), + padding_size_(rhs.padding_size_), + region_size_(rhs.region_size_), + data_crc_(rhs.data_crc_), + pool_(rhs.pool_), + handle_(rhs.handle_) {} + + //! Destructor + ~Segment(void) override {} + + //! Retrieve size of data + size_t data_size(void) const override { + return data_size_; + } + + //! Retrieve crc of data + uint32_t data_crc(void) const override { + return data_crc_; + } + + //! Retrieve size of padding + size_t padding_size(void) const override { + return padding_size_; + } + + //! Retrieve capacity of segment + size_t capacity(void) const override { + return region_size_; + } + + //! Fetch data from segment (copies into the caller-owned buffer) + size_t fetch(size_t offset, void *buf, size_t len) const override { + if (ailego_unlikely(offset + len > region_size_)) { + if (offset > region_size_) { + offset = region_size_; + } + len = region_size_ - offset; + } + if (len == 0) { + return 0; + } + if (!handle_->read_range(data_offset_ + offset, len, + static_cast(buf))) { + LOG_ERROR( + "BufferReadStorage::Segment::fetch: read_range failed, " + "abs_offset=%zu, len=%zu", + data_offset_ + offset, len); + return 0; + } + return len; + } + + //! Read data from segment (stable pointer via per-segment buffer) + size_t read(size_t offset, const void **data, size_t len) override { + if (ailego_unlikely(offset + len > region_size_)) { + if (offset > region_size_) { + offset = region_size_; + } + len = region_size_ - offset; + } + if (len == 0) { + *data = buffer_.data(); + return 0; + } + buffer_.reserve(len); + if (!handle_->read_range(data_offset_ + offset, len, + reinterpret_cast(buffer_.data()))) { + LOG_ERROR( + "BufferReadStorage::Segment::read: read_range failed, " + "abs_offset=%zu, len=%zu", + data_offset_ + offset, len); + *data = nullptr; + return 0; + } + *data = buffer_.data(); + return len; + } + + //! Read data from segment into a MemoryBlock + size_t read(size_t offset, MemoryBlock &data, size_t len) override { + if (ailego_unlikely(offset + len > region_size_)) { + if (offset > region_size_) { + offset = region_size_; + } + len = region_size_ - offset; + } + size_t abs_offset = data_offset_ + offset; + size_t first_page = abs_offset / ailego::kVectorPageSize; + size_t last_page = (len == 0) + ? first_page + : (abs_offset + len - 1) / ailego::kVectorPageSize; + if (first_page == last_page) { + // Single-page: zero-copy pin whose release is tied to the + // MemoryBlock lifecycle (release_one on destruction). + size_t page_id = 0; + char *raw = handle_->get_single_page(abs_offset, len, page_id); + if (!raw) { + LOG_ERROR( + "BufferReadStorage::Segment::read(MemoryBlock&): single-page " + "acquire failed, abs_offset=%zu, len=%zu", + abs_offset, len); + return 0; + } + data.reset(handle_.get(), page_id, raw); + return len; + } + // Cross-page: copy into a freshly-allocated 4K-aligned buffer that the + // MemoryBlock owns (freed via ailego_free on destruction). + static constexpr size_t kAlign = 4096UL; + size_t alloc_size = (len + (kAlign - 1UL)) & ~(kAlign - 1UL); + char *tmp = + static_cast(ailego_aligned_malloc(alloc_size, kAlign)); + if (!tmp) { + LOG_ERROR( + "BufferReadStorage::Segment::read(MemoryBlock&): cross-page alloc " + "failed, abs_offset=%zu, len=%zu", + abs_offset, len); + return 0; + } + if (!handle_->read_range(abs_offset, len, tmp)) { + ailego_free(tmp); + LOG_ERROR( + "BufferReadStorage::Segment::read(MemoryBlock&): cross-page " + "read_range failed, abs_offset=%zu, len=%zu", + abs_offset, len); + return 0; + } + data = MemoryBlock::MakeOwned(tmp, len); + return len; + } + + //! Read scattered data from segment (stable pointers via per-segment buf) + bool read(SegmentData *iovec, size_t count) override { + size_t total = 0u; + for (auto *it = iovec, *end = iovec + count; it != end; ++it) { + ailego_false_if_false(it->offset + it->length <= region_size_); + total += it->length; + } + ailego_false_if_false(total != 0); + + buffer_.reserve(total); + uint8_t *buf = buffer_.data(); + for (auto *it = iovec, *end = iovec + count; it != end; ++it) { + ailego_false_if_false( + handle_->read_range(data_offset_ + it->offset, it->length, + reinterpret_cast(buf))); + it->data = buf; + buf += it->length; + } + return true; + } + + size_t write(size_t, const void *, size_t) override { + return IndexError_NotImplemented; + } + + size_t resize(size_t) override { + return IndexError_NotImplemented; + } + + void update_data_crc(uint32_t) override { + return; + } + + //! Clone the segment + IndexStorage::Segment::Pointer clone(void) override { + return std::make_shared(*this); + } + + //! No stable base pointer: data lives in an evictable paged cache. + const uint8_t *base_data(void) const override { + return nullptr; + } + + private: + size_t data_offset_{0u}; + size_t data_size_{0u}; + size_t padding_size_{0u}; + size_t region_size_{0u}; + uint32_t data_crc_{0u}; + std::vector buffer_{}; + std::shared_ptr pool_{nullptr}; + std::shared_ptr handle_{nullptr}; + }; + + //! Destructor + ~BufferReadStorage(void) override {} + + //! Initialize container + int init(const ailego::Params ¶ms) override { + params.get(BUFFER_READ_STORAGE_CHECKSUM_VALIDATION, &checksum_validation_); + params.get(BUFFER_READ_STORAGE_HEADER_OFFSET, &header_offset_); + params.get(BUFFER_READ_STORAGE_FOOTER_OFFSET, &footer_offset_); + return 0; + } + + int flush(void) override { + return 0; + } + + int append(const std::string &, size_t) override { + return IndexError_NotImplemented; + } + + void refresh(uint64_t) override { + return; + } + + uint64_t check_point(void) const override { + return 0; + } + + //! Cleanup container + int cleanup(void) override { + return this->close(); + } + + //! Load an index file into the container + int open(const std::string &path, bool) override { + // Read-only buffer pool over the freshly-dumped FileDumper container. + buffer_pool_ = std::make_shared(path, + /*writable=*/false); + if (!buffer_pool_) { + LOG_ERROR("Failed to create VecBufferPool, path: %s", path.c_str()); + return IndexError_NoMemory; + } + handle_ = std::make_shared( + buffer_pool_->get_handle()); + + size_t file_size = buffer_pool_->file_size(); + index_offset_ = (header_offset_ >= 0 ? 0 : file_size) + header_offset_; + size_t end_offset = (footer_offset_ > 0 ? 0 : file_size) + footer_offset_; + size_t size = end_offset > index_offset_ ? end_offset - index_offset_ : 0; + + // read_data for IndexUnpacker: provide a stable pointer by copying the + // requested range into a reused scratch buffer via get_meta (direct + // pread, valid before buffer_pool_->init()). + auto read_data = [this, end_offset](size_t offset, const void **data, + size_t len) -> size_t { + size_t off = offset + index_offset_; + if (off + len > end_offset) { + if (off > end_offset) { + off = end_offset; + } + len = end_offset - off; + } + scratch_.reserve(len); + *data = scratch_.data(); + if (len == 0) { + return 0; + } + if (handle_->get_meta(off, len, + reinterpret_cast(scratch_.data())) != 0) { + return 0; + } + return len; + }; + + IndexUnpacker unpacker; + if (!unpacker.unpack(read_data, size, checksum_validation_)) { + LOG_ERROR("Failed to unpack file: %s", path.c_str()); + return IndexError_UnpackIndex; + } + segments_ = std::move(*unpacker.mutable_segments()); + magic_ = unpacker.magic(); + + // Allocate the page table now that the layout is known. + int ret = buffer_pool_->init(); + if (ret != 0) { + LOG_ERROR("Failed to init VecBufferPool, path: %s", path.c_str()); + return IndexError_Runtime; + } + return 0; + } + + int close(void) override { + segments_.clear(); + handle_ = nullptr; + buffer_pool_ = nullptr; + return 0; + } + + //! Retrieve a segment by id + IndexStorage::Segment::Pointer get(const std::string &id, int) override { + if (!buffer_pool_ || !handle_) { + return IndexStorage::Segment::Pointer(); + } + auto it = segments_.find(id); + if (it == segments_.end()) { + return IndexStorage::Segment::Pointer(); + } + return std::make_shared( + buffer_pool_, handle_, index_offset_, it->second); + } + + std::map get_all( + void) const override { + std::map result; + if (buffer_pool_ && handle_) { + for (const auto &it : segments_) { + result.emplace(it.first, + std::make_shared( + buffer_pool_, handle_, index_offset_, it.second)); + } + } + return result; + } + + //! Test if a segment exists + bool has(const std::string &id) const override { + return (segments_.find(id) != segments_.end()); + } + + //! Retrieve magic number of index + uint32_t magic(void) const override { + return magic_; + } + + //! Reads go through the VecBufferPool paged cache. + MemoryBlock::MemoryBlockType memory_block_type(void) const override { + return MemoryBlock::MBT_BUFFERPOOL; + } + + private: + bool checksum_validation_{false}; + int64_t header_offset_{0}; + int64_t footer_offset_{0}; + size_t index_offset_{0}; + uint32_t magic_{0}; + std::vector scratch_{}; + std::map segments_{}; + std::shared_ptr buffer_pool_{nullptr}; + std::shared_ptr handle_{nullptr}; +}; + +INDEX_FACTORY_REGISTER_STORAGE(BufferReadStorage); + +} // namespace core +} // namespace zvec diff --git a/src/core/utility/utility_params.h b/src/core/utility/utility_params.h index c57e6e980..ebd08c504 100644 --- a/src/core/utility/utility_params.h +++ b/src/core/utility/utility_params.h @@ -60,6 +60,14 @@ static const std::string MMAPFILE_READ_STORAGE_HEADER_OFFSET = static const std::string MMAPFILE_READ_STORAGE_FOOTER_OFFSET = "proxima.mmap_file.container.footer_offset"; +//! BufferReadStorage (read-only storage backed by VecBufferPool) +static const std::string BUFFER_READ_STORAGE_CHECKSUM_VALIDATION = + "proxima.buffer.read_storage.checksum_validation"; +static const std::string BUFFER_READ_STORAGE_HEADER_OFFSET = + "proxima.buffer.read_storage.header_offset"; +static const std::string BUFFER_READ_STORAGE_FOOTER_OFFSET = + "proxima.buffer.read_storage.footer_offset"; + //! MMapFileStorage static const std::string MMAPFILE_STORAGE_MEMORY_LOCKED = "proxima.mmap_file.storage.memory_locked"; From c577132a3e24bce057510d900742db65377164e4 Mon Sep 17 00:00:00 2001 From: Zefeng Yin Date: Tue, 16 Jun 2026 10:44:07 +0800 Subject: [PATCH 03/17] upd --- src/ailego/buffer/vector_page_table.cc | 83 ++++++++++++++++--- src/core/utility/buffer_storage.cc | 8 +- src/core/utility/utility_params.h | 4 + .../zvec/ailego/buffer/vector_page_table.h | 9 +- 4 files changed, 88 insertions(+), 16 deletions(-) diff --git a/src/ailego/buffer/vector_page_table.cc b/src/ailego/buffer/vector_page_table.cc index c9296d640..21d9c2484 100644 --- a/src/ailego/buffer/vector_page_table.cc +++ b/src/ailego/buffer/vector_page_table.cc @@ -13,6 +13,7 @@ // limitations under the License. #include +#include #include #include #include @@ -272,27 +273,71 @@ char *VectorPageTable::set_block_acquired(block_id_t block_id, char *buffer, } } -VecBufferPool::VecBufferPool(const std::string &filename, bool writable) { +VecBufferPool::VecBufferPool(const std::string &filename, bool writable, + bool enable_direct_io) { file_name_ = filename; writable_ = writable; #if defined(_MSC_VER) int flags = writable_ ? (O_RDWR | _O_BINARY) : (O_RDONLY | _O_BINARY); fd_ = _open(filename.c_str(), flags, 0644); + meta_fd_ = _open(filename.c_str(), flags, 0644); + (void)enable_direct_io; // O_DIRECT not supported on this path #else - int flags = writable_ ? O_RDWR : O_RDONLY; - fd_ = ::open(filename.c_str(), flags, 0644); + int base_flags = writable_ ? O_RDWR : O_RDONLY; + // Metadata channel: always buffered IO. Serves the unaligned + // header/footer/segment_meta reads & writes and benefits from page cache. + meta_fd_ = ::open(filename.c_str(), base_flags, 0644); + // Page-data channel: optionally O_DIRECT; fall back to buffered open when + // the filesystem (tmpfs/overlayfs/...) rejects O_DIRECT. + int data_flags = base_flags; +#ifdef O_DIRECT + if (enable_direct_io) { + data_flags |= O_DIRECT; + } +#endif + fd_ = ::open(filename.c_str(), data_flags, 0644); +#ifdef O_DIRECT + if (fd_ < 0 && (data_flags & O_DIRECT)) { + LOG_WARN( + "VecBufferPool: open with O_DIRECT failed for file[%s] (errno=%d), " + "falling back to buffered IO", + filename.c_str(), errno); + fd_ = ::open(filename.c_str(), base_flags, 0644); + direct_io_enabled_ = false; + } else { + direct_io_enabled_ = (data_flags & O_DIRECT) != 0; + } +#else + (void)enable_direct_io; +#endif +#endif + if (fd_ < 0 || meta_fd_ < 0) { + if (fd_ >= 0) { +#if defined(_MSC_VER) + _close(fd_); +#else + ::close(fd_); #endif - if (fd_ < 0) { + } + if (meta_fd_ >= 0) { +#if defined(_MSC_VER) + _close(meta_fd_); +#else + ::close(meta_fd_); +#endif + } throw std::runtime_error("Failed to open file: " + filename); } #if defined(_MSC_VER) struct _stat64 st; if (_fstat64(fd_, &st) < 0) { _close(fd_); + _close(meta_fd_); #else struct stat st; if (fstat(fd_, &st) < 0) { ::close(fd_); + ::close(meta_fd_); #endif throw std::runtime_error("Failed to stat file: " + filename); } @@ -375,16 +420,24 @@ char *VecBufferPool::acquire_buffer(block_id_t page_id, int retry) { } size_t page_offset = page_id * kVectorPageSize; - size_t expected_bytes = std::min(kVectorPageSize, file_size_ - page_offset); - if (expected_bytes < kVectorPageSize) { - std::memset(buffer + expected_bytes, 0, kVectorPageSize - expected_bytes); - } - ssize_t read_bytes = zvec_pread(fd_, buffer, expected_bytes, page_offset); - if (read_bytes != static_cast(expected_bytes)) { + // O_DIRECT requires the IO length to be a multiple of the device block + // size. The backing file size is always page-aligned (IndexMapping + + // append_segment guarantee this), so reading a full page never reads past + // EOF; the tail padding is the file's own zero region. In direct mode we + // MUST read the whole page; the buffered path keeps the legacy short-read + // + zero-pad behaviour. + size_t read_len = direct_io_enabled_ + ? kVectorPageSize + : std::min(kVectorPageSize, file_size_ - page_offset); + if (read_len < kVectorPageSize) { + std::memset(buffer + read_len, 0, kVectorPageSize - read_len); + } + ssize_t read_bytes = zvec_pread(fd_, buffer, read_len, page_offset); + if (read_bytes != static_cast(read_len)) { LOG_ERROR( "Buffer pool failed to read file at offset: file[%s], page_id[%zu], " "offset[%zu], expected[%zu], got[%zd]", - file_name_.c_str(), page_id, page_offset, expected_bytes, read_bytes); + file_name_.c_str(), page_id, page_offset, read_len, read_bytes); MemoryLimitPool::get_instance().release_buffer(buffer, kVectorPageSize); return nullptr; } @@ -392,7 +445,7 @@ char *VecBufferPool::acquire_buffer(block_id_t page_id, int retry) { } int VecBufferPool::get_meta(size_t offset, size_t length, char *buffer) { - ssize_t read_bytes = zvec_pread(fd_, buffer, length, offset); + ssize_t read_bytes = zvec_pread(meta_fd_, buffer, length, offset); if (read_bytes != static_cast(length)) { LOG_ERROR( "Buffer pool failed to read file at offset: file[%s], offset[%zu], " @@ -446,7 +499,7 @@ int VecBufferPool::write_meta(size_t offset, size_t length, file_name_.c_str()); return -1; } - ssize_t w = zvec_pwrite(fd_, buffer, length, offset); + ssize_t w = zvec_pwrite(meta_fd_, buffer, length, offset); if (w != static_cast(length)) { LOG_ERROR( "Buffer pool failed to write meta: file[%s], offset[%zu], " @@ -495,6 +548,10 @@ bool VecBufferPool::extend_file(size_t new_size) { if (new_size <= file_size_) { return true; } + // The backing file must stay page-aligned so that O_DIRECT full-page reads + // never read past EOF. All current callers pass page-aligned targets. + assert(new_size % kVectorPageSize == 0 && + "extend_file target must be page-aligned for O_DIRECT correctness"); // Pre-validate against the page table's static capacity BEFORE mutating // any on-disk state. Otherwise a successful ftruncate followed by a // failed page_table_.extend() would leave the file size and the page diff --git a/src/core/utility/buffer_storage.cc b/src/core/utility/buffer_storage.cc index c7cba5e50..ac4eb731f 100644 --- a/src/core/utility/buffer_storage.cc +++ b/src/core/utility/buffer_storage.cc @@ -481,6 +481,7 @@ class BufferStorage : public IndexStorage { if (val != 0) { segment_meta_capacity_ = val; } + params.get(BUFFER_STORAGE_ENABLE_DIRECT_IO, &enable_direct_io_); return 0; } @@ -509,7 +510,8 @@ class BufferStorage : public IndexStorage { // Open in writable mode when the caller expects to modify the index // (create_if_missing=true implies write intent, same as MMapFileStorage). buffer_pool_ = std::make_shared( - path, /*writable=*/create_if_missing); + path, /*writable=*/create_if_missing, + /*enable_direct_io=*/enable_direct_io_); buffer_pool_handle_ = std::make_shared( buffer_pool_->get_handle()); int ret = ParseToMapping(); @@ -1525,6 +1527,10 @@ class BufferStorage : public IndexStorage { // init_index(). uint32_t segment_meta_capacity_{4096u}; + // When true, the page-data fd is opened with O_DIRECT (metadata fd stays + // buffered). Defaults to false: identical behaviour to the legacy path. + bool enable_direct_io_{false}; + // Per-header-chain file offsets used by flush_index() and append_segment(). struct MetaChain { uint64_t header_start_offset; diff --git a/src/core/utility/utility_params.h b/src/core/utility/utility_params.h index c57e6e980..1b8ba2cef 100644 --- a/src/core/utility/utility_params.h +++ b/src/core/utility/utility_params.h @@ -72,6 +72,10 @@ static const std::string MMAPFILE_STORAGE_FORCE_FLUSH = static const std::string MMAPFILE_STORAGE_SEGMENT_META_CAPACITY = "proxima.mmap_file.storage.segment_meta_capacity"; +//! BufferStorage +static const std::string BUFFER_STORAGE_ENABLE_DIRECT_IO = + "proxima.buffer.storage.enable_direct_io"; + //! MipsConverter static const std::string MIPS_CONVERTER_M_VALUE = "proxima.mips.converter.m_value"; diff --git a/src/include/zvec/ailego/buffer/vector_page_table.h b/src/include/zvec/ailego/buffer/vector_page_table.h index 8bcc13e99..6437a71ab 100644 --- a/src/include/zvec/ailego/buffer/vector_page_table.h +++ b/src/include/zvec/ailego/buffer/vector_page_table.h @@ -201,7 +201,8 @@ class VecBufferPool { static constexpr size_t kMutexBucketCount = 64UL * 1024UL; - VecBufferPool(const std::string &filename, bool writable = false); + VecBufferPool(const std::string &filename, bool writable = false, + bool enable_direct_io = false); ~VecBufferPool() { // Flush any remaining dirty blocks before tearing down memory/fd so that // writes are not silently lost. Safe to call even in read-only mode. @@ -212,8 +213,10 @@ class VecBufferPool { } #if defined(_MSC_VER) _close(fd_); + _close(meta_fd_); #else close(fd_); + close(meta_fd_); #endif } @@ -253,10 +256,12 @@ class VecBufferPool { } private: - int fd_; + int fd_; // page-data channel: may carry O_DIRECT + int meta_fd_; // metadata channel: always buffered IO size_t file_size_; std::string file_name_; bool writable_{false}; + bool direct_io_enabled_{false}; // whether O_DIRECT actually took effect public: VectorPageTable page_table_; From ef0b1b393608d59eb41052f7103bb3fd192f5089 Mon Sep 17 00:00:00 2001 From: Zefeng Yin Date: Tue, 16 Jun 2026 14:29:47 +0800 Subject: [PATCH 04/17] fix --- src/core/utility/buffer_read_storage.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/core/utility/buffer_read_storage.cc b/src/core/utility/buffer_read_storage.cc index b1d68b69e..bdf5227b2 100644 --- a/src/core/utility/buffer_read_storage.cc +++ b/src/core/utility/buffer_read_storage.cc @@ -67,7 +67,8 @@ class BufferReadStorage : public IndexStorage { //! Constructor (clone) Segment(const Segment &rhs) - : data_offset_(rhs.data_offset_), + : std::enable_shared_from_this(), + data_offset_(rhs.data_offset_), data_size_(rhs.data_size_), padding_size_(rhs.padding_size_), region_size_(rhs.region_size_), From 5fc30a3c2eeeecfa5de15313c4770da66b6182fb Mon Sep 17 00:00:00 2001 From: Zefeng Yin Date: Wed, 17 Jun 2026 19:51:15 +0800 Subject: [PATCH 05/17] upd --- src/ailego/CMakeLists.txt | 9 + src/ailego/buffer/vector_page_table.cc | 59 +++++++ src/ailego/io/aligned_async_io.cc | 140 +++++++++++++++ src/ailego/io/aligned_async_io.h | 44 +++++ src/core/algorithm/hnsw/hnsw_algorithm.cc | 103 +++++------ .../algorithm/hnsw/hnsw_streamer_entity.cc | 34 ++++ .../algorithm/hnsw/hnsw_streamer_entity.h | 163 +++++++++++++++++- src/core/utility/buffer_storage.cc | 11 ++ .../zvec/ailego/buffer/vector_page_table.h | 8 + .../zvec/core/framework/index_storage.h | 8 + 10 files changed, 513 insertions(+), 66 deletions(-) create mode 100644 src/ailego/io/aligned_async_io.cc create mode 100644 src/ailego/io/aligned_async_io.h diff --git a/src/ailego/CMakeLists.txt b/src/ailego/CMakeLists.txt index 628027cf1..087cc5fee 100644 --- a/src/ailego/CMakeLists.txt +++ b/src/ailego/CMakeLists.txt @@ -17,6 +17,10 @@ set(EXTRA_DEFS) if(UNIX AND NOT APPLE) list(APPEND EXTRA_LIBS ${LIB_RT}) + find_library(LIB_AIO NAMES aio) + if(LIB_AIO) + list(APPEND EXTRA_LIBS ${LIB_AIO}) + endif() endif() if(BUILD_ZVEC_CORE_ONLY) @@ -134,3 +138,8 @@ cc_library( DEFS ${EXTRA_DEFS} VERSION "${GIT_SRCS_VER}" ) + +if(LIB_AIO) + target_compile_definitions(zvec_ailego PUBLIC ZVEC_HAS_LIBAIO) + message(STATUS "Found libaio: ${LIB_AIO} (HNSW async prefetch enabled)") +endif() diff --git a/src/ailego/buffer/vector_page_table.cc b/src/ailego/buffer/vector_page_table.cc index 21d9c2484..695aa2d40 100644 --- a/src/ailego/buffer/vector_page_table.cc +++ b/src/ailego/buffer/vector_page_table.cc @@ -20,6 +20,7 @@ #include #include #include +#include "../io/aligned_async_io.h" #if defined(_MSC_VER) #ifndef NOMINMAX @@ -674,5 +675,63 @@ void VecBufferPoolHandle::acquire_one(block_id_t block_id) { pool_.page_table_.acquire_block(block_id); } +void VecBufferPool::batch_prefetch(const block_id_t *page_ids, size_t count) { +#ifdef ZVEC_HAS_LIBAIO + if (count == 0) return; + + static thread_local ScopedIOContext tl_io_ctx; + if (!tl_io_ctx.valid()) return; + + std::vector reads; + std::vector> pending; + reads.reserve(count); + pending.reserve(count); + + for (size_t i = 0; i < count; ++i) { + block_id_t pid = page_ids[i]; + if (pid >= page_table_.entry_num()) continue; + char *existing = page_table_.acquire_block(pid); + if (existing) { + page_table_.release_block(pid); + continue; + } + char *buf = nullptr; + bool found = + MemoryLimitPool::get_instance().try_acquire_buffer(kVectorPageSize, buf); + if (!found || !buf) continue; + + AlignedRead rd; + rd.offset = static_cast(pid) * kVectorPageSize; + rd.len = kVectorPageSize; + rd.buf = buf; + reads.push_back(rd); + pending.emplace_back(pid, buf); + } + + if (reads.empty()) return; + + int rc = execute_aligned_io(tl_io_ctx.get(), fd_, reads); + if (rc != 0) { + for (auto &p : pending) { + MemoryLimitPool::get_instance().release_buffer(p.second, kVectorPageSize); + } + return; + } + + for (auto &p : pending) { + block_id_t pid = p.first; + char *buf = p.second; + size_t page_offset = static_cast(pid) * kVectorPageSize; + std::lock_guard lock( + block_mutexes_[pid % VecBufferPool::kMutexBucketCount]); + page_table_.set_block_acquired(pid, buf, page_offset); + page_table_.release_block(pid); + } +#else + (void)page_ids; + (void)count; +#endif +} + } // namespace ailego } // namespace zvec \ No newline at end of file diff --git a/src/ailego/io/aligned_async_io.cc b/src/ailego/io/aligned_async_io.cc new file mode 100644 index 000000000..83ac90ba0 --- /dev/null +++ b/src/ailego/io/aligned_async_io.cc @@ -0,0 +1,140 @@ +#include "aligned_async_io.h" + +#include +#include + +#if defined(__linux__) && defined(ZVEC_HAS_LIBAIO) +#include + +namespace zvec { +namespace ailego { + +static int execute_io_pread(int fd, std::vector &reads) { + for (auto &r : reads) { + ssize_t got = ::pread(fd, r.buf, r.len, static_cast(r.offset)); + if (got != static_cast(r.len)) { + LOG_ERROR( + "pread fallback failed: fd=%d, offset=%llu, len=%llu, got=%zd, " + "errno=%d", + fd, (unsigned long long)r.offset, (unsigned long long)r.len, got, + errno); + return -1; + } + } + return 0; +} + +static constexpr int kMaxEvents = 128; + +ScopedIOContext::ScopedIOContext() { + ctx_ = 0; + int ret = io_setup(kMaxEvents, &ctx_); + if (ret == 0) { + valid_ = true; + } else { + LOG_WARN("io_setup failed (ret=%d, errno=%d); prefetch disabled on thread", + ret, errno); + valid_ = false; + } +} + +ScopedIOContext::~ScopedIOContext() { + if (valid_) { + io_destroy(ctx_); + valid_ = false; + } +} + +int execute_aligned_io(IOContext ctx, int fd, std::vector &reads, + uint64_t n_retries) { + if (reads.empty()) return 0; + + std::vector cbs(reads.size()); + std::vector cb_ptrs(reads.size()); + std::vector events(reads.size()); + + for (size_t i = 0; i < reads.size(); ++i) { + io_prep_pread(&cbs[i], fd, reads[i].buf, reads[i].len, + static_cast(reads[i].offset)); + cb_ptrs[i] = &cbs[i]; + } + + int n_submitted = 0; + int total = static_cast(reads.size()); + + for (uint64_t attempt = 0; attempt <= n_retries; ++attempt) { + while (n_submitted < total) { + int batch = std::min(total - n_submitted, kMaxEvents); + int ret = io_submit(ctx, batch, cb_ptrs.data() + n_submitted); + if (ret >= 0) { + n_submitted += ret; + } else if (ret == -EINTR) { + continue; + } else if (ret == -EAGAIN) { + continue; + } else { + LOG_WARN("io_submit failed: ret=%d, errno=%d; fallback to pread", ret, + errno); + return execute_io_pread(fd, reads); + } + } + if (n_submitted == total) break; + } + + if (n_submitted != total) { + LOG_WARN("io_submit incomplete after retries; fallback to pread"); + return execute_io_pread(fd, reads); + } + + int n_collected = 0; + while (n_collected < total) { + int ret = + io_getevents(ctx, total - n_collected, total - n_collected, events.data() + n_collected, nullptr); + if (ret > 0) { + n_collected += ret; + } else if (ret == -EINTR) { + continue; + } else { + LOG_WARN("io_getevents failed: ret=%d; fallback to pread", ret); + return execute_io_pread(fd, reads); + } + } + + for (int i = 0; i < total; ++i) { + if (static_cast(events[i].res) != reads[i].len) { + LOG_WARN( + "aio short read: expected=%llu, got=%lld; fallback to pread", + (unsigned long long)reads[i].len, (long long)events[i].res); + return execute_io_pread(fd, reads); + } + } + + return 0; +} + +} // namespace ailego +} // namespace zvec + +#else + +namespace zvec { +namespace ailego { + +ScopedIOContext::ScopedIOContext() { valid_ = false; } +ScopedIOContext::~ScopedIOContext() {} + +int execute_aligned_io(IOContext, int fd, std::vector &reads, + uint64_t) { + for (auto &r : reads) { + ssize_t got = ::pread(fd, r.buf, r.len, static_cast(r.offset)); + if (got != static_cast(r.len)) { + return -1; + } + } + return 0; +} + +} // namespace ailego +} // namespace zvec + +#endif diff --git a/src/ailego/io/aligned_async_io.h b/src/ailego/io/aligned_async_io.h new file mode 100644 index 000000000..ac486be14 --- /dev/null +++ b/src/ailego/io/aligned_async_io.h @@ -0,0 +1,44 @@ +#pragma once + +#include +#include + +#ifdef ZVEC_HAS_LIBAIO +#include +#endif + +namespace zvec { +namespace ailego { + +struct AlignedRead { + uint64_t offset; + uint64_t len; + void *buf; +}; + +#ifdef ZVEC_HAS_LIBAIO +using IOContext = io_context_t; +#else +using IOContext = uint32_t; +#endif + +class ScopedIOContext { + public: + ScopedIOContext(); + ~ScopedIOContext(); + ScopedIOContext(const ScopedIOContext &) = delete; + ScopedIOContext &operator=(const ScopedIOContext &) = delete; + + bool valid() const { return valid_; } + IOContext &get() { return ctx_; } + + private: + IOContext ctx_{}; + bool valid_{false}; +}; + +int execute_aligned_io(IOContext ctx, int fd, std::vector &reads, + uint64_t n_retries = 3); + +} // namespace ailego +} // namespace zvec diff --git a/src/core/algorithm/hnsw/hnsw_algorithm.cc b/src/core/algorithm/hnsw/hnsw_algorithm.cc index 8c6fcfe17..df85438f7 100644 --- a/src/core/algorithm/hnsw/hnsw_algorithm.cc +++ b/src/core/algorithm/hnsw/hnsw_algorithm.cc @@ -176,7 +176,10 @@ void HnswAlgorithm::add_neighbors(node_id_t id, level_t level, // // Two specialized inner loops, dispatched from search_neighbors(): // -// fast_search_neighbors: mmap/contiguous with direct vector pointers. +// fast_search_neighbors: level-0 unfiltered search for all storage +// modes (mmap/BufferPool/contiguous). Vector +// resolution delegated to entity via +// resolve_vectors()/release_vectors(). // Uses BlockHeap (AVX2) or LinearPool (scalar) // for visited tracking and top-k maintenance. // dual_heap_search_neighbors: CandidateHeap + TopkHeap + VisitFilter. @@ -184,16 +187,13 @@ void HnswAlgorithm::add_neighbors(node_id_t id, level_t level, // search, upper levels, and BufferPool fallback. // ============================================================================ -// mmap/contiguous variant: resolve vectors via get_vector_ptr and use -// LinearPool or BlockHeap for visited tracking + top-k maintenance. -// HeapType must expose reset/set_visited/check_visited/push_block/has_next/pop. template void fast_search_neighbors(const EntityType &entity, HeapType &pool, VisitFilter &visit, HnswDistCalculator &dc, uint32_t topk, uint32_t ef, node_id_t entry_point, dist_t entry_dist, uint32_t prefetch_lines, uint32_t prefetch_offset) { - const uint32_t max_deg = entity.max_degree(0); // level 0 only + const uint32_t max_deg = entity.max_degree(0); const uint32_t cap = std::max(topk, ef); pool.reset(static_cast(cap), static_cast(max_deg)); visit.clear(); @@ -219,41 +219,35 @@ void fast_search_neighbors(const EntityType &entity, HeapType &pool, neighbor_vecs.resize(buf_capacity); } - const uint32_t po = - std::min(static_cast(neighbors.size()), prefetch_offset); uint32_t unvisited_count = 0; - uint32_t i = 0; - - // Phase 1: scan first `po` neighbors with prefetch. - for (; i < po; ++i) { + for (uint32_t i = 0; i < neighbors.size(); ++i) { node_id_t node = neighbors[i]; if (visit.visited(node)) continue; visit.set_visited(node); - const void *vec_ptr = entity.get_vector_ptr(node); - const char *p = reinterpret_cast(vec_ptr); + neighbor_ids[unvisited_count++] = node; + } + + if (unvisited_count == 0) continue; + + if (ailego_unlikely(entity.resolve_vectors(neighbor_ids.data(), + unvisited_count, + neighbor_vecs.data()) != 0)) + break; + + const uint32_t po = std::min(prefetch_offset, unvisited_count); + for (uint32_t i = 0; i < po; ++i) { + const char *p = static_cast(neighbor_vecs[i]); for (uint32_t cl = 0; cl < prefetch_lines; ++cl) { ailego_prefetch(p + cl * 64); } - neighbor_ids[unvisited_count] = node; - neighbor_vecs[unvisited_count] = vec_ptr; - unvisited_count++; - } - - // Phase 2: scan remaining neighbors. - for (; i < neighbors.size(); ++i) { - node_id_t node = neighbors[i]; - if (visit.visited(node)) continue; - visit.set_visited(node); - neighbor_ids[unvisited_count] = node; - neighbor_vecs[unvisited_count] = entity.get_vector_ptr(node); - unvisited_count++; } - if (unvisited_count == 0) continue; dc.batch_dist(neighbor_vecs.data(), unvisited_count, dists.data()); pool.push_block(dists.data(), neighbor_ids.data(), static_cast(unvisited_count)); + + entity.release_vectors(); } } @@ -331,6 +325,10 @@ void dual_heap_search_neighbors(const EntityType &entity, level_t level, continue; } + if constexpr (std::is_same_v) { + entity.prefetch_vectors(neighbor_ids.data(), size); + } + neighbor_vec_blocks.clear(); int ret = entity.get_vector_typed(neighbor_ids.data(), size, neighbor_vec_blocks); @@ -379,9 +377,7 @@ void dual_heap_search_neighbors(const EntityType &entity, level_t level, // search_neighbors: Dispatch to fast or dual-heap path. // // - add_node / filtered / upper levels → dual_heap_search_neighbors -// - level-0 unfiltered search: -// MmapMemoryBlock → fast_search_neighbors (BlockHeap/LinearPool) -// BufferPool → dual_heap_search_neighbors (fallback) +// - level-0 unfiltered search → fast_search_neighbors // ============================================================================ template void HnswAlgorithm::search_neighbors(level_t level, @@ -393,7 +389,6 @@ void HnswAlgorithm::search_neighbors(level_t level, HnswDistCalculator &dc = ctx->dist_calculator(); if (!use_pool || ctx->filter().is_valid() || level != 0) { - // Dual-heap path: add_node, filtered search, or upper-level scan. auto run_with_filter = [&](auto &&filter) { dual_heap_search_neighbors( entity, level, entry_point, dist, topk, ctx, dc, @@ -410,36 +405,24 @@ void HnswAlgorithm::search_neighbors(level_t level, run_with_filter(filter); } } else { - // Pool-based path for level-0 unfiltered search. - if constexpr (std::is_same_v) { - const uint32_t prefetch_lines = - ctx->pl() > 0 ? ctx->pl() : (entity.vector_size() + 63) / 64; - - // Fast path: direct pointer access via get_vector_ptr. - // BlockHeap (AVX2) or LinearPool (scalar) for top-k tracking. - const uint32_t topk_v = static_cast(ctx->topk()); - const uint32_t ef_v = ctx->ef(); - const bool avx2_ok = - zvec::ailego::internal::CpuFeatures::static_flags_.AVX2; - - auto &visit = ctx->visit_filter(); - - if (avx2_ok) { - auto &bpool = ctx->block_pool(); - fast_search_neighbors(entity, bpool, visit, dc, topk_v, ef_v, - *entry_point, *dist, prefetch_lines, ctx->po()); - copy_pool_to_topk(bpool, topk); - } else { - auto &lpool = ctx->pool(); - fast_search_neighbors(entity, lpool, visit, dc, topk_v, ef_v, - *entry_point, *dist, prefetch_lines, ctx->po()); - copy_pool_to_topk(lpool, topk); - } + const uint32_t prefetch_lines = + ctx->pl() > 0 ? ctx->pl() : (entity.vector_size() + 63) / 64; + const uint32_t topk_v = static_cast(ctx->topk()); + const uint32_t ef_v = ctx->ef(); + const bool avx2_ok = + zvec::ailego::internal::CpuFeatures::static_flags_.AVX2; + auto &visit = ctx->visit_filter(); + + if (avx2_ok) { + auto &bpool = ctx->block_pool(); + fast_search_neighbors(entity, bpool, visit, dc, topk_v, ef_v, + *entry_point, *dist, prefetch_lines, ctx->po()); + copy_pool_to_topk(bpool, topk); } else { - // BufferPool entities: fallback to dual-heap path. - auto filter = [](node_id_t) { return false; }; - dual_heap_search_neighbors( - entity, level, entry_point, dist, topk, ctx, dc, filter); + auto &lpool = ctx->pool(); + fast_search_neighbors(entity, lpool, visit, dc, topk_v, ef_v, + *entry_point, *dist, prefetch_lines, ctx->po()); + copy_pool_to_topk(lpool, topk); } } } diff --git a/src/core/algorithm/hnsw/hnsw_streamer_entity.cc b/src/core/algorithm/hnsw/hnsw_streamer_entity.cc index 50f15c3ff..2e8099b37 100644 --- a/src/core/algorithm/hnsw/hnsw_streamer_entity.cc +++ b/src/core/algorithm/hnsw/hnsw_streamer_entity.cc @@ -811,6 +811,40 @@ const HnswEntity::Pointer HnswMmapStreamerEntity::clone() const { return HnswEntity::Pointer(entity); } +const HnswEntity::Pointer HnswBufferPoolStreamerEntity::clone() const { + std::vector node_chunks; + node_chunks.reserve(node_chunks_.size()); + for (size_t i = 0UL; i < node_chunks_.size(); ++i) { + node_chunks.emplace_back(node_chunks_[i]->clone()); + if (ailego_unlikely(!node_chunks[i])) { + LOG_ERROR("HnswBufferPoolStreamerEntity get chunk failed in clone"); + return HnswEntity::Pointer(); + } + } + + std::vector upper_neighbor_chunks; + upper_neighbor_chunks.reserve(upper_neighbor_chunks_.size()); + for (size_t i = 0UL; i < upper_neighbor_chunks_.size(); ++i) { + upper_neighbor_chunks.emplace_back(upper_neighbor_chunks_[i]->clone()); + if (ailego_unlikely(!upper_neighbor_chunks[i])) { + LOG_ERROR("HnswBufferPoolStreamerEntity get chunk failed in clone"); + return HnswEntity::Pointer(); + } + } + + auto *entity = new (std::nothrow) HnswBufferPoolStreamerEntity( + stats_, header(), chunk_size_, node_index_mask_bits_, + upper_neighbor_mask_bits_, filter_same_key_, get_vector_enabled_, + upper_neighbor_index_, upper_neighbor_rw_mutex_, keys_map_lock_, + keys_map_, use_key_info_map_, std::move(node_chunks), + std::move(upper_neighbor_chunks), broker_, node_chunk_bases_, + upper_neighbor_chunk_bases_); + if (ailego_unlikely(!entity)) { + LOG_ERROR("HnswBufferPoolStreamerEntity new failed"); + } + return HnswEntity::Pointer(entity); +} + const HnswEntity::Pointer HnswContiguousStreamerEntity::clone() const { std::vector node_chunks; node_chunks.reserve(node_chunks_.size()); diff --git a/src/core/algorithm/hnsw/hnsw_streamer_entity.h b/src/core/algorithm/hnsw/hnsw_streamer_entity.h index 013ad8a6f..bfe51890e 100644 --- a/src/core/algorithm/hnsw/hnsw_streamer_entity.h +++ b/src/core/algorithm/hnsw/hnsw_streamer_entity.h @@ -14,7 +14,11 @@ #pragma once +#include +#include #include +#include +#include #include #include #include @@ -859,14 +863,21 @@ class HnswMmapStreamerEntity : public HnswStreamerEntity { return *reinterpret_cast(base + offset); } - //! Direct vector pointer access (no MemoryBlock wrapper). - //! For use in the merged search loop to avoid intermediate allocations. ailego_force_inline const void *get_vector_ptr(node_id_t id) const { uint32_t chunk_idx = id >> node_index_mask_bits_; uint32_t offset = (id & node_index_mask_) * node_size(); return get_node_chunk_base(chunk_idx) + offset; } + ailego_force_inline int resolve_vectors(const node_id_t *ids, uint32_t count, + const void **out) const { + for (uint32_t i = 0; i < count; ++i) + out[i] = get_vector_ptr(ids[i]); + return 0; + } + + ailego_force_inline void release_vectors() const {} + protected: //! Get cached base address for a node chunk, syncing if needed ailego_force_inline const char *get_node_chunk_base( @@ -912,7 +923,6 @@ class HnswMmapStreamerEntity : public HnswStreamerEntity { mutable std::vector upper_neighbor_chunk_bases_{}; }; -//! Typed entity subclass for buffer pool mode. class HnswBufferPoolStreamerEntity : public HnswStreamerEntity { public: using MemoryBlock = BufferPoolMemoryBlock; @@ -924,6 +934,8 @@ class HnswBufferPoolStreamerEntity : public HnswStreamerEntity { return HnswStorageMode::kBufferPool; } + const HnswEntity::Pointer clone() const override; + inline TypedNeighbors get_neighbors_typed(level_t level, node_id_t id) const { return HnswStreamerEntity::get_neighbors_typed(level, id); @@ -939,6 +951,139 @@ class HnswBufferPoolStreamerEntity : public HnswStreamerEntity { inline key_t get_key_typed(node_id_t id) const { return HnswStreamerEntity::get_key_typed(id); } + + int resolve_vectors(const node_id_t *ids, uint32_t count, + const void **out) const { + ensure_pinned_pages(); + if (ailego_unlikely(!pinned_pages_.bound())) return -1; + const size_t vec_sz = vector_size(); + const size_t pg_sz = ailego::kVectorPageSize; + cross_page_used_ = 0; + if (cross_page_arena_.size() < count * vec_sz) + cross_page_arena_.resize(count * vec_sz); + for (uint32_t i = 0; i < count; ++i) { + const size_t abs_off = get_vector_abs_offset(ids[i]); + const auto page_id = static_cast(abs_off / pg_sz); + const size_t intra = abs_off % pg_sz; + if (ailego_likely(intra + vec_sz <= pg_sz)) { + char *page = pinned_pages_.get_page(page_id); + if (ailego_unlikely(!page)) return -1; + out[i] = page + intra; + } else { + const size_t part1 = pg_sz - intra; + char *p1 = pinned_pages_.get_page(page_id); + char *p2 = pinned_pages_.get_page(page_id + 1); + if (ailego_unlikely(!p1 || !p2)) return -1; + char *scratch = cross_page_arena_.data() + cross_page_used_ * vec_sz; + ++cross_page_used_; + std::memcpy(scratch, p1 + intra, part1); + std::memcpy(scratch + part1, p2, vec_sz - part1); + out[i] = scratch; + } + } + return 0; + } + + void release_vectors() const { + pinned_pages_.release_all(); + } + + void prefetch_vectors(const node_id_t *ids, uint32_t count) const { + if (count == 0) return; + ailego::VecBufferPool *pool = vec_buffer_pool(); + if (!pool) return; + std::vector page_ids; + page_ids.reserve(count); + for (uint32_t i = 0; i < count; ++i) { + auto loc = get_vector_chunk_loc(ids[i]); + size_t abs_off = + node_chunks_[loc.first]->abs_data_offset() + loc.second; + size_t pg = abs_off / ailego::kVectorPageSize; + page_ids.push_back(static_cast(pg)); + } + std::sort(page_ids.begin(), page_ids.end()); + page_ids.erase(std::unique(page_ids.begin(), page_ids.end()), + page_ids.end()); + pool->batch_prefetch(page_ids.data(), page_ids.size()); + } + + private: + struct PinnedPageSet { + static constexpr size_t kCapacity = 128; + static constexpr size_t kMask = kCapacity - 1; + static constexpr ailego::block_id_t kEmpty = + std::numeric_limits::max(); + + PinnedPageSet() { reset_table(); } + ~PinnedPageSet() { release_all(); } + PinnedPageSet(const PinnedPageSet &) = delete; + PinnedPageSet &operator=(const PinnedPageSet &) = delete; + + void bind(ailego::VecBufferPool *pool) { pool_ = pool; } + bool bound() const { return pool_ != nullptr; } + + char *get_page(ailego::block_id_t page_id) { + size_t slot = static_cast(page_id) & kMask; + for (;;) { + if (ids_[slot] == page_id) return bufs_[slot]; + if (ids_[slot] == kEmpty) { + char *buf = pool_->acquire_buffer(page_id, 50); + if (ailego_unlikely(!buf)) return nullptr; + ids_[slot] = page_id; + bufs_[slot] = buf; + ++count_; + return buf; + } + slot = (slot + 1) & kMask; + } + } + + void release_all() { + if (!pool_) return; + for (size_t i = 0; i < kCapacity; ++i) { + if (ids_[i] != kEmpty) { + pool_->page_table_.release_block(ids_[i]); + ids_[i] = kEmpty; + bufs_[i] = nullptr; + } + } + count_ = 0; + } + + private: + void reset_table() { + std::fill_n(ids_, kCapacity, kEmpty); + std::fill_n(bufs_, kCapacity, nullptr); + count_ = 0; + } + ailego::VecBufferPool *pool_{nullptr}; + ailego::block_id_t ids_[kCapacity]; + char *bufs_[kCapacity]; + size_t count_{0}; + }; + + ailego::VecBufferPool *vec_buffer_pool() const { + if (broker_ && broker_->storage()) { + return broker_->storage()->vec_buffer_pool(); + } + return nullptr; + } + + size_t get_vector_abs_offset(node_id_t id) const { + auto loc = get_vector_chunk_loc(id); + return node_chunks_[loc.first]->abs_data_offset() + loc.second; + } + + void ensure_pinned_pages() const { + if (!pinned_pages_.bound()) { + auto *pool = vec_buffer_pool(); + if (pool) pinned_pages_.bind(pool); + } + } + + mutable PinnedPageSet pinned_pages_; + mutable std::vector cross_page_arena_; + mutable uint32_t cross_page_used_{0}; }; //! Typed entity subclass for contiguous memory mode. @@ -1048,18 +1193,24 @@ class HnswContiguousStreamerEntity : public HnswMmapStreamerEntity { return HnswMmapStreamerEntity::get_key_typed(id); } - //! Direct vector pointer from flat vector array (stride = vector_size). - //! For use in the merged search loop to avoid intermediate allocations. ailego_force_inline const void *get_vector_ptr(node_id_t id) const { if (ailego_likely(vector_base_ != nullptr)) { return vector_base_ + static_cast(id) * vector_size(); } - // Fallback to mmap chunk-based access uint32_t chunk_idx = id >> node_index_mask_bits_; uint32_t offset = (id & node_index_mask_) * node_size(); return get_node_chunk_base(chunk_idx) + offset; } + ailego_force_inline int resolve_vectors(const node_id_t *ids, uint32_t count, + const void **out) const { + for (uint32_t i = 0; i < count; ++i) + out[i] = get_vector_ptr(ids[i]); + return 0; + } + + ailego_force_inline void release_vectors() const {} + protected: //! Custom deleter for contiguous memory (munmap / _aligned_free / free) //! Used by shared_ptr to properly release mmap'd memory. diff --git a/src/core/utility/buffer_storage.cc b/src/core/utility/buffer_storage.cc index ac4eb731f..cb2389eb3 100644 --- a/src/core/utility/buffer_storage.cc +++ b/src/core/utility/buffer_storage.cc @@ -449,6 +449,12 @@ class BufferStorage : public IndexStorage { return shared_from_this(); } + size_t abs_data_offset(void) const override { + return segment_info_->segment_header_start_offset + + segment_info_->segment_header->content_offset + + segment_info_->segment.meta()->data_index; + } + protected: friend BufferStorage; // Pointer into BufferStorage::segments_ (unordered_map mapped value). @@ -482,6 +488,7 @@ class BufferStorage : public IndexStorage { segment_meta_capacity_ = val; } params.get(BUFFER_STORAGE_ENABLE_DIRECT_IO, &enable_direct_io_); + enable_direct_io_ = true; return 0; } @@ -806,6 +813,10 @@ class BufferStorage : public IndexStorage { return chain_headers_.front()->magic; } + ailego::VecBufferPool *vec_buffer_pool(void) const override { + return buffer_pool_.get(); + } + protected: //! Initialize index version segment (writes content into an IndexMapping). //! Only intended to be called from init_index() while `mapping` is still diff --git a/src/include/zvec/ailego/buffer/vector_page_table.h b/src/include/zvec/ailego/buffer/vector_page_table.h index 6437a71ab..a88917adc 100644 --- a/src/include/zvec/ailego/buffer/vector_page_table.h +++ b/src/include/zvec/ailego/buffer/vector_page_table.h @@ -255,6 +255,10 @@ class VecBufferPool { return file_size_; } + void batch_prefetch(const block_id_t *page_ids, size_t count); + + int fd() const { return fd_; } + private: int fd_; // page-data channel: may carry O_DIRECT int meta_fd_; // metadata channel: always buffered IO @@ -297,6 +301,10 @@ class VecBufferPoolHandle { void acquire_one(block_id_t block_id); + void batch_prefetch(const block_id_t *page_ids, size_t count) { + pool_.batch_prefetch(page_ids, count); + } + private: VecBufferPool &pool_; }; diff --git a/src/include/zvec/core/framework/index_storage.h b/src/include/zvec/core/framework/index_storage.h index 9a7ad3633..ad27aa0eb 100644 --- a/src/include/zvec/core/framework/index_storage.h +++ b/src/include/zvec/core/framework/index_storage.h @@ -332,6 +332,10 @@ class IndexStorage : public IndexModule { virtual const uint8_t *base_data(void) const { return nullptr; } + + virtual size_t abs_data_offset(void) const { + return 0; + } }; //! Destructor @@ -394,6 +398,10 @@ class IndexStorage : public IndexModule { virtual std::string file_path(void) const { return ""; } + + virtual ailego::VecBufferPool *vec_buffer_pool(void) const { + return nullptr; + } }; } // namespace core From c38d04c83085578a3a84f55e985648f46db772af Mon Sep 17 00:00:00 2001 From: Zefeng Yin Date: Wed, 17 Jun 2026 21:02:28 +0800 Subject: [PATCH 06/17] rm dead code --- src/core/algorithm/hnsw/hnsw_algorithm.cc | 4 ---- .../algorithm/hnsw/hnsw_streamer_entity.h | 19 ------------------- 2 files changed, 23 deletions(-) diff --git a/src/core/algorithm/hnsw/hnsw_algorithm.cc b/src/core/algorithm/hnsw/hnsw_algorithm.cc index df85438f7..50d558bda 100644 --- a/src/core/algorithm/hnsw/hnsw_algorithm.cc +++ b/src/core/algorithm/hnsw/hnsw_algorithm.cc @@ -325,10 +325,6 @@ void dual_heap_search_neighbors(const EntityType &entity, level_t level, continue; } - if constexpr (std::is_same_v) { - entity.prefetch_vectors(neighbor_ids.data(), size); - } - neighbor_vec_blocks.clear(); int ret = entity.get_vector_typed(neighbor_ids.data(), size, neighbor_vec_blocks); diff --git a/src/core/algorithm/hnsw/hnsw_streamer_entity.h b/src/core/algorithm/hnsw/hnsw_streamer_entity.h index d2b30493d..667840698 100644 --- a/src/core/algorithm/hnsw/hnsw_streamer_entity.h +++ b/src/core/algorithm/hnsw/hnsw_streamer_entity.h @@ -988,25 +988,6 @@ class HnswBufferPoolStreamerEntity : public HnswStreamerEntity { pinned_pages_.release_all(); } - void prefetch_vectors(const node_id_t *ids, uint32_t count) const { - if (count == 0) return; - ailego::VecBufferPool *pool = vec_buffer_pool(); - if (!pool) return; - std::vector page_ids; - page_ids.reserve(count); - for (uint32_t i = 0; i < count; ++i) { - auto loc = get_vector_chunk_loc(ids[i]); - size_t abs_off = - node_chunks_[loc.first]->abs_data_offset() + loc.second; - size_t pg = abs_off / ailego::kVectorPageSize; - page_ids.push_back(static_cast(pg)); - } - std::sort(page_ids.begin(), page_ids.end()); - page_ids.erase(std::unique(page_ids.begin(), page_ids.end()), - page_ids.end()); - pool->batch_prefetch(page_ids.data(), page_ids.size()); - } - private: struct PinnedPageSet { static constexpr size_t kCapacity = 128; From 50560ab9779f03074c0c62e4d624a7b9bf80a273 Mon Sep 17 00:00:00 2001 From: Zefeng Yin Date: Mon, 22 Jun 2026 19:57:33 +0800 Subject: [PATCH 07/17] fix --- src/ailego/buffer/vector_page_table.cc | 39 ++++++++------------------ 1 file changed, 12 insertions(+), 27 deletions(-) diff --git a/src/ailego/buffer/vector_page_table.cc b/src/ailego/buffer/vector_page_table.cc index 13d55b212..238053e3c 100644 --- a/src/ailego/buffer/vector_page_table.cc +++ b/src/ailego/buffer/vector_page_table.cc @@ -146,17 +146,19 @@ bool VectorPageTable::extend(size_t new_entry_num) { char *VectorPageTable::acquire_block(block_id_t block_id) { assert(block_id < entry_num_.load(std::memory_order_relaxed)); Entry &e = entry_at(block_id); - while (true) { - int current_count = e.ref_count.load(std::memory_order_acquire); - if (current_count < 0) { - return nullptr; - } - if (e.ref_count.compare_exchange_weak(current_count, current_count + 1, - std::memory_order_acq_rel, - std::memory_order_acquire)) { - return e.buffer; + int old = e.ref_count.fetch_add(1, std::memory_order_acq_rel); + if (ailego_likely(old >= 0)) { + return e.buffer; + } + int cur = old + 1; + while (cur < 0 && cur != std::numeric_limits::min()) { + if (e.ref_count.compare_exchange_weak(cur, cur - 1, + std::memory_order_relaxed, + std::memory_order_relaxed)) { + break; } } + return nullptr; } void VectorPageTable::release_block(block_id_t block_id) { @@ -182,12 +184,7 @@ void VectorPageTable::evict_block(block_id_t block_id) { assert(block_id < entry_num_.load(std::memory_order_relaxed)); Entry &e = entry_at(block_id); int expected = 0; - // Two-phase eviction to prevent data race on e.buffer with - // set_block_acquired. We first CAS to kEvicting (-1), which causes - // set_block_acquired to spin-wait; then do the actual work (flush, free, - // null buffer); finally store INT_MIN ("evicted") which unblocks - // set_block_acquired. - static constexpr int kEvicting = -1; + static constexpr int kEvicting = std::numeric_limits::min() / 2; if (e.ref_count.compare_exchange_strong(expected, kEvicting)) { char *buffer = e.buffer; if (buffer && e.is_dirty.load(std::memory_order_relaxed) && @@ -199,8 +196,6 @@ void VectorPageTable::evict_block(block_id_t block_id) { e.buffer = nullptr; MemoryLimitPool::get_instance().release_buffer(buffer, kVectorPageSize); } - // Transition to fully-evicted state. Use release so that the - // set_block_acquired acquire-load sees e.buffer == nullptr. e.ref_count.store(std::numeric_limits::min(), std::memory_order_release); } @@ -211,11 +206,6 @@ char *VectorPageTable::set_block_acquired(block_id_t block_id, char *buffer, size_t file_offset) { assert(block_id < entry_num_.load(std::memory_order_acquire)); Entry &e = entry_at(block_id); - // Diagnostics for the kEvicting wait. The wait itself never gives up: - // the only thread that can transition kEvicting -> INT_MIN is the - // evict_block() owner, so abandoning the spin here would orphan the - // entry in kEvicting forever. Instead, we use bounded backoff and emit - // tiered logs so a stuck eviction is observable. using clock = std::chrono::steady_clock; const auto wait_start = clock::now(); auto last_log = wait_start; @@ -231,7 +221,6 @@ char *VectorPageTable::set_block_acquired(block_id_t block_id, char *buffer, return e.buffer; } } else if (current_count == std::numeric_limits::min()) { - // Fully evicted — safe to claim this entry for our new buffer. e.buffer = buffer; e.file_offset = file_offset; e.in_evict_queue.store(false, std::memory_order_relaxed); @@ -239,11 +228,8 @@ char *VectorPageTable::set_block_acquired(block_id_t block_id, char *buffer, e.ref_count.store(1, std::memory_order_release); return e.buffer; } else { - // kEvicting (-1): eviction is in progress on this entry. - // Tiered backoff: hot spin first, then short sleep, then longer sleep. ++spin_count; if (spin_count < 64) { - // Pure busy wait for the common ~μs case. } else if (spin_count < 1024) { std::this_thread::yield(); } else if (spin_count < 8192) { @@ -251,7 +237,6 @@ char *VectorPageTable::set_block_acquired(block_id_t block_id, char *buffer, } else { std::this_thread::sleep_for(std::chrono::milliseconds(1)); } - // Tiered diagnostics: warn once after 100ms, error every 1s after 1s. const auto now = clock::now(); const auto elapsed = now - wait_start; if (!warned && elapsed >= std::chrono::milliseconds(100)) { From 3e0c044ef015429e0d50b6418d59183113571771 Mon Sep 17 00:00:00 2001 From: Zefeng Yin Date: Tue, 23 Jun 2026 15:36:17 +0800 Subject: [PATCH 08/17] upd --- src/ailego/buffer/vector_page_table.cc | 51 +++++++- src/core/algorithm/hnsw/hnsw_algorithm.cc | 80 +++++++++---- src/core/algorithm/hnsw/hnsw_streamer.cc | 5 + .../algorithm/hnsw/hnsw_streamer_entity.h | 113 +++++++++++++++++- src/core/utility/buffer_storage.cc | 1 + .../zvec/ailego/buffer/vector_page_table.h | 31 +++++ 6 files changed, 252 insertions(+), 29 deletions(-) diff --git a/src/ailego/buffer/vector_page_table.cc b/src/ailego/buffer/vector_page_table.cc index 238053e3c..01f71f23c 100644 --- a/src/ailego/buffer/vector_page_table.cc +++ b/src/ailego/buffer/vector_page_table.cc @@ -175,7 +175,8 @@ void VectorPageTable::release_block(block_id_t block_id) { block.owner = this; block.owner_key = block_id; block.version = 0; - BlockEvictionQueue::get_instance().add_single_block(block, 0); + BlockEvictionQueue::get_instance().add_single_block( + block, static_cast(e.evict_priority)); } } } @@ -719,5 +720,53 @@ void VecBufferPool::batch_prefetch(const block_id_t *page_ids, size_t count) { #endif } +void VecBufferPool::warmup() { + const size_t total_pages = page_table_.entry_num(); + // Read in large sequential chunks to minimize syscall overhead. + // Each chunk = 1024 pages = 4MB (maximize sequential I/O throughput). + static constexpr size_t kChunkPages = 1024; + const size_t kChunkSize = kChunkPages * kVectorPageSize; + + // Aligned buffer for bulk read (O_DIRECT requires alignment). + char *chunk_buf = static_cast(aligned_alloc(4096, kChunkSize)); + if (!chunk_buf) return; + + size_t loaded = 0; + bool pool_full = false; + for (size_t base = 0; base < total_pages && !pool_full; base += kChunkPages) { + const size_t pages_in_chunk = std::min(kChunkPages, total_pages - base); + const size_t read_bytes = pages_in_chunk * kVectorPageSize; + const size_t file_offset = base * kVectorPageSize; + + // One large sequential pread instead of N individual ones. + ssize_t got = zvec_pread(fd_, chunk_buf, read_bytes, file_offset); + if (got != static_cast(read_bytes)) break; + + // Distribute chunk data into individual page buffers. + for (size_t j = 0; j < pages_in_chunk; ++j) { + auto page_id = static_cast(base + j); + // Skip if already loaded. + char *existing = page_table_.acquire_block(page_id); + if (existing) { + page_table_.release_block(page_id); + ++loaded; + continue; + } + // Allocate page buffer from pool (no retry - stop if full). + char *buf = nullptr; + bool found = MemoryLimitPool::get_instance().try_acquire_buffer( + kVectorPageSize, buf); + if (!found) { pool_full = true; break; } + std::memcpy(buf, chunk_buf + j * kVectorPageSize, kVectorPageSize); + page_table_.set_block_acquired(page_id, buf, file_offset + j * kVectorPageSize); + page_table_.release_block(page_id); + ++loaded; + } + } + free(chunk_buf); + LOG_INFO("VecBufferPool::warmup: preloaded %zu/%zu pages for file[%s]", + loaded, total_pages, file_name_.c_str()); +} + } // namespace ailego } // namespace zvec diff --git a/src/core/algorithm/hnsw/hnsw_algorithm.cc b/src/core/algorithm/hnsw/hnsw_algorithm.cc index 50d558bda..a8cc7a160 100644 --- a/src/core/algorithm/hnsw/hnsw_algorithm.cc +++ b/src/core/algorithm/hnsw/hnsw_algorithm.cc @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. #include "hnsw_algorithm.h" +#include #include namespace zvec { @@ -81,8 +82,12 @@ int HnswAlgorithm::search(HnswContext *ctx) const { } dist_t dist = ctx->dist_calculator().dist(entry_point); + const auto &upper_entity = + static_cast(ctx->get_entity()); + upper_entity.reset_io_budget(INT32_MAX); for (level_t cur_level = maxLevel; cur_level >= 1; --cur_level) { select_entry_point(cur_level, &entry_point, &dist, ctx); + upper_entity.release_vectors(); } auto &topk_heap = ctx->topk_heap(); @@ -103,6 +108,11 @@ void HnswAlgorithm::select_entry_point(level_t level, HnswContext *ctx) const { const auto &entity = static_cast(ctx->get_entity()); HnswDistCalculator &dc = ctx->dist_calculator(); + uint32_t buf_cap = entity.max_degree(level); + std::vector neighbor_ids(buf_cap); + std::vector neighbor_vecs(buf_cap); + std::vector dists(buf_cap); + while (true) { const auto neighbors = entity.get_neighbors_typed(level, *entry_point); if (ailego_unlikely(ctx->debugging())) { @@ -113,31 +123,35 @@ void HnswAlgorithm::select_entry_point(level_t level, break; } - std::vector neighbor_vec_blocks; - int ret = entity.get_vector_typed(&neighbors[0], size, neighbor_vec_blocks); - if (ailego_unlikely(ctx->debugging())) { - (*ctx->mutable_stats_get_vector())++; + if (size > buf_cap) { + buf_cap = size; + neighbor_ids.resize(buf_cap); + neighbor_vecs.resize(buf_cap); + dists.resize(buf_cap); } - if (ailego_unlikely(ret != 0)) { - break; + for (uint32_t i = 0; i < size; ++i) { + neighbor_ids[i] = neighbors[i]; } - bool find_closer = false; - - std::vector dists(size); - std::vector neighbor_vecs(size); - for (uint32_t i = 0; i < size; ++i) { - neighbor_vecs[i] = neighbor_vec_blocks[i].data(); + if (ailego_unlikely(entity.resolve_vectors(neighbor_ids.data(), size, + neighbor_vecs.data()) != 0)) { + break; + } + if (ailego_unlikely(ctx->debugging())) { + (*ctx->mutable_stats_get_vector())++; } dc.batch_dist(neighbor_vecs.data(), size, dists.data()); - for (uint32_t i = 0; i < size; ++i) { - dist_t cur_dist = dists[i]; + // Release per-hop pages to prevent PinnedPageSet overflow. + // Upper-level pages have high eviction priority, so re-acquire is cheap. + entity.release_vectors(); - if (cur_dist < *dist) { - *entry_point = neighbors[i]; - *dist = cur_dist; + bool find_closer = false; + for (uint32_t i = 0; i < size; ++i) { + if (neighbor_vecs[i] && dists[i] < *dist) { + *entry_point = neighbor_ids[i]; + *dist = dists[i]; find_closer = true; } } @@ -198,6 +212,8 @@ void fast_search_neighbors(const EntityType &entity, HeapType &pool, pool.reset(static_cast(cap), static_cast(max_deg)); visit.clear(); + entity.reset_io_budget(static_cast(ef / 4)); + visit.set_visited(entry_point); pool.push_block(&entry_dist, &entry_point, 1); @@ -234,15 +250,33 @@ void fast_search_neighbors(const EntityType &entity, HeapType &pool, neighbor_vecs.data()) != 0)) break; - const uint32_t po = std::min(prefetch_offset, unvisited_count); - for (uint32_t i = 0; i < po; ++i) { - const char *p = static_cast(neighbor_vecs[i]); - for (uint32_t cl = 0; cl < prefetch_lines; ++cl) { - ailego_prefetch(p + cl * 64); + // Partition: move resolved vectors (non-null, cache hit) to front. + // Unresolved ones (cache miss) go to the back with FLT_MAX distance. + uint32_t resolved = 0; + for (uint32_t i = 0; i < unvisited_count; ++i) { + if (neighbor_vecs[i]) { + if (i != resolved) { + std::swap(neighbor_vecs[i], neighbor_vecs[resolved]); + std::swap(neighbor_ids[i], neighbor_ids[resolved]); + } + ++resolved; } } - dc.batch_dist(neighbor_vecs.data(), unvisited_count, dists.data()); + if (resolved > 0) { + const uint32_t po = std::min(prefetch_offset, resolved); + for (uint32_t i = 0; i < po; ++i) { + const char *p = static_cast(neighbor_vecs[i]); + for (uint32_t cl = 0; cl < prefetch_lines; ++cl) { + ailego_prefetch(p + cl * 64); + } + } + dc.batch_dist(neighbor_vecs.data(), resolved, dists.data()); + } + // Unresolved vectors get FLT_MAX - they won't enter the candidate pool. + for (uint32_t i = resolved; i < unvisited_count; ++i) { + dists[i] = FLT_MAX; + } pool.push_block(dists.data(), neighbor_ids.data(), static_cast(unvisited_count)); diff --git a/src/core/algorithm/hnsw/hnsw_streamer.cc b/src/core/algorithm/hnsw/hnsw_streamer.cc index 64ff5a747..73ef6d396 100644 --- a/src/core/algorithm/hnsw/hnsw_streamer.cc +++ b/src/core/algorithm/hnsw/hnsw_streamer.cc @@ -269,6 +269,11 @@ int HnswStreamer::open(IndexStorage::Pointer stg) { if (ret != 0) { return ret; } + + if (entity_->storage_mode() == HnswStorageMode::kBufferPool) { + static_cast(entity_.get()) + ->mark_upper_level_pages(); + } IndexMeta index_meta; ret = entity_->get_index_meta(&index_meta); if (ret == IndexError_NoExist) { diff --git a/src/core/algorithm/hnsw/hnsw_streamer_entity.h b/src/core/algorithm/hnsw/hnsw_streamer_entity.h index 667840698..049067b61 100644 --- a/src/core/algorithm/hnsw/hnsw_streamer_entity.h +++ b/src/core/algorithm/hnsw/hnsw_streamer_entity.h @@ -877,6 +877,7 @@ class HnswMmapStreamerEntity : public HnswStreamerEntity { } ailego_force_inline void release_vectors() const {} + void reset_io_budget(int32_t) const {} // no-op for mmap mode protected: //! Get cached base address for a node chunk, syncing if needed @@ -966,14 +967,31 @@ class HnswBufferPoolStreamerEntity : public HnswStreamerEntity { const auto page_id = static_cast(abs_off / pg_sz); const size_t intra = abs_off % pg_sz; if (ailego_likely(intra + vec_sz <= pg_sz)) { - char *page = pinned_pages_.get_page(page_id); - if (ailego_unlikely(!page)) return -1; + char *page = pinned_pages_.try_get_page(page_id); + if (!page) { + if (io_budget_ > 0) { + page = pinned_pages_.get_page(page_id); + if (ailego_unlikely(!page)) return -1; + --io_budget_; + } else { + out[i] = nullptr; continue; + } + } out[i] = page + intra; } else { const size_t part1 = pg_sz - intra; - char *p1 = pinned_pages_.get_page(page_id); - char *p2 = pinned_pages_.get_page(page_id + 1); - if (ailego_unlikely(!p1 || !p2)) return -1; + char *p1 = pinned_pages_.try_get_page(page_id); + char *p2 = pinned_pages_.try_get_page(page_id + 1); + if (!p1 || !p2) { + if (io_budget_ > 0) { + if (!p1) p1 = pinned_pages_.get_page(page_id); + if (!p2) p2 = pinned_pages_.get_page(page_id + 1); + if (ailego_unlikely(!p1 || !p2)) return -1; + --io_budget_; + } else { + out[i] = nullptr; continue; + } + } char *scratch = cross_page_arena_.data() + cross_page_used_ * vec_sz; ++cross_page_used_; std::memcpy(scratch, p1 + intra, part1); @@ -988,6 +1006,70 @@ class HnswBufferPoolStreamerEntity : public HnswStreamerEntity { pinned_pages_.release_all(); } + //! Reset I/O budget for a new search. budget = max pread calls allowed. + void reset_io_budget(int32_t budget) const { io_budget_ = budget; } + + void mark_upper_level_pages() { + auto *pool = vec_buffer_pool(); + if (!pool) return; + auto ep = entry_point(); + auto max_lvl = cur_max_level(); + if (ep == kInvalidNodeId || max_lvl == 0) return; + + const uint32_t n = doc_cnt(); + std::vector visited(n, false); + std::vector upper_nodes; + upper_nodes.reserve(n / scaling_factor() + 64); + upper_nodes.push_back(ep); + visited[ep] = true; + + for (level_t lvl = max_lvl; lvl >= 1; --lvl) { + for (size_t idx = 0; idx < upper_nodes.size(); ++idx) { + auto id = upper_nodes[idx]; + auto neighbors = get_neighbors_typed(lvl, id); + for (uint32_t i = 0; i < neighbors.size(); ++i) { + auto nid = neighbors[i]; + if (nid < n && !visited[nid]) { + visited[nid] = true; + upper_nodes.push_back(nid); + } + } + } + } + + const size_t pg_sz = ailego::kVectorPageSize; + const size_t vec_sz = vector_size(); + std::vector page_ids; + page_ids.reserve(upper_nodes.size()); + for (auto id : upper_nodes) { + const size_t abs_off = get_vector_abs_offset(id); + page_ids.push_back( + static_cast(abs_off / pg_sz)); + const size_t intra = abs_off % pg_sz; + if (intra + vec_sz > pg_sz) { + page_ids.push_back( + static_cast(abs_off / pg_sz) + 1); + } + } + std::sort(page_ids.begin(), page_ids.end()); + page_ids.erase(std::unique(page_ids.begin(), page_ids.end()), + page_ids.end()); + + size_t marked = 0; + for (auto pid : page_ids) { + pool->page_table_.set_evict_priority(pid, 2); + char *buf = pool->acquire_buffer(pid, 50); + if (buf) { + pool->page_table_.release_block(pid); + ++marked; + } + } + LOG_INFO( + "mark_upper_level_pages: marked %zu/%zu pages for %zu upper-level " + "nodes (maxLevel=%d, priority=2)", + marked, page_ids.size(), upper_nodes.size(), (int)max_lvl); + } + private: struct PinnedPageSet { static constexpr size_t kCapacity = 128; @@ -1019,6 +1101,25 @@ class HnswBufferPoolStreamerEntity : public HnswStreamerEntity { } } + //! Try to get a page WITHOUT triggering disk I/O. + //! Returns buffer if page is in PinnedPageSet or already in pool memory. + //! Returns nullptr if page would need a pread (cache miss). + char *try_get_page(ailego::block_id_t page_id) { + size_t slot = static_cast(page_id) & kMask; + for (;;) { + if (ids_[slot] == page_id) return bufs_[slot]; + if (ids_[slot] == kEmpty) { + char *buf = pool_->try_acquire_buffer(page_id); + if (!buf) return nullptr; // page not in memory, skip + ids_[slot] = page_id; + bufs_[slot] = buf; + ++count_; + return buf; + } + slot = (slot + 1) & kMask; + } + } + void release_all() { if (!pool_) return; for (size_t i = 0; i < kCapacity; ++i) { @@ -1065,6 +1166,7 @@ class HnswBufferPoolStreamerEntity : public HnswStreamerEntity { mutable PinnedPageSet pinned_pages_; mutable std::vector cross_page_arena_; mutable uint32_t cross_page_used_{0}; + mutable int32_t io_budget_{INT32_MAX}; }; //! Typed entity subclass for contiguous memory mode. @@ -1191,6 +1293,7 @@ class HnswContiguousStreamerEntity : public HnswMmapStreamerEntity { } ailego_force_inline void release_vectors() const {} + void reset_io_budget(int32_t) const {} // no-op for contiguous mode protected: //! Custom deleter for contiguous memory (munmap / _aligned_free / free) diff --git a/src/core/utility/buffer_storage.cc b/src/core/utility/buffer_storage.cc index 50b5bde8f..7a0b25fce 100644 --- a/src/core/utility/buffer_storage.cc +++ b/src/core/utility/buffer_storage.cc @@ -531,6 +531,7 @@ class BufferStorage : public IndexStorage { this->close_index(); return ret; } + buffer_pool_->warmup(); LOG_INFO( "BufferStorage opened: file=%s, writable=%d, max_segment_size=%" PRIu64 ", segment_count=%zu", diff --git a/src/include/zvec/ailego/buffer/vector_page_table.h b/src/include/zvec/ailego/buffer/vector_page_table.h index 755c53d24..4a7432e20 100644 --- a/src/include/zvec/ailego/buffer/vector_page_table.h +++ b/src/include/zvec/ailego/buffer/vector_page_table.h @@ -50,6 +50,7 @@ class VectorPageTable : public EvictableBlockOwner { std::atomic ref_count; std::atomic in_evict_queue; std::atomic is_dirty; + uint8_t evict_priority{0}; char *buffer; size_t file_offset; }; @@ -96,6 +97,13 @@ class VectorPageTable : public EvictableBlockOwner { void evict_block(block_id_t block_id) override; + void set_evict_priority(block_id_t block_id, uint8_t priority) { + assert(block_id < entry_num_.load(std::memory_order_acquire)); + Entry &e = entry_at(block_id); + e.evict_priority = priority; + e.in_evict_queue.store(false, std::memory_order_relaxed); + } + char *set_block_acquired(block_id_t block_id, char *buffer, size_t file_offset); @@ -151,6 +159,13 @@ class VectorPageTable : public EvictableBlockOwner { return !e.in_evict_queue.load(std::memory_order_relaxed); } + //! Check if a page is loaded (has a non-null buffer). + //! Used by try_acquire_buffer to avoid ref_count leaks on unloaded pages. + bool is_loaded(block_id_t block_id) const { + assert(block_id < entry_num_.load(std::memory_order_acquire)); + return entry_at(block_id).buffer != nullptr; + } + private: // Segmented page table: entries are split across fixed-size segments so // that extend() can grow the table without moving existing entries. @@ -258,6 +273,22 @@ class VecBufferPool { void batch_prefetch(const block_id_t *page_ids, size_t count); + //! Sequentially preload pages into the pool until pool is full. + void warmup(); + + //! Try to acquire a page buffer WITHOUT triggering disk I/O. + //! Returns the buffer pointer if the page is already in memory, + //! or nullptr if the page would need a pread (cache miss). + //! Avoids touching ref_count for unloaded pages to prevent leaks. + char *try_acquire_buffer(block_id_t page_id) { + assert(page_id < page_table_.entry_num()); + // Quick check: if buffer is null, page not loaded - skip without + // incrementing ref_count (acquire_block would leak ref_count on + // pages with ref_count>=0 but buffer==nullptr). + if (!page_table_.is_loaded(page_id)) return nullptr; + return page_table_.acquire_block(page_id); + } + int fd() const { return fd_; } private: From e1c5750fe7e5149b0079d9d263b6e7c5fd312a31 Mon Sep 17 00:00:00 2001 From: Zefeng Yin Date: Tue, 23 Jun 2026 15:42:19 +0800 Subject: [PATCH 09/17] upd log type --- src/ailego/buffer/vector_page_table.cc | 2 +- src/core/algorithm/hnsw/hnsw_algorithm.cc | 2 +- src/core/algorithm/hnsw/hnsw_streamer_entity.h | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/ailego/buffer/vector_page_table.cc b/src/ailego/buffer/vector_page_table.cc index 01f71f23c..cc20f938d 100644 --- a/src/ailego/buffer/vector_page_table.cc +++ b/src/ailego/buffer/vector_page_table.cc @@ -764,7 +764,7 @@ void VecBufferPool::warmup() { } } free(chunk_buf); - LOG_INFO("VecBufferPool::warmup: preloaded %zu/%zu pages for file[%s]", + LOG_DEBUG("VecBufferPool::warmup: preloaded %zu/%zu pages for file[%s]", loaded, total_pages, file_name_.c_str()); } diff --git a/src/core/algorithm/hnsw/hnsw_algorithm.cc b/src/core/algorithm/hnsw/hnsw_algorithm.cc index a8cc7a160..251cdfa46 100644 --- a/src/core/algorithm/hnsw/hnsw_algorithm.cc +++ b/src/core/algorithm/hnsw/hnsw_algorithm.cc @@ -212,7 +212,7 @@ void fast_search_neighbors(const EntityType &entity, HeapType &pool, pool.reset(static_cast(cap), static_cast(max_deg)); visit.clear(); - entity.reset_io_budget(static_cast(ef / 4)); + entity.reset_io_budget(static_cast(ef / 2)); visit.set_visited(entry_point); pool.push_block(&entry_dist, &entry_point, 1); diff --git a/src/core/algorithm/hnsw/hnsw_streamer_entity.h b/src/core/algorithm/hnsw/hnsw_streamer_entity.h index 049067b61..e6a3ede83 100644 --- a/src/core/algorithm/hnsw/hnsw_streamer_entity.h +++ b/src/core/algorithm/hnsw/hnsw_streamer_entity.h @@ -1064,7 +1064,7 @@ class HnswBufferPoolStreamerEntity : public HnswStreamerEntity { ++marked; } } - LOG_INFO( + LOG_DEBUG( "mark_upper_level_pages: marked %zu/%zu pages for %zu upper-level " "nodes (maxLevel=%d, priority=2)", marked, page_ids.size(), upper_nodes.size(), (int)max_lvl); From d6c7d5e3af14116c3e7d41dab83ea171b9644453 Mon Sep 17 00:00:00 2001 From: Zefeng Yin Date: Tue, 23 Jun 2026 16:11:55 +0800 Subject: [PATCH 10/17] rm useless code --- src/ailego/buffer/vector_page_table.cc | 59 -------- src/ailego/io/aligned_async_io.cc | 140 ------------------ src/ailego/io/aligned_async_io.h | 44 ------ .../zvec/ailego/buffer/vector_page_table.h | 6 - 4 files changed, 249 deletions(-) delete mode 100644 src/ailego/io/aligned_async_io.cc delete mode 100644 src/ailego/io/aligned_async_io.h diff --git a/src/ailego/buffer/vector_page_table.cc b/src/ailego/buffer/vector_page_table.cc index cc20f938d..01e8fb28d 100644 --- a/src/ailego/buffer/vector_page_table.cc +++ b/src/ailego/buffer/vector_page_table.cc @@ -20,7 +20,6 @@ #include #include #include -#include "../io/aligned_async_io.h" #include #if defined(_MSC_VER) @@ -662,64 +661,6 @@ void VecBufferPoolHandle::acquire_one(block_id_t block_id) { pool_.page_table_.acquire_block(block_id); } -void VecBufferPool::batch_prefetch(const block_id_t *page_ids, size_t count) { -#ifdef ZVEC_HAS_LIBAIO - if (count == 0) return; - - static thread_local ScopedIOContext tl_io_ctx; - if (!tl_io_ctx.valid()) return; - - std::vector reads; - std::vector> pending; - reads.reserve(count); - pending.reserve(count); - - for (size_t i = 0; i < count; ++i) { - block_id_t pid = page_ids[i]; - if (pid >= page_table_.entry_num()) continue; - char *existing = page_table_.acquire_block(pid); - if (existing) { - page_table_.release_block(pid); - continue; - } - char *buf = nullptr; - bool found = - MemoryLimitPool::get_instance().try_acquire_buffer(kVectorPageSize, buf); - if (!found || !buf) continue; - - AlignedRead rd; - rd.offset = static_cast(pid) * kVectorPageSize; - rd.len = kVectorPageSize; - rd.buf = buf; - reads.push_back(rd); - pending.emplace_back(pid, buf); - } - - if (reads.empty()) return; - - int rc = execute_aligned_io(tl_io_ctx.get(), fd_, reads); - if (rc != 0) { - for (auto &p : pending) { - MemoryLimitPool::get_instance().release_buffer(p.second, kVectorPageSize); - } - return; - } - - for (auto &p : pending) { - block_id_t pid = p.first; - char *buf = p.second; - size_t page_offset = static_cast(pid) * kVectorPageSize; - std::lock_guard lock( - block_mutexes_[pid % VecBufferPool::kMutexBucketCount]); - page_table_.set_block_acquired(pid, buf, page_offset); - page_table_.release_block(pid); - } -#else - (void)page_ids; - (void)count; -#endif -} - void VecBufferPool::warmup() { const size_t total_pages = page_table_.entry_num(); // Read in large sequential chunks to minimize syscall overhead. diff --git a/src/ailego/io/aligned_async_io.cc b/src/ailego/io/aligned_async_io.cc deleted file mode 100644 index 83ac90ba0..000000000 --- a/src/ailego/io/aligned_async_io.cc +++ /dev/null @@ -1,140 +0,0 @@ -#include "aligned_async_io.h" - -#include -#include - -#if defined(__linux__) && defined(ZVEC_HAS_LIBAIO) -#include - -namespace zvec { -namespace ailego { - -static int execute_io_pread(int fd, std::vector &reads) { - for (auto &r : reads) { - ssize_t got = ::pread(fd, r.buf, r.len, static_cast(r.offset)); - if (got != static_cast(r.len)) { - LOG_ERROR( - "pread fallback failed: fd=%d, offset=%llu, len=%llu, got=%zd, " - "errno=%d", - fd, (unsigned long long)r.offset, (unsigned long long)r.len, got, - errno); - return -1; - } - } - return 0; -} - -static constexpr int kMaxEvents = 128; - -ScopedIOContext::ScopedIOContext() { - ctx_ = 0; - int ret = io_setup(kMaxEvents, &ctx_); - if (ret == 0) { - valid_ = true; - } else { - LOG_WARN("io_setup failed (ret=%d, errno=%d); prefetch disabled on thread", - ret, errno); - valid_ = false; - } -} - -ScopedIOContext::~ScopedIOContext() { - if (valid_) { - io_destroy(ctx_); - valid_ = false; - } -} - -int execute_aligned_io(IOContext ctx, int fd, std::vector &reads, - uint64_t n_retries) { - if (reads.empty()) return 0; - - std::vector cbs(reads.size()); - std::vector cb_ptrs(reads.size()); - std::vector events(reads.size()); - - for (size_t i = 0; i < reads.size(); ++i) { - io_prep_pread(&cbs[i], fd, reads[i].buf, reads[i].len, - static_cast(reads[i].offset)); - cb_ptrs[i] = &cbs[i]; - } - - int n_submitted = 0; - int total = static_cast(reads.size()); - - for (uint64_t attempt = 0; attempt <= n_retries; ++attempt) { - while (n_submitted < total) { - int batch = std::min(total - n_submitted, kMaxEvents); - int ret = io_submit(ctx, batch, cb_ptrs.data() + n_submitted); - if (ret >= 0) { - n_submitted += ret; - } else if (ret == -EINTR) { - continue; - } else if (ret == -EAGAIN) { - continue; - } else { - LOG_WARN("io_submit failed: ret=%d, errno=%d; fallback to pread", ret, - errno); - return execute_io_pread(fd, reads); - } - } - if (n_submitted == total) break; - } - - if (n_submitted != total) { - LOG_WARN("io_submit incomplete after retries; fallback to pread"); - return execute_io_pread(fd, reads); - } - - int n_collected = 0; - while (n_collected < total) { - int ret = - io_getevents(ctx, total - n_collected, total - n_collected, events.data() + n_collected, nullptr); - if (ret > 0) { - n_collected += ret; - } else if (ret == -EINTR) { - continue; - } else { - LOG_WARN("io_getevents failed: ret=%d; fallback to pread", ret); - return execute_io_pread(fd, reads); - } - } - - for (int i = 0; i < total; ++i) { - if (static_cast(events[i].res) != reads[i].len) { - LOG_WARN( - "aio short read: expected=%llu, got=%lld; fallback to pread", - (unsigned long long)reads[i].len, (long long)events[i].res); - return execute_io_pread(fd, reads); - } - } - - return 0; -} - -} // namespace ailego -} // namespace zvec - -#else - -namespace zvec { -namespace ailego { - -ScopedIOContext::ScopedIOContext() { valid_ = false; } -ScopedIOContext::~ScopedIOContext() {} - -int execute_aligned_io(IOContext, int fd, std::vector &reads, - uint64_t) { - for (auto &r : reads) { - ssize_t got = ::pread(fd, r.buf, r.len, static_cast(r.offset)); - if (got != static_cast(r.len)) { - return -1; - } - } - return 0; -} - -} // namespace ailego -} // namespace zvec - -#endif diff --git a/src/ailego/io/aligned_async_io.h b/src/ailego/io/aligned_async_io.h deleted file mode 100644 index ac486be14..000000000 --- a/src/ailego/io/aligned_async_io.h +++ /dev/null @@ -1,44 +0,0 @@ -#pragma once - -#include -#include - -#ifdef ZVEC_HAS_LIBAIO -#include -#endif - -namespace zvec { -namespace ailego { - -struct AlignedRead { - uint64_t offset; - uint64_t len; - void *buf; -}; - -#ifdef ZVEC_HAS_LIBAIO -using IOContext = io_context_t; -#else -using IOContext = uint32_t; -#endif - -class ScopedIOContext { - public: - ScopedIOContext(); - ~ScopedIOContext(); - ScopedIOContext(const ScopedIOContext &) = delete; - ScopedIOContext &operator=(const ScopedIOContext &) = delete; - - bool valid() const { return valid_; } - IOContext &get() { return ctx_; } - - private: - IOContext ctx_{}; - bool valid_{false}; -}; - -int execute_aligned_io(IOContext ctx, int fd, std::vector &reads, - uint64_t n_retries = 3); - -} // namespace ailego -} // namespace zvec diff --git a/src/include/zvec/ailego/buffer/vector_page_table.h b/src/include/zvec/ailego/buffer/vector_page_table.h index 4a7432e20..b9396226e 100644 --- a/src/include/zvec/ailego/buffer/vector_page_table.h +++ b/src/include/zvec/ailego/buffer/vector_page_table.h @@ -271,8 +271,6 @@ class VecBufferPool { return file_size_; } - void batch_prefetch(const block_id_t *page_ids, size_t count); - //! Sequentially preload pages into the pool until pool is full. void warmup(); @@ -333,10 +331,6 @@ class VecBufferPoolHandle { void acquire_one(block_id_t block_id); - void batch_prefetch(const block_id_t *page_ids, size_t count) { - pool_.batch_prefetch(page_ids, count); - } - private: VecBufferPool &pool_; }; From 5839e1d8d60e6a1139e36746f4928cf6decfc491 Mon Sep 17 00:00:00 2001 From: Zefeng Yin Date: Tue, 23 Jun 2026 16:26:36 +0800 Subject: [PATCH 11/17] fix --- src/include/zvec/ailego/buffer/block_eviction_queue.h | 7 ------- src/include/zvec/ailego/buffer/vector_page_table.h | 2 -- 2 files changed, 9 deletions(-) diff --git a/src/include/zvec/ailego/buffer/block_eviction_queue.h b/src/include/zvec/ailego/buffer/block_eviction_queue.h index fa5aff2a5..b6023977d 100644 --- a/src/include/zvec/ailego/buffer/block_eviction_queue.h +++ b/src/include/zvec/ailego/buffer/block_eviction_queue.h @@ -81,13 +81,6 @@ class BlockEvictionQueue { bool add_single_block(const BlockType &block, int queue_index); - // void clear_dead_node(); - - bool is_valid(EvictableBlockOwner *owner) { - std::shared_lock lock(valid_owners_mutex_); - return valid_owners_.find(owner) != valid_owners_.end(); - } - void set_valid(EvictableBlockOwner *owner) { std::unique_lock lock(valid_owners_mutex_); valid_owners_.insert(owner); diff --git a/src/include/zvec/ailego/buffer/vector_page_table.h b/src/include/zvec/ailego/buffer/vector_page_table.h index b9396226e..69be68c89 100644 --- a/src/include/zvec/ailego/buffer/vector_page_table.h +++ b/src/include/zvec/ailego/buffer/vector_page_table.h @@ -287,8 +287,6 @@ class VecBufferPool { return page_table_.acquire_block(page_id); } - int fd() const { return fd_; } - private: int fd_; // page-data channel: may carry O_DIRECT int meta_fd_; // metadata channel: always buffered IO From 90c6577060ed7b917922637a32afa5904c943dd2 Mon Sep 17 00:00:00 2001 From: Zefeng Yin Date: Tue, 23 Jun 2026 17:31:38 +0800 Subject: [PATCH 12/17] upd --- src/core/utility/buffer_storage.cc | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/src/core/utility/buffer_storage.cc b/src/core/utility/buffer_storage.cc index 7a0b25fce..573851ec5 100644 --- a/src/core/utility/buffer_storage.cc +++ b/src/core/utility/buffer_storage.cc @@ -487,8 +487,12 @@ class BufferStorage : public IndexStorage { if (val != 0) { segment_meta_capacity_ = val; } - params.get(BUFFER_STORAGE_ENABLE_DIRECT_IO, &enable_direct_io_); - enable_direct_io_ = true; + // O_DIRECT benefits search (controlled I/O, no page-cache pollution) + // but hurts build (sequential writes benefit from page cache coalescing). + // Default: always on; can be overridden via params. + if (!params.get(BUFFER_STORAGE_ENABLE_DIRECT_IO, &enable_direct_io_)) { + enable_direct_io_ = true; + } return 0; } @@ -514,11 +518,13 @@ class BufferStorage : public IndexStorage { } } - // Open in writable mode when the caller expects to modify the index - // (create_if_missing=true implies write intent, same as MMapFileStorage). + // Writable mode (build): disable O_DIRECT — sequential writes benefit + // from page cache coalescing. Read-only mode (search): honour the + // configured enable_direct_io_ flag. + const bool use_direct_io = create_if_missing ? false : enable_direct_io_; buffer_pool_ = std::make_shared( path, /*writable=*/create_if_missing, - /*enable_direct_io=*/enable_direct_io_); + /*enable_direct_io=*/use_direct_io); buffer_pool_handle_ = std::make_shared( buffer_pool_->get_handle()); int ret = ParseToMapping(); From 099b96c514646fc463396d0beeb38ddc1ef1bedc Mon Sep 17 00:00:00 2001 From: Zefeng Yin Date: Tue, 23 Jun 2026 20:13:51 +0800 Subject: [PATCH 13/17] upd --- src/ailego/buffer/block_eviction_queue.cc | 35 ++++- src/ailego/buffer/vector_page_table.cc | 120 +++++++++++++----- src/core/utility/buffer_storage.cc | 17 +-- .../zvec/ailego/buffer/block_eviction_queue.h | 10 +- .../zvec/ailego/buffer/vector_page_table.h | 25 ++++ 5 files changed, 162 insertions(+), 45 deletions(-) diff --git a/src/ailego/buffer/block_eviction_queue.cc b/src/ailego/buffer/block_eviction_queue.cc index eff930127..cad117b92 100644 --- a/src/ailego/buffer/block_eviction_queue.cc +++ b/src/ailego/buffer/block_eviction_queue.cc @@ -78,9 +78,30 @@ bool BlockEvictionQueue::add_single_block(const BlockType &block, return true; } +MemoryLimitPool::~MemoryLimitPool() { + drain_free_list(); +} + +void MemoryLimitPool::drain_free_list() { + std::lock_guard lock(free_list_mutex_); + size_t drained = 0; + while (free_list_head_) { + char *buf = free_list_head_; + free_list_head_ = *reinterpret_cast(buf); + ailego_free(buf); + ++drained; + } + free_list_count_ = 0; + if (drained > 0) { + LOG_INFO("MemoryLimitPool: drained %zu cached buffers from free list", + drained); + } +} + int MemoryLimitPool::init(size_t pool_size) { pool_size_ = 0; BlockEvictionQueue::get_instance().recycle(); + drain_free_list(); pool_size_ = pool_size; LOG_INFO("MemoryLimitPool initialized with pool size: %lu", pool_size_); return 0; @@ -96,6 +117,15 @@ bool MemoryLimitPool::try_acquire_buffer(const size_t buffer_size, } desired = expected + buffer_size; } while (!used_size_.compare_exchange_weak(expected, desired)); + { + std::lock_guard lock(free_list_mutex_); + if (free_list_head_) { + buffer = free_list_head_; + free_list_head_ = *reinterpret_cast(buffer); + --free_list_count_; + return true; + } + } buffer = (char *)ailego_aligned_malloc(buffer_size, 4096); if (!buffer) { used_size_.fetch_sub(buffer_size); @@ -119,7 +149,10 @@ void MemoryLimitPool::release_buffer(char *buffer, const size_t buffer_size) { desired = expected - buffer_size; assert(expected >= buffer_size); } while (!used_size_.compare_exchange_weak(expected, desired)); - ailego_free(buffer); + std::lock_guard lock(free_list_mutex_); + *reinterpret_cast(buffer) = free_list_head_; + free_list_head_ = buffer; + ++free_list_count_; } void MemoryLimitPool::release_external(const size_t buffer_size) { diff --git a/src/ailego/buffer/vector_page_table.cc b/src/ailego/buffer/vector_page_table.cc index 01e8fb28d..316a0297c 100644 --- a/src/ailego/buffer/vector_page_table.cc +++ b/src/ailego/buffer/vector_page_table.cc @@ -225,6 +225,7 @@ char *VectorPageTable::set_block_acquired(block_id_t block_id, char *buffer, e.file_offset = file_offset; e.in_evict_queue.store(false, std::memory_order_relaxed); e.is_dirty.store(false, std::memory_order_relaxed); + e.ever_loaded = true; e.ref_count.store(1, std::memory_order_release); return e.buffer; } else { @@ -329,6 +330,7 @@ VecBufferPool::VecBufferPool(const std::string &filename, bool writable, throw std::runtime_error("Failed to stat file: " + filename); } file_size_ = st.st_size; + initial_file_size_ = file_size_; // snapshot for skip-pread optimisation } int VecBufferPool::init() { @@ -407,26 +409,35 @@ char *VecBufferPool::acquire_buffer(block_id_t page_id, int retry) { } size_t page_offset = page_id * kVectorPageSize; - // O_DIRECT requires the IO length to be a multiple of the device block - // size. The backing file size is always page-aligned (IndexMapping + - // append_segment guarantee this), so reading a full page never reads past - // EOF; the tail padding is the file's own zero region. In direct mode we - // MUST read the whole page; the buffered path keeps the legacy short-read - // + zero-pad behaviour. - size_t read_len = direct_io_enabled_ - ? kVectorPageSize - : std::min(kVectorPageSize, file_size_ - page_offset); - if (read_len < kVectorPageSize) { - std::memset(buffer + read_len, 0, kVectorPageSize - read_len); - } - ssize_t read_bytes = zvec_pread(fd_, buffer, read_len, page_offset); - if (read_bytes != static_cast(read_len)) { - LOG_ERROR( - "Buffer pool failed to read file at offset: file[%s], page_id[%zu], " - "offset[%zu], expected[%zu], got[%zd]", - file_name_.c_str(), page_id, page_offset, read_len, read_bytes); - MemoryLimitPool::get_instance().release_buffer(buffer, kVectorPageSize); - return nullptr; + // Skip pread for pages created by extend_file (beyond the original file + // size at open time) that have never been loaded before. Their on-disk + // content is guaranteed to be zeros (ftruncate). After eviction the + // ever_loaded flag stays true so reloads correctly pread the flushed data. + if (writable_ && page_offset >= initial_file_size_ && + !page_table_.is_ever_loaded(page_id)) { + std::memset(buffer, 0, kVectorPageSize); + } else { + // O_DIRECT requires the IO length to be a multiple of the device block + // size. The backing file size is always page-aligned (IndexMapping + + // append_segment guarantee this), so reading a full page never reads past + // EOF; the tail padding is the file's own zero region. In direct mode we + // MUST read the whole page; the buffered path keeps the legacy short-read + // + zero-pad behaviour. + size_t read_len = direct_io_enabled_ + ? kVectorPageSize + : std::min(kVectorPageSize, file_size_ - page_offset); + if (read_len < kVectorPageSize) { + std::memset(buffer + read_len, 0, kVectorPageSize - read_len); + } + ssize_t read_bytes = zvec_pread(fd_, buffer, read_len, page_offset); + if (read_bytes != static_cast(read_len)) { + LOG_ERROR( + "Buffer pool failed to read file at offset: file[%s], page_id[%zu], " + "offset[%zu], expected[%zu], got[%zd]", + file_name_.c_str(), page_id, page_offset, read_len, read_bytes); + MemoryLimitPool::get_instance().release_buffer(buffer, kVectorPageSize); + return nullptr; + } } return page_table_.set_block_acquired(page_id, buffer, page_offset); } @@ -501,23 +512,72 @@ int VecBufferPool::flush_all() { if (!writable_) { return 0; } + const size_t total = page_table_.entry_num(); + if (total == 0) { + return 0; + } + + static constexpr size_t kBatchPages = 256; + const size_t kBatchSize = kBatchPages * kVectorPageSize; + char *batch_buf = + static_cast(ailego_aligned_malloc(kBatchSize, 4096)); + int rc = 0; size_t total_dirty = 0; size_t fail_count = 0; - for (size_t i = 0; i < page_table_.entry_num(); ++i) { - if (page_table_.is_block_dirty(i)) { - ++total_dirty; - int r = page_table_.flush_block(i); - if (r != 0) { - rc = r; - ++fail_count; + size_t i = 0; + + while (i < total) { + if (!page_table_.is_block_dirty(i)) { + ++i; + continue; + } + + const size_t run_start = i; + size_t run_count = 0; + const size_t limit = batch_buf ? kBatchPages : 1; + while (i < total && run_count < limit && page_table_.is_block_dirty(i)) { + char *buf = page_table_.get_block_buffer(i); + if (!buf) break; + if (batch_buf) { + std::memcpy(batch_buf + run_count * kVectorPageSize, buf, + kVectorPageSize); + } + ++run_count; + ++i; + } + if (run_count == 0) { + ++i; + continue; + } + total_dirty += run_count; + + bool ok = false; + if (batch_buf && run_count > 0) { + const size_t write_size = run_count * kVectorPageSize; + ssize_t w = zvec_pwrite(fd_, batch_buf, write_size, + run_start * kVectorPageSize); + ok = (w == static_cast(write_size)); + } + if (ok) { + for (size_t j = run_start; j < run_start + run_count; ++j) { + page_table_.clear_dirty(j); + } + } else { + for (size_t j = run_start; j < run_start + run_count; ++j) { + int r = page_table_.flush_block(j); + if (r != 0) { + rc = r; + ++fail_count; + } } } } + + if (batch_buf) { + ailego_free(batch_buf); + } if (fail_count != 0) { - // Aggregated diagnostic so that callers (notably ~VecBufferPool, which - // discards the return value) cannot silently lose dirty pages: any - // unflushed page at this point means the on-disk image is now stale. LOG_ERROR( "VecBufferPool::flush_all: %zu/%zu dirty page(s) failed to flush, " "file[%s] last_rc=%d -- on-disk data may be stale.", diff --git a/src/core/utility/buffer_storage.cc b/src/core/utility/buffer_storage.cc index 573851ec5..425da4553 100644 --- a/src/core/utility/buffer_storage.cc +++ b/src/core/utility/buffer_storage.cc @@ -487,12 +487,9 @@ class BufferStorage : public IndexStorage { if (val != 0) { segment_meta_capacity_ = val; } - // O_DIRECT benefits search (controlled I/O, no page-cache pollution) - // but hurts build (sequential writes benefit from page cache coalescing). - // Default: always on; can be overridden via params. - if (!params.get(BUFFER_STORAGE_ENABLE_DIRECT_IO, &enable_direct_io_)) { - enable_direct_io_ = true; - } + // O_DIRECT is always on for memory controllability. + params.get(BUFFER_STORAGE_ENABLE_DIRECT_IO, &enable_direct_io_); + enable_direct_io_ = true; return 0; } @@ -518,13 +515,11 @@ class BufferStorage : public IndexStorage { } } - // Writable mode (build): disable O_DIRECT — sequential writes benefit - // from page cache coalescing. Read-only mode (search): honour the - // configured enable_direct_io_ flag. - const bool use_direct_io = create_if_missing ? false : enable_direct_io_; + // O_DIRECT is used for both build and search to keep memory + // controllable (no uncontrolled page-cache growth). buffer_pool_ = std::make_shared( path, /*writable=*/create_if_missing, - /*enable_direct_io=*/use_direct_io); + /*enable_direct_io=*/enable_direct_io_); buffer_pool_handle_ = std::make_shared( buffer_pool_->get_handle()); int ret = ParseToMapping(); diff --git a/src/include/zvec/ailego/buffer/block_eviction_queue.h b/src/include/zvec/ailego/buffer/block_eviction_queue.h index b6023977d..64a20fec7 100644 --- a/src/include/zvec/ailego/buffer/block_eviction_queue.h +++ b/src/include/zvec/ailego/buffer/block_eviction_queue.h @@ -91,9 +91,6 @@ class BlockEvictionQueue { valid_owners_.erase(owner); } - // Atomically checks under the shared lock that the owner is still valid AND - // the block version has not been superseded, preventing TOCTOU races when an - // owner is concurrently destroyed. bool is_valid_and_alive(const BlockType &item); void recycle(); @@ -136,10 +133,17 @@ class MemoryLimitPool { private: MemoryLimitPool() = default; + ~MemoryLimitPool(); + + void drain_free_list(); private: size_t pool_size_{0}; std::atomic used_size_{0}; + + std::mutex free_list_mutex_; + char *free_list_head_{nullptr}; + size_t free_list_count_{0}; }; } // namespace ailego diff --git a/src/include/zvec/ailego/buffer/vector_page_table.h b/src/include/zvec/ailego/buffer/vector_page_table.h index 69be68c89..9584f60b5 100644 --- a/src/include/zvec/ailego/buffer/vector_page_table.h +++ b/src/include/zvec/ailego/buffer/vector_page_table.h @@ -51,6 +51,7 @@ class VectorPageTable : public EvictableBlockOwner { std::atomic in_evict_queue; std::atomic is_dirty; uint8_t evict_priority{0}; + bool ever_loaded{false}; // true once the page has been loaded at least once char *buffer; size_t file_offset; }; @@ -122,6 +123,19 @@ class VectorPageTable : public EvictableBlockOwner { return entry_at(block_id).is_dirty.load(std::memory_order_relaxed); } + //! Get the raw buffer pointer for a loaded page (nullptr if not loaded). + //! Used by batched flush to memcpy page contents into a coalescing buffer. + char *get_block_buffer(block_id_t block_id) const { + assert(block_id < entry_num_.load(std::memory_order_acquire)); + return entry_at(block_id).buffer; + } + + //! Clear the dirty flag after a successful batched flush. + void clear_dirty(block_id_t block_id) { + assert(block_id < entry_num_.load(std::memory_order_acquire)); + entry_at(block_id).is_dirty.store(false, std::memory_order_relaxed); + } + //! Flush a single dirty block without evicting it. Caller guarantees the //! block is currently loaded (buffer != nullptr). int flush_block(block_id_t block_id) { @@ -166,6 +180,14 @@ class VectorPageTable : public EvictableBlockOwner { return entry_at(block_id).buffer != nullptr; } + //! Check if a page has ever been loaded (so pread is needed on reload + //! after eviction). A page that was never loaded can be zero-filled + //! if it lies beyond the initial file size (created by extend_file). + bool is_ever_loaded(block_id_t block_id) const { + assert(block_id < entry_num_.load(std::memory_order_acquire)); + return entry_at(block_id).ever_loaded; + } + private: // Segmented page table: entries are split across fixed-size segments so // that extend() can grow the table without moving existing entries. @@ -291,6 +313,9 @@ class VecBufferPool { int fd_; // page-data channel: may carry O_DIRECT int meta_fd_; // metadata channel: always buffered IO size_t file_size_; + size_t initial_file_size_; // file size at open time; pages beyond this + // are created by extend_file and can skip + // pread on first load (content is zeros). std::string file_name_; bool writable_{false}; bool direct_io_enabled_{false}; // whether O_DIRECT actually took effect From 2efc8dde675ceccbe8c913324f0e968c3df927be Mon Sep 17 00:00:00 2001 From: Zefeng Yin Date: Wed, 24 Jun 2026 11:46:43 +0800 Subject: [PATCH 14/17] fix --- src/core/algorithm/hnsw/hnsw_streamer_entity.h | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/core/algorithm/hnsw/hnsw_streamer_entity.h b/src/core/algorithm/hnsw/hnsw_streamer_entity.h index e6a3ede83..bc5ebf8d5 100644 --- a/src/core/algorithm/hnsw/hnsw_streamer_entity.h +++ b/src/core/algorithm/hnsw/hnsw_streamer_entity.h @@ -1026,6 +1026,11 @@ class HnswBufferPoolStreamerEntity : public HnswStreamerEntity { for (level_t lvl = max_lvl; lvl >= 1; --lvl) { for (size_t idx = 0; idx < upper_nodes.size(); ++idx) { auto id = upper_nodes[idx]; + auto it = upper_neighbor_index_->find(id); + if (it == upper_neighbor_index_->end()) continue; + auto meta = + reinterpret_cast(&it->second); + if (lvl > static_cast(meta->bits.level)) continue; auto neighbors = get_neighbors_typed(lvl, id); for (uint32_t i = 0; i < neighbors.size(); ++i) { auto nid = neighbors[i]; From 0f0ef65f7020dd0d1b24debb848e557f974f34bf Mon Sep 17 00:00:00 2001 From: Zefeng Yin Date: Thu, 25 Jun 2026 16:11:40 +0800 Subject: [PATCH 15/17] fix --- src/core/algorithm/ivf/ivf_dumper.cc | 2 +- src/core/algorithm/ivf/ivf_entity.cc | 4 ++-- src/core/algorithm/ivf/ivf_entity.h | 2 +- src/core/algorithm/ivf/ivf_streamer.cc | 3 +++ 4 files changed, 7 insertions(+), 4 deletions(-) diff --git a/src/core/algorithm/ivf/ivf_dumper.cc b/src/core/algorithm/ivf/ivf_dumper.cc index 761adaac6..c4edcfea5 100644 --- a/src/core/algorithm/ivf/ivf_dumper.cc +++ b/src/core/algorithm/ivf/ivf_dumper.cc @@ -56,7 +56,7 @@ int IVFDumper::dump_inverted_block(uint32_t inverted_list_id, std::copy(keys, keys + vector_count, std::back_inserter(keys_)); ++inverted_lists_meta_[cur_list_id_].block_count; ++header_.block_count; - header_.inverted_body_size += size; + header_.inverted_body_size += size + pd_size; } else { size_t step_size = meta_.element_size(); if (column_major) { diff --git a/src/core/algorithm/ivf/ivf_entity.cc b/src/core/algorithm/ivf/ivf_entity.cc index 6dccc2b2c..d41f7fc5f 100644 --- a/src/core/algorithm/ivf/ivf_entity.cc +++ b/src/core/algorithm/ivf/ivf_entity.cc @@ -628,7 +628,7 @@ int IVFEntity::search(size_t inverted_list_id, const void *query, ailego_assert_with(block_vecs < sizeof(keeps) * 8, "bits overflow"); for (size_t k = 0; k < vecs_count; ++k) { if (!filter(block_keys[k])) { - keeps |= (1 << k); + keeps |= (1ULL << k); } else { ++(*context_stats->mutable_filtered_count()); } @@ -645,7 +645,7 @@ int IVFEntity::search(size_t inverted_list_id, const void *query, uint32_t id_off = list_meta->id_offset + (i + b) * block_vecs; for (size_t k = 0; k < vecs_count; ++k) { - if (keeps & (1 << k)) { + if (keeps & (1ULL << k)) { if (block_keys[k] != kInvalidKey) { heap->emplace(block_keys[k], distances[k] * norm_val, id_off + k); } diff --git a/src/core/algorithm/ivf/ivf_entity.h b/src/core/algorithm/ivf/ivf_entity.h index e6fd4b6c4..58474b355 100644 --- a/src/core/algorithm/ivf/ivf_entity.h +++ b/src/core/algorithm/ivf/ivf_entity.h @@ -233,7 +233,7 @@ class IVFEntity { return this->convert_to_normalize_value(scale); } - return norm_value_; + return 1.0f; } //! Check whether the feature segment exist diff --git a/src/core/algorithm/ivf/ivf_streamer.cc b/src/core/algorithm/ivf/ivf_streamer.cc index a2c924141..a2e13458a 100644 --- a/src/core/algorithm/ivf/ivf_streamer.cc +++ b/src/core/algorithm/ivf/ivf_streamer.cc @@ -101,6 +101,9 @@ int IVFStreamer::open(IndexStorage::Pointer storage) { stats_.set_loaded_count(entity_->vector_count()); stats_.set_loaded_costtime(timer.milli_seconds()); + LOG_INFO("IVFStreamer open done, vector_count=%zu, inverted_list_count=%zu", + entity_->vector_count(), entity_->inverted_list_count()); + searcher_state_ = STATE_LOADED; return 0; } From 3180820e912b21b0fa79af1a1fc5fd57bae1345d Mon Sep 17 00:00:00 2001 From: Zefeng Yin Date: Thu, 25 Jun 2026 17:58:01 +0800 Subject: [PATCH 16/17] fix --- src/core/algorithm/flat/flat_searcher.cc | 3 +++ src/core/algorithm/flat/flat_searcher.h | 2 ++ src/core/utility/buffer_read_storage.cc | 8 ++++++-- src/core/utility/utility_params.h | 2 ++ .../zvec/core/framework/index_segment_storage.h | 17 +++++++++++++++-- 5 files changed, 28 insertions(+), 4 deletions(-) diff --git a/src/core/algorithm/flat/flat_searcher.cc b/src/core/algorithm/flat/flat_searcher.cc index ef4e7d4b0..6b60abda2 100644 --- a/src/core/algorithm/flat/flat_searcher.cc +++ b/src/core/algorithm/flat/flat_searcher.cc @@ -147,6 +147,9 @@ int FlatSearcher::load(IndexStorage::Pointer cntr, return IndexError_ReadData; } + // Keep the segment alive so that keys_ pointer remains valid + keys_segment_ = std::move(keys_segment); + for (size_t i = 0; i < keys_count; i++) { key_id_mapping_[keys_[i]] = i; } diff --git a/src/core/algorithm/flat/flat_searcher.h b/src/core/algorithm/flat/flat_searcher.h index 2b0d0d93d..e48a9ced6 100644 --- a/src/core/algorithm/flat/flat_searcher.h +++ b/src/core/algorithm/flat/flat_searcher.h @@ -50,6 +50,7 @@ class FlatSearcher : public IndexSearcher { container_ = nullptr; measure_ = nullptr; features_segment_ = nullptr; + keys_segment_ = nullptr; keys_ = nullptr; key_id_mapping_.clear(); return 0; @@ -172,6 +173,7 @@ class FlatSearcher : public IndexSearcher { IndexMetric::Pointer measure_{}; ailego::Params params_{}; IndexStorage::Segment::Pointer features_segment_{}; + IndexStorage::Segment::Pointer keys_segment_{}; mutable std::vector mapping_{}; mutable std::mutex mapping_mutex_{}; FlatDistanceMatrix distance_matrix_{}; diff --git a/src/core/utility/buffer_read_storage.cc b/src/core/utility/buffer_read_storage.cc index bdf5227b2..6e6c88c76 100644 --- a/src/core/utility/buffer_read_storage.cc +++ b/src/core/utility/buffer_read_storage.cc @@ -262,6 +262,8 @@ class BufferReadStorage : public IndexStorage { params.get(BUFFER_READ_STORAGE_CHECKSUM_VALIDATION, &checksum_validation_); params.get(BUFFER_READ_STORAGE_HEADER_OFFSET, &header_offset_); params.get(BUFFER_READ_STORAGE_FOOTER_OFFSET, &footer_offset_); + params.get(BUFFER_READ_STORAGE_ENABLE_DIRECT_IO, &enable_direct_io_); + enable_direct_io_ = true; return 0; } @@ -289,8 +291,8 @@ class BufferReadStorage : public IndexStorage { //! Load an index file into the container int open(const std::string &path, bool) override { // Read-only buffer pool over the freshly-dumped FileDumper container. - buffer_pool_ = std::make_shared(path, - /*writable=*/false); + buffer_pool_ = std::make_shared( + path, /*writable=*/false, /*enable_direct_io=*/enable_direct_io_); if (!buffer_pool_) { LOG_ERROR("Failed to create VecBufferPool, path: %s", path.c_str()); return IndexError_NoMemory; @@ -341,6 +343,7 @@ class BufferReadStorage : public IndexStorage { LOG_ERROR("Failed to init VecBufferPool, path: %s", path.c_str()); return IndexError_Runtime; } + buffer_pool_->warmup(); return 0; } @@ -394,6 +397,7 @@ class BufferReadStorage : public IndexStorage { private: bool checksum_validation_{false}; + bool enable_direct_io_{true}; int64_t header_offset_{0}; int64_t footer_offset_{0}; size_t index_offset_{0}; diff --git a/src/core/utility/utility_params.h b/src/core/utility/utility_params.h index 8f7f5b279..59d21788f 100644 --- a/src/core/utility/utility_params.h +++ b/src/core/utility/utility_params.h @@ -67,6 +67,8 @@ static const std::string BUFFER_READ_STORAGE_HEADER_OFFSET = "proxima.buffer.read_storage.header_offset"; static const std::string BUFFER_READ_STORAGE_FOOTER_OFFSET = "proxima.buffer.read_storage.footer_offset"; +static const std::string BUFFER_READ_STORAGE_ENABLE_DIRECT_IO = + "proxima.buffer.read_storage.enable_direct_io"; //! MMapFileStorage static const std::string MMAPFILE_STORAGE_MEMORY_LOCKED = diff --git a/src/include/zvec/core/framework/index_segment_storage.h b/src/include/zvec/core/framework/index_segment_storage.h index 06b17779a..952413911 100644 --- a/src/include/zvec/core/framework/index_segment_storage.h +++ b/src/include/zvec/core/framework/index_segment_storage.h @@ -49,6 +49,17 @@ class IndexSegmentStorage : public IndexStorage { data_crc_(segment.data_crc()), parent_(parent->clone()) {} + //! Constructor (for clone) + Segment(const IndexStorage::Segment::Pointer &cloned_parent, + size_t data_offset, size_t data_size, size_t padding_size, + uint32_t data_crc) + : data_offset_(data_offset), + data_size_(data_size), + padding_size_(padding_size), + region_size_(data_size + padding_size), + data_crc_(data_crc), + parent_(cloned_parent) {} + //! Destructor ~Segment(void) override {} @@ -114,9 +125,11 @@ class IndexSegmentStorage : public IndexStorage { return; } - //! Clone the segment + //! Clone the segment (each clone gets an independent parent buffer + //! for thread safety — concurrent reads require separate buffers). IndexStorage::Segment::Pointer clone(void) override { - return shared_from_this(); + return std::make_shared( + parent_->clone(), data_offset_, data_size_, padding_size_, data_crc_); } private: From eebedf805993146f8276ae5da16488f76b340deb Mon Sep 17 00:00:00 2001 From: Zefeng Yin Date: Thu, 25 Jun 2026 21:03:46 +0800 Subject: [PATCH 17/17] add ivf prefetch --- src/ailego/buffer/vector_page_table.cc | 67 +++++++++++++++++++ src/core/algorithm/ivf/ivf_entity.cc | 8 ++- src/core/utility/buffer_read_storage.cc | 8 +++ .../zvec/ailego/buffer/vector_page_table.h | 4 ++ .../zvec/core/framework/index_storage.h | 5 ++ 5 files changed, 90 insertions(+), 2 deletions(-) diff --git a/src/ailego/buffer/vector_page_table.cc b/src/ailego/buffer/vector_page_table.cc index 316a0297c..d892bd24f 100644 --- a/src/ailego/buffer/vector_page_table.cc +++ b/src/ailego/buffer/vector_page_table.cc @@ -769,5 +769,72 @@ void VecBufferPool::warmup() { loaded, total_pages, file_name_.c_str()); } +void VecBufferPool::prefetch_pages(block_id_t first_page, size_t page_count) { + size_t end_page = first_page + page_count; + if (end_page > page_table_.entry_num()) { + end_page = page_table_.entry_num(); + } + if (first_page >= end_page) return; + + bool all_loaded = true; + for (size_t pg = first_page; pg < end_page; ++pg) { + if (!page_table_.is_loaded(pg)) { + all_loaded = false; + break; + } + } + if (all_loaded) return; + + static constexpr size_t kChunkPages = 1024; + const size_t kChunkSize = kChunkPages * kVectorPageSize; + char *chunk_buf = static_cast(aligned_alloc(4096, kChunkSize)); + if (!chunk_buf) return; + + size_t pg = first_page; + while (pg < end_page) { + if (page_table_.is_loaded(pg)) { + ++pg; + continue; + } + size_t run_start = pg; + size_t run_end = pg + 1; + while (run_end < end_page && !page_table_.is_loaded(run_end) && + (run_end - run_start) < kChunkPages) { + ++run_end; + } + + size_t run_pages = run_end - run_start; + size_t read_bytes = run_pages * kVectorPageSize; + size_t file_off = run_start * kVectorPageSize; + ssize_t got = zvec_pread(fd_, chunk_buf, read_bytes, file_off); + if (got != static_cast(read_bytes)) { + pg = run_end; + continue; + } + + for (size_t j = 0; j < run_pages; ++j) { + block_id_t pid = static_cast(run_start + j); + if (page_table_.is_loaded(pid)) continue; + char *buf = nullptr; + bool found = MemoryLimitPool::get_instance().try_acquire_buffer( + kVectorPageSize, buf); + if (!found) break; + std::memcpy(buf, chunk_buf + j * kVectorPageSize, kVectorPageSize); + page_table_.set_block_acquired(pid, buf, file_off + j * kVectorPageSize); + page_table_.release_block(pid); + } + pg = run_end; + } + free(chunk_buf); +} + +void VecBufferPoolHandle::prefetch_range(size_t file_offset, size_t len) { + if (len == 0) return; + size_t first_page = file_offset / kVectorPageSize; + size_t last_page = (file_offset + len - 1) / kVectorPageSize; + pool_.prefetch_pages(static_cast(first_page), + last_page - first_page + 1); +} + } // namespace ailego } // namespace zvec diff --git a/src/core/algorithm/ivf/ivf_entity.cc b/src/core/algorithm/ivf/ivf_entity.cc index d41f7fc5f..d4008529b 100644 --- a/src/core/algorithm/ivf/ivf_entity.cc +++ b/src/core/algorithm/ivf/ivf_entity.cc @@ -593,11 +593,13 @@ int IVFEntity::search(size_t inverted_list_id, const void *query, auto list_meta = this->inverted_list_meta(inverted_list_id); ivf_assert(list_meta, IndexError_ReadData); + const size_t block_size = header_.block_size; + inverted_->prefetch(list_meta->offset, list_meta->block_count * block_size); + const void *data = nullptr; const size_t block_vecs = header_.block_vector_count; std::vector distances(block_vecs); const size_t batch_size = kBatchBlocks; - const size_t block_size = header_.block_size; const auto norm_val = this->inverted_list_normalize_value(inverted_list_id); for (size_t i = 0; i < list_meta->block_count; i += batch_size) { //! Read vecs @@ -667,11 +669,13 @@ int IVFEntity::search(size_t inverted_list_id, const void *query, auto list_meta = inverted_list_meta(inverted_list_id); ivf_assert(list_meta, IndexError_ReadData); + const size_t block_size = header_.block_size; + inverted_->prefetch(list_meta->offset, list_meta->block_count * block_size); + const void *data = nullptr; const size_t block_vecs = header_.block_vector_count; std::vector distances(block_vecs); const size_t batch_size = kBatchBlocks; - const size_t block_size = header_.block_size; const auto norm_val = this->inverted_list_normalize_value(inverted_list_id); for (size_t i = 0; i < list_meta->block_count; i += batch_size) { //! Read vecs diff --git a/src/core/utility/buffer_read_storage.cc b/src/core/utility/buffer_read_storage.cc index 6e6c88c76..376a18dfd 100644 --- a/src/core/utility/buffer_read_storage.cc +++ b/src/core/utility/buffer_read_storage.cc @@ -238,6 +238,14 @@ class BufferReadStorage : public IndexStorage { return std::make_shared(*this); } + void prefetch(size_t offset, size_t len) override { + if (offset + len > region_size_) { + len = (offset > region_size_) ? 0 : region_size_ - offset; + } + if (len == 0) return; + handle_->prefetch_range(data_offset_ + offset, len); + } + //! No stable base pointer: data lives in an evictable paged cache. const uint8_t *base_data(void) const override { return nullptr; diff --git a/src/include/zvec/ailego/buffer/vector_page_table.h b/src/include/zvec/ailego/buffer/vector_page_table.h index 9584f60b5..a10c15669 100644 --- a/src/include/zvec/ailego/buffer/vector_page_table.h +++ b/src/include/zvec/ailego/buffer/vector_page_table.h @@ -296,6 +296,8 @@ class VecBufferPool { //! Sequentially preload pages into the pool until pool is full. void warmup(); + void prefetch_pages(block_id_t first_page, size_t page_count); + //! Try to acquire a page buffer WITHOUT triggering disk I/O. //! Returns the buffer pointer if the page is already in memory, //! or nullptr if the page would need a pread (cache miss). @@ -340,6 +342,8 @@ class VecBufferPoolHandle { bool read_range(size_t file_offset, size_t len, char *out); + void prefetch_range(size_t file_offset, size_t len); + int get_meta(size_t offset, size_t length, char *buffer); int write_range(size_t file_offset, size_t len, const char *src); diff --git a/src/include/zvec/core/framework/index_storage.h b/src/include/zvec/core/framework/index_storage.h index ca4e24dbb..d7142d765 100644 --- a/src/include/zvec/core/framework/index_storage.h +++ b/src/include/zvec/core/framework/index_storage.h @@ -336,6 +336,11 @@ class IndexStorage : public IndexModule { virtual size_t abs_data_offset(void) const { return 0; } + + virtual void prefetch(size_t offset, size_t len) { + (void)offset; + (void)len; + } }; //! Destructor