Skip to content

Commit

Permalink
add null-safety eventsource code
Browse files Browse the repository at this point in the history
  • Loading branch information
christian-rogobete committed Sep 1, 2021
1 parent cd9daa7 commit ef2408b
Show file tree
Hide file tree
Showing 16 changed files with 321 additions and 15 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## [1.2.1] - 01.Sep.2021.
- null-safety support

## [1.2.0] - 22.Aug.2021.
- update http package version
- update eventsource package version
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ The Soneso open source Stellar SDK for Flutter is build with Dart and provides A
1. Add the dependency to your pubspec.yaml file:
```
dependencies:
stellar_flutter_sdk: ^1.2.0
stellar_flutter_sdk: ^1.2.1
```
2. Install it (command line or IDE):
```
Expand Down
67 changes: 67 additions & 0 deletions lib/src/eventsource/decoder.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import "dart:async";
import "dart:convert";
import "event.dart";

typedef RetryIndicator = void Function(Duration retry);

class EventSourceDecoder implements StreamTransformer<List<int>, Event> {
RetryIndicator? retryIndicator;

EventSourceDecoder({this.retryIndicator});

Stream<Event> bind(Stream<List<int>> stream) {
late StreamController<Event> controller;
controller = new StreamController(onListen: () {
// the event we are currently building
Event currentEvent = new Event();
// the regexes we will use later
RegExp lineRegex = new RegExp(r"^([^:]*)(?::)?(?: )?(.*)?$");
RegExp removeEndingNewlineRegex = new RegExp(r"^((?:.|\n)*)\n$");
// This stream will receive chunks of data that is not necessarily a
// single event. So we build events on the fly and broadcast the event as
// soon as we encounter a double newline, then we start a new one.
stream
.transform(new Utf8Decoder())
.transform(new LineSplitter())
.listen((String line) {
if (line.isEmpty) {
// event is done
// strip ending newline from data
if (currentEvent.data != null) {
var match = removeEndingNewlineRegex.firstMatch(currentEvent.data!);
currentEvent.data = match?.group(1);
}
controller.add(currentEvent);
currentEvent = new Event();
return;
}
// match the line prefix and the value using the regex
Match? match = lineRegex.firstMatch(line);
String? field = match?.group(1);
String? value = match?.group(2) ?? "";
if (field?.isEmpty == true) {
// lines starting with a colon are to be ignored
return;
}
switch (field) {
case "event":
currentEvent.event = value;
break;
case "data":
currentEvent.data = (currentEvent.data ?? "") + value + "\n";
break;
case "id":
currentEvent.id = value;
break;
case "retry":
retryIndicator?.call(new Duration(milliseconds: int.parse(value)));
break;
}
});
});
return controller.stream;
}

StreamTransformer<RS, RT> cast<RS, RT>() =>
StreamTransformer.castFrom<List<int>, Event, RS, RT>(this);
}
64 changes: 64 additions & 0 deletions lib/src/eventsource/encoder.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import "dart:convert";
import "dart:io";
import "event.dart";

class EventSourceEncoder extends Converter<Event, List<int>> {
final bool compressed;

const EventSourceEncoder({bool this.compressed: false});

static Map<String, Function> _fields = {
"id: ": (e) => e.id,
"event: ": (e) => e.event,
"data: ": (e) => e.data,
};

@override
List<int> convert(Event event) {
String payload = convertToString(event);
List<int> bytes = utf8.encode(payload);
if (compressed) {
bytes = gzip.encode(bytes);
}
return bytes;
}

String convertToString(Event event) {
String payload = "";
for (String prefix in _fields.keys) {
String? value = _fields[prefix]?.call(event);
if (value == null || value.isEmpty) {
continue;
}
// multi-line values need the field prefix on every line
value = value.replaceAll("\n", "\n$prefix");
payload += prefix + value + "\n";
}
payload += "\n";
return payload;
}

@override
Sink<Event> startChunkedConversion(Sink<List<int>> sink) {
Sink<dynamic> inputSink = sink;
if (compressed) {
inputSink =
gzip.encoder.startChunkedConversion(inputSink as Sink<List<int>>);
}
inputSink =
utf8.encoder.startChunkedConversion(inputSink as Sink<List<int>>);
return new ProxySink(
onAdd: (Event event) => inputSink.add(convertToString(event)),
onClose: () => inputSink.close());
}
}

class ProxySink<T> implements Sink<T> {
void Function(T) onAdd;
void Function() onClose;
ProxySink({required this.onAdd, required this.onClose});
@override
void add(t) => onAdd(t);
@override
void close() => onClose();
}
19 changes: 19 additions & 0 deletions lib/src/eventsource/event.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
class Event implements Comparable<Event> {
/// An identifier that can be used to allow a client to replay
/// missed Events by returning the Last-Event-Id header.
/// Return empty string if not required.
String? id;

/// The name of the event. Return empty string if not required.
String? event;

/// The payload of the event.
String? data;

Event({this.id, this.event, this.data});

Event.message({this.id, this.data}) : event = "message";

@override
int compareTo(Event other) => id!.compareTo(other.id!);
}
152 changes: 152 additions & 0 deletions lib/src/eventsource/eventsource.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
import "event.dart";
import "decoder.dart";
import "dart:async";
import "dart:convert";
import 'package:http/http.dart' as http;
import "package:http/src/utils.dart" show encodingForCharset;
import "package:http_parser/http_parser.dart" show MediaType;

export "event.dart";

enum EventSourceReadyState {
CONNECTING,
OPEN,
CLOSED,
}

class EventSourceSubscriptionException extends Event implements Exception {
int statusCode;
String message;

@override
String get data => "$statusCode: $message";

EventSourceSubscriptionException(this.statusCode, this.message)
: super(event: "error");
}

/// An EventSource client that exposes a [Stream] of [Event]s.
class EventSource extends Stream<Event> {
// interface attributes

final Uri url;
final Map<String, String>? headers;

EventSourceReadyState get readyState => _readyState;

Stream<Event> get onOpen => this.where((e) => e.event == "open");
Stream<Event> get onMessage => this.where((e) => e.event == "message");
Stream<Event> get onError => this.where((e) => e.event == "error");

// internal attributes

StreamController<Event> _streamController =
new StreamController<Event>.broadcast();

EventSourceReadyState _readyState = EventSourceReadyState.CLOSED;

http.Client client;
Duration _retryDelay = const Duration(milliseconds: 3000);
String? _lastEventId;
late EventSourceDecoder _decoder;
String _body;
String _method;

/// Create a new EventSource by connecting to the specified url.
static Future<EventSource> connect(url,
{http.Client? client,
String? lastEventId,
Map<String, String>? headers,
String? body,
String? method}) async {
// parameter initialization
url = url is Uri ? url : Uri.parse(url);
client = client ?? new http.Client();
body = body ?? "";
method = method ?? "GET";
EventSource es = new EventSource._internal(
url, client, lastEventId, headers, body, method);
await es._start();
return es;
}

EventSource._internal(this.url, this.client, this._lastEventId, this.headers,
this._body, this._method) {
_decoder = new EventSourceDecoder(retryIndicator: _updateRetryDelay);
}

// proxy the listen call to the controller's listen call
@override
StreamSubscription<Event> listen(void onData(Event event)?,
{Function? onError, void onDone()?, bool? cancelOnError}) =>
_streamController.stream.listen(onData,
onError: onError, onDone: onDone, cancelOnError: cancelOnError);

/// Attempt to start a new connection.
Future _start() async {
_readyState = EventSourceReadyState.CONNECTING;
var request = new http.Request(_method, url);
request.headers["Cache-Control"] = "no-cache";
request.headers["Accept"] = "text/event-stream";
if (_lastEventId?.isNotEmpty == true) {
request.headers["Last-Event-ID"] = _lastEventId!;
}
headers?.forEach((k, v) {
request.headers[k] = v;
});
request.body = _body;

var response = await client.send(request);
if (response.statusCode != 200) {
// server returned an error
var bodyBytes = await response.stream.toBytes();
String body = _encodingForHeaders(response.headers).decode(bodyBytes);
throw new EventSourceSubscriptionException(response.statusCode, body);
}
_readyState = EventSourceReadyState.OPEN;
// start streaming the data
response.stream.transform(_decoder).listen((Event event) {
_streamController.add(event);
_lastEventId = event.id;
},
cancelOnError: true,
onError: _retry,
onDone: () => _readyState = EventSourceReadyState.CLOSED);
}

/// Retries until a new connection is established. Uses exponential backoff.
Future _retry(dynamic e) async {
_readyState = EventSourceReadyState.CONNECTING;
// try reopening with exponential backoff
Duration backoff = _retryDelay;
while (true) {
await new Future.delayed(backoff);
try {
await _start();
break;
} catch (error) {
_streamController.addError(error);
backoff *= 2;
}
}
}

void _updateRetryDelay(Duration retry) {
_retryDelay = retry;
}
}

/// Returns the encoding to use for a response with the given headers. This
/// defaults to [LATIN1] if the headers don't specify a charset or
/// if that charset is unknown.
Encoding _encodingForHeaders(Map<String, String> headers) =>
encodingForCharset(_contentTypeForHeaders(headers).parameters['charset']);

/// Returns the [MediaType] object for the given headers's content-type.
///
/// Defaults to `application/octet-stream`.
MediaType _contentTypeForHeaders(Map<String, String> headers) {
var contentType = headers['content-type'];
if (contentType != null) return new MediaType.parse(contentType);
return new MediaType("application", "octet-stream");
}
2 changes: 1 addition & 1 deletion lib/src/requests/accounts_request_builder.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Use of this source code is governed by a license that can be
// found in the LICENSE file.

import "package:eventsource/eventsource.dart";
import "../eventsource/eventsource.dart";
import 'package:http/http.dart' as http;
import 'dart:async';
import 'dart:convert';
Expand Down
2 changes: 1 addition & 1 deletion lib/src/requests/effects_request_builder.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Use of this source code is governed by a license that can be
// found in the LICENSE file.

import "package:eventsource/eventsource.dart";
import "../eventsource/eventsource.dart";
import 'package:http/http.dart' as http;
import 'dart:async';
import 'dart:convert';
Expand Down
2 changes: 1 addition & 1 deletion lib/src/requests/ledgers_request_builder.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Use of this source code is governed by a license that can be
// found in the LICENSE file.

import "package:eventsource/eventsource.dart";
import "../eventsource/eventsource.dart";
import 'package:http/http.dart' as http;
import 'dart:async';
import 'dart:convert';
Expand Down
2 changes: 1 addition & 1 deletion lib/src/requests/offers_request_builder.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import '../responses/response.dart';
import '../responses/offer_response.dart';
import '../util.dart';
import '../assets.dart';
import "package:eventsource/eventsource.dart";
import "../eventsource/eventsource.dart";
import 'dart:convert';

/// Builds requests connected to offers. Offers are statements about how much of an asset an account wants to buy or sell.
Expand Down
2 changes: 1 addition & 1 deletion lib/src/requests/operations_request_builder.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Use of this source code is governed by a license that can be
// found in the LICENSE file.

import "package:eventsource/eventsource.dart";
import "../eventsource/eventsource.dart";
import 'package:http/http.dart' as http;
import 'dart:async';
import 'dart:convert';
Expand Down
2 changes: 1 addition & 1 deletion lib/src/requests/order_book_request_builder.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Use of this source code is governed by a license that can be
// found in the LICENSE file.

import "package:eventsource/eventsource.dart";
import "../eventsource/eventsource.dart";
import 'package:http/http.dart' as http;
import 'dart:async';
import 'dart:convert';
Expand Down
2 changes: 1 addition & 1 deletion lib/src/requests/payments_request_builder.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Use of this source code is governed by a license that can be
// found in the LICENSE file.

import "package:eventsource/eventsource.dart";
import "../eventsource/eventsource.dart";
import 'package:http/http.dart' as http;
import 'dart:async';
import 'dart:convert';
Expand Down
2 changes: 1 addition & 1 deletion lib/src/requests/trades_request_builder.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import '../responses/response.dart';
import 'request_builder.dart';
import '../responses/trade_response.dart';
import '../util.dart';
import "package:eventsource/eventsource.dart";
import "../eventsource/eventsource.dart";
import 'dart:convert';

/// Builds requests connected to trades. When an offer is fully or partially fulfilled, a trade happens. Trades can also be caused by successful path payments, because path payments involve fulfilling offers. A trade occurs between two parties—base and counter. Which is which is either arbitrary or determined by the calling query.
Expand Down
Loading

0 comments on commit ef2408b

Please sign in to comment.