Name:
interface
Value:
Amplify has re-imagined the way frontend developers build fullstack applications. Develop and deploy without the hassle.
Gen1 DocsLegacy

Page updated May 8, 2024

Amazon EventBridge に接続してイベントを送受信する

Amazon EventBridge は、アプリケーション間の通信方法を簡素化するサーバーレスイベントバスです。AWS サービス、カスタムアプリケーション、サードパーティの SaaS プロバイダーを含むさまざまなソースによって生成されたイベントの中央ハブとして機能します。

EventBridge はこのイベントデータをリアルタイムで配信し、変更に迅速に対応するアプリケーションを構築できます。イベントをフィルタリングして特定の宛先(ターゲットと呼ばれる)にルーティングするルールを定義します。ターゲットには、AWS Lambda、Amazon SQS キュー、Amazon SNS トピックなどのサービスを含めることができます。このガイドの目的上、AWS AppSync をイベントのターゲットとして使用します。

EventBridge でイベント駆動型アーキテクチャを採用することにより、以下を実現できます。

  • 疎結合:アプリケーションが独立し、イベント通信を介してスケーラビリティと保守性が向上します。

  • 復元力の向上:イベントが非同期で配信されるため、システム障害は分離され、全体的なアプリケーション可用性が確保されます。

  • 統合の簡素化:EventBridge は多様なイベントソースを統合するための統一インターフェースを提供し、開発が効率化されます。

このセクションでは、イベントバスをデータソースとして API に追加し、ルーティングルールを定義し、AWS Amplify Gen 2 および Amazon EventBridge を使用してロバストなイベント駆動型アプリケーションを構築するためのターゲットを構成する方法を説明します。

  1. API を設定する
  2. Amazon EventBridge イベントバスをデータソースとして追加する
  3. カスタムクエリとミューテーションを定義する
  4. カスタムビジネスロジックハンドラーコードを構成する
  5. カスタムミューテーションを呼び出して EventBridge にイベントを送信する
  6. EventBridge で呼び出されるミューテーションにサブスクライブする
  7. EventBridge からミューテーションを呼び出してサブスクリプションをトリガーする

ステップ 1 - API を設定する

このガイドの目的上、注文ステータス変更イベントを表す OrderStatusChange カスタムタイプを定義します。このタイプには、注文 ID、ステータス、メッセージのフィールドが含まれます。

amplify/data/resource.ts ファイルで、以下のコードを使用して OrderStatusChange カスタムタイプと OrderStatus 列挙型を定義し、スキーマに追加します。

amplify/data/resource.ts
import { type ClientSchema, a, defineData } from "@aws-amplify/backend";
const schema = a.schema({
Todo: a
.model({
content: a.string(),
})
.authorization(allow => [allow.publicApiKey()]),
OrderStatus: a.enum(["OrderPending", "OrderShipped", "OrderDelivered"]),
OrderStatusChange: a.customType({
orderId: a.id().required(),
status: a.ref("OrderStatus").required(),
message: a.string().required(),
}),
});
export type Schema = ClientSchema<typeof schema>;
export const data = defineData({
schema,
authorizationModes: {
defaultAuthorizationMode: 'apiKey',
apiKeyAuthorizationMode: {
expiresInDays: 30,
},
},
});

注: スキーマが有効であるためには、少なくとも 1 つのクエリが必要です。そうしないと、デプロイメントはスキーマエラーで失敗します。Amplify Data スキーマは、Todo モデルと対応するクエリで自動生成されます。次のステップでスキーマに最初のカスタムクエリを追加するまで、スキーマに Todo モデルを残しておくことができます。

ステップ 2 - Amazon EventBridge イベントバスをデータソースとして追加する

amplify/backend.ts ファイルで、以下のコードを使用してデフォルトイベントバスを API のデータソースとして追加します。

