Building a serverless data ingestion pipeline with AWS (S3, Lambda, SQS, DynamoDB)

A step-by-step technical guide with full code and CLI examples on how to build a robust serverless data pipeline that overcomes standard Lambda limitations. By decoupling the architecture with an SQS buffer, we create a system that elegantly handles large file processing, DynamoDB throttling, and failed message retries, making it a powerful solution for production environments.

Published on 4 Apr 2026


Introduction

Processing large CSV files in AWS Lambda can quickly become challenging. A naive approach fails because:

  • Lambda has a 15-minute timeout — large files can't be fully processed
  • Row-by-row writes to DynamoDB are slow and inefficient
  • No retry mechanism — failed operations lose data
  • Backpressure issues — fast reads from S3 overwhelm slow database writes
  • Partial data loss — if Lambda times out, you don't know what was stored

This guide walks through a production-ready serverless architecture that handles all these challenges elegantly.


The Problem with Simple Architectures

Naive Approach: Direct S3 → Lambda → DynamoDB

S3 (File Upload)
  ↓
Lambda (15 min timeout)
  ↓
DynamoDB (Slow writes)

Why this fails:

Issue                         Impact
15-minute Lambda timeout      Large files never finish processing
🐢 Slow DynamoDB writes       100 rows/sec ≈ 1.4 hours for 500K rows
No retry mechanism            Network hiccup = data loss
⚠️ Backpressure               S3 reads faster than DynamoDB writes
💥 Partial failures           Lambda dies at row 50K; rows 50K-100K missing

Final Scalable Architecture

┌─────────┐
│   S3    │ (User uploads users.csv)
│ (File)  │
└────┬────┘
     │
     │ (S3:ObjectCreated event)
     ▼
┌──────────────────────────┐
│  Producer Lambda         │ ← Triggered by S3 event
│ (Parse CSV + Send SQS)   │
└────────────┬─────────────┘
             │
      (SQS SendMessage)
             │
             ▼
        ┌────────┐
        │  SQS   │ ← Buffer/Queue
        │(Queue) │  (Batches of messages)
        └────┬───┘
             │
        (Batch trigger)
             │
             ▼
┌──────────────────────────┐
│  Worker Lambda           │ ← Scales based on queue depth
│ (Write to DynamoDB)      │
└────────────┬─────────────┘
             │
   (BatchWriteItem)
             │
             ▼
       ┌──────────┐
       │DynamoDB  │ ← Persistent storage
       │(users)   │
       └──────┬───┘
              │
              │ (GetItem)
              ▼
       ┌──────────────┐
       │Query Lambda  │ ← Fetch user by email
       └──────┬───────┘
              │
              │ (Lambda proxy integration)
              ▼
       ┌─────────────┐
       │API Gateway  │ ← REST API (GET /users/{email})
       └─────────────┘

(Read path, in request order: clients call API Gateway, which invokes
the Query Lambda, which issues GetItem against DynamoDB.)

Key Benefits:

Decoupled architecture — Producer and worker are independent
Automatic scaling — Lambda scales based on queue size
Built-in retry — SQS retries failed messages
Backpressure handling — SQS acts as a buffer
No timeouts — Each Lambda processes small batches
Idempotent writes — duplicate or redelivered messages overwrite the same DynamoDB key instead of creating duplicate rows


Prerequisites & Setup

Required AWS Services

  • ✅ S3 (Storage)
  • ✅ Lambda (Compute)
  • ✅ SQS (Message Queue)
  • ✅ DynamoDB (Database)
  • ✅ API Gateway (REST API)
  • ✅ IAM (Permissions)
  • ✅ CloudWatch (Logs)

Tools & SDKs

# Install AWS CLI (if not already installed)
curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
unzip awscliv2.zip
sudo ./aws/install

# Configure AWS credentials
aws configure
# Enter: Access Key ID, Secret Access Key, Region (ap-south-1), Output format (json)

# Install Node.js 24+ (for Lambda runtime)
node --version  # Should be v24+

# Install npm dependencies locally (for testing)
npm init -y
npm install @aws-sdk/client-s3 @aws-sdk/client-sqs @aws-sdk/client-dynamodb csv-parser

Step 1: Create AWS Resources

1.1 Create S3 Bucket

# Create bucket (the timestamp suffix keeps the name unique) and
# capture the generated name so later commands reuse it
BUCKET_NAME="csv-processing-pipeline-$(date +%s)"
export BUCKET_NAME
aws s3 mb s3://$BUCKET_NAME --region ap-south-1

# Create folder for uploads
aws s3api put-object --bucket $BUCKET_NAME --key uploads/ --region ap-south-1

# Enable versioning (optional but recommended)
aws s3api put-bucket-versioning \
  --bucket $BUCKET_NAME \
  --versioning-configuration Status=Enabled \
  --region ap-south-1

1.2 Create SQS Queue

# Create standard queue (not FIFO)
aws sqs create-queue \
  --queue-name user-csv-queue \
  --region ap-south-1

# Store queue URL
QUEUE_URL=$(aws sqs get-queue-url \
  --queue-name user-csv-queue \
  --region ap-south-1 \
  --query 'QueueUrl' \
  --output text)
echo "Queue URL: $QUEUE_URL"
export QUEUE_URL

# Get Queue ARN (needed for IAM permissions)
QUEUE_ARN=$(aws sqs get-queue-attributes \
  --queue-url $QUEUE_URL \
  --attribute-names QueueArn \
  --region ap-south-1 \
  --query 'Attributes.QueueArn' \
  --output text)
echo "Queue ARN: $QUEUE_ARN"
export QUEUE_ARN

1.3 Create DynamoDB Table

# Create table. email is the sole partition key: re-uploading the same
# email overwrites the existing item (idempotent writes), and the Query
# Lambda's GetItem-by-email works. createdAt is stored as a plain
# attribute, not a sort key.
aws dynamodb create-table \
  --table-name users \
  --attribute-definitions \
      AttributeName=email,AttributeType=S \
  --key-schema \
      AttributeName=email,KeyType=HASH \
  --billing-mode PAY_PER_REQUEST \
  --region ap-south-1

# Wait for table to be created
aws dynamodb wait table-exists --table-name users --region ap-south-1

# Add TTL (optional, auto-delete after 90 days)
aws dynamodb update-time-to-live \
  --table-name users \
  --time-to-live-specification "Enabled=true, AttributeName=expirationTime" \
  --region ap-south-1

# Verify table creation
aws dynamodb describe-table --table-name users --region ap-south-1 \
  --query 'Table.{TableName:TableName,Status:TableStatus}'

1.4 Get AWS Account ID

# Store your AWS Account ID (needed for IAM policies)
ACCOUNT_ID=$(aws sts get-caller-identity --query 'Account' --output text)
echo "Account ID: $ACCOUNT_ID"
export ACCOUNT_ID

Step 2: Producer Lambda

Overview

The Producer Lambda is triggered when a CSV file is uploaded to S3. It:

  1. Reads the CSV file from S3
  2. Parses each row using csv-parser
  3. Validates the data
  4. Sends each row as a message to SQS

This keeps the Lambda execution time low (< 5 minutes).
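The validation and normalization in steps 2-3 can be isolated as a tiny pure function. A sketch (the helper name normalizeRow is ours for illustration; the deployed handler inlines this logic):

```javascript
// Hypothetical helper mirroring the producer's per-row handling:
// returns a normalized message object, or null when required fields
// (name, mail) are missing so the row can be skipped and counted.
function normalizeRow(row, rowNumber) {
  if (!row.name || !row.mail) return null; // skipped row
  return {
    name: row.name.trim(),
    mail: row.mail.trim().toLowerCase(), // emails are stored lowercase
    phone: (row.phone || "").trim(),
    rowNumber
  };
}

console.log(normalizeRow({ name: " John Doe ", mail: " John@Gmail.com " }, 1));
```

Lowercasing the email here matters later: the Query Lambda also lowercases before GetItem, so the two sides agree on the key.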

2.1 Create IAM Role for Producer

# Create trust policy for Lambda
cat > /tmp/lambda-trust-policy.json << 'EOF'
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": { "Service": "lambda.amazonaws.com" },
      "Action": "sts:AssumeRole"
    }
  ]
}
EOF

# Create role
aws iam create-role \
  --role-name ProducerLambdaRole \
  --assume-role-policy-document file:///tmp/lambda-trust-policy.json \
  --region ap-south-1

# Attach basic Lambda execution policy
aws iam attach-role-policy \
  --role-name ProducerLambdaRole \
  --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole

# Create inline policy for S3 and SQS
# (Version must be the literal policy-language version "2012-10-17")
cat > /tmp/producer-policy.json << EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "ReadCSVFromS3",
      "Effect": "Allow",
      "Action": ["s3:GetObject", "s3:GetObjectVersion"],
      "Resource": "arn:aws:s3:::${BUCKET_NAME}/*"
    },
    {
      "Sid": "ListS3Bucket",
      "Effect": "Allow",
      "Action": "s3:ListBucket",
      "Resource": "arn:aws:s3:::${BUCKET_NAME}"
    },
    {
      "Sid": "SendToSQS",
      "Effect": "Allow",
      "Action": "sqs:SendMessage",
      "Resource": "${QUEUE_ARN}"
    }
  ]
}
EOF

aws iam put-role-policy \
  --role-name ProducerLambdaRole \
  --policy-name ProducerInlinePolicy \
  --policy-document file:///tmp/producer-policy.json

echo "Producer IAM role created: ProducerLambdaRole"

2.2 Producer Lambda Code

# Create directory for Lambda code
mkdir -p /home/claude/producer-lambda
cd /home/claude/producer-lambda

# Initialize Node.js project
npm init -y

# Install dependencies
npm install @aws-sdk/client-s3 @aws-sdk/client-sqs csv-parser

# Create index.js
cat > index.js << 'EOF'
import { S3Client, GetObjectCommand } from "@aws-sdk/client-s3";
import { SQSClient, SendMessageCommand } from "@aws-sdk/client-sqs";
import csv from "csv-parser";

const s3 = new S3Client({ region: process.env.AWS_REGION });
const sqs = new SQSClient({ region: process.env.AWS_REGION });

const QUEUE_URL = process.env.QUEUE_URL;

/**
 * Producer Lambda Handler
 * Triggered by S3:ObjectCreated event
 * Parses CSV and sends messages to SQS
 */
export const handler = async (event) => {
  console.log("Producer Lambda triggered:", JSON.stringify(event, null, 2));

  const record = event.Records[0];
  const bucket = record.s3.bucket.name;
  // S3 event keys are URL-encoded with '+' for spaces
  const key = decodeURIComponent(record.s3.object.key.replace(/\+/g, " "));

  // Skip if not a CSV file
  if (!key.endsWith('.csv')) {
    console.log(`Skipping non-CSV file: ${key}`);
    return { statusCode: 400, body: 'Only CSV files are allowed' };
  }

  console.log(`Processing: s3://${bucket}/${key}`);

  try {
    // Get the CSV file from S3
    const response = await s3.send(
      new GetObjectCommand({ Bucket: bucket, Key: key })
    );

    const stream = response.Body;
    const promises = [];
    let rowCount = 0;
    let validRows = 0;
    let skippedRows = 0;

    return new Promise((resolve, reject) => {
      stream
        .pipe(csv())
        .on("data", (data) => {
          rowCount++;

          // Validate required fields
          if (!data.name || !data.mail) {
            console.warn(`Row ${rowCount}: Missing required fields (name, mail)`);
            skippedRows++;
            return;
          }

          // Clean and prepare data
          const message = {
            name: data.name.trim(),
            mail: data.mail.trim().toLowerCase(),
            phone: (data.phone || "").trim(),
            processedAt: new Date().toISOString(),
            rowNumber: rowCount
          };

          validRows++;

          // Send to SQS. MessageGroupId is omitted: it applies to FIFO
          // queues, and this pipeline uses a standard queue.
          const sendPromise = sqs.send(
            new SendMessageCommand({
              QueueUrl: QUEUE_URL,
              MessageBody: JSON.stringify(message),
              MessageAttributes: {
                email: { StringValue: message.mail, DataType: "String" }
              }
            })
          ).catch(err => {
            console.error(`Failed to send message for row ${rowCount}:`, err);
            throw err;
          });

          promises.push(sendPromise);
        })
        .on("end", async () => {
          console.log(`CSV parsing complete. Total rows: ${rowCount}, Valid: ${validRows}, Skipped: ${skippedRows}`);
          try {
            // Wait for all SQS messages to be sent
            const results = await Promise.all(promises);
            console.log(`All ${validRows} messages sent to SQS`);
            resolve({
              statusCode: 200,
              body: JSON.stringify({
                message: "CSV processed successfully",
                totalRows: rowCount,
                validRows: validRows,
                skippedRows: skippedRows,
                messagesSent: results.length
              })
            });
          } catch (error) {
            console.error("Error sending messages to SQS:", error);
            reject(error);
          }
        })
        .on("error", (error) => {
          console.error("CSV parsing error:", error);
          reject(error);
        });
    });
  } catch (error) {
    console.error("Error in Producer Lambda:", error);
    return { statusCode: 500, body: JSON.stringify({ error: error.message }) };
  }
};
EOF

# Create package.json with ES modules support
cat > package.json << 'EOF'
{
  "name": "producer-lambda",
  "version": "1.0.0",
  "type": "module",
  "main": "index.js",
  "dependencies": {
    "@aws-sdk/client-s3": "^3.400.0",
    "@aws-sdk/client-sqs": "^3.400.0",
    "csv-parser": "^3.0.0"
  }
}
EOF

# Create deployment package (zip). node_modules must be included here:
# the AWS SDK v3 ships with the Node.js runtime, but csv-parser does not.
zip -r producer.zip index.js package.json node_modules

2.3 Deploy Producer Lambda

# Create Lambda function
aws lambda create-function \
  --function-name ProducerCSVLambda \
  --runtime nodejs24.x \
  --role arn:aws:iam::${ACCOUNT_ID}:role/ProducerLambdaRole \
  --handler index.handler \
  --zip-file fileb:///home/claude/producer-lambda/producer.zip \
  --timeout 300 \
  --memory-size 512 \
  --environment "Variables={QUEUE_URL=${QUEUE_URL}}" \
  --region ap-south-1

echo "Producer Lambda created: ProducerCSVLambda"

# Allow S3 to invoke the Lambda. This must exist BEFORE the bucket
# notification is configured, or S3 rejects the configuration.
aws lambda add-permission \
  --function-name ProducerCSVLambda \
  --statement-id AllowS3Invoke \
  --action lambda:InvokeFunction \
  --principal s3.amazonaws.com \
  --source-arn "arn:aws:s3:::${BUCKET_NAME}" \
  --region ap-south-1

# Wire up the S3 trigger. Note: S3 does not use event source mappings
# (those are for poll-based sources like SQS); S3 pushes events via a
# bucket notification configuration instead.
cat > /tmp/s3-notification.json << EOF
{
  "LambdaFunctionConfigurations": [
    {
      "LambdaFunctionArn": "arn:aws:lambda:ap-south-1:${ACCOUNT_ID}:function:ProducerCSVLambda",
      "Events": ["s3:ObjectCreated:*"],
      "Filter": {
        "Key": {
          "FilterRules": [
            { "Name": "prefix", "Value": "uploads/" },
            { "Name": "suffix", "Value": ".csv" }
          ]
        }
      }
    }
  ]
}
EOF

aws s3api put-bucket-notification-configuration \
  --bucket $BUCKET_NAME \
  --notification-configuration file:///tmp/s3-notification.json \
  --region ap-south-1

Step 3: Worker Lambda

Overview

The Worker Lambda is triggered by SQS messages. It:

  1. Receives batches of messages from SQS
  2. Validates each message
  3. Writes data to DynamoDB using BatchWriteItem
  4. Deletes messages from queue on success
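Step 3 relies on DynamoDB's BatchWriteItem, which accepts at most 25 put requests per call, so the worker must chunk whatever SQS delivers. The chunking in isolation (the function name is ours, for illustration):

```javascript
// Split an array of items into chunks of at most 25, the hard
// per-call limit for DynamoDB BatchWriteItem.
function chunkForBatchWrite(items, batchSize = 25) {
  const batches = [];
  for (let i = 0; i < items.length; i += batchSize) {
    batches.push(items.slice(i, i + batchSize));
  }
  return batches;
}

// 60 items → batches of 25, 25, and 10
console.log(chunkForBatchWrite(Array.from({ length: 60 }, (_, i) => i)).map(b => b.length));
```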

3.1 Create IAM Role for Worker

# Create role
aws iam create-role \
  --role-name WorkerLambdaRole \
  --assume-role-policy-document file:///tmp/lambda-trust-policy.json \
  --region ap-south-1

# Attach basic Lambda execution policy
aws iam attach-role-policy \
  --role-name WorkerLambdaRole \
  --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole

# Create inline policy for SQS and DynamoDB
cat > /tmp/worker-policy.json << EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "ReceiveFromSQS",
      "Effect": "Allow",
      "Action": [
        "sqs:ReceiveMessage",
        "sqs:DeleteMessage",
        "sqs:GetQueueAttributes",
        "sqs:ChangeMessageVisibility"
      ],
      "Resource": "${QUEUE_ARN}"
    },
    {
      "Sid": "WriteToDynamoDB",
      "Effect": "Allow",
      "Action": [
        "dynamodb:BatchWriteItem",
        "dynamodb:PutItem"
      ],
      "Resource": "arn:aws:dynamodb:ap-south-1:${ACCOUNT_ID}:table/users"
    }
  ]
}
EOF

aws iam put-role-policy \
  --role-name WorkerLambdaRole \
  --policy-name WorkerInlinePolicy \
  --policy-document file:///tmp/worker-policy.json

echo "Worker IAM role created: WorkerLambdaRole"

3.2 Worker Lambda Code

# Create directory for Worker Lambda
mkdir -p /home/claude/worker-lambda
cd /home/claude/worker-lambda

# Initialize Node.js project
npm init -y

# Install dependencies
npm install @aws-sdk/client-dynamodb

# Create index.js
cat > index.js << 'EOF'
import { DynamoDBClient, BatchWriteItemCommand } from "@aws-sdk/client-dynamodb";

const dynamo = new DynamoDBClient({ region: process.env.AWS_REGION });

/**
 * Worker Lambda Handler
 * Triggered by SQS (batches of messages).
 * Writes validated records to DynamoDB.
 * Returns partial batch responses (batchItemFailures), which requires
 * ReportBatchItemFailures on the event source mapping.
 */
export const handler = async (event) => {
  console.log("Worker Lambda triggered with records:", event.Records.length);

  const items = [];
  const errors = [];
  let successCount = 0;

  // Process each SQS message
  for (const record of event.Records) {
    try {
      const body = JSON.parse(record.body);

      // Validate required fields
      if (!body.mail || !body.name) {
        errors.push({
          messageId: record.messageId,
          error: "Missing required fields (mail, name)"
        });
        continue;
      }

      // Create timestamp
      const createdAt = Math.floor(Date.now() / 1000);

      // Create DynamoDB item (email is the table's partition key)
      const item = {
        email: { S: body.mail },
        name: { S: body.name },
        phone: { S: body.phone || "N/A" },
        createdAt: { N: createdAt.toString() },
        processedAt: { S: body.processedAt || new Date().toISOString() },
        rowNumber: { N: (body.rowNumber || 0).toString() }
      };

      items.push({ PutRequest: { Item: item } });
      successCount++;
    } catch (err) {
      errors.push({ messageId: record.messageId, error: err.message });
    }
  }

  // Log errors (invalid records are dropped rather than retried,
  // since retrying them would never succeed)
  if (errors.length > 0) {
    console.warn(`Errors processing ${errors.length} records:`, errors);
  }

  // Nothing valid to write: report all records as failed so SQS retries
  if (items.length === 0) {
    console.log("No valid items to write");
    return {
      batchItemFailures: event.Records.map(r => ({ itemIdentifier: r.messageId }))
    };
  }

  try {
    // DynamoDB BatchWriteItem has a limit of 25 items per request
    const batchSize = 25;
    for (let i = 0; i < items.length; i += batchSize) {
      const batch = items.slice(i, i + batchSize);
      console.log(`Writing batch ${Math.floor(i / batchSize) + 1} with ${batch.length} items`);

      const response = await dynamo.send(
        new BatchWriteItemCommand({ RequestItems: { users: batch } })
      );

      // BatchWriteItem can partially succeed; unprocessed items are NOT
      // retried by SQS automatically, so retry them here a few times.
      let unprocessed = response.UnprocessedItems || {};
      let attempts = 0;
      while (unprocessed.users && unprocessed.users.length > 0 && attempts < 3) {
        attempts++;
        console.warn(`Retrying ${unprocessed.users.length} unprocessed items (attempt ${attempts})`);
        const retry = await dynamo.send(
          new BatchWriteItemCommand({ RequestItems: unprocessed })
        );
        unprocessed = retry.UnprocessedItems || {};
      }
    }

    console.log(`Successfully written ${successCount} items to DynamoDB`);
    return { batchItemFailures: [] };
  } catch (error) {
    console.error("Error writing to DynamoDB:", error);
    // Return all records as failed so SQS retries them
    return {
      batchItemFailures: event.Records.map(r => ({ itemIdentifier: r.messageId }))
    };
  }
};
EOF

# Create package.json
cat > package.json << 'EOF'
{
  "name": "worker-lambda",
  "version": "1.0.0",
  "type": "module",
  "main": "index.js",
  "dependencies": {
    "@aws-sdk/client-dynamodb": "^3.400.0"
  }
}
EOF

# Create deployment package (the AWS SDK v3 ships with the Node.js
# runtime, so node_modules can be excluded here)
zip -r worker.zip . -x "*.git*" "node_modules/*"

3.3 Deploy Worker Lambda

# Create Lambda function
aws lambda create-function \
  --function-name WorkerCSVLambda \
  --runtime nodejs24.x \
  --role arn:aws:iam::${ACCOUNT_ID}:role/WorkerLambdaRole \
  --handler index.handler \
  --zip-file fileb:///home/claude/worker-lambda/worker.zip \
  --timeout 60 \
  --memory-size 256 \
  --region ap-south-1

echo "Worker Lambda created: WorkerCSVLambda"

# Create SQS event source mapping (SQS → Lambda).
# ReportBatchItemFailures makes Lambda honor the batchItemFailures
# response the worker returns; without it, any returned value is
# ignored and a thrown error fails the whole batch.
aws lambda create-event-source-mapping \
  --event-source-arn $QUEUE_ARN \
  --function-name WorkerCSVLambda \
  --enabled \
  --batch-size 10 \
  --maximum-batching-window-in-seconds 5 \
  --function-response-types ReportBatchItemFailures \
  --region ap-south-1

# Verify event source mapping
aws lambda list-event-source-mappings \
  --function-name WorkerCSVLambda \
  --region ap-south-1

Step 4: Query API (DynamoDB Retrieval)

4.1 Create IAM Role for Query Lambda

# Create role
aws iam create-role \
  --role-name QueryLambdaRole \
  --assume-role-policy-document file:///tmp/lambda-trust-policy.json \
  --region ap-south-1

# Attach basic Lambda execution policy
aws iam attach-role-policy \
  --role-name QueryLambdaRole \
  --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole

# Create inline policy for DynamoDB
cat > /tmp/query-policy.json << EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "QueryDynamoDB",
      "Effect": "Allow",
      "Action": [
        "dynamodb:GetItem",
        "dynamodb:Query",
        "dynamodb:Scan"
      ],
      "Resource": "arn:aws:dynamodb:ap-south-1:${ACCOUNT_ID}:table/users"
    }
  ]
}
EOF

aws iam put-role-policy \
  --role-name QueryLambdaRole \
  --policy-name QueryInlinePolicy \
  --policy-document file:///tmp/query-policy.json

echo "Query IAM role created: QueryLambdaRole"

4.2 Query Lambda Code

# Create directory for Query Lambda
mkdir -p /home/claude/query-lambda
cd /home/claude/query-lambda

# Initialize Node.js project
npm init -y

# Install dependencies
npm install @aws-sdk/client-dynamodb @aws-sdk/util-dynamodb

# Create index.js
cat > index.js << 'EOF'
import { DynamoDBClient, GetItemCommand, ScanCommand } from "@aws-sdk/client-dynamodb";
import { unmarshall } from "@aws-sdk/util-dynamodb";

const dynamo = new DynamoDBClient({ region: process.env.AWS_REGION });

/**
 * Query Lambda Handler
 * GET /users/{email} - Fetch user by email
 * GET /users - List all users with pagination
 */
export const handler = async (event) => {
  console.log("Query Lambda triggered:", JSON.stringify(event, null, 2));

  try {
    const pathParameters = event.pathParameters || {};
    const queryStringParameters = event.queryStringParameters || {};
    const email = pathParameters.email;

    // Case 1: Get specific user by email
    if (email) {
      return await getUserByEmail(email);
    }

    // Case 2: List all users (with pagination)
    const limit = parseInt(queryStringParameters.limit || "10", 10);
    const startKey = queryStringParameters.startKey;
    return await listUsers(limit, startKey);
  } catch (error) {
    console.error("Error in Query Lambda:", error);
    return {
      statusCode: 500,
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ error: error.message })
    };
  }
};

/**
 * Get user by email
 */
async function getUserByEmail(email) {
  console.log(`Fetching user: ${email}`);

  const result = await dynamo.send(
    new GetItemCommand({
      TableName: "users",
      Key: { email: { S: email.toLowerCase() } }
    })
  );

  if (!result.Item) {
    return {
      statusCode: 404,
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ error: "User not found" })
    };
  }

  // Convert DynamoDB format to regular JSON
  const user = unmarshall(result.Item);
  return {
    statusCode: 200,
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify(user)
  };
}

/**
 * List all users with pagination.
 * Note: listing the whole table requires Scan, not Query —
 * QueryCommand would fail here because it needs a KeyConditionExpression.
 */
async function listUsers(limit, startKey) {
  console.log(`Listing users (limit: ${limit})`);

  const params = { TableName: "users", Limit: limit };

  // If pagination token provided, use it
  if (startKey) {
    try {
      params.ExclusiveStartKey = JSON.parse(Buffer.from(startKey, 'base64').toString());
    } catch (err) {
      console.warn("Invalid startKey provided, ignoring");
    }
  }

  const result = await dynamo.send(new ScanCommand(params));

  // Convert items from DynamoDB format
  const items = (result.Items || []).map(item => unmarshall(item));

  // Create pagination token if more items exist
  let nextToken = null;
  if (result.LastEvaluatedKey) {
    nextToken = Buffer.from(JSON.stringify(result.LastEvaluatedKey)).toString('base64');
  }

  return {
    statusCode: 200,
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ items, count: items.length, nextToken })
  };
}
EOF

# Create package.json
cat > package.json << 'EOF'
{
  "name": "query-lambda",
  "version": "1.0.0",
  "type": "module",
  "main": "index.js",
  "dependencies": {
    "@aws-sdk/client-dynamodb": "^3.400.0",
    "@aws-sdk/util-dynamodb": "^3.400.0"
  }
}
EOF

# Create deployment package
zip -r query.zip . -x "*.git*" "node_modules/*"
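The pagination token used by listUsers is nothing more than DynamoDB's LastEvaluatedKey, JSON-serialized and base64-encoded so it can travel safely in a query string. The round-trip in isolation (helper names are ours):

```javascript
// Encode DynamoDB's LastEvaluatedKey as an opaque, URL-friendly token,
// and decode it back into the shape ExclusiveStartKey expects.
function encodeToken(lastEvaluatedKey) {
  return Buffer.from(JSON.stringify(lastEvaluatedKey)).toString("base64");
}
function decodeToken(token) {
  return JSON.parse(Buffer.from(token, "base64").toString());
}

const key = { email: { S: "john@gmail.com" } };
const token = encodeToken(key);
console.log(token); // eyJlbWFpbCI6eyJTIjoiam9obkBnbWFpbC5jb20ifX0=
```

The client treats the token as opaque: it just echoes nextToken back as startKey on the next request.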

4.3 Deploy Query Lambda & API Gateway

# Create Lambda function
aws lambda create-function \
  --function-name QueryUserLambda \
  --runtime nodejs24.x \
  --role arn:aws:iam::${ACCOUNT_ID}:role/QueryLambdaRole \
  --handler index.handler \
  --zip-file fileb:///home/claude/query-lambda/query.zip \
  --timeout 10 \
  --memory-size 128 \
  --region ap-south-1

echo "Query Lambda created: QueryUserLambda"

# Create API Gateway REST API
API_ID=$(aws apigateway create-rest-api \
  --name "CSV Processing API" \
  --description "API to query processed CSV users" \
  --region ap-south-1 \
  --query 'id' \
  --output text)
echo "API Gateway created: $API_ID"
export API_ID

# Get root resource ID
ROOT_ID=$(aws apigateway get-resources \
  --rest-api-id $API_ID \
  --region ap-south-1 \
  --query 'items[0].id' \
  --output text)

# Create /users resource
USERS_RESOURCE_ID=$(aws apigateway create-resource \
  --rest-api-id $API_ID \
  --parent-id $ROOT_ID \
  --path-part users \
  --region ap-south-1 \
  --query 'id' \
  --output text)

# Create /users/{email} resource
EMAIL_RESOURCE_ID=$(aws apigateway create-resource \
  --rest-api-id $API_ID \
  --parent-id $USERS_RESOURCE_ID \
  --path-part '{email}' \
  --region ap-south-1 \
  --query 'id' \
  --output text)

# Add Lambda permission for API Gateway
aws lambda add-permission \
  --function-name QueryUserLambda \
  --statement-id AllowAPIGateway \
  --action lambda:InvokeFunction \
  --principal apigateway.amazonaws.com \
  --source-arn "arn:aws:execute-api:ap-south-1:${ACCOUNT_ID}:${API_ID}/*/*" \
  --region ap-south-1

# Create GET method on /users/{email}
aws apigateway put-method \
  --rest-api-id $API_ID \
  --resource-id $EMAIL_RESOURCE_ID \
  --http-method GET \
  --authorization-type NONE \
  --region ap-south-1

# Create integration
aws apigateway put-integration \
  --rest-api-id $API_ID \
  --resource-id $EMAIL_RESOURCE_ID \
  --http-method GET \
  --type AWS_PROXY \
  --integration-http-method POST \
  --uri "arn:aws:apigateway:ap-south-1:lambda:path/2015-03-31/functions/arn:aws:lambda:ap-south-1:${ACCOUNT_ID}:function:QueryUserLambda/invocations" \
  --region ap-south-1

# Create GET method on /users
aws apigateway put-method \
  --rest-api-id $API_ID \
  --resource-id $USERS_RESOURCE_ID \
  --http-method GET \
  --authorization-type NONE \
  --region ap-south-1

# Create integration for /users
aws apigateway put-integration \
  --rest-api-id $API_ID \
  --resource-id $USERS_RESOURCE_ID \
  --http-method GET \
  --type AWS_PROXY \
  --integration-http-method POST \
  --uri "arn:aws:apigateway:ap-south-1:lambda:path/2015-03-31/functions/arn:aws:lambda:ap-south-1:${ACCOUNT_ID}:function:QueryUserLambda/invocations" \
  --region ap-south-1

# Deploy API
DEPLOYMENT_ID=$(aws apigateway create-deployment \
  --rest-api-id $API_ID \
  --stage-name prod \
  --region ap-south-1 \
  --query 'id' \
  --output text)

echo "API Gateway deployed!"
echo "API Endpoint: https://${API_ID}.execute-api.ap-south-1.amazonaws.com/prod"

Complete IAM Permissions

Producer Lambda Role Policy

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "CloudWatchLogs",
      "Effect": "Allow",
      "Action": ["logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents"],
      "Resource": "arn:aws:logs:ap-south-1:ACCOUNT_ID:log-group:/aws/lambda/ProducerCSVLambda:*"
    },
    {
      "Sid": "ReadCSVFromS3",
      "Effect": "Allow",
      "Action": ["s3:GetObject", "s3:GetObjectVersion", "s3:GetObjectTagging"],
      "Resource": "arn:aws:s3:::BUCKET_NAME/*"
    },
    {
      "Sid": "ListS3Bucket",
      "Effect": "Allow",
      "Action": ["s3:ListBucket", "s3:GetBucketVersioning"],
      "Resource": "arn:aws:s3:::BUCKET_NAME"
    },
    {
      "Sid": "SendToSQS",
      "Effect": "Allow",
      "Action": "sqs:SendMessage",
      "Resource": "arn:aws:sqs:ap-south-1:ACCOUNT_ID:user-csv-queue"
    }
  ]
}

(Note: sqs:SendMessage also authorizes SendMessageBatch; there is no
separate sqs:SendMessageBatch IAM action.)

Worker Lambda Role Policy

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "CloudWatchLogs",
      "Effect": "Allow",
      "Action": ["logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents"],
      "Resource": "arn:aws:logs:ap-south-1:ACCOUNT_ID:log-group:/aws/lambda/WorkerCSVLambda:*"
    },
    {
      "Sid": "ReceiveFromSQS",
      "Effect": "Allow",
      "Action": [
        "sqs:ReceiveMessage",
        "sqs:DeleteMessage",
        "sqs:GetQueueAttributes",
        "sqs:ChangeMessageVisibility",
        "sqs:GetQueueUrl"
      ],
      "Resource": "arn:aws:sqs:ap-south-1:ACCOUNT_ID:user-csv-queue"
    },
    {
      "Sid": "WriteToDynamoDB",
      "Effect": "Allow",
      "Action": ["dynamodb:PutItem", "dynamodb:BatchWriteItem", "dynamodb:UpdateItem"],
      "Resource": "arn:aws:dynamodb:ap-south-1:ACCOUNT_ID:table/users"
    }
  ]
}

Query Lambda Role Policy

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "CloudWatchLogs",
      "Effect": "Allow",
      "Action": ["logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents"],
      "Resource": "arn:aws:logs:ap-south-1:ACCOUNT_ID:log-group:/aws/lambda/QueryUserLambda:*"
    },
    {
      "Sid": "ReadFromDynamoDB",
      "Effect": "Allow",
      "Action": [
        "dynamodb:GetItem",
        "dynamodb:Query",
        "dynamodb:Scan",
        "dynamodb:BatchGetItem"
      ],
      "Resource": "arn:aws:dynamodb:ap-south-1:ACCOUNT_ID:table/users"
    }
  ]
}

Deployment & Testing

Sample CSV File

Create users.csv:

name,mail,phone
John Doe,john@gmail.com,9876543210
Alice Smith,alice@gmail.com,9876543211
Bob Johnson,bob@gmail.com,9876543212
Charlie Brown,charlie@gmail.com,9876543213
Diana Prince,diana@gmail.com,9876543214
Edward Norton,edward@gmail.com,9876543215
Fiona Apple,fiona@gmail.com,9876543216
George Wilson,george@gmail.com,9876543217
Hannah Montana,hannah@gmail.com,9876543218
Isaac Newton,isaac@gmail.com,9876543219

Upload CSV File

# Upload to S3 (triggers Producer Lambda)
aws s3 cp users.csv s3://${BUCKET_NAME}/uploads/users.csv --region ap-south-1
echo "File uploaded! Producer Lambda should trigger automatically."

Monitor Processing

# Check Producer Lambda logs
aws logs tail /aws/lambda/ProducerCSVLambda --follow --region ap-south-1

# Check Worker Lambda logs
aws logs tail /aws/lambda/WorkerCSVLambda --follow --region ap-south-1

# Check SQS queue
aws sqs get-queue-attributes \
  --queue-url $QUEUE_URL \
  --attribute-names ApproximateNumberOfMessages \
  --region ap-south-1

Test Query API

# Get specific user
curl "https://${API_ID}.execute-api.ap-south-1.amazonaws.com/prod/users/john@gmail.com"

# List all users
curl "https://${API_ID}.execute-api.ap-south-1.amazonaws.com/prod/users?limit=5"

# List with pagination: pass the nextToken value from the previous
# response as startKey (it is already base64-encoded; do not encode again)
curl "https://${API_ID}.execute-api.ap-south-1.amazonaws.com/prod/users?limit=3&startKey=eyJlbWFpbCI6eyJTIjoiam9obkBnbWFpbC5jb20ifX0="

Performance Metrics & Optimization

Throughput Calculation

SQS Configuration:
- Batch size: 10 messages
- Maximum concurrency: 5 workers

Assuming each worker completes roughly one batch per second:

Throughput = 5 workers × 10 messages/batch = 50 records/sec
           = 3,000 records/min
           = 180,000 records/hour
           = ~4.3M records/day
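That arithmetic, spelled out (the one-batch-per-second-per-worker figure is an assumption for estimation, not a measured value):

```javascript
// Back-of-envelope throughput under the stated assumptions:
// 5 concurrent workers, 10 messages per batch, ~1 batch/sec/worker.
const workers = 5;
const batchSize = 10;
const batchesPerSecondPerWorker = 1; // assumption

const perSec = workers * batchSize * batchesPerSecondPerWorker; // 50
const perMin = perSec * 60;     // 3,000
const perHour = perMin * 60;    // 180,000
const perDay = perHour * 24;    // 4,320,000 ≈ 4.3M

console.log({ perSec, perMin, perHour, perDay });
```

If real batches take longer than a second, scale the numbers down proportionally; raising batch size or worker concurrency scales them up.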

DynamoDB Optimization

With PAY_PER_REQUEST billing:

Costs for 1M rows:
- Write: 1,000,000 write request units ≈ $0.50 (on-demand; rates vary by region, check current pricing)
- Read: Variable based on queries

For consistent loads, consider provisioned billing:
- 5,000 WCU ≈ $2,500/month
- Sustains 5,000 writes/sec, i.e. roughly 13 billion writes/month at full utilization

Lambda Optimization

Producer Lambda:
- Memory: 512 MB (optimal for I/O)
- Timeout: 300 sec (handles large files)
- Cost: ~$0.0000083 per second of execution (0.5 GB × ~$0.0000167/GB-second)

Worker Lambda:
- Memory: 256 MB (sufficient)
- Timeout: 60 sec (batch processing)
- Cost: ~$0.0000042 per second of execution (0.25 GB × ~$0.0000167/GB-second)
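Those per-second figures are simply memory size times Lambda's per-GB-second rate. A quick sanity check (the $0.0000166667 rate is the commonly published x86 price and varies by region, so treat it as an assumption):

```javascript
// Lambda compute cost ≈ memory (GB) × duration (s) × price per GB-second.
const pricePerGBSecond = 0.0000166667; // assumption: published x86 rate

function lambdaCostPerSecond(memoryMB) {
  return (memoryMB / 1024) * pricePerGBSecond;
}

console.log(lambdaCostPerSecond(512).toFixed(7)); // 0.0000083 (Producer)
console.log(lambdaCostPerSecond(256).toFixed(7)); // 0.0000042 (Worker)
```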

Monitoring

# CloudWatch metrics
aws cloudwatch get-metric-statistics \
  --namespace AWS/Lambda \
  --metric-name Duration \
  --dimensions Name=FunctionName,Value=WorkerCSVLambda \
  --start-time 2026-04-04T00:00:00Z \
  --end-time 2026-04-05T00:00:00Z \
  --period 3600 \
  --statistics Average,Maximum,Minimum \
  --region ap-south-1

Key Learnings

1. Decouple Ingestion & Processing

✔ Don't process directly in response to uploads
✔ Use a queue as a buffer
✔ Scale components independently

2. Handle Backpressure

✔ SQS prevents overwhelming DynamoDB
✔ Worker Lambdas scale with queue depth
✔ Built-in retry mechanism for failures

3. Batch Operations

✔ BatchWriteItem: 25 items max, much faster
✔ SendMessageBatch: Up to 10 messages per call
✔ Reduces API calls and costs

4. Idempotent Writes (Effectively Exactly-Once)

✔ SQS standard queues deliver at least once, so duplicates can arrive
✔ Email as the DynamoDB partition key makes writes idempotent
✔ If duplicate rows are uploaded or redelivered, only the latest write is stored
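A plain Map keyed by email is a useful toy model of this last-write-wins behavior (an illustration only, not DynamoDB itself):

```javascript
// Toy model: a DynamoDB table whose partition key is email behaves
// like a Map — putting a second item with the same email overwrites
// the first, so redelivered or duplicated messages never create
// duplicate rows.
const table = new Map();

function putItem(item) {
  table.set(item.email, item); // overwrite on duplicate key
}

putItem({ email: "john@gmail.com", name: "John Doe" });
putItem({ email: "john@gmail.com", name: "John D. (updated)" }); // duplicate delivery

console.log(table.size);                       // 1
console.log(table.get("john@gmail.com").name); // John D. (updated)
```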

5. Monitoring & Debugging

✔ CloudWatch Logs on every Lambda
✔ SQS metrics show queue depth
✔ DynamoDB metrics show throughput

6. Cost Optimization

Component                          Estimated Monthly Cost
S3                                 $0.023/GB
Lambda (1M invocations)            $0.20
SQS (1M messages)                  $0.40
DynamoDB (1M writes, on-demand)    $0.50
Total                              ~$1.13

Conclusion

This architecture demonstrates a production-ready serverless CSV processing system that:

  1. Scales automatically
  2. Handles failures gracefully
  3. Keeps costs minimal (around a dollar a month at this volume)
  4. Requires zero infrastructure management
  5. Processes millions of records reliably