Skip to content

Commit

Permalink
Revert "Revert "stop using raw libuv API" (JuliaLang#156)"
Browse files Browse the repository at this point in the history
This reverts commit c91876a.
  • Loading branch information
StefanKarpinski authored and ericphanson committed Jan 26, 2022
1 parent 890a039 commit 8a19106
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 146 deletions.
1 change: 1 addition & 0 deletions Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ version = "1.5.2"

[deps]
ArgTools = "0dad84c5-d112-42e6-8d28-ef12dabb789f"
FileWatching = "7b1f6079-737a-58dc-b8bc-7a2ca5c1b5ee"
LibCURL = "b27032c2-a3e7-50c8-80cd-2d36dbcbfd21"
NetworkOptions = "ca575930-c2e3-43a9-ace4-1e988b2c1908"

Expand Down
27 changes: 25 additions & 2 deletions src/Curl/Curl.jl
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,38 @@ export
remove_handle

using LibCURL
using LibCURL: curl_off_t
using LibCURL: curl_off_t, libcurl
# not exported: https://github.com/JuliaWeb/LibCURL.jl/issues/87

# constants that LibCURL should have but doesn't
const CURLE_PEER_FAILED_VERIFICATION = 60
const CURLSSLOPT_REVOKE_BEST_EFFORT = 1 << 3

# these are incorrectly defined on Windows by LibCURL:
if Sys.iswindows()
const curl_socket_t = Base.OS_HANDLE
const CURL_SOCKET_TIMEOUT = Base.INVALID_OS_HANDLE
else
const curl_socket_t = Cint
const CURL_SOCKET_TIMEOUT = -1
end

# definitions affected by incorrect curl_socket_t (copied verbatim):
function curl_multi_socket_action(multi_handle, s, ev_bitmask, running_handles)
ccall((:curl_multi_socket_action, libcurl), CURLMcode, (Ptr{CURLM}, curl_socket_t, Cint, Ptr{Cint}), multi_handle, s, ev_bitmask, running_handles)
end
function curl_multi_assign(multi_handle, sockfd, sockp)
ccall((:curl_multi_assign, libcurl), CURLMcode, (Ptr{CURLM}, curl_socket_t, Ptr{Cvoid}), multi_handle, sockfd, sockp)
end

# additional curl_multi_socket_action method
function curl_multi_socket_action(multi_handle, s, ev_bitmask)
curl_multi_socket_action(multi_handle, s, ev_bitmask, Ref{Cint}())
end

using FileWatching
using NetworkOptions
using Base: preserve_handle, unpreserve_handle
using Base: OS_HANDLE, preserve_handle, unpreserve_handle

include("utils.jl")

Expand Down
122 changes: 45 additions & 77 deletions src/Curl/Multi.jl
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
mutable struct Multi
lock :: ReentrantLock
handle :: Ptr{Cvoid}
timer :: Ptr{Cvoid}
timer :: Timer
easies :: Vector{Easy}
grace :: UInt64

function Multi(grace::Integer = typemax(UInt64))
timer = jl_malloc(Base._sizeof_uv_timer)
uv_timer_init(timer)
multi = new(ReentrantLock(), C_NULL, timer, Easy[], grace)
multi = new(ReentrantLock(), C_NULL, Timer(0), Easy[], grace)
finalizer(multi) do multi
uv_timer_stop(multi.timer)
uv_close(multi.timer, cglobal(:jl_free))
close(multi.timer)
done!(multi)
end
end
Expand All @@ -32,19 +29,11 @@ end

# adding & removing easy handles

function cleanup_callback(uv_timer_p::Ptr{Cvoid})::Cvoid
## TODO: use a member access API
multi_p = unsafe_load(convert(Ptr{Ptr{Cvoid}}, uv_timer_p))
multi = unsafe_pointer_to_objref(multi_p)::Multi
done!(multi)
return
end

function add_handle(multi::Multi, easy::Easy)
lock(multi.lock) do
if isempty(multi.easies)
preserve_handle(multi)
uv_timer_stop(multi.timer) # stop grace timer
close(multi.timer) # stop grace timer
end
push!(multi.easies, easy)
init!(multi)
Expand All @@ -57,11 +46,14 @@ function remove_handle(multi::Multi, easy::Easy)
@check curl_multi_remove_handle(multi.handle, easy.handle)
deleteat!(multi.easies, findlast(==(easy), multi.easies)::Int)
!isempty(multi.easies) && return
cleanup_cb = @cfunction(cleanup_callback, Cvoid, (Ptr{Cvoid},))
if multi.grace <= 0
done!(multi)
elseif 0 < multi.grace < typemax(multi.grace)
uv_timer_start(multi.timer, cleanup_cb, multi.grace, 0)
multi.timer = Timer(multi.grace/1000)
@async begin
wait(multi.timer)
isopen(multi.timer) && done!(multi)
end
end
unpreserve_handle(multi)
end
Expand All @@ -73,15 +65,14 @@ function set_defaults(multi::Multi)
# currently no defaults
end

