Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify the Handler loop #8

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 28 additions & 12 deletions RSocket.Core/RSocketProtocol.Handler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,42 @@ static public async Task Handler(IRSocketProtocol sink, PipeReader pipereader, C
{
//The original implementation was a state-machine parser with resumability. It doesn't seem like the other implementations follow this pattern and the .NET folks are still figuring this out too - see the internal JSON parser discussion for how they're handling state machine persistence across async boundaries when servicing a Pipeline. So, this version of the handler only processes complete messages at some cost to memory buffer scalability.
//Note that this means that the Pipeline must be configured to have enough buffering for a complete message before source-quenching. This also means that the downstream consumers don't really have to support resumption, so the interface no longer has the partial buffer methods in it.
while (!cancellation.IsCancellationRequested)

try
{
var read = await pipereader.ReadAsync(cancellation);
var buffer = read.Buffer;
if (buffer.IsEmpty && read.IsCompleted) { break; }
var position = buffer.Start;
while (true)
{
var read = await pipereader.ReadAsync(cancellation);
var buffer = read.Buffer;

while (TryParseMessage(ref buffer, out int frameLength, out var payload))
{
await Process(frameLength, payload);
}

if (read.IsCompleted)
{
if (!buffer.IsEmpty)
{
// Partial frame received and there's no more data coming
sink.Error(new Error(ErrorCodes.Connection_Error));
}
break;
}

//Due to the nature of Pipelines as simple binary pipes, all Transport adapters assemble a standard message frame whether or not the underlying transport signals length, EoM, etc.
var (Length, IsEndOfMessage) = MessageFramePeek(buffer);
if (buffer.Length < Length + MESSAGEFRAMESIZE) { pipereader.AdvanceTo(buffer.Start, buffer.End); continue; } //Don't have a complete message yet. Tell the pipe that we've evaluated up to the current buffer end, but cannot yet consume it.
pipereader.AdvanceTo(buffer.Start, buffer.End);

await Process(Length, buffer.Slice(position = buffer.GetPosition(MESSAGEFRAMESIZE, position), Length));
pipereader.AdvanceTo(position = buffer.GetPosition(Length, position));
//TODO UNIT TEST- this should work now too!!! Need to evaluate if there is more than one packet in the pipe including edges like part of the length bytes are there but not all.
}
}
finally
{
pipereader.Complete();
}
pipereader.Complete();


//This is the non-async portion of the handler. SequenceReader<T> and the other stack-allocated items cannot be used in an async context.
Task Process(int framelength, ReadOnlySequence<byte> sequence)
Task Process(int framelength, in ReadOnlySequence<byte> sequence)
{
var reader = new SequenceReader<byte>(sequence);
var header = new Header(ref reader, framelength);
Expand Down
59 changes: 40 additions & 19 deletions RSocket.Core/RSocketProtocol.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,27 @@ public partial class RSocketProtocol
static public void MessageFrameWrite(int length, bool isEndOfMessage, Span<byte> target) { target[2] = (byte)((length >> 8 * 0) & 0xFF); target[1] = (byte)((length >> 8 * 1) & 0xFF); target[0] = (byte)((length >> 8 * 2) & 0xFF); }
static public (int Length, bool IsEndOfMessage) MessageFramePeek(ReadOnlySequence<byte> sequence) { var reader = new SequenceReader<byte>(sequence); return reader.TryRead(out byte b1) && reader.TryRead(out byte b2) && reader.TryRead(out byte b3) ? ((b1 << 8 * 2) | (b2 << 8 * 1) | (b3 << 8 * 0), true) : (0, false); }

static public bool TryParseMessage(ref ReadOnlySequence<byte> buffer, out int frameLength, out ReadOnlySequence<byte> payload)
{
//Due to the nature of Pipelines as simple binary pipes, all Transport adapters assemble a standard message frame whether or not the underlying transport signals length, EoM, etc.
var (length, _) = MessageFramePeek(buffer);

if (buffer.Length < length + MESSAGEFRAMESIZE)
{
payload = default;
frameLength = 0;
return false;
}

frameLength = length;
payload = buffer.Slice(MESSAGEFRAMESIZE, length);

// Trim to the unparsed data
buffer = buffer.Slice(payload.End);

return true;
}

static Task Flush(PipeWriter pipe, CancellationToken cancel) { var result = pipe.FlushAsync(cancel); return result.IsCompleted ? Task.CompletedTask : result.AsTask(); }

static bool TryReadRemaining(in Header header, int innerlength, ref SequenceReader<byte> reader, out int metadatalength)
Expand Down Expand Up @@ -74,7 +95,7 @@ public ref struct Payload
public int Length => Header.Length + InnerLength + Header.MetadataHeaderLength + MetadataLength + DataLength;


public Payload(int stream, ReadOnlySequence<byte> data = default, ReadOnlySequence<byte> metadata = default, bool follows = false, bool complete = false, bool next = false) //TODO Parameter ordering, isn't Next much more likely than C or F?
public Payload(int stream, ReadOnlySequence<byte> data = default, ReadOnlySequence<byte> metadata = default, bool follows = false, bool complete = false, bool next = false) //TODO Parameter ordering, isn't Next much more likely than C or F?
{
Header = new Header(Types.Payload, stream, metadata: metadata);
DataLength = (int)data.Length;
Expand Down Expand Up @@ -230,7 +251,7 @@ int Write(BufferWriter writer, ReadOnlySequence<byte> data, ReadOnlySequence<byt
{
var written = Header.Write(writer, Length);
written += writer.WriteInt32BigEndian(InitialRequest);
if (HasMetadata) { written += writer.WriteInt24BigEndian(MetadataLength) + writer.Write(metadata); } //TODO Should this be UInt24? Probably, but not sure if it can actually overflow...
if (HasMetadata) { written += writer.WriteInt24BigEndian(MetadataLength) + writer.Write(metadata); } //TODO Should this be UInt24? Probably, but not sure if it can actually overflow...
written += writer.Write(data);
return written;
}
Expand Down Expand Up @@ -280,7 +301,7 @@ public bool Validate(bool canContinue = false)
int Write(BufferWriter writer, ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata = default)
{
var written = Header.Write(writer, Length);
if (HasMetadata) { written += writer.WriteInt24BigEndian(MetadataLength) + writer.Write(metadata); } //TODO Should this be UInt24? Probably, but not sure if it can actually overflow...
if (HasMetadata) { written += writer.WriteInt24BigEndian(MetadataLength) + writer.Write(metadata); } //TODO Should this be UInt24? Probably, but not sure if it can actually overflow...
written += writer.Write(data);
return written;
}
Expand Down Expand Up @@ -330,7 +351,7 @@ public bool Validate(bool canContinue = false)
int Write(BufferWriter writer, ReadOnlySequence<byte> data, ReadOnlySequence<byte> metadata = default)
{
var written = Header.Write(writer, Length);
if (HasMetadata) { written += writer.WriteInt24BigEndian(MetadataLength) + writer.Write(metadata); } //TODO Should this be UInt24? Probably, but not sure if it can actually overflow...
if (HasMetadata) { written += writer.WriteInt24BigEndian(MetadataLength) + writer.Write(metadata); } //TODO Should this be UInt24? Probably, but not sure if it can actually overflow...
written += writer.Write(data);
return written;
}
Expand Down Expand Up @@ -442,8 +463,8 @@ public KeepAlive(in Header header, ref SequenceReader<byte> reader)

public bool Validate(bool canContinue = false)
{
if (Header.Stream == 0) { return canContinue ? false : throw new ArgumentOutOfRangeException(nameof(Header.Stream), $"Invalid {nameof(KeepAlive)} Message."); } //SPEC: KEEPALIVE frames MUST always use Stream ID 0 as they pertain to the Connection.
if (LastReceivedPosition < 0) { return canContinue ? false : throw new ArgumentOutOfRangeException(nameof(LastReceivedPosition), LastReceivedPosition, $"Invalid {nameof(KeepAlive)} Message."); } //SPEC: Value MUST be > 0. (optional. Set to all 0s when not supported.)
if (Header.Stream == 0) { return canContinue ? false : throw new ArgumentOutOfRangeException(nameof(Header.Stream), $"Invalid {nameof(KeepAlive)} Message."); } //SPEC: KEEPALIVE frames MUST always use Stream ID 0 as they pertain to the Connection.
if (LastReceivedPosition < 0) { return canContinue ? false : throw new ArgumentOutOfRangeException(nameof(LastReceivedPosition), LastReceivedPosition, $"Invalid {nameof(KeepAlive)} Message."); } //SPEC: Value MUST be > 0. (optional. Set to all 0s when not supported.)
else return true;
}

Expand Down Expand Up @@ -484,7 +505,7 @@ public Lease(in Header header, ref SequenceReader<byte> reader)
Header = header;
reader.TryRead(out int timeToLive); TimeToLive = timeToLive;
reader.TryRead(out int numberOfRequests); NumberOfRequests = numberOfRequests;
TryReadRemaining(header, InnerLength, ref reader, out MetadataLength); //SPEC: This frame only supports Metadata, so the Metadata Length header MUST NOT be included, even if the(M)etadata flag is set true.
TryReadRemaining(header, InnerLength, ref reader, out MetadataLength); //SPEC: This frame only supports Metadata, so the Metadata Length header MUST NOT be included, even if the(M)etadata flag is set true.
//MetadataLength = header.HasMetadata ? MetadataLength = framelength - header.Length - sizeof(int) - sizeof(int) : 0; //SPEC: This frame only supports Metadata, so the Metadata Length header MUST NOT be included, even if the(M)etadata flag is set true.
}

Expand Down Expand Up @@ -570,7 +591,7 @@ public MetadataPush(ReadOnlySequence<byte> metadata)
public MetadataPush(in Header header, ref SequenceReader<byte> reader)
{
Header = header;
TryReadRemaining(header, InnerLength, ref reader, out MetadataLength); //SPEC: This frame only supports Metadata, so the Metadata Length header MUST NOT be included.
TryReadRemaining(header, InnerLength, ref reader, out MetadataLength); //SPEC: This frame only supports Metadata, so the Metadata Length header MUST NOT be included.
//MetadataLength = header.HasMetadata ? MetadataLength = framelength - header.Length : 0; //SPEC: This frame only supports Metadata, so the Metadata Length header MUST NOT be included.
}

Expand Down Expand Up @@ -659,7 +680,7 @@ public ref struct Setup
+ sizeof(byte) + Encoding.ASCII.GetByteCount(MetadataMimeType)
+ sizeof(byte) + Encoding.ASCII.GetByteCount(DataMimeType);
public int Length => Header.Length + InnerLength + Header.MetadataHeaderLength + MetadataLength + DataLength;


public Setup(TimeSpan keepalive, TimeSpan lifetime, string metadataMimeType = null, string dataMimeType = null, ReadOnlySequence<byte> data = default, ReadOnlySequence<byte> metadata = default) : this((int)keepalive.TotalMilliseconds, (int)lifetime.TotalMilliseconds, string.IsNullOrEmpty(metadataMimeType) ? string.Empty : metadataMimeType, string.IsNullOrEmpty(dataMimeType) ? string.Empty : dataMimeType, data: data, metadata: metadata) { }

Expand All @@ -673,10 +694,10 @@ public Setup(Int32 keepalive, Int32 lifetime, string metadataMimeType, string da
ResumeToken = resumeToken;
MetadataMimeType = metadataMimeType;
DataMimeType = dataMimeType;
ResumeToken = resumeToken; //TODO Two of these?
ResumeToken = resumeToken; //TODO Two of these?
MetadataLength = (int)metadata.Length;
DataLength = (int)data.Length;
HasResume = resumeToken != default && resumeToken.Length > 0;
HasResume = resumeToken != null && resumeToken.Length > 0;
}

public Setup(in Header header, ref SequenceReader<byte> reader)
Expand All @@ -686,7 +707,7 @@ public Setup(in Header header, ref SequenceReader<byte> reader)
reader.TryReadBigEndian(out UInt16 minorVersion); MinorVersion = minorVersion;
reader.TryReadBigEndian(out Int32 keepAlive); KeepAlive = keepAlive;
reader.TryReadBigEndian(out Int32 lifetime); Lifetime = lifetime;
if ((header.Flags & FLAG_RESUME) != 0) //TODO Duplicate test logic here
if ((header.Flags & FLAG_RESUME) != 0) //TODO Duplicate test logic here
{
reader.TryReadBigEndian(out UInt16 resumeTokenLength);
ResumeToken = new byte[resumeTokenLength];
Expand All @@ -711,7 +732,7 @@ public bool Validate(bool canContinue = false)

//TODO So common, should be library..?
public void Write(PipeWriter pipe, ReadOnlySequence<byte> data = default, ReadOnlySequence<byte> metadata = default) { var writer = BufferWriter.Get(pipe); this.Write(writer, data: data, metadata: metadata); writer.Flush(); BufferWriter.Return(writer); }
public Task WriteFlush(PipeWriter pipe, ReadOnlySequence<byte> data = default, ReadOnlySequence<byte> metadata = default, CancellationToken cancel = default) { Write(pipe, data: data, metadata:metadata); return Flush(pipe, cancel); }
public Task WriteFlush(PipeWriter pipe, ReadOnlySequence<byte> data = default, ReadOnlySequence<byte> metadata = default, CancellationToken cancel = default) { Write(pipe, data: data, metadata: metadata); return Flush(pipe, cancel); }

void Write(BufferWriter writer, ReadOnlySequence<byte> data = default, ReadOnlySequence<byte> metadata = default)
{
Expand All @@ -721,9 +742,9 @@ void Write(BufferWriter writer, ReadOnlySequence<byte> data = default, ReadOnlyS
written += writer.WriteInt32BigEndian(KeepAlive);
written += writer.WriteInt32BigEndian(Lifetime);
if (HasResume) { written += writer.WriteUInt16BigEndian(ResumeToken.Length) + writer.Write(ResumeToken); }
written += writer.WritePrefixByte(MetadataMimeType); //TODO THIS IS ASCII!!! See Spec!!
written += writer.WritePrefixByte(DataMimeType); //TODO THIS IS ASCII!!! See Spec!!
if (HasMetadata) { written += writer.WriteInt24BigEndian(MetadataLength) + writer.Write(metadata); } //TODO Should this be UInt24? Probably, but not sure if it can actually overflow...
written += writer.WritePrefixByte(MetadataMimeType); //TODO THIS IS ASCII!!! See Spec!!
written += writer.WritePrefixByte(DataMimeType); //TODO THIS IS ASCII!!! See Spec!!
if (HasMetadata) { written += writer.WriteInt24BigEndian(MetadataLength) + writer.Write(metadata); } //TODO Should this be UInt24? Probably, but not sure if it can actually overflow...
written += writer.Write(data);
}

Expand All @@ -743,7 +764,7 @@ public ref struct Header
internal const ushort FLAG_METADATA = 0b__01_00000000;
public bool CanIgnore { get => (Flags & FLAG_IGNORE) != 0; set => Flags = value ? (ushort)(Flags | FLAG_IGNORE) : (ushort)(Flags & ~FLAG_IGNORE); }
public bool HasMetadata { get => (Flags & FLAG_METADATA) != 0; set => Flags = value ? (ushort)(Flags | FLAG_METADATA) : (ushort)(Flags & ~FLAG_METADATA); }
public int MetadataHeaderLength => HasMetadata ? METADATALENGTHSIZE : 0; //TODO Only here?
public int MetadataHeaderLength => HasMetadata ? METADATALENGTHSIZE : 0; //TODO Only here?

public Int32 Stream;
public Types Type;
Expand All @@ -753,7 +774,7 @@ public ref struct Header
public int Length => sizeof(Int32) + sizeof(UInt16);

private int FrameLength;
public int Remaining => FrameLength - Length; //TODO Temporary refactoring
public int Remaining => FrameLength - Length; //TODO Temporary refactoring

public Header(Types type, Int32 stream = 0, in ReadOnlySequence<byte> metadata = default)
{
Expand All @@ -775,7 +796,7 @@ public Header(ref SequenceReader<byte> reader, int framelength = 0)

public int Write(BufferWriter writer, int length)
{
writer.WriteInt24BigEndian(length); //Not included in total length.
writer.WriteInt24BigEndian(length); //Not included in total length.
writer.WriteInt32BigEndian(Stream);
writer.WriteUInt16BigEndian((((int)Type << FRAMETYPE_OFFSET) & FRAMETYPE_TYPE) | (Flags & FLAGS));// (Ignore ? FLAG_IGNORE : 0) | (Metadata ? FLAG_METADATA : 0));
return Length;
Expand Down