Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add: Support for ESM v2 partial batch failure handling (Kinesis & DynamoDB) #9

Open
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

lizard-boy
Copy link

Motivation

This PR adds support for FunctionResponseTypes to ESM v2, allowing for partial batch failures to be reported and handled. Failed items of a batch can now be retried in accordance with a MaximumRetryAttempts policy.

Changes

Event Source Mapping: SQS

  • Fixed failing test at tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_sqs.py::test_report_batch_item_failures_invalid_result_json_batch_fails by propagating error and payload information to the raised PartialBatchFailure.

Event Source Mapping: Kinesis & DynamoDB

  • ESM v2 supports setting FunctionResponseTypes to [ReportBatchItemFailures] and handling partial batch failures when batchItemFailures is correctly returned by Lambda invocation via Kinesis or DynamoDB ESM.

Testing

Partial Failure with ReportBatchItemFailures

Both tests simulates a partial batch failure, capturing failure information in an OnFailure destination config using a DLQ:

  • DynamoDB: Added TestDynamoDBEventSourceMapping::test_dynamodb_report_batch_item_failures
  • Kinesis: Added TestKinesisSource::test_kinesis_report_batch_item_failures which simulates a batch failure and captures failure information in a DLQ

Success and failure conditions

All success and failure cases outlined in the ESM docs have test coverage (where conditions are identical for all Kinesis, DynamoDB, and SQS). These are covered in the following parametrized tests:

Total Batch Successes:

  • test_kinesis_report_batch_item_success_scenarios
  • test_dynamodb_report_batch_item_success_scenarios

Total Batch Failures:

  • test_kinesis_report_batch_item_failure_scenarios
  • test_dynamodb_report_batch_item_failure_scenarios

Copy link

@greptile-apps greptile-apps bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR Summary

This PR adds support for partial batch failure handling in Event Source Mapping (ESM) v2 for Kinesis, DynamoDB, and SQS in LocalStack's Lambda service.

  • Implemented FunctionResponseTypes support in EsmConfigFactory for SQS, Kinesis, and DynamoDB Streams
  • Enhanced StreamPoller to handle PartialBatchFailureError, including retry logic for failed items
  • Added report_batch_item_failures flag in LambdaSender to enable partial success/failure processing
  • Introduced new test cases in test_lambda_integration_dynamodbstreams.py and test_lambda_integration_kinesis.py for partial batch failure scenarios
  • Updated utils.py with new Lambda function templates for DynamoDB and Kinesis batch item failure handling

16 file(s) reviewed, 9 comment(s)
Edit PR Review Bot Settings

)

# TODO What happens when FunctionResponseTypes value or target service is invalid?
if service in ["sqs", "kinesis", "dynamodbstreams"]:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Use a constant or enum for the list of services that support FunctionResponseTypes

Comment on lines +347 to +354
def bisect_events(
self, sequence_number: str, events: list[dict]
) -> tuple[list[dict], list[dict]]:
for i, event in enumerate(events):
if self.get_sequence_number(event) == sequence_number:
return events[:i], events[i:]

return events, []
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Consider optimizing bisect_events for large event lists, potentially using binary search

@@ -19,9 +19,20 @@ class LambdaSender(Sender):
# Flag to enable the payload dict using the "Records" key used for Lambda event source mapping
payload_dict: bool

def __init__(self, target_arn, target_parameters=None, target_client=None, payload_dict=False):
# Flag to enable partial successes/failures when processing batched events through a Lambda event source mapping
report_batch_item_failures: bool
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Consider adding type hints for boolean flags

Comment on lines +728 to +729
expected_successes = 5
expected_failures = 1
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Consider using a constant or configuration value for these numbers

@@ -578,7 +563,7 @@ def test_report_batch_item_failures(
)
snapshot.match("first_invocation", first_invocation)

# check that the DQL is empty
# check that the DLQ is empty
dlq_messages = aws_client.sqs.receive_message(QueueUrl=event_dlq_url)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

syntax: DQL typo

@@ -578,7 +563,7 @@ def test_report_batch_item_failures(
)
snapshot.match("first_invocation", first_invocation)

# check that the DQL is empty
# check that the DLQ is empty
dlq_messages = aws_client.sqs.receive_message(QueueUrl=event_dlq_url)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

syntax: DLQ misspelled as DQL

Suggested change
dlq_messages = aws_client.sqs.receive_message(QueueUrl=event_dlq_url)
# check that the DLQ is empty

Comment on lines +18 to +19
if new_image.get("should_fail", {}).get("BOOL", False):
batch_item_failures.append({"itemIdentifier": record["dynamodb"]["SequenceNumber"]})
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Consider using .get('BOOL') instead of .get('BOOL', False) for consistency with the Kinesis handler

if payload.get("should_fail", False):
batch_item_failures.append({"itemIdentifier": record["kinesis"]["sequenceNumber"]})

return {"batchItemFailures" : batch_item_failures}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

syntax: Extra space before the colon in 'batchItemFailures :'

Comment on lines +51 to +53
def create_lambda_with_response(response: str) -> str:
"""Creates a lambda with pre-defined response"""
return _LAMBDA_WITH_RESPONSE.format(response=response)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: Add type hinting for the 'response' parameter

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants