Skip to content

Commit

Permalink
Initial test suite.
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix committed Apr 22, 2024
1 parent a41b4c9 commit 262e5d9
Show file tree
Hide file tree
Showing 7 changed files with 209 additions and 41 deletions.
2 changes: 2 additions & 0 deletions config/sus.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
require 'covered/sus'
include Covered::Sus
3 changes: 3 additions & 0 deletions gems.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,7 @@

gem "bake-test"
gem "bake-test-external"

gem "sus-fixtures-async"
gem "sus-fixtures-openssl"
end
29 changes: 4 additions & 25 deletions lib/io/stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,10 @@
# Released under the MIT License.
# Copyright, 2023, by Samuel Williams.

module IO::Stream
# The default block size for IO buffers. Defaults to 64KB (typical pipe buffer size).
BLOCK_SIZE = ENV.fetch('IO_STREAM_BLOCK_SIZE', 1024*64).to_i

# The maximum read size when appending to IO buffers. Defaults to 8MB.
MAXIMUM_READ_SIZE = ENV.fetch('IO_STREAM_MAXIMUM_READ_SIZE', BLOCK_SIZE * 128).to_i

def self.connected?(io)
return false if io.closed?

io = io.to_io

if io.respond_to?(:recv_nonblock)
# If we can wait for the socket to become readable, we know that the socket may still be open.
result = io.to_io.recv_nonblock(1, MSG_PEEK, exception: false)
require_relative 'stream/version'
require_relative 'stream/buffered_stream'

# No data was available - newer Ruby can return nil instead of empty string:
return false if result.nil?

# Either there was some data available, or we can wait to see if there is data avaialble.
return !result.empty? || result == :wait_readable

rescue Errno::ECONNRESET
# This might be thrown by recv_nonblock.
return false
end
class IO
module Stream
end
end
66 changes: 51 additions & 15 deletions lib/io/stream/buffered_stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,19 @@
# Released under the MIT License.
# Copyright, 2023, by Samuel Williams.

require_relative 'string_buffer'

require_relative '../buffered'
require_relative '../readable'

module IO::Stream
# The default block size for IO buffers. Defaults to 64KB (typical pipe buffer size).
BLOCK_SIZE = ENV.fetch('IO_STREAM_BLOCK_SIZE', 1024*64).to_i

# The maximum read size when appending to IO buffers. Defaults to 8MB.
MAXIMUM_READ_SIZE = ENV.fetch('IO_STREAM_MAXIMUM_READ_SIZE', BLOCK_SIZE * 128).to_i

class BufferedStream
BLOCK_SIZE = IO::BLOCK_SIZE

def self.open(path, mode = "r+", **options)
stream = self.new(File.open(path, mode), **options)

Expand All @@ -24,7 +33,7 @@ def self.wrap(io, **options)
io.buffered = false
end

self.new(**options)
self.new(io, **options)
end

def initialize(io, block_size: BLOCK_SIZE, maximum_read_size: MAXIMUM_READ_SIZE)
Expand All @@ -36,12 +45,12 @@ def initialize(io, block_size: BLOCK_SIZE, maximum_read_size: MAXIMUM_READ_SIZE)
@block_size = block_size
@maximum_read_size = maximum_read_size

@read_buffer = Buffer.new
@write_buffer = Buffer.new
@drain_buffer = Buffer.new
@read_buffer = StringBuffer.new
@write_buffer = StringBuffer.new
@drain_buffer = StringBuffer.new

# Used as destination buffer for underlying reads.
@input_buffer = Buffer.new
@input_buffer = StringBuffer.new
end

attr :io
Expand Down Expand Up @@ -92,6 +101,7 @@ def read_exactly(size, exception: EOFError)
raise exception, "encountered eof while reading data"
end

# This is a compatibility shim for existing code:
def readpartial(size = nil)
read_partial(size) or raise EOFError, "Encountered eof while reading data!"
end
Expand Down Expand Up @@ -148,7 +158,7 @@ def flush
@write_buffer, @drain_buffer = @drain_buffer, @write_buffer

begin
@io.write(@drain_buffer)
@io.syswrite(@drain_buffer)
ensure
# If the write operation fails, we still need to clear this buffer, and the data is essentially lost.
@drain_buffer.clear
Expand Down Expand Up @@ -216,28 +226,54 @@ def close
end
end

# Returns true if the stream is at file which means there is no more data to be read.
# Determins if the stream has consumed all available data. May block if the stream is not readable.
# See {readable?} for a non-blocking alternative.
#
# @returns [Boolean] If the stream is at file which means there is no more data to be read.
def eof?
if !@read_buffer.empty?
return false
elsif @eof
return true
else
return @io.eof?
return !self.fill_read_buffer
end
end

alias eof eof?

def eof!
@read_buffer.clear
@eof = true

raise EOFError
end

# Whether there is a chance that a read operation will succeed or not.
# @returns [Boolean] If the stream is readable, i.e. a `read` operation has a chance of success.
def readable?
# If we are at the end of the file, we can't read any more data:
if @eof
return false
end

# If the read buffer is not empty, we can read more data:
if !@read_buffer.empty?
return true
end

# If the underlying stream is readable, we can read more data:
return @io.readable?
end

private

# Reads data from the underlying stream as efficiently as possible.
def sysread(size, buffer)
# Come on Ruby, why couldn't this just return `nil`? EOF is not exceptional. Every file has one.
@io.sysread(size, buffer)
rescue EOFError
return false
end

