Skip to content

Commit

Permalink
feat(functions_client): Add SSE support to invoke method (#905)
Browse files Browse the repository at this point in the history
* feat: Add SSE support to invoke method

* Add code sample in the comment docs

* Add test

* Update the comment on data of FunctionResponse and export ByteStream
  • Loading branch information
dshukertjr authored Apr 27, 2024
1 parent 5697e20 commit 2e05244
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 47 deletions.
2 changes: 2 additions & 0 deletions packages/functions_client/lib/functions_client.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
library functions_client;

export 'package:http/http.dart' show ByteStream;

export 'src/functions_client.dart';
export 'src/types.dart';
82 changes: 35 additions & 47 deletions packages/functions_client/lib/src/functions_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import 'dart:convert';
import 'package:functions_client/src/constants.dart';
import 'package:functions_client/src/types.dart';
import 'package:http/http.dart' as http;
import 'package:http/http.dart';
import 'package:yet_another_json_isolate/yet_another_json_isolate.dart';

class FunctionsClient {
Expand Down Expand Up @@ -44,6 +43,20 @@ class FunctionsClient {
/// [headers]: object representing the headers to send with the request
///
/// [body]: the body of the request
///
/// ```dart
/// // Call a standard function
/// final response = await supabase.functions.invoke('hello-world');
/// print(response.data);
///
/// // Listen to Server Sent Events
/// final response = await supabase.functions.invoke('sse-function');
/// response.data
/// .transform(const Utf8Decoder())
/// .listen((val) {
/// print(val);
/// });
/// ```
Future<FunctionResponse> invoke(
String functionName, {
Map<String, String>? headers,
Expand All @@ -52,67 +65,42 @@ class FunctionsClient {
}) async {
final bodyStr = body == null ? null : await _isolate.encode(body);

late final Response response;
final uri = Uri.parse('$_url/$functionName');

final finalHeaders = <String, String>{
..._headers,
if (headers != null) ...headers
};

switch (method) {
case HttpMethod.post:
response = await (_httpClient?.post ?? http.post)(
uri,
headers: finalHeaders,
body: bodyStr,
);
break;

case HttpMethod.get:
response = await (_httpClient?.get ?? http.get)(
uri,
headers: finalHeaders,
);
break;

case HttpMethod.put:
response = await (_httpClient?.put ?? http.put)(
uri,
headers: finalHeaders,
body: bodyStr,
);
break;

case HttpMethod.delete:
response = await (_httpClient?.delete ?? http.delete)(
uri,
headers: finalHeaders,
);
break;

case HttpMethod.patch:
response = await (_httpClient?.patch ?? http.patch)(
uri,
headers: finalHeaders,
body: bodyStr,
);
break;
}
final request = http.Request(method.name, uri);

finalHeaders.forEach((key, value) {
request.headers[key] = value;
});
if (bodyStr != null) request.body = bodyStr;
final response = await (_httpClient?.send(request) ?? request.send());
final responseType = (response.headers['Content-Type'] ??
response.headers['content-type'] ??
'text/plain')
.split(';')[0]
.trim();

final data = switch (responseType) {
'application/json' => response.bodyBytes.isEmpty
final dynamic data;

if (responseType == 'application/json') {
final bodyBytes = await response.stream.toBytes();
data = bodyBytes.isEmpty
? ""
: await _isolate.decode(utf8.decode(response.bodyBytes)),
'application/octet-stream' => response.bodyBytes,
_ => utf8.decode(response.bodyBytes),
};
: await _isolate.decode(utf8.decode(bodyBytes));
} else if (responseType == 'application/octet-stream') {
data = await response.stream.toBytes();
} else if (responseType == 'text/event-stream') {
data = response.stream;
} else {
final bodyBytes = await response.stream.toBytes();
data = utf8.decode(bodyBytes);
}

if (200 <= response.statusCode && response.statusCode < 300) {
return FunctionResponse(data: data, status: response.statusCode);
} else {
Expand Down
3 changes: 3 additions & 0 deletions packages/functions_client/lib/src/types.dart
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import 'dart:convert';
import 'dart:typed_data';

import 'package:http/http.dart';

enum HttpMethod {
get,
post,
Expand All @@ -14,6 +16,7 @@ class FunctionResponse {
/// - 'text/plain': [String]
/// - 'octet/stream': [Uint8List]
/// - 'application/json': dynamic ([jsonDecode] is used)
/// - 'text/event-stream': [ByteStream]
final dynamic data;
final int status;

Expand Down
7 changes: 7 additions & 0 deletions packages/functions_client/test/custom_http_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ class CustomHttpClient extends BaseClient {
"Content-Type": "application/json",
},
);
} else if (request.url.path.endsWith('sse')) {
return StreamedResponse(
Stream.fromIterable(['a', 'b', 'c'].map((e) => utf8.encode(e))), 200,
request: request,
headers: {
"Content-Type": "text/event-stream",
});
} else {
return StreamedResponse(
Stream.value(utf8.encode(jsonEncode({"key": "Hello World"}))),
Expand Down
11 changes: 11 additions & 0 deletions packages/functions_client/test/functions_dart_test.dart
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import 'dart:convert';

import 'package:functions_client/src/functions_client.dart';
import 'package:functions_client/src/types.dart';
import 'package:test/test.dart';
Expand Down Expand Up @@ -45,5 +47,14 @@ void main() {
final res = await client.invoke('function1');
expect(res.data, {'key': 'Hello World'});
});

test('Listen to SSE event', () async {
final res = await functionsCustomHttpClient.invoke('sse');
expect(
res.data.transform(const Utf8Decoder()),
emitsInOrder(
['a', 'b', 'c'],
));
});
});
}

0 comments on commit 2e05244

Please sign in to comment.