ストリーミング分析データ
Amazon Kinesis分析プロバイダーを使用すると、分析データをKinesisストリームに送信して、リアルタイム処理を行うことができます。
Kinesisストリームの設定
以下は、AWS Cloud Development Kit (AWS CDK)を使用してAmazon Kinesisで動作するAnalyticsリソースを作成する例です。
import { auth } from "./auth/resource";import { data } from "./data/resource";import { Policy, PolicyStatement } from "aws-cdk-lib/aws-iam";import { Stream } from "aws-cdk-lib/aws-kinesis";import { Stack } from "aws-cdk-lib/core";
const backend = defineBackend({ auth, data, // additional resources });
// create a new stack for the Kinesis streamconst kinesisStack = backend.createStack("kinesis-stack");
// create a new Kinesis stream with one shardconst kinesisStream = new Stream(kinesisStack, "KinesisStream", { streamName: "myKinesisStream", shardCount: 1,});
// create a new policy to allow PutRecords to the Kinesis streamconst kinesisPolicy = new Policy(kinesisStack, "KinesisPolicy", { statements: [ new PolicyStatement({ actions: ["kinesis:PutRecords"], resources: [kinesisStream.streamArn], }), ],});
// apply the policy to the authenticated and unauthenticated rolesbackend.auth.resources.authenticatedUserIamRole.attachInlinePolicy(kinesisPolicy);backend.auth.resources.unauthenticatedUserIamRole.attachInlinePolicy(kinesisPolicy);インストールと設定
CLIを使用しなかった場合は、kinesis:PutRecordsのIAM権限を設定していることを確認してください。
Amazon Kinesis用のIAMポリシーの例:
{ "Version": "2012-10-17", "Statement": [{ "Effect": "Allow", "Action": "kinesis:PutRecords", "Resource": "arn:aws:kinesis:<your-aws-region>:<your-aws-account-id>:stream/<your-stream-name>" // replace the template fields }]}詳細はAmazon Kinesis開発者向けドキュメントをご覧ください。
Kinesisを設定します:
// Configure the plugin after adding it to the Analytics moduleimport { Amplify } from 'aws-amplify';import { parseAmplifyConfig } from "aws-amplify/utils";import outputs from '../amplify_outputs.json';
const amplifyConfig = parseAmplifyConfig(outputs);
Amplify.configure({ ...amplifyConfig, Analytics: { Kinesis: { // REQUIRED - Amazon Kinesis service region region: 'us-east-1',
// OPTIONAL - The buffer size for events in number of items. bufferSize: 1000,
// OPTIONAL - The number of events to be deleted from the buffer when flushed. flushSize: 100,
// OPTIONAL - The interval in milliseconds to perform a buffer check and flush if necessary. flushInterval: 5000, // 5s
// OPTIONAL - The limit for failed recording retries. resendLimit: 5 } }});データのストリーミング
標準のrecord()メソッドを使用して、Kinesisストリームにデータを送信できます:
import { record } from 'aws-amplify/analytics/kinesis';
record({ data: { // The data blob to put into the record }, partitionKey: 'myPartitionKey', streamName: 'myKinesisStream'});イベントのフラッシュ
記録されたイベントはバッファに保存され、定期的にリモートサーバーに送信されます*(flushIntervalオプションで調整できます)*。必要に応じて、'flushEvents' APIを使用してバッファからすべてのイベントを手動でクリアすることができます。
import { flushEvents } from 'aws-amplify/analytics/kinesis';
flushEvents();既知の問題
デフォルトのPinpointプロバイダーの代わりに、以下の代替サービスプロバイダーをインポートする場合:
- Kinesis (
aws-amplify/analytics/kinesis) - Kinesis Data Firehose (
aws-amplify/analytics/kinesis-firehose) - Personalize Event (
aws-amplify/analytics/personalize)
バンドラーを開始する際に以下のエラーが発生する場合があります:
Error: Unable to resolve module stream from /path/to/node_modules/@aws-sdk/... これは既知の問題です。このエラーを解決するために、この問題で説明されている手順に従ってください。