Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for SSE response on functions client. #894

Closed
dshukertjr opened this issue Apr 19, 2024 · 29 comments · Fixed by #905
Closed

Add support for SSE response on functions client. #894

dshukertjr opened this issue Apr 19, 2024 · 29 comments · Fixed by #905
Labels
enhancement New feature or request functions This issue or pull request is related to functions

Comments

@dshukertjr
Copy link
Member

dshukertjr commented Apr 19, 2024

Is your feature request related to a problem? Please describe.
Yes. Currently, there seems to be no possibility to invoke an Edge Function that returns a server-sent events using the client library.

Describe the solution you'd like
I would like to see the implementation of an additional function, which could return a Stream of server-sent events instead of standard promise.

Describe alternatives you've considered
One alternative I considered was to simply await the full result. However, this would leave the user waiting for a long period, which is not an ideal user experience.

Additional context
The motivation is a project, where we are looking to implement a ChatGPT interface within our Flutter application, and we need to stream the response of the API call to OpenAI to the user in real-time.

Implementing this functionality on the backend as an Edge Function was straightforward enough - I followed the tutorial on Streaming Data in Edge Functions by Supabase (https://www.youtube.com/watch?v=9N66JBRLNYU).

Copied from supabase/functions-js#67

@dshukertjr dshukertjr added the enhancement New feature or request label Apr 19, 2024
@dshukertjr dshukertjr added the functions This issue or pull request is related to functions label Apr 19, 2024
@nietsmmar
Copy link

nietsmmar commented Apr 23, 2024

Because this feature does not exist yet I used a fork of dart_sse_client which provides web-support and used it together with fetch_client library.

This is super hacky but works for (most) part. But now since the upgrade from

supabase-flutter package > 2.4.0
gotrue package > 2.5.1

it does not work anymore. I always get 415 Unsupported Media Type as a response from my Edge-Function. I think it is some change in gotrue package to version 2.6.0 although I couldn't figure out which.

@dshukertjr It seems like this issue here is just a change of two lines, regarding the js-PR. I would really appreciate to finally be able to use streaming in flutter from the official supabase package. As of now it doesn't even work with the workaround anymore.

Thanks for all the great work.

@dshukertjr
Copy link
Member Author

@nietsmmar I did some quick research, and for the Dart library, it's not going to be just updating two lines of code, but a few more than that. I want to properly test it out, and with some other tasks that I have, it might take some days for me to get to this one. Sorry for the inconvenience.

@nietsmmar
Copy link

@dshukertjr Thank you for checking it out! Really appreciate it! Looking forward to the fix :)

@oliverbytes
Copy link

I'm eagerly waiting as well. Thanks so much to anyone doing the work.

@nietsmmar
Copy link

nietsmmar commented Apr 29, 2024

@dshukertjr
I initialized it now like this:

final client = FetchClient(mode: RequestMode.cors);
  await Supabase.initialize(
    url: config.api.url,
    anonKey: config.api.anonKey,
    debug: kDebugMode,
    httpClient: kIsWeb ? client : null,
  );
final response = await supabaseClient.functions.invoke(
  'chat_stream',
  body: {
    'body': ...
  },
  headers: {'x-region': 'eu-central-1'},
);
final stream = ((response.data) as ByteStream).transform(const Utf8Decoder());

try {
  await for (final String element in stream) {
    final chunk = json.decode(element);

    // I only get complete streamed data here at once

    yield chunk;
  }
} catch (e) {
  yield* Stream.error(
    "myCustomError",
  );
}

But I just get the full stream at once in the end as one element in my await for. Am I doing something wrong? I am using Web but with fetch_client it should work to give me every segment.

@dshukertjr
Copy link
Member Author

@nietsmmar
What does your edge function code look like?

@nietsmmar
Copy link

nietsmmar commented Apr 29, 2024

@nietsmmar What does your edge function code look like?

Looks like this (worked before with dart_sse_client, I am getting a stream from openAI that I want to forward):

