Skip to content

Commit

Permalink
Make observables run in parallel (#4748)
Browse files Browse the repository at this point in the history
Fixes #3154

Description of changes:
- make observables run in parallel
   - only send the extracted data via MPI (the original implementation sent the entire particle data structure)
   - remove a major performance bottleneck in parallel simulations
- remove 8 callbacks from the deprecated `MpiCallbacks` framework
  • Loading branch information
kodiakhq[bot] authored Jul 24, 2023
2 parents 95d47b9 + 4942938 commit a770f49
Show file tree
Hide file tree
Showing 88 changed files with 1,419 additions and 674 deletions.
4 changes: 2 additions & 2 deletions src/core/accumulators.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ struct AutoUpdateAccumulator {
std::vector<AutoUpdateAccumulator> auto_update_accumulators;
} // namespace

void auto_update(int steps) {
void auto_update(boost::mpi::communicator const &comm, int steps) {
for (auto &acc : auto_update_accumulators) {
assert(steps <= acc.frequency);
acc.counter -= steps;
if (acc.counter <= 0) {
acc.acc->update();
acc.acc->update(comm);
acc.counter = acc.frequency;
}

Expand Down
2 changes: 1 addition & 1 deletion src/core/accumulators.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ namespace Accumulators {
* they need to be updated and if so does.
*
*/
void auto_update(int steps);
void auto_update(boost::mpi::communicator const &comm, int steps);
int auto_update_next_update();
void auto_update_add(AccumulatorBase *);
void auto_update_remove(AccumulatorBase *);
Expand Down
7 changes: 6 additions & 1 deletion src/core/accumulators/AccumulatorBase.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@
#include <cstddef>
#include <vector>

// Forward declaration
namespace boost::mpi {
class communicator;
}

namespace Accumulators {

class AccumulatorBase {
Expand All @@ -31,7 +36,7 @@ class AccumulatorBase {

int &delta_N() { return m_delta_N; }

virtual void update() = 0;
virtual void update(boost::mpi::communicator const &comm) = 0;
/** Dimensions needed to reshape the flat array returned by the accumulator */
virtual std::vector<std::size_t> shape() const = 0;

Expand Down
24 changes: 20 additions & 4 deletions src/core/accumulators/Correlator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -310,11 +310,22 @@ void Correlator::initialize() {
}
}

void Correlator::update() {
void Correlator::update(boost::mpi::communicator const &comm) {
if (finalized) {
throw std::runtime_error(
"No data can be added after finalize() was called.");
}

if (comm.rank() != 0) {
// worker nodes just need to update the observables and exit
A_obs->operator()(comm);
if (A_obs != B_obs) {
B_obs->operator()(comm);
}

return;
}

// We must now go through the hierarchy and make sure there is space for the
// new datapoint. For every hierarchy level we have to decide if it is
// necessary to move something
Expand Down Expand Up @@ -360,9 +371,9 @@ void Correlator::update() {
newest[0] = (newest[0] + 1) % (m_tau_lin + 1);
n_vals[0]++;

A[0][newest[0]] = A_obs->operator()();
A[0][newest[0]] = A_obs->operator()(comm);
if (A_obs != B_obs) {
B[0][newest[0]] = B_obs->operator()();
B[0][newest[0]] = B_obs->operator()(comm);
} else {
B[0][newest[0]] = A[0][newest[0]];
}
Expand Down Expand Up @@ -411,7 +422,7 @@ void Correlator::update() {
}
}

int Correlator::finalize() {
int Correlator::finalize(boost::mpi::communicator const &comm) {
using index_type = decltype(result)::index;
if (finalized) {
throw std::runtime_error("Correlator::finalize() can only be called once.");
Expand All @@ -423,6 +434,11 @@ int Correlator::finalize() {
// mark the correlation as finalized
finalized = true;

// worker nodes don't need to do anything
if (comm.rank() != 0) {
return 0;
}

for (int ll = 0; ll < m_hierarchy_depth - 1; ll++) {
long vals_ll; // number of values remaining in the lowest level
if (n_vals[ll] > m_tau_lin + 1)
Expand Down
4 changes: 2 additions & 2 deletions src/core/accumulators/Correlator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,12 +177,12 @@ class Correlator : public AccumulatorBase {
* the correlation estimate is updated.
* TODO: Not all correlation estimates have to be updated.
*/
void update() override;
void update(boost::mpi::communicator const &comm) override;

/** At the end of data collection, go through the whole hierarchy and
* correlate data left there.
*/
int finalize();
int finalize(boost::mpi::communicator const &comm);

/** Return correlation result */
std::vector<double> get_correlation();
Expand Down
8 changes: 7 additions & 1 deletion src/core/accumulators/MeanVarianceCalculator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,13 @@
#include <vector>

namespace Accumulators {
void MeanVarianceCalculator::update() { m_acc(m_obs->operator()()); }
void MeanVarianceCalculator::update(boost::mpi::communicator const &comm) {
if (comm.rank() == 0) {
m_acc(m_obs->operator()(comm));
} else {
m_obs->operator()(comm);
}
}

std::vector<double> MeanVarianceCalculator::mean() { return m_acc.mean(); }

Expand Down
2 changes: 1 addition & 1 deletion src/core/accumulators/MeanVarianceCalculator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class MeanVarianceCalculator : public AccumulatorBase {
int delta_N)
: AccumulatorBase(delta_N), m_obs(obs), m_acc(obs->n_values()) {}

void update() override;
void update(boost::mpi::communicator const &comm) override;
std::vector<double> mean();
std::vector<double> variance();
std::vector<double> std_error();
Expand Down
8 changes: 7 additions & 1 deletion src/core/accumulators/TimeSeries.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,13 @@
#include <string>

namespace Accumulators {
void TimeSeries::update() { m_data.emplace_back(m_obs->operator()()); }
void TimeSeries::update(boost::mpi::communicator const &comm) {
if (comm.rank() == 0) {
m_data.emplace_back(m_obs->operator()(comm));
} else {
m_obs->operator()(comm);
}
}

std::string TimeSeries::get_internal_state() const {
std::stringstream ss;
Expand Down
2 changes: 1 addition & 1 deletion src/core/accumulators/TimeSeries.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class TimeSeries : public AccumulatorBase {
TimeSeries(std::shared_ptr<Observables::Observable> obs, int delta_N)
: AccumulatorBase(delta_N), m_obs(std::move(obs)) {}

void update() override;
void update(boost::mpi::communicator const &comm) override;
std::string get_internal_state() const;
void set_internal_state(std::string const &);

Expand Down
16 changes: 8 additions & 8 deletions src/core/dpd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@

#include "dpd.hpp"

#include "MpiCallbacks.hpp"
#include "cells.hpp"
#include "communication.hpp"
#include "event.hpp"
#include "grid.hpp"
#include "integrate.hpp"
Expand All @@ -44,6 +42,8 @@
#include <utils/math/tensor_product.hpp>
#include <utils/matrix.hpp>

#include <boost/mpi/collectives/reduce.hpp>

#include <algorithm>
#include <cmath>
#include <cstdint>
Expand Down Expand Up @@ -151,7 +151,6 @@ static auto dpd_viscous_stress_local() {

return stress;
}
REGISTER_CALLBACK_REDUCTION(dpd_viscous_stress_local, std::plus<>())

/**
* @brief Viscous stress tensor of the DPD interaction.
Expand All @@ -168,12 +167,13 @@ REGISTER_CALLBACK_REDUCTION(dpd_viscous_stress_local, std::plus<>())
*
* @return Stress tensor contribution.
*/
Utils::Vector9d dpd_stress() {
auto const stress = mpi_call(Communication::Result::reduction, std::plus<>(),
dpd_viscous_stress_local);
auto const volume = box_geo.volume();
Utils::Vector9d dpd_stress(boost::mpi::communicator const &comm) {
auto const local_stress = dpd_viscous_stress_local();
std::remove_const_t<decltype(local_stress)> global_stress;

boost::mpi::reduce(comm, local_stress, global_stress, std::plus<>(), 0);

return Utils::flatten(stress) / volume;
return Utils::flatten(global_stress) / box_geo.volume();
}

#endif // DPD
7 changes: 6 additions & 1 deletion src/core/dpd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@

#include <utils/Vector.hpp>

// Forward declaration
namespace boost::mpi {
class communicator;
}

struct IA_parameters;

void dpd_init(double kT, double time_step);
Expand All @@ -42,7 +47,7 @@ Utils::Vector3d dpd_pair_force(Particle const &p1, Particle const &p2,
IA_parameters const &ia_params,
Utils::Vector3d const &d, double dist,
double dist2);
Utils::Vector9d dpd_stress();
Utils::Vector9d dpd_stress(boost::mpi::communicator const &comm);

#endif // DPD
#endif
17 changes: 0 additions & 17 deletions src/core/energy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
#include "magnetostatics/dipoles.hpp"

#include <utils/Span.hpp>
#include <utils/mpi/iall_gatherv.hpp>

#include <memory>

Expand Down Expand Up @@ -114,18 +113,6 @@ std::shared_ptr<Observable_stat> calculate_energy() {
// NOLINTNEXTLINE(clang-analyzer-cplusplus.NewDeleteLeaks)
}

REGISTER_CALLBACK_MAIN_RANK(calculate_energy)

double mpi_calculate_potential_energy() {
auto const obs = mpi_call(Communication::Result::main_rank, calculate_energy);
return obs->accumulate(-obs->kinetic[0]);
}

double mpi_observable_compute_energy() {
auto const obs = mpi_call(Communication::Result::main_rank, calculate_energy);
return obs->accumulate(0);
}

double particle_short_range_energy_contribution(int pid) {
double ret = 0.0;

Expand Down Expand Up @@ -159,8 +146,4 @@ void calc_long_range_fields() {
auto const &dipoles = System::get_system().dipoles;
dipoles.calc_long_range_field(particles);
}

REGISTER_CALLBACK(calc_long_range_fields)

void mpi_calc_long_range_fields() { mpi_call_all(calc_long_range_fields); }
#endif
10 changes: 2 additions & 8 deletions src/core/energy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,6 @@
/** Parallel energy calculation. */
std::shared_ptr<Observable_stat> calculate_energy();

/** Calculate the total energy of the system. */
double mpi_calculate_potential_energy();

/** Helper function for @ref Observables::Energy. */
double mpi_observable_compute_energy();

/**
* @brief Compute short-range energy of a particle.
*
Expand All @@ -50,9 +44,9 @@ double mpi_observable_compute_energy();
*/
double particle_short_range_energy_contribution(int pid);

#endif
#ifdef DIPOLE_FIELD_TRACKING
/** Calculate dipole fields. */
void calc_long_range_fields();
void mpi_calc_long_range_fields();
#endif // DIPOLE_FIELD_TRACKING

#endif
58 changes: 16 additions & 42 deletions src/core/grid_based_algorithms/lb_interface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "grid.hpp"

#include <utils/Vector.hpp>
#include <utils/mpi/reduce_optional.hpp>

#include <boost/optional.hpp>
#include <boost/serialization/access.hpp>
Expand Down Expand Up @@ -126,40 +127,15 @@ double get_kT() {

double get_lattice_speed() { return get_agrid() / get_tau(); }

#ifdef WALBERLA
namespace Walberla {

static Utils::Vector3d get_momentum() { return lb_walberla()->get_momentum(); }

static boost::optional<Utils::Vector3d>
get_velocity_at_pos(Utils::Vector3d pos) {
return lb_walberla()->get_velocity_at_pos(pos);
}

REGISTER_CALLBACK_ONE_RANK(get_velocity_at_pos)

static boost::optional<double>
get_interpolated_density_at_pos(Utils::Vector3d pos) {
return lb_walberla()->get_interpolated_density_at_pos(pos);
}

REGISTER_CALLBACK_ONE_RANK(get_interpolated_density_at_pos)

static Utils::VectorXd<9> get_pressure_tensor() {
return lb_walberla()->get_pressure_tensor();
}

REGISTER_CALLBACK_REDUCTION(get_pressure_tensor, std::plus<>())

} // namespace Walberla
#endif // WALBERLA

Utils::VectorXd<9> const get_pressure_tensor() {
Utils::VectorXd<9> const
get_pressure_tensor(boost::mpi::communicator const &comm) {
if (lattice_switch == ActiveLB::WALBERLA_LB) {
#ifdef WALBERLA
return ::Communication::mpiCallbacks().call(
::Communication::Result::reduction, std::plus<>(),
Walberla::get_pressure_tensor);
auto const local_pressure_tensor = lb_walberla()->get_pressure_tensor();
std::remove_const_t<decltype(local_pressure_tensor)> pressure_tensor;
boost::mpi::reduce(comm, local_pressure_tensor, pressure_tensor,
std::plus<>(), 0);
return pressure_tensor;
#endif
}
throw NoLBActive();
Expand All @@ -168,30 +144,28 @@ Utils::VectorXd<9> const get_pressure_tensor() {
Utils::Vector3d calc_fluid_momentum() {
if (lattice_switch == ActiveLB::WALBERLA_LB) {
#ifdef WALBERLA
return Walberla::get_momentum();
return lb_walberla()->get_momentum();
#endif
}
throw NoLBActive();
}

Utils::Vector3d const get_interpolated_velocity(Utils::Vector3d const &pos) {
boost::optional<Utils::Vector3d>
get_interpolated_velocity(Utils::Vector3d const &pos) {
if (lattice_switch == ActiveLB::WALBERLA_LB) {
#ifdef WALBERLA
auto const folded_pos = folded_position(pos, box_geo);
return mpi_call(::Communication::Result::one_rank,
Walberla::get_velocity_at_pos, folded_pos / get_agrid());
auto const folded_pos = folded_position(pos, box_geo) / get_agrid();
return lb_walberla()->get_velocity_at_pos(folded_pos);
#endif
}
throw NoLBActive();
}

double get_interpolated_density(Utils::Vector3d const &pos) {
boost::optional<double> get_interpolated_density(Utils::Vector3d const &pos) {
if (lattice_switch == ActiveLB::WALBERLA_LB) {
#ifdef WALBERLA
auto const folded_pos = folded_position(pos, box_geo);
return mpi_call(::Communication::Result::one_rank,
Walberla::get_interpolated_density_at_pos,
folded_pos / get_agrid());
auto const folded_pos = folded_position(pos, box_geo) / get_agrid();
return lb_walberla()->get_interpolated_density_at_pos(folded_pos);
#endif
}
throw NoLBActive();
Expand Down
Loading

0 comments on commit a770f49

Please sign in to comment.