Name:
interface
Value:
Amplify has re-imagined the way frontend developers build fullstack applications. Develop and deploy without the hassle.

Amazon Data Firehose クライアント

AmplifyFirehoseClientAmazon Data Firehose デリバリーストリームへのデータストリーミング用スタンドアロンクライアントです。以下の機能を提供します:

  • オフラインサポート用のローカル永続化
  • 失敗したレコードの自動リトライ
  • 自動バッチ処理(リクエストごとに最大 500 レコードまたは 4 MB)
  • インターバルベースの自動フラッシュ(デフォルト: 30 秒ごと)
  • キャッシュ済みレコードを保持しながら新しいレコードを静かにドロップする有効/無効の切り替え

これはスタンドアロンクライアントであり、Amplify Analytics カテゴリプラグインとは別です。PutRecordBatch を使用して Firehose API と直接通信します。

このクライアントを使用する前に、バックエンドが必要な IAM 権限で設定されていることを確認してください。Amazon Data Firehose の設定 を参照してください。

はじめに

インストール

pubspec.yaml に依存関係を追加します:

dependencies:
amplify_firehose: ^2.11.0

クライアントの初期化

import 'package:amplify_firehose/amplify_firehose.dart';
final firehose = await AmplifyFirehoseClient.create(
region: 'us-east-1',
credentialsProvider: credentialsProvider,
);

Flutter クライアントは path_provider を使用してローカルストレージパスを自動的に解決します。ウェブ上では IndexedDB をインメモリフォールバック付きで使用します。

設定オプション

オプションオブジェクトを渡してクライアントの動作をカスタマイズできます:

オプションデフォルト説明
cacheMaxBytes5 MBローカルレコードキャッシュの最大サイズ(バイト単位)。
maxRetries5レコードを破棄する前の最大リトライ回数。
flushStrategyFlushInterval(interval: Duration(seconds: 30))自動フラッシュインターバル。手動のみのフラッシュには FlushNone() を使用します。
import 'package:amplify_firehose/amplify_firehose.dart';
final firehose = await AmplifyFirehoseClient.create(
region: 'us-east-1',
credentialsProvider: credentialsProvider,
options: const AmplifyFirehoseClientOptions(
cacheMaxBytes: 10 * 1024 * 1024, // 10 MB
maxRetries: 5,
flushStrategy: FlushInterval(
interval: Duration(seconds: 30),
),
),
);

自動フラッシュを無効にするには:

options: const AmplifyFirehoseClientOptions(
flushStrategy: FlushNone(),
)

使い方

レコードデータ

record() を使用してデータをローカルキャッシュに永続化します。レコードは次のフラッシュサイクル(自動または手動)中に Firehose に送信されます。

import 'dart:convert';
import 'dart:typed_data';
final result = await firehose.record(
data: Uint8List.fromList(utf8.encode('Hello Firehose')),
streamName: 'my-delivery-stream',
);
switch (result) {
case Ok():
// 正常に記録されました
case Error(:final error):
// エラーを処理します
}

クライアントが無効な状態で送信されたレコードは静かにドロップされます。

レコードをフラッシュ

クライアントは設定されたインターバル(デフォルト: 30 秒)でキャッシュされたレコードを自動的にフラッシュします。手動フラッシュをトリガーすることもできます:

switch (await firehose.flush()) {
case Ok(:final value):
print('フラッシュされたレコード: ${value.recordsFlushed}');
case Error(:final error):
print('フラッシュエラー: $error');
}

各フラッシュはストリームごとに最大 1 バッチを送信します(最大 500 レコードまたは 4 MB)。残りのレコードは次のフラッシュサイクルで処理されます。フラッシュが既に進行中の場合、呼び出しは flushInProgress: true で即座に戻ります。

手動フラッシュはクライアントが無効な場合でも機能するため、収集を再度有効にすることなくキャッシュされたレコードをドレインできます。

キャッシュをクリア

ローカルストレージからキャッシュされたすべてのレコードを削除します:

final result = await firehose.clearCache();

有効化と無効化

実行時にレコード収集と自動フラッシュを切り替えられます。無効にされた場合、新しいレコードは静かにドロップされますが、既存のキャッシュされたレコードはストレージに残ります。

firehose.disable();
// レコードはドロップされ、自動フラッシュは一時停止されます
firehose.enable();
// 収集と自動フラッシュが再開されます

クライアントをクローズ

クライアントでの操作が完了したら、リソースを解放するためにクローズします:

await firehose.close();

クローズ後、すべての操作はエラーを返します。必要に応じて新しいクライアントインスタンスを作成してください。

高度な使用方法

エスケープハッチ

このクライアントの API でカバーされていない操作について、基礎となる AWS SDK FirehoseClient にアクセスします:

final sdkClient = firehose.firehoseClient;
// sdkClient を使用して Firehose API を直接呼び出します

エラーハンドリング

すべての操作は、シール化された例外階層を通じてエラーを表示します:

エラータイプ説明
FirehoseValidationExceptionレコード入力の検証に失敗しました(オーバーサイズのレコード)。
FirehoseLimitExceededExceptionローカルキャッシュが満杯です。flush() または clearCache() を呼び出してスペースを解放してください。
FirehoseStorageExceptionローカルデータベースエラー。
FirehoseClientClosedExceptionクローズされたクライアントで操作が試行されました。
FirehoseUnknownException予期しないまたは分類されていないエラー。

操作は Result<T> を返し、パターンマッチングできます:

final result = await firehose.record(
data: payload,
streamName: 'stream',
);
switch (result) {
case Ok():
// 成功
case Error(:final error):
switch (error) {
case FirehoseValidationException():
print('検証エラー: ${error.message}');
case FirehoseLimitExceededException():
print('キャッシュがいっぱい');
case FirehoseStorageException():
print('ストレージエラー: ${error.message}');
case FirehoseClientClosedException():
print('クライアントがクローズされています');
case FirehoseUnknownException():
print('予期しないエラー: ${error.message}');
}
}

リトライ動作

  • すべての PutRecordBatch エラーコード(ServiceUnavailableExceptionInternalFailure)は再試行可能として扱われます。
  • 各失敗したレコードの再試行カウントは各試行後に増加します。
  • maxRetries(デフォルト: 5)を超えるレコードはキャッシュから永久に削除されます。
  • SDK レベルの Firehose エラーはログに記録され、ストリームごとにスキップされるため、他のストリームでもフラッシュできます。
  • SDK 以外のエラー(ネットワーク障害、ストレージエラー)はフラッシュ全体を中止します。

Firehose サービスリミット

クライアントはサービスに送信する前にこれらの制限を強制します:

リミット
PutRecordBatch リクエストごとの最大レコード数500
最大単一レコードサイズ1,000 KiB
PutRecordBatch リクエストごとの最大合計ペイロード4 MB