# libuv callbacks
# multi-socket handle state updates

struct CURLMsg
msg :: CURLMSG
easy :: Ptr{Cvoid}
code :: CURLcode
end

# should already be locked
function check_multi_info(multi::Multi)
while true
p = curl_multi_info_read(multi.handle, Ref{Cint}())
Expand All @@ -104,37 +95,15 @@ function check_multi_info(multi::Multi)
end
end

function event_callback(
uv_poll_p :: Ptr{Cvoid},
status :: Cint,
events :: Cint,
)::Cvoid
## TODO: use a member access API
multi_p = unsafe_load(convert(Ptr{Ptr{Cvoid}}, uv_poll_p))
multi = unsafe_pointer_to_objref(multi_p)::Multi
sock_p = uv_poll_p + Base._sizeof_uv_poll
sock = unsafe_load(convert(Ptr{curl_socket_t}, sock_p))
flags = 0
events & UV_READABLE != 0 && (flags |= CURL_CSELECT_IN)
events & UV_WRITABLE != 0 && (flags |= CURL_CSELECT_OUT)
lock(multi.lock) do
@check curl_multi_socket_action(multi.handle, sock, flags)
check_multi_info(multi)
end
end
# curl callbacks

function timeout_callback(uv_timer_p::Ptr{Cvoid})::Cvoid
## TODO: use a member access API
multi_p = unsafe_load(convert(Ptr{Ptr{Cvoid}}, uv_timer_p))
multi = unsafe_pointer_to_objref(multi_p)::Multi
function do_multi(multi::Multi)
lock(multi.lock) do
@check curl_multi_socket_action(multi.handle, CURL_SOCKET_TIMEOUT, 0)
check_multi_info(multi)
end
end

# curl callbacks

