Amazon OpenSearchで検索およびアグリゲートクエリに接続する
Amazon OpenSearch Serviceは、OpenSearchまたはElasticsearchを使用して検索分析ソリューションをデプロイするためのマネージドプラットフォームを提供します。Amazon DynamoDBとOpenSearch Service間のゼロETL統合により、カスタムコードやインフラストラクチャを必要とせずに、DynamoDBデータを自動的にレプリケートおよび変換することで、DynamoDBデータに対するシームレスな検索が可能になります。この統合は、プロセスを簡素化し、データパイプラインの管理における運用負荷を軽減します。
DynamoDBユーザーは、フルテキスト検索、ファジー検索、オートコンプリート、機械学習機能のためのベクトル検索など、高度なOpenSearch機能にアクセスできます。Amazon OpenSearch Ingestionは、DynamoDBとOpenSearch Service間のデータを同期し、ほぼリアルタイムの更新と複数のDynamoDBテーブル全体の包括的なインサイトを実現します。開発者は、インデックスマッピングテンプレートを調整して、Amazon DynamoDBフィールドをOpenSearch Serviceインデックスと一致させることができます。
Amazon OpenSearch Ingestionは、S3エクスポートとDynamoDBストリームと組み合わせることで、DynamoDBテーブルからシームレスなデータ入力とOpenSearchへの自動取り込みを容易にします。さらに、パイプラインは必要に応じて将来の再取り込みのためのバックアップデータをS3に保存することができます。
ステップ1: プロジェクトのセットアップ
クイックスタートガイドの手順に従ってプロジェクトをセットアップしてください。このガイドでは、DynamoDBからOpenSearchへTodoテーブルを同期します。
まず、スキーマにTodoモデルを追加します:
import { type ClientSchema, a, defineData } from "@aws-amplify/backend";
const schema = a.schema({ Todo: a .model({ content: a.string(), done: a.boolean(), priority: a.enum(["low", "medium", "high"]), }) .authorization((allow) => [allow.publicApiKey()])});
export type Schema = ClientSchema<typeof schema>;
export const data = defineData({ schema, authorizationModes: { defaultAuthorizationMode: "apiKey", apiKeyAuthorizationMode: { expiresInDays: 30, }, },});import * as dynamodb from 'aws-cdk-lib/aws-dynamodb';import { defineBackend } from '@aws-amplify/backend';import { auth } from './auth/resource';import { data } from './data/resource';
const backend = defineBackend({ auth, data});
const todoTable = backend.data.resources.cfnResources.amplifyDynamoDbTables['Todo'];
// テーブル設定を更新todoTable.pointInTimeRecoveryEnabled = true;
todoTable.streamSpecification = { streamViewType: dynamodb.StreamViewType.NEW_IMAGE};
// DynamoDBテーブルのARNを取得const tableArn = backend.data.resources.tables['Todo'].tableArn;// DynamoDBテーブル名を取得const tableName = backend.data.resources.tables['Todo'].tableName;ステップ2: OpenSearchインスタンスのセットアップ
暗号化を使用してOpenSearchインスタンスを作成します。
import * as opensearch from 'aws-cdk-lib/aws-opensearchservice';import { RemovalPolicy } from "aws-cdk-lib";import { defineBackend } from '@aws-amplify/backend';import { auth } from './auth/resource';import { data } from './data/resource';
const backend = defineBackend({ auth, data});
const todoTable = backend.data.resources.cfnResources.amplifyDynamoDbTables['Todo'];
// テーブル設定を更新todoTable.pointInTimeRecoveryEnabled = true;
todoTable.streamSpecification = { streamViewType: dynamodb.StreamViewType.NEW_IMAGE};
// DynamoDBテーブルのARNを取得const tableArn = backend.data.resources.tables['Todo'].tableArn;// DynamoDBテーブル名を取得const tableName = backend.data.resources.tables['Todo'].tableName;
// OpenSearchドメインを作成const openSearchDomain = new opensearch.Domain( backend.data.stack, 'OpenSearchDomain', { version: opensearch.EngineVersion.OPENSEARCH_2_11, capacity: { // 本番環境ではインスタンスタイプをアップグレードしてください masterNodeInstanceType: "t3.small.search", masterNodes: 0, dataNodeInstanceType: "t3.small.search", dataNodes: 1, }, nodeToNodeEncryption: true, // スタック削除時にOpenSearchドメインが削除されるようにremovalPolicyをDESTROYに設定 removalPolicy: RemovalPolicy.DESTROY, encryptionAtRest: { enabled: true } });ステップ3: DynamoDBからOpenSearchへのゼロETLのセットアップ
ステップ3a: ストレージとIAMロールのセットアップ
OpenSearchパイプラインによって使用される生イベントをバックアップするためのストレージを確立します。
amplify/storage/resource.tsという名前のファイルを生成し、提供されたコンテンツを挿入してストレージリソースをセットアップします。ストレージバケット内のさまざまなパスへのアクセスを制御するようにストレージ構成をカスタマイズします。
import { defineStorage } from "@aws-amplify/backend"
export const storage = defineStorage({ name: "opensearch-backup-bucket-amplify-gen-2", access: allow => ({ 'public/*': [ allow.guest.to(['list', 'write', 'get']) ] })})以下に示すようにストレージリソースからs3BucketArnおよびs3BucketNameの値を取得します。さらに、パイプラインのIAMロールを設定し、以下に示すようにロールを割り当てます。必要なIAMロールの詳細については、ロールとユーザーのセットアップドキュメントを参照してください。
import * as dynamodb from "aws-cdk-lib/aws-dynamodb";import * as opensearch from "aws-cdk-lib/aws-opensearchservice";import { RemovalPolicy } from "aws-cdk-lib";import * as iam from "aws-cdk-lib/aws-iam";import { defineBackend } from "@aws-amplify/backend";import { auth } from "./auth/resource";import { data } from "./data/resource";import { storage } from "./storage/resource";
// バックエンドリソースを定義const backend = defineBackend({ auth, data, storage,});
const todoTable = backend.data.resources.cfnResources.amplifyDynamoDbTables["Todo"];
// テーブル設定を更新todoTable.pointInTimeRecoveryEnabled = true;
todoTable.streamSpecification = { streamViewType: dynamodb.StreamViewType.NEW_IMAGE,};
// DynamoDBテーブルのARNを取得const tableArn = backend.data.resources.tables["Todo"].tableArn;// DynamoDBテーブル名を取得const tableName = backend.data.resources.tables["Todo"].tableName;
// OpenSearchドメインを作成const openSearchDomain = new opensearch.Domain( backend.data.stack, "OpenSearchDomain", { version: opensearch.EngineVersion.OPENSEARCH_2_11, capacity: { // 本番環境ではインスタンスタイプをアップグレードしてください masterNodeInstanceType: "t3.small.search", masterNodes: 0, dataNodeInstanceType: "t3.small.search", dataNodes: 1, }, nodeToNodeEncryption: true, // スタック削除時にOpenSearchドメインが削除されるようにremovalPolicyをDESTROYに設定 removalPolicy: RemovalPolicy.DESTROY, encryptionAtRest: { enabled: true, }, });// S3バケットのARNを取得const s3BucketArn = backend.storage.resources.bucket.bucketArn;// S3バケット名を取得const s3BucketName = backend.storage.resources.bucket.bucketName;
// OpenSearch統合用のIAMロールを作成const openSearchIntegrationPipelineRole = new iam.Role( backend.data.stack, "OpenSearchIntegrationPipelineRole", { assumedBy: new iam.ServicePrincipal("osis-pipelines.amazonaws.com"), inlinePolicies: { openSearchPipelinePolicy: new iam.PolicyDocument({ statements: [ new iam.PolicyStatement({ actions: ["es:DescribeDomain"], resources: [ openSearchDomain.domainArn, openSearchDomain.domainArn + "/*", ], effect: iam.Effect.ALLOW, }), new iam.PolicyStatement({ actions: ["es:ESHttp*"], resources: [ openSearchDomain.domainArn, openSearchDomain.domainArn + "/*", ], effect: iam.Effect.ALLOW, }), new iam.PolicyStatement({ effect: iam.Effect.ALLOW, actions: [ "s3:GetObject", "s3:AbortMultipartUpload", "s3:PutObject", "s3:PutObjectAcl", ], resources: [s3BucketArn, s3BucketArn + "/*"], }), new iam.PolicyStatement({ effect: iam.Effect.ALLOW, actions: [ "dynamodb:DescribeTable", "dynamodb:DescribeContinuousBackups", "dynamodb:ExportTableToPointInTime", "dynamodb:DescribeExport", "dynamodb:DescribeStream", "dynamodb:GetRecords", "dynamodb:GetShardIterator", ], resources: [tableArn, tableArn + "/*"], }), ], }), }, managedPolicies: [ iam.ManagedPolicy.fromAwsManagedPolicyName( "AmazonOpenSearchIngestionFullAccess" ), ], });S3バケットの場合、標準的なセキュリティプラクティスに従います: パブリックアクセスをブロック、保存中のデータを暗号化、バージョニングを有効にします。
IAMロールは、OpenSearch Ingestion Service(OSIS)パイプラインがそれを引き受けることを許可する必要があります。特定のOpenSearch Serviceの権限を付与し、DynamoDBおよびS3アクセスも提供します。最小権限の原則に従うために、権限をカスタマイズしてもよいです。
ステップ3b: OpenSearch Serviceパイプライン
パイプラインコンストラクトとその構成を定義します。
OpenSearchを使用する場合、データ構造に基づいてインデックステンプレートまたはマッピングを事前に定義できます。これにより、ドキュメント内の各フィールドのデータタイプを設定できます。このアプローチは、正確なデータ取り込みと検索に非常に強力です。インデックスマッピング/テンプレートの詳細については、OpenSearchドキュメントを参照してください。
取り込みパイプラインのデータ構造を定義するために、template_contentJSON表現をカスタマイズします。
import * as dynamodb from "aws-cdk-lib/aws-dynamodb";import * as opensearch from "aws-cdk-lib/aws-opensearchservice";import { RemovalPolicy } from "aws-cdk-lib";import * as iam from "aws-cdk-lib/aws-iam";import { defineBackend } from "@aws-amplify/backend";import { auth } from "./auth/resource";import { data } from "./data/resource";import { storage } from "./storage/resource"; // バックエンドリソースを定義const backend = defineBackend({ auth, data, storage,});
const todoTable = backend.data.resources.cfnResources.amplifyDynamoDbTables["Todo"];
// テーブル設定を更新todoTable.pointInTimeRecoveryEnabled = true;
todoTable.streamSpecification = { streamViewType: dynamodb.StreamViewType.NEW_IMAGE,};
// DynamoDBテーブルのARNを取得const tableArn = backend.data.resources.tables["Todo"].tableArn;// DynamoDBテーブル名を取得const tableName = backend.data.resources.tables["Todo"].tableName;
// OpenSearchドメインを作成const openSearchDomain = new opensearch.Domain( backend.data.stack, "OpenSearchDomain", { version: opensearch.EngineVersion.OPENSEARCH_2_11, capacity: { // 本番環境ではインスタンスタイプをアップグレードしてください masterNodeInstanceType: "t3.small.search", masterNodes: 0, dataNodeInstanceType: "t3.small.search", dataNodes: 1, }, nodeToNodeEncryption: true, // スタック削除時にOpenSearchドメインが削除されるようにremovalPolicyをDESTROYに設定 removalPolicy: RemovalPolicy.DESTROY, encryptionAtRest: { enabled: true, }, });
// S3バケットのARNを取得const s3BucketArn = backend.storage.resources.bucket.bucketArn;// S3バケット名を取得const s3BucketName = backend.storage.resources.bucket.bucketName;
// OpenSearch統合用のIAMロールを作成const openSearchIntegrationPipelineRole = new iam.Role( backend.data.stack, "OpenSearchIntegrationPipelineRole", { assumedBy: new iam.ServicePrincipal("osis-pipelines.amazonaws.com"), inlinePolicies: { openSearchPipelinePolicy: new iam.PolicyDocument({ statements: [ new iam.PolicyStatement({ actions: ["es:DescribeDomain"], resources: [ openSearchDomain.domainArn, openSearchDomain.domainArn + "/*", ], effect: iam.Effect.ALLOW, }), new iam.PolicyStatement({ actions: ["es:ESHttp*"], resources: [ openSearchDomain.domainArn, openSearchDomain.domainArn + "/*", ], effect: iam.Effect.ALLOW, }), new iam.PolicyStatement({ effect: iam.Effect.ALLOW, actions: [ "s3:GetObject", "s3:AbortMultipartUpload", "s3:PutObject", "s3:PutObjectAcl", ], resources: [s3BucketArn, s3BucketArn + "/*"], }), new iam.PolicyStatement({ effect: iam.Effect.ALLOW, actions: [ "dynamodb:DescribeTable", "dynamodb:DescribeContinuousBackups", "dynamodb:ExportTableToPointInTime", "dynamodb:DescribeExport", "dynamodb:DescribeStream", "dynamodb:GetRecords", "dynamodb:GetShardIterator", ], resources: [tableArn, tableArn + "/*"], }), ], }), }, managedPolicies: [ iam.ManagedPolicy.fromAwsManagedPolicyName( "AmazonOpenSearchIngestionFullAccess" ), ], });
// OpenSearchインデックスマッピングを定義const indexName = "todo";
const indexMapping = { settings: { number_of_shards: 1, number_of_replicas: 0, }, mappings: { properties: { id: { type: "keyword", }, done: { type: "boolean", }, content: { type: "text", }, }, },};設定はOpenSearchのdata-prepper機能です。DynamoDB構成に関する特定のドキュメントについては、OpenSearchデータプリッパードキュメントを参照してください。
import * as dynamodb from "aws-cdk-lib/aws-dynamodb";import * as opensearch from "aws-cdk-lib/aws-opensearchservice";import { RemovalPolicy } from "aws-cdk-lib";import * as iam from "aws-cdk-lib/aws-iam";import { defineBackend } from "@aws-amplify/backend";import { auth } from "./auth/resource";import { data } from "./data/resource";import { storage } from "./storage/resource"; // バックエンドリソースを定義const backend = defineBackend({ auth, data, storage,});
const todoTable = backend.data.resources.cfnResources.amplifyDynamoDbTables["Todo"];
// テーブル設定を更新todoTable.pointInTimeRecoveryEnabled = true;
todoTable.streamSpecification = { streamViewType: dynamodb.StreamViewType.NEW_IMAGE,};
// DynamoDBテーブルのARNを取得const tableArn = backend.data.resources.tables["Todo"].tableArn;// DynamoDBテーブル名を取得const tableName = backend.data.resources.tables["Todo"].tableName;
// OpenSearchドメインを作成const openSearchDomain = new opensearch.Domain( backend.data.stack, "OpenSearchDomain", { version: opensearch.EngineVersion.OPENSEARCH_2_11, capacity: { // 本番環境ではインスタンスタイプをアップグレードしてください masterNodeInstanceType: "t3.small.search", masterNodes: 0, dataNodeInstanceType: "t3.small.search", dataNodes: 1, }, nodeToNodeEncryption: true, // スタック削除時にOpenSearchドメインが削除されるようにremovalPolicyをDESTROYに設定 removalPolicy: RemovalPolicy.DESTROY, encryptionAtRest: { enabled: true, }, });
// S3バケットのARNを取得const s3BucketArn = backend.storage.resources.bucket.bucketArn;// S3バケット名を取得const s3BucketName = backend.storage.resources.bucket.bucketName;
// OpenSearch統合用のIAMロールを作成const openSearchIntegrationPipelineRole = new iam.Role( backend.data.stack, "OpenSearchIntegrationPipelineRole", { assumedBy: new iam.ServicePrincipal("osis-pipelines.amazonaws.com"), inlinePolicies: { openSearchPipelinePolicy: new iam.PolicyDocument({ statements: [ new iam.PolicyStatement({ actions: ["es:DescribeDomain"], resources: [ openSearchDomain.domainArn, openSearchDomain.domainArn + "/*", ], effect: iam.Effect.ALLOW, }), new iam.PolicyStatement({ actions: ["es:ESHttp*"], resources: [ openSearchDomain.domainArn, openSearchDomain.domainArn + "/*", ], effect: iam.Effect.ALLOW, }), new iam.PolicyStatement({ effect: iam.Effect.ALLOW, actions: [ "s3:GetObject", "s3:AbortMultipartUpload", "s3:PutObject", "s3:PutObjectAcl", ], resources: [s3BucketArn, s3BucketArn + "/*"], }), new iam.PolicyStatement({ effect: iam.Effect.ALLOW, actions: [ "dynamodb:DescribeTable", "dynamodb:DescribeContinuousBackups", "dynamodb:ExportTableToPointInTime", "dynamodb:DescribeExport", "dynamodb:DescribeStream", "dynamodb:GetRecords", "dynamodb:GetShardIterator", ], resources: [tableArn, tableArn + "/*"], }), ], }), }, managedPolicies: [ iam.ManagedPolicy.fromAwsManagedPolicyName( "AmazonOpenSearchIngestionFullAccess" ), ], });
// OpenSearchインデックスマッピングを定義const indexName = "todo";
const indexMapping = { settings: { number_of_shards: 1, number_of_replicas: 0, }, mappings: { properties: { id: { type: "keyword", }, isDone: { type: "boolean", }, content: { type: "text", }, priority: { type: "text", }, }, },};
// OpenSearchテンプレート定義const openSearchTemplate = `version: "2"dynamodb-pipeline: source: dynamodb: acknowledgments: true tables: - table_arn: "${tableArn}" stream: start_position: "LATEST" export: s3_bucket: "${s3BucketName}" s3_region: "${backend.storage.stack.region}" s3_prefix: "${tableName}/" aws: sts_role_arn: "${openSearchIntegrationPipelineRole.roleArn}" region: "${backend.data.stack.region}" sink: - opensearch: hosts: - "https://${openSearchDomain.domainEndpoint}" index: "${indexName}" index_type: "custom" template_content: | ${JSON.stringify(indexMapping)} document_id: '\${getMetadata("primary_key")}' action: '\${getMetadata("opensearch_action")}' document_version: '\${getMetadata("document_version")}' document_version_type: "external" bulk_size: 4 aws: sts_role_arn: "${openSearchIntegrationPipelineRole.roleArn}" region: "${backend.data.stack.region}"`;この設定は、単一モデルのパイプラインの望ましい動作を定義します。
ソース設定では、DynamoDBをデータソースとして指定し、取り込み対象のテーブルとストリームの開始点を設定します。さらに、ストリームをOpenSearchに取り込むだけでなく、バックアップ目的でターゲットS3バケットを定義します。さらに、取り込みパイプラインにIAMロールを設定し、ドキュメントで詳述されている必要な権限とポリシーが確実に使用されるようにします。
シンク設定に関して、OpenSearchドメインクラスタはホスト、インデックス名、タイプ、テンプレートコンテンツ(インデックスマッピング)を設定することで指定されます。ドキュメント関連のメタデータはOpenSearchへのリクエストの最大バルクサイズ(MB)とともに設定されます。再び、パイプラインのシンク部分にIAMロールを指定します。シンク設定の詳細については、OpenSearchドキュメントを参照してください。
シンク設定は配列です。同じテーブルで別のインデックスを作成するには、シンク配列にOpenSearch設定を2番目として追加することで達成できます。
複数のテーブルにインデックスを付けるには、設定で複数のパイプラインを構成する必要があります。詳細なガイダンスについては、OpenSearchドキュメントのパイプラインセクションを参照してください。
次に、OSISパイプラインリソースを作成します:
import * as dynamodb from "aws-cdk-lib/aws-dynamodb";import * as opensearch from "aws-cdk-lib/aws-opensearchservice";import { RemovalPolicy } from "aws-cdk-lib";import * as iam from "aws-cdk-lib/aws-iam";import * as osis from "aws-cdk-lib/aws-osis";import * as logs from "aws-cdk-lib/aws-logs";import { RemovalPolicy } from "aws-cdk-lib"; import { defineBackend } from "@aws-amplify/backend";import { auth } from "./auth/resource";import { data } from "./data/resource";import { storage } from "./storage/resource";
// バックエンドリソースを定義const backend = defineBackend({ auth, data, storage,});
const todoTable = backend.data.resources.cfnResources.amplifyDynamoDbTables["Todo"];
// テーブル設定を更新todoTable.pointInTimeRecoveryEnabled = true;
todoTable.streamSpecification = { streamViewType: dynamodb.StreamViewType.NEW_IMAGE,};
// DynamoDBテーブルのARNを取得const tableArn = backend.data.resources.tables["Todo"].tableArn;// DynamoDBテーブル名を取得const tableName = backend.data.resources.tables["Todo"].tableName;
// OpenSearchドメインを作成const openSearchDomain = new opensearch.Domain( backend.data.stack, "OpenSearchDomain", { version: opensearch.EngineVersion.OPENSEARCH_2_11, capacity: { // 本番環境ではインスタンスタイプをアップグレードしてください masterNodeInstanceType: "t3.small.search", masterNodes: 0, dataNodeInstanceType: "t3.small.search", dataNodes: 1, }, nodeToNodeEncryption: true, // スタック削除時にOpenSearchドメインが削除されるようにremovalPolicyをDESTROYに設定 removalPolicy: RemovalPolicy.DESTROY, encryptionAtRest: { enabled: true, }, });
// S3バケットのARNを取得const s3BucketArn = backend.storage.resources.bucket.bucketArn;// S3バケット名を取得const s3BucketName = backend.storage.resources.bucket.bucketName;
// OpenSearch統合用のIAMロールを作成const openSearchIntegrationPipelineRole = new iam.Role( backend.data.stack, "OpenSearchIntegrationPipelineRole", { assumedBy: new iam.ServicePrincipal("osis-pipelines.amazonaws.com"), inlinePolicies: { openSearchPipelinePolicy: new iam.PolicyDocument({ statements: [ new iam.PolicyStatement({ actions: ["es:DescribeDomain"], resources: [ openSearchDomain.domainArn, openSearchDomain.domainArn + "/*", ], effect: iam.Effect.ALLOW, }), new iam.PolicyStatement({ actions: ["es:ESHttp*"], resources: [ openSearchDomain.domainArn, openSearchDomain.domainArn + "/*", ], effect: iam.Effect.ALLOW, }), new iam.PolicyStatement({ effect: iam.Effect.ALLOW, actions: [ "s3:GetObject", "s3:AbortMultipartUpload", "s3:PutObject", "s3:PutObjectAcl", ], resources: [s3BucketArn, s3BucketArn + "/*"], }), new iam.PolicyStatement({ effect: iam.Effect.ALLOW, actions: [ "dynamodb:DescribeTable", "dynamodb:DescribeContinuousBackups", "dynamodb:ExportTableToPointInTime", "dynamodb:DescribeExport", "dynamodb:DescribeStream", "dynamodb:GetRecords", "dynamodb:GetShardIterator", ], resources: [tableArn, tableArn + "/*"], }), ], }), }, managedPolicies: [ iam.ManagedPolicy.fromAwsManagedPolicyName( "AmazonOpenSearchIngestionFullAccess" ), ], });
// OpenSearchインデックスマッピングを定義const indexName = "todo";
const indexMapping = { settings: { number_of_shards: 1, number_of_replicas: 0, }, mappings: { properties: { id: { type: "keyword", }, isDone: { type: "boolean", }, content: { type: "text", }, priority: { type: "text", }, }, },};
// OpenSearchテンプレート定義const openSearchTemplate = `version: "2"dynamodb-pipeline: source: dynamodb: acknowledgments: true tables: - table_arn: "${tableArn}" stream: start_position: "LATEST" export: s3_bucket: "${s3BucketName}" s3_region: "${backend.storage.stack.region}" s3_prefix: "${tableName}/" aws: sts_role_arn: "${openSearchIntegrationPipelineRole.roleArn}" region: "${backend.data.stack.region}" sink: - opensearch: hosts: - "https://${openSearchDomain.domainEndpoint}" index: "${indexName}" index_type: "custom" template_content: | ${JSON.stringify(indexMapping)} document_id: '\${getMetadata("primary_key")}' action: '\${getMetadata("opensearch_action")}' document_version: '\${getMetadata("document_version")}' document_version_type: "external" bulk_size: 4 aws: sts_role_arn: "${openSearchIntegrationPipelineRole.roleArn}" region: "${backend.data.stack.region}"`;
// CloudWatchログループを作成const logGroup = new logs.LogGroup(backend.data.stack, "LogGroup", { logGroupName: "/aws/vendedlogs/OpenSearchService/pipelines/1", removalPolicy: RemovalPolicy.DESTROY,});
// OpenSearch統合サービスパイプラインを作成const cfnPipeline = new osis.CfnPipeline( backend.data.stack, "OpenSearchIntegrationPipeline", { maxUnits: 4, minUnits: 1, pipelineConfigurationBody: openSearchTemplate, pipelineName: "dynamodb-integration-2", logPublishingOptions: { isLoggingEnabled: true, cloudWatchLogDestination: { logGroup: logGroup.logGroupName, }, }, });リソースをデプロイした後、Todoテーブルにアイテムを追加することで、データ取り込みプロセスをテストできます。ただし、その前に、パイプラインが正しくセットアップされていることを確認しましょう。
AWSコンソールでOpenSearchに移動し、パイプラインセクションに移動します。構成されたパイプラインが表示され、その設定を確認して、期待と一致していることを確認できます:
DynamoDBコンソールのテーブルの統合セクションに移動することで、これも確認できます。
ステップ4: OpenSearchで新しいクエリを公開
ステップ4a: OpenSearchデータソースをバックエンドに追加
まず、OpenSearchデータソースをデータバックエンドに追加します。amplify/backend.tsファイルの末尾に以下のコードを追加します。
// OpenSearchデータソースを追加const osDataSource = backend.data.addOpenSearchDataSource( "osDataSource", openSearchDomain);ステップ4b: レゾルバーを作成してクエリに添付
検索レゾルバーを作成しましょう。amplify/data/searchTodoResolver.jsという名前のファイルを作成し、以下のコードを貼り付けます。詳細については、Amazon OpenSearch Serviceレゾルバーを参照してください。
import { util } from "@aws-appsync/utils";
/** * 入力用語を使用してドキュメントを検索します * @param {import('@aws-appsync/utils').Context} ctx コンテキスト * @returns {*} リクエスト */export function request(ctx) { return { operation: "GET", path: "/todo/_search", };}
/** * フェッチされたアイテムを返します * @param {import('@aws-appsync/utils').Context} ctx コンテキスト * @returns {*} 結果 */export function response(ctx) { if (ctx.error) { util.error(ctx.error.message, ctx.error.type); } return ctx.result.hits.hits.map((hit) => hit._source);}ステップ4c: 検索クエリのAppSyncレゾルバーを追加
スキーマを更新し、searchTodoクエリを追加します。
const schema = a.schema({ Todo: a .model({ content: a.string(), done: a.boolean(), priority: a.enum(["low", "medium", "high"]), }) .authorization((allow) => [allow.publicApiKey()]),
searchTodos: a .query() .returns(a.ref("Todo").array()) .authorization((allow) => [allow.publicApiKey()]) .handler( a.handler.custom({ entry: "./searchTodoResolver.js", dataSource: "osDataSource", }) ),
});リソースをデプロイした後、AppSyncコンソールを確認することで変更を検証できます。「searchTodo」クエリを実行し、右側の結果を確認して、その正確性を確認します。