diff --git a/docs/adr/0024-rate-limit-stream-extension.md b/docs/adr/0024-rate-limit-stream-extension.md new file mode 100644 index 0000000000..4bd2605770 --- /dev/null +++ b/docs/adr/0024-rate-limit-stream-extension.md @@ -0,0 +1,33 @@ +# 23. Implement Real-time Avatar Updates in Chat List + +Date: 2024-08-20 + +## Status + +Accepted + +## Context + +Currently, when a user changes their avatar or a group avatar is updated, the chat list items do not reflect these changes in real-time. On the web platform, users must reload the page to see the updated avatars. This leads to an inconsistent user experience and delays in displaying the most current information. + +## Decision + +We will implement a new function called `rateLimitWithSyncUpdate` to complement our existing `rateLimit` function. This new function will: + +1. Maintain the rate-limiting functionality of the original `rateLimit` function. +2. Capture and process `syncUpdate` events received from the server. +3. Use the `syncUpdate` events to identify which rooms or private chats have new avatars. +4. Trigger real-time updates to the relevant chat list items without requiring a page reload. + +## Consequences + +### Positive + +- Improved user experience with real-time avatar updates in the chat list. +- Reduced need for manual page reloads to see updated avatars. +- More consistent information display across the application. + +### Negative + +- Increased complexity in the client-side code to handle real-time updates. +- Potential for increased network traffic due to more frequent updates. \ No newline at end of file diff --git a/lib/pages/chat_list/chat_list_body_view.dart b/lib/pages/chat_list/chat_list_body_view.dart index 86c8767d37..ce7cb1aab8 100644 --- a/lib/pages/chat_list/chat_list_body_view.dart +++ b/lib/pages/chat_list/chat_list_body_view.dart @@ -51,8 +51,11 @@ class ChatListBodyView extends StatelessWidget { ), stream: controller.activeClient.onSync.stream .where((s) => s.hasRoomUpdate) - .rateLimit(const Duration(seconds: 1)), - builder: (context, _) { + .rateLimitWithSyncUpdate(const Duration(seconds: 1)), + builder: (context, syncUpdateSnapshot) { + Logs().v( + 'ChatListBodyView: StreamBuilder: snapshot: ${syncUpdateSnapshot.data?.rooms}', + ); if (controller.activeFilter == ActiveFilter.spaces) { return SpaceView( controller, @@ -163,6 +166,7 @@ class ChatListBodyView extends StatelessWidget { child: ChatListViewBuilder( controller: controller, rooms: controller.filteredRoomsForPin, + syncUpdate: syncUpdateSnapshot.data, ), ), if (!controller.filteredRoomsForAllIsEmpty) @@ -192,6 +196,7 @@ class ChatListBodyView extends StatelessWidget { child: ChatListViewBuilder( controller: controller, rooms: controller.filteredRoomsForAll, + syncUpdate: syncUpdateSnapshot.data, ), ), ], diff --git a/lib/pages/chat_list/chat_list_item.dart b/lib/pages/chat_list/chat_list_item.dart index e1c78d5e74..85fd7dc701 100644 --- a/lib/pages/chat_list/chat_list_item.dart +++ b/lib/pages/chat_list/chat_list_item.dart @@ -1,13 +1,12 @@ import 'package:adaptive_dialog/adaptive_dialog.dart'; import 'package:fluffychat/config/app_config.dart'; +import 'package:fluffychat/pages/chat_list/chat_list_item_avatar.dart'; import 'package:fluffychat/presentation/mixins/chat_list_item_mixin.dart'; import 'package:fluffychat/pages/chat_list/chat_list_item_style.dart'; import 'package:fluffychat/pages/chat_list/chat_list_item_subtitle.dart'; import 'package:fluffychat/pages/chat_list/chat_list_item_title.dart'; import 'package:fluffychat/utils/dialog/twake_dialog.dart'; -import 'package:fluffychat/utils/matrix_sdk_extensions/matrix_locals.dart'; import 'package:fluffychat/utils/twake_snackbar.dart'; -import 'package:fluffychat/widgets/avatar/avatar.dart'; import 'package:flutter/material.dart'; import 'package:flutter_gen/gen_l10n/l10n.dart'; @@ -27,6 +26,7 @@ class ChatListItem extends StatelessWidget with ChatListItemMixin { final void Function()? onTapAvatar; final void Function(TapDownDetails)? onSecondaryTapDown; final void Function()? onLongPress; + final JoinedRoomUpdate? joinedRoomUpdate; const ChatListItem( this.room, { @@ -39,6 +39,7 @@ class ChatListItem extends StatelessWidget with ChatListItemMixin { this.onSecondaryTapDown, this.onLongPress, super.key, + this.joinedRoomUpdate, }); void clickAction(BuildContext context) async { @@ -87,9 +88,6 @@ class ChatListItem extends StatelessWidget with ChatListItemMixin { @override Widget build(BuildContext context) { - final displayName = room.getLocalizedDisplayname( - MatrixLocals(L10n.of(context)!), - ); return Padding( padding: ChatListItemStyle.paddingConversation, child: Material( @@ -114,10 +112,10 @@ class ChatListItem extends StatelessWidget with ChatListItemMixin { padding: ChatListItemStyle.paddingAvatar, child: Stack( children: [ - Avatar( - mxContent: room.avatar, - name: displayName, + ChatListItemAvatar( + room: room, onTap: onTapAvatar, + joinedRoomUpdate: joinedRoomUpdate, ), if (_isGroupChat) Positioned( diff --git a/lib/pages/chat_list/chat_list_item_avatar.dart b/lib/pages/chat_list/chat_list_item_avatar.dart new file mode 100644 index 0000000000..08759048c3 --- /dev/null +++ b/lib/pages/chat_list/chat_list_item_avatar.dart @@ -0,0 +1,108 @@ +import 'package:fluffychat/utils/matrix_sdk_extensions/matrix_locals.dart'; +import 'package:fluffychat/widgets/avatar/avatar.dart'; +import 'package:flutter/material.dart'; +import 'package:matrix/matrix.dart'; +import 'package:flutter_gen/gen_l10n/l10n.dart'; + +class ChatListItemAvatar extends StatefulWidget { + final Room room; + final void Function()? onTap; + final JoinedRoomUpdate? joinedRoomUpdate; + + const ChatListItemAvatar({ + required this.room, + this.onTap, + this.joinedRoomUpdate, + super.key, + }); + + @override + State createState() => _ChatListItemAvatarState(); +} + +class _ChatListItemAvatarState extends State { + final ValueNotifier avatarUrlNotifier = ValueNotifier(Uri()); + + @override + void initState() { + avatarUrlNotifier.value = widget.room.avatar ?? Uri(); + super.initState(); + } + + @override + void dispose() { + avatarUrlNotifier.dispose(); + super.dispose(); + } + + @override + void didUpdateWidget(ChatListItemAvatar oldWidget) { + super.didUpdateWidget(oldWidget); + if (oldWidget.joinedRoomUpdate != widget.joinedRoomUpdate) { + updateAvatarUrlFromJoinedRoomUpdate(); + } + } + + @override + Widget build(BuildContext context) { + final displayName = widget.room.getLocalizedDisplayname( + MatrixLocals(L10n.of(context)!), + ); + return ValueListenableBuilder( + valueListenable: avatarUrlNotifier, + builder: (context, avatarUrl, child) { + return Avatar( + mxContent: avatarUrl, + name: displayName, + onTap: widget.onTap, + ); + }, + ); + } + + void updateAvatarUrlFromJoinedRoomUpdate() { + if (isChatHaveAvatarUpdated) { + if (isGroupChatAvatarUpdated) { + updateGroupAvatar(); + } else if (isDirectChatAvatarUpdated) { + updateDirectChatAvatar(); + } + } + } + + bool get isChatHaveAvatarUpdated => + widget.joinedRoomUpdate?.timeline?.events?.isNotEmpty == true; + + bool get isDirectChatAvatarUpdated { + return widget.room.isDirectChat && + widget.joinedRoomUpdate?.timeline?.events?.last.type == + EventTypes.RoomMember; + } + + bool get isGroupChatAvatarUpdated => + widget.joinedRoomUpdate?.timeline?.events?.last.type == + EventTypes.RoomAvatar; + + void updateDirectChatAvatar() { + final event = widget.joinedRoomUpdate?.timeline?.events?.last; + final avatarMxc = event?.content['avatar_url']; + if (event?.senderId != widget.room.directChatMatrixID) { + return; + } + updateAvatarUrl(avatarMxc); + } + + void updateGroupAvatar() { + final avatarMxc = + widget.joinedRoomUpdate?.timeline?.events?.last.content['url']; + updateAvatarUrl(avatarMxc); + } + + void updateAvatarUrl(Object? avatarMxc) { + if (avatarMxc is String) { + avatarUrlNotifier.value = Uri.tryParse(avatarMxc) ?? Uri(); + } else if (avatarMxc == null) { + avatarUrlNotifier.value = Uri(); + } + } +} diff --git a/lib/pages/chat_list/chat_list_view_builder.dart b/lib/pages/chat_list/chat_list_view_builder.dart index eee9f8b109..ff69b0dced 100644 --- a/lib/pages/chat_list/chat_list_view_builder.dart +++ b/lib/pages/chat_list/chat_list_view_builder.dart @@ -9,11 +9,13 @@ import 'package:matrix/matrix.dart'; class ChatListViewBuilder extends StatelessWidget { final ChatListController controller; final List rooms; + final SyncUpdate? syncUpdate; const ChatListViewBuilder({ super.key, required this.controller, required this.rooms, + this.syncUpdate, }); @override @@ -36,12 +38,14 @@ class ChatListViewBuilder extends StatelessWidget { chatListItem: CommonChatListItem( controller: controller, room: rooms[index], + joinedRoomUpdate: syncUpdate?.rooms?.join?[rooms[index].id], ), ); } return CommonChatListItem( controller: controller, room: rooms[index], + joinedRoomUpdate: syncUpdate?.rooms?.join?[rooms[index].id], ); }, ); diff --git a/lib/pages/chat_list/common_chat_list_item.dart b/lib/pages/chat_list/common_chat_list_item.dart index d74cb7f1fc..54f8f0402c 100644 --- a/lib/pages/chat_list/common_chat_list_item.dart +++ b/lib/pages/chat_list/common_chat_list_item.dart @@ -7,11 +7,13 @@ import 'package:matrix/matrix.dart'; class CommonChatListItem extends StatelessWidget { final ChatListController controller; final Room room; + final JoinedRoomUpdate? joinedRoomUpdate; const CommonChatListItem({ super.key, required this.controller, required this.room, + this.joinedRoomUpdate, }); @override @@ -49,6 +51,7 @@ class CommonChatListItem extends StatelessWidget { }, ), activeChat: activeRoomId == room.id, + joinedRoomUpdate: joinedRoomUpdate, ); }, ); diff --git a/lib/utils/stream_extension.dart b/lib/utils/stream_extension.dart index 66e840fcee..d9c5304400 100644 --- a/lib/utils/stream_extension.dart +++ b/lib/utils/stream_extension.dart @@ -1,5 +1,7 @@ import 'dart:async'; +import 'package:matrix/matrix.dart'; + extension StreamExtension on Stream { /// Returns a new Stream which outputs only `true` for every update of the original /// stream, ratelimited by the Duration t @@ -45,4 +47,42 @@ extension StreamExtension on Stream { }; return controller.stream; } + + Stream rateLimitWithSyncUpdate(Duration t) { + final controller = StreamController(); + Timer? timer; + SyncUpdate? pendingMessage; + + void processMessage() { + if (controller.isClosed) return; + + if (timer == null && pendingMessage != null) { + controller.add(pendingMessage!); + pendingMessage = null; + timer = Timer(t, () { + timer = null; + if (pendingMessage != null) { + processMessage(); + } + }); + } + } + + final subscription = listen( + (data) { + pendingMessage = data; + processMessage(); + }, + onDone: () => controller.close(), + onError: (e, s) => controller.addError(e, s), + ); + + controller.onCancel = () { + subscription.cancel(); + timer?.cancel(); + controller.close(); + }; + + return controller.stream; + } } diff --git a/test/utils/mock_sync_update.dart b/test/utils/mock_sync_update.dart new file mode 100644 index 0000000000..592cecf716 --- /dev/null +++ b/test/utils/mock_sync_update.dart @@ -0,0 +1,12 @@ +import 'package:matrix/matrix.dart'; +import 'package:mockito/mockito.dart'; + +class MockSyncUpdate extends Mock implements SyncUpdate { + @override + final String nextBatch; + + MockSyncUpdate({required this.nextBatch}); + + @override + String toString() => 'MockSyncUpdate(nextBatch: $nextBatch)'; +} diff --git a/test/utils/rate_limit_with_sync_update_test.dart b/test/utils/rate_limit_with_sync_update_test.dart new file mode 100644 index 0000000000..2494fcf8bd --- /dev/null +++ b/test/utils/rate_limit_with_sync_update_test.dart @@ -0,0 +1,194 @@ +import 'dart:async'; +import 'package:fluffychat/utils/stream_extension.dart'; +import 'package:flutter_test/flutter_test.dart'; +import 'package:matrix/matrix.dart'; + +import 'mock_sync_update.dart'; + +void main() { + group('rateLimitWithSyncUpdate', () { + test( + 'WHEN two updates are added within the rate limit duration THEN only the first update is emitted', + () async { + // GIVEN + final controller = StreamController(); + const duration = Duration(milliseconds: 100); + final rateLimitedStream = + controller.stream.rateLimitWithSyncUpdate(duration); + + final results = []; + final subscription = rateLimitedStream.listen(results.add); + + // WHEN + // Add two updates in quick succession + controller.add(MockSyncUpdate(nextBatch: '1')); + controller.add(MockSyncUpdate(nextBatch: '2')); + + // Wait for slightly less than the rate limit duration + await Future.delayed(duration - const Duration(milliseconds: 10)); + + // THEN + expect(results.length, 1); + expect(results.first.nextBatch, '1'); + + // CLEANUP + await subscription.cancel(); + await controller.close(); + }); + + test( + 'WHEN two updates are added with a delay greater than the rate limit duration THEN both updates are emitted', + () async { + // GIVEN + final controller = StreamController(); + const duration = Duration(milliseconds: 100); + final rateLimitedStream = + controller.stream.rateLimitWithSyncUpdate(duration); + + final results = []; + final subscription = rateLimitedStream.listen(results.add); + + // WHEN + controller.add(MockSyncUpdate(nextBatch: '1')); + await Future.delayed(duration); + controller.add(MockSyncUpdate(nextBatch: '2')); + + // Wait for slightly more than the rate limit duration to ensure processing + await Future.delayed(duration + const Duration(milliseconds: 10)); + + // THEN + expect(results.length, 2, reason: 'Two updates should be emitted'); + expect( + results.map((update) => update.nextBatch).toList(), + ['1', '2'], + reason: 'Both updates should be emitted in order', + ); + + // Clean up + await subscription.cancel(); + await controller.close(); + }); + + test( + 'WHEN many updates occur rapidly THEN only first and last updates within each rate limit duration are emitted', + () async { + // GIVEN + final controller = StreamController(); + const duration = Duration(milliseconds: 100); + final rateLimitedStream = + controller.stream.rateLimitWithSyncUpdate(duration); + + final results = []; + final subscription = rateLimitedStream.listen(results.add); + + // WHEN + for (int i = 1; i <= 10; i++) { + controller.add(MockSyncUpdate(nextBatch: i.toString())); + } + + // Wait for slightly more than the rate limit duration + await Future.delayed(duration); + + expect( + results.length, + 1, + reason: 'Only the first update should be emitted initially', + ); + expect( + results.first.nextBatch, + '1', + reason: 'The first update should be "1"', + ); + + // Wait for another rate limit duration + await Future.delayed(duration); + + // THEN + expect( + results.length, + 2, + reason: 'Two updates should be emitted after waiting', + ); + expect( + results.first.nextBatch, + '1', + reason: 'The first update should still be "1"', + ); + expect( + results.last.nextBatch, + '10', + reason: 'The last update should be "10"', + ); + + // Clean up + await subscription.cancel(); + await controller.close(); + }); + + test( + 'WHEN multiple updates are added rapidly THEN the first update is emitted', + () async { + // GIVEN + final controller = StreamController(); + final rateLimitedStream = controller.stream + .rateLimitWithSyncUpdate(const Duration(milliseconds: 100)); + + // WHEN + controller.add(MockSyncUpdate(nextBatch: '1')); + controller.add(MockSyncUpdate(nextBatch: '2')); + controller.add(MockSyncUpdate(nextBatch: '3')); + + // THEN + final result = await rateLimitedStream.first; + expect( + result.nextBatch, + '1', + reason: 'The first update should be emitted', + ); + + // Clean up + await controller.close(); + }); + + test( + 'WHEN the input stream is done THEN the rate-limited stream should close', + () async { + // GIVEN + final inputStream = + Stream.fromIterable([MockSyncUpdate(nextBatch: '1')]); + + // WHEN + final rateLimitedStream = inputStream + .rateLimitWithSyncUpdate(const Duration(milliseconds: 100)); + + // THEN + await expectLater( + rateLimitedStream, + emitsInOrder([isA(), emitsDone]), + reason: 'Should emit one update and then close', + ); + }); + + test( + 'WHEN an error is added to the input stream THEN it should be propagated to the rate-limited stream', + () async { + // GIVEN + final controller = StreamController(); + final rateLimitedStream = controller.stream + .rateLimitWithSyncUpdate(const Duration(milliseconds: 100)); + + // WHEN + controller.addError('Test error'); + + // THEN + await expectLater( + rateLimitedStream, + emitsError('Test error'), + reason: 'The error should be propagated to the rate-limited stream', + ); + + // Clean up + await controller.close(); + }); + }); +}