function timer_callback(
multi_h :: Ptr{Cvoid},
timeout_ms :: Clong,
Expand All @@ -143,15 +112,13 @@ function timer_callback(
multi = unsafe_pointer_to_objref(multi_p)::Multi
@assert multi_h == multi.handle
if timeout_ms == 0
lock(multi.lock) do
@check curl_multi_socket_action(multi.handle, CURL_SOCKET_TIMEOUT, 0)
check_multi_info(multi)
end
do_multi(multi)
elseif timeout_ms >= 0
timeout_cb = @cfunction(timeout_callback, Cvoid, (Ptr{Cvoid},))
uv_timer_start(multi.timer, timeout_cb, max(1, timeout_ms), 0)
multi.timer = Timer(timeout_ms/1000) do timer
do_multi(multi)
end
elseif timeout_ms == -1
uv_timer_stop(multi.timer)
close(multi.timer)
else
@async @error("timer_callback: invalid timeout value", timeout_ms)
return -1
Expand All @@ -164,46 +131,47 @@ function socket_callback(
sock :: curl_socket_t,
action :: Cint,
multi_p :: Ptr{Cvoid},
uv_poll_p :: Ptr{Cvoid},
watcher_p :: Ptr{Cvoid},
)::Cint
if action (CURL_POLL_IN, CURL_POLL_OUT, CURL_POLL_INOUT, CURL_POLL_REMOVE)
@async @error("socket_callback: unexpected action", action)
return -1
end
multi = unsafe_pointer_to_objref(multi_p)::Multi
if watcher_p != C_NULL
old_watcher = unsafe_pointer_to_objref(watcher_p)::FDWatcher
@check curl_multi_assign(multi.handle, sock, C_NULL)
unpreserve_handle(old_watcher)
end
if action in (CURL_POLL_IN, CURL_POLL_OUT, CURL_POLL_INOUT)
if uv_poll_p == C_NULL
uv_poll_p = uv_poll_alloc()
uv_poll_init(uv_poll_p, sock)
## TODO: use a member access API
unsafe_store!(convert(Ptr{Ptr{Cvoid}}, uv_poll_p), multi_p)
sock_p = uv_poll_p + Base._sizeof_uv_poll
unsafe_store!(convert(Ptr{curl_socket_t}, sock_p), sock)
lock(multi.lock) do
@check curl_multi_assign(multi.handle, sock, uv_poll_p)
readable = action in (CURL_POLL_IN, CURL_POLL_INOUT)
writable = action in (CURL_POLL_OUT, CURL_POLL_INOUT)
watcher = FDWatcher(OS_HANDLE(sock), readable, writable)
preserve_handle(watcher)
watcher_p = pointer_from_objref(watcher)
@check curl_multi_assign(multi.handle, sock, watcher_p)
task = @async while true
events = try wait(watcher)
catch err
err isa EOFError && break
rethrow()
end
end
events = 0
action != CURL_POLL_IN && (events |= UV_WRITABLE)
action != CURL_POLL_OUT && (events |= UV_READABLE)
event_cb = @cfunction(event_callback, Cvoid, (Ptr{Cvoid}, Cint, Cint))
uv_poll_start(uv_poll_p, events, event_cb)
elseif action == CURL_POLL_REMOVE
if uv_poll_p != C_NULL
uv_poll_stop(uv_poll_p)
uv_close(uv_poll_p, cglobal(:jl_free))
flags = CURL_CSELECT_IN * isreadable(events) +
CURL_CSELECT_OUT * iswritable(events) +
CURL_CSELECT_ERR * events.disconnect
lock(multi.lock) do
@check curl_multi_assign(multi.handle, sock, C_NULL)
@check curl_multi_socket_action(multi.handle, sock, flags)
check_multi_info(multi)
end
end
else
@async @error("socket_callback: unexpected action", action)
return -1
@isdefined(errormonitor) && errormonitor(task)
end
@isdefined(old_watcher) && close(old_watcher)
return 0
end

function add_callbacks(multi::Multi)
# stash multi handle pointer in timer
multi_p = pointer_from_objref(multi)
## TODO: use a member access API
unsafe_store!(convert(Ptr{Ptr{Cvoid}}, multi.timer), multi_p)

# set timer callback
timer_cb = @cfunction(timer_callback, Cint, (Ptr{Cvoid}, Clong, Ptr{Cvoid}))
Expand Down
72 changes: 5 additions & 67 deletions src/Curl/utils.jl
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
# basic C stuff

if !@isdefined(contains)
contains(haystack, needle) = occursin(needle, haystack)
export contains
end

# basic C stuff

puts(s::Union{String,SubString{String}}) = ccall(:puts, Cint, (Ptr{Cchar},), s)

jl_malloc(n::Integer) = ccall(:jl_malloc, Ptr{Cvoid}, (Csize_t,), n)

# check if a function or C call failed
# check if a call failed

function check(ex::Expr, lock::Bool)
macro check(ex::Expr)
ex.head == :call ||
error("@check: not a call: $ex")
arg1 = ex.args[1] :: Symbol
Expand All @@ -24,75 +24,13 @@ function check(ex::Expr, lock::Bool)
f = arg1
end
prefix = "$f: "
ex = esc(ex)
if lock
ex = quote
Base.iolock_begin()
value = $ex
Base.iolock_end()
value
end
end
quote
r = $ex
r = $(esc(ex))
iszero(r) || @async @error($prefix * string(r))
r
end
end

macro check(ex::Expr) check(ex, false) end
macro check_iolock(ex::Expr) check(ex, true) end

# some libuv wrappers

const UV_READABLE = 1
const UV_WRITABLE = 2

function uv_poll_alloc()
# allocate memory for: uv_poll_t struct + extra for curl_socket_t
jl_malloc(Base._sizeof_uv_poll + sizeof(curl_socket_t))
end

function uv_poll_init(p::Ptr{Cvoid}, sock::curl_socket_t)
@check_iolock ccall(:uv_poll_init, Cint,
(Ptr{Cvoid}, Ptr{Cvoid}, curl_socket_t), Base.eventloop(), p, sock)
end

function uv_poll_start(p::Ptr{Cvoid}, events::Integer, cb::Ptr{Cvoid})
@check_iolock ccall(:uv_poll_start, Cint,
(Ptr{Cvoid}, Cint, Ptr{Cvoid}), p, events, cb)
end

function uv_poll_stop(p::Ptr{Cvoid})
@check_iolock ccall(:uv_poll_stop, Cint, (Ptr{Cvoid},), p)
end

function uv_close(p::Ptr{Cvoid}, cb::Ptr{Cvoid})
Base.iolock_begin()
ccall(:uv_close, Cvoid, (Ptr{Cvoid}, Ptr{Cvoid}), p, cb)
Base.iolock_end()
end

function uv_timer_init(p::Ptr{Cvoid})
@check_iolock ccall(:uv_timer_init, Cint,
(Ptr{Cvoid}, Ptr{Cvoid}), Base.eventloop(), p)
end

function uv_timer_start(p::Ptr{Cvoid}, cb::Ptr{Cvoid}, t::Integer, r::Integer)
@check_iolock ccall(:uv_timer_start, Cint,
(Ptr{Cvoid}, Ptr{Cvoid}, UInt64, UInt64), p, cb, t, r)
end

function uv_timer_stop(p::Ptr{Cvoid})
@check_iolock ccall(:uv_timer_stop, Cint, (Ptr{Cvoid},), p)
end

# additional libcurl methods

function curl_multi_socket_action(multi_handle, s, ev_bitmask)
LibCURL.curl_multi_socket_action(multi_handle, s, ev_bitmask, Ref{Cint}())
end

# curl string list structure

struct curl_slist_t
Expand Down

0 comments on commit 8a19106

Please sign in to comment.