|
| 1 | +using System.Text; |
| 2 | +using System.Threading; |
| 3 | +using System.Threading.Tasks; |
| 4 | +using RabbitMQ.Stream.Client; |
| 5 | +using RabbitMQ.Stream.Client.Reliable; |
| 6 | + |
| 7 | +var streamSystem = await StreamSystem.Create(new StreamSystemConfig()); |
| 8 | + |
| 9 | +var stream = "stream-offset-tracking-dotnet"; |
| 10 | +await streamSystem.CreateStream(new StreamSpec(stream)); |
| 11 | + |
| 12 | +var consumerName = "offset-tracking-tutorial"; |
| 13 | +IOffsetType offsetSpecification; |
| 14 | +try { |
| 15 | + ulong storedOffset = await streamSystem.QueryOffset(consumerName, stream).ConfigureAwait(false); |
| 16 | + offsetSpecification = new OffsetTypeOffset(storedOffset + 1); |
| 17 | +} catch (OffsetNotFoundException) { |
| 18 | + offsetSpecification = new OffsetTypeFirst(); |
| 19 | +} |
| 20 | +ulong initialValue = UInt64.MaxValue; |
| 21 | +ulong firstOffset = initialValue; |
| 22 | +int messageCount = 0; |
| 23 | +ulong lastOffset = initialValue; |
| 24 | +var consumedCde = new CountdownEvent(1); |
| 25 | +var consumer = await Consumer.Create(new ConsumerConfig(streamSystem, stream) |
| 26 | +{ |
| 27 | + OffsetSpec = offsetSpecification, |
| 28 | + Reference = consumerName, |
| 29 | + MessageHandler = async (_, consumer, context, message) => { |
| 30 | + if (Interlocked.CompareExchange(ref firstOffset, context.Offset, initialValue) == initialValue) { |
| 31 | + Console.WriteLine("First message received."); |
| 32 | + } |
| 33 | + if (Interlocked.Increment(ref messageCount) % 10 == 0) { |
| 34 | + await consumer.StoreOffset(context.Offset).ConfigureAwait(false); |
| 35 | + } |
| 36 | + if ("marker".Equals(Encoding.UTF8.GetString(message.Data.Contents))) { |
| 37 | + Interlocked.Exchange(ref lastOffset, context.Offset); |
| 38 | + await consumer.StoreOffset(context.Offset).ConfigureAwait(false); |
| 39 | + await consumer.Close(); |
| 40 | + consumedCde.Signal(); |
| 41 | + } |
| 42 | + await Task.CompletedTask; |
| 43 | + } |
| 44 | +}); |
| 45 | +Console.WriteLine("Started consuming..."); |
| 46 | + |
| 47 | +consumedCde.Wait(); |
| 48 | +Console.WriteLine("Done consuming, first offset {0}, last offset {1}", firstOffset, lastOffset); |
| 49 | +await streamSystem.Close(); |
0 commit comments