Amazon EventBridge に接続してイベントを送受信する
Amazon EventBridge は、アプリケーション間の通信方法を簡素化するサーバーレスイベントバスです。AWS サービス、カスタムアプリケーション、サードパーティの SaaS プロバイダーを含むさまざまなソースによって生成されたイベントの中央ハブとして機能します。
EventBridge はこのイベントデータをリアルタイムで配信し、変更に迅速に対応するアプリケーションを構築できます。イベントをフィルタリングして特定の宛先(ターゲットと呼ばれる)にルーティングするルールを定義します。ターゲットには、AWS Lambda、Amazon SQS キュー、Amazon SNS トピックなどのサービスを含めることができます。このガイドの目的上、AWS AppSync をイベントのターゲットとして使用します。
EventBridge でイベント駆動型アーキテクチャを採用することにより、以下を実現できます。
-
疎結合:アプリケーションが独立し、イベント通信を介してスケーラビリティと保守性が向上します。
-
復元力の向上:イベントが非同期で配信されるため、システム障害は分離され、全体的なアプリケーション可用性が確保されます。
-
統合の簡素化:EventBridge は多様なイベントソースを統合するための統一インターフェースを提供し、開発が効率化されます。
このセクションでは、イベントバスをデータソースとして API に追加し、ルーティングルールを定義し、AWS Amplify Gen 2 および Amazon EventBridge を使用してロバストなイベント駆動型アプリケーションを構築するためのターゲットを構成する方法を説明します。
- API を設定する
- Amazon EventBridge イベントバスをデータソースとして追加する
- カスタムクエリとミューテーションを定義する
- カスタムビジネスロジックハンドラーコードを構成する
- カスタムミューテーションを呼び出して EventBridge にイベントを送信する
- EventBridge で呼び出されるミューテーションにサブスクライブする
- EventBridge からミューテーションを呼び出してサブスクリプションをトリガーする
ステップ 1 - API を設定する
このガイドの目的上、注文ステータス変更イベントを表す OrderStatusChange カスタムタイプを定義します。このタイプには、注文 ID、ステータス、メッセージのフィールドが含まれます。
amplify/data/resource.ts ファイルで、以下のコードを使用して OrderStatusChange カスタムタイプと OrderStatus 列挙型を定義し、スキーマに追加します。
import { type ClientSchema, a, defineData } from "@aws-amplify/backend";
const schema = a.schema({ Todo: a .model({ content: a.string(), }) .authorization(allow => [allow.publicApiKey()]), OrderStatus: a.enum(["OrderPending", "OrderShipped", "OrderDelivered"]), OrderStatusChange: a.customType({ orderId: a.id().required(), status: a.ref("OrderStatus").required(), message: a.string().required(), }),});
export type Schema = ClientSchema<typeof schema>;
export const data = defineData({ schema, authorizationModes: { defaultAuthorizationMode: 'apiKey', apiKeyAuthorizationMode: { expiresInDays: 30, }, },});ステップ 2 - Amazon EventBridge イベントバスをデータソースとして追加する
amplify/backend.ts ファイルで、以下のコードを使用してデフォルトイベントバスを API のデータソースとして追加します。
import { defineBackend } from "@aws-amplify/backend";import { auth } from "./auth/resource";import { data } from "./data/resource";import { aws_events } from "aws-cdk-lib";import { Effect, PolicyDocument, PolicyStatement, Role, ServicePrincipal,} from "aws-cdk-lib/aws-iam";
export const backend = defineBackend({ auth, data,});
// EventBridge データソース用の新しいスタックを作成しますconst eventStack = backend.createStack("MyExternalDataSources");
// EventBridge EventBus を参照または作成しますconst eventBus = aws_events.EventBus.fromEventBusName( eventStack, "MyEventBus", "default");
// EventBridge データソースを追加しますbackend.data.addEventBridgeDataSource("MyEventBridgeDataSource", eventBus);
// AppSync API のミューテーションを呼び出せるようにするポリシーステートメントを作成しますconst policyStatement = new PolicyStatement({ effect: Effect.ALLOW, actions: ["appsync:GraphQL"], resources: [`${backend.data.resources.graphqlApi.arn}/types/Mutation/*`],});
// EventBus が想定するロールを作成しますconst eventBusRole = new Role(eventStack, "AppSyncInvokeRole", { assumedBy: new ServicePrincipal("events.amazonaws.com"), inlinePolicies: { PolicyStatement: new PolicyDocument({ statements: [policyStatement], }), },});
// AppSync API にイベントをルーティングする EventBridge ルールを作成しますconst rule = new aws_events.CfnRule(eventStack, "MyOrderRule", { eventBusName: eventBus.eventBusName, name: "broadcastOrderStatusChange", eventPattern: { source: ["amplify.orders"], /* イベントパターンの形状は EventBridge のイベントメッセージ構造と一致する必要があります。 したがって、このフィールドは "detail-type" と表記する必要があります。そうしないと、イベントはルールをトリガーしません。
https://docs.aws.amazon.com/AmazonS3/latest/userguide/ev-events.html */ ["detail-type"]: ["OrderStatusChange"], detail: { orderId: [{ exists: true }], status: ["PENDING", "SHIPPED", "DELIVERED"], message: [{ exists: true }], }, }, targets: [ { id: "orderStatusChangeReceiver", arn: backend.data.resources.cfnResources.cfnGraphqlApi .attrGraphQlEndpointArn, roleArn: eventBusRole.roleArn, appSyncParameters: { graphQlOperation: ` mutation PublishOrderFromEventBridge( $orderId: String! $status: String! $message: String! ) { publishOrderFromEventBridge(orderId: $orderId, status: $status, message: $message) { orderId status message } }`, }, inputTransformer: { inputPathsMap: { orderId: "$.detail.orderId", status: "$.detail.status", message: "$.detail.message", }, inputTemplate: JSON.stringify({ orderId: "<orderId>", status: "<status>", message: "<message>", }), }, }, ],});上記のコードスニペットでは、addEventBridgeDataSource メソッドを使用して、デフォルトイベントバスを API のデータソースとして追加します。これにより、カスタムクエリとミューテーションでイベントバスを参照できるようになります。
CfnRule コンストラクトは、イベントを AppSync API にルーティングする EventBridge ルールを作成するために使用されます。ルールはマッチするイベントパターンと、イベント受信時に呼び出すターゲットを指定します。この例では、ターゲットは publishOrderFromEventBridge という名前の AppSync ミューテーションです。
appSyncParameters プロパティはイベント受信時に呼び出すミューテーションを指定します。inputTransformer プロパティはイベントデータをミューテーション引数にマップします。
ステップ 3 - カスタムクエリとミューテーションを定義する
イベントバスがデータソースとして追加されたので、a.handler.custom() 修飾子を使用してカスタムクエリとミューテーションでそれを参照できます。このオプションはデータソースの名前とレゾルバーのエントリーポイントを受け入れます。
以下のコードを使用して、publishOrderToEventBridge と publishOrderFromEventBridge カスタムミューテーション、および onOrderStatusChange カスタムサブスクリプションをスキーマに追加します。
import { type ClientSchema, a, defineData } from "@aws-amplify/backend";
const schema = a.schema({ // ... OrderStatus: a.enum(["OrderPending", "OrderShipped", "OrderDelivered"]), OrderStatusChange: a.customType({ orderId: a.id().required(), status: a.ref("OrderStatus").required(), message: a.string().required(), }), publishOrderToEventBridge: a .mutation() .arguments({ orderId: a.id().required(), status: a.string().required(), message: a.string().required(), }) .returns(a.ref("OrderStatusChange")) .authorization((allow) => [allow.publicApiKey()]) .handler( a.handler.custom({ dataSource: "EventBridgeDataSource", entry: "./publishOrderToEventBridge.js", }) ), publishOrderFromEventBridge: a .mutation() .arguments({ orderId: a.id().required(), status: a.string().required(), message: a.string().required(), }) .returns(a.ref("OrderStatusChange")) .authorization((allow) => [allow.publicApiKey(), allow.guest()]) .handler( a.handler.custom({ entry: "./publishOrderFromEventBridge.js", }) ), onOrderFromEventBridge: a .subscription() .for(a.ref("publishOrderFromEventBridge")) .authorization((allow) => [allow.publicApiKey()]) .handler( a.handler.custom({ entry: "./onOrderFromEventBridge.js", }) ),});
export type Schema = ClientSchema<typeof schema>;
export const data = defineData({ schema, name: "MyLibrary", authorizationModes: { defaultAuthorizationMode: "apiKey", apiKeyAuthorizationMode: { expiresInDays: 30, }, },});上記のコードスニペットについて:
-
publishOrderToEventBridgeカスタムミューテーションは EventBridge データソースを使用するため、そのレゾルバーからイベントバスにイベントを公開できます。 -
publishOrderFromEventBridgeカスタムミューテーションは None データソースをパススルーとして使用し、ルールパターンと一致するイベントが受信されたときに EventBridge ルールによって呼び出されます。allow.guestルールは内部で IAM を使用し、EventBridge ルールがミューテーションを呼び出せるようにします。 -
onOrderFromEventBridgeカスタムサブスクリプションは、EventBridge がpublishOrderFromEventBridgeミューテーションを呼び出すか、クライアントがpublishOrderToEventBridgeミューテーションを呼び出すことでトリガーできます。
ステップ 4 - カスタムビジネスロジックハンドラーコードを構成する
次に、amplify/data フォルダに以下のファイルを作成し、コード例を使用して、前のステップでスキーマに追加されたカスタムクエリとミューテーション用のカスタムレゾルバーを定義します。これらは AppSync JavaScript レゾルバーです。
以下のコードは、onOrderStatusChange サブスクリプションのカスタムビジネスロジックハンドラーを定義します。サブスクリプションは None データソースを使用するため、response 関数は空です。サブスクリプションは追加の処理を必要としません。
export function request(ctx) { return { payload: {}, };}
export function response(ctx) {}以下のコードでは、request 関数はイベントバスに公開するイベントペイロードを構築します。前のステップで構成されたルールパターンに一致するために、イベントソースは amplify.orders に設定され、detail-type は OrderStatusChange に設定されます。ミューテーション引数がイベント詳細に渡されます。
export function request(ctx) { return { operation: "PutEvents", events: [ { source: "amplify.orders", ["detail-type"]: "OrderStatusChange", detail: { ...ctx.args }, }, ], };}
export function response(ctx) { return ctx.args;}以下のコードは、publishOrderFromEventBridge ミューテーションのカスタムビジネスロジックハンドラーを定義します。request 関数はイベントバスから受け取ったイベントペイロードからミューテーション引数を構築します。response 関数はミューテーション引数を返します。
export function request(ctx) { return { payload: ctx.arguments, };}
export function response(ctx) { return ctx.arguments;}ステップ 5 - カスタムミューテーションを呼び出して EventBridge にイベントを送信する
生成された Data クライアントから、すべてのカスタムクエリとミューテーションは client.queries および client.mutations API で見つけることができます。
以下のカスタムミューテーションは、注文ステータス変更イベントをイベントバスに公開します。
await client.mutations.publishOrderToEventBridge({ orderId: "12345", status: "SHIPPED", message: "Order has been shipped",});ステップ 6 - EventBridge で呼び出されるミューテーションにサブスクライブする
イベントバスからのイベントにサブスクライブするには、client.subscriptions API を使用できます。
// EventBridge ルールでトリガーされるミューテーションにサブスクライブしますconst sub = client.subscriptions.onOrderStatusChange().subscribe({ next: (data) => { console.log(data); },});
//...
// サブスクリプションをクリーンアップしますsub.unsubscribe();ステップ 7 - EventBridge からミューテーションを呼び出してサブスクリプションをトリガーする
EventBridge コンソールを使用してイベントを送信し、カスタムミューテーションを呼び出すことで、カスタムミューテーションとサブスクリプションをテストできます。その後、サブスクリプションがトリガーされた結果を監視できます。
- Amazon EventBridge コンソールに移動し、「Send Events」を選択します。
- フォームに入力し、イベントソースを
amplify.ordersに、detail-typeをOrderStatusChangeに指定します。
- 「Send」を選択し、AppSync Queries コンソールでサブスクリプション出力を確認します。
結論
このガイドでは、Amazon EventBridge イベントバスを Amplify API のデータソースとして追加し、イベントバスにイベントを公開・受信するためのカスタムクエリとミューテーションを定義しました。また、イベントデータを処理し、適切なミューテーションを呼び出すカスタムビジネスロジックハンドラーコードも構成しました。
クリーンアップするには、ターミナルでサンドボックスプロセスを終了する際のプロンプトを受け入れてサンドボックスを削除できます。または、AWS Amplify コンソールを使用してサンドボックス環境を管理・削除することもできます。