eliasbrange.dev
Test Event-Driven Architectures with AWS EventBridge and Momento Topics

Test Event-Driven Architectures with AWS EventBridge and Momento Topics

1. Introduction

Testing Serverless applications end-to-end is difficult. Testing distributed systems is difficult. But with the right tools, it can be made easier.

Imagine you have an OrderService that emits an OrderCreated event to EventBridge each time an order is created. To test this flow end-to-end, you must subscribe to the EventBridge bus and verify that the correct event has been published.

There is a new player in town. The team at Momento has built a Topics service. I’ve recently found the time to play around with this service, and I’m impressed.

In this post, I will show you how you can use Momento Topics to test that your application(s) produces the events you expect.

TLDR; Show me the code!

You can find the sample application and a step-by-step guide on GitHub.

2. Momento Topics

Momento Topics is a serverless pub/sub service that makes publishing and subscribing to events simple for any application. This is what Momento themselves say about the service:

Momento Topics is a serverless event messaging service that allows for real-time communication between parts of a distributed application. Instead of spending cycles defining topic resources and dealing with the pain of a tightly-coupled system, push to an unlimited number of channels on the fly without any topic management.

It is a fully managed service that enables a wide range of use cases in distributed systems. For example, you can use it to build an interactive live reaction app to spice up your online presentations.

Being serverless, it also comes with the pay-as-you-go pricing model that we all love and a generous free tier.

Momento has SDKs for a wide range of languages and an HTTP API, which enables the use case in this post. We will integrate EventBridge with this API with the help of API Destinations.

3. Sample application

Our sample application mimics a simple Order Service. It consists of an API Gateway with a single POST / endpoint that triggers a Lambda function. This Lambda function generates a new order with a random ID and publishes an OrderCreated event to EventBridge.

The simple application architecture.
The simple application architecture.

The Lambda function, in this case, is a simple function with the following handler:

1
export const handler = async (): Promise<APIGatewayProxyResult> => {
2
const order = {
3
id: ulid(),
4
name: 'test order',
5
};
6
7
await eventBridgeClient.send(
8
new PutEventsCommand({
9
Entries: [
10
{
11
EventBusName: EVENT_BUS,
12
Source: 'OrderService',
13
DetailType: 'OrderCreated',
14
Detail: JSON.stringify(order),
15
},
16
],
17
}),
18
);
19
return {
20
statusCode: 201,
21
body: JSON.stringify(order),
22
};
23
};

When the API receives a POST / request, an event with the following structure is published to EventBridge:

1
{
2
"version": "0",
3
"id": "11111111-2222-4444-5555-666666666666",
4
"detail-type": "OrderCreated",
5
"source": "OrderService",
6
"account": "123456789012",
7
"time": "2023-10-19T08:00:00Z",
8
"region": "eu-west-1",
9
"resources": [],
10
"detail": {
11
"id": "01F9ZQZJZJZJZJZJZJZJZJZJZJ",
12
"name": "test order"
13
}
14
}

How can you test this application? In a more real-life scenario, you would probably persist an order to a database. If that’s the case, you could verify that the service has persisted the order in the database. Your service might have external consumers that rely on the events it emits. You must ensure that your service publishes the OrderCreated event to EventBridge and has the correct format.

There are different ways to do this. You could integrate EventBridge with services like SNS or AppSync subscriptions and subscribe to those in your tests. But, there is an easier way: Momento Topics.

4. Integrate EventBridge and Momento Topics

The first thing that enables this pattern is the API Destinations feature of EventBridge. API Destinations allow you to send events from EventBridge to an HTTP endpoint. You can configure an API Destination to use Basic, OAuth, and API Key authorization. The authorization configuration, if any, is securely stored in Secrets Manager.

The second thing that enables this pattern is the Momento HTTP API. The API lets you authenticate via an API Key and publish events to a Momento Topic. For a deeper dive into integrating EventBridge and Momento, refer to the official Momento documentation.

Architecture with API destination to Momento topic.
Architecture with API destination to Momento topic.

