Skip to content

Commit

Permalink
fix channel still being connected when inactive
Browse files Browse the repository at this point in the history
  • Loading branch information
KammererTob committed Sep 11, 2023
1 parent eaba21b commit 95c5292
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 8 deletions.
19 changes: 13 additions & 6 deletions lib/stomp_handler.dart
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class StompHandler {
late Parser _parser;
WebSocketChannel? _channel;
bool _connected = false;
bool _isActive = false;
int _currentReceiptIndex = 0;
int _currentSubscriptionIndex = 0;
DateTime _lastServerActivity = DateTime.now();
Expand All @@ -50,10 +51,17 @@ class StompHandler {
bool get connected => _connected;

void start() async {
_isActive = true;
try {
_channel = await platform.connect(config);
_channel!.stream.listen(_onData, onError: _onError, onDone: _onDone);
_connectToStomp();
// It can happen that dispose was called while the future above hasn't completed yet
// To prevent lingering connections we need to make sure that we disconnect cleanly
if (!_isActive) {
_cleanUp();
} else {
_channel!.stream.listen(_onData, onError: _onError, onDone: _onDone);
_connectToStomp();
}
} catch (err) {
_onError(err);
if (config.reconnectDelay.inMilliseconds == 0) {
Expand Down Expand Up @@ -237,8 +245,7 @@ class StompHandler {
_parser.escapeHeaders = false;
}

if (frame.headers['version'] != '1.0' &&
frame.headers.containsKey('heart-beat')) {
if (frame.headers['version'] != '1.0' && frame.headers.containsKey('heart-beat')) {
_setupHeartbeat(frame);
}

Expand Down Expand Up @@ -294,8 +301,7 @@ class StompHandler {
final ttl = max(config.heartbeatIncoming.inMilliseconds, serverOutgoing);
_heartbeatReceiver?.cancel();
_heartbeatReceiver = Timer.periodic(Duration(milliseconds: ttl), (_) {
final deltaMs = DateTime.now().millisecondsSinceEpoch -
_lastServerActivity.millisecondsSinceEpoch;
final deltaMs = DateTime.now().millisecondsSinceEpoch - _lastServerActivity.millisecondsSinceEpoch;
// The connection might be dead. Clean up.
if (deltaMs > (ttl * 2)) {
_cleanUp();
Expand All @@ -306,6 +312,7 @@ class StompHandler {

void _cleanUp() {
_connected = false;
_isActive = false;
_heartbeatSender?.cancel();
_heartbeatReceiver?.cancel();
_channel?.sink.close();
Expand Down
33 changes: 31 additions & 2 deletions test/stomp_handler_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,7 @@ void main() {
handler!.dispose();
});

handler = StompHandler(config: config.copyWith(onConnect: onConnect))
..start();
handler = StompHandler(config: config.copyWith(onConnect: onConnect))..start();
});

test('disconnects correctly', () async {
Expand Down Expand Up @@ -136,6 +135,36 @@ void main() {
)..start();
});

test('aborts connection if disconnected while connecting', () async {
final onWebSocketDone = expectAsync0(() {}, count: 0);

final onDisconnect = expectAsync1(
(StompFrame frame) {},
count: 0,
);

final onConnect = expectAsync1(
(StompFrame frame) {},
count: 0,
);

final onError = expectAsync1((dynamic _) {}, count: 0);

handler = StompHandler(
config: config.copyWith(
onConnect: onConnect,
onDisconnect: onDisconnect,
onWebSocketDone: onWebSocketDone,
onStompError: onError,
onWebSocketError: onError,
),
)..start();

Future.microtask(() => handler?.dispose());

await Future.delayed(Duration(milliseconds: 200));
});

test('subscribes correctly', () {
final onSubscriptionFrame = expectAsync1((StompFrame frame) {
expect(frame.command, 'MESSAGE');
Expand Down

0 comments on commit 95c5292

Please sign in to comment.