Name:
interface
Value:
Amplify has re-imagined the way frontend developers build fullstack applications. Develop and deploy without the hassle.

Choose your framework/language

Gen1 DocsLegacy

Page updated May 6, 2025

Amazon OpenSearchで検索およびアグリゲートクエリに接続する

Amazon OpenSearch Serviceは、OpenSearchまたはElasticsearchを使用して検索分析ソリューションをデプロイするためのマネージドプラットフォームを提供します。Amazon DynamoDBとOpenSearch Service間のゼロETL統合により、カスタムコードやインフラストラクチャを必要とせずに、DynamoDBデータを自動的にレプリケートおよび変換することで、DynamoDBデータに対するシームレスな検索が可能になります。この統合は、プロセスを簡素化し、データパイプラインの管理における運用負荷を軽減します。

DynamoDBユーザーは、フルテキスト検索、ファジー検索、オートコンプリート、機械学習機能のためのベクトル検索など、高度なOpenSearch機能にアクセスできます。Amazon OpenSearch Ingestionは、DynamoDBとOpenSearch Service間のデータを同期し、ほぼリアルタイムの更新と複数のDynamoDBテーブル全体の包括的なインサイトを実現します。開発者は、インデックスマッピングテンプレートを調整して、Amazon DynamoDBフィールドをOpenSearch Serviceインデックスと一致させることができます。

Amazon OpenSearch Ingestionは、S3エクスポートとDynamoDBストリームと組み合わせることで、DynamoDBテーブルからシームレスなデータ入力とOpenSearchへの自動取り込みを容易にします。さらに、パイプラインは必要に応じて将来の再取り込みのためのバックアップデータをS3に保存することができます。

ステップ1: プロジェクトのセットアップ

クイックスタートガイドの手順に従ってプロジェクトをセットアップしてください。このガイドでは、DynamoDBからOpenSearchへTodoテーブルを同期します。

まず、スキーマにTodoモデルを追加します:

amplify/data/resource.ts
import { type ClientSchema, a, defineData } from "@aws-amplify/backend";
const schema = a.schema({
Todo: a
.model({
content: a.string(),
done: a.boolean(),
priority: a.enum(["low", "medium", "high"]),
})
.authorization((allow) => [allow.publicApiKey()])
});
export type Schema = ClientSchema<typeof schema>;
export const data = defineData({
schema,
authorizationModes: {
defaultAuthorizationMode: "apiKey",
apiKeyAuthorizationMode: {
expiresInDays: 30,
},
},
});

重要な考慮事項:

ポイントインタイムリカバリ(PITR)が有効になっていることを確認してください。これはパイプライン統合に重要です。 DynamoDBストリームを有効にして、OpenSearchに取り込まれるアイテムの変更をキャプチャしてください。

amplify/backend.ts
import * as dynamodb from 'aws-cdk-lib/aws-dynamodb';
import { defineBackend } from '@aws-amplify/backend';
import { auth } from './auth/resource';
import { data } from './data/resource';
const backend = defineBackend({
auth,
data
});
const todoTable =
backend.data.resources.cfnResources.amplifyDynamoDbTables['Todo'];
// テーブル設定を更新
todoTable.pointInTimeRecoveryEnabled = true;
todoTable.streamSpecification = {
streamViewType: dynamodb.StreamViewType.NEW_IMAGE
};
// DynamoDBテーブルのARNを取得
const tableArn = backend.data.resources.tables['Todo'].tableArn;
// DynamoDBテーブル名を取得
const tableName = backend.data.resources.tables['Todo'].tableName;

ステップ2: OpenSearchインスタンスのセットアップ

暗号化を使用してOpenSearchインスタンスを作成します。

