Skip to content

Commit

Permalink
Introduce Protocol::HTTP::Body::Streamable and Writable.
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix committed Sep 5, 2024
1 parent 205f165 commit cbaa0be
Show file tree
Hide file tree
Showing 20 changed files with 428 additions and 79 deletions.
110 changes: 110 additions & 0 deletions fixtures/protocol/http/body/a_writable_body.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2019-2023, by Samuel Williams.

require 'protocol/http/body/deflate'

module Protocol
module HTTP
module Body
AWritableBody = Sus::Shared("a writable body") do
it "can write and read data" do
3.times do |i|
body.write("Hello World #{i}")
expect(body.read).to be == "Hello World #{i}"
end
end

it "can buffer data in order" do
3.times do |i|
body.write("Hello World #{i}")
end

3.times do |i|
expect(body.read).to be == "Hello World #{i}"
end
end

with '#join' do
it "can join chunks" do
3.times do |i|
body.write("#{i}")
end

body.close

expect(body.join).to be == "012"
end
end

with '#each' do
it "can read all data in order" do
3.times do |i|
body.write("Hello World #{i}")
end

body.close

3.times do |i|
chunk = body.read
expect(chunk).to be == "Hello World #{i}"
end
end

# it "can propagate failures" do
# reactor.async do
# expect do
# body.each do |chunk|
# raise RuntimeError.new("It was too big!")
# end
# end.to raise_exception(RuntimeError, message: be =~ /big/)
# end

# expect{
# body.write("Beep boop") # This will cause a failure.
# ::Async::Task.current.yield
# body.write("Beep boop") # This will fail.
# }.to raise_exception(RuntimeError, message: be =~ /big/)
# end

# it "can propagate failures in nested bodies" do
# nested = ::Protocol::HTTP::Body::Deflate.for(body)

# reactor.async do
# expect do
# nested.each do |chunk|
# raise RuntimeError.new("It was too big!")
# end
# end.to raise_exception(RuntimeError, message: be =~ /big/)
# end

# expect{
# body.write("Beep boop") # This will cause a failure.
# ::Async::Task.current.yield
# body.write("Beep boop") # This will fail.
# }.to raise_exception(RuntimeError, message: be =~ /big/)
# end

# it "will stop after finishing" do
# output_task = reactor.async do
# body.each do |chunk|
# expect(chunk).to be == "Hello World!"
# end
# end

# body.write("Hello World!")
# body.close

# expect(body).not.to be(:empty?)

# ::Async::Task.current.yield

# expect(output_task).to be(:finished?)
# expect(body).to be(:empty?)
# end
end
end
end
end
end
9 changes: 0 additions & 9 deletions lib/protocol/http/body.md

This file was deleted.

6 changes: 0 additions & 6 deletions lib/protocol/http/body/deflate.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,6 @@ def self.for(body, window_size = GZIP, level = DEFAULT_LEVEL)
self.new(body, Zlib::Deflate.new(level, window_size))
end

def stream?
# We might want to revisit this design choice.
# We could wrap the streaming body in a Deflate stream, but that would require an extra stream wrapper which we don't have right now. See also `Digestable#stream?`.
false
end

def read
return if @stream.finished?

Expand Down
4 changes: 0 additions & 4 deletions lib/protocol/http/body/digestable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,6 @@ def etag(weak: false)
end
end

def stream?
false
end

def read
if chunk = super
@digest.update(chunk)
Expand Down
12 changes: 8 additions & 4 deletions lib/protocol/http/body/file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,6 @@ def rewind
@remaining = @length
end

def stream?
false
end

def read
if @remaining > 0
amount = [@remaining, @block_size].min
Expand All @@ -72,6 +68,14 @@ def read
end
end

def stream?
true
end

def call(stream)
IO.copy_stream(@file, stream)
end

def join
return "" if @remaining == 0

Expand Down
4 changes: 0 additions & 4 deletions lib/protocol/http/body/inflate.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@ def self.for(body, encoding = GZIP)
self.new(body, Zlib::Inflate.new(encoding))
end

def stream?
false
end

def read
return if @stream.finished?