The following standalone CloudFormation template shows the required resources to integrate EventBridge and Momento Topics:

1
AWSTemplateFormatVersion: '2010-09-09'
2
Description: Momento destination for EventBridge
3
4
Parameters:
5
EventBusName:
6
Type: String
7
MomentoEndpoint:
8
Type: String
9
MomentoAuthToken:
10
Type: String
11
NoEcho: true
12
TopicName:
13
Type: String
14
CacheName:
15
Type: String
16
17
Resources:
18
DLQ:
19
Type: AWS::SQS::Queue
20
21
Connection:
22
Type: AWS::Events::Connection
23
Properties:
24
AuthorizationType: API_KEY
25
AuthParameters:
26
ApiKeyAuthParameters:
27
ApiKeyName: Authorization
28
ApiKeyValue: !Sub ${MomentoAuthToken}
29
30
Destination:
31
Type: AWS::Events::ApiDestination
32
Properties:
33
ConnectionArn: !GetAtt Connection.Arn
34
HttpMethod: POST
35
InvocationEndpoint: !Sub ${MomentoEndpoint}/topics/${CacheName}/${TopicName}
36
InvocationRateLimitPerSecond: 300
37
38
TargetRole:
39
Type: AWS::IAM::Role
40
Properties:
41
AssumeRolePolicyDocument:
42
Version: '2012-10-17'
43
Statement:
44
- Effect: Allow
45
Principal:
46
Service: events.amazonaws.com
47
Action: sts:AssumeRole
48
Path: /service-role/
49
Policies:
50
- PolicyName: destinationinvoke
51
PolicyDocument:
52
Version: '2012-10-17'
53
Statement:
54
- Effect: Allow
55
Action:
56
- events:InvokeApiDestination
57
Resource: !GetAtt Destination.Arn
58
59
Rule:
60
Type: AWS::Events::Rule
61
Properties:
62
EventBusName: !Ref EventBusName
63
EventPattern:
64
source:
65
- prefix: ''
66
State: ENABLED
67
Targets:
68
- Id: topicPublish-rule
69
Arn: !GetAtt Destination.Arn
70
RoleArn: !GetAtt TargetRole.Arn
71
DeadLetterConfig:
72
Arn: !GetAtt DLQ.Arn

The template consists of a few resources:

With this stack in place, every event sent to the EventBridge bus will be sent to the Momento Topic. We can now use this in our End-to-End tests to verify that the application publishes the events we expect to EventBridge.

5. Test the application End-to-End

Now, let’s get to the magic part. With the EventBridge and Momento integration in place, we can now test our application and be confident that it emits the correct event(s). The architecture below shows the flow of events:

End-to-end testing flow.
End-to-end testing flow.
  1. A test calls the API Gateway endpoint that triggers the Lambda to send an event.
  2. The API destination relays the event to the Momento topic.
  3. The test receives the event from the Momento topic and verifies that it matches the expected event.

So, we need a test that subscribes to the topic, calls the API, and verifies that the event is received. The test could look like this:

1
describe('When an order is created', async () => {
2
const subscription = await subscribe();
3
let order: { id: string; name: string };
4
5
beforeAll(async () => {
6
const response = await fetch(API_URL, {
7
method: 'POST',
8
});
9
expect(response.status).toBe(201);
10
11
order = await response.json();
12
});
13
14
afterAll(async () => {
15
subscription.unsubscribe();
16
});
17
18
it('It should publish an OrderCreated event to EventBridge', async () => {
19
const message = await subscription.waitForMessageMatching({
20
source: 'OrderService',
21
'detail-type': 'OrderCreated',
22
detail: {
23
id: order.id,
24
name: order.name,
25
},
26
});
27
28
expect(message).not.toBeNull();
29
}, 5000);
30
});

The test uses a subscribe function to set up a subscription to the topic. It then calls the API and saves the ID of the created order. It then verifies that it receives an OrderCreated event with the format via the waitForMessageMatching function.

