From 3bd1e7b3b99067907e65f7d8f1417dd30add2734 Mon Sep 17 00:00:00 2001 From: George Bisbas Date: Tue, 22 Oct 2024 18:43:10 +0300 Subject: [PATCH 1/4] compiler: Initialize additional halo opt pass --- devito/ir/iet/nodes.py | 5 +- devito/ir/support/basic.py | 2 +- devito/mpi/halo_scheme.py | 8 +- devito/passes/iet/mpi.py | 156 ++++++++++++++++++++++++++++++++++++- tests/mfe_1d.py | 42 ++++++++++ tests/test_mpi.py | 51 +++++++++++- 6 files changed, 257 insertions(+), 7 deletions(-) create mode 100644 tests/mfe_1d.py diff --git a/devito/ir/iet/nodes.py b/devito/ir/iet/nodes.py index 9bcc3460f6..ab87264255 100644 --- a/devito/ir/iet/nodes.py +++ b/devito/ir/iet/nodes.py @@ -1457,8 +1457,9 @@ def __init__(self, body, halo_scheme): self._halo_scheme = halo_scheme def __repr__(self): - functions = "(%s)" % ",".join(i.name for i in self.functions) - return "<%s%s>" % (self.__class__.__name__, functions) + functions = "(%s" % ",".join(i.name for i in self.functions) + loc_indices = "[%s])" % ",".join(str(i) for i in self.halo_scheme.loc_indices2) + return "<%s%s%s>" % (self.__class__.__name__, functions, loc_indices) @property def halo_scheme(self): diff --git a/devito/ir/support/basic.py b/devito/ir/support/basic.py index 3d6bb3ba60..51e647b399 100644 --- a/devito/ir/support/basic.py +++ b/devito/ir/support/basic.py @@ -25,7 +25,7 @@ class IndexMode(Tag): REGULAR = IndexMode('regular') IRREGULAR = IndexMode('irregular') -# Symbols to create mock data depdendencies +# Symbols to create mock data dependencies mocksym0 = Symbol(name='__⋈_0__') mocksym1 = Symbol(name='__⋈_1__') diff --git a/devito/mpi/halo_scheme.py b/devito/mpi/halo_scheme.py index 0062b5b32f..5c24fb5369 100644 --- a/devito/mpi/halo_scheme.py +++ b/devito/mpi/halo_scheme.py @@ -96,7 +96,8 @@ def __init__(self, exprs, ispace): def __repr__(self): fnames = ",".join(i.name for i in set(self._mapper)) - return "HaloScheme<%s>" % fnames + loc_indices = "[%s]" % ",".join(str(i) for i in self.loc_indices2) + return "HaloScheme<%s%s>" % (fnames, loc_indices) def __eq__(self, other): return (isinstance(other, HaloScheme) and @@ -121,6 +122,7 @@ def union(self, halo_schemes): """ Create a new HaloScheme from the union of a set of HaloSchemes. """ + # import pdb; pdb.set_trace() halo_schemes = [hs for hs in halo_schemes if hs is not None] if not halo_schemes: return None @@ -366,6 +368,10 @@ def distributed_aindices(self): def loc_indices(self): return set().union(*[i.loc_indices.keys() for i in self.fmapper.values()]) + @cached_property + def loc_indices2(self): + return set().union(*[i.loc_indices.values() for i in self.fmapper.values()]) + @cached_property def arguments(self): return self.dimensions | set(flatten(self.honored.values())) diff --git a/devito/passes/iet/mpi.py b/devito/passes/iet/mpi.py index 9f49440709..ca68205f21 100644 --- a/devito/passes/iet/mpi.py +++ b/devito/passes/iet/mpi.py @@ -7,11 +7,11 @@ retrieve_iteration_tree) from devito.ir.support import PARALLEL, Scope from devito.ir.support.guards import GuardFactorEq -from devito.mpi.halo_scheme import HaloScheme +from devito.mpi.halo_scheme import HaloScheme, HaloSchemeEntry from devito.mpi.reduction_scheme import DistReduce from devito.mpi.routines import HaloExchangeBuilder, ReductionBuilder from devito.passes.iet.engine import iet_pass -from devito.tools import generator +from devito.tools import generator, frozendict __all__ = ['mpiize'] @@ -24,6 +24,9 @@ def optimize_halospots(iet, **kwargs): """ iet = _drop_halospots(iet) iet = _hoist_halospots(iet) + + iet = _merge_halospots_byfunc(iet) + iet = _merge_halospots(iet) iet = _drop_if_unwritten(iet, **kwargs) iet = _mark_overlappable(iet) @@ -122,6 +125,16 @@ def rule1(dep, candidates, loc_dims): iet = Transformer(mapper, nested=True).visit(iet) # Clean up: de-nest HaloSpots if necessary + iet = denest_halospots(iet) + + return iet + + +def denest_halospots(iet): + """ + Denest nested HaloSpots. + # TOFIX: This also merges HaloSpots that have different loc_indices + """ mapper = {} for hs in FindNodes(HaloSpot).visit(iet): if hs.body.is_HaloSpot: @@ -210,6 +223,145 @@ def rule2(dep, hs, loc_indices): return iet +def _merge_halospots_byfunc(iet): + """ + Merge HaloSpots on the same Iteration tree level where all data dependencies + would be honored. + """ + + # Merge rules -- if the retval is True, then it means the input `dep` is not + # a stopper to halo merging + + def rule0(dep, hs, loc_indices): + # E.g., `dep=W -> R` => True + return not any(d in hs.dimensions or dep.distance_mapper[d] is S.Infinity + for d in dep.cause) + + def rule1(dep, hs, loc_indices): + # TODO This is apparently never hit, but feeling uncomfortable to remove it + return (dep.is_regular and + dep.read is not None and + all(not any(dep.read.touched_halo(d.root)) for d in dep.cause)) + + def rule2(dep, hs, loc_indices): + # E.g., `dep=W -> R` and `loc_indices={t: t0}` => True + return any(dep.distance_mapper[d] == 0 and dep.source[d] is not v + for d, v in loc_indices.items()) + + rules = [rule0, rule1, rule2] + + # Analysis + cond_mapper = MapHaloSpots().visit(iet) + cond_mapper = {hs: {i for i in v if i.is_Conditional and + not isinstance(i.condition, GuardFactorEq)} + for hs, v in cond_mapper.items()} + + iter_mapper = MapNodes(Iteration, HaloSpot, 'immediate').visit(iet) + + mapper = {} + hoist_mapper = {} + raw_loc_indices = {} + + for iter, halo_spots in iter_mapper.items(): + if iter is None or len(halo_spots) <= 1: + continue + + scope = Scope([e.expr for e in FindNodes(Expression).visit(iter)]) + + candidates = [] + + for i, _ in enumerate(halo_spots): + hs0 = halo_spots[i] + + for hs1 in halo_spots[i+1:]: + # If there are Conditionals involved, both `hs0` and `hs1` must be + # within the same Conditional, otherwise we would break the control + # flow semantics + if cond_mapper.get(hs0) != cond_mapper.get(hs1): + continue + + for f, v in hs1.fmapper.items(): + for dep in scope.d_flow.project(f): + if not any(r(dep, hs1, v.loc_indices) for r in rules): + break + else: + try: + reps = (*hs0.functions, *hs1.functions).count(f) + candidates.append((hs0, hs1, reps)) + except ValueError: + # import pdb;pdb.set_trace() + pass + + if candidates: + ordered_candidates = sorted([candidates[-1]], key=lambda x: x[-1]) + + for hs0, hs1, reps in ordered_candidates: + # We assume that the latter HaloSpot is lifted while the former + # stays in place. We only work with twins "{t0, t1}" and not identicals + + # Ensure they are different + bool_locs = hs0.halo_scheme.loc_indices2 == hs1.halo_scheme.loc_indices2 + if bool_locs: + continue + + # We do not work with less than two occurences of the function + if reps < 2: + continue + + hs1_temp = hs1 + + # This drops the latter halospot + hs1_temp = hs1_temp._rebuild(halo_scheme=hs1_temp.halo_scheme.drop(f)) + mapper[hs1] = hs1_temp.halo_scheme + + # Now we need to reconstruct a HaloSchemeEntry for the hoisted HaloSpot + loc_indices = hs1.halo_scheme.fmapper[f].loc_indices + loc_dirs = hs1.halo_scheme.fmapper[f].loc_dirs + halos = hs1.halo_scheme.fmapper[f].halos + dims = hs1.halo_scheme.fmapper[f].dims + + # Since lifted, we need to update the loc_indices + # with known values + + for d in loc_indices: + root_min = loc_indices[d].symbolic_min + new_min = root_min.subs(loc_indices[d].root, + loc_indices[d].root.symbolic_min) + raw_loc_indices[d] = new_min + + raw_loc_indices = frozendict(raw_loc_indices) + + # Is that safe in terms of immutability ? + hs1.halo_scheme.fmapper[f] = HaloSchemeEntry(raw_loc_indices, + loc_dirs, + halos, + dims) + # import pdb;pdb.set_trace() + + hs1_new = hs1._rebuild(halo_scheme=hs1.halo_scheme, body=iter) + # mapper[hs0.body] = hs1_new + hoist_mapper[iter] = hs1_new + + # Post-process analysis + # This is also necessary to avoid the no-children error + for i, hs in mapper.items(): + try: + if hs.is_void: + mapper[i] = i.body + except AttributeError: + pass + + # Transform the IET merging/dropping HaloSpots as according to the analysis + iet = Transformer(hoist_mapper, nested=False).visit(iet) + iet = Transformer(mapper, nested=False).visit(iet) + + # Clean up: de-nest HaloSpots if necessary + # THis also merges HaloSpots that have different loc_indices + # iet = denest_halospots(iet) + + return iet + + def _drop_if_unwritten(iet, options=None, **kwargs): """ Drop HaloSpots for unwritten Functions. diff --git a/tests/mfe_1d.py b/tests/mfe_1d.py new file mode 100644 index 0000000000..4641f8ed17 --- /dev/null +++ b/tests/mfe_1d.py @@ -0,0 +1,42 @@ +import numpy as np + +from devito import (SpaceDimension, Grid, TimeFunction, Eq, Operator, + solve, Constant, norm) +from examples.seismic.source import TimeAxis, Receiver + +# Space related +extent = (1500., ) +shape = (201, ) +x = SpaceDimension(name='x', spacing=Constant(name='h_x', value=extent[0]/(shape[0]-1))) +grid = Grid(extent=extent, shape=shape, dimensions=(x, )) + +# Time related +t0, tn = 0., 30. +dt = (10. / np.sqrt(2.)) / 6. +time_range = TimeAxis(start=t0, stop=tn, step=dt) + +# Velocity and pressure fields +so = 2 +to = 1 +v = TimeFunction(name='v', grid=grid, space_order=so, time_order=to) +tau = TimeFunction(name='tau', grid=grid, space_order=so, time_order=to) + +# The receiver +nrec = 1 +rec = Receiver(name="rec", grid=grid, npoint=nrec, time_range=time_range) +rec.coordinates.data[:, 0] = np.linspace(0., extent[0], num=nrec) +rec_term = rec.interpolate(expr=v) + +# First order elastic-like dependencies equations +pde_v = v.dt - (tau.dx) +pde_tau = (tau.dt - ((v.forward).dx)) + +u_v = Eq(v.forward, solve(pde_v, v.forward)) +u_tau = Eq(tau.forward, solve(pde_tau, tau.forward)) + +op = Operator([u_v] + [u_tau] + rec_term) +op.apply(dt=dt) + +print(norm(v)) +print(norm(tau)) +# print(op.ccode) \ No newline at end of file diff --git a/tests/test_mpi.py b/tests/test_mpi.py index 74e09575c1..d374fff70a 100644 --- a/tests/test_mpi.py +++ b/tests/test_mpi.py @@ -7,7 +7,8 @@ SparseTimeFunction, Dimension, ConditionalDimension, div, SubDimension, SubDomain, Eq, Ne, Inc, NODE, Operator, norm, inner, configuration, switchconfig, generic_derivative, - PrecomputedSparseFunction, DefaultDimension, Buffer) + PrecomputedSparseFunction, DefaultDimension, Buffer, + solve) from devito.arch.compiler import OneapiCompiler from devito.data import LEFT, RIGHT from devito.ir.iet import (Call, Conditional, Iteration, FindNodes, FindSymbols, @@ -17,8 +18,10 @@ ComputeCall) from devito.mpi.distributed import CustomTopology from devito.tools import Bunch +from devito.types.dimension import SpaceDimension from examples.seismic.acoustic import acoustic_setup +from examples.seismic import Receiver, TimeAxis class TestDistributor: @@ -1005,6 +1008,51 @@ def test_avoid_haloupdate_if_distr_but_sequential(self, mode): calls = FindNodes(Call).visit(op) assert len(calls) == 0 + @pytest.mark.parallel(mode=1) + def test_issue_2448(self, mode): + # TOFIX: Placeholder test for issue 2448 + # Space related + extent = (1500., ) + shape = (201, ) + x = SpaceDimension(name='x', spacing=Constant(name='h_x', + value=extent[0]/(shape[0]-1))) + grid = Grid(extent=extent, shape=shape, dimensions=(x, )) + + # Time related + t0, tn = 0., 30. + dt = (10. / np.sqrt(2.)) / 6. + time_range = TimeAxis(start=t0, stop=tn, step=dt) + + # Velocity and pressure fields + so = 2 + to = 1 + v = TimeFunction(name='v', grid=grid, space_order=so, time_order=to) + tau = TimeFunction(name='tau', grid=grid, space_order=so, time_order=to) + + # The receiver + nrec = 1 + rec = Receiver(name="rec", grid=grid, npoint=nrec, time_range=time_range) + rec.coordinates.data[:, 0] = np.linspace(0., extent[0], num=nrec) + rec_term = rec.interpolate(expr=v) + + # First order elastic-like dependencies equations + pde_v = v.dt - (tau.dx) + pde_tau = (tau.dt - ((v.forward).dx)) + u_v = Eq(v.forward, solve(pde_v, v.forward)) + + u_tau = Eq(tau.forward, solve(pde_tau, tau.forward)) + + op = Operator([u_v] + [u_tau] + rec_term) + + calls = [i for i in FindNodes(Call).visit(op) + if isinstance(i, HaloUpdateCall)] + + # The correct we want + assert len(calls) == 3 + assert calls[0].arguments[0] is v + assert calls[1].arguments[0] is tau + assert calls[2].arguments[0] is v + @pytest.mark.parallel(mode=1) def test_avoid_haloupdate_with_subdims(self, mode): grid = Grid(shape=(4,)) @@ -1357,6 +1405,7 @@ def test_merge_haloupdate_if_diff_locindices_v1(self, mode): op = Operator(eqns) calls = FindNodes(Call).visit(op) + print(op.ccode) assert len(calls) == 1 op.apply(time_M=1) From 7477409d20b9d3751071258c20db9cf110cfc541 Mon Sep 17 00:00:00 2001 From: George Bisbas Date: Wed, 23 Oct 2024 12:32:08 +0100 Subject: [PATCH 2/4] compiler: Improve candidate selection and mapper application --- devito/ir/iet/nodes.py | 17 +++- devito/passes/iet/mpi.py | 162 ++++++++++++++++++--------------------- tests/test_mpi.py | 3 +- 3 files changed, 90 insertions(+), 92 deletions(-) diff --git a/devito/ir/iet/nodes.py b/devito/ir/iet/nodes.py index ab87264255..b49dace92d 100644 --- a/devito/ir/iet/nodes.py +++ b/devito/ir/iet/nodes.py @@ -1457,9 +1457,20 @@ def __init__(self, body, halo_scheme): self._halo_scheme = halo_scheme def __repr__(self): - functions = "(%s" % ",".join(i.name for i in self.functions) - loc_indices = "[%s])" % ",".join(str(i) for i in self.halo_scheme.loc_indices2) - return "<%s%s%s>" % (self.__class__.__name__, functions, loc_indices) + function_strings = [] + for func in self.functions: + loc_indices = set().union(*[self.halo_scheme.fmapper[func].loc_indices.values()]) + loc_indices = list(loc_indices) + if loc_indices: + loc_indices_str = str(loc_indices) + else: + loc_indices_str = "" + + function_strings.append(f"{func.name}{loc_indices_str}") + + functions = ",".join(function_strings) + + return "<%s(%s)>" % (self.__class__.__name__, functions) @property def halo_scheme(self): diff --git a/devito/passes/iet/mpi.py b/devito/passes/iet/mpi.py index ca68205f21..3aace13d69 100644 --- a/devito/passes/iet/mpi.py +++ b/devito/passes/iet/mpi.py @@ -24,9 +24,7 @@ def optimize_halospots(iet, **kwargs): """ iet = _drop_halospots(iet) iet = _hoist_halospots(iet) - iet = _merge_halospots_byfunc(iet) - iet = _merge_halospots(iet) iet = _drop_if_unwritten(iet, **kwargs) iet = _mark_overlappable(iet) @@ -87,6 +85,7 @@ def rule1(dep, candidates, loc_dims): # Analysis hsmapper = {} imapper = defaultdict(list) + # Look for parent Iterations of children HaloSpots for iters, halo_spots in MapNodes(Iteration, HaloSpot, 'groupby').visit(iet).items(): for hs in halo_spots: hsmapper[hs] = hs.halo_scheme @@ -111,6 +110,7 @@ def rule1(dep, candidates, loc_dims): if not any(r(dep, candidates, loc_dims) for r in rules): break else: + # Only adjoints enter here? hsmapper[hs] = hsmapper[hs].drop(f) imapper[i].append(hs.halo_scheme.project(f)) break @@ -118,6 +118,7 @@ def rule1(dep, candidates, loc_dims): # Post-process analysis mapper = {i: HaloSpot(i._rebuild(), HaloScheme.union(hss)) for i, hss in imapper.items()} + mapper.update({i: i.body if hs.is_void else i._rebuild(halo_scheme=hs) for i, hs in hsmapper.items()}) @@ -181,12 +182,13 @@ def rule2(dep, hs, loc_indices): iter_mapper = MapNodes(Iteration, HaloSpot, 'immediate').visit(iet) mapper = {} - for i, halo_spots in iter_mapper.items(): - if i is None or len(halo_spots) <= 1: + for iter, halo_spots in iter_mapper.items(): + if iter is None or len(halo_spots) <= 1: continue - scope = Scope([e.expr for e in FindNodes(Expression).visit(i)]) + scope = Scope([e.expr for e in FindNodes(Expression).visit(iter)]) + # TOFIX: Why only comparing against the first HaloSpot? hs0 = halo_spots[0] mapper[hs0] = hs0.halo_scheme @@ -228,7 +230,6 @@ def _merge_halospots_byfunc(iet): Merge HaloSpots on the same Iteration tree level where all data dependencies would be honored. """ - # Merge rules -- if the retval is True, then it means the input `dep` is not # a stopper to halo merging @@ -256,20 +257,17 @@ def rule2(dep, hs, loc_indices): not isinstance(i.condition, GuardFactorEq)} for hs, v in cond_mapper.items()} - iter_mapper = MapNodes(Iteration, HaloSpot, 'immediate').visit(iet) + # Precompute scopes to save time + scopes = {i: Scope([e.expr for e in v]) for i, v in MapNodes().visit(iet).items()} - mapper = {} - hoist_mapper = {} - raw_loc_indices = {} + candidates = [] - for iter, halo_spots in iter_mapper.items(): - if iter is None or len(halo_spots) <= 1: + # Loop over iterations and their halospots to detect candidates + for iters, halo_spots in MapNodes(Iteration, HaloSpot, 'groupby').visit(iet).items(): + if iters is None or len(halo_spots) <= 1: continue - scope = Scope([e.expr for e in FindNodes(Expression).visit(iter)]) - - candidates = [] - + # Try to detect pairs of halospots that refer to the same function for i, _ in enumerate(halo_spots): hs0 = halo_spots[i] @@ -280,84 +278,74 @@ def rule2(dep, hs, loc_indices): if cond_mapper.get(hs0) != cond_mapper.get(hs1): continue + # Ensure no common loc_indices are shared for the two candidates + if any(i in hs0.halo_scheme.loc_indices2 for i in hs1.halo_scheme.loc_indices2): # noqa + continue + for f, v in hs1.fmapper.items(): - for dep in scope.d_flow.project(f): - if not any(r(dep, hs1, v.loc_indices) for r in rules): - break - else: - try: - reps = (*hs0.functions, *hs1.functions).count(f) - candidates.append((hs0, hs1, reps)) - except ValueError: - # import pdb;pdb.set_trace() - pass + if not hs1.halo_scheme.fmapper[f].loc_indices: + continue - if candidates: - ordered_candidates = sorted([candidates[-1]], key=lambda x: x[-1]) + for iter in iters: + if iter not in scopes: + continue - for hs0, hs1, reps in ordered_candidates: - # We assume that the latter HaloSpot is lifted while the former - # stays in place. We only work with twins "{t0, t1}" and not identicals + for dep in scopes[iter].d_flow.project(f): + if not any(r(dep, hs1, v.loc_indices) for r in rules): + break + else: + # Count the number of repetitions of the function + reps = (*hs0.functions, *hs1.functions).count(f) + # We only care for functions appearing in both + if reps < 2: + continue + else: + candidates.append((hs0, hs1, f, reps, iter)) - # Ensure they are different - bool_locs = hs0.halo_scheme.loc_indices2 == hs1.halo_scheme.loc_indices2 - if bool_locs: - continue + # Mapper for the iterations (hoisting the halospots) + imapper = defaultdict(list) + # Mapper for the halospots (dropping the functions) + hs_mapper = {} + + if candidates: + # The latter HaloSpot (hs1) is lifted while the former (hs0) stays in place + for hs0, hs1, f, reps, iter in candidates: + + # Drop more functions if the Halospot already exists in mapper + if hs1 in hs_mapper: + hs_mapper[hs1] = hs_mapper[hs1].drop(f) + else: + hs_mapper[hs1] = hs1.halo_scheme.drop(f) + + # Reconstruct a HaloSchemeEntry for the hoisted HaloSpot + fm = hs1.halo_scheme.fmapper[f] + + raw_loc_indices = {} + # Since lifted, we need to update the loc_indices with known values + # TODO: Can I get this in a more elegant way? + for d in fm.loc_indices: + root_min = fm.loc_indices[d].symbolic_min + new_min = root_min.subs(fm.loc_indices[d].root, + fm.loc_indices[d].root.symbolic_min) + raw_loc_indices[d] = new_min + + # Should HaloSchemeEntry be turned into a class and not be a namedtuple + # anymore? Then here we can use a ._rebuild + hs1.halo_scheme.fmapper[f] = HaloSchemeEntry(frozendict(raw_loc_indices), + fm.loc_dirs, + fm.halos, + fm.dims) + + imapper[iter].append(hs1.halo_scheme.project(f)) - # We do not work with less than two occurences of the function - if reps < 2: - continue + # Post-process analysis + mapper = {i: HaloSpot(i._rebuild(), HaloScheme.union(hss)) + for i, hss in imapper.items()} - hs1_temp = hs1 - - # This drops the latter halospot - hs1_temp = hs1_temp._rebuild(halo_scheme=hs1_temp.halo_scheme.drop(f)) - mapper[hs1] = hs1_temp.halo_scheme - - # Now we need to reconstruct a HaloSchemeEntry for the hoisted HaloSpot - loc_indices = hs1.halo_scheme.fmapper[f].loc_indices - loc_dirs = hs1.halo_scheme.fmapper[f].loc_dirs - halos = hs1.halo_scheme.fmapper[f].halos - dims = hs1.halo_scheme.fmapper[f].dims - - # Since lifted, we need to update the loc_indices - # with known values - - for d in loc_indices: - root_min = loc_indices[d].symbolic_min - new_min = root_min.subs(loc_indices[d].root, - loc_indices[d].root.symbolic_min) - raw_loc_indices[d] = new_min - - raw_loc_indices = frozendict(raw_loc_indices) - - # Is that safe in terms of immutability ? - hs1.halo_scheme.fmapper[f] = HaloSchemeEntry(raw_loc_indices, - loc_dirs, - halos, - dims) - # import pdb;pdb.set_trace() - - hs1_new = hs1._rebuild(halo_scheme=hs1.halo_scheme, body=iter) - # mapper[hs0.body] = hs1_new - hoist_mapper[iter] = hs1_new - - # Post-process analysis - # This is also necessary to avoid the no-children error - for i, hs in mapper.items(): - try: - if hs.is_void: - mapper[i] = i.body - except AttributeError: - pass - - # Transform the IET merging/dropping HaloSpots as according to the analysis - iet = Transformer(hoist_mapper, nested=False).visit(iet) - iet = Transformer(mapper, nested=False).visit(iet) + mapper.update({i: i.body if hs.is_void else i._rebuild(halo_scheme=hs) + for i, hs in hs_mapper.items()}) - # Clean up: de-nest HaloSpots if necessary - # THis also merges HaloSpots that have different loc_indices - # iet = denest_halospots(iet) + iet = Transformer(mapper, nested=True).visit(iet) return iet diff --git a/tests/test_mpi.py b/tests/test_mpi.py index d374fff70a..41e0f28fa4 100644 --- a/tests/test_mpi.py +++ b/tests/test_mpi.py @@ -1186,7 +1186,6 @@ def test_hoist_haloupdate_from_innerloop(self, mode): calls = FindNodes(Call).visit(op) assert len(calls) == 1 - # Also make sure the Call is at the right place in the IET assert op.body.body[-1].body[1].body[0].body[0].body[0].body[0].is_Call assert op.body.body[-1].body[1].body[0].body[0].body[1].is_Iteration @@ -1406,7 +1405,7 @@ def test_merge_haloupdate_if_diff_locindices_v1(self, mode): calls = FindNodes(Call).visit(op) print(op.ccode) - assert len(calls) == 1 + assert len(calls) == 2 op.apply(time_M=1) glb_pos_map = f.grid.distributor.glb_pos_map From 5e53d357a5ed1bfd8bac5bde8bede07f305a8975 Mon Sep 17 00:00:00 2001 From: George Bisbas Date: Tue, 29 Oct 2024 17:32:53 +0200 Subject: [PATCH 3/4] compiler: Drop duplicated code --- devito/ir/iet/nodes.py | 10 +- devito/mpi/halo_scheme.py | 59 +++++-- devito/passes/iet/mpi.py | 323 ++++++++++++++++++-------------------- tests/test_mpi.py | 47 +++++- 4 files changed, 255 insertions(+), 184 deletions(-) diff --git a/devito/ir/iet/nodes.py b/devito/ir/iet/nodes.py index b49dace92d..fc85f3c545 100644 --- a/devito/ir/iet/nodes.py +++ b/devito/ir/iet/nodes.py @@ -1457,18 +1457,18 @@ def __init__(self, body, halo_scheme): self._halo_scheme = halo_scheme def __repr__(self): - function_strings = [] - for func in self.functions: - loc_indices = set().union(*[self.halo_scheme.fmapper[func].loc_indices.values()]) + fstrings = [] + for f in self.functions: + loc_indices = set().union(*[self.halo_scheme.fmapper[f].loc_indices.values()]) loc_indices = list(loc_indices) if loc_indices: loc_indices_str = str(loc_indices) else: loc_indices_str = "" - function_strings.append(f"{func.name}{loc_indices_str}") + fstrings.append(f"{f.name}{loc_indices_str}") - functions = ",".join(function_strings) + functions = ",".join(fstrings) return "<%s(%s)>" % (self.__class__.__name__, functions) diff --git a/devito/mpi/halo_scheme.py b/devito/mpi/halo_scheme.py index 5c24fb5369..e00ad204f0 100644 --- a/devito/mpi/halo_scheme.py +++ b/devito/mpi/halo_scheme.py @@ -28,7 +28,39 @@ class HaloLabel(Tag): STENCIL = HaloLabel('stencil') -HaloSchemeEntry = namedtuple('HaloSchemeEntry', 'loc_indices loc_dirs halos dims') +class HaloSchemeEntry: + + def __init__(self, loc_indices, loc_dirs, halos, dims): + self.loc_indices = loc_indices + self.loc_dirs = loc_dirs + self.halos = halos + self.dims = dims + + def __repr__(self): + return (f"HaloSchemeEntry(loc_indices={self.loc_indices}, " + f"loc_dirs={self.loc_dirs}, halos={self.halos}, dims={self.dims})") + + def __eq__(self, other): + if not isinstance(other, HaloSchemeEntry): + return False + return (self.loc_indices == other.loc_indices and + self.loc_dirs == other.loc_dirs and + self.halos == other.halos and + self.dims == other.dims) + + def __hash__(self): + return hash((frozenset(self.loc_indices.items()), + frozenset(self.loc_dirs.items()), + frozenset(self.halos), + frozenset(self.dims))) + + def rebuild(self, **kwargs): + loc_indices = kwargs.get('loc_indices', self.loc_indices) + loc_dirs = kwargs.get('loc_dirs', self.loc_dirs) + halos = kwargs.get('halos', self.halos) + dims = kwargs.get('dims', self.dims) + return HaloSchemeEntry(loc_indices, loc_dirs, halos, dims) + Halo = namedtuple('Halo', 'dim side') @@ -95,9 +127,20 @@ def __init__(self, exprs, ispace): self._honored = frozendict(self._honored) def __repr__(self): - fnames = ",".join(i.name for i in set(self._mapper)) - loc_indices = "[%s]" % ",".join(str(i) for i in self.loc_indices2) - return "HaloScheme<%s%s>" % (fnames, loc_indices) + fstrings = [] + for f in self.fmapper: + loc_indices = set().union(*[self._mapper[f].loc_indices.values()]) + loc_indices = list(loc_indices) + if loc_indices: + loc_indices_str = str(loc_indices) + else: + loc_indices_str = "" + + fstrings.append(f"{f.name}{loc_indices_str}") + + functions = ",".join(fstrings) + + return "%s<%s>" % (self.__class__.__name__, functions) def __eq__(self, other): return (isinstance(other, HaloScheme) and @@ -122,7 +165,6 @@ def union(self, halo_schemes): """ Create a new HaloScheme from the union of a set of HaloSchemes. """ - # import pdb; pdb.set_trace() halo_schemes = [hs for hs in halo_schemes if hs is not None] if not halo_schemes: return None @@ -369,7 +411,7 @@ def loc_indices(self): return set().union(*[i.loc_indices.keys() for i in self.fmapper.values()]) @cached_property - def loc_indices2(self): + def loc_values(self): return set().union(*[i.loc_indices.values() for i in self.fmapper.values()]) @cached_property @@ -641,9 +683,8 @@ def _uxreplace_dispatch_haloscheme(hs0, rule): # Nope, let's try with the next Indexed, if any continue - hse = HaloSchemeEntry(frozendict(loc_indices), - frozendict(loc_dirs), - hse0.halos, hse0.dims) + hse = hse0.rebuild(loc_indices=frozendict(loc_indices), + loc_dirs=frozendict(loc_dirs)) else: continue diff --git a/devito/passes/iet/mpi.py b/devito/passes/iet/mpi.py index 3aace13d69..913164a7aa 100644 --- a/devito/passes/iet/mpi.py +++ b/devito/passes/iet/mpi.py @@ -7,7 +7,7 @@ retrieve_iteration_tree) from devito.ir.support import PARALLEL, Scope from devito.ir.support.guards import GuardFactorEq -from devito.mpi.halo_scheme import HaloScheme, HaloSchemeEntry +from devito.mpi.halo_scheme import HaloScheme from devito.mpi.reduction_scheme import DistReduce from devito.mpi.routines import HaloExchangeBuilder, ReductionBuilder from devito.passes.iet.engine import iet_pass @@ -24,7 +24,7 @@ def optimize_halospots(iet, **kwargs): """ iet = _drop_halospots(iet) iet = _hoist_halospots(iet) - iet = _merge_halospots_byfunc(iet) + iet = _hoist_and_drop(iet) iet = _merge_halospots(iet) iet = _drop_if_unwritten(iet, **kwargs) iet = _mark_overlappable(iet) @@ -62,23 +62,6 @@ def _hoist_halospots(iet): would be honored. """ - # Hoisting rules -- if the retval is True, then it means the input `dep` is not - # a stopper to halo hoisting - - def rule0(dep, candidates, loc_dims): - # E.g., `dep=W -> R` and `candidates=({time}, {x})` => False - # E.g., `dep=W -> R`, `dep.cause={t,time}` and - # `candidates=({x},)` => True - return (all(i & set(dep.distance_mapper) for i in candidates) and - not any(i & dep.cause for i in candidates) and - not any(i & loc_dims for i in candidates)) - - def rule1(dep, candidates, loc_dims): - # A reduction isn't a stopper to hoisting - return dep.write is not None and dep.write.is_reduction - - rules = [rule0, rule1] - # Precompute scopes to save time scopes = {i: Scope([e.expr for e in v]) for i, v in MapNodes().visit(iet).items()} @@ -107,10 +90,9 @@ def rule1(dep, candidates, loc_dims): continue for dep in scopes[i].d_flow.project(f): - if not any(r(dep, candidates, loc_dims) for r in rules): + if not any(r(dep, candidates, loc_dims) for r in hoist_rules()): break else: - # Only adjoints enter here? hsmapper[hs] = hsmapper[hs].drop(f) imapper[i].append(hs.halo_scheme.project(f)) break @@ -131,138 +113,21 @@ def rule1(dep, candidates, loc_dims): return iet -def denest_halospots(iet): - """ - Denest nested HaloSpots. - # TOFIX: This also merges HaloSpots that have different loc_indices - """ - mapper = {} - for hs in FindNodes(HaloSpot).visit(iet): - if hs.body.is_HaloSpot: - halo_scheme = HaloScheme.union([hs.halo_scheme, hs.body.halo_scheme]) - mapper[hs] = hs._rebuild(halo_scheme=halo_scheme, body=hs.body.body) - iet = Transformer(mapper, nested=True).visit(iet) - - return iet - - -def _merge_halospots(iet): +def _hoist_and_drop(iet): """ - Merge HaloSpots on the same Iteration tree level where all data dependencies - would be honored. + Detect HaloSpots that refer to different time slices of the same TimeFunction and + could be hoisted to the outermost Iteration. This is a function that mixes principles + from both `_hoist_halospots` and `_merge_halospots`, aiming to catch a special case + where the HaloSpots refer to the same Function but with different loc_indices. + In some cases, it is better to hoist rather than merge and this is why this comes + before `_merge_halospots`. """ - - # Merge rules -- if the retval is True, then it means the input `dep` is not - # a stopper to halo merging - - def rule0(dep, hs, loc_indices): - # E.g., `dep=W -> R` => True - return not any(d in hs.dimensions or dep.distance_mapper[d] is S.Infinity - for d in dep.cause) - - def rule1(dep, hs, loc_indices): - # TODO This is apparently never hit, but feeling uncomfortable to remove it - return (dep.is_regular and - dep.read is not None and - all(not any(dep.read.touched_halo(d.root)) for d in dep.cause)) - - def rule2(dep, hs, loc_indices): - # E.g., `dep=W -> R` and `loc_indices={t: t0}` => True - return any(dep.distance_mapper[d] == 0 and dep.source[d] is not v - for d, v in loc_indices.items()) - - rules = [rule0, rule1, rule2] - # Analysis - cond_mapper = MapHaloSpots().visit(iet) - cond_mapper = {hs: {i for i in v if i.is_Conditional and - not isinstance(i.condition, GuardFactorEq)} - for hs, v in cond_mapper.items()} - - iter_mapper = MapNodes(Iteration, HaloSpot, 'immediate').visit(iet) - - mapper = {} - for iter, halo_spots in iter_mapper.items(): - if iter is None or len(halo_spots) <= 1: - continue - - scope = Scope([e.expr for e in FindNodes(Expression).visit(iter)]) - - # TOFIX: Why only comparing against the first HaloSpot? - hs0 = halo_spots[0] - mapper[hs0] = hs0.halo_scheme - - for hs1 in halo_spots[1:]: - mapper[hs1] = hs1.halo_scheme - - # If there are Conditionals involved, both `hs0` and `hs1` must be - # within the same Conditional, otherwise we would break the control - # flow semantics - if cond_mapper.get(hs0) != cond_mapper.get(hs1): - continue - - for f, v in hs1.fmapper.items(): - for dep in scope.d_flow.project(f): - if not any(r(dep, hs1, v.loc_indices) for r in rules): - break - else: - try: - hs = hs1.halo_scheme.project(f) - mapper[hs0] = HaloScheme.union([mapper[hs0], hs]) - mapper[hs1] = mapper[hs1].drop(f) - except ValueError: - # `hs1.loc_indices= -> R` => True - return not any(d in hs.dimensions or dep.distance_mapper[d] is S.Infinity - for d in dep.cause) - - def rule1(dep, hs, loc_indices): - # TODO This is apparently never hit, but feeling uncomfortable to remove it - return (dep.is_regular and - dep.read is not None and - all(not any(dep.read.touched_halo(d.root)) for d in dep.cause)) - - def rule2(dep, hs, loc_indices): - # E.g., `dep=W -> R` and `loc_indices={t: t0}` => True - return any(dep.distance_mapper[d] == 0 and dep.source[d] is not v - for d, v in loc_indices.items()) - - rules = [rule0, rule1, rule2] - - # Analysis - cond_mapper = MapHaloSpots().visit(iet) - cond_mapper = {hs: {i for i in v if i.is_Conditional and - not isinstance(i.condition, GuardFactorEq)} - for hs, v in cond_mapper.items()} - - # Precompute scopes to save time - scopes = {i: Scope([e.expr for e in v]) for i, v in MapNodes().visit(iet).items()} + cond_mapper = _create_cond_mapper(iet) candidates = [] - # Loop over iterations and their halospots to detect candidates + # Look for parent Iterations of children HaloSpots for iters, halo_spots in MapNodes(Iteration, HaloSpot, 'groupby').visit(iet).items(): if iters is None or len(halo_spots) <= 1: continue @@ -278,33 +143,34 @@ def rule2(dep, hs, loc_indices): if cond_mapper.get(hs0) != cond_mapper.get(hs1): continue - # Ensure no common loc_indices are shared for the two candidates - if any(i in hs0.halo_scheme.loc_indices2 for i in hs1.halo_scheme.loc_indices2): # noqa + # Ensure no common loc_indices are shared for the two candidates + if any(i in hs0.halo_scheme.loc_values + for i in hs1.halo_scheme.loc_values): continue for f, v in hs1.fmapper.items(): + # Skip functions that do not have time accesses if not hs1.halo_scheme.fmapper[f].loc_indices: continue for iter in iters: - if iter not in scopes: - continue - - for dep in scopes[iter].d_flow.project(f): - if not any(r(dep, hs1, v.loc_indices) for r in rules): + # Follow the rules of mrgeable HaloSpots + scope = Scope([e.expr for e in FindNodes(Expression).visit(iter)]) + for dep in scope.d_flow.project(f): + if not any(r(dep, hs1, v.loc_indices) for r in merge_rules()): break else: - # Count the number of repetitions of the function - reps = (*hs0.functions, *hs1.functions).count(f) - # We only care for functions appearing in both - if reps < 2: + # Count the number of repetitions of the function, stop + # if it appears only once + count = (*hs0.functions, *hs1.functions).count(f) + if count < 2: continue else: - candidates.append((hs0, hs1, f, reps, iter)) + candidates.append((hs0, hs1, f, count, iter)) - # Mapper for the iterations (hoisting the halospots) + # Mapper for the iterations (hoist the halospots) imapper = defaultdict(list) - # Mapper for the halospots (dropping the functions) + # Mapper for the halospots (drop functions from the halospots) hs_mapper = {} if candidates: @@ -329,12 +195,9 @@ def rule2(dep, hs, loc_indices): fm.loc_indices[d].root.symbolic_min) raw_loc_indices[d] = new_min - # Should HaloSchemeEntry be turned into a class and not be a namedtuple - # anymore? Then here we can use a ._rebuild - hs1.halo_scheme.fmapper[f] = HaloSchemeEntry(frozendict(raw_loc_indices), - fm.loc_dirs, - fm.halos, - fm.dims) + hs_entry = hs1.halo_scheme.fmapper[f] + hs_entry = hs_entry.rebuild(loc_indices=frozendict(raw_loc_indices)) + hs1.halo_scheme.fmapper[f] = hs_entry imapper[iter].append(hs1.halo_scheme.project(f)) @@ -350,6 +213,62 @@ def rule2(dep, hs, loc_indices): return iet +def _merge_halospots(iet): + """ + Merge HaloSpots on the same Iteration tree level where all data dependencies + would be honored. + """ + + # Analysis + cond_mapper = _create_cond_mapper(iet) + + mapper = {} + for iter, halo_spots in MapNodes(Iteration, HaloSpot, 'immediate').visit(iet).items(): + if iter is None or len(halo_spots) <= 1: + continue + + scope = Scope([e.expr for e in FindNodes(Expression).visit(iter)]) + + # TOFIX: Why only comparing against the first HaloSpot? + # We could similary to `_hoist_and_drop` compare all pairs of HaloSpots + # and merge them based upon some priority ranking on which pair we prefer to + # merge + hs0 = halo_spots[0] + mapper[hs0] = hs0.halo_scheme + + for hs1 in halo_spots[1:]: + mapper[hs1] = hs1.halo_scheme + + # If there are Conditionals involved, both `hs0` and `hs1` must be + # within the same Conditional, otherwise we would break the control + # flow semantics + if cond_mapper.get(hs0) != cond_mapper.get(hs1): + continue + + for f, v in hs1.fmapper.items(): + for dep in scope.d_flow.project(f): + if not any(r(dep, hs1, v.loc_indices) for r in merge_rules()): + break + else: + try: + hs = hs1.halo_scheme.project(f) + mapper[hs0] = HaloScheme.union([mapper[hs0], hs]) + mapper[hs1] = mapper[hs1].drop(f) + except ValueError: + # `hs1.loc_indices= -> R` => True + return not any(d in hs.dimensions or dep.distance_mapper[d] is S.Infinity + for d in dep.cause) + + def rule1(dep, hs, loc_indices): + # TODO This is apparently never hit, but feeling uncomfortable to remove it + return (dep.is_regular and + dep.read is not None and + all(not any(dep.read.touched_halo(d.root)) for d in dep.cause)) + + def rule2(dep, hs, loc_indices): + # E.g., `dep=W -> R` and `loc_indices={t: t0}` => True + return any(dep.distance_mapper[d] == 0 and dep.source[d] is not v + for d, v in loc_indices.items()) + + rules = [rule0, rule1, rule2] + + return rules + + +def hoist_rules(): + # Hoisting rules -- if the retval is True, then it means the input `dep` is not + # a stopper to halo hoisting + + def rule0(dep, candidates, loc_dims): + # E.g., `dep=W -> R` and `candidates=({time}, {x})` => False + # E.g., `dep=W -> R`, `dep.cause={t,time}` and + # `candidates=({x},)` => True + return (all(i & set(dep.distance_mapper) for i in candidates) and + not any(i & dep.cause for i in candidates) and + not any(i & loc_dims for i in candidates)) + + def rule1(dep, candidates, loc_dims): + # A reduction isn't a stopper to hoisting + return dep.write is not None and dep.write.is_reduction + + rules = [rule0, rule1] + + return rules diff --git a/tests/test_mpi.py b/tests/test_mpi.py index 41e0f28fa4..0ffd771747 100644 --- a/tests/test_mpi.py +++ b/tests/test_mpi.py @@ -1010,8 +1010,6 @@ def test_avoid_haloupdate_if_distr_but_sequential(self, mode): @pytest.mark.parallel(mode=1) def test_issue_2448(self, mode): - # TOFIX: Placeholder test for issue 2448 - # Space related extent = (1500., ) shape = (201, ) x = SpaceDimension(name='x', spacing=Constant(name='h_x', @@ -1053,6 +1051,50 @@ def test_issue_2448(self, mode): assert calls[1].arguments[0] is tau assert calls[2].arguments[0] is v + @pytest.mark.parallel(mode=1) + def test_issue_2448_v2(self, mode): + # Similar but with v.forward + extent = (1500., ) + shape = (201, ) + x = SpaceDimension(name='x', spacing=Constant(name='h_x', + value=extent[0]/(shape[0]-1))) + grid = Grid(extent=extent, shape=shape, dimensions=(x, )) + + # Time related + t0, tn = 0., 30. + dt = (10. / np.sqrt(2.)) / 6. + time_range = TimeAxis(start=t0, stop=tn, step=dt) + + # Velocity and pressure fields + so = 2 + to = 1 + v = TimeFunction(name='v', grid=grid, space_order=so, time_order=to) + tau = TimeFunction(name='tau', grid=grid, space_order=so, time_order=to) + + # The receiver + nrec = 1 + rec = Receiver(name="rec", grid=grid, npoint=nrec, time_range=time_range) + rec.coordinates.data[:, 0] = np.linspace(0., extent[0], num=nrec) + rec_term = rec.interpolate(expr=v.forward) + + # First order elastic-like dependencies equations + pde_v = v.dt - (tau.dx) + pde_tau = (tau.dt - ((v.forward).dx)) + u_v = Eq(v.forward, solve(pde_v, v.forward)) + + u_tau = Eq(tau.forward, solve(pde_tau, tau.forward)) + + op = Operator([u_v] + [u_tau] + rec_term) + + calls = [i for i in FindNodes(Call).visit(op) + if isinstance(i, HaloUpdateCall)] + + # The correct we want + assert len(calls) == 3 + assert calls[0].arguments[0] is tau + assert calls[1].arguments[0] is v + assert calls[2].arguments[0] is v + @pytest.mark.parallel(mode=1) def test_avoid_haloupdate_with_subdims(self, mode): grid = Grid(shape=(4,)) @@ -1404,7 +1446,6 @@ def test_merge_haloupdate_if_diff_locindices_v1(self, mode): op = Operator(eqns) calls = FindNodes(Call).visit(op) - print(op.ccode) assert len(calls) == 2 op.apply(time_M=1) From 96eaa172a8455dcbdbf3a1f50ef2b450810e3894 Mon Sep 17 00:00:00 2001 From: George Bisbas Date: Tue, 29 Oct 2024 17:34:03 +0200 Subject: [PATCH 4/4] tests: rm MFE file --- tests/mfe_1d.py | 42 ------------------------------------------ 1 file changed, 42 deletions(-) delete mode 100644 tests/mfe_1d.py diff --git a/tests/mfe_1d.py b/tests/mfe_1d.py deleted file mode 100644 index 4641f8ed17..0000000000 --- a/tests/mfe_1d.py +++ /dev/null @@ -1,42 +0,0 @@ -import numpy as np - -from devito import (SpaceDimension, Grid, TimeFunction, Eq, Operator, - solve, Constant, norm) -from examples.seismic.source import TimeAxis, Receiver - -# Space related -extent = (1500., ) -shape = (201, ) -x = SpaceDimension(name='x', spacing=Constant(name='h_x', value=extent[0]/(shape[0]-1))) -grid = Grid(extent=extent, shape=shape, dimensions=(x, )) - -# Time related -t0, tn = 0., 30. -dt = (10. / np.sqrt(2.)) / 6. -time_range = TimeAxis(start=t0, stop=tn, step=dt) - -# Velocity and pressure fields -so = 2 -to = 1 -v = TimeFunction(name='v', grid=grid, space_order=so, time_order=to) -tau = TimeFunction(name='tau', grid=grid, space_order=so, time_order=to) - -# The receiver -nrec = 1 -rec = Receiver(name="rec", grid=grid, npoint=nrec, time_range=time_range) -rec.coordinates.data[:, 0] = np.linspace(0., extent[0], num=nrec) -rec_term = rec.interpolate(expr=v) - -# First order elastic-like dependencies equations -pde_v = v.dt - (tau.dx) -pde_tau = (tau.dt - ((v.forward).dx)) - -u_v = Eq(v.forward, solve(pde_v, v.forward)) -u_tau = Eq(tau.forward, solve(pde_tau, tau.forward)) - -op = Operator([u_v] + [u_tau] + rec_term) -op.apply(dt=dt) - -print(norm(v)) -print(norm(tau)) -# print(op.ccode) \ No newline at end of file