Publish/Subscribe Architectures

Copilot Worker Services take advantage of the publish field common to all service and job types to allow customers to easily create publish/subscribe logic for passing messages between services.

A common pattern in AWS is the combination of SNS and SQS to deliver and process messages. SNS is a robust message delivery system which can send messages to a variety of subscribed endpoints with guarantees about message delivery.

SQS is a message queue that allows asynchronous processing of messages. Queues can be populated by one or more SNS topics or Amazon EventBridge event filters.

The combination of these two services effectively decouples the sending and receipt of messages, meaning publishers don't have to care what queues are actually subscribed to their topics, and worker service code doesn't have to care where the messages come from.

Sending Messages from a Publisher

To allow an existing service to publish messages to SNS, simply set the publish field in its manifest. We suggest using a name for the topic that describes its function.

# manifest.yml for api service
name: api
type: Backend Service

publish:
  topics:
    - name: ordersTopic

This will create an SNS topic and set a resource policy on the topic to allow SQS queues in your AWS account to create subscriptions.

Copilot also injects the ARNs of any SNS topics into your container under the environment variable COPILOT_SNS_TOPIC_ARNS. The JSON string is of the format:

{
  "firstTopicName": "arn:aws:sns:us-east-1:123456789012:firstTopic",
  "secondTopicName": "arn:aws:sns:us-east-1:123456789012:secondTopic"
}

JavaScript Example

Once the publishing service has been deployed, you can send messages to SNS via the AWS SDK for SNS.

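const { SNSClient, PublishCommand } = require("@aws-sdk/client-sns");
// "us-west-2" is an example; use the region your service is deployed to.
const client = new SNSClient({ region: "us-west-2" });
// COPILOT_SNS_TOPIC_ARNS is a JSON map from topic name to topic ARN.
const { ordersTopic } = JSON.parse(process.env.COPILOT_SNS_TOPIC_ARNS);

// CommonJS modules have no top-level await, so publish inside an async function.
async function publishHello() {
  return client.send(new PublishCommand({
    Message: "hello",
    TopicArn: ordersTopic,
  }));
}
publishHello().catch(console.error);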
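
Because COPILOT_SNS_TOPIC_ARNS is a JSON map keyed by topic name, a small lookup helper can fail with a clear message when the variable is missing or a topic name is misspelled. The following is a minimal sketch, not part of Copilot or the AWS SDK: the names getTopicArn and regionFromArn are hypothetical, and reading the region from the fourth colon-separated field assumes the standard ARN layout shown above.

```javascript
// Hypothetical helper: look up a topic ARN by name in COPILOT_SNS_TOPIC_ARNS.
// The env parameter defaults to process.env and exists only to ease testing.
function getTopicArn(topicName, env = process.env) {
  const raw = env.COPILOT_SNS_TOPIC_ARNS;
  if (!raw) {
    throw new Error("COPILOT_SNS_TOPIC_ARNS is not set; check the publish field in the manifest");
  }
  const arns = JSON.parse(raw);
  const topicArn = arns[topicName];
  if (typeof topicArn !== "string") {
    const known = Object.keys(arns).join(", ");
    throw new Error(`No SNS topic named "${topicName}"; known topics: ${known}`);
  }
  return topicArn;
}

// Hypothetical helper: an ARN has the form arn:partition:service:region:account:resource,
// so the region is the fourth colon-separated field.
function regionFromArn(topicArn) {
  return topicArn.split(":")[3];
}
```

With helpers like these, the SNS client could be created with the region taken from the topic ARN instead of a hard-coded value, for example new SNSClient({ region: regionFromArn(getTopicArn("ordersTopic")) }).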

Subscribing to a Topic with a Worker Service

To subscribe to an existing SNS topic with a worker service, you'll need to edit the worker service's manifest. Using the subscribe field in the manifest, you can define subscriptions to existing SNS topics exposed by other services in your environment. In this example, we'll use the ordersTopic topic which the api service from the last section exposed. We'll also customize the worker service's queue to enable a dead-letter queue. The tries field tells SQS how many times to try redelivering a failed message before sending it to the DLQ for further inspection.

name: orders-worker
type: Worker Service

subscribe:
  topics:
    - name: ordersTopic
      service: api
  queue:
    dead_letter:
      tries: 5

Copilot will create a subscription between this worker service's queue and the ordersTopic topic from the api service. It will also inject the queue URI into the service container under the environment variable COPILOT_QUEUE_URI.

If you specify one or more topic-specific queues, you can access those queue URIs via the COPILOT_TOPIC_QUEUE_URIS variable. This variable is a JSON map from a unique identifier for the topic-specific queue to its URI.

For example, a worker service with topic-specific queues for the orders topic and the FIFO topic transactions, both from the merchant service, will have the following JSON structure.

// COPILOT_TOPIC_QUEUE_URIS
{
  "merchantOrdersEventsQueue": "https://sqs.eu-central-1.amazonaws.com/...",
  "merchantTransactionsfifoEventsQueue": "https://sqs.eu-central-1.amazonaws.com/..."
}
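
A worker that has both a default queue and topic-specific queues needs the URI of each queue it should poll. The sketch below collects them into one list. The function name listQueueUris and the "default" label are hypothetical choices for illustration; only the two environment variable names come from Copilot.

```javascript
// Hypothetical helper: gather every queue URI Copilot injected into the container.
// The env parameter defaults to process.env and exists only to ease testing.
function listQueueUris(env = process.env) {
  const queues = [];
  // The default queue, if present, is exposed as a plain string.
  if (env.COPILOT_QUEUE_URI) {
    queues.push({ id: "default", uri: env.COPILOT_QUEUE_URI });
  }
  // Topic-specific queues are exposed as a JSON map from identifier to URI.
  const topicQueues = JSON.parse(env.COPILOT_TOPIC_QUEUE_URIS || "{}");
  for (const [id, uri] of Object.entries(topicQueues)) {
    queues.push({ id, uri });
  }
  return queues;
}
```

Each entry's uri can then be passed as the QueueUrl when receiving messages, and the id can be used to route a message to the handler for that topic.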

JavaScript Example

The central business logic of a worker service's container involves pulling messages from the queue. To do this with the AWS SDK, you can use the SQS client for your language of choice. In JavaScript, the logic for pulling, processing, and deleting messages from the queue would look like the following code snippet.

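const { SQSClient, ReceiveMessageCommand, DeleteMessageCommand } = require("@aws-sdk/client-sqs");
// "us-west-2" is an example; use the region your service is deployed to.
const client = new SQSClient({ region: "us-west-2" });

// The awaits and the early return below need an enclosing async function.
async function pollOnce() {
    // Long-poll the queue for up to 10 seconds.
    const out = await client.send(new ReceiveMessageCommand({
        QueueUrl: process.env.COPILOT_QUEUE_URI,
        WaitTimeSeconds: 10,
    }));

    console.log(`results: ${JSON.stringify(out)}`);

    if (out.Messages === undefined || out.Messages.length === 0) {
        return;
    }

    // Process the message here.

    // Delete the message so that it is not delivered again.
    await client.send(new DeleteMessageCommand({
        QueueUrl: process.env.COPILOT_QUEUE_URI,
        ReceiptHandle: out.Messages[0].ReceiptHandle,
    }));
}
pollOnce().catch(console.error);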
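
When SNS delivers to SQS without raw message delivery enabled, the Body of each SQS message is a JSON envelope whose Message field holds the text that was published. The sketch below shows one way the "process the message" step might recover the original payload. The function name unwrapSnsEnvelope is hypothetical, and whether the envelope is present depends on how the subscription is configured, so the helper falls back to returning the body unchanged.

```javascript
// Hypothetical helper: extract the published text from an SQS message body.
// Assumes the standard SNS notification envelope (Type, TopicArn, Message, ...).
function unwrapSnsEnvelope(sqsBody) {
  let parsed;
  try {
    parsed = JSON.parse(sqsBody);
  } catch {
    // Not JSON, so treat it as a raw message body.
    return { message: sqsBody, topicArn: undefined };
  }
  if (parsed !== null && typeof parsed === "object" &&
      parsed.Type === "Notification" && typeof parsed.Message === "string") {
    return { message: parsed.Message, topicArn: parsed.TopicArn };
  }
  // JSON, but not an SNS envelope: hand back the body unchanged.
  return { message: sqsBody, topicArn: undefined };
}
```

In the snippet above, this could be called as unwrapSnsEnvelope(out.Messages[0].Body) before the message is deleted; for the publisher example, the returned message would be the string "hello".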