Skip to content

Commit 042efc6

Browse files
committed
feat(kinesis): add add_failure and set_failures helpers to KinesisEventResponse
1 parent 38e236d commit 042efc6

File tree

1 file changed

+92
-0
lines changed
  • lambda-events/src/event/streams

1 file changed

+92
-0
lines changed

lambda-events/src/event/streams/mod.rs

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,48 @@ pub struct KinesisEventResponse {
1717
pub other: serde_json::Map<String, Value>,
1818
}
1919

20+
impl KinesisEventResponse {
21+
/// Add a failed item identifier to the batch response.
22+
///
23+
/// When processing Kinesis records in batches, you can use this helper method to
24+
/// register individual record failures. Lambda will automatically retry failed
25+
/// records while successfully processed records will be checkpointed.
26+
///
27+
/// **Important**: This feature requires `FunctionResponseTypes: ReportBatchItemFailures`
28+
/// to be enabled in your Lambda function's Kinesis event source mapping configuration.
29+
/// Without this setting, Lambda will retry the entire batch on any failure.
30+
pub fn add_failure(&mut self, item_identifier: impl Into<String>) {
31+
self.batch_item_failures.push(KinesisBatchItemFailure {
32+
item_identifier: Some(item_identifier.into()),
33+
#[cfg(feature = "catch-all-fields")]
34+
other: serde_json::Map::new(),
35+
});
36+
}
37+
38+
/// Set multiple failed item identifiers at once.
39+
///
40+
/// This is a convenience method for setting all batch item failures in one call.
41+
/// It replaces any previously registered failures.
42+
///
43+
/// **Important**: This feature requires `FunctionResponseTypes: ReportBatchItemFailures`
44+
/// to be enabled in your Lambda function's Kinesis event source mapping configuration.
45+
/// Without this setting, Lambda will retry the entire batch on any failure.
46+
pub fn set_failures<I, S>(&mut self, item_identifiers: I)
47+
where
48+
I: IntoIterator<Item = S>,
49+
S: Into<String>,
50+
{
51+
self.batch_item_failures = item_identifiers
52+
.into_iter()
53+
.map(|id| KinesisBatchItemFailure {
54+
item_identifier: Some(id.into()),
55+
#[cfg(feature = "catch-all-fields")]
56+
other: serde_json::Map::new(),
57+
})
58+
.collect();
59+
}
60+
}
61+
2062
/// `KinesisBatchItemFailure` is the individual record which failed processing.
2163
#[non_exhaustive]
2264
#[derive(Debug, Default, Clone, Eq, PartialEq, Deserialize, Serialize)]
@@ -94,3 +136,53 @@ pub struct SqsBatchItemFailure {
94136
#[serde(flatten)]
95137
pub other: serde_json::Map<String, Value>,
96138
}
139+
140+
#[cfg(test)]
141+
mod test {
142+
use super::*;
143+
144+
#[test]
145+
fn kinesis_event_response_add_failure() {
146+
let mut response = KinesisEventResponse::default();
147+
response.add_failure("seq-1");
148+
response.add_failure("seq-2".to_string());
149+
150+
assert_eq!(response.batch_item_failures.len(), 2);
151+
assert_eq!(
152+
response.batch_item_failures[0].item_identifier,
153+
Some("seq-1".to_string())
154+
);
155+
assert_eq!(
156+
response.batch_item_failures[1].item_identifier,
157+
Some("seq-2".to_string())
158+
);
159+
}
160+
161+
#[test]
162+
fn kinesis_event_response_set_failures() {
163+
let mut response = KinesisEventResponse::default();
164+
response.set_failures(vec!["seq-1", "seq-2", "seq-3"]);
165+
166+
assert_eq!(response.batch_item_failures.len(), 3);
167+
assert_eq!(
168+
response.batch_item_failures[0].item_identifier,
169+
Some("seq-1".to_string())
170+
);
171+
assert_eq!(
172+
response.batch_item_failures[1].item_identifier,
173+
Some("seq-2".to_string())
174+
);
175+
assert_eq!(
176+
response.batch_item_failures[2].item_identifier,
177+
Some("seq-3".to_string())
178+
);
179+
180+
// Test that set_failures replaces existing failures
181+
response.set_failures(vec!["seq-4".to_string()]);
182+
assert_eq!(response.batch_item_failures.len(), 1);
183+
assert_eq!(
184+
response.batch_item_failures[0].item_identifier,
185+
Some("seq-4".to_string())
186+
);
187+
}
188+
}

0 commit comments

Comments
 (0)