Skip to content

Commit

Permalink
Tidy up implementation of stream? and call(stream).
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix committed Sep 5, 2024
1 parent 7909cff commit 81f38b1
Show file tree
Hide file tree
Showing 14 changed files with 94 additions and 112 deletions.
9 changes: 0 additions & 9 deletions lib/protocol/http/body/completable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,6 @@ def rewind
false
end

def finish
super.tap do
if @callback
@callback.call
@callback = nil
end
end
end

def close(error = nil)
super.tap do
if @callback
Expand Down
6 changes: 0 additions & 6 deletions lib/protocol/http/body/deflate.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,6 @@ def self.for(body, window_size = GZIP, level = DEFAULT_LEVEL)
self.new(body, Zlib::Deflate.new(level, window_size))
end

def stream?
# We might want to revisit this design choice.
# We could wrap the streaming body in a Deflate stream, but that would require an extra stream wrapper which we don't have right now. See also `Digestable#stream?`.
false
end

def read
return if @stream.finished?

Expand Down
4 changes: 0 additions & 4 deletions lib/protocol/http/body/digestable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,6 @@ def etag(weak: false)
end
end

def stream?
false
end

def read
if chunk = super
@digest.update(chunk)
Expand Down
12 changes: 8 additions & 4 deletions lib/protocol/http/body/file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,6 @@ def rewind
@remaining = @length
end

def stream?
false
end

def read
if @remaining > 0
amount = [@remaining, @block_size].min
Expand All @@ -72,6 +68,14 @@ def read
end
end

def stream?
true
end

def call(stream)
IO.copy_stream(@file, stream, @remaining)
end

def join
return "" if @remaining == 0

Expand Down
4 changes: 0 additions & 4 deletions lib/protocol/http/body/inflate.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@ def self.for(body, encoding = GZIP)
self.new(body, Zlib::Inflate.new(encoding))
end

def stream?
false
end

def read
return if @stream.finished?

Expand Down
84 changes: 51 additions & 33 deletions lib/protocol/http/body/readable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,15 @@
module Protocol
module HTTP
module Body
# An interface for reading data from a body.
# Represents a readable input streams.
#
# Typically, you'd override `#read` to return chunks of data.
#
# I n general, you read chunks of data from a body until it is empty and returns `nil`. Upon reading `nil`, the body is considered consumed and should not be read from again.
#
# Reading can also fail, for example if the body represents a streaming upload, and the connection is lost. In this case, the body will raise some kind of error.
#
# If you don't want to read from a stream, and instead want to close it immediately, you can call `close` on the body. If the body is already completely consumed, `close` will do nothing, but if there is still data to be read, it will cause the underlying stream to be reset (and possibly closed).
class Readable
# Close the stream immediately.
def close(error = nil)
Expand All @@ -29,63 +35,46 @@ def ready?
false
end

# Whether the stream can be rewound using {rewind}.
def rewindable?
false
end

# Rewind the stream to the beginning.
# @returns [Boolean] Whether the stream was successfully rewound.
def rewind
false
end

# The total length of the body, if known.
# @returns [Integer | Nil] The total length of the body, or `nil` if the length is unknown.
def length
nil
end

# Read the next available chunk.
# @returns [String | Nil] The chunk of data, or `nil` if the stream has finished.
# @raises [StandardError] If an error occurs while reading.
def read
nil
end

# Should the internal mechanism prefer to use {call}?
# @returns [Boolean]
def stream?
false
end

# Write the body to the given stream.
def call(stream)
while chunk = self.read
stream.write(chunk)

# Flush the stream unless we are immediately expecting more data:
unless self.ready?
stream.flush
end
end
end

# Read all remaining chunks into a buffered body and close the underlying input.
# @returns [Buffered] The buffered body.
def finish
# Internally, this invokes `self.each` which then invokes `self.close`.
Buffered.read(self)
end

# Enumerate all chunks until finished, then invoke `#close`.
#
# Closes the stream when finished or if an error occurs.
#
# @yields {|chunk| ...} The block to call with each chunk of data.
# @parameter chunk [String | Nil] The chunk of data, or `nil` if the stream has finished.
def each
return to_enum(:each) unless block_given?
return to_enum unless block_given?

begin
while chunk = self.read
yield chunk
end
ensure
self.close($!)
while chunk = self.read
yield chunk
end
rescue => error
raise
ensure
self.close(error)
end

# Read all remaining chunks into a single binary string using `#each`.
Expand All @@ -105,6 +94,35 @@ def join
end
end

def stream?
false
end

# Write the body to the given stream.
#
# In some cases, the stream may also be readable, such as when hijacking an HTTP/1 connection. In that case, it may be acceptable to read and write to the stream directly.
#
# If the stream is not ready, it will be flushed after each chunk. Closes the stream when finished or if an error occurs.
#
def call(stream)
self.each do |chunk|
stream.write(chunk)

# Flush the stream unless we are immediately expecting more data:
unless self.ready?
stream.flush
end
end
end