amplify/backend.ts
import { defineBackend } from "@aws-amplify/backend";
import { auth } from "./auth/resource";
import { data } from "./data/resource";
import { aws_events } from "aws-cdk-lib";
import {
Effect,
PolicyDocument,
PolicyStatement,
Role,
ServicePrincipal,
} from "aws-cdk-lib/aws-iam";
export const backend = defineBackend({
auth,
data,
});
// EventBridge データソース用の新しいスタックを作成します
const eventStack = backend.createStack("MyExternalDataSources");
// EventBridge EventBus を参照または作成します
const eventBus = aws_events.EventBus.fromEventBusName(
eventStack,
"MyEventBus",
"default"
);
// EventBridge データソースを追加します
backend.data.addEventBridgeDataSource("MyEventBridgeDataSource", eventBus);
// AppSync API のミューテーションを呼び出せるようにするポリシーステートメントを作成します
const policyStatement = new PolicyStatement({
effect: Effect.ALLOW,
actions: ["appsync:GraphQL"],
resources: [`${backend.data.resources.graphqlApi.arn}/types/Mutation/*`],
});
// EventBus が想定するロールを作成します
const eventBusRole = new Role(eventStack, "AppSyncInvokeRole", {
assumedBy: new ServicePrincipal("events.amazonaws.com"),
inlinePolicies: {
PolicyStatement: new PolicyDocument({
statements: [policyStatement],
}),
},
});
// AppSync API にイベントをルーティングする EventBridge ルールを作成します
const rule = new aws_events.CfnRule(eventStack, "MyOrderRule", {
eventBusName: eventBus.eventBusName,
name: "broadcastOrderStatusChange",
eventPattern: {
source: ["amplify.orders"],
/* イベントパターンの形状は EventBridge のイベントメッセージ構造と一致する必要があります。
したがって、このフィールドは "detail-type" と表記する必要があります。そうしないと、イベントはルールをトリガーしません。
https://docs.aws.amazon.com/AmazonS3/latest/userguide/ev-events.html
*/
["detail-type"]: ["OrderStatusChange"],
detail: {
orderId: [{ exists: true }],
status: ["PENDING", "SHIPPED", "DELIVERED"],
message: [{ exists: true }],
},
},
targets: [
{
id: "orderStatusChangeReceiver",
arn: backend.data.resources.cfnResources.cfnGraphqlApi
.attrGraphQlEndpointArn,
roleArn: eventBusRole.roleArn,
appSyncParameters: {
graphQlOperation: `
mutation PublishOrderFromEventBridge(
$orderId: String!
$status: String!
$message: String!
) {
publishOrderFromEventBridge(orderId: $orderId, status: $status, message: $message) {
orderId
status
message
}
}`,
},
inputTransformer: {
inputPathsMap: {
orderId: "$.detail.orderId",
status: "$.detail.status",
message: "$.detail.message",
},
inputTemplate: JSON.stringify({
orderId: "<orderId>",
status: "<status>",
message: "<message>",
}),
},
},
],
});

ミューテーションで返される選択セットは、サブスクリプションの選択セットと一致する必要があります。ミューテーションの選択セットがサブスクリプションの選択セットと異なる場合、サブスクリプションはイベントを受け取りません。

上記のコードスニペットでは、addEventBridgeDataSource メソッドを使用して、デフォルトイベントバスを API のデータソースとして追加します。これにより、カスタムクエリとミューテーションでイベントバスを参照できるようになります。

CfnRule コンストラクトは、イベントを AppSync API にルーティングする EventBridge ルールを作成するために使用されます。ルールはマッチするイベントパターンと、イベント受信時に呼び出すターゲットを指定します。この例では、ターゲットは publishOrderFromEventBridge という名前の AppSync ミューテーションです。

appSyncParameters プロパティはイベント受信時に呼び出すミューテーションを指定します。inputTransformer プロパティはイベントデータをミューテーション引数にマップします。

ステップ 3 - カスタムクエリとミューテーションを定義する

イベントバスがデータソースとして追加されたので、a.handler.custom() 修飾子を使用してカスタムクエリとミューテーションでそれを参照できます。このオプションはデータソースの名前とレゾルバーのエントリーポイントを受け入れます。

以下のコードを使用して、publishOrderToEventBridgepublishOrderFromEventBridge カスタムミューテーション、および onOrderStatusChange カスタムサブスクリプションをスキーマに追加します。

amplify/data/resource.ts
import { type ClientSchema, a, defineData } from "@aws-amplify/backend";
const schema = a.schema({
// ...
OrderStatus: a.enum(["OrderPending", "OrderShipped", "OrderDelivered"]),
OrderStatusChange: a.customType({
orderId: a.id().required(),
status: a.ref("OrderStatus").required(),
message: a.string().required(),
}),
publishOrderToEventBridge: a
.mutation()
.arguments({
orderId: a.id().required(),
status: a.string().required(),
message: a.string().required(),
})
.returns(a.ref("OrderStatusChange"))
.authorization((allow) => [allow.publicApiKey()])
.handler(
a.handler.custom({
dataSource: "EventBridgeDataSource",
entry: "./publishOrderToEventBridge.js",
})
),
publishOrderFromEventBridge: a
.mutation()
.arguments({
orderId: a.id().required(),
status: a.string().required(),
message: a.string().required(),
})
.returns(a.ref("OrderStatusChange"))
.authorization((allow) => [allow.publicApiKey(), allow.guest()])
.handler(
a.handler.custom({
entry: "./publishOrderFromEventBridge.js",
})
),
onOrderFromEventBridge: a
.subscription()
.for(a.ref("publishOrderFromEventBridge"))
.authorization((allow) => [allow.publicApiKey()])
.handler(
a.handler.custom({
entry: "./onOrderFromEventBridge.js",
})
),
});
export type Schema = ClientSchema<typeof schema>;
export const data = defineData({
schema,
name: "MyLibrary",
authorizationModes: {
defaultAuthorizationMode: "apiKey",
apiKeyAuthorizationMode: {
expiresInDays: 30,
},
},
});

