Cloud-native services are autonomous. Each service is self-sufficient and runs in isolation to minimize the blast radius when any given service experiences a failure. To achieve this isolation, bulkheads must be established between the services, and event streaming is one mechanism for creating these bulkheads. Autonomous cloud-native services perform all inter-service communication asynchronously via streams, decoupling upstream services from downstream services. In Chapter 2, Applying The Event Sourcing and CQRS Patterns, we will dive deeper into how we create bounded, isolated, and autonomous cloud-native services. This recipe creates the event stream that we will use throughout this cookbook and provides a function for publishing events to the stream.
Creating an event stream and publishing an event
How to do it...
- Create the project from the following template:
$ sls create --template-url https://github.com/danteinc/js-cloud-native-cookbook/tree/master/ch1/event-stream --path cncb-event-stream
- Navigate to the cncb-event-stream directory with cd cncb-event-stream.
- Review the file named serverless.yml with the following content:
service: cncb-event-stream

provider:
  name: aws
  runtime: nodejs8.10
  iamRoleStatements:
    - Effect: Allow
      Action:
        - kinesis:PutRecord
      Resource:
        Fn::GetAtt: [ Stream, Arn ]

functions:
  publish:
    handler: handler.publish
    environment:
      STREAM_NAME:
        Ref: Stream

resources:
  Resources:
    Stream:
      Type: AWS::Kinesis::Stream
      Properties:
        Name: ${opt:stage}-${self:service}-s1
        RetentionPeriodHours: 24
        ShardCount: 1
  Outputs:
    streamName:
      Value:
        Ref: Stream
    streamArn:
      Value:
        Fn::GetAtt: [ Stream, Arn ]
- Review the file named handler.js with the following content:
const aws = require('aws-sdk');
const uuid = require('uuid');

module.exports.publish = (event, context, callback) => {
  // Adorn the incoming event with the standard event fields;
  // the spread is last, so caller-supplied fields take precedence.
  const e = {
    id: uuid.v1(),
    partitionKey: event.partitionKey || uuid.v4(),
    timestamp: Date.now(),
    tags: {
      region: process.env.AWS_REGION,
    },
    ...event,
  };

  const params = {
    StreamName: process.env.STREAM_NAME,
    PartitionKey: e.partitionKey,
    Data: Buffer.from(JSON.stringify(e)),
  };

  // Publish the event to the Kinesis stream and let the
  // Lambda callback report the result.
  const kinesis = new aws.Kinesis();
  kinesis.putRecord(params, callback);
};
- Install the dependencies with npm install.
- Run the tests with npm test.
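A unit test for the publish handler can mock the AWS SDK so that no cloud resources are needed. The following is a minimal sketch, assuming mocha, chai, and the aws-sdk-mock library; the file path and assertions are illustrative, not necessarily the template's actual test code:

const AWS = require('aws-sdk-mock');
const expect = require('chai').expect;
const handler = require('../../handler');

describe('publish', () => {
  afterEach(() => AWS.restore('Kinesis'));

  it('should publish an event to the stream', (done) => {
    // Intercept putRecord and assert on the adorned event
    AWS.mock('Kinesis', 'putRecord', (params, callback) => {
      const data = JSON.parse(params.Data);
      expect(data.type).to.equal('thing-created');
      expect(data.id).to.be.a('string');
      expect(data.partitionKey).to.be.a('string');
      callback(null, { ShardId: 'shardId-000000000000' });
    });

    handler.publish({ type: 'thing-created' }, {}, (err, resp) => {
      expect(err).to.be.null;
      expect(resp.ShardId).to.equal('shardId-000000000000');
      done();
    });
  });
});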
- Review the contents generated in the .serverless directory.
- Deploy the stack:
$ npm run dp:lcl -- -s $MY_STAGE
> cncb-event-stream@1.0.0 dp:lcl <path-to-your-workspace>/cncb-event-stream
> sls deploy -v -r us-east-1 "-s" "john"
Serverless: Packaging service...
...
Serverless: Stack update finished...
Service Information
service: cncb-event-stream
stage: john
region: us-east-1
stack: cncb-event-stream-john
...
functions:
publish: cncb-event-stream-john-publish
Stack Outputs
PublishLambdaFunctionQualifiedArn: arn:aws:lambda:us-east-1:999999999999:function:cncb-event-stream-john-publish:3
streamArn: arn:aws:kinesis:us-east-1:999999999999:stream/john-cncb-event-stream-s1
streamName: john-cncb-event-stream-s1
...
- Review the stack, stream, and function in the AWS Console.
- Invoke the function with the following command:
$ sls invoke -r us-east-1 -f publish -s $MY_STAGE -d '{"type":"thing-created"}'
{
"ShardId": "shardId-000000000000",
"SequenceNumber": "49582906351415672136958521359460531895314381358803976194"
}
- Take a look at the logs:
$ sls logs -f publish -r us-east-1 -s $MY_STAGE
START ...
2018-03-24 23:20:46 ... event: {"type":"thing-created"}
2018-03-24 23:20:46 ... event:
{
"type":"thing-created",
"id":"81fd8920-2fdb-11e8-b749-0d2c43ec73d0",
"partitionKey":"6f4f9a38-61f7-41c9-a3ad-b8c16e42db7c",
"timestamp":1521948046003,
"tags":{
"region":"us-east-1"
}
}
2018-03-24 23:20:46 ... params: {"StreamName":"john-cncb-event-stream-s1","PartitionKey":"6f4f9a38-61f7-41c9-a3ad-b8c16e42db7c","Data":{"type":"Buffer","data":[...]}}
END ...
REPORT ... Duration: 153.47 ms Billed Duration: 200 ms ... Max Memory Used: 39 MB
- Remove the stack once you have finished with npm run rm:lcl -- -s $MY_STAGE.
How it works...
The resources section of the serverless.yml file is used to create the cloud resources that a service requires. These resources are defined using standard AWS CloudFormation resource types. In this recipe, we create an Amazon Kinesis stream. We give the stream a name, define the retention period, and specify the number of shards. The Serverless Framework provides a robust mechanism for dynamically replacing variables.
Here, we combine the ${opt:stage} option passed in on the command line with the ${self:service} name defined in the serverless.yml file to create a unique stream name; deploying with -s john, for example, yields john-cncb-event-stream-s1. The default retention period is 24 hours and the maximum is seven days. For our recipes, a single shard will be more than sufficient. We will discuss shards shortly and again in Chapter 7, Optimizing Observability, and Chapter 9, Optimizing Performance.
The Outputs section of the serverless.yml file is where we define values, such as generated IDs and names, that we want to use outside of the stack. We output the stream's name (streamName) and its Amazon Resource Name (ARN) (streamArn) so that we can reference them with Serverless Framework variables in other projects. These values are also displayed on the Terminal when a deployment is complete.
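For example, a downstream project can resolve these outputs with the Serverless Framework cf: variable, which reads the outputs of a deployed CloudFormation stack. Here is a sketch of a hypothetical consumer service's serverless.yml; the function name and event settings are made up for illustration:

functions:
  listener:
    handler: handler.listen
    events:
      - stream:
          type: kinesis
          arn: ${cf:cncb-event-stream-${opt:stage}.streamArn}
          batchSize: 100
          startingPosition: TRIM_HORIZON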
The publish function defined in the serverless.yml file is used to demonstrate how to publish an event to the stream. We pass the STREAM_NAME to the function as an environment variable. In the iamRoleStatements section, we grant the function the kinesis:PutRecord permission, scoped to this specific stream, so that it can publish events to it.
The handler.js file has runtime dependencies on two external libraries: aws-sdk and uuid. The Serverless Framework automatically includes the runtime dependencies declared in the package.json file. Take a look inside the generated .serverless/cncb-event-stream.zip file. The aws-sdk library is a special case: it is already available in the AWS Lambda Node.js runtime and is therefore not included. This matters because aws-sdk is a large library and the ZIP file size impacts cold start times. We will discuss this in more detail in Chapter 9, Optimizing Performance.
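A package.json along the following lines would accomplish this; the scripts and version numbers are illustrative rather than the repository's actual values. Declaring aws-sdk as a devDependency is one common way to keep it out of the deployment package, since the Lambda Node.js runtime already provides it:

{
  "name": "cncb-event-stream",
  "version": "1.0.0",
  "scripts": {
    "test": "mocha",
    "dp:lcl": "sls deploy -v -r us-east-1",
    "rm:lcl": "sls remove -r us-east-1"
  },
  "dependencies": {
    "uuid": "^3.1.0"
  },
  "devDependencies": {
    "aws-sdk": "^2.190.0",
    "serverless": "^1.26.0"
  }
}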
The publish function expects to receive an event object as input, such as {"type":"thing-created"}. We then adorn the event with additional information to conform to our standard event format, which we will discuss shortly. Note that the input event is spread last, so any fields supplied by the caller, such as a precomputed partitionKey, override the generated defaults. Finally, the function creates the required params object and calls kinesis.putRecord from the aws-sdk. We will use this function in this and other recipes to simulate event traffic.
All events in our cloud-native systems will conform to the following Event structure to allow for consistent handling across all services. Additional fields are event-type-specific, as the sample event following the field descriptions shows:
interface Event {
  id: string;
  type: string;
  timestamp: number;
  partitionKey: string;
  tags: { [key: string]: string };
}
- id should be a V1 UUID, which is time-based
- type describes the event, such as thing-created
- timestamp is an epoch value, as returned from Date.now()
- partitionKey should be a V4 UUID, which is randomly generated
- tags is a hashmap of useful data values that are leveraged for content-based routing and for aggregating event metrics
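For example, a hypothetical thing-created event might look like the following, where the thing field is event-type-specific and the partitionKey is the ID of the domain entity:

{
  "id": "81fd8920-2fdb-11e8-b749-0d2c43ec73d0",
  "type": "thing-created",
  "timestamp": 1521948046003,
  "partitionKey": "6f4f9a38-61f7-41c9-a3ad-b8c16e42db7c",
  "tags": {
    "region": "us-east-1"
  },
  "thing": {
    "id": "6f4f9a38-61f7-41c9-a3ad-b8c16e42db7c",
    "name": "thing one"
  }
}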
It is important to use a V4 UUID for the partitionKey to avoid hot shards and maximize concurrency. If a V1 UUID were used, then all events produced at the same time would go to the same shard. The partitionKey will typically be the ID of the domain entity that produced the event, which should use a V4 UUID for the same reason. This has the added benefit of ensuring that all events for the same domain entity are processed through the same shard in the order received.
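The following sketch illustrates the point; the publishThingCreated helper is hypothetical, but it shows a producer passing the entity's V4 UUID as the partitionKey so that all of the entity's events land on the same shard in order, while different entities spread across shards:

const aws = require('aws-sdk');
const uuid = require('uuid');

// Hypothetical producer helper, not part of this recipe's code.
const publishThingCreated = (thing) => {
  const event = {
    id: uuid.v1(),                 // time-based event ID
    type: 'thing-created',
    timestamp: Date.now(),
    partitionKey: thing.id,        // the entity's V4 UUID
    tags: { region: process.env.AWS_REGION },
    thing,                         // event-type-specific payload
  };

  const kinesis = new aws.Kinesis();
  return kinesis.putRecord({
    StreamName: process.env.STREAM_NAME,
    PartitionKey: event.partitionKey,
    Data: Buffer.from(JSON.stringify(event)),
  }).promise();
};

// Usage: every event for this entity shares a shard and arrives in order.
// publishThingCreated({ id: uuid.v4(), name: 'thing one' });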