Integrate Kafka with a Serverless application

Most of us work with the microservices pattern, or at least in a context with multiple teams that need to exchange information.

Introduction

In an event-driven application setup, I usually see 3 main options for exchanging messages:

  1. Apache Kafka is an open-source, high-performance, fault-tolerant, and scalable platform for building real-time streaming data pipelines and applications. Streaming data is continuously generated by thousands of data sources, which typically send in the data records simultaneously.

  2. Amazon EventBridge is a serverless service that uses events to connect application components, making it easier to build scalable event-driven applications. Event-driven architecture builds loosely coupled software systems that work together by emitting and responding to events.

  3. Amazon Kinesis is the AWS counterpart to Kafka: a high-performance, fault-tolerant, and scalable platform for building real-time streaming data pipelines and applications. Streaming data is continuously generated by thousands of data sources, which send in the data records simultaneously.

While Apache Kafka and Amazon Kinesis share many similarities in terms of functionality, they are not identical, and the choice between them often depends on factors such as use case and existing infrastructure.

In theory, streaming platforms (like Kafka and Kinesis Data Streams) and event routers (like EventBridge) serve different purposes and have different characteristics:

  • Streaming Platforms are designed to handle high-volume real-time data streams. They provide capabilities for storing, processing, and analysing streams of records in real-time. They are often used in cases where large amounts of live data need to be processed and analysed, such as log monitoring, real-time analytics, and IoT data processing.
  • Event Routers are designed to route events from various sources to different destinations. Event-driven architectures often use them to decouple microservices, orchestrate workflows, or integrate with third-party services.

Both types of systems deal with events or messages, and the difference is subtle. In practice, I have often seen them used interchangeably for the same reasons.


In the company where I work, we use Apache Kafka (Confluent) as a common bus for all teams. Regardless of the architecture, cluster-based or serverless, every service subscribes to or publishes to Kafka.

Direct integration with Kafka requires code for:

  • Filtering
  • Transforming the message
  • Moving to some other downstream service
  • Handling the error

The problem with writing code is that you must maintain it, and it never stays the same over time. As the architecture grows, it sometimes gets wild, and there is no longer a standard.

For example, Kafka has the concept of Consumer Groups. A Kafka Consumer Group is a feature that allows a pool of consumers (each with a unique ID) to divide up the processing of data over a collection of Kafka topics.

In simple words, it means this:

  • A Kafka topic is consumed by a consumer group
  • I have 3 workers (consumers) in that group, each processing a share of the topic's partitions

When a worker (consumer) fails to process a message and throws an error, the offset is not committed, so the same message is delivered again. If the worker keeps throwing an error for that message, the repeated retries block the consumer group's progress on that partition and create lag.

The solution could be for the worker to catch the error and either handle it, ignore it, or forward the message to a dead-letter topic. It can then manually commit the offset for the message, telling Kafka it has been processed, which prevents Kafka from delivering the same message again.
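As a sketch of that approach, assuming KafkaJS v2 and a hypothetical processMessage function that holds the business logic, the worker could look like this:

const { Kafka } = require('kafkajs')

const kafka = new Kafka({ clientId: 'my-worker', brokers: ['broker-1:9092'] })
const consumer = kafka.consumer({ groupId: 'my-consumer-group' })

await consumer.connect()
await consumer.subscribe({ topics: ['topic-name'] })

await consumer.run({
    autoCommit: false, // we decide when an offset counts as processed
    eachMessage: async ({ topic, partition, message }) => {
        try {
            await processMessage(message) // hypothetical business logic
        } catch (err) {
            // handle or ignore the error (e.g. log it or publish to a dead-letter topic)
            console.error('Failed to process message', err)
        }
        // commit the next offset so the same message is never delivered again
        await consumer.commitOffsets([
            { topic, partition, offset: (Number(message.offset) + 1).toString() },
        ])
    },
})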

Even though this is logically easy, I can guarantee that over time some services will do it properly, others will swallow the error, and so on, creating a mess.

I usually try to mitigate the issues that come with distributed systems, so I tend to migrate to or build on the AWS Serverless offering.

Integration types

I know I sound like a broken record, but I'll repeat it: the AWS Serverless offering comes down to 2 things:

  1. AWS takes care of the scalability, availability, and patching of its services
  2. I integrate their managed services and focus on what truly matters: writing code and delivering business value.

When you work with Apache Kafka in an AWS setup, there are 2 types of integration:

  1. Amazon EventBridge open-source connector for Apache Kafka Connect
  2. Amazon EventBridge Pipes

The Kafka sink connector for Amazon EventBridge allows you to send events (records) from one or multiple Kafka topics to the specified event bus, and includes useful features such as:

  • offloading large events to S3
  • configurable topic to event detail-type name mapping
  • custom IAM profiles per connector
  • IAM role-based authentication
  • support for dead-letter queues
  • schema registry support for Avro and Protocol Buffers (Protobuf).


At the time of writing, the connector is at version v1.3.0 and offers features that Pipes does not: offloading large events to S3 and automatic dead-letter queues, i.e. a separate Kafka topic that receives failed messages without you writing error-handling code in the consumer logic.

On the other hand, if I do not have access to the cluster and do not want to rely on a different team for that setup, the best choice for moving forward and applying standards to my services is Amazon EventBridge Pipes.


EventBridge Pipes is an incredible service that offers point-to-point connections between a source and a target. It is especially powerful when we use an EventBridge bus as the target: Pipes handles the point-to-point connection for us and lets us transform the message so the downstream consumer can understand it.

There are so many other concepts, but the most exciting part of this service is that it enables me to enrich and modify the message before passing it to the downstream consumer.

If we compare the connector and Pipes at a glance in the context of Kafka, the most significant differentiators are DLQ/S3 offloading (until it becomes available in Pipes) and Avro/Protobuf support on the connector side versus very simple setup and operations plus enrichment on the Pipes side.
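To give an idea of how little configuration a pipe needs, here is a minimal CloudFormation sketch of a pipe reading from a Confluent (self-managed) Kafka topic, enriching with a Lambda function, and targeting an event bus. The role, secret ARN, enrichment function, bus, broker address, topic, and detail-type are all assumptions, so treat this as a starting point rather than a definitive template:

  KafkaToBusPipe:
    Type: AWS::Pipes::Pipe
    Properties:
      RoleArn: !GetAtt PipeRole.Arn                  # assumed role allowed to read the secret, invoke the Lambda, and put events
      Source: 'smk://my-bootstrap-server:9092'       # self-managed (Confluent) Kafka bootstrap server (placeholder)
      SourceParameters:
        SelfManagedKafkaParameters:
          TopicName: 'topic-name'
          ConsumerGroupID: 'my-pipe-consumer-group'
          StartingPosition: LATEST
          BatchSize: 10
          Credentials:
            BasicAuth: !Ref ConfluentCredentialsSecretArn   # Secrets Manager secret with the SASL/PLAIN username and password
      Enrichment: !GetAtt EnrichmentFunction.Arn     # optional Lambda that filters/transforms the records
      Target: !GetAtt LocalBus.Arn
      TargetParameters:
        EventBridgeEventBusParameters:
          DetailType: 'KafkaRecord'
          Source: 'kafka.topic-name'

The enrichment step is optional; without it, the pipe forwards the Kafka record to the bus as-is.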

Kafka messages can go up to 32MB, while serverless services like EventBridge/SQS/SNS handle a maximum of 256KB and Lambda accepts payloads up to 6MB. Having such large events may sound unrealistic, but events bigger than 256KB are not.

What is the problem with such messages?

This is known as a poison message. The Amazon EventBridge open-source connector for Apache Kafka Connect can handle this case because it either moves the message to a different Kafka topic or can be configured to offload large payloads to S3 and send the “trimmed-down” event to EventBridge. With Amazon EventBridge Pipes (and other integrations like Lambda or a consumer in a cluster), the message blocks the processing of your messages from the Kafka consumer group until the Pipes retries are exhausted. Because the message cannot be moved to a DLQ (only SQS is supported), it is then lost forever without you knowing about it.

A limited solution with EventBridge Pipes is to leverage the enrichment step. With the Claim Check pattern, we can circumvent this limitation; however, this works only if the input message does not exceed the 6MB Lambda payload limit.


The pattern is particularly useful when dealing with large data sets or messages. It works by storing the large message in a data store and passing only a reference to subsequent components, which can then use the claim check to retrieve the data.
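As a sketch, assuming the enrichment is a Node.js Lambda and a hypothetical CLAIM_CHECK_BUCKET environment variable points at the data store, storing the payload and returning only a reference could look like this:

const { S3Client, PutObjectCommand } = require('@aws-sdk/client-s3')
const { randomUUID } = require('node:crypto')

const s3 = new S3Client({})
const BUCKET = process.env.CLAIM_CHECK_BUCKET // hypothetical bucket for large payloads

// Pipes invokes the enrichment with a batch (array) of records and forwards whatever we return
exports.handler = async (records) =>
    Promise.all(
        records.map(async (record) => {
            const key = `claim-checks/${randomUUID()}.json`
            // store the large payload in S3...
            await s3.send(new PutObjectCommand({ Bucket: BUCKET, Key: key, Body: JSON.stringify(record) }))
            // ...and pass only the small "claim check" reference downstream
            return { claimCheck: { bucket: BUCKET, key } }
        })
    )

The downstream consumer can later use the bucket and key to retrieve the full payload when it needs it.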

Publishing to Kafka

Until now, I have written only about subscribing to messages from Kafka, but I also need to publish, and there are 2 options:

  1. Writing code using the SDK
  2. Kafka proxy via API destinations

Writing code is simple once you install a client library. In this example, I will use KafkaJS:

const { Kafka } = require('kafkajs')

// Broker addresses are placeholders: point them at your cluster
const kafka = new Kafka({
    clientId: 'my-app',
    brokers: ['broker-1:9092'],
})

const producer = kafka.producer()

await producer.connect()
await producer.send({
    topic: 'topic-name',
    messages: [
        { key: 'key1', value: 'hello world' },
        { key: 'key2', value: 'hey hey!' }
    ],
})
await producer.disconnect()

An alternative is to use the Confluent REST Proxy for Apache Kafka. I can do this without writing any code, leveraging only the Serverless offering via EventBridge API destinations.

#########################################################################
#  FROM EB TO KAFKA
#########################################################################
  ApiDestinationsTargetRole:
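    # Role assumed by EventBridge (events.amazonaws.com) to invoke the API destination and access Secrets Manager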
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: events.amazonaws.com
            Action: sts:AssumeRole
      Path: /service-role/
      Policies:
        - PolicyName: secretmanager
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - "secretsmanager:DescribeSecret"
                  - "secretsmanager:GetSecretValue"
                Resource: "*"
        - PolicyName: destinationinvoke
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - events:InvokeApiDestination
                Resource:
                  - !GetAtt KafkaProxyApi.Arn

  MyApiDestinationFailedDeliveryDLQ:
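    # Queue that receives events the API destination fails to deliver once retries are exhausted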
    Type: AWS::SQS::Queue

  MyApiDestinationFailedDeliveryDLQPolicy:
    Type: AWS::SQS::QueuePolicy
    Properties:
      Queues:
        - !Ref MyApiDestinationFailedDeliveryDLQ
      PolicyDocument:
        Statement:
        - Effect: Allow
          Principal:
            Service: events.amazonaws.com
          Action: SQS:SendMessage
          Resource: !GetAtt MyApiDestinationFailedDeliveryDLQ.Arn

  ApiConnection:
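    # Connection with the Confluent service-account credentials (BASIC auth) resolved from Secrets Manager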
    Type: AWS::Events::Connection
    Properties:
      AuthorizationType: BASIC
      AuthParameters:
        BasicAuthParameters:
          Password: '{{resolve:secretsmanager:confluent_cloud/service_account/myservice:SecretString:password}}'
          Username: '{{resolve:secretsmanager:confluent_cloud/service_account/myservice:SecretString:username}}'

  KafkaProxyApi:
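    # API destination pointing at the Confluent REST Proxy v3 "produce records" endpoint for the mapped topic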
    Type: AWS::Events::ApiDestination
    Properties:
      ConnectionArn: !GetAtt ApiConnection.Arn
      HttpMethod: POST
      InvocationEndpoint:
        Fn::Sub:
          - '${RegistryUrl}/kafka/v3/clusters/${ClusterId}/topics/${Topic}/records'
          - RegistryUrl: '{{resolve:secretsmanager:confluent_cloud/service_account/myservice:SecretString:schema_registry_url}}'
            ClusterId: '{{resolve:secretsmanager:confluent_cloud/service_account/myservice:SecretString:cluster_id}}'
            Topic:
              Fn::FindInMap:
                - KafkaTopic
                - Ref: StageName
                - MyApiDestinationModified
      InvocationRateLimitPerSecond: !FindInMap [Api, !Ref StageName, 'invocationRateLimitPerSecond']

  MyApiDestinationAddedRule:
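    # Rule that matches local bus events and transforms them into the key/value record format expected by the REST proxy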
    Type: AWS::Events::Rule
    Properties:
      EventBusName: !Ref LocalBusName
      EventPattern:
        source:
          - "mysource"
        detail-type:
          - "SOMETHING_CREATED"
      State: ENABLED
      Targets:
        - Id: MyApiDestination-added
          Arn: !GetAtt KafkaProxyApi.Arn
          RoleArn: !GetAtt ApiDestinationsTargetRole.Arn
          InputTransformer:
            InputPathsMap:
              prop1: $.detail.data.prop1
              prop2: $.detail.data.prop2
              createdAt: $.detail.data.createdAt
            InputTemplate: |-
              {
                "key": {
                  "type": "JSON",
                  "data": "<prop2>"
                },
                "value": {
                  "type": "JSON",
                  "data": {
                    "prop1":  "<prop1>",
                    "prop2":  "<prop2>",
                    "_timestamp": <createdAt>,
                    "action": "CREATED"
                  }
                }
              }
          HttpParameters:
            HeaderParameters:
              Content-Type: "application/json"
          DeadLetterConfig:
            Arn: !GetAtt MyApiDestinationFailedDeliveryDLQ.Arn
          RetryPolicy:
            MaximumRetryAttempts: 3
            MaximumEventAgeInSeconds: 60

Conclusion

In this article, I discussed two types of integration with Apache Kafka and how we can plug into the AWS ecosystem without giving up the convenience of managed services. At the time of writing, the Kafka EventBridge connector and Amazon EventBridge Pipes have some differences that can significantly impact architectural decisions. With time, they will become more similar, but until then, it is crucial to understand the specific use case and the tradeoffs each option brings. Finally, I presented two ways to publish messages to Kafka, and again, it is crucial to understand the limitations. For example, using the HTTP API destination can run into quota limits at both the AWS and Confluent Kafka levels. While these limits are sometimes adjustable, writing simple code may be easier in high-concurrency scenarios.