1
+ using System ;
2
+ using System . Collections . Generic ;
3
+ using System . Linq ;
4
+ using System . Text ;
5
+ using System . Threading ;
6
+ using System . Threading . Tasks ;
7
+ using Confluent . Kafka ;
8
+ using FluentAssertions ;
9
+ using Kafka . Protocol ;
10
+ using Kafka . Protocol . Records ;
11
+ using Xunit ;
12
+ using Xunit . Abstractions ;
13
+ using Int32 = Kafka . Protocol . Int32 ;
14
+ using Record = Kafka . Protocol . Records . Record ;
15
+
16
+ namespace Kafka . TestFramework . Tests
17
+ {
18
+ public partial class Given_a_socket_based_test_server
19
+ {
20
+ public class When_connecting_to_the_server_and_consuming_a_message : TestSpecificationAsync
21
+ {
22
+ private SocketBasedKafkaTestFramework _testServer ;
23
+ private readonly List < string > _result = new List < string > ( ) ;
24
+ private const int NumberOfMessage = 5 ;
25
+
26
+ public When_connecting_to_the_server_and_consuming_a_message (
27
+ ITestOutputHelper testOutputHelper )
28
+ : base ( testOutputHelper )
29
+ {
30
+ }
31
+
32
+ protected override Task GivenAsync ( )
33
+ {
34
+ _testServer = KafkaTestFramework . WithSocket ( ) ;
35
+
36
+ _testServer . On < ApiVersionsRequest , ApiVersionsResponse > (
37
+ request => request . Respond ( )
38
+ . WithAllApiKeys ( ) ) ;
39
+
40
+ _testServer . On < MetadataRequest , MetadataResponse > (
41
+ request => request . Respond ( )
42
+ . WithTopicsCollection (
43
+ request . TopicsCollection ? . Select ( topic =>
44
+ new Func < MetadataResponse . MetadataResponseTopic ,
45
+ MetadataResponse . MetadataResponseTopic > (
46
+ responseTopic =>
47
+ responseTopic
48
+ . WithName ( topic . Name )
49
+ . WithPartitionsCollection ( partition =>
50
+ partition
51
+ . WithLeaderId ( 0 )
52
+ . WithPartitionIndex ( 0 ) ) ) )
53
+ . ToArray ( ) ??
54
+ Array . Empty < Func < MetadataResponse . MetadataResponseTopic ,
55
+ MetadataResponse . MetadataResponseTopic > > ( ) )
56
+ . WithBrokersCollection ( broker => broker
57
+ . WithHost ( "localhost" )
58
+ . WithPort ( _testServer . Port ) )
59
+ ) ;
60
+
61
+ _testServer . On < FindCoordinatorRequest , FindCoordinatorResponse > (
62
+ request => request . Respond ( )
63
+ . WithHost ( "localhost" )
64
+ . WithPort ( _testServer . Port )
65
+ ) ;
66
+
67
+ _testServer . On < JoinGroupRequest , JoinGroupResponse > (
68
+ request => request . Respond ( )
69
+ . WithProtocolName ( request . ProtocolsCollection . First ( ) . Value . Name ) ) ;
70
+
71
+ _testServer . On < SyncGroupRequest , SyncGroupResponse > (
72
+ async ( request , cancellationToken ) => request . Respond ( )
73
+ . WithAssignment (
74
+ await new ConsumerProtocolAssignment ( ConsumerProtocolAssignment . MaxVersion )
75
+ . WithAssignedPartitionsCollection ( partition => partition
76
+ . WithTopic ( "topic1" )
77
+ . WithPartitionsCollection ( new Int32 [ ] { 0 } ) )
78
+ . ToBytesAsync ( cancellationToken )
79
+ . ConfigureAwait ( false ) )
80
+ ) ;
81
+
82
+ _testServer . On < OffsetFetchRequest , OffsetFetchResponse > ( request => request . Respond ( )
83
+ . WithTopicsCollection (
84
+ request . TopicsCollection ? . Select ( topic =>
85
+ new Func < OffsetFetchResponse . OffsetFetchResponseTopic ,
86
+ OffsetFetchResponse . OffsetFetchResponseTopic > ( responseTopic =>
87
+ responseTopic
88
+ . WithName ( topic . Name )
89
+ . WithPartitionsCollection ( topic . PartitionIndexesCollection
90
+ . Select ( partitionIndex =>
91
+ new Func < OffsetFetchResponse . OffsetFetchResponseTopic .
92
+ OffsetFetchResponsePartition ,
93
+ OffsetFetchResponse . OffsetFetchResponseTopic .
94
+ OffsetFetchResponsePartition > (
95
+ partition => partition
96
+ . WithPartitionIndex ( partitionIndex ) ) )
97
+ . ToArray ( ) ) ) )
98
+ . ToArray ( ) ??
99
+ Array . Empty < Func < OffsetFetchResponse . OffsetFetchResponseTopic ,
100
+ OffsetFetchResponse . OffsetFetchResponseTopic > > ( ) ) ) ;
101
+
102
+ var records = new Dictionary < long , Record > ( ) ;
103
+ for ( var i = 0 ; i < NumberOfMessage ; i ++ )
104
+ {
105
+ records . Add ( i , new Record
106
+ {
107
+ OffsetDelta = i ,
108
+ Value = Encoding . UTF8 . GetBytes (
109
+ $ "data{ i } fetched from broker")
110
+ } ) ;
111
+ }
112
+
113
+ _testServer . On < FetchRequest , FetchResponse > ( async ( request , cancellationToken ) =>
114
+ {
115
+ var returnsData = false ;
116
+ var response = request . Respond ( )
117
+ . WithResponsesCollection (
118
+ request . TopicsCollection . Select ( topic =>
119
+ new Func < FetchResponse . FetchableTopicResponse , FetchResponse . FetchableTopicResponse > (
120
+ response => response
121
+ . WithTopic ( topic . Topic )
122
+ . WithPartitionsCollection ( topic . PartitionsCollection . Select ( partition =>
123
+ new Func < FetchResponse . FetchableTopicResponse . PartitionData ,
124
+ FetchResponse . FetchableTopicResponse . PartitionData > ( data =>
125
+ {
126
+ var recordBatch = new NullableRecordBatch
127
+ {
128
+ LastOffsetDelta = ( int ) partition . FetchOffset ,
129
+ Magic = 2 ,
130
+ Records = records . TryGetValue ( partition . FetchOffset . Value ,
131
+ out var record )
132
+ ? new NullableArray < Record > ( record )
133
+ : NullableArray < Record > . Default
134
+ } ;
135
+ returnsData = recordBatch . Records != NullableArray < Record > . Default ;
136
+ return returnsData ? data . WithRecords ( recordBatch ) : data ;
137
+ } ) ) . ToArray ( )
138
+ ) ) ) . ToArray ( ) ) ;
139
+ if ( ! returnsData )
140
+ {
141
+ await Task . Delay ( request . MaxWaitMs , cancellationToken )
142
+ . ConfigureAwait ( false ) ;
143
+ }
144
+
145
+ return response ;
146
+ } ) ;
147
+
148
+ _testServer . On < OffsetCommitRequest , OffsetCommitResponse > ( request => request . Respond ( )
149
+ . WithTopicsCollection ( request . TopicsCollection . Select ( topic =>
150
+ new Func < OffsetCommitResponse . OffsetCommitResponseTopic ,
151
+ OffsetCommitResponse . OffsetCommitResponseTopic > ( responseTopic => responseTopic
152
+ . WithName ( topic . Name )
153
+ . WithPartitionsCollection ( topic . PartitionsCollection . Select ( partition =>
154
+ new Func < OffsetCommitResponse . OffsetCommitResponseTopic .
155
+ OffsetCommitResponsePartition ,
156
+ OffsetCommitResponse . OffsetCommitResponseTopic .
157
+ OffsetCommitResponsePartition > ( responsePartition => responsePartition
158
+ . WithPartitionIndex ( partition . PartitionIndex ) ) ) . ToArray ( ) )
159
+ ) ) . ToArray ( )
160
+ ) ) ;
161
+
162
+ _testServer . On < LeaveGroupRequest , LeaveGroupResponse > ( request => request . Respond ( ) ) ;
163
+ _testServer . On < HeartbeatRequest , HeartbeatResponse > ( request => request . Respond ( ) ) ;
164
+
165
+ return Task . CompletedTask ;
166
+ }
167
+
168
+ protected override async Task WhenAsync ( )
169
+ {
170
+ await using ( _testServer . Start ( )
171
+ . ConfigureAwait ( false ) )
172
+ {
173
+ ConsumeMessages ( "localhost" , _testServer . Port , _testServer . Stopping ) ;
174
+ }
175
+ }
176
+
177
+ [ Fact ]
178
+ public void It_should_have_read_the_messages_sent ( )
179
+ {
180
+ _result . Should ( ) . HaveCount ( NumberOfMessage ) ;
181
+ for ( var i = 0 ; i < NumberOfMessage ; i ++ )
182
+ {
183
+ _result . Should ( ) . Contain ( $ "data{ i } fetched from broker") ;
184
+ }
185
+ }
186
+
187
+ private void ConsumeMessages ( string host ,
188
+ int port , CancellationToken testServerStopping )
189
+ {
190
+ var consumerConfig = new ConsumerConfig ( new Dictionary < string , string >
191
+ {
192
+ { "log_level" , "7" }
193
+ } )
194
+ {
195
+ BootstrapServers = $ "{ host } :{ port } ",
196
+ ApiVersionRequestTimeoutMs = 30000 ,
197
+ Debug = "all" ,
198
+ GroupId = "group1" ,
199
+ } ;
200
+
201
+ using var consumer = new ConsumerBuilder < Ignore , string > ( consumerConfig )
202
+ . SetLogHandler ( this . Log )
203
+ . Build ( ) ;
204
+
205
+ consumer . Subscribe ( "topic1" ) ;
206
+ var cancellationToken = CancellationTokenSource
207
+ . CreateLinkedTokenSource ( testServerStopping , TimeoutCancellationToken ) . Token ;
208
+ try
209
+ {
210
+ for ( var i = 0 ; i < NumberOfMessage ; i ++ )
211
+ {
212
+ _result . Add ( consumer . Consume ( cancellationToken ) . Message . Value ) ;
213
+ }
214
+ }
215
+ finally
216
+ {
217
+ consumer . Close ( ) ;
218
+ }
219
+ }
220
+ }
221
+ }
222
+ }
0 commit comments