Skip to content

Commit baa1dc8

Browse files
Fixes #6675: Update IEventHubDataAdapter to support StreamId to partition mapping (#6676) (#6731)
* Fixes #6675: Include stream Namespace in PartitionKey for Event Hub messages * Update IEventHubDataAdapter to support partition to StreamId mapping Co-authored-by: Alex Meyer-Gleaves <[email protected]>
1 parent 47a1fa6 commit baa1dc8

File tree

3 files changed

+29
-7
lines changed

3 files changed

+29
-7
lines changed

src/Azure/Orleans.Streaming.EventHubs/Providers/Streams/EventHub/EventHubAdapterFactory.cs

+3-2
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,8 @@ public virtual Task QueueMessageBatchAsync<T>(Guid streamGuid, string streamName
225225
Dictionary<string, object> requestContext)
226226
{
227227
EventData eventData = this.dataAdapter.ToQueueMessage(streamGuid, streamNamespace, events, token, requestContext);
228-
return this.client.SendAsync(eventData, streamGuid.ToString());
228+
string partitionKey = this.dataAdapter.GetPartitionKey(streamGuid, streamNamespace);
229+
return this.client.SendAsync(eventData, partitionKey);
229230
}
230231

231232
/// <summary>
@@ -321,4 +322,4 @@ public static EventHubAdapterFactory Create(IServiceProvider services, string na
321322
return factory;
322323
}
323324
}
324-
}
325+
}

src/Azure/Orleans.Streaming.EventHubs/Providers/Streams/EventHub/EventHubDataAdapter.cs

+22-5
Original file line numberDiff line numberDiff line change
@@ -76,13 +76,10 @@ public virtual CachedMessage FromQueueMessage(StreamPosition streamPosition, Eve
7676

7777
public virtual StreamPosition GetStreamPosition(string partition, EventData queueMessage)
7878
{
79-
Guid streamGuid =
80-
Guid.Parse(queueMessage.SystemProperties.PartitionKey);
81-
string streamNamespace = queueMessage.GetStreamNamespaceProperty();
82-
IStreamIdentity stremIdentity = new StreamIdentity(streamGuid, streamNamespace);
79+
IStreamIdentity streamIdentity = this.GetStreamIdentity(queueMessage);
8380
StreamSequenceToken token =
8481
new EventHubSequenceTokenV2(queueMessage.SystemProperties.Offset, queueMessage.SystemProperties.SequenceNumber, 0);
85-
return new StreamPosition(stremIdentity, token);
82+
return new StreamPosition(streamIdentity, token);
8683
}
8784

8885
/// <summary>
@@ -95,6 +92,26 @@ public virtual string GetOffset(CachedMessage lastItemPurged)
9592
return SegmentBuilder.ReadNextString(lastItemPurged.Segment, ref readOffset); // read offset
9693
}
9794

95+
/// <summary>
96+
/// Get the Event Hub partition key to use for a stream.
97+
/// </summary>
98+
/// <param name="streamGuid">The stream Guid.</param>
99+
/// <param name="streamNamespace">The stream Namespace.</param>
100+
/// <returns>The partition key to use for the stream.</returns>
101+
public virtual string GetPartitionKey(Guid streamGuid, string streamNamespace) => streamGuid.ToString();
102+
103+
/// <summary>
104+
/// Get the <see cref="IStreamIdentity"/> for an event message.
105+
/// </summary>
106+
/// <param name="queueMessage">The event message.</param>
107+
/// <returns>The stream identity.</returns>
108+
public virtual IStreamIdentity GetStreamIdentity(EventData queueMessage)
109+
{
110+
Guid streamGuid = Guid.Parse(queueMessage.SystemProperties.PartitionKey);
111+
string streamNamespace = queueMessage.GetStreamNamespaceProperty();
112+
return new StreamIdentity(streamGuid, streamNamespace);
113+
}
114+
98115
// Placed object message payload into a segment.
99116
protected virtual ArraySegment<byte> EncodeMessageIntoSegment(EventData queueMessage, Func<int, ArraySegment<byte>> getSegment)
100117
{

src/Azure/Orleans.Streaming.EventHubs/Providers/Streams/EventHub/IEventHubDataAdapter.cs

+4
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,9 @@ public interface IEventHubDataAdapter : IQueueDataAdapter<EventData>, ICacheData
1212
StreamPosition GetStreamPosition(string partition, EventData queueMessage);
1313

1414
string GetOffset(CachedMessage cachedMessage);
15+
16+
string GetPartitionKey(Guid streamGuid, string streamNamespace);
17+
18+
IStreamIdentity GetStreamIdentity(EventData queueMessage);
1519
}
1620
}

0 commit comments

Comments
 (0)