Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "stop using raw libuv API" #156

Merged
merged 1 commit into from
Oct 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ version = "1.5.1"

[deps]
ArgTools = "0dad84c5-d112-42e6-8d28-ef12dabb789f"
FileWatching = "7b1f6079-737a-58dc-b8bc-7a2ca5c1b5ee"
LibCURL = "b27032c2-a3e7-50c8-80cd-2d36dbcbfd21"
NetworkOptions = "ca575930-c2e3-43a9-ace4-1e988b2c1908"

Expand Down
27 changes: 2 additions & 25 deletions src/Curl/Curl.jl
Original file line number Diff line number Diff line change
Expand Up @@ -26,38 +26,15 @@ export
remove_handle

using LibCURL
using LibCURL: curl_off_t, libcurl
using LibCURL: curl_off_t
# not exported: https://github.com/JuliaWeb/LibCURL.jl/issues/87

# constants that LibCURL should have but doesn't
const CURLE_PEER_FAILED_VERIFICATION = 60
const CURLSSLOPT_REVOKE_BEST_EFFORT = 1 << 3

# these are incorrectly defined on Windows by LibCURL:
if Sys.iswindows()
const curl_socket_t = Base.OS_HANDLE
const CURL_SOCKET_TIMEOUT = Base.INVALID_OS_HANDLE
else
const curl_socket_t = Cint
const CURL_SOCKET_TIMEOUT = -1
end

# definitions affected by incorrect curl_socket_t (copied verbatim):
function curl_multi_socket_action(multi_handle, s, ev_bitmask, running_handles)
ccall((:curl_multi_socket_action, libcurl), CURLMcode, (Ptr{CURLM}, curl_socket_t, Cint, Ptr{Cint}), multi_handle, s, ev_bitmask, running_handles)
end
function curl_multi_assign(multi_handle, sockfd, sockp)
ccall((:curl_multi_assign, libcurl), CURLMcode, (Ptr{CURLM}, curl_socket_t, Ptr{Cvoid}), multi_handle, sockfd, sockp)
end

# additional curl_multi_socket_action method
function curl_multi_socket_action(multi_handle, s, ev_bitmask)
curl_multi_socket_action(multi_handle, s, ev_bitmask, Ref{Cint}())
end

using FileWatching
using NetworkOptions
using Base: OS_HANDLE, preserve_handle, unpreserve_handle
using Base: preserve_handle, unpreserve_handle

include("utils.jl")

Expand Down
122 changes: 77 additions & 45 deletions src/Curl/Multi.jl
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
mutable struct Multi
lock :: ReentrantLock
handle :: Ptr{Cvoid}
timer :: Timer
timer :: Ptr{Cvoid}
easies :: Vector{Easy}
grace :: UInt64

function Multi(grace::Integer = typemax(UInt64))
multi = new(ReentrantLock(), C_NULL, Timer(0), Easy[], grace)
timer = jl_malloc(Base._sizeof_uv_timer)
uv_timer_init(timer)
multi = new(ReentrantLock(), C_NULL, timer, Easy[], grace)
finalizer(multi) do multi
close(multi.timer)
uv_timer_stop(multi.timer)
uv_close(multi.timer, cglobal(:jl_free))
done!(multi)
end
end
Expand All @@ -29,11 +32,19 @@ end

# adding & removing easy handles

function cleanup_callback(uv_timer_p::Ptr{Cvoid})::Cvoid
## TODO: use a member access API
multi_p = unsafe_load(convert(Ptr{Ptr{Cvoid}}, uv_timer_p))
multi = unsafe_pointer_to_objref(multi_p)::Multi
done!(multi)
return
end

function add_handle(multi::Multi, easy::Easy)
lock(multi.lock) do
if isempty(multi.easies)
preserve_handle(multi)
close(multi.timer) # stop grace timer
uv_timer_stop(multi.timer) # stop grace timer
end
push!(multi.easies, easy)
init!(multi)
Expand All @@ -46,14 +57,11 @@ function remove_handle(multi::Multi, easy::Easy)
@check curl_multi_remove_handle(multi.handle, easy.handle)
deleteat!(multi.easies, findlast(==(easy), multi.easies)::Int)
!isempty(multi.easies) && return
cleanup_cb = @cfunction(cleanup_callback, Cvoid, (Ptr{Cvoid},))
if multi.grace <= 0
done!(multi)
elseif 0 < multi.grace < typemax(multi.grace)
multi.timer = Timer(multi.grace/1000)
@async begin
wait(multi.timer)
isopen(multi.timer) && done!(multi)
end
uv_timer_start(multi.timer, cleanup_cb, multi.grace, 0)
end
unpreserve_handle(multi)
end
Expand All @@ -65,14 +73,15 @@ function set_defaults(multi::Multi)
# currently no defaults
end

