Rust: create a Lambda function that subscribes to an SQS queue

This blog post explains how to subscribe a Lambda function to an AWS SQS queue and handle all the messages in parallel.

This post is part of the Serverless Rust series; you can check out the other parts:

Parts 1 and 2 describe how to set up Rust and VS Code.

How to process the SQS messages

Concurrency: Concurrency means that an application is making progress on more than one task at the same time (concurrently).

Parallelism: Parallelism means that an application splits its tasks into smaller subtasks that can be processed in parallel, for example, on multiple CPUs simultaneously.

Concurrency

// convert the Vec of records into a Stream so we can use async
let mut records = stream::iter(event.records);
// processing one element at a time
while let Some(record) = records.next().await {
    // handle the message here before moving on to the next one
}
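
To make the comparison easier, here is a minimal sketch of what the full sequential handler could look like. AWSClient, Request, Product and do_something are the post's own types and helpers (introduced in the earlier parts), not library items:

use aws_lambda_events::event::sqs::SqsEvent;
use futures::stream::{self, StreamExt};
use lambda_runtime::{Context, Error};

pub async fn execute_sequential(client: &AWSClient, event: SqsEvent, _ctx: Context) -> Result<Option<Product>, Error> {
    // convert the Vec of records into a Stream so we can await inside the loop
    let mut records = stream::iter(event.records);
    // one record at a time: the next message is not touched until the previous one has completed
    while let Some(record) = records.next().await {
        if let Some(body) = &record.body {
            // panics on malformed JSON, same as the parallel version
            let request: Request = serde_json::from_str(body).unwrap();
            if let Some(pk) = request.pk {
                do_something(client, &pk)
                    .await
                    .map_or_else(|e| log::error!("Error {:?}", e), |_| ());
            }
        } else {
            log::error!("Empty body {:?}", record);
        }
    }
    Ok(None)
}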

Parallelism

pub async fn execute(client: &AWSClient, event: SqsEvent, _ctx: Context) -> Result<Option<Product>, Error> {
    let mut tasks = Vec::with_capacity(event.records.len());
    // wrap a clone of the client in an Arc so it can be shared across tasks
    let shared_client = Arc::from(client.clone());
    for record in event.records.into_iter() {
        let shared_client = Arc::clone(&shared_client);
        // spawn one task per SQS record
        tasks.push(tokio::spawn(async move {
            if let Some(body) = &record.body {
                // panics on malformed JSON
                let request: Request = serde_json::from_str(body).unwrap();
                if let Some(pk) = request.pk {
                    do_something(&shared_client, &pk)
                        .await
                        .map_or_else(|e| log::error!("Error {:?}", e), |_| ());
                }
            } else {
                log::error!("Empty body {:?}", record);
            }
        }));
    }

    // wait for all the spawned tasks to complete
    join_all(tasks).await;

    Ok(None)
}

Explanation

At first glance, the syntax can look a bit cumbersome, but in the end what we are doing is:

  1. Define a vector of tasks, one for each SQS record in the input event.
  2. Use an Atomic Reference Counter (Arc) to share immutable data across tasks in a thread-safe way. In this case, we want to share shared_client, which holds the AWS clients initialized outside of the handler (check part 1).
  3. Use Tokio, a runtime for writing reliable asynchronous applications with Rust, to spawn the tasks.
  4. For each SQS message, create a new task.
  5. Execute all tasks in parallel and wait for them with join_all (see the sketch below).
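
One detail worth noting: tokio::spawn returns a JoinHandle, so join_all actually yields a Vec of Results that the handler above discards. A minimal sketch of how you could inspect them, so that a task that panicked does not go unnoticed:

    // collect the output of join_all instead of discarding it
    let results = join_all(tasks).await;
    for result in results {
        // an Err here means the spawned task panicked or was cancelled
        if let Err(e) = result {
            log::error!("Task failed: {:?}", e);
        }
    }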

When you call .iter() on a Vec, the values the iterator produces are references to the values in the Vec - they are borrowed. You cannot move those references into a spawned task, because the borrow checker has to guarantee that anything you borrow lives at least as long as the task that uses it.

There are a few ways around this:

  • If you don't need the Vec after processing, you can switch to .into_iter(). This creates a consuming iterator, that is, one that moves each value out of the vector (from start to end). The vector cannot be used after calling it.
  • If you do need your Vec afterwards, and your values are clonable, you can call .iter().cloned() instead of .iter() - this makes a copy of each value, which you can then move into the task, as shown in the sketch below.
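
Here is a standalone sketch (unrelated to the handler above) that shows both options with a plain Vec<String>:

#[tokio::main]
async fn main() {
    // Option 1: into_iter() moves each String out of the Vec;
    // `records` cannot be used after the loop
    let records = vec!["a".to_string(), "b".to_string()];
    let mut tasks = Vec::new();
    for record in records.into_iter() {
        tasks.push(tokio::spawn(async move {
            println!("processing {}", record);
        }));
    }
    futures::future::join_all(tasks).await;

    // Option 2: iter().cloned() copies each String, so the Vec stays usable
    let records = vec!["a".to_string(), "b".to_string()];
    let mut tasks = Vec::new();
    for record in records.iter().cloned() {
        tasks.push(tokio::spawn(async move {
            println!("processing {}", record);
        }));
    }
    futures::future::join_all(tasks).await;
    println!("still have {} records", records.len());
}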

Conclusion

If you are new to Rust, this syntax can scare you away, but like everything, you get used to it, and after a while you barely notice it anymore. When I started converting some simple Node.js code to Rust, I must say that not much material was out there. The AWS examples only cover basic syntax, so it was challenging to find a parallelism example, and I hope this post will help you speed up your experience. You can find the complete experiment, with all the commits and struggles, on GitHub.