Use event_stream_parser for robust SSE parsing
Remove bits about errors OpenAI does not send
atesgoral committed Oct 16, 2023
1 parent 1e01f89 commit 6d5bac2
Showing 5 changed files with 38 additions and 60 deletions.
2 changes: 2 additions & 0 deletions Gemfile.lock
@@ -2,6 +2,7 @@ PATH
remote: .
specs:
ruby-openai (5.1.0)
event_stream_parser (~> 0.3.0)
faraday (>= 1)
faraday-multipart (>= 1)

@@ -16,6 +17,7 @@ GEM
rexml
diff-lcs (1.5.0)
dotenv (2.8.1)
event_stream_parser (0.3.0)
faraday (2.7.10)
faraday-net_http (>= 2.0, < 3.1)
ruby2_keywords (>= 0.0.4)
8 changes: 4 additions & 4 deletions README.md
@@ -180,22 +180,22 @@ puts response.dig("choices", 0, "message", "content")

[Quick guide to streaming ChatGPT with Rails 7 and Hotwire](https://gist.github.com/alexrudall/cb5ee1e109353ef358adb4e66631799d)

You can stream from the API in realtime, which can be much faster and used to create a more engaging user experience. Pass a [Proc](https://ruby-doc.org/core-2.6/Proc.html) (or any object with a `#call` method) to the `stream` parameter to receive the stream of text chunks as they are generated. Each time one or more chunks is received, the proc will be called once with each chunk, parsed as a Hash. If OpenAI returns an error, `ruby-openai` will pass that to your proc as a Hash.
You can stream from the API in realtime, which can be much faster and used to create a more engaging user experience. Pass a [Proc](https://ruby-doc.org/core-2.6/Proc.html) (or any object with a `#call` method) to the `stream` parameter to receive the stream of completion chunks as they are generated. Each time one or more chunks is received, the proc will be called once with each chunk, parsed as a Hash.

```ruby
client.chat(
parameters: {
model: "gpt-3.5-turbo", # Required.
messages: [{ role: "user", content: "Describe a character called Anna!"}], # Required.
temperature: 0.7,
stream: proc do |chunk, _bytesize|
print chunk.dig("choices", 0, "delta", "content")
stream: proc do |chunk, error|
print chunk.dig("choices", 0, "delta", "content") unless error
end
})
# => "Anna is a young woman in her mid-twenties, with wavy chestnut hair that falls to her shoulders..."
```

Note: the API docs state that token usage is included in the streamed chat chunk objects, but this doesn't currently appear to be the case. To count tokens while streaming, try `OpenAI.rough_token_count` or [tiktoken_ruby](https://github.com/IAPark/tiktoken_ruby).
Note: OpenAI currently does not report token usage for streaming responses. To count tokens while streaming, try `OpenAI.rough_token_count` or [tiktoken_ruby](https://github.com/IAPark/tiktoken_ruby).

### Functions

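For context on the token-counting note above, here is a minimal sketch (not part of the diff) that accumulates the streamed content and estimates its size with `OpenAI.rough_token_count`. The `OPENAI_ACCESS_TOKEN` environment variable and the accumulation pattern are assumptions for illustration; the `|chunk, error|` proc signature matches the updated README example.

```ruby
require "openai"

# Assumption: the API key lives in OPENAI_ACCESS_TOKEN.
client = OpenAI::Client.new(access_token: ENV.fetch("OPENAI_ACCESS_TOKEN"))

buffer = +"" # accumulates the streamed completion text

client.chat(
  parameters: {
    model: "gpt-3.5-turbo",
    messages: [{ role: "user", content: "Describe a character called Anna!" }],
    stream: proc do |chunk, error|
      next if error # parse failures arrive as (nil, error) after this commit

      buffer << (chunk.dig("choices", 0, "delta", "content") || "")
    end
  }
)

puts "~#{OpenAI.rough_token_count(buffer)} tokens streamed"
```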
27 changes: 14 additions & 13 deletions lib/openai/http.rb
@@ -1,3 +1,5 @@
require "event_stream_parser"

module OpenAI
module HTTP
def get(path:)
@@ -44,21 +46,20 @@ def to_json(string)
JSON.parse(string.gsub("}\n{", "},{").prepend("[").concat("]"))
end

# Given a proc, returns an outer proc that can be used to iterate over a JSON stream of chunks.
# For each chunk, the inner user_proc is called giving it the JSON object. The JSON object could
# be a data object or an error object as described in the OpenAI API documentation.
#
# If the JSON object for a given data or error message is invalid, it is ignored.
# Given a proc, returns an outer proc that can be used to iterate over a
# stream of completion JSON object chunks.
# For each chunk, the inner user_proc is called giving it the parsed object.
# If the chunk is not valid JSON, the user_proc is called with nil and an
# error.
#
# @param user_proc [Proc] The inner proc to call for each JSON object in the chunk.
# @return [Proc] An outer proc that iterates over a raw stream, converting it to JSON.
# @param user_proc [Proc] The inner proc to call for each completion object.
# @return [Proc] An outer proc that iterates over a raw stream, parsing it.
def to_json_stream(user_proc:)
proc do |chunk, _|
chunk.scan(/(?:data|error): (\{.*\})/i).flatten.each do |data|
user_proc.call(JSON.parse(data))
rescue JSON::ParserError
# Ignore invalid JSON.
end
parser = EventStreamParser::Parser.new
parser.stream do |_type, data|
user_proc.call(JSON.parse(data)) unless data == "[DONE]"
rescue StandardError => e
user_proc.call(nil, e)
end
end

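To make the new parsing path easier to follow, here is a standalone sketch (not part of the diff) that mirrors the updated `to_json_stream`. It assumes only what the diff itself shows: `EventStreamParser::Parser#stream` yields the event type and data for each complete SSE event and returns a callable that accepts raw chunks.

```ruby
require "event_stream_parser"
require "json"

# Stand-in for the user-supplied proc: receives (parsed_chunk) on success
# or (nil, error) when a payload is not valid JSON.
user_proc = proc do |parsed, error|
  if error
    warn "bad payload: #{error.message}"
  else
    puts parsed.inspect
  end
end

parser = EventStreamParser::Parser.new

stream = parser.stream do |_type, data|
  user_proc.call(JSON.parse(data)) unless data == "[DONE]"
rescue StandardError => e
  user_proc.call(nil, e)
end

# SSE events may be split across network chunks; the parser buffers input
# until the blank-line terminator, then emits one complete event.
stream.call("data: { \"foo\":")
stream.call(" \"bar\" }\n\n")   # => {"foo"=>"bar"}
stream.call("data: [DONE]\n\n") # the [DONE] sentinel is skipped
```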
1 change: 1 addition & 0 deletions ruby-openai.gemspec
@@ -25,6 +25,7 @@ Gem::Specification.new do |spec|
spec.executables = spec.files.grep(%r{^exe/}) { |f| File.basename(f) }
spec.require_paths = ["lib"]

spec.add_dependency "event_stream_parser", "~> 0.3.0"
spec.add_dependency "faraday", ">= 1"
spec.add_dependency "faraday-multipart", ">= 1"
end
60 changes: 17 additions & 43 deletions spec/openai/client/http_spec.rb
@@ -106,72 +106,46 @@
end

context "when called with a string containing a single JSON object" do
it "calls the user proc with the data parsed as JSON" do
it "calls the user proc with the parsed object" do
expect(user_proc).to receive(:call).with(JSON.parse('{"foo": "bar"}'))
stream.call('data: { "foo": "bar" }')
stream.call(<<~CHUNK)
data: { "foo": "bar" }
#
CHUNK
end
end

context "when called with string containing more than one JSON object" do
it "calls the user proc for each data parsed as JSON" do
it "calls the user proc for each parsed object" do
expect(user_proc).to receive(:call).with(JSON.parse('{"foo": "bar"}'))
expect(user_proc).to receive(:call).with(JSON.parse('{"baz": "qud"}'))

stream.call(<<-CHUNK)
stream.call(<<~CHUNK)
data: { "foo": "bar" }
data: { "baz": "qud" }
data: [DONE]
#
CHUNK
end
end

context "when called with a string that does not even resemble a JSON object" do
let(:bad_examples) { ["", "foo", "data: ", "data: foo"] }

it "does not call the user proc" do
bad_examples.each do |chunk|
expect(user_proc).to_not receive(:call)
stream.call(chunk)
end
context "when called with a string that is not valid JSON" do
it "populates the error argument" do
expect(user_proc).to receive(:call).with(nil, an_instance_of(JSON::ParserError))
stream.call("data: foo\n\n")
end
end

context "when called with a string containing that looks like a JSON object but is invalid" do
let(:chunk) do
<<-CHUNK
data: { "foo": "bar" }
data: { BAD ]:-> JSON }
CHUNK
end

it "does not raise an error" do
expect(user_proc).to receive(:call).with(JSON.parse('{"foo": "bar"}'))

expect do
stream.call(chunk)
end.not_to raise_error
end
end

context "when called with a string containing an error" do
let(:chunk) do
<<-CHUNK
data: { "foo": "bar" }
error: { "message": "A bad thing has happened!" }
CHUNK
end

it "does not raise an error" do
context "when called with JSON split across chunks" do
it "calls the user proc with the data parsed as JSON" do
expect(user_proc).to receive(:call).with(JSON.parse('{ "foo": "bar" }'))
expect(user_proc).to receive(:call).with(
JSON.parse('{ "message": "A bad thing has happened!" }')
)

expect do
stream.call(chunk)
stream.call("data: { \"foo\":")
stream.call(" \"bar\" }\n\n")
end.not_to raise_error
end
end