Skip to content

Commit

Permalink
Improved streams in the request builders
Browse files Browse the repository at this point in the history
  • Loading branch information
jopmiddelkamp committed Apr 6, 2024
1 parent aa5cb3d commit 43070e4
Show file tree
Hide file tree
Showing 10 changed files with 365 additions and 196 deletions.
34 changes: 25 additions & 9 deletions lib/src/eventsource/eventsource.dart
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import "event.dart";
import "decoder.dart";
import "dart:async";
import "dart:convert";

import 'package:http/http.dart' as http;
import "package:http/src/utils.dart" show encodingForCharset;
import "package:http_parser/http_parser.dart" show MediaType;

import "decoder.dart";
import "event.dart";

export "event.dart";

enum EventSourceReadyState {
Expand Down Expand Up @@ -52,13 +54,17 @@ class EventSource extends Stream<Event> {
String _body;
String _method;

StreamSubscription? _responseStreamSubscription;

/// Create a new EventSource by connecting to the specified url.
static Future<EventSource> connect(url,
{http.Client? client,
String? lastEventId,
Map<String, String>? headers,
String? body,
String? method}) async {
static Future<EventSource> connect(
url, {
http.Client? client,
String? lastEventId,
Map<String, String>? headers,
String? body,
String? method,
}) async {
// parameter initialization
url = url is Uri ? url : Uri.parse(url);
client = client ?? new http.Client();
Expand Down Expand Up @@ -105,7 +111,11 @@ class EventSource extends Stream<Event> {
}
_readyState = EventSourceReadyState.OPEN;
// start streaming the data
response.stream.transform(_decoder).listen((Event event) {
_responseStreamSubscription = response.stream.transform(_decoder).listen(
(Event event) {
if (_readyState == EventSourceReadyState.CLOSED) {
return;
}
_streamController.add(event);
_lastEventId = event.id;
},
Expand Down Expand Up @@ -134,6 +144,12 @@ class EventSource extends Stream<Event> {
void _updateRetryDelay(Duration retry) {
_retryDelay = retry;
}

void close() {
_responseStreamSubscription?.cancel();
_readyState = EventSourceReadyState.CLOSED;
_streamController.close();
}
}

/// Returns the encoding to use for a response with the given headers. This
Expand Down
59 changes: 38 additions & 21 deletions lib/src/requests/accounts_request_builder.dart
Original file line number Diff line number Diff line change
Expand Up @@ -100,34 +100,51 @@ class AccountsRequestBuilder extends RequestBuilder {
/// See: <a href="https://developers.stellar.org/api/introduction/streaming/" target="_blank">Streaming</a>
Stream<AccountResponse> stream() {
StreamController<AccountResponse> listener = StreamController.broadcast();

bool cancelled = false;
listener.onCancel = () {
cancelled = true;
};
void createNewEventSource() {
EventSource? source;

Future<void> createNewEventSource() async {
if (cancelled) {
return;
}
EventSource.connect(this.buildUri()).then((eventSource) {
eventSource.listen((Event event) {
if (cancelled) {
return null;
}
if (event.data == "\"hello\"") {
return null;
}
if (event.event == "close") {
createNewEventSource();
return null;
}
AccountResponse accountResponse =
AccountResponse.fromJson(json.decode(event.data!));
listener.add(accountResponse);
});
source?.close();
source = await EventSource.connect(this.buildUri());
source!.listen((Event event) async {
if (cancelled) {
return null;
}
if (event.event == "open") {
return null;
}
if (event.event == "close") {
// Reconnect on close to stream infinitely
createNewEventSource();
return null;
}
try {
AccountResponse operationResponse = AccountResponse.fromJson(
json.decode(event.data!),
);
listener.add(operationResponse);
} catch (e, stackTrace) {
listener.addError(e, stackTrace);
createNewEventSource();
}
});
}

createNewEventSource();
listener.onListen = () {
cancelled = false;
createNewEventSource();
};
listener.onCancel = () {
if (!listener.hasListener) {
cancelled = true;
source?.close();
}
};

return listener.stream;
}

Expand Down
59 changes: 38 additions & 21 deletions lib/src/requests/effects_request_builder.dart
Original file line number Diff line number Diff line change
Expand Up @@ -73,34 +73,51 @@ class EffectsRequestBuilder extends RequestBuilder {
/// See: <a href="https://developers.stellar.org/api/introduction/streaming/" target="_blank">Streaming</a>
Stream<EffectResponse> stream() {
StreamController<EffectResponse> listener = StreamController.broadcast();

bool cancelled = false;
listener.onCancel = () {
cancelled = true;
};
void createNewEventSource() {
EventSource? source;

Future<void> createNewEventSource() async {
if (cancelled) {
return;
}
EventSource.connect(this.buildUri()).then((eventSource) {
eventSource.listen((Event event) {
if (cancelled) {
return null;
}
if (event.data == "\"hello\"") {
return null;
}
if (event.event == "close") {
createNewEventSource();
return null;
}
EffectResponse effectResponse =
EffectResponse.fromJson(json.decode(event.data!));
listener.add(effectResponse);
});
source?.close();
source = await EventSource.connect(this.buildUri());
source!.listen((Event event) async {
if (cancelled) {
return null;
}
if (event.event == "open") {
return null;
}
if (event.event == "close") {
// Reconnect on close to stream infinitely
createNewEventSource();
return null;
}
try {
EffectResponse operationResponse = EffectResponse.fromJson(
json.decode(event.data!),
);
listener.add(operationResponse);
} catch (e, stackTrace) {
listener.addError(e, stackTrace);
createNewEventSource();
}
});
}

createNewEventSource();
listener.onListen = () {
cancelled = false;
createNewEventSource();
};
listener.onCancel = () {
if (!listener.hasListener) {
cancelled = true;
source?.close();
}
};

return listener.stream;
}

Expand Down
59 changes: 38 additions & 21 deletions lib/src/requests/ledgers_request_builder.dart
Original file line number Diff line number Diff line change
Expand Up @@ -62,34 +62,51 @@ class LedgersRequestBuilder extends RequestBuilder {
/// See: <a href="https://developers.stellar.org/api/introduction/streaming/" target="_blank">Streaming</a>
Stream<LedgerResponse> stream() {
StreamController<LedgerResponse> listener = StreamController.broadcast();

bool cancelled = false;
listener.onCancel = () {
cancelled = true;
};
void createNewEventSource() {
EventSource? source;

Future<void> createNewEventSource() async {
if (cancelled) {
return;
}
EventSource.connect(this.buildUri()).then((eventSource) {
eventSource.listen((Event event) {
if (cancelled) {
return null;
}
if (event.data == "\"hello\"") {
return null;
}
if (event.event == "close") {
createNewEventSource();
return null;
}
LedgerResponse ledgerResponse =
LedgerResponse.fromJson(json.decode(event.data!));
listener.add(ledgerResponse);
});
source?.close();
source = await EventSource.connect(this.buildUri());
source!.listen((Event event) async {
if (cancelled) {
return null;
}
if (event.event == "open") {
return null;
}
if (event.event == "close") {
// Reconnect on close to stream infinitely
createNewEventSource();
return null;
}
try {
LedgerResponse operationResponse = LedgerResponse.fromJson(
json.decode(event.data!),
);
listener.add(operationResponse);
} catch (e, stackTrace) {
listener.addError(e, stackTrace);
createNewEventSource();
}
});
}

createNewEventSource();
listener.onListen = () {
cancelled = false;
createNewEventSource();
};
listener.onCancel = () {
if (!listener.hasListener) {
cancelled = true;
source?.close();
}
};

return listener.stream;
}

Expand Down
59 changes: 38 additions & 21 deletions lib/src/requests/offers_request_builder.dart
Original file line number Diff line number Diff line change
Expand Up @@ -110,34 +110,51 @@ class OffersRequestBuilder extends RequestBuilder {
/// See: <a href="https://developers.stellar.org/api/introduction/streaming/" target="_blank">Streaming</a>
Stream<OfferResponse> stream() {
StreamController<OfferResponse> listener = StreamController.broadcast();

bool cancelled = false;
listener.onCancel = () {
cancelled = true;
};
void createNewEventSource() {
EventSource? source;

Future<void> createNewEventSource() async {
if (cancelled) {
return;
}
EventSource.connect(this.buildUri()).then((eventSource) {
eventSource.listen((Event event) {
if (cancelled) {
return null;
}
if (event.data == "\"hello\"") {
return null;
}
if (event.event == "close") {
createNewEventSource();
return null;
}
OfferResponse offerResponse =
OfferResponse.fromJson(json.decode(event.data!));
listener.add(offerResponse);
});
source?.close();
source = await EventSource.connect(this.buildUri());
source!.listen((Event event) async {
if (cancelled) {
return null;
}
if (event.event == "open") {
return null;
}
if (event.event == "close") {
// Reconnect on close to stream infinitely
createNewEventSource();
return null;
}
try {
OfferResponse operationResponse = OfferResponse.fromJson(
json.decode(event.data!),
);
listener.add(operationResponse);
} catch (e, stackTrace) {
listener.addError(e, stackTrace);
createNewEventSource();
}
});
}

createNewEventSource();
listener.onListen = () {
cancelled = false;
createNewEventSource();
};
listener.onCancel = () {
if (!listener.hasListener) {
cancelled = true;
source?.close();
}
};

return listener.stream;
}

Expand Down
Loading

0 comments on commit 43070e4

Please sign in to comment.