Skip to content

Commit

Permalink
Improve Kinesis event recort type (#886)
Browse files Browse the repository at this point in the history
Make the encryption type an enum.
Make sequence number and partition key non-optional.

Signed-off-by: David Calavera <[email protected]>
  • Loading branch information
calavera authored Jun 2, 2024
1 parent 31250f3 commit 0fcba16
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 3 deletions.
29 changes: 26 additions & 3 deletions lambda-events/src/event/kinesis/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,24 @@ pub struct KinesisEventRecord {
pub struct KinesisRecord {
pub approximate_arrival_timestamp: SecondTimestamp,
pub data: Base64Data,
pub encryption_type: Option<String>,
#[serde(default)]
pub partition_key: Option<String>,
pub encryption_type: KinesisEncryptionType,
#[serde(default)]
pub sequence_number: Option<String>,
pub partition_key: String,
#[serde(default)]
pub sequence_number: String,
#[serde(default)]
pub kinesis_schema_version: Option<String>,
}

#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum KinesisEncryptionType {
#[default]
None,
Kms,
}

#[cfg(test)]
mod test {
use super::*;
Expand All @@ -80,6 +89,20 @@ mod test {
fn example_kinesis_event() {
let data = include_bytes!("../../fixtures/example-kinesis-event.json");
let parsed: KinesisEvent = serde_json::from_slice(data).unwrap();
assert_eq!(KinesisEncryptionType::None, parsed.records[0].kinesis.encryption_type);

let output: String = serde_json::to_string(&parsed).unwrap();
let reparsed: KinesisEvent = serde_json::from_slice(output.as_bytes()).unwrap();
assert_eq!(parsed, reparsed);
}

#[test]
#[cfg(feature = "kinesis")]
fn example_kinesis_event_encrypted() {
let data = include_bytes!("../../fixtures/example-kinesis-event-encrypted.json");
let parsed: KinesisEvent = serde_json::from_slice(data).unwrap();
assert_eq!(KinesisEncryptionType::Kms, parsed.records[0].kinesis.encryption_type);

let output: String = serde_json::to_string(&parsed).unwrap();
let reparsed: KinesisEvent = serde_json::from_slice(output.as_bytes()).unwrap();
assert_eq!(parsed, reparsed);
Expand Down
37 changes: 37 additions & 0 deletions lambda-events/src/fixtures/example-kinesis-event-encrypted.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
{
"Records": [
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "s1",
"sequenceNumber": "49568167373333333333333333333333333333333333333333333333",
"data": "SGVsbG8gV29ybGQ=",
"approximateArrivalTimestamp": 1480641523.477,
"encryptionType": "KMS"
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000000:49568167373333333333333333333333333333333333333333333333",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::123456789012:role/LambdaRole",
"awsRegion": "us-east-1",
"eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/simple-stream"
},
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "s1",
"sequenceNumber": "49568167373333333334444444444444444444444444444444444444",
"data": "SGVsbG8gV29ybGQ=",
"approximateArrivalTimestamp": 1480841523.477
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000000:49568167373333333334444444444444444444444444444444444444",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::123456789012:role/LambdaRole",
"awsRegion": "us-east-1",
"eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/simple-stream"
}
]
}

0 comments on commit 0fcba16

Please sign in to comment.