amplify/backend.ts
import * as opensearch from 'aws-cdk-lib/aws-opensearchservice';
import { RemovalPolicy } from "aws-cdk-lib";
import { defineBackend } from '@aws-amplify/backend';
import { auth } from './auth/resource';
import { data } from './data/resource';
const backend = defineBackend({
auth,
data
});
const todoTable =
backend.data.resources.cfnResources.amplifyDynamoDbTables['Todo'];
// テーブル設定を更新
todoTable.pointInTimeRecoveryEnabled = true;
todoTable.streamSpecification = {
streamViewType: dynamodb.StreamViewType.NEW_IMAGE
};
// DynamoDBテーブルのARNを取得
const tableArn = backend.data.resources.tables['Todo'].tableArn;
// DynamoDBテーブル名を取得
const tableName = backend.data.resources.tables['Todo'].tableName;
// OpenSearchドメインを作成
const openSearchDomain = new opensearch.Domain(
backend.data.stack,
'OpenSearchDomain',
{
version: opensearch.EngineVersion.OPENSEARCH_2_11,
capacity: {
// 本番環境ではインスタンスタイプをアップグレードしてください
masterNodeInstanceType: "t3.small.search",
masterNodes: 0,
dataNodeInstanceType: "t3.small.search",
dataNodes: 1,
},
nodeToNodeEncryption: true,
// スタック削除時にOpenSearchドメインが削除されるようにremovalPolicyをDESTROYに設定
removalPolicy: RemovalPolicy.DESTROY,
encryptionAtRest: {
enabled: true
}
}
);

重要な考慮事項:

サンドボックス環境ではリソースを破棄するようにremovalPolicyを設定することをお勧めします。デフォルトでは、npx ampx sandbox deleteを実行してもOpenSearchインスタンスは削除されません。これは、ステートフルリソースのデフォルト削除ポリシーがリソースを保持するように設定されているためです。

ステップ3: DynamoDBからOpenSearchへのゼロETLのセットアップ

ステップ3a: ストレージとIAMロールのセットアップ

OpenSearchパイプラインによって使用される生イベントをバックアップするためのストレージを確立します。 amplify/storage/resource.tsという名前のファイルを生成し、提供されたコンテンツを挿入してストレージリソースをセットアップします。ストレージバケット内のさまざまなパスへのアクセスを制御するようにストレージ構成をカスタマイズします。

amplify/storage/resource.ts
import { defineStorage } from "@aws-amplify/backend"
export const storage = defineStorage({
name: "opensearch-backup-bucket-amplify-gen-2",
access: allow => ({
'public/*': [
allow.guest.to(['list', 'write', 'get'])
]
})
})

以下に示すようにストレージリソースからs3BucketArnおよびs3BucketNameの値を取得します。さらに、パイプラインのIAMロールを設定し、以下に示すようにロールを割り当てます。必要なIAMロールの詳細については、ロールとユーザーのセットアップドキュメントを参照してください。

amplify/backend.ts
import * as dynamodb from "aws-cdk-lib/aws-dynamodb";
import * as opensearch from "aws-cdk-lib/aws-opensearchservice";
import { RemovalPolicy } from "aws-cdk-lib";
import * as iam from "aws-cdk-lib/aws-iam";
import { defineBackend } from "@aws-amplify/backend";
import { auth } from "./auth/resource";
import { data } from "./data/resource";
import { storage } from "./storage/resource";
// バックエンドリソースを定義
const backend = defineBackend({
auth,
data,
storage,
});
const todoTable =
backend.data.resources.cfnResources.amplifyDynamoDbTables["Todo"];
// テーブル設定を更新
todoTable.pointInTimeRecoveryEnabled = true;
todoTable.streamSpecification = {
streamViewType: dynamodb.StreamViewType.NEW_IMAGE,
};
// DynamoDBテーブルのARNを取得
const tableArn = backend.data.resources.tables["Todo"].tableArn;
// DynamoDBテーブル名を取得
const tableName = backend.data.resources.tables["Todo"].tableName;
// OpenSearchドメインを作成
const openSearchDomain = new opensearch.Domain(
backend.data.stack,
"OpenSearchDomain",
{
version: opensearch.EngineVersion.OPENSEARCH_2_11,
capacity: {
// 本番環境ではインスタンスタイプをアップグレードしてください
masterNodeInstanceType: "t3.small.search",
masterNodes: 0,
dataNodeInstanceType: "t3.small.search",
dataNodes: 1,
},
nodeToNodeEncryption: true,
// スタック削除時にOpenSearchドメインが削除されるようにremovalPolicyをDESTROYに設定
removalPolicy: RemovalPolicy.DESTROY,
encryptionAtRest: {
enabled: true,
},
}
);
// S3バケットのARNを取得
const s3BucketArn = backend.storage.resources.bucket.bucketArn;
// S3バケット名を取得
const s3BucketName = backend.storage.resources.bucket.bucketName;
// OpenSearch統合用のIAMロールを作成
const openSearchIntegrationPipelineRole = new iam.Role(
backend.data.stack,
"OpenSearchIntegrationPipelineRole",
{
assumedBy: new iam.ServicePrincipal("osis-pipelines.amazonaws.com"),
inlinePolicies: {
openSearchPipelinePolicy: new iam.PolicyDocument({
statements: [
new iam.PolicyStatement({
actions: ["es:DescribeDomain"],
resources: [
openSearchDomain.domainArn,
openSearchDomain.domainArn + "/*",
],
effect: iam.Effect.ALLOW,
}),
new iam.PolicyStatement({
actions: ["es:ESHttp*"],
resources: [
openSearchDomain.domainArn,
openSearchDomain.domainArn + "/*",
],
effect: iam.Effect.ALLOW,
}),
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: [
"s3:GetObject",
"s3:AbortMultipartUpload",
"s3:PutObject",
"s3:PutObjectAcl",
],
resources: [s3BucketArn, s3BucketArn + "/*"],
}),
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: [
"dynamodb:DescribeTable",
"dynamodb:DescribeContinuousBackups",
"dynamodb:ExportTableToPointInTime",
"dynamodb:DescribeExport",
"dynamodb:DescribeStream",
"dynamodb:GetRecords",
"dynamodb:GetShardIterator",
],
resources: [tableArn, tableArn + "/*"],
}),
],
}),
},
managedPolicies: [
iam.ManagedPolicy.fromAwsManagedPolicyName(
"AmazonOpenSearchIngestionFullAccess"
),
],
}
);

