Skip to content

Commit

Permalink
limit connections before async
Browse files Browse the repository at this point in the history
  • Loading branch information
pankgeorg committed Sep 2, 2023
1 parent f719bf6 commit 811b6b3
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 42 deletions.
1 change: 1 addition & 0 deletions src/HTTP.jl
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ include("clientlayers/ConnectionRequest.jl"); using .ConnectionRequest
include("clientlayers/StreamRequest.jl"); using .StreamRequest

include("download.jl")
include("accept.jl")
include("Servers.jl") ;using .Servers; using .Servers: listen
include("Handlers.jl") ;using .Handlers; using .Handlers: serve
include("parsemultipart.jl") ;using .MultiPartParsing: parse_multipart_form
Expand Down
46 changes: 4 additions & 42 deletions src/Servers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ using MbedTLS: SSLContext, SSLConfig
using ConcurrentUtilities: ConcurrentUtilities, Lockable, lock
using ..IOExtras, ..Streams, ..Messages, ..Parsers, ..Connections, ..Exceptions
import ..access_threaded, ..SOCKET_TYPE_TLS, ..@logfmt_str
using ..Accept: acceptmany

TRUE(x) = true
getinet(host::String, port::Integer) = Sockets.InetAddr(parse(IPAddr, host), port)
Expand Down Expand Up @@ -366,47 +367,6 @@ function listen!(f, listener::Listener;
return Server(listener, on_shutdown, conns, tsk)
end

using Base: iolock_begin, iolock_end, uv_error, preserve_handle, unpreserve_handle,
StatusClosing, StatusClosed, StatusActive, UV_EAGAIN, UV_ECONNABORTED
using Sockets: accept_nonblock

function acceptmany(server; MAXSIZE=Sockets.BACKLOG_DEFAULT)
result = Vector{TCPSocket}()
sizehint!(result, MAXSIZE)
iolock_begin()
if server.status != StatusActive && server.status != StatusClosing && server.status != StatusClosed
throw(ArgumentError("server not connected, make sure \"listen\" has been called"))
end
while isopen(server)
client = TCPSocket()
err = Sockets.accept_nonblock(server, client)
while err == 0 && length(result) < MAXSIZE # Don't try to fill more than half the buffer
push!(result, client)
client = TCPSocket()
err = Sockets.accept_nonblock(server, client)
end
if length(result) > 0
iolock_end()
return result
end
if err != UV_EAGAIN
uv_error("accept", err)
end
preserve_handle(server)
lock(server.cond)
iolock_end()
try
wait(server.cond)
finally
unlock(server.cond)
unpreserve_handle(server)
end
iolock_begin()
end
uv_error("accept", UV_ECONNABORTED)
nothing
end

""""
Main server loop.
Accepts new tcp connections and spawns async tasks to handle them."
Expand All @@ -421,8 +381,10 @@ function listenloop(f, listener, conns, tcpisvalid,
while isopen(listener)
try
for io in acceptmany(listener.server)
# I would prefer this inside the async, so we can loop and accept again,
# but https://github.com/JuliaWeb/HTTP.jl/pull/647/files says it's bad for performance
max_connections < typemax(Int) && Base.acquire(sem)
@async begin
max_connections < typemax(Int) && Base.acquire(sem)
local conn = nothing
isssl = !isnothing(listener.ssl)
try
Expand Down
47 changes: 47 additions & 0 deletions src/accept.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@

module Accept

export acceptmany

using Base: iolock_begin, iolock_end, uv_error, preserve_handle, unpreserve_handle,
StatusClosing, StatusClosed, StatusActive, UV_EAGAIN, UV_ECONNABORTED
using Sockets

function acceptmany(server; MAXSIZE=Sockets.BACKLOG_DEFAULT)
result = Vector{TCPSocket}()
sizehint!(result, MAXSIZE)
iolock_begin()
if server.status != StatusActive && server.status != StatusClosing && server.status != StatusClosed
throw(ArgumentError("server not connected, make sure \"listen\" has been called"))
end
while isopen(server)
client = TCPSocket()
err = Sockets.accept_nonblock(server, client)
while err == 0 && length(result) < MAXSIZE # Don't try to fill more than half the buffer
push!(result, client)
client = TCPSocket()
err = Sockets.accept_nonblock(server, client)
end
if length(result) > 0
iolock_end()
return result
end
if err != UV_EAGAIN
uv_error("accept", err)
end
preserve_handle(server)
lock(server.cond)
iolock_end()
try
wait(server.cond)
finally
unlock(server.cond)
unpreserve_handle(server)
end
iolock_begin()
end
uv_error("accept", UV_ECONNABORTED)
nothing
end

end

0 comments on commit 811b6b3

Please sign in to comment.