Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions libs/cluster/Server/Migration/MigrateOperation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,10 @@ public async Task<bool> TransmitSlotsAsync()
var input = new UnifiedInput(RespCommand.MIGRATE);
input.arg1 = session.NetworkBufferSettings.sendBufferSize - common.NetworkBufferSettings.SendBufferOverheadReserve;

VectorInput vectorInput = new();
vectorInput.AlignmentExpected = true; // We're moving DiskANN sourced data, so alignment is expected
vectorInput.MaxMigrationHeapAllocationSize = session.NetworkBufferSettings.sendBufferSize - common.NetworkBufferSettings.SendBufferOverheadReserve;
VectorInput vectorInput = new()
{
IsMigrationRead = true,
};

foreach (var (ns, key, hasNs) in sketch.argSliceVector)
{
Expand Down
43 changes: 32 additions & 11 deletions libs/server/InputHeader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

using System;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using Tsavorite.core;
Expand Down Expand Up @@ -609,24 +608,46 @@ public unsafe int DeserializeFrom(byte* src)
/// <summary>
/// Header for Garnet Main Store inputs but for Vector element r/w/d ops
/// </summary>
public struct VectorInput : IStoreInput
public readonly struct VectorInput : IStoreInput
{
public int SerializedLength => throw new NotImplementedException();

public int ReadDesiredSize { get; set; }
/// <summary>
/// True if the read value might not fit in the provided output buffer.
///
/// If false, the output buffer is guaranteed to be correctly sized.
/// </summary>
public bool VariableSizedRead { get; init; }

public int WriteDesiredSize { get; set; }
/// <summary>
/// Size of write in bytes.
///
/// If negative, this is an append to an existing value.
/// If positive, this is a create of a new value or an overwrite of an existing value.
/// </summary>
public int WriteDesiredSize { get; init; }

public int Index { get; set; }
public nint CallbackContext { get; set; }
public nint Callback { get; set; }
/// <summary>
/// If part of a batch operation, the zero-based index of that operation.
/// </summary>
public int Index { get; init; }

public bool AlignmentExpected { get; set; }
/// <summary>
/// Context to pass to <see cref="Callback"/>, if any.
///
/// This value is opaque to Garnet and should not be modified.
/// </summary>
public nint CallbackContext { get; init; }

[MemberNotNullWhen(returnValue: true, member: nameof(MaxMigrationHeapAllocationSize))]
public bool IsMigrationRead => MaxMigrationHeapAllocationSize != null;
/// <summary>
/// The native callback to invoke, if any, on the inplace record data.
/// </summary>
public nint Callback { get; init; }

public int? MaxMigrationHeapAllocationSize { get; set; }
/// <summary>
/// True if the read being performed is part of a migration operation.
/// </summary>
public bool IsMigrationRead { get; init; }

public VectorInput()
{
Expand Down
55 changes: 34 additions & 21 deletions libs/server/Resp/Vector/VectorManager.Callbacks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ private void AdvanceTo(int i)
currentLen = *(int*)currentPtr;
currentIndex = 0;

Debug.Assert((currentLen % 4) == 0, "Keys must be 4-byte aligned to preserve value alignment");

if (i == 0)
{
return;
Expand Down Expand Up @@ -144,10 +146,12 @@ public readonly void GetInput(int i, out VectorInput input)
{
Debug.Assert(i >= 0 && i < Count, "Trying to advance out of bounds");

input = default;
input.CallbackContext = callbackContext;
input.Callback = (nint)callback;
input.Index = i;
input = new()
{
CallbackContext = callbackContext,
Callback = (nint)callback,
Index = i,
};
}

/// <inheritdoc/>
Expand Down Expand Up @@ -222,10 +226,11 @@ nint dataCallbackContext
[UnmanagedCallersOnly(CallConvs = [typeof(CallConvCdecl)])]
private static unsafe byte WriteCallbackUnmanaged(ulong context, nint keyData, nuint keyLength, nint writeData, nuint writeLength)
{
Debug.Assert((keyLength % 4) == 0, "Key must be 4-byte aligned to preserve value alignment");

var keyWithNamespace = MakeVectorElementKey(context, keyData, keyLength);
ref var ctx = ref ActiveThreadSession.vectorBasicContext;
VectorInput input = new();
input.AlignmentExpected = true;
var valueSpan = SpanByte.FromPinnedPointer((byte*)writeData, (int)writeLength);
VectorOutput outputSpan = new();

Expand All @@ -241,6 +246,8 @@ private static unsafe byte WriteCallbackUnmanaged(ulong context, nint keyData, n
[UnmanagedCallersOnly(CallConvs = [typeof(CallConvCdecl)])]
private static byte DeleteCallbackUnmanaged(ulong context, nint keyData, nuint keyLength)
{
Debug.Assert((keyLength % 4) == 0, "Key must be 4-byte aligned to preserve value alignment");

var keyWithNamespace = MakeVectorElementKey(context, keyData, keyLength);

ref var ctx = ref ActiveThreadSession.vectorBasicContext;
Expand All @@ -254,14 +261,18 @@ private static byte DeleteCallbackUnmanaged(ulong context, nint keyData, nuint k
[UnmanagedCallersOnly(CallConvs = [typeof(CallConvCdecl)])]
private static byte ReadModifyWriteCallbackUnmanaged(ulong context, nint keyData, nuint keyLength, nuint writeLength, nint dataCallback, nint dataCallbackContext)
{
Debug.Assert((keyLength % 4) == 0, "Key must be 4-byte aligned to preserve value alignment");

var keyWithNamespace = MakeVectorElementKey(context, keyData, keyLength);

ref var ctx = ref ActiveThreadSession.vectorBasicContext;

VectorInput input = default;
input.Callback = dataCallback;
input.CallbackContext = dataCallbackContext;
input.WriteDesiredSize = (int)writeLength;
VectorInput input = new()
{
Callback = dataCallback,
CallbackContext = dataCallbackContext,
WriteDesiredSize = (int)writeLength,
};

var status = ctx.RMW(keyWithNamespace, ref input);
if (status.IsPending)
Expand All @@ -274,8 +285,10 @@ private static byte ReadModifyWriteCallbackUnmanaged(ulong context, nint keyData
return status.IsCompletedSuccessfully ? (byte)1 : default;
}

private static unsafe bool ReadSizeUnknown(ulong context, bool forceAlignment, ReadOnlySpan<byte> key, ref SpanByteAndMemory value)
private static unsafe bool ReadSizeUnknown(ulong context, ReadOnlySpan<byte> key, ref SpanByteAndMemory value)
{
// We explicitly DO NOT check alignment here because we're always in managed code which doesn't care

#pragma warning disable IDE0302 // [...]-style collection initialization doesn't actually _guarantee_ stackalloc (or inline arrays), which we need here
ReadOnlySpan<byte> nsBytes = stackalloc byte[1] { (byte)context };
#pragma warning restore IDE0302
Expand All @@ -286,20 +299,19 @@ private static unsafe bool ReadSizeUnknown(ulong context, bool forceAlignment, R

while (true)
Comment thread
kevin-montrose marked this conversation as resolved.
{
VectorInput input = new();
input.ReadDesiredSize = -1;
VectorInput input = new()
{
VariableSizedRead = true,
};

// Sometimes we read DiskANN written data from the .NET side
// If that's the case, we need to pad for alignment even though .NET doesn't require it
input.AlignmentExpected = forceAlignment;
fixed (byte* ptr = value.Span)
{
VectorOutput asSpanByte = new(ptr, value.Length);
VectorOutput output = new(ptr, value.Length);

var status = ctx.Read(keyWithNamespace, ref input, ref asSpanByte);
var status = ctx.Read(keyWithNamespace, ref input, ref output);
if (status.IsPending)
{
CompletePending(ref status, ref input, ref asSpanByte, ref ctx);
CompletePending(ref status, ref output, ref ctx);
}

if (!status.Found)
Expand All @@ -308,15 +320,16 @@ private static unsafe bool ReadSizeUnknown(ulong context, bool forceAlignment, R
return false;
}

if (input.ReadDesiredSize > asSpanByte.SpanByteAndMemory.Length)
var updateReadDesiredSize = output.UpdatedReadDesiredSize.GetValueOrDefault(-1);
if (updateReadDesiredSize > output.SpanByteAndMemory.Length)
{
value.Memory?.Dispose();
var newAlloc = MemoryPool<byte>.Shared.Rent(input.ReadDesiredSize);
var newAlloc = MemoryPool<byte>.Shared.Rent(updateReadDesiredSize);
value = new(newAlloc, newAlloc.Memory.Length);
continue;
}

value.Length = asSpanByte.SpanByteAndMemory.Length;
value.Length = output.SpanByteAndMemory.Length;
return true;
}
}
Expand Down
11 changes: 7 additions & 4 deletions libs/server/Resp/Vector/VectorManager.ContextMetadata.cs
Original file line number Diff line number Diff line change
Expand Up @@ -508,12 +508,15 @@ private void UpdateContextMetadata(ref VectorBasicContext ctx)
// empty key is context metadata
VectorElementKey key = new(nsBytes, []);

VectorInput input = default;
input.Callback = 0;
input.WriteDesiredSize = ContextMetadata.Size;
VectorInput input;
unsafe
{
input.CallbackContext = (nint)Unsafe.AsPointer(ref MemoryMarshal.GetReference(dataSpan));
input = new()
{
Callback = 0,
WriteDesiredSize = ContextMetadata.Size,
CallbackContext = (nint)Unsafe.AsPointer(ref MemoryMarshal.GetReference(dataSpan)),
};
}

var status = ctx.RMW(key, ref input);
Expand Down
1 change: 0 additions & 1 deletion libs/server/Resp/Vector/VectorManager.Migration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ ReadOnlySpan<byte> value
#endif

VectorInput input = default;
input.AlignmentExpected = true;
VectorOutput outputSpan = new(new SpanByteAndMemory());

VectorElementKey key = new(namespaceBytes[0..1], keyWithoutNamespace);
Expand Down
29 changes: 4 additions & 25 deletions libs/server/Resp/Vector/VectorManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -350,27 +350,6 @@ private static void CompletePending(ref Status status, ref VectorOutput output,
completedOutputs.Dispose();
}

/// <summary>
/// As <see cref="CompletePending(ref Status, ref VectorOutput, ref VectorBasicContext)"/>, but also propagates the
/// completed <paramref name="input"/> back to the caller.
///
/// This is required for the unknown-size read path (<see cref="ReadSizeUnknown"/>): the Reader records the actual
/// value size on <see cref="VectorInput.ReadDesiredSize"/>, and on the pending (disk) path that update lives on the
/// pending context's copy of the input. Without propagating it back the caller keeps its stale value and skips the
/// grow-and-retry, returning a truncated/uninitialized buffer.
/// </summary>
private static void CompletePending(ref Status status, ref VectorInput input, ref VectorOutput output, ref VectorBasicContext ctx)
{
_ = ctx.CompletePendingWithOutputs(out var completedOutputs, wait: true);
var more = completedOutputs.Next();
Debug.Assert(more);
status = completedOutputs.Current.Status;
output = completedOutputs.Current.Output;
input = completedOutputs.Current.Input;
Debug.Assert(!completedOutputs.Next());
completedOutputs.Dispose();
}

private static void CompletePending(ref Status status, ref StringBasicContext ctx)
{
_ = ctx.CompletePendingWithOutputs(out var completedOutputs, wait: true);
Expand Down Expand Up @@ -821,7 +800,7 @@ internal VectorManagerResult FetchSingleVectorElementAttributes(ReadOnlySpan<byt
{
AssertHaveStorageSession();
ReadIndex(indexValue, out var context, out _, out _, out _, out _, out _, out _, out _);
var found = ReadSizeUnknown(context | DiskANNService.Attributes, forceAlignment: true, element, ref outputAttributes);
var found = ReadSizeUnknown(context | DiskANNService.Attributes, element, ref outputAttributes);
return found ? VectorManagerResult.OK : VectorManagerResult.MissingElement;
}

Expand Down Expand Up @@ -881,7 +860,7 @@ private void FetchVectorElementAttributes(ulong context, int numIds, SpanByteAnd
attributeMem.Length = attributeMem.SpanByte.Length;
}

var found = ReadSizeUnknown(context | DiskANNService.Attributes, forceAlignment: true, id, ref attributeMem);
var found = ReadSizeUnknown(context | DiskANNService.Attributes, id, ref attributeMem);

// Copy attribute into output buffer, length prefixed, resizing as necessary
var neededSpace = 4 + (found ? attributeMem.Length : 0);
Expand Down Expand Up @@ -948,7 +927,7 @@ internal bool TryGetEmbedding(ReadOnlySpan<byte> indexValue, ReadOnlySpan<byte>
var internalIdBytes = SpanByteAndMemory.FromPinnedSpan(internalId);
try
{
if (!ReadSizeUnknown(context | DiskANNService.InternalIdMap, forceAlignment: true, element, ref internalIdBytes))
if (!ReadSizeUnknown(context | DiskANNService.InternalIdMap, element, ref internalIdBytes))
{
return false;
}
Expand All @@ -964,7 +943,7 @@ internal bool TryGetEmbedding(ReadOnlySpan<byte> indexValue, ReadOnlySpan<byte>
var asBytes = SpanByteAndMemory.FromPinnedSpan(asBytesSpan);
try
{
if (!ReadSizeUnknown(context | DiskANNService.FullVector, forceAlignment: true, internalId, ref asBytes))
if (!ReadSizeUnknown(context | DiskANNService.FullVector, internalId, ref asBytes))
{
return false;
}
Expand Down
Loading
Loading