コンテンツにスキップ

パブリッシュ / サブスクライブ アーキテクチャ

Copilot Worker Servicesは、すべての Service タイプと Job タイプに共通する publish フィールドを利用して、サービス間でメッセージを受け渡すためのパブリッシュ / サブスクライブロジックを簡単に作成できます。

AWS 上での一般的なパターンは、メッセージの配信と処理を行うための SNS と SQS の組み合わせです。SNS は堅牢なメッセージ配信システムで、メッセージの配信を保証しながら複数のサブスクライブしたエンドポイントにメッセージを送ることができます。

SQSは、メッセージの非同期処理を可能にするメッセージキューです。キューには 1 つまたは複数の SNS トピックや、AWS EventBridge からのイベントを投入できます。

この 2 つのサービスを組み合わせることで、メッセージの送受信を効果的に疎結合にできます。つまり、パブリッシャーは自分のトピックをサブスクライブしているキューを意識する必要がなく、また Worker Service のコードはメッセージがどこから来るかを気にする必要がありません。

パブリッシャーからのメッセージ送信

既存のサービスから SNS へのメッセージのパブリッシュを許可するには、Manifest に publish フィールドを設定するだけです。 SNS トピックの機能を表す名前を付けることを提案します。

# api サービス用の manifest.yml
name: api
type: Backend Service

publish:
  topics:
    - name: ordersTopic

これにより、SNS トピックが作成されます。また、トピックにリソースポリシーが設定され、AWS アカウントにある SQS キューがサブスクリプションを作成できるようになります。

また Copilot は、任意の SNS トピックの ARN をコンテナ内の環境変数 COPILOT_SNS_TOPIC_ARNS に注入します。 JSON 文字列の形式 :

{
  "firstTopicName": "arn:aws:sns:us-east-1:123456789012:firstTopic",
  "secondTopicName": "arn:aws:sns:us-east-1:123456789012:secondTopic",
}

Javascript での例

パブリッシャーのサービスがデプロイされると、AWS SDK を介して SNS にメッセージを送信できるようになります。

const { SNSClient, PublishCommand } = require("@aws-sdk/client-sns");
const client = new SNSClient({ region: "us-west-2" });
const {ordersTopic} = JSON.parse(process.env.COPILOT_SNS_TOPIC_ARNS);
const out = await client.send(new PublishCommand({
   Message: "hello",
   TopicArn: ordersTopic,
 }));

Worker Service でトピックをサブスクライブ

Worker Service で既存の SNS トピックをサブスクライブするには、Worker Service の Manifest を編集する必要があります。 Manifest の subscribe フィールドを使用して、Environment 内の他のサービスが公開する既存の SNS トピックへのサブスクリプションを定義します。この例では、前セクションの api サービスが公開する ordersTopic トピックを使用しています。また Worker Service のキューをカスタマイズして、DLQ(デッドレターキュー) を使えるようにします。 tries フィールドは失敗したメッセージを DLQ に送信し、失敗についての詳細な分析する前に、何回再配送を試みるかを SQS に伝えます。

name: orders-worker
type: Worker Service

subscribe:
  topics:
    - name: ordersTopic
      service: api
  queue:
    dead_letter:
      tries: 5

Copilot は、この Worker Service のキューと、api サービスの ordersTopic トピックの間にサブスクリプションを作成します。また、キューの URI を、コンテナ内の環境変数 COPILOT_QUEUE_URI に注入します。

1 つ以上のトピック固有キューを指定した場合、COPILOT_TOPIC_QUEUE_URIS 変数を使ってそれらのキュー URI にアクセスできます。この変数は、トピック固有のキューの一意な識別子からその URI への JSON Map です。

例えば、merchant Service からの orders トピックと merchant Service からの FIFO トピック transactions のトピック別キューを持つワーカーサービスは、以下のような JSON 構造を持つことになります。

// COPILOT_TOPIC_QUEUE_URIS
{
  "merchantOrdersEventsQueue": "https://sqs.eu-central-1.amazonaws.com/...",
  "merchantTransactionsfifoEventsQueue": "https://sqs.eu-central-1.amazonaws.com/..."
}

Javascript での例

Worker Service 内のコンテナの中心となるビジネスロジックには、キューからメッセージをプルすることが含まれます。これを AWS SDK で行うには、選択した言語用の SQS クライアントを使用します。例えば Javascript でキューからメッセージをプルしたり、処理や削除をするためには、以下のようなコードスニペットになります。

const { SQSClient, ReceiveMessageCommand, DeleteMessageCommand } = require("@aws-sdk/client-sqs");
const client = new SQSClient({ region: "us-west-2" });
const out = await client.send(new ReceiveMessageCommand({
            QueueUrl: process.env.COPILOT_QUEUE_URI,
            WaitTimeSeconds: 10,
}));

console.log(`results: ${JSON.stringify(out)}`);

if (out.Messages === undefined || out.Messages.length === 0) {
    return;
}

// ここでメッセージを処理します。

await client.send( new DeleteMessageCommand({
    QueueUrl: process.env.COPILOT_QUEUE_URI,
    ReceiptHandle: out.Messages[0].ReceiptHandle,
}));