Expand Down
23 changes: 15 additions & 8 deletions lib/protocol/http/body/readable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,15 @@
module Protocol
module HTTP
module Body
# An interface for reading data from a body.
# Represents a readable input streams.
#
# Typically, you'd override `#read` to return chunks of data.
#
# I n general, you read chunks of data from a body until it is empty and returns `nil`. Upon reading `nil`, the body is considered consumed and should not be read from again.
#
# Reading can also fail, for example if the body represents a streaming upload, and the connection is lost. In this case, the body will raise some kind of error.
#
# If you don't want to read from a stream, and instead want to close it immediately, you can call `close` on the body. If the body is already completely consumed, `close` will do nothing, but if there is still data to be read, it will cause the underlying stream to be reset (and possibly closed).
class Readable
# Close the stream immediately.
def close(error = nil)
Expand All @@ -29,14 +35,19 @@ def ready?
false
end

# Whether the stream can be rewound using {rewind}.
def rewindable?
false
end

# Rewind the stream to the beginning.
# @returns [Boolean] Whether the stream was successfully rewound.
def rewind
false
end

# The total length of the body, if known.
# @returns [Integer | Nil] The total length of the body, or `nil` if the length is unknown.
def length
nil
end
Expand Down Expand Up @@ -83,14 +94,10 @@ def join
end
end

# Should the internal mechanism prefer to use {call}?
# @returns [Boolean]
def stream?
false
end

# Write the body to the given stream.
#
# In some cases, the stream may also be readable, such as when hijacking an HTTP/1 connection. In that case, it may be acceptable to read and write to the stream directly.
#
# If the stream is not ready, it will be flushed after each chunk. Closes the stream when finished or if an error occurs.
#
def call(stream)
Expand All @@ -105,6 +112,7 @@ def call(stream)
end

# Read all remaining chunks into a buffered body and close the underlying input.
#
# @returns [Buffered] The buffered body.
def finish
# Internally, this invokes `self.each` which then invokes `self.close`.
Expand All @@ -115,7 +123,6 @@ def as_json(...)
{
class: self.class.name,
length: self.length,
stream: self.stream?,
ready: self.ready?,
empty: self.empty?
}
Expand Down
4 changes: 0 additions & 4 deletions lib/protocol/http/body/rewindable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,6 @@ def buffered
Buffered.new(@chunks)
end

def stream?
false
end

def read
if @index < @chunks.size
chunk = @chunks[@index]
Expand Down
87 changes: 87 additions & 0 deletions lib/protocol/http/body/streamable.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2022, by Samuel Williams.

require_relative 'readable'
require_relative 'stream'

module Protocol
module HTTP
module Body
# A body that invokes a block that can read and write to a stream.
#
# In some cases, it's advantageous to directly read and write to the underlying stream if possible. For example, HTTP/1 upgrade requests, WebSockets, and similar. To handle that case, response bodies can implement `stream?` and return `true`. When `stream?` returns true, the body **should** be consumed by calling `call(stream)`. Server implementations may choose to always invoke `call(stream)` if it's efficient to do so. Bodies that don't support it will fall back to using `#each`.
#
# When invoking `call(stream)`, the stream can be read from and written to, and closed. However, the stream is only guaranteed to be open for the duration of the `call(stream)` call. Once the method returns, the stream **should** be closed by the server.
class Streamable < Readable
def initialize(block, input = nil)
@block = block
@input = input
@output = nil
end

attr :block

class Output
def initialize(input, block)
stream = Stream.new(input, self)

@from = nil

@fiber = Fiber.new do |from|
@from = from
block.call(stream)
@fiber = nil
end
end

def write(chunk)
if from = @from
@from = nil
@from = from.transfer(chunk)
else
raise RuntimeError, "Stream is not being read!"
end
end

def close
@fiber = nil

if from = @from
@from = nil
from.transfer(nil)
end
end

def read
raise RuntimeError, "Stream is already being read!" if @from

@fiber&.transfer(Fiber.current)
end
end

# Invokes the block in a fiber which yields chunks when they are available.
def read
@output ||= Output.new(@input, @block)

return @output.read
end

def stream?
true
end

def call(stream)
raise "Streaming body has already been read!" if @output

@block.call(stream)
rescue => error
raise
ensure
self.close(error)
end
end
end
end
end
13 changes: 5 additions & 8 deletions lib/protocol/http/body/wrapper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ def rewindable?
@body.rewindable?
end

def stream?
# Most wrappers are not streamable by default.
false
end

def length
@body.length
end
Expand All @@ -73,14 +78,6 @@ def to_json(...)
def inspect
@body.inspect
end

def stream?
@body.stream?
end

def call(stream)
@body.call(stream)
end
end
end
end
Expand Down
Loading

0 comments on commit cbaa0be

Please sign in to comment.