Skip to content
Draft
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions libs/cluster/Server/Migration/MigrateOperation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,10 @@ public async Task<bool> TransmitSlotsAsync()
var input = new UnifiedInput(RespCommand.MIGRATE);
input.arg1 = session.NetworkBufferSettings.sendBufferSize - common.NetworkBufferSettings.SendBufferOverheadReserve;

VectorInput vectorInput = new();
vectorInput.AlignmentExpected = true; // We're moving DiskANN sourced data, so alignment is expected
vectorInput.MaxMigrationHeapAllocationSize = session.NetworkBufferSettings.sendBufferSize - common.NetworkBufferSettings.SendBufferOverheadReserve;
VectorInput vectorInput = new()
{
MaxMigrationHeapAllocationSize = session.NetworkBufferSettings.sendBufferSize - common.NetworkBufferSettings.SendBufferOverheadReserve
};

foreach (var (ns, key, hasNs) in sketch.argSliceVector)
{
Expand Down
16 changes: 7 additions & 9 deletions libs/server/InputHeader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -609,24 +609,22 @@ public unsafe int DeserializeFrom(byte* src)
/// <summary>
/// Header for Garnet Main Store inputs but for Vector element r/w/d ops
/// </summary>
public struct VectorInput : IStoreInput
public readonly struct VectorInput : IStoreInput
{
public int SerializedLength => throw new NotImplementedException();

public int ReadDesiredSize { get; set; }
public int ReadDesiredSize { get; init; }

public int WriteDesiredSize { get; set; }
public int WriteDesiredSize { get; init; }

public int Index { get; set; }
public nint CallbackContext { get; set; }
public nint Callback { get; set; }

public bool AlignmentExpected { get; set; }
public int Index { get; init; }
public nint CallbackContext { get; init; }
public nint Callback { get; init; }
Comment thread
kevin-montrose marked this conversation as resolved.
Outdated

[MemberNotNullWhen(returnValue: true, member: nameof(MaxMigrationHeapAllocationSize))]
public bool IsMigrationRead => MaxMigrationHeapAllocationSize != null;

public int? MaxMigrationHeapAllocationSize { get; set; }
public int? MaxMigrationHeapAllocationSize { get; init; }

public VectorInput()
{
Expand Down
44 changes: 23 additions & 21 deletions libs/server/Resp/Vector/VectorManager.Callbacks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,12 @@ public readonly void GetInput(int i, out VectorInput input)
{
Debug.Assert(i >= 0 && i < Count, "Trying to advance out of bounds");

input = default;
input.CallbackContext = callbackContext;
input.Callback = (nint)callback;
input.Index = i;
input = new()
{
CallbackContext = callbackContext,
Callback = (nint)callback,
Index = i,
};
}

/// <inheritdoc/>
Expand Down Expand Up @@ -225,7 +227,6 @@ private static unsafe byte WriteCallbackUnmanaged(ulong context, nint keyData, n
var keyWithNamespace = MakeVectorElementKey(context, keyData, keyLength);
ref var ctx = ref ActiveThreadSession.vectorBasicContext;
VectorInput input = new();
input.AlignmentExpected = true;
var valueSpan = SpanByte.FromPinnedPointer((byte*)writeData, (int)writeLength);
VectorOutput outputSpan = new();

Expand Down Expand Up @@ -258,10 +259,12 @@ private static byte ReadModifyWriteCallbackUnmanaged(ulong context, nint keyData

ref var ctx = ref ActiveThreadSession.vectorBasicContext;

VectorInput input = default;
input.Callback = dataCallback;
input.CallbackContext = dataCallbackContext;
input.WriteDesiredSize = (int)writeLength;
VectorInput input = new()
{
Callback = dataCallback,
CallbackContext = dataCallbackContext,
WriteDesiredSize = (int)writeLength,
};

var status = ctx.RMW(keyWithNamespace, ref input);
if (status.IsPending)
Expand All @@ -274,7 +277,7 @@ private static byte ReadModifyWriteCallbackUnmanaged(ulong context, nint keyData
return status.IsCompletedSuccessfully ? (byte)1 : default;
}

private static unsafe bool ReadSizeUnknown(ulong context, bool forceAlignment, ReadOnlySpan<byte> key, ref SpanByteAndMemory value)
private static unsafe bool ReadSizeUnknown(ulong context, ReadOnlySpan<byte> key, ref SpanByteAndMemory value)
{
#pragma warning disable IDE0302 // [...]-style collection initialization doesn't actually _guarantee_ stackalloc (or inline arrays), which we need here
ReadOnlySpan<byte> nsBytes = stackalloc byte[1] { (byte)context };
Expand All @@ -286,20 +289,19 @@ private static unsafe bool ReadSizeUnknown(ulong context, bool forceAlignment, R

while (true)
Comment thread
kevin-montrose marked this conversation as resolved.
{
VectorInput input = new();
input.ReadDesiredSize = -1;
VectorInput input = new()
{
ReadDesiredSize = -1
};

// Sometimes we read DiskANN written data from the .NET side
// If that's the case, we need to pad for alignment even though .NET doesn't require it
input.AlignmentExpected = forceAlignment;
fixed (byte* ptr = value.Span)
{
VectorOutput asSpanByte = new(ptr, value.Length);
VectorOutput output = new(ptr, value.Length);

var status = ctx.Read(keyWithNamespace, ref input, ref asSpanByte);
var status = ctx.Read(keyWithNamespace, ref input, ref output);
if (status.IsPending)
{
CompletePending(ref status, ref input, ref asSpanByte, ref ctx);
CompletePending(ref status, ref output, ref ctx);
}

if (!status.Found)
Expand All @@ -308,15 +310,15 @@ private static unsafe bool ReadSizeUnknown(ulong context, bool forceAlignment, R
return false;
}

if (input.ReadDesiredSize > asSpanByte.SpanByteAndMemory.Length)
if (output.UpdatedReadDesiredSize != null && output.UpdatedReadDesiredSize.Value > output.SpanByteAndMemory.Length)
Comment thread
kevin-montrose marked this conversation as resolved.
Outdated
{
value.Memory?.Dispose();
var newAlloc = MemoryPool<byte>.Shared.Rent(input.ReadDesiredSize);
var newAlloc = MemoryPool<byte>.Shared.Rent(output.UpdatedReadDesiredSize.Value);
value = new(newAlloc, newAlloc.Memory.Length);
continue;
}

value.Length = asSpanByte.SpanByteAndMemory.Length;
value.Length = output.SpanByteAndMemory.Length;
return true;
}
}
Expand Down
11 changes: 7 additions & 4 deletions libs/server/Resp/Vector/VectorManager.ContextMetadata.cs
Original file line number Diff line number Diff line change
Expand Up @@ -508,12 +508,15 @@ private void UpdateContextMetadata(ref VectorBasicContext ctx)
// empty key is context metadata
VectorElementKey key = new(nsBytes, []);

VectorInput input = default;
input.Callback = 0;
input.WriteDesiredSize = ContextMetadata.Size;
VectorInput input;
unsafe
{
input.CallbackContext = (nint)Unsafe.AsPointer(ref MemoryMarshal.GetReference(dataSpan));
input = new()
{
Callback = 0,
WriteDesiredSize = ContextMetadata.Size,
CallbackContext = (nint)Unsafe.AsPointer(ref MemoryMarshal.GetReference(dataSpan)),
};
}

var status = ctx.RMW(key, ref input);
Expand Down
1 change: 0 additions & 1 deletion libs/server/Resp/Vector/VectorManager.Migration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ ReadOnlySpan<byte> value
#endif

VectorInput input = default;
input.AlignmentExpected = true;
VectorOutput outputSpan = new(new SpanByteAndMemory());

VectorElementKey key = new(namespaceBytes[0..1], keyWithoutNamespace);
Expand Down
29 changes: 4 additions & 25 deletions libs/server/Resp/Vector/VectorManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -350,27 +350,6 @@ private static void CompletePending(ref Status status, ref VectorOutput output,
completedOutputs.Dispose();
}

/// <summary>
/// As <see cref="CompletePending(ref Status, ref VectorOutput, ref VectorBasicContext)"/>, but also propagates the
/// completed <paramref name="input"/> back to the caller.
///
/// This is required for the unknown-size read path (<see cref="ReadSizeUnknown"/>): the Reader records the actual
/// value size on <see cref="VectorInput.ReadDesiredSize"/>, and on the pending (disk) path that update lives on the
/// pending context's copy of the input. Without propagating it back the caller keeps its stale value and skips the
/// grow-and-retry, returning a truncated/uninitialized buffer.
/// </summary>
private static void CompletePending(ref Status status, ref VectorInput input, ref VectorOutput output, ref VectorBasicContext ctx)
{
_ = ctx.CompletePendingWithOutputs(out var completedOutputs, wait: true);
var more = completedOutputs.Next();
Debug.Assert(more);
status = completedOutputs.Current.Status;
output = completedOutputs.Current.Output;
input = completedOutputs.Current.Input;
Debug.Assert(!completedOutputs.Next());
completedOutputs.Dispose();
}

private static void CompletePending(ref Status status, ref StringBasicContext ctx)
{
_ = ctx.CompletePendingWithOutputs(out var completedOutputs, wait: true);
Expand Down Expand Up @@ -821,7 +800,7 @@ internal VectorManagerResult FetchSingleVectorElementAttributes(ReadOnlySpan<byt
{
AssertHaveStorageSession();
ReadIndex(indexValue, out var context, out _, out _, out _, out _, out _, out _, out _);
var found = ReadSizeUnknown(context | DiskANNService.Attributes, forceAlignment: true, element, ref outputAttributes);
var found = ReadSizeUnknown(context | DiskANNService.Attributes, element, ref outputAttributes);
return found ? VectorManagerResult.OK : VectorManagerResult.MissingElement;
}

Expand Down Expand Up @@ -881,7 +860,7 @@ private void FetchVectorElementAttributes(ulong context, int numIds, SpanByteAnd
attributeMem.Length = attributeMem.SpanByte.Length;
}

var found = ReadSizeUnknown(context | DiskANNService.Attributes, forceAlignment: true, id, ref attributeMem);
var found = ReadSizeUnknown(context | DiskANNService.Attributes, id, ref attributeMem);

// Copy attribute into output buffer, length prefixed, resizing as necessary
var neededSpace = 4 + (found ? attributeMem.Length : 0);
Expand Down Expand Up @@ -948,7 +927,7 @@ internal bool TryGetEmbedding(ReadOnlySpan<byte> indexValue, ReadOnlySpan<byte>
var internalIdBytes = SpanByteAndMemory.FromPinnedSpan(internalId);
try
{
if (!ReadSizeUnknown(context | DiskANNService.InternalIdMap, forceAlignment: true, element, ref internalIdBytes))
if (!ReadSizeUnknown(context | DiskANNService.InternalIdMap, element, ref internalIdBytes))
{
return false;
}
Expand All @@ -964,7 +943,7 @@ internal bool TryGetEmbedding(ReadOnlySpan<byte> indexValue, ReadOnlySpan<byte>
var asBytes = SpanByteAndMemory.FromPinnedSpan(asBytesSpan);
try
{
if (!ReadSizeUnknown(context | DiskANNService.FullVector, forceAlignment: true, internalId, ref asBytes))
if (!ReadSizeUnknown(context | DiskANNService.FullVector, internalId, ref asBytes))
{
return false;
}
Expand Down
Loading
Loading