
Proper SSE parsing #338

Merged: 2 commits into alexrudall:main on Oct 30, 2023

Conversation

@atesgoral (Contributor) commented Oct 16, 2023

Follow-up to #332

  1. Switch to using event_stream_parser, a spec-compliant event stream parser (that I recently published).
  2. The stream proc now receives an optional second argument carrying any errors encountered during parsing. Consumers should know about these errors instead of this library silently ignoring them (see the sketch after this list).
  3. Explicitly check for "[DONE]" chunks.
  4. Remove bits about errors OpenAI does not send.
  5. Tweak the bit about token usage in streams (it's not a public feature yet).
  6. Check the HTTP response code and only try to parse the body as an error JSON if it's not OK.
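As a minimal sketch of item 2 (usage is hypothetical; the argument names are illustrative), the second argument would only be set when the parser reports a problem:

    client = OpenAI::Client.new

    client.chat(
      parameters: {
        model: "gpt-4",
        messages: [{ role: "user", content: "Hello!" }],
        stream: proc do |chunk, parse_error|
          if parse_error
            warn "Stream parsing error: #{parse_error}" # e.g. a JSON::ParserError
          else
            print chunk.dig("choices", 0, "delta", "content")
          end
        end
      }
    )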

All Submissions:

  • Have you followed the guidelines in our Contributing document?
  • Have you checked to ensure there aren't other open Pull Requests for the same update/change?
  • Have you added an explanation of what your changes do and why you'd like us to include them?

@ScotterC

Thanks for this. Can you link the EventStreamParser gem? I wasn't able to see it in your GitHub, but I read through it locally. I've merged this into my fork and am trying it out in production.

@atesgoral (Contributor, Author)

@ScotterC It's publicly published: https://rubygems.org/gems/event_stream_parser

@andresgutgon

This code is not public yet, no? I see a 404 when I click here.

@atesgoral (Contributor, Author)

@andresgutgon Ah, my apologies. I forgot about making the repo public. It is now :)

@smojtabai commented Oct 17, 2023

Hey @atesgoral, I was testing this, but I think it still swallows errors; not sure if this is a spec issue with OpenAI or what:

    OpenAI::Client.new.chat(
      parameters: {
        model: "gpt-4",
        messages: [{ role: "garbage", content: "hello" }],
        temperature: 0,
        stream: proc do |chunk, error|
          puts "inside stream"
          puts chunk
          puts error
        end
      }
    )

No error is returned and nothing happens in the block.
It looks like OpenAI returns an entire error JSON object and https://github.com/Shopify/event_stream_parser/blob/main/lib/event_stream_parser.rb skips it?

@atesgoral (Contributor, Author)

@smojtabai event_stream_parser is a pure parser of event streams and is agnostic of OpenAI. It's not swallowing any events.

It seems ruby-openai isn't checking the HTTP response code before processing the response as an event stream. In case of errors, OpenAI returns a plain JSON response, not an event stream. I just realized this, and I'll see if I can forward-fix it in this PR. Thanks for raising it.
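For reference, those non-streaming error bodies are shaped roughly like this (field names per OpenAI's documented error format; the values here are placeholders):

    {
      "error": {
        "message": "Invalid value for 'role'",
        "type": "invalid_request_error",
        "param": "messages.0.role",
        "code": null
      }
    }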

@smojtabai

@atesgoral ah yes, you are correct; I misunderstood what was going on there.

#328 looks like it tried to contribute a fix but was never merged or looked at. Unsure if @alexrudall has any thoughts or whether it will be merged?

@atesgoral (Contributor, Author) commented Oct 17, 2023

With a one-line addition here:

    Faraday.new do |f|
      f.options[:timeout] = @request_timeout
      f.request(:multipart) if multipart
      f.response :raise_error # the one-line addition
    end

it is possible to mount the RaiseError middleware so that 4xx and 5xx responses percolate all the way up, but it's better to catch 4xx errors locally to be able to grab the JSON OpenAI returns in the body.
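For illustration, with that middleware mounted, a caller could rescue the error classes Faraday raises (a sketch, not part of this PR; `messages` is assumed to be defined elsewhere):

    begin
      client.chat(parameters: { model: "gpt-4", messages: messages })
    rescue Faraday::BadRequestError => e
      # RaiseError exposes the response as a hash with :status, :headers and :body.
      warn "OpenAI returned #{e.response[:status]}: #{e.response[:body]}"
    end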

I'll take a look at #328 to see if I can find inspiration there.

@andresgutgon

This is looking fantastic. Looking forward to seeing this finished. Great work @atesgoral 👏

@atesgoral (Contributor, Author)

Complication: when the RaiseError middleware is enabled, Faraday bails out early (as it should) and never calls the on_data block. So, in streaming mode, there's no way to access the response body in the presence of that stock middleware.

Ignoring the middleware, I couldn't get access to Faraday's env argument in the on_data block for some reason (maybe it's not in the Faraday version this gem relies on?). If I could, I could check the HTTP response code within the on_data block while also collecting response chunks.

Contemplating creating a streaming-friendly version of the RaiseError middleware (and/or upgrading Faraday if needed).
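Purely as a sketch of what that contemplated middleware could look like (hypothetical class, not in this PR): skip raising whenever an on_data handler is attached, leaving status checks to the streaming code.

    class StreamFriendlyRaiseError < Faraday::Response::RaiseError
      def on_complete(env)
        # Don't raise for streamed responses, so on_data still sees the body;
        # the streaming code checks env.status itself.
        super unless env.request.on_data
      end
    end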

@smojtabai commented Oct 18, 2023

Hmmm, I think upgrading Faraday would be the best-looking solution. As a hack, I tried monkey patching the HTTP json_post method with this:

    def json_post(path:, parameters:)
      stream_body = +""
      is_streaming = parameters[:stream].respond_to?(:call)
      response = conn.post(uri(path: path)) do |req|
        if is_streaming
          to_json_stream_proc = to_json_stream(user_proc: parameters[:stream])
          req.options.on_data = proc do |chunk, chunk_size|
            stream_body << chunk # accumulate the raw body in case it's an error payload
            to_json_stream_proc.call(chunk, chunk_size)
          end

          parameters[:stream] = true # Necessary to tell OpenAI to stream.
        elsif parameters[:stream]
          raise ArgumentError, "The stream parameter must be a Proc or have a #call method"
        end

        req.headers = headers
        req.body = parameters.to_json
      end
      # On an error status, the "stream" was really a JSON error body.
      if is_streaming && response.status >= 400
        to_json(stream_body)
      else
        to_json(response&.body)
      end
    end

For now it sort of keeps the same return signature on errors; I guess we could even call the user function instead this way if needed. Unsure if this can cause any race conditions or if it's a good idea, but throwing it out there.

If we don't want to check the return code, we could instead augment the to_json_stream function as follows, which is hackier but may have fewer side effects:

    def to_json_stream(user_proc:)
      parser = EventStreamParser::Parser.new

      proc do |chunk, chunk_size|
        processed_data = false
        parser.feed(chunk) do |type, data, id, reconnection_time|
          processed_data = true
          parsed_data = JSON.parse(data)
          user_proc.call(parsed_data) unless data == "[DONE]"
        end
        unless processed_data
          # Nothing was emitted: the chunk may be a whole JSON error body
          # (though a partial event would also land here and fail to parse).
          Rails.logger.warn("Failed to parse chunk: #{chunk}")
          parsed_data = JSON.parse(chunk)
          raise StandardError, "Error with OpenAI: #{parsed_data}" if parsed_data["error"].present?
        end
      end
    end

This basically checks whether the stream processor failed to emit anything; if so, it tries to parse the chunk as plain JSON and, for now, raises on error (but we could do whatever we want there).

@smojtabai

We also may have wandered a bit, and I apologize. I do think your first fix is important and can be merged separately from this, but streaming, in my opinion, is not very usable without the ability to understand error responses from OpenAI. Do you have any connection with @alexrudall? I notice he hasn't commented on the other PRs trying to fix the error issue.

@atesgoral (Contributor, Author) commented Oct 19, 2023

The Faraday version is new enough. I must have been making a mistake earlier; I now see the env argument being passed to the on_data block.

Here's how I managed to grab the error response and raise the appropriate Faraday exception with it:

13ef7e3

    def to_json_stream(user_proc:)
      parser = EventStreamParser::Parser.new

      proc do |chunk, _bytes, env|
        # On a non-200 response, the body is a plain JSON error rather than
        # an event stream: let RaiseError raise the matching exception.
        if env && env.status != 200
          raise_error = Faraday::Response::RaiseError.new
          raise_error.on_complete(env.merge(body: JSON.parse(chunk)))
        end

        parser.feed(chunk) do |_type, data|
          user_proc.call(JSON.parse(data)) unless data == "[DONE]"
        end
      end
    end

This also feels a bit like a Faraday deficiency: it doesn't make this super easy.

The story so far, in a new draft PR that does all this and goes further by raising exceptions on HTTP errors (as well as on JSON parse errors): #342

I got blocked on a cryptic failure in a finetune test. It could be the test setup that's wrong.

@atesgoral (Contributor, Author)

@smojtabai No, Alex and I are not connected. I'm just trying to fix things for our project (on my fork of this library) while trying to contribute back to the source.

@atesgoral (Contributor, Author)

So, I might actually tweak this PR to preserve the existing behaviour of treating HTTP errors as non-errors and returning the JSON error as a result. #342 could be the bigger leap that changes behaviour, as a follow-up.

@smojtabai

I agree with keeping existing functionality for now; it's more likely to get merged. Thank you!

@@ -108,42 +108,39 @@

      context "when called with a string containing a single JSON object" do
        it "calls the user proc with the data parsed as JSON" do
          expect(user_proc).to receive(:call).with(JSON.parse('{"foo": "bar"}'))
    -     stream.call('data: { "foo": "bar" }')
    +     stream.call(<<~CHUNK)

@atesgoral (Contributor, Author) commented:
Event streams require double newlines (or CRs, or CRLFs) to emit data events.
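For illustration, a minimal sketch of that behaviour using the gem's API as it appears elsewhere in this PR: no event is emitted until the blank-line terminator arrives.

    require "event_stream_parser"

    parser = EventStreamParser::Parser.new

    parser.feed(%(data: {"foo": "bar"})) do |_type, _data|
      # Never reached: the event isn't terminated yet.
    end

    parser.feed("\n\n") do |_type, data|
      puts data # => {"foo": "bar"}
    end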

Reply:

Already learning new things, keep going 💪

        end

    -   context "when called with a string containing that looks like a JSON object but is invalid" do
    +   context "when called with string containing invalid JSON" do

@atesgoral (Contributor, Author) commented:

It was weird to test for "semblance" of JSON. It's either valid JSON or not.

        end
      end

    +   context "when called with JSON split across chunks" do

@atesgoral (Contributor, Author) commented Oct 19, 2023:
This is the real motivation of this PR. ruby-openai now becomes resilient to packet fragmentation at awkward locations.

@athyuttamre (OpenAI) commented:
Hi @atesgoral, I'm an engineer at OpenAI investigating this response. Do you have any logs (ideally from curl showing the full response stream) to demonstrate this issue? Please feel free to email me at [email protected]. Thanks!

@atesgoral (Contributor, Author) commented Oct 30, 2023:

@athyuttamre I haven't recently seen this happen when directly talking to the OpenAI API, but I recall seeing it a while back, when network conditions were bad. But it can still happen when a proxy or some other network element buffers and chops up the text/event-stream chunks at non-event-stream boundaries.

@atesgoral (Contributor, Author):

I don't think this is an OpenAI problem at all. Clients just need to be resilient and do buffered/proper parsing of event streams.

@atesgoral (Contributor, Author):

For posterity, here are two chunks (stream fragments) from an earlier reproduction of the parser issue that gets unearthed by buffering + rechunking:

Chunk 1:

    data: {"id":"chatcmpl-83QVo11UROyI8HUeixAo25Vjx0SA5","object":"chat.completion.chunk","created":1695827588,"model":"gpt-4-0613","choices":[{"index":0,"delta":{"role":"assistant","content":""},"finish_reason":null}]}
    data: {"id":"chatcmpl-83QVo11UROyI8HUeixAo25Vjx0SA5","object":"chat.completion.chunk","created":1695827588,"model":"gpt-4-0613","choices":[{"index":0,"delta"

Chunk 2:

    :{"content":"Hello"},"finish_reason":null}]}
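A sketch of how the buffering parser recovers the split event (chunk_1 and chunk_2 are hypothetical variables holding the raw fragments above, including their event-stream newlines):

    parser = EventStreamParser::Parser.new

    # Chunk 1 emits the first, complete event; the trailing partial
    # event stays buffered inside the parser.
    parser.feed(chunk_1) { |_type, data| puts data }

    # Chunk 2 completes the buffered event, which is then emitted.
    parser.feed(chunk_2) { |_type, data| puts data }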

@atesgoral (Contributor, Author) commented Oct 19, 2023

Alright, I updated the PR to be laser-focused on solving the parsing issue, without touching any other behaviour.

@alexrudall if you have some spare cycles, it would be great to get this in.

@atesgoral (Contributor, Author)

Follow-up PR to raise errors: #344

@smojtabai

@atesgoral if this branch were merged, would I be able to tell if an error occurred? Did you feel like the callback having an error argument was too big a change (without raising an exception)?

@atesgoral (Contributor, Author) commented Oct 20, 2023

@smojtabai With this change, there's no change to how errors are (subtly) handled: you'd still have to check whether the parsed JSON chunk your proc receives looks like an error (i.e. has an error property).

Ah, I guess the shape of the error is changing now, since the error value is no longer being plucked. 🤔

But assuming the error-raising PR below can also be reviewed and accepted in quick succession, there could be a major version bump to the gem for the new error-handling flavour. We'll see soon 🤞
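For illustration, that consumer-side check might look like this (a sketch; assumes the whole error object is passed through to the proc unplucked):

    stream = proc do |chunk|
      if chunk["error"]
        warn "OpenAI error: #{chunk.dig("error", "message")}"
      else
        print chunk.dig("choices", 0, "delta", "content")
      end
    end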

@alexrudall merged commit 76aab2d into alexrudall:main on Oct 30, 2023. 6 checks passed.
@alexrudall (Owner)

Released in 5.2.0. Huge thanks to you @atesgoral for your work on this, and to others for their input.
