Skip to content

Commit 4eaef68

Browse files
committed
Start stream tutorial 2 for .NET
1 parent 2cf71a3 commit 4eaef68

File tree

4 files changed

+47
-2
lines changed

4 files changed

+47
-2
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
using System.Text;
2+
using System.Threading.Tasks;
3+
using RabbitMQ.Stream.Client;
4+
using RabbitMQ.Stream.Client.Reliable;
5+
6+
var streamSystem = await StreamSystem.Create(new StreamSystemConfig());
7+
8+
var stream = "stream-offset-tracking-dotnet";
9+
await streamSystem.CreateStream(new StreamSpec(stream));
10+
11+
var messageCount = 100;
12+
var confirmedCde = new CountdownEvent(messageCount);
13+
var producer = await Producer.Create(new ProducerConfig(streamSystem, stream) {
14+
ConfirmationHandler = async confirmation => {
15+
if (confirmation.Status == ConfirmationStatus.Confirmed) {
16+
confirmedCde.Signal();
17+
}
18+
await Task.CompletedTask.ConfigureAwait(false);
19+
}
20+
});
21+
22+
Console.WriteLine("Publishing {0} messages...", messageCount);
23+
for (int i = 0; i < messageCount; i++) {
24+
var body = i == messageCount - 1 ? "marker" : "hello";
25+
await producer.Send(new Message(Encoding.UTF8.GetBytes(body)));
26+
}
27+
28+
confirmedCde.Wait();
29+
Console.WriteLine("Messages confirmed.");
30+
await producer.Close();
31+
await streamSystem.Close();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<OutputType>Exe</OutputType>
5+
<TargetFramework>net8.0</TargetFramework>
6+
<ImplicitUsings>enable</ImplicitUsings>
7+
<Nullable>enable</Nullable>
8+
</PropertyGroup>
9+
10+
<ItemGroup>
11+
<PackageReference Include="RabbitMQ.Stream.Client" Version="1.8.7" />
12+
</ItemGroup>
13+
14+
</Project>

go-stream/offset_tracking_send.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ func main() {
3232
ch := make(chan bool)
3333
handlePublishConfirm(chPublishConfirm, messageCount, ch)
3434

35-
fmt.Printf("Publishing %d messages\n", messageCount)
35+
fmt.Printf("Publishing %d messages...\n", messageCount)
3636
for i := 0; i < messageCount; i++ {
3737
var body string
3838
if i == messageCount-1 {

java-stream-mvn/src/main/java/OffsetTrackingSend.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public static void main(String[] args) throws InterruptedException {
1919

2020
int messageCount = 100;
2121
CountDownLatch confirmedLatch = new CountDownLatch(messageCount);
22-
System.out.printf("Publishing %d messages%n", messageCount);
22+
System.out.printf("Publishing %d messages...%n", messageCount);
2323
IntStream.range(0, messageCount).forEach(i -> {
2424
String body = i == messageCount - 1 ? "marker" : "hello";
2525
producer.send(producer.messageBuilder().addData(body.getBytes(UTF_8)).build(),

0 commit comments

Comments
 (0)