AWS Recipe: Build an Asynchronous Serverless Task API

In this article, you will learn how to build an asynchronous serverless task API from scratch on AWS using Lambda, DynamoDB, API Gateway, SQS, and SNS. The Lambda functions will be implemented in Python, and the REST API will use the FastAPI framework. The entire application will be deployed using AWS SAM.

Introduction

This article will guide you through the process of building an asynchronous serverless task API on AWS. With complete code examples, you will learn how to:

  - Build a REST API with FastAPI, running on Lambda behind an HTTP API Gateway.
  - Store tasks in DynamoDB and forward new tasks to SNS via a DynamoDB Stream.
  - Fan out tasks to SQS queues using SNS subscription filter policies.
  - Process tasks in Lambda handlers that report status back to the REST API.
  - Deploy the whole application with AWS SAM.

We will first build a stack that includes an API that allows clients to create and list tasks. This API will be deployed to AWS Lambda behind an HTTP API Gateway. The application will store tasks in DynamoDB, and whenever a new task is created it will be sent via a DynamoDB Stream to another Lambda that will publish the task to SNS.

In another stack, we will create handlers, where each handler is responsible for processing a specific type of task. Each handler will consist of an SQS queue that receives tasks from SNS, and a Lambda function that processes the task and reports the task status back to the REST API.

Architecture diagram

Recipe ingredients

  - AWS Lambda
  - Amazon DynamoDB
  - Amazon API Gateway (HTTP API)
  - Amazon SQS
  - Amazon SNS
  - AWS SAM
  - Python 3.9 with FastAPI

Requirements

To follow along in this article, you will need:

  - An AWS account and credentials configured locally
  - The AWS SAM CLI installed
  - Python 3.9
  - An S3 bucket for SAM deployment artifacts

Build the API stack

Let’s begin by setting up everything required on the API side of the architecture. The API stack will include the following:

  - A DynamoDB table for storing tasks
  - An HTTP API Gateway
  - A Lambda function running the FastAPI application
  - A DynamoDB Stream on the table
  - An SNS topic that tasks are published to
  - A Lambda function that publishes new tasks from the stream to SNS
  - SSM parameters exposing the API URL and topic ARN to the handler stack(s)

1. Create folder structure and required SAM files

To make it clear where each piece below belongs, this is how the API stack directory will look when you are finished with this section.

api-stack/
  api-function/
    requirements.txt
    app/
      __init__.py
      dynamo.py
      models.py
  publish-function/
    requirements.txt
    app/
      __init__.py
  template.yml
  samconfig.toml

Start by creating the api-stack/template.yml file and adding the following to it.

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: Task API

Resources:

To make SAM deployment easier and avoid passing parameters to the sam deploy command, add the following to api-stack/samconfig.toml.

version = 0.1

[default.global.parameters]
stack_name = "TaskAPI" # Or choose your own stack name

[default.deploy]
[default.deploy.parameters]
capabilities = "CAPABILITY_IAM"
s3_bucket = "BUCKET_NAME" # A bucket your credentials have access to
s3_prefix = "task-api" # The prefix that will be used for your s3 assets
region = "eu-west-1" # Change to your preferred region

2. DynamoDB Table

To start off, we will create a table that will store all of our tasks. In this example we will keep the table rather simple, and use only a Primary Key named id.

If you want tasks scoped to a user, an application, an organization, or some other entity in your system, you could opt for a schema that uses the entity ID (such as a user ID) as the Primary Key and the Task ID as the Sort Key. For now, let’s keep it simple and continue with just the Task ID as the Primary Key.
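As a rough sketch of that alternative (not something we build in this article), listing one user’s tasks with such a composite key could use a DynamoDB query instead of a scan. The user_id attribute and table name here are assumptions:

# Hypothetical sketch: list one user's tasks if the table used a composite key
# (user_id as Primary Key, id as Sort Key). Not part of the stack built below.
import boto3
from boto3.dynamodb.conditions import Key

table = boto3.resource("dynamodb").Table("tasks")


def list_tasks_for_user(user_id: str, limit: int = 10) -> list:
    # A query only reads items under a single partition key,
    # unlike the scan used by the list endpoint later in this article.
    res = table.query(
        KeyConditionExpression=Key("user_id").eq(user_id),
        Limit=limit,
    )
    return res["Items"]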

We will use the CloudFormation resource AWS::DynamoDB::Table to define the table, since AWS::Serverless::SimpleTable does not support advanced features such as streams.

In Resources in the SAM template, add the following.

Table:
  Type: AWS::DynamoDB::Table
  Properties:
    TableName: tasks
    AttributeDefinitions:
      - AttributeName: id
        AttributeType: S
    KeySchema:
      - AttributeName: id
        KeyType: HASH
    ProvisionedThroughput:
      ReadCapacityUnits: 1
      WriteCapacityUnits: 1

Tip

I’ve used the minimum provisioned throughput for testing purposes. Feel free to modify it to your needs, or change it if you want to use on-demand billing.

Feel free to deploy your SAM application after every step to verify that everything works. To do so, from the api-stack directory, first build the application with sam build and deploy it with sam deploy.

3. API Gateway

To front our API we will use API Gateway. More specifically, we will use an HTTP API since it is cheaper and we do not need the more advanced features of a REST API.

Tip

For now, we will not set up Authorization, CORS, or any other advanced API Gateway features. Authorization is mentioned in the potential improvements section.

In Resources in the SAM template, add the following.

Api:
  Type: AWS::Serverless::HttpApi

ApiUrlParameter:
  Type: AWS::SSM::Parameter
  Properties:
    Name: '/tasks/api_url'
    Type: String
    Value: !Sub 'https://${Api}.execute-api.${AWS::Region}.${AWS::URLSuffix}'

The resource ApiUrlParameter will output the URL of the API Gateway to AWS Systems Manager Parameter Store. This will be used as input to the task handler stack(s).
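If you want to verify the parameter after deploying, you can read it back with the AWS CLI or a small boto3 snippet like the one below (a quick sanity check, nothing the stacks depend on):

# Read the exported API URL back from Parameter Store
import boto3

ssm = boto3.client("ssm")
print(ssm.get_parameter(Name="/tasks/api_url")["Parameter"]["Value"])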

To easily find the auto-generated API URL, you can take advantage of CloudFormation Outputs. Add the following to the SAM template. Remember, Outputs is a top-level key.

Outputs:
  ApiUrl:
    Description: URL of the Task API
    Value:
      Fn::Sub: 'https://${Api}.execute-api.${AWS::Region}.${AWS::URLSuffix}'

The next time you run sam deploy, you should see the URL in the output.

4. Lambda API

Finally, it is time to write some actual code, and we will be using Python for this. We will use the FastAPI framework to build our API, together with an adapter library called Mangum that makes FastAPI play nicely inside a Lambda function.

SAM Template

We need to define our Lambda function in the SAM template. Add the following resource.

ApiFunction:
  Type: AWS::Serverless::Function
  Properties:
    FunctionName: 'TaskAPI'
    MemorySize: 128
    CodeUri: api-function
    Handler: app.handler
    Runtime: python3.9
    Policies:
      - DynamoDBCrudPolicy:
          TableName: !Ref Table
    Environment:
      Variables:
        TABLE_NAME: !Ref Table
    Events:
      DefaultEndpoint:
        Type: HttpApi
        Properties:
          ApiId: !Ref Api

The template above references the Table resource created earlier to add the table name as an environment variable, and it attaches an IAM policy to the Lambda execution role that allows CRUD operations on the DynamoDB table. We also add the API Gateway created earlier as an event source.

Boilerplate

First let’s create a requirements.txt file in the api-stack/api-function directory. This should include the following packages.

mangum
fastapi
boto3
pydantic

Then, in the api-stack/api-function/app/__init__.py file, add the following.

from fastapi import FastAPI, HTTPException
from mangum import Mangum


app = FastAPI()
handler = Mangum(app)

HTTPException will be used later on, so we might as well add it now.

Routes

We will implement the following routes:

  - GET /tasks/{id} – get a single task
  - POST /tasks – create a new task
  - GET /tasks – list tasks
  - PATCH /tasks/{id} – update the status of a task

GET /tasks/{id}

Let’s start by implementing the GET /tasks/{id} endpoint. First, in the api-stack/api-function/app/dynamo.py file, add the following.

import os
import boto3


table = boto3.resource("dynamodb").Table(os.environ["TABLE_NAME"])


class Error(Exception):
    pass


class TaskNotFoundError(Error):
    pass


def get_task(task_id: str) -> dict:
    res = table.get_item(
        Key={
            "id": task_id,
        },
    )

    item = res.get("Item")
    if not item:
        raise TaskNotFoundError

    return item

Here we use the boto3 resource client to call the GetItem API with the task_id as the Primary Key. If the task is not found, we raise an error; otherwise we return the task data. We do not filter the data here; that is handled by FastAPI response models, as you will see soon.

Before creating our route logic, let’s create a response model that defines what data is returned by the API when a task is retrieved. We will use pydantic to declare our models.

In api-stack/api-function/app/models.py, add the following.

from pydantic import BaseModel
from typing import Literal


task_types = Literal["TASK1", "TASK2", "TASK3"]
status_types = Literal["CREATED", "IN_PROGRESS", "COMPLETED", "FAILED"]


class TaskResponse(BaseModel):
    id: str
    task_type: task_types
    status: status_types
    status_msg: str = ""
    created_time: int = None
    updated_time: int = None

Now, let’s create the route logic. In api-stack/api-function/app/__init__.py, add the following.

from . import models, dynamo

...

@app.get("/tasks/{task_id}", response_model=models.TaskResponse)
def get_task(task_id: str):
    try:
        return dynamo.get_task(task_id)
    except dynamo.TaskNotFoundError:
        raise HTTPException(status_code=404, detail="Task not found")

Let’s deploy what we have so far and see if it works. Run sam build and sam deploy.

If you haven’t used FastAPI before, this might come as a surprise, but fire up your favourite browser and navigate to https://YOUR_API_URL/docs. The URL should be listed in the output of sam deploy.

FastAPI comes with built-in support for generating and serving documentation for your API using Swagger. Try your new route from the Swagger UI or with something like curl.

Terminal window
$ curl https://YOUR_API_URL/tasks/12345
{
  "detail": "Task not found"
}

Obviously, no tasks exist since we haven’t created any yet. Let’s change that and implement the create route.

POST /tasks

It is time to implement the logic for creating tasks. First, we need to decide what kind of input the user should include in the request. We want to support different kinds of tasks, so we will need to include a task_type field in the request. Different tasks might require different payloads, so let’s add a data field in the request which accepts generic json.

When a task is created, the response sent to the user will include the generated task ID.

Add the following models to api-stack/api-function/app/models.py.

class CreatePayload(BaseModel):
    task_type: task_types
    data: dict


class CreateResponse(BaseModel):
    id: str

When creating a task, we want to generate an ID for it. We can use the uuid library to generate a random UUID. We will also add the current timestamp in the attribute created_time. And finally, to avoid handling JSON in payloads when publishing tasks over SNS and SQS, we will base64 encode the payload and store the encoded payload in DynamoDB.

Add the following to api-stack/api-function/app/dynamo.py.

...
import base64
import json
from uuid import uuid4
from datetime import datetime

...

def create_task(task_type: str, payload: dict) -> dict:
    task_id = str(uuid4())
    table.put_item(
        Item={
            "id": task_id,
            "task_type": task_type,
            "status": "CREATED",
            "payload": _encode(payload),
            "created_time": _get_timestamp(),
        }
    )

    return {"id": task_id}


def _encode(data: dict) -> str:
    json_string = json.dumps(data)
    return base64.b64encode(json_string.encode("utf-8")).decode("utf-8")


def _get_timestamp() -> int:
    return int(datetime.utcnow().timestamp())

And finally, add the following to api-stack/api-function/app/__init__.py.

@app.post("/tasks", status_code=201, response_model=models.CreateResponse)
def post_task(payload: models.CreatePayload):
    return dynamo.create_task(payload.task_type, payload.data)

Here we can see how FastAPI takes care of input validation and response serialization when we specify response_model=models.CreateResponse and declare the expected request payload with payload: models.CreatePayload.

Let’s deploy and try creating a task.

Terminal window
$ curl -X POST \
  -H "Content-Type: application/json" \
  -d '{"task_type": "TASK1", "data": {"foo": "bar"}}' \
  https://YOUR_API_URL/tasks
{
  "id": "12345678-abcd-1234-abcd-112233445566"
}

...

$ curl https://YOUR_API_URL/tasks/12345678-abcd-1234-abcd-112233445566
{
  "id": "12345678-abcd-1234-abcd-112233445566",
  "task_type": "TASK1",
  "status": "CREATED",
  "status_msg": "",
  "created_time": 1648229203,
  "updated_time": null
}
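Since the request body is declared as models.CreatePayload, FastAPI also rejects invalid input on its own. As a minimal sketch (the URL is a placeholder), posting an unknown task_type from Python with the requests library should come back as a 422 validation error:

# Send an invalid task_type and inspect FastAPI's automatic validation error
import requests

API_URL = "https://YOUR_API_URL"  # placeholder

res = requests.post(f"{API_URL}/tasks", json={"task_type": "NOT_A_TASK", "data": {}})
print(res.status_code)  # 422, since NOT_A_TASK is not an allowed task type
print(res.json())       # the validation details returned by FastAPI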

Did you accidentally forget to copy the task ID after creating a few tasks? If only there was a way to list all tasks without opening up the DynamoDB console. Let’s implement that next.

GET /tasks

The list route will be very simple, without any filter or sort queries. We will allow a maximum of 10 tasks to be returned at a time. If there are more than 10 tasks, we will return a next_token in the response. The next request should then include the next_token in the query string to fetch the next 10 tasks. If there are no more tasks left, next_token will be null.

Since all items in the DynamoDB table have unique primary keys, we will need to use the scan operation to fetch items.

Let’s start with the model for the response which will be a list of tasks and a next_token field.

In api-stack/api-function/app/models.py, add the following.

class TaskListResponse(BaseModel):
    tasks: list[TaskResponse]
    next_token: str = None

Moving on to the actual database logic, we will need to conditionally add the next_token to the scan operation in case it is provided. The next_token will be base64 encoded before it is returned to the client, so we will need to decode it before we can use it.

The scan response will include a LastEvaluatedKey if there are more items left to fetch, so we use that to set the next_token.

Add the following to api-stack/api-function/app/dynamo.py.

def list_tasks(next_token: str = None) -> dict:
    scan_args = {
        "Limit": 10,
    }

    if next_token:
        scan_args["ExclusiveStartKey"] = _decode(next_token)

    res = table.scan(**scan_args)
    response = {"tasks": res["Items"]}

    if "LastEvaluatedKey" in res:
        response["next_token"] = _encode(res["LastEvaluatedKey"])

    return response

...

def _decode(data: str) -> dict:
    json_string = base64.b64decode(data.encode("utf-8")).decode("utf-8")
    return json.loads(json_string)

Finally, add the following to api-stack/api-function/app/__init__.py.

@app.get("/tasks", response_model=models.TaskListResponse)
def list_tasks(next_token: str = None):
    return dynamo.list_tasks(next_token)

Now deploy, and you should be able to list all tasks you’ve created.

Terminal window
$ curl https://YOUR_API_URL/tasks
{
  "tasks": [
    {
      "id": "12345678-abcd-1234-abcd-112233445566",
      "task_type": "TASK1",
      "status": "CREATED",
      "status_msg": "",
      "created_time": 1648229203,
      "updated_time": null
    },
    ... more tasks ...
  ],
  "next_token": null
}
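If you end up with more than ten tasks, a client can follow next_token to page through them. Here is a minimal sketch using the requests library (the API URL is a placeholder):

# Page through all tasks by following next_token
import requests

API_URL = "https://YOUR_API_URL"  # placeholder

tasks = []
next_token = None
while True:
    params = {"next_token": next_token} if next_token else {}
    body = requests.get(f"{API_URL}/tasks", params=params).json()
    tasks.extend(body["tasks"])
    next_token = body["next_token"]
    if not next_token:
        break

print(f"Fetched {len(tasks)} tasks")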

We can now create and list tasks, but what good is that if we cannot update their status? Let’s go ahead and implement the final route.

PATCH /tasks/{id}

Tip

This endpoint is supposed to be used internally by the task handlers. Thus, it would be preferable to add some kind of authorization here so that users cannot update tasks directly. But, to keep the scope small, I have ignored it for now.

We want task handlers to be able to update the status of the tasks they handle, as well as an optional message string. Add the following model to api-stack/api-function/app/models.py.

class UpdatePayload(BaseModel):
    status: status_types
    status_msg: str = ""

The logic for updating a task will be a bit more complex than the other operations. First, we need to make sure the task actually exists. We also want to guard ourselves against multiple handlers trying to start the same task. This could happen, for example, as a side effect of the SQS at-least-once delivery mechanism, or if the visibility timeout on the queue is shorter than the time it takes to process a task. These two checks are made with conditional updates in DynamoDB.

Add the following code to api-stack/api-function/app/dynamo.py.

from boto3.dynamodb.conditions import And, Attr

...

class InvalidTaskStateError(Error):
    pass

...

def update_task(task_id: str, status: str, status_msg: str):
    cond = Attr("id").exists()

    if status == "IN_PROGRESS":
        cond = And(cond, Attr("status").eq("CREATED"))

    try:
        table.update_item(
            Key={
                "id": task_id,
            },
            UpdateExpression="set #S=:s, status_msg=:m, updated_time=:t",
            # status is reserved
            ExpressionAttributeNames={
                "#S": "status",
            },
            ExpressionAttributeValues={
                ":s": status,
                ":m": status_msg,
                ":t": _get_timestamp(),
            },
            ConditionExpression=cond,
        )
    except table.meta.client.exceptions.ConditionalCheckFailedException:
        raise InvalidTaskStateError

We first create a condition that requires the item to exist already. Then, if we are setting the state to IN_PROGRESS, we also require the task to have the status CREATED. This way, if another request comes in that tries to set the state to IN_PROGRESS while the task is already in progress, the request will fail.

We then use the update_item method which will throw an exception if the condition evaluates to false. As far as I know, you cannot see which part of the condition failed, so we cannot differentiate between a task that doesn’t exist and a task that is already in progress.

Finally, add the route to api-stack/api-function/app/__init__.py and we should be good to go.

@app.patch("/tasks/{task_id}", status_code=204)
def update_task(task_id: str, payload: models.UpdatePayload):
    try:
        return dynamo.update_task(task_id, payload.status, payload.status_msg)
    except dynamo.InvalidTaskStateError:
        raise HTTPException(
            status_code=400, detail="Task does not exist or is already in progress."
        )

Deploy and try it out! You are now finished with the API.

5. DynamoDB Stream

We now want to enable a Stream on the DynamoDB table. I chose to use streams to remove the need to handle transactions. Imagine the following scenario:

  1. User creates a task through the API
  2. Task is created in DynamoDB
  3. Request to SNS fails for some reason

We would then need to roll back the task we created in DynamoDB. By using streams, we make sure that we only send a task to SNS after it has been committed to the database. And if the request to SNS then fails, the function can be configured to retry a set number of times.

To enable the stream, add StreamSpecification under Properties in the Table resource in the SAM template.

Table:
  Type: AWS::DynamoDB::Table
  Properties:
    ...
    StreamSpecification:
      StreamViewType: NEW_IMAGE

For now, we only care about the current state of the items in the stream. If we wanted to act on specific changes in the DynamoDB table, such as send a notification somewhere when a task went from IN_PROGRESS to FAILED, we would need to use NEW_AND_OLD_IMAGES instead of NEW_IMAGE. That way we could compare the old and new values of the item in the stream handler.

6. SNS Topic

We also need an SNS Topic to send our tasks to. Add the following to the SAM template.

Topic:
  Type: AWS::SNS::Topic

TopicArnParameter:
  Type: AWS::SSM::Parameter
  Properties:
    Name: '/tasks/topic_arn'
    Type: String
    Value: !Ref Topic

The resource TopicArnParameter will output the Topic ARN to AWS Systems Manager Parameter Store. This will be used as input to the task handler stack(s).

7. Publisher Lambda

It is time to create our second Lambda function. This function will be responsible for publishing the task to SNS.

Required packages

Add the following to api-stack/publish-function/requirements.txt.

boto3
aws-lambda-powertools

Tip

Lambda Powertools for Python is a package that provides a lot of useful features when working with Lambda functions, such as logging, tracing, and data classes for common event source payloads.

SAM Template

Let’s define the publisher function in the SAM template.

PublishFunction:
  Type: AWS::Serverless::Function
  Properties:
    FunctionName: 'TaskPublisher'
    MemorySize: 128
    CodeUri: publish-function
    Handler: app.handler
    Runtime: python3.9
    Policies:
      - SNSPublishMessagePolicy:
          TopicName: !GetAtt Topic.TopicName
    Environment:
      Variables:
        TOPIC_ARN: !Ref Topic
    Events:
      Stream:
        Type: DynamoDB
        Properties:
          Stream: !GetAtt Table.StreamArn
          StartingPosition: TRIM_HORIZON
          MaximumRetryAttempts: 5
          FilterCriteria:
            Filters:
              - Pattern: '{"eventName": ["INSERT"]}'

Here we can see that we are doing almost the same thing with the SNS Topic as we did with the DynamoDB table in the API function. We use the built-in SNSPublishMessagePolicy policy to give the function permission to publish to the topic.

We also define a Stream event that triggers the function whenever an operation is performed on the DynamoDB table, along with a filter criteria so that the function only runs when a new item is added to the table, i.e. when a task is first created.

Handler

The publisher handler will be quite simple. Here we utilize the package aws-lambda-powertools to provide us with data classes for the event payloads. For each message received from the stream (which is filtered to only include INSERT events), we publish a message to SNS.

We send the payload (which is already base64 encoded) as the message body, and we add the task ID and task type as message attributes.

All in all, it looks like this. Add it to api-stack/publish-function/app/__init__.py and we are done with the entire API stack.

import os
import boto3
from aws_lambda_powertools.utilities.data_classes import (
    event_source,
    DynamoDBStreamEvent,
)


topic = boto3.resource("sns").Topic(os.environ["TOPIC_ARN"])


@event_source(data_class=DynamoDBStreamEvent)
def handler(event: DynamoDBStreamEvent, _):
    for record in event.records:
        task_id = record.dynamodb.keys["id"].get_value
        task_type = record.dynamodb.new_image["task_type"].get_value
        payload = record.dynamodb.new_image["payload"].get_value

        res = topic.publish(
            MessageAttributes={
                "TaskId": {
                    "DataType": "String",
                    "StringValue": task_id,
                },
                "TaskType": {
                    "DataType": "String",
                    "StringValue": task_type,
                },
            },
            Message=payload,
        )

        print(f"Message {res['MessageId']} published.")

Great job so far! We have now finished the API part of the system, and it is time to start building our handlers.

Build the Handler stack(s)

With the API complete, we can now build the handler stack(s). In this example, I will only create a single stack with a single handler for events with task_type set to TASK1. Implementing handlers for TASK2 and TASK3, or other events, is left as an exercise for the reader.

The stack we will build will include the following resources:

  - An SQS queue and a dead letter queue
  - An SNS subscription with a filter policy for TASK1 events
  - A Lambda function that processes the tasks and reports status back to the Task API

1. Create folder structure and required SAM files

To make it clear where each piece below belongs, this is how the Handler stack directory will look when you are finished with this section.

handler-stack/
  task1-function/
    requirements.txt
    app/
      __init__.py
  template.yml
  samconfig.toml

As before, add some boilerplate to the handler-stack/template.yml file.

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: Task API

Resources:

We will also use a handler-stack/samconfig.toml file for easier CLI usage.

version = 0.1

[default.global.parameters]
stack_name = "TaskHandlers" # Or choose your own stack name

[default.deploy]
[default.deploy.parameters]
capabilities = "CAPABILITY_IAM"
s3_bucket = "BUCKET_NAME" # A bucket your credentials have access to
s3_prefix = "task-handlers" # The prefix that will be used for your s3 assets
region = "eu-west-1" # Change to your preferred region

2. SQS Queue

First, let’s create the SQS Queue as well as the dead letter queue. Add the following resources to the SAM template.

TaskHandler1Queue:
  Type: AWS::SQS::Queue
  Properties:
    RedrivePolicy:
      deadLetterTargetArn: !GetAtt TaskHandler1DLQ.Arn
      maxReceiveCount: 1

TaskHandler1DLQ:
  Type: AWS::SQS::Queue

By using a DLQ, we will be able to capture events that failed to be processed by the Lambda function. We can analyze the events and either discard them or send them back to the main queue to be processed again. maxReceiveCount specifies how many times you want to retry the event in case of failure before sending it to the dead letter queue. In this example, we will keep it at one to disable retries.
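If you later want to send messages from the DLQ back to the main queue, a small script can do it. The sketch below is one way to redrive messages with boto3; the queue URLs are placeholders you would look up in the SQS console (or with get_queue_url):

# Move messages from the DLQ back to the main queue for reprocessing
import boto3

sqs = boto3.client("sqs")
DLQ_URL = "https://sqs.REGION.amazonaws.com/ACCOUNT_ID/TaskHandler1DLQ"  # placeholder
MAIN_QUEUE_URL = "https://sqs.REGION.amazonaws.com/ACCOUNT_ID/TaskHandler1Queue"  # placeholder

while True:
    res = sqs.receive_message(
        QueueUrl=DLQ_URL,
        MaxNumberOfMessages=10,
        MessageAttributeNames=["All"],
    )
    messages = res.get("Messages", [])
    if not messages:
        break

    for msg in messages:
        # Re-send the body and attributes to the main queue, then delete from the DLQ
        sqs.send_message(
            QueueUrl=MAIN_QUEUE_URL,
            MessageBody=msg["Body"],
            MessageAttributes=msg.get("MessageAttributes", {}),
        )
        sqs.delete_message(QueueUrl=DLQ_URL, ReceiptHandle=msg["ReceiptHandle"])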

3. SNS Subscription

We now want to set up a subscription to the SNS Topic that was created in the API stack. Remember how we created an SSM parameter in the API stack with the topic ARN? We will now import that value in the handler stack. We will also need a Queue Policy that allows the SNS Topic to send messages to the SQS Queue. In the handler we are creating now, we only want to process events that have the message attribute TaskType set to TASK1. To do this, we will use a filter policy on the subscription.

In the SAM template, first add the Parameters section.

Parameters:
  TasksTopic:
    Type: AWS::SSM::Parameter::Value<String>
    Description: Tasks Topic Arn
    Default: /tasks/topic_arn

Now, under Resources, add the following.

TaskHandler1QueuePolicy:
  Type: AWS::SQS::QueuePolicy
  Properties:
    Queues:
      - !Ref TaskHandler1Queue
    PolicyDocument:
      Version: '2012-10-17'
      Statement:
        - Effect: Allow
          Action: sqs:SendMessage
          Resource: !GetAtt TaskHandler1Queue.Arn
          Principal:
            Service: 'sns.amazonaws.com'
          Condition:
            ArnEquals:
              aws:SourceArn: !Ref TasksTopic

TaskHandler1Subscription:
  Type: AWS::SNS::Subscription
  Properties:
    Protocol: sqs
    TopicArn: !Ref TasksTopic
    Endpoint: !GetAtt TaskHandler1Queue.Arn
    RawMessageDelivery: True
    FilterPolicy:
      TaskType:
        - 'TASK1'

4. Handler function

SAM Template

We need to define our Lambda function in the SAM template. Add the following resource.

TaskHandler1Function:
  Type: AWS::Serverless::Function
  Properties:
    FunctionName: 'TaskHandler1'
    MemorySize: 128
    Timeout: 30
    CodeUri: task1-function
    Handler: app.handler
    Runtime: python3.9
    Policies:
      - SQSPollerPolicy:
          QueueName: !GetAtt TaskHandler1Queue.QueueName
    Environment:
      Variables:
        TASKS_API_URL: !Ref TasksApiUrl
    Events:
      Stream:
        Type: SQS
        Properties:
          Queue: !GetAtt TaskHandler1Queue.Arn
          BatchSize: 1

As you can see, the lambda function requires the URL of the Task API. Since we exported the URL to the parameter store from the API stack, we should add the following under the Parameters section.

TasksApiUrl:
  Type: AWS::SSM::Parameter::Value<String>
  Description: Tasks
  Default: /tasks/api_url

Code

Now it is time to implement the actual task handler for the TASK1 events. The example I will show here is very minimal (and completely useless), but it should be enough to get you started. Again, we will use aws-lambda-powertools to deserialize the event payload from SQS and make our lives a little easier. For each record that the function receives, we will do the following.

  1. Read Task ID and Task Type from the message attributes.
  2. Decode the base64 encoded payload that the client provided when creating the task.
  3. Call the Update endpoint in the Task API to set the task to IN_PROGRESS.
  4. Perform the actual task. In this case, sleep for 10 seconds…
  5. If the task was successful, call the Update endpoint in the Task API to set the task to COMPLETED. If an exception was raised, update the status to FAILED.

In the code example, I also randomly raise exceptions to simulate failures.

All in all, the handler function looks like this. Add it to the handler-stack/task1-function/app/__init__.py file.

import base64
import json
import os
import time
import requests
import random
from aws_lambda_powertools.utilities.data_classes import (
    event_source,
    SQSEvent,
)


API_URL = os.environ["TASKS_API_URL"]


@event_source(data_class=SQSEvent)
def handler(event: SQSEvent, context):
    for record in event.records:
        task_id = record.message_attributes["TaskId"].string_value
        task_type = record.message_attributes["TaskType"].string_value
        payload = _decode_payload(record.body)

        print(f"Starting task {task_type} with id {task_id}")
        _update_task_status(task_id, "IN_PROGRESS", "Task started")
        try:
            _do_task(payload)
        except Exception as e:
            print(f"Task with id {task_id} failed: {str(e)}")
            _update_task_status(task_id, "FAILED", str(e))
            continue

        print(f"Task with id {task_id} completed successfully.")
        _update_task_status(task_id, "COMPLETED", "Task completed")


def _do_task(payload: dict):
    # Do something here.
    print(f"Payload: {payload}")
    time.sleep(10)
    if random.randint(1, 4) == 1:
        # Simulate failure in some invocations
        raise Exception("Task failed somehow")


def _decode_payload(payload: str) -> dict:
    json_string = base64.b64decode(payload.encode("utf-8")).decode("utf-8")
    return json.loads(json_string)


def _update_task_status(task_id: str, status: str, status_msg: str):
    data = {
        "status": status,
        "status_msg": status_msg,
    }

    url = f"{API_URL}/tasks/{task_id}"
    res = requests.patch(url, json=data)

    if res.status_code != 204:
        print(f"Request to API failed: {res.json()}")
        raise Exception("Update task status failed")

Also, don’t forget to add the required packages to the handler-stack/task1-function/requirements.txt file.

aws-lambda-powertools
requests

The handler stack is done for now; I’ll leave it to you to implement the actual task and perhaps create handlers for the other task types as well. Time to deploy what we have so far!

Showtime

It’s showtime. With both stacks deployed, try creating a few tasks through the API and see the magic happen. Try creating tasks with both task types TASK1 and TASK2 and see what happens. If everything works as it’s supposed to, you should see the TASK1 tasks change status to IN_PROGRESS and then COMPLETED/FAILED after a few seconds. Tasks with other task types should be ignored and left with status CREATED.

Terminal window
# Create a task with type TASK1
$ curl -X POST \
  -H "Content-Type: application/json" \
  -d '{"task_type": "TASK1", "data": {"foo": "bar"}}' \
  https://YOUR_API_URL/tasks
{
  "id": "11111111-abcd-1111-abcd-111111111111"
}

# Create a task with type TASK2
$ curl -X POST \
  -H "Content-Type: application/json" \
  -d '{"task_type": "TASK2", "data": {"foo": "bar"}}' \
  https://YOUR_API_URL/tasks
{
  "id": "22222222-abcd-2222-abcd-222222222222"
}

# List tasks. The task with type TASK1 should have status IN_PROGRESS and the other
# should still have status CREATED. If it isn't IN_PROGRESS, try again after a few seconds.
$ curl https://YOUR_API_URL/tasks
{
  "tasks": [
    {
      "id": "11111111-abcd-1111-abcd-111111111111",
      "task_type": "TASK1",
      "status": "IN_PROGRESS",
      "status_msg": "",
      "created_time": 1648229203,
      "updated_time": null
    },
    {
      "id": "22222222-abcd-2222-abcd-222222222222",
      "task_type": "TASK2",
      "status": "CREATED",
      "status_msg": "",
      "created_time": 1648229203,
      "updated_time": null
    }
  ],
  "next_token": null
}

# Allow 10 seconds for the task to complete and then list tasks again. The task should
# now have the status COMPLETED or FAILED.
$ curl https://YOUR_API_URL/tasks
{
  "tasks": [
    {
      "id": "11111111-abcd-1111-abcd-111111111111",
      "task_type": "TASK1",
      "status": "COMPLETED",
      "status_msg": "",
      "created_time": 1648229203,
      "updated_time": 1648229218
    },
    {
      "id": "22222222-abcd-2222-abcd-222222222222",
      "task_type": "TASK2",
      "status": "CREATED",
      "status_msg": "",
      "created_time": 1648229203,
      "updated_time": null
    }
  ],
  "next_token": null
}

Cleaning up

To remove everything we have created, simply run the sam delete command in the api-stack and handler-stack directories.

Potential improvements

While this is a simple example, there are many things we could do to make it better. Below are some ideas off the top of my head. Why not give one of them a try?

Authentication and Authorization

Right now there is no authentication or authorization on the Task API. This means that any client can create tasks and see the status of any task, and also update the status of tasks. First, I would make sure that only the handlers themselves are allowed to use the PATCH /tasks/{id} endpoint. This could for example be done by setting up IAM authorization on the API Gateway. Secondly, we might want to require that the client is authenticated before creating and listing tasks. If you want to do this in a serverless fashion, you could look into AWS Cognito and use a JWT authorizer.

DynamoDB TTL

Since our list endpoint retrieves all tasks, this list could grow very large. Perhaps we want to remove tasks when they are older than a certain amount of time. This could be done by setting a TTL on the DynamoDB table to automatically delete old tasks after a set period of time.
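A hedged sketch of what that could look like in create_task, assuming a hypothetical expires_at attribute with TTL enabled on it in the table:

# Hypothetical: store an expires_at attribute when creating a task. With TTL
# enabled on that attribute, DynamoDB deletes the item some time after the
# epoch timestamp has passed.
from datetime import datetime, timedelta

TASK_RETENTION = timedelta(days=7)  # assumption: keep tasks for a week


def _get_expiry_timestamp() -> int:
    return int((datetime.utcnow() + TASK_RETENTION).timestamp())


# In create_task, add to the Item dict:
#     "expires_at": _get_expiry_timestamp(),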

Logging

Right now we do not have much logging in place. And where we do, it is only simple print statements that aren’t as structured as we would like. aws-lambda-powertools has a great logging library that helps with setting up structured logs for your lambda functions.
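A hedged sketch of what the task handler could look like with the Powertools Logger (the service name and extra keys are just examples):

# Structured JSON logging with Lambda Powertools
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import event_source, SQSEvent

logger = Logger(service="task-handler-1")


@logger.inject_lambda_context
@event_source(data_class=SQSEvent)
def handler(event: SQSEvent, context):
    for record in event.records:
        task_id = record.message_attributes["TaskId"].string_value
        # Emits a JSON log line including the Lambda context and the extra keys
        logger.info("Starting task", extra={"task_id": task_id})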

Tracing

Monitoring in distributed serverless systems can be quite daunting. AWS provides X-Ray for this purpose, which is a distributed tracing system. This can help you visualize the flow of events in your application going from the API Gateway -> Lambda -> DynamoDB -> Lambda -> SNS -> SQS -> Lambda and so on. aws-lambda-powertools has a great tracer library that helps with setting up X-Ray for your lambda functions.
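A hedged sketch of enabling it with the Powertools Tracer (the function would also need Tracing: Active in the SAM template):

# X-Ray tracing with Lambda Powertools
from aws_lambda_powertools import Tracer

tracer = Tracer(service="task-handler-1")


@tracer.capture_lambda_handler
def handler(event, context):
    ...


@tracer.capture_method
def _do_task(payload: dict):
    ...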

Error handling

I have not included any kind of error handling in the example. You could, for example, implement functionality to allow a task to be retried a set number of times in case of failure. Right now, if a task fails, it will be updated in Dynamo with a status of FAILED. The current implementation of the update endpoint requires the task to have a status of CREATED when updating the status to IN_PROGRESS. If that logic is left unchanged, retried invocations will fail on the first request to the update endpoint.

Webhooks

Most of the improvements above have been about securing, managing, and monitoring the API. We could also extend it with new features. One example would be to include webhooks. Clients could for example include a webhook URL in their task creation request. We could then add another lambda function that reads from the DynamoDB stream and sends a notification to the webhook URL when the task goes from IN_PROGRESS to COMPLETED or from IN_PROGRESS to FAILED.

To be able to react on certain changes in a DynamoDB item, you must update the stream view type to be NEW_AND_OLD_IMAGES instead of NEW_IMAGE. This way the lambda will receive both the old and the new version of the item.
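As a hedged sketch, such a stream handler could compare the old and new status and call a hypothetical webhook_url attribute stored on the task (error handling for tasks without a webhook is omitted):

# Hypothetical webhook notifier, assuming StreamViewType NEW_AND_OLD_IMAGES
import requests
from aws_lambda_powertools.utilities.data_classes import (
    event_source,
    DynamoDBStreamEvent,
)


@event_source(data_class=DynamoDBStreamEvent)
def handler(event: DynamoDBStreamEvent, _):
    for record in event.records:
        if not record.dynamodb.old_image:
            # INSERT events have no old image; nothing to compare
            continue

        old_status = record.dynamodb.old_image["status"].get_value
        new_status = record.dynamodb.new_image["status"].get_value
        # webhook_url is a hypothetical attribute the client would provide at creation
        webhook_url = record.dynamodb.new_image["webhook_url"].get_value

        if old_status == "IN_PROGRESS" and new_status in ("COMPLETED", "FAILED"):
            task_id = record.dynamodb.keys["id"].get_value
            requests.post(webhook_url, json={"id": task_id, "status": new_status})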

Scoped events

Perhaps you want to scope tasks to users, applications, or some other entity in your system. Then I would change the DynamoDB key schema as discussed earlier, using the entity ID as the Primary Key and the Task ID as the Sort Key, scope the API routes to that entity, and replace the scan in the list endpoint with a query on the entity’s partition.

Conclusion

Congratulations on making it this far. You have now built an asynchronous task API running entirely on serverless services on AWS. Hopefully you have learned a thing or two; I know I definitely did by creating this. If you have any questions, please feel free to reach out to me in any way you see fit.

All code in this guide is available on GitHub. It might drift a bit if I decide to build upon it, but if I do, I will try to keep this blog entry up to date.

Now go build something awesome! Why not try implementing one of the ideas from the potential improvements section?


About the author

I'm Elias Brange, a Cloud Consultant and AWS Community Builder in the Serverless category. I'm on a mission to drive Serverless adoption and help others on their Serverless AWS journey.


Are you looking for more content like this? Follow me on LinkedIn & Twitter!