S3バケットの場合、標準的なセキュリティプラクティスに従います: パブリックアクセスをブロック、保存中のデータを暗号化、バージョニングを有効にします。

IAMロールは、OpenSearch Ingestion Service(OSIS)パイプラインがそれを引き受けることを許可する必要があります。特定のOpenSearch Serviceの権限を付与し、DynamoDBおよびS3アクセスも提供します。最小権限の原則に従うために、権限をカスタマイズしてもよいです。

ステップ3b: OpenSearch Serviceパイプライン

パイプラインコンストラクトとその構成を定義します。

OpenSearchを使用する場合、データ構造に基づいてインデックステンプレートまたはマッピングを事前に定義できます。これにより、ドキュメント内の各フィールドのデータタイプを設定できます。このアプローチは、正確なデータ取り込みと検索に非常に強力です。インデックスマッピング/テンプレートの詳細については、OpenSearchドキュメントを参照してください。

取り込みパイプラインのデータ構造を定義するために、template_contentJSON表現をカスタマイズします。

amplify/backend.ts
import * as dynamodb from "aws-cdk-lib/aws-dynamodb";
import * as opensearch from "aws-cdk-lib/aws-opensearchservice";
import { RemovalPolicy } from "aws-cdk-lib";
import * as iam from "aws-cdk-lib/aws-iam";
import { defineBackend } from "@aws-amplify/backend";
import { auth } from "./auth/resource";
import { data } from "./data/resource";
import { storage } from "./storage/resource";
// バックエンドリソースを定義
const backend = defineBackend({
auth,
data,
storage,
});
const todoTable =
backend.data.resources.cfnResources.amplifyDynamoDbTables["Todo"];
// テーブル設定を更新
todoTable.pointInTimeRecoveryEnabled = true;
todoTable.streamSpecification = {
streamViewType: dynamodb.StreamViewType.NEW_IMAGE,
};
// DynamoDBテーブルのARNを取得
const tableArn = backend.data.resources.tables["Todo"].tableArn;
// DynamoDBテーブル名を取得
const tableName = backend.data.resources.tables["Todo"].tableName;
// OpenSearchドメインを作成
const openSearchDomain = new opensearch.Domain(
backend.data.stack,
"OpenSearchDomain",
{
version: opensearch.EngineVersion.OPENSEARCH_2_11,
capacity: {
// 本番環境ではインスタンスタイプをアップグレードしてください
masterNodeInstanceType: "t3.small.search",
masterNodes: 0,
dataNodeInstanceType: "t3.small.search",
dataNodes: 1,
},
nodeToNodeEncryption: true,
// スタック削除時にOpenSearchドメインが削除されるようにremovalPolicyをDESTROYに設定
removalPolicy: RemovalPolicy.DESTROY,
encryptionAtRest: {
enabled: true,
},
}
);
// S3バケットのARNを取得
const s3BucketArn = backend.storage.resources.bucket.bucketArn;
// S3バケット名を取得
const s3BucketName = backend.storage.resources.bucket.bucketName;
// OpenSearch統合用のIAMロールを作成
const openSearchIntegrationPipelineRole = new iam.Role(
backend.data.stack,
"OpenSearchIntegrationPipelineRole",
{
assumedBy: new iam.ServicePrincipal("osis-pipelines.amazonaws.com"),
inlinePolicies: {
openSearchPipelinePolicy: new iam.PolicyDocument({
statements: [
new iam.PolicyStatement({
actions: ["es:DescribeDomain"],
resources: [
openSearchDomain.domainArn,
openSearchDomain.domainArn + "/*",
],
effect: iam.Effect.ALLOW,
}),
new iam.PolicyStatement({
actions: ["es:ESHttp*"],
resources: [
openSearchDomain.domainArn,
openSearchDomain.domainArn + "/*",
],
effect: iam.Effect.ALLOW,
}),
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: [
"s3:GetObject",
"s3:AbortMultipartUpload",
"s3:PutObject",
"s3:PutObjectAcl",
],
resources: [s3BucketArn, s3BucketArn + "/*"],
}),
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: [
"dynamodb:DescribeTable",
"dynamodb:DescribeContinuousBackups",
"dynamodb:ExportTableToPointInTime",
"dynamodb:DescribeExport",
"dynamodb:DescribeStream",
"dynamodb:GetRecords",
"dynamodb:GetShardIterator",
],
resources: [tableArn, tableArn + "/*"],
}),
],
}),
},
managedPolicies: [
iam.ManagedPolicy.fromAwsManagedPolicyName(
"AmazonOpenSearchIngestionFullAccess"
),
],
}
);
// OpenSearchインデックスマッピングを定義
const indexName = "todo";
const indexMapping = {
settings: {
number_of_shards: 1,
number_of_replicas: 0,
},
mappings: {
properties: {
id: {
type: "keyword",
},
done: {
type: "boolean",
},
content: {
type: "text",
},
},
},
};