const body = new ReadableStream({
  async start(controller) {
    for await (const chunk of stream) {
      var finish = false;
      if (chunk['choices'][0]['finishReason'] === 'stop') {
        finish = true;
      }
      var token = chunk['choices'][0]['delta']['content'] ?? '';
      var chunkJson = {
        token: token,
        finish: finish
      };
      console.log(chunkJson);
      controller.enqueue(new TextEncoder().encode("data:" + JSON.stringify(chunkJson) + "\r\n\r\n"));
      if (finish) {
        controller.close();
        return;
      }
    }
  },
  cancel() {
    // ...
  },
});

@dshukertjr
Copy link
Member Author

@nietsmmar Just to double check, what version of functions_client are you using? You can find this within your pubspec.lock file.

@nietsmmar
Copy link

@nietsmmar Just to double check, what version of functions_client are you using? You can find this within your pubspec.lock file.

My pubspec.lock says:

  functions_client:
    dependency: transitive
    description:
      name: functions_client
      sha256: a70b0dd9a1c35d05d1141557f7e49ffe4de5f450ffde31755a9eeeadca03b8ee
      url: "https://pub.dev"
    source: hosted
    version: "2.1.0"

@dshukertjr
Copy link
Member Author

@nietsmmar Hmm, alright. Do you have the 'Content-Type': 'text/event-stream', set on your edge function?

@nietsmmar
Copy link

nietsmmar commented Apr 29, 2024

@nietsmmar Hmm, alright. Do you have the 'Content-Type': 'text/event-stream', set on your edge function?

I am returning this:

return new Response(content, {
    headers: {
      ...corsHeaders,
      "Content-Type": "text/event-stream",
    },
  });

@dshukertjr
Copy link
Member Author

@nietsmmar This is the test script that I used. Another difference that I see is Connection: 'keep-alive', in the headers. Would you be able to try adding it? If that doesn't work, would you be able to use my test script and see if that works on your end?

Deno.serve(async (req) => {
  const { input } = await req.json()

  const headers = new Headers({
    'Content-Type': 'text/event-stream',
    Connection: 'keep-alive',
  })

  // Create a stream
  const stream = new ReadableStream({
    async start(controller) {
      const encoder = new TextEncoder()

      try {
        for await (const char of input) {
          controller.enqueue(encoder.encode(char))
        }
      } catch (err) {
        console.error('Stream error:', err)
      } finally {
        controller.close()
      }
    },
  })

  // Return the stream to the user
  return new Response(stream, {
    headers,
  })
})

@nietsmmar
Copy link

nietsmmar commented Apr 29, 2024

@dshukertjr I tried adding kee-alive but it didn't change anything.

I copied your test script and created a new function with the exact code. Then I called it like this:

final response = await supabaseClient.functions.invoke(
  'stream_test',
  body: {
    'input': 'this is a test, hello?',
  },
  headers: {'x-region': 'eu-central-1'},
);
final stream =
    ((response.data) as ByteStream).transform(const Utf8Decoder());

try {
  await for (final String element in stream) {
    print('start');
    print(element);
    print('end');
    //yield ...
  }
} catch (e) {
  yield* Stream.error(
    'myCustomError',
  );
}

And this is my print output:

start
this is a test, hello?
end

@dshukertjr
Copy link
Member Author

@nietsmmar Could you check to see you can listen to SSE using both your function and mine on an iOS or Android device/emulator?

@nietsmmar
Copy link

nietsmmar commented Apr 29, 2024

@dshukertjr I first tried building for linux as this was easier for me (not having an android emulator running right now). There it works for both our functions.

What version of fetch_client are you using? I am using 1.0.2.

@dshukertjr
Copy link
Member Author

@nietsmmar I'm using fetch_client v1.0.2 as well. What browser are you using? I tested it on Google Chrome, and nothing else.

@nietsmmar
Copy link

@dshukertjr I also only testet in Google Chrome. Version 124.0.6367.91 (Official Build) (64-bit)

