Name:
interface
Value:
Amplify has re-imagined the way frontend developers build fullstack applications. Develop and deploy without the hassle.
Gen1 DocsLegacy

Page updated May 12, 2026

Amazon Data Firehose クライアント

AmplifyFirehoseClientAmazon Data Firehose デリバリーストリームへのデータストリーミング用スタンドアロンクライアントです。以下の機能を提供します:

  • オフラインサポート用のローカル永続化
  • 失敗したレコードの自動リトライ
  • 自動バッチ処理(リクエストごとに最大 500 レコードまたは 4 MB)
  • インターバルベースの自動フラッシュ(デフォルト: 30 秒ごと)
  • キャッシュ済みレコードを保持しながら新しいレコードを静かにドロップする有効/無効の切り替え

これはスタンドアロンクライアントであり、Amplify Analytics カテゴリプラグインとは別です。PutRecordBatch を使用して Firehose API と直接通信します。

このクライアントを使用する前に、バックエンドが必要な IAM 権限で設定されていることを確認してください。Amazon Data Firehose の設定 を参照してください。

はじめに

インストール

モジュールの build.gradle.kts に依存関係を追加します:

dependencies {
implementation("com.amplifyframework:aws-kinesis:LATEST_VERSION")
}

クライアントの初期化

import com.amplifyframework.firehose.AmplifyFirehoseClient
val firehose = AmplifyFirehoseClient(
context = applicationContext,
region = "us-east-1",
credentialsProvider = credentialsProvider
)

設定オプション

オプションオブジェクトを渡してクライアントの動作をカスタマイズできます:

オプションデフォルト説明
cacheMaxBytes5 MBローカルレコードキャッシュの最大サイズ(バイト単位)。
maxRetries5レコードを破棄する前の最大リトライ回数。
flushStrategyFlushStrategy.Interval(30.seconds)自動フラッシュインターバル。手動のみのフラッシュには FlushStrategy.None を使用します。
configureClientnull基礎となる AWS SDK FirehoseClient をカスタマイズするためのエスケープハッチ。
import com.amplifyframework.firehose.AmplifyFirehoseClient
import com.amplifyframework.firehose.AmplifyFirehoseClientOptions
import com.amplifyframework.recordcache.FlushStrategy
import kotlin.time.Duration.Companion.seconds
val firehose = AmplifyFirehoseClient(
context = applicationContext,
region = "us-east-1",
credentialsProvider = credentialsProvider,
options = AmplifyFirehoseClientOptions {
cacheMaxBytes = 10L * 1024 * 1024 // 10 MB
maxRetries = 5
flushStrategy = FlushStrategy.Interval(30.seconds)
configureClient {
retryStrategy { maxAttempts = 10 }
}
}
)

自動フラッシュを無効にするには:

options = AmplifyFirehoseClientOptions {
flushStrategy = FlushStrategy.None
}

使い方

レコードデータ

record() を使用してデータをローカルキャッシュに永続化します。レコードは次のフラッシュサイクル(自動または手動)中に Firehose に送信されます。

val result = firehose.record(
data = "Hello Firehose".toByteArray(),
streamName = "my-delivery-stream"
)
when (result) {
is Result.Success -> { /* 正常に記録されました */ }
is Result.Failure -> { /* エラーを処理します */ }
}

クライアントが無効な状態で送信されたレコードは静かにドロップされます。

レコードをフラッシュ

クライアントは設定されたインターバル(デフォルト: 30 秒)でキャッシュされたレコードを自動的にフラッシュします。手動フラッシュをトリガーすることもできます:

when (val result = firehose.flush()) {
is Result.Success -> println("${result.data.recordsFlushed} レコードをフラッシュしました")
is Result.Failure -> println("フラッシュエラー: ${result.error}")
}

各フラッシュはストリームごとに最大 1 バッチを送信します(最大 500 レコードまたは 4 MB)。残りのレコードは次のフラッシュサイクルで処理されます。フラッシュが既に進行中の場合、呼び出しは flushInProgress: true で即座に戻ります。

手動フラッシュはクライアントが無効な場合でも機能するため、収集を再度有効にすることなくキャッシュされたレコードをドレインできます。

キャッシュをクリア

ローカルストレージからキャッシュされたすべてのレコードを削除します:

firehose.clearCache()

有効化と無効化

実行時にレコード収集と自動フラッシュを切り替えられます。無効にされた場合、新しいレコードは静かにドロップされますが、既存のキャッシュされたレコードはストレージに残ります。

firehose.disable()
// レコードはドロップされ、自動フラッシュは一時停止されます
firehose.enable()
// 収集と自動フラッシュが再開されます

クライアントをクローズ

クライアントでの操作が完了したら、リソースを解放するためにクローズします:

クローズ後、すべての操作はエラーを返します。必要に応じて新しいクライアントインスタンスを作成してください。

高度な使用方法

エスケープハッチ

このクライアントの API でカバーされていない操作について、基礎となる AWS SDK FirehoseClient にアクセスします:

val sdkClient = firehose.firehoseClient
// sdkClient を使用して Firehose API を直接呼び出します

エラーハンドリング

すべての操作は、シール化された例外階層を通じてエラーを表示します:

エラータイプ説明
AmplifyFirehoseValidationExceptionレコード入力の検証に失敗しました(オーバーサイズのレコード)。
AmplifyFirehoseLimitExceededExceptionローカルキャッシュが満杯です。flush() または clearCache() を呼び出してスペースを解放してください。
AmplifyFirehoseStorageExceptionローカルデータベースエラー。
AmplifyFirehoseUnknownException予期しないまたは分類されていないエラー。

操作は Result<T, AmplifyFirehoseException> を返します:

when (val result = firehose.record(...)) {
is Result.Success -> { /* 成功 */ }
is Result.Failure -> when (result.error) {
is AmplifyFirehoseValidationException -> { /* 無効な入力 */ }
is AmplifyFirehoseLimitExceededException -> { /* キャッシュがいっぱい */ }
is AmplifyFirehoseStorageException -> { /* データベースエラー */ }
is AmplifyFirehoseUnknownException -> { /* 予期しないエラー */ }
}
}

リトライ動作

  • すべての PutRecordBatch エラーコード(ServiceUnavailableExceptionInternalFailure)は再試行可能として扱われます。
  • 各失敗したレコードの再試行カウントは各試行後に増加します。
  • maxRetries(デフォルト: 5)を超えるレコードはキャッシュから永久に削除されます。
  • SDK レベルの Firehose エラーはログに記録され、ストリームごとにスキップされるため、他のストリームでもフラッシュできます。
  • SDK 以外のエラー(ネットワーク障害、ストレージエラー)はフラッシュ全体を中止します。

Firehose サービスリミット

クライアントはサービスに送信する前にこれらの制限を強制します:

リミット
PutRecordBatch リクエストごとの最大レコード数500
最大単一レコードサイズ1,000 KiB
PutRecordBatch リクエストごとの最大合計ペイロード4 MB