設定はOpenSearchのdata-prepper機能です。DynamoDB構成に関する特定のドキュメントについては、OpenSearchデータプリッパードキュメントを参照してください。

amplify/backend.ts
import * as dynamodb from "aws-cdk-lib/aws-dynamodb";
import * as opensearch from "aws-cdk-lib/aws-opensearchservice";
import { RemovalPolicy } from "aws-cdk-lib";
import * as iam from "aws-cdk-lib/aws-iam";
import { defineBackend } from "@aws-amplify/backend";
import { auth } from "./auth/resource";
import { data } from "./data/resource";
import { storage } from "./storage/resource";
// バックエンドリソースを定義
const backend = defineBackend({
auth,
data,
storage,
});
const todoTable =
backend.data.resources.cfnResources.amplifyDynamoDbTables["Todo"];
// テーブル設定を更新
todoTable.pointInTimeRecoveryEnabled = true;
todoTable.streamSpecification = {
streamViewType: dynamodb.StreamViewType.NEW_IMAGE,
};
// DynamoDBテーブルのARNを取得
const tableArn = backend.data.resources.tables["Todo"].tableArn;
// DynamoDBテーブル名を取得
const tableName = backend.data.resources.tables["Todo"].tableName;
// OpenSearchドメインを作成
const openSearchDomain = new opensearch.Domain(
backend.data.stack,
"OpenSearchDomain",
{
version: opensearch.EngineVersion.OPENSEARCH_2_11,
capacity: {
// 本番環境ではインスタンスタイプをアップグレードしてください
masterNodeInstanceType: "t3.small.search",
masterNodes: 0,
dataNodeInstanceType: "t3.small.search",
dataNodes: 1,
},
nodeToNodeEncryption: true,
// スタック削除時にOpenSearchドメインが削除されるようにremovalPolicyをDESTROYに設定
removalPolicy: RemovalPolicy.DESTROY,
encryptionAtRest: {
enabled: true,
},
}
);
// S3バケットのARNを取得
const s3BucketArn = backend.storage.resources.bucket.bucketArn;
// S3バケット名を取得
const s3BucketName = backend.storage.resources.bucket.bucketName;
// OpenSearch統合用のIAMロールを作成
const openSearchIntegrationPipelineRole = new iam.Role(
backend.data.stack,
"OpenSearchIntegrationPipelineRole",
{
assumedBy: new iam.ServicePrincipal("osis-pipelines.amazonaws.com"),
inlinePolicies: {
openSearchPipelinePolicy: new iam.PolicyDocument({
statements: [
new iam.PolicyStatement({
actions: ["es:DescribeDomain"],
resources: [
openSearchDomain.domainArn,
openSearchDomain.domainArn + "/*",
],
effect: iam.Effect.ALLOW,
}),
new iam.PolicyStatement({
actions: ["es:ESHttp*"],
resources: [
openSearchDomain.domainArn,
openSearchDomain.domainArn + "/*",
],
effect: iam.Effect.ALLOW,
}),
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: [
"s3:GetObject",
"s3:AbortMultipartUpload",
"s3:PutObject",
"s3:PutObjectAcl",
],
resources: [s3BucketArn, s3BucketArn + "/*"],
}),
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: [
"dynamodb:DescribeTable",
"dynamodb:DescribeContinuousBackups",
"dynamodb:ExportTableToPointInTime",
"dynamodb:DescribeExport",
"dynamodb:DescribeStream",
"dynamodb:GetRecords",
"dynamodb:GetShardIterator",
],
resources: [tableArn, tableArn + "/*"],
}),
],
}),
},
managedPolicies: [
iam.ManagedPolicy.fromAwsManagedPolicyName(
"AmazonOpenSearchIngestionFullAccess"
),
],
}
);
// OpenSearchインデックスマッピングを定義
const indexName = "todo";
const indexMapping = {
settings: {
number_of_shards: 1,
number_of_replicas: 0,
},
mappings: {
properties: {
id: {
type: "keyword",
},
isDone: {
type: "boolean",
},
content: {
type: "text",
},
priority: {
type: "text",
},
},
},
};
// OpenSearchテンプレート定義
const openSearchTemplate = `
version: "2"
dynamodb-pipeline:
source:
dynamodb:
acknowledgments: true
tables:
- table_arn: "${tableArn}"
stream:
start_position: "LATEST"
export:
s3_bucket: "${s3BucketName}"
s3_region: "${backend.storage.stack.region}"
s3_prefix: "${tableName}/"
aws:
sts_role_arn: "${openSearchIntegrationPipelineRole.roleArn}"
region: "${backend.data.stack.region}"
sink:
- opensearch:
hosts:
- "https://${openSearchDomain.domainEndpoint}"
index: "${indexName}"
index_type: "custom"
template_content: |
${JSON.stringify(indexMapping)}
document_id: '\${getMetadata("primary_key")}'
action: '\${getMetadata("opensearch_action")}'
document_version: '\${getMetadata("document_version")}'
document_version_type: "external"
bulk_size: 4
aws:
sts_role_arn: "${openSearchIntegrationPipelineRole.roleArn}"
region: "${backend.data.stack.region}"
`;

