Ctrl-C when master process is waiting for crashed workers #29

Open
alyst opened this issue Aug 15, 2015 · 3 comments

@alyst

alyst commented Aug 15, 2015

When the master Julia process is waiting in take!() on its own RemoteRef and the worker processes have all thrown exceptions, pressing Ctrl-C in the REPL results in

ERROR (unhandled task failure): InterruptException:
 in task_done_hook at ./task.jl:175

but doesn't bring the master out of the waiting state.

@alyst
Author

alyst commented Aug 15, 2015

A somewhat related question: would it be possible to modify take!() to throw a RemoteException if all (any?) of the processes it waits for have crashed? Otherwise the tasks have to make sure that, before they let an exception propagate to the outside, they put something into the channel the master is waiting on.
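
The workaround mentioned above, where a task puts something into the channel before letting its exception escape, might look roughly like the sketch below. It is only an illustration under stated assumptions: it uses a local Channel and @async instead of a RemoteRef and a worker process, it is written against current Julia rather than the 0.4-era API used in this thread, and run_and_report is a hypothetical helper, not an existing function.

```julia
# Hypothetical helper (an assumption for illustration, not part of Base):
# run `f` and always deliver something to `results`, even on failure.
function run_and_report(f, results::Channel)
    try
        put!(results, f())
    catch err
        # Deliver the failure first so that whoever is blocked in take!() wakes up,
        # then let the exception propagate as it would have anyway.
        put!(results, err)
        rethrow()
    end
end

results = Channel{Any}(1)

# Stand-in for a worker that throws before producing a result.
t = @async run_and_report(() -> error("just error"), results)

# take!() now returns the exception object instead of blocking forever.
msg = take!(results)
if msg isa Exception
    println("worker failed: ", msg)
end
```

The cost of this pattern is that every producer has to cooperate, and the consumer has to distinguish results from error objects, which is why a take!() that notices crashed producers on its own would be more convenient.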

@amitmurthy
Contributor

Can you post your code that results in this behavior?

@alyst
Author

alyst commented Sep 9, 2015

It was not as simple as I've described. In very simple examples Julia behaves normally.
However, here's a stripped-down version of the master/slave parallel optimizer from robertfeldt/BlackBoxOptim.jl#25. Unfortunately it's not the most minimal test, but at least it should demonstrate the problem.
The master gets stuck in step!() waiting for the slave that has thrown an exception.
When I run this code from the REPL, pressing Ctrl+C while it waits in take!() gives the above exception. From the command line it's OK.

addprocs(1)

@everywhere module TestTake

# message with the candidate passed between the workers and the master

typealias WorkerChannel Channel{Float64}
typealias WorkerChannelRef RemoteRef{WorkerChannel}

type ParallelPopulationOptimizer
  worker_procs::Vector{Int}                         # IDs of worker processes
  final_fitnesses::Vector{RemoteRef{Channel{Any}}}  # references to the @spawnat ID run_worker()
  from_workers::WorkerChannelRef                 # inbound channel of candidates from all workers
  to_workers::Vector{WorkerChannelRef}           # outgoing channels to each worker
  is_started::RemoteRef{Channel{Bool}}              # flag that all workers have started
end

nworkers(ppopt::ParallelPopulationOptimizer) = length(ppopt.worker_procs)

# outer parallel population optimizer constructor that
# also spawns worker tasks
function ParallelPopulationOptimizer(NWorkers::Int = 2,
    ArchiveCapacity::Int = 10,
    ToWorkerChannelCapacity::Int = 1000,
    FromWorkersChannelCapacity::Int = 10000)
  # take the first NWorkers workers
  Workers = workers()
  ParallelPopulationOptimizer(Workers,
       Vector{RemoteRef{Channel{Any}}}(length(Workers)),
       RemoteRef(() -> WorkerChannel(FromWorkersChannelCapacity)),
       WorkerChannelRef[RemoteRef(() -> WorkerChannel(ToWorkerChannelCapacity), id) for id in Workers],
       RemoteRef(() -> Channel{Bool}(1)))
end

function setup!(ppopt::ParallelPopulationOptimizer)
  info("Initializing parallel workers...")
  workers_ready = RemoteRef(() -> Channel{Int}(nworkers(ppopt))) # FIXME do we need to wait for the worker?
  @assert !isready(ppopt.is_started)
  for i in eachindex(ppopt.worker_procs)
    procid = ppopt.worker_procs[i]
    info("  Spawning worker #$i at process #$procid...");
    ppopt.final_fitnesses[i] = @spawnat procid run_worker(i,
                           workers_ready, ppopt.is_started
                           )
  end
  info("Waiting for the workers to be ready...")
  nready = 0
  while nready < nworkers(ppopt)
    worker_id = take!(workers_ready)
    info("  Worker #$worker_id is ready")
    nready += 1
  end
  info("All workers ready")
  return ppopt
end

function step!(ppopt::ParallelPopulationOptimizer)
  #println("main#: n_evals=$(num_evals(ppopt.evaluator))")
  if !isready(ppopt.is_started) put!(ppopt.is_started, true) end # if it's the first iteration
  info("take!...")
  candidate = take!(ppopt.from_workers)#::CandidateMessage
  info("take!")
end

function run_worker(id::Int,
                    worker_ready::RemoteRef{Channel{Int}},
                    is_started::RemoteRef{Channel{Bool}} )
  put!(worker_ready, id)
  fetch(is_started) # wait until the master is started
  try
    error("just error")
  catch e
    rethrow(e)
  end
end

end

ppopt = TestTake.ParallelPopulationOptimizer()
TestTake.setup!(ppopt)
TestTake.step!(ppopt)
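
For comparison, here is a rough sketch of what a take!() that gives up when its producer has crashed could look like on the consumer side. It is not how take!() behaves, and it makes several assumptions: it uses local tasks and a plain Channel on current Julia (istaskfailed and TaskFailedException need Julia 1.3 or later) instead of worker processes and RemoteRef, and take_or_fail! is a hypothetical name. In the distributed setting the analogous check would presumably be made on the references returned by @spawnat (final_fitnesses above), which is not verified here.

```julia
# Hypothetical helper (not part of Base or Distributed): take from `ch`,
# but stop waiting as soon as one of the producer tasks has failed.
function take_or_fail!(ch::Channel, producers::Vector{Task}; poll::Float64 = 0.05)
    while !isready(ch)
        for t in producers
            # wait() on a failed task throws a TaskFailedException,
            # which carries the original error of the producer.
            istaskfailed(t) && wait(t)
        end
        sleep(poll)  # yield so that the producers get a chance to run
    end
    return take!(ch)
end

ch = Channel{Float64}(10)

# Stand-in for a worker that dies without ever sending a candidate.
t = @async error("just error")
producers = [t]

outcome = try
    take_or_fail!(ch, producers)
catch err
    err
end

println(typeof(outcome))
```

Polling trades a little latency for the ability to notice the failure at all, and because the loop sleeps between checks it also gives an interrupt a regular point at which it can be delivered.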

@vtjnash vtjnash transferred this issue from JuliaLang/julia Feb 10, 2024