diff --git a/Project.toml b/Project.toml index 0b19d43..1f147eb 100644 --- a/Project.toml +++ b/Project.toml @@ -5,6 +5,7 @@ version = "1.5.2" [deps] ArgTools = "0dad84c5-d112-42e6-8d28-ef12dabb789f" +FileWatching = "7b1f6079-737a-58dc-b8bc-7a2ca5c1b5ee" LibCURL = "b27032c2-a3e7-50c8-80cd-2d36dbcbfd21" NetworkOptions = "ca575930-c2e3-43a9-ace4-1e988b2c1908" diff --git a/src/Curl/Curl.jl b/src/Curl/Curl.jl index e4f5171..fcfea37 100644 --- a/src/Curl/Curl.jl +++ b/src/Curl/Curl.jl @@ -26,15 +26,38 @@ export remove_handle using LibCURL -using LibCURL: curl_off_t +using LibCURL: curl_off_t, libcurl # not exported: https://github.com/JuliaWeb/LibCURL.jl/issues/87 # constants that LibCURL should have but doesn't const CURLE_PEER_FAILED_VERIFICATION = 60 const CURLSSLOPT_REVOKE_BEST_EFFORT = 1 << 3 +# these are incorrectly defined on Windows by LibCURL: +if Sys.iswindows() + const curl_socket_t = Base.OS_HANDLE + const CURL_SOCKET_TIMEOUT = Base.INVALID_OS_HANDLE +else + const curl_socket_t = Cint + const CURL_SOCKET_TIMEOUT = -1 +end + +# definitions affected by incorrect curl_socket_t (copied verbatim): +function curl_multi_socket_action(multi_handle, s, ev_bitmask, running_handles) + ccall((:curl_multi_socket_action, libcurl), CURLMcode, (Ptr{CURLM}, curl_socket_t, Cint, Ptr{Cint}), multi_handle, s, ev_bitmask, running_handles) +end +function curl_multi_assign(multi_handle, sockfd, sockp) + ccall((:curl_multi_assign, libcurl), CURLMcode, (Ptr{CURLM}, curl_socket_t, Ptr{Cvoid}), multi_handle, sockfd, sockp) +end + +# additional curl_multi_socket_action method +function curl_multi_socket_action(multi_handle, s, ev_bitmask) + curl_multi_socket_action(multi_handle, s, ev_bitmask, Ref{Cint}()) +end + +using FileWatching using NetworkOptions -using Base: preserve_handle, unpreserve_handle +using Base: OS_HANDLE, preserve_handle, unpreserve_handle include("utils.jl") diff --git a/src/Curl/Multi.jl b/src/Curl/Multi.jl index eb16d00..c56b94a 100644 --- a/src/Curl/Multi.jl +++ b/src/Curl/Multi.jl @@ -1,17 +1,14 @@ mutable struct Multi lock :: ReentrantLock handle :: Ptr{Cvoid} - timer :: Ptr{Cvoid} + timer :: Timer easies :: Vector{Easy} grace :: UInt64 function Multi(grace::Integer = typemax(UInt64)) - timer = jl_malloc(Base._sizeof_uv_timer) - uv_timer_init(timer) - multi = new(ReentrantLock(), C_NULL, timer, Easy[], grace) + multi = new(ReentrantLock(), C_NULL, Timer(0), Easy[], grace) finalizer(multi) do multi - uv_timer_stop(multi.timer) - uv_close(multi.timer, cglobal(:jl_free)) + close(multi.timer) done!(multi) end end @@ -32,19 +29,11 @@ end # adding & removing easy handles -function cleanup_callback(uv_timer_p::Ptr{Cvoid})::Cvoid - ## TODO: use a member access API - multi_p = unsafe_load(convert(Ptr{Ptr{Cvoid}}, uv_timer_p)) - multi = unsafe_pointer_to_objref(multi_p)::Multi - done!(multi) - return -end - function add_handle(multi::Multi, easy::Easy) lock(multi.lock) do if isempty(multi.easies) preserve_handle(multi) - uv_timer_stop(multi.timer) # stop grace timer + close(multi.timer) # stop grace timer end push!(multi.easies, easy) init!(multi) @@ -57,11 +46,14 @@ function remove_handle(multi::Multi, easy::Easy) @check curl_multi_remove_handle(multi.handle, easy.handle) deleteat!(multi.easies, findlast(==(easy), multi.easies)::Int) !isempty(multi.easies) && return - cleanup_cb = @cfunction(cleanup_callback, Cvoid, (Ptr{Cvoid},)) if multi.grace <= 0 done!(multi) elseif 0 < multi.grace < typemax(multi.grace) - uv_timer_start(multi.timer, cleanup_cb, multi.grace, 0) + multi.timer = Timer(multi.grace/1000) + @async begin + wait(multi.timer) + isopen(multi.timer) && done!(multi) + end end unpreserve_handle(multi) end @@ -73,7 +65,7 @@ function set_defaults(multi::Multi) # currently no defaults end -# libuv callbacks +# multi-socket handle state updates struct CURLMsg msg :: CURLMSG @@ -81,7 +73,6 @@ struct CURLMsg code :: CURLcode end -# should already be locked function check_multi_info(multi::Multi) while true p = curl_multi_info_read(multi.handle, Ref{Cint}()) @@ -104,37 +95,15 @@ function check_multi_info(multi::Multi) end end -function event_callback( - uv_poll_p :: Ptr{Cvoid}, - status :: Cint, - events :: Cint, -)::Cvoid - ## TODO: use a member access API - multi_p = unsafe_load(convert(Ptr{Ptr{Cvoid}}, uv_poll_p)) - multi = unsafe_pointer_to_objref(multi_p)::Multi - sock_p = uv_poll_p + Base._sizeof_uv_poll - sock = unsafe_load(convert(Ptr{curl_socket_t}, sock_p)) - flags = 0 - events & UV_READABLE != 0 && (flags |= CURL_CSELECT_IN) - events & UV_WRITABLE != 0 && (flags |= CURL_CSELECT_OUT) - lock(multi.lock) do - @check curl_multi_socket_action(multi.handle, sock, flags) - check_multi_info(multi) - end -end +# curl callbacks -function timeout_callback(uv_timer_p::Ptr{Cvoid})::Cvoid - ## TODO: use a member access API - multi_p = unsafe_load(convert(Ptr{Ptr{Cvoid}}, uv_timer_p)) - multi = unsafe_pointer_to_objref(multi_p)::Multi +function do_multi(multi::Multi) lock(multi.lock) do @check curl_multi_socket_action(multi.handle, CURL_SOCKET_TIMEOUT, 0) check_multi_info(multi) end end -# curl callbacks - function timer_callback( multi_h :: Ptr{Cvoid}, timeout_ms :: Clong, @@ -143,15 +112,13 @@ function timer_callback( multi = unsafe_pointer_to_objref(multi_p)::Multi @assert multi_h == multi.handle if timeout_ms == 0 - lock(multi.lock) do - @check curl_multi_socket_action(multi.handle, CURL_SOCKET_TIMEOUT, 0) - check_multi_info(multi) - end + do_multi(multi) elseif timeout_ms >= 0 - timeout_cb = @cfunction(timeout_callback, Cvoid, (Ptr{Cvoid},)) - uv_timer_start(multi.timer, timeout_cb, max(1, timeout_ms), 0) + multi.timer = Timer(timeout_ms/1000) do timer + do_multi(multi) + end elseif timeout_ms == -1 - uv_timer_stop(multi.timer) + close(multi.timer) else @async @error("timer_callback: invalid timeout value", timeout_ms) return -1 @@ -164,46 +131,47 @@ function socket_callback( sock :: curl_socket_t, action :: Cint, multi_p :: Ptr{Cvoid}, - uv_poll_p :: Ptr{Cvoid}, + watcher_p :: Ptr{Cvoid}, )::Cint + if action ∉ (CURL_POLL_IN, CURL_POLL_OUT, CURL_POLL_INOUT, CURL_POLL_REMOVE) + @async @error("socket_callback: unexpected action", action) + return -1 + end multi = unsafe_pointer_to_objref(multi_p)::Multi + if watcher_p != C_NULL + old_watcher = unsafe_pointer_to_objref(watcher_p)::FDWatcher + @check curl_multi_assign(multi.handle, sock, C_NULL) + unpreserve_handle(old_watcher) + end if action in (CURL_POLL_IN, CURL_POLL_OUT, CURL_POLL_INOUT) - if uv_poll_p == C_NULL - uv_poll_p = uv_poll_alloc() - uv_poll_init(uv_poll_p, sock) - ## TODO: use a member access API - unsafe_store!(convert(Ptr{Ptr{Cvoid}}, uv_poll_p), multi_p) - sock_p = uv_poll_p + Base._sizeof_uv_poll - unsafe_store!(convert(Ptr{curl_socket_t}, sock_p), sock) - lock(multi.lock) do - @check curl_multi_assign(multi.handle, sock, uv_poll_p) + readable = action in (CURL_POLL_IN, CURL_POLL_INOUT) + writable = action in (CURL_POLL_OUT, CURL_POLL_INOUT) + watcher = FDWatcher(OS_HANDLE(sock), readable, writable) + preserve_handle(watcher) + watcher_p = pointer_from_objref(watcher) + @check curl_multi_assign(multi.handle, sock, watcher_p) + task = @async while true + events = try wait(watcher) + catch err + err isa EOFError && break + rethrow() end - end - events = 0 - action != CURL_POLL_IN && (events |= UV_WRITABLE) - action != CURL_POLL_OUT && (events |= UV_READABLE) - event_cb = @cfunction(event_callback, Cvoid, (Ptr{Cvoid}, Cint, Cint)) - uv_poll_start(uv_poll_p, events, event_cb) - elseif action == CURL_POLL_REMOVE - if uv_poll_p != C_NULL - uv_poll_stop(uv_poll_p) - uv_close(uv_poll_p, cglobal(:jl_free)) + flags = CURL_CSELECT_IN * isreadable(events) + + CURL_CSELECT_OUT * iswritable(events) + + CURL_CSELECT_ERR * events.disconnect lock(multi.lock) do - @check curl_multi_assign(multi.handle, sock, C_NULL) + @check curl_multi_socket_action(multi.handle, sock, flags) + check_multi_info(multi) end end - else - @async @error("socket_callback: unexpected action", action) - return -1 + @isdefined(errormonitor) && errormonitor(task) end + @isdefined(old_watcher) && close(old_watcher) return 0 end function add_callbacks(multi::Multi) - # stash multi handle pointer in timer multi_p = pointer_from_objref(multi) - ## TODO: use a member access API - unsafe_store!(convert(Ptr{Ptr{Cvoid}}, multi.timer), multi_p) # set timer callback timer_cb = @cfunction(timer_callback, Cint, (Ptr{Cvoid}, Clong, Ptr{Cvoid})) diff --git a/src/Curl/utils.jl b/src/Curl/utils.jl index 18b7456..e7aa08a 100644 --- a/src/Curl/utils.jl +++ b/src/Curl/utils.jl @@ -1,17 +1,17 @@ -# basic C stuff - if !@isdefined(contains) contains(haystack, needle) = occursin(needle, haystack) export contains end +# basic C stuff + puts(s::Union{String,SubString{String}}) = ccall(:puts, Cint, (Ptr{Cchar},), s) jl_malloc(n::Integer) = ccall(:jl_malloc, Ptr{Cvoid}, (Csize_t,), n) -# check if a function or C call failed +# check if a call failed -function check(ex::Expr, lock::Bool) +macro check(ex::Expr) ex.head == :call || error("@check: not a call: $ex") arg1 = ex.args[1] :: Symbol @@ -24,75 +24,13 @@ function check(ex::Expr, lock::Bool) f = arg1 end prefix = "$f: " - ex = esc(ex) - if lock - ex = quote - Base.iolock_begin() - value = $ex - Base.iolock_end() - value - end - end quote - r = $ex + r = $(esc(ex)) iszero(r) || @async @error($prefix * string(r)) r end end -macro check(ex::Expr) check(ex, false) end -macro check_iolock(ex::Expr) check(ex, true) end - -# some libuv wrappers - -const UV_READABLE = 1 -const UV_WRITABLE = 2 - -function uv_poll_alloc() - # allocate memory for: uv_poll_t struct + extra for curl_socket_t - jl_malloc(Base._sizeof_uv_poll + sizeof(curl_socket_t)) -end - -function uv_poll_init(p::Ptr{Cvoid}, sock::curl_socket_t) - @check_iolock ccall(:uv_poll_init, Cint, - (Ptr{Cvoid}, Ptr{Cvoid}, curl_socket_t), Base.eventloop(), p, sock) -end - -function uv_poll_start(p::Ptr{Cvoid}, events::Integer, cb::Ptr{Cvoid}) - @check_iolock ccall(:uv_poll_start, Cint, - (Ptr{Cvoid}, Cint, Ptr{Cvoid}), p, events, cb) -end - -function uv_poll_stop(p::Ptr{Cvoid}) - @check_iolock ccall(:uv_poll_stop, Cint, (Ptr{Cvoid},), p) -end - -function uv_close(p::Ptr{Cvoid}, cb::Ptr{Cvoid}) - Base.iolock_begin() - ccall(:uv_close, Cvoid, (Ptr{Cvoid}, Ptr{Cvoid}), p, cb) - Base.iolock_end() -end - -function uv_timer_init(p::Ptr{Cvoid}) - @check_iolock ccall(:uv_timer_init, Cint, - (Ptr{Cvoid}, Ptr{Cvoid}), Base.eventloop(), p) -end - -function uv_timer_start(p::Ptr{Cvoid}, cb::Ptr{Cvoid}, t::Integer, r::Integer) - @check_iolock ccall(:uv_timer_start, Cint, - (Ptr{Cvoid}, Ptr{Cvoid}, UInt64, UInt64), p, cb, t, r) -end - -function uv_timer_stop(p::Ptr{Cvoid}) - @check_iolock ccall(:uv_timer_stop, Cint, (Ptr{Cvoid},), p) -end - -# additional libcurl methods - -function curl_multi_socket_action(multi_handle, s, ev_bitmask) - LibCURL.curl_multi_socket_action(multi_handle, s, ev_bitmask, Ref{Cint}()) -end - # curl string list structure struct curl_slist_t