上記のコードスニペットについて:

  • publishOrderToEventBridge カスタムミューテーションは EventBridge データソースを使用するため、そのレゾルバーからイベントバスにイベントを公開できます。

  • publishOrderFromEventBridge カスタムミューテーションは None データソースをパススルーとして使用し、ルールパターンと一致するイベントが受信されたときに EventBridge ルールによって呼び出されます。allow.guest ルールは内部で IAM を使用し、EventBridge ルールがミューテーションを呼び出せるようにします。

  • onOrderFromEventBridge カスタムサブスクリプションは、EventBridge が publishOrderFromEventBridge ミューテーションを呼び出すか、クライアントが publishOrderToEventBridge ミューテーションを呼び出すことでトリガーできます。

ステップ 4 - カスタムビジネスロジックハンドラーコードを構成する

次に、amplify/data フォルダに以下のファイルを作成し、コード例を使用して、前のステップでスキーマに追加されたカスタムクエリとミューテーション用のカスタムレゾルバーを定義します。これらは AppSync JavaScript レゾルバーです。

以下のコードは、onOrderStatusChange サブスクリプションのカスタムビジネスロジックハンドラーを定義します。サブスクリプションは None データソースを使用するため、response 関数は空です。サブスクリプションは追加の処理を必要としません。

amplify/data/onOrderStatusChange.js
export function request(ctx) {
return {
payload: {},
};
}
export function response(ctx) {
}

以下のコードでは、request 関数はイベントバスに公開するイベントペイロードを構築します。前のステップで構成されたルールパターンに一致するために、イベントソースは amplify.orders に設定され、detail-typeOrderStatusChange に設定されます。ミューテーション引数がイベント詳細に渡されます。

amplify/data/publishOrderToEventBridge.js
export function request(ctx) {
return {
operation: "PutEvents",
events: [
{
source: "amplify.orders",
["detail-type"]: "OrderStatusChange",
detail: { ...ctx.args },
},
],
};
}
export function response(ctx) {
return ctx.args;
}

以下のコードは、publishOrderFromEventBridge ミューテーションのカスタムビジネスロジックハンドラーを定義します。request 関数はイベントバスから受け取ったイベントペイロードからミューテーション引数を構築します。response 関数はミューテーション引数を返します。

amplify/data/publishOrderFromEventBridge.js
export function request(ctx) {
return {
payload: ctx.arguments,
};
}
export function response(ctx) {
return ctx.arguments;
}

ステップ 5 - カスタムミューテーションを呼び出して EventBridge にイベントを送信する

生成された Data クライアントから、すべてのカスタムクエリとミューテーションは client.queries および client.mutations API で見つけることができます。

以下のカスタムミューテーションは、注文ステータス変更イベントをイベントバスに公開します。

App.tsx
await client.mutations.publishOrderToEventBridge({
orderId: "12345",
status: "SHIPPED",
message: "Order has been shipped",
});

ステップ 6 - EventBridge で呼び出されるミューテーションにサブスクライブする

イベントバスからのイベントにサブスクライブするには、client.subscriptions API を使用できます。

App.tsx
// EventBridge ルールでトリガーされるミューテーションにサブスクライブします
const sub = client.subscriptions.onOrderStatusChange().subscribe({
next: (data) => {
console.log(data);
},
});
//...
// サブスクリプションをクリーンアップします
sub.unsubscribe();

ステップ 7 - EventBridge からミューテーションを呼び出してサブスクリプションをトリガーする

EventBridge コンソールを使用してイベントを送信し、カスタムミューテーションを呼び出すことで、カスタムミューテーションとサブスクリプションをテストできます。その後、サブスクリプションがトリガーされた結果を監視できます。

  1. Amazon EventBridge コンソールに移動し、「Send Events」を選択します。

Amazon EventBridge コンソール、「Event buses」というタイトルのページ。イベントバスのテーブルと「Send events」というラベルのハイライトされたボタンが表示されています。

  1. フォームに入力し、イベントソースを amplify.orders に、detail-typeOrderStatusChange に指定します。

Amazon EventBridge コンソール、「Send events」というタイトルのページ。フォーム入力フィールドと「event bus: default」、「event source: amplify.orders」、「detail type: OrderStatusChange」、および「orderId」、「status」、「message」を含む JSON データを持つ Event detail フィールドの値が表示されています。

  1. 「Send」を選択し、AppSync Queries コンソールでサブスクリプション出力を確認します。

AppSync コンソール、「Queries」というタイトルのページ。「onOrderFromEventBridge」という名前の実行中のサブスクリプションと、「orderId」、「status」、「message」を含むデータを持つ結果が表示されています。

結論

このガイドでは、Amazon EventBridge イベントバスを Amplify API のデータソースとして追加し、イベントバスにイベントを公開・受信するためのカスタムクエリとミューテーションを定義しました。また、イベントデータを処理し、適切なミューテーションを呼び出すカスタムビジネスロジックハンドラーコードも構成しました。

クリーンアップするには、ターミナルでサンドボックスプロセスを終了する際のプロンプトを受け入れてサンドボックスを削除できます。または、AWS Amplify コンソールを使用してサンドボックス環境を管理・削除することもできます。