この設定は、単一モデルのパイプラインの望ましい動作を定義します。

ソース設定では、DynamoDBをデータソースとして指定し、取り込み対象のテーブルとストリームの開始点を設定します。さらに、ストリームをOpenSearchに取り込むだけでなく、バックアップ目的でターゲットS3バケットを定義します。さらに、取り込みパイプラインにIAMロールを設定し、ドキュメントで詳述されている必要な権限とポリシーが確実に使用されるようにします。

シンク設定に関して、OpenSearchドメインクラスタはホスト、インデックス名、タイプ、テンプレートコンテンツ(インデックスマッピング)を設定することで指定されます。ドキュメント関連のメタデータはOpenSearchへのリクエストの最大バルクサイズ(MB)とともに設定されます。再び、パイプラインのシンク部分にIAMロールを指定します。シンク設定の詳細については、OpenSearchドキュメントを参照してください。

シンク設定は配列です。同じテーブルで別のインデックスを作成するには、シンク配列にOpenSearch設定を2番目として追加することで達成できます。

複数のテーブルにインデックスを付けるには、設定で複数のパイプラインを構成する必要があります。詳細なガイダンスについては、OpenSearchドキュメントのパイプラインセクションを参照してください。

: OpenSearch Ingestionパイプラインは、ソースとして1つのDynamoDBテーブルのみをサポートします。現在の制限の詳細については、Amazon OpenSearch制限セクションを参照してください。

次に、OSISパイプラインリソースを作成します:

