Skip to content

Commit

Permalink
A lot more tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix committed Sep 5, 2024
1 parent cbaa0be commit 6dc13ce
Show file tree
Hide file tree
Showing 6 changed files with 226 additions and 19 deletions.
32 changes: 29 additions & 3 deletions lib/protocol/http/body/streamable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,27 @@ module Body
#
# When invoking `call(stream)`, the stream can be read from and written to, and closed. However, the stream is only guaranteed to be open for the duration of the `call(stream)` call. Once the method returns, the stream **should** be closed by the server.
class Streamable < Readable
class Closed < StandardError
end

def initialize(block, input = nil)
@block = block
@input = input
@output = nil
end

# Closing a stream indicates we are no longer interested in reading from it.
def close(error = nil)
if @input
@input.close
@input = nil
end

if @output
@output.close(error)
end
end

attr :block

class Output
Expand All @@ -32,10 +47,20 @@ def initialize(input, block)
@fiber = Fiber.new do |from|
@from = from
block.call(stream)
rescue Closed
# Ignore.
ensure
@fiber = nil

# No more chunks will be generated:
if from = @from
@from = nil
from.transfer(nil)
end
end
end

# Can be invoked by the block to write to the stream.
def write(chunk)
if from = @from
@from = nil
Expand All @@ -45,12 +70,13 @@ def write(chunk)
end
end

def close
@fiber = nil

# Can be invoked by the block to close the stream.
def close(error = nil)
if from = @from
@from = nil
from.transfer(nil)
elsif @fiber
@fiber.raise(error || Closed)
end
end

Expand Down
30 changes: 15 additions & 15 deletions lib/protocol/http/body/writable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def length
@length
end

# Stop generating output; cause the next call to write to fail with the given error.
# Stop generating output; cause the next call to write to fail with the given error. Does not prevent existing chunks from being read. In other words, this indicates both that no more data will be or should be written to the body.
def close(error = nil)
unless @closed
@queue.close
Expand All @@ -49,23 +49,17 @@ def closed?
end

def ready?
!@queue.empty?
!@queue.empty? || @queue.closed?
end

# Has the producer called #finish and has the reader consumed the nil token?
def empty?
@finished
@queue.empty? && @queue.closed?
end

# Read the next available chunk.
def read
return if @finished

unless chunk = @queue.pop
@finished = true
end

return chunk
@queue.pop
end

# Write a single chunk to the body. Signal completion by calling `#finish`.
Expand All @@ -89,12 +83,18 @@ def inspect
private

def status
if @finished
'finished'
elsif @closed
'closing'
if @queue.empty?
if @queue.closed?
'closed'
else
'waiting'
end
else
'waiting'
if @queue.closed?
'closing'
else
'ready'
end
end
end
end
Expand Down
16 changes: 16 additions & 0 deletions test/protocol/http/body/file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@
let(:path) {File.expand_path('file_spec.txt', __dir__)}
let(:body) {subject.open(path)}

with '#stream?' do
it "should be streamable" do
expect(body).to be(:stream?)
end
end

with '#join' do
it "should read entire file" do
expect(body.join).to be == "Hello World"
Expand Down Expand Up @@ -70,4 +76,14 @@
expect(body.read).to be == "ll"
end
end

with "#call" do
let(:output) {StringIO.new}

it "can stream output" do
body.call(output)

expect(output.string).to be == "Hello World"
end
end
end
91 changes: 90 additions & 1 deletion test/protocol/http/body/streamable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,14 @@
end
end

let(:body) {subject.new(block)}
let(:input) {nil}
let(:body) {subject.new(block, input)}

with "#stream?" do
it "should be streamable" do
expect(body).to be(:stream?)
end
end

with '#block' do
it "should wrap block" do
Expand All @@ -28,6 +35,37 @@
expect(body.read).to be == "World"
expect(body.read).to be == nil
end

with "block that doesn't close" do
let(:block) do
proc do |stream|
stream.write("Hello")
stream.write("World")
end
end

it "can read the body" do
expect(body.read).to be == "Hello"
expect(body.read).to be == "World"
expect(body.read).to be == nil
end
end

with "a block that allows stream to escape" do
let(:block) do
proc do |stream|
@stream = stream
end
end

it "can read the body" do
expect(body.read).to be == nil

expect do
@stream.write("!")
end.to raise_exception(RuntimeError, message: be =~ /Stream is not being read!/)
end
end
end

with '#each' do
Expand All @@ -44,6 +82,30 @@
body.call(stream)
expect(stream.string).to be == "HelloWorld"
end

with "a block that raises an error" do
let(:block) do
proc do |stream|
raise "Oh no... a wild error appeared!"
end
end

it "closes the stream if an error occurs" do
stream = StringIO.new
expect(body).to receive(:close)

expect do
body.call(stream)
end.to raise_exception(RuntimeError, message: be =~ /Oh no... a wild error appeared!/)
end
end
end

with '#close' do
it "can close the body" do
expect(body.read).to be == "Hello"
body.close
end
end

with "nested fiber" do
Expand All @@ -59,4 +121,31 @@
expect(body.read).to be == "Hello"
end
end

with "buffered input" do
let(:input) {Protocol::HTTP::Body::Buffered.new(["Hello", " ", "World"])}

let(:block) do
proc do |stream|
while chunk = stream.read_partial
stream.write(chunk)
end
end
end

it "can read from input" do
expect(body.read).to be == "Hello"
expect(body.read).to be == " "
expect(body.read).to be == "World"
end

it "can stream to output" do
output = StringIO.new
stream = Protocol::HTTP::Body::Stream.new(input, output)

body.call(stream)

expect(output.string).to be == "Hello World"
end
end
end
6 changes: 6 additions & 0 deletions test/protocol/http/body/wrapper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@
let(:source) {Protocol::HTTP::Body::Buffered.new}
let(:body) {subject.new(source)}

with '#stream?' do
it "should not be streamable" do
expect(body).not.to be(:stream?)
end
end

it "should proxy close" do
expect(source).to receive(:close).and_return(nil)
body.close
Expand Down
70 changes: 70 additions & 0 deletions test/protocol/http/body/writable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,74 @@
let(:body) {subject.new}

it_behaves_like Protocol::HTTP::Body::AWritableBody

with "#length" do
it "should be unspecified by default" do
expect(body.length).to be_nil
end
end

with "#closed?" do
it "should not be closed by default" do
expect(body).not.to be(:closed?)
end
end

with "#ready?" do
it "should be ready if chunks are available" do
expect(body).not.to be(:ready?)

body.write("Hello")

expect(body).to be(:ready?)
end

it "should be ready if closed" do
body.close

expect(body).to be(:ready?)
end
end

with "#empty?" do
it "should be empty if closed with no pending chunks" do
expect(body).not.to be(:empty?)

body.close

expect(body).to be(:empty?)
end

it "should become empty when pending chunks are read" do
body.write("Hello")
body.close

expect(body).not.to be(:empty?)
body.read
expect(body).to be(:empty?)
end

it "should not be empty if chunks are available" do
body.write("Hello")
expect(body).not.to be(:empty?)
end
end

with "#write" do
it "should write chunks" do
body.write("Hello")
body.write("World")

expect(body.read).to be == "Hello"
expect(body.read).to be == "World"
end

it "can't write to closed body" do
body.close

expect do
body.write("Hello")
end.to raise_exception(Protocol::HTTP::Body::Writable::Closed)
end
end
end

0 comments on commit 6dc13ce

Please sign in to comment.