# multi-socket handle state updates
# libuv callbacks

struct CURLMsg
msg :: CURLMSG
easy :: Ptr{Cvoid}
code :: CURLcode
end

# should already be locked
function check_multi_info(multi::Multi)
while true
p = curl_multi_info_read(multi.handle, Ref{Cint}())
Expand All @@ -95,15 +104,37 @@ function check_multi_info(multi::Multi)
end
end

# curl callbacks
function event_callback(
uv_poll_p :: Ptr{Cvoid},
status :: Cint,
events :: Cint,
)::Cvoid
## TODO: use a member access API
multi_p = unsafe_load(convert(Ptr{Ptr{Cvoid}}, uv_poll_p))
multi = unsafe_pointer_to_objref(multi_p)::Multi
sock_p = uv_poll_p + Base._sizeof_uv_poll
sock = unsafe_load(convert(Ptr{curl_socket_t}, sock_p))
flags = 0
events & UV_READABLE != 0 && (flags |= CURL_CSELECT_IN)
events & UV_WRITABLE != 0 && (flags |= CURL_CSELECT_OUT)
lock(multi.lock) do
@check curl_multi_socket_action(multi.handle, sock, flags)
check_multi_info(multi)
end
end

function do_multi(multi::Multi)
function timeout_callback(uv_timer_p::Ptr{Cvoid})::Cvoid
## TODO: use a member access API
multi_p = unsafe_load(convert(Ptr{Ptr{Cvoid}}, uv_timer_p))
multi = unsafe_pointer_to_objref(multi_p)::Multi
lock(multi.lock) do
@check curl_multi_socket_action(multi.handle, CURL_SOCKET_TIMEOUT, 0)
check_multi_info(multi)
end
end

# curl callbacks

