Amazon Data Firehose クライアント
AmplifyFirehoseClient は Amazon Data Firehose デリバリーストリームへのデータストリーミング用スタンドアロンクライアントです。以下の機能を提供します:
- オフラインサポート用のローカル永続化
- 失敗したレコードの自動リトライ
- 自動バッチ処理(リクエストごとに最大 500 レコードまたは 4 MB)
- インターバルベースの自動フラッシュ(デフォルト: 30 秒ごと)
- キャッシュ済みレコードを保持しながら新しいレコードを静かにドロップする有効/無効の切り替え
はじめに
インストール
Swift Package Manager を使用してプロジェクトに AmplifyFirehoseClient を追加します。Xcode で File > Add Package Dependencies に移動し、Amplify Swift SDK のリポジトリ URL を入力します。
クライアントの初期化
import AmplifyFirehoseClient
let firehose = try AmplifyFirehoseClient( region: "us-east-1", credentialsProvider: credentialsProvider)設定オプション
オプションオブジェクトを渡してクライアントの動作をカスタマイズできます:
| オプション | デフォルト | 説明 |
|---|---|---|
cacheMaxBytes | 5 MB | ローカルレコードキャッシュの最大サイズ(バイト単位)。 |
maxRetries | 5 | レコードを破棄する前の最大リトライ回数。 |
flushStrategy | .interval(30) | 自動フラッシュインターバル(秒単位)。手動のみのフラッシュには .none を使用します。 |
configureClient | nil | 基礎となる FirehoseClientConfiguration をカスタマイズするためのクロージャ。 |
let firehose = try AmplifyFirehoseClient( region: "us-east-1", credentialsProvider: credentialsProvider, options: .init( cacheMaxBytes: 10 * 1_024 * 1_024, // 10 MB maxRetries: 5, flushStrategy: .interval(30), configureClient: { config in // 基礎となる FirehoseClientConfiguration をカスタマイズ } ))自動フラッシュを無効にするには:
options: .init(flushStrategy: .none)使い方
レコードデータ
record() を使用してデータをローカルキャッシュに永続化します。レコードは次のフラッシュサイクル(自動または手動)中に Firehose に送信されます。
let result = try await firehose.record( data: "Hello Firehose".data(using: .utf8)!, streamName: "my-delivery-stream")クライアントが無効な状態で送信されたレコードは静かにドロップされます。
レコードをフラッシュ
クライアントは設定されたインターバル(デフォルト: 30 秒)でキャッシュされたレコードを自動的にフラッシュします。手動フラッシュをトリガーすることもできます:
let flushResult = try await firehose.flush()print("フラッシュされたレコード: \(flushResult.recordsFlushed)")各フラッシュはストリームごとに最大 1 バッチを送信します(最大 500 レコードまたは 4 MB)。残りのレコードは次のフラッシュサイクルで処理されます。フラッシュが既に進行中の場合、呼び出しは flushInProgress: true で即座に戻ります。
手動フラッシュはクライアントが無効な場合でも機能するため、収集を再度有効にすることなくキャッシュされたレコードをドレインできます。
キャッシュをクリア
ローカルストレージからキャッシュされたすべてのレコードを削除します:
let cleared = try await firehose.clearCache()有効化と無効化
実行時にレコード収集と自動フラッシュを切り替えられます。無効にされた場合、新しいレコードは静かにドロップされますが、既存のキャッシュされたレコードはストレージに残ります。
await firehose.disable()// レコードはドロップされ、自動フラッシュは一時停止されます
await firehose.enable()// 収集と自動フラッシュが再開されますクライアントをクローズ
クライアントでの操作が完了したら、リソースを解放するためにクローズします:
クローズ後、すべての操作はエラーを返します。必要に応じて新しいクライアントインスタンスを作成してください。
高度な使用方法
エスケープハッチ
このクライアントの API でカバーされていない操作について、基礎となる AWS SDK FirehoseClient にアクセスします:
let sdkClient = firehose.getFirehoseClient()// sdkClient を使用して Firehose API を直接呼び出しますエラーハンドリング
すべての操作は、シール化された例外階層を通じてエラーを表示します:
| エラータイプ | 説明 |
|---|---|
FirehoseError.validation | レコード入力の検証に失敗しました(オーバーサイズのレコード)。 |
FirehoseError.cacheLimitExceeded | ローカルキャッシュが満杯です。flush() または clearCache() を呼び出してスペースを解放してください。 |
FirehoseError.cache | ローカルデータベースエラー。 |
FirehoseError.unknown | 予期しないまたは分類されていないエラー。 |
操作は FirehoseError をスロー します:
do { try await firehose.record( data: payload, streamName: "stream" )} catch let error as FirehoseError { switch error { case .validation(let desc, _, _): print("検証エラー: \(desc)") case .cacheLimitExceeded: print("キャッシュがいっぱい") case .cache(let desc, _, _): print("ストレージエラー: \(desc)") case .unknown(let desc, _, _): print("予期しないエラー: \(desc)") }}リトライ動作
- すべての
PutRecordBatchエラーコード(ServiceUnavailableException、InternalFailure)は再試行可能として扱われます。 - 各失敗したレコードの再試行カウントは各試行後に増加します。
maxRetries(デフォルト: 5)を超えるレコードはキャッシュから永久に削除されます。- SDK レベルの Firehose エラーはログに記録され、ストリームごとにスキップされるため、他のストリームでもフラッシュできます。
- SDK 以外のエラー(ネットワーク障害、ストレージエラー)はフラッシュ全体を中止します。
Firehose サービスリミット
クライアントはサービスに送信する前にこれらの制限を強制します:
| リミット | 値 |
|---|---|
PutRecordBatch リクエストごとの最大レコード数 | 500 |
| 最大単一レコードサイズ | 1,000 KiB |
PutRecordBatch リクエストごとの最大合計ペイロード | 4 MB |