This is really weird. I also did run flutter clean etc. can there still be something old cached that I am missing?

@dshukertjr
Copy link
Member Author

@nietsmmar Does the same thing happen if you run your edge functions locally, because that's what I've been doing.

Here is the Dart code that I'm using the listen to stream BTW.

final res = await supabase.functions
    .invoke('sse', body: {'input': 'sample text'});

(res.data as ByteStream)
    .transform(const Utf8Decoder())
    .listen((val) {
      print(val);
});

@nietsmmar
Copy link

nietsmmar commented Apr 30, 2024

@dshukertjr I did run my edge function locally too.

I just tried your exact dart-code. And there I get the whole message at once too.

I did try building in dev and in release. But in release the same happens.

I also tried wrapping runApp with runWithClient like this:

runWithClient(() => 
    runApp(
        child: const MyApp(),
    ),
); FetchClient.new,);

But that also didn't change the behavior.

@nietsmmar
Copy link

@dshukertjr You are sure it works on your end? I updated to newest fetch_client versions etc. but I still can't make it work.

I still get

start
{input}
end

@dshukertjr
Copy link
Member Author

@nietsmmar
Yeah, this is the deployed version of a simple SSE app that works for me. Can you try it out to see if it works on your end?
https://sse.dshukertjr.dev/

Here is a screen recording of what I see.

sse.mp4

@nietsmmar
Copy link

nietsmmar commented May 17, 2024

@dshukertjr
yes it works. I am so puzzled. I literally copied your code. 🤯
Do you have the code on a github repo or something so I could clone and try myself?

@dshukertjr
Copy link
Member Author

@nietsmmar
Good to hear that it works on your end as well!

The code is here, but this repo is my "junk yard", so it's a bit messy, and you will find bunch of unrelated code as well 😂
https://github.com/dshukertjr/flutter_playground/tree/main

@nietsmmar
Copy link

@nietsmmar Good to hear that it works on your end as well!

The code is here, but this repo is my "junk yard", so it's a bit messy, and you will find bunch of unrelated code as well 😂 https://github.com/dshukertjr/flutter_playground/tree/main

thanks a lot! I will be on vacation a few days but will try your exact code when I am back.

@nietsmmar
Copy link

@dshukertjr oh wow! It is this:

const delay = Math.random() * 100
await new Promise((resolve) => setTimeout(resolve, delay))

when there is no delay, the messages just get send together. I did not know about that. So your first backendcode you posted above did not have any delay and everything came just as one message. So I always thought it won't work.

@dshukertjr
Copy link
Member Author

@nietsmmar Ah, my bad 🙈

@nietsmmar
Copy link

@dshukertjr So it is normal, that parts can be sent together? I never had this with my older implementation.

As I am processing each chunk of stream seperately in my client. I need to distinguish them myself then. (which is okay)

Thanks again for your help to make this all work! Thanks a lot!

@nietsmmar
Copy link

@dshukertjr
I am also having a bit of a problem when throwing an error while streaming.

When doing supabaseClient.functions.invoke(...)... and catching error I only get:

js_primitives.dart:28 Instance of 'minified:RO'

In my Edge-Function I throw a 500 Error with JSON-Response like:

{
    "error": "Custom Error message",
    "reasonPhrase": "Custom Reasonphrase."
}

Does the thrown error have some specific format to be parsed correctly from the supabase package?

@nietsmmar
Copy link

I do send my chunks like this:

var chunkJson = {
  token: token,
  finish: finish
};

controller.enqueue(new TextEncoder().encode(JSON.stringify(chunkJson)));

But sometimes I get this as one chunk:
{"token":" gest","finish":fa

And in the next chunk I get:
lse}

As I decode these json-strings in my client my code breaks because each chunk itself is no proper JSON anymore. Is this normal behaviour that I should expect in this stream? I find it difficult to properly process my data like that.

Thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request functions This issue or pull request is related to functions
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants