Skip to content

Commit c113391

Browse files
committedSep 24, 2024·
GH-245: Expose customizers for KCL configs
Fixes: #245 Issue link: #245 * Add something like `setLeaseManagementConfigCustomizer(Consumer<LeaseManagementConfig> leaseManagementConfigCustomizer)` to the `KclMessageDrivenChannelAdapter` and call them before creating a `Scheduler` * Also add a simple `emptyRecordList` property for the `ProcessorConfig`
1 parent 577c2e8 commit c113391

File tree

3 files changed

+128
-19
lines changed

3 files changed

+128
-19
lines changed
 

‎README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -435,7 +435,7 @@ For example, users may want to fully read any parent shards before starting to r
435435
return openShards.stream()
436436
.filter(shard -> !openShardIds.contains(shard.getParentShardId())
437437
&& !openShardIds.contains(shard.getAdjacentParentShardId()))
438-
.collect(Collectors.toList());
438+
.toList();
439439
}
440440
```
441441

‎src/main/java/org/springframework/integration/aws/inbound/kinesis/KclMessageDrivenChannelAdapter.java

+96-7
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import java.util.Arrays;
2222
import java.util.List;
2323
import java.util.UUID;
24-
import java.util.stream.Collectors;
24+
import java.util.function.Consumer;
2525

2626
import javax.annotation.Nullable;
2727

@@ -40,10 +40,12 @@
4040
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
4141
import software.amazon.kinesis.common.StreamConfig;
4242
import software.amazon.kinesis.common.StreamIdentifier;
43+
import software.amazon.kinesis.coordinator.CoordinatorConfig;
4344
import software.amazon.kinesis.coordinator.Scheduler;
4445
import software.amazon.kinesis.exceptions.InvalidStateException;
4546
import software.amazon.kinesis.exceptions.ShutdownException;
4647
import software.amazon.kinesis.exceptions.ThrottlingException;
48+
import software.amazon.kinesis.leases.LeaseManagementConfig;
4749
import software.amazon.kinesis.lifecycle.LifecycleConfig;
4850
import software.amazon.kinesis.lifecycle.events.InitializationInput;
4951
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
@@ -55,6 +57,7 @@
5557
import software.amazon.kinesis.metrics.NullMetricsFactory;
5658
import software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy;
5759
import software.amazon.kinesis.processor.MultiStreamTracker;
60+
import software.amazon.kinesis.processor.ProcessorConfig;
5861
import software.amazon.kinesis.processor.RecordProcessorCheckpointer;
5962
import software.amazon.kinesis.processor.ShardRecordProcessor;
6063
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
@@ -151,6 +154,20 @@ public class KclMessageDrivenChannelAdapter extends MessageProducerSupport
151154

152155
private MetricsLevel metricsLevel = MetricsLevel.DETAILED;
153156

157+
private Consumer<CoordinatorConfig> coordinatorConfigCustomizer = (config) -> {
158+
};
159+
160+
private Consumer<LifecycleConfig> lifecycleConfigCustomizer = (config) -> {
161+
};
162+
163+
private Consumer<MetricsConfig> metricsConfigCustomizer = (config) -> {
164+
};
165+
166+
private Consumer<LeaseManagementConfig> leaseManagementConfigCustomizer = (config) -> {
167+
};
168+
169+
private boolean emptyRecordList;
170+
154171
public KclMessageDrivenChannelAdapter(String... streams) {
155172
this(KinesisAsyncClient.create(), CloudWatchAsyncClient.create(), DynamoDbAsyncClient.create(), streams);
156173
}
@@ -283,9 +300,70 @@ public void setMetricsLevel(MetricsLevel metricsLevel) {
283300
this.metricsLevel = metricsLevel;
284301
}
285302

303+
/**
304+
* Set a {@link Consumer} to configure a {@link CoordinatorConfig}.
305+
* @param coordinatorConfigCustomizer the {@link Consumer} to configure a {@link CoordinatorConfig}.
306+
* @since 3.0.8
307+
* @see CoordinatorConfig
308+
*/
309+
public void setCoordinatorConfigCustomizer(Consumer<CoordinatorConfig> coordinatorConfigCustomizer) {
310+
Assert.notNull(coordinatorConfigCustomizer, "'coordinatorConfigCustomizer' must not be null");
311+
this.coordinatorConfigCustomizer = coordinatorConfigCustomizer;
312+
}
313+
314+
/**
315+
* Set a {@link Consumer} to configure a {@link LifecycleConfig}.
316+
* @param lifecycleConfigCustomizer the {@link Consumer} to configure a {@link LifecycleConfig}.
317+
* @since 3.0.8
318+
* @see LifecycleConfig
319+
*/
320+
public void setLifecycleConfigCustomizer(Consumer<LifecycleConfig> lifecycleConfigCustomizer) {
321+
Assert.notNull(lifecycleConfigCustomizer, "'lifecycleConfigCustomizer' must not be null");
322+
this.lifecycleConfigCustomizer = lifecycleConfigCustomizer;
323+
}
324+
325+
/**
326+
* Set a {@link Consumer} to configure a {@link MetricsConfig}.
327+
* May override whatever could be set individually, like {@link #setMetricsLevel(MetricsLevel)}.
328+
* @param metricsConfigCustomizer the {@link Consumer} to configure a {@link MetricsConfig}.
329+
* @since 3.0.8
330+
* @see MetricsConfig
331+
*/
332+
public void setMetricsConfigCustomizer(Consumer<MetricsConfig> metricsConfigCustomizer) {
333+
Assert.notNull(metricsConfigCustomizer, "'metricsConfigCustomizer' must not be null");
334+
this.metricsConfigCustomizer = metricsConfigCustomizer;
335+
}
336+
337+
/**
338+
* Set a {@link Consumer} to configure a {@link LeaseManagementConfig}.
339+
* @param leaseManagementConfigCustomizer the {@link Consumer} to configure a {@link LeaseManagementConfig}.
340+
* @since 3.0.8
341+
* @see LeaseManagementConfig
342+
*/
343+
public void setLeaseManagementConfigCustomizer(Consumer<LeaseManagementConfig> leaseManagementConfigCustomizer) {
344+
Assert.notNull(leaseManagementConfigCustomizer, "'leaseManagementConfigCustomizer' must not be null");
345+
this.leaseManagementConfigCustomizer = leaseManagementConfigCustomizer;
346+
}
347+
348+
/**
349+
* Whether to return an empty record list from consumer to the processor.
350+
* Works only in {@link ListenerMode#batch} mode.
351+
* The message will be sent into the output channel with an empty {@link List} as a payload.
352+
* @param emptyRecordList true to return an empty record list.
353+
* @since 3.0.8
354+
* @see ProcessorConfig#callProcessRecordsEvenForEmptyRecordList(boolean)
355+
*/
356+
public void setEmptyRecordList(boolean emptyRecordList) {
357+
this.emptyRecordList = emptyRecordList;
358+
}
359+
286360
@Override
287361
protected void onInit() {
288362
super.onInit();
363+
if (this.listenerMode.equals(ListenerMode.record) && this.emptyRecordList) {
364+
this.emptyRecordList = false;
365+
logger.warn("The 'emptyRecordList' is processed only in the [ListenerMode.batch].");
366+
}
289367
this.config =
290368
new ConfigsBuilder(buildStreamTracker(),
291369
this.consumerGroup,
@@ -316,8 +394,9 @@ protected void doStart() {
316394
+ "because it does not make sense in case of [ListenerMode.batch].");
317395
}
318396

319-
LifecycleConfig lifecycleConfig = this.config.lifecycleConfig();
397+
LifecycleConfig lifecycleConfig = this.config.lifecycleConfig();
320398
lifecycleConfig.taskBackoffTimeMillis(this.consumerBackoff);
399+
this.lifecycleConfigCustomizer.accept(lifecycleConfig);
321400

322401
RetrievalSpecificConfig retrievalSpecificConfig;
323402
String singleStreamName = this.streams.length == 1 ? this.streams[0] : null;
@@ -342,15 +421,25 @@ protected void doStart() {
342421
if (MetricsLevel.NONE.equals(this.metricsLevel)) {
343422
metricsConfig.metricsFactory(new NullMetricsFactory());
344423
}
424+
this.metricsConfigCustomizer.accept(metricsConfig);
425+
426+
CoordinatorConfig coordinatorConfig = this.config.coordinatorConfig();
427+
this.coordinatorConfigCustomizer.accept(coordinatorConfig);
428+
429+
LeaseManagementConfig leaseManagementConfig = this.config.leaseManagementConfig();
430+
this.leaseManagementConfigCustomizer.accept(leaseManagementConfig);
431+
432+
ProcessorConfig processorConfig = this.config.processorConfig()
433+
.callProcessRecordsEvenForEmptyRecordList(this.emptyRecordList);
345434

346435
this.scheduler =
347436
new Scheduler(
348437
this.config.checkpointConfig(),
349-
this.config.coordinatorConfig(),
350-
this.config.leaseManagementConfig(),
438+
coordinatorConfig,
439+
leaseManagementConfig,
351440
lifecycleConfig,
352441
metricsConfig,
353-
this.config.processorConfig(),
442+
processorConfig,
354443
retrievalConfig);
Has conversations. Original line has conversations.
355444

356445
this.executor.execute(this.scheduler);
@@ -542,7 +631,7 @@ private void processMultipleRecords(List<KinesisClientRecord> records,
542631
records.stream()
543632
.map(this::prepareMessageForRecord)
544633
.map(AbstractIntegrationMessageBuilder::build)
545-
.collect(Collectors.toList());
634+
.toList();
546635

547636
messageBuilder = getMessageBuilderFactory().withPayload(payload);
548637
}
@@ -557,7 +646,7 @@ else if (KclMessageDrivenChannelAdapter.this.converter != null) {
557646

558647
return KclMessageDrivenChannelAdapter.this.converter.convert(r.data().array());
559648
})
560-
.collect(Collectors.toList());
649+
.toList();
561650

562651
messageBuilder = getMessageBuilderFactory().withPayload(payload)
563652
.setHeader(AwsHeaders.RECEIVED_PARTITION_KEY, partitionKeys)

‎src/test/java/org/springframework/integration/aws/kinesis/KclMessageDrivenChannelAdapterTests.java

+31-11
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,6 @@ static void setup() {
8686
.join();
8787
}
8888

89-
9089
@AfterAll
9190
static void tearDown() {
9291
AMAZON_KINESIS
@@ -126,24 +125,40 @@ void kclChannelAdapterReceivesRecords() {
126125

127126
@Test
128127
public void metricsLevelOfMetricsConfigShouldBeSetToMetricsLevelOfAdapter() {
129-
MetricsLevel metricsLevel = TestUtils.getPropertyValue(
130-
this.kclMessageDrivenChannelAdapter,
131-
"scheduler.metricsConfig.metricsLevel",
132-
MetricsLevel.class
133-
);
128+
MetricsLevel metricsLevel =
129+
TestUtils.getPropertyValue(this.kclMessageDrivenChannelAdapter,
130+
"scheduler.metricsConfig.metricsLevel",
131+
MetricsLevel.class);
134132
assertThat(metricsLevel).isEqualTo(MetricsLevel.NONE);
135133
}
136134

137135
@Test
138136
public void metricsFactoryOfSchedulerShouldBeSetNullMetricsFactoryIfMetricsLevelIsNone() {
139-
MetricsFactory metricsFactory = TestUtils.getPropertyValue(
140-
this.kclMessageDrivenChannelAdapter,
141-
"scheduler.metricsFactory",
142-
MetricsFactory.class
143-
);
137+
MetricsFactory metricsFactory =
138+
TestUtils.getPropertyValue(this.kclMessageDrivenChannelAdapter,
139+
"scheduler.metricsFactory",
140+
MetricsFactory.class);
144141
assertThat(metricsFactory).isInstanceOf(NullMetricsFactory.class);
145142
}
146143

144+
@Test
145+
public void maxLeasesForWorkerOverriddenByCustomizer() {
146+
Integer maxLeasesForWorker =
147+
TestUtils.getPropertyValue(this.kclMessageDrivenChannelAdapter,
148+
"scheduler.leaseCoordinator.leaseTaker.maxLeasesForWorker",
149+
Integer.class);
150+
assertThat(maxLeasesForWorker).isEqualTo(10);
151+
}
152+
153+
@Test
154+
public void shardConsumerDispatchPollIntervalMillisOverriddenByCustomizer() {
155+
Long shardConsumerDispatchPollIntervalMillis =
156+
TestUtils.getPropertyValue(this.kclMessageDrivenChannelAdapter,
157+
"scheduler.shardConsumerDispatchPollIntervalMillis",
158+
Long.class);
159+
assertThat(shardConsumerDispatchPollIntervalMillis).isEqualTo(500L);
160+
}
161+
147162
@Configuration
148163
@EnableIntegration
149164
public static class TestConfiguration {
@@ -159,7 +174,12 @@ public KclMessageDrivenChannelAdapter kclMessageDrivenChannelAdapter() {
159174
adapter.setConsumerGroup("single_stream_group");
160175
adapter.setFanOut(false);
161176
adapter.setMetricsLevel(MetricsLevel.NONE);
177+
adapter.setLeaseManagementConfigCustomizer(leaseManagementConfig ->
178+
leaseManagementConfig.maxLeasesForWorker(10));
179+
adapter.setCoordinatorConfigCustomizer(coordinatorConfig ->
180+
coordinatorConfig.shardConsumerDispatchPollIntervalMillis(500L));
162181
adapter.setBindSourceRecord(true);
182+
adapter.setEmptyRecordList(true);
163183
return adapter;
164184
}
165185

0 commit comments

Comments
 (0)
Please sign in to comment.