Kinesis Data Streamsクライアント
AmplifyKinesisClientは、Amazon Kinesis Data Streamsにデータをストリーミングするための独立型クライアントです。以下の機能を提供します:
- オフラインサポートのためのローカル永続化
- 失敗したレコードの自動再試行
- 自動バッチ処理(リクエストあたり最大500レコードまたは10MB)
- インターバルベースの自動フラッシング(デフォルト: 30秒ごと)
- キャッシュされたレコードを保持しながら新しいレコードを無視する有効/無効の切り替え
はじめに
インストール
pubspec.yamlに依存関係を追加します:
dependencies: amplify_kinesis: ^2.11.0クライアントの初期化
import 'package:amplify_kinesis/amplify_kinesis.dart';
final kinesis = await AmplifyKinesisClient.create( region: 'us-east-1', credentialsProvider: credentialsProvider,);構成オプション
オプションオブジェクトを渡すことでクライアントの動作をカスタマイズできます:
| オプション | デフォルト | 説明 |
|---|---|---|
cacheMaxBytes | 5 MB | ローカルレコードキャッシュの最大サイズ(バイト単位)。 |
maxRetries | 5 | レコードが破棄される前の最大再試行回数。 |
flushStrategy | FlushInterval(interval: Duration(seconds: 30)) | 自動フラッシュインターバル。手動のみのフラッシングの場合はFlushNone()を使用します。 |
final kinesis = await AmplifyKinesisClient.create( region: 'us-east-1', credentialsProvider: credentialsProvider, options: AmplifyKinesisClientOptions( cacheMaxBytes: 10 * 1024 * 1024, // 10 MB maxRetries: 3, flushStrategy: FlushInterval(interval: Duration(seconds: 60)), ),);自動フラッシングを無効にするには:
options: AmplifyKinesisClientOptions( flushStrategy: FlushNone(),),使用方法
データの記録
record()を使用してデータをローカルキャッシュに永続化します。レコードは次のフラッシュサイクル(自動または手動)中にKinesisに送信されます。
final result = await kinesis.record( data: Uint8List.fromList(utf8.encode('Hello Kinesis')), partitionKey: 'partition-1', streamName: 'my-stream',);switch (result) { case Ok(): print('記録されました'); case Error(:final error): print('エラー: $error');}クライアントが無効な状態で送信されたレコードは無視されます。
レコードのフラッシュ
クライアントは設定されたインターバル(デフォルト: 30秒)でキャッシュされたレコードを自動的にフラッシュします。手動でフラッシュをトリガーすることもできます:
switch (await kinesis.flush()) { case Ok(:final value): print('${value.recordsFlushed}件のレコードをフラッシュしました'); case Error(:final error): print('フラッシュに失敗しました: $error');}各フラッシュはストリームあたり最大1バッチを送信します(最大500レコードまたは10MB)。残りのレコードは後続のフラッシュサイクルで取得されます。フラッシュが既に進行中の場合、呼び出しはflushInProgress: trueで即座に戻ります。
手動フラッシュはクライアントが無効な場合でも機能し、コレクションを再度有効にすることなくキャッシュされたレコードをドレインできます。
キャッシュのクリア
ローカルストレージからキャッシュされたすべてのレコードを削除します:
await kinesis.clearCache();有効化と無効化
実行時にレコードコレクションと自動フラッシングを切り替えることができます。無効な場合、新しいレコードは無視されますが、既にキャッシュされたレコードはストレージに残ります。
kinesis.disable();// レコードは無視され、自動フラッシュは一時停止されます
kinesis.enable();// コレクションと自動フラッシュが再開されますクライアントのクローズ
クライアントの使用が終わったら、そのリソースを解放します。クローズ後、クライアントは再利用できません。
await kinesis.close();詳細
エスケープハッチ
このクライアントのAPIがカバーしていない操作のために、基盤となるAWS SDK KinesisClientにアクセスします:
final sdkClient = kinesis.kinesisClient;// sdkClientを直接Kinesis APIコールに使用しますエラーハンドリング
すべての操作はシールされた例外階層を通じてエラーをサーフェスします:
| エラータイプ | 説明 |
|---|---|
KinesisValidationException | レコード入力の検証に失敗しました(オーバーサイズレコード、無効なパーティションキー)。 |
KinesisLimitExceededException | ローカルキャッシュが満杯です。flush()またはclearCache()を呼び出してスペースを解放します。 |
KinesisStorageException | ローカルデータベースエラー。 |
KinesisUnknownException | 予期しない、または分類されていないエラー。 |
ClientClosedException | クライアントがクローズされており、使用できません。 |
操作はAmplifyKinesisExceptionサブタイプでResult<T>を返します:
switch (await kinesis.record(...)) { case Ok(): break; case Error(:final error): switch (error) { case KinesisValidationException(): // 無効な入力 case KinesisLimitExceededException(): // キャッシュ満杯 case KinesisStorageException(): // データベースエラー case KinesisUnknownException(): // 予期しないエラー case ClientClosedException(): // クライアントがクローズされました }}再試行動作
- すべての
PutRecordsエラーコード(ProvisionedThroughputExceededException、InternalFailure)は再試行可能として扱われます。 - 各失敗したレコードの再試行カウントは試行ごとにインクリメントされます。
maxRetries(デフォルト: 5)を超えたレコードはキャッシュから永久に削除されます。- SDKレベルのKinesisエラーはログに記録され、ストリームごとにスキップされるため、他のストリームはまだフラッシュできます。
- 非SDKエラー(ネットワーク障害、ストレージエラー)はフラッシュ全体を中止します。
Kinesisサービスの制限
クライアントはサービスに送信する前にこれらの制限を強制します:
| 制限 | 値 |
|---|---|
PutRecordsリクエストあたりの最大レコード数 | 500 |
| 最大単一レコードサイズ | 10 MB |
PutRecordsリクエストあたりの最大合計ペイロード | 10 MB |
| 最大パーティションキー長 | 256文字 |