Real-time Data Processing with AWS DynamoDB Streams and Lambda: A Beginner’s Guide

Setting up an AWS DynamoDB Stream with a Lambda function is a powerful way to respond to write operations on your DynamoDB table: item inserts, updates, and deletes. (Reads are not captured; a stream records only changes to items.) This setup lets you react to changes in your table in near real time and perform actions such as updating other systems, sending notifications, or triggering additional processing.

In this blog post, we will walk through the steps to set up a DynamoDB Stream and a Lambda function that processes it. The examples use version 2 of the AWS SDK for JavaScript (the aws-sdk package) and show how to handle inserts, updates, and deletes.
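
To make "reacting to changes" concrete: Lambda receives stream changes as an event whose Records array holds one entry per modified item, each tagged with an eventName of INSERT, MODIFY or REMOVE. The abbreviated sample below is illustrative only (the values are made up, and real records also carry fields such as eventID, awsRegion and eventSourceARN). countByEventName is a hypothetical helper that tallies the change types in one batch.

```javascript
// Abbreviated, illustrative event from a DynamoDB stream (values are made up).
const sampleEvent = {
  Records: [
    {
      eventName: 'INSERT',
      eventSource: 'aws:dynamodb',
      dynamodb: {
        Keys: { userId: { S: 'u-1' } },
        NewImage: { userId: { S: 'u-1' }, name: { S: 'Alice' } },
        SequenceNumber: '100',
        StreamViewType: 'NEW_AND_OLD_IMAGES',
      },
    },
    {
      eventName: 'REMOVE',
      eventSource: 'aws:dynamodb',
      dynamodb: {
        Keys: { userId: { S: 'u-2' } },
        OldImage: { userId: { S: 'u-2' }, name: { S: 'Bob' } },
        SequenceNumber: '101',
        StreamViewType: 'NEW_AND_OLD_IMAGES',
      },
    },
  ],
};

// Hypothetical helper: tallies the change types in one batch of stream records.
function countByEventName(event) {
  const counts = { INSERT: 0, MODIFY: 0, REMOVE: 0 };
  for (const record of event.Records) {
    counts[record.eventName] = (counts[record.eventName] || 0) + 1;
  }
  return counts;
}

console.log(JSON.stringify(countByEventName(sampleEvent)));
// {"INSERT":1,"MODIFY":0,"REMOVE":1}
```

There is no event type for reads: only INSERT, MODIFY and REMOVE ever appear in a batch.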

Step 1: Create a DynamoDB Table

First, we need to create a DynamoDB table. You can do this using the AWS Management Console or the AWS SDK.

For this example, let’s create a table named Users with userId (a string) as its partition key:

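const AWS = require('aws-sdk'); // AWS SDK for JavaScript v2
// Set the region via the AWS_REGION environment variable, or pass it explicitly, e.g. { region: 'us-east-1' }
const dynamodb = new AWS.DynamoDB();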

const params = {
  TableName: 'Users',
  KeySchema: [
    { AttributeName: 'userId', KeyType: 'HASH' }
  ],
  AttributeDefinitions: [
    { AttributeName: 'userId', AttributeType: 'S' }
  ],
  ProvisionedThroughput: {
    ReadCapacityUnits: 5,
    WriteCapacityUnits: 5
  }
};

dynamodb.createTable(params, (err, data) => {
  if (err) {
    console.error('Unable to create table. Error JSON:', JSON.stringify(err, null, 2));
  } else {
    console.log('Created table. Table description JSON:', JSON.stringify(data, null, 2));
  }
});
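
Alternatively, the stream can be enabled when the table is created, because createTable accepts the same StreamSpecification that Step 2 passes to updateTable. The sketch below uses a hypothetical helper, buildUsersTableParams, that returns the Step 1 parameters with the stream switched on and rejects anything other than the four view types DynamoDB Streams defines. Passing its result to dynamodb.createTable would make Step 2 unnecessary.

```javascript
// Hypothetical helper: Step 1's createTable params with the stream enabled up front.
function buildUsersTableParams(streamViewType = 'NEW_AND_OLD_IMAGES') {
  // The four view types DynamoDB Streams supports
  const viewTypes = ['KEYS_ONLY', 'NEW_IMAGE', 'OLD_IMAGE', 'NEW_AND_OLD_IMAGES'];
  if (!viewTypes.includes(streamViewType)) {
    throw new Error(`Unknown StreamViewType: ${streamViewType}`);
  }
  return {
    TableName: 'Users',
    KeySchema: [{ AttributeName: 'userId', KeyType: 'HASH' }],
    AttributeDefinitions: [{ AttributeName: 'userId', AttributeType: 'S' }],
    ProvisionedThroughput: { ReadCapacityUnits: 5, WriteCapacityUnits: 5 },
    // Same shape as the StreamSpecification passed to updateTable in Step 2
    StreamSpecification: { StreamEnabled: true, StreamViewType: streamViewType },
  };
}

const params = buildUsersTableParams();
console.log(JSON.stringify(params.StreamSpecification));
// {"StreamEnabled":true,"StreamViewType":"NEW_AND_OLD_IMAGES"}
```

KEYS_ONLY keeps stream records smallest, but then the handler in Step 3 would have no NewImage or OldImage to read, which is why NEW_AND_OLD_IMAGES is the default here.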

Step 2: Enable DynamoDB Streams

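Next, enable DynamoDB Streams on the Users table. Because updateTable fails while the table is still being created, the code first waits for the table to become ACTIVE. The response contains the stream ARN (TableDescription.LatestStreamArn), which you will need in Step 4:

const AWS = require('aws-sdk');
const dynamodb = new AWS.DynamoDB();

const params = {
  TableName: 'Users',
  StreamSpecification: {
    StreamEnabled: true,
    // Each stream record will carry the item both before and after the change
    StreamViewType: 'NEW_AND_OLD_IMAGES'
  }
};

// The tableExists waiter polls describeTable until the table status is ACTIVE
dynamodb.waitFor('tableExists', { TableName: 'Users' }, (waitErr) => {
  if (waitErr) {
    console.error('Table did not become active:', waitErr.message);
    return;
  }
  dynamodb.updateTable(params, (err, data) => {
    if (err) {
      console.error('Unable to update table. Error JSON:', JSON.stringify(err, null, 2));
    } else {
      console.log('Stream ARN:', data.TableDescription.LatestStreamArn);
      console.log('Updated table with stream. Table description JSON:', JSON.stringify(data, null, 2));
    }
  });
});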
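
If you run Step 4 later as a separate script, you can read the stream ARN back from describeTable instead of copying it by hand. The hypothetical getStreamArn helper below accepts either response shape: createTable and updateTable wrap the table details in TableDescription, while describeTable uses Table. It throws when no stream is enabled rather than passing undefined on to createEventSourceMapping. The ARN in the example is the same placeholder used in Step 4.

```javascript
// Hypothetical helper: returns LatestStreamArn from a createTable, updateTable or describeTable response.
function getStreamArn(response) {
  // createTable/updateTable responses use TableDescription; describeTable uses Table
  const table = response.TableDescription || response.Table;
  if (!table || !table.LatestStreamArn) {
    throw new Error('No stream ARN found; is a stream enabled on this table?');
  }
  return table.LatestStreamArn;
}

// Example with a hand-written describeTable-style response (placeholder ARN)
const fakeResponse = {
  Table: {
    TableName: 'Users',
    LatestStreamArn: 'arn:aws:dynamodb:REGION:ACCOUNT_ID:table/Users/stream/2024-03-12T18:37:03.465',
  },
};
console.log(getStreamArn(fakeResponse));

// With real credentials (not run here):
// dynamodb.describeTable({ TableName: 'Users' }, (err, data) => {
//   if (err) return console.error(err);
//   console.log(getStreamArn(data));
// });
```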

Step 3: Create a Lambda Function

Now, let’s create a Lambda function to process the DynamoDB stream. It will be invoked whenever an item is inserted, updated, or deleted in the Users table. Lambda delivers stream records in batches, so the handler loops over event.Records:

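// The Lambda Node.js 18.x and later runtimes include AWS SDK v3, not 'aws-sdk' (v2).
// Bundle 'aws-sdk' with your deployment package, or use unmarshall from '@aws-sdk/util-dynamodb'.
const AWS = require('aws-sdk');

// Stream images use DynamoDB's typed format, e.g. { name: { S: 'Alice' } };
// unmarshall turns them into plain objects, e.g. { name: 'Alice' }
const toPlainObject = (image) => AWS.DynamoDB.Converter.unmarshall(image);

exports.handler = async (event) => {
  for (const record of event.Records) {
    console.log('Processing record:', JSON.stringify(record, null, 2));

    if (record.eventName === 'INSERT') {
      await handleInsert(record);
    } else if (record.eventName === 'MODIFY') {
      await handleModify(record);
    } else if (record.eventName === 'REMOVE') {
      await handleRemove(record);
    }
  }
};

async function handleInsert(record) {
  // NewImage is the item as it was written
  const { userId, name, email } = toPlainObject(record.dynamodb.NewImage);
  console.log(`New user inserted. userId: ${userId}, name: ${name}, email: ${email}`);
  // Perform actions like sending notifications, updating other systems, etc.
}

async function handleModify(record) {
  // NewImage is the item after the update; OldImage (available with NEW_AND_OLD_IMAGES) is the item before it
  const { userId, name, email } = toPlainObject(record.dynamodb.NewImage);
  console.log(`User modified. userId: ${userId}, name: ${name}, email: ${email}`);
  // Perform actions like sending notifications, updating other systems, etc.
}

async function handleRemove(record) {
  // Deleted items have no NewImage, so read the last known state from OldImage
  const { userId } = toPlainObject(record.dynamodb.OldImage);
  console.log(`User removed. userId: ${userId}`);
  // Perform actions like sending notifications, updating other systems, etc.
}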
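
Stream images are not plain objects: each attribute arrives in DynamoDB's typed attribute-value format, such as { name: { S: 'Alice' } }, so destructuring NewImage directly yields the { S: ... } wrappers rather than the strings. The SDK's converter (AWS.DynamoDB.Converter.unmarshall in v2, or unmarshall from @aws-sdk/util-dynamodb in v3) takes care of this. To show what that conversion does, here is a simplified, hypothetical fromImage function covering only the S, N, BOOL, NULL, M and L types; it is an illustration, not a replacement for the SDK converter.

```javascript
// Simplified, hypothetical converter from DynamoDB attribute values to plain JS values.
// Covers S, N, BOOL, NULL, M and L only; sets (SS, NS, BS) and binary (B) are not handled.
function fromAttributeValue(av) {
  if ('S' in av) return av.S;
  if ('N' in av) return Number(av.N); // numbers arrive as strings; very large values lose precision
  if ('BOOL' in av) return av.BOOL;
  if ('NULL' in av) return null;
  if ('M' in av) return fromImage(av.M);
  if ('L' in av) return av.L.map(fromAttributeValue);
  throw new Error(`Unsupported attribute type: ${Object.keys(av).join(', ')}`);
}

// Converts a whole NewImage/OldImage map into a plain object
function fromImage(image) {
  const item = {};
  for (const [key, av] of Object.entries(image)) {
    item[key] = fromAttributeValue(av);
  }
  return item;
}

// A NewImage as it might appear in an INSERT record for the Users table
const newImage = {
  userId: { S: 'u-1' },
  name: { S: 'Alice' },
  email: { S: 'alice@example.com' },
};
console.log(JSON.stringify(fromImage(newImage)));
// {"userId":"u-1","name":"Alice","email":"alice@example.com"}
```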

Step 4: Connect Lambda to DynamoDB Stream

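Finally, connect the Lambda function to the DynamoDB stream by creating an event source mapping, either in the AWS Management Console or with the AWS SDK. The function’s execution role must be allowed to read the stream; the AWS managed policy AWSLambdaDynamoDBExecutionRole grants the required dynamodb:DescribeStream, dynamodb:GetRecords, dynamodb:GetShardIterator and dynamodb:ListStreams permissions, plus CloudWatch Logs access. Here’s an example using the AWS SDK: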

const AWS = require('aws-sdk');
const lambda = new AWS.Lambda();

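const params = {
  FunctionName: 'myDynamoDBStreamLambdaFunction', // Replace with your Lambda function name
  EventSourceArn: 'arn:aws:dynamodb:REGION:ACCOUNT_ID:table/Users/stream/2024-03-12T18:37:03.465', // Replace with your DynamoDB stream ARN
  // Required for stream sources: LATEST reads only new records, TRIM_HORIZON starts at the oldest available record
  StartingPosition: 'LATEST',
  BatchSize: 100, // Maximum records per invocation (100 is the default)
  Enabled: true
};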

lambda.createEventSourceMapping(params, (err, data) => {
  if (err) {
    console.error('Unable to connect Lambda to stream. Error JSON:', JSON.stringify(err, null, 2));
  } else {
    console.log('Connected Lambda to stream. Mapping JSON:', JSON.stringify(data, null, 2));
  }
});
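
Two details are worth knowing once the mapping exists. First, for DynamoDB stream sources StartingPosition must be LATEST or TRIM_HORIZON, and BatchSize may range from 1 to 10,000 (default 100). Second, if one record in a batch fails, Lambda by default retries the whole batch; setting FunctionResponseTypes to ['ReportBatchItemFailures'] lets the function report the failing record's sequence number instead. The sketch below shows both through two hypothetical helpers, buildStreamMappingParams and processBatch. processBatch follows the common pattern of stopping at the first failure, since Lambda resumes from the reported sequence number and delivers that record and the ones after it again.

```javascript
// Hypothetical helper: builds params for lambda.createEventSourceMapping with a DynamoDB stream source.
function buildStreamMappingParams({ functionName, streamArn, startingPosition = 'LATEST', batchSize = 100 }) {
  // DynamoDB stream sources accept only these two starting positions
  const positions = ['LATEST', 'TRIM_HORIZON'];
  if (!positions.includes(startingPosition)) {
    throw new Error(`StartingPosition must be one of: ${positions.join(', ')}`);
  }
  // For DynamoDB streams, BatchSize can range from 1 to 10,000 (default 100)
  if (!Number.isInteger(batchSize) || batchSize < 1 || batchSize > 10000) {
    throw new Error('BatchSize must be an integer from 1 to 10000');
  }
  return {
    FunctionName: functionName,
    EventSourceArn: streamArn,
    StartingPosition: startingPosition,
    BatchSize: batchSize,
    Enabled: true,
    // Lets the handler report the failed record instead of failing the whole batch
    FunctionResponseTypes: ['ReportBatchItemFailures'],
  };
}

// Hypothetical handler body: processes records in order and stops at the first failure.
// Lambda retries from the reported sequence number, so later records are redelivered, not skipped.
async function processBatch(event, processRecord) {
  for (const record of event.Records) {
    try {
      await processRecord(record);
    } catch (err) {
      console.error('Failed record', record.dynamodb.SequenceNumber, err.message);
      return { batchItemFailures: [{ itemIdentifier: record.dynamodb.SequenceNumber }] };
    }
  }
  // An empty list tells Lambda the whole batch succeeded
  return { batchItemFailures: [] };
}
```

To wire this up, the Step 3 handler could become exports.handler = (event) => processBatch(event, yourRecordHandler), where yourRecordHandler (a hypothetical name) holds your per-record logic, and the mapping would be created with lambda.createEventSourceMapping(buildStreamMappingParams({ ... }), callback).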

Conclusion

In this blog post, we’ve covered how to set up a DynamoDB Stream and a Lambda function that processes inserts, updates, and deletes in near real time. With these steps and examples, you can build responsive applications that react to changes in your DynamoDB tables as they happen.

Support Mahtab Haider by becoming a sponsor. Any amount is appreciated!