diff --git a/src/ConnectionPool.jl b/src/ConnectionPool.jl index a87ae5ffa..efbaf10bb 100644 --- a/src/ConnectionPool.jl +++ b/src/ConnectionPool.jl @@ -10,7 +10,7 @@ Methods are provided for `eof`, `readavailable`, This allows the `Connection` object to act as a proxy for the `TCPSocket` or `SSLContext` that it wraps. -The [`POOL`](@ref) is used to manage connection pooling. Connections +[`POOLS`](@ref) are used to manage connection pooling. Connections are identified by their host, port, whether they require ssl verification, and whether they are a client or server connection. If a subsequent request matches these properties of a previous connection @@ -39,7 +39,7 @@ using .ConnectionPools """ Connection -A `TCPSocket` or `SSLContext` connection to a HTTP `host` and `port`. +A `Sockets.TCPSocket`, `MbedTLS.SSLContext` or `OpenSSL.SSLStream` connection to a HTTP `host` and `port`. Fields: - `host::String` @@ -49,7 +49,7 @@ Fields: - `peerip`, remote IP adress (used for debug/log messages). - `peerport`, remote TCP port number (used for debug/log messages). - `localport`, local TCP port number (used for debug messages). -- `io::T`, the `TCPSocket` or `SSLContext. +- `io::T`, the `Sockets.TCPSocket`, `MbedTLS.SSLContext` or `OpenSSL.SSLStream`. - `clientconnection::Bool`, whether the Connection was created from client code (as opposed to server code) - `buffer::IOBuffer`, left over bytes read from the connection after the end of a response header (or chunksize). These bytes are usually @@ -58,7 +58,7 @@ Fields: - `readable`, whether the Connection object is readable - `writable`, whether the Connection object is writable """ -mutable struct Connection <: IO +mutable struct Connection{IO_t <: IO} <: IO host::String port::String idle_timeout::Int @@ -66,7 +66,7 @@ mutable struct Connection <: IO peerip::IPAddr # for debugging/logging peerport::UInt16 # for debugging/logging localport::UInt16 # debug only - io::IO + io::IO_t clientconnection::Bool buffer::IOBuffer timestamp::Float64 @@ -89,8 +89,8 @@ connectionkey(x::Connection) = (typeof(x.io), x.host, x.port, x.require_ssl_veri Connection(host::AbstractString, port::AbstractString, idle_timeout::Int, - require_ssl_verification::Bool, io::IO, client=true) = - Connection(host, port, idle_timeout, + require_ssl_verification::Bool, io::T, client=true) where {T}= + Connection{T}(host, port, idle_timeout, require_ssl_verification, safe_getpeername(io)..., localport(io), io, client, PipeBuffer(), time(), false, false, IOBuffer(), nothing) @@ -325,22 +325,35 @@ function purge(c::Connection) @ensure bytesavailable(c) == 0 end +const TCP_POOL = Pool(Connection{Sockets.TCPSocket}) +const MbedTLS_SSL_POOL = Pool(Connection{MbedTLS.SSLContext}) +const OpenSSL_SSL_POOL = Pool(Connection{OpenSSL.SSLStream}) """ - closeall() + POOLS -Close all connections in`pool`. +A dict of global connection pools keeping track of active connections, split by their IO type. """ -function closeall() - ConnectionPools.reset!(POOL) - return +const POOLS = Dict{DataType,Pool}( + Sockets.TCPSocket => TCP_POOL, + MbedTLS.SSLContext => MbedTLS_SSL_POOL, + OpenSSL.SSLStream => OpenSSL_SSL_POOL, +) +getpool(::Type{Sockets.TCPSocket}) = TCP_POOL +getpool(::Type{MbedTLS.SSLContext}) = MbedTLS_SSL_POOL +getpool(::Type{OpenSSL.SSLStream}) = OpenSSL_SSL_POOL +# Fallback for custom connection io types +# to opt out from locking, define your own `Pool` and add a `getpool` method for your IO type +const POOLS_LOCK = Threads.ReentrantLock() +function getpool(::Type{T}) where {T} + Base.@lock POOLS_LOCK get!(() -> Pool(Connection{T}), POOLS, T)::Pool{Connection{T}} end """ - POOL + closeall() -Global connection pool keeping track of active connections. +Close all connections in `POOLS`. """ -const POOL = Pool(Connection) +closeall() = foreach(ConnectionPools.reset!, values(POOLS)) """ newconnection(type, host, port) -> Connection @@ -355,9 +368,9 @@ function newconnection(::Type{T}, forcenew::Bool=false, idle_timeout=typemax(Int), require_ssl_verification::Bool=NetworkOptions.verify_host(host, "SSL"), - kw...)::Connection where {T <: IO} + kw...) where {T <: IO} return acquire( - POOL, + getpool(T), (T, host, port, require_ssl_verification, true); max_concurrent_connections=Int(connection_limit), forcenew=forcenew, @@ -370,8 +383,8 @@ function newconnection(::Type{T}, end end -releaseconnection(c::Connection, reuse) = - release(POOL, connectionkey(c), c; return_for_reuse=reuse) +releaseconnection(c::Connection{T}, reuse) where {T} = + release(getpool(T), connectionkey(c), c; return_for_reuse=reuse) function keepalive!(tcp) @debugv 2 "setting keepalive on tcp socket" @@ -524,7 +537,7 @@ function sslupgrade(::Type{IOType}, c::Connection, host::AbstractString; require_ssl_verification::Bool=NetworkOptions.verify_host(host, "SSL"), readtimeout::Int=0, - kw...)::Connection where {IOType} + kw...)::Connection{IOType} where {IOType} # initiate the upgrade to SSL # if the upgrade fails, an error will be thrown and the original c will be closed # in ConnectionRequest @@ -538,9 +551,9 @@ function sslupgrade(::Type{IOType}, c::Connection, # success, now we turn it into a new Connection conn = Connection(host, "", 0, require_ssl_verification, tls) # release the "old" one, but don't allow reuse since we're hijacking the socket - release(POOL, connectionkey(c), c; return_for_reuse=false) + release(getpool(IOType), connectionkey(c), c; return_for_reuse=false) # and return the new one - return acquire(POOL, connectionkey(conn), conn) + return acquire(getpool(IOType), connectionkey(conn), conn) end function Base.show(io::IO, c::Connection)