# Read all remaining chunks into a buffered body and close the underlying input.
#
# @returns [Buffered] The buffered body.
def finish
# Internally, this invokes `self.each` which then invokes `self.close`.
Buffered.read(self)
end

def as_json(...)
{
class: self.class.name,
Expand Down
4 changes: 0 additions & 4 deletions lib/protocol/http/body/rewindable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,6 @@ def buffered
Buffered.new(@chunks)
end

def stream?
false
end

def read
if @index < @chunks.size
chunk = @chunks[@index]
Expand Down
16 changes: 2 additions & 14 deletions lib/protocol/http/body/wrapper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,11 @@ def initialize(body)
# The wrapped body.
attr :body

# Buffer any remaining body.
def finish
@body.finish
end

def close(error = nil)
@body.close(error)

super
# It's a no-op:
# super
end

def empty?
Expand Down Expand Up @@ -77,14 +73,6 @@ def to_json(...)
def inspect
@body.inspect
end

def stream?
@body.stream?
end

def call(stream)
@body.call(stream)
end
end
end
end
Expand Down
5 changes: 0 additions & 5 deletions test/protocol/http/body/deflate.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,6 @@
let(:compressed_body) {Protocol::HTTP::Body::Deflate.for(body)}
let(:decompressed_body) {Protocol::HTTP::Body::Inflate.for(compressed_body)}

it "should not be a stream" do
expect(compressed_body).not.to be(:stream?)
expect(decompressed_body).not.to be(:stream?)
end

it "should round-trip data" do
body.write("Hello World!")
body.close
Expand Down
4 changes: 0 additions & 4 deletions test/protocol/http/body/digestable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,6 @@
let(:source) {Protocol::HTTP::Body::Buffered.new}
let(:body) {subject.new(source)}

it "should not be a stream" do
expect(body).not.to be(:stream?)
end

with '.wrap' do
let(:source) {Protocol::HTTP::Body::Buffered.wrap("HelloWorld")}
let(:message) {Protocol::HTTP::Request.new(nil, nil, 'GET', '/', nil, Protocol::HTTP::Headers.new, body)}
Expand Down
6 changes: 4 additions & 2 deletions test/protocol/http/body/file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
let(:path) {File.expand_path('file_spec.txt', __dir__)}
let(:body) {subject.open(path)}

it "should not be a stream" do
expect(body).not.to be(:stream?)
with '#stream?' do
it "should be streamable" do
expect(body).to be(:stream?)
end
end

with '#join' do
Expand Down
4 changes: 0 additions & 4 deletions test/protocol/http/body/readable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@
expect(body).not.to be(:ready?)
end

it "should not be a stream" do
expect(body).not.to be(:stream?)
end

with '#finish' do
it "should return empty buffered representation" do
expect(body.finish).to be(:empty?)
Expand Down
4 changes: 0 additions & 4 deletions test/protocol/http/body/rewindable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,6 @@
let(:source) {Protocol::HTTP::Body::Buffered.new}
let(:body) {subject.new(source)}

it "should not be a stream" do
expect(body).not.to be(:stream?)
end

it "can write and read data" do
3.times do |i|
source.write("Hello World #{i}")
Expand Down
44 changes: 29 additions & 15 deletions test/protocol/http/body/wrapper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,21 @@
# Released under the MIT License.
# Copyright, 2023-2024, by Samuel Williams.

require 'protocol/http/body/readable'
require 'protocol/http/body/wrapper'
require 'protocol/http/body/buffered'
require 'protocol/http/request'

require 'json'
require 'stringio'

describe Protocol::HTTP::Body::Wrapper do
let(:source) {Protocol::HTTP::Body::Buffered.new}
let(:body) {subject.new(source)}

it "should proxy finish" do
expect(source).to receive(:finish).and_return(nil)
body.finish
with '#stream?' do
it "should not be streamable" do
expect(body).not.to be(:stream?)
end
end

it "should proxy close" do
Expand All @@ -34,24 +40,14 @@
expect(body.length).to be == 1
end

it "should proxy stream?" do
expect(source).to receive(:stream?).and_return(true)
expect(body.stream?).to be == true
end

it "should proxy read" do
expect(source).to receive(:read).and_return("!")
expect(body.read).to be == "!"
end

it "should proxy inspect" do
expect(source).to receive(:inspect).and_return("!")
expect(body.inspect).to be == "!"
end

it "should proxy call" do
expect(source).to receive(:call).and_return(nil)
body.call(nil)
expect(body.inspect).to be(:include?, "!")
end

with '.wrap' do
Expand Down Expand Up @@ -90,4 +86,22 @@
expect(JSON.dump(body)).to be == body.to_json
end
end

with "#each" do
it "should invoke close correctly" do
expect(body).to receive(:close)

body.each{}
end
end

with "#stream" do
let(:stream) {StringIO.new}

it "should invoke close correctly" do
expect(body).to receive(:close)

body.call(stream)
end
end
end

0 comments on commit 81f38b1

Please sign in to comment.