From ef2408b5cfd9276aa6a15f5c44c24db9f7a7b2ea Mon Sep 17 00:00:00 2001 From: Christian Rogobete Date: Wed, 1 Sep 2021 23:09:44 +0200 Subject: [PATCH] add null-safety eventsource code --- CHANGELOG.md | 3 + README.md | 2 +- lib/src/eventsource/decoder.dart | 67 ++++++++ lib/src/eventsource/encoder.dart | 64 ++++++++ lib/src/eventsource/event.dart | 19 +++ lib/src/eventsource/eventsource.dart | 152 ++++++++++++++++++ .../requests/accounts_request_builder.dart | 2 +- lib/src/requests/effects_request_builder.dart | 2 +- lib/src/requests/ledgers_request_builder.dart | 2 +- lib/src/requests/offers_request_builder.dart | 2 +- .../requests/operations_request_builder.dart | 2 +- .../requests/order_book_request_builder.dart | 2 +- .../requests/payments_request_builder.dart | 2 +- lib/src/requests/trades_request_builder.dart | 2 +- .../transactions_request_builder.dart | 2 +- pubspec.yaml | 11 +- 16 files changed, 321 insertions(+), 15 deletions(-) create mode 100644 lib/src/eventsource/decoder.dart create mode 100644 lib/src/eventsource/encoder.dart create mode 100644 lib/src/eventsource/event.dart create mode 100644 lib/src/eventsource/eventsource.dart diff --git a/CHANGELOG.md b/CHANGELOG.md index f6afca5..56ddeb1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## [1.2.1] - 01.Sep.2021. +- null-safety support + ## [1.2.0] - 22.Aug.2021. - update http package version - update eventsource package version diff --git a/README.md b/README.md index ff2c3ca..c510284 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ The Soneso open source Stellar SDK for Flutter is build with Dart and provides A 1. Add the dependency to your pubspec.yaml file: ``` dependencies: - stellar_flutter_sdk: ^1.2.0 + stellar_flutter_sdk: ^1.2.1 ``` 2. Install it (command line or IDE): ``` diff --git a/lib/src/eventsource/decoder.dart b/lib/src/eventsource/decoder.dart new file mode 100644 index 0000000..ff78ee5 --- /dev/null +++ b/lib/src/eventsource/decoder.dart @@ -0,0 +1,67 @@ +import "dart:async"; +import "dart:convert"; +import "event.dart"; + +typedef RetryIndicator = void Function(Duration retry); + +class EventSourceDecoder implements StreamTransformer, Event> { + RetryIndicator? retryIndicator; + + EventSourceDecoder({this.retryIndicator}); + + Stream bind(Stream> stream) { + late StreamController controller; + controller = new StreamController(onListen: () { + // the event we are currently building + Event currentEvent = new Event(); + // the regexes we will use later + RegExp lineRegex = new RegExp(r"^([^:]*)(?::)?(?: )?(.*)?$"); + RegExp removeEndingNewlineRegex = new RegExp(r"^((?:.|\n)*)\n$"); + // This stream will receive chunks of data that is not necessarily a + // single event. So we build events on the fly and broadcast the event as + // soon as we encounter a double newline, then we start a new one. + stream + .transform(new Utf8Decoder()) + .transform(new LineSplitter()) + .listen((String line) { + if (line.isEmpty) { + // event is done + // strip ending newline from data + if (currentEvent.data != null) { + var match = removeEndingNewlineRegex.firstMatch(currentEvent.data!); + currentEvent.data = match?.group(1); + } + controller.add(currentEvent); + currentEvent = new Event(); + return; + } + // match the line prefix and the value using the regex + Match? match = lineRegex.firstMatch(line); + String? field = match?.group(1); + String? value = match?.group(2) ?? ""; + if (field?.isEmpty == true) { + // lines starting with a colon are to be ignored + return; + } + switch (field) { + case "event": + currentEvent.event = value; + break; + case "data": + currentEvent.data = (currentEvent.data ?? "") + value + "\n"; + break; + case "id": + currentEvent.id = value; + break; + case "retry": + retryIndicator?.call(new Duration(milliseconds: int.parse(value))); + break; + } + }); + }); + return controller.stream; + } + + StreamTransformer cast() => + StreamTransformer.castFrom, Event, RS, RT>(this); +} diff --git a/lib/src/eventsource/encoder.dart b/lib/src/eventsource/encoder.dart new file mode 100644 index 0000000..e951c87 --- /dev/null +++ b/lib/src/eventsource/encoder.dart @@ -0,0 +1,64 @@ +import "dart:convert"; +import "dart:io"; +import "event.dart"; + +class EventSourceEncoder extends Converter> { + final bool compressed; + + const EventSourceEncoder({bool this.compressed: false}); + + static Map _fields = { + "id: ": (e) => e.id, + "event: ": (e) => e.event, + "data: ": (e) => e.data, + }; + + @override + List convert(Event event) { + String payload = convertToString(event); + List bytes = utf8.encode(payload); + if (compressed) { + bytes = gzip.encode(bytes); + } + return bytes; + } + + String convertToString(Event event) { + String payload = ""; + for (String prefix in _fields.keys) { + String? value = _fields[prefix]?.call(event); + if (value == null || value.isEmpty) { + continue; + } + // multi-line values need the field prefix on every line + value = value.replaceAll("\n", "\n$prefix"); + payload += prefix + value + "\n"; + } + payload += "\n"; + return payload; + } + + @override + Sink startChunkedConversion(Sink> sink) { + Sink inputSink = sink; + if (compressed) { + inputSink = + gzip.encoder.startChunkedConversion(inputSink as Sink>); + } + inputSink = + utf8.encoder.startChunkedConversion(inputSink as Sink>); + return new ProxySink( + onAdd: (Event event) => inputSink.add(convertToString(event)), + onClose: () => inputSink.close()); + } +} + +class ProxySink implements Sink { + void Function(T) onAdd; + void Function() onClose; + ProxySink({required this.onAdd, required this.onClose}); + @override + void add(t) => onAdd(t); + @override + void close() => onClose(); +} diff --git a/lib/src/eventsource/event.dart b/lib/src/eventsource/event.dart new file mode 100644 index 0000000..d7ebb98 --- /dev/null +++ b/lib/src/eventsource/event.dart @@ -0,0 +1,19 @@ +class Event implements Comparable { + /// An identifier that can be used to allow a client to replay + /// missed Events by returning the Last-Event-Id header. + /// Return empty string if not required. + String? id; + + /// The name of the event. Return empty string if not required. + String? event; + + /// The payload of the event. + String? data; + + Event({this.id, this.event, this.data}); + + Event.message({this.id, this.data}) : event = "message"; + + @override + int compareTo(Event other) => id!.compareTo(other.id!); +} diff --git a/lib/src/eventsource/eventsource.dart b/lib/src/eventsource/eventsource.dart new file mode 100644 index 0000000..08adcd1 --- /dev/null +++ b/lib/src/eventsource/eventsource.dart @@ -0,0 +1,152 @@ +import "event.dart"; +import "decoder.dart"; +import "dart:async"; +import "dart:convert"; +import 'package:http/http.dart' as http; +import "package:http/src/utils.dart" show encodingForCharset; +import "package:http_parser/http_parser.dart" show MediaType; + +export "event.dart"; + +enum EventSourceReadyState { + CONNECTING, + OPEN, + CLOSED, +} + +class EventSourceSubscriptionException extends Event implements Exception { + int statusCode; + String message; + + @override + String get data => "$statusCode: $message"; + + EventSourceSubscriptionException(this.statusCode, this.message) + : super(event: "error"); +} + +/// An EventSource client that exposes a [Stream] of [Event]s. +class EventSource extends Stream { + // interface attributes + + final Uri url; + final Map? headers; + + EventSourceReadyState get readyState => _readyState; + + Stream get onOpen => this.where((e) => e.event == "open"); + Stream get onMessage => this.where((e) => e.event == "message"); + Stream get onError => this.where((e) => e.event == "error"); + + // internal attributes + + StreamController _streamController = + new StreamController.broadcast(); + + EventSourceReadyState _readyState = EventSourceReadyState.CLOSED; + + http.Client client; + Duration _retryDelay = const Duration(milliseconds: 3000); + String? _lastEventId; + late EventSourceDecoder _decoder; + String _body; + String _method; + + /// Create a new EventSource by connecting to the specified url. + static Future connect(url, + {http.Client? client, + String? lastEventId, + Map? headers, + String? body, + String? method}) async { + // parameter initialization + url = url is Uri ? url : Uri.parse(url); + client = client ?? new http.Client(); + body = body ?? ""; + method = method ?? "GET"; + EventSource es = new EventSource._internal( + url, client, lastEventId, headers, body, method); + await es._start(); + return es; + } + + EventSource._internal(this.url, this.client, this._lastEventId, this.headers, + this._body, this._method) { + _decoder = new EventSourceDecoder(retryIndicator: _updateRetryDelay); + } + + // proxy the listen call to the controller's listen call + @override + StreamSubscription listen(void onData(Event event)?, + {Function? onError, void onDone()?, bool? cancelOnError}) => + _streamController.stream.listen(onData, + onError: onError, onDone: onDone, cancelOnError: cancelOnError); + + /// Attempt to start a new connection. + Future _start() async { + _readyState = EventSourceReadyState.CONNECTING; + var request = new http.Request(_method, url); + request.headers["Cache-Control"] = "no-cache"; + request.headers["Accept"] = "text/event-stream"; + if (_lastEventId?.isNotEmpty == true) { + request.headers["Last-Event-ID"] = _lastEventId!; + } + headers?.forEach((k, v) { + request.headers[k] = v; + }); + request.body = _body; + + var response = await client.send(request); + if (response.statusCode != 200) { + // server returned an error + var bodyBytes = await response.stream.toBytes(); + String body = _encodingForHeaders(response.headers).decode(bodyBytes); + throw new EventSourceSubscriptionException(response.statusCode, body); + } + _readyState = EventSourceReadyState.OPEN; + // start streaming the data + response.stream.transform(_decoder).listen((Event event) { + _streamController.add(event); + _lastEventId = event.id; + }, + cancelOnError: true, + onError: _retry, + onDone: () => _readyState = EventSourceReadyState.CLOSED); + } + + /// Retries until a new connection is established. Uses exponential backoff. + Future _retry(dynamic e) async { + _readyState = EventSourceReadyState.CONNECTING; + // try reopening with exponential backoff + Duration backoff = _retryDelay; + while (true) { + await new Future.delayed(backoff); + try { + await _start(); + break; + } catch (error) { + _streamController.addError(error); + backoff *= 2; + } + } + } + + void _updateRetryDelay(Duration retry) { + _retryDelay = retry; + } +} + +/// Returns the encoding to use for a response with the given headers. This +/// defaults to [LATIN1] if the headers don't specify a charset or +/// if that charset is unknown. +Encoding _encodingForHeaders(Map headers) => + encodingForCharset(_contentTypeForHeaders(headers).parameters['charset']); + +/// Returns the [MediaType] object for the given headers's content-type. +/// +/// Defaults to `application/octet-stream`. +MediaType _contentTypeForHeaders(Map headers) { + var contentType = headers['content-type']; + if (contentType != null) return new MediaType.parse(contentType); + return new MediaType("application", "octet-stream"); +} diff --git a/lib/src/requests/accounts_request_builder.dart b/lib/src/requests/accounts_request_builder.dart index 4de451a..95c59d1 100644 --- a/lib/src/requests/accounts_request_builder.dart +++ b/lib/src/requests/accounts_request_builder.dart @@ -2,7 +2,7 @@ // Use of this source code is governed by a license that can be // found in the LICENSE file. -import "package:eventsource/eventsource.dart"; +import "../eventsource/eventsource.dart"; import 'package:http/http.dart' as http; import 'dart:async'; import 'dart:convert'; diff --git a/lib/src/requests/effects_request_builder.dart b/lib/src/requests/effects_request_builder.dart index 6420729..3bc8e79 100644 --- a/lib/src/requests/effects_request_builder.dart +++ b/lib/src/requests/effects_request_builder.dart @@ -2,7 +2,7 @@ // Use of this source code is governed by a license that can be // found in the LICENSE file. -import "package:eventsource/eventsource.dart"; +import "../eventsource/eventsource.dart"; import 'package:http/http.dart' as http; import 'dart:async'; import 'dart:convert'; diff --git a/lib/src/requests/ledgers_request_builder.dart b/lib/src/requests/ledgers_request_builder.dart index b875bbf..1b96ae3 100644 --- a/lib/src/requests/ledgers_request_builder.dart +++ b/lib/src/requests/ledgers_request_builder.dart @@ -2,7 +2,7 @@ // Use of this source code is governed by a license that can be // found in the LICENSE file. -import "package:eventsource/eventsource.dart"; +import "../eventsource/eventsource.dart"; import 'package:http/http.dart' as http; import 'dart:async'; import 'dart:convert'; diff --git a/lib/src/requests/offers_request_builder.dart b/lib/src/requests/offers_request_builder.dart index c33ef46..a965c12 100644 --- a/lib/src/requests/offers_request_builder.dart +++ b/lib/src/requests/offers_request_builder.dart @@ -9,7 +9,7 @@ import '../responses/response.dart'; import '../responses/offer_response.dart'; import '../util.dart'; import '../assets.dart'; -import "package:eventsource/eventsource.dart"; +import "../eventsource/eventsource.dart"; import 'dart:convert'; /// Builds requests connected to offers. Offers are statements about how much of an asset an account wants to buy or sell. diff --git a/lib/src/requests/operations_request_builder.dart b/lib/src/requests/operations_request_builder.dart index af03400..ac83739 100644 --- a/lib/src/requests/operations_request_builder.dart +++ b/lib/src/requests/operations_request_builder.dart @@ -2,7 +2,7 @@ // Use of this source code is governed by a license that can be // found in the LICENSE file. -import "package:eventsource/eventsource.dart"; +import "../eventsource/eventsource.dart"; import 'package:http/http.dart' as http; import 'dart:async'; import 'dart:convert'; diff --git a/lib/src/requests/order_book_request_builder.dart b/lib/src/requests/order_book_request_builder.dart index a1aa4f7..0057edc 100644 --- a/lib/src/requests/order_book_request_builder.dart +++ b/lib/src/requests/order_book_request_builder.dart @@ -2,7 +2,7 @@ // Use of this source code is governed by a license that can be // found in the LICENSE file. -import "package:eventsource/eventsource.dart"; +import "../eventsource/eventsource.dart"; import 'package:http/http.dart' as http; import 'dart:async'; import 'dart:convert'; diff --git a/lib/src/requests/payments_request_builder.dart b/lib/src/requests/payments_request_builder.dart index 2a39a86..13a7470 100644 --- a/lib/src/requests/payments_request_builder.dart +++ b/lib/src/requests/payments_request_builder.dart @@ -2,7 +2,7 @@ // Use of this source code is governed by a license that can be // found in the LICENSE file. -import "package:eventsource/eventsource.dart"; +import "../eventsource/eventsource.dart"; import 'package:http/http.dart' as http; import 'dart:async'; import 'dart:convert'; diff --git a/lib/src/requests/trades_request_builder.dart b/lib/src/requests/trades_request_builder.dart index 7dbc5b0..57e8f10 100644 --- a/lib/src/requests/trades_request_builder.dart +++ b/lib/src/requests/trades_request_builder.dart @@ -10,7 +10,7 @@ import '../responses/response.dart'; import 'request_builder.dart'; import '../responses/trade_response.dart'; import '../util.dart'; -import "package:eventsource/eventsource.dart"; +import "../eventsource/eventsource.dart"; import 'dart:convert'; /// Builds requests connected to trades. When an offer is fully or partially fulfilled, a trade happens. Trades can also be caused by successful path payments, because path payments involve fulfilling offers. A trade occurs between two parties—base and counter. Which is which is either arbitrary or determined by the calling query. diff --git a/lib/src/requests/transactions_request_builder.dart b/lib/src/requests/transactions_request_builder.dart index ed23ee0..72b342f 100644 --- a/lib/src/requests/transactions_request_builder.dart +++ b/lib/src/requests/transactions_request_builder.dart @@ -2,7 +2,7 @@ // Use of this source code is governed by a license that can be // found in the LICENSE file. -import "package:eventsource/eventsource.dart"; +import "../eventsource/eventsource.dart"; import 'package:http/http.dart' as http; import 'request_builder.dart'; import 'dart:async'; diff --git a/pubspec.yaml b/pubspec.yaml index 636ef87..ada8f52 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,6 +1,6 @@ name: stellar_flutter_sdk description: A stellar blockchain sdk that query's horizon, build, signs and submits transactions to the stellar network. -version: 1.2.0 +version: 1.2.1 homepage: https://github.com/Soneso/stellar_flutter_sdk environment: @@ -21,10 +21,11 @@ dependencies: pinenacl: ^0.3.3 convert: ^3.0.1 # eventsource: ^0.3.0 - eventsource: - git: - url: git://github.com/mehcode/dart-eventsource.git - ref: ecda0fffeefc4ab0343e1a0c65ad5f6548ea0273 +# eventsource: +# git: +# url: git://github.com/mehcode/dart-eventsource.git +# ref: ecda0fffeefc4ab0343e1a0c65ad5f6548ea0273 + http_parser: ^4.0.0 dev_dependencies: flutter_test: