KCL client consistently throws exception when in use with localstack #135

Open
kevin-woodward opened this issue May 18, 2021 · 1 comment

@kevin-woodward

Using LocalStack's latest image (0.12.11) and the unmodified sample code in /samples on the master branch, the client consistently fails about 30 seconds after starting the worker loop. I'm aware of the 30-second limit for process_records, but no records are being put into the stream at all. I've also tried disabling CBOR and confirmed that the region in my .properties file matches the region LocalStack is using.
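For anyone repeating the region check, here is a minimal sketch of how it could be scripted. It assumes the properties file uses plain `key = value` lines and that the region is stored under a `regionName` key; the helper names and the `us-east-1` value are hypothetical and only there for illustration.

```python
def parse_properties(text):
    """Parse simple 'key = value' lines, skipping blank lines and comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def region_matches(properties_text, expected_region, key="regionName"):
    """Return True if the configured region equals the expected one.

    The key name 'regionName' is an assumption about the properties file.
    """
    return parse_properties(properties_text).get(key) == expected_region


# Hypothetical excerpt of a .properties file; the region value is made up.
sample = """
# stream the daemon reads from
streamName = etl
regionName = us-east-1
"""

print(region_matches(sample, "us-east-1"))
```

In practice `sample` would be replaced with the contents of the real file, and the expected region with whatever region the LocalStack container is configured for.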

I'm at a loss for how to debug this. Can someone point me in the right direction? Here is the client's log output, including the stack trace for the exception:

2021-05-17 17:25:06,087 [main] INFO  s.a.k.m.MultiLangDaemonConfig [NONE] - Using a cached thread pool. 
2021-05-17 17:25:06,185 [main] INFO  s.a.k.m.MultiLangDaemonConfig [NONE] - Running PythonTestConsumer to process stream etl with executable python3 ./kinesis_consumer_base.py 
2021-05-17 17:25:06,189 [main] INFO  s.a.k.m.MultiLangDaemonConfig [NONE] - Using workerId: d69a95d7-fa03-430b-8395-7eceb1d20f4a 
2021-05-17 17:25:06,189 [main] INFO  s.a.k.m.MultiLangDaemonConfig [NONE] - MultiLangDaemon is adding the following fields to the User Agent: amazon-kinesis-client-library-java amazon-kinesis-multi-lang-daemon/1.0.1 python/3.8.2 python3 
2021-05-17 17:25:06,440 [main] INFO  s.a.k.l.d.DynamoDBLeaseCoordinator [NONE] - With failover time 10000 ms and epsilon 25 ms, LeaseCoordinator will renew leases every 3308 ms, takeleases every 20050 ms, process maximum of 2147483647 leases and steal 1 lease(s) at a time. 
2021-05-17 17:25:06,445 [multi-lang-daemon-0000] INFO  s.a.kinesis.coordinator.Scheduler [NONE] - Initialization attempt 1 
2021-05-17 17:25:06,445 [multi-lang-daemon-0000] INFO  s.a.kinesis.coordinator.Scheduler [NONE] - Initializing LeaseCoordinator 
2021-05-17 17:25:12,911 [multi-lang-daemon-0000] INFO  s.a.k.l.d.DynamoDBLeaseCoordinator [NONE] - Created new lease table for coordinator with initial read capacity of 10 and write capacity of 10. 
2021-05-17 17:25:12,984 [multi-lang-daemon-0000] INFO  s.a.kinesis.coordinator.Scheduler [NONE] - Syncing Kinesis shard info 
2021-05-17 17:25:13,350 [multi-lang-daemon-0000] INFO  s.a.kinesis.coordinator.Scheduler [NONE] - Starting LeaseCoordinator 
2021-05-17 17:25:13,433 [LeaseCoordinator-0000] INFO  s.a.k.l.dynamodb.DynamoDBLeaseTaker [NONE] - Worker d69a95d7-fa03-430b-8395-7eceb1d20f4a saw 1 total leases, 1 available leases, 1 workers. Target is 1 leases, I have 0 leases, I will take 1 leases 
2021-05-17 17:25:13,503 [LeaseCoordinator-0000] INFO  s.a.k.l.dynamodb.DynamoDBLeaseTaker [NONE] - Worker d69a95d7-fa03-430b-8395-7eceb1d20f4a successfully took 1 leases: shardId-000000000000 
2021-05-17 17:25:23,399 [multi-lang-daemon-0000] INFO  s.a.kinesis.coordinator.Scheduler [NONE] - Initialization complete. Starting worker loop. 
2021-05-17 17:25:23,522 [multi-lang-daemon-0000] INFO  s.a.k.r.f.FanOutConsumerRegistration [NONE] - StreamConsumer not found, need to create it. 
2021-05-17 17:25:23,625 [multi-lang-daemon-0000] INFO  s.a.kinesis.coordinator.Scheduler [NONE] - Created new shardConsumer for : ShardInfo(shardId=shardId-000000000000, concurrencyToken=1e9387b7-f138-4960-aff6-f0f655be6701, parentShardIds=[], checkpoint={SequenceNumber: LATEST,SubsequenceNumber: 0}) 
2021-05-17 17:25:23,626 [ShardRecordProcessor-0000] INFO  s.a.k.l.BlockOnParentShardTask [NONE] - No need to block on parents [] of shard shardId-000000000000 
2021-05-17 17:25:24,682 [multi-lang-daemon-0001] INFO  s.a.kinesis.multilang.LineReaderTask [NONE] - Starting: Reading STDERR for shardId-000000000000 
2021-05-17 17:25:24,683 [ShardRecordProcessor-0000] INFO  s.a.kinesis.multilang.MessageWriter [NONE] - Writing InitializeMessage to child process for shard shardId-000000000000 
2021-05-17 17:25:24,717 [multi-lang-daemon-0002] INFO  s.a.kinesis.multilang.MessageWriter [NONE] - Message size == 104 bytes for shard shardId-000000000000 
2021-05-17 17:25:24,718 [multi-lang-daemon-0003] INFO  s.a.kinesis.multilang.LineReaderTask [NONE] - Starting: Reading next message from STDIN for shardId-000000000000 
2021-05-17 17:25:24,752 [ShardRecordProcessor-0000] INFO  s.a.k.multilang.MultiLangProtocol [NONE] - Received response {"action":"status","responseFor":"initialize"} from subprocess while waiting for initialize while processing shard shardId-000000000000 
2021-05-17 17:25:56,750 [ShardRecordProcessor-0001] WARN  s.a.k.l.ShardConsumerSubscriber [NONE] - shardId-000000000000: onError().  Cancelling subscription, and marking self as failed. 
software.amazon.kinesis.retrieval.RetryableRetrievalException: ReadTimeout
        at software.amazon.kinesis.retrieval.fanout.FanOutRecordsPublisher.errorOccurred(FanOutRecordsPublisher.java:163)
        at software.amazon.kinesis.retrieval.fanout.FanOutRecordsPublisher.access$700(FanOutRecordsPublisher.java:54)
        at software.amazon.kinesis.retrieval.fanout.FanOutRecordsPublisher$RecordFlow.exceptionOccurred(FanOutRecordsPublisher.java:553)
        at software.amazon.awssdk.services.kinesis.DefaultKinesisAsyncClient.lambda$subscribeToShard$1(DefaultKinesisAsyncClient.java:2238)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
        at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
        at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2158)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallTimeoutTrackingStage.lambda$execute$1(AsyncApiCallTimeoutTrackingStage.java:62)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
        at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
        at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2158)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.retryErrorIfNeeded(AsyncRetryableStage.java:167)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.retryIfNeeded(AsyncRetryableStage.java:119)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.lambda$execute$0(AsyncRetryableStage.java:104)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
        at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
        at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2158)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$ResponseHandler.onError(MakeAsyncHttpRequestStage.java:236)
        at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$PublisherAdapter$1.lambda$onError$2(ResponseHandler.java:258)
        at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.runAndLogError(ResponseHandler.java:164)
        at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.access$700(ResponseHandler.java:64)
        at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$PublisherAdapter$1.onError(ResponseHandler.java:257)
        at com.typesafe.netty.HandlerPublisher.exceptionCaught(HandlerPublisher.java:432)
        at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)
        at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264)
        at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:256)
        at io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
        at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)
        at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264)
        at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:256)
        at io.netty.channel.ChannelHandlerAdapter.exceptionCaught(ChannelHandlerAdapter.java:87)
        at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)
        at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264)
        at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:256)
        at io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
        at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)
        at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264)
        at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:256)
        at io.netty.handler.timeout.ReadTimeoutHandler.readTimedOut(ReadTimeoutHandler.java:98)
        at io.netty.handler.timeout.ReadTimeoutHandler.channelIdle(ReadTimeoutHandler.java:90)
        at io.netty.handler.timeout.IdleStateHandler$ReaderIdleTimeoutTask.run(IdleStateHandler.java:494)
        at io.netty.handler.timeout.IdleStateHandler$AbstractIdleTask.run(IdleStateHandler.java:466)
        at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
        at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:127)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:474)
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
        at java.base/java.lang.Thread.run(Thread.java:831)
Caused by: software.amazon.awssdk.core.exception.SdkClientException: null
        at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:97)
        at software.amazon.awssdk.core.internal.util.ThrowableUtils.asSdkException(ThrowableUtils.java:98)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.retryIfNeeded(AsyncRetryableStage.java:118)
        ... 37 common frames omitted
Caused by: io.netty.handler.timeout.ReadTimeoutException: null
2021-05-17 17:26:06,781 [multi-lang-daemon-0000] INFO  s.a.kinesis.coordinator.Scheduler [NONE] - Current stream shard assignments: shardId-000000000000 
2021-05-17 17:26:06,781 [multi-lang-daemon-0000] INFO  s.a.kinesis.coordinator.Scheduler [NONE] - Sleeping ... 

The same exception is rethrown indefinitely if the client is left running.
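To put a number on the delay, here is a small sketch that measures the gap between two lines of the log above. It assumes every log line starts with a `YYYY-MM-DD HH:MM:SS,mmm` timestamp, as in this output; the helper names are hypothetical, and the two strings are truncated copies of the "Received response" and "onError()" lines.

```python
from datetime import datetime

# Timestamp layout used at the start of each log line in this output.
TS_FORMAT = "%Y-%m-%d %H:%M:%S,%f"


def parse_ts(line):
    """Parse the leading 23-character timestamp of a log line."""
    return datetime.strptime(line[:23], TS_FORMAT)


def seconds_between(first_line, second_line):
    """Elapsed seconds from first_line to second_line."""
    return (parse_ts(second_line) - parse_ts(first_line)).total_seconds()


# Truncated copies of two lines from the log in this issue.
init_response = "2021-05-17 17:25:24,752 [ShardRecordProcessor-0000] INFO"
on_error = "2021-05-17 17:25:56,750 [ShardRecordProcessor-0001] WARN"

gap = seconds_between(init_response, on_error)
print(round(gap, 3))  # 31.998
```

The gap of roughly 32 seconds, together with the `ReadTimeoutHandler` frames at the bottom of the trace, suggests that the connection opened for the shard subscription received no data before a read timeout fired. The actual timeout value is not shown in the log, so that part is an inference.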

@kevin-woodward
Author

I believe this might be related to #97.