The test explicitly sets a timeout of five seconds. This should be ample time for the service to publish the event and let it propagate to the Momento topic. If no matching event is received within that duration, the test fails.

So, what magic is going on in these functions? Not that much, actually. The Momento SDK is a joy to work with:

1
import 'dotenv/config';
2
import {
3
TopicClient,
4
TopicConfigurations,
5
CredentialProvider,
6
TopicItem,
7
TopicSubscribe,
8
} from '@gomomento/sdk';
9
import { ReplaySubject, firstValueFrom } from 'rxjs';
10
import { filter } from 'rxjs/operators';
11
import * as _ from 'lodash';
12
13
const topicClient = new TopicClient({
14
configuration: TopicConfigurations.Default.latest(),
15
credentialProvider: CredentialProvider.fromEnvironmentVariable({
16
environmentVariableName: 'MOMENTO_API_KEY',
17
}),
18
});
19
const CACHE_NAME = process.env.CACHE_NAME || '';
20
const TOPIC_NAME = process.env.TOPIC_NAME || '';
21
22
export const subscribe = async () => {
23
const messages = new ReplaySubject(100);
24
25
const subscription = await topicClient.subscribe(CACHE_NAME, TOPIC_NAME, {
26
onError: (error) => {
27
throw error;
28
},
29
onItem: (item: TopicItem) => {
30
try {
31
const message = JSON.parse(item.valueString());
32
messages.next(message);
33
} catch (error) {
34
console.log('Error parsing message from Momento Topic', item);
35
}
36
},
37
});
38
39
if (!(subscription instanceof TopicSubscribe.Subscription)) {
40
throw new Error('Failed to subscribe to topic');
41
}
42
43
const unsubscribe = async () => {
44
subscription.unsubscribe();
45
};
46
47
const waitForMessageMatching = async (expected: object) => {
48
const predicate = (message: unknown) => {
49
if (typeof message !== 'object' || message === null) {
50
return false;
51
}
52
53
return _.isMatch(message, expected);
54
};
55
56
const data = messages.pipe(filter((message) => predicate(message)));
57
return firstValueFrom(data);
58
};
59
60
return {
61
unsubscribe,
62
waitForMessageMatching,
63
};
64
};

Let’s break this down.

We first create a TopicClient and configure it to read the Momento API Key from the environment variable MOMENTO_API_KEY. We also fetch the name of the cache and topic from environment variables.

Inside the subscribe function, we first create an RxJS ReplaySubject. We use this to store incoming events from the Momento topic.

Then, we use the client to subscribe to our topic. In onItem, we parse the incoming event and store it by calling messages.next(message). Every event sent to the topic will be stored during the test.

In waitForMessageMatching, we filter out any messages that do not match the expected event. It returns the first matching event that is received, if any. The test fails if no matching event is received within the test timeout.

Looking back at the actual test case, we are expecting an OrderCreated event from the OrderService that contains the id and name of the order that was created:

1
const message = await subscription.waitForMessageMatching({
2
source: 'OrderService',
3
'detail-type': 'OrderCreated',
4
detail: {
5
id: order.id,
6
name: order.name,
7
},
8
});
9
10
expect(message).not.toBeNull();

With this, we can be confident that our application works correctly end-to-end.

6. Conclusion

One of the most common arguments against Serverless is that testing is difficult. This partly comes from a misconception that you should test everything locally. With serverless, you should embrace testing in the cloud. This is where the application runs, and this is where you should test it.

Distributed systems are complex to test, no matter if they are built with Serverless technologies or not. But, with the right tools and patterns, you can make it easier. In this post, you have learned how to use EventBridge and Momento Topics to test that your applications produce the events you expect.

Try it out!

You can find the sample application and a step-by-step guide on GitHub.


About the author

I'm Elias Brange, a Cloud Consultant and AWS Community Builder in the Serverless category. I'm on a mission to drive Serverless adoption and help others on their Serverless AWS journey.

Did you find this article helpful? Share it with your friends and colleagues using the buttons below. It could help them too!

Are you looking for more content like this? Follow me on LinkedIn & Twitter !