amplify/backend.ts
import * as dynamodb from "aws-cdk-lib/aws-dynamodb";
import * as opensearch from "aws-cdk-lib/aws-opensearchservice";
import { RemovalPolicy } from "aws-cdk-lib";
import * as iam from "aws-cdk-lib/aws-iam";
import * as osis from "aws-cdk-lib/aws-osis";
import * as logs from "aws-cdk-lib/aws-logs";
import { RemovalPolicy } from "aws-cdk-lib";
import { defineBackend } from "@aws-amplify/backend";
import { auth } from "./auth/resource";
import { data } from "./data/resource";
import { storage } from "./storage/resource";
// バックエンドリソースを定義
const backend = defineBackend({
auth,
data,
storage,
});
const todoTable =
backend.data.resources.cfnResources.amplifyDynamoDbTables["Todo"];
// テーブル設定を更新
todoTable.pointInTimeRecoveryEnabled = true;
todoTable.streamSpecification = {
streamViewType: dynamodb.StreamViewType.NEW_IMAGE,
};
// DynamoDBテーブルのARNを取得
const tableArn = backend.data.resources.tables["Todo"].tableArn;
// DynamoDBテーブル名を取得
const tableName = backend.data.resources.tables["Todo"].tableName;
// OpenSearchドメインを作成
const openSearchDomain = new opensearch.Domain(
backend.data.stack,
"OpenSearchDomain",
{
version: opensearch.EngineVersion.OPENSEARCH_2_11,
capacity: {
// 本番環境ではインスタンスタイプをアップグレードしてください
masterNodeInstanceType: "t3.small.search",
masterNodes: 0,
dataNodeInstanceType: "t3.small.search",
dataNodes: 1,
},
nodeToNodeEncryption: true,
// スタック削除時にOpenSearchドメインが削除されるようにremovalPolicyをDESTROYに設定
removalPolicy: RemovalPolicy.DESTROY,
encryptionAtRest: {
enabled: true,
},
}
);
// S3バケットのARNを取得
const s3BucketArn = backend.storage.resources.bucket.bucketArn;
// S3バケット名を取得
const s3BucketName = backend.storage.resources.bucket.bucketName;
// OpenSearch統合用のIAMロールを作成
const openSearchIntegrationPipelineRole = new iam.Role(
backend.data.stack,
"OpenSearchIntegrationPipelineRole",
{
assumedBy: new iam.ServicePrincipal("osis-pipelines.amazonaws.com"),
inlinePolicies: {
openSearchPipelinePolicy: new iam.PolicyDocument({
statements: [
new iam.PolicyStatement({
actions: ["es:DescribeDomain"],
resources: [
openSearchDomain.domainArn,
openSearchDomain.domainArn + "/*",
],
effect: iam.Effect.ALLOW,
}),
new iam.PolicyStatement({
actions: ["es:ESHttp*"],
resources: [
openSearchDomain.domainArn,
openSearchDomain.domainArn + "/*",
],
effect: iam.Effect.ALLOW,
}),
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: [
"s3:GetObject",
"s3:AbortMultipartUpload",
"s3:PutObject",
"s3:PutObjectAcl",
],
resources: [s3BucketArn, s3BucketArn + "/*"],
}),
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: [
"dynamodb:DescribeTable",
"dynamodb:DescribeContinuousBackups",
"dynamodb:ExportTableToPointInTime",
"dynamodb:DescribeExport",
"dynamodb:DescribeStream",
"dynamodb:GetRecords",
"dynamodb:GetShardIterator",
],
resources: [tableArn, tableArn + "/*"],
}),
],
}),
},
managedPolicies: [
iam.ManagedPolicy.fromAwsManagedPolicyName(
"AmazonOpenSearchIngestionFullAccess"
),
],
}
);
// OpenSearchインデックスマッピングを定義
const indexName = "todo";
const indexMapping = {
settings: {
number_of_shards: 1,
number_of_replicas: 0,
},
mappings: {
properties: {
id: {
type: "keyword",
},
isDone: {
type: "boolean",
},
content: {
type: "text",
},
priority: {
type: "text",
},
},
},
};
// OpenSearchテンプレート定義
const openSearchTemplate = `
version: "2"
dynamodb-pipeline:
source:
dynamodb:
acknowledgments: true
tables:
- table_arn: "${tableArn}"
stream:
start_position: "LATEST"
export:
s3_bucket: "${s3BucketName}"
s3_region: "${backend.storage.stack.region}"
s3_prefix: "${tableName}/"
aws:
sts_role_arn: "${openSearchIntegrationPipelineRole.roleArn}"
region: "${backend.data.stack.region}"
sink:
- opensearch:
hosts:
- "https://${openSearchDomain.domainEndpoint}"
index: "${indexName}"
index_type: "custom"
template_content: |
${JSON.stringify(indexMapping)}
document_id: '\${getMetadata("primary_key")}'
action: '\${getMetadata("opensearch_action")}'
document_version: '\${getMetadata("document_version")}'
document_version_type: "external"
bulk_size: 4
aws:
sts_role_arn: "${openSearchIntegrationPipelineRole.roleArn}"
region: "${backend.data.stack.region}"
`;
// CloudWatchログループを作成
const logGroup = new logs.LogGroup(backend.data.stack, "LogGroup", {
logGroupName: "/aws/vendedlogs/OpenSearchService/pipelines/1",
removalPolicy: RemovalPolicy.DESTROY,
});
// OpenSearch統合サービスパイプラインを作成
const cfnPipeline = new osis.CfnPipeline(
backend.data.stack,
"OpenSearchIntegrationPipeline",
{
maxUnits: 4,
minUnits: 1,
pipelineConfigurationBody: openSearchTemplate,
pipelineName: "dynamodb-integration-2",
logPublishingOptions: {
isLoggingEnabled: true,
cloudWatchLogDestination: {
logGroup: logGroup.logGroupName,
},
},
}
);

