Amazon Data Firehose クライアント
AmplifyFirehoseClient は Amazon Data Firehose デリバリーストリームへのデータストリーミング用スタンドアロンクライアントです。以下の機能を提供します:
- オフラインサポート用のローカル永続化
- 失敗したレコードの自動リトライ
- 自動バッチ処理(リクエストごとに最大 500 レコードまたは 4 MB)
- インターバルベースの自動フラッシュ(デフォルト: 30 秒ごと)
- キャッシュ済みレコードを保持しながら新しいレコードを静かにドロップする有効/無効の切り替え
はじめに
インストール
モジュールの build.gradle.kts に依存関係を追加します:
dependencies { implementation("com.amplifyframework:aws-kinesis:LATEST_VERSION")}クライアントの初期化
import com.amplifyframework.firehose.AmplifyFirehoseClient
val firehose = AmplifyFirehoseClient( context = applicationContext, region = "us-east-1", credentialsProvider = credentialsProvider)設定オプション
オプションオブジェクトを渡してクライアントの動作をカスタマイズできます:
| オプション | デフォルト | 説明 |
|---|---|---|
cacheMaxBytes | 5 MB | ローカルレコードキャッシュの最大サイズ(バイト単位)。 |
maxRetries | 5 | レコードを破棄する前の最大リトライ回数。 |
flushStrategy | FlushStrategy.Interval(30.seconds) | 自動フラッシュインターバル。手動のみのフラッシュには FlushStrategy.None を使用します。 |
configureClient | null | 基礎となる AWS SDK FirehoseClient をカスタマイズするためのエスケープハッチ。 |
import com.amplifyframework.firehose.AmplifyFirehoseClientimport com.amplifyframework.firehose.AmplifyFirehoseClientOptionsimport com.amplifyframework.recordcache.FlushStrategyimport kotlin.time.Duration.Companion.seconds
val firehose = AmplifyFirehoseClient( context = applicationContext, region = "us-east-1", credentialsProvider = credentialsProvider, options = AmplifyFirehoseClientOptions { cacheMaxBytes = 10L * 1024 * 1024 // 10 MB maxRetries = 5 flushStrategy = FlushStrategy.Interval(30.seconds) configureClient { retryStrategy { maxAttempts = 10 } } })自動フラッシュを無効にするには:
options = AmplifyFirehoseClientOptions { flushStrategy = FlushStrategy.None}使い方
レコードデータ
record() を使用してデータをローカルキャッシュに永続化します。レコードは次のフラッシュサイクル(自動または手動)中に Firehose に送信されます。
val result = firehose.record( data = "Hello Firehose".toByteArray(), streamName = "my-delivery-stream")when (result) { is Result.Success -> { /* 正常に記録されました */ } is Result.Failure -> { /* エラーを処理します */ }}クライアントが無効な状態で送信されたレコードは静かにドロップされます。
レコードをフラッシュ
クライアントは設定されたインターバル(デフォルト: 30 秒)でキャッシュされたレコードを自動的にフラッシュします。手動フラッシュをトリガーすることもできます:
when (val result = firehose.flush()) { is Result.Success -> println("${result.data.recordsFlushed} レコードをフラッシュしました") is Result.Failure -> println("フラッシュエラー: ${result.error}")}各フラッシュはストリームごとに最大 1 バッチを送信します(最大 500 レコードまたは 4 MB)。残りのレコードは次のフラッシュサイクルで処理されます。フラッシュが既に進行中の場合、呼び出しは flushInProgress: true で即座に戻ります。
手動フラッシュはクライアントが無効な場合でも機能するため、収集を再度有効にすることなくキャッシュされたレコードをドレインできます。
キャッシュをクリア
ローカルストレージからキャッシュされたすべてのレコードを削除します:
firehose.clearCache()有効化と無効化
実行時にレコード収集と自動フラッシュを切り替えられます。無効にされた場合、新しいレコードは静かにドロップされますが、既存のキャッシュされたレコードはストレージに残ります。
firehose.disable()// レコードはドロップされ、自動フラッシュは一時停止されます
firehose.enable()// 収集と自動フラッシュが再開されますクライアントをクローズ
クライアントでの操作が完了したら、リソースを解放するためにクローズします:
クローズ後、すべての操作はエラーを返します。必要に応じて新しいクライアントインスタンスを作成してください。
高度な使用方法
エスケープハッチ
このクライアントの API でカバーされていない操作について、基礎となる AWS SDK FirehoseClient にアクセスします:
val sdkClient = firehose.firehoseClient// sdkClient を使用して Firehose API を直接呼び出しますエラーハンドリング
すべての操作は、シール化された例外階層を通じてエラーを表示します:
| エラータイプ | 説明 |
|---|---|
AmplifyFirehoseValidationException | レコード入力の検証に失敗しました(オーバーサイズのレコード)。 |
AmplifyFirehoseLimitExceededException | ローカルキャッシュが満杯です。flush() または clearCache() を呼び出してスペースを解放してください。 |
AmplifyFirehoseStorageException | ローカルデータベースエラー。 |
AmplifyFirehoseUnknownException | 予期しないまたは分類されていないエラー。 |
操作は Result<T, AmplifyFirehoseException> を返します:
when (val result = firehose.record(...)) { is Result.Success -> { /* 成功 */ } is Result.Failure -> when (result.error) { is AmplifyFirehoseValidationException -> { /* 無効な入力 */ } is AmplifyFirehoseLimitExceededException -> { /* キャッシュがいっぱい */ } is AmplifyFirehoseStorageException -> { /* データベースエラー */ } is AmplifyFirehoseUnknownException -> { /* 予期しないエラー */ } }}リトライ動作
- すべての
PutRecordBatchエラーコード(ServiceUnavailableException、InternalFailure)は再試行可能として扱われます。 - 各失敗したレコードの再試行カウントは各試行後に増加します。
maxRetries(デフォルト: 5)を超えるレコードはキャッシュから永久に削除されます。- SDK レベルの Firehose エラーはログに記録され、ストリームごとにスキップされるため、他のストリームでもフラッシュできます。
- SDK 以外のエラー(ネットワーク障害、ストレージエラー)はフラッシュ全体を中止します。
Firehose サービスリミット
クライアントはサービスに送信する前にこれらの制限を強制します:
| リミット | 値 |
|---|---|
PutRecordBatch リクエストごとの最大レコード数 | 500 |
| 最大単一レコードサイズ | 1,000 KiB |
PutRecordBatch リクエストごとの最大合計ペイロード | 4 MB |