Consuming a DynamoDB Stream without AWS Lambda

May 15, 2022


In this article, we build a basic DynamoDB Stream consumer using the low-level DynamoDB Stream API and without using AWS Lambda triggers.

DynamoDB Stream

Most databases have a mechanism that allows for replication in a cluster of database servers or between different systems.

DynamoDB has DynamoDB Streams. It is serverless and comes with its own peculiarities. When you commit a change to a DynamoDB table, that change creates an event in the stream. For example, if you insert a new document that only contains { name: "Bob" }, the resulting event looks roughly like { "Type": "INSERT", NewImage: { name: "Bob" } }.
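
The simplified event above maps onto the real record format roughly as follows. The field names (eventName, dynamodb, NewImage, OldImage, SequenceNumber) and the typed attribute encoding ({ S: ... }) come from the DynamoDB Streams record format, but the `StreamRecordSketch` type and the `toPlainObject` helper are simplified stand-ins invented for this illustration, and they only cover string and number attributes. Which images are present also depends on the stream view type configured on the table.

```typescript
// Simplified stand-ins for the SDK types (assumption: only S and N attributes).
type AttributeValueSketch = { S: string } | { N: string };
type ImageSketch = Record<string, AttributeValueSketch>;

interface StreamRecordSketch {
  eventName: 'INSERT' | 'MODIFY' | 'REMOVE';
  dynamodb: {
    Keys: ImageSketch;
    NewImage?: ImageSketch; // absent for REMOVE events
    OldImage?: ImageSketch; // absent for INSERT events
    SequenceNumber: string;
  };
}

// Hypothetical helper: converts the typed attribute encoding to a plain object.
function toPlainObject(image: ImageSketch): Record<string, string | number> {
  const result: Record<string, string | number> = {};
  for (const [key, value] of Object.entries(image)) {
    result[key] = 'S' in value ? value.S : Number(value.N);
  }
  return result;
}

// What inserting { name: "Bob" } could look like on the stream.
const insertEvent: StreamRecordSketch = {
  eventName: 'INSERT',
  dynamodb: {
    Keys: { name: { S: 'Bob' } },
    NewImage: { name: { S: 'Bob' } },
    SequenceNumber: '100000000000000000001', // made-up value
  },
};

const plain = toPlainObject(insertEvent.dynamodb.NewImage ?? {});
```

In a real consumer you would use the `unmarshall` function from `@aws-sdk/util-dynamodb` instead of a hand-written helper, since it handles every attribute type.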

The recommended way to consume a DynamoDB Stream is to use a Lambda trigger. Under the hood, the DynamoDB Stream trigger for Lambda will poll the stream and invoke the Lambda whenever it retrieves new records / events.

That's great and all, but what if you do not want to process the events in a Lambda? You can consume your DynamoDB Stream in several different ways without having to roll your own consumer:

  1. Plug the stream into a Lambda and pass the events on to some sort of queue (e.g. SQS) and consume that queue instead.
  2. Plug the stream into a Lambda and pass the events on to some sort of pubsub and fanout from there.
  3. Use the DynamoDB Streams Kinesis Adapter. It is a Java-based facade that will do much of the work for you and expose a Kinesis-like interface for you to interact with the stream.
  4. DynamoDB now supports replication to Kinesis Data Streams, and you could use a Kinesis client library to consume that.

One reason not to use (1) or (2) above could be that you want to avoid infrastructure complexity. An ever-growing number of Lambdas can become hard to maintain and could make debugging harder. It could also be that your consumer needs to do something local to a specific server's filesystem. You might also want to avoid (3) if you and your team do not have much experience with Java.

These are all legitimate or semi-legitimate reasons for not using the usual ways. But be warned that building a reliable DynamoDB Stream consumer without using AWS Lambda will probably make your life harder than it needs to be.

DynamoDB Stream internals

Let's define some of the jargon first:

  • A Stream is a collection of shards.
  • A Shard is a collection of records distributed over time and you can page through the shard using a shard iterator.
  • A Record is a change event from the DynamoDB table. For example, an event showing that some record in the database was modified.
  • A Shard iterator is roughly similar to a pagination pointer or a bookmark. You can query the shard to retrieve its first iterator, its latest one, or some specific iterator. You then use the iterator to query the corresponding bucket of records. That response gives you the next shard iterator, and so on.

Here is a useful reference page:

It might help to visualize a DynamoDB Stream as a collection of parallel linked lists, where each shard is a single linked list, iterators are pointers to its nodes, and nodes contain records. As the stream progresses, new nodes get appended to the tail of the linked lists, some lists get dropped, and new ones get created.
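
The linked-list picture can be modelled in a few lines. The sketch below is a toy in-memory model, not the real API: `ShardNode`, `buildShard` and `readAll` are names made up for this illustration, and real shard iterators are opaque strings handed out by the service rather than object references.

```typescript
// Toy model: each node holds a bucket of records and points to the next node.
interface ShardNode {
  records: string[];
  next: ShardNode | null; // null once a closed shard has been fully read
}

// Hypothetical helper: builds a linked list of nodes from buckets of records.
function buildShard(buckets: string[][]): ShardNode | null {
  let head: ShardNode | null = null;
  for (const bucket of [...buckets].reverse()) {
    head = { records: bucket, next: head };
  }
  return head;
}

// Walks one shard from a given node, the way a consumer follows iterators.
function readAll(start: ShardNode | null): string[] {
  const seen: string[] = [];
  let node = start;
  while (node !== null) {
    seen.push(...node.records);
    node = node.next;
  }
  return seen;
}

// A stream is a collection of such lists, keyed by shard id.
const stream = new Map<string, ShardNode | null>([
  ['shard-a', buildShard([['r1', 'r2'], [], ['r3']])],
  ['shard-b', buildShard([['r4']])],
]);

const fromShardA = readAll(stream.get('shard-a') ?? null);
```

Note the empty bucket in the middle of `shard-a`: a node can hold no records and still point to a next node, so an empty page does not mean the shard is finished.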

Consuming the DynamoDB Stream

Initialize our consumer

First we need to retrieve the shards and keep a reference to the iterator we want to start consuming the stream from. In this example we start from LATEST, but we could have used TRIM_HORIZON to start from the oldest available record, or AT_SEQUENCE_NUMBER and AFTER_SEQUENCE_NUMBER to start at or just after a specific record identified by its sequence number.
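
As a rough illustration of how those iterator types might be chosen, the sketch below picks the arguments for a GetShardIterator call depending on whether a previously processed sequence number is known. The four type names are the ones the API accepts. The `IteratorRequest` type, the `iteratorRequestFor` helper and the idea of a stored checkpoint are assumptions made for this illustration and are not part of the consumer built in this article.

```typescript
type IteratorType =
  | 'TRIM_HORIZON'
  | 'LATEST'
  | 'AT_SEQUENCE_NUMBER'
  | 'AFTER_SEQUENCE_NUMBER';

interface IteratorRequest {
  ShardIteratorType: IteratorType;
  SequenceNumber?: string; // only meaningful for the two *_SEQUENCE_NUMBER types
}

// Hypothetical helper: resume after the last processed record if we know it,
// otherwise fall back to a default starting position.
function iteratorRequestFor(
  lastProcessedSequenceNumber?: string,
  fallback: 'TRIM_HORIZON' | 'LATEST' = 'LATEST'
): IteratorRequest {
  if (lastProcessedSequenceNumber !== undefined) {
    return {
      ShardIteratorType: 'AFTER_SEQUENCE_NUMBER',
      SequenceNumber: lastProcessedSequenceNumber,
    };
  }
  return { ShardIteratorType: fallback };
}

const fresh = iteratorRequestFor();
const resumed = iteratorRequestFor('4100000000000000000001'); // made-up sequence number
```

AFTER_SEQUENCE_NUMBER is used for resuming so that the last processed record is not delivered a second time. AT_SEQUENCE_NUMBER would include it again.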

TypeScript
// retrieve all the shards from the API
const shards = await fetchShards();

// index shards by their id
const shardsById = new Map<string, ShardWrapper>();
for (const shard of shards) {
    shardsById.set(shard.getShardId(), shard);
}

// fetch an iterator for each shard and set the pointer
await Promise.all(
    shards.map(async (shard) => {
        const iterator = await shard.getIterator('LATEST');
        if (iterator) {
            shard.setIterator(iterator);
        }
    })
);

In the example above, fetchShards abstracts away a basic call to the DescribeStream command of the DynamoDB Streams API. The code would look something like this:

TypeScript
import {
  DescribeStreamCommand,
  DescribeStreamCommandInput,
  DynamoDBStreamsClient,
} from '@aws-sdk/client-dynamodb-streams';

const client = new DynamoDBStreamsClient();

// ARN of the stream to consume (placeholder value, replace with your own)
const streamArn = '<your-stream-arn>';

// return the shard objects wrapped in a convenience class
// where you could implement a `setIterator` method and keep
// track of the internal state of the shard for example
const fetchShards = async (): Promise<ShardWrapper[]> => {
    const input: DescribeStreamCommandInput = {
      StreamArn: streamArn,
    };
    const command = new DescribeStreamCommand(input);
    const data = await client.send(command);
    // DescribeStream is paginated: for streams with many shards, keep calling
    // it with ExclusiveStartShardId set to the returned LastEvaluatedShardId
    const shards = data.StreamDescription?.Shards ?? [];

    return shards.map((shard) => new ShardWrapper(streamArn, client, shard));
}

You could do it without writing a ShardWrapper class, but then you would have to lift all the shard-specific data up to the more global polling part of the code. This gets unwieldy real quickly so you are better off keeping shard-specific data in a shard-specific construct and make each shard responsible for fetching its own records and maintaining its iterator state.

Your ShardWrapper class could look something like this:

TypeScript
import {
  DynamoDBStreamsClient,
  ShardIteratorType,
  GetShardIteratorCommandInput,
  GetShardIteratorCommand,
  GetRecordsCommand,
  Shard,
  GetRecordsCommandInput,
  _Record,
} from '@aws-sdk/client-dynamodb-streams';

export type ShardIterator = string;

export class ShardWrapper {
  private client: DynamoDBStreamsClient;
  private streamArn: string;
  private shard: Shard;
  private iterator?: ShardIterator;

  // you might want to add an interface on top of the client to make testing easier
  constructor(streamArn: string, client: DynamoDBStreamsClient, shard: Shard) {
    this.streamArn = streamArn;
    this.client = client;
    this.shard = shard;
  }

  /**
   * Fetches iterator for the shard.
   *
   * @remarks
   * AWS SDK documentation here:
   * https://docs.aws.amazon.com/AWSJavaScriptSDK/v3/latest/clients/client-dynamodb-streams/interfaces/getsharditeratorcommandinput.html
   *
   * @param iterType the iterator type
   * @param sequenceNumber optional sequenceNumber to start from
   * @returns the shard iterator or null (if at the end of the shard)
   */
  public async getIterator(
    iterType: ShardIteratorType,
    sequenceNumber?: string
  ): Promise<ShardIterator | null> {
    const input: GetShardIteratorCommandInput = {
      StreamArn: this.streamArn,
      ShardIteratorType: iterType as ShardIteratorType,
      ShardId: this.getShardId(),
      SequenceNumber: sequenceNumber, // optional
    };
    const command = new GetShardIteratorCommand(input);
    const data = await this.client.send(command);
    return data.ShardIterator || null;
  }

  /**
   * Fetches the records for this specific shard and iterator.
   *
   * @remarks
   * AWS SDK documentation here:
   * https://docs.aws.amazon.com/AWSJavaScriptSDK/v3/latest/clients/client-dynamodb-streams/interfaces/getrecordscommandinput.html
   *
   */
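  public async getRecords(): Promise<_Record[]> {
    if (!this.iterator) {
      // no iterator set, or the shard has been read to its end
      return [];
    }
    const input: GetRecordsCommandInput = {
      ShardIterator: this.iterator,
      Limit: undefined, // let the API use its default page size
    };
    const command = new GetRecordsCommand(input);
    const data = await this.client.send(command);
    // NextShardIterator is absent once a closed shard has been fully read,
    // so the iterator becomes undefined at the end of the shard
    this.iterator = data.NextShardIterator;
    return data.Records || [];
  }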

  /**
   * Set the shard to a specific iterator position.
   * The basic gist is we pass it a new iterator to make it move
   * one step through its backlog - like pagination pretty much.
   */
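  public setIterator(iterator: ShardIterator): void {
    this.iterator = iterator;
  }

  /**
   * Returns false when no iterator is set, which is the case
   * once the shard has been read to its end.
   */
  public hasIterator(): boolean {
    return Boolean(this.iterator);
  }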

  /**
   * Returns the shardId.
   * @returns {string}
   */
  public getShardId(): string {
    return this.shard.ShardId!;
  }
}

We have the shards and we have iterators for each of the shards we just retrieved. We are all done with the initialization and now need to start polling the stream continuously.

Polling

We need to do a couple of things in the loop. We need to retrieve new shards and initialize the iterators for these shards, in very much the same way as we did when initializing our shards in the previous code blocks.

Then we need to poll each shard for records and move the iterators for each shard forward. If a shard has no next iterator, that means we have consumed it all up and we can safely drop it.

TypeScript
import { AttributeValue } from '@aws-sdk/client-dynamodb';
import { unmarshall } from '@aws-sdk/util-dynamodb';

// this is the polling loop that needs to be invoked at a
// frequency that is appropriate for your case

// ids of shards we have read to the end, so that we do not pick
// them up again while DescribeStream still lists them
const finishedShardIds = new Set<string>();

const poll = async () => {
    // fetch shards and keep the ones we have not seen before
    const shards = await fetchShards();
    const newShards = shards.filter((shard) => {
        const id = shard.getShardId();
        return !shardsById.has(id) && !finishedShardIds.has(id);
    });

    // fetch and initialize iterators for the new shards; TRIM_HORIZON
    // rather than LATEST so that records written to a new shard before
    // we discovered it are not skipped
    await Promise.all(
        newShards.map(async (shard) => {
            const iterator = await shard.getIterator('TRIM_HORIZON');
            if (iterator) {
                shard.setIterator(iterator);
                shardsById.set(shard.getShardId(), shard);
            }
        })
    );

    // iterate over our now up-to-date map of shards
    for (const shard of shardsById.values()) {
        // reached the tail of the shard, can discard it
        // (`hasIterator` is assumed to be a small accessor on the wrapper
        // that tells whether an iterator is currently set)
        if (!shard.hasIterator()) {
            shardsById.delete(shard.getShardId());
            finishedShardIds.add(shard.getShardId());
            continue;
        }

        // fetch the records for the shard; the wrapper moves itself
        // to the next iterator the API returned
        const records = await shard.getRecords();

        // process records if any
        await Promise.all(
            records.map(async (record) => {
                // 'INSERT' | 'MODIFY' | 'REMOVE'
                const eventType = record.eventName;
                const { NewImage, OldImage } = record.dynamodb ?? {};

                // NewImage is undefined if type is 'REMOVE'
                const rawImage = NewImage ?? OldImage;
                if (!rawImage) {
                    return;
                }
                // the cast bridges the Streams and DynamoDB AttributeValue types
                const image = unmarshall(rawImage as Record<string, AttributeValue>);

                // some sort of callback or whatever processing of event
                // (`emit` is a placeholder for your own handler)
                emit(eventType, image);
            })
        );
    }
};

There are a bunch of edge cases that are outside the scope of this high-level overview of what a homemade DynamoDB Stream consumer could look like, but this is the basic idea, and it works well for low-throughput streams.
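
One of those edge cases is easy to sketch: if the loop is driven by a plain `setInterval`, a slow poll can overlap with the next one. A minimal way around that, assuming a `poll` function like the one above, is to wait for each run to finish before scheduling the next. `startPolling`, `intervalMs` and `onError` are names chosen for this illustration, not part of any library.

```typescript
type Poll = () => Promise<void>;

// Runs `poll` repeatedly, never starting a run before the previous one has settled.
// Returns a function that stops the loop once the current run (if any) is done.
function startPolling(
  poll: Poll,
  intervalMs: number,
  onError: (error: unknown) => void = console.error
): () => void {
  let stopped = false;
  let timer: ReturnType<typeof setTimeout> | undefined;

  const run = async (): Promise<void> => {
    if (stopped) return;
    try {
      await poll();
    } catch (error) {
      onError(error); // report the failure and keep polling
    }
    if (!stopped) {
      // schedule the next run only after this one has finished
      timer = setTimeout(run, intervalMs);
    }
  };

  void run();

  return () => {
    stopped = true;
    if (timer !== undefined) clearTimeout(timer);
  };
}
```

Calling `const stop = startPolling(poll, 1000)` would start the loop with a one-second pause between runs, and calling `stop()` ends it. The interval here is the pause between the end of one run and the start of the next, so the effective polling frequency also depends on how long each run takes.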

Unless you have a good reason to roll your own DynamoDB Stream consumer, you are probably better off using one of the alternatives listed at the beginning of this article, such as the DynamoDB Streams Kinesis Adapter or a Lambda trigger.