Name:
interface
Value:
Amplify has re-imagined the way frontend developers build fullstack applications. Develop and deploy without the hassle.
Gen1 DocsLegacy

Page updated Apr 7, 2026

Kinesis Data Streamsクライアント

AmplifyKinesisClientは、Amazon Kinesis Data Streamsにデータをストリーミングするための独立型クライアントです。以下の機能を提供します:

  • オフラインサポートのためのローカル永続化
  • 失敗したレコードの自動再試行
  • 自動バッチ処理(リクエストあたり最大500レコードまたは10MB)
  • インターバルベースの自動フラッシング(デフォルト: 30秒ごと)
  • キャッシュされたレコードを保持しながら新しいレコードを無視する有効/無効の切り替え

これはAmplify Analyticsカテゴリプラグインとは別の独立型クライアントです。PutRecordsを使用してKinesis Data Streams APIと直接通信します。

このクライアントを使用する前に、バックエンドが必要なIAM権限で構成されていることを確認してください。Kinesis Data Streamsのセットアップを参照してください。

はじめに

インストール

モジュールのbuild.gradle.ktsに依存関係を追加します:

dependencies {
implementation("com.amplifyframework:aws-kinesis:LATEST_VERSION")
}

クライアントの初期化

import com.amplifyframework.kinesis.AmplifyKinesisClient
val kinesis = AmplifyKinesisClient(
context = applicationContext,
region = "us-east-1",
credentialsProvider = credentialsProvider
)

構成オプション

オプションオブジェクトを渡すことでクライアントの動作をカスタマイズできます:

オプションデフォルト説明
cacheMaxBytes5 MBローカルレコードキャッシュの最大サイズ(バイト単位)。
maxRetries5レコードが破棄される前の最大再試行回数。
flushStrategyFlushStrategy.Interval(30.seconds)自動フラッシュインターバル。手動のみのフラッシングの場合はFlushStrategy.Noneを使用します。
configureClientnull基盤となるAWS SDK KinesisClientをカスタマイズするためのエスケープハッチ。
import com.amplifyframework.kinesis.AmplifyKinesisClient
import com.amplifyframework.kinesis.AmplifyKinesisClientOptions
import com.amplifyframework.recordcache.FlushStrategy
import kotlin.time.Duration.Companion.seconds
val kinesis = AmplifyKinesisClient(
context = applicationContext,
region = "us-east-1",
credentialsProvider = credentialsProvider,
options = AmplifyKinesisClientOptions {
cacheMaxBytes = 10L * 1024 * 1024 // 10 MB
maxRetries = 3
flushStrategy = FlushStrategy.Interval(60.seconds)
configureClient {
retryStrategy { maxAttempts = 10 }
}
}
)

自動フラッシングを無効にするには:

options = AmplifyKinesisClientOptions {
flushStrategy = FlushStrategy.None
}

使用方法

データの記録

record()を使用してデータをローカルキャッシュに永続化します。レコードは次のフラッシュサイクル(自動または手動)中にKinesisに送信されます。

val result = kinesis.record(
data = "Hello Kinesis".toByteArray(),
partitionKey = "partition-1",
streamName = "my-stream"
)
when (result) {
is Result.Success -> { /* 正常に記録されました */ }
is Result.Failure -> { /* エラーを処理します */ }
}

クライアントが無効な状態で送信されたレコードは無視されます。

レコードのフラッシュ

クライアントは設定されたインターバル(デフォルト: 30秒)でキャッシュされたレコードを自動的にフラッシュします。手動でフラッシュをトリガーすることもできます:

when (val result = kinesis.flush()) {
is Result.Success -> println("${result.data.recordsFlushed}件のレコードをフラッシュしました")
is Result.Failure -> println("フラッシュエラー: ${result.error}")
}

各フラッシュはストリームあたり最大1バッチを送信します(最大500レコードまたは10MB)。残りのレコードは後続のフラッシュサイクルで取得されます。フラッシュが既に進行中の場合、呼び出しはflushInProgress: trueで即座に戻ります。

手動フラッシュはクライアントが無効な場合でも機能し、コレクションを再度有効にすることなくキャッシュされたレコードをドレインできます。

キャッシュのクリア

ローカルストレージからキャッシュされたすべてのレコードを削除します:

kinesis.clearCache()

有効化と無効化

実行時にレコードコレクションと自動フラッシングを切り替えることができます。無効な場合、新しいレコードは無視されますが、既にキャッシュされたレコードはストレージに残ります。

kinesis.disable()
// レコードは無視され、自動フラッシュは一時停止されます
kinesis.enable()
// コレクションと自動フラッシュが再開されます

詳細

エスケープハッチ

このクライアントのAPIがカバーしていない操作のために、基盤となるAWS SDK KinesisClientにアクセスします:

val sdkClient = kinesis.kinesisClient
// sdkClientを直接Kinesis APIコールに使用します

エラーハンドリング

すべての操作はシールされた例外階層を通じてエラーをサーフェスします:

エラータイプ説明
AmplifyKinesisValidationExceptionレコード入力の検証に失敗しました(オーバーサイズレコード、無効なパーティションキー)。
AmplifyKinesisLimitExceededExceptionローカルキャッシュが満杯です。flush()またはclearCache()を呼び出してスペースを解放します。
AmplifyKinesisStorageExceptionローカルデータベースエラー。
AmplifyKinesisUnknownException予期しない、または分類されていないエラー。

操作はResult<T, AmplifyKinesisException>を返します:

when (val result = kinesis.record(...)) {
is Result.Success -> { /* 成功 */ }
is Result.Failure -> when (result.error) {
is AmplifyKinesisValidationException -> { /* 無効な入力 */ }
is AmplifyKinesisLimitExceededException -> { /* キャッシュ満杯 */ }
is AmplifyKinesisStorageException -> { /* データベースエラー */ }
is AmplifyKinesisUnknownException -> { /* 予期しないエラー */ }
}
}

再試行動作

  • すべてのPutRecordsエラーコード(ProvisionedThroughputExceededExceptionInternalFailure)は再試行可能として扱われます。
  • 各失敗したレコードの再試行カウントは試行ごとにインクリメントされます。
  • maxRetries(デフォルト: 5)を超えたレコードはキャッシュから永久に削除されます。
  • SDKレベルのKinesisエラーはログに記録され、ストリームごとにスキップされるため、他のストリームはまだフラッシュできます。
  • 非SDKエラー(ネットワーク障害、ストレージエラー)はフラッシュ全体を中止します。

Kinesisサービスの制限

クライアントはサービスに送信する前にこれらの制限を強制します:

制限
PutRecordsリクエストあたりの最大レコード数500
最大単一レコードサイズ10 MB
PutRecordsリクエストあたりの最大合計ペイロード10 MB
最大パーティションキー長256文字