Lambda Event Sources¶
Scheduled Events¶
Chalice has support for scheduled events. This feature allows you to periodically invoke a lambda function based on some regular schedule. You can specify a fixed rate or a cron expression.
To create a scheduled event in chalice, you use the @app.schedule()
decorator. Let’s look at an example.
app = chalice.Chalice(app_name='foo')
@app.schedule('rate(1 hour)')
def every_hour(event):
print(event.to_dict())
In this example, we have a single lambda function that we want automatically
invoked every hour. When you run chalice deploy
Chalice will create a
lambda function as well as the necessary CloudWatch events/rules such that the
every_hour
function is invoked every hour.
The Chalice.schedule()
method accepts either a string or an
instance of Rate
or Cron
. For example:
app = chalice.Chalice(app_name='foo')
@app.schedule(Rate(1, unit=Rate.HOURS))
def every_hour(event):
print(event.to_dict())
The function you decorate must accept a single argument,
which will be of type CloudWatchEvent
.
You can use the schedule()
decorator multiple times
in your chalice app. Each schedule()
decorator will
result in a new lambda function and associated CloudWatch
event rule. For example:
app = chalice.Chalice(app_name='foo')
@app.schedule(Rate(1, unit=Rate.HOURS))
def every_hour(event):
print(event.to_dict())
@app.schedule(Rate(2, unit=Rate.HOURS))
def every_two_hours(event):
print(event.to_dict())
In the app above, chalice will create two lambda functions,
and configure every_hour
to be invoked once an hour,
and every_two_hours
to be invoked once every two hours.
CloudWatch Events¶
You can configure a lambda function to subscribe to any CloudWatch Event.
To subscribe to a CloudWatch Event in chalice, you use the
@app.on_cw_event()
decorator. Let’s look at an example.
app = chalice.Chalice(app_name='foo')
@app.on_cw_event({"source": ["aws.codecommit"]})
def on_code_commit_changes(event):
print(event.to_dict())
In this example, we have a single lambda function that we subscribe to all events from the AWS Code Commit service. The first parameter to the decorator is the event pattern that will be used to filter the events sent to the function.
See the CloudWatch Event pattern docs for additional syntax and examples.
The function you decorate must accept a single argument,
which will be of type CloudWatchEvent
.
S3 Events¶
You can configure a lambda function to be invoked whenever certain events happen in an S3 bucket. This uses the event notifications feature provided by Amazon S3.
To configure this, you just tell Chalice the name of an existing
S3 bucket, along with what events should trigger the lambda function.
This is done with the Chalice.on_s3_event()
decorator.
Here’s an example:
from chalice import Chalice
app = chalice.Chalice(app_name='s3eventdemo')
app.debug = True
@app.on_s3_event(bucket='mybucket-name',
events=['s3:ObjectCreated:*'])
def handle_s3_event(event):
app.log.debug("Received event for bucket: %s, key: %s",
event.bucket, event.key)
In this example above, Chalice connects the S3 bucket to the
handle_s3_event
Lambda function such that whenever an object is uploaded
to the mybucket-name
bucket, the Lambda function will be invoked.
This example also uses the .bucket
and .key
attributes from the
event
parameter, which is of type S3Event
.
It will automatically create the appropriate S3 notification configuration
as needed. Chalice will also leave any existing notification configuration
on the mybucket-name
untouched. It will only merge in the additional
configuration needed for the handle_s3_event
Lambda function.
Warning
This feature only works when using chalice deploy. Because you
configure the lambda function with the name of an existing S3 bucket,
it is not possible to describe this using a CloudFormation/SAM template.
The chalice package
command will fail. You will eventually be able
to request that chalice create a bucket for you, which will support
the chalice package
command.
The function you decorate must accept a single argument,
which will be of type S3Event
.
SNS Events¶
You can configure a lambda function to be automatically invoked whenever something publishes to an SNS topic. Chalice will automatically handle creating the lambda function, subscribing the lambda function to the SNS topic, and modifying the lambda function policy to allow SNS to invoke the function.
To configure this, you just need the name of an existing SNS topic you’d like to subscribe to. The SNS topic must already exist.
Below is an example of how to set this up. The example uses boto3 to create the SNS topic. If you don’t have boto3 installed in your virtual environment, be sure to install it with:
$ pip install boto3
First, we’ll create an SNS topic using boto3.
$ python
>>> import boto3
>>> sns = boto3.client('sns')
>>> sns.create_topic(Name='my-demo-topic')
{'TopicArn': 'arn:aws:sns:us-west-2:12345:my-demo-topic',
'ResponseMetadata': {}}
Next, we’ll create our chalice app:
$ chalice new-project chalice-demo-sns
$ cd chalice-demo-sns/
We’ll update the app.py
file to use the on_sns_message
decorator:
from chalice import Chalice
app = Chalice(app_name='chalice-sns-demo')
app.debug = True
@app.on_sns_message(topic='my-demo-topic')
def handle_sns_message(event):
app.log.debug("Received message with subject: %s, message: %s",
event.subject, event.message)
We can now deploy our chalice app:
$ chalice deploy
Creating deployment package.
Creating IAM role: chalice-demo-sns-dev
Creating lambda function: chalice-demo-sns-dev-handle_sns_message
Subscribing chalice-demo-sns-dev-handle_sns_message to SNS topic my-demo-topic
Resources deployed:
- Lambda ARN: arn:aws:lambda:us-west-2:123:function:...
And now we can test our app by publishing a few SNS messages to our topic.
We’ll do this using boto3. In the example below, we’re using list_topics()
to find the ARN associated with our topic name before calling the publish()
method.
$ python
>>> import boto3
>>> sns = boto3.client('sns')
>>> topic_arn = [t['TopicArn'] for t in sns.list_topics()['Topics']
... if t['TopicArn'].endswith(':my-demo-topic')][0]
>>> sns.publish(Message='TestMessage1', Subject='TestSubject1',
... TopicArn=topic_arn)
{'MessageId': '12345', 'ResponseMetadata': {}}
>>> sns.publish(Message='TestMessage2', Subject='TestSubject2',
... TopicArn=topic_arn)
{'MessageId': '54321', 'ResponseMetadata': {}}
To verify our function was called correctly, we can use the chalice logs
command:
$ chalice logs -n handle_sns_message
2018-06-28 17:49:30.513000 547e0f chalice-demo-sns - DEBUG - Received message with subject: TestSubject1, message: TestMessage1
2018-06-28 17:49:40.391000 547e0f chalice-demo-sns - DEBUG - Received message with subject: TestSubject2, message: TestMessage2
In this example we used the SNS topic name to register our handler, but you can also use the topic arn. This can be useful if your topic is in another region or account.
SQS Events¶
You can configure a lambda function to be invoked whenever messages are
available on an SQS queue. To configure this, use the
Chalice.on_sqs_message()
decorator and provide the name of the SQS queue
and an optional batch size.
The message visibility timeout of your SQS queue must be greater than or equal to the lambda timeout. The default message visibility timeout when you create an SQS queue is 30 seconds, and the default timeout for a Lambda function is 60 seconds, so you’ll need to modify one of these values in order to successfully connect an SQS queue to a Lambda function.
You can check the visibility timeout of your queue using the
GetQueueAttributes
API call. Using the
AWS CLI,
you can run this command to check the value:
$ aws sqs get-queue-attributes \
--queue-url https://us-west-2.queue.amazonaws.com/1/testq \
--attribute-names VisibilityTimeout
{
"Attributes": {
"VisibilityTimeout": "30"
}
}
You can set the visibility timeout of your SQS queue using the
SetQueueAttributes
API call. Again using the AWS CLI you can
run this command:
$ aws sqs set-queue-attributes \
--queue-url https://us-west-2.queue.amazonaws.com/1/testq \
--attributes VisibilityTimeout=60
If you would prefer to change the timeout of your lambda function instead,
you can specify this timeout value using the lambda_timeout
config key
if your .chalice/config.json
file.
See Lambda Specific Configuration for a list of all supported lambda configuration
values in chalice. In this example below, we’re setting the timeout
of our handle_sqs_message
lambda function to 30 seconds:
$ cat .chalice/config.json
{
"stages": {
"dev": {
"lambda_functions": {
"handle_sqs_message": {
"lambda_timeout": 30
}
}
}
},
"version": "2.0",
"app_name": "chalice-sqs-demo"
}
In this example below, we’re connecting the handle_sqs_message
lambda
function to the my-queue
SQS queue. Note that we are specifying the
queue name, not the queue URL or queue ARN. If you are connecting your
lambda function to a FIFO queue, make sure you specify the .fifo
suffix, e.g. my-queue.fifo
.
from chalice import Chalice
app = chalice.Chalice(app_name='chalice-sqs-demo')
app.debug = True
@app.on_sqs_message(queue='my-queue', batch_size=1)
def handle_sqs_message(event):
for record in event:
app.log.debug("Received message with contents: %s", record.body)
Whenever a message is sent to the SQS queue our function will be automatically
invoked. The function argument is an SQSEvent
object, and each
record
in the example above is of type SQSRecord
. Lambda takes
care of automatically scaling your function as needed. See Understanding
Scaling Behavior for more information on how Lambda scaling works.
If your lambda function completes without raising an exception, then
Lambda will automatically delete all the messages associated with the
SQSEvent
. You don’t need to manually call sqs.delete_message()
in your lambda function. If your lambda function raises an exception, then
Lambda won’t delete any messages, and once the visibility timeout has been
reached, the messages will be available again in the SQS queue. Note that
if you are using a batch size of more than one, the entire batch succeeds or
fails. This means that it is possible for your lambda function to see
a message multiple times, even if it’s successfully processed the message
previously. There are a few options available to mitigate this:
Use a batch size of 1 (the default value).
Use a separate data store to check if you’ve already processed an SQS message. You can use services such as Amazon DynamoDB or Amazon ElastiCache.
Manually call
sqs.delete_message()
in your Lambda function once you’ve successfully processed a message.
For more information on Lambda and SQS, see the AWS documentation.
Kinesis Events¶
You can configure a Lambda function to be invoked whenever messages are
published to an Amazon Kinesis data stream. To configure this, use the
Chalice.on_kinesis_record()
decorator and provide the name of the
Kinesis stream.
The KinesisEvent
that is passed in as the event
argument
to the event handler is also iterable. This allows you to iterate over
all the records in the event. Additionally, each record has a .data
attribute that is automatically base64 decoded for you.
Here’s an example:
from chalice import Chalice
app = chalice.Chalice(app_name='kinesiseventdemo')
app.debug = True
@app.on_kinesis_record(stream='mystream')
def handle_kinesis_message(event):
for record in event:
# The .data attribute is automatically base64 decoded for you.
app.log.debug("Received message with contents: %s", record.data)
For more information on using Kinesis and Lambda, see Using AWS Lambda with Amazon Kinesis.
DynamoDB Events¶
You can configure a Lambda function to be invoked whenever messages are
published to an Amazon DynamoDB stream. To configure this, use the
Chalice.on_dynamodb_record()
decorator and provide the name of the
DynamoDB stream ARN.
Note
Other event handlers such as Chalice.on_kinesis_record()
,
Chalice.on_sqs_message()
, and Chalice.on_sns_message()
only require the resource name and not the full ARN. In the case
of DynamoDB streams, there are auto-generated portions of the
stream ARN that cannot be computed based on the resource name. This
is why Chalice requires that full stream ARN when configuring
a DynamoDB stream handler.
The DynamoDBEvent
that is passed in as the event
argument
to the event handler is also iterable. This allows you to iterate over
all the records in the event.
Here’s an example:
from chalice import Chalice
app = chalice.Chalice(app_name='ddb-event-demo')
app.debug = True
@app.on_dynamodb_record(stream_arn='arn:aws:dynamodb:.../stream/2020')
def handle_ddb_message(event):
for record in event:
app.log.debug("New: %s", record.new_image)
For more information on using Lambda and DynamoDB, see Using AWS Lambda with Amazon DynamoDB.