Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix for realtime calling subscription multiple time - Appwrite 1.4.x #713

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 52 additions & 36 deletions templates/flutter/lib/src/realtime_mixin.dart.twig
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import 'dart:async';
import 'dart:convert';
import 'package:flutter/foundation.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:web_socket_channel/status.dart';
import 'package:web_socket_channel/status.dart' as status;
import 'exception.dart';
import 'realtime_subscription.dart';
import 'client.dart';
Expand All @@ -15,35 +15,46 @@ typedef GetFallbackCookie = String? Function();

mixin RealtimeMixin {
late Client client;
final Map<String, List<StreamController<RealtimeMessage>>> _channels = {};
final Set<String> _channels = {};
WebSocketChannel? _websok;
String? _lastUrl;
late WebSocketFactory getWebSocket;
GetFallbackCookie? getFallbackCookie;
int? get closeCode => _websok?.closeCode;
int _subscriptionsCounter = 0;
Map<int, RealtimeSubscription> _subscriptions = {};
bool _notifyDone = true;
StreamSubscription? _websocketSubscription;
bool _creatingSocket = false;

Future<dynamic> _closeConnection() async {
await _websok?.sink.close(normalClosure);
await _websocketSubscription?.cancel();
await _websok?.sink.close(status.normalClosure, 'Ending session');
_lastUrl = null;
}

_createSocket() async {
if(_creatingSocket) return;
_creatingSocket = true;
final uri = _prepareUri();
if (_websok == null) {
_websok = await getWebSocket(uri);
_lastUrl = uri.toString();
} else {
if (_lastUrl == uri.toString() && _websok?.closeCode == null) {
_creatingSocket = false;
return;
}
_notifyDone = false;
await _closeConnection();
_lastUrl = uri.toString();
_websok = await getWebSocket(uri);
_notifyDone = true;
}
debugPrint('subscription: $_lastUrl');

try {
_websok?.stream.listen((response) {
_websocketSubscription = _websok?.stream.listen((response) {
final data = RealtimeResponse.fromJson(response);
switch (data.type) {
case 'error':
Expand All @@ -67,48 +78,47 @@ mixin RealtimeMixin {
break;
case 'event':
final message = RealtimeMessage.fromMap(data.data);
for(var channel in message.channels) {
if (_channels[channel] != null) {
for( var stream in _channels[channel]!) {
stream.sink.add(message);
for (var subscription in _subscriptions.values) {
for (var channel in message.channels) {
if (subscription.channels.contains(channel)) {
subscription.controller.add(message);
}
}
}
break;
}
}, onDone: () {
for (var list in _channels.values) {
for (var stream in list) {
stream.close();
}
if (!_notifyDone || _creatingSocket) return;
for (var subscription in _subscriptions.values) {
subscription.close();
}
_channels.clear();
_closeConnection();
}, onError: (err, stack) {
for (var list in _channels.values) {
for (var stream in list) {
stream.sink.addError(err, stack);
}
for (var subscription in _subscriptions.values) {
subscription.controller.addError(err, stack);
}
if (_websok?.closeCode != null && _websok?.closeCode != 1008) {
debugPrint("Reconnecting in one second.");
Future.delayed(Duration(seconds: 1), _createSocket);
}
});
} catch (e) {
if (e is {{spec.title | caseUcfirst}}Exception) {
if (e is AppwriteException) {
rethrow;
}
if (e is WebSocketChannelException) {
throw {{spec.title | caseUcfirst}}Exception(e.message);
throw AppwriteException(e.message);
}
throw {{spec.title | caseUcfirst}}Exception(e.toString());
throw AppwriteException(e.toString());
} finally {
_creatingSocket = false;
}
}

Uri _prepareUri() {
if (client.endPointRealtime == null) {
throw {{spec.title | caseUcfirst}}Exception(
throw AppwriteException(
"Please set endPointRealtime to connect to realtime server");
}
var uri = Uri.parse(client.endPointRealtime!);
Expand All @@ -118,43 +128,49 @@ mixin RealtimeMixin {
port: uri.port,
queryParameters: {
"project": client.config['project'],
"channels[]": _channels.keys.toList(),
"channels[]": _channels.toList(),
},
path: uri.path + "/realtime",
);
}

RealtimeSubscription subscribeTo(List<String> channels) {
StreamController<RealtimeMessage> controller = StreamController.broadcast();
for(var channel in channels) {
if (!_channels.containsKey(channel)) {
_channels[channel] = [];
}
_channels[channel]!.add(controller);
}
_channels.addAll(channels);
Future.delayed(Duration.zero, () => _createSocket());
int id = DateTime.now().microsecondsSinceEpoch;
RealtimeSubscription subscription = RealtimeSubscription(
stream: controller.stream,
controller: controller,
channels: channels,
close: () async {
_subscriptions.remove(id);
_subscriptionsCounter--;
controller.close();
for(var channel in channels) {
_channels[channel]!.remove(controller);
if (_channels[channel]!.isEmpty) {
_channels.remove(channel);
}
}
if(_channels.isNotEmpty) {
_cleanup(channels);

if (_channels.isNotEmpty) {
await Future.delayed(Duration.zero, () => _createSocket());
} else {
await _closeConnection();
}
});
_subscriptions[id] = subscription;
return subscription;
}

void _cleanup(List<String> channels) {
for (var channel in channels) {
bool found = _subscriptions.values
.any((subscription) => subscription.channels.contains(channel));
if (!found) {
_channels.remove(channel);
}
}
}

void handleError(RealtimeResponse response) {
if (response.data['code'] == 1008) {
throw {{spec.title | caseUcfirst}}Exception(response.data["message"], response.data["code"]);
throw AppwriteException(response.data["message"], response.data["code"]);
} else {
debugPrint("Reconnecting in one second.");
Future.delayed(const Duration(seconds: 1), () {
Expand Down
11 changes: 10 additions & 1 deletion templates/flutter/lib/src/realtime_subscription.dart.twig
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
import 'dart:async';

import 'realtime_message.dart';

/// Realtime Subscription
class RealtimeSubscription {
/// Stream of [RealtimeMessage]s
final Stream<RealtimeMessage> stream;

final StreamController<RealtimeMessage> controller;

/// List of channels
List<String> channels;

/// Closes the subscription
final Future<void> Function() close;

/// Initializes a [RealtimeSubscription]
RealtimeSubscription({required this.stream, required this.close});
RealtimeSubscription(
{required this.close, required this.channels, required this.controller})
: stream = controller.stream;
}
20 changes: 10 additions & 10 deletions templates/flutter/test/src/realtime_subscription_test.dart.twig
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
import 'package:mockito/mockito.dart';
import 'package:{{language.params.packageName}}/src/realtime_message.dart';
import 'package:{{language.params.packageName}}/src/realtime_subscription.dart';
import 'package:appwrite/src/realtime_message.dart';
import 'package:appwrite/src/realtime_subscription.dart';
import 'package:flutter_test/flutter_test.dart';

class MockStream<T> extends Mock implements Stream<T> {}


import 'dart:async';

void main() {
group('RealtimeSubscription', () {
final mockStream = MockStream<RealtimeMessage>();
final mockStream = StreamController<RealtimeMessage>.broadcast();
final mockCloseFunction = () async {};
final subscription = RealtimeSubscription(stream: mockStream, close: mockCloseFunction);
final subscription = RealtimeSubscription(
controller: mockStream,
close: mockCloseFunction,
channels: ['documents']);

test('should have the correct stream and close function', () {
expect(subscription.stream, equals(mockStream));
expect(subscription.controller, equals(mockStream));
expect(subscription.stream, equals(mockStream.stream));
expect(subscription.close, equals(mockCloseFunction));
});
});
Expand Down