Kinesis Data Streamsクライアント
AmplifyKinesisClientは、Amazon Kinesis Data Streamsにデータをストリーミングするための独立型クライアントです。以下の機能を提供します:
- オフラインサポートのためのローカル永続化
- 失敗したレコードの自動再試行
- 自動バッチ処理(リクエストあたり最大500レコードまたは10MB)
- インターバルベースの自動フラッシング(デフォルト: 30秒ごと)
- キャッシュされたレコードを保持しながら新しいレコードを無視する有効/無効の切り替え
はじめに
インストール
モジュールのbuild.gradle.ktsに依存関係を追加します:
dependencies { implementation("com.amplifyframework:aws-kinesis:LATEST_VERSION")}クライアントの初期化
import com.amplifyframework.kinesis.AmplifyKinesisClient
val kinesis = AmplifyKinesisClient( context = applicationContext, region = "us-east-1", credentialsProvider = credentialsProvider)構成オプション
オプションオブジェクトを渡すことでクライアントの動作をカスタマイズできます:
| オプション | デフォルト | 説明 |
|---|---|---|
cacheMaxBytes | 5 MB | ローカルレコードキャッシュの最大サイズ(バイト単位)。 |
maxRetries | 5 | レコードが破棄される前の最大再試行回数。 |
flushStrategy | FlushStrategy.Interval(30.seconds) | 自動フラッシュインターバル。手動のみのフラッシングの場合はFlushStrategy.Noneを使用します。 |
configureClient | null | 基盤となるAWS SDK KinesisClientをカスタマイズするためのエスケープハッチ。 |
import com.amplifyframework.kinesis.AmplifyKinesisClientimport com.amplifyframework.kinesis.AmplifyKinesisClientOptionsimport com.amplifyframework.recordcache.FlushStrategyimport kotlin.time.Duration.Companion.seconds
val kinesis = AmplifyKinesisClient( context = applicationContext, region = "us-east-1", credentialsProvider = credentialsProvider, options = AmplifyKinesisClientOptions { cacheMaxBytes = 10L * 1024 * 1024 // 10 MB maxRetries = 3 flushStrategy = FlushStrategy.Interval(60.seconds) configureClient { retryStrategy { maxAttempts = 10 } } })自動フラッシングを無効にするには:
options = AmplifyKinesisClientOptions { flushStrategy = FlushStrategy.None}使用方法
データの記録
record()を使用してデータをローカルキャッシュに永続化します。レコードは次のフラッシュサイクル(自動または手動)中にKinesisに送信されます。
val result = kinesis.record( data = "Hello Kinesis".toByteArray(), partitionKey = "partition-1", streamName = "my-stream")when (result) { is Result.Success -> { /* 正常に記録されました */ } is Result.Failure -> { /* エラーを処理します */ }}クライアントが無効な状態で送信されたレコードは無視されます。
レコードのフラッシュ
クライアントは設定されたインターバル(デフォルト: 30秒)でキャッシュされたレコードを自動的にフラッシュします。手動でフラッシュをトリガーすることもできます:
when (val result = kinesis.flush()) { is Result.Success -> println("${result.data.recordsFlushed}件のレコードをフラッシュしました") is Result.Failure -> println("フラッシュエラー: ${result.error}")}各フラッシュはストリームあたり最大1バッチを送信します(最大500レコードまたは10MB)。残りのレコードは後続のフラッシュサイクルで取得されます。フラッシュが既に進行中の場合、呼び出しはflushInProgress: trueで即座に戻ります。
手動フラッシュはクライアントが無効な場合でも機能し、コレクションを再度有効にすることなくキャッシュされたレコードをドレインできます。
キャッシュのクリア
ローカルストレージからキャッシュされたすべてのレコードを削除します:
kinesis.clearCache()有効化と無効化
実行時にレコードコレクションと自動フラッシングを切り替えることができます。無効な場合、新しいレコードは無視されますが、既にキャッシュされたレコードはストレージに残ります。
kinesis.disable()// レコードは無視され、自動フラッシュは一時停止されます
kinesis.enable()// コレクションと自動フラッシュが再開されます詳細
エスケープハッチ
このクライアントのAPIがカバーしていない操作のために、基盤となるAWS SDK KinesisClientにアクセスします:
val sdkClient = kinesis.kinesisClient// sdkClientを直接Kinesis APIコールに使用しますエラーハンドリング
すべての操作はシールされた例外階層を通じてエラーをサーフェスします:
| エラータイプ | 説明 |
|---|---|
AmplifyKinesisValidationException | レコード入力の検証に失敗しました(オーバーサイズレコード、無効なパーティションキー)。 |
AmplifyKinesisLimitExceededException | ローカルキャッシュが満杯です。flush()またはclearCache()を呼び出してスペースを解放します。 |
AmplifyKinesisStorageException | ローカルデータベースエラー。 |
AmplifyKinesisUnknownException | 予期しない、または分類されていないエラー。 |
操作はResult<T, AmplifyKinesisException>を返します:
when (val result = kinesis.record(...)) { is Result.Success -> { /* 成功 */ } is Result.Failure -> when (result.error) { is AmplifyKinesisValidationException -> { /* 無効な入力 */ } is AmplifyKinesisLimitExceededException -> { /* キャッシュ満杯 */ } is AmplifyKinesisStorageException -> { /* データベースエラー */ } is AmplifyKinesisUnknownException -> { /* 予期しないエラー */ } }}再試行動作
- すべての
PutRecordsエラーコード(ProvisionedThroughputExceededException、InternalFailure)は再試行可能として扱われます。 - 各失敗したレコードの再試行カウントは試行ごとにインクリメントされます。
maxRetries(デフォルト: 5)を超えたレコードはキャッシュから永久に削除されます。- SDKレベルのKinesisエラーはログに記録され、ストリームごとにスキップされるため、他のストリームはまだフラッシュできます。
- 非SDKエラー(ネットワーク障害、ストレージエラー)はフラッシュ全体を中止します。
Kinesisサービスの制限
クライアントはサービスに送信する前にこれらの制限を強制します:
| 制限 | 値 |
|---|---|
PutRecordsリクエストあたりの最大レコード数 | 500 |
| 最大単一レコードサイズ | 10 MB |
PutRecordsリクエストあたりの最大合計ペイロード | 10 MB |
| 最大パーティションキー長 | 256文字 |