Rust for JavaScript developers: SQS batch error handling with AWS Lambda
In this post, I will enhance the AWS SQS comparison using the partial batch response. The partial batch response was announced in November 2021, and it helps us developers handle partial failures.
As part of the mini-series Rust for JavaScript developers, you can check out the other parts:
Part 1: I showed a basic comparison for AWS SQS.
Part 2: I showed a basic comparison for AWS AppConfig.
Part 3: I showed a basic comparison for reading and processing a CSV file.
Why
In the last few years, I have switched languages multiple times between .NET, JavaScript and Rust, and the knowledge acquired with one language is transferable to a new one. Therefore, we need to mentally map the similarities to pick up the new language quickly.
Because I was doing this with a friend who was curious to see Serverless Rust in action, I wrote small posts about it.
The Basics
Rust comes with many built-in features, for example:
JS | Rust |
npm | cargo |
npm init | cargo init |
npm install | cargo build |
package.json | Cargo.toml |
package-lock.json | Cargo.lock |
webpack | cargo build |
lint | cargo clippy |
prettier | cargo fmt |
doc generation | cargo doc |
test library like jest | cargo test |
Generate a new SAM based Serverless App
sam init --location gh:aws-samples/cookiecutter-aws-sam-rust
Amazon SQS
Before the partial batch response was announced, the challenge was dealing with errors. If one of the SQS records failed during processing, the entire batch of messages was considered failed. Many of us kept track of successful and failed messages and deleted the processed ones programmatically to work around this limitation.
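For reference, the partial batch response is a small JSON document that lists the IDs of the failed messages under batchItemFailures, each one as an itemIdentifier. The sketch below builds that shape by hand using only the standard library. The batch_response_json helper is hypothetical and not part of the original code, and it assumes the IDs need no JSON escaping.

```rust
/// Hypothetical helper (not from the original post): renders the partial
/// batch response returned when ReportBatchItemFailures is enabled.
/// Assumption: the IDs contain no characters that need JSON escaping.
fn batch_response_json(failed_ids: &[&str]) -> String {
    let items: Vec<String> = failed_ids
        .iter()
        .map(|id| format!(r#"{{"itemIdentifier":"{}"}}"#, id))
        .collect();
    format!(r#"{{"batchItemFailures":[{}]}}"#, items.join(","))
}

fn main() {
    // Only "msg-2" failed, so only that message goes back to the queue.
    println!("{}", batch_response_json(&["msg-2"]));
    // An empty list reports the whole batch as successful.
    println!("{}", batch_response_json(&[]));
}
```

In a real function a serializer such as serde_json would normally produce this document; the manual formatting here only keeps the example dependency-free.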
Because the code is very similar to the previous SQS comparison, I will report just the changes needed, starting with the AWS SAM template:
AWSTemplateFormatVersion: 2010-09-09
Transform: 'AWS::Serverless-2016-10-31'
Description: Writer Lambda.
##########################################################################
# Global values that are applied to all resources #
##########################################################################
Globals:
  Function:
    MemorySize: 1024
    Architectures: ["arm64"]
    Handler: bootstrap
    Runtime: provided.al2
    Timeout: 29
    Layers:
      - !Sub arn:aws:lambda:${AWS::Region}:580247275435:layer:LambdaInsightsExtension-Arm64:1
    Environment:
      Variables:
        RUST_BACKTRACE: 1
        RUST_LOG: info
Resources:
##########################################################################
# Lambda Function #
##########################################################################
  LambdaFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: ../build/handler
      Policies:
        - AWSLambdaBasicExecutionRole
      Events:
        MySQSEvent:
          Type: SQS
          Properties:
            Queue: <ARN of the SQS queue>
            BatchSize: 10
            FunctionResponseTypes:
              - ReportBatchItemFailures
The code will be:
use std::sync::{Arc, Mutex};

use aws_lambda_events::event::sqs::SqsEvent;
use futures::future::join_all;
use lambda_runtime::{Error, LambdaEvent};
use serde_json::Value;

pub async fn execute(
    client: &aws_sdk_dynamodb::Client,
    event: LambdaEvent<SqsEvent>,
) -> Result<Value, Error> {
    // IDs of the failed messages, shared across all spawned tasks.
    let failed_message: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
    let mut tasks = Vec::with_capacity(event.payload.records.len());
    let shared_client = Arc::new(client.clone());

    for record in event.payload.records.into_iter() {
        let shared_client = shared_client.clone();
        let message_id = record.message_id.unwrap();
        let failed_message = failed_message.clone();

        tasks.push(tokio::spawn(async move {
            // Records with a missing or unparsable body are skipped, not reported as failures.
            if let Some(body) = &record.body {
                let request = serde_json::from_str::<MyStruct>(body);
                if let Ok(request) = request {
                    // Assumes DoSomething::execute is async, like the AWS SDK calls it wraps.
                    let result = DoSomething::new().execute(&shared_client, &request).await;
                    if result.is_err() {
                        failed_message.lock().unwrap().push(message_id);
                    }
                }
            }
        }));
    }
    // Wait for every record to be processed before building the response.
    join_all(tasks).await;

    let response = BatchItemFailures {
        batch_item_failures: failed_message
            .lock()
            .unwrap()
            .clone()
            .into_iter()
            .map(|item_identifier| ItemIdentifier { item_identifier })
            .collect(),
    };
    Ok(serde_json::to_value(response)?)
}
It is not much different from the TypeScript version:
export const handler = async (event: SQSEvent): Promise<BatchItemFailures> => {
  const failedMessageIds: string[] = [];
  await Promise.all(
    event.Records.map(async (record: SQSRecord) => {
      try {
        // DoSomething(record)
      } catch (error) {
        failedMessageIds.push(record.messageId);
      }
    }),
  );
  const batchItemFailures: BatchItemFailures = {
    batchItemFailures: failedMessageIds.map(id => {
      return {
        itemIdentifier: id
      }
    })
  };
  return batchItemFailures;
};
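Rust is stricter. Because I use tokio::spawn inside a loop, each spawned task must own everything it uses, so a plain failed_message vector would be moved into the first task and would no longer be available to the following iterations. Every task needs its own handle to shared, synchronized storage, so I need to use: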
Arc<Mutex<Vec<String>>>
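The same ownership pattern can be sketched without any external crate. The example below uses std::thread::spawn in place of tokio::spawn, only to stay self-contained; process and collect_failures are hypothetical names, and process simply fails on an empty body.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for the real work: fails on an empty body.
fn process(body: &str) -> Result<(), String> {
    if body.is_empty() {
        Err("empty body".to_string())
    } else {
        Ok(())
    }
}

/// Runs one thread per (message_id, body) pair and collects the failed IDs.
fn collect_failures(records: Vec<(String, String)>) -> Vec<String> {
    let failed: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
    let mut handles = Vec::with_capacity(records.len());
    for (message_id, body) in records {
        // Each thread owns its own Arc handle; the Vec behind it is shared.
        let failed = Arc::clone(&failed);
        handles.push(thread::spawn(move || {
            if process(&body).is_err() {
                failed.lock().unwrap().push(message_id);
            }
        }));
    }
    for handle in handles {
        handle.join().unwrap();
    }
    let mut ids = failed.lock().unwrap().clone();
    ids.sort(); // threads finish in any order, so sort for a stable result
    ids
}

fn main() {
    let records = vec![
        ("a".to_string(), "ok".to_string()),
        ("b".to_string(), String::new()),
    ];
    println!("{:?}", collect_failures(records));
}
```

Arc gives every task an owned handle to the same allocation, and Mutex makes the concurrent push calls safe. Without Arc, the first move closure would take the vector away from the rest of the loop.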
I could not find a way to return the SQS message identifier from each task so that I could filter the join_all results down to the failures.
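One possible direction, offered only as a sketch under assumptions, is to let each task return its own message ID when it fails and nothing when it succeeds, and then filter the joined results. The example uses std::thread to stay dependency-free; failed_ids and process are hypothetical names. With tokio, awaiting a JoinHandle<Option<String>> yields a Result<Option<String>, JoinError>, so the output of join_all could be filtered in a similar way.

```rust
use std::thread;

/// Hypothetical stand-in for the real work: fails on an empty body.
fn process(body: &str) -> Result<(), String> {
    if body.is_empty() {
        Err("empty body".to_string())
    } else {
        Ok(())
    }
}

/// Each task returns Some(message_id) on failure and None on success,
/// so no shared Mutex is needed to collect the failures.
fn failed_ids(records: Vec<(String, String)>) -> Vec<String> {
    let handles: Vec<_> = records
        .into_iter()
        .map(|(message_id, body)| {
            thread::spawn(move || process(&body).err().map(|_| message_id))
        })
        .collect();

    // Joining in spawn order keeps the IDs in the original batch order.
    handles
        .into_iter()
        .filter_map(|handle| handle.join().unwrap())
        .collect()
}

fn main() {
    let records = vec![
        ("a".to_string(), String::new()),
        ("b".to_string(), "ok".to_string()),
    ];
    println!("{:?}", failed_ids(records));
}
```

The trade-off is that a task that panics surfaces as a join error rather than as a failed message ID, so that case would need its own handling.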
Conclusion
Rust syntax in a serverless application is possibly more clumsy than in other languages. Still, I am not using Rust every day, and I no longer notice the extra main(), the unwrap() or the Mutex/Arc. It is just part of the language, and I have gotten used to it by now.
My goal is to demonstrate that you can use a different programming language, reuse your skill set and maybe achieve better results depending on the requirements (choose the best tool for the job). Having the same code written in other programming languages helps me move quickly to Rust, and I hope it will ease your migration to Rust too.