function timer_callback(
multi_h :: Ptr{Cvoid},
timeout_ms :: Clong,
Expand All @@ -112,13 +143,15 @@ function timer_callback(
multi = unsafe_pointer_to_objref(multi_p)::Multi
@assert multi_h == multi.handle
if timeout_ms == 0
do_multi(multi)
elseif timeout_ms >= 0
multi.timer = Timer(timeout_ms/1000) do timer
do_multi(multi)
lock(multi.lock) do
@check curl_multi_socket_action(multi.handle, CURL_SOCKET_TIMEOUT, 0)
check_multi_info(multi)
end
elseif timeout_ms >= 0
timeout_cb = @cfunction(timeout_callback, Cvoid, (Ptr{Cvoid},))
uv_timer_start(multi.timer, timeout_cb, max(1, timeout_ms), 0)
elseif timeout_ms == -1
close(multi.timer)
uv_timer_stop(multi.timer)
else
@async @error("timer_callback: invalid timeout value", timeout_ms)
return -1
Expand All @@ -131,47 +164,46 @@ function socket_callback(
sock :: curl_socket_t,
action :: Cint,
multi_p :: Ptr{Cvoid},
watcher_p :: Ptr{Cvoid},
uv_poll_p :: Ptr{Cvoid},
)::Cint
if action ∉ (CURL_POLL_IN, CURL_POLL_OUT, CURL_POLL_INOUT, CURL_POLL_REMOVE)
@async @error("socket_callback: unexpected action", action)
return -1
end
multi = unsafe_pointer_to_objref(multi_p)::Multi
if watcher_p != C_NULL
old_watcher = unsafe_pointer_to_objref(watcher_p)::FDWatcher
@check curl_multi_assign(multi.handle, sock, C_NULL)
unpreserve_handle(old_watcher)
end
if action in (CURL_POLL_IN, CURL_POLL_OUT, CURL_POLL_INOUT)
readable = action in (CURL_POLL_IN, CURL_POLL_INOUT)
writable = action in (CURL_POLL_OUT, CURL_POLL_INOUT)
watcher = FDWatcher(OS_HANDLE(sock), readable, writable)
preserve_handle(watcher)
watcher_p = pointer_from_objref(watcher)
@check curl_multi_assign(multi.handle, sock, watcher_p)
task = @async while true
events = try wait(watcher)
catch err
err isa EOFError && break
rethrow()
if uv_poll_p == C_NULL
uv_poll_p = uv_poll_alloc()
uv_poll_init(uv_poll_p, sock)
## TODO: use a member access API
unsafe_store!(convert(Ptr{Ptr{Cvoid}}, uv_poll_p), multi_p)
sock_p = uv_poll_p + Base._sizeof_uv_poll
unsafe_store!(convert(Ptr{curl_socket_t}, sock_p), sock)
lock(multi.lock) do
@check curl_multi_assign(multi.handle, sock, uv_poll_p)
end
flags = CURL_CSELECT_IN * isreadable(events) +
CURL_CSELECT_OUT * iswritable(events) +
CURL_CSELECT_ERR * events.disconnect
end
events = 0
action != CURL_POLL_IN && (events |= UV_WRITABLE)
action != CURL_POLL_OUT && (events |= UV_READABLE)
event_cb = @cfunction(event_callback, Cvoid, (Ptr{Cvoid}, Cint, Cint))
uv_poll_start(uv_poll_p, events, event_cb)
elseif action == CURL_POLL_REMOVE
if uv_poll_p != C_NULL
uv_poll_stop(uv_poll_p)
uv_close(uv_poll_p, cglobal(:jl_free))
lock(multi.lock) do
@check curl_multi_socket_action(multi.handle, sock, flags)
check_multi_info(multi)
@check curl_multi_assign(multi.handle, sock, C_NULL)
end
end
@isdefined(errormonitor) && errormonitor(task)
else
@async @error("socket_callback: unexpected action", action)
return -1
end
@isdefined(old_watcher) && close(old_watcher)
return 0
end

function add_callbacks(multi::Multi)
# stash multi handle pointer in timer
multi_p = pointer_from_objref(multi)
## TODO: use a member access API
unsafe_store!(convert(Ptr{Ptr{Cvoid}}, multi.timer), multi_p)

# set timer callback
timer_cb = @cfunction(timer_callback, Cint, (Ptr{Cvoid}, Clong, Ptr{Cvoid}))
Expand Down
72 changes: 67 additions & 5 deletions src/Curl/utils.jl
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
# basic C stuff

if !@isdefined(contains)
contains(haystack, needle) = occursin(needle, haystack)
export contains
end

# basic C stuff

puts(s::Union{String,SubString{String}}) = ccall(:puts, Cint, (Ptr{Cchar},), s)

jl_malloc(n::Integer) = ccall(:jl_malloc, Ptr{Cvoid}, (Csize_t,), n)

# check if a call failed
# check if a function or C call failed

macro check(ex::Expr)
function check(ex::Expr, lock::Bool)
ex.head == :call ||
error("@check: not a call: $ex")
arg1 = ex.args[1] :: Symbol
Expand All @@ -24,13 +24,75 @@ macro check(ex::Expr)
f = arg1
end
prefix = "$f: "
ex = esc(ex)
if lock
ex = quote
Base.iolock_begin()
value = $ex
Base.iolock_end()
value
end
end
quote
r = $(esc(ex))
r = $ex
iszero(r) || @async @error($prefix * string(r))
r
end
end

macro check(ex::Expr) check(ex, false) end
macro check_iolock(ex::Expr) check(ex, true) end

# some libuv wrappers

const UV_READABLE = 1
const UV_WRITABLE = 2

function uv_poll_alloc()
# allocate memory for: uv_poll_t struct + extra for curl_socket_t
jl_malloc(Base._sizeof_uv_poll + sizeof(curl_socket_t))
end

function uv_poll_init(p::Ptr{Cvoid}, sock::curl_socket_t)
@check_iolock ccall(:uv_poll_init, Cint,
(Ptr{Cvoid}, Ptr{Cvoid}, curl_socket_t), Base.eventloop(), p, sock)
end

function uv_poll_start(p::Ptr{Cvoid}, events::Integer, cb::Ptr{Cvoid})
@check_iolock ccall(:uv_poll_start, Cint,
(Ptr{Cvoid}, Cint, Ptr{Cvoid}), p, events, cb)
end

function uv_poll_stop(p::Ptr{Cvoid})
@check_iolock ccall(:uv_poll_stop, Cint, (Ptr{Cvoid},), p)
end

function uv_close(p::Ptr{Cvoid}, cb::Ptr{Cvoid})
Base.iolock_begin()
ccall(:uv_close, Cvoid, (Ptr{Cvoid}, Ptr{Cvoid}), p, cb)
Base.iolock_end()
end

function uv_timer_init(p::Ptr{Cvoid})
@check_iolock ccall(:uv_timer_init, Cint,
(Ptr{Cvoid}, Ptr{Cvoid}), Base.eventloop(), p)
end

function uv_timer_start(p::Ptr{Cvoid}, cb::Ptr{Cvoid}, t::Integer, r::Integer)
@check_iolock ccall(:uv_timer_start, Cint,
(Ptr{Cvoid}, Ptr{Cvoid}, UInt64, UInt64), p, cb, t, r)
end

function uv_timer_stop(p::Ptr{Cvoid})
@check_iolock ccall(:uv_timer_stop, Cint, (Ptr{Cvoid},), p)
end

# additional libcurl methods

function curl_multi_socket_action(multi_handle, s, ev_bitmask)
LibCURL.curl_multi_socket_action(multi_handle, s, ev_bitmask, Ref{Cint}())
end

# curl string list structure

struct curl_slist_t
Expand Down