Kinesis Data Streamsクライアント
AmplifyKinesisClientは、Amazon Kinesis Data Streamsにデータをストリーミングするための独立型クライアントです。以下の機能を提供します:
- オフラインサポートのためのローカル永続化
- 失敗したレコードの自動再試行
- 自動バッチ処理(リクエストあたり最大500レコードまたは10MB)
- インターバルベースの自動フラッシング(デフォルト: 30秒ごと)
- キャッシュされたレコードを保持しながら新しいレコードを無視する有効/無効の切り替え
はじめに
インストール
Swift Package Managerを使用してAmplifyKinesisClientをプロジェクトに追加します。XcodeでFile > Add Package Dependenciesに移動し、Amplify Swift SDKのリポジトリURLを入力します。
クライアントの初期化
import AmplifyKinesisClient
let kinesis = try AmplifyKinesisClient( region: "us-east-1", credentialsProvider: credentialsProvider)構成オプション
オプションオブジェクトを渡すことでクライアントの動作をカスタマイズできます:
| オプション | デフォルト | 説明 |
|---|---|---|
cacheMaxBytes | 5 MB | ローカルレコードキャッシュの最大サイズ(バイト単位)。 |
maxRetries | 5 | レコードが破棄される前の最大再試行回数。 |
flushStrategy | .interval(30) | 自動フラッシュインターバル(秒単位)。手動のみのフラッシングの場合は.noneを使用します。 |
configureClient | nil | 基盤となるKinesisClientConfigurationをカスタマイズするためのクロージャ。 |
let kinesis = try AmplifyKinesisClient( region: "us-east-1", credentialsProvider: credentialsProvider, options: .init( cacheMaxBytes: 10 * 1_024 * 1_024, // 10 MB maxRetries: 3, flushStrategy: .interval(60), configureClient: { config in // 基盤となるKinesisClientConfigurationをカスタマイズする } ))自動フラッシングを無効にするには:
options: .init(flushStrategy: .none)使用方法
データの記録
record()を使用してデータをローカルキャッシュに永続化します。レコードは次のフラッシュサイクル(自動または手動)中にKinesisに送信されます。
let result = try await kinesis.record( data: "Hello Kinesis".data(using: .utf8)!, partitionKey: "partition-1", streamName: "my-stream")クライアントが無効な状態で送信されたレコードは無視されます。
レコードのフラッシュ
クライアントは設定されたインターバル(デフォルト: 30秒)でキャッシュされたレコードを自動的にフラッシュします。手動でフラッシュをトリガーすることもできます:
let flushResult = try await kinesis.flush()print("\(flushResult.recordsFlushed)件のレコードをフラッシュしました")各フラッシュはストリームあたり最大1バッチを送信します(最大500レコードまたは10MB)。残りのレコードは後続のフラッシュサイクルで取得されます。フラッシュが既に進行中の場合、呼び出しはflushInProgress: trueで即座に戻ります。
手動フラッシュはクライアントが無効な場合でも機能し、コレクションを再度有効にすることなくキャッシュされたレコードをドレインできます。
キャッシュのクリア
ローカルストレージからキャッシュされたすべてのレコードを削除します:
let cleared = try await kinesis.clearCache()有効化と無効化
実行時にレコードコレクションと自動フラッシングを切り替えることができます。無効な場合、新しいレコードは無視されますが、既にキャッシュされたレコードはストレージに残ります。
await kinesis.disable()// レコードは無視され、自動フラッシュは一時停止されます
await kinesis.enable()// コレクションと自動フラッシュが再開されます詳細
エスケープハッチ
このクライアントのAPIがカバーしていない操作のために、基盤となるAWS SDK KinesisClientにアクセスします:
let sdkClient = kinesis.getKinesisClient()// sdkClientを直接Kinesis APIコールに使用しますエラーハンドリング
すべての操作はシールされた例外階層を通じてエラーをサーフェスします:
| エラータイプ | 説明 |
|---|---|
KinesisError.validation | レコード入力の検証に失敗しました(オーバーサイズレコード、無効なパーティションキー)。 |
KinesisError.cacheLimitExceeded | ローカルキャッシュが満杯です。flush()またはclearCache()を呼び出してスペースを解放します。 |
KinesisError.cache | ローカルデータベースエラー。 |
KinesisError.unknown | 予期しない、または分類されていないエラー。 |
操作はKinesisErrorをスローします:
do { try await kinesis.record( data: payload, partitionKey: "key", streamName: "stream" )} catch let error as KinesisError { switch error { case .validation(let desc, _, _): print("検証エラー: \(desc)") case .cacheLimitExceeded: print("キャッシュ満杯") case .cache(let desc, _, _): print("ストレージエラー: \(desc)") case .unknown(let desc, _, _): print("不明なエラー: \(desc)") }}再試行動作
- すべての
PutRecordsエラーコード(ProvisionedThroughputExceededException、InternalFailure)は再試行可能として扱われます。 - 各失敗したレコードの再試行カウントは試行ごとにインクリメントされます。
maxRetries(デフォルト: 5)を超えたレコードはキャッシュから永久に削除されます。- SDKレベルのKinesisエラーはログに記録され、ストリームごとにスキップされるため、他のストリームはまだフラッシュできます。
- 非SDKエラー(ネットワーク障害、ストレージエラー)はフラッシュ全体を中止します。
Kinesisサービスの制限
クライアントはサービスに送信する前にこれらの制限を強制します:
| 制限 | 値 |
|---|---|
PutRecordsリクエストあたりの最大レコード数 | 500 |
| 最大単一レコードサイズ | 10 MB |
PutRecordsリクエストあたりの最大合計ペイロード | 10 MB |
| 最大パーティションキー長 | 256文字 |