From 3b58f0d4f2c897f9fe9dfed792ceb78cc41d13e6 Mon Sep 17 00:00:00 2001 From: Kevin Montrose Date: Wed, 17 Jun 2026 15:59:46 -0400 Subject: [PATCH 1/8] drop alignment hackery in VectorSessionFunctions now that Tsavorite guarantees alignment for values --- .../Server/Migration/MigrateOperation.cs | 1 - libs/server/InputHeader.cs | 2 - .../Resp/Vector/VectorManager.Callbacks.cs | 4 - .../Resp/Vector/VectorManager.Migration.cs | 1 - .../VectorStore/VectorSessionFunctions.cs | 474 +++++++----------- 5 files changed, 171 insertions(+), 311 deletions(-) diff --git a/libs/cluster/Server/Migration/MigrateOperation.cs b/libs/cluster/Server/Migration/MigrateOperation.cs index 48aa19da260..c7389f80c6e 100644 --- a/libs/cluster/Server/Migration/MigrateOperation.cs +++ b/libs/cluster/Server/Migration/MigrateOperation.cs @@ -95,7 +95,6 @@ public async Task TransmitSlotsAsync() input.arg1 = session.NetworkBufferSettings.sendBufferSize - common.NetworkBufferSettings.SendBufferOverheadReserve; VectorInput vectorInput = new(); - vectorInput.AlignmentExpected = true; // We're moving DiskANN sourced data, so alignment is expected vectorInput.MaxMigrationHeapAllocationSize = session.NetworkBufferSettings.sendBufferSize - common.NetworkBufferSettings.SendBufferOverheadReserve; foreach (var (ns, key, hasNs) in sketch.argSliceVector) diff --git a/libs/server/InputHeader.cs b/libs/server/InputHeader.cs index deebfb6aa94..505c687e31e 100644 --- a/libs/server/InputHeader.cs +++ b/libs/server/InputHeader.cs @@ -621,8 +621,6 @@ public struct VectorInput : IStoreInput public nint CallbackContext { get; set; } public nint Callback { get; set; } - public bool AlignmentExpected { get; set; } - [MemberNotNullWhen(returnValue: true, member: nameof(MaxMigrationHeapAllocationSize))] public bool IsMigrationRead => MaxMigrationHeapAllocationSize != null; diff --git a/libs/server/Resp/Vector/VectorManager.Callbacks.cs b/libs/server/Resp/Vector/VectorManager.Callbacks.cs index 301f159b504..90d37def695 100644 --- a/libs/server/Resp/Vector/VectorManager.Callbacks.cs +++ b/libs/server/Resp/Vector/VectorManager.Callbacks.cs @@ -225,7 +225,6 @@ private static unsafe byte WriteCallbackUnmanaged(ulong context, nint keyData, n var keyWithNamespace = MakeVectorElementKey(context, keyData, keyLength); ref var ctx = ref ActiveThreadSession.vectorBasicContext; VectorInput input = new(); - input.AlignmentExpected = true; var valueSpan = SpanByte.FromPinnedPointer((byte*)writeData, (int)writeLength); VectorOutput outputSpan = new(); @@ -289,9 +288,6 @@ private static unsafe bool ReadSizeUnknown(ulong context, bool forceAlignment, R VectorInput input = new(); input.ReadDesiredSize = -1; - // Sometimes we read DiskANN written data from the .NET side - // If that's the case, we need to pad for alignment even though .NET doesn't require it - input.AlignmentExpected = forceAlignment; fixed (byte* ptr = value.Span) { VectorOutput asSpanByte = new(ptr, value.Length); diff --git a/libs/server/Resp/Vector/VectorManager.Migration.cs b/libs/server/Resp/Vector/VectorManager.Migration.cs index 47926d6b64c..96b35f73e34 100644 --- a/libs/server/Resp/Vector/VectorManager.Migration.cs +++ b/libs/server/Resp/Vector/VectorManager.Migration.cs @@ -48,7 +48,6 @@ ReadOnlySpan value #endif VectorInput input = default; - input.AlignmentExpected = true; VectorOutput outputSpan = new(new SpanByteAndMemory()); VectorElementKey key = new(namespaceBytes[0..1], keyWithoutNamespace); diff --git a/libs/server/Storage/Functions/VectorStore/VectorSessionFunctions.cs b/libs/server/Storage/Functions/VectorStore/VectorSessionFunctions.cs index c67423ce2aa..1946973c2f2 100644 --- a/libs/server/Storage/Functions/VectorStore/VectorSessionFunctions.cs +++ b/libs/server/Storage/Functions/VectorStore/VectorSessionFunctions.cs @@ -17,8 +17,6 @@ namespace Garnet.server /// public readonly struct VectorSessionFunctions : ISessionFunctions { - private const int ValueAlignmentBytes = 4; - private readonly FunctionsState functionsState; private readonly ReadSessionState readSessionState; @@ -41,80 +39,73 @@ public readonly bool Reader(in TSourceLogRecord srcLogRecord, Debug.Assert(srcLogRecord.HasNamespace, "Should never write a non-namespaced value with VectorSessionFunctions"); Debug.Assert(srcLogRecord.NamespaceBytes.Length == 1, "Variable length namespaces not supported"); - var value = AlignOrPin(in srcLogRecord, ref input, out var pin); - try + var value = srcLogRecord.ValueSpan; + if (input.IsMigrationRead) { - if (input.IsMigrationRead) - { - Debug.Assert(input.Callback == 0, "No callback expected"); + Debug.Assert(input.Callback == 0, "No callback expected"); - // We can't ship the log record over because of alignment shenanigans - // TODO: When alignment is handled at the Tsavorite level, we CAN start shipping the log over like everything else + // We can't ship the log record over because of alignment shenanigans + // TODO: When alignment is handled at the Tsavorite level, we CAN start shipping the log over like everything else - var neededSpace = - sizeof(int) + srcLogRecord.NamespaceBytes.Length + - sizeof(int) + srcLogRecord.KeyBytes.Length + - sizeof(int) + value.Length; + var neededSpace = + sizeof(int) + srcLogRecord.NamespaceBytes.Length + + sizeof(int) + srcLogRecord.KeyBytes.Length + + sizeof(int) + value.Length; - output.SpanByteAndMemory.EnsureHeapMemorySize(neededSpace); + output.SpanByteAndMemory.EnsureHeapMemorySize(neededSpace); - var writeTo = output.SpanByteAndMemory.Span; + var writeTo = output.SpanByteAndMemory.Span; - BinaryPrimitives.WriteInt32LittleEndian(writeTo, srcLogRecord.NamespaceBytes.Length); - writeTo = writeTo[sizeof(int)..]; - srcLogRecord.NamespaceBytes.CopyTo(writeTo); - writeTo = writeTo[srcLogRecord.NamespaceBytes.Length..]; + BinaryPrimitives.WriteInt32LittleEndian(writeTo, srcLogRecord.NamespaceBytes.Length); + writeTo = writeTo[sizeof(int)..]; + srcLogRecord.NamespaceBytes.CopyTo(writeTo); + writeTo = writeTo[srcLogRecord.NamespaceBytes.Length..]; - BinaryPrimitives.WriteInt32LittleEndian(writeTo, srcLogRecord.KeyBytes.Length); - writeTo = writeTo[sizeof(int)..]; - srcLogRecord.KeyBytes.CopyTo(writeTo); - writeTo = writeTo[srcLogRecord.KeyBytes.Length..]; + BinaryPrimitives.WriteInt32LittleEndian(writeTo, srcLogRecord.KeyBytes.Length); + writeTo = writeTo[sizeof(int)..]; + srcLogRecord.KeyBytes.CopyTo(writeTo); + writeTo = writeTo[srcLogRecord.KeyBytes.Length..]; - // Move value over _without_ any padding for alignment - BinaryPrimitives.WriteInt32LittleEndian(writeTo, value.Length); - writeTo = writeTo[sizeof(int)..]; - value.CopyTo(writeTo); + // Move value over _without_ any padding for alignment + BinaryPrimitives.WriteInt32LittleEndian(writeTo, value.Length); + writeTo = writeTo[sizeof(int)..]; + value.CopyTo(writeTo); - return true; - } + return true; + } - unsafe + unsafe + { + if (input.Callback != 0) { - if (input.Callback != 0) - { - var callback = (delegate* unmanaged[Cdecl, SuppressGCTransition])input.Callback; + var callback = (delegate* unmanaged[Cdecl, SuppressGCTransition])input.Callback; - var dataPtr = (nint)Unsafe.AsPointer(ref MemoryMarshal.GetReference(value)); - var dataLen = (nuint)value.Length; + var dataPtr = (nint)Unsafe.AsPointer(ref MemoryMarshal.GetReference(value)); + var dataLen = (nuint)value.Length; - callback(input.Index, input.CallbackContext, dataPtr, dataLen); - return true; - } + callback(input.Index, input.CallbackContext, dataPtr, dataLen); + return true; } + } - if (input.ReadDesiredSize > 0) - { - Debug.Assert(output.SpanByteAndMemory.Length >= value.Length, "Should always have space for vector point reads"); - - output.SpanByteAndMemory.Length = value.Length; - value.CopyTo(output.SpanByteAndMemory.Span); - } - else - { - input.ReadDesiredSize = value.Length; - if (output.SpanByteAndMemory.Length >= value.Length) - { - value.CopyTo(output.SpanByteAndMemory.Span); - output.SpanByteAndMemory.Length = value.Length; - } - } + if (input.ReadDesiredSize > 0) + { + Debug.Assert(output.SpanByteAndMemory.Length >= value.Length, "Should always have space for vector point reads"); - return true; + output.SpanByteAndMemory.Length = value.Length; + value.CopyTo(output.SpanByteAndMemory.Span); } - finally + else { - pin?.Free(); + input.ReadDesiredSize = value.Length; + if (output.SpanByteAndMemory.Length >= value.Length) + { + value.CopyTo(output.SpanByteAndMemory.Span); + output.SpanByteAndMemory.Length = value.Length; + } } + + return true; } /// @@ -130,17 +121,10 @@ public readonly bool InitialWriter(ref LogRecord logRecord, in RecordSizeInfo si Debug.Assert(logRecord.HasNamespace, "Should never write a non-namespaced value with VectorSessionFunctions"); Debug.Assert(logRecord.NamespaceBytes.Length == 1, "Variable length namespaces not supported"); - var value = AlignOrPin(in logRecord, ref input, out var pin); - try - { - srcValue.CopyTo(value); + var value = logRecord.ValueSpan; + srcValue.CopyTo(value); - return logRecord.TrySetContentLengths(logRecord.ValueSpan.Length, in sizeInfo); - } - finally - { - pin?.Free(); - } + return logRecord.TrySetContentLengths(logRecord.ValueSpan.Length, in sizeInfo); } /// @@ -158,17 +142,10 @@ public readonly bool InPlaceWriter(ref LogRecord logRecord, ref VectorInput inpu Debug.Assert(logRecord.HasNamespace, "Should never write a non-namespaced value with VectorSessionFunctions"); Debug.Assert(logRecord.NamespaceBytes.Length == 1, "Variable length namespaces not supported"); - var value = AlignOrPin(in logRecord, ref input, out var pin); - try - { - newValue.CopyTo(value); + var value = logRecord.ValueSpan; + newValue.CopyTo(value); - return true; - } - finally - { - pin?.Free(); - } + return true; } /// @@ -195,17 +172,7 @@ public readonly RecordFieldInfo GetRMWModifiedFieldInfo(in TSo return new RecordFieldInfo() { KeySize = srcLogRecord.Key.Length, ValueSize = value.Length + (-input.WriteDesiredSize) }; } - var needsAlignmentPadding = input.AlignmentExpected || input.Callback != 0; - - // Constant size indicated - if (needsAlignmentPadding) - { - return new RecordFieldInfo() { KeySize = srcLogRecord.Key.Length, ValueSize = input.WriteDesiredSize + ValueAlignmentBytes }; - } - else - { - return new RecordFieldInfo() { KeySize = srcLogRecord.Key.Length, ValueSize = input.WriteDesiredSize }; - } + return new RecordFieldInfo() { KeySize = srcLogRecord.Key.Length, ValueSize = input.WriteDesiredSize }; } /// Initial expected length of value object when populated by RMW using given input @@ -217,21 +184,12 @@ public readonly RecordFieldInfo GetRMWInitialFieldInfo(TKey key, ref Vecto { var effectiveWriteDesiredSize = input.WriteDesiredSize; - var needsAlignmentPadding = input.AlignmentExpected || input.Callback != 0; - if (effectiveWriteDesiredSize < 0) { effectiveWriteDesiredSize = -effectiveWriteDesiredSize; } - if (!needsAlignmentPadding) - { - return new() { KeySize = key.KeyBytes.Length, ValueSize = effectiveWriteDesiredSize }; - } - else - { - return new() { KeySize = key.KeyBytes.Length, ValueSize = effectiveWriteDesiredSize + ValueAlignmentBytes }; - } + return new() { KeySize = key.KeyBytes.Length, ValueSize = effectiveWriteDesiredSize }; } /// Length of value object, when populated by Upsert using given value and input @@ -240,7 +198,7 @@ public readonly RecordFieldInfo GetUpsertFieldInfo(TKey key, ReadOnlySpan< #if NET9_0_OR_GREATER , allows ref struct #endif - => new() { KeySize = key.KeyBytes.Length, ValueSize = value.Length + ValueAlignmentBytes }; + => new() { KeySize = key.KeyBytes.Length, ValueSize = value.Length }; /// Length of value object, when populated by Upsert using given value and input public readonly RecordFieldInfo GetUpsertFieldInfo(TKey key, IHeapObject value, ref VectorInput input) @@ -282,52 +240,43 @@ public readonly bool InitialUpdater(ref LogRecord logRecord, in RecordSizeInfo s Debug.Assert(logRecord.NamespaceBytes.Length == 1, "Variable length namespaces not supported"); var key = logRecord.Key; - var alignedValue = AlignOrPin(in logRecord, ref input, out var pin); - - try + var value = logRecord.ValueSpan; + if (input.Callback == 0) { + Debug.Assert(logRecord.NamespaceBytes.Length == 1 && logRecord.NamespaceBytes[0] == VectorManager.MetadataNamespace, "Should never write a non-namespaced value with VectorSessionFunctions"); + Debug.Assert(key.Length == 0, "Shouldn't have a non-zero key, expected to working on ContextMetadata"); + + // Operating on ContextMetadata - if (input.Callback == 0) + PinnedSpanByte newMetadataValue; + unsafe { - Debug.Assert(logRecord.NamespaceBytes.Length == 1 && logRecord.NamespaceBytes[0] == VectorManager.MetadataNamespace, "Should never write a non-namespaced value with VectorSessionFunctions"); - Debug.Assert(key.Length == 0, "Shouldn't have a non-zero key, expected to working on ContextMetadata"); + newMetadataValue = PinnedSpanByte.FromPinnedPointer((byte*)input.CallbackContext, VectorManager.ContextMetadata.Size); + } - // Operating on ContextMetadata + newMetadataValue.CopyTo(value); - PinnedSpanByte newMetadataValue; - unsafe - { - newMetadataValue = PinnedSpanByte.FromPinnedPointer((byte*)input.CallbackContext, VectorManager.ContextMetadata.Size); - } + return logRecord.TrySetContentLengths(logRecord.ValueSpan.Length, in sizeInfo); + } + else + { + Debug.Assert(input.WriteDesiredSize <= value.Length, "Insufficient space for initial update, this should never happen"); - newMetadataValue.CopyTo(alignedValue); + // Must explicitly 0 before passing if we're doing an initial update + value.Clear(); - return logRecord.TrySetContentLengths(logRecord.ValueSpan.Length, in sizeInfo); - } - else + unsafe { - Debug.Assert(input.WriteDesiredSize <= alignedValue.Length, "Insufficient space for initial update, this should never happen"); - - // Must explicitly 0 before passing if we're doing an initial update - alignedValue.Clear(); - - unsafe - { - // Callback takes: dataCallbackContext, dataPtr, dataLength - var callback = (delegate* unmanaged[Cdecl, SuppressGCTransition])input.Callback; + // Callback takes: dataCallbackContext, dataPtr, dataLength + var callback = (delegate* unmanaged[Cdecl, SuppressGCTransition])input.Callback; - var dataPtr = (nint)Unsafe.AsPointer(ref MemoryMarshal.GetReference(alignedValue)); - var dataLen = (nuint)input.WriteDesiredSize; - callback(input.CallbackContext, dataPtr, dataLen); + var dataPtr = (nint)Unsafe.AsPointer(ref MemoryMarshal.GetReference(value)); + var dataLen = (nuint)input.WriteDesiredSize; + callback(input.CallbackContext, dataPtr, dataLen); - return logRecord.TrySetContentLengths(logRecord.ValueSpan.Length, in sizeInfo); - } + return logRecord.TrySetContentLengths(logRecord.ValueSpan.Length, in sizeInfo); } } - finally - { - pin?.Free(); - } } #endregion InitialUpdater @@ -346,68 +295,60 @@ public readonly bool CopyUpdater(in TSourceLogRecord srcLogRec var key = srcLogRecord.Key; - var oldValueAligned = AlignOrPin(in srcLogRecord, ref input, out var srcPin); - var newValueAligned = AlignOrPin(in dstLogRecord, ref input, out var dstPin); + var oldValue = srcLogRecord.ValueSpan; + var newValue = dstLogRecord.ValueSpan; - try + if (input.Callback == 0) { - if (input.Callback == 0) - { - // We're doing a Metadata or InProgressDelete update + // We're doing a Metadata or InProgressDelete update - Debug.Assert(srcLogRecord.NamespaceBytes[0] == VectorManager.MetadataNamespace, "Should be operating on special namespace"); - Debug.Assert(key.Length == 0, "Shouldn't have a non-zero key, expected to working on ContextMetadata"); + Debug.Assert(srcLogRecord.NamespaceBytes[0] == VectorManager.MetadataNamespace, "Should be operating on special namespace"); + Debug.Assert(key.Length == 0, "Shouldn't have a non-zero key, expected to working on ContextMetadata"); - // Doing a Metadata update - Debug.Assert(srcLogRecord.ValueSpan.Length == VectorManager.ContextMetadata.Size, "Should be ContextMetadata"); - Debug.Assert(dstLogRecord.ValueSpan.Length == VectorManager.ContextMetadata.Size, "Should be ContextMetadata"); - Debug.Assert(input.CallbackContext != 0, "Should have data on VectorInput"); + // Doing a Metadata update + Debug.Assert(srcLogRecord.ValueSpan.Length == VectorManager.ContextMetadata.Size, "Should be ContextMetadata"); + Debug.Assert(dstLogRecord.ValueSpan.Length == VectorManager.ContextMetadata.Size, "Should be ContextMetadata"); + Debug.Assert(input.CallbackContext != 0, "Should have data on VectorInput"); - ref readonly var oldMetadata = ref MemoryMarshal.Cast(oldValueAligned)[0]; + ref readonly var oldMetadata = ref MemoryMarshal.Cast(oldValue)[0]; - PinnedSpanByte newMetadataValue; - unsafe - { - newMetadataValue = PinnedSpanByte.FromPinnedPointer((byte*)input.CallbackContext, VectorManager.ContextMetadata.Size); - } + PinnedSpanByte newMetadataValue; + unsafe + { + newMetadataValue = PinnedSpanByte.FromPinnedPointer((byte*)input.CallbackContext, VectorManager.ContextMetadata.Size); + } - ref readonly var newMetadata = ref MemoryMarshal.Cast(newMetadataValue.ReadOnlySpan)[0]; + ref readonly var newMetadata = ref MemoryMarshal.Cast(newMetadataValue.ReadOnlySpan)[0]; - if (newMetadata.Version < oldMetadata.Version) - { - rmwInfo.Action = RMWAction.CancelOperation; - return false; - } - - newMetadataValue.CopyTo(newValueAligned); - return dstLogRecord.TrySetContentLengths(srcLogRecord.ValueSpan.Length, in sizeInfo); - } - else + if (newMetadata.Version < oldMetadata.Version) { - Debug.Assert(input.WriteDesiredSize <= newValueAligned.Length, "Insufficient space for copy update, this should never happen"); - Debug.Assert(input.WriteDesiredSize <= oldValueAligned.Length, "Insufficient space for copy update, this should never happen"); - - oldValueAligned.CopyTo(newValueAligned); + rmwInfo.Action = RMWAction.CancelOperation; + return false; + } - unsafe - { - // Callback takes: dataCallbackContext, dataPtr, dataLength - var callback = (delegate* unmanaged[Cdecl, SuppressGCTransition])input.Callback; + newMetadataValue.CopyTo(newValue); + return dstLogRecord.TrySetContentLengths(srcLogRecord.ValueSpan.Length, in sizeInfo); + } + else + { + Debug.Assert(input.WriteDesiredSize <= newValue.Length, "Insufficient space for copy update, this should never happen"); + Debug.Assert(input.WriteDesiredSize <= oldValue.Length, "Insufficient space for copy update, this should never happen"); - var dataPtr = (nint)Unsafe.AsPointer(ref MemoryMarshal.GetReference(newValueAligned)); - var dataLen = (nuint)input.WriteDesiredSize; + oldValue.CopyTo(newValue); - callback(input.CallbackContext, dataPtr, dataLen); - } + unsafe + { + // Callback takes: dataCallbackContext, dataPtr, dataLength + var callback = (delegate* unmanaged[Cdecl, SuppressGCTransition])input.Callback; - return true; + var dataPtr = (nint)Unsafe.AsPointer(ref MemoryMarshal.GetReference(newValue)); + var dataLen = (nuint)input.WriteDesiredSize; + callback(input.CallbackContext, dataPtr, dataLen); } - } - finally - { - srcPin?.Free(); - dstPin?.Free(); + + return true; + } } #endregion CopyUpdater @@ -421,85 +362,78 @@ public readonly bool InPlaceUpdater(ref LogRecord logRecord, ref VectorInput inp var key = logRecord.Key; - var alignedValue = AlignOrPin(in logRecord, ref input, out var pin); - try + var alignedValue = logRecord.ValueSpan; + if (input.Callback == 0) { - if (input.Callback == 0) - { - // We're doing a Metadata or InProgressDelete update - - Debug.Assert(logRecord.NamespaceBytes.Length == 1 && logRecord.NamespaceBytes[0] == VectorManager.MetadataNamespace, "Should be operating on special namespace"); - - if (key.Length == 0) - { - // Doing a Metadata update - Debug.Assert(alignedValue.Length >= VectorManager.ContextMetadata.Size, "Should be ContextMetadata"); - Debug.Assert(input.CallbackContext != 0, "Should have data on VectorInput"); - - ref readonly var oldMetadata = ref MemoryMarshal.Cast(alignedValue)[0]; + // We're doing a Metadata or InProgressDelete update - PinnedSpanByte newMetadataValue; - unsafe - { - newMetadataValue = PinnedSpanByte.FromPinnedPointer((byte*)input.CallbackContext, VectorManager.ContextMetadata.Size); - } + Debug.Assert(logRecord.NamespaceBytes.Length == 1 && logRecord.NamespaceBytes[0] == VectorManager.MetadataNamespace, "Should be operating on special namespace"); - ref readonly var newMetadata = ref MemoryMarshal.Cast(newMetadataValue.ReadOnlySpan)[0]; + if (key.Length == 0) + { + // Doing a Metadata update + Debug.Assert(alignedValue.Length >= VectorManager.ContextMetadata.Size, "Should be ContextMetadata"); + Debug.Assert(input.CallbackContext != 0, "Should have data on VectorInput"); - if (newMetadata.Version < oldMetadata.Version) - { - rmwInfo.Action = RMWAction.CancelOperation; - return false; - } + ref readonly var oldMetadata = ref MemoryMarshal.Cast(alignedValue)[0]; - newMetadataValue.CopyTo(alignedValue); - return true; - } - else + PinnedSpanByte newMetadataValue; + unsafe { - // Doing an InProgressDelete update - Debug.Assert(input.CallbackContext != 0, "Should have data on VectorInput"); - Debug.Assert(key.Length == 1 && key[0] == 1, "Should be working on InProgressDeletes"); - - Span inProgressDeleteUpdateData; - bool adding; + newMetadataValue = PinnedSpanByte.FromPinnedPointer((byte*)input.CallbackContext, VectorManager.ContextMetadata.Size); + } - unsafe - { - var len = BinaryPrimitives.ReadInt32LittleEndian(new Span((byte*)input.CallbackContext + sizeof(long), sizeof(int))); - adding = len > 0; - if (!adding) - { - len = -len; - } - - inProgressDeleteUpdateData = new Span((byte*)input.CallbackContext, sizeof(ulong) + sizeof(int) + len); - } + ref readonly var newMetadata = ref MemoryMarshal.Cast(newMetadataValue.ReadOnlySpan)[0]; - return true; + if (newMetadata.Version < oldMetadata.Version) + { + rmwInfo.Action = RMWAction.CancelOperation; + return false; } + + newMetadataValue.CopyTo(alignedValue); + return true; } else { - Debug.Assert(input.WriteDesiredSize <= alignedValue.Length, "Insufficient space for inplace update, this should never happen"); + // Doing an InProgressDelete update + Debug.Assert(input.CallbackContext != 0, "Should have data on VectorInput"); + Debug.Assert(key.Length == 1 && key[0] == 1, "Should be working on InProgressDeletes"); + + Span inProgressDeleteUpdateData; + bool adding; unsafe { - // Callback takes: dataCallbackContext, dataPtr, dataLength - var callback = (delegate* unmanaged[Cdecl, SuppressGCTransition])input.Callback; - - var dataPtr = (nint)Unsafe.AsPointer(ref MemoryMarshal.GetReference(alignedValue)); - var dataLen = (nuint)input.WriteDesiredSize; + var len = BinaryPrimitives.ReadInt32LittleEndian(new Span((byte*)input.CallbackContext + sizeof(long), sizeof(int))); + adding = len > 0; + if (!adding) + { + len = -len; + } - callback(input.CallbackContext, dataPtr, dataLen); + inProgressDeleteUpdateData = new Span((byte*)input.CallbackContext, sizeof(ulong) + sizeof(int) + len); } return true; } } - finally + else { - pin?.Free(); + Debug.Assert(input.WriteDesiredSize <= alignedValue.Length, "Insufficient space for inplace update, this should never happen"); + + unsafe + { + // Callback takes: dataCallbackContext, dataPtr, dataLength + var callback = (delegate* unmanaged[Cdecl, SuppressGCTransition])input.Callback; + + var dataPtr = (nint)Unsafe.AsPointer(ref MemoryMarshal.GetReference(alignedValue)); + var dataLen = (nuint)input.WriteDesiredSize; + + callback(input.CallbackContext, dataPtr, dataLen); + } + + return true; } } #endregion InPlaceUpdater @@ -540,72 +474,6 @@ private static TReturn ObjectOperationsNotExpected([CallerMemberName] s private static TReturn LogRecordOperationsNotExpected([CallerMemberName] string callerName = null, [CallerLineNumber] int lineNum = -1) => throw new InvalidOperationException($"LogRecord related operations are not expected, was: {callerName} on {lineNum}"); - // TODO: Remove all this alignment hackery when Tsavorite can enforce it - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal static unsafe Span AlignOrPin(in TSourceLogRecord logRecord, ref VectorInput input, out GCHandle? pin) - where TSourceLogRecord : ISourceLogRecord - { - var maybeUnaligned = logRecord.ValueSpan; - - // Alignment is expected if we're passing to DiskANN or Garnet code explicitly requested it - var inputRequiresAligment = input.AlignmentExpected || input.Callback != 0; - - if (inputRequiresAligment) - { - if (logRecord.IsPinnedValue) - { - // LogRecord itself is in POH, but value might not be aligned so we need to do some checking - - Span ret; - - var leading = (nint)Unsafe.AsPointer(ref MemoryMarshal.GetReference(maybeUnaligned)) % 4; - if (leading == 0) - { - ret = maybeUnaligned[..^ValueAlignmentBytes]; - } - else - { - var skip = (int)(ValueAlignmentBytes - leading); - var tail = ValueAlignmentBytes - skip; - ret = maybeUnaligned[skip..^tail]; - } - - AssertAlignment(ret); - - pin = null; - return ret; - } - else - { - // Value isn't in log record, it's on the (presumably unpinned) heap as a byte[] - // - // This guarantees it's aligned, but it might move during any callback so pin - - pin = logRecord.ValueOverflow.Pin(); - - // We over allocated (we don't know how Tsavorite is going to place the value in advance) so trim the extra allocation off the end. - var ret = maybeUnaligned[..^ValueAlignmentBytes]; - - AssertAlignment(ret); - - return ret; - } - } - else - { - pin = null; - return maybeUnaligned; - } - } - - [Conditional("DEBUG")] - private static unsafe void AssertAlignment(ReadOnlySpan aligned) - { - var ptr = (nint)Unsafe.AsPointer(ref MemoryMarshal.GetReference(aligned)); - Debug.Assert((ptr % ValueAlignmentBytes) == 0, "Must guarantee 4-byte alignment before invoking callback"); - } - #region Post operation callbacks /// public readonly void PostInitialWriter(ref LogRecord logRecord, in RecordSizeInfo sizeInfo, ref VectorInput input, ReadOnlySpan srcValue, ref VectorOutput output, ref UpsertInfo upsertInfo) From 61041ca936aba81f8241b6af6a0a8c22fa0a4bd3 Mon Sep 17 00:00:00 2001 From: Kevin Montrose Date: Wed, 17 Jun 2026 16:28:20 -0400 Subject: [PATCH 2/8] make VectorInput readonly and move outputs to... VectorOutput --- .../Server/Migration/MigrateOperation.cs | 6 ++- libs/server/InputHeader.cs | 14 +++--- .../Resp/Vector/VectorManager.Callbacks.cs | 38 ++++++++------- .../Vector/VectorManager.ContextMetadata.cs | 11 +++-- libs/server/Resp/Vector/VectorManager.cs | 21 -------- .../VectorStore/VectorSessionFunctions.cs | 2 +- libs/server/VectorOutput.cs | 5 ++ .../RespVectorSetTests.cs | 48 ++++++++++++------- 8 files changed, 76 insertions(+), 69 deletions(-) diff --git a/libs/cluster/Server/Migration/MigrateOperation.cs b/libs/cluster/Server/Migration/MigrateOperation.cs index c7389f80c6e..dd0b7846299 100644 --- a/libs/cluster/Server/Migration/MigrateOperation.cs +++ b/libs/cluster/Server/Migration/MigrateOperation.cs @@ -94,8 +94,10 @@ public async Task TransmitSlotsAsync() var input = new UnifiedInput(RespCommand.MIGRATE); input.arg1 = session.NetworkBufferSettings.sendBufferSize - common.NetworkBufferSettings.SendBufferOverheadReserve; - VectorInput vectorInput = new(); - vectorInput.MaxMigrationHeapAllocationSize = session.NetworkBufferSettings.sendBufferSize - common.NetworkBufferSettings.SendBufferOverheadReserve; + VectorInput vectorInput = new() + { + MaxMigrationHeapAllocationSize = session.NetworkBufferSettings.sendBufferSize - common.NetworkBufferSettings.SendBufferOverheadReserve + }; foreach (var (ns, key, hasNs) in sketch.argSliceVector) { diff --git a/libs/server/InputHeader.cs b/libs/server/InputHeader.cs index 505c687e31e..66015cb8f44 100644 --- a/libs/server/InputHeader.cs +++ b/libs/server/InputHeader.cs @@ -609,22 +609,22 @@ public unsafe int DeserializeFrom(byte* src) /// /// Header for Garnet Main Store inputs but for Vector element r/w/d ops /// - public struct VectorInput : IStoreInput + public readonly struct VectorInput : IStoreInput { public int SerializedLength => throw new NotImplementedException(); - public int ReadDesiredSize { get; set; } + public int ReadDesiredSize { get; init; } - public int WriteDesiredSize { get; set; } + public int WriteDesiredSize { get; init; } - public int Index { get; set; } - public nint CallbackContext { get; set; } - public nint Callback { get; set; } + public int Index { get; init; } + public nint CallbackContext { get; init; } + public nint Callback { get; init; } [MemberNotNullWhen(returnValue: true, member: nameof(MaxMigrationHeapAllocationSize))] public bool IsMigrationRead => MaxMigrationHeapAllocationSize != null; - public int? MaxMigrationHeapAllocationSize { get; set; } + public int? MaxMigrationHeapAllocationSize { get; init; } public VectorInput() { diff --git a/libs/server/Resp/Vector/VectorManager.Callbacks.cs b/libs/server/Resp/Vector/VectorManager.Callbacks.cs index 90d37def695..0420d2fcdb4 100644 --- a/libs/server/Resp/Vector/VectorManager.Callbacks.cs +++ b/libs/server/Resp/Vector/VectorManager.Callbacks.cs @@ -144,10 +144,12 @@ public readonly void GetInput(int i, out VectorInput input) { Debug.Assert(i >= 0 && i < Count, "Trying to advance out of bounds"); - input = default; - input.CallbackContext = callbackContext; - input.Callback = (nint)callback; - input.Index = i; + input = new() + { + CallbackContext = callbackContext, + Callback = (nint)callback, + Index = i, + }; } /// @@ -257,10 +259,12 @@ private static byte ReadModifyWriteCallbackUnmanaged(ulong context, nint keyData ref var ctx = ref ActiveThreadSession.vectorBasicContext; - VectorInput input = default; - input.Callback = dataCallback; - input.CallbackContext = dataCallbackContext; - input.WriteDesiredSize = (int)writeLength; + VectorInput input = new() + { + Callback = dataCallback, + CallbackContext = dataCallbackContext, + WriteDesiredSize = (int)writeLength, + }; var status = ctx.RMW(keyWithNamespace, ref input); if (status.IsPending) @@ -285,17 +289,19 @@ private static unsafe bool ReadSizeUnknown(ulong context, bool forceAlignment, R while (true) { - VectorInput input = new(); - input.ReadDesiredSize = -1; + VectorInput input = new() + { + ReadDesiredSize = -1 + }; fixed (byte* ptr = value.Span) { - VectorOutput asSpanByte = new(ptr, value.Length); + VectorOutput output = new(ptr, value.Length); - var status = ctx.Read(keyWithNamespace, ref input, ref asSpanByte); + var status = ctx.Read(keyWithNamespace, ref input, ref output); if (status.IsPending) { - CompletePending(ref status, ref input, ref asSpanByte, ref ctx); + CompletePending(ref status, ref output, ref ctx); } if (!status.Found) @@ -304,15 +310,15 @@ private static unsafe bool ReadSizeUnknown(ulong context, bool forceAlignment, R return false; } - if (input.ReadDesiredSize > asSpanByte.SpanByteAndMemory.Length) + if (output.UpdatedReadDesiredSize != null && output.UpdatedReadDesiredSize.Value > output.SpanByteAndMemory.Length) { value.Memory?.Dispose(); - var newAlloc = MemoryPool.Shared.Rent(input.ReadDesiredSize); + var newAlloc = MemoryPool.Shared.Rent(output.UpdatedReadDesiredSize.Value); value = new(newAlloc, newAlloc.Memory.Length); continue; } - value.Length = asSpanByte.SpanByteAndMemory.Length; + value.Length = output.SpanByteAndMemory.Length; return true; } } diff --git a/libs/server/Resp/Vector/VectorManager.ContextMetadata.cs b/libs/server/Resp/Vector/VectorManager.ContextMetadata.cs index 1dfb23075ff..e82b8c17f51 100644 --- a/libs/server/Resp/Vector/VectorManager.ContextMetadata.cs +++ b/libs/server/Resp/Vector/VectorManager.ContextMetadata.cs @@ -508,12 +508,15 @@ private void UpdateContextMetadata(ref VectorBasicContext ctx) // empty key is context metadata VectorElementKey key = new(nsBytes, []); - VectorInput input = default; - input.Callback = 0; - input.WriteDesiredSize = ContextMetadata.Size; + VectorInput input; unsafe { - input.CallbackContext = (nint)Unsafe.AsPointer(ref MemoryMarshal.GetReference(dataSpan)); + input = new() + { + Callback = 0, + WriteDesiredSize = ContextMetadata.Size, + CallbackContext = (nint)Unsafe.AsPointer(ref MemoryMarshal.GetReference(dataSpan)), + }; } var status = ctx.RMW(key, ref input); diff --git a/libs/server/Resp/Vector/VectorManager.cs b/libs/server/Resp/Vector/VectorManager.cs index 4153e473ad4..cbab9b7154d 100644 --- a/libs/server/Resp/Vector/VectorManager.cs +++ b/libs/server/Resp/Vector/VectorManager.cs @@ -331,27 +331,6 @@ private static void CompletePending(ref Status status, ref VectorOutput output, completedOutputs.Dispose(); } - /// - /// As , but also propagates the - /// completed back to the caller. - /// - /// This is required for the unknown-size read path (): the Reader records the actual - /// value size on , and on the pending (disk) path that update lives on the - /// pending context's copy of the input. Without propagating it back the caller keeps its stale value and skips the - /// grow-and-retry, returning a truncated/uninitialized buffer. - /// - private static void CompletePending(ref Status status, ref VectorInput input, ref VectorOutput output, ref VectorBasicContext ctx) - { - _ = ctx.CompletePendingWithOutputs(out var completedOutputs, wait: true); - var more = completedOutputs.Next(); - Debug.Assert(more); - status = completedOutputs.Current.Status; - output = completedOutputs.Current.Output; - input = completedOutputs.Current.Input; - Debug.Assert(!completedOutputs.Next()); - completedOutputs.Dispose(); - } - private static void CompletePending(ref Status status, ref StringBasicContext ctx) { _ = ctx.CompletePendingWithOutputs(out var completedOutputs, wait: true); diff --git a/libs/server/Storage/Functions/VectorStore/VectorSessionFunctions.cs b/libs/server/Storage/Functions/VectorStore/VectorSessionFunctions.cs index 1946973c2f2..b7b90cfe90d 100644 --- a/libs/server/Storage/Functions/VectorStore/VectorSessionFunctions.cs +++ b/libs/server/Storage/Functions/VectorStore/VectorSessionFunctions.cs @@ -97,7 +97,7 @@ public readonly bool Reader(in TSourceLogRecord srcLogRecord, } else { - input.ReadDesiredSize = value.Length; + output.UpdatedReadDesiredSize = value.Length; if (output.SpanByteAndMemory.Length >= value.Length) { value.CopyTo(output.SpanByteAndMemory.Span); diff --git a/libs/server/VectorOutput.cs b/libs/server/VectorOutput.cs index fd9df75d6fe..1ac40800af1 100644 --- a/libs/server/VectorOutput.cs +++ b/libs/server/VectorOutput.cs @@ -18,6 +18,11 @@ public struct VectorOutput /// public SpanByteAndMemory SpanByteAndMemory; + /// + /// If a call needs a larger than was provided, it is stored here. + /// + public int? UpdatedReadDesiredSize { get; set; } + public VectorOutput() => SpanByteAndMemory = new(null); public VectorOutput(SpanByteAndMemory span) => SpanByteAndMemory = span; diff --git a/test/standalone/Garnet.test.vectorset/RespVectorSetTests.cs b/test/standalone/Garnet.test.vectorset/RespVectorSetTests.cs index ae40f3e079e..54d24e67f6f 100644 --- a/test/standalone/Garnet.test.vectorset/RespVectorSetTests.cs +++ b/test/standalone/Garnet.test.vectorset/RespVectorSetTests.cs @@ -1396,9 +1396,11 @@ public unsafe void VectorReadBatchVariants() { // Single key, 4 byte keys { - VectorInput input = default; - input.Callback = 5678; - input.CallbackContext = 9012; + VectorInput input = new() + { + Callback = 5678, + CallbackContext = 9012, + }; ReadOnlySpan namespaceBytes = stackalloc byte[1] { 64 }; @@ -1446,9 +1448,11 @@ public unsafe void VectorReadBatchVariants() // Multiple keys, 4 byte keys { - VectorInput input = default; - input.Callback = 5678; - input.CallbackContext = 9012; + VectorInput input = new() + { + Callback = 5678, + CallbackContext = 9012, + }; ReadOnlySpan namespaceBytes = stackalloc byte[1] { 32 }; @@ -1500,9 +1504,11 @@ public unsafe void VectorReadBatchVariants() // Multiple keys, 4 byte keys, random order { - VectorInput input = default; - input.Callback = 5678; - input.CallbackContext = 9012; + VectorInput input = new() + { + Callback = 5678, + CallbackContext = 9012, + }; ReadOnlySpan namespaceBytes = stackalloc byte[1] { 16 }; @@ -1553,9 +1559,11 @@ public unsafe void VectorReadBatchVariants() // Single key, variable length { - VectorInput input = default; - input.Callback = 5678; - input.CallbackContext = 9012; + VectorInput input = new() + { + Callback = 5678, + CallbackContext = 9012, + }; ReadOnlySpan namespaceBytes = stackalloc byte[1] { 8 }; @@ -1623,9 +1631,11 @@ public unsafe void VectorReadBatchVariants() // Multiple keys, variable length { - VectorInput input = default; - input.Callback = 5678; - input.CallbackContext = 9012; + VectorInput input = new() + { + Callback = 5678, + CallbackContext = 9012, + }; ReadOnlySpan namespaceBytes = stackalloc byte[1] { 4 }; @@ -1756,9 +1766,11 @@ public unsafe void VectorReadBatchVariants() // Multiple keys, variable length, random access { - VectorInput input = default; - input.Callback = 5678; - input.CallbackContext = 9012; + VectorInput input = new() + { + Callback = 5678, + CallbackContext = 9012, + }; ReadOnlySpan namespaceBytes = stackalloc byte[1] { 2 }; From 70da272181a2720149e6b742b72e6d375adcb3c0 Mon Sep 17 00:00:00 2001 From: Kevin Montrose Date: Thu, 18 Jun 2026 11:03:36 -0400 Subject: [PATCH 3/8] formatting and comment correction --- libs/server/InputHeader.cs | 2 +- .../Storage/Functions/VectorStore/VectorSessionFunctions.cs | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/libs/server/InputHeader.cs b/libs/server/InputHeader.cs index 66015cb8f44..81f8eb30cc9 100644 --- a/libs/server/InputHeader.cs +++ b/libs/server/InputHeader.cs @@ -617,7 +617,7 @@ public unsafe int DeserializeFrom(byte* src) public int WriteDesiredSize { get; init; } - public int Index { get; init; } + public int Index { get; init; } public nint CallbackContext { get; init; } public nint Callback { get; init; } diff --git a/libs/server/Storage/Functions/VectorStore/VectorSessionFunctions.cs b/libs/server/Storage/Functions/VectorStore/VectorSessionFunctions.cs index b7b90cfe90d..a866623b34f 100644 --- a/libs/server/Storage/Functions/VectorStore/VectorSessionFunctions.cs +++ b/libs/server/Storage/Functions/VectorStore/VectorSessionFunctions.cs @@ -44,8 +44,7 @@ public readonly bool Reader(in TSourceLogRecord srcLogRecord, { Debug.Assert(input.Callback == 0, "No callback expected"); - // We can't ship the log record over because of alignment shenanigans - // TODO: When alignment is handled at the Tsavorite level, we CAN start shipping the log over like everything else + // Format for migration, including space for namespace var neededSpace = sizeof(int) + srcLogRecord.NamespaceBytes.Length + From 6f7b4d905dd23cf5b77d9f5cf5f423c8b1389437 Mon Sep 17 00:00:00 2001 From: Kevin Montrose Date: Thu, 18 Jun 2026 11:08:11 -0400 Subject: [PATCH 4/8] remove forceAlignment parameter --- libs/server/Resp/Vector/VectorManager.Callbacks.cs | 2 +- libs/server/Resp/Vector/VectorManager.cs | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/libs/server/Resp/Vector/VectorManager.Callbacks.cs b/libs/server/Resp/Vector/VectorManager.Callbacks.cs index 0420d2fcdb4..68cacde3a65 100644 --- a/libs/server/Resp/Vector/VectorManager.Callbacks.cs +++ b/libs/server/Resp/Vector/VectorManager.Callbacks.cs @@ -277,7 +277,7 @@ private static byte ReadModifyWriteCallbackUnmanaged(ulong context, nint keyData return status.IsCompletedSuccessfully ? (byte)1 : default; } - private static unsafe bool ReadSizeUnknown(ulong context, bool forceAlignment, ReadOnlySpan key, ref SpanByteAndMemory value) + private static unsafe bool ReadSizeUnknown(ulong context, ReadOnlySpan key, ref SpanByteAndMemory value) { #pragma warning disable IDE0302 // [...]-style collection initialization doesn't actually _guarantee_ stackalloc (or inline arrays), which we need here ReadOnlySpan nsBytes = stackalloc byte[1] { (byte)context }; diff --git a/libs/server/Resp/Vector/VectorManager.cs b/libs/server/Resp/Vector/VectorManager.cs index 3de046a26c6..4942a709a09 100644 --- a/libs/server/Resp/Vector/VectorManager.cs +++ b/libs/server/Resp/Vector/VectorManager.cs @@ -800,7 +800,7 @@ internal VectorManagerResult FetchSingleVectorElementAttributes(ReadOnlySpan indexValue, ReadOnlySpan var internalIdBytes = SpanByteAndMemory.FromPinnedSpan(internalId); try { - if (!ReadSizeUnknown(context | DiskANNService.InternalIdMap, forceAlignment: true, element, ref internalIdBytes)) + if (!ReadSizeUnknown(context | DiskANNService.InternalIdMap, element, ref internalIdBytes)) { return false; } @@ -943,7 +943,7 @@ internal bool TryGetEmbedding(ReadOnlySpan indexValue, ReadOnlySpan var asBytes = SpanByteAndMemory.FromPinnedSpan(asBytesSpan); try { - if (!ReadSizeUnknown(context | DiskANNService.FullVector, forceAlignment: true, internalId, ref asBytes)) + if (!ReadSizeUnknown(context | DiskANNService.FullVector, internalId, ref asBytes)) { return false; } From e1b0e614d6dcdabd663e127a4e2c11b6a0da7d05 Mon Sep 17 00:00:00 2001 From: Kevin Montrose Date: Thu, 18 Jun 2026 11:30:11 -0400 Subject: [PATCH 5/8] restore pinning when value in log record isn't pinned --- .../VectorStore/VectorSessionFunctions.cs | 71 +++++++++++++++---- 1 file changed, 57 insertions(+), 14 deletions(-) diff --git a/libs/server/Storage/Functions/VectorStore/VectorSessionFunctions.cs b/libs/server/Storage/Functions/VectorStore/VectorSessionFunctions.cs index a866623b34f..b7d90eef441 100644 --- a/libs/server/Storage/Functions/VectorStore/VectorSessionFunctions.cs +++ b/libs/server/Storage/Functions/VectorStore/VectorSessionFunctions.cs @@ -79,10 +79,21 @@ public readonly bool Reader(in TSourceLogRecord srcLogRecord, { var callback = (delegate* unmanaged[Cdecl, SuppressGCTransition])input.Callback; - var dataPtr = (nint)Unsafe.AsPointer(ref MemoryMarshal.GetReference(value)); var dataLen = (nuint)value.Length; - callback(input.Index, input.CallbackContext, dataPtr, dataLen); + if (srcLogRecord.IsPinnedValue) + { + var dataPtr = (nint)Unsafe.AsPointer(ref MemoryMarshal.GetReference(value)); + callback(input.Index, input.CallbackContext, dataPtr, dataLen); + } + else + { + fixed (byte* dataPtr = value) + { + callback(input.Index, input.CallbackContext, (nint)dataPtr, dataLen); + } + } + return true; } } @@ -269,9 +280,20 @@ public readonly bool InitialUpdater(ref LogRecord logRecord, in RecordSizeInfo s // Callback takes: dataCallbackContext, dataPtr, dataLength var callback = (delegate* unmanaged[Cdecl, SuppressGCTransition])input.Callback; - var dataPtr = (nint)Unsafe.AsPointer(ref MemoryMarshal.GetReference(value)); var dataLen = (nuint)input.WriteDesiredSize; - callback(input.CallbackContext, dataPtr, dataLen); + + if (logRecord.IsPinnedValue) + { + var dataPtr = (nint)Unsafe.AsPointer(ref MemoryMarshal.GetReference(value)); + callback(input.CallbackContext, dataPtr, dataLen); + } + else + { + fixed (byte* dataPtr = value) + { + callback(input.Callback, (nint)dataPtr, dataLen); + } + } return logRecord.TrySetContentLengths(logRecord.ValueSpan.Length, in sizeInfo); } @@ -340,10 +362,22 @@ public readonly bool CopyUpdater(in TSourceLogRecord srcLogRec // Callback takes: dataCallbackContext, dataPtr, dataLength var callback = (delegate* unmanaged[Cdecl, SuppressGCTransition])input.Callback; - var dataPtr = (nint)Unsafe.AsPointer(ref MemoryMarshal.GetReference(newValue)); var dataLen = (nuint)input.WriteDesiredSize; - callback(input.CallbackContext, dataPtr, dataLen); + if (dstLogRecord.IsPinnedValue) + { + var dataPtr = (nint)Unsafe.AsPointer(ref MemoryMarshal.GetReference(newValue)); + callback(input.CallbackContext, dataPtr, dataLen); + } + else + { + fixed (byte* dataPtr = newValue) + { + callback(input.CallbackContext, (nint)dataPtr, dataLen); + } + } + + } return true; @@ -361,7 +395,7 @@ public readonly bool InPlaceUpdater(ref LogRecord logRecord, ref VectorInput inp var key = logRecord.Key; - var alignedValue = logRecord.ValueSpan; + var value = logRecord.ValueSpan; if (input.Callback == 0) { // We're doing a Metadata or InProgressDelete update @@ -371,10 +405,10 @@ public readonly bool InPlaceUpdater(ref LogRecord logRecord, ref VectorInput inp if (key.Length == 0) { // Doing a Metadata update - Debug.Assert(alignedValue.Length >= VectorManager.ContextMetadata.Size, "Should be ContextMetadata"); + Debug.Assert(value.Length >= VectorManager.ContextMetadata.Size, "Should be ContextMetadata"); Debug.Assert(input.CallbackContext != 0, "Should have data on VectorInput"); - ref readonly var oldMetadata = ref MemoryMarshal.Cast(alignedValue)[0]; + ref readonly var oldMetadata = ref MemoryMarshal.Cast(value)[0]; PinnedSpanByte newMetadataValue; unsafe @@ -390,7 +424,7 @@ public readonly bool InPlaceUpdater(ref LogRecord logRecord, ref VectorInput inp return false; } - newMetadataValue.CopyTo(alignedValue); + newMetadataValue.CopyTo(value); return true; } else @@ -419,17 +453,26 @@ public readonly bool InPlaceUpdater(ref LogRecord logRecord, ref VectorInput inp } else { - Debug.Assert(input.WriteDesiredSize <= alignedValue.Length, "Insufficient space for inplace update, this should never happen"); + Debug.Assert(input.WriteDesiredSize <= value.Length, "Insufficient space for inplace update, this should never happen"); unsafe { // Callback takes: dataCallbackContext, dataPtr, dataLength var callback = (delegate* unmanaged[Cdecl, SuppressGCTransition])input.Callback; - var dataPtr = (nint)Unsafe.AsPointer(ref MemoryMarshal.GetReference(alignedValue)); var dataLen = (nuint)input.WriteDesiredSize; - - callback(input.CallbackContext, dataPtr, dataLen); + if (logRecord.IsPinnedValue) + { + var dataPtr = (nint)Unsafe.AsPointer(ref MemoryMarshal.GetReference(value)); + callback(input.CallbackContext, dataPtr, dataLen); + } + else + { + fixed (byte* dataPtr = value) + { + callback(input.CallbackContext, (nint)dataPtr, dataLen); + } + } } return true; From 88f41bd315229c809ccb4d08a063b3e0dec704a7 Mon Sep 17 00:00:00 2001 From: Kevin Montrose Date: Thu, 18 Jun 2026 11:39:25 -0400 Subject: [PATCH 6/8] fix copy/pasta mistake --- .../Storage/Functions/VectorStore/VectorSessionFunctions.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libs/server/Storage/Functions/VectorStore/VectorSessionFunctions.cs b/libs/server/Storage/Functions/VectorStore/VectorSessionFunctions.cs index b7d90eef441..ba513fc8d68 100644 --- a/libs/server/Storage/Functions/VectorStore/VectorSessionFunctions.cs +++ b/libs/server/Storage/Functions/VectorStore/VectorSessionFunctions.cs @@ -291,7 +291,7 @@ public readonly bool InitialUpdater(ref LogRecord logRecord, in RecordSizeInfo s { fixed (byte* dataPtr = value) { - callback(input.Callback, (nint)dataPtr, dataLen); + callback(input.CallbackContext, (nint)dataPtr, dataLen); } } From dec0f3df83c7cbe282f00f5e0ab5f85063109313 Mon Sep 17 00:00:00 2001 From: Kevin Montrose Date: Fri, 19 Jun 2026 10:39:19 -0400 Subject: [PATCH 7/8] address feedback; more cleanup --- .../Server/Migration/MigrateOperation.cs | 2 +- libs/server/InputHeader.cs | 35 +++++++++++++++---- .../Resp/Vector/VectorManager.Callbacks.cs | 7 ++-- .../VectorStore/VectorSessionFunctions.cs | 2 +- libs/server/VectorOutput.cs | 2 +- 5 files changed, 36 insertions(+), 12 deletions(-) diff --git a/libs/cluster/Server/Migration/MigrateOperation.cs b/libs/cluster/Server/Migration/MigrateOperation.cs index 64676ee47b7..7e1e4570f51 100644 --- a/libs/cluster/Server/Migration/MigrateOperation.cs +++ b/libs/cluster/Server/Migration/MigrateOperation.cs @@ -105,7 +105,7 @@ public async Task TransmitSlotsAsync() VectorInput vectorInput = new() { - MaxMigrationHeapAllocationSize = session.NetworkBufferSettings.sendBufferSize - common.NetworkBufferSettings.SendBufferOverheadReserve + IsMigrationRead = true, }; foreach (var (ns, key, hasNs) in sketch.argSliceVector) diff --git a/libs/server/InputHeader.cs b/libs/server/InputHeader.cs index 81f8eb30cc9..c2d32313e1d 100644 --- a/libs/server/InputHeader.cs +++ b/libs/server/InputHeader.cs @@ -3,7 +3,6 @@ using System; using System.Diagnostics; -using System.Diagnostics.CodeAnalysis; using System.Runtime.CompilerServices; using System.Runtime.InteropServices; using Tsavorite.core; @@ -613,18 +612,42 @@ public unsafe int DeserializeFrom(byte* src) { public int SerializedLength => throw new NotImplementedException(); - public int ReadDesiredSize { get; init; } + /// + /// True if the read value might not fit in the provided output buffer. + /// + /// If false, the output buffer is guaranteed to be correctly sized. + /// + public bool VariableSizedRead { get; init; } + /// + /// Size of write in bytes. + /// + /// If negative, this is an append to an existing value. + /// If positive, this is a create of a new value or an overwrite of an existing value. + /// public int WriteDesiredSize { get; init; } + /// + /// If part of a batch operation, the zero-based index of that operation. + /// public int Index { get; init; } + + /// + /// Context to pass to , if any. + /// + /// This value is opaque to Garnet and should not be modified. + /// public nint CallbackContext { get; init; } - public nint Callback { get; init; } - [MemberNotNullWhen(returnValue: true, member: nameof(MaxMigrationHeapAllocationSize))] - public bool IsMigrationRead => MaxMigrationHeapAllocationSize != null; + /// + /// The native callback to invoke, if any, on the inplace record data. + /// + public nint Callback { get; init; } - public int? MaxMigrationHeapAllocationSize { get; init; } + /// + /// True if the read being performed is part of a migration operation. + /// + public bool IsMigrationRead { get; init; } public VectorInput() { diff --git a/libs/server/Resp/Vector/VectorManager.Callbacks.cs b/libs/server/Resp/Vector/VectorManager.Callbacks.cs index 68cacde3a65..9159ae3615d 100644 --- a/libs/server/Resp/Vector/VectorManager.Callbacks.cs +++ b/libs/server/Resp/Vector/VectorManager.Callbacks.cs @@ -291,7 +291,7 @@ private static unsafe bool ReadSizeUnknown(ulong context, ReadOnlySpan key { VectorInput input = new() { - ReadDesiredSize = -1 + VariableSizedRead = true, }; fixed (byte* ptr = value.Span) @@ -310,10 +310,11 @@ private static unsafe bool ReadSizeUnknown(ulong context, ReadOnlySpan key return false; } - if (output.UpdatedReadDesiredSize != null && output.UpdatedReadDesiredSize.Value > output.SpanByteAndMemory.Length) + var updateReadDesiredSize = output.UpdatedReadDesiredSize.GetValueOrDefault(-1); + if (updateReadDesiredSize > output.SpanByteAndMemory.Length) { value.Memory?.Dispose(); - var newAlloc = MemoryPool.Shared.Rent(output.UpdatedReadDesiredSize.Value); + var newAlloc = MemoryPool.Shared.Rent(updateReadDesiredSize); value = new(newAlloc, newAlloc.Memory.Length); continue; } diff --git a/libs/server/Storage/Functions/VectorStore/VectorSessionFunctions.cs b/libs/server/Storage/Functions/VectorStore/VectorSessionFunctions.cs index ba513fc8d68..3c3f2923c1a 100644 --- a/libs/server/Storage/Functions/VectorStore/VectorSessionFunctions.cs +++ b/libs/server/Storage/Functions/VectorStore/VectorSessionFunctions.cs @@ -98,7 +98,7 @@ public readonly bool Reader(in TSourceLogRecord srcLogRecord, } } - if (input.ReadDesiredSize > 0) + if (!input.VariableSizedRead) { Debug.Assert(output.SpanByteAndMemory.Length >= value.Length, "Should always have space for vector point reads"); diff --git a/libs/server/VectorOutput.cs b/libs/server/VectorOutput.cs index 1ac40800af1..9718f3e5361 100644 --- a/libs/server/VectorOutput.cs +++ b/libs/server/VectorOutput.cs @@ -19,7 +19,7 @@ public struct VectorOutput public SpanByteAndMemory SpanByteAndMemory; /// - /// If a call needs a larger than was provided, it is stored here. + /// If a call needs a larger output buffer than was provided, it is stored here. /// public int? UpdatedReadDesiredSize { get; set; } From 6d9fa4636fb1453239dc4bb9a1a9960e9c4444fb Mon Sep 17 00:00:00 2001 From: Kevin Montrose Date: Fri, 19 Jun 2026 16:02:45 -0400 Subject: [PATCH 8/8] assert the alignment requirements --- .../Resp/Vector/VectorManager.Callbacks.cs | 10 ++++++++++ .../VectorStore/VectorSessionFunctions.cs | 16 ++++++++++++++-- .../Garnet.test.vectorset/RespVectorSetTests.cs | 8 ++++---- 3 files changed, 28 insertions(+), 6 deletions(-) diff --git a/libs/server/Resp/Vector/VectorManager.Callbacks.cs b/libs/server/Resp/Vector/VectorManager.Callbacks.cs index 9159ae3615d..a66af464304 100644 --- a/libs/server/Resp/Vector/VectorManager.Callbacks.cs +++ b/libs/server/Resp/Vector/VectorManager.Callbacks.cs @@ -107,6 +107,8 @@ private void AdvanceTo(int i) currentLen = *(int*)currentPtr; currentIndex = 0; + Debug.Assert((currentLen % 4) == 0, "Keys must be 4-byte aligned to preserve value alignment"); + if (i == 0) { return; @@ -224,6 +226,8 @@ nint dataCallbackContext [UnmanagedCallersOnly(CallConvs = [typeof(CallConvCdecl)])] private static unsafe byte WriteCallbackUnmanaged(ulong context, nint keyData, nuint keyLength, nint writeData, nuint writeLength) { + Debug.Assert((keyLength % 4) == 0, "Key must be 4-byte aligned to preserve value alignment"); + var keyWithNamespace = MakeVectorElementKey(context, keyData, keyLength); ref var ctx = ref ActiveThreadSession.vectorBasicContext; VectorInput input = new(); @@ -242,6 +246,8 @@ private static unsafe byte WriteCallbackUnmanaged(ulong context, nint keyData, n [UnmanagedCallersOnly(CallConvs = [typeof(CallConvCdecl)])] private static byte DeleteCallbackUnmanaged(ulong context, nint keyData, nuint keyLength) { + Debug.Assert((keyLength % 4) == 0, "Key must be 4-byte aligned to preserve value alignment"); + var keyWithNamespace = MakeVectorElementKey(context, keyData, keyLength); ref var ctx = ref ActiveThreadSession.vectorBasicContext; @@ -255,6 +261,8 @@ private static byte DeleteCallbackUnmanaged(ulong context, nint keyData, nuint k [UnmanagedCallersOnly(CallConvs = [typeof(CallConvCdecl)])] private static byte ReadModifyWriteCallbackUnmanaged(ulong context, nint keyData, nuint keyLength, nuint writeLength, nint dataCallback, nint dataCallbackContext) { + Debug.Assert((keyLength % 4) == 0, "Key must be 4-byte aligned to preserve value alignment"); + var keyWithNamespace = MakeVectorElementKey(context, keyData, keyLength); ref var ctx = ref ActiveThreadSession.vectorBasicContext; @@ -279,6 +287,8 @@ private static byte ReadModifyWriteCallbackUnmanaged(ulong context, nint keyData private static unsafe bool ReadSizeUnknown(ulong context, ReadOnlySpan key, ref SpanByteAndMemory value) { + // We explicitly DO NOT check alignment here because we're always in managed code which doesn't care + #pragma warning disable IDE0302 // [...]-style collection initialization doesn't actually _guarantee_ stackalloc (or inline arrays), which we need here ReadOnlySpan nsBytes = stackalloc byte[1] { (byte)context }; #pragma warning restore IDE0302 diff --git a/libs/server/Storage/Functions/VectorStore/VectorSessionFunctions.cs b/libs/server/Storage/Functions/VectorStore/VectorSessionFunctions.cs index 3c3f2923c1a..b3f9006b670 100644 --- a/libs/server/Storage/Functions/VectorStore/VectorSessionFunctions.cs +++ b/libs/server/Storage/Functions/VectorStore/VectorSessionFunctions.cs @@ -174,6 +174,8 @@ public readonly bool InPlaceWriter(ref LogRecord logRecord, re public readonly RecordFieldInfo GetRMWModifiedFieldInfo(in TSourceLogRecord srcLogRecord, ref VectorInput input) where TSourceLogRecord : ISourceLogRecord { + Debug.Assert((srcLogRecord.KeyBytes.Length % 4) == 0, "Keys must be 4-byte aligned to preserve value alignment"); + var value = srcLogRecord.ValueSpan; if (input.WriteDesiredSize < 0) @@ -192,6 +194,8 @@ public readonly RecordFieldInfo GetRMWInitialFieldInfo(TKey key, ref Vecto , allows ref struct #endif { + Debug.Assert((key.KeyBytes.Length % 4) == 0, "Keys must be 4-byte aligned to preserve value alignment"); + var effectiveWriteDesiredSize = input.WriteDesiredSize; if (effectiveWriteDesiredSize < 0) @@ -208,7 +212,11 @@ public readonly RecordFieldInfo GetUpsertFieldInfo(TKey key, ReadOnlySpan< #if NET9_0_OR_GREATER , allows ref struct #endif - => new() { KeySize = key.KeyBytes.Length, ValueSize = value.Length }; + { + Debug.Assert((key.KeyBytes.Length % 4) == 0, "Keys must be 4-byte aligned to preserve value alignment"); + + return new() { KeySize = key.KeyBytes.Length, ValueSize = value.Length }; + } /// Length of value object, when populated by Upsert using given value and input public readonly RecordFieldInfo GetUpsertFieldInfo(TKey key, IHeapObject value, ref VectorInput input) @@ -225,7 +233,11 @@ public readonly RecordFieldInfo GetUpsertFieldInfo(TKey , allows ref struct #endif where TSourceLogRecord : ISourceLogRecord - => new() { KeySize = key.KeyBytes.Length, ValueSize = inputLogRecord.ValueSpan.Length }; + { + Debug.Assert((key.KeyBytes.Length % 4) == 0, "Keys must be 4-byte aligned to preserve value alignment"); + + return new() { KeySize = key.KeyBytes.Length, ValueSize = inputLogRecord.ValueSpan.Length }; + } #endregion Variable Length #region InitialUpdater diff --git a/test/standalone/Garnet.test.vectorset/RespVectorSetTests.cs b/test/standalone/Garnet.test.vectorset/RespVectorSetTests.cs index afede41dee4..821b60c1b4c 100644 --- a/test/standalone/Garnet.test.vectorset/RespVectorSetTests.cs +++ b/test/standalone/Garnet.test.vectorset/RespVectorSetTests.cs @@ -1767,14 +1767,14 @@ public unsafe void VectorReadBatchVariants() ReadOnlySpan namespaceBytes = stackalloc byte[1] { 2 }; - var key0 = "hello"u8.ToArray(); + var key0 = "buzz"u8.ToArray(); var key1 = "fizz"u8.ToArray(); - var key2 = "the quick brown fox jumps over the lazy dog"u8.ToArray(); + var key2 = "the quick brown fox jumps over the lazy dog."u8.ToArray(); var key3 = "CF29E323-E376-4BC4-AB63-FCFD371EB445"u8.ToArray(); var key4 = Array.Empty(); var key5 = new byte[] { 1 }; - var key6 = new byte[] { 2, 3 }; - var key7 = new byte[] { 4, 5, 6 }; + var key6 = new byte[] { 2, 3, 4, 5 }; + var key7 = new byte[] { 6, 7, 8, 9, 10, 11, 12, 13 }; var data = MemoryMarshal.Cast([key0.Length]) .ToArray()