From 1aa726bce23a849e66967c965b1b3e2744d2a5b7 Mon Sep 17 00:00:00 2001 From: Mathis Richter Date: Wed, 2 Mar 2022 15:33:39 +0100 Subject: [PATCH] Virtual ports between RefPorts and VarPorts (#195) * permute initial implementation Signed-off-by: bamsumit * Tests for permute ports * Process property of virtual ports no longer returns None Signed-off-by: Mathis Richter * Added initial run-unittest for flatten() from issue #163 Signed-off-by: Mathis Richter * User-level API for TransposePort with unit tests Signed-off-by: Mathis Richter * Fixed typo Signed-off-by: Mathis Richter * Unit tests for flatten() and concat_with() Signed-off-by: Mathis Richter * Unit tests for virtual ports in Processes that are executed (wip) Signed-off-by: Mathis Richter * Preliminary implementation of virtual ports between OutPort and InPort (wip) Signed-off-by: Mathis Richter * Fixing unit tests after merge Signed-off-by: Mathis Richter * Added support for virtual ports between an OutPort and InPort of two hierarchical Processes Signed-off-by: Mathis Richter * Clean up, exceptions, and generic unit tests for virtual port topologies Signed-off-by: Mathis Richter * Fixed linter issues Signed-off-by: Mathis Richter * Raising an exception when executing ConcatPort Signed-off-by: Mathis Richter * Unit tests for virtual ports between OutPorts and InPorts in hierarchical Processes. Signed-off-by: Mathis Richter * RefPort writing to an explicit VarPort via a virtual port. Signed-off-by: Mathis Richter * RefPort reading from an explicit VarPort via a virtual port. Signed-off-by: Mathis Richter * Fixed linter error Signed-off-by: Mathis Richter * Unit tests for virtual ports between RefPorts and VarPorts in hierarchical Processes. Signed-off-by: Mathis Richter * Fixed a docstring Signed-off-by: Mathis Richter * Added docstrings to methods get_transform_func_fwd/bwd. Signed-off-by: Mathis Richter Co-authored-by: bamsumit Co-authored-by: Marcus G K Williams <168222+mgkwill@users.noreply.github.com> --- src/lava/magma/compiler/builders/builder.py | 6 +- src/lava/magma/compiler/compiler.py | 49 +- src/lava/magma/compiler/utils.py | 1 + src/lava/magma/core/model/py/model.py | 2 +- src/lava/magma/core/model/py/ports.py | 54 +- src/lava/magma/core/process/ports/ports.py | 131 +++- .../ports/test_virtual_ports_in_process.py | 727 ++++++++++++++++-- 7 files changed, 858 insertions(+), 112 deletions(-) diff --git a/src/lava/magma/compiler/builders/builder.py b/src/lava/magma/compiler/builders/builder.py index c9aa45fc7..17f15f21d 100644 --- a/src/lava/magma/compiler/builders/builder.py +++ b/src/lava/magma/compiler/builders/builder.py @@ -401,7 +401,8 @@ def build(self): csp_send = csp_ports[0] if isinstance( csp_ports[0], CspSendPort) else csp_ports[1] - port = port_cls(csp_send, csp_recv, pm, p.shape, lt.d_type) + port = port_cls(csp_send, csp_recv, pm, p.shape, lt.d_type, + p.transform_funcs) # Create dynamic RefPort attribute on ProcModel setattr(pm, name, port) @@ -422,7 +423,8 @@ def build(self): csp_send = csp_ports[0] if isinstance( csp_ports[0], CspSendPort) else csp_ports[1] port = port_cls( - p.var_name, csp_send, csp_recv, pm, p.shape, p.d_type) + p.var_name, csp_send, csp_recv, pm, p.shape, p.d_type, + p.transform_funcs) # Create dynamic VarPort attribute on ProcModel setattr(pm, name, port) diff --git a/src/lava/magma/compiler/compiler.py b/src/lava/magma/compiler/compiler.py index 14a513e8e..7e3ac47ef 100644 --- a/src/lava/magma/compiler/compiler.py +++ b/src/lava/magma/compiler/compiler.py @@ -352,13 +352,14 @@ def _compile_proc_models( for pt in (list(p.in_ports) + list(p.out_ports)): # For all InPorts that receive input from # virtual ports... - transform_funcs = None + transform_funcs = [] if isinstance(pt, InPort): # ... extract a function pointer to the # transformation function of each virtual port. transform_funcs = \ - [vp.get_transform_func() + [vp.get_transform_func_fwd() for vp in pt.get_incoming_virtual_ports()] + pi = PortInitializer(pt.name, pt.shape, self._get_port_dtype(pt, pm), @@ -366,26 +367,38 @@ def _compile_proc_models( pp_ch_size, transform_funcs) ports.append(pi) + # Create RefPort (also use PortInitializers) - ref_ports = list(p.ref_ports) - ref_ports = [ - PortInitializer(pt.name, - pt.shape, - self._get_port_dtype(pt, pm), - pt.__class__.__name__, - pp_ch_size) for pt in ref_ports] + ref_ports = [] + for pt in list(p.ref_ports): + transform_funcs = \ + [vp.get_transform_func_bwd() + for vp in pt.get_outgoing_virtual_ports()] + + pi = PortInitializer(pt.name, + pt.shape, + self._get_port_dtype(pt, pm), + pt.__class__.__name__, + pp_ch_size, + transform_funcs) + ref_ports.append(pi) + # Create VarPortInitializers (contain also the Var name) var_ports = [] for pt in list(p.var_ports): - var_ports.append( - VarPortInitializer( - pt.name, - pt.shape, - pt.var.name, - self._get_port_dtype(pt, pm), - pt.__class__.__name__, - pp_ch_size, - self._map_var_port_class(pt, proc_groups))) + transform_funcs = \ + [vp.get_transform_func_fwd() + for vp in pt.get_incoming_virtual_ports()] + pi = VarPortInitializer( + pt.name, + pt.shape, + pt.var.name, + self._get_port_dtype(pt, pm), + pt.__class__.__name__, + pp_ch_size, + self._map_var_port_class(pt, proc_groups), + transform_funcs) + var_ports.append(pi) # Set implicit VarPorts (created by connecting a RefPort # directly to a Var) as attribute to ProcessModel diff --git a/src/lava/magma/compiler/utils.py b/src/lava/magma/compiler/utils.py index 27f4775fb..3787b274e 100644 --- a/src/lava/magma/compiler/utils.py +++ b/src/lava/magma/compiler/utils.py @@ -31,3 +31,4 @@ class VarPortInitializer: port_type: str size: int port_cls: type + transform_funcs: ty.List[ft.partial] = None diff --git a/src/lava/magma/core/model/py/model.py b/src/lava/magma/core/model/py/model.py index a9bc34560..c93e91c7b 100644 --- a/src/lava/magma/core/model/py/model.py +++ b/src/lava/magma/core/model/py/model.py @@ -114,7 +114,7 @@ def _get_var(self): data_port.send(enum_to_np(var)) elif isinstance(var, np.ndarray): # FIXME: send a whole vector (also runtime_service.py) - var_iter = np.nditer(var) + var_iter = np.nditer(var, order='C') num_items: np.integer = np.prod(var.shape) data_port.send(enum_to_np(num_items)) for value in var_iter: diff --git a/src/lava/magma/core/model/py/ports.py b/src/lava/magma/core/model/py/ports.py index 6a3c6ffa2..b96bc77a7 100644 --- a/src/lava/magma/core/model/py/ports.py +++ b/src/lava/magma/core/model/py/ports.py @@ -460,7 +460,10 @@ def __init__(self, csp_recv_port: ty.Optional[CspRecvPort], process_model: AbstractProcessModel, shape: ty.Tuple[int, ...] = tuple(), - d_type: type = int): + d_type: type = int, + transform_funcs: ty.Optional[ty.List[ft.partial]] = None): + + self._transform_funcs = transform_funcs self._csp_recv_port = csp_recv_port self._csp_send_port = csp_send_port super().__init__(process_model, shape, d_type) @@ -513,6 +516,25 @@ def write( """ pass + def _transform(self, recv_data: np.array) -> np.array: + """Applies all transformation function pointers to the input data. + + Parameters + ---------- + recv_data : numpy.ndarray + data received on the port that shall be transformed + + Returns + ------- + recv_data : numpy.ndarray + received data, transformed by the incoming virtual ports + """ + if self._transform_funcs: + # apply all transformation functions to the received data + for f in reversed(self._transform_funcs): + recv_data = f(recv_data) + return recv_data + class PyRefPortVectorDense(PyRefPort): """Python implementation of RefPort for dense vector data.""" @@ -529,8 +551,10 @@ def read(self) -> np.ndarray: header = np.ones(self._csp_send_port.shape) * VarPortCmd.GET self._csp_send_port.send(header) - return self._csp_recv_port.recv() + return self._transform(self._csp_recv_port.recv()) + # TODO (MR): self._shape must be set to the correct shape when + # instantiating the Port return np.zeros(self._shape, self._d_type) def write(self, data: np.ndarray): @@ -660,7 +684,10 @@ def __init__(self, csp_recv_port: ty.Optional[CspRecvPort], process_model: AbstractProcessModel, shape: ty.Tuple[int, ...] = tuple(), - d_type: type = int): + d_type: type = int, + transform_funcs: ty.Optional[ty.List[ft.partial]] = None): + + self._transform_funcs = transform_funcs self._csp_recv_port = csp_recv_port self._csp_send_port = csp_send_port self.var_name = var_name @@ -692,6 +719,25 @@ def service(self): """ pass + def _transform(self, recv_data: np.array) -> np.array: + """Applies all transformation function pointers to the input data. + + Parameters + ---------- + recv_data : numpy.ndarray + data received on the port that shall be transformed + + Returns + ------- + recv_data : numpy.ndarray + received data, transformed by the incoming virtual ports + """ + if self._transform_funcs: + # apply all transformation functions to the received data + for f in self._transform_funcs: + recv_data = f(recv_data) + return recv_data + class PyVarPortVectorDense(PyVarPort): """Python implementation of VarPort for dense vector data.""" @@ -712,7 +758,7 @@ def service(self): # Set the value of the Var with the given data if enum_equal(cmd, VarPortCmd.SET): - data = self._csp_recv_port.recv() + data = self._transform(self._csp_recv_port.recv()) setattr(self._process_model, self.var_name, data) elif enum_equal(cmd, VarPortCmd.GET): data = getattr(self._process_model, self.var_name) diff --git a/src/lava/magma/core/process/ports/ports.py b/src/lava/magma/core/process/ports/ports.py index dfa82c301..a3e0c6792 100644 --- a/src/lava/magma/core/process/ports/ports.py +++ b/src/lava/magma/core/process/ports/ports.py @@ -178,6 +178,39 @@ def get_incoming_virtual_ports(self) -> ty.List["AbstractVirtualPort"]: return virtual_ports + def get_outgoing_virtual_ports(self) -> ty.List["AbstractVirtualPort"]: + """Returns the list of all outgoing virtual ports in order from + the current port to the destination port. + + Returns + ------- + virtual_ports : list(AbstractVirtualPorts) + the list of all outgoing virtual ports, sorted from source to + destination port + """ + if len(self.out_connections) == 0: + return [] + else: + virtual_ports = [] + num_virtual_ports = 0 + for p in self.out_connections: + virtual_ports += p.get_outgoing_virtual_ports() + if isinstance(p, AbstractVirtualPort): + # TODO (MR): ConcatPorts are not yet supported by the + # compiler - until then, an exception is raised. + if isinstance(p, ConcatPort): + raise NotImplementedError("ConcatPorts are not yet " + "supported.") + + virtual_ports.append(p) + num_virtual_ports += 1 + + if num_virtual_ports > 1: + raise NotImplementedError("Forking a virtual port is " + "not yet supported.") + + return virtual_ports + def get_dst_ports(self, _include_self=False) -> ty.List["AbstractPort"]: """Returns the list of all destination ports that this port connects to either directly or indirectly (through other ports).""" @@ -192,7 +225,7 @@ def get_dst_ports(self, _include_self=False) -> ty.List["AbstractPort"]: ports += p.get_dst_ports(True) return ports - def reshape(self, new_shape: ty.Tuple) -> "ReshapePort": + def reshape(self, new_shape: ty.Tuple[int, ...]) -> "ReshapePort": """Reshapes this port by deriving and returning a new virtual ReshapePort with the new shape. This implies that the resulting ReshapePort can only be forward connected to another port. @@ -202,16 +235,10 @@ def reshape(self, new_shape: ty.Tuple) -> "ReshapePort": :param new_shape: New shape of port. Number of total elements must not change. """ - # TODO (MR): Implement for other types of Ports - if not (isinstance(self, OutPort) - or isinstance(self, AbstractVirtualPort)): - raise NotImplementedError("reshape/flatten are only implemented " - "for OutPorts") - if self.size != math.prod(new_shape): raise pe.ReshapeError(self.shape, new_shape) - reshape_port = ReshapePort(new_shape) + reshape_port = ReshapePort(new_shape, old_shape=self.shape) self._connect_forward( [reshape_port], AbstractPort, assert_same_shape=False ) @@ -249,7 +276,8 @@ def concat_with( def transpose( self, - axes: ty.Optional[ty.Union[ty.Tuple, ty.List]] = None + axes: ty.Optional[ty.Union[ty.Tuple[int, ...], + ty.List]] = None ) -> "TransposePort": """Permutes the tensor dimension of this port by deriving and returning a new virtual TransposePort the new permuted dimension. This implies @@ -261,12 +289,6 @@ def transpose( :param axes: Order of permutation. Number of total elements and number of dimensions must not change. """ - # TODO (MR): Implement for other types of Ports - if not (isinstance(self, OutPort) - or isinstance(self, AbstractVirtualPort)): - raise NotImplementedError("transpose is only implemented for " - "OutPorts") - if axes is None: axes = tuple(reversed(range(len(self.shape)))) else: @@ -693,7 +715,25 @@ def connect(self, ports: ty.Union["AbstractPort", ty.List["AbstractPort"]]): self._connect_forward(to_list(ports), port_type) @abstractmethod - def get_transform_func(self) -> ft.partial: + def get_transform_func_fwd(self) -> ft.partial: + """Returns a function pointer that implements the forward (fwd) + transformation of the virtual port. + + Returns + ------- + function_pointer : functools.partial + a function pointer that can be applied to incoming data""" + pass + + @abstractmethod + def get_transform_func_bwd(self) -> ft.partial: + """Returns a function pointer that implements the backward (bwd) + transformation of the virtual port. + + Returns + ------- + function_pointer : functools.partial + a function pointer that can be applied to incoming data""" pass @@ -703,12 +743,34 @@ class ReshapePort(AbstractVirtualPort): It is used by the compiler to map the indices of the underlying tensor-valued data array from the derived to the new shape.""" - def __init__(self, new_shape: ty.Tuple): + def __init__(self, + new_shape: ty.Tuple[int, ...], + old_shape: ty.Tuple[int, ...]): AbstractPort.__init__(self, new_shape) + self.old_shape = old_shape - def get_transform_func(self) -> ft.partial: + def get_transform_func_fwd(self) -> ft.partial: + """Returns a function pointer that implements the forward (fwd) + transformation of the ReshapePort, which reshapes incoming data to + a new shape (the shape of the destination Process). + + Returns + ------- + function_pointer : functools.partial + a function pointer that can be applied to incoming data""" return ft.partial(np.reshape, newshape=self.shape) + def get_transform_func_bwd(self) -> ft.partial: + """Returns a function pointer that implements the backward (bwd) + transformation of the ReshapePort, which reshapes incoming data to + a new shape (the shape of the source Process). + + Returns + ------- + function_pointer : functools.partial + a function pointer that can be applied to incoming data""" + return ft.partial(np.reshape, newshape=self.old_shape) + class ConcatPort(AbstractVirtualPort): """A ConcatPort is a virtual port that allows to concatenate multiple @@ -751,7 +813,11 @@ def _get_new_shape(ports: ty.List[AbstractPort], axis): new_shape = shapes_ex_axis[0] return new_shape[:axis] + (total_size,) + new_shape[axis:] - def get_transform_func(self) -> ft.partial: + def get_transform_func_fwd(self) -> ft.partial: + # TODO (MR): not yet implemented + raise NotImplementedError() + + def get_transform_func_bwd(self) -> ft.partial: # TODO (MR): not yet implemented raise NotImplementedError() @@ -771,11 +837,32 @@ class TransposePort(AbstractVirtualPort): def __init__(self, new_shape: ty.Tuple[int, ...], axes: ty.Tuple[int, ...]): - self._axes = axes + self.axes = axes AbstractPort.__init__(self, new_shape) - def get_transform_func(self) -> ft.partial: - return ft.partial(np.transpose, axes=self._axes) + def get_transform_func_fwd(self) -> ft.partial: + """Returns a function pointer that implements the forward (fwd) + transformation of the TransposePort, which transposes (permutes) + incoming data according to a specific order of axes (to match the + destination Process). + + Returns + ------- + function_pointer : functools.partial + a function pointer that can be applied to incoming data""" + return ft.partial(np.transpose, axes=self.axes) + + def get_transform_func_bwd(self) -> ft.partial: + """Returns a function pointer that implements the backward (bwd) + transformation of the TransposePort, which transposes (permutes) + incoming data according to a specific order of axes (to match the + source Process). + + Returns + ------- + function_pointer : functools.partial + a function pointer that can be applied to incoming data""" + return ft.partial(np.transpose, axes=np.argsort(self.axes)) # ToDo: TBD... diff --git a/tests/lava/magma/core/process/ports/test_virtual_ports_in_process.py b/tests/lava/magma/core/process/ports/test_virtual_ports_in_process.py index d415e8e23..deb9cef91 100644 --- a/tests/lava/magma/core/process/ports/test_virtual_ports_in_process.py +++ b/tests/lava/magma/core/process/ports/test_virtual_ports_in_process.py @@ -19,13 +19,17 @@ from lava.magma.core.run_configs import Loihi1SimCfg from lava.magma.core.model.py.ports import ( PyInPort, - PyOutPort + PyOutPort, + PyRefPort, + PyVarPort ) from lava.magma.core.process.ports.ports import ( AbstractPort, AbstractVirtualPort, InPort, - OutPort + OutPort, + RefPort, + VarPort ) @@ -33,12 +37,18 @@ class MockVirtualPort(AbstractVirtualPort, AbstractPort): - """A mock-up of a virtual port that reshapes the input.""" - def __init__(self, new_shape: ty.Tuple): + """A mock-up of a virtual port that permutes the axes of the input.""" + def __init__(self, + new_shape: ty.Tuple[int, ...], + axes: ty.Tuple[int, ...]): AbstractPort.__init__(self, new_shape) + self.axes = axes - def get_transform_func(self) -> ft.partial: - return ft.partial(np.reshape, newshape=self.shape) + def get_transform_func_fwd(self) -> ft.partial: + return ft.partial(np.transpose, axes=self.axes) + + def get_transform_func_bwd(self) -> ft.partial: + return ft.partial(np.transpose, axes=np.argsort(self.axes)) class TestVirtualPortNetworkTopologies(unittest.TestCase): @@ -48,29 +58,238 @@ class TestVirtualPortNetworkTopologies(unittest.TestCase): def setUp(self) -> None: self.num_steps = 1 self.shape = (4, 3, 2) - self.new_shape = (12, 2) + self.new_shape = (2, 4, 3) + self.axes = (2, 0, 1) self.input_data = np.random.randint(256, size=self.shape) - def test_virtual_ports_between_hierarchical_processes(self) -> None: + def test_outport_to_inport_in_hierarchical_processes(self) -> None: """Tests a virtual port between an OutPort of a hierarchical Process and an InPort of another hierarchical Process.""" source = HOutPortProcess(data=self.input_data) sink = HInPortProcess(shape=self.new_shape) - virtual_port = MockVirtualPort(new_shape=self.new_shape) + virtual_port = MockVirtualPort(new_shape=self.new_shape, + axes=self.axes) source.out_port._connect_forward( [virtual_port], AbstractPort, assert_same_shape=False ) virtual_port.connect(sink.in_port) - sink.run(condition=RunSteps(num_steps=self.num_steps), - run_cfg=Loihi1SimCfg(select_tag='floating_pt')) - output = sink.data.get() - sink.stop() + try: + sink.run(condition=RunSteps(num_steps=self.num_steps), + run_cfg=Loihi1SimCfg(select_tag='floating_pt')) + output = sink.data.get() + finally: + sink.stop() + + expected = self.input_data.transpose(self.axes) + self.assertTrue( + np.all(output == expected), + f'Input and output do not match.\n' + f'{output[output!=expected]=}\n' + f'{expected[output!=expected] =}\n' + ) + + def test_inport_to_inport_in_a_hierarchical_process(self) -> None: + """Tests a virtual port between an InPort of a hierarchical Process + and an InPort in a nested Process. + + The data comes from an OutPortProcess and enters the hierarchical + Process (HVPInPortProcess) via its InPort. That InPort is connected + via a virtual port to the InPort of the nested Process. The data is + from there written into a Var 'data' of the nested Process, for which + the Var 's_data' of the hierarchical Process is an alias. + """ + + out_port_process = OutPortProcess(data=self.input_data) + h_proc = HVPInPortProcess(h_shape=self.shape, + s_shape=self.new_shape, + axes=self.axes) + + out_port_process.out_port.connect(h_proc.in_port) + + try: + h_proc.run(condition=RunSteps(num_steps=self.num_steps), + run_cfg=Loihi1SimCfg(select_tag='floating_pt')) + output = h_proc.s_data.get() + finally: + h_proc.stop() + + expected = self.input_data.transpose(self.axes) + self.assertTrue( + np.all(output == expected), + f'Input and output do not match.\n' + f'{output[output!=expected]=}\n' + f'{expected[output!=expected] =}\n' + ) + + def test_outport_to_outport_in_a_hierarchical_process(self) -> None: + """Tests a virtual port between an OutPort of a child Process and an + OutPort of the corresponding hierarchical parent Process. + + The data comes from the child Process, is passed from its OutPort + through a virtual port (where it is reshaped) to the OutPort of the + hierarchical Process (HVPOutPortProcess). From there it is passed to + the InPort of the InPortProcess and written into a Var 'data'. + """ + + h_proc = HVPOutPortProcess(h_shape=self.new_shape, + data=self.input_data, + axes=self.axes) + in_port_process = InPortProcess(shape=self.new_shape) + + h_proc.out_port.connect(in_port_process.in_port) + + try: + h_proc.run(condition=RunSteps(num_steps=self.num_steps), + run_cfg=Loihi1SimCfg(select_tag='floating_pt')) + output = in_port_process.data.get() + finally: + h_proc.stop() + + expected = self.input_data.transpose(self.axes) + self.assertTrue( + np.all(output == expected), + f'Input and output do not match.\n' + f'{output[output!=expected]=}\n' + f'{expected[output!=expected] =}\n' + ) + + def test_refport_to_refport_write_in_a_hierarchical_process(self) -> None: + """Tests a virtual port between a RefPort of a child Process + and a RefPort of the corresponding hierarchical parent Process. + For this test, the nested RefPortWriteProcess writes data into the + VarPort. + + The data comes from the child Process, is passed from its RefPort + through a virtual port (where it is reshaped) to the RefPort of the + hierarchical Process (HVPRefPortWriteProcess). From there it is + passed to the VarPort of the VarPortProcess and written into a + Var 'data'. + """ + + h_proc = HVPRefPortWriteProcess(h_shape=self.new_shape, + data=self.input_data, + axes=self.axes) + var_port_process = VarPortProcess(data=np.zeros(self.new_shape)) + + h_proc.ref_port.connect(var_port_process.var_port) + + try: + h_proc.run(condition=RunSteps(num_steps=self.num_steps), + run_cfg=Loihi1SimCfg(select_tag='floating_pt')) + output = var_port_process.data.get() + finally: + h_proc.stop() + + expected = self.input_data.transpose(self.axes) + self.assertTrue( + np.all(output == expected), + f'Input and output do not match.\n' + f'{output[output!=expected]=}\n' + f'{expected[output!=expected] =}\n' + ) + + def test_refport_to_refport_read_in_a_hierarchical_process(self) -> None: + """Tests a virtual port between a RefPort of a child Process + and a RefPort of the corresponding hierarchical parent Process. + For this test, the nested RefPortReadProcess reads data from the + VarPort. + + The data comes from a Var 'data' in the VarPortProcess. From there it + passes through the RefPort of the hierarchical Process + (HVPRefPortReadProcess), then through a virtual port (where it is + reshaped) to the RefPort of the child Process (RefPortReadProcess), + where it is written into a Var. That Var is again an alias for the + Var of the parent Process. + """ + + h_proc = HVPRefPortReadProcess(h_shape=self.new_shape, + s_shape=self.shape, + axes=self.axes) + var_port_process = \ + VarPortProcess(data=self.input_data.transpose(self.axes)) + + h_proc.ref_port.connect(var_port_process.var_port) + + try: + h_proc.run(condition=RunSteps(num_steps=self.num_steps), + run_cfg=Loihi1SimCfg(select_tag='floating_pt')) + output = h_proc.s_data.get() + finally: + h_proc.stop() + + expected = self.input_data + self.assertTrue( + np.all(output == expected), + f'Input and output do not match.\n' + f'{output[output!=expected]=}\n' + f'{expected[output!=expected] =}\n' + ) + + def test_varport_to_varport_write_in_a_hierarchical_process(self) -> None: + """Tests a virtual port between a VarPort of a child Process + and a VarPort of the corresponding hierarchical parent Process. + For this test, data is written into the hierarchical Process. + + The data comes from a RefPortWriteProcess, is passed + to the VarPort of a hierarchical Process (HVPVarPortProcess), + from there through a virtual port (where it is reshaped) to the + VarPort of the child Process (VarPortProcess). From there it is + written into a Var 'data', which is an alias for a Var 's_data' in + the parent Process. + """ + + ref_proc = RefPortWriteProcess(data=self.input_data) + h_proc = HVPVarPortProcess(h_shape=self.shape, + s_data=np.zeros(self.new_shape), + axes=self.axes) + + ref_proc.ref_port.connect(h_proc.var_port) + + try: + h_proc.run(condition=RunSteps(num_steps=self.num_steps), + run_cfg=Loihi1SimCfg(select_tag='floating_pt')) + output = h_proc.s_data.get() + finally: + h_proc.stop() + + expected = self.input_data.transpose(self.axes) + self.assertTrue( + np.all(output == expected), + f'Input and output do not match.\n' + f'{output[output!=expected]=}\n' + f'{expected[output!=expected] =}\n' + ) + + def test_varport_to_varport_read_in_a_hierarchical_process(self) -> None: + """Tests a virtual port between a VarPort of a child Process + and a VarPort of the corresponding hierarchical parent Process. + For this test, data is read from the hierarchical Process. + + The data comes from the child VarPortProcess, from there it passes + through its VarPort, through a virtual port (where it is reshaped), + to the VarPort of the parent Process (HVPVarPortProcess), + to a RefPortReadProcess, and from there written into a Var. + """ + + ref_proc = RefPortReadProcess(data=np.zeros(self.shape)) + h_proc = HVPVarPortProcess(h_shape=self.shape, + s_data=self.input_data.transpose(self.axes), + axes=self.axes) + + ref_proc.ref_port.connect(h_proc.var_port) + + try: + h_proc.run(condition=RunSteps(num_steps=self.num_steps), + run_cfg=Loihi1SimCfg(select_tag='floating_pt')) + output = ref_proc.data.get() + finally: + h_proc.stop() - expected = self.input_data.reshape(self.new_shape) + expected = self.input_data self.assertTrue( np.all(output == expected), f'Input and output do not match.\n' @@ -83,11 +302,12 @@ def test_chaining_multiple_virtual_ports(self) -> None: flatten() method.""" source = OutPortProcess(data=self.input_data) - shape_final = (int(np.prod(self.shape)),) - sink = InPortProcess(shape=shape_final) + sink = InPortProcess(shape=self.shape) - virtual_port1 = MockVirtualPort(new_shape=self.new_shape) - virtual_port2 = MockVirtualPort(new_shape=shape_final) + virtual_port1 = MockVirtualPort(new_shape=self.new_shape, + axes=self.axes) + virtual_port2 = MockVirtualPort(new_shape=self.shape, + axes=tuple(np.argsort(self.axes))) source.out_port._connect_forward( [virtual_port1], AbstractPort, assert_same_shape=False @@ -97,12 +317,14 @@ def test_chaining_multiple_virtual_ports(self) -> None: ) virtual_port2.connect(sink.in_port) - sink.run(condition=RunSteps(num_steps=self.num_steps), - run_cfg=Loihi1SimCfg(select_tag='floating_pt')) - output = sink.data.get() - sink.stop() + try: + sink.run(condition=RunSteps(num_steps=self.num_steps), + run_cfg=Loihi1SimCfg(select_tag='floating_pt')) + output = sink.data.get() + finally: + sink.stop() - expected = self.input_data.ravel() + expected = self.input_data self.assertTrue( np.all(output == expected), f'Input and output do not match.\n' @@ -117,8 +339,10 @@ def test_joining_virtual_ports_throws_exception(self) -> None: source2 = OutPortProcess(data=self.input_data) sink = InPortProcess(shape=self.new_shape) - virtual_port1 = MockVirtualPort(new_shape=self.new_shape) - virtual_port2 = MockVirtualPort(new_shape=self.new_shape) + virtual_port1 = MockVirtualPort(new_shape=self.new_shape, + axes=self.axes) + virtual_port2 = MockVirtualPort(new_shape=self.new_shape, + axes=self.axes) source1.out_port._connect_forward( [virtual_port1], AbstractPort, assert_same_shape=False @@ -141,10 +365,7 @@ class TestTransposePort(unittest.TestCase): def setUp(self) -> None: self.num_steps = 1 self.axes = (2, 0, 1) - self.axes_reverse = list(self.axes) - for idx, ax in enumerate(self.axes): - self.axes_reverse[ax] = idx - self.axes_reverse = tuple(self.axes_reverse) + self.axes_reverse = np.argsort(self.axes) self.shape = (4, 3, 2) self.shape_transposed = tuple(self.shape[i] for i in self.axes) self.shape_transposed_reverse = \ @@ -159,10 +380,36 @@ def test_transpose_outport_to_inport(self) -> None: source.out_port.transpose(axes=self.axes).connect(sink.in_port) - sink.run(condition=RunSteps(num_steps=self.num_steps), - run_cfg=Loihi1SimCfg(select_tag='floating_pt')) - output = sink.data.get() - sink.stop() + try: + sink.run(condition=RunSteps(num_steps=self.num_steps), + run_cfg=Loihi1SimCfg(select_tag='floating_pt')) + output = sink.data.get() + finally: + sink.stop() + + expected = self.input_data.transpose(self.axes) + self.assertTrue( + np.all(output == expected), + f'Input and output do not match.\n' + f'{output[output!=expected]=}\n' + f'{expected[output!=expected] =}\n' + ) + + def test_transpose_refport_write_to_varport(self) -> None: + """Tests a virtual TransposePort between a RefPort and a VarPort, + where the RefPort writes to the VarPort.""" + + source = RefPortWriteProcess(data=self.input_data) + sink = VarPortProcess(data=np.zeros(self.shape_transposed)) + + source.ref_port.transpose(axes=self.axes).connect(sink.var_port) + + try: + sink.run(condition=RunSteps(num_steps=self.num_steps), + run_cfg=Loihi1SimCfg(select_tag='floating_pt')) + output = sink.data.get() + finally: + sink.stop() expected = self.input_data.transpose(self.axes) self.assertTrue( @@ -172,6 +419,30 @@ def test_transpose_outport_to_inport(self) -> None: f'{expected[output!=expected] =}\n' ) + def test_transpose_refport_read_from_varport(self) -> None: + """Tests a virtual TransposePort between a RefPort and a VarPort, + where the RefPort reads from the VarPort.""" + + source = RefPortReadProcess(data=np.zeros(self.shape)) + sink = VarPortProcess(data=self.input_data.transpose(self.axes)) + + source.ref_port.transpose(self.axes).connect(sink.var_port) + + try: + sink.run(condition=RunSteps(num_steps=self.num_steps), + run_cfg=Loihi1SimCfg(select_tag='floating_pt')) + output = source.data.get() + finally: + sink.stop() + + expected = self.input_data + self.assertTrue( + np.all(output == expected), + f'Input and output do not match.\n' + f'{output[output!=expected]=}\n' + f'{expected[output!=expected] =}\n' + ) + class TestReshapePort(unittest.TestCase): """Tests virtual ReshapePorts on Processes that are executed.""" @@ -191,10 +462,12 @@ def test_reshape_outport_to_inport(self) -> None: source.out_port.reshape(new_shape=self.shape_reshaped).connect( sink.in_port) - sink.run(condition=RunSteps(num_steps=self.num_steps), - run_cfg=Loihi1SimCfg(select_tag='floating_pt')) - output = sink.data.get() - sink.stop() + try: + sink.run(condition=RunSteps(num_steps=self.num_steps), + run_cfg=Loihi1SimCfg(select_tag='floating_pt')) + output = sink.data.get() + finally: + sink.stop() expected = self.input_data.reshape(self.shape_reshaped) self.assertTrue( @@ -204,6 +477,54 @@ def test_reshape_outport_to_inport(self) -> None: f'{expected[output!=expected] =}\n' ) + def test_reshape_refport_write_to_varport(self) -> None: + """Tests a virtual ReshapePort between a RefPort and a VarPort, + where the RefPort writes to the VarPort.""" + + source = RefPortWriteProcess(data=self.input_data) + sink = VarPortProcess(data=np.zeros(self.shape_reshaped)) + + source.ref_port.reshape(self.shape_reshaped).connect(sink.var_port) + + try: + sink.run(condition=RunSteps(num_steps=self.num_steps), + run_cfg=Loihi1SimCfg(select_tag='floating_pt')) + output = sink.data.get() + finally: + sink.stop() + + expected = self.input_data.reshape(self.shape_reshaped) + self.assertTrue( + np.all(output == expected), + f'Input and output do not match.\n' + f'{output[output!=expected]=}\n' + f'{expected[output!=expected] =}\n' + ) + + def test_reshape_refport_read_from_varport(self) -> None: + """Tests a virtual ReshapePort between a RefPort and a VarPort, + where the RefPort reads from the VarPort.""" + + source = RefPortReadProcess(data=np.zeros(self.shape)) + sink = VarPortProcess(data=self.input_data.reshape(self.shape_reshaped)) + + source.ref_port.reshape(self.shape_reshaped).connect(sink.var_port) + + try: + sink.run(condition=RunSteps(num_steps=self.num_steps), + run_cfg=Loihi1SimCfg(select_tag='floating_pt')) + output = source.data.get() + finally: + sink.stop() + + expected = self.input_data + self.assertTrue( + np.all(output == expected), + f'Input and output do not match.\n' + f'{output[output!=expected]=}\n' + f'{expected[output!=expected] =}\n' + ) + class TestFlattenPort(unittest.TestCase): """Tests virtual ReshapePorts, created by the flatten() method, @@ -224,10 +545,12 @@ def test_flatten_outport_to_inport(self) -> None: source.out_port.flatten().connect(sink.in_port) - sink.run(condition=RunSteps(num_steps=self.num_steps), - run_cfg=Loihi1SimCfg(select_tag='floating_pt')) - output = sink.data.get() - sink.stop() + try: + sink.run(condition=RunSteps(num_steps=self.num_steps), + run_cfg=Loihi1SimCfg(select_tag='floating_pt')) + output = sink.data.get() + finally: + sink.stop() expected = self.input_data.ravel() self.assertTrue( @@ -237,39 +560,61 @@ def test_flatten_outport_to_inport(self) -> None: f'{expected[output!=expected] =}\n' ) + def test_flatten_refport_write_to_varport(self) -> None: + """Tests a virtual ReshapePort with flatten() between a RefPort and a + VarPort, where the RefPort writes to the VarPort.""" -# A minimal Process with an OutPort -class OutPortProcess(AbstractProcess): - def __init__(self, data: np.ndarray) -> None: - super().__init__(data=data) - self.data = Var(shape=data.shape, init=data) - self.out_port = OutPort(shape=data.shape) + source = RefPortWriteProcess(data=self.input_data) + sink = VarPortProcess(data=np.zeros(self.shape_reshaped)) + source.ref_port.flatten().connect(sink.var_port) -# A minimal Process with an InPort -class InPortProcess(AbstractProcess): - def __init__(self, shape: ty.Tuple[int, ...]) -> None: - super().__init__(shape=shape) - self.data = Var(shape=shape, init=np.zeros(shape)) - self.in_port = InPort(shape=shape) + try: + sink.run(condition=RunSteps(num_steps=self.num_steps), + run_cfg=Loihi1SimCfg(select_tag='floating_pt')) + output = sink.data.get() + finally: + sink.stop() + + expected = self.input_data.ravel() + self.assertTrue( + np.all(output == expected), + f'Input and output do not match.\n' + f'{output[output!=expected]=}\n' + f'{expected[output!=expected] =}\n' + ) + def test_flatten_refport_read_from_varport(self) -> None: + """Tests a virtual ReshapePort between a RefPort and a VarPort, + where the RefPort reads from the VarPort.""" -# A minimal hierarchical Process with an OutPort -class HOutPortProcess(AbstractProcess): + source = RefPortReadProcess(data=np.zeros(self.shape)) + sink = VarPortProcess(data=self.input_data.reshape(self.shape_reshaped)) + + source.ref_port.flatten().connect(sink.var_port) + + try: + sink.run(condition=RunSteps(num_steps=self.num_steps), + run_cfg=Loihi1SimCfg(select_tag='floating_pt')) + output = source.data.get() + finally: + sink.stop() + + expected = self.input_data + self.assertTrue( + np.all(output == expected), + f'Input and output do not match.\n' + f'{output[output!=expected]=}\n' + f'{expected[output!=expected] =}\n' + ) + + +# A minimal Process with an OutPort +class OutPortProcess(AbstractProcess): def __init__(self, data: np.ndarray) -> None: super().__init__(data=data) self.data = Var(shape=data.shape, init=data) self.out_port = OutPort(shape=data.shape) - self.proc_params['data'] = data - - -# A minimal hierarchical Process with an InPort and a Var -class HInPortProcess(AbstractProcess): - def __init__(self, shape: ty.Tuple[int, ...]) -> None: - super().__init__(shape=shape) - self.data = Var(shape=shape, init=np.zeros(shape)) - self.in_port = InPort(shape=shape) - self.proc_params['shape'] = shape # A minimal PyProcModel implementing OutPortProcess @@ -285,6 +630,14 @@ def run_spk(self): self.log.info("Sent output data of OutPortProcess: ", str(self.data)) +# A minimal Process with an InPort +class InPortProcess(AbstractProcess): + def __init__(self, shape: ty.Tuple[int, ...]) -> None: + super().__init__(shape=shape) + self.data = Var(shape=shape, init=np.zeros(shape)) + self.in_port = InPort(shape=shape) + + # A minimal PyProcModel implementing InPortProcess @implements(proc=InPortProcess, protocol=LoihiProtocol) @requires(CPU) @@ -298,6 +651,14 @@ def run_spk(self): self.log.info("Received input data for InPortProcess: ", str(self.data)) +# A minimal hierarchical Process with an OutPort +class HOutPortProcess(AbstractProcess): + def __init__(self, data: np.ndarray) -> None: + super().__init__(data=data) + self.out_port = OutPort(shape=data.shape) + self.proc_params['data'] = data + + # A minimal hierarchical ProcModel with a nested OutPortProcess @implements(proc=HOutPortProcess) class SubHOutPortProcModel(AbstractSubProcessModel): @@ -306,6 +667,15 @@ def __init__(self, proc): self.out_proc.out_port.connect(proc.out_port) +# A minimal hierarchical Process with an InPort and a Var +class HInPortProcess(AbstractProcess): + def __init__(self, shape: ty.Tuple[int, ...]) -> None: + super().__init__(shape=shape) + self.data = Var(shape=shape, init=np.zeros(shape)) + self.in_port = InPort(shape=shape) + self.proc_params['shape'] = shape + + # A minimal hierarchical ProcModel with a nested InPortProcess and an aliased # Var @implements(proc=HInPortProcess) @@ -316,5 +686,232 @@ def __init__(self, proc): proc.data.alias(self.in_proc.data) +# A minimal hierarchical Process with an InPort, where the SubProcessModel +# connects the InPort to another InPort via a virtual port. The data that +# comes in through the InPort will eventually be accessible through the +# 's_data' Var. +class HVPInPortProcess(AbstractProcess): + def __init__(self, + h_shape: ty.Tuple[int, ...], + s_shape: ty.Tuple[int, ...], + axes: ty.Tuple[int, ...]) -> None: + super().__init__() + self.s_data = Var(shape=s_shape, init=np.zeros(s_shape)) + self.in_port = InPort(shape=h_shape) + self.proc_params['s_shape'] = s_shape + self.proc_params['axes'] = axes + + +# A minimal hierarchical ProcModel with a nested InPortProcess and an aliased +# Var +@implements(proc=HVPInPortProcess) +class SubHVPInPortProcModel(AbstractSubProcessModel): + def __init__(self, proc): + self.in_proc = InPortProcess(shape=proc.proc_params['s_shape']) + + virtual_port = MockVirtualPort(new_shape=proc.proc_params['s_shape'], + axes=proc.proc_params['axes']) + proc.in_port._connect_forward( + [virtual_port], AbstractPort, assert_same_shape=False + ) + virtual_port.connect(self.in_proc.in_port) + + proc.s_data.alias(self.in_proc.data) + + +# A minimal hierarchical Process with an OutPort, where the data that is +# given as an argument may have a different shape than the OutPort of the +# Process. +class HVPOutPortProcess(AbstractProcess): + def __init__(self, + h_shape: ty.Tuple[int, ...], + data: np.ndarray, + axes: ty.Tuple[int, ...]) -> None: + super().__init__(h_shape=h_shape, data=data) + self.out_port = OutPort(shape=h_shape) + self.proc_params['data'] = data + self.proc_params['h_shape'] = h_shape + self.proc_params['axes'] = axes + + +# A minimal hierarchical ProcModel with a nested OutPortProcess +@implements(proc=HVPOutPortProcess) +class SubHVPOutPortProcModel(AbstractSubProcessModel): + def __init__(self, proc): + self.out_proc = OutPortProcess(data=proc.proc_params['data']) + + virtual_port = MockVirtualPort(new_shape=proc.proc_params['h_shape'], + axes=proc.proc_params['axes']) + self.out_proc.out_port._connect_forward( + [virtual_port], AbstractPort, assert_same_shape=False + ) + virtual_port.connect(proc.out_port) + + +# A minimal Process with a RefPort that writes +class RefPortWriteProcess(AbstractProcess): + def __init__(self, data: np.ndarray) -> None: + super().__init__(data=data) + self.data = Var(shape=data.shape, init=data) + self.ref_port = RefPort(shape=data.shape) + + +# A minimal PyProcModel implementing RefPortWriteProcess +@implements(proc=RefPortWriteProcess, protocol=LoihiProtocol) +@requires(CPU) +@tag('floating_pt') +class PyRefPortWriteProcessModelFloat(PyLoihiProcessModel): + ref_port: PyRefPort = LavaPyType(PyRefPort.VEC_DENSE, np.int32) + data: np.ndarray = LavaPyType(np.ndarray, np.int32) + + def post_guard(self): + return True + + def run_post_mgmt(self): + self.ref_port.write(self.data) + self.log.info("Sent output data of RefPortWriteProcess: ", + str(self.data)) + + +# A minimal Process with a RefPort that reads +class RefPortReadProcess(AbstractProcess): + def __init__(self, data: np.ndarray) -> None: + super().__init__(data=data) + self.data = Var(shape=data.shape, init=data) + self.ref_port = RefPort(shape=data.shape) + + +# A minimal PyProcModel implementing RefPortReadProcess +@implements(proc=RefPortReadProcess, protocol=LoihiProtocol) +@requires(CPU) +@tag('floating_pt') +class PyRefPortReadProcessModelFloat(PyLoihiProcessModel): + ref_port: PyRefPort = LavaPyType(PyRefPort.VEC_DENSE, np.int32) + data: np.ndarray = LavaPyType(np.ndarray, np.int32) + + def post_guard(self): + return True + + def run_post_mgmt(self): + self.data = self.ref_port.read() + self.log.info("Received input data for RefPortReadProcess: ", + str(self.data)) + + +# A minimal Process with a Var and a VarPort +class VarPortProcess(AbstractProcess): + def __init__(self, data: np.ndarray) -> None: + super().__init__(data=data) + self.data = Var(shape=data.shape, init=data) + self.var_port = VarPort(self.data) + + +# A minimal PyProcModel implementing VarPortProcess +@implements(proc=VarPortProcess, protocol=LoihiProtocol) +@requires(CPU) +@tag('floating_pt') +class PyVarPortProcessModelFloat(PyLoihiProcessModel): + var_port: PyInPort = LavaPyType(PyVarPort.VEC_DENSE, np.int32) + data: np.ndarray = LavaPyType(np.ndarray, np.int32) + + +# A minimal hierarchical Process with a RefPort, where the data that is +# given as an argument may have a different shape than the RefPort of the +# Process. +class HVPRefPortWriteProcess(AbstractProcess): + def __init__(self, + h_shape: ty.Tuple[int, ...], + data: np.ndarray, + axes: ty.Tuple[int, ...]) -> None: + super().__init__(h_shape=h_shape, data=data) + self.ref_port = RefPort(shape=h_shape) + self.proc_params['data'] = data + self.proc_params['h_shape'] = h_shape + self.proc_params['axes'] = axes + + +# A minimal hierarchical ProcModel with a nested RefPortWriteProcess +@implements(proc=HVPRefPortWriteProcess) +class SubHVPRefPortWriteProcModel(AbstractSubProcessModel): + def __init__(self, proc): + self.ref_write_proc = RefPortWriteProcess(data=proc.proc_params['data']) + + virtual_port = MockVirtualPort(new_shape=proc.proc_params['h_shape'], + axes=proc.proc_params['axes']) + self.ref_write_proc.ref_port._connect_forward( + [virtual_port], AbstractPort, assert_same_shape=False + ) + virtual_port.connect(proc.ref_port) + + +# A minimal hierarchical Process with a RefPort, where the data that is +# given as an argument may have a different shape than the RefPort of the +# Process. +class HVPRefPortReadProcess(AbstractProcess): + def __init__(self, + h_shape: ty.Tuple[int, ...], + s_shape: ty.Tuple[int, ...], + axes: ty.Tuple[int, ...]) -> None: + super().__init__(h_shape=h_shape, s_shape=s_shape) + self.ref_port = RefPort(shape=h_shape) + self.s_data = Var(s_shape) + self.proc_params['s_shape'] = s_shape + self.proc_params['h_shape'] = h_shape + self.proc_params['axes'] = axes + + +# A minimal hierarchical ProcModel with a nested RefPortReadProcess +@implements(proc=HVPRefPortReadProcess) +class SubHVPRefPortReadProcModel(AbstractSubProcessModel): + def __init__(self, proc): + self.ref_read_proc = \ + RefPortReadProcess(data=np.zeros(proc.proc_params['s_shape'])) + + virtual_port = MockVirtualPort(new_shape=proc.proc_params['h_shape'], + axes=proc.proc_params['axes']) + self.ref_read_proc.ref_port._connect_forward( + [virtual_port], AbstractPort, assert_same_shape=False + ) + virtual_port.connect(proc.ref_port) + + proc.s_data.alias(self.ref_read_proc.data) + + +# A minimal hierarchical Process with a VarPort, where the data that is +# given as an argument may have a different shape than the VarPort of the +# Process. +class HVPVarPortProcess(AbstractProcess): + def __init__(self, + h_shape: ty.Tuple[int, ...], + s_data: np.ndarray, + axes: ty.Tuple[int, ...]) -> None: + super().__init__(h_shape=h_shape, s_data=s_data) + self.h_data = Var(h_shape) + self.s_data = Var(s_data.shape) + self.var_port = VarPort(self.h_data) + + self.proc_params['s_data'] = s_data + self.proc_params['h_shape'] = h_shape + self.proc_params['axes'] = axes + + +# A minimal hierarchical ProcModel with a nested RefPortReadProcess +@implements(proc=HVPVarPortProcess) +class SubHVPVarPortProcModel(AbstractSubProcessModel): + def __init__(self, proc): + s_data = proc.proc_params['s_data'] + self.var_proc = \ + VarPortProcess(data=s_data) + + virtual_port = MockVirtualPort(new_shape=s_data.shape, + axes=proc.proc_params['axes']) + proc.var_port._connect_forward( + [virtual_port], AbstractPort, assert_same_shape=False + ) + virtual_port.connect(self.var_proc.var_port) + + proc.s_data.alias(self.var_proc.data) + + if __name__ == '__main__': unittest.main()