```
2023-08-01 21:06:13,500 [LeaseCoordinator-0000] INFO s.a.k.l.d.DynamoDBLeaseRefresher [NONE] - Transferred lease shardId-000000000000 ownership from f4457d0c-af35-4e59-b036-dca933497848 to 2ee036cd-7d94-4a56-b141-816baf05cdda
2023-08-01 21:06:13,501 [LeaseCoordinator-0000] INFO s.a.k.l.dynamodb.DynamoDBLeaseTaker [NONE] - Worker 2ee036cd-7d94-4a56-b141-816baf05cdda successfully took 1 leases: shardId-000000000000
2023-08-01 21:06:16,533 [multi-lang-daemon-0000] INFO s.a.k.r.f.FanOutConsumerRegistration [NONE] - abdul-stream2 : Waiting for StreamConsumer PythonKCLSample to have ACTIVE status...
2023-08-01 21:06:17,552 [multi-lang-daemon-0000] INFO s.a.kinesis.coordinator.Scheduler [NONE] - Created new shardConsumer for : ShardInfo(streamIdentifierSerOpt=Optional.empty, shardId=shardId-000000000000, concurrencyToken=00c07f81-fe6d-4fd8-82da-1f6781bf3605, parentShardIds=[], checkpoint={SequenceNumber: TRIM_HORIZON,SubsequenceNumber: 0})
2023-08-01 21:06:17,554 [ShardRecordProcessor-0000] INFO s.a.k.l.BlockOnParentShardTask [NONE] - No need to block on parents [] of shard shardId-000000000000
2023-08-01 21:06:18,788 [ShardRecordProcessor-0000] ERROR s.a.k.m.MultiLangShardRecordProcessor [NONE] - Encountered an error while trying to initialize record processor
java.io.IOException: Failed to start client executable
	at software.amazon.kinesis.multilang.MultiLangShardRecordProcessor.initialize(MultiLangShardRecordProcessor.java:79)
	at software.amazon.kinesis.lifecycle.InitializeTask.call(InitializeTask.java:102)
	at software.amazon.kinesis.lifecycle.ShardConsumer.executeTask(ShardConsumer.java:336)
	at software.amazon.kinesis.lifecycle.ShardConsumer.lambda$initializeComplete$2(ShardConsumer.java:289)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1623)
Caused by: java.io.IOException: Cannot run program "sample_kclpy_app.py": CreateProcess error=193, %1 is not a valid Win32 application
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1140)
	at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1074)
	at software.amazon.kinesis.multilang.MultiLangShardRecordProcessor.startProcess(MultiLangShardRecordProcessor.java:321)
	at software.amazon.kinesis.multilang.MultiLangShardRecordProcessor.initialize(MultiLangShardRecordProcessor.java:73)
```
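Error 193 is `ERROR_BAD_EXE_FORMAT`. It means Windows' `CreateProcess` was handed a file that is not an executable image. `ProcessBuilder` starts the command directly, without a shell, so Windows never consults the `.py` file association. The sketch below shows the same failure mode. It is not the KCL's own code. It uses Python's `subprocess`, which also launches without a shell by default, to compare two ways of starting a script: running the script directly, and running the interpreter with the script as an argument. The script name `hypothetical_app.py` and the `try_launch`/`demo` helpers are illustrative only.

```python
import os
import stat
import subprocess
import sys
import tempfile


def try_launch(argv):
    """Run argv without a shell; return its stdout, or the OSError if the OS refuses to start it."""
    try:
        done = subprocess.run(argv, capture_output=True, text=True, check=True)
        return done.stdout.strip()
    except OSError as exc:
        # On Windows this is WinError 193 ("%1 is not a valid Win32 application");
        # on POSIX a script without a shebang typically fails with ENOEXEC (or EACCES).
        return exc


def demo():
    with tempfile.TemporaryDirectory() as tmp:
        script = os.path.join(tmp, "hypothetical_app.py")
        with open(script, "w") as f:
            # Deliberately no "#!" line: the OS has no way to run this file by itself.
            f.write('print("hello from the script")\n')
        os.chmod(script, os.stat(script).st_mode | stat.S_IXUSR)

        direct = try_launch([script])                      # like executableName = sample_kclpy_app.py
        via_python = try_launch([sys.executable, script])  # interpreter first, script as its argument
        return direct, via_python


if __name__ == "__main__":
    direct, via_python = demo()
    print("direct launch:", repr(direct))
    print("via interpreter:", via_python)
```

On Windows the direct launch should fail with WinError 193, which matches the log. On Linux or macOS, a script without a `#!` line typically fails with `ENOEXEC`, or with `EACCES` if the temp directory is mounted `noexec`. In both cases, putting the interpreter first gives the OS a real executable to start.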
Here is the sample.properties file.
```properties
# The script that abides by the multi-language protocol. This script will be executed by the MultiLangDaemon, which will communicate with this script over STDIN and STDOUT according to the multi-language protocol.
executableName = sample_kclpy_app.py
```

Here is the python sample_kclpy_app.py file.

```python
# Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# SPDX-License-Identifier: Apache-2.0
from __future__ import print_function

import sys
import time

from amazon_kclpy import kcl
from amazon_kclpy.v3 import processor


class RecordProcessor(processor.RecordProcessorBase):
    """
    A RecordProcessor processes data from a shard in a stream. Its methods will be called with this pattern:

    * initialize will be called once
    * process_records will be called zero or more times
    * shutdown will be called if this MultiLangDaemon instance loses the lease to this shard, or the shard ends due
      to a scaling change.
    """

    def __init__(self):
        self._SLEEP_SECONDS = 5
        self._CHECKPOINT_RETRIES = 5
        self._CHECKPOINT_FREQ_SECONDS = 60
        self._largest_seq = (None, None)
        self._largest_sub_seq = None
        self._last_checkpoint_time = None

    def log(self, message):
        # Diagnostics go to stderr; STDIN/STDOUT carry the multi-language protocol.
        sys.stderr.write(message)

    def initialize(self, initialize_input):
        """
        Called once by a KCLProcess before any calls to process_records

        :param amazon_kclpy.messages.InitializeInput initialize_input: Information about the lease that this record
            processor has been assigned.
        """
        self._largest_seq = (None, None)
        self._last_checkpoint_time = time.time()

    def checkpoint(self, checkpointer, sequence_number=None, sub_sequence_number=None):
        """
        Checkpoints with retries on retryable exceptions.

        :param amazon_kclpy.kcl.Checkpointer checkpointer: the checkpointer provided to either process_records
            or shutdown
        :param str or None sequence_number: the sequence number to checkpoint at.
        :param int or None sub_sequence_number: the sub sequence number to checkpoint at.
        """
        for n in range(0, self._CHECKPOINT_RETRIES):
            try:
                checkpointer.checkpoint(sequence_number, sub_sequence_number)
                return
            except kcl.CheckpointError as e:
                if 'ShutdownException' == e.value:
                    #
                    # A ShutdownException indicates that this record processor should be shut down. This is due to
                    # some failover event, e.g. another MultiLangDaemon has taken the lease for this shard.
                    #
                    print('Encountered shutdown exception, skipping checkpoint')
                    return
                elif 'ThrottlingException' == e.value:
                    #
                    # A ThrottlingException indicates that one of our dependencies is overburdened, e.g. too many
                    # DynamoDB writes. We will sleep temporarily to let it recover.
                    #
                    if self._CHECKPOINT_RETRIES - 1 == n:
                        sys.stderr.write('Failed to checkpoint after {n} attempts, giving up.\n'.format(n=n + 1))
                        return
                    else:
                        print('Was throttled while checkpointing, will attempt again in {s} seconds'
                              .format(s=self._SLEEP_SECONDS))
                elif 'InvalidStateException' == e.value:
                    sys.stderr.write('MultiLangDaemon reported an invalid state while checkpointing.\n')
                else:  # Some other error
                    sys.stderr.write('Encountered an error while checkpointing, error was {e}.\n'.format(e=e))
            time.sleep(self._SLEEP_SECONDS)

    def process_record(self, data, partition_key, sequence_number, sub_sequence_number):
        """
        Called for each record that is passed to process_records.

        :param str data: The blob of data that was contained in the record.
        :param str partition_key: The key associated with this record.
        :param int sequence_number: The sequence number associated with this record.
        :param int sub_sequence_number: the sub sequence number associated with this record.
        """
        ####################################
        # Insert your processing logic here
        ####################################
        self.log("Record (Partition Key: {pk}, Sequence Number: {seq}, Subsequence Number: {sseq}, Data Size: {ds})\n"
                 .format(pk=partition_key, seq=sequence_number, sseq=sub_sequence_number, ds=len(data)))

    def should_update_sequence(self, sequence_number, sub_sequence_number):
        """
        Determines whether a new larger sequence number is available

        :param int sequence_number: the sequence number from the current record
        :param int sub_sequence_number: the sub sequence number from the current record
        :return boolean: true if the largest sequence should be updated, false otherwise
        """
        return self._largest_seq == (None, None) or sequence_number > self._largest_seq[0] or \
            (sequence_number == self._largest_seq[0] and sub_sequence_number > self._largest_seq[1])

    def process_records(self, process_records_input):
        """
        Called by a KCLProcess with a list of records to be processed and a checkpointer which accepts sequence numbers
        from the records to indicate where in the stream to checkpoint.

        :param amazon_kclpy.messages.ProcessRecordsInput process_records_input: the records, and metadata about the
            records.
        """
        try:
            for record in process_records_input.records:
                data = record.binary_data
                seq = int(record.sequence_number)
                sub_seq = record.sub_sequence_number
                key = record.partition_key
                self.process_record(data, key, seq, sub_seq)
                if self.should_update_sequence(seq, sub_seq):
                    self._largest_seq = (seq, sub_seq)

            #
            # Checkpoints every self._CHECKPOINT_FREQ_SECONDS seconds
            #
            if time.time() - self._last_checkpoint_time > self._CHECKPOINT_FREQ_SECONDS:
                self.checkpoint(process_records_input.checkpointer, str(self._largest_seq[0]), self._largest_seq[1])
                self._last_checkpoint_time = time.time()

        except Exception as e:
            self.log("Encountered an exception while processing records. Exception was {e}\n".format(e=e))

    def lease_lost(self, lease_lost_input):
        self.log("Lease has been lost\n")

    def shard_ended(self, shard_ended_input):
        self.log("Shard has ended, checkpointing.\n")
        shard_ended_input.checkpointer.checkpoint()

    def shutdown_requested(self, shutdown_requested_input):
        self.log("Shutdown has been requested, checkpointing.\n")
        shutdown_requested_input.checkpointer.checkpoint()


if __name__ == "__main__":
    print("inside main")
    kcl_process = kcl.KCLProcess(RecordProcessor())
    print("KCL Process: ", kcl_process)
    kcl_process.run()
```
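The comment in sample.properties says the MultiLangDaemon talks to the script over STDIN and STDOUT. The toy loop below illustrates that idea with line-delimited JSON. Each request line gets exactly one reply line on stdout, and diagnostics go to stderr. It is not the amazon_kclpy implementation, and the field names `action` and `responseFor` are assumptions for illustration. This has a practical consequence. Extra stdout output, such as the `print("inside main")` debugging line in the script, shares the channel with protocol replies. That output may confuse the daemon, depending on how it treats unexpected lines. The script's `log` method already avoids this by writing to stderr.

```python
import io
import json
import sys


def serve(stdin, stdout, stderr):
    """Toy line-delimited JSON loop: one request line in, one status line out.

    The field names ("action", "responseFor") are illustrative assumptions,
    not a specification of the real multi-language protocol.
    """
    handled = []
    for line in stdin:
        line = line.strip()
        if not line:
            continue  # ignore blank lines
        message = json.loads(line)
        action = message.get("action")
        # Diagnostics go to stderr so they never mix with protocol replies on stdout.
        stderr.write("handling {0}\n".format(action))
        handled.append(action)
        stdout.write(json.dumps({"action": "status", "responseFor": action}) + "\n")
        stdout.flush()
    return handled


if __name__ == "__main__":
    # Feed one in-memory request instead of reading the real stdin.
    requests = io.StringIO('{"action": "initialize", "shardId": "shardId-000000000000"}\n')
    replies = io.StringIO()
    serve(requests, replies, sys.stderr)
    print(replies.getvalue(), end="")
```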
Here is the sample.properties file.

```properties
# The script that abides by the multi-language protocol. This script will be executed by the MultiLangDaemon, which will communicate with this script over STDIN and STDOUT according to the multi-language protocol.
executableName = sample_kclpy_app.py
streamArn = arn:aws:kinesis:us-east-1:249144947172:stream/abdul-stream2
streamName = abdul-stream2
applicationName = PythonKCLSample
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain
processingLanguage = python/3.11
initialPositionInStream = TRIM_HORIZON
regionName = us-east-1
```
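One way to sanity-check the setting is to parse `executableName` and look at what would actually be launched. This sketch rests on two assumptions. First, it assumes (not confirmed here) that the daemon splits `executableName` on runs of spaces into a program and its arguments. Second, it handles only simple `key = value` lines, not the full Java properties syntax. A first token ending in `.py` is exactly the situation behind error 193 above. Naming the interpreter first, as in `python sample_kclpy_app.py` or the full path to `python.exe`, would give `CreateProcess` a real executable to start. The helper names are hypothetical.

```python
import re
import shutil


def parse_properties(text):
    """Parse simple 'key = value' lines; '#' and '!' lines are comments (simplified Java properties)."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def launch_command(props):
    """Split executableName on runs of spaces (assumed daemon behaviour) into argv."""
    return re.split(r" +", props["executableName"].strip())


def looks_launchable_on_windows(argv):
    """Heuristic: CreateProcess needs a real executable, so a bare .py first token will not start."""
    return not argv[0].lower().endswith(".py")


SAMPLE = """\
# The script that abides by the multi-language protocol.
executableName = sample_kclpy_app.py
streamName = abdul-stream2
applicationName = PythonKCLSample
"""

if __name__ == "__main__":
    argv = launch_command(parse_properties(SAMPLE))
    print(argv, looks_launchable_on_windows(argv))
    fixed = launch_command({"executableName": "python sample_kclpy_app.py"})
    # shutil.which reports where (or whether) the interpreter name resolves on PATH.
    print(fixed, looks_launchable_on_windows(fixed), shutil.which(fixed[0]))
```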
Here is the top and bottom of the python sample_kclpy_app.py file, including its shebang line:

```python
#!C:/Users/rehmana16/AppData/Local/Programs/Python/Python311/python.exe
# Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# SPDX-License-Identifier: Apache-2.0
from __future__ import print_function

import sys
import time

from amazon_kclpy import kcl
from amazon_kclpy.v3 import processor


class RecordProcessor(processor.RecordProcessorBase):
    """
    A RecordProcessor processes data from a shard in a stream. Its methods will be called with this pattern:
    ...
    """
    ...


if __name__ == "__main__":
    print("inside main")
    kcl_process = kcl.KCLProcess(RecordProcessor())
    print("KCL Process: ", kcl_process)
    kcl_process.run()
```
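The `#!` line does not change how Windows starts the file. Shebangs are interpreted by POSIX kernels, and `CreateProcess` does not read them. To reuse the interpreter path the shebang names, one option is to read it from the script and put it in front of the script path. That yields a command that starts a real executable. `interpreter_command` is a hypothetical helper. It treats everything after `#!` as a single path, so Windows paths containing spaces stay intact. It does not handle shebang arguments such as `#!/usr/bin/env python`.

```python
import os
import tempfile


def interpreter_command(script_path):
    """Return [interpreter, script_path] from the script's '#!' line, or None if it has none."""
    with open(script_path, "r", encoding="utf-8") as f:
        first = f.readline().rstrip("\r\n")
    if not first.startswith("#!"):
        return None
    # Everything after '#!' is taken as the interpreter path; shebang arguments are not split out.
    interpreter = first[2:].strip()
    return [interpreter, script_path]


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "app.py")
        with open(path, "w", encoding="utf-8") as f:
            f.write("#!C:/Users/rehmana16/AppData/Local/Programs/Python/Python311/python.exe\n")
            f.write("print('hi')\n")
        print(interpreter_command(path))
```

Suppose the result is copied into `executableName` as "interpreter path, space, script path". If the daemon tokenizes that value on spaces, an interpreter path containing spaces could be split apart.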