Skip to content

Commit

Permalink
Copy Async::HTTP::Body::Writable and Protocol::Rack::Streaming.
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix committed Sep 5, 2024
1 parent 81f38b1 commit 1b1ccac
Show file tree
Hide file tree
Showing 6 changed files with 569 additions and 0 deletions.
110 changes: 110 additions & 0 deletions fixtures/protocol/http/body/a_writable_body.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2019-2023, by Samuel Williams.

require 'protocol/http/body/deflate'

module Protocol
module HTTP
module Body
AWritableBody = Sus::Shared("a writable body") do
it "can write and read data" do
3.times do |i|
body.write("Hello World #{i}")
expect(body.read).to be == "Hello World #{i}"
end
end

it "can buffer data in order" do
3.times do |i|
body.write("Hello World #{i}")
end

3.times do |i|
expect(body.read).to be == "Hello World #{i}"
end
end

with '#join' do
it "can join chunks" do
3.times do |i|
body.write("#{i}")
end

body.close

expect(body.join).to be == "012"
end
end

with '#each' do
it "can read all data in order" do
3.times do |i|
body.write("Hello World #{i}")
end

body.close

3.times do |i|
chunk = body.read
expect(chunk).to be == "Hello World #{i}"
end
end

# it "can propagate failures" do
# reactor.async do
# expect do
# body.each do |chunk|
# raise RuntimeError.new("It was too big!")
# end
# end.to raise_exception(RuntimeError, message: be =~ /big/)
# end

# expect{
# body.write("Beep boop") # This will cause a failure.
# ::Async::Task.current.yield
# body.write("Beep boop") # This will fail.
# }.to raise_exception(RuntimeError, message: be =~ /big/)
# end

# it "can propagate failures in nested bodies" do
# nested = ::Protocol::HTTP::Body::Deflate.for(body)

# reactor.async do
# expect do
# nested.each do |chunk|
# raise RuntimeError.new("It was too big!")
# end
# end.to raise_exception(RuntimeError, message: be =~ /big/)
# end

# expect{
# body.write("Beep boop") # This will cause a failure.
# ::Async::Task.current.yield
# body.write("Beep boop") # This will fail.
# }.to raise_exception(RuntimeError, message: be =~ /big/)
# end

# it "will stop after finishing" do
# output_task = reactor.async do
# body.each do |chunk|
# expect(chunk).to be == "Hello World!"
# end
# end

# body.write("Hello World!")
# body.close

# expect(body).not.to be(:empty?)

# ::Async::Task.current.yield

# expect(output_task).to be(:finished?)
# expect(body).to be(:empty?)
# end
end
end
end
end
end
113 changes: 113 additions & 0 deletions lib/protocol/http/body/streamable.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2022, by Samuel Williams.

require_relative 'readable'
require_relative 'stream'

module Protocol
module HTTP
module Body
# A body that invokes a block that can read and write to a stream.
#
# In some cases, it's advantageous to directly read and write to the underlying stream if possible. For example, HTTP/1 upgrade requests, WebSockets, and similar. To handle that case, response bodies can implement `stream?` and return `true`. When `stream?` returns true, the body **should** be consumed by calling `call(stream)`. Server implementations may choose to always invoke `call(stream)` if it's efficient to do so. Bodies that don't support it will fall back to using `#each`.
#
# When invoking `call(stream)`, the stream can be read from and written to, and closed. However, the stream is only guaranteed to be open for the duration of the `call(stream)` call. Once the method returns, the stream **should** be closed by the server.
class Streamable < Readable
class Closed < StandardError
end

def initialize(block, input = nil)
@block = block
@input = input
@output = nil
end

# Closing a stream indicates we are no longer interested in reading from it.
def close(error = nil)
if @input
@input.close
@input = nil
end

if @output
@output.close(error)
end
end

attr :block

class Output
def initialize(input, block)
stream = Stream.new(input, self)

@from = nil

@fiber = Fiber.new do |from|
@from = from
block.call(stream)
rescue Closed
# Ignore.
ensure
@fiber = nil

# No more chunks will be generated:
if from = @from
@from = nil
from.transfer(nil)
end
end
end

# Can be invoked by the block to write to the stream.
def write(chunk)
if from = @from
@from = nil
@from = from.transfer(chunk)
else
raise RuntimeError, "Stream is not being read!"
end
end

# Can be invoked by the block to close the stream.
def close(error = nil)
if from = @from
@from = nil
from.transfer(nil)
elsif @fiber
@fiber.raise(error || Closed)
end
end

def read
raise RuntimeError, "Stream is already being read!" if @from

@fiber&.transfer(Fiber.current)
end
end

# Invokes the block in a fiber which yields chunks when they are available.
def read
@output ||= Output.new(@input, @block)

return @output.read
end

def stream?
true
end

def call(stream)
raise "Streaming body has already been read!" if @output

@block.call(stream)
rescue => error
raise
ensure
self.close(error)
end
end
end
end
end
103 changes: 103 additions & 0 deletions lib/protocol/http/body/writable.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2018-2023, by Samuel Williams.

require_relative 'readable'

module Protocol
module HTTP
module Body
# A dynamic body which you can write to and read from.
class Writable < Readable
class Closed < StandardError
end

# @param [Integer] length The length of the response body if known.
# @param [Async::Queue] queue Specify a different queue implementation, e.g. `Async::LimitedQueue.new(8)` to enable back-pressure streaming.
def initialize(length = nil, queue: Thread::Queue.new)
@queue = queue

@length = length

@count = 0

@finished = false

@closed = false
@error = nil
end

def length
@length
end

# Stop generating output; cause the next call to write to fail with the given error. Does not prevent existing chunks from being read. In other words, this indicates both that no more data will be or should be written to the body.
def close(error = nil)
unless @closed
@queue.close

@closed = true
@error = error
end

super
end

def closed?
@closed
end

def ready?
!@queue.empty? || @queue.closed?
end

# Has the producer called #finish and has the reader consumed the nil token?
def empty?
@queue.empty? && @queue.closed?
end

# Read the next available chunk.
def read
@queue.pop
end

# Write a single chunk to the body. Signal completion by calling `#finish`.
def write(chunk)
# If the reader breaks, the writer will break.
# The inverse of this is less obvious (*)
if @closed
raise(@error || Closed)
end

@count += 1
@queue.push(chunk)
end

alias << write

def inspect
"\#<#{self.class} #{@count} chunks written, #{status}>"
end

private

def status
if @queue.empty?
if @queue.closed?
'closed'
else
'waiting'
end
else
if @queue.closed?
'closing'
else
'ready'
end
end
end
end
end
end
end
5 changes: 5 additions & 0 deletions releases.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Releases

## Unreleased

- Clarify behaviour of streaming bodies and copy `Protocol::Rack::Body::Streaming` to `Protocol::HTTP::Body::Streamable`.
- Copy `Async::HTTP::Body::Writable` to `Protocol::HTTP::Body::Writable`.

## v0.31.0

- Ensure chunks are flushed if required, when streaming.
Expand Down
Loading

0 comments on commit 1b1ccac

Please sign in to comment.