Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 81 additions & 4 deletions libs/cluster/Session/RespClusterBasicCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT license.

using System;
using System.Buffers;
using System.Diagnostics;
using System.Text;
using Garnet.common;
Expand Down Expand Up @@ -274,8 +275,8 @@ private bool NetworkClusterNodes(out bool invalidParameters)
}

var nodes = clusterProvider.clusterManager.CurrentConfig.GetClusterInfo(clusterProvider);
while (!RespWriteUtils.TryWriteAsciiBulkString(nodes, ref dcurr, dend))
SendAndReset();

WriteAsciiLargeRespString(nodes);

return true;
}
Expand Down Expand Up @@ -343,8 +344,8 @@ private bool NetworkClusterShards(out bool invalidParameters)

var preferredType = clusterProvider.serverOptions.ClusterPreferredEndpointType;
var shardsInfo = clusterProvider.clusterManager.CurrentConfig.GetShardsInfo(clusterProvider.clusterManager.clusterConnectionStore, preferredType);
while (!RespWriteUtils.TryWriteAsciiDirect(shardsInfo, ref dcurr, dend))
SendAndReset();

WriteLargeAsciiDirectString(shardsInfo);

Comment thread
kevin-montrose marked this conversation as resolved.
return true;
}
Expand Down Expand Up @@ -523,5 +524,81 @@ private bool NetworkClusterPublish(out bool invalidParameters)
clusterProvider.storeWrapper.subscribeBroker.Publish(parseState.GetArgSliceByRef(0), parseState.GetArgSliceByRef(1));
return true;
}


/// <summary>
/// Handle a potentially large already encoded RESP response, breaking it into pieces if necessary.
/// </summary>
private void WriteLargeAsciiDirectString(ReadOnlySpan<char> message)
{
// Attempt to write w/o any buffering
if (RespWriteUtils.TryWriteAsciiDirect(message, ref dcurr, dend))
{
return;
}

var buffer = ArrayPool<byte>.Shared.Rent(message.Length);
try
{
var len = Encoding.ASCII.GetBytes(message, buffer);
var remaining = buffer.AsSpan()[..len];

while (!remaining.IsEmpty)
{
var space = Math.Min((int)(dend - dcurr), remaining.Length);
_ = RespWriteUtils.TryWriteDirect(remaining[..space], ref dcurr, dend);
Comment thread
kevin-montrose marked this conversation as resolved.

SendAndReset();

remaining = remaining[space..];
}
Comment thread
kevin-montrose marked this conversation as resolved.
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
}

/// <summary>
/// Handle a potentially large bulk string by breaking it into pieces if necessary.
/// </summary>
private void WriteAsciiLargeRespString(ReadOnlySpan<char> message)
{
while (!RespWriteUtils.TryWriteBulkStringLength(message.Length, ref dcurr, dend))
SendAndReset();

// Attempt to write w/o any buffering
if (RespWriteUtils.TryWriteAsciiDirect(message, ref dcurr, dend))
{
while (!RespWriteUtils.TryWriteNewLine(ref dcurr, dend))
SendAndReset();

return;
}
Comment thread
kevin-montrose marked this conversation as resolved.

var buffer = ArrayPool<byte>.Shared.Rent(message.Length);
try
{
var len = Encoding.ASCII.GetBytes(message, buffer);
var remaining = buffer.AsSpan()[..len];

while (!remaining.IsEmpty)
{
var space = Math.Min((int)(dend - dcurr), remaining.Length);
_ = RespWriteUtils.TryWriteDirect(remaining[..space], ref dcurr, dend);

SendAndReset();

remaining = remaining[space..];
}
Comment thread
kevin-montrose marked this conversation as resolved.
Comment thread
kevin-montrose marked this conversation as resolved.

while (!RespWriteUtils.TryWriteNewLine(ref dcurr, dend))
SendAndReset();
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
}
}
}
10 changes: 8 additions & 2 deletions libs/common/RespWriteUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -321,14 +321,20 @@ public static bool TryWriteDirect<T>(ref T item, ref byte* curr, byte* end) wher
/// Write length header of bulk string
/// </summary>
public static bool TryWriteBulkStringLength(ReadOnlySpan<byte> item, ref byte* curr, byte* end)
=> TryWriteBulkStringLength(item.Length, ref curr, end);

/// <summary>
/// Write length header of bulk string
/// </summary>
public static bool TryWriteBulkStringLength(int len, ref byte* curr, byte* end)
{
var itemDigits = NumUtils.CountDigits(item.Length);
var itemDigits = NumUtils.CountDigits(len);
var totalLen = 1 + itemDigits + 2;
if (totalLen > (int)(end - curr))
return false;

*curr++ = (byte)'$';
NumUtils.WriteInt32(item.Length, itemDigits, ref curr);
NumUtils.WriteInt32(len, itemDigits, ref curr);
WriteNewline(ref curr);
return true;
}
Expand Down
Loading