リアルタイムイベントの購読
リアルタイムクライアント作成のためのミューテーションを購読します。
コールバック付きサブスクリプションの設定
サブスクリプションを作成するとき、Streamオブジェクトが返されます。このStreamはサブスクリプションがエラーに遭遇するか、サブスクリプションをキャンセルするまで、イベントを生成し続けます。出力されるデータの量を制限する必要がある場合は、takeなどのStreamのヘルパー関数を利用できます。キャンセルは定義されたイベント量が発生したときに発生します:
Stream<GraphQLResponse<Todo>> subscribe() { final subscriptionRequest = ModelSubscriptions.onCreate(Todo.classType); final Stream<GraphQLResponse<Todo>> operation = Amplify.API .subscribe( subscriptionRequest, onEstablished: () => safePrint('Subscription established'), ) // 5つの要素のみをリッスン .take(5) .handleError( (Object error) { safePrint('Error in subscription stream: $error'); }, ); return operation;}別の方法として、Stream.listenを呼び出して、プログラムでキャンセルできるStreamSubscriptionオブジェクトを作成できます。
// これをインポートすることを忘れないでくださいimport 'dart:async';
...
StreamSubscription<GraphQLResponse<Todo>>? subscription;
void subscribe() { final subscriptionRequest = ModelSubscriptions.onCreate(Todo.classType); final Stream<GraphQLResponse<Todo>> operation = Amplify.API.subscribe( subscriptionRequest, onEstablished: () => safePrint('Subscription established'), ); subscription = operation.listen( (event) { safePrint('Subscription event data received: ${event.data}'); }, onError: (Object e) => safePrint('Error in subscription stream: $e'), );}
void unsubscribe() { subscription?.cancel(); subscription = null;}onCreateサブスクリプションに加えて、.onUpdate()または.onDelete()も呼び出すことができます。
final onUpdateSubscriptionRequest = ModelSubscriptions.onUpdate(Todo.classType);// またはfinal onDeleteSubscriptionRequest = ModelSubscriptions.onDelete(Todo.classType);サブスクリプション接続ステータス
アプリケーションを設定してサブスクリプションを使用している状態で、サブスクリプションが閉じたときを知りたい場合があります。また、サブスクリプションが正常でない場合はユーザーに反映することができます。Amplify.Hubを介してサブスクリプションステータスの変化を監視できます。
Amplify.Hub.listen( HubChannel.Api, (ApiHubEvent event) { if (event is SubscriptionHubEvent) { safePrint(event.status); } },);SubscriptionStatus
connected- 接続され、問題なく動作していますconnecting- 接続を試みています(初期接続と再接続の両方)pendingDisconnect- 接続にアクティブなサブスクリプションがなく、シャットダウンしていますdisconnected- 接続にアクティブなサブスクリプションがなく、切断されていますfailed- 接続に障害が発生し、切断されています
自動再接続
内部的には、ネットワーク変更を通じて健全なウェブソケット接続を維持しようとします。例えば、デバイスの接続がWi-Fiから5gネットワークに変わる場合、プラグインは新しいネットワークを使用して再接続しようとします。
同様に、インターネット接続が予期せず切断された場合、サブスクリプションは指数関数的な再試行/バックオフ戦略を使用して再接続を試みます。デフォルトでは、約50秒で8回の回復試行を行います。接続を確立できない場合、ウェブソケットは閉じられます。RetryOptionsを通じてAPI プラグインを構成するときに、この戦略をカスタマイズできます。
Future<void> _configureAmplify() async { final apiPlugin = AmplifyAPI( options: APIPluginOptions( modelProvider: ModelProvider.instance, // オプション設定 subscriptionOptions: const GraphQLSubscriptionOptions( retryOptions: RetryOptions(maxAttempts: 10), ), ) ); await Amplify.addPlugin(apiPlugin);
try { await Amplify.configure(outputs); } on AmplifyAlreadyConfiguredException { safePrint( "Tried to reconfigure Amplify; this can occur when your app restarts on Android."); }}import 'package:amplify_flutter/amplify_flutter.dart';import 'package:amplify_api/amplify_api.dart';import './models/ModelProvider.dart'; // <--- プロジェクトを反映するようにインポートを更新import 'dart:async';
// ...
List<Todo?> allTodos = [];SubscriptionStatus prevSubscriptionStatus = SubscriptionStatus.disconnected;StreamSubscription<GraphQLResponse<Todo>>? subscription;
/// ...
// リスナーを初期化Amplify.Hub.listen( HubChannel.Api, (ApiHubEvent event) { if (event is SubscriptionHubEvent) { if (prevSubscriptionStatus == SubscriptionStatus.connecting && event.status == SubscriptionStatus.connected) { getTodos(); // todoを再フェッチ } prevSubscriptionStatus = event.status; } },);
subscribe();
/// ...
Future<void> getTodos() async { try { final request = ModelQueries.list(Todo.classType); final response = await Amplify.API.query(request: request).response;
final todos = response.data?.items ?? []; if (response.errors.isNotEmpty) { safePrint('errors: ${response.errors}'); }
setState(() { allTodos = todos; }); } on ApiException catch (e) { safePrint('Query failed: $e'); return; }}
void subscribe() { final subscriptionRequest = ModelSubscriptions.onCreate(Todo.classType); final Stream<GraphQLResponse<Todo>> operation = Amplify.API.subscribe( subscriptionRequest, onEstablished: () => safePrint('Subscription established'), ); subscription = operation.listen( (event) { setState(() { allTodos.add(event.data); }); }, onError: (Object e) => safePrint('Error in subscription stream: $e'), );}