On 28 June 2018 Amazon announced support for SQS events in Lambda. This greatly simplifies the development of serverless message-driven systems. Previously, if we wanted to react to SQS messages in Lambda, we had to poll for them at regular intervals: a Lambda function would be triggered by a cron scheduler to check whether any new messages had appeared in the SQS queue. If there was nothing new, it was of course a waste of resources and money.
Now we no longer need a scheduler; the Lambda function is invoked automatically when a new message appears in the SQS queue.
In this post I will show how we can use this new feature with the Serverless Framework.
If you have not heard of this framework yet, you can have a look at its very good documentation here.
SIDE NOTE: At the time of writing, SQS events are not yet supported by the released Serverless Framework (ver. 1.27.3), but there is a PR ready, so this should be added in the next release. If you are impatient like me, you can pull the branch from GitHub and build it yourself:
npm install
Then you can use the bin/serverless script to create and deploy the project.
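If you go that route, the rough sequence looks like this (a sketch; the exact branch to check out comes from the PR, so that step is left as a placeholder):

git clone https://github.com/serverless/serverless.git
cd serverless
# check out the PR branch here
npm install
# then use the locally built CLI instead of the globally installed one
./bin/serverless --version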
I can imagine many use cases for message-driven architecture. For example, in an order processing system a message could be sent to another service after an order was placed by a customer. The customer would be notified immediately that the order was submitted, while the actual processing of the order could happen asynchronously (in a message-driven manner).
I tried to keep this example as simple as possible and not to dive deep into any specific business scenario.
We will have only two functions:
- sender - will be triggered by a REST API call and will submit a new message to the SQS queue
- receiver - will process messages from the SQS queue.
Let’s create a new project from the aws-nodejs template:
serverless create --template aws-nodejs
This generates two files:
- serverless.yml - a definition of the new service stack on AWS (using CloudFormation underneath)
- handler.js - a demo function
We can rename handler.js to receiver.js and slightly update it. After updating the generated sources for our demo, they look as follows:
serverless.yml
service: sqs-triggers-demo

provider:
  name: aws
  runtime: nodejs6.10
  profile: sls
  region: us-east-1
  iamRoleStatements:
    - Effect: "Allow"
      Action:
        - "sqs:SendMessage"
        - "sqs:GetQueueUrl"
      Resource: "arn:aws:sqs:${self:provider.region}:811338114632:MyQueue"
    - Effect: "Allow"
      Action:
        - "sqs:ListQueues"
      Resource: "arn:aws:sqs:${self:provider.region}:811338114632:*"

functions:
  sender:
    handler: sender.handler
    events:
      - http:
          path: v1/sender
          method: post
  receiver:
    handler: receiver.handler
    events:
      - sqs:
          arn:
            Fn::GetAtt:
              - MyQueue
              - Arn

resources:
  Resources:
    MyQueue:
      Type: "AWS::SQS::Queue"
      Properties:
        QueueName: "MyQueue"
Let me explain what is happening here.
First, we defined our service name (sqs-triggers-demo) and specified in which region we want to create it (us-east-1).
Next, we need to grant access to send messages to the SQS queue, which will be used by the sender function.
iamRoleStatements:
  - Effect: "Allow"
    Action:
      - "sqs:SendMessage"
      - "sqs:GetQueueUrl"
    Resource: "arn:aws:sqs:${self:provider.region}:811338114632:MyQueue"
  - Effect: "Allow"
    Action:
      - "sqs:ListQueues"
    Resource: "arn:aws:sqs:${self:provider.region}:811338114632:*"
The queue name MyQueue is hard-coded here. In production code we would probably want to pass it in as an environment property. I have also hard-coded the account ID for simplicity, but we should really pass it as a property as well. However, the syntax in this case is a bit ugly and would distract from the main subject. You can have a look here at how to parametrize it.
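As a side note, one way to avoid hard-coding the ARN at all (a sketch of my own, not part of the demo sources) is to reference the queue defined later in the resources section, the same way the SQS event does:

iamRoleStatements:
  - Effect: "Allow"
    Action:
      - "sqs:SendMessage"
      - "sqs:GetQueueUrl"
    Resource:
      Fn::GetAtt:
        - MyQueue
        - Arn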
Then we specify two functions, sender and receiver, and handlers for them, which will be coded in the files sender.js and receiver.js. Each of them will have one function named handler.
functions:
  sender:
    handler: sender.handler
    events:
      - http:
          path: v1/sender
          method: post
  receiver:
    handler: receiver.handler
    events:
      - sqs:
          arn:
            Fn::GetAtt:
              - MyQueue
              - Arn
The receiver function has an SQS event defined. It will be triggered from MyQueue. It needs the ARN of the queue; in this case the queue is referenced by its logical ID, but the ARN could also be specified directly like this:
receiver:
  handler: receiver.handler
  events:
    - sqs:
        arn: "arn:aws:sqs:${self:provider.region}:811338114632:MyQueue"
        batchSize: 1
Optionally, we can define a batch size, which is how many SQS messages the Lambda function should process at once (the default and maximum value is 10).
The sqs event will hook up your existing SQS queue to a Lambda function; Serverless won’t create a new queue. This is why we have a resources section at the end, which will create the queue for us.
resources:
  Resources:
    MyQueue:
      Type: "AWS::SQS::Queue"
      Properties:
        QueueName: "MyQueue"
sender.js
var AWS = require('aws-sdk');

var sqs = new AWS.SQS({
    region: 'us-east-1'
});

exports.handler = function(event, context, callback) {
    var accountId = context.invokedFunctionArn.split(":")[4];
    var queueUrl = 'https://sqs.us-east-1.amazonaws.com/' + accountId + '/MyQueue';

    // response and status of HTTP endpoint
    var responseBody = {
        message: ''
    };
    var responseCode = 200;

    // SQS message parameters
    var params = {
        MessageBody: event.body,
        QueueUrl: queueUrl
    };

    sqs.sendMessage(params, function(err, data) {
        if (err) {
            console.log('error:', 'failed to send message ' + err);
            responseCode = 500;
        } else {
            console.log('data:', data.MessageId);
            responseBody.message = 'Sent to ' + queueUrl;
            responseBody.messageId = data.MessageId;
        }

        var response = {
            statusCode: responseCode,
            headers: {
                'Content-Type': 'application/json'
            },
            body: JSON.stringify(responseBody)
        };
        callback(null, response);
    });
};
At the top of the file we import the AWS SDK used to send messages to SQS. As we want to call this function through API Gateway, we need to return a compliant response of this form:
{
    statusCode: <HTTP status code>,
    headers: {
        <a map of HTTP headers>
    },
    body: <response body as string>
}
What we send to SQS is the HTTP request body.
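The IAM role also allows sqs:GetQueueUrl, so instead of building the queue URL from the account ID by hand, the sender could ask SQS for it. A minimal sketch (my own, not taken from the demo sources):

var AWS = require('aws-sdk');
var sqs = new AWS.SQS({
    region: 'us-east-1'
});

// resolve the queue URL by name instead of constructing it manually
sqs.getQueueUrl({ QueueName: 'MyQueue' }, function(err, data) {
    if (err) {
        console.log('error:', 'failed to resolve queue URL ' + err);
    } else {
        console.log('queue URL:', data.QueueUrl);
    }
});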
receiver.js
exports.handler = (event, context, callback) => {
    const response = {
        statusCode: 200,
        body: JSON.stringify({
            message: 'SQS event processed.',
            input: event,
        }),
    };

    console.log('event: ', JSON.stringify(event));

    var body = event.Records[0].body;
    console.log("text: ", JSON.parse(body).text);

    callback(null, response);
};
Here we are doing nothing more than parsing the event received from SQS and extracting the HTTP body passed from the sender function:
var body = event.Records[0].body;
The event contains an array called Records, which is an array of SQS messages. Its size depends on the batch size specified for the SQS event on the Lambda function (batchSize in serverless.yml) and on the number of messages in the queue.
Then I expect the HTTP body to contain a text parameter and print it out:
console.log("text: ", JSON.parse(body).text);
Of course, it will fail if there is no such parameter, but we will test it with the right data :)
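If you want to be a bit more defensive and also handle every message in the batch (not just the first one), the handler could look like this sketch (my own variation, not part of the demo sources):

exports.handler = (event, context, callback) => {
    // process every record in the batch, not only Records[0]
    event.Records.forEach((record) => {
        try {
            const body = JSON.parse(record.body);
            console.log('text: ', body.text);
        } catch (e) {
            console.log('could not parse message body: ', record.body);
        }
    });
    callback(null, { statusCode: 200, body: 'SQS event processed.' });
};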
Now we can deploy the application and print the service info to get the endpoint URL:
serverless deploy
serverless info
It would print something like:
Service Information
service: sqs-triggers-demo
stage: dev
region: us-east-1
stack: sqs-triggers-demo-dev
api keys:
None
endpoints:
POST - https://fochwxb6gh.execute-api.us-east-1.amazonaws.com/dev/v1/sender
functions:
sender: sqs-triggers-demo-dev-sender
receiver: sqs-triggers-demo-dev-receiver
Then tail the logs with the command:
serverless logs -f receiver
In another console, send an HTTP request to the sender endpoint like this:
curl -d '{"text" : "Hello world!"}' https://fochwxb6gh.execute-api.us-east-1.amazonaws.com/dev/v1/sender
The logs from the receiver would look similar to these:
START RequestId: 384a3c5c-5689-59ba-a04a-6fef0588bae9 Version: $LATEST
2018-07-02 00:08:21.571 (+02:00) 384a3c5c-5689-59ba-a04a-6fef0588bae9 event: {"Records":[{"messageId":"df38c9a3-6562-4dd4-b978-98ac4dce42d0","receiptHandle":"AQEBF/CZlWiLwMpUr8GqE4a2ns7nIBv1nAlD/MJw4HCxYlyzNoSPJxjc+J7Rn34IdKq8wAH+I1e8eUW6ZuVEAat6pD4hZ2WYO+oCgnVFTQMF59zTkT7Miw3F36pR8SxywXAnKUtmCdpeVv40Zz+KYjwlP6VooSIrtCFRuLFFocfjKhBVdKb/9B7Rs5ZgB+QOYgB1bBEfaLfzWNQdFk1bBnDx9BSwQ+9mTMe2wgDIIwUpbSy8NxFZlMW2PzqUHm5JU7wz+dN7bKGVvfnh/URlXS+JNfs2IlXkpZIb+4kU5IG3ItBtQo3y1SzmsXlqczKShI9pgEmL4xtuC94tdNbX0+EgTz2NyX2vN8k4/2aafgw5JIefuiriqCmteesqFvT+awPV","body":"{\"text\" : \"Hello world!\"}","attributes":{"ApproximateReceiveCount":"1","SentTimestamp":"1530482900766","SenderId":"AROAINV2MGHNVAM5STGJY:sqs-triggers-demo-dev-sender","ApproximateFirstReceiveTimestamp":"1530482900780"},"messageAttributes":{},"md5OfBody":"63c34d70a5a1b50dba746843a0cefd5b","eventSource":"aws:sqs","eventSourceARN":"arn:aws:sqs:us-east-1:811338114632:MyQueue","awsRegion":"us-east-1"}]}
2018-07-02 00:08:21.571 (+02:00) 384a3c5c-5689-59ba-a04a-6fef0588bae9 text: Hello world!
END RequestId: 384a3c5c-5689-59ba-a04a-6fef0588bae9
REPORT RequestId: 384a3c5c-5689-59ba-a04a-6fef0588bae9 Duration: 0.56 ms Billed Duration: 100 ms Memory Size: 1024 MB Max Memory Used: 20 MB
You may wonder how Lambda scales when we flood SQS with messages. The Amazon documentation explains it:
For Lambda functions that process Amazon SQS queues, AWS Lambda will automatically scale the polling on the queue until the maximum concurrency level is reached, where each message batch can be considered a single concurrent unit. AWS Lambda’s automatic scaling behavior is designed to keep polling costs low when a queue is empty while simultaneously enabling you to achieve high throughput when the queue is being used heavily. Here is how it works:
- When an Amazon SQS event source mapping is initially enabled, or when messages first appear after a period without traffic, Lambda begins polling the Amazon SQS queue using five clients, each making long poll requests in parallel.
- Lambda monitors the number of inflight messages, and when it detects that this number is increasing, it will increase the polling frequency by 20 ReceiveMessage requests per minute and the function concurrency by 60 calls per minute. As long as the queue remains busy, scale up continues until at least one of the following occurs:
- Polling frequency reaches 100 simultaneous ReceiveMessage requests and function invocation concurrency reaches 1,000.
- The account concurrency maximum has been reached.
- The per-function concurrency limit of the function attached to the SQS queue (if any) has been reached.
Note: Account-level limits are impacted by other functions in the account, and per-function concurrency applies to all events sent to a function. For more information, see Managing Concurrency.
When AWS Lambda detects that the number of inflight messages is decreasing, it will decrease the polling frequency by 10 ReceiveMessage requests per minute and decrease the concurrency used to invoke your function by 30 calls per minute.
Fortunately, there is a way to limit concurrency per Lambda function with the reservedConcurrency property. For example, if we wanted to allow only one concurrent invocation of the receiver function, the function definition would look like this:
receiver:
  handler: receiver.handler
  events:
    - sqs:
        arn:
          Fn::GetAtt:
            - MyQueue
            - Arn
  reservedConcurrency: 1
That’s it. You can find the source files for this demo on GitHub. Don’t forget to remove the service stack from AWS after you have finished playing with it:
serverless remove
I’m glad if you made it to the end of this post. If you found it interesting, don’t forget to like this article and follow me to be notified about similar ones in the future :)
If you would like to learn more about handling errors in Lambda together with SQS, see my other post.