From c48bbeb3a579bc6d5a5cf3a02c028acaa2e50b7f Mon Sep 17 00:00:00 2001 From: Danny Tuppeny Date: Tue, 17 Dec 2019 08:28:26 +0000 Subject: [PATCH 01/21] Add a test for existing reconnect behaviour --- test/sse_test.dart | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/test/sse_test.dart b/test/sse_test.dart index 62fe4c6..70d6d1b 100644 --- a/test/sse_test.dart +++ b/test/sse_test.dart @@ -93,6 +93,19 @@ void main() { expect(handler.numberOfClients, 0); }); + test('Client reconnects after being disconnected', () async { + expect(handler.numberOfClients, 0); + await webdriver.get('http://localhost:${server.port}'); + var connection = await handler.connections.next; + expect(handler.numberOfClients, 1); + await connection.sink.close(); + await pumpEventQueue(); + expect(handler.numberOfClients, 0); + + // Ensure the client reconnects + await handler.connections.next; + }); + test('Can close from the client-side', () async { expect(handler.numberOfClients, 0); await webdriver.get('http://localhost:${server.port}'); From aca46981648ae7287ba4318b402afc093e1fec7f Mon Sep 17 00:00:00 2001 From: Danny Tuppeny Date: Tue, 17 Dec 2019 09:38:20 +0000 Subject: [PATCH 02/21] Add a server keep-alive to allow clients to reconnect Fixes #18. --- lib/server/sse_handler.dart | 79 ++++++++--- test/sse_test.dart | 255 +++++++++++++++++++++--------------- 2 files changed, 214 insertions(+), 120 deletions(-) diff --git a/lib/server/sse_handler.dart b/lib/server/sse_handler.dart index 1a5ef06..3baf8f2 100644 --- a/lib/server/sse_handler.dart +++ b/lib/server/sse_handler.dart @@ -28,12 +28,22 @@ class SseConnection extends StreamChannelMixin { /// Outgoing messages to the Browser client. final _outgoingController = StreamController(); - final Sink _sink; + Sink _sink; + + /// How long to wait after a connection drops before considering it closed. + final Duration _keepAlive; + + /// Whether the connection is in the timeout period waiting for a reconnect. + bool _isTimingOut = false; + + /// The subscription that passes messages outgoing messages to the sink. This + /// will be paused during the timeout/reconnect period. + StreamSubscription _outgoingStreamSubscription; final _closedCompleter = Completer(); - SseConnection(this._sink) { - _outgoingController.stream.listen((data) { + SseConnection(this._sink, {Duration keepAlive}) : _keepAlive = keepAlive { + _outgoingStreamSubscription = _outgoingController.stream.listen((data) { if (!_closedCompleter.isCompleted) { // JSON encode the message to escape new lines. _sink.add('data: ${json.encode(data)}\n'); @@ -55,9 +65,35 @@ class SseConnection extends StreamChannelMixin { @override Stream get stream => _incomingController.stream; + void _acceptReconnection(Sink sink) { + _isTimingOut = false; + _sink = sink; + _outgoingStreamSubscription.resume(); + } + + void _handleDisconnect() { + if (_keepAlive == null) { + _close(); + } else { + _outgoingStreamSubscription.pause(); + _isTimingOut = true; + // If after the timeout period we're still in this state, we'll close. + Timer(_keepAlive, () { + if (_isTimingOut) { + _isTimingOut = false; + _close(); + } + }); + } + } + + // TODO(dantup): @visibleForTesting? + void closeSink() => _sink.close(); + void _close() { if (!_closedCompleter.isCompleted) { _closedCompleter.complete(); + _outgoingStreamSubscription.cancel(); _sink.close(); if (!_outgoingController.isClosed) _outgoingController.close(); if (!_incomingController.isClosed) _incomingController.close(); @@ -73,12 +109,13 @@ class SseConnection extends StreamChannelMixin { class SseHandler { final _logger = Logger('SseHandler'); final Uri _uri; + final Duration _keepAlive; final _connections = {}; final _connectionController = StreamController(); StreamQueue _connectionsStream; - SseHandler(this._uri); + SseHandler(this._uri, {Duration keepAlive}) : _keepAlive = keepAlive; StreamQueue get connections => _connectionsStream ??= StreamQueue(_connectionController.stream); @@ -92,20 +129,28 @@ class SseHandler { var sink = utf8.encoder.startChunkedConversion(channel.sink); sink.add(_sseHeaders(req.headers['origin'])); var clientId = req.url.queryParameters['sseClientId']; - var connection = SseConnection(sink); - _connections[clientId] = connection; - unawaited(connection._closedCompleter.future.then((_) { - _connections.remove(clientId); - })); - // Remove connection when it is remotely closed or the stream is - // cancelled. - channel.stream.listen((_) { - // SSE is unidirectional. Responses are handled through POST requests. - }, onDone: () { - connection._close(); - }); - _connectionController.add(connection); + // Check if we already have a connection for this ID that is in the process + // of timing out (in which case we can reconnect it transparently). + if (_connections[clientId] != null && + _connections[clientId]._isTimingOut) { + _connections[clientId]._acceptReconnection(sink); + } else { + var connection = SseConnection(sink, keepAlive: _keepAlive); + _connections[clientId] = connection; + unawaited(connection._closedCompleter.future.then((_) { + _connections.remove(clientId); + })); + // Remove connection when it is remotely closed or the stream is + // cancelled. + channel.stream.listen((_) { + // SSE is unidirectional. Responses are handled through POST requests. + }, onDone: () { + connection._handleDisconnect(); + }); + + _connectionController.add(connection); + } }); return shelf.Response.notFound(''); } diff --git a/test/sse_test.dart b/test/sse_test.dart index 70d6d1b..ad7111b 100644 --- a/test/sse_test.dart +++ b/test/sse_test.dart @@ -33,111 +33,160 @@ void main() { chromeDriver.kill(); }); - setUp(() async { - handler = SseHandler(Uri.parse('/test')); - - var cascade = shelf.Cascade().add(handler.handler).add(_faviconHandler).add( - createStaticHandler('test/web', - listDirectories: true, defaultDocument: 'index.html')); - - server = await io.serve(cascade.handler, 'localhost', 0); - var capabilities = Capabilities.chrome - ..addAll({ - Capabilities.chromeOptions: { - 'args': ['--headless'] - } - }); - webdriver = await createDriver(desired: capabilities); + group('SSE', () { + setUp(() async { + handler = SseHandler(Uri.parse('/test')); + + var cascade = shelf.Cascade() + .add(handler.handler) + .add(_faviconHandler) + .add(createStaticHandler('test/web', + listDirectories: true, defaultDocument: 'index.html')); + + server = await io.serve(cascade.handler, 'localhost', 0); + var capabilities = Capabilities.chrome + ..addAll({ + Capabilities.chromeOptions: { + 'args': ['--headless'] + } + }); + webdriver = await createDriver(desired: capabilities); + }); + + tearDown(() async { + await webdriver.quit(); + await server.close(); + }); + + test('Can round trip messages', () async { + await webdriver.get('http://localhost:${server.port}'); + var connection = await handler.connections.next; + connection.sink.add('blah'); + expect(await connection.stream.first, 'blah'); + }); + + test('Multiple clients can connect', () async { + var connections = handler.connections; + await webdriver.get('http://localhost:${server.port}'); + await connections.next; + await webdriver.get('http://localhost:${server.port}'); + await connections.next; + }); + + test('Routes data correctly', () async { + var connections = handler.connections; + await webdriver.get('http://localhost:${server.port}'); + var connectionA = await connections.next; + connectionA.sink.add('foo'); + expect(await connectionA.stream.first, 'foo'); + + await webdriver.get('http://localhost:${server.port}'); + var connectionB = await connections.next; + connectionB.sink.add('bar'); + expect(await connectionB.stream.first, 'bar'); + }); + + test('Can close from the server', () async { + expect(handler.numberOfClients, 0); + await webdriver.get('http://localhost:${server.port}'); + var connection = await handler.connections.next; + expect(handler.numberOfClients, 1); + await connection.sink.close(); + await pumpEventQueue(); + expect(handler.numberOfClients, 0); + }); + + test('Client reconnects after being disconnected', () async { + expect(handler.numberOfClients, 0); + await webdriver.get('http://localhost:${server.port}'); + var connection = await handler.connections.next; + expect(handler.numberOfClients, 1); + await connection.sink.close(); + await pumpEventQueue(); + expect(handler.numberOfClients, 0); + + // Ensure the client reconnects + await handler.connections.next; + }); + + test('Can close from the client-side', () async { + expect(handler.numberOfClients, 0); + await webdriver.get('http://localhost:${server.port}'); + var connection = await handler.connections.next; + expect(handler.numberOfClients, 1); + + var closeButton = await webdriver.findElement(const By.tagName('button')); + await closeButton.click(); + + // Should complete since the connection is closed. + await connection.stream.toList(); + expect(handler.numberOfClients, 0); + }); + + test('Cancelling the listener closes the connection', () async { + expect(handler.numberOfClients, 0); + await webdriver.get('http://localhost:${server.port}'); + var connection = await handler.connections.next; + expect(handler.numberOfClients, 1); + + var sub = connection.stream.listen((_) {}); + await sub.cancel(); + await pumpEventQueue(); + expect(handler.numberOfClients, 0); + }); + + test('Disconnects when navigating away', () async { + await webdriver.get('http://localhost:${server.port}'); + expect(handler.numberOfClients, 1); + + await webdriver.get('chrome://version/'); + expect(handler.numberOfClients, 0); + }); }); - tearDown(() async { - await webdriver.quit(); - await server.close(); - }); - - test('Can round trip messages', () async { - await webdriver.get('http://localhost:${server.port}'); - var connection = await handler.connections.next; - connection.sink.add('blah'); - expect(await connection.stream.first, 'blah'); - }); - - test('Multiple clients can connect', () async { - var connections = handler.connections; - await webdriver.get('http://localhost:${server.port}'); - await connections.next; - await webdriver.get('http://localhost:${server.port}'); - await connections.next; - }); - - test('Routes data correctly', () async { - var connections = handler.connections; - await webdriver.get('http://localhost:${server.port}'); - var connectionA = await connections.next; - connectionA.sink.add('foo'); - expect(await connectionA.stream.first, 'foo'); - - await webdriver.get('http://localhost:${server.port}'); - var connectionB = await connections.next; - connectionB.sink.add('bar'); - expect(await connectionB.stream.first, 'bar'); - }); - - test('Can close from the server', () async { - expect(handler.numberOfClients, 0); - await webdriver.get('http://localhost:${server.port}'); - var connection = await handler.connections.next; - expect(handler.numberOfClients, 1); - await connection.sink.close(); - await pumpEventQueue(); - expect(handler.numberOfClients, 0); - }); - - test('Client reconnects after being disconnected', () async { - expect(handler.numberOfClients, 0); - await webdriver.get('http://localhost:${server.port}'); - var connection = await handler.connections.next; - expect(handler.numberOfClients, 1); - await connection.sink.close(); - await pumpEventQueue(); - expect(handler.numberOfClients, 0); - - // Ensure the client reconnects - await handler.connections.next; - }); - - test('Can close from the client-side', () async { - expect(handler.numberOfClients, 0); - await webdriver.get('http://localhost:${server.port}'); - var connection = await handler.connections.next; - expect(handler.numberOfClients, 1); - - var closeButton = await webdriver.findElement(const By.tagName('button')); - await closeButton.click(); - - // Should complete since the connection is closed. - await connection.stream.toList(); - expect(handler.numberOfClients, 0); - }); - - test('Cancelling the listener closes the connection', () async { - expect(handler.numberOfClients, 0); - await webdriver.get('http://localhost:${server.port}'); - var connection = await handler.connections.next; - expect(handler.numberOfClients, 1); - - var sub = connection.stream.listen((_) {}); - await sub.cancel(); - await pumpEventQueue(); - expect(handler.numberOfClients, 0); - }); - - test('Disconnects when navigating away', () async { - await webdriver.get('http://localhost:${server.port}'); - expect(handler.numberOfClients, 1); - - await webdriver.get('chrome://version/'); - expect(handler.numberOfClients, 0); + group('SSE with server keep-alive', () { + setUp(() async { + handler = + SseHandler(Uri.parse('/test'), keepAlive: const Duration(seconds: 5)); + + var cascade = shelf.Cascade() + .add(handler.handler) + .add(_faviconHandler) + .add(createStaticHandler('test/web', + listDirectories: true, defaultDocument: 'index.html')); + + server = await io.serve(cascade.handler, 'localhost', 0); + var capabilities = Capabilities.chrome + ..addAll({ + Capabilities.chromeOptions: { + 'args': ['--headless'] + } + }); + webdriver = await createDriver(desired: capabilities); + }); + + tearDown(() async { + await webdriver.quit(); + await server.close(); + }); + + test('Client reconnect use the same connection', () async { + expect(handler.numberOfClients, 0); + await webdriver.get('http://localhost:${server.port}'); + var connection = await handler.connections.next; + expect(handler.numberOfClients, 1); + + // Close the underlying connection. + connection.closeSink(); + await pumpEventQueue(); + + // Ensure there's still a connection. + expect(handler.numberOfClients, 1); + + // Ensure we can still round-trip data on the original connection. + connection.sink.add('bar'); + expect(await connection.stream.first, 'bar'); + }); }); } From a1fb6afb13053bca5fd5e0c544aefaa325169834 Mon Sep 17 00:00:00 2001 From: Danny Tuppeny Date: Tue, 17 Dec 2019 13:58:16 +0000 Subject: [PATCH 03/21] Add debug logging to debug Travis failure --- lib/server/sse_handler.dart | 10 +++++++--- test/sse_test.dart | 10 ++++++++++ 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/lib/server/sse_handler.dart b/lib/server/sse_handler.dart index 3baf8f2..3564545 100644 --- a/lib/server/sse_handler.dart +++ b/lib/server/sse_handler.dart @@ -45,9 +45,13 @@ class SseConnection extends StreamChannelMixin { SseConnection(this._sink, {Duration keepAlive}) : _keepAlive = keepAlive { _outgoingStreamSubscription = _outgoingController.stream.listen((data) { if (!_closedCompleter.isCompleted) { - // JSON encode the message to escape new lines. - _sink.add('data: ${json.encode(data)}\n'); - _sink.add('\n'); + try { + // JSON encode the message to escape new lines. + _sink.add('data: ${json.encode(data)}\n'); + _sink.add('\n'); + } catch (e) { + print('Error while trying to write "${json.encode(data)}": $e'); + } } }); _outgoingController.onCancel = _close; diff --git a/test/sse_test.dart b/test/sse_test.dart index ad7111b..162e366 100644 --- a/test/sse_test.dart +++ b/test/sse_test.dart @@ -171,21 +171,31 @@ void main() { }); test('Client reconnect use the same connection', () async { + print('test: 1'); expect(handler.numberOfClients, 0); await webdriver.get('http://localhost:${server.port}'); var connection = await handler.connections.next; + print('test: 2'); expect(handler.numberOfClients, 1); + print('test: 3'); // Close the underlying connection. connection.closeSink(); + print('test: 4'); await pumpEventQueue(); + print('test: 5'); // Ensure there's still a connection. + print('test: 6'); expect(handler.numberOfClients, 1); + print('test: 7'); // Ensure we can still round-trip data on the original connection. + print('test: 8'); connection.sink.add('bar'); + print('test: 9'); expect(await connection.stream.first, 'bar'); + print('test: 10'); }); }); } From 39b7d9980dfbce9c58599401759fcb6b2c4d0173 Mon Sep 17 00:00:00 2001 From: Danny Tuppeny Date: Tue, 17 Dec 2019 14:13:27 +0000 Subject: [PATCH 04/21] more debug logging --- lib/server/sse_handler.dart | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/server/sse_handler.dart b/lib/server/sse_handler.dart index 3564545..f1f0d86 100644 --- a/lib/server/sse_handler.dart +++ b/lib/server/sse_handler.dart @@ -76,6 +76,7 @@ class SseConnection extends StreamChannelMixin { } void _handleDisconnect() { + print('handling disconnect!'); if (_keepAlive == null) { _close(); } else { @@ -150,6 +151,7 @@ class SseHandler { channel.stream.listen((_) { // SSE is unidirectional. Responses are handled through POST requests. }, onDone: () { + print('stream is done!'); connection._handleDisconnect(); }); From dc5e488f98713223c881595cb6e1492bc951ea3d Mon Sep 17 00:00:00 2001 From: Danny Tuppeny Date: Tue, 17 Dec 2019 14:16:55 +0000 Subject: [PATCH 05/21] Try handling failures to write to the stream --- lib/server/sse_handler.dart | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/lib/server/sse_handler.dart b/lib/server/sse_handler.dart index f1f0d86..173b3aa 100644 --- a/lib/server/sse_handler.dart +++ b/lib/server/sse_handler.dart @@ -49,8 +49,13 @@ class SseConnection extends StreamChannelMixin { // JSON encode the message to escape new lines. _sink.add('data: ${json.encode(data)}\n'); _sink.add('\n'); - } catch (e) { - print('Error while trying to write "${json.encode(data)}": $e'); + } catch (StateError) { + // If we got here then the sink may have closed but the stream.onDone + // hasn't fired yet, so pause the subscription, re-queue the message + // and handle the error as a disconnect. + _outgoingStreamSubscription.pause(); + _outgoingController.add(data); + _handleDisconnect(); } } }); @@ -79,7 +84,7 @@ class SseConnection extends StreamChannelMixin { print('handling disconnect!'); if (_keepAlive == null) { _close(); - } else { + } else if (!_isTimingOut) { _outgoingStreamSubscription.pause(); _isTimingOut = true; // If after the timeout period we're still in this state, we'll close. From 652a0163a41a142b2be744392aa6d2138eb21df2 Mon Sep 17 00:00:00 2001 From: Danny Tuppeny Date: Tue, 17 Dec 2019 14:24:00 +0000 Subject: [PATCH 06/21] Don't re-queue messages if not keeping alive --- lib/server/sse_handler.dart | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lib/server/sse_handler.dart b/lib/server/sse_handler.dart index 173b3aa..bba795d 100644 --- a/lib/server/sse_handler.dart +++ b/lib/server/sse_handler.dart @@ -50,6 +50,9 @@ class SseConnection extends StreamChannelMixin { _sink.add('data: ${json.encode(data)}\n'); _sink.add('\n'); } catch (StateError) { + if (_keepAlive == null) { + rethrow; + } // If we got here then the sink may have closed but the stream.onDone // hasn't fired yet, so pause the subscription, re-queue the message // and handle the error as a disconnect. From c3931c1eb45c5df95e9127bbc63e736b52be0f00 Mon Sep 17 00:00:00 2001 From: Danny Tuppeny Date: Tue, 17 Dec 2019 14:24:05 +0000 Subject: [PATCH 07/21] More debugging --- lib/server/sse_handler.dart | 1 + test/sse_test.dart | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/server/sse_handler.dart b/lib/server/sse_handler.dart index bba795d..c4d0809 100644 --- a/lib/server/sse_handler.dart +++ b/lib/server/sse_handler.dart @@ -78,6 +78,7 @@ class SseConnection extends StreamChannelMixin { Stream get stream => _incomingController.stream; void _acceptReconnection(Sink sink) { + print('accepting reconnect'); _isTimingOut = false; _sink = sink; _outgoingStreamSubscription.resume(); diff --git a/test/sse_test.dart b/test/sse_test.dart index 162e366..3a827b2 100644 --- a/test/sse_test.dart +++ b/test/sse_test.dart @@ -197,7 +197,7 @@ void main() { expect(await connection.stream.first, 'bar'); print('test: 10'); }); - }); + }, timeout: const Timeout(Duration(seconds: 120))); } FutureOr _faviconHandler(shelf.Request request) { From 5d5f4f8f6f20f84efafb86ce26f0db2423e83489 Mon Sep 17 00:00:00 2001 From: Danny Tuppeny Date: Tue, 17 Dec 2019 14:29:08 +0000 Subject: [PATCH 08/21] Don't cause multiple pauses --- lib/server/sse_handler.dart | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/lib/server/sse_handler.dart b/lib/server/sse_handler.dart index c4d0809..392d604 100644 --- a/lib/server/sse_handler.dart +++ b/lib/server/sse_handler.dart @@ -44,6 +44,7 @@ class SseConnection extends StreamChannelMixin { SseConnection(this._sink, {Duration keepAlive}) : _keepAlive = keepAlive { _outgoingStreamSubscription = _outgoingController.stream.listen((data) { + print('Trying to send ${json.encode(data)}'); if (!_closedCompleter.isCompleted) { try { // JSON encode the message to escape new lines. @@ -53,12 +54,12 @@ class SseConnection extends StreamChannelMixin { if (_keepAlive == null) { rethrow; } + print('failed to send (re-queing and pausing) ${json.encode(data)}'); // If we got here then the sink may have closed but the stream.onDone // hasn't fired yet, so pause the subscription, re-queue the message // and handle the error as a disconnect. - _outgoingStreamSubscription.pause(); - _outgoingController.add(data); _handleDisconnect(); + _outgoingController.add(data); } } }); @@ -81,6 +82,7 @@ class SseConnection extends StreamChannelMixin { print('accepting reconnect'); _isTimingOut = false; _sink = sink; + print('resuming sub'); _outgoingStreamSubscription.resume(); } From fea430f57efa297f944e7defc423be2afeebe140 Mon Sep 17 00:00:00 2001 From: Danny Tuppeny Date: Tue, 17 Dec 2019 14:34:18 +0000 Subject: [PATCH 09/21] Remov debug logging --- lib/server/sse_handler.dart | 6 ------ test/sse_test.dart | 10 ---------- 2 files changed, 16 deletions(-) diff --git a/lib/server/sse_handler.dart b/lib/server/sse_handler.dart index 392d604..1a55938 100644 --- a/lib/server/sse_handler.dart +++ b/lib/server/sse_handler.dart @@ -44,7 +44,6 @@ class SseConnection extends StreamChannelMixin { SseConnection(this._sink, {Duration keepAlive}) : _keepAlive = keepAlive { _outgoingStreamSubscription = _outgoingController.stream.listen((data) { - print('Trying to send ${json.encode(data)}'); if (!_closedCompleter.isCompleted) { try { // JSON encode the message to escape new lines. @@ -54,7 +53,6 @@ class SseConnection extends StreamChannelMixin { if (_keepAlive == null) { rethrow; } - print('failed to send (re-queing and pausing) ${json.encode(data)}'); // If we got here then the sink may have closed but the stream.onDone // hasn't fired yet, so pause the subscription, re-queue the message // and handle the error as a disconnect. @@ -79,15 +77,12 @@ class SseConnection extends StreamChannelMixin { Stream get stream => _incomingController.stream; void _acceptReconnection(Sink sink) { - print('accepting reconnect'); _isTimingOut = false; _sink = sink; - print('resuming sub'); _outgoingStreamSubscription.resume(); } void _handleDisconnect() { - print('handling disconnect!'); if (_keepAlive == null) { _close(); } else if (!_isTimingOut) { @@ -162,7 +157,6 @@ class SseHandler { channel.stream.listen((_) { // SSE is unidirectional. Responses are handled through POST requests. }, onDone: () { - print('stream is done!'); connection._handleDisconnect(); }); diff --git a/test/sse_test.dart b/test/sse_test.dart index 3a827b2..810989e 100644 --- a/test/sse_test.dart +++ b/test/sse_test.dart @@ -171,31 +171,21 @@ void main() { }); test('Client reconnect use the same connection', () async { - print('test: 1'); expect(handler.numberOfClients, 0); await webdriver.get('http://localhost:${server.port}'); var connection = await handler.connections.next; - print('test: 2'); expect(handler.numberOfClients, 1); - print('test: 3'); // Close the underlying connection. connection.closeSink(); - print('test: 4'); await pumpEventQueue(); - print('test: 5'); // Ensure there's still a connection. - print('test: 6'); expect(handler.numberOfClients, 1); - print('test: 7'); // Ensure we can still round-trip data on the original connection. - print('test: 8'); connection.sink.add('bar'); - print('test: 9'); expect(await connection.stream.first, 'bar'); - print('test: 10'); }); }, timeout: const Timeout(Duration(seconds: 120))); } From 0a5827cf55d804740b72e9a926a3d4c474d549ba Mon Sep 17 00:00:00 2001 From: Danny Tuppeny Date: Wed, 18 Dec 2019 09:35:35 +0000 Subject: [PATCH 10/21] Switch boolean to timer --- lib/server/sse_handler.dart | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/lib/server/sse_handler.dart b/lib/server/sse_handler.dart index 1a55938..b8abfad 100644 --- a/lib/server/sse_handler.dart +++ b/lib/server/sse_handler.dart @@ -33,8 +33,8 @@ class SseConnection extends StreamChannelMixin { /// How long to wait after a connection drops before considering it closed. final Duration _keepAlive; - /// Whether the connection is in the timeout period waiting for a reconnect. - bool _isTimingOut = false; + /// A timer counting down the KeepAlive period (null if connected). + Timer _keepAliveTimer; /// The subscription that passes messages outgoing messages to the sink. This /// will be paused during the timeout/reconnect period. @@ -77,24 +77,22 @@ class SseConnection extends StreamChannelMixin { Stream get stream => _incomingController.stream; void _acceptReconnection(Sink sink) { - _isTimingOut = false; + _keepAliveTimer.cancel(); + _keepAliveTimer = null; _sink = sink; _outgoingStreamSubscription.resume(); } void _handleDisconnect() { if (_keepAlive == null) { + // Close immediately if we're not keeping alive. _close(); - } else if (!_isTimingOut) { + } else if (_keepAliveTimer == null) { + // Otherwise pause sending messages and set a timer to close after the + // timeout period. If the connection comes back, this will be unpaused + // and the timer cancelled. _outgoingStreamSubscription.pause(); - _isTimingOut = true; - // If after the timeout period we're still in this state, we'll close. - Timer(_keepAlive, () { - if (_isTimingOut) { - _isTimingOut = false; - _close(); - } - }); + _keepAliveTimer = Timer(_keepAlive, _close); } } @@ -144,7 +142,7 @@ class SseHandler { // Check if we already have a connection for this ID that is in the process // of timing out (in which case we can reconnect it transparently). if (_connections[clientId] != null && - _connections[clientId]._isTimingOut) { + _connections[clientId]._keepAliveTimer != null) { _connections[clientId]._acceptReconnection(sink); } else { var connection = SseConnection(sink, keepAlive: _keepAlive); From 254142eef5b62cc5ff39ceb4e4538c037301bb58 Mon Sep 17 00:00:00 2001 From: Danny Tuppeny Date: Wed, 18 Dec 2019 09:36:21 +0000 Subject: [PATCH 11/21] Tweak comment --- lib/server/sse_handler.dart | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/server/sse_handler.dart b/lib/server/sse_handler.dart index b8abfad..69829cf 100644 --- a/lib/server/sse_handler.dart +++ b/lib/server/sse_handler.dart @@ -36,8 +36,9 @@ class SseConnection extends StreamChannelMixin { /// A timer counting down the KeepAlive period (null if connected). Timer _keepAliveTimer; - /// The subscription that passes messages outgoing messages to the sink. This - /// will be paused during the timeout/reconnect period. + /// The subscription that passes outgoing messages to the sink. + /// + /// This will be paused during the keepalive period and resumed upon reconnection. StreamSubscription _outgoingStreamSubscription; final _closedCompleter = Completer(); From fee2787e2e4d11bd5b55671e7680bb848f26b015 Mon Sep 17 00:00:00 2001 From: Danny Tuppeny Date: Wed, 18 Dec 2019 09:40:08 +0000 Subject: [PATCH 12/21] Add a comment about the keepAlive parameter --- lib/server/sse_handler.dart | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/lib/server/sse_handler.dart b/lib/server/sse_handler.dart index 69829cf..6ec4639 100644 --- a/lib/server/sse_handler.dart +++ b/lib/server/sse_handler.dart @@ -43,6 +43,14 @@ class SseConnection extends StreamChannelMixin { final _closedCompleter = Completer(); + /// Creates an [SseConnection] for the supplied [_sink]. + /// + /// If [keepAlive] is supplied, the connection will remain active for this + /// period after a disconnect and can be reconnected transparently. If there + /// is no reconnect within that period, the connection will be closed normally. + /// + /// If [keepAlive] is not supplied, the connection will be closed immediately + /// after a disconnect. SseConnection(this._sink, {Duration keepAlive}) : _keepAlive = keepAlive { _outgoingStreamSubscription = _outgoingController.stream.listen((data) { if (!_closedCompleter.isCompleted) { From bef29374b0eca565c294553da528ae73b598ae35 Mon Sep 17 00:00:00 2001 From: Danny Tuppeny Date: Wed, 18 Dec 2019 09:55:11 +0000 Subject: [PATCH 13/21] Move sse_handler -> src --- lib/{server => src}/sse_handler.dart | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename lib/{server => src}/sse_handler.dart (100%) diff --git a/lib/server/sse_handler.dart b/lib/src/sse_handler.dart similarity index 100% rename from lib/server/sse_handler.dart rename to lib/src/sse_handler.dart From b5f53f9141a59cafb3d2cc35812624957594f99f Mon Sep 17 00:00:00 2001 From: Danny Tuppeny Date: Wed, 18 Dec 2019 09:55:40 +0000 Subject: [PATCH 14/21] Re-export sse_handler classes --- lib/server/sse_handler.dart | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 lib/server/sse_handler.dart diff --git a/lib/server/sse_handler.dart b/lib/server/sse_handler.dart new file mode 100644 index 0000000..4a331e5 --- /dev/null +++ b/lib/server/sse_handler.dart @@ -0,0 +1,5 @@ +// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +export 'package:sse/src/sse_handler.dart' show SseConnection, SseHandler; From 8af4a5eed7d3c5a4c3cb242f94dbfec23d8d691f Mon Sep 17 00:00:00 2001 From: Danny Tuppeny Date: Wed, 18 Dec 2019 09:55:53 +0000 Subject: [PATCH 15/21] Change closeSink to a top level in src/ --- lib/src/sse_handler.dart | 5 ++--- test/sse_test.dart | 3 ++- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/src/sse_handler.dart b/lib/src/sse_handler.dart index 6ec4639..7796a6a 100644 --- a/lib/src/sse_handler.dart +++ b/lib/src/sse_handler.dart @@ -105,9 +105,6 @@ class SseConnection extends StreamChannelMixin { } } - // TODO(dantup): @visibleForTesting? - void closeSink() => _sink.close(); - void _close() { if (!_closedCompleter.isCompleted) { _closedCompleter.complete(); @@ -213,3 +210,5 @@ class SseHandler { // https://bugzilla.mozilla.org/show_bug.cgi?id=1508661 req.headers['origin'] ?? req.headers['host']; } + +void closeSink(SseConnection connection) => connection._sink.close(); diff --git a/test/sse_test.dart b/test/sse_test.dart index 810989e..e6983a8 100644 --- a/test/sse_test.dart +++ b/test/sse_test.dart @@ -10,6 +10,7 @@ import 'package:shelf/shelf.dart' as shelf; import 'package:shelf/shelf_io.dart' as io; import 'package:shelf_static/shelf_static.dart'; import 'package:sse/server/sse_handler.dart'; +import 'package:sse/src/sse_handler.dart' show closeSink; import 'package:test/test.dart'; import 'package:webdriver/io.dart'; @@ -177,7 +178,7 @@ void main() { expect(handler.numberOfClients, 1); // Close the underlying connection. - connection.closeSink(); + closeSink(connection); await pumpEventQueue(); // Ensure there's still a connection. From 76bd1819b167e01cc4e975a1b8e3532ab79c68c4 Mon Sep 17 00:00:00 2001 From: Danny Tuppeny Date: Mon, 23 Dec 2019 10:04:02 +0000 Subject: [PATCH 16/21] Tweak handling of KeepAlive timer --- lib/src/sse_handler.dart | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/lib/src/sse_handler.dart b/lib/src/sse_handler.dart index 7796a6a..735dcb9 100644 --- a/lib/src/sse_handler.dart +++ b/lib/src/sse_handler.dart @@ -33,9 +33,12 @@ class SseConnection extends StreamChannelMixin { /// How long to wait after a connection drops before considering it closed. final Duration _keepAlive; - /// A timer counting down the KeepAlive period (null if connected). + /// A timer counting down the KeepAlive period (null if hasn't disconnected). Timer _keepAliveTimer; + /// Whether this connection is currently in the KeepAlive timeout period. + bool get isInKeepAlivePeriod => _keepAliveTimer?.isActive ?? false; + /// The subscription that passes outgoing messages to the sink. /// /// This will be paused during the keepalive period and resumed upon reconnection. @@ -86,8 +89,7 @@ class SseConnection extends StreamChannelMixin { Stream get stream => _incomingController.stream; void _acceptReconnection(Sink sink) { - _keepAliveTimer.cancel(); - _keepAliveTimer = null; + _keepAliveTimer?.cancel(); _sink = sink; _outgoingStreamSubscription.resume(); } @@ -96,10 +98,10 @@ class SseConnection extends StreamChannelMixin { if (_keepAlive == null) { // Close immediately if we're not keeping alive. _close(); - } else if (_keepAliveTimer == null) { - // Otherwise pause sending messages and set a timer to close after the - // timeout period. If the connection comes back, this will be unpaused - // and the timer cancelled. + } else if (!isInKeepAlivePeriod) { + // Otherwise if we didn't already have an active timer, pause sending + // messages and set a timer to close after the timeout period. If the + // connection comes back, this will be unpaused and the timer cancelled. _outgoingStreamSubscription.pause(); _keepAliveTimer = Timer(_keepAlive, _close); } @@ -148,7 +150,7 @@ class SseHandler { // Check if we already have a connection for this ID that is in the process // of timing out (in which case we can reconnect it transparently). if (_connections[clientId] != null && - _connections[clientId]._keepAliveTimer != null) { + _connections[clientId].isInKeepAlivePeriod) { _connections[clientId]._acceptReconnection(sink); } else { var connection = SseConnection(sink, keepAlive: _keepAlive); From 1437f08039506dda2292e0a556856acb58992417 Mon Sep 17 00:00:00 2001 From: Danny Tuppeny Date: Mon, 23 Dec 2019 11:26:32 +0000 Subject: [PATCH 17/21] Add test for messages arriving in-order Curently fails with: ``` Expected: ['one', 'two'] Actual: ['two', 'one'] ``` --- test/sse_test.dart | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/test/sse_test.dart b/test/sse_test.dart index e6983a8..cc10a2d 100644 --- a/test/sse_test.dart +++ b/test/sse_test.dart @@ -188,6 +188,25 @@ void main() { connection.sink.add('bar'); expect(await connection.stream.first, 'bar'); }); + + test('Messages sent during disconnect arrive in-order', () async { + expect(handler.numberOfClients, 0); + await webdriver.get('http://localhost:${server.port}'); + var connection = await handler.connections.next; + expect(handler.numberOfClients, 1); + + // Close the underlying connection. + closeSink(connection); + connection.sink.add('one'); + connection.sink.add('two'); + await pumpEventQueue(); + + // Ensure there's still a connection. + expect(handler.numberOfClients, 1); + + // Ensure messages arrive in the same order + expect(await connection.stream.take(2).toList(), equals(['one', 'two'])); + }); }, timeout: const Timeout(Duration(seconds: 120))); } From 58fc9e528ac54aeb85aeba3054367926e456c3eb Mon Sep 17 00:00:00 2001 From: Danny Tuppeny Date: Mon, 23 Dec 2019 12:02:08 +0000 Subject: [PATCH 18/21] Use StreamQueue to keep messages in-order during dis/reconnect --- lib/src/sse_handler.dart | 63 ++++++++++++++++++++++------------------ 1 file changed, 34 insertions(+), 29 deletions(-) diff --git a/lib/src/sse_handler.dart b/lib/src/sse_handler.dart index 735dcb9..c93082b 100644 --- a/lib/src/sse_handler.dart +++ b/lib/src/sse_handler.dart @@ -39,11 +39,6 @@ class SseConnection extends StreamChannelMixin { /// Whether this connection is currently in the KeepAlive timeout period. bool get isInKeepAlivePeriod => _keepAliveTimer?.isActive ?? false; - /// The subscription that passes outgoing messages to the sink. - /// - /// This will be paused during the keepalive period and resumed upon reconnection. - StreamSubscription _outgoingStreamSubscription; - final _closedCompleter = Completer(); /// Creates an [SseConnection] for the supplied [_sink]. @@ -55,28 +50,41 @@ class SseConnection extends StreamChannelMixin { /// If [keepAlive] is not supplied, the connection will be closed immediately /// after a disconnect. SseConnection(this._sink, {Duration keepAlive}) : _keepAlive = keepAlive { - _outgoingStreamSubscription = _outgoingController.stream.listen((data) { - if (!_closedCompleter.isCompleted) { - try { - // JSON encode the message to escape new lines. - _sink.add('data: ${json.encode(data)}\n'); - _sink.add('\n'); - } catch (StateError) { - if (_keepAlive == null) { - rethrow; - } - // If we got here then the sink may have closed but the stream.onDone - // hasn't fired yet, so pause the subscription, re-queue the message - // and handle the error as a disconnect. - _handleDisconnect(); - _outgoingController.add(data); - } - } - }); + unawaited(_setUpListener()); _outgoingController.onCancel = _close; _incomingController.onCancel = _close; } + Future _setUpListener() async { + var outgoingStreamQueue = StreamQueue(_outgoingController.stream); + while (await outgoingStreamQueue.hasNext) { + // If we're in a KeepAlive timeout, there's nowhere to send messages so + // wait a short period and check again. + if (isInKeepAlivePeriod) { + await Future.delayed(const Duration(milliseconds: 200)); + continue; + } + + // Peek the data so we don't remove it from the stream if we're unable to + // send it. + final data = await outgoingStreamQueue.peek; + try { + // JSON encode the message to escape new lines. + _sink.add('data: ${json.encode(data)}\n'); + _sink.add('\n'); + await outgoingStreamQueue.next; // Consume from stream if no errors. + } catch (StateError) { + if (_keepAlive == null) { + rethrow; + } + // If we got here then the sink may have closed but the stream.onDone + // hasn't fired yet, so pause the subscription and skip calling + // `next` so the message remains in the queue to try again. + _handleDisconnect(); + } + } + } + /// The message added to the sink has to be JSON encodable. @override StreamSink get sink => _outgoingController.sink; @@ -91,7 +99,6 @@ class SseConnection extends StreamChannelMixin { void _acceptReconnection(Sink sink) { _keepAliveTimer?.cancel(); _sink = sink; - _outgoingStreamSubscription.resume(); } void _handleDisconnect() { @@ -99,10 +106,9 @@ class SseConnection extends StreamChannelMixin { // Close immediately if we're not keeping alive. _close(); } else if (!isInKeepAlivePeriod) { - // Otherwise if we didn't already have an active timer, pause sending - // messages and set a timer to close after the timeout period. If the - // connection comes back, this will be unpaused and the timer cancelled. - _outgoingStreamSubscription.pause(); + // Otherwise if we didn't already have an active timer, set a timer to + // close after the timeout period. If the connection comes back, this will + // be cancelled and all messages left in the queue tried again. _keepAliveTimer = Timer(_keepAlive, _close); } } @@ -110,7 +116,6 @@ class SseConnection extends StreamChannelMixin { void _close() { if (!_closedCompleter.isCompleted) { _closedCompleter.complete(); - _outgoingStreamSubscription.cancel(); _sink.close(); if (!_outgoingController.isClosed) _outgoingController.close(); if (!_incomingController.isClosed) _incomingController.close(); From 611cee95765fef4e6acf3f88f88ea5450b73d1e3 Mon Sep 17 00:00:00 2001 From: Danny Tuppeny Date: Mon, 23 Dec 2019 12:35:07 +0000 Subject: [PATCH 19/21] After KeepAlive period elapses, rethrow exceptions --- lib/src/sse_handler.dart | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/src/sse_handler.dart b/lib/src/sse_handler.dart index c93082b..e8cd523 100644 --- a/lib/src/sse_handler.dart +++ b/lib/src/sse_handler.dart @@ -74,7 +74,7 @@ class SseConnection extends StreamChannelMixin { _sink.add('\n'); await outgoingStreamQueue.next; // Consume from stream if no errors. } catch (StateError) { - if (_keepAlive == null) { + if (_keepAlive == null || _closedCompleter.isCompleted) { rethrow; } // If we got here then the sink may have closed but the stream.onDone From eac5b4ec3c4d154b626f378649c08d946e4ad604 Mon Sep 17 00:00:00 2001 From: Gary Roumanis Date: Mon, 6 Jan 2020 11:15:33 -0800 Subject: [PATCH 20/21] minor fixes and prep to publish --- .travis.yml | 2 +- CHANGELOG.md | 7 +++++++ lib/server/sse_handler.dart | 2 +- lib/src/{ => server}/sse_handler.dart | 0 pubspec.yaml | 4 ++-- test/sse_test.dart | 2 +- 6 files changed, 12 insertions(+), 5 deletions(-) rename lib/src/{ => server}/sse_handler.dart (100%) diff --git a/.travis.yml b/.travis.yml index e0371f6..1e2faca 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,7 +5,7 @@ addons: dart: - dev - - 2.1.0 + - 2.1.2 with_content_shell: false diff --git a/CHANGELOG.md b/CHANGELOG.md index f46643e..022f120 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,10 @@ +## 3.1.0 + +- Add optional `keepAlive` parameter to the `SseHandler`. If `keepAlive` is + supplied, the connection will remain active for this period after a + disconnect and can be reconnected transparently. If there is no reconnect + within that period, the connection will be closed normally. + ## 3.0.0 - Add retry logic. diff --git a/lib/server/sse_handler.dart b/lib/server/sse_handler.dart index 4a331e5..bfed935 100644 --- a/lib/server/sse_handler.dart +++ b/lib/server/sse_handler.dart @@ -2,4 +2,4 @@ // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. -export 'package:sse/src/sse_handler.dart' show SseConnection, SseHandler; +export 'package:sse/src/server/sse_handler.dart' show SseConnection, SseHandler; diff --git a/lib/src/sse_handler.dart b/lib/src/server/sse_handler.dart similarity index 100% rename from lib/src/sse_handler.dart rename to lib/src/server/sse_handler.dart diff --git a/pubspec.yaml b/pubspec.yaml index 33fc5fb..d1580d1 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,5 +1,5 @@ name: sse -version: 3.0.0 +version: 3.1.0 author: Dart Team homepage: https://github.com/dart-lang/sse description: >- @@ -8,7 +8,7 @@ description: >- requests. environment: - sdk: ">=2.1.0 <3.0.0" + sdk: ">=2.1.2 <3.0.0" dependencies: async: ^2.0.8 diff --git a/test/sse_test.dart b/test/sse_test.dart index cc10a2d..0d4040f 100644 --- a/test/sse_test.dart +++ b/test/sse_test.dart @@ -10,7 +10,7 @@ import 'package:shelf/shelf.dart' as shelf; import 'package:shelf/shelf_io.dart' as io; import 'package:shelf_static/shelf_static.dart'; import 'package:sse/server/sse_handler.dart'; -import 'package:sse/src/sse_handler.dart' show closeSink; +import 'package:sse/src/server/sse_handler.dart' show closeSink; import 'package:test/test.dart'; import 'package:webdriver/io.dart'; From a57ed2c3460ec7e04f2b611d10c2728ab1937e46 Mon Sep 17 00:00:00 2001 From: Gary Roumanis Date: Mon, 6 Jan 2020 12:14:49 -0800 Subject: [PATCH 21/21] actual existing lower bound --- .travis.yml | 2 +- pubspec.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index 1e2faca..d6bce8f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,7 +5,7 @@ addons: dart: - dev - - 2.1.2 + - 2.2.0 with_content_shell: false diff --git a/pubspec.yaml b/pubspec.yaml index d1580d1..9c164f7 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -8,7 +8,7 @@ description: >- requests. environment: - sdk: ">=2.1.2 <3.0.0" + sdk: ">=2.2.0 <3.0.0" dependencies: async: ^2.0.8