AWS AppSync Eventsに接続する
このガイドでは、Amplifyの有無を問わずAWS AppSync Eventsに接続する方法について説明します。
AWS AppSync Eventsを使用すると、接続やリソーススケーリングを管理することなく、数百万のサブスクライバーにリアルタイムイベントデータをブロードキャストできるセキュアで高性能なサーバーレスWebSocket APIを作成できます。この機能を使用すると、共同ドキュメントエディタ、チャットアプリ、ライブポーリングシステムなどの複数ユーザー機能を構築できます。
AWS AppSync Eventsの詳細については、デベロッパーガイドをご覧ください。
AWS AppSync Eventsライブラリのインストール
aws-sdk-appsync-events依存関係をapp/build.gradle.ktsファイルに追加します。
dependencies { // Amplify依存関係を追加せずにAWS AppSync Eventsライブラリを追加します implementation("com.amazonaws:aws-sdk-appsync-events:ANDROID_APPSYNC_SDK_VERSION") }aws-sdk-appsync-eventsとaws-sdk-appsync-amplify依存関係をapp/build.gradle.ktsファイルに追加します。
dependencies { // AWS AppSync Eventsライブラリを追加します implementation("com.amazonaws:aws-sdk-appsync-events:ANDROID_APPSYNC_SDK_VERSION") // Amplify Authに接続するAppSync認可者を含みます implementation("com.amazonaws:aws-sdk-appsync-amplify:ANDROID_APPSYNC_SDK_VERSION")}AppSync認可者の提供
AWS AppSync Eventsライブラリは、Events APIで使用される可能性のあるさまざまな認可戦略に対応する複数の認可者クラスをインポートします。使用する認可戦略に適切な認可者タイプを選択する必要があります。
apiKey認可の場合、APIKeyAuthorizeridentityPool認可の場合、IAMAuthorizeruserPool認可の場合、AuthTokenAuthorizer
複数の認可タイプが必要な場合は、必要に応じて複数のEventsクライアントを作成できます。
API KEY
ApiKeyAuthorizerはハードコードされたAPIキーか、いくつかのソースからキーを取得することで使用できます。:
// ハードコードされたAPIキーを使用しますval authorizer = ApiKeyAuthorizer("[API_KEY]")// または// 何らかのソースからAPIキーを取得します。この関数は何度も呼び出される可能性があるため、// 適切なキャッシュを内部的に実装する必要があります。val authorizer = ApiKeyAuthorizer { fetchApiKey() }AMAZON COGNITO USER POOLS
AppSyncで直接作業する場合は、トークンフェッチを自分で実装する必要があります。
// 独自のトークンフェッチを使用します。この関数は何度も呼び出される可能性があるため、// 適切なキャッシュを内部的に実装する必要があります。val authorizer = AuthTokenAuthorizer { fetchLatestAuthToken()}AWS IAM
AppSyncで直接作業する場合は、リクエスト署名を自分で実装する必要があります。
// 署名関数の実装を提供します。この関数はAWS Sig-v4署名ロジックを実装し、// トークンと署名を含む認可ヘッダーを返す必要があります。val authorizer = IamAuthorizer { appSyncRequest -> signRequestAndReturnHeaders(appSyncRequest) }AWS AppSync Eventsライブラリは、Events APIで使用される可能性のあるさまざまな認可戦略に対応する複数の認可者クラスをインポートします。使用する認可戦略に適切な認可者タイプを選択する必要があります。
apiKey認可の場合、APIKeyAuthorizeridentityPool認可の場合、AmplifyIAMAuthorizeruserPool認可の場合、AmplifyUserPoolAuthorizer
複数の認可タイプが必要な場合は、必要に応じて複数のEventsクライアントを作成できます。
API KEY
ApiKeyAuthorizerはハードコードされたAPIキーを提供するか、何らかのソースからAPIキーを取得できます。:
// ハードコードされたAPIキーを使用しますval authorizer = ApiKeyAuthorizer("[API_KEY]")// または// 何らかのソースからAPIキーを取得します。この関数は何度も呼び出される可能性があるため、// 適切なキャッシュを内部的に実装する必要があります。val authorizer = ApiKeyAuthorizer { fetchApiKey() }AMAZON COGNITO USER POOLS
AmplifyUserPoolAuthorizerは設定されたAmplifyインスタンスを使用してAWS Cognito UserPoolトークンをフェッチしてリクエストにアタッチします。
// 提供されたAmplify UserPool認可者を使用しますval authorizer = AmplifyUserPoolAuthorizer()AWS IAM
AmplifyIAMAuthorizerは設定されたAmplifyインスタンスを使用してIAM SigV4プロトコルでリクエストに署名します。
// 提供されたAmplify IAM認可者を使用しますval authorizer = AmplifyIamAuthorizer("{REGION}")既存のAmplifyバックエンドなしでEvent APIに接続する
開始する前に、以下が必要です:
- AWSコンソールで作成されたEvent API
- メモを取ってください: HTTPエンドポイント、リージョン、APIキー
完了です! クライアントライブラリ使用ガイドに進んでください。
既存のAmplifyバックエンドにEvent APIを追加する
このガイドでは、既存のAmplifyバックエンドにEvent APIを追加する方法について説明します。フロントエンドアプリケーションからEvent APIで認証するためにCognito User Poolsを使用します。サインインしているユーザーはEvent APIをサブスクライブしてイベントをパブリッシュできます。
開始する前に、以下が必要です:
- 既存のAmplifyバックエンド(クイックスタートを参照)
@aws-amplify/backendと@aws-amplify/backend-cliの最新バージョン(npm add @aws-amplify/backend@latest @aws-amplify/backend-cli@latest)
バックエンド定義の更新
まず、バックエンド定義に新しいEvent APIを追加します。
import { defineBackend } from '@aws-amplify/backend';import { auth } from './auth/resource';// CDKリソースをインポートします:import { CfnApi, CfnChannelNamespace, AuthorizationType,} from 'aws-cdk-lib/aws-appsync';import { Policy, PolicyStatement } from 'aws-cdk-lib/aws-iam';
const backend = defineBackend({ auth,});
// Event APIリソースの新しいスタックを作成します:const customResources = backend.createStack('custom-resources');
// スタックに新しいEvent APIを追加します:const cfnEventAPI = new CfnApi(customResources, 'CfnEventAPI', { name: 'my-event-api', eventConfig: { authProviders: [ { authType: AuthorizationType.USER_POOL, cognitoConfig: { awsRegion: customResources.region, // Event APIをAmplifyによってプロビジョンされたCognito User Poolを使用するように設定します: userPoolId: backend.auth.resources.userPool.userPoolId, }, }, ], // Connect、Publish、および Subscribe操作の認可プロバイダーとしてUser Poolを設定します: connectionAuthModes: [{ authType: AuthorizationType.USER_POOL }], defaultPublishAuthModes: [{ authType: AuthorizationType.USER_POOL }], defaultSubscribeAuthModes: [{ authType: AuthorizationType.USER_POOL }], },});
// Event APIのデフォルトネームスペースを作成します:const namespace = new CfnChannelNamespace( customResources, 'CfnEventAPINamespace', { apiId: cfnEventAPI.attrApiId, name: 'default', });
// User Pool内の認証されたユーザーロールにポリシーをアタッチしてEvent APIへのアクセスを許可します:backend.auth.resources.authenticatedUserIamRole.attachInlinePolicy( new Policy(customResources, 'AppSyncEventPolicy', { statements: [ new PolicyStatement({ actions: [ 'appsync:EventConnect', 'appsync:EventSubscribe', 'appsync:EventPublish', ], resources: [`${cfnEventAPI.attrApiArn}/*`, `${cfnEventAPI.attrApiArn}`], }), ], }));バックエンドのデプロイ
変更をテストするには、Amplify Sandboxをデプロイします。
npx ampx sandboxクライアントライブラリ使用ガイド
Eventsクラスの作成
エンドポイントはAWS AppSync Eventsコンソールで確認できます。httpsで始まり、/eventで終わる必要があります。
val endpoint = "https://abcdefghijklmnopqrstuvwxyz.appsync-api.us-east-1.amazonaws.com/event"val events = Events(endpoint)RESTクライアントの使用
EventsRestClientを作成してRESTでイベントをパブリッシュできます。クライアント内での任意のパブリッシュコールに対して使用される、パブリッシュ認可者を受け入れます。
RESTクライアントの作成
val events: Events // 設定されたEventsval restClient = events.createRestClient( publishAuthorizer = ApiKeyAuthorizer("da2-abcdefghijklmnopqrstuvwxyz"))さらに、カスタムオプションをRest Clientに渡すことができます。現在の機能には、Rest Clientが使用するOkHttp.Builderの変更とクライアントライブラリログの有効化が含まれます。
クライアントライブラリログの収集のAndroidLoggerクラスを参照してください。
val restClient = events.createRestClient( publishAuthorizer = ApiKeyAuthorizer("da2-abcdefghijklmnopqrstuvwxyz"), options = Events.Options.Rest( loggerProvider = { namespace -> AndroidLogger(namespace, logLevel) }, okHttpConfigurationProvider = { okHttpBuilder -> // EventsRestClientが使用するOkHttp.Builderを更新します } ))単一のイベントをパブリッシュする
val restClient: EventsRestClient // 設定されたEventRestClient// kotlinx.serialization.json.[JsonElement, JsonPrimitive, JsonArray, JsonObject]val jsonEvent = JsonObject(mapOf("some" to JsonPrimitive("data")))val publishResult = restClient.publish( channelName = "default/channel", event = jsonEvent)when(publishResult) { is PublishResult.Response -> { val successfulEvents = publishResult.successfulEvents // 成功したイベントをインスペクトします val failedEvents = publishResult.failedEvents // 失敗したイベントをインスペクトします } is PublishResult.Failure -> { val error = result.error // パブリッシュ失敗、エラーをインスペクトします }}複数のイベントをパブリッシュする
一度に最大5つのイベントをパブリッシュできます。
val restClient: EventsRestClient // 設定されたEventRestClient// List of kotlinx.serialization.json.[JsonElement, JsonPrimitive, JsonArray, JsonObject]val jsonEvents = listOf( JsonObject(mapOf("some" to JsonPrimitive("data1"))), JsonObject(mapOf("some" to JsonPrimitive("data2"))))val publishResult = restClient.publish( channelName = "default/channel", events = jsonEvents)when(publishResult) { is PublishResult.Response -> { val successfulEvents = publishResult.successfulEvents // 成功したイベントをインスペクトします val failedEvents = publishResult.failedEvents // 失敗したイベントをインスペクトします } is PublishResult.Failure -> { val error = result.error // パブリッシュ失敗、エラーをインスペクトします }}異なる認可者でパブリッシュする
restClient.publish( channelName = "default/channel", event = JsonObject(mapOf("some" to JsonPrimitive("data"))), authorizer = ApiKeyAuthorizer("da2-abcdefghijklmnopqrstuvwxyz"))WebSocketクライアントの使用
EventsWebSocketClientを作成してチャネルにパブリッシュしたりサブスクライブしたりできます。WebSocket接続はライブラリによって管理され、最初のサブスクライブまたはパブリッシュ操作時に接続します。接続すると、WebSocketは開いたままになります。クライアントをサブスクライブまたはパブリッシュする必要がなくなった場合は、明示的に切断する必要があります。
WebSocketクライアントの作成
val events: Events // 設定されたEventsval apiKeyAuthorizer = ApiKeyAuthorizer("da2-abcdefghijklmnopqrstuvwxyz")val webSocketClient = events.createWebSocketClient( connectAuthorizer = apiKeyAuthorizer, // websocketの接続に使用されます subscribeAuthorizer = apiKeyAuthorizer, // サブスクライブコールに使用されます publishAuthorizer = apiKeyAuthorizer // パブリッシュコールに使用されます)さらに、カスタムオプションをWebSocket Clientに渡すことができます。現在の機能には、WebSocket Clientが使用するOkHttp.Builderの変更とクライアントライブラリログの有効化が含まれます。
クライアントライブラリログの収集のAndroidLoggerクラスを参照してください。
val events: Events // 設定されたEventsval apiKeyAuthorizer = ApiKeyAuthorizer("da2-abcdefghijklmnopqrstuvwxyz")val webSocketClient = events.createWebSocketClient( connectAuthorizer = apiKeyAuthorizer, // websocketの接続に使用されます subscribeAuthorizer = apiKeyAuthorizer, // サブスクライブコールに使用されます publishAuthorizer = apiKeyAuthorizer // パブリッシュコールに使用されます, options = Events.Options.WebSocket( loggerProvider = { namespace -> AndroidLogger(namespace, logLevel) }, okHttpConfigurationProvider = { okHttpBuilder -> // EventsWebSocketClientが使用するOkHttpBuilderを更新します } ))単一のイベントをパブリッシュする
val webSocketClient: EventsWebSocketClient // 設定されたEventsWebSocketClient// kotlinx.serialization.json.[JsonElement, JsonPrimitive, JsonArray, JsonObject]val jsonEvent = JsonObject(mapOf("some" to JsonPrimitive("data")))val publishResult = webSocketClient.publish( channelName = "default/channel", event = jsonEvent)when(publishResult) { is PublishResult.Response -> { val successfulEvents = publishResult.successfulEvents // 成功したイベントをインスペクトします val failedEvents = publishResult.failedEvents // 失敗したイベントをインスペクトします } is PublishResult.Failure -> { val error = result.error // パブリッシュ失敗、エラーをインスペクトします }}複数のイベントをパブリッシュする
一度に最大5つのイベントをパブリッシュできます。
val webSocketClient: EventsWebSocketClient // 設定されたEventsWebSocketClient// List of kotlinx.serialization.json.[JsonElement, JsonPrimitive, JsonArray, JsonObject]val jsonEvents = listOf( JsonObject(mapOf("some" to JsonPrimitive("data1"))), JsonObject(mapOf("some" to JsonPrimitive("data2"))))val publishResult = webSocketClient.publish( channelName = "default/channel", events = jsonEvents)when(publishResult) { is PublishResult.Response -> { val successfulEvents = publishResult.successfulEvents // 成功したイベントをインスペクトします val failedEvents = publishResult.failedEvents // 失敗したイベントをインスペクトします } is PublishResult.Failure -> { val error = result.error // パブリッシュ失敗、エラーをインスペクトします }}異なる認可者でパブリッシュする
val webSocketClient: EventsWebSocketClient // 設定されたEventsWebSocketClientval jsonEvent = JsonObject(mapOf("some" to JsonPrimitive("data")))val publishResult = webSocketClient.publish( channelName = "default/channel", event = jsonEvent, authorizer = ApiKeyAuthorizer("da2-abcdefghijklmnopqrstuvwxyz"))when(publishResult) { is PublishResult.Response -> { val successfulEvents = publishResult.successfulEvents // 成功したイベントをインスペクトします val failedEvents = publishResult.failedEvents // 失敗したイベントをインスペクトします } is PublishResult.Failure -> { val error = result.error // パブリッシュ失敗、エラーをインスペクトします }}チャネルをサブスクライブする
チャネルをサブスクライブするときは、特定のネームスペース/チャネル(例: "default/channel")をサブスクライブするか、チャネルパスの末尾にワイルドカード(*)を指定して、マッチするすべてのチャネルにパブリッシュされたイベントを受け取ります(例: "default/*")。
サブスクリプションFlowに適切なコルーチンスコープを選択することが重要です。サブスクリプションが必要な時間に対応するスコープを選択する必要があります。たとえば、サブスクリプションを画面に関連付ける場合は、AndroidXのViewModelからviewModelScopeを使用することを検討する必要があります。サブスクリプションFlowは設定の変更を保持し、ViewModelでonCleared()がトリガーされるときにキャンセルされます。
Flowのコルーチンスコープがキャンセルされると、ライブラリはチャネルからサブスクライブ解除します。
coroutineScope.launch { // subscribeは冷たいFlowを返します。ターミナル演算子が呼び出されると、サブスクリプションが確立されます。 val subscription: Flow<EventsMessage> = webSocketClient.subscribe("default/channel").onCompletion { // サブスクリプションがアンサブスクライブされました }.catch { throwable -> // サブスクリプションでエラーが発生し、アンサブスクライブされました // 原因についてはthrowableを参照してください }
// collectはサブスクリプションを開始します subscription.collect { eventsMessage -> // JsonElementタイプを返します。 // Kotlin Serializationを使用して、好みのデータ構造に変換します。 val jsonData = eventsMessage.data }}異なる認可者でチャネルをサブスクライブする
coroutineScope.launch { // subscribeは冷たいFlowを返します。ターミナル演算子が呼び出されると、サブスクリプションが確立されます。 val subscription: Flow<EventsMessage> = webSocketClient.subscribe( channelName = "default/channel". authorizer = ApiKeyAuthorizer("da2-abcdefghijklmnopqrstuvwxyz") ).onCompletion { // サブスクリプションがアンサブスクライブされました }.catch { throwable -> // サブスクリプションでエラーが発生し、アンサブスクライブされました // 原因についてはthrowableを参照してください }
// collectはサブスクリプションを開始します subscription.collect { eventsMessage -> // JsonElementタイプを返します。 val jsonData = eventsMessage.data // Kotlin Serializationを使用して、好みのデータ構造に変換します。 }}WebSocketを切断する
WebSocketの使用を終了し、クライアントでサブスクライブまたはパブリッシュを呼び出すつもりがない場合は、WebSocketを切断する必要があります。これにより、すべてのチャネルがアンサブスクライブされます。
val webSocketClient: EventsWebSocketClient // 設定されたEventsWebSocketClient// 保留中のパブリッシュ操作をWebSocketにポストするのを待つ場合はflushEventsをtrueに設定します// 保留中のポストを破棄してすぐに切断する場合はflushEventsをfalseに設定しますwebSocketClient.disconnect(flushEvents = true) // またはfalseを指定してすぐに切断しますクライアントライブラリログの収集
RESTクライアントおよびWebSocketクライアントの例では、カスタムロガーへのログを示しました。以下は、ログをAndroidのLogcatに書き込むカスタムロガーの例です。自由に独自のLoggerタイプを実装できます。
class AndroidLogger( private val namespace: String, override val thresholdLevel: LogLevel) : Logger {
override fun error(message: String) { if (!thresholdLevel.above(LogLevel.ERROR)) { Log.e(namespace, message) } }
override fun error(message: String, error: Throwable?) { if (!thresholdLevel.above(LogLevel.ERROR)) { Log.e(namespace, message, error) } }
override fun warn(message: String) { if (!thresholdLevel.above(LogLevel.WARN)) { Log.w(namespace, message) } }
override fun warn(message: String, issue: Throwable?) { if (!thresholdLevel.above(LogLevel.WARN)) { Log.w(namespace, message, issue) } }
override fun info(message: String) { if (!thresholdLevel.above(LogLevel.INFO)) { Log.i(namespace, message) } }
override fun debug(message: String) { if (!thresholdLevel.above(LogLevel.DEBUG)) { Log.d(namespace, message) } }
override fun verbose(message: String) { if (!thresholdLevel.above(LogLevel.VERBOSE)) { Log.v(namespace, message) } }}