DTables.groupby causes issues when multiple processes available #450

Closed

StevenWhitaker opened this issue Nov 1, 2023 · 5 comments

StevenWhitaker commented Nov 1, 2023

I have some code that involves several operations on DTables. I ran my code with nprocs() equal to 1, and everything worked fine. I then added some processes so that nprocs() equaled 5 and ran my code again on worker 1 (so I didn't explicitly use any of the added workers). In this case, my code would hang when calling reduce on a GDTable (i.e., after calling groupby).

I tried to create a MWE, but I haven't yet been able to find one that hangs. Fortunately, I did find a MWE that gives a different error (ConcurrencyViolationError("lock must be held")); hopefully this error and the hanging I'm experiencing are different manifestations of the same issue.

EDIT: The next comment contains a simpler MWE that produces the same error (slightly different stacktrace, though).

Contents of mwe.jl:

using Distributed
nworkers = 4
addprocs(nworkers - nprocs() + 1)

@everywhere using DTables, DataFrames, CSV

function f()
    dt = DTable(x -> CSV.File(x), ["file.csv"]; tabletype = DataFrame) # lazily read the CSV into a DTable
    df = fetch(dt)
    gdt = groupby(dt, Symbol.(names(df)[[6, 12, 48]]))
    sums = fetch(reduce(+, gdt; cols = Symbol.(names(df)[[93, 94]]))) # this fetch (mwe.jl:11 in the trace) is where the error surfaces
end

f()

Error:

julia> include("mwe.jl")
ERROR: LoadError: ThunkFailedException:
  Root Exception Type: RemoteException
  Root Exception:
On worker 2:
ThunkFailedException:
  Root Exception Type: RemoteException
  Root Exception:
On worker 2:
ConcurrencyViolationError("lock must be held")
Stacktrace:
  [1] assert_havelock
    @ ./condition.jl:25 [inlined]
  [2] assert_havelock
    @ ./condition.jl:48 [inlined]
  [3] assert_havelock
    @ ./condition.jl:72 [inlined]
  [4] _wait2
    @ ./condition.jl:83
  [5] #wait#621
    @ ./condition.jl:127
  [6] wait
    @ ./condition.jl:125 [inlined]
  [7] wait_for_conn
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/cluster.jl:195
  [8] check_worker_state
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/cluster.jl:170
  [9] send_msg_
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/messages.jl:172
 [10] send_msg
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/messages.jl:122 [inlined]
 [11] #remotecall_fetch#159
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:460
 [12] remotecall_fetch
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:454
 [13] #remotecall_fetch#162
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492 [inlined]
 [14] remotecall_fetch
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492 [inlined]
 [15] #171
    @ ~/.julia/packages/MemPool/l9nLj/src/datastore.jl:424 [inlined]
 [16] forwardkeyerror
    @ ~/.julia/packages/MemPool/l9nLj/src/datastore.jl:409
 [17] poolget
    @ ~/.julia/packages/MemPool/l9nLj/src/datastore.jl:423
 [18] move
    @ ~/.julia/packages/Dagger/M13n0/src/chunks.jl:98
 [19] move
    @ ~/.julia/packages/Dagger/M13n0/src/chunks.jl:96 [inlined]
 [20] move
    @ ~/.julia/packages/Dagger/M13n0/src/chunks.jl:102
 [21] #fetch#70
    @ ~/.julia/packages/Dagger/M13n0/src/eager_thunk.jl:21
 [22] fetch
    @ ~/.julia/packages/Dagger/M13n0/src/eager_thunk.jl:11 [inlined]
 [23] #fetch#75
    @ ~/.julia/packages/Dagger/M13n0/src/eager_thunk.jl:58 [inlined]
 [24] fetch
    @ ~/.julia/packages/Dagger/M13n0/src/eager_thunk.jl:54 [inlined]
 [25] _broadcast_getindex_evalf
    @ ./broadcast.jl:683 [inlined]
 [26] _broadcast_getindex
    @ ./broadcast.jl:656 [inlined]
 [27] _getindex
    @ ./broadcast.jl:679 [inlined]
 [28] _broadcast_getindex
    @ ./broadcast.jl:655 [inlined]
 [29] getindex
    @ ./broadcast.jl:610 [inlined]
 [30] copyto_nonleaf!
    @ ./broadcast.jl:1068
 [31] copy
    @ ./broadcast.jl:920 [inlined]
 [32] materialize
    @ ./broadcast.jl:873 [inlined]
 [33] #79
    @ ~/.julia/packages/DTables/BjdY2/src/operations/operations.jl:187
 [34] #invokelatest#2
    @ ./essentials.jl:819 [inlined]
 [35] invokelatest
    @ ./essentials.jl:816 [inlined]
 [36] #43
    @ ~/.julia/packages/Dagger/M13n0/src/processor.jl:162
Stacktrace:
  [1] wait
    @ ./task.jl:349 [inlined]
  [2] fetch
    @ ./task.jl:369 [inlined]
  [3] #execute!#42
    @ ~/.julia/packages/Dagger/M13n0/src/processor.jl:172
  [4] execute!
    @ ~/.julia/packages/Dagger/M13n0/src/processor.jl:157
  [5] #158
    @ ~/.julia/packages/Dagger/M13n0/src/sch/Sch.jl:1551 [inlined]
  [6] #21
    @ ~/.julia/packages/Dagger/M13n0/src/options.jl:17 [inlined]
  [7] #1
    @ ~/.julia/packages/ScopedValues/92HJZ/src/ScopedValues.jl:163
  [8] with_logstate
    @ ./logging.jl:514
  [9] with_logger
    @ ./logging.jl:626 [inlined]
 [10] enter_scope
    @ ~/.julia/packages/ScopedValues/92HJZ/src/payloadlogger.jl:17 [inlined]
 [11] with
    @ ~/.julia/packages/ScopedValues/92HJZ/src/ScopedValues.jl:162
 [12] with_options
    @ ~/.julia/packages/Dagger/M13n0/src/options.jl:16
 [13] do_task
    @ ~/.julia/packages/Dagger/M13n0/src/sch/Sch.jl:1549
 [14] macro expansion
    @ ~/.julia/packages/Dagger/M13n0/src/sch/Sch.jl:1243 [inlined]
 [15] #132
    @ ./task.jl:134
  This Thunk:  Thunk(id=9, #79(5 inputs...))
Stacktrace:
  [1] #fetch#70
    @ ~/.julia/packages/Dagger/M13n0/src/eager_thunk.jl:16
  [2] fetch
    @ ~/.julia/packages/Dagger/M13n0/src/eager_thunk.jl:11 [inlined]
  [3] #fetch#75
    @ ~/.julia/packages/Dagger/M13n0/src/eager_thunk.jl:58 [inlined]
  [4] fetch
    @ ~/.julia/packages/Dagger/M13n0/src/eager_thunk.jl:54 [inlined]
  [5] #86
    @ ./none:0
  [6] iterate
    @ ./generator.jl:47 [inlined]
  [7] collect
    @ ./array.jl:782
  [8] #83
    @ ~/.julia/packages/DTables/BjdY2/src/operations/operations.jl:205
  [9] #invokelatest#2
    @ ./essentials.jl:819 [inlined]
 [10] invokelatest
    @ ./essentials.jl:816 [inlined]
 [11] #43
    @ ~/.julia/packages/Dagger/M13n0/src/processor.jl:162
Stacktrace:
  [1] wait
    @ ./task.jl:349 [inlined]
  [2] fetch
    @ ./task.jl:369 [inlined]
  [3] #execute!#42
    @ ~/.julia/packages/Dagger/M13n0/src/processor.jl:172
  [4] execute!
    @ ~/.julia/packages/Dagger/M13n0/src/processor.jl:157
  [5] #158
    @ ~/.julia/packages/Dagger/M13n0/src/sch/Sch.jl:1551 [inlined]
  [6] #21
    @ ~/.julia/packages/Dagger/M13n0/src/options.jl:17 [inlined]
  [7] #1
    @ ~/.julia/packages/ScopedValues/92HJZ/src/ScopedValues.jl:163
  [8] with_logstate
    @ ./logging.jl:514
  [9] with_logger
    @ ./logging.jl:626 [inlined]
 [10] enter_scope
    @ ~/.julia/packages/ScopedValues/92HJZ/src/payloadlogger.jl:17 [inlined]
 [11] with
    @ ~/.julia/packages/ScopedValues/92HJZ/src/ScopedValues.jl:162
 [12] with_options
    @ ~/.julia/packages/Dagger/M13n0/src/options.jl:16
 [13] do_task
    @ ~/.julia/packages/Dagger/M13n0/src/sch/Sch.jl:1549
 [14] macro expansion
    @ ~/.julia/packages/Dagger/M13n0/src/sch/Sch.jl:1243 [inlined]
 [15] #132
    @ ./task.jl:134
  This Thunk:  Thunk(id=11, #83(5 inputs...))
Stacktrace:
 [1] fetch(t::Dagger.ThunkFuture; proc::Dagger.OSProc, raw::Bool)
   @ Dagger ~/.julia/packages/Dagger/M13n0/src/eager_thunk.jl:16
 [2] fetch
   @ ~/.julia/packages/Dagger/M13n0/src/eager_thunk.jl:11 [inlined]
 [3] #fetch#75
   @ ~/.julia/packages/Dagger/M13n0/src/eager_thunk.jl:58 [inlined]
 [4] fetch
   @ ~/.julia/packages/Dagger/M13n0/src/eager_thunk.jl:54 [inlined]
 [5] f()
   @ Main ~/tmp/mwe.jl:11
 [6] top-level scope
   @ ~/tmp/mwe.jl:14
 [7] include(fname::String)
   @ Base.MainInclude ./client.jl:478
 [8] top-level scope
   @ REPL[1]:1
in expression starting at /home/steven/tmp/mwe.jl:14

Some notes:

  • This is with Dagger v0.18.4 and DTables v0.4.2 (the latest versions).
  • The error does not occur every time.
  • The error also occurs (again, only sometimes) when calling f via remotecall_fetch; see the sketch after this list.
  • As in Various errors working with DTables.jl #438, "file.csv" is a 157 MB table with 233930 rows and 102 columns of String and Float64 values. I tried to generate data to keep the MWE self-contained, but wasn't successful.
  • This MWE is very similar to the MWE in Various errors working with DTables.jl #438, just cleaned up. The biggest difference that matters (I think) for reproducing the error is having more processes: increasing nworkers to 10 in this MWE seems to make the error happen more frequently, and I'm guessing the previous MWE would also have exhibited this error with a larger nworkers.
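
For reference, the remotecall_fetch variant mentioned in the notes would look something like this sketch (the exact invocation isn't shown here, and it assumes the definitions from mwe.jl have already been loaded):

using Distributed

# Invoke f through Distributed's messaging layer rather than calling it
# directly; as noted above, this also hits the error intermittently.
remotecall_fetch(f, myid())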

StevenWhitaker (Author) commented Nov 3, 2023

I just came across basically the same error, but with a slightly different stacktrace:

julia> using Distributed; addprocs(10); @everywhere using Dagger, DTables, DataFrames

julia> dt = DTable(DataFrame(a = 1:100, b = rand(1:5, 100)))
DTable with 1 partitions
Tabletype: DataFrame

julia> gdt = groupby(dt, :b)
ERROR: ThunkFailedException:
  Root Exception Type: RemoteException
  Root Exception:
On worker 5:
ConcurrencyViolationError("lock must be held")
Stacktrace:
  [1] concurrency_violation
    @ ./condition.jl:8
  [2] assert_havelock
    @ ./condition.jl:25 [inlined]
  [3] assert_havelock
    @ ./condition.jl:48 [inlined]
  [4] assert_havelock
    @ ./condition.jl:72 [inlined]
  [5] _wait2
    @ ./condition.jl:83
  [6] #wait#621
    @ ./condition.jl:127
  [7] wait
    @ ./condition.jl:125 [inlined]
  [8] wait_for_conn
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/cluster.jl:195
  [9] check_worker_state
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/cluster.jl:170
 [10] send_msg_
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/messages.jl:172
 [11] send_msg
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/messages.jl:122 [inlined]
 [12] #remotecall_fetch#159
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:460
 [13] remotecall_fetch
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:454
 [14] #remotecall_fetch#162
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492 [inlined]
 [15] remotecall_fetch
    @ ~/programs/julia/julia-1.9.3/share/julia/stdlib/v1.9/Distributed/src/remotecall.jl:492 [inlined]
 [16] #171
    @ ~/.julia/packages/MemPool/l9nLj/src/datastore.jl:424 [inlined]
 [17] forwardkeyerror
    @ ~/.julia/packages/MemPool/l9nLj/src/datastore.jl:409
 [18] poolget
    @ ~/.julia/packages/MemPool/l9nLj/src/datastore.jl:423
 [19] move
    @ ~/.julia/packages/Dagger/M13n0/src/chunks.jl:98
 [20] move
    @ ~/.julia/packages/Dagger/M13n0/src/chunks.jl:96 [inlined]
 [21] move
    @ ~/.julia/packages/Dagger/M13n0/src/chunks.jl:102
 [22] #fetch#70
    @ ~/.julia/packages/Dagger/M13n0/src/eager_thunk.jl:21
 [23] fetch
    @ ~/.julia/packages/Dagger/M13n0/src/eager_thunk.jl:11 [inlined]
 [24] #fetch#75
    @ ~/.julia/packages/Dagger/M13n0/src/eager_thunk.jl:58 [inlined]
 [25] fetch
    @ ~/.julia/packages/Dagger/M13n0/src/eager_thunk.jl:54 [inlined]
 [26] build_groupby_index
    @ ~/.julia/packages/DTables/BjdY2/src/operations/groupby.jl:176
 [27] #invokelatest#2
    @ ./essentials.jl:819 [inlined]
 [28] invokelatest
    @ ./essentials.jl:816 [inlined]
 [29] #43
    @ ~/.julia/packages/Dagger/M13n0/src/processor.jl:162
Stacktrace:
  [1] wait
    @ ./task.jl:349 [inlined]
  [2] fetch
    @ ./task.jl:369 [inlined]
  [3] #execute!#42
    @ ~/.julia/packages/Dagger/M13n0/src/processor.jl:172
  [4] execute!
    @ ~/.julia/packages/Dagger/M13n0/src/processor.jl:157
  [5] #158
    @ ~/.julia/packages/Dagger/M13n0/src/sch/Sch.jl:1551 [inlined]
  [6] #21
    @ ~/.julia/packages/Dagger/M13n0/src/options.jl:17 [inlined]
  [7] #1
    @ ~/.julia/packages/ScopedValues/92HJZ/src/ScopedValues.jl:163
  [8] with_logstate
    @ ./logging.jl:514
  [9] with_logger
    @ ./logging.jl:626 [inlined]
 [10] enter_scope
    @ ~/.julia/packages/ScopedValues/92HJZ/src/payloadlogger.jl:17 [inlined]
 [11] with
    @ ~/.julia/packages/ScopedValues/92HJZ/src/ScopedValues.jl:162
 [12] with_options
    @ ~/.julia/packages/Dagger/M13n0/src/options.jl:16
 [13] do_task
    @ ~/.julia/packages/Dagger/M13n0/src/sch/Sch.jl:1549
 [14] macro expansion
    @ ~/.julia/packages/Dagger/M13n0/src/sch/Sch.jl:1243 [inlined]
 [15] #132
    @ ./task.jl:134
  This Thunk:  Thunk(id=3, build_groupby_index(true, 0, DataFrame, Thunk[2](#134, Any[Union{Dagger.EagerThunk, Dagger.Chunk}[Dagger.Chunk{DataFrame, MemPool.DRef, OSProc, AnyScope}(DataFrame, UnitDomain(), MemPool.DRef(1, 0, 0x0000000000000908), OSProc(1), AnyScope(), false)], DTables.var"#123#125"{Symbol, DTables.var"#122#124"}(:b, DTables.var"#122#124"())])))
Stacktrace:
 [1] fetch(t::Dagger.ThunkFuture; proc::OSProc, raw::Bool)
   @ Dagger ~/.julia/packages/Dagger/M13n0/src/eager_thunk.jl:16
 [2] fetch
   @ ~/.julia/packages/Dagger/M13n0/src/eager_thunk.jl:11 [inlined]
 [3] #fetch#75
   @ ~/.julia/packages/Dagger/M13n0/src/eager_thunk.jl:58 [inlined]
 [4] fetch
   @ ~/.julia/packages/Dagger/M13n0/src/eager_thunk.jl:54 [inlined]
 [5] _groupby(d::DTable, row_function::Function, cols::Vector{Symbol}, merge::Bool, chunksize::Int64)
   @ DTables ~/.julia/packages/DTables/BjdY2/src/operations/groupby.jl:132
 [6] #groupby#121
   @ ~/.julia/packages/DTables/BjdY2/src/operations/groupby.jl:38 [inlined]
 [7] groupby(d::DTable, col::Symbol)
   @ DTables ~/.julia/packages/DTables/BjdY2/src/operations/groupby.jl:35
 [8] top-level scope
   @ REPL[3]:1

StevenWhitaker (Author) commented:

The above error appears to go away when I wrap the code with Dagger.with_options(f; scope = ProcessScope(myid())).
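
Spelled out, that workaround looks roughly like this (a sketch based on the call above; ProcessScope(myid()) pins every Dagger task spawned inside the closure to the current process, avoiding the cross-worker connection where the race seems to live):

using Distributed, Dagger, DTables, DataFrames

dt = DTable(DataFrame(a = 1:100, b = rand(1:5, 100)))

# Schedule the groupby's internal tasks only on this process.
gdt = Dagger.with_options(; scope = ProcessScope(myid())) do
    groupby(dt, :b)
end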

jpsamaroo (Member) commented:

@StevenWhitaker can you try this with Julia master (or any recent 1.11 build) together with JuliaLang/Distributed.jl#4? This is just a Distributed concurrency bug that my PR fixes. You can ] dev /path/to/Distributed in your project to make Julia load it; this requires 1.11, which has support for external stdlibs.
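
For anyone following along, that workflow is roughly the following sketch (the checkout path is the placeholder from the comment above):

# In a Julia >= 1.11 session, from the project environment:
using Pkg
Pkg.develop(path = "/path/to/Distributed")  # same as `] dev /path/to/Distributed`
using Distributed                           # now loads the dev'd copy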

StevenWhitaker (Author) commented:

Your PR does seem to fix the issue!

jpsamaroo (Member) commented:

Awesome! Considering this isn't our bug, I'm going to close this, with the understanding that I plan to get that PR merged.