# Fills the buffer from the underlying stream.
def fill_read_buffer(size = @block_size)
# We impose a limit because the underlying `read` system call can fail if we request too much data in one go.
Expand All @@ -249,12 +285,12 @@ def fill_read_buffer(size = @block_size)
flush

if @read_buffer.empty?
if @io.read_nonblock(size, @read_buffer, exception: false)
if sysread(size, @read_buffer)
# Console.logger.debug(self, name: "read") {@read_buffer.inspect}
return true
end
else
if chunk = @io.read_nonblock(size, @input_buffer, exception: false)
if chunk = sysread(size, @input_buffer)
@read_buffer << chunk
# Console.logger.debug(self, name: "read") {@read_buffer.inspect}

Expand All @@ -278,7 +314,7 @@ def consume_read_buffer(size = nil)
if size.nil? or size >= @read_buffer.bytesize
# Consume the entire read buffer:
result = @read_buffer
@read_buffer = Buffer.new
@read_buffer = StringBuffer.new
else
# This approach uses more memory.
# result = @read_buffer.slice!(0, size)
Expand Down
2 changes: 1 addition & 1 deletion lib/io/stream/buffer.rb → lib/io/stream/string_buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# Copyright, 2023, by Samuel Williams.

module IO::Stream
class Buffer < String
class StringBuffer < String
BINARY = Encoding::BINARY

def initialize
Expand Down
121 changes: 121 additions & 0 deletions test/io/stream/buffered_stream.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2024, by Samuel Williams.

require 'io/stream/buffered_stream'

require 'sus/fixtures/async/reactor_context'
require 'sus/fixtures/openssl/verified_certificate_context'
require 'sus/fixtures/openssl/valid_certificate_context'

describe IO::Stream::BufferedStream do
# This constant is part of the public interface, but was renamed to `Async::IO::BLOCK_SIZE`.
describe "BLOCK_SIZE" do
it "should exist and be reasonable" do
expect(IO::Stream::BLOCK_SIZE).to be_within(1024...1024*128)
end
end

describe "MAXIMUM_READ_SIZE" do
it "should exist and be reasonable" do
expect(IO::Stream::MAXIMUM_READ_SIZE).to be_within(1024*64..1024*64*512)
end
end
end

ABufferedStream = Sus::Shared("a buffered stream") do
it "should be able to read and write data" do
writer.write "Hello, World!"
writer.flush

expect(reader.read(13)).to be == "Hello, World!"
end

with '#eof?' do
it "should return true when there is no data available" do
writer.close
expect(reader.eof?).to be_truthy
end

it "should return false when there is data available" do
writer.write "Hello, World!"
writer.flush

expect(reader.eof?).to be_falsey
end
end

with '#readable?' do
it "should return true when the stream might be open" do
expect(reader.readable?).to be_truthy
end

it "should return true when there is data available" do
writer.write "Hello, World!"
writer.flush

expect(reader.readable?).to be_truthy
end

it "should return false when the stream is known to be closed" do
writer.close

expect(reader.readable?).to be_truthy
reader.read
expect(reader.readable?).to be_falsey
end
end

with '#close' do
it "should close the stream" do
writer.close
expect(reader.read).to be_nil

expect(writer.closed?).to be_truthy
expect(reader.closed?).to be_falsey
end

it "writer should be idempotent" do
writer.close
writer.close

expect(writer.closed?).to be_truthy
expect(reader.closed?).to be_falsey
end

it "reader be idempotent" do
reader.close
reader.close

expect(reader.closed?).to be_truthy
expect(writer.closed?).to be_falsey
end
end
end

describe "IO.pipe" do
let(:pipe) {IO.pipe}
let(:reader) {IO::Stream::BufferedStream.wrap(pipe[0])}
let(:writer) {IO::Stream::BufferedStream.wrap(pipe[1])}

it_behaves_like ABufferedStream
end

describe "Socket.pair" do
let(:sockets) {Socket.pair(:UNIX, :STREAM)}
let(:reader) {IO::Stream::BufferedStream.wrap(sockets[0])}
let(:writer) {IO::Stream::BufferedStream.wrap(sockets[1])}

it_behaves_like ABufferedStream
end

describe "OpenSSL::SSL::SSLSocket" do
include Sus::Fixtures::OpenSSL::VerifiedCertificateContext
include Sus::Fixtures::OpenSSL::ValidCertificateContext

let(:sockets) {Socket.pair(:UNIX, :STREAM)}

let(:reader) {IO::Stream::BufferedStream.wrap(OpenSSL::SSL::SSLSocket.new(sockets[0], client_context))}
let(:writer) {IO::Stream::BufferedStream.wrap(OpenSSL::SSL::SSLSocket.new(sockets[1], server_context))}
end
27 changes: 27 additions & 0 deletions test/io/stream/string_buffer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
require 'io/stream/string_buffer'

describe IO::Stream::StringBuffer do
let(:string_buffer) {subject.new}

it "should be a subclass of String" do
expect(subject).to be < String
end

it "should have a binary encoding" do
expect(string_buffer.encoding).to be == Encoding::BINARY
end

it "should append unicode strings" do
string_buffer << "Hello, World!".force_encoding(Encoding::UTF_8)

expect(string_buffer).to be == "Hello, World!"
expect(string_buffer.encoding).to be == Encoding::BINARY
end

it "should append binary strings" do
string_buffer << "Hello, World!".force_encoding(Encoding::BINARY)

expect(string_buffer).to be == "Hello, World!"
expect(string_buffer.encoding).to be == Encoding::BINARY
end
end

0 comments on commit 262e5d9

Please sign in to comment.