Skip to content

Commit

Permalink
DNM: test default threading
Browse files Browse the repository at this point in the history
  • Loading branch information
odow committed Jul 25, 2024
1 parent 405d527 commit 420e1e6
Show file tree
Hide file tree
Showing 14 changed files with 419 additions and 316 deletions.
401 changes: 209 additions & 192 deletions src/algorithm.jl

Large diffs are not rendered by default.

38 changes: 32 additions & 6 deletions src/plugins/bellman_functions.jl
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,35 @@ function refine_bellman_function(
nominal_probability::Vector{Float64},
objective_realizations::Vector{Float64},
) where {T}
lock(node.lock) # LOCK-ID-003
lock(node.lock)
try
return _refine_bellman_function_no_lock(
model,
node,
bellman_function,
risk_measure,
outgoing_state,
dual_variables,
noise_supports,
nominal_probability,
objective_realizations,
)
finally
unlock(node.lock)
end
end

function _refine_bellman_function_no_lock(
model::PolicyGraph{T},
node::Node{T},
bellman_function::BellmanFunction,
risk_measure::AbstractRiskMeasure,
outgoing_state::Dict{Symbol,Float64},
dual_variables::Vector{Dict{Symbol,Float64}},
noise_supports::Vector,
nominal_probability::Vector{Float64},
objective_realizations::Vector{Float64},
) where {T}
# Sanity checks.
@assert length(dual_variables) ==
length(noise_supports) ==
Expand All @@ -427,8 +455,8 @@ function refine_bellman_function(
model.objective_sense == MOI.MIN_SENSE,
)
# The meat of the function.
ret = if bellman_function.cut_type == SINGLE_CUT
_add_average_cut(
if bellman_function.cut_type == SINGLE_CUT
return _add_average_cut(
node,
outgoing_state,
risk_adjusted_probability,
Expand All @@ -439,7 +467,7 @@ function refine_bellman_function(
else # Add a multi-cut
@assert bellman_function.cut_type == MULTI_CUT
_add_locals_if_necessary(node, bellman_function, length(dual_variables))
_add_multi_cut(
return _add_multi_cut(
node,
outgoing_state,
risk_adjusted_probability,
Expand All @@ -448,8 +476,6 @@ function refine_bellman_function(
offset,
)
end
unlock(node.lock) # LOCK-ID-003
return ret
end

function _add_average_cut(
Expand Down
121 changes: 62 additions & 59 deletions src/plugins/forward_passes.jl
Original file line number Diff line number Diff line change
Expand Up @@ -51,69 +51,72 @@ function forward_pass(
# Iterate down the scenario.
for (depth, (node_index, noise)) in enumerate(scenario_path)
node = model[node_index]
lock(node.lock) # LOCK-ID-001
# Objective state interpolation.
objective_state_vector = update_objective_state(
node.objective_state,
objective_state_vector,
noise,
)
if objective_state_vector !== nothing
push!(objective_states, objective_state_vector)
end
# Update belief state, etc.
if node.belief_state !== nothing
belief = node.belief_state::BeliefState{T}
partition_index = belief.partition_index
current_belief = belief.updater(
belief.belief,
current_belief,
partition_index,
lock(node.lock)
try
# Objective state interpolation.
objective_state_vector = update_objective_state(
node.objective_state,
objective_state_vector,
noise,
)
push!(belief_states, (partition_index, copy(current_belief)))
end
# ===== Begin: starting state for infinite horizon =====
starting_states = options.starting_states[node_index]
if length(starting_states) > 0
# There is at least one other possible starting state. If our
# incoming state is more than δ away from the other states, add it
# as a possible starting state.
if distance(starting_states, incoming_state_value) >
options.cycle_discretization_delta
push!(starting_states, incoming_state_value)
if objective_state_vector !== nothing
push!(objective_states, objective_state_vector)
end
# TODO(odow):
# - A better way of randomly sampling a starting state.
# - Is it bad that we splice! here instead of just sampling? For
# convergence it is probably bad, since our list of possible
# starting states keeps changing, but from a computational
# perspective, we don't want to keep a list of discretized points
# in the state-space δ distance apart...
incoming_state_value =
splice!(starting_states, rand(1:length(starting_states)))
end
# ===== End: starting state for infinite horizon =====
# Solve the subproblem, note that `duality_handler = nothing`.
@_timeit_threadsafe model.timer_output "solve_subproblem" begin
subproblem_results = solve_subproblem(
model,
node,
incoming_state_value,
noise,
scenario_path[1:depth],
duality_handler = nothing,
)
# Update belief state, etc.
if node.belief_state !== nothing
belief = node.belief_state::BeliefState{T}
partition_index = belief.partition_index
current_belief = belief.updater(
belief.belief,
current_belief,
partition_index,
noise,
)
push!(belief_states, (partition_index, copy(current_belief)))
end
# ===== Begin: starting state for infinite horizon =====
starting_states = options.starting_states[node_index]
if length(starting_states) > 0
# There is at least one other possible starting state. If our
# incoming state is more than δ away from the other states, add it
# as a possible starting state.
if distance(starting_states, incoming_state_value) >
options.cycle_discretization_delta
push!(starting_states, incoming_state_value)
end
# TODO(odow):
# - A better way of randomly sampling a starting state.
# - Is it bad that we splice! here instead of just sampling? For
# convergence it is probably bad, since our list of possible
# starting states keeps changing, but from a computational
# perspective, we don't want to keep a list of discretized points
# in the state-space δ distance apart...
incoming_state_value =
splice!(starting_states, rand(1:length(starting_states)))
end
# ===== End: starting state for infinite horizon =====
# Solve the subproblem, note that `duality_handler = nothing`.
@_timeit_threadsafe model.timer_output "solve_subproblem" begin
subproblem_results = solve_subproblem(
model,
node,
incoming_state_value,
noise,
scenario_path[1:depth],
duality_handler = nothing,
)
end
# Cumulate the stage_objective.
cumulative_value += subproblem_results.stage_objective
# Set the outgoing state value as the incoming state value for the next
# node.
incoming_state_value = copy(subproblem_results.state)
# Add the outgoing state variable to the list of states we have sampled
# on this forward pass.
push!(sampled_states, incoming_state_value)
finally
unlock(node.lock)
end
# Cumulate the stage_objective.
cumulative_value += subproblem_results.stage_objective
# Set the outgoing state value as the incoming state value for the next
# node.
incoming_state_value = copy(subproblem_results.state)
# Add the outgoing state variable to the list of states we have sampled
# on this forward pass.
push!(sampled_states, incoming_state_value)
unlock(node.lock) # LOCK-ID-001
end
if terminated_due_to_cycle
# We terminated due to a cycle. Here is the list of possible starting
Expand Down
23 changes: 16 additions & 7 deletions src/plugins/parallel_schemes.jl
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ function slave_update(model::PolicyGraph, result::IterationResult)
if cut === nothing
error(
"This model uses features that are not suppored in async " *
"mode. Use `parallel_scheme = Serial()` instead.",
"mode. Use `parallel_scheme = Threaded()` instead.",
)
end
_add_cut(
Expand Down Expand Up @@ -362,14 +362,23 @@ function master_loop(
convergence_lock = ReentrantLock()
keep_iterating, status = true, nothing
@sync for _ in 1:Threads.nthreads()
Threads.@spawn while keep_iterating
result = iteration(model, options)
options.post_iteration_callback(result)
lock(() -> log_iteration(options), options.lock)
if result.has_converged
Threads.@spawn begin
try
while keep_iterating
result = iteration(model, options)
options.post_iteration_callback(result)
lock(() -> log_iteration(options), options.lock)
if result.has_converged
lock(convergence_lock) do
keep_iterating = false
status = result.status
return
end
end
end
finally
lock(convergence_lock) do
keep_iterating = false
status = result.status
return
end
end
Expand Down
26 changes: 18 additions & 8 deletions src/plugins/sampling_schemes.jl
Original file line number Diff line number Diff line change
Expand Up @@ -442,12 +442,14 @@ mutable struct PSRSamplingScheme{A} <: AbstractSamplingScheme
sampling_scheme::A
scenarios::Vector{Any}
counter::Int
lock::ReentrantLock

function PSRSamplingScheme(
N::Int;
sampling_scheme::AbstractSamplingScheme = InSampleMonteCarlo(),
)
return new{typeof(sampling_scheme)}(N, sampling_scheme, Any[], 0)
lock = ReentrantLock()
return new{typeof(sampling_scheme)}(N, sampling_scheme, Any[], 0, lock)
end
end

Expand All @@ -461,14 +463,22 @@ function sample_scenario(
s::PSRSamplingScheme{A};
kwargs...,
) where {T,A}
s.counter += 1
if s.counter > s.N
s.counter = 1
end
if s.counter > length(s.scenarios)
push!(s.scenarios, sample_scenario(graph, s.sampling_scheme; kwargs...))
lock(s.lock)
try
s.counter += 1
if s.counter > s.N
s.counter = 1
end
if s.counter > length(s.scenarios)
push!(
s.scenarios,
sample_scenario(graph, s.sampling_scheme; kwargs...),
)
end
return s.scenarios[s.counter]
finally
unlock(s.lock)
end
return s.scenarios[s.counter]
end

"""
Expand Down
38 changes: 26 additions & 12 deletions src/plugins/stopping_rules.jl
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,16 @@ function SimulationStoppingRule(;
PSRSamplingScheme(replications; sampling_scheme = sampling_scheme)
function simulator(model, N)
cached_sampling_scheme.N = max(N, cached_sampling_scheme.N)
scenarios = simulate(model, N; sampling_scheme = cached_sampling_scheme)
scenarios = simulate(
model,
N;
sampling_scheme = cached_sampling_scheme,
# TODO(odow): if we use Threaded() here, the simulations and the
# PSRSamplingScheme can get out of sync: simulation `i` is not
# guaranteed to access sample `i`, because the simulations may run
# out of order.
parallel_scheme = Serial(),
)
# !!! info
# At one point, I tried adding the primal value of the state
# variables. But it didn't work for some models because of
Expand Down Expand Up @@ -461,18 +470,23 @@ function convergence_test(
"not deterministic",
)
end
set_incoming_state(node, model.initial_root_state)
parameterize(node, first(node.noise_terms).term)
optimize!(node.subproblem)
state = get_outgoing_state(node)
push!(rule.data, state)
if length(rule.data) < rule.iterations
return false
end
for i in 1:(rule.iterations-1), (k, v) in state
if !isapprox(rule.data[end-i][k], v; atol = rule.atol)
lock(node.lock)
try
set_incoming_state(node, model.initial_root_state)
parameterize(node, first(node.noise_terms).term)
optimize!(node.subproblem)
state = get_outgoing_state(node)
push!(rule.data, state)
if length(rule.data) < rule.iterations
return false
end
for i in 1:(rule.iterations-1), (k, v) in state
if !isapprox(rule.data[end-i][k], v; atol = rule.atol)
return false
end
end
return true
finally
unlock(node.lock)
end
return true
end
2 changes: 1 addition & 1 deletion test/Experimental.jl
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ function test_slptestset()
model, validation_scenarios =
SDDP.read_from_file(joinpath(@__DIR__, "electric.sof.json"))
set_optimizer(model, HiGHS.Optimizer)
SDDP.train(model; iteration_limit = 20, print_level = 0)
SDDP.train(model; iteration_limit = 30, print_level = 0)
@test isapprox(SDDP.calculate_bound(model), 381.8533; atol = 1e-3)
scenarios = SDDP.evaluate(model, validation_scenarios)
@test length(scenarios["problem_sha256_checksum"]) == 64
Expand Down
2 changes: 1 addition & 1 deletion test/MSPFormat.jl
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ function test_electric()
problem = joinpath(@__DIR__, "electric")
model = MSPFormat.read_from_file(problem)
JuMP.set_optimizer(model, HiGHS.Optimizer)
SDDP.train(model; iteration_limit = 10, print_level = 0)
SDDP.train(model; iteration_limit = 40, print_level = 0)
@test ≈(SDDP.calculate_bound(model), 381.8533, atol = 1e-4)
return
end
Expand Down
Loading

0 comments on commit 420e1e6

Please sign in to comment.