Rust: CSV processing

This blog post is about CSV processing. It may sound boring, but I want to share how I read and write a CSV file of almost 1 GB in just a few seconds.

This post is part of the Serverless Rust series. You can check out the other parts:

Parts 1 and 2 describe how to set up Rust and VS Code.

Part 3 shows how to process AWS SQS messages in parallel.

Part 4 shows how to execute an AWS Step Functions workflow for each AWS SQS message received.

Part 5 shows how to inject configuration with AWS AppConfig.

Problem

I want to read the IMDb Datasets and process title.basics.tsv.gz so that I can play with Amazon Neptune.

I will not talk about Amazon Neptune in this blog post, but I will focus on CSV processing.

Because the files of this dataset are massive, I already knew from past experiences in different languages that it could be a problem. So I asked for advice from Nicolas Moutschen, leveraging his experience with Rust. Nicolas pointed me straight away to the libraries and some methods to use.

CSV

I use the csv crate, which has excellent documentation.

Processing - take 1

As I said, I started by following Nicolas Moutschen's suggestion to read and process the data as a stream, and Nicolas was kind enough to show me an example.

I needed to convert the original CSV into something that I could use with Amazon Neptune, so I replaced:

println!("{}", String::from_utf8_lossy(&buffer[..len]));

with the code needed to write a new CSV.

Cargo.toml dependencies:

[dependencies]
async-compression = { version = "0.3.12", features = ["all", "tokio"] }
csv = "1.1.6"
hyper = { version = "0.14", features = ["full"] }
hyper-tls = "0.5.0"
serde =  { version = "1.0", features = ["derive"] }
tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1.8"
tokio-util = { version = "0.6.9", features = ["full"] }

The complete code is the following:

use csv;
use hyper::Client;
use hyper_tls::HttpsConnector;
use std::io;
use tokio::io::AsyncReadExt;
use tokio_stream::StreamExt;
use tokio_util::io::StreamReader;
const LINK: &str = "https://datasets.imdbws.com/title.basics.tsv.gz";
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let https = HttpsConnector::new();
    let client = Client::builder().build::<_, hyper::Body>(https);
    let res = client.get(LINK.parse()?).await?;
    let body = res
        .into_body()
        .map(|result| result.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)));
    let body = StreamReader::new(body);
    let mut decoder = async_compression::tokio::bufread::GzipDecoder::new(body);
    let mut buffer = [0; 1024];
    let mut wtr = csv::Writer::from_path("./export/title.csv")?;
    wtr.write_record(&[
        "~id",
        "~label",
        "titleType",
        "primaryTitle",
        "originalTitle",
        "isAdult",
        "startYear",
        "endYear",
        "runtimeMinutes",
        "genres",
    ])?;
    loop {
        let len = decoder.read(&mut buffer).await?;
        if len == 0 {
            break;
        }
        let line = String::from_utf8_lossy(&buffer[..len]);
        let line: Vec<&str> = line.split("\t").collect();
        wtr.write_record(&[
            line[0], "movies", line[1], line[2], line[3], line[4], line[5], line[6], line[7],
            line[8],
        ])?;
    }
    wtr.flush()?;
    Ok(())
}

The problem is in this part of the code (please do not mind the quality):

        let line = String::from_utf8_lossy(&buffer[..len]);
        let line: Vec<&str> = line.split("\t").collect();
        wtr.write_record(&[
            line[0], "movies", line[1], line[2], line[3], line[4], line[5], line[6], line[7],
            line[8],
        ])?;

This code has a problem when I convert the bytes to strings. Each read fills the buffer with up to 1024 bytes regardless of where the lines end, so a chunk can contain several rows or stop in the middle of one, and the rows got mixed up. If, for example, we assume that the original CSV is made of:

tconst    titleType    primaryTitle    originalTitle    isAdult    startYear    endYear    runtimeMinutes    genres
tt0000001    short    Carmencita    Carmencita    0    1894    \N    1    Documentary,Short
tt0000002    short    Le clown et ses chiens    Le clown et ses chiens    0    1892    \N    5    Animation,Short
tt0000003    short    Pauvre Pierrot    Pauvre Pierrot    0    1892    \N    4    Animation,Comedy,Romance
tt0000004    short    Un bon bock    Un bon bock    0    1892    \N    12    Animation,Short
tt0000005    short    Blacksmith Scene    Blacksmith Scene    0    1893    \N    1    Comedy,Short
tt0000006    short    Chinese Opium Den    Chinese Opium Den    0    1894    \N    1    Short
tt0000007    short    Corbett and Courtney Before the Kinetograph    Corbett and Courtney Before the Kinetograph    0    1894    \N    1    Short,Sport

A chunk read into the buffer might be:

// ends with the beginning of the next row
tt0000001    short    Carmencita    Carmencita    0    1894    \N    1    Documentary,Short
tt0000002

or:

// starts in the middle of a row
Courtney Before the Kinetograph

The problem is possibly in how I used the libraries, or I missed something, but I could not find a solution, so I moved on.
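For reference, one common way to deal with rows split across reads is to keep the unfinished tail of each chunk and prepend it to the next one. The following is a minimal sketch using only the standard library. `LineAssembler` is a hypothetical helper, not something from the crates above, and it assumes that no field contains a newline.

```rust
/// Collects byte chunks and hands back only complete lines.
/// Hypothetical helper for illustration, not part of any crate used in this post.
struct LineAssembler {
    pending: Vec<u8>,
}

impl LineAssembler {
    fn new() -> Self {
        LineAssembler { pending: Vec::new() }
    }

    /// Appends a chunk and returns every line that is now complete.
    /// Bytes after the last newline stay in `pending` for the next call.
    fn push(&mut self, chunk: &[u8]) -> Vec<String> {
        self.pending.extend_from_slice(chunk);
        let mut lines = Vec::new();
        while let Some(pos) = self.pending.iter().position(|&b| b == b'\n') {
            // Take the line including its '\n' out of the pending bytes.
            let line: Vec<u8> = self.pending.drain(..=pos).collect();
            // Convert only after the line is complete, without the trailing '\n'.
            lines.push(String::from_utf8_lossy(&line[..pos]).into_owned());
        }
        lines
    }

    /// Returns the leftover bytes when the input does not end with a newline.
    fn finish(self) -> Option<String> {
        if self.pending.is_empty() {
            None
        } else {
            Some(String::from_utf8_lossy(&self.pending).into_owned())
        }
    }
}

fn main() {
    let data = b"tt0000001\tshort\tCarmencita\ntt0000002\tshort\tLe clown et ses chiens\n";
    let mut assembler = LineAssembler::new();
    // 16-byte chunks stand in for the fixed-size reads from the decoder.
    for chunk in data.chunks(16) {
        for line in assembler.push(chunk) {
            let fields: Vec<&str> = line.split('\t').collect();
            println!("{:?}", fields);
        }
    }
    if let Some(rest) = assembler.finish() {
        println!("unterminated last line: {:?}", rest);
    }
}
```

Because the bytes are converted to a string only once a full line is available, a multi-byte UTF-8 character that straddles two chunks is not corrupted either.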

Processing - take 2

I tried to force the stream into the CSV ReaderBuilder.

loop {
    let len = decoder.read(&mut buffer).await?;
    if len == 0 {
        break;
    }

    let mut rdr = csv::ReaderBuilder::new()
        .has_headers(true)
        .delimiter(b'\t')
        .flexible(true)
        .from_reader(&buffer[..len]);

    for result in rdr.records() {
        let record = result?;
        wtr.serialize(Record {
            id: record[0].to_string(),
            label: "movies".to_string(),
            title_type: record[1].to_string(),
            primary_title: record[2].to_string(),
            original_title: record[3].to_string(),
            is_adult: record[4].to_string().to_bool(),
            start_year: record[5].parse::<u16>().unwrap_or_default(),
            end_year: record[6].parse::<u16>().unwrap_or_default(),
            runtime_minutes: record[7].parse::<u16>().unwrap_or_default(),
            genres: record[8].to_string(),
        })?;
    }
}

But the problem was the same: incomplete data. With some conditions around it, I made it work, but I was not happy because the code was ugly, so I decided to move on.

Processing - final take...for now

For this part of the code, I assume that I have already downloaded and decompressed the file and have the CSV in a folder.

use csv;
use serde::{Deserialize, Deserializer, Serialize};
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let mut wtr = csv::WriterBuilder::default()
        .has_headers(false)
        .from_path("./export/title.csv")?;

    wtr.write_record(&[
        "~id",
        "~label",
        "titleType",
        "primaryTitle",
        "originalTitle",
        "isAdult",
        "startYear",
        "endYear",
        "runtimeMinutes",
        "genres",
    ])?;

    let mut rdr = csv::ReaderBuilder::new()
        .has_headers(true)
        .delimiter(b'\t')
        .double_quote(false)
        .escape(Some(b'\\'))
        .flexible(true)
        //.comment(Some(b'#'))
        .from_path("./import/title.basics.tsv")?;

    for result in rdr.deserialize() {
        let record: Record = result?;
        wtr.serialize(record)?;
    }

    wtr.flush()?;

    Ok(())
}

#[derive(Debug, Deserialize, Serialize)]
struct Record {
    #[serde(alias = "tconst")]
    id: String,

    #[serde(default = "default_label")]
    label: String,

    #[serde(alias = "titleType")]
    title_type: String,

    #[serde(alias = "primaryTitle")]
    primary_title: String,

    #[serde(alias = "originalTitle")]
    original_title: String,

    #[serde(alias = "isAdult")]
    #[serde(deserialize_with = "bool_from_string")]
    is_adult: bool,

    #[serde(alias = "startYear")]
    #[serde(deserialize_with = "csv::invalid_option")]
    start_year: Option<u16>,

    #[serde(alias = "endYear")]
    #[serde(deserialize_with = "csv::invalid_option")]
    end_year: Option<u16>,

    #[serde(alias = "runtimeMinutes")]
    #[serde(deserialize_with = "csv::invalid_option")]
    runtime_minutes: Option<u16>,

    #[serde(alias = "genres")]
    #[serde(deserialize_with = "csv::invalid_option")]
    genres: Option<String>,
}

fn default_label() -> String {
    "movies".to_string()
}

/// Deserialize bool from String with custom value mapping
fn bool_from_string<'de, D>(deserializer: D) -> Result<bool, D::Error>
where
    D: Deserializer<'de>,
{
    match String::deserialize(deserializer)?.as_ref() {
        "1" => Ok(true),
        "0" => Ok(false),
        _ => Ok(false),
    }
}

I decided to use csv::ReaderBuilder to read the CSV file, with the following settings:

  • Enable headers, so the first line is treated as column names and not returned as a record.
  • Change the delimiter from a comma to a tab.
  • Disable doubled-quote escaping and use the backslash as the escape character.
  • Permit records of flexible length, since some are in a strange format.

Instead of dealing with arbitrary records, I use Serde to deserialize each record into a specific type. For example, I applied Serde attributes to the fields to map the columns of the original CSV to mine.

#[derive(Debug, Deserialize, Serialize)]
struct Record {
    #[serde(alias = "tconst")]
    id: String,
  .....

Because the "isAdult" column in the original CSV is in the form of "1" and "0" and not a pure boolean, I need to convert it, so I wrote a custom deserializer:

/// Deserialize bool from String with custom value mapping
fn bool_from_string<'de, D>(deserializer: D) -> Result<bool, D::Error>
where
    D: Deserializer<'de>,
{
    match String::deserialize(deserializer)?.as_ref() {
        "1" => Ok(true),
        "0" => Ok(false),
        _ => Ok(false),
    }
}

Finally, because other data is not mandatory in the original CSV and appears as \N when it is missing, I must handle the deserialization failure when the target field has a different type.

For this, I can use csv::invalid_option:

#[serde(deserialize_with = "csv::invalid_option")]

It will tell Serde to convert any deserialization errors on this field to a None value.
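To make the effect concrete, here is a minimal standard-library sketch of the same idea for a numeric column. `parse_optional_u16` is a hypothetical helper, not part of the csv crate. It only imitates the outcome: anything that fails to parse as a `u16`, such as \N, becomes None.

```rust
/// Hypothetical helper that imitates the outcome described above for a `u16` column:
/// a value that cannot be parsed becomes `None` and does not abort the whole run.
fn parse_optional_u16(field: &str) -> Option<u16> {
    field.parse::<u16>().ok()
}

fn main() {
    // "\\N" is the Rust literal for the two characters \N used by the dataset.
    for raw in ["1894", "\\N", ""] {
        println!("{:?} -> {:?}", raw, parse_optional_u16(raw));
    }
}
```

The trade-off is that a genuinely malformed number is silently dropped as well, which is acceptable here because the column is optional anyway.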

The outcome of the serialization will be a CSV with empty values (note the ",,"):

~id,~label,titleType,primaryTitle,originalTitle,isAdult,startYear,endYear,runtimeMinutes,genres
tt0000001,movies,short,Carmencita,Carmencita,false,1894,,1,"Documentary,Short"
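The quotes around "Documentary,Short" appear because the field itself contains the output delimiter. As a rough illustration of how such a row is put together, here is a standard-library sketch that assumes RFC 4180 style quoting. `quote_if_needed`, `optional_field`, and `render_row` are hypothetical helpers and not the csv crate's implementation.

```rust
/// Wraps a field in quotes only when it contains a comma, a quote, or a line break,
/// doubling any inner quotes. Hypothetical helper for illustration.
fn quote_if_needed(field: &str) -> String {
    if field.contains(|c: char| matches!(c, ',' | '"' | '\n' | '\r')) {
        format!("\"{}\"", field.replace('"', "\"\""))
    } else {
        field.to_string()
    }
}

/// Renders an optional number as its digits, or as an empty field for `None`.
fn optional_field(value: Option<u16>) -> String {
    value.map(|v| v.to_string()).unwrap_or_default()
}

/// Joins the fields with commas, quoting each one where necessary.
fn render_row(fields: &[String]) -> String {
    fields
        .iter()
        .map(|f| quote_if_needed(f))
        .collect::<Vec<String>>()
        .join(",")
}

fn main() {
    let row = vec![
        "tt0000001".to_string(),
        "movies".to_string(),
        "short".to_string(),
        "Carmencita".to_string(),
        "Carmencita".to_string(),
        "false".to_string(),
        optional_field(Some(1894)), // startYear
        optional_field(None),       // endYear was \N in the source
        optional_field(Some(1)),    // runtimeMinutes
        "Documentary,Short".to_string(),
    ];
    println!("{}", render_row(&row));
}
```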

Conclusion

Currently, AWS Lambda has 512 MB of temporary storage, so this use case will not fit because of the file size. However, if one day we have larger temporary storage, we could run it inside AWS Lambda.

One alternative is to use Amazon EFS, a fully managed, flexible, shared file system designed to be consumed by other AWS services. Support for it in AWS Lambda was announced on Jun 16, 2020. AWS Lambda will automatically mount the file system and provide a local path to read and write data. If you want to read more, there is an excellent article here.

Another alternative is to use AWS Batch with spot instances in conjunction with AWS Step Functions leveraging the service integration Run a Job (.sync) pattern. After calling AWS Batch submitJob, the workflow pauses. When the job is complete, Step Functions progresses to the next state.

The csv crate does a fantastic job, and it is unbelievably fast. I can run this script on my computer in release mode and process it all in around 10 seconds.