Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix for realtime multiple subscription 15x #801

Merged
merged 9 commits into from
Apr 24, 2024
80 changes: 48 additions & 32 deletions templates/flutter/lib/src/realtime_mixin.dart.twig
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import 'dart:async';
import 'dart:convert';
import 'package:flutter/foundation.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:web_socket_channel/status.dart';
import 'package:web_socket_channel/status.dart' as status;
import 'exception.dart';
import 'realtime_subscription.dart';
import 'client.dart';
Expand All @@ -15,35 +15,46 @@ typedef GetFallbackCookie = String? Function();

mixin RealtimeMixin {
late Client client;
final Map<String, List<StreamController<RealtimeMessage>>> _channels = {};
final Set<String> _channels = {};
WebSocketChannel? _websok;
String? _lastUrl;
late WebSocketFactory getWebSocket;
GetFallbackCookie? getFallbackCookie;
int? get closeCode => _websok?.closeCode;
int _subscriptionsCounter = 0;
Map<int, RealtimeSubscription> _subscriptions = {};
stnguyen90 marked this conversation as resolved.
Show resolved Hide resolved
bool _notifyDone = true;
StreamSubscription? _websocketSubscription;
bool _creatingSocket = false;

Future<dynamic> _closeConnection() async {
await _websok?.sink.close(normalClosure);
await _websocketSubscription?.cancel();
await _websok?.sink.close(status.normalClosure, 'Ending session');
_lastUrl = null;
}

_createSocket() async {
if(_creatingSocket) return;
_creatingSocket = true;
stnguyen90 marked this conversation as resolved.
Show resolved Hide resolved
final uri = _prepareUri();
if (_websok == null) {
_websok = await getWebSocket(uri);
_lastUrl = uri.toString();
} else {
if (_lastUrl == uri.toString() && _websok?.closeCode == null) {
_creatingSocket = false;
return;
}
_notifyDone = false;
stnguyen90 marked this conversation as resolved.
Show resolved Hide resolved
await _closeConnection();
_lastUrl = uri.toString();
_websok = await getWebSocket(uri);
_notifyDone = true;
}
debugPrint('subscription: $_lastUrl');

try {
_websok?.stream.listen((response) {
_websocketSubscription = _websok?.stream.listen((response) {
final data = RealtimeResponse.fromJson(response);
switch (data.type) {
case 'error':
Expand All @@ -67,28 +78,25 @@ mixin RealtimeMixin {
break;
case 'event':
final message = RealtimeMessage.fromMap(data.data);
for(var channel in message.channels) {
if (_channels[channel] != null) {
for( var stream in _channels[channel]!) {
stream.sink.add(message);
for (var subscription in _subscriptions.values) {
for (var channel in message.channels) {
if (subscription.channels.contains(channel)) {
subscription.controller.add(message);
}
}
}
break;
}
}, onDone: () {
for (var list in _channels.values) {
for (var stream in list) {
stream.close();
}
if (!_notifyDone || _creatingSocket) return;
for (var subscription in _subscriptions.values) {
subscription.close();
}
_channels.clear();
_closeConnection();
}, onError: (err, stack) {
for (var list in _channels.values) {
for (var stream in list) {
stream.sink.addError(err, stack);
}
for (var subscription in _subscriptions.values) {
subscription.controller.addError(err, stack);
}
if (_websok?.closeCode != null && _websok?.closeCode != 1008) {
debugPrint("Reconnecting in one second.");
Expand All @@ -103,6 +111,8 @@ mixin RealtimeMixin {
throw {{spec.title | caseUcfirst}}Exception(e.message);
}
throw {{spec.title | caseUcfirst}}Exception(e.toString());
} finally {
_creatingSocket = false;
}
}

Expand All @@ -118,43 +128,49 @@ mixin RealtimeMixin {
port: uri.port,
queryParameters: {
"project": client.config['project'],
"channels[]": _channels.keys.toList(),
"channels[]": _channels.toList(),
},
path: uri.path + "/realtime",
);
}

RealtimeSubscription subscribeTo(List<String> channels) {
StreamController<RealtimeMessage> controller = StreamController.broadcast();
for(var channel in channels) {
if (!_channels.containsKey(channel)) {
_channels[channel] = [];
}
_channels[channel]!.add(controller);
}
_channels.addAll(channels);
Future.delayed(Duration.zero, () => _createSocket());
int id = DateTime.now().microsecondsSinceEpoch;
RealtimeSubscription subscription = RealtimeSubscription(
stream: controller.stream,
controller: controller,
channels: channels,
close: () async {
_subscriptions.remove(id);
_subscriptionsCounter--;
controller.close();
for(var channel in channels) {
_channels[channel]!.remove(controller);
if (_channels[channel]!.isEmpty) {
_channels.remove(channel);
}
}
if(_channels.isNotEmpty) {
_cleanup(channels);

if (_channels.isNotEmpty) {
await Future.delayed(Duration.zero, () => _createSocket());
} else {
await _closeConnection();
}
});
_subscriptions[id] = subscription;
return subscription;
}

void _cleanup(List<String> channels) {
for (var channel in channels) {
bool found = _subscriptions.values
.any((subscription) => subscription.channels.contains(channel));
if (!found) {
_channels.remove(channel);
}
}
}

void handleError(RealtimeResponse response) {
if (response.data['code'] == 1008) {
throw {{spec.title | caseUcfirst}}Exception(response.data["message"], response.data["code"]);
throw AppwriteException(response.data["message"], response.data["code"]);
} else {
debugPrint("Reconnecting in one second.");
Future.delayed(const Duration(seconds: 1), () {
Expand Down
11 changes: 10 additions & 1 deletion templates/flutter/lib/src/realtime_subscription.dart.twig
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
import 'dart:async';

import 'realtime_message.dart';

/// Realtime Subscription
class RealtimeSubscription {
/// Stream of [RealtimeMessage]s
final Stream<RealtimeMessage> stream;

final StreamController<RealtimeMessage> controller;

/// List of channels
List<String> channels;

/// Closes the subscription
final Future<void> Function() close;

/// Initializes a [RealtimeSubscription]
RealtimeSubscription({required this.stream, required this.close});
RealtimeSubscription(
{required this.close, required this.channels, required this.controller})
: stream = controller.stream;
}
20 changes: 10 additions & 10 deletions templates/flutter/test/src/realtime_subscription_test.dart.twig
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
import 'package:mockito/mockito.dart';
import 'package:{{language.params.packageName}}/src/realtime_message.dart';
import 'package:{{language.params.packageName}}/src/realtime_subscription.dart';
import 'package:appwrite/src/realtime_message.dart';
import 'package:appwrite/src/realtime_subscription.dart';
import 'package:flutter_test/flutter_test.dart';

class MockStream<T> extends Mock implements Stream<T> {}


import 'dart:async';

void main() {
group('RealtimeSubscription', () {
final mockStream = MockStream<RealtimeMessage>();
final mockStream = StreamController<RealtimeMessage>.broadcast();
final mockCloseFunction = () async {};
final subscription = RealtimeSubscription(stream: mockStream, close: mockCloseFunction);
final subscription = RealtimeSubscription(
controller: mockStream,
close: mockCloseFunction,
channels: ['documents']);

test('should have the correct stream and close function', () {
expect(subscription.stream, equals(mockStream));
expect(subscription.controller, equals(mockStream));
expect(subscription.stream, equals(mockStream.stream));
expect(subscription.close, equals(mockCloseFunction));
});
});
Expand Down