リソースをデプロイした後、Todoテーブルにアイテムを追加することで、データ取り込みプロセスをテストできます。ただし、その前に、パイプラインが正しくセットアップされていることを確認しましょう。

AWSコンソールでOpenSearchに移動し、パイプラインセクションに移動します。構成されたパイプラインが表示され、その設定を確認して、期待と一致していることを確認できます:

DynamoDB統合セクションの下に作成されたOpenSearch OSISパイプラインが表示されているスクリーンショット

DynamoDBコンソールのテーブルの統合セクションに移動することで、これも確認できます。

OpenSearchコンソールの「取り込み -> パイプライン」セクション下で作成されたOpenSearch OSISパイプラインが表示されているスクリーンショット。

ステップ4: OpenSearchで新しいクエリを公開

ステップ4a: OpenSearchデータソースをバックエンドに追加

まず、OpenSearchデータソースをデータバックエンドに追加します。amplify/backend.tsファイルの末尾に以下のコードを追加します。

amplify/backend.ts
// OpenSearchデータソースを追加
const osDataSource = backend.data.addOpenSearchDataSource(
"osDataSource",
openSearchDomain
);

ステップ4b: レゾルバーを作成してクエリに添付

検索レゾルバーを作成しましょう。amplify/data/searchTodoResolver.jsという名前のファイルを作成し、以下のコードを貼り付けます。詳細については、Amazon OpenSearch Serviceレゾルバーを参照してください。

amplify/data/searchTodoResolver.js
import { util } from "@aws-appsync/utils";
/**
* 入力用語を使用してドキュメントを検索します
* @param {import('@aws-appsync/utils').Context} ctx コンテキスト
* @returns {*} リクエスト
*/
export function request(ctx) {
return {
operation: "GET",
path: "/todo/_search",
};
}
/**
* フェッチされたアイテムを返します
* @param {import('@aws-appsync/utils').Context} ctx コンテキスト
* @returns {*} 結果
*/
export function response(ctx) {
if (ctx.error) {
util.error(ctx.error.message, ctx.error.type);
}
return ctx.result.hits.hits.map((hit) => hit._source);
}

ステップ4c: 検索クエリのAppSyncレゾルバーを追加

スキーマを更新し、searchTodoクエリを追加します。

amplify/data/resource.ts
const schema = a.schema({
Todo: a
.model({
content: a.string(),
done: a.boolean(),
priority: a.enum(["low", "medium", "high"]),
})
.authorization((allow) => [allow.publicApiKey()]),
searchTodos: a
.query()
.returns(a.ref("Todo").array())
.authorization((allow) => [allow.publicApiKey()])
.handler(
a.handler.custom({
entry: "./searchTodoResolver.js",
dataSource: "osDataSource",
})
),
});

リソースをデプロイした後、AppSyncコンソールを確認することで変更を検証できます。「searchTodo」クエリを実行し、右側の結果を確認して、その正確性を確認します。

「searchTodo」の生成されたクエリが表示され、OpenSearchからフェッチされた結果が右側に表示されているAppSyncコンソール。