Skip to content

Commit

Permalink
Pool PoolBuffer instances
Browse files Browse the repository at this point in the history
  • Loading branch information
exelix11 committed Apr 1, 2024
1 parent 04f66f6 commit 5e731e7
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 41 deletions.
84 changes: 48 additions & 36 deletions Client/Core/StreamManager.cs
Original file line number Diff line number Diff line change
@@ -1,15 +1,8 @@
using SysDVR.Client.Sources;
using SysDVR.Client.Targets;
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net.Sockets;
using System.Text;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace SysDVR.Client.Core
Expand Down Expand Up @@ -56,57 +49,81 @@ public StreamingOptions Clone()

class PoolBuffer
{
private readonly static ArrayPool<byte> pool = ArrayPool<byte>.Shared;
readonly static ArrayPool<byte> bufferPool = ArrayPool<byte>.Shared;
readonly static ConcurrentBag<PoolBuffer> instancePool = new();

public int Length { get; private set; }
private byte[] _buffer;
private int refcount;

public byte[] RawBuffer => _buffer ?? throw new Exception("The buffer has been freed");
public bool IsFree => refcount == 0;

private static PoolBuffer GetInstance()
{
if (instancePool.TryTake(out var instance))
return instance;

return new PoolBuffer();
}

private void ReturnToPool()
{
if (!IsFree)
throw new Exception("Attempted to return a non-free buffer to the pool");

Length = 0;
_buffer = null;
refcount = 0;

instancePool.Add(this);
}

private void Configure(byte[] buf, int len)
{
Length = len;
_buffer = buf;
refcount = 1;
}

public void Reference()
{
if (IsFree || _buffer == null)
throw new Exception("Attempted to reference an invalid buffer");

Interlocked.Increment(ref refcount);
}

public void Free()
{
Interlocked.Decrement(ref refcount);

#if DEBUG
if (refcount < 0)
throw new Exception("Buffer refcount is negative");
#endif

if (refcount == 0)
{
pool.Return(RawBuffer);
_buffer = null;
GC.SuppressFinalize(this);
bufferPool.Return(RawBuffer);
ReturnToPool();
}
}

public static PoolBuffer Rent(int len)
{
if (len == 0)
throw new Exception("Invalid lngth");
return new PoolBuffer(pool.Rent(len), len);
}

private PoolBuffer(byte[] buf, int len)
{
Length = len;
_buffer = buf;
refcount = 1;

var res = GetInstance();
res.Configure(bufferPool.Rent(len), len);
return res;
}

~PoolBuffer()
{
if (refcount != 0)
if (!IsFree)
{
#if DEBUG
Console.WriteLine($"Buffer was not freed {refcount}");
#endif
// It is normal to see this on shutdown since we might be closing with some buffers stuck in the pipeline
Program.DebugLog($"A buffer was not freed len={Length} ref={refcount}");
refcount = 1;
Free();
}
Expand Down Expand Up @@ -156,18 +173,12 @@ public bool UnchainStream(OutStream toRemove)
return false;
}

protected virtual void UseCancellationTokenImpl(CancellationToken tok)
{
Cancel = tok;
Next?.UseCancellationToken(tok);
}

protected abstract void SendDataImpl(PoolBuffer block, ulong ts);

// This must be called before sending any data
public void UseCancellationToken(CancellationToken tok)
public virtual void UseCancellationToken(CancellationToken tok)
{
UseCancellationTokenImpl(tok);
Cancel = tok;
Next?.UseCancellationToken(tok);
}

Expand Down Expand Up @@ -314,7 +325,7 @@ async Task StreamTask()
packet.Buffer?.Free();
if (!Replay.LookupSlot(packet.Header.ReplaySlot, out packet.Buffer))
{
Console.WriteLine("Unknown hash value, skipping packet");
ReportError("Unknown hash value, skipping packet");
continue;
}
}
Expand All @@ -337,7 +348,8 @@ async Task StreamTask()
}
finally
{
packet.Buffer?.Free();
if (packet.Buffer is { IsFree: false })
packet.Buffer.Free();
}
}

Expand Down
4 changes: 2 additions & 2 deletions Client/Sources/PacketReplayTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,12 @@ public void Flush()
}
}

public bool LookupSlot(int slot, out PoolBuffer buffer)
public bool LookupSlot(int slot, out PoolBuffer? buffer)
{
// invaid slot
if (slot == 0xFF)
{
buffer = default;
buffer = null;
return false;
}

Expand Down
7 changes: 4 additions & 3 deletions Client/Targets/Player/Streams.cs
Original file line number Diff line number Diff line change
Expand Up @@ -333,9 +333,10 @@ unsafe public void UseContext(DecoderContext ctx)
onFrame = ctx.OnFrameEvent;
}

protected override void UseCancellationTokenImpl(CancellationToken tok)
{
base.UseCancellationTokenImpl(tok);
public override void UseCancellationToken(CancellationToken tok)
{
base.UseCancellationToken(tok);

// Start the consumer only after the token is set
VideoConsumerTask = Task.Run(ConsumeVideoAsync);
}
Expand Down

0 comments on commit 5e731e7

Please sign in to comment.