Skip to content

Commit

Permalink
Updates
Browse files Browse the repository at this point in the history
  • Loading branch information
quinnj committed Jan 6, 2023
1 parent 45c684b commit e448d96
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 10 deletions.
19 changes: 13 additions & 6 deletions src/Streams.jl
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module Streams

export Stream, closebody, isaborted, setstatus
export Stream, closebody, isaborted, setstatus, readall!

using Sockets, LoggingExtras
using ..IOExtras, ..Messages, ..ConnectionPool, ..Conditions, ..Exceptions
Expand Down Expand Up @@ -268,7 +268,7 @@ end
function Base.readbytes!(http::Stream, buf::AbstractVector{UInt8},
n=length(buf))
@require n <= length(buf)
return http_unsafe_read(http, pointer(buf), UInt(n))
return GC.@preserve buf http_unsafe_read(http, pointer(buf), UInt(n))
end

function Base.unsafe_read(http::Stream, p::Ptr{UInt8}, n::UInt)
Expand All @@ -282,13 +282,20 @@ function Base.unsafe_read(http::Stream, p::Ptr{UInt8}, n::UInt)
nothing
end

function Base.readbytes!(http::Stream, buf::IOBuffer, n=bytesavailable(http))
@noinline bufcheck(buf, n) = ((buf.size + n) <= buf.maxsize) || throw(ArgumentError("Unable to grow response stream IOBuffer large enough for response body size"))

function Base.readbytes!(http::Stream, buf::Base.GenericIOBuffer, n=bytesavailable(http))
Base.ensureroom(buf, n)
unsafe_read(http, pointer(buf.data, buf.size + 1), n)
# check if there's enough room in buf to write n bytes
bufcheck(buf, n)
data = buf.data
GC.@preserve data unsafe_read(http, pointer(data, buf.size + 1), n)
buf.size += n
end

function Base.read(http::Stream, buf::IOBuffer=PipeBuffer())
Base.read(http::Stream, buf::Base.GenericIOBuffer=PipeBuffer()) = take!(readall(http, buf))

function readall!(http::Stream, buf::Base.GenericIOBuffer=PipeBuffer())
if ntoread(http) == unknown_length
while !eof(http)
readbytes!(http, buf)
Expand All @@ -298,7 +305,7 @@ function Base.read(http::Stream, buf::IOBuffer=PipeBuffer())
readbytes!(http, buf, ntoread(http))
end
end
return take!(buf)
return buf
end

function Base.readuntil(http::Stream, f::Function)::ByteView
Expand Down
6 changes: 3 additions & 3 deletions src/clientlayers/RetryRequest.jl
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ e.g. `Sockets.DNSError`, `Base.EOFError` and `HTTP.StatusError`
"""
function retrylayer(handler)
return function(req::Request; retry::Bool=true, retries::Int=4,
retry_delays::ExponentialBackOff=ExponentialBackOff(n = retries), retry_check=FALSE,
retry_delays::ExponentialBackOff=ExponentialBackOff(n = retries, factor=3.0), retry_check=FALSE,
retry_non_idempotent::Bool=false, kw...)
if !retry || retries == 0
# no retry
Expand Down Expand Up @@ -61,8 +61,8 @@ function retrylayer(handler)
@debugv 1 "🚷 No Retry: $(no_retry_reason(ex, req))"
end
return s, retry
end)

end
)
return retry_request(req; kw...)
end
end
Expand Down
8 changes: 7 additions & 1 deletion src/clientlayers/StreamRequest.jl
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ function readbody(stream::Stream, res::Response, decompress::Union{Nothing, Bool
end
end

# 2 most common types of IOBuffers
const IOBuffers = Union{IOBuffer, Base.GenericIOBuffer{SubArray{UInt8, 1, Vector{UInt8}, Tuple{UnitRange{Int64}}, true}}}

function readbody!(stream::Stream, res::Response, buf_or_stream)
if !iserror(res)
if isbytes(res.body)
Expand All @@ -145,13 +148,16 @@ function readbody!(stream::Stream, res::Response, buf_or_stream)
elseif buf_or_stream isa Stream
# for HTTP.Stream, there's already an optimized read method
# that just needs an IOBuffer to write into
res.body = read(buf_or_stream, body)
readall!(buf_or_stream, body)
else
error("unreachable")
end
else
res.body = read(buf_or_stream)
end
elseif (res.body isa IOBuffers || res.body isa Base.GenericIOBuffer) && buf_or_stream isa Stream
# optimization for IOBuffer response_stream to avoid temporary allocations
readall!(buf_or_stream, res.body)
else
write(res.body, buf_or_stream)
end
Expand Down
28 changes: 28 additions & 0 deletions test/client.jl
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,34 @@ end
seekstart(io)
@test isok(r)

# pass pre-allocated buffer
body = zeros(UInt8, 100)
r = HTTP.get("https://$httpbin/bytes/100"; response_stream=body, socket_type_tls=tls)
@test body === r.body

# wrapping pre-allocated buffer in IOBuffer will write to buffer directly
io = IOBuffer(body; write=true)
r = HTTP.get("https://$httpbin/bytes/100"; response_stream=io, socket_type_tls=tls)
@test body === r.body.data

# if provided buffer is too small, we won't grow it for user
body = zeros(UInt8, 10)
@test_throws HTTP.RequestError HTTP.get("https://$httpbin/bytes/100"; response_stream=body, socket_type_tls=tls)

# also won't shrink it if buffer provided is larger than response body
body = zeros(UInt8, 10)
r = HTTP.get("https://$httpbin/bytes/5"; response_stream=body, socket_type_tls=tls)
@test body === r.body
@test length(body) == 10
@test HTTP.header(r, "Content-Length") == "5"

# but if you wrap it in a writable IOBuffer, we will grow it
io = IOBuffer(body; write=true)
r = HTTP.get("https://$httpbin/bytes/100"; response_stream=io, socket_type_tls=tls)
# same Array, though it was resized larger
@test body === r.body.data
@test length(body) == 100

b = [JSON.parse(l) for l in eachline(io)]
@test all(zip(a, b)) do (x, y)
x["args"] == y["args"] &&
Expand Down

0 comments on commit e448d96

Please sign in to comment.