Rust for JavaScript developers: SQS batch error handling with AWS Lambda

In this post, I will extend the AWS SQS comparison using the partial batch response. The partial batch response was announced back in November 2021, and it helps us developers handle partial failures.

This post is part of the mini-series Rust for JavaScript developers. You can check out the other parts:

  • In Part 1, I showed a basic comparison for AWS SQS.

  • In Part 2, I showed a basic comparison for AWS AppConfig.

  • In Part 3, I showed a basic comparison for reading and processing a CSV file.

Why

In the last few years, I have switched languages multiple times between .NET, JavaScript and Rust, and the knowledge acquired with one language is transferable to a new one. Therefore, we need to mentally map the similarities to pick up the new language quickly.

Because I was doing this with a friend who was curious to see Serverless Rust in action, I wrote small posts about it.

The Basics

Rust comes with many built-in features, for example:

| JS | Rust |
| --- | --- |
| npm | cargo |
| npm init | cargo init |
| npm install | cargo build |
| package.json | Cargo.toml |
| package-lock.json | Cargo.lock |
| webpack | cargo build |
| lint | cargo clippy |
| prettier | cargo fmt |
| doc generation | cargo doc |
| test library like jest | cargo test |

Generate a new SAM-based Serverless App

sam init --location gh:aws-samples/cookiecutter-aws-sam-rust

Amazon SQS

Before the partial batch response was announced, the challenge was dealing with errors. If one of the SQS records failed during processing, the entire batch of messages was considered failed. Many of us kept track of successful and failed messages and deleted the processed ones programmatically to work around this limitation.

Because the code is very similar to this one, I will report just the changes needed, starting with the AWS SAM template:

AWSTemplateFormatVersion: 2010-09-09
Transform: 'AWS::Serverless-2016-10-31'
Description: Writer Lambda.

##########################################################################
#  Global values that are applied to all resources                       #
##########################################################################
Globals:
  Function:
    MemorySize: 1024
    Architectures: ["arm64"]
    Handler: bootstrap
    Runtime: provided.al2
    Timeout: 29
    Layers:
      - !Sub arn:aws:lambda:${AWS::Region}:580247275435:layer:LambdaInsightsExtension-Arm64:1
    Environment:
      Variables:
        RUST_BACKTRACE: 1
        RUST_LOG: info

Resources:
##########################################################################
#   Lambda Function                                                      #
##########################################################################
  LambdaFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: ../build/handler
      Policies:
        - AWSLambdaBasicExecutionRole
      Events:
        MySQSEvent:
          Type: SQS
          Properties:
            Queue: <ARN of the SQS queue>
            BatchSize: 10
            FunctionResponseTypes:
              - ReportBatchItemFailures
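
With ReportBatchItemFailures enabled, Lambda expects the function to return a JSON object with a batchItemFailures list, where each entry carries the itemIdentifier of a failed message. An empty list means the whole batch succeeded. As a minimal sketch of that shape, the hypothetical helper below (the name partial_batch_response is mine, not part of any library) builds the body by hand with the standard library only. It assumes the ids need no JSON escaping, which holds for SQS message ids, and a real handler would more likely derive the JSON with serde:

```rust
/// Hypothetical helper: builds the partial batch response body by hand.
/// Assumes the ids contain no characters that need JSON escaping.
fn partial_batch_response(failed_ids: &[&str]) -> String {
    // One {"itemIdentifier":"..."} object per failed message id.
    let items: Vec<String> = failed_ids
        .iter()
        .map(|id| format!("{{\"itemIdentifier\":\"{}\"}}", id))
        .collect();

    // Wrap the objects in the batchItemFailures list.
    format!("{{\"batchItemFailures\":[{}]}}", items.join(","))
}
```

Only the messages listed in the response go back to the queue. The others are treated as processed.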

The code will be:

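// Imports assume the lambda_runtime, aws_lambda_events, futures, serde_json and tokio crates.
// MyStruct, DoSomething, BatchItemFailures and ItemIdentifier are defined elsewhere in the project.
use std::sync::{Arc, Mutex};

use aws_lambda_events::event::sqs::SqsEvent;
use futures::future::join_all;
use lambda_runtime::{Error, LambdaEvent};
use serde_json::Value;

pub async fn execute(client: &aws_sdk_dynamodb::Client, event: LambdaEvent<SqsEvent>) -> Result<Value, Error> {
    // Ids of the messages that failed, shared between all spawned tasks.
    let failed_message: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
    let mut tasks = Vec::with_capacity(event.payload.records.len());
    let shared_client = Arc::from(client.clone());

    for record in event.payload.records.into_iter() {
        // Each task needs its own handles because `async move` takes ownership.
        let shared_client = shared_client.clone();
        let message_id = record.message_id.unwrap();
        let failed_message = failed_message.clone();

        tasks.push(tokio::spawn(async move {
            // Records without a body, or with a body that does not parse, are skipped.
            if let Some(body) = &record.body {
                let request = serde_json::from_str::<MyStruct>(body);
                if let Ok(request) = request {
                    DoSomething::new()
                      .execute(&shared_client, &request)
                      .await // assumes `execute` is an async fn returning a Result
                      .map_or_else(|_e| {
                        failed_message.lock().unwrap().push(message_id.clone());
                      }, |_| ());
                }
            }
        }));
    }

    // Wait for every task before building the response.
    join_all(tasks).await;

    let response = BatchItemFailures {
        batch_item_failures: failed_message.lock().unwrap().clone()
            .into_iter()
            .map(|message_id| ItemIdentifier {
                item_identifier: message_id,
            })
            .collect(),
    };

    Ok(serde_json::to_value(response).unwrap())
}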

It is not much different from the TypeScript version:

export const handler = async (event: SQSEvent): Promise<BatchItemFailures> => {
  const failedMessageIds: string[] = [];
  await Promise.all(
    event.Records.map(async (record: SQSRecord) => {
      try {
        //DoSomething(record)
      } catch (error) {
        failedMessageIds.push(record.messageId);
      }
    }),
  );

  const batchItemFailures: BatchItemFailures = {
    batchItemFailures: failedMessageIds.map(id => {
      return {
        itemIdentifier: id
      }
    })
  };

  return batchItemFailures;
};

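Rust is more strict. Because I use tokio::spawn inside a loop, each async move block takes ownership of the values it captures, so a plain failed_message vector would be moved into the first task and no longer be available to the others. Several tasks also cannot hold a mutable reference to the same vector at once, because that violates the single mutable borrow rule. So I need to use: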

 Arc<Mutex<Vec<String>>>
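
To show the sharing pattern in isolation, here is a minimal sketch that uses only the standard library. It swaps tokio::spawn for std::thread::spawn to stay dependency-free, and the process function and the "-bad" suffix rule are hypothetical stand-ins for the real work. The ownership story is the same: every worker gets its own Arc handle, and the Mutex serializes the pushes.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for the real work: ids ending in "-bad" fail.
fn process(message_id: &str) -> Result<(), String> {
    if message_id.ends_with("-bad") {
        Err(format!("could not process {}", message_id))
    } else {
        Ok(())
    }
}

fn collect_failures(message_ids: Vec<String>) -> Vec<String> {
    let failed: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
    let mut handles = Vec::with_capacity(message_ids.len());

    for message_id in message_ids {
        // Clone the Arc, not the vector: every thread points at the same Vec.
        let failed = Arc::clone(&failed);
        handles.push(thread::spawn(move || {
            if process(&message_id).is_err() {
                failed.lock().unwrap().push(message_id);
            }
        }));
    }

    // Wait for all workers before reading the shared vector.
    for handle in handles {
        handle.join().unwrap();
    }

    let mut ids = failed.lock().unwrap().clone();
    ids.sort(); // workers finish in any order, so sort for a stable result
    ids
}
```

Without the Arc, the first iteration of the loop would move the vector into its closure and the second iteration would not compile.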

I could not find a way to carry the SQS message identifier in the return value of each task, so that I could filter the join_all results down to the failures.
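
One possible direction, offered as a sketch and not as the way to do it: let every task return a Result whose error side is the message id, then keep only the errors after joining. The example below uses std::thread so it runs without extra crates, and process and its empty-body rule are hypothetical. With tokio, awaiting join_all over the spawned handles yields one Result per task (the outer error being a JoinError), so a similar filter could apply there.

```rust
use std::thread;

/// Hypothetical stand-in for the real work: an empty body fails.
fn process(body: &str) -> Result<(), String> {
    if body.is_empty() {
        Err("empty body".to_string())
    } else {
        Ok(())
    }
}

/// Takes (message_id, body) pairs and returns the ids that failed.
fn failed_ids(records: Vec<(String, String)>) -> Vec<String> {
    // Each worker returns Ok(()) or Err(message_id); no shared state is needed.
    let handles: Vec<thread::JoinHandle<Result<(), String>>> = records
        .into_iter()
        .map(|(message_id, body)| {
            thread::spawn(move || process(&body).map_err(|_| message_id))
        })
        .collect();

    // Join in submission order and keep only the error side.
    handles
        .into_iter()
        .filter_map(|handle| handle.join().expect("worker panicked").err())
        .collect()
}
```

The trade-off is that a worker that panics loses its id, so a real handler would probably keep the id next to each handle as well.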

Conclusion

Rust syntax in a serverless application is possibly more clumsy than in other languages. Still, even though I am not using Rust every day, I no longer notice the extra main(), the unwrap() or the Mutex/Arc. It is just part of the language, and I have gotten used to it by now.

My goal is to demonstrate that you can use a different programming language, reuse your skill set and perhaps achieve better results based on the requirements (choose the best tool for the job). Having the same code written in other programming languages helps me move quickly to Rust, and I hope it will make your migration to Rust easier.