diff --git a/libs/cluster/Session/RespClusterBasicCommands.cs b/libs/cluster/Session/RespClusterBasicCommands.cs index 58a49320325..81b1b8b5b86 100644 --- a/libs/cluster/Session/RespClusterBasicCommands.cs +++ b/libs/cluster/Session/RespClusterBasicCommands.cs @@ -2,6 +2,7 @@ // Licensed under the MIT license. using System; +using System.Buffers; using System.Diagnostics; using System.Text; using Garnet.common; @@ -274,8 +275,8 @@ private bool NetworkClusterNodes(out bool invalidParameters) } var nodes = clusterProvider.clusterManager.CurrentConfig.GetClusterInfo(clusterProvider); - while (!RespWriteUtils.TryWriteAsciiBulkString(nodes, ref dcurr, dend)) - SendAndReset(); + + WriteAsciiLargeRespString(nodes); return true; } @@ -343,8 +344,8 @@ private bool NetworkClusterShards(out bool invalidParameters) var preferredType = clusterProvider.serverOptions.ClusterPreferredEndpointType; var shardsInfo = clusterProvider.clusterManager.CurrentConfig.GetShardsInfo(clusterProvider.clusterManager.clusterConnectionStore, preferredType); - while (!RespWriteUtils.TryWriteAsciiDirect(shardsInfo, ref dcurr, dend)) - SendAndReset(); + + WriteLargeAsciiDirectString(shardsInfo); return true; } @@ -523,5 +524,81 @@ private bool NetworkClusterPublish(out bool invalidParameters) clusterProvider.storeWrapper.subscribeBroker.Publish(parseState.GetArgSliceByRef(0), parseState.GetArgSliceByRef(1)); return true; } + + + /// + /// Handle a potentially large already encoded RESP response, breaking it into pieces if necessary. + /// + private void WriteLargeAsciiDirectString(ReadOnlySpan message) + { + // Attempt to write w/o any buffering + if (RespWriteUtils.TryWriteAsciiDirect(message, ref dcurr, dend)) + { + return; + } + + var buffer = ArrayPool.Shared.Rent(message.Length); + try + { + var len = Encoding.ASCII.GetBytes(message, buffer); + var remaining = buffer.AsSpan()[..len]; + + while (!remaining.IsEmpty) + { + var space = Math.Min((int)(dend - dcurr), remaining.Length); + _ = RespWriteUtils.TryWriteDirect(remaining[..space], ref dcurr, dend); + + SendAndReset(); + + remaining = remaining[space..]; + } + } + finally + { + ArrayPool.Shared.Return(buffer); + } + } + + /// + /// Handle a potentially large bulk string by breaking it into pieces if necessary. + /// + private void WriteAsciiLargeRespString(ReadOnlySpan message) + { + while (!RespWriteUtils.TryWriteBulkStringLength(message.Length, ref dcurr, dend)) + SendAndReset(); + + // Attempt to write w/o any buffering + if (RespWriteUtils.TryWriteAsciiDirect(message, ref dcurr, dend)) + { + while (!RespWriteUtils.TryWriteNewLine(ref dcurr, dend)) + SendAndReset(); + + return; + } + + var buffer = ArrayPool.Shared.Rent(message.Length); + try + { + var len = Encoding.ASCII.GetBytes(message, buffer); + var remaining = buffer.AsSpan()[..len]; + + while (!remaining.IsEmpty) + { + var space = Math.Min((int)(dend - dcurr), remaining.Length); + _ = RespWriteUtils.TryWriteDirect(remaining[..space], ref dcurr, dend); + + SendAndReset(); + + remaining = remaining[space..]; + } + + while (!RespWriteUtils.TryWriteNewLine(ref dcurr, dend)) + SendAndReset(); + } + finally + { + ArrayPool.Shared.Return(buffer); + } + } } } \ No newline at end of file diff --git a/libs/common/RespWriteUtils.cs b/libs/common/RespWriteUtils.cs index 3319cf897a7..04568db1fcd 100644 --- a/libs/common/RespWriteUtils.cs +++ b/libs/common/RespWriteUtils.cs @@ -321,14 +321,20 @@ public static bool TryWriteDirect(ref T item, ref byte* curr, byte* end) wher /// Write length header of bulk string /// public static bool TryWriteBulkStringLength(ReadOnlySpan item, ref byte* curr, byte* end) + => TryWriteBulkStringLength(item.Length, ref curr, end); + + /// + /// Write length header of bulk string + /// + public static bool TryWriteBulkStringLength(int len, ref byte* curr, byte* end) { - var itemDigits = NumUtils.CountDigits(item.Length); + var itemDigits = NumUtils.CountDigits(len); var totalLen = 1 + itemDigits + 2; if (totalLen > (int)(end - curr)) return false; *curr++ = (byte)'$'; - NumUtils.WriteInt32(item.Length, itemDigits, ref curr); + NumUtils.WriteInt32(len, itemDigits, ref curr); WriteNewline(ref curr); return true; }