Chat Server Example
Note
This example is for illustration purposes and does not represent best practices.
A simple chat server example application. This example will walk through deploying a chat application with separate chat rooms and nicknames. It uses a DynamoDB table to store state like connection IDs between websocket messages.
First install a copy of Chalice in a fresh environment, create a new project and cd into the directory:
$ pip install -U chalice
$ chalice new-project chalice-chat-example
$ cd chalice-chat-example
Our Chalice application will need boto3 as a dependency, both for DynamoDB
access and to communicate back with API Gateway to send websocket
messages. Let’s add boto3 to the requirements.txt file:
$ echo "boto3>=1.9.91" > requirements.txt
Now that the requirement has been added, let’s install it locally, since our next script will need it as well:
$ pip install -r requirements.txt
Unlike our previous example where we used chalice deploy, we will use
chalice package to create a CloudFormation template. The AWS CLI will be
used to deploy the template. To install the AWS CLI run the command:
$ pip install -U awscli
Starting in Chalice 1.10, the package command has a --merge-template
argument that allows us to merge a custom JSON file into the generated
CloudFormation template. Since Chalice does not currently have any built-in
support for DynamoDB, we will make a resources.json file with the DynamoDB
definition. The template file will set the environment variable TABLE in all
our Lambda functions as a CloudFormation reference to the DynamoDB table.
Finally, the template will also override our IAM policy with a custom one to
allow all the DynamoDB operations our application will need.
Below is the JSON file that contains all of our custom CloudFormation resources.
{
"Resources": {
"ChaliceChatTable": {
"Type": "AWS::DynamoDB::Table",
"Properties": {
"AttributeDefinitions": [
{
"AttributeName": "PK",
"AttributeType": "S"
},
{
"AttributeName": "SK",
"AttributeType": "S"
}
],
"KeySchema": [
{
"AttributeName": "PK",
"KeyType": "HASH"
},
{
"AttributeName": "SK",
"KeyType": "RANGE"
}
],
"GlobalSecondaryIndexes": [
{
"IndexName": "ReverseLookup",
"KeySchema": [
{
"AttributeName": "SK",
"KeyType": "HASH"
},
{
"AttributeName": "PK",
"KeyType": "RANGE"
}
],
"Projection": {
"ProjectionType": "ALL"
},
"ProvisionedThroughput": {
"ReadCapacityUnits": 1,
"WriteCapacityUnits": 1
}
}
],
"ProvisionedThroughput": {
"ReadCapacityUnits": 1,
"WriteCapacityUnits": 1
},
"TableName": "ChaliceChat"
}
},
"WebsocketConnect": {
"Properties": {
"Environment": {
"Variables": {
"TABLE": {
"Ref": "ChaliceChatTable"
}
}
}
}
},
"WebsocketMessage": {
"Properties": {
"Environment": {
"Variables": {
"TABLE": {
"Ref": "ChaliceChatTable"
}
}
}
}
},
"WebsocketDisconnect": {
"Properties": {
"Environment": {
"Variables": {
"TABLE": {
"Ref": "ChaliceChatTable"
}
}
}
}
},
"DefaultRole": {
"Type": "AWS::IAM::Role",
"Properties": {
"AssumeRolePolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Sid": "",
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
},
"Policies": [
{
"PolicyName": "DefaultRolePolicy",
"PolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": "arn:aws:logs:*:*:*"
},
{
"Effect": "Allow",
"Action": [
"execute-api:ManageConnections"
],
"Resource": "arn:aws:execute-api:*:*:*/@connections/*"
},
{
"Effect": "Allow",
"Action": [
"dynamodb:DeleteItem",
"dynamodb:PutItem",
"dynamodb:GetItem",
"dynamodb:UpdateItem",
"dynamodb:Query",
"dynamodb:Scan"
],
"Resource": [
{
"Fn::Sub": "arn:aws:dynamodb:${AWS::Region}:${AWS::AccountId}:table/${ChaliceChatTable}"
},
{
"Fn::Sub": "arn:aws:dynamodb:${AWS::Region}:${AWS::AccountId}:table/${ChaliceChatTable}/index/ReverseLookup"
}
]
}
]
}
}
]
}
}
}
}
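To make the effect of --merge-template more concrete, here is a minimal sketch of a recursive dictionary merge. It assumes the two templates are combined roughly this way; it is not Chalice's actual implementation. The generated fragment (its Type and Handler values) is a hypothetical placeholder, while the custom fragment is taken from resources.json above.

```python
import copy


def deep_merge(base, override):
    """Return a new dict with ``override`` recursively merged into ``base``.

    Nested dicts are merged key by key; any other value in ``override``
    replaces the corresponding value in ``base``.
    """
    merged = copy.deepcopy(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = copy.deepcopy(value)
    return merged


# Hypothetical fragment of a generated template (values are placeholders).
generated = {
    "Resources": {
        "WebsocketConnect": {
            "Type": "AWS::Serverless::Function",
            "Properties": {"Handler": "app.connect"},
        }
    }
}

# Fragment of resources.json from above.
custom = {
    "Resources": {
        "WebsocketConnect": {
            "Properties": {
                "Environment": {
                    "Variables": {"TABLE": {"Ref": "ChaliceChatTable"}}
                }
            }
        }
    }
}

merged = deep_merge(generated, custom)
props = merged["Resources"]["WebsocketConnect"]["Properties"]
print(sorted(props))  # ['Environment', 'Handler']
```

Under this assumption, the function keeps its generated properties and only gains the Environment block, which is why resources.json can list a function's logical name without repeating its full definition.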
The current directory layout should now look like this:
$ tree -a .
.
├── .chalice
│ └── config.json
├── .gitignore
├── app.py
├── resources.json
└── requirements.txt
1 directory, 5 files
Next let’s fill out the app.py file, since it is pretty simple. Most of this
example’s code is contained in the chalicelib/ directory.
from boto3.session import Session

from chalice import Chalice

from chalicelib import Storage
from chalicelib import Sender
from chalicelib import Handler


app = Chalice(app_name="chalice-chat-example")
app.websocket_api.session = Session()
app.experimental_feature_flags.update([
    'WEBSOCKETS'
])

STORAGE = Storage.from_env()
SENDER = Sender(app, STORAGE)
HANDLER = Handler(STORAGE, SENDER)


@app.on_ws_connect()
def connect(event):
    STORAGE.create_connection(event.connection_id)


@app.on_ws_disconnect()
def disconnect(event):
    STORAGE.delete_connection(event.connection_id)


@app.on_ws_message()
def message(event):
    HANDLER.handle(event.connection_id, event.body)
As in the previous example, we use boto3 to construct a Session and pass it
to app.websocket_api.session. We opt into the WEBSOCKETS experimental
feature. Most of the actual work is done in classes that we import from
chalicelib/. These classes are detailed below, and the various parts are
explained in comments and docstrings. Unlike the previous example, we also
register handlers for on_ws_connect and on_ws_disconnect to handle events
from API Gateway when a new socket is trying to connect, or an existing
socket is disconnected.
Finally, before we can deploy and test the app, we need to fill out
the chalicelib directory. This is the bulk of the app and it is explained
inline in comments. Create a new directory called chalicelib, and inside
that directory create an __init__.py file with the following contents.
import os
import boto3
from boto3.dynamodb.conditions import Key
from chalice import WebsocketDisconnectedError
class Storage(object):
"""An abstraction to interact with the DynamoDB Table."""
def __init__(self, table):
"""Initialize Storage object
:param table: A boto3 dynamodb Table resource object.
"""
self._table = table
@classmethod
def from_env(cls):
"""Create table from the environment.
The environment variable TABLE is present for a deployed application
since it is set in all of the Lambda functions by a CloudFormation
reference. We default to '', which will happen when we run
``chalice package`` since it loads the application, and no
environment variable has been set. For local testing, a value should
be manually set in the environment if '' will not suffice.
"""
table_name = os.environ.get('TABLE', '')
table = boto3.resource('dynamodb').Table(table_name)
return cls(table)
def create_connection(self, connection_id):
"""Create a new connection object in the database.
When a new connection is created, we create a stub for
it in the table. The stub uses a primary key of the
connection_id and a sort key of username_. This translates
to a connection with an unset username. The first message
sent over the wire from the connection is to be used as the
username, and this entry will be re-written.
:param connection_id: The connection id to write to
the table.
"""
self._table.put_item(
Item={
'PK': connection_id,
'SK': 'username_',
},
)
def set_username(self, connection_id, old_name, username):
"""Set the username.
The SK entry that goes with this connection id that starts
with username_ is taken to be the username. The previous
entry needs to be deleted, and a new entry needs to be
written.
:param connection_id: Connection id of the user trying to
change their name.
:param old_name: The original username. Since this is part of
the key, it needs to be deleted and re-created rather than
updated.
:param username: The new username the user wants.
"""
self._table.delete_item(
Key={
'PK': connection_id,
'SK': 'username_%s' % old_name,
},
)
self._table.put_item(
Item={
'PK': connection_id,
'SK': 'username_%s' % username,
},
)
def list_rooms(self):
"""Get a list of all rooms that exist.
Scan through the table looking for SKs that start with room_
which indicates a room that a user is in. Collect a unique set
of those and return them.
"""
r = self._table.scan()
rooms = set([item['SK'].split('_', 1)[1] for item in r['Items']
if item['SK'].startswith('room_')])
return rooms
def set_room(self, connection_id, room):
"""Set the room a user is currently in.
The room a user is in is in the form of an SK that starts with
room_ prefix.
:param connection_id: The connection id to move to a room.
:param room: The room name to join.
"""
self._table.put_item(
Item={
'PK': connection_id,
'SK': 'room_%s' % room,
},
)
def remove_room(self, connection_id, room):
"""Remove a user from a room.
The room a user is in is in the form of an SK that starts with
room_ prefix. To leave a room we need to delete this entry.
:param connection_id: The connection id to remove from the room.
:param room: The room name to leave.
"""
self._table.delete_item(
Key={
'PK': connection_id,
'SK': 'room_%s' % room,
},
)
def get_connection_ids_by_room(self, room):
"""Find all connection ids that go to a room.
This is needed whenever we broadcast to a room. We collect all
their connection ids so we can send messages to them. We use a
ReverseLookup index here which inverts the PK, SK relationship
creating a partition called room_{room}. Everything in that
partition is a connection in the room.
:param room: Room name to get all connection ids from.
"""
r = self._table.query(
IndexName='ReverseLookup',
KeyConditionExpression=(
Key('SK').eq('room_%s' % room)
),
Select='ALL_ATTRIBUTES',
)
return [item['PK'] for item in r['Items']]
def delete_connection(self, connection_id):
"""Delete a connection.
Called when a connection is disconnected and all its entries need
to be deleted.
:param connection_id: The connection partition to delete from
the table.
"""
try:
r = self._table.query(
KeyConditionExpression=(
Key('PK').eq(connection_id)
),
Select='ALL_ATTRIBUTES',
)
for item in r['Items']:
self._table.delete_item(
Key={
'PK': connection_id,
'SK': item['SK'],
},
)
except Exception as e:
print(e)
def get_record_by_connection(self, connection_id):
"""Get all the properties associated with a connection.
Each connection_id creates a partition in the table with multiple
SK entries. Each SK entry is in the format {property}_{value}.
This method reads all those records from the database and puts them
all into a dictionary and returns it.
:param connection_id: The connection to get properties for.
"""
r = self._table.query(
KeyConditionExpression=(
Key('PK').eq(connection_id)
),
Select='ALL_ATTRIBUTES',
)
r = {
entry['SK'].split('_', 1)[0]: entry['SK'].split('_', 1)[1]
for entry in r['Items']
}
return r
class Sender(object):
"""Class to send messages over websockets."""
def __init__(self, app, storage):
"""Initialize a sender object.
:param app: A Chalice application object.
:param storage: A Storage object.
"""
self._app = app
self._storage = storage
def send(self, connection_id, message):
"""Send a message over a websocket.
:param connection_id: API Gateway Connection ID to send a
message to.
:param message: The message to send to the connection.
"""
try:
# Call the chalice websocket api send method
self._app.websocket_api.send(connection_id, message)
except WebsocketDisconnectedError as e:
# If the websocket has been closed, we delete the connection
# from our database.
self._storage.delete_connection(e.connection_id)
def broadcast(self, connection_ids, message):
"""Send a message to multiple connections.
:param connection_ids: A list of API Gateway Connection IDs to
send the message to.
:param message: The message to send to the connections.
"""
for cid in connection_ids:
self.send(cid, message)
class Handler(object):
"""Handler object that handles messages received from a websocket.
This class implements the bulk of our app behavior.
"""
def __init__(self, storage, sender):
"""Initialize a Handler object.
:param storage: Storage object to interact with database.
:param sender: Sender object to send messages to websockets.
"""
self._storage = storage
self._sender = sender
# Command table to translate a string command name into a
# method to call.
self._command_table = {
'help': self._help,
'nick': self._nick,
'join': self._join,
'room': self._room,
'quit': self._quit,
'ls': self._list,
}
def handle(self, connection_id, message):
"""Entry point for our application.
:param connection_id: Connection id that the message came from.
:param message: Message we got from the connection.
"""
# First look the user up in the database and get a record for it.
record = self._storage.get_record_by_connection(connection_id)
if record['username'] == '':
# If the user does not have a username, we assume that the message
# is the username they want and we call _handle_login_message.
self._handle_login_message(connection_id, message)
else:
# Otherwise we assume the user is logged in. So we call
# a method to handle the message. We pass along the
# record we loaded from the database so we don't need to load it
# again.
self._handle_message(connection_id, message, record)
def _handle_login_message(self, connection_id, message):
"""Handle a login message.
The message is the username to give the user. Re-write the
database entry for this user to reset their username from ''
to {message}. Once that is done send a message back to the user
to confirm the name choice. Also send a /help prompt.
"""
self._storage.set_username(connection_id, '', message)
self._sender.send(
connection_id,
'Using nickname: %s\nType /help for list of commands.' % message
)
def _handle_message(self, connection_id, message, record):
"""Handle a message from a connected and logged in user.
If the message starts with a / it's a command. Otherwise it's a
text message to send to everyone in the room.
:param connection_id: Connection id that the message came from.
:param message: Message we got from the connection.
:param record: A data record about the sender.
"""
if message.startswith('/'):
self._handle_command(connection_id, message[1:], record)
else:
self._handle_text(connection_id, message, record)
def _handle_command(self, connection_id, message, record):
"""Handle a command message.
Check the command name and look it up in our command table.
If there is an entry, we call that method and pass along
the connection_id, arguments, and the loaded record.
:param connection_id: Connection id that the message came from.
:param message: Message we got from the connection.
:param record: A data record about the sender.
"""
args = message.split(' ')
command_name = args.pop(0).lower()
command = self._command_table.get(command_name)
if command:
command(connection_id, args, record)
else:
# If no command method is found, send an error message
# back to the user.
self._sender.send(
connection_id, 'Unknown command: %s' % command_name)
def _handle_text(self, connection_id, message, record):
"""Handle a raw text message.
:param connection_id: Connection id that the message came from.
:param message: Message we got from the connection.
:param record: A data record about the sender.
"""
if 'room' not in record:
# If the user is not in a room send them an error message
# and return early.
self._sender.send(
connection_id, 'Cannot send message if not in chatroom.')
return
# Collect a list of connection_ids in the same room as the message
# sender.
connection_ids = self._storage.get_connection_ids_by_room(
record['room'])
# Prefix the message with the sender's name.
message = '%s: %s' % (record['username'], message)
# Broadcast the new message to everyone in the room.
self._sender.broadcast(connection_ids, message)
def _help(self, connection_id, _message, _record):
"""Send the help message.
Build a help message and send back to the same connection.
:param connection_id: Connection id that the message came from.
"""
self._sender.send(
connection_id,
'\n'.join([
'Commands available:',
' /help',
' Display this message.',
' /join {chat_room_name}',
' Join a chatroom named {chat_room_name}.',
' /nick {nickname}',
' Change your name to {nickname}. If no {nickname}',
' is provided then your current name will be printed',
' /room',
' Print out the name of the room you are currently ',
' in.',
' /ls',
' If you are in a room, list all users also in the',
' room. Otherwise, list all rooms.',
' /quit',
' Leave current room.',
'',
'If you are in a room, raw text messages that do not start ',
'with a / will be sent to everyone else in the room.',
]),
)
def _nick(self, connection_id, args, record):
"""Change or check nickname (username).
:param connection_id: Connection id that the message came from.
:param args: Argument list that came after the command.
:param record: A data record about the sender.
"""
if not args:
# If a nickname argument was not provided, we just want to
# report the current nickname to the user.
self._sender.send(
connection_id, 'Current nickname: %s' % record['username'])
return
# The first argument is assumed to be the new desired nickname.
nick = args[0]
# Change the username from record['username'] to nick in the storage
# layer.
self._storage.set_username(connection_id, record['username'], nick)
# Send a message to the requestor to confirm the nickname change.
self._sender.send(connection_id, 'Nickname is: %s' % nick)
# Get the room the user is in.
room = record.get('room')
if room:
# If the user was in a room, announce to the room they have
# changed their name. Don't send this message to the user since
# they already got a name change message.
room_connections = self._storage.get_connection_ids_by_room(room)
room_connections.remove(connection_id)
self._sender.broadcast(
room_connections,
'%s is now known as %s.' % (record['username'], nick))
def _join(self, connection_id, args, record):
"""Join a chat room.
:param connection_id: Connection id that the message came from.
:param args: Argument list. The first argument should be the
name of the room to join.
:param record: A data record about the sender.
"""
# Get the room name to join.
room = args[0]
# Call quit to leave the current room we are in if there is any.
self._quit(connection_id, '', record)
# Get a list of connections in the target chat room.
room_connections = self._storage.get_connection_ids_by_room(room)
# Join the target chat room.
self._storage.set_room(connection_id, room)
# Send a message to the requestor that they have joined the room.
# At the same time send an announcement to everyone who was already
# in the room to alert them of the new user.
self._sender.send(
connection_id, 'Joined chat room "%s"' % room)
message = '%s joined room.' % record['username']
self._sender.broadcast(room_connections, message)
def _room(self, connection_id, _args, record):
"""Report the name of the current room.
:param connection_id: Connection id that the message came from.
:param record: A data record about the sender.
"""
if 'room' in record:
# If the user is in a room send them the name back.
self._sender.send(connection_id, record['room'])
else:
# If the user is not in a room. Tell them so, and how to
# join a room.
self._sender.send(
connection_id,
'Not currently in a room. Type /join {room_name} to do so.'
)
def _quit(self, connection_id, _args, record):
"""Quit from a room.
:param connection_id: Connection id that the message came from.
:param record: A data record about the sender.
"""
if 'room' not in record:
# If the user is not in a room there is nothing to do.
return
# Find the current room name, and delete that entry from
# the database.
room_name = record['room']
self._storage.remove_room(connection_id, room_name)
# Send a message to the user to inform them they left the room.
self._sender.send(
connection_id, 'Left chat room "%s"' % room_name)
# Tell everyone in the room that the user has left.
self._sender.broadcast(
self._storage.get_connection_ids_by_room(room_name),
'%s left room.' % record['username'],
)
def _list(self, connection_id, _args, record):
"""Show a context dependent listing.
:param connection_id: Connection id that the message came from.
:param record: A data record about the sender.
"""
room = record.get('room')
if room:
# If the user is in a room, get a listing of everyone
# in the room.
result = [
self._storage.get_record_by_connection(c_id)['username']
for c_id in self._storage.get_connection_ids_by_room(room)
]
else:
# If they are not in a room. Get a listing of all rooms
# currently open.
result = self._storage.list_rooms()
# Send the result list back to the requestor.
self._sender.send(connection_id, '\n'.join(result))
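The key layout used by Storage can be easier to follow with a small in-memory model. The sketch below is an illustration only: it does not talk to DynamoDB, the connection ids are made up, and the two helper functions are hypothetical stand-ins for get_record_by_connection and get_connection_ids_by_room.

```python
def record_for(items, connection_id):
    """Collapse a connection's rows into a {property: value} dict.

    Each SK has the form {property}_{value}, so splitting on the first
    underscore recovers both parts.
    """
    record = {}
    for pk, sk in items:
        if pk == connection_id:
            prop, value = sk.split('_', 1)
            record[prop] = value
    return record


def connections_in_room(items, room):
    """Mimic the ReverseLookup index: find every PK whose SK is room_{room}."""
    target = 'room_%s' % room
    return sorted(pk for pk, sk in items if sk == target)


# Each tuple is one (PK, SK) row in the table.
rows = [
    ('conn-1', 'username_John'),
    ('conn-1', 'room_chalice'),
    ('conn-2', 'username_Jenny'),
    ('conn-2', 'room_chalice'),
    ('conn-3', 'username_'),  # connected, but no username chosen yet
]

print(record_for(rows, 'conn-1'))            # {'username': 'John', 'room': 'chalice'}
print(connections_in_room(rows, 'chalice'))  # ['conn-1', 'conn-2']
print(record_for(rows, 'conn-3'))            # {'username': ''}
```

The last line shows why Handler.handle compares record['username'] with the empty string: a freshly connected client has only the username_ stub row.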
The final directory layout should be:
$ tree -a .
.
├── .chalice
│ └── config.json
├── .gitignore
├── app.py
├── chalicelib
│ └── __init__.py
├── resources.json
└── requirements.txt
2 directories, 6 files
Deploying our app with CloudFormation requires 3 steps. First we use Chalice to package our app into a JSON CloudFormation template:
$ chalice package --merge-template resources.json out
This will result in a new directory called out being created, inside which
there is a sam.json file. This template contains our Chalice app as a
CloudFormation template, merged with our resources.json template.
Next we use the AWS CLI to package this template, and prepare it for
deployment. In order for this to work you will need to replace $BUCKET
with the name of a bucket you control:
$ aws cloudformation package --template-file out/sam.json \
--s3-bucket $BUCKET --output-template-file out/template.yml
Once this is complete, a new template should be located at out/template.yml.
This is the final CloudFormation template, which is ready for deployment.
Deploying it with the AWS CLI can be done with the following command:
$ aws cloudformation deploy --template-file out/template.yml \
--stack-name ChaliceChat --capabilities CAPABILITY_IAM
This command can take a while to complete; once it exits, the app should be
ready. To get the websocket connection URL, we can use the AWS CLI again to
check the stack output WebsocketConnectEndpointURL:
$ aws cloudformation describe-stacks --stack-name ChaliceChat \
--query "Stacks[0].Outputs[?OutputKey=='WebsocketConnectEndpointURL'].OutputValue" \
--output text
wss://{id}.execute-api.{region}.amazonaws.com/api/
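The same lookup can be done from Python. The following is a sketch rather than part of the example app: find_output and websocket_url are hypothetical helper names, and the sample response is a trimmed, made-up dictionary in the shape that boto3's describe_stacks call returns. Calling websocket_url requires AWS credentials and a deployed stack.

```python
def find_output(response, output_key):
    """Return the OutputValue for output_key from a describe_stacks response."""
    for stack in response.get('Stacks', []):
        for output in stack.get('Outputs', []):
            if output.get('OutputKey') == output_key:
                return output.get('OutputValue')
    return None


def websocket_url(stack_name='ChaliceChat'):
    """Look up the websocket URL of a deployed stack."""
    # Imported lazily so find_output can be used without boto3 installed.
    import boto3
    client = boto3.client('cloudformation')
    response = client.describe_stacks(StackName=stack_name)
    return find_output(response, 'WebsocketConnectEndpointURL')


# Made-up response used only to demonstrate find_output offline.
sample = {
    'Stacks': [{
        'StackName': 'ChaliceChat',
        'Outputs': [
            {'OutputKey': 'WebsocketConnectEndpointURL',
             'OutputValue': 'wss://example.invalid/api/'},
        ],
    }]
}

print(find_output(sample, 'WebsocketConnectEndpointURL'))
```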
Once deployed, we can take the result of the previous command and connect to
it using wsdump.py. Below is a sample of two running clients. The first
message sent to the server is used as the client’s username.
$ wsdump.py wss://{id}.execute-api.{region}.amazonaws.com/api/
Press Ctrl+C to quit
> John
< Using nickname: John
Type /help for list of commands.
> /help
< Commands available:
/help
Display this message.
/join {chat_room_name}
Join a chatroom named {chat_room_name}.
/nick {nickname}
Change your name to {nickname}. If no {nickname}
is provided then your current name will be printed
/room
Print out the name of the room you are currently
in.
/ls
If you are in a room, list all users also in the
room. Otherwise, list all rooms.
/quit
Leave current room.
If you are in a room, raw text messages that do not start
with a / will be sent to everyone else in the room.
> /join chalice
< Joined chat room "chalice"
< Jenny joined room.
> Hi
< John: Hi
< Jenny is now known as JennyJones.
> /quit
< Left chat room "chalice"
> /ls
< chalice
> Ctrl-D
$ wsdump.py wss://{id}.execute-api.{region}.amazonaws.com/api/
Press Ctrl+C to quit
> Jenny
< Using nickname: Jenny
Type /help for list of commands.
> /help
< Commands available:
/help
Display this message.
/join {chat_room_name}
Join a chatroom named {chat_room_name}.
/nick {nickname}
Change your name to {nickname}. If no {nickname}
is provided then your current name will be printed
/room
Print out the name of the room you are currently
in.
/ls
If you are in a room, list all users also in the
room. Otherwise, list all rooms.
/quit
Leave current room.
If you are in a room, raw text messages that do not start
with a / will be sent to everyone else in the room.
> /join chalice
< Joined chat room "chalice"
> /ls
< John
Jenny
< John: Hi
> /nick JennyJones
< Nickname is: JennyJones
< John left room.
> /ls
< JennyJones
> /room
< chalice
> /nick
< Current nickname: JennyJones
> Ctrl-D
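A session like the ones above could also be scripted instead of typed. The sketch below assumes the third-party websocket-client package for the real connection; run_script and EchoConnection are hypothetical names introduced for illustration. It also assumes each message sent produces exactly one reply, which matches the commands shown above but not asynchronous broadcasts from other users.

```python
def connect(url):
    """Open a websocket connection using the websocket-client package."""
    # Imported lazily so run_script can be tried without the package.
    from websocket import create_connection
    return create_connection(url)


def run_script(conn, lines):
    """Send each line and collect the single reply that follows it.

    ``conn`` can be any object with send() and recv() methods.
    """
    replies = []
    for line in lines:
        conn.send(line)
        replies.append(conn.recv())
    return replies


class EchoConnection(object):
    """Stand-in connection used only to demonstrate run_script offline."""

    def __init__(self):
        self._last = None

    def send(self, message):
        self._last = message

    def recv(self):
        return 'echo: %s' % self._last


print(run_script(EchoConnection(), ['John', '/help']))
# ['echo: John', 'echo: /help']
```

Against the deployed app, the stand-in would be replaced with connect(url), where url is the wss:// address from the stack output.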
To delete the resources, you can use the AWS CLI to delete the stack:
$ aws cloudformation delete-stack --stack-name ChaliceChat