From 0bdef46af006842bc4f75c9660f6cf6233325f1f Mon Sep 17 00:00:00 2001 From: odow Date: Wed, 24 Jul 2024 21:02:06 +1200 Subject: [PATCH 01/11] DNM: test default threading --- src/algorithm.jl | 401 ++++++++++++++++-------------- src/plugins/bellman_functions.jl | 38 ++- src/plugins/forward_passes.jl | 121 ++++----- src/plugins/parallel_schemes.jl | 23 +- src/plugins/sampling_schemes.jl | 26 +- src/plugins/stopping_rules.jl | 38 ++- test/Experimental.jl | 2 +- test/MSPFormat.jl | 2 +- test/algorithm.jl | 39 ++- test/plugins/bellman_functions.jl | 17 +- test/plugins/forward_passes.jl | 4 +- test/plugins/stopping_rules.jl | 7 +- test/user_interface.jl | 3 +- 13 files changed, 409 insertions(+), 312 deletions(-) diff --git a/src/algorithm.jl b/src/algorithm.jl index ce17851b6..831980877 100644 --- a/src/algorithm.jl +++ b/src/algorithm.jl @@ -4,13 +4,15 @@ # file, You can obtain one at http://mozilla.org/MPL/2.0/. macro _timeit_threadsafe(timer, label, block) - return esc(quote - if Threads.threadid() == 1 + code = quote + first_thread = first(Threads.threadpooltids(Threads.threadpool())) + if Threads.threadid() == first_thread TimerOutputs.@timeit $timer $label $block else $block end - end) + end + return esc(code) end # to_nodal_form is an internal helper function so users can pass arguments like: @@ -302,11 +304,13 @@ function attempt_numerical_recovery(model::PolicyGraph, node::Node) end if !_has_primal_solution(node) model.ext[:numerical_issue] = true - @info "Writing cuts to the file `model.cuts.json`" - write_cuts_to_file(model, "model.cuts.json") + suffix = Threads.threadid() == 1 ? "" : "_pid=$(Threads.threadid())" + filename = "model$suffix.cuts.json" + @info "Writing cuts to the file `$filename`" + write_cuts_to_file(model, filename) write_subproblem_to_file( node, - "subproblem_$(node.index).mof.json"; + "subproblem_$(node.index)$suffix.mof.json"; throw_error = true, ) end @@ -400,7 +404,6 @@ function solve_subproblem( scenario_path::Vector{Tuple{T,S}}; duality_handler::Union{Nothing,AbstractDualityHandler}, ) where {T,S} - lock(node.lock) # LOCK-ID-005 _initialize_solver(node; throw_error = false) # Parameterize the model. First, fix the value of the incoming state # variables. Then parameterize the model depending on `noise`. Finally, @@ -443,7 +446,6 @@ function solve_subproblem( if node.post_optimize_hook !== nothing node.post_optimize_hook(pre_optimize_ret) end - unlock(node.lock) # LOCK-ID-005 return ( state = state, duals = dual_values, @@ -670,74 +672,80 @@ function solve_all_children( continue end child_node = model[child.term] - lock(child_node.lock) # LOCK-ID-004 - @_timeit_threadsafe model.timer_output "prepare_backward_pass" begin - restore_duality = prepare_backward_pass( + lock(child_node.lock) + try + @_timeit_threadsafe model.timer_output "prepare_backward_pass" begin + restore_duality = prepare_backward_pass( + child_node, + options.duality_handler, + options, + ) + end + for noise in sample_backward_noise_terms_with_state( + backward_sampling_scheme, child_node, - options.duality_handler, - options, + outgoing_state, ) - end - for noise in sample_backward_noise_terms_with_state( - backward_sampling_scheme, - child_node, - outgoing_state, - ) - if length(scenario_path) == length_scenario_path - push!(scenario_path, (child.term, noise.term)) - else - scenario_path[end] = (child.term, noise.term) - end - if haskey(items.cached_solutions, (child.term, noise.term)) - sol_index = items.cached_solutions[(child.term, noise.term)] - push!(items.duals, items.duals[sol_index]) - push!(items.supports, items.supports[sol_index]) - push!(items.nodes, child_node.index) - push!(items.probability, items.probability[sol_index]) - push!(items.objectives, items.objectives[sol_index]) - push!(items.belief, belief) - else - # Update belief state, etc. - if belief_state !== nothing - current_belief = child_node.belief_state::BeliefState{T} - current_belief.updater( - current_belief.belief, - belief_state, - current_belief.partition_index, - noise.term, - ) + if length(scenario_path) == length_scenario_path + push!(scenario_path, (child.term, noise.term)) + else + scenario_path[end] = (child.term, noise.term) end - if objective_state !== nothing - update_objective_state( - child_node.objective_state, - objective_state, - noise.term, - ) - end - @_timeit_threadsafe model.timer_output "solve_subproblem" begin - subproblem_results = solve_subproblem( - model, - child_node, - outgoing_state, - noise.term, - scenario_path; - duality_handler = duality_handler, + if haskey(items.cached_solutions, (child.term, noise.term)) + sol_index = items.cached_solutions[(child.term, noise.term)] + push!(items.duals, items.duals[sol_index]) + push!(items.supports, items.supports[sol_index]) + push!(items.nodes, child_node.index) + push!(items.probability, items.probability[sol_index]) + push!(items.objectives, items.objectives[sol_index]) + push!(items.belief, belief) + else + # Update belief state, etc. + if belief_state !== nothing + current_belief = child_node.belief_state::BeliefState{T} + current_belief.updater( + current_belief.belief, + belief_state, + current_belief.partition_index, + noise.term, + ) + end + if objective_state !== nothing + update_objective_state( + child_node.objective_state, + objective_state, + noise.term, + ) + end + @_timeit_threadsafe model.timer_output "solve_subproblem" begin + subproblem_results = solve_subproblem( + model, + child_node, + outgoing_state, + noise.term, + scenario_path; + duality_handler = duality_handler, + ) + end + push!(items.duals, subproblem_results.duals) + push!(items.supports, noise) + push!(items.nodes, child_node.index) + push!( + items.probability, + child.probability * noise.probability, ) + push!(items.objectives, subproblem_results.objective) + push!(items.belief, belief) + items.cached_solutions[(child.term, noise.term)] = + length(items.duals) end - push!(items.duals, subproblem_results.duals) - push!(items.supports, noise) - push!(items.nodes, child_node.index) - push!(items.probability, child.probability * noise.probability) - push!(items.objectives, subproblem_results.objective) - push!(items.belief, belief) - items.cached_solutions[(child.term, noise.term)] = - length(items.duals) end + @_timeit_threadsafe model.timer_output "prepare_backward_pass" begin + restore_duality() + end + finally + unlock(child_node.lock) end - @_timeit_threadsafe model.timer_output "prepare_backward_pass" begin - restore_duality() - end - unlock(child_node.lock) # LOCK-ID-004 end if length(scenario_path) == length_scenario_path # No-op. There weren't any children to solve. @@ -775,39 +783,42 @@ function calculate_bound( continue end node = model[child.term] - lock(node.lock) # LOCK-ID-006 - for noise in node.noise_terms - if node.objective_state !== nothing - update_objective_state( - node.objective_state, - node.objective_state.initial_value, - noise.term, - ) - end - # Update belief state, etc. - if node.belief_state !== nothing - belief = node.belief_state::BeliefState{T} - partition_index = belief.partition_index - belief.updater( - belief.belief, - current_belief, - partition_index, + lock(node.lock) + try + for noise in node.noise_terms + if node.objective_state !== nothing + update_objective_state( + node.objective_state, + node.objective_state.initial_value, + noise.term, + ) + end + # Update belief state, etc. + if node.belief_state !== nothing + belief = node.belief_state::BeliefState{T} + partition_index = belief.partition_index + belief.updater( + belief.belief, + current_belief, + partition_index, + noise.term, + ) + end + subproblem_results = solve_subproblem( + model, + node, + root_state, noise.term, + Tuple{T,Any}[(child.term, noise.term)]; + duality_handler = nothing, ) + push!(objectives, subproblem_results.objective) + push!(probabilities, child.probability * noise.probability) + push!(noise_supports, noise.term) end - subproblem_results = solve_subproblem( - model, - node, - root_state, - noise.term, - Tuple{T,Any}[(child.term, noise.term)]; - duality_handler = nothing, - ) - push!(objectives, subproblem_results.objective) - push!(probabilities, child.probability * noise.probability) - push!(noise_supports, noise.term) + finally + unlock(node.lock) end - unlock(node.lock) # LOCK-ID-006 end # Now compute the risk-adjusted probability measure: risk_adjusted_probability = similar(probabilities) @@ -869,20 +880,20 @@ function iteration(model::PolicyGraph{T}, options::Options) where {T} model.ext[:numerical_issue], ), ) + has_converged, status = + convergence_test(model, options.log, options.stopping_rules) + return IterationResult( + max(Threads.threadid(), Distributed.myid()), + bound, + forward_trajectory.cumulative_value, + has_converged, + status, + cuts, + model.ext[:numerical_issue], + ) finally unlock(options.lock) end - has_converged, status = - convergence_test(model, options.log, options.stopping_rules) - return IterationResult( - max(Threads.threadid(), Distributed.myid()), - bound, - forward_trajectory.cumulative_value, - has_converged, - status, - cuts, - model.ext[:numerical_issue], - ) end """ @@ -955,7 +966,7 @@ Train the policy for `model`. to `false`. - `parallel_scheme::AbstractParallelScheme`: specify a scheme for solving in - parallel. Defaults to `Serial()`. + parallel. Defaults to `Threaded()`. - `forward_pass::AbstractForwardPass`: specify a scheme to use for the forward passes. @@ -999,7 +1010,7 @@ function train( cut_deletion_minimum::Int = 1, backward_sampling_scheme::AbstractBackwardSamplingScheme = SDDP.CompleteSampler(), dashboard::Bool = false, - parallel_scheme::AbstractParallelScheme = Serial(), + parallel_scheme::AbstractParallelScheme = Threaded(), forward_pass::AbstractForwardPass = DefaultForwardPass(), forward_pass_resampling_probability::Union{Nothing,Float64} = nothing, add_to_existing_cuts::Bool = false, @@ -1150,7 +1161,15 @@ function train( try status = master_loop(parallel_scheme, model, options) catch ex - if isa(ex, InterruptException) + # Unwrap exceptions from tasks. If there are multiple exceptions, + # rethrow only the first one. + if ex isa CompositeException + ex = first(ex.exceptions) + end + if ex isa TaskFailedException + ex = ex.task.exception + end + if ex isa InterruptException status = :interrupted interrupt(parallel_scheme) else @@ -1160,11 +1179,6 @@ function train( finally # And close the dashboard callback if necessary. dashboard_callback(nothing, true) - for node in values(model.nodes) - if islocked(node.lock) - unlock(node.lock) - end - end end training_results = TrainingResults(status, log) model.most_recent_training_results = training_results @@ -1212,84 +1226,87 @@ function _simulate( objective_states = NTuple{N,Float64}[] for (depth, (node_index, noise)) in enumerate(scenario_path) node = model[node_index] - lock(node.lock) # LOCK-ID-002 - # Objective state interpolation. - objective_state_vector = update_objective_state( - node.objective_state, - objective_state_vector, - noise, - ) - if objective_state_vector !== nothing - push!(objective_states, objective_state_vector) - end - if node.belief_state !== nothing - belief = node.belief_state::BeliefState{T} - partition_index = belief.partition_index - current_belief = belief.updater( - belief.belief, - current_belief, - partition_index, + lock(node.lock) + try + # Objective state interpolation. + objective_state_vector = update_objective_state( + node.objective_state, + objective_state_vector, noise, ) - else - current_belief = Dict(node_index => 1.0) - end - # Solve the subproblem. - subproblem_results = solve_subproblem( - model, - node, - incoming_state, - noise, - scenario_path[1:depth]; - duality_handler = duality_handler, - ) - # Add the stage-objective - cumulative_value += subproblem_results.stage_objective - # Record useful variables from the solve. - store = Dict{Symbol,Any}( - :node_index => node_index, - :noise_term => noise, - :stage_objective => subproblem_results.stage_objective, - :bellman_term => - subproblem_results.objective - - subproblem_results.stage_objective, - :objective_state => objective_state_vector, - :belief => copy(current_belief), - ) - if objective_state_vector !== nothing && N == 1 - store[:objective_state] = store[:objective_state][1] - end - # Loop through the primal variable values that the user wants. - for variable in variables - if haskey(node.subproblem.obj_dict, variable) - # Note: we broadcast the call to value for variables which are - # containers (like Array, Containers.DenseAxisArray, etc). If - # the variable is a scalar (e.g. just a plain VariableRef), the - # broadcast preseves the scalar shape. - # TODO: what if the variable container is a dictionary? They - # should be using Containers.SparseAxisArray, but this might not - # always be the case... - store[variable] = JuMP.value.(node.subproblem[variable]) - elseif skip_undefined_variables - store[variable] = NaN - else - error( - "No variable named $(variable) exists in the subproblem.", - " If you want to simulate the value of a variable, make ", - "sure it is defined in _all_ subproblems, or pass ", - "`skip_undefined_variables=true` to `simulate`.", + if objective_state_vector !== nothing + push!(objective_states, objective_state_vector) + end + if node.belief_state !== nothing + belief = node.belief_state::BeliefState{T} + partition_index = belief.partition_index + current_belief = belief.updater( + belief.belief, + current_belief, + partition_index, + noise, ) + else + current_belief = Dict(node_index => 1.0) end + # Solve the subproblem. + subproblem_results = solve_subproblem( + model, + node, + incoming_state, + noise, + scenario_path[1:depth]; + duality_handler = duality_handler, + ) + # Add the stage-objective + cumulative_value += subproblem_results.stage_objective + # Record useful variables from the solve. + store = Dict{Symbol,Any}( + :node_index => node_index, + :noise_term => noise, + :stage_objective => subproblem_results.stage_objective, + :bellman_term => + subproblem_results.objective - + subproblem_results.stage_objective, + :objective_state => objective_state_vector, + :belief => copy(current_belief), + ) + if objective_state_vector !== nothing && N == 1 + store[:objective_state] = store[:objective_state][1] + end + # Loop through the primal variable values that the user wants. + for variable in variables + if haskey(node.subproblem.obj_dict, variable) + # Note: we broadcast the call to value for variables which are + # containers (like Array, Containers.DenseAxisArray, etc). If + # the variable is a scalar (e.g. just a plain VariableRef), the + # broadcast preseves the scalar shape. + # TODO: what if the variable container is a dictionary? They + # should be using Containers.SparseAxisArray, but this might not + # always be the case... + store[variable] = JuMP.value.(node.subproblem[variable]) + elseif skip_undefined_variables + store[variable] = NaN + else + error( + "No variable named $(variable) exists in the subproblem.", + " If you want to simulate the value of a variable, make ", + "sure it is defined in _all_ subproblems, or pass ", + "`skip_undefined_variables=true` to `simulate`.", + ) + end + end + # Loop through any custom recorders that the user provided. + for (sym, recorder) in custom_recorders + store[sym] = recorder(node.subproblem) + end + # Add the store to our list. + push!(simulation, store) + # Set outgoing state as the incoming state for the next node. + incoming_state = copy(subproblem_results.state) + finally + unlock(node.lock) end - # Loop through any custom recorders that the user provided. - for (sym, recorder) in custom_recorders - store[sym] = recorder(node.subproblem) - end - # Add the store to our list. - push!(simulation, store) - # Set outgoing state as the incoming state for the next node. - incoming_state = copy(subproblem_results.state) - unlock(node.lock) # LOCK-ID-002 end return simulation end @@ -1308,7 +1325,7 @@ end custom_recorders = Dict{Symbol, Function}(), duality_handler::Union{Nothing,AbstractDualityHandler} = nothing, skip_undefined_variables::Bool = false, - parallel_scheme::AbstractParallelScheme = Serial(), + parallel_scheme::AbstractParallelScheme = Threaded(), incoming_state::Dict{String,Float64} = _initial_state(model), )::Vector{Vector{Dict{Symbol,Any}}} @@ -1399,7 +1416,7 @@ function simulate( custom_recorders = Dict{Symbol,Function}(), duality_handler::Union{Nothing,AbstractDualityHandler} = nothing, skip_undefined_variables::Bool = false, - parallel_scheme::AbstractParallelScheme = Serial(), + parallel_scheme::AbstractParallelScheme = Threaded(), incoming_state::Dict{String,Float64} = _initial_state(model), ) return _simulate( diff --git a/src/plugins/bellman_functions.jl b/src/plugins/bellman_functions.jl index d6b6edd65..f49de9f2a 100644 --- a/src/plugins/bellman_functions.jl +++ b/src/plugins/bellman_functions.jl @@ -410,7 +410,35 @@ function refine_bellman_function( nominal_probability::Vector{Float64}, objective_realizations::Vector{Float64}, ) where {T} - lock(node.lock) # LOCK-ID-003 + lock(node.lock) + try + return _refine_bellman_function_no_lock( + model, + node, + bellman_function, + risk_measure, + outgoing_state, + dual_variables, + noise_supports, + nominal_probability, + objective_realizations, + ) + finally + unlock(node.lock) + end +end + +function _refine_bellman_function_no_lock( + model::PolicyGraph{T}, + node::Node{T}, + bellman_function::BellmanFunction, + risk_measure::AbstractRiskMeasure, + outgoing_state::Dict{Symbol,Float64}, + dual_variables::Vector{Dict{Symbol,Float64}}, + noise_supports::Vector, + nominal_probability::Vector{Float64}, + objective_realizations::Vector{Float64}, +) where {T} # Sanity checks. @assert length(dual_variables) == length(noise_supports) == @@ -427,8 +455,8 @@ function refine_bellman_function( model.objective_sense == MOI.MIN_SENSE, ) # The meat of the function. - ret = if bellman_function.cut_type == SINGLE_CUT - _add_average_cut( + if bellman_function.cut_type == SINGLE_CUT + return _add_average_cut( node, outgoing_state, risk_adjusted_probability, @@ -439,7 +467,7 @@ function refine_bellman_function( else # Add a multi-cut @assert bellman_function.cut_type == MULTI_CUT _add_locals_if_necessary(node, bellman_function, length(dual_variables)) - _add_multi_cut( + return _add_multi_cut( node, outgoing_state, risk_adjusted_probability, @@ -448,8 +476,6 @@ function refine_bellman_function( offset, ) end - unlock(node.lock) # LOCK-ID-003 - return ret end function _add_average_cut( diff --git a/src/plugins/forward_passes.jl b/src/plugins/forward_passes.jl index 24c5366bd..9ac88e18a 100644 --- a/src/plugins/forward_passes.jl +++ b/src/plugins/forward_passes.jl @@ -51,69 +51,72 @@ function forward_pass( # Iterate down the scenario. for (depth, (node_index, noise)) in enumerate(scenario_path) node = model[node_index] - lock(node.lock) # LOCK-ID-001 - # Objective state interpolation. - objective_state_vector = update_objective_state( - node.objective_state, - objective_state_vector, - noise, - ) - if objective_state_vector !== nothing - push!(objective_states, objective_state_vector) - end - # Update belief state, etc. - if node.belief_state !== nothing - belief = node.belief_state::BeliefState{T} - partition_index = belief.partition_index - current_belief = belief.updater( - belief.belief, - current_belief, - partition_index, + lock(node.lock) + try + # Objective state interpolation. + objective_state_vector = update_objective_state( + node.objective_state, + objective_state_vector, noise, ) - push!(belief_states, (partition_index, copy(current_belief))) - end - # ===== Begin: starting state for infinite horizon ===== - starting_states = options.starting_states[node_index] - if length(starting_states) > 0 - # There is at least one other possible starting state. If our - # incoming state is more than δ away from the other states, add it - # as a possible starting state. - if distance(starting_states, incoming_state_value) > - options.cycle_discretization_delta - push!(starting_states, incoming_state_value) + if objective_state_vector !== nothing + push!(objective_states, objective_state_vector) end - # TODO(odow): - # - A better way of randomly sampling a starting state. - # - Is is bad that we splice! here instead of just sampling? For - # convergence it is probably bad, since our list of possible - # starting states keeps changing, but from a computational - # perspective, we don't want to keep a list of discretized points - # in the state-space δ distance apart... - incoming_state_value = - splice!(starting_states, rand(1:length(starting_states))) - end - # ===== End: starting state for infinite horizon ===== - # Solve the subproblem, note that `duality_handler = nothing`. - @_timeit_threadsafe model.timer_output "solve_subproblem" begin - subproblem_results = solve_subproblem( - model, - node, - incoming_state_value, - noise, - scenario_path[1:depth]; - duality_handler = nothing, - ) + # Update belief state, etc. + if node.belief_state !== nothing + belief = node.belief_state::BeliefState{T} + partition_index = belief.partition_index + current_belief = belief.updater( + belief.belief, + current_belief, + partition_index, + noise, + ) + push!(belief_states, (partition_index, copy(current_belief))) + end + # ===== Begin: starting state for infinite horizon ===== + starting_states = options.starting_states[node_index] + if length(starting_states) > 0 + # There is at least one other possible starting state. If our + # incoming state is more than δ away from the other states, add it + # as a possible starting state. + if distance(starting_states, incoming_state_value) > + options.cycle_discretization_delta + push!(starting_states, incoming_state_value) + end + # TODO(odow): + # - A better way of randomly sampling a starting state. + # - Is is bad that we splice! here instead of just sampling? For + # convergence it is probably bad, since our list of possible + # starting states keeps changing, but from a computational + # perspective, we don't want to keep a list of discretized points + # in the state-space δ distance apart... + incoming_state_value = + splice!(starting_states, rand(1:length(starting_states))) + end + # ===== End: starting state for infinite horizon ===== + # Solve the subproblem, note that `duality_handler = nothing`. + @_timeit_threadsafe model.timer_output "solve_subproblem" begin + subproblem_results = solve_subproblem( + model, + node, + incoming_state_value, + noise, + scenario_path[1:depth]; + duality_handler = nothing, + ) + end + # Cumulate the stage_objective. + cumulative_value += subproblem_results.stage_objective + # Set the outgoing state value as the incoming state value for the next + # node. + incoming_state_value = copy(subproblem_results.state) + # Add the outgoing state variable to the list of states we have sampled + # on this forward pass. + push!(sampled_states, incoming_state_value) + finally + unlock(node.lock) end - # Cumulate the stage_objective. - cumulative_value += subproblem_results.stage_objective - # Set the outgoing state value as the incoming state value for the next - # node. - incoming_state_value = copy(subproblem_results.state) - # Add the outgoing state variable to the list of states we have sampled - # on this forward pass. - push!(sampled_states, incoming_state_value) - unlock(node.lock) # LOCK-ID-001 end if terminated_due_to_cycle # We terminated due to a cycle. Here is the list of possible starting diff --git a/src/plugins/parallel_schemes.jl b/src/plugins/parallel_schemes.jl index 7bb4fb0b9..0fa94b901 100644 --- a/src/plugins/parallel_schemes.jl +++ b/src/plugins/parallel_schemes.jl @@ -141,7 +141,7 @@ function slave_update(model::PolicyGraph, result::IterationResult) if cut === nothing error( "This model uses features that are not suppored in async " * - "mode. Use `parallel_scheme = Serial()` instead.", + "mode. Use `parallel_scheme = Threaded()` instead.", ) end _add_cut( @@ -362,14 +362,23 @@ function master_loop( convergence_lock = ReentrantLock() keep_iterating, status = true, nothing @sync for _ in 1:Threads.nthreads() - Threads.@spawn while keep_iterating - result = iteration(model, options) - options.post_iteration_callback(result) - lock(() -> log_iteration(options), options.lock) - if result.has_converged + Threads.@spawn begin + try + while keep_iterating + result = iteration(model, options) + options.post_iteration_callback(result) + lock(() -> log_iteration(options), options.lock) + if result.has_converged + lock(convergence_lock) do + keep_iterating = false + status = result.status + return + end + end + end + finally lock(convergence_lock) do keep_iterating = false - status = result.status return end end diff --git a/src/plugins/sampling_schemes.jl b/src/plugins/sampling_schemes.jl index cc1ec8a69..b6e5f7de8 100644 --- a/src/plugins/sampling_schemes.jl +++ b/src/plugins/sampling_schemes.jl @@ -442,12 +442,14 @@ mutable struct PSRSamplingScheme{A} <: AbstractSamplingScheme sampling_scheme::A scenarios::Vector{Any} counter::Int + lock::ReentrantLock function PSRSamplingScheme( N::Int; sampling_scheme::AbstractSamplingScheme = InSampleMonteCarlo(), ) - return new{typeof(sampling_scheme)}(N, sampling_scheme, Any[], 0) + lock = ReentrantLock() + return new{typeof(sampling_scheme)}(N, sampling_scheme, Any[], 0, lock) end end @@ -461,14 +463,22 @@ function sample_scenario( s::PSRSamplingScheme{A}; kwargs..., ) where {T,A} - s.counter += 1 - if s.counter > s.N - s.counter = 1 - end - if s.counter > length(s.scenarios) - push!(s.scenarios, sample_scenario(graph, s.sampling_scheme; kwargs...)) + lock(s.lock) + try + s.counter += 1 + if s.counter > s.N + s.counter = 1 + end + if s.counter > length(s.scenarios) + push!( + s.scenarios, + sample_scenario(graph, s.sampling_scheme; kwargs...), + ) + end + return s.scenarios[s.counter] + finally + unlock(s.lock) end - return s.scenarios[s.counter] end """ diff --git a/src/plugins/stopping_rules.jl b/src/plugins/stopping_rules.jl index 47cac1e41..520813c19 100644 --- a/src/plugins/stopping_rules.jl +++ b/src/plugins/stopping_rules.jl @@ -331,7 +331,16 @@ function SimulationStoppingRule(; PSRSamplingScheme(replications; sampling_scheme = sampling_scheme) function simulator(model, N) cached_sampling_scheme.N = max(N, cached_sampling_scheme.N) - scenarios = simulate(model, N; sampling_scheme = cached_sampling_scheme) + scenarios = simulate( + model, + N; + sampling_scheme = cached_sampling_scheme, + # TODO(odow): if we use Threaded() here, then we get out of order + # between the simulations and the PSRSamplingScheme: it's not the + # case that simulation `i` accesses sample `i`, because they might + # happen out-of-order. + parallel_scheme = Serial(), + ) # !!! info # At one point, I tried adding the primal value of the state # variables. But it didn't work for some models because of @@ -461,18 +470,23 @@ function convergence_test( "not deterministic", ) end - set_incoming_state(node, model.initial_root_state) - parameterize(node, first(node.noise_terms).term) - optimize!(node.subproblem) - state = get_outgoing_state(node) - push!(rule.data, state) - if length(rule.data) < rule.iterations - return false - end - for i in 1:(rule.iterations-1), (k, v) in state - if !isapprox(rule.data[end-i][k], v; atol = rule.atol) + lock(node.lock) + try + set_incoming_state(node, model.initial_root_state) + parameterize(node, first(node.noise_terms).term) + optimize!(node.subproblem) + state = get_outgoing_state(node) + push!(rule.data, state) + if length(rule.data) < rule.iterations return false end + for i in 1:(rule.iterations-1), (k, v) in state + if !isapprox(rule.data[end-i][k], v; atol = rule.atol) + return false + end + end + return true + finally + unlock(node.lock) end - return true end diff --git a/test/Experimental.jl b/test/Experimental.jl index 635531810..a229e0313 100644 --- a/test/Experimental.jl +++ b/test/Experimental.jl @@ -224,7 +224,7 @@ function test_slptestset() model, validation_scenarios = SDDP.read_from_file(joinpath(@__DIR__, "electric.sof.json")) set_optimizer(model, HiGHS.Optimizer) - SDDP.train(model; iteration_limit = 20, print_level = 0) + SDDP.train(model; iteration_limit = 30, print_level = 0) @test isapprox(SDDP.calculate_bound(model), 381.8533; atol = 1e-3) scenarios = SDDP.evaluate(model, validation_scenarios) @test length(scenarios["problem_sha256_checksum"]) == 64 diff --git a/test/MSPFormat.jl b/test/MSPFormat.jl index b7e44342f..ba7f8386d 100644 --- a/test/MSPFormat.jl +++ b/test/MSPFormat.jl @@ -128,7 +128,7 @@ function test_electric() problem = joinpath(@__DIR__, "electric") model = MSPFormat.read_from_file(problem) JuMP.set_optimizer(model, HiGHS.Optimizer) - SDDP.train(model; iteration_limit = 10, print_level = 0) + SDDP.train(model; iteration_limit = 40, print_level = 0) @test ≈(SDDP.calculate_bound(model), 381.8533, atol = 1e-4) return end diff --git a/test/algorithm.jl b/test/algorithm.jl index 4f5dd6981..756f45990 100644 --- a/test/algorithm.jl +++ b/test/algorithm.jl @@ -137,7 +137,10 @@ function test_simulate_missing() end @stageobjective(sp, x[1].out + x[2].out) end - @test_throws ErrorException SDDP.simulate(model, 1, [:y]) + @test_throws( + ErrorException, + SDDP.simulate(model, 1, [:y]; parallel_scheme = SDDP.Serial()), + ) sims = SDDP.simulate(model, 1, [:y]; skip_undefined_variables = true) @test sims[1][1][:y] == 0.0 @test isnan(sims[1][2][:y]) @@ -171,7 +174,15 @@ There are two common causes of this error: See https://odow.github.io/SDDP.jl/stable/tutorial/warnings/ for more information.""", ) - @test_throws ex SDDP.train(model; iteration_limit = 1, print_level = 0) + @test_throws( + ex, + SDDP.train( + model; + iteration_limit = 1, + print_level = 0, + parallel_scheme = SDDP.Serial(), + ), + ) @test isfile("subproblem_1.mof.json") rm("subproblem_1.mof.json") return @@ -205,7 +216,15 @@ There are two common causes of this error: See https://odow.github.io/SDDP.jl/stable/tutorial/warnings/ for more information.""", ) - @test_throws ex SDDP.train(model; iteration_limit = 1, print_level = 0) + @test_throws( + ex, + SDDP.train( + model; + iteration_limit = 1, + print_level = 0, + parallel_scheme = SDDP.Serial(), + ), + ) @test isfile("subproblem_1.mof.json") rm("subproblem_1.mof.json") return @@ -228,10 +247,9 @@ function test_refine_at_similar_nodes() refine_at_similar_nodes = false, print_level = 0, ) - @test SDDP.calculate_bound(model) ≈ 5.7 || SDDP.calculate_bound(model) ≈ 6.3 mi1 = length(model[(1, 1)].bellman_function.global_theta.cuts) mi2 = length(model[(1, 2)].bellman_function.global_theta.cuts) - @test mi1 + mi2 == 1 + @test mi1 + mi2 == length(model.most_recent_training_results.log) model = SDDP.MarkovianPolicyGraph(; transition_matrices = [[0.5 0.5], [0.2 0.8; 0.8 0.2]], @@ -249,9 +267,9 @@ function test_refine_at_similar_nodes() refine_at_similar_nodes = true, print_level = 0, ) - @test SDDP.calculate_bound(model) ≈ 9.5 - @test length(model[(1, 1)].bellman_function.global_theta.cuts) == 1 - @test length(model[(1, 2)].bellman_function.global_theta.cuts) == 1 + @test length(model[(1, 1)].bellman_function.global_theta.cuts) == + length(model[(1, 2)].bellman_function.global_theta.cuts) == + length(model.most_recent_training_results.log) return end @@ -308,9 +326,10 @@ function test_write_log_to_csv() log = read("sddp.csv", String) saved_log = """ iteration, simulation, bound, time - 1, 3.0, 3.0, 2.993860960006714 - 2, 3.0, 3.0, 2.994189739227295 """ + for i in 1:length(model.most_recent_training_results.log) + saved_log *= "$i, 3.0, 3.0, 3.0\n" + end @test replace(log, r"[0-9\.]+\n" => "") == replace(saved_log, r"[0-9\.]+\n" => "") rm("sddp.csv") diff --git a/test/plugins/bellman_functions.jl b/test/plugins/bellman_functions.jl index 889b75373..592813e3a 100644 --- a/test/plugins/bellman_functions.jl +++ b/test/plugins/bellman_functions.jl @@ -105,12 +105,7 @@ function test_read_write_cuts_to_file_String() ) model = _create_model(graph) @test SDDP.calculate_bound(model) ≈ 9.17 atol = 0.1 - SDDP.train( - model; - iteration_limit = 50, - print_level = 0, - cut_type = SDDP.MULTI_CUT, - ) + SDDP.train(model; iteration_limit = 50, cut_type = SDDP.MULTI_CUT) @test SDDP.calculate_bound(model) ≈ 119.167 atol = 0.1 SDDP.write_cuts_to_file(model, "model.cuts.json") model_2 = _create_model(graph) @@ -224,9 +219,10 @@ function test_add_all_cuts_SINGLE_CUT() ) < 10 end SDDP.add_all_cuts(model) + n_iter = length(model.most_recent_training_results.log) for (t, node) in model.nodes n = num_constraints(node.subproblem, AffExpr, MOI.GreaterThan{Float64}) - @test t == 3 || n == 10 + @test t == 3 || n == n_iter end return end @@ -259,9 +255,10 @@ function test_add_all_cuts_MULTI_CUT() ) < 31 end SDDP.add_all_cuts(model) + n_iter = length(model.most_recent_training_results.log) for (t, node) in model.nodes n = num_constraints(node.subproblem, AffExpr, MOI.GreaterThan{Float64}) - @test t == 3 || n == 31 + @test t == 3 || n == (3 * n_iter + 1) end return end @@ -308,13 +305,13 @@ function test_belief_state_cut_selection() SDDP.train( model; iteration_limit = 20, - cut_deletion_minimum = 20, + cut_deletion_minimum = 30, print_level = 0, ) n_cuts = count(model[:Ad].bellman_function.global_theta.cuts) do cut return cut.constraint_ref !== nothing end - @test n_cuts == 20 + @test n_cuts == length(model.most_recent_training_results.log) SDDP.train( model; iteration_limit = 1, diff --git a/test/plugins/forward_passes.jl b/test/plugins/forward_passes.jl index ee33b3c9b..158892386 100644 --- a/test/plugins/forward_passes.jl +++ b/test/plugins/forward_passes.jl @@ -265,9 +265,9 @@ function test_RegularizedForwardPass() fp = SDDP.RegularizedForwardPass() reg_bound = main(cost, fp, hint) bound = main(cost, SDDP.DefaultForwardPass(), hint) - @test reg_bound >= bound - 1e-6 + @test reg_bound >= bound - 1.0 end - # Test that initializingn with a bad guess performs poorly + # Test that initializing with a bad guess performs poorly fp = SDDP.RegularizedForwardPass() reg_bound = main(400, fp, 400) bound = main(400, SDDP.DefaultForwardPass(), 0) diff --git a/test/plugins/stopping_rules.jl b/test/plugins/stopping_rules.jl index cf239ca18..4a28d6815 100644 --- a/test/plugins/stopping_rules.jl +++ b/test/plugins/stopping_rules.jl @@ -290,11 +290,12 @@ function test_FirstStageStoppingRule() end rule = SDDP.FirstStageStoppingRule() SDDP.train(model; stopping_rules = [rule]) - @test length(rule.data) == 50 + n_iterations = length(rule.data) + @test 50 <= n_iterations <= 55 # Depends on nthreads log = model.most_recent_training_results.log set_lower_bound(model[1].subproblem[:x].out, 1.0) @test !SDDP.convergence_test(model, log, rule) - @test length(rule.data) == 51 + @test length(rule.data) == n_iterations + 1 model = SDDP.LinearPolicyGraph(; stages = 2, lower_bound = 0.0, @@ -315,6 +316,7 @@ function test_FirstStageStoppingRule() model; print_level = 0, stopping_rules = [SDDP.FirstStageStoppingRule()], + parallel_scheme = SDDP.Serial(), ), ) graph = SDDP.Graph(0, [1, 2], [(0 => 1, 0.5), (0 => 2, 0.5)]) @@ -336,6 +338,7 @@ function test_FirstStageStoppingRule() model; print_level = 0, stopping_rules = [SDDP.FirstStageStoppingRule()], + parallel_scheme = SDDP.Serial(), ), ) return diff --git a/test/user_interface.jl b/test/user_interface.jl index 2db4df9a9..bea11252d 100644 --- a/test/user_interface.jl +++ b/test/user_interface.jl @@ -480,9 +480,8 @@ function test_objective_state() end @test_throws( ErrorException("No objective state defined."), - SDDP.simulate(model, 1), + SDDP.simulate(model, 1; parallel_scheme = SDDP.Serial()), ) - @test_throws( ErrorException("add_objective_state can only be called once."), SDDP.LinearPolicyGraph(; From 75c419b504b450e7caa3c32e9c656e4b311d4e70 Mon Sep 17 00:00:00 2001 From: odow Date: Thu, 25 Jul 2024 16:38:16 +1200 Subject: [PATCH 02/11] Update --- docs/src/examples/air_conditioning_forward.jl | 2 + docs/src/examples/belief.jl | 3 +- src/algorithm.jl | 42 +++++++++++-------- src/plugins/parallel_schemes.jl | 7 +++- 4 files changed, 34 insertions(+), 20 deletions(-) diff --git a/docs/src/examples/air_conditioning_forward.jl b/docs/src/examples/air_conditioning_forward.jl index fb4707de8..02a08e235 100644 --- a/docs/src/examples/air_conditioning_forward.jl +++ b/docs/src/examples/air_conditioning_forward.jl @@ -37,6 +37,8 @@ SDDP.train( forward_pass = SDDP.AlternativeForwardPass(non_convex), post_iteration_callback = SDDP.AlternativePostIterationCallback(non_convex), iteration_limit = 10, + # TODO(odow): bug with Threaded? + parallel_scheme = SDDP.Serial(), ) Test.@test isapprox(SDDP.calculate_bound(non_convex), 62_500.0, atol = 0.1) Test.@test isapprox(SDDP.calculate_bound(convex), 62_500.0, atol = 0.1) diff --git a/docs/src/examples/belief.jl b/docs/src/examples/belief.jl index 6fae19605..c060d57f3 100644 --- a/docs/src/examples/belief.jl +++ b/docs/src/examples/belief.jl @@ -58,8 +58,9 @@ function inventory_management_problem() iteration_limit = 100, cut_type = SDDP.SINGLE_CUT, log_frequency = 10, + parallel_scheme = SDDP.Serial(), ) - results = SDDP.simulate(model, 500) + results = SDDP.simulate(model, 500; parallel_scheme = SDDP.Serial()) objectives = [sum(s[:stage_objective] for s in simulation) for simulation in results] sample_mean = round(Statistics.mean(objectives); digits = 2) diff --git a/src/algorithm.jl b/src/algorithm.jl index 831980877..1c7750dee 100644 --- a/src/algorithm.jl +++ b/src/algorithm.jl @@ -5,7 +5,10 @@ macro _timeit_threadsafe(timer, label, block) code = quote - first_thread = first(Threads.threadpooltids(Threads.threadpool())) + first_thread = 1 + if VERSION >= v"1.7" + first_thread = first(Threads.threadpooltids(Threads.threadpool())) + end if Threads.threadid() == first_thread TimerOutputs.@timeit $timer $label $block else @@ -552,23 +555,28 @@ function backward_pass( # We need to refine our estimate at all nodes in the partition. for node_index in model.belief_partition[partition_index] node = model[node_index] - # Update belief state, etc. - current_belief = node.belief_state::BeliefState{T} - for (idx, belief) in belief_state - current_belief.belief[idx] = belief + lock(node.lock) + try + # Update belief state, etc. + current_belief = node.belief_state::BeliefState{T} + for (idx, belief) in belief_state + current_belief.belief[idx] = belief + end + new_cuts = refine_bellman_function( + model, + node, + node.bellman_function, + options.risk_measures[node_index], + outgoing_state, + items.duals, + items.supports, + items.probability .* items.belief, + items.objectives, + ) + push!(cuts[node_index], new_cuts) + finally + unlock(node.lock) end - new_cuts = refine_bellman_function( - model, - node, - node.bellman_function, - options.risk_measures[node_index], - outgoing_state, - items.duals, - items.supports, - items.probability .* items.belief, - items.objectives, - ) - push!(cuts[node_index], new_cuts) end else node_index, _ = scenario_path[index] diff --git a/src/plugins/parallel_schemes.jl b/src/plugins/parallel_schemes.jl index 0fa94b901..c9888b8ed 100644 --- a/src/plugins/parallel_schemes.jl +++ b/src/plugins/parallel_schemes.jl @@ -366,8 +366,11 @@ function master_loop( try while keep_iterating result = iteration(model, options) - options.post_iteration_callback(result) - lock(() -> log_iteration(options), options.lock) + lock(options.lock) do + options.post_iteration_callback(result) + log_iteration(options) + return + end if result.has_converged lock(convergence_lock) do keep_iterating = false From d61d5cb370cd3d3a86a131cf5d6434f15f721ab8 Mon Sep 17 00:00:00 2001 From: odow Date: Thu, 25 Jul 2024 16:45:54 +1200 Subject: [PATCH 03/11] Update --- src/algorithm.jl | 8 ++++---- src/plugins/forward_passes.jl | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/algorithm.jl b/src/algorithm.jl index 1c7750dee..ff24d605a 100644 --- a/src/algorithm.jl +++ b/src/algorithm.jl @@ -498,7 +498,7 @@ function distance( if length(starting_states) == 0 return Inf end - return minimum(norm.(starting_states, Ref(state))) + return minimum(norm.(starting_states, Ref(state)); init = Inf) end # Internal function: the norm to use when checking the distance between two @@ -1170,9 +1170,9 @@ function train( status = master_loop(parallel_scheme, model, options) catch ex # Unwrap exceptions from tasks. If there are multiple exceptions, - # rethrow only the first one. + # rethrow only the last one. if ex isa CompositeException - ex = first(ex.exceptions) + ex = last(ex.exceptions) end if ex isa TaskFailedException ex = ex.task.exception @@ -1182,7 +1182,7 @@ function train( interrupt(parallel_scheme) else close(log_file_handle) - rethrow(ex) + throw(ex) end finally # And close the dashboard callback if necessary. diff --git a/src/plugins/forward_passes.jl b/src/plugins/forward_passes.jl index 9ac88e18a..2ed68cbe2 100644 --- a/src/plugins/forward_passes.jl +++ b/src/plugins/forward_passes.jl @@ -81,7 +81,7 @@ function forward_pass( # incoming state is more than δ away from the other states, add it # as a possible starting state. if distance(starting_states, incoming_state_value) > - options.cycle_discretization_delta + options.cycle_discretization_delta push!(starting_states, incoming_state_value) end # TODO(odow): From 5c13d3c7dc5d9e7c309357cd83460c40d716c911 Mon Sep 17 00:00:00 2001 From: odow Date: Thu, 25 Jul 2024 17:27:10 +1200 Subject: [PATCH 04/11] Update --- docs/src/examples/air_conditioning_forward.jl | 2 -- src/algorithm.jl | 8 ++++++++ src/plugins/forward_passes.jl | 2 +- test/plugins/stopping_rules.jl | 2 -- test/user_interface.jl | 2 +- 5 files changed, 10 insertions(+), 6 deletions(-) diff --git a/docs/src/examples/air_conditioning_forward.jl b/docs/src/examples/air_conditioning_forward.jl index 02a08e235..fb4707de8 100644 --- a/docs/src/examples/air_conditioning_forward.jl +++ b/docs/src/examples/air_conditioning_forward.jl @@ -37,8 +37,6 @@ SDDP.train( forward_pass = SDDP.AlternativeForwardPass(non_convex), post_iteration_callback = SDDP.AlternativePostIterationCallback(non_convex), iteration_limit = 10, - # TODO(odow): bug with Threaded? - parallel_scheme = SDDP.Serial(), ) Test.@test isapprox(SDDP.calculate_bound(non_convex), 62_500.0, atol = 0.1) Test.@test isapprox(SDDP.calculate_bound(convex), 62_500.0, atol = 0.1) diff --git a/src/algorithm.jl b/src/algorithm.jl index ff24d605a..e031fcc59 100644 --- a/src/algorithm.jl +++ b/src/algorithm.jl @@ -1026,6 +1026,14 @@ function train( forward_pass_callback::Function = (x) -> nothing, post_iteration_callback = result -> nothing, ) + if any(node -> node.objective_state !== nothing, values(model.nodes)) + # FIXME(odow): Threaded is broken for objective states + parallel_scheme = Serial() + end + if forward_pass isa AlternativeForwardPass + # FIXME(odow): Threaded is broken for AlternativeForwardPass + parallel_scheme = Serial() + end if log_frequency <= 0 msg = "`log_frequency` must be at least `1`. Got $log_frequency." throw(ArgumentError(msg)) diff --git a/src/plugins/forward_passes.jl b/src/plugins/forward_passes.jl index 2ed68cbe2..9ac88e18a 100644 --- a/src/plugins/forward_passes.jl +++ b/src/plugins/forward_passes.jl @@ -81,7 +81,7 @@ function forward_pass( # incoming state is more than δ away from the other states, add it # as a possible starting state. if distance(starting_states, incoming_state_value) > - options.cycle_discretization_delta + options.cycle_discretization_delta push!(starting_states, incoming_state_value) end # TODO(odow): diff --git a/test/plugins/stopping_rules.jl b/test/plugins/stopping_rules.jl index 4a28d6815..f715c7057 100644 --- a/test/plugins/stopping_rules.jl +++ b/test/plugins/stopping_rules.jl @@ -316,7 +316,6 @@ function test_FirstStageStoppingRule() model; print_level = 0, stopping_rules = [SDDP.FirstStageStoppingRule()], - parallel_scheme = SDDP.Serial(), ), ) graph = SDDP.Graph(0, [1, 2], [(0 => 1, 0.5), (0 => 2, 0.5)]) @@ -338,7 +337,6 @@ function test_FirstStageStoppingRule() model; print_level = 0, stopping_rules = [SDDP.FirstStageStoppingRule()], - parallel_scheme = SDDP.Serial(), ), ) return diff --git a/test/user_interface.jl b/test/user_interface.jl index bea11252d..8dd12603b 100644 --- a/test/user_interface.jl +++ b/test/user_interface.jl @@ -480,7 +480,7 @@ function test_objective_state() end @test_throws( ErrorException("No objective state defined."), - SDDP.simulate(model, 1; parallel_scheme = SDDP.Serial()), + SDDP.simulate(model, 1), ) @test_throws( ErrorException("add_objective_state can only be called once."), From 1ee81f4147537d38f2b88590b048d3ecceebc228 Mon Sep 17 00:00:00 2001 From: odow Date: Thu, 25 Jul 2024 19:27:28 +1200 Subject: [PATCH 05/11] Update --- src/algorithm.jl | 5 +++-- test/plugins/forward_passes.jl | 1 + 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/algorithm.jl b/src/algorithm.jl index e031fcc59..fdd4dc298 100644 --- a/src/algorithm.jl +++ b/src/algorithm.jl @@ -1030,8 +1030,9 @@ function train( # FIXME(odow): Threaded is broken for objective states parallel_scheme = Serial() end - if forward_pass isa AlternativeForwardPass - # FIXME(odow): Threaded is broken for AlternativeForwardPass + if forward_pass isa AlternativeForwardPass || + forward_pass isa RegularizedForwardPass + # FIXME(odow): Threaded is broken for these forward passes parallel_scheme = Serial() end if log_frequency <= 0 diff --git a/test/plugins/forward_passes.jl b/test/plugins/forward_passes.jl index 158892386..9a593faba 100644 --- a/test/plugins/forward_passes.jl +++ b/test/plugins/forward_passes.jl @@ -258,6 +258,7 @@ function test_RegularizedForwardPass() print_level = 0, forward_pass = forward_pass, iteration_limit = 10, + parallel_scheme = SDDP.Serial(), ) return SDDP.calculate_bound(model) end From 4894a5096466e9fb250f606c475ae54f6eaa47a6 Mon Sep 17 00:00:00 2001 From: odow Date: Fri, 26 Jul 2024 11:29:34 +1200 Subject: [PATCH 06/11] Update --- src/algorithm.jl | 30 +++++++++++++++--------------- src/plugins/parallel_schemes.jl | 15 +++++++-------- src/user_interface.jl | 2 ++ 3 files changed, 24 insertions(+), 23 deletions(-) diff --git a/src/algorithm.jl b/src/algorithm.jl index fdd4dc298..4f5b6174f 100644 --- a/src/algorithm.jl +++ b/src/algorithm.jl @@ -5,11 +5,9 @@ macro _timeit_threadsafe(timer, label, block) code = quote - first_thread = 1 - if VERSION >= v"1.7" - first_thread = first(Threads.threadpooltids(Threads.threadpool())) - end - if Threads.threadid() == first_thread + # TimerOutputs is not thread-safe, so run it only if there is a single + # thread. + if Threads.nthreads() == 1 TimerOutputs.@timeit $timer $label $block else $block @@ -307,13 +305,16 @@ function attempt_numerical_recovery(model::PolicyGraph, node::Node) end if !_has_primal_solution(node) model.ext[:numerical_issue] = true - suffix = Threads.threadid() == 1 ? "" : "_pid=$(Threads.threadid())" - filename = "model$suffix.cuts.json" + # We use the `node.index` in the filename because two threads could both + # try to write the cuts to file at the same time. If, after writing this + # file, a second thread finds an infeasibility of the same node, it + # doesn't matter if we over-write this file. + filename = "model_infeasible_node_$(node.index).cuts.json" @info "Writing cuts to the file `$filename`" write_cuts_to_file(model, filename) write_subproblem_to_file( node, - "subproblem_$(node.index)$suffix.mof.json"; + "subproblem_$(node.index).mof.json"; throw_error = true, ) end @@ -426,10 +427,9 @@ function solve_subproblem( nothing end JuMP.optimize!(node.subproblem) - if haskey(model.ext, :total_solves) - model.ext[:total_solves] += 1 - else - model.ext[:total_solves] = 1 + lock(model.lock) do + model.ext[:total_solves] = get(model.ext, :total_solves, 0) + 1 + return end if JuMP.primal_status(node.subproblem) == JuMP.MOI.INTERRUPTED # If the solver was interrupted, the user probably hit CTRL+C but the @@ -883,9 +883,9 @@ function iteration(model::PolicyGraph{T}, options::Options) where {T} forward_trajectory.cumulative_value, time() - options.start_time, max(Threads.threadid(), Distributed.myid()), - model.ext[:total_solves], + lock(() -> model.ext[:total_solves], model.lock), duality_log_key(options.duality_handler), - model.ext[:numerical_issue], + lock(() -> model.ext[:numerical_issue], model.lock), ), ) has_converged, status = @@ -897,7 +897,7 @@ function iteration(model::PolicyGraph{T}, options::Options) where {T} has_converged, status, cuts, - model.ext[:numerical_issue], + lock(() -> model.ext[:numerical_issue], model.lock), ) finally unlock(options.lock) diff --git a/src/plugins/parallel_schemes.jl b/src/plugins/parallel_schemes.jl index c9888b8ed..37e9b0397 100644 --- a/src/plugins/parallel_schemes.jl +++ b/src/plugins/parallel_schemes.jl @@ -285,7 +285,7 @@ function master_loop( result.cumulative_value, time() - options.start_time, result.pid, - model.ext[:total_solves], + lock(() -> model.ext[:total_solves], model.lock), duality_log_key(options.duality_handler), result.numerical_issue, ), @@ -359,28 +359,27 @@ function master_loop( options::Options, ) where {T} _initialize_solver(model; throw_error = false) - convergence_lock = ReentrantLock() keep_iterating, status = true, nothing @sync for _ in 1:Threads.nthreads() Threads.@spawn begin try + # This access of `keep_iterating` is not thread-safe, but it + # doesn't matter because all it will do is another iteration + # before terminating. while keep_iterating result = iteration(model, options) lock(options.lock) do options.post_iteration_callback(result) log_iteration(options) - return - end - if result.has_converged - lock(convergence_lock) do + if result.has_converged keep_iterating = false status = result.status - return end + return end end finally - lock(convergence_lock) do + lock(options.lock) do keep_iterating = false return end diff --git a/src/user_interface.jl b/src/user_interface.jl index cff15005a..8070c75ce 100644 --- a/src/user_interface.jl +++ b/src/user_interface.jl @@ -718,6 +718,7 @@ mutable struct PolicyGraph{T} # SDDP.jl to stash things. ext::Dict{Symbol,Any} timer_output::TimerOutputs.TimerOutput + lock::ReentrantLock function PolicyGraph(sense::Symbol, root_node::T) where {T} if sense != :Min && sense != :Max @@ -736,6 +737,7 @@ mutable struct PolicyGraph{T} nothing, Dict{Symbol,Any}(), TimerOutputs.TimerOutput(), + ReentrantLock(), ) end end From 7370c56b8a89b33f2d41ee6b394ed8ccf9564e66 Mon Sep 17 00:00:00 2001 From: odow Date: Fri, 26 Jul 2024 12:10:40 +1200 Subject: [PATCH 07/11] Update --- docs/src/examples/the_farmers_problem.jl | 4 ++-- src/plugins/forward_passes.jl | 3 ++- test/user_interface.jl | 2 +- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/docs/src/examples/the_farmers_problem.jl b/docs/src/examples/the_farmers_problem.jl index f113bb292..26c6aaa8c 100644 --- a/docs/src/examples/the_farmers_problem.jl +++ b/docs/src/examples/the_farmers_problem.jl @@ -245,10 +245,10 @@ end # ## Training a policy # Now that we've built a model, we need to train it using [`SDDP.train`](@ref). -# The keyword `iteration_limit` stops the training after 20 iterations. See +# The keyword `iteration_limit` stops the training after 30 iterations. See # [Choose a stopping rule](@ref) for other ways to stop the training. -SDDP.train(model; iteration_limit = 20) +SDDP.train(model; iteration_limit = 30) # ## Checking the policy diff --git a/src/plugins/forward_passes.jl b/src/plugins/forward_passes.jl index 9ac88e18a..1cb262462 100644 --- a/src/plugins/forward_passes.jl +++ b/src/plugins/forward_passes.jl @@ -118,7 +118,8 @@ function forward_pass( unlock(node.lock) end end - if terminated_due_to_cycle + # TODO(odow): .starting_states isn't thread-safe + if terminated_due_to_cycle && Threads.nthreads() == 1 # We terminated due to a cycle. Here is the list of possible starting # states for that node: starting_states = options.starting_states[final_node[1]] diff --git a/test/user_interface.jl b/test/user_interface.jl index 8dd12603b..bea11252d 100644 --- a/test/user_interface.jl +++ b/test/user_interface.jl @@ -480,7 +480,7 @@ function test_objective_state() end @test_throws( ErrorException("No objective state defined."), - SDDP.simulate(model, 1), + SDDP.simulate(model, 1; parallel_scheme = SDDP.Serial()), ) @test_throws( ErrorException("add_objective_state can only be called once."), From 91b7d4677abf0f2bfde6fbfcd3464f32d186045f Mon Sep 17 00:00:00 2001 From: odow Date: Fri, 26 Jul 2024 13:07:32 +1200 Subject: [PATCH 08/11] Update --- docs/src/examples/the_farmers_problem.jl | 4 +- src/plugins/forward_passes.jl | 77 +++++++++++++----------- test/plugins/duality_handlers.jl | 2 +- 3 files changed, 46 insertions(+), 37 deletions(-) diff --git a/docs/src/examples/the_farmers_problem.jl b/docs/src/examples/the_farmers_problem.jl index 26c6aaa8c..377d97112 100644 --- a/docs/src/examples/the_farmers_problem.jl +++ b/docs/src/examples/the_farmers_problem.jl @@ -245,10 +245,10 @@ end # ## Training a policy # Now that we've built a model, we need to train it using [`SDDP.train`](@ref). -# The keyword `iteration_limit` stops the training after 30 iterations. See +# The keyword `iteration_limit` stops the training after 40 iterations. See # [Choose a stopping rule](@ref) for other ways to stop the training. -SDDP.train(model; iteration_limit = 30) +SDDP.train(model; iteration_limit = 40) # ## Checking the policy diff --git a/src/plugins/forward_passes.jl b/src/plugins/forward_passes.jl index 1cb262462..b73f790af 100644 --- a/src/plugins/forward_passes.jl +++ b/src/plugins/forward_passes.jl @@ -75,24 +75,29 @@ function forward_pass( push!(belief_states, (partition_index, copy(current_belief))) end # ===== Begin: starting state for infinite horizon ===== - starting_states = options.starting_states[node_index] - if length(starting_states) > 0 - # There is at least one other possible starting state. If our - # incoming state is more than δ away from the other states, add it - # as a possible starting state. - if distance(starting_states, incoming_state_value) > - options.cycle_discretization_delta - push!(starting_states, incoming_state_value) + lock(options.lock) + try + starting_states = options.starting_states[node_index] + if length(starting_states) > 0 + # There is at least one other possible starting state. If our + # incoming state is more than δ away from the other states, add it + # as a possible starting state. + if distance(starting_states, incoming_state_value) > + options.cycle_discretization_delta + push!(starting_states, incoming_state_value) + end + # TODO(odow): + # - A better way of randomly sampling a starting state. + # - Is is bad that we splice! here instead of just sampling? For + # convergence it is probably bad, since our list of possible + # starting states keeps changing, but from a computational + # perspective, we don't want to keep a list of discretized points + # in the state-space δ distance apart... + incoming_state_value = + splice!(starting_states, rand(1:length(starting_states))) end - # TODO(odow): - # - A better way of randomly sampling a starting state. - # - Is is bad that we splice! here instead of just sampling? For - # convergence it is probably bad, since our list of possible - # starting states keeps changing, but from a computational - # perspective, we don't want to keep a list of discretized points - # in the state-space δ distance apart... - incoming_state_value = - splice!(starting_states, rand(1:length(starting_states))) + finally + unlock(options.lock) end # ===== End: starting state for infinite horizon ===== # Solve the subproblem, note that `duality_handler = nothing`. @@ -118,23 +123,27 @@ function forward_pass( unlock(node.lock) end end - # TODO(odow): .starting_states isn't thread-safe - if terminated_due_to_cycle && Threads.nthreads() == 1 - # We terminated due to a cycle. Here is the list of possible starting - # states for that node: - starting_states = options.starting_states[final_node[1]] - # We also need the incoming state variable to the final node, which is - # the outgoing state value of the second to last node: - incoming_state_value = if pass.include_last_node - sampled_states[end-1] - else - sampled_states[end] - end - # If this incoming state value is more than δ away from another state, - # add it to the list. - if distance(starting_states, incoming_state_value) > - options.cycle_discretization_delta - push!(starting_states, incoming_state_value) + if terminated_due_to_cycle + lock(options.lock) + try + # We terminated due to a cycle. Here is the list of possible + # starting states for that node: + starting_states = options.starting_states[final_node[1]] + # We also need the incoming state variable to the final node, which + # is the outgoing state value of the second to last node: + incoming_state_value = if pass.include_last_node + sampled_states[end-1] + else + sampled_states[end] + end + # If this incoming state value is more than δ away from another + # state, add it to the list. + if distance(starting_states, incoming_state_value) > + options.cycle_discretization_delta + push!(starting_states, incoming_state_value) + end + finally + unlock(options.lock) end end # ===== End: drop off starting state if terminated due to cycle ===== diff --git a/test/plugins/duality_handlers.jl b/test/plugins/duality_handlers.jl index d5ba1f9ce..f9c617f7f 100644 --- a/test/plugins/duality_handlers.jl +++ b/test/plugins/duality_handlers.jl @@ -456,7 +456,7 @@ function test_BanditDuality_eval() end end handler = SDDP.BanditDuality() - SDDP.train(model; duality_handler = handler, iteration_limit = 20) + SDDP.train(model; duality_handler = handler, iteration_limit = 100) @test sum( l.duality_key == " " for l in model.most_recent_training_results.log ) > 10 From 072361f0ddbf0c0509a9c84ba5e33f78efb9b0d6 Mon Sep 17 00:00:00 2001 From: odow Date: Fri, 26 Jul 2024 13:11:06 +1200 Subject: [PATCH 09/11] Fix formatting --- src/plugins/forward_passes.jl | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/plugins/forward_passes.jl b/src/plugins/forward_passes.jl index b73f790af..eb9cb1f4d 100644 --- a/src/plugins/forward_passes.jl +++ b/src/plugins/forward_passes.jl @@ -83,7 +83,7 @@ function forward_pass( # incoming state is more than δ away from the other states, add it # as a possible starting state. if distance(starting_states, incoming_state_value) > - options.cycle_discretization_delta + options.cycle_discretization_delta push!(starting_states, incoming_state_value) end # TODO(odow): @@ -93,8 +93,10 @@ function forward_pass( # starting states keeps changing, but from a computational # perspective, we don't want to keep a list of discretized points # in the state-space δ distance apart... - incoming_state_value = - splice!(starting_states, rand(1:length(starting_states))) + incoming_state_value = splice!( + starting_states, + rand(1:length(starting_states)), + ) end finally unlock(options.lock) @@ -139,7 +141,7 @@ function forward_pass( # If this incoming state value is more than δ away from another # state, add it to the list. if distance(starting_states, incoming_state_value) > - options.cycle_discretization_delta + options.cycle_discretization_delta push!(starting_states, incoming_state_value) end finally From 6e7cdc3903bb80e63712e21f08716484c5dd6a3a Mon Sep 17 00:00:00 2001 From: odow Date: Fri, 26 Jul 2024 13:25:33 +1200 Subject: [PATCH 10/11] Update --- .../infinite_horizon_hydro_thermal.jl | 1 + src/plugins/forward_passes.jl | 76 ++++++++----------- 2 files changed, 33 insertions(+), 44 deletions(-) diff --git a/docs/src/examples/infinite_horizon_hydro_thermal.jl b/docs/src/examples/infinite_horizon_hydro_thermal.jl index 0cdfdb0f5..741a45169 100644 --- a/docs/src/examples/infinite_horizon_hydro_thermal.jl +++ b/docs/src/examples/infinite_horizon_hydro_thermal.jl @@ -54,6 +54,7 @@ function infinite_hydro_thermal(; cut_type) cut_type = cut_type, log_frequency = 100, sampling_scheme = SDDP.InSampleMonteCarlo(; terminate_on_cycle = true), + parallel_scheme = SDDP.Serial(), cycle_discretization_delta = 0.1, ) @test SDDP.calculate_bound(model) ≈ 119.167 atol = 0.1 diff --git a/src/plugins/forward_passes.jl b/src/plugins/forward_passes.jl index eb9cb1f4d..1dfb6f460 100644 --- a/src/plugins/forward_passes.jl +++ b/src/plugins/forward_passes.jl @@ -75,31 +75,24 @@ function forward_pass( push!(belief_states, (partition_index, copy(current_belief))) end # ===== Begin: starting state for infinite horizon ===== - lock(options.lock) - try - starting_states = options.starting_states[node_index] - if length(starting_states) > 0 - # There is at least one other possible starting state. If our - # incoming state is more than δ away from the other states, add it - # as a possible starting state. - if distance(starting_states, incoming_state_value) > - options.cycle_discretization_delta - push!(starting_states, incoming_state_value) - end - # TODO(odow): - # - A better way of randomly sampling a starting state. - # - Is is bad that we splice! here instead of just sampling? For - # convergence it is probably bad, since our list of possible - # starting states keeps changing, but from a computational - # perspective, we don't want to keep a list of discretized points - # in the state-space δ distance apart... - incoming_state_value = splice!( - starting_states, - rand(1:length(starting_states)), - ) + starting_states = options.starting_states[node_index] + if length(starting_states) > 0 + # There is at least one other possible starting state. If our + # incoming state is more than δ away from the other states, add it + # as a possible starting state. + if distance(starting_states, incoming_state_value) > + options.cycle_discretization_delta + push!(starting_states, incoming_state_value) end - finally - unlock(options.lock) + # TODO(odow): + # - A better way of randomly sampling a starting state. + # - Is is bad that we splice! here instead of just sampling? For + # convergence it is probably bad, since our list of possible + # starting states keeps changing, but from a computational + # perspective, we don't want to keep a list of discretized points + # in the state-space δ distance apart... + incoming_state_value = + splice!(starting_states, rand(1:length(starting_states))) end # ===== End: starting state for infinite horizon ===== # Solve the subproblem, note that `duality_handler = nothing`. @@ -126,26 +119,21 @@ function forward_pass( end end if terminated_due_to_cycle - lock(options.lock) - try - # We terminated due to a cycle. Here is the list of possible - # starting states for that node: - starting_states = options.starting_states[final_node[1]] - # We also need the incoming state variable to the final node, which - # is the outgoing state value of the second to last node: - incoming_state_value = if pass.include_last_node - sampled_states[end-1] - else - sampled_states[end] - end - # If this incoming state value is more than δ away from another - # state, add it to the list. - if distance(starting_states, incoming_state_value) > - options.cycle_discretization_delta - push!(starting_states, incoming_state_value) - end - finally - unlock(options.lock) + # We terminated due to a cycle. Here is the list of possible + # starting states for that node: + starting_states = options.starting_states[final_node[1]] + # We also need the incoming state variable to the final node, which + # is the outgoing state value of the second to last node: + incoming_state_value = if pass.include_last_node + sampled_states[end-1] + else + sampled_states[end] + end + # If this incoming state value is more than δ away from another + # state, add it to the list. + if distance(starting_states, incoming_state_value) > + options.cycle_discretization_delta + push!(starting_states, incoming_state_value) end end # ===== End: drop off starting state if terminated due to cycle ===== From dff315df9e683e9354f580917c4df5e1b715fe1e Mon Sep 17 00:00:00 2001 From: odow Date: Fri, 26 Jul 2024 14:06:07 +1200 Subject: [PATCH 11/11] Update --- src/algorithm.jl | 6 +++--- src/plugins/parallel_schemes.jl | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/algorithm.jl b/src/algorithm.jl index 4f5b6174f..d6fdc3825 100644 --- a/src/algorithm.jl +++ b/src/algorithm.jl @@ -1018,7 +1018,7 @@ function train( cut_deletion_minimum::Int = 1, backward_sampling_scheme::AbstractBackwardSamplingScheme = SDDP.CompleteSampler(), dashboard::Bool = false, - parallel_scheme::AbstractParallelScheme = Threaded(), + parallel_scheme::AbstractParallelScheme = Serial(), forward_pass::AbstractForwardPass = DefaultForwardPass(), forward_pass_resampling_probability::Union{Nothing,Float64} = nothing, add_to_existing_cuts::Bool = false, @@ -1342,7 +1342,7 @@ end custom_recorders = Dict{Symbol, Function}(), duality_handler::Union{Nothing,AbstractDualityHandler} = nothing, skip_undefined_variables::Bool = false, - parallel_scheme::AbstractParallelScheme = Threaded(), + parallel_scheme::AbstractParallelScheme = Serial(), incoming_state::Dict{String,Float64} = _initial_state(model), )::Vector{Vector{Dict{Symbol,Any}}} @@ -1433,7 +1433,7 @@ function simulate( custom_recorders = Dict{Symbol,Function}(), duality_handler::Union{Nothing,AbstractDualityHandler} = nothing, skip_undefined_variables::Bool = false, - parallel_scheme::AbstractParallelScheme = Threaded(), + parallel_scheme::AbstractParallelScheme = Serial(), incoming_state::Dict{String,Float64} = _initial_state(model), ) return _simulate( diff --git a/src/plugins/parallel_schemes.jl b/src/plugins/parallel_schemes.jl index 37e9b0397..39d3916f7 100644 --- a/src/plugins/parallel_schemes.jl +++ b/src/plugins/parallel_schemes.jl @@ -141,7 +141,7 @@ function slave_update(model::PolicyGraph, result::IterationResult) if cut === nothing error( "This model uses features that are not suppored in async " * - "mode. Use `parallel_scheme = Threaded()` instead.", + "mode. Use `parallel_scheme = Serial()` instead.", ) end _add_cut(