From 262e5d9a8db8392949d948615d1064aa1a571fbd Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Mon, 22 Apr 2024 17:31:39 +1200 Subject: [PATCH] Initial test suite. --- config/sus.rb | 2 + gems.rb | 3 + lib/io/stream.rb | 29 +---- lib/io/stream/buffered_stream.rb | 66 +++++++--- lib/io/stream/{buffer.rb => string_buffer.rb} | 2 +- test/io/stream/buffered_stream.rb | 121 ++++++++++++++++++ test/io/stream/string_buffer.rb | 27 ++++ 7 files changed, 209 insertions(+), 41 deletions(-) create mode 100644 config/sus.rb rename lib/io/stream/{buffer.rb => string_buffer.rb} (92%) create mode 100644 test/io/stream/buffered_stream.rb create mode 100644 test/io/stream/string_buffer.rb diff --git a/config/sus.rb b/config/sus.rb new file mode 100644 index 0000000..e32bedc --- /dev/null +++ b/config/sus.rb @@ -0,0 +1,2 @@ +require 'covered/sus' +include Covered::Sus diff --git a/gems.rb b/gems.rb index 0cf7af5..5738f7a 100644 --- a/gems.rb +++ b/gems.rb @@ -20,4 +20,7 @@ gem "bake-test" gem "bake-test-external" + + gem "sus-fixtures-async" + gem "sus-fixtures-openssl" end diff --git a/lib/io/stream.rb b/lib/io/stream.rb index 6e3a986..409307f 100644 --- a/lib/io/stream.rb +++ b/lib/io/stream.rb @@ -3,31 +3,10 @@ # Released under the MIT License. # Copyright, 2023, by Samuel Williams. -module IO::Stream - # The default block size for IO buffers. Defaults to 64KB (typical pipe buffer size). - BLOCK_SIZE = ENV.fetch('IO_STREAM_BLOCK_SIZE', 1024*64).to_i - - # The maximum read size when appending to IO buffers. Defaults to 8MB. - MAXIMUM_READ_SIZE = ENV.fetch('IO_STREAM_MAXIMUM_READ_SIZE', BLOCK_SIZE * 128).to_i - - def self.connected?(io) - return false if io.closed? - - io = io.to_io - - if io.respond_to?(:recv_nonblock) - # If we can wait for the socket to become readable, we know that the socket may still be open. - result = io.to_io.recv_nonblock(1, MSG_PEEK, exception: false) +require_relative 'stream/version' +require_relative 'stream/buffered_stream' - # No data was available - newer Ruby can return nil instead of empty string: - return false if result.nil? - - # Either there was some data available, or we can wait to see if there is data avaialble. - return !result.empty? || result == :wait_readable - - rescue Errno::ECONNRESET - # This might be thrown by recv_nonblock. - return false - end +class IO + module Stream end end diff --git a/lib/io/stream/buffered_stream.rb b/lib/io/stream/buffered_stream.rb index a48fa78..2d09227 100644 --- a/lib/io/stream/buffered_stream.rb +++ b/lib/io/stream/buffered_stream.rb @@ -3,10 +3,19 @@ # Released under the MIT License. # Copyright, 2023, by Samuel Williams. +require_relative 'string_buffer' + +require_relative '../buffered' +require_relative '../readable' + module IO::Stream + # The default block size for IO buffers. Defaults to 64KB (typical pipe buffer size). + BLOCK_SIZE = ENV.fetch('IO_STREAM_BLOCK_SIZE', 1024*64).to_i + + # The maximum read size when appending to IO buffers. Defaults to 8MB. + MAXIMUM_READ_SIZE = ENV.fetch('IO_STREAM_MAXIMUM_READ_SIZE', BLOCK_SIZE * 128).to_i + class BufferedStream - BLOCK_SIZE = IO::BLOCK_SIZE - def self.open(path, mode = "r+", **options) stream = self.new(File.open(path, mode), **options) @@ -24,7 +33,7 @@ def self.wrap(io, **options) io.buffered = false end - self.new(**options) + self.new(io, **options) end def initialize(io, block_size: BLOCK_SIZE, maximum_read_size: MAXIMUM_READ_SIZE) @@ -36,12 +45,12 @@ def initialize(io, block_size: BLOCK_SIZE, maximum_read_size: MAXIMUM_READ_SIZE) @block_size = block_size @maximum_read_size = maximum_read_size - @read_buffer = Buffer.new - @write_buffer = Buffer.new - @drain_buffer = Buffer.new + @read_buffer = StringBuffer.new + @write_buffer = StringBuffer.new + @drain_buffer = StringBuffer.new # Used as destination buffer for underlying reads. - @input_buffer = Buffer.new + @input_buffer = StringBuffer.new end attr :io @@ -92,6 +101,7 @@ def read_exactly(size, exception: EOFError) raise exception, "encountered eof while reading data" end + # This is a compatibility shim for existing code: def readpartial(size = nil) read_partial(size) or raise EOFError, "Encountered eof while reading data!" end @@ -148,7 +158,7 @@ def flush @write_buffer, @drain_buffer = @drain_buffer, @write_buffer begin - @io.write(@drain_buffer) + @io.syswrite(@drain_buffer) ensure # If the write operation fails, we still need to clear this buffer, and the data is essentially lost. @drain_buffer.clear @@ -216,19 +226,20 @@ def close end end - # Returns true if the stream is at file which means there is no more data to be read. + # Determins if the stream has consumed all available data. May block if the stream is not readable. + # See {readable?} for a non-blocking alternative. + # + # @returns [Boolean] If the stream is at file which means there is no more data to be read. def eof? if !@read_buffer.empty? return false elsif @eof return true else - return @io.eof? + return !self.fill_read_buffer end end - alias eof eof? - def eof! @read_buffer.clear @eof = true @@ -236,8 +247,33 @@ def eof! raise EOFError end + # Whether there is a chance that a read operation will succeed or not. + # @returns [Boolean] If the stream is readable, i.e. a `read` operation has a chance of success. + def readable? + # If we are at the end of the file, we can't read any more data: + if @eof + return false + end + + # If the read buffer is not empty, we can read more data: + if !@read_buffer.empty? + return true + end + + # If the underlying stream is readable, we can read more data: + return @io.readable? + end + private + # Reads data from the underlying stream as efficiently as possible. + def sysread(size, buffer) + # Come on Ruby, why couldn't this just return `nil`? EOF is not exceptional. Every file has one. + @io.sysread(size, buffer) + rescue EOFError + return false + end + # Fills the buffer from the underlying stream. def fill_read_buffer(size = @block_size) # We impose a limit because the underlying `read` system call can fail if we request too much data in one go. @@ -249,12 +285,12 @@ def fill_read_buffer(size = @block_size) flush if @read_buffer.empty? - if @io.read_nonblock(size, @read_buffer, exception: false) + if sysread(size, @read_buffer) # Console.logger.debug(self, name: "read") {@read_buffer.inspect} return true end else - if chunk = @io.read_nonblock(size, @input_buffer, exception: false) + if chunk = sysread(size, @input_buffer) @read_buffer << chunk # Console.logger.debug(self, name: "read") {@read_buffer.inspect} @@ -278,7 +314,7 @@ def consume_read_buffer(size = nil) if size.nil? or size >= @read_buffer.bytesize # Consume the entire read buffer: result = @read_buffer - @read_buffer = Buffer.new + @read_buffer = StringBuffer.new else # This approach uses more memory. # result = @read_buffer.slice!(0, size) diff --git a/lib/io/stream/buffer.rb b/lib/io/stream/string_buffer.rb similarity index 92% rename from lib/io/stream/buffer.rb rename to lib/io/stream/string_buffer.rb index 60eedd2..5d9a35a 100644 --- a/lib/io/stream/buffer.rb +++ b/lib/io/stream/string_buffer.rb @@ -4,7 +4,7 @@ # Copyright, 2023, by Samuel Williams. module IO::Stream - class Buffer < String + class StringBuffer < String BINARY = Encoding::BINARY def initialize diff --git a/test/io/stream/buffered_stream.rb b/test/io/stream/buffered_stream.rb new file mode 100644 index 0000000..4223e84 --- /dev/null +++ b/test/io/stream/buffered_stream.rb @@ -0,0 +1,121 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2024, by Samuel Williams. + +require 'io/stream/buffered_stream' + +require 'sus/fixtures/async/reactor_context' +require 'sus/fixtures/openssl/verified_certificate_context' +require 'sus/fixtures/openssl/valid_certificate_context' + +describe IO::Stream::BufferedStream do + # This constant is part of the public interface, but was renamed to `Async::IO::BLOCK_SIZE`. + describe "BLOCK_SIZE" do + it "should exist and be reasonable" do + expect(IO::Stream::BLOCK_SIZE).to be_within(1024...1024*128) + end + end + + describe "MAXIMUM_READ_SIZE" do + it "should exist and be reasonable" do + expect(IO::Stream::MAXIMUM_READ_SIZE).to be_within(1024*64..1024*64*512) + end + end +end + +ABufferedStream = Sus::Shared("a buffered stream") do + it "should be able to read and write data" do + writer.write "Hello, World!" + writer.flush + + expect(reader.read(13)).to be == "Hello, World!" + end + + with '#eof?' do + it "should return true when there is no data available" do + writer.close + expect(reader.eof?).to be_truthy + end + + it "should return false when there is data available" do + writer.write "Hello, World!" + writer.flush + + expect(reader.eof?).to be_falsey + end + end + + with '#readable?' do + it "should return true when the stream might be open" do + expect(reader.readable?).to be_truthy + end + + it "should return true when there is data available" do + writer.write "Hello, World!" + writer.flush + + expect(reader.readable?).to be_truthy + end + + it "should return false when the stream is known to be closed" do + writer.close + + expect(reader.readable?).to be_truthy + reader.read + expect(reader.readable?).to be_falsey + end + end + + with '#close' do + it "should close the stream" do + writer.close + expect(reader.read).to be_nil + + expect(writer.closed?).to be_truthy + expect(reader.closed?).to be_falsey + end + + it "writer should be idempotent" do + writer.close + writer.close + + expect(writer.closed?).to be_truthy + expect(reader.closed?).to be_falsey + end + + it "reader be idempotent" do + reader.close + reader.close + + expect(reader.closed?).to be_truthy + expect(writer.closed?).to be_falsey + end + end +end + +describe "IO.pipe" do + let(:pipe) {IO.pipe} + let(:reader) {IO::Stream::BufferedStream.wrap(pipe[0])} + let(:writer) {IO::Stream::BufferedStream.wrap(pipe[1])} + + it_behaves_like ABufferedStream +end + +describe "Socket.pair" do + let(:sockets) {Socket.pair(:UNIX, :STREAM)} + let(:reader) {IO::Stream::BufferedStream.wrap(sockets[0])} + let(:writer) {IO::Stream::BufferedStream.wrap(sockets[1])} + + it_behaves_like ABufferedStream +end + +describe "OpenSSL::SSL::SSLSocket" do + include Sus::Fixtures::OpenSSL::VerifiedCertificateContext + include Sus::Fixtures::OpenSSL::ValidCertificateContext + + let(:sockets) {Socket.pair(:UNIX, :STREAM)} + + let(:reader) {IO::Stream::BufferedStream.wrap(OpenSSL::SSL::SSLSocket.new(sockets[0], client_context))} + let(:writer) {IO::Stream::BufferedStream.wrap(OpenSSL::SSL::SSLSocket.new(sockets[1], server_context))} +end diff --git a/test/io/stream/string_buffer.rb b/test/io/stream/string_buffer.rb new file mode 100644 index 0000000..d5b215c --- /dev/null +++ b/test/io/stream/string_buffer.rb @@ -0,0 +1,27 @@ +require 'io/stream/string_buffer' + +describe IO::Stream::StringBuffer do + let(:string_buffer) {subject.new} + + it "should be a subclass of String" do + expect(subject).to be < String + end + + it "should have a binary encoding" do + expect(string_buffer.encoding).to be == Encoding::BINARY + end + + it "should append unicode strings" do + string_buffer << "Hello, World!".force_encoding(Encoding::UTF_8) + + expect(string_buffer).to be == "Hello, World!" + expect(string_buffer.encoding).to be == Encoding::BINARY + end + + it "should append binary strings" do + string_buffer << "Hello, World!".force_encoding(Encoding::BINARY) + + expect(string_buffer).to be == "Hello, World!" + expect(string_buffer.encoding).to be == Encoding::BINARY + end +end