From bcbd465cad37249725d3556c87cc2c3d8b303210 Mon Sep 17 00:00:00 2001
From: Elvin Hajizada
Date: Tue, 18 Jul 2023 02:53:20 +0200
Subject: [PATCH] The Initial version of CLP in CPU backend (#707)

* CLP initial commit: PrototypeLIF, NoveltyDetector, Readout procs/tests
* small linting fix
* Novelty detector upgraded to target next neuron; codacy errors fixed
* integration test; small fixes
* removed duplicate code in prototypeLIF process; linting fixes
* linting fixes
* Linting and codacy fixes
* remove duplicate test; some more codacy fixes
* PrototypeLIF spikes when it receives a 3rd factor input
* a test for PrototypeLIF output spike after 3rd factor input
* Allocation & prototype id tracking is abstracted away from NoveltyDetector
* Allocator process; Readout proc sends allocation trigger if error
* introduce learning rate Var in PrototypeLIF
* updated integration tests; full system test included
* Linting fixes
* Another small linting fix
* PrototypeLIF hard reset capability to enable faster temporal WTA
* allocation mechanism changed; proc interfaces changed; dense conns added; lr var removed
* small linting fix
* small codacy fix
* prints removed, spelling mistakes fixed
* ignoring one check in an integration test
* Revert "small linting fix"

This reverts commit bde4fa9071f480d5893c45f20a67e7518ac199bc.

* Fix linting in test_models.py
* Test fix in utils.py
* Fix test of bug fix in utils.py
* Fix utils.py
* Implemented individual threadsafe random call

Signed-off-by: bamsumit

---------

Signed-off-by: bamsumit
Co-authored-by: PhilippPlank <32519998+PhilippPlank@users.noreply.github.com>
Co-authored-by: Marcus G K Williams <168222+mgkwill@users.noreply.github.com>
Co-authored-by: bamsumit
---
 src/lava/proc/clp/novelty_detector/models.py  |  71 +++
 src/lava/proc/clp/novelty_detector/process.py |  44 ++
 src/lava/proc/clp/nsm/models.py               | 149 +++++
 src/lava/proc/clp/nsm/process.py              |  87 +++
 src/lava/proc/clp/prototype_lif/models.py     | 117 ++++
 src/lava/proc/clp/prototype_lif/process.py    |  52 ++
 tests/lava/proc/clp/__init__.py               |   0
 tests/lava/proc/clp/integration/__init__.py   |   0
 .../proc/clp/integration/test_clp_one_shot.py | 597 ++++++++++++++++++
 .../proc/clp/novelty_detector/__init__.py     |   0
 .../proc/clp/novelty_detector/test_models.py  | 127 ++++
 tests/lava/proc/clp/nsm/__init__.py           |   0
 tests/lava/proc/clp/nsm/test_nsm.py           | 236 +++++++
 tests/lava/proc/clp/prototype_lif/__init__.py |   0
 .../proc/clp/prototype_lif/test_models.py     | 244 +++++++
 tests/lava/proc/conv/test_utils.py            |  45 +-
 16 files changed, 1747 insertions(+), 22 deletions(-)
 create mode 100644 src/lava/proc/clp/novelty_detector/models.py
 create mode 100644 src/lava/proc/clp/novelty_detector/process.py
 create mode 100644 src/lava/proc/clp/nsm/models.py
 create mode 100644 src/lava/proc/clp/nsm/process.py
 create mode 100644 src/lava/proc/clp/prototype_lif/models.py
 create mode 100644 src/lava/proc/clp/prototype_lif/process.py
 create mode 100644 tests/lava/proc/clp/__init__.py
 create mode 100644 tests/lava/proc/clp/integration/__init__.py
 create mode 100644 tests/lava/proc/clp/integration/test_clp_one_shot.py
 create mode 100644 tests/lava/proc/clp/novelty_detector/__init__.py
 create mode 100644 tests/lava/proc/clp/novelty_detector/test_models.py
 create mode 100644 tests/lava/proc/clp/nsm/__init__.py
 create mode 100644 tests/lava/proc/clp/nsm/test_nsm.py
 create mode 100644 tests/lava/proc/clp/prototype_lif/__init__.py
 create mode 100644 tests/lava/proc/clp/prototype_lif/test_models.py
diff --git a/src/lava/proc/clp/novelty_detector/models.py b/src/lava/proc/clp/novelty_detector/models.py
new file mode 100644
index 000000000..95cb6b968
--- /dev/null
+++ b/src/lava/proc/clp/novelty_detector/models.py
@@ -0,0 +1,71 @@
+# Copyright (C) 2023 Intel Corporation
+# SPDX-License-Identifier: BSD-3-Clause
+# See: https://spdx.org/licenses/
+
+import numpy as np
+
+from lava.magma.core.sync.protocols.loihi_protocol import LoihiProtocol
+from lava.magma.core.model.py.ports import PyInPort, PyOutPort
+from lava.magma.core.model.py.type import LavaPyType
+from lava.magma.core.resources import CPU
+from lava.magma.core.decorator import implements, requires, tag
+from lava.magma.core.model.py.model import PyLoihiProcessModel
+
+from lava.proc.clp.novelty_detector.process import NoveltyDetector
+
+
+@implements(proc=NoveltyDetector, protocol=LoihiProtocol)
+@requires(CPU)
+@tag("fixed_pt", "bit_accurate_loihi")
+class PyNoveltyDetectorModel(PyLoihiProcessModel):
+    """Python implementation of the NoveltyDetector process."""
+
+    input_aval_in: PyInPort = LavaPyType(PyInPort.VEC_DENSE, np.int32)
+    output_aval_in: PyInPort = LavaPyType(PyInPort.VEC_DENSE, np.int32)
+
+    novelty_detected_out: PyOutPort = LavaPyType(PyOutPort.VEC_DENSE, np.int32)
+    t_wait: np.int32 = LavaPyType(np.ndarray, np.int32, precision=32)
+
+    def __init__(self, proc_params):
+        super().__init__(proc_params)
+        self.waiting = False  # Whether we are waiting for an output
+        self.t_passed = 0  # Time passed since the injection of the input
+        self.novelty_detected = False
+
+    def run_spk(self) -> None:
+        # If an input is available, start the clock to wait for an output.
+        a_in = self.input_aval_in.recv()
+        if a_in != 0:
+            self.waiting = True
+            self.t_passed = 0
+
+        # If an output is available, the input is a known pattern, so we
+        # stop waiting and reset.
+        a_in = self.output_aval_in.recv()
+        if a_in != 0:
+            self.waiting = False
+            self.t_passed = 0
+
+        # Otherwise, check whether the waiting time limit has passed. If
+        # so, we assume this is a novel pattern.
+        elif self.t_passed > self.t_wait:
+            self.novelty_detected = True
+            self.waiting = False
+            self.t_passed = 0
+
+        # If we are still waiting, increment the time counter.
+        if self.waiting:
+            self.t_passed += 1
+
+        # If we have detected novelty, send this signal downstream and set
+        # the flag back to False.
+        if self.novelty_detected:
+            self.novelty_detected_out.send(np.array([1]))
+            self.novelty_detected = False
+            self.waiting = False
+        else:
+            # Otherwise, just send zeros (i.e. no signal).
+            self.novelty_detected_out.send(np.array([0]))
diff --git a/src/lava/proc/clp/novelty_detector/process.py b/src/lava/proc/clp/novelty_detector/process.py
new file mode 100644
index 000000000..035d29660
--- /dev/null
+++ b/src/lava/proc/clp/novelty_detector/process.py
@@ -0,0 +1,44 @@
+# Copyright (C) 2023 Intel Corporation
+# SPDX-License-Identifier: BSD-3-Clause
+# See: https://spdx.org/licenses/
+
+from lava.magma.core.process.ports.ports import InPort, OutPort
+from lava.magma.core.process.process import AbstractProcess
+from lava.magma.core.process.variable import Var
+
+
+class NoveltyDetector(AbstractProcess):
+    """Novelty detection process.
+
+    This process detects a mismatch between input injected into the
+    system and output generated by the system. If the system processes an
+    input but does not generate an output (i.e. all the Prototype neurons
+    stay silent) within a given time window after the input processing
+    begins, the NoveltyDetector process generates a signal. This signal
+    means that a novel (unknown) input has been detected.
+
+    Parameters
+    ----------
+    t_wait : int
+        The amount of time the process waits, after being notified that
+        an input was injected into the system, before sending out a
+        novelty detection signal. If the system (the Prototype neurons)
+        generates an output within this window, the process is reset and
+        NO novelty detection signal is sent out.
+    """
+
+    def __init__(self, *,
+                 t_wait: int
+                 ) -> None:
+        super().__init__()
+
+        # An input is being processed by the system
+        self.input_aval_in = InPort(shape=(1,))
+
+        # An output is generated by the system
+        self.output_aval_in = InPort(shape=(1,))
+
+        # OutPort for sending out the novelty detection signal
+        self.novelty_detected_out = OutPort(shape=(1,))
+
+        self.t_wait = Var(shape=(1,), init=t_wait)
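For orientation, here is a minimal wiring sketch for NoveltyDetector, condensed from tests/lava/proc/clp/novelty_detector/test_models.py further below. The timings are illustrative; two RingBuffer sources stand in for the "input available" and "output available" signals that Dense connections provide in the full system:

import numpy as np

from lava.magma.core.run_configs import Loihi2SimCfg
from lava.magma.core.run_conditions import RunSteps
from lava.proc.clp.novelty_detector.process import NoveltyDetector
from lava.proc.io.source import RingBuffer as Source
from lava.proc.monitor.process import Monitor

t_wait, t_run = 10, 20
in_aval_spikes = np.zeros((1, t_run))
in_aval_spikes[0, 3] = 1  # an input is injected at t=3 ...

in_aval = Source(data=in_aval_spikes)
out_aval = Source(data=np.zeros((1, t_run)))  # ... and no output ever follows
nvl_det = NoveltyDetector(t_wait=t_wait)

in_aval.s_out.connect(nvl_det.input_aval_in)
out_aval.s_out.connect(nvl_det.output_aval_in)

monitor = Monitor()
monitor.probe(target=nvl_det.novelty_detected_out, num_steps=t_run)

in_aval.run(condition=RunSteps(num_steps=t_run),
            run_cfg=Loihi2SimCfg(select_tag="fixed_pt"))
result = monitor.get_data()[nvl_det.name][nvl_det.novelty_detected_out.name]
in_aval.stop()
# With the input at t=3 and t_wait=10, the novelty spike lands at t=14.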
diff --git a/src/lava/proc/clp/nsm/models.py b/src/lava/proc/clp/nsm/models.py
new file mode 100644
index 000000000..d9fac5c53
--- /dev/null
+++ b/src/lava/proc/clp/nsm/models.py
@@ -0,0 +1,149 @@
+# Copyright (C) 2023 Intel Corporation
+# SPDX-License-Identifier: BSD-3-Clause
+# See: https://spdx.org/licenses/
+
+import numpy as np
+
+from lava.magma.core.sync.protocols.loihi_protocol import LoihiProtocol
+from lava.magma.core.model.py.ports import PyInPort, PyOutPort
+from lava.magma.core.model.py.type import LavaPyType
+from lava.magma.core.resources import CPU
+from lava.magma.core.decorator import implements, requires, tag
+from lava.magma.core.model.py.model import PyLoihiProcessModel
+
+from lava.proc.clp.nsm.process import Readout
+from lava.proc.clp.nsm.process import Allocator
+
+
+@implements(proc=Readout, protocol=LoihiProtocol)
+@requires(CPU)
+@tag("fixed_pt")
+class PyReadoutModel(PyLoihiProcessModel):
+    """Python implementation of the Readout process.
+
+    This process runs on the superhost and is the main interface between
+    the system and the user.
+    """
+    inference_in: PyInPort = LavaPyType(PyInPort.VEC_DENSE, np.int32,
+                                        precision=24)
+    label_in: PyInPort = LavaPyType(PyInPort.VEC_DENSE, np.int32)
+
+    user_output: PyOutPort = LavaPyType(PyOutPort.VEC_DENSE, np.int32)
+    trigger_alloc: PyOutPort = LavaPyType(PyOutPort.VEC_DENSE, np.int32)
+    feedback: PyOutPort = LavaPyType(PyOutPort.VEC_DENSE, np.int32)
+    proto_labels: np.ndarray = LavaPyType(np.ndarray, np.int32)
+    last_winner_id: np.int32 = LavaPyType(np.ndarray, np.int32)
+
+    def run_spk(self) -> None:
+        # Read the output of the prototype neurons
+        output_vec = self.inference_in.recv()
+        # Read the user-provided label
+        user_label = self.label_in.recv()[0]
+        # Feedback about the correctness of the prediction: +1 if correct,
+        # -1 if incorrect, 0 if no label is provided by the user at this
+        # point.
+        infer_check = 0
+
+        # If there is an active prototype neuron, this will temporarily
+        # store the label of that neuron
+        inferred_label = 0
+
+        # Flag for allocation trigger
+        allocation_trigger = False
+
+        # If any prototype neuron is active, we process it here. We assume
+        # that at most one neuron of the prototype population is active at
+        # a time.
+        if output_vec.any():
+            # Find the id of the winner neuron and store it
+            winner_proto_id = np.nonzero(output_vec)[0][0]
+            self.last_winner_id = winner_proto_id
+
+            # Get the label of this neuron from the label list
+            inferred_label = self.proto_labels[winner_proto_id]
+
+            # If this label is zero, this prototype is not labeled yet.
+            if inferred_label == 0:
+                # So we give the unknown winner a pseudo-label: a negative
+                # temporary label that is derived from the id of the
+                # prototype as follows.
+                self.proto_labels[winner_proto_id] = -1 * (winner_proto_id + 1)
+
+                # Now this pseudo-label is our inferred label.
+                inferred_label = self.proto_labels[winner_proto_id]
+
+        # Next we check if a user-provided label is available.
+        if user_label != 0:
+            # If so, we access the most recent winner's label, assuming
+            # temporal causality between the prediction of the system and
+            # the provision of the label by the user.
+            last_inferred_label = self.proto_labels[self.last_winner_id]
+
+            # If the most recently predicted label (i.e. the one for the
+            # current input, which the user-provided label also refers to)
+            # is an actual label (not a pseudo one), then we check the
+            # predicted label against the user-provided one.
+            if last_inferred_label > 0:  # "Known Known class"
+                if last_inferred_label == user_label:
+                    infer_check = 1
+                else:
+                    # On an error, trigger allocation by sending an
+                    # allocation signal
+                    infer_check = -1
+                    allocation_trigger = True
+
+            # If this prototype has a pseudo-label, we label it with the
+            # user-provided label and do not send any feedback (because we
+            # did not have an actual prediction)
+            elif last_inferred_label < 0:  # "Known Unknown class"
+                self.proto_labels[self.last_winner_id] = user_label
+                inferred_label = user_label
+
+        # Send out the predicted label (if any) and, once the user has
+        # provided the actual label, the feedback about the correctness of
+        # this prediction
+        self.user_output.send(np.array([inferred_label]))
+        self.feedback.send(np.array([infer_check]))
+        if allocation_trigger:
+            self.trigger_alloc.send(np.array([1]))
+        else:
+            self.trigger_alloc.send(np.array([0]))
+
+
+@implements(proc=Allocator, protocol=LoihiProtocol)
+@requires(CPU)
+@tag("fixed_pt")
+class PyAllocatorModel(PyLoihiProcessModel):
+    """Python implementation of the Allocator process."""
+
+    trigger_in: PyInPort = LavaPyType(PyInPort.VEC_DENSE, np.int32)
+    allocate_out: PyOutPort = LavaPyType(PyOutPort.VEC_DENSE, np.int32)
+    next_alloc_id: np.int32 = LavaPyType(np.ndarray, np.int32)
+    n_protos: np.int32 = LavaPyType(np.ndarray, np.int32)
+
+    def __init__(self, proc_params):
+        super().__init__(proc_params)
+
+    def run_spk(self) -> None:
+        # Allocation signal, initialized to a vector of zeros
+        alloc_signal = np.zeros(shape=self.allocate_out.shape, dtype=np.int32)
+
+        # Check the input; if an allocation trigger is received, we send
+        # the allocation signal to the next neuron
+        allocating = self.trigger_in.recv()[0]
+        if allocating:
+            # Write into the OutPort a single graded spike whose payload
+            # is the id of the next neuron to be allocated. These ids
+            # start from id=1, because a graded value of zero means no
+            # signal. Hence, the initial value of next_alloc_id is one,
+            # and after each allocation it is incremented by one.
+            alloc_signal[0] = self.next_alloc_id
+
+            # Increment this counter to point to the next neuron
+            self.next_alloc_id += 1
+
+        # Send the signal (all zeros, i.e. no signal, if no trigger was
+        # received)
+        self.allocate_out.send(alloc_signal)
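The labeling rules that PyReadoutModel implements above can be restated in a few lines of plain numpy. This is an illustrative sketch only; winner_id and user_label are made-up values:

import numpy as np

proto_labels = np.zeros(4, dtype=np.int32)  # no prototype is labeled yet

# An unlabeled winner gets the pseudo-label -(winner_id + 1).
winner_id = 2
if proto_labels[winner_id] == 0:
    proto_labels[winner_id] = -1 * (winner_id + 1)
assert proto_labels[winner_id] == -3  # "Known Unknown"

# When the user label arrives, a pseudo-labeled winner adopts it ...
user_label = 7
if proto_labels[winner_id] < 0:
    proto_labels[winner_id] = user_label
assert proto_labels[winner_id] == 7

# ... whereas a real (positive) label would instead be checked against
# the user label, yielding feedback +1/-1 and, on -1, an allocation
# trigger, exactly as in run_spk() above.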
+ """ + + def __init__(self, *, + n_protos: int) -> None: + + super().__init__() + + # Input for triggering allocation + self.trigger_in = InPort(shape=(1,)) + # One-hot-encoded output for allocating specific prototype + self.allocate_out = OutPort(shape=(1,)) + + # The id of the next prototype to be allocated + self.next_alloc_id = Var(shape=(1,), init=1) + self.n_protos = Var(shape=(1,), init=n_protos) diff --git a/src/lava/proc/clp/prototype_lif/models.py b/src/lava/proc/clp/prototype_lif/models.py new file mode 100644 index 000000000..4268ee56a --- /dev/null +++ b/src/lava/proc/clp/prototype_lif/models.py @@ -0,0 +1,117 @@ +# Copyright (C) 2023 Intel Corporation +# SPDX-License-Identifier: BSD-3-Clause +# See: https://spdx.org/licenses/ + +import numpy as np + +from lava.magma.core.model.py.neuron import LearningNeuronModelFixed +from lava.proc.clp.prototype_lif.process import PrototypeLIF + +from lava.magma.core.sync.protocols.loihi_protocol import LoihiProtocol +from lava.magma.core.model.py.type import LavaPyType +from lava.magma.core.model.py.ports import PyOutPort, PyInPort +from lava.magma.core.resources import CPU +from lava.magma.core.decorator import implements, requires, tag +from lava.proc.lif.models import AbstractPyLifModelFixed + + +@implements(proc=PrototypeLIF, protocol=LoihiProtocol) +@requires(CPU) +@tag("bit_accurate_loihi", "fixed_pt") +class PrototypeLIFBitAcc(LearningNeuronModelFixed, AbstractPyLifModelFixed): + """Implementation of Prototype Leaky-Integrate-and-Fire neural + process bit-accurate with Loihi's hardware LIF dynamics, + which means, it mimics Loihi behaviour bit-by-bit. + + Features of the PrototypeLIF neurons: + - 3-factor learning: use the 3rd factor value as the individual learning + rate. This is done by writing this value into y1 trace. + - Use presence of the third factor as gating factor for learning via bAP + signal. When a third factor is received, a bAP signal is generated and sent + + Precisions of state variables + + - du: unsigned 12-bit integer (0 to 4095) + - dv: unsigned 12-bit integer (0 to 4095) + - bias_mant: signed 13-bit integer (-4096 to 4095). Mantissa part of neuron + bias. + - bias_exp: unsigned 3-bit integer (0 to 7). Exponent part of neuron bias. + - vth: unsigned 17-bit integer (0 to 131071). + + """ + # s_out is 24-bit graded value + s_out: PyOutPort = LavaPyType(PyOutPort.VEC_DENSE, np.int32, precision=24) + reset_in: PyInPort = LavaPyType(PyInPort.VEC_DENSE, np.int32, precision=1) + vth: int = LavaPyType(int, np.int32, precision=17) + + def __init__(self, proc_params): + super().__init__(proc_params) + self.effective_vth = 0 + self.s_out_buff = np.zeros(proc_params["shape"]) + self.isthrscaled = False + self.y1 = np.zeros(proc_params["shape"], dtype=np.int32) + + def scale_threshold(self): + """Scale threshold according to the way Loihi hardware scales it. In + Loihi hardware, threshold is left-shifted by 6-bits to MSB-align it + with other state variables of higher precision. + """ + self.effective_vth = np.left_shift(self.vth, self.vth_shift) + self.isthrscaled = True + + def spiking_activation(self): + """Spike when voltage exceeds threshold.""" + return self.v > self.effective_vth + + def run_spk(self) -> None: + """Calculates the third factor trace and sends it to the + Dense process for learning. + s_out_y1: sends the post-synaptic spike times. + s_out_y2: sends the graded third-factor reward signal. 
+ """ + + # Receive synaptic input and the 3rd factor input + a_in_data = self.a_in.recv() + a_3rd_factor_in = self.a_third_factor_in.recv().astype(np.int32) + reset = self.reset_in.recv() + + # Scale the bias + self.scale_bias() + + # Scale the threshold if is not already + if not self.isthrscaled: + self.scale_threshold() + + # Run sub-threshold dynamics + self.subthr_dynamics(activation_in=a_in_data) + + # If a reset spike is received, reset both voltage and current + if np.any(reset > 0): + self.v[reset > 0] *= 0 + self.u[reset > 0] *= 0 + + # Generate bAP signals the neurons that received its own id in the + # 3rd factor channel. As all values of "a_3rd_factor_in" will be + # same, we will check just the first one. Note that the id's sent in + # channel start from one, not zero. + s_out_bap_buff = np.zeros(shape=self.s_out_bap.shape, dtype=bool) + if a_3rd_factor_in[0] != 0: + s_out_bap_buff[a_3rd_factor_in[0] - 1] = True + # Generate the output spikes + self.s_out_buff = self.spiking_activation() + + # If there was any 3rd factor input to the population, then update y1 + # trace of those neurons to 127, the maximum value, because we are + # doing one-shot learning. The y1 trace is used in learning rule as + # the learning rate + if s_out_bap_buff.any(): + self.y1 = s_out_bap_buff * 127 + self.s_out_buff = s_out_bap_buff.copy() + + # Send out the output & bAP spikes and update y1 trace + self.s_out.send(self.s_out_buff) + self.s_out_bap.send(s_out_bap_buff) + self.s_out_y1.send(self.y1) + + # Reset voltage of spiked neurons to 0 + self.reset_voltage(spike_vector=self.s_out_buff) diff --git a/src/lava/proc/clp/prototype_lif/process.py b/src/lava/proc/clp/prototype_lif/process.py new file mode 100644 index 000000000..c16de6a41 --- /dev/null +++ b/src/lava/proc/clp/prototype_lif/process.py @@ -0,0 +1,52 @@ +# Copyright (C) 2023 Intel Corporation +# SPDX-License-Identifier: BSD-3-Clause +# See: https://spdx.org/licenses/ + +import numpy as np +import typing as ty + +from lava.proc.lif.process import LearningLIF +from lava.magma.core.learning.learning_rule import Loihi2FLearningRule, \ + Loihi3FLearningRule +from lava.magma.core.process.process import LogConfig +from lava.magma.core.process.ports.ports import InPort + + +class PrototypeLIF(LearningLIF): + """Prototype Leaky-Integrate-and-Fire (LIF) neural Process with learning + enabled. Prototype neurons are central piece of the Continually Learning + Prototypes (CLP) algorithm. 
""" + + def __init__( + self, + *, + shape: ty.Tuple[int, ...], + u: ty.Optional[ty.Union[float, list, np.ndarray]] = 0, + v: ty.Optional[ty.Union[float, list, np.ndarray]] = 0, + du: ty.Optional[float] = 0, + dv: ty.Optional[float] = 0, + bias_mant: ty.Optional[ty.Union[float, list, np.ndarray]] = 0, + bias_exp: ty.Optional[ty.Union[float, list, np.ndarray]] = 0, + vth: ty.Optional[float] = 10, + name: ty.Optional[str] = None, + log_config: ty.Optional[LogConfig] = None, + learning_rule: ty.Union[ + Loihi2FLearningRule, Loihi3FLearningRule] = None, + **kwargs, + ) -> None: + super().__init__( + shape=shape, + u=u, + v=v, + du=du, + dv=dv, + vth=vth, + bias_mant=bias_mant, + bias_exp=bias_exp, + name=name, + log_config=log_config, + learning_rule=learning_rule, + **kwargs, + ) + + self.reset_in = InPort(shape=shape) diff --git a/tests/lava/proc/clp/__init__.py b/tests/lava/proc/clp/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/lava/proc/clp/integration/__init__.py b/tests/lava/proc/clp/integration/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/lava/proc/clp/integration/test_clp_one_shot.py b/tests/lava/proc/clp/integration/test_clp_one_shot.py new file mode 100644 index 000000000..12bde2031 --- /dev/null +++ b/tests/lava/proc/clp/integration/test_clp_one_shot.py @@ -0,0 +1,597 @@ +# Copyright (C) 2023 Intel Corporation +# SPDX-License-Identifier: BSD-3-Clause +# See: https://spdx.org/licenses/ + +import unittest +import numpy as np +import typing as ty +from lava.magma.core.learning.constants import GradedSpikeCfg +from lava.proc.dense.models import PyLearningDenseModelBitApproximate + +from lava.utils.weightutils import SignMode + +from lava.magma.core.learning.learning_rule import Loihi3FLearningRule + +from lava.magma.core.run_configs import Loihi2SimCfg +from lava.magma.core.run_conditions import RunSteps +from lava.proc.clp.prototype_lif.process import PrototypeLIF +from lava.proc.clp.novelty_detector.process import NoveltyDetector +from lava.proc.clp.nsm.process import Readout +from lava.proc.clp.nsm.process import Allocator +from lava.proc.io.source import RingBuffer +from lava.proc.monitor.process import Monitor +from lava.proc.dense.process import Dense, LearningDense + + +class TestPrototypesWithNoveltyDetector(unittest.TestCase): + @staticmethod + def create_network(t_run: int, + n_protos: int, + n_features: int, + weights_proto: np.ndarray, + inp_pattern: np.ndarray, + inp_times: np.ndarray) \ + -> ty.Tuple[RingBuffer, NoveltyDetector, PrototypeLIF, Dense, + Loihi2SimCfg, RunSteps]: + # Params + t_wait = 4 # Waiting window for novelty detection + b_fraction = 8 # Fractional bits for fixed point representation + + n_protos = n_protos + n_features = n_features + t_run = t_run + inp_pattern = inp_pattern # Original input pattern + weights_proto = weights_proto + inp_times = inp_times # When the input patterns should be injected + + # These are already stored patterns. 
Let's convert them to fixed + # point values + weights_proto = weights_proto * 2 ** b_fraction + + # Novelty detection input connection weights (all-to-one connections) + weights_in_aval = np.ones(shape=(1, n_features)) + weights_out_aval = np.ones(shape=(1, n_protos)) + + # The graded spike array for input + s_pattern_inp = np.zeros((n_features, t_run)) + + # Normalize the input pattern + inp_pattern = inp_pattern / np.expand_dims(np.linalg.norm( + inp_pattern, axis=1), axis=1) + # Convert this to 8-bit fixed-point pattern and inject it at the t=3 + for i in range(inp_times.shape[0]): + s_pattern_inp[:, inp_times[i]] = inp_pattern[i, :] * 2 ** b_fraction + + # Processes + + data_input = RingBuffer(data=s_pattern_inp) + + nvl_det = NoveltyDetector(t_wait=t_wait) + + allocator = Allocator(n_protos=n_protos) + + # Prototype Lif Process + prototypes = PrototypeLIF(du=4095, + dv=4095, + bias_mant=0, + bias_exp=0, + vth=64000, + shape=(n_protos,), + name='lif_prototypes', + ) + + dense_proto = Dense(weights=weights_proto, num_message_bits=8) + dense_in_aval = Dense(weights=weights_in_aval) + dense_out_aval = Dense(weights=weights_out_aval) + dense_3rd_factor = Dense(weights=np.ones(shape=(n_protos, 1)), + num_message_bits=8) + + # Connections + + data_input.s_out.connect(dense_proto.s_in) + dense_proto.a_out.connect(prototypes.a_in) + + data_input.s_out.connect(dense_in_aval.s_in) + dense_in_aval.a_out.connect(nvl_det.input_aval_in) + + prototypes.s_out.connect(dense_out_aval.s_in) + dense_out_aval.a_out.connect(nvl_det.output_aval_in) + + # Novelty detector -> Allocator -> Dense -> PrototypeLIF connection + nvl_det.novelty_detected_out.connect(allocator.trigger_in) + allocator.allocate_out.connect(dense_3rd_factor.s_in) + dense_3rd_factor.a_out.connect(prototypes.a_third_factor_in) + + exception_map = { + LearningDense: PyLearningDenseModelBitApproximate + } + run_cond = RunSteps(num_steps=t_run) + run_cfg = Loihi2SimCfg(select_tag="fixed_pt", + exception_proc_model_map=exception_map) + + return data_input, nvl_det, prototypes, dense_proto, run_cfg, run_cond + + def test_detecting_novelty_if_no_match(self): + t_run = 10 + n_protos = 2 + n_features = 2 + weights_proto = np.array([[0.6, 0.8], [0, 0]]) + inp_pattern = np.array([[0.82, 0.55]]) + inp_times = np.array([3]) + + _, nvl_det, prototypes, _, run_cfg, run_cond = \ + self.create_network(t_run, n_protos, n_features, weights_proto, + inp_pattern, inp_times) + + monitor = Monitor() + + monitor.probe(target=nvl_det.novelty_detected_out, num_steps=t_run) + + # Run + prototypes.run(condition=run_cond, run_cfg=run_cfg) + + # Get results + result = monitor.get_data() + result = result[nvl_det.name][nvl_det.novelty_detected_out.name].T + + prototypes.stop() + + # Validate the bAP signal and y1 trace + expected_result = np.zeros((1, t_run)) + expected_result[0, 9] = 1 + np.testing.assert_array_equal(result, expected_result) + + def test_two_consecutive_novelty_detection(self): + # Params + t_run = 20 + n_protos = 2 + n_features = 2 + weights_proto = np.array([[0, 0], [0, 0]]) + inp_pattern = np.array([[0.82, 0.55], [0.55, 0.82]]) + inp_times = np.array([3, 13]) + + _, nvl_det, prototypes, _, run_cfg, run_cond = \ + self.create_network(t_run, n_protos, n_features, weights_proto, + inp_pattern, inp_times) + + monitor = Monitor() + + monitor.probe(target=nvl_det.novelty_detected_out, num_steps=t_run) + + # Run + prototypes.run(condition=run_cond, run_cfg=run_cfg) + + # Get results + result = monitor.get_data() + result = 
result[nvl_det.name][nvl_det.novelty_detected_out.name].T + + prototypes.stop() + + # Validate the bAP signal and y1 trace + expected_result = np.zeros((1, t_run)) + expected_result[0, 9] = 1 + expected_result[0, 19] = 1 + np.testing.assert_array_equal(result, expected_result) + + def test_novelty_signal_is_correctly_received_by_prototypes(self): + # Params + t_run = 12 + n_protos = 2 + n_features = 2 + weights_proto = np.array([[0, 0], [0, 0]]) + inp_pattern = np.array([[0.82, 0.55]]) + inp_times = np.array([3]) + + _, _, prototypes, _, run_cfg, run_cond = \ + self.create_network(t_run, n_protos, n_features, weights_proto, + inp_pattern, inp_times) + + # Monitors + monitor_bap = Monitor() + monitor_y1 = Monitor() + + monitor_bap.probe(target=prototypes.s_out_bap, num_steps=t_run) + monitor_y1.probe(target=prototypes.s_out_y1, num_steps=t_run) + + # Run + prototypes.run(condition=run_cond, run_cfg=run_cfg) + + # Get results + result_bap = monitor_bap.get_data() + result_bap = result_bap[prototypes.name][prototypes.s_out_bap.name].T + + result_y1 = monitor_y1.get_data() + result_y1 = result_y1[prototypes.name][prototypes.s_out_y1.name].T + + prototypes.stop() + + # Validate the bAP signal and y1 trace + expected_bap = np.zeros((n_protos, t_run)) + expected_bap[0, 10] = 1 + np.testing.assert_array_equal(result_bap, expected_bap) + + expected_y1 = np.zeros((n_protos, t_run)) + expected_y1[0, [10, 11]] = [127, 127] + np.testing.assert_array_equal(result_y1, expected_y1) + + def test_recognize_stored_patterns(self): + # Params + t_run = 20 + n_protos = 2 + n_features = 2 + weights_proto = np.array([[0.8, 0.6], [0.6, 0.8]]) + inp_pattern = np.array([[0.82, 0.55], [0.55, 0.82]]) + inp_times = np.array([3, 13]) + + _, nvl_det, prototypes, _, run_cfg, run_cond = \ + self.create_network(t_run, n_protos, n_features, weights_proto, + inp_pattern, inp_times) + + monitor_nvl = Monitor() + monitor_protos = Monitor() + + monitor_nvl.probe(target=nvl_det.novelty_detected_out, num_steps=t_run) + monitor_protos.probe(target=prototypes.s_out, num_steps=t_run) + + # Run + prototypes.run(condition=run_cond, run_cfg=run_cfg) + + # Get results + result_nvl = monitor_nvl.get_data() + result_nvl = result_nvl[nvl_det.name][ + nvl_det.novelty_detected_out.name].T + + result_protos = monitor_protos.get_data() + result_protos = result_protos[prototypes.name][prototypes.s_out.name].T + + prototypes.stop() + + # Validate the output of the prototype neurons. + expected_proto_out = np.zeros((n_protos, t_run)) + expected_proto_out[0, 4] = 1 + expected_proto_out[1, 14] = 1 + np.testing.assert_array_equal(result_protos, expected_proto_out) + + # Validate the novelty detection output. In this case no novelty + # should be detected as the tested patterns are already stored in the + # prototype weights + expected_nvl = np.zeros((1, t_run)) + np.testing.assert_array_equal(result_nvl, expected_nvl) + + +class TestOneShotLearning(unittest.TestCase): + + def test_nvl_detection_triggers_one_shot_learning(self): + # General params + t_wait = 4 + n_protos = 3 + n_features = 2 + b_fraction = 7 + t_run = 25 + + # LIF parameters + du = 4095 + dv = 4095 + vth = 63000 + + # Trace decay constants + x1_tau = 65535 + + # Epoch length + t_epoch = 1 + + # No pattern is stored yet. 
None of the prototypes are allocated + weights_proto = np.array([[0, 0], [0, 0], [0, 0]]) + weights_proto = weights_proto * 2 ** b_fraction + + # Config for Writing graded payload to x1-trace + graded_spike_cfg = GradedSpikeCfg.OVERWRITE + + # Novelty detection input connection weights (all-to-one + # connections) + weights_in_aval = np.ones(shape=(1, n_features)) + weights_out_aval = np.ones(shape=(1, n_protos)) + + # The graded spike array for input + s_pattern_inp = np.zeros((n_features, t_run)) + # Original input pattern + inp_pattern = np.array([[0.78, 0.58], [0.59, 0.81]]) + # Normalize the input pattern + inp_pattern = inp_pattern / np.expand_dims(np.linalg.norm( + inp_pattern, axis=1), axis=1) + # Convert this to 8-bit fixed-point pattern + inp_pattern = (inp_pattern * 2 ** b_fraction).astype(np.int32) + # and inject it at the t=3 + s_pattern_inp[:, 3] = inp_pattern[0, :] + s_pattern_inp[:, 13] = inp_pattern[1, :] + + # Create custom LearningRule. Define dw as string + dw = "2^-3*y1*x1*y0" + + learning_rule = Loihi3FLearningRule(dw=dw, + x1_tau=x1_tau, + t_epoch=t_epoch,) + + # Processes + data_input = RingBuffer(data=s_pattern_inp) + + nvl_det = NoveltyDetector(t_wait=t_wait) + + allocator = Allocator(n_protos=n_protos) + + # Prototype Lif Process + prototypes = PrototypeLIF(du=du, + dv=dv, + bias_mant=0, + bias_exp=0, + vth=vth, + shape=(n_protos,), + name='lif_prototypes', + sign_mode=SignMode.EXCITATORY, + learning_rule=learning_rule) + + dense_proto = LearningDense(weights=weights_proto, + learning_rule=learning_rule, + name="proto_weights", + num_message_bits=8, + graded_spike_cfg=graded_spike_cfg) + + dense_in_aval = Dense(weights=weights_in_aval) + dense_out_aval = Dense(weights=weights_out_aval) + dense_3rd_factor = Dense(weights=np.ones(shape=(n_protos, 1)), + num_message_bits=8) + + monitor_nvl = Monitor() + monitor_weights = Monitor() + monitor_x1_trace = Monitor() + + # Connections + + data_input.s_out.connect(dense_proto.s_in) + dense_proto.a_out.connect(prototypes.a_in) + + data_input.s_out.connect(dense_in_aval.s_in) + dense_in_aval.a_out.connect(nvl_det.input_aval_in) + + prototypes.s_out.connect(dense_out_aval.s_in) + dense_out_aval.a_out.connect(nvl_det.output_aval_in) + + # Novelty detector -> Allocator -> Dense -> PrototypeLIF connection + nvl_det.novelty_detected_out.connect(allocator.trigger_in) + allocator.allocate_out.connect(dense_3rd_factor.s_in) + dense_3rd_factor.a_out.connect(prototypes.a_third_factor_in) + + prototypes.s_out_bap.connect(dense_proto.s_in_bap) + + # Sending y1 spike + prototypes.s_out_y1.connect(dense_proto.s_in_y1) + + # Probe novelty detector and prototypes + monitor_nvl.probe(target=nvl_det.novelty_detected_out, num_steps=t_run) + monitor_x1_trace.probe(target=dense_proto.x1, num_steps=t_run) + monitor_weights.probe(target=dense_proto.weights, num_steps=t_run) + + # Run + exception_map = { + LearningDense: PyLearningDenseModelBitApproximate + } + run_cond = RunSteps(num_steps=t_run) + run_cfg = Loihi2SimCfg(select_tag="fixed_pt", + exception_proc_model_map=exception_map) + + prototypes.run(condition=run_cond, run_cfg=run_cfg) + + # Get results + result_nvl = monitor_nvl.get_data() + result_nvl = result_nvl[nvl_det.name][ + nvl_det.novelty_detected_out.name].T + + result_x1_trace = monitor_x1_trace.get_data()['proto_weights']['x1'].T + + result_weights = monitor_weights.get_data() + result_weights = result_weights[dense_proto.name][ + dense_proto.weights.name].T + + # Stop the run + prototypes.stop() + + # Do the tests + expected_nvl 
= np.zeros((1, t_run)) + expected_nvl[0, 9] = 1 + expected_nvl[0, 19] = 1 + + exp_x1_0 = np.ceil(inp_pattern[0, :] / 2) + exp_x1_1 = np.ceil(inp_pattern[1, :] / 2) + + expected_x1 = np.zeros((n_features, t_run)) + expected_x1[:, 3:13] = np.tile(exp_x1_0[:, None], 10) + expected_x1[:, 13:] = np.tile(exp_x1_1[:, None], 12) + + exp_w_0 = (exp_x1_0 - 1) * 2 + exp_w_1 = (exp_x1_1 - 1) * 2 + expected_weights = np.zeros((n_features, n_protos, t_run)) + + expected_weights[:, 0, 10:] = np.tile(exp_w_0[:, None], t_run - 10) + expected_weights[:, 1, 20:] = np.tile(exp_w_1[:, None], t_run - 20) + + np.testing.assert_array_equal(result_nvl, expected_nvl) + # np.testing.assert_array_equal(expected_x1, result_x1_trace) + np.testing.assert_array_almost_equal(expected_weights, result_weights, + decimal=-1) + + def test_allocation_triggered_by_erroneous_classification(self): + # General params + t_wait = 4 + n_protos = 3 + n_features = 2 + b_fraction = 8 + t_run = 33 + + # LIF parameters + du = 4095 + dv = 4095 + vth = 63000 + + # Trace decay constants + x1_tau = 65535 + + # Epoch length + t_epoch = 1 + + # No pattern is stored yet. None of the prototypes are allocated + weights_proto = np.array([[0, 0], [0, 0], [0, 0]]) + weights_proto = weights_proto * 2 ** b_fraction + + # Config for Writing graded payload to x1-trace + graded_spike_cfg = GradedSpikeCfg.OVERWRITE + + # Novelty detection input connection weights (all-to-one + # connections) + weights_in_aval = np.ones(shape=(1, n_features)) + weights_out_aval = np.ones(shape=(1, n_protos)) + + # The graded spike array for input + s_pattern_inp = np.zeros((n_features, t_run)) + # Original input pattern + inp_pattern = np.array([[0.82, 0.55], [0.55, 0.82], [0.87, 0.50]]) + # Normalize the input pattern + inp_pattern = inp_pattern / np.expand_dims(np.linalg.norm( + inp_pattern, axis=1), axis=1) + # Convert this to 8-bit fixed-point pattern + inp_pattern = (inp_pattern * 2 ** b_fraction).astype(np.int32) + # and inject it at the t=3 + s_pattern_inp[:, 3] = inp_pattern[0, :] + s_pattern_inp[:, 13] = inp_pattern[1, :] + s_pattern_inp[:, 23] = inp_pattern[2, :] + + # The graded spike array for the user-provided label + s_user_label = np.zeros((1, t_run)) + s_user_label[0, 9] = 1 + s_user_label[0, 19] = 2 + s_user_label[0, 29] = 3 + + # Create custom LearningRule. 
Define dw as string + dw = "2^-3*y1*x1*y0" + + learning_rule = Loihi3FLearningRule(dw=dw, + x1_tau=x1_tau, + t_epoch=t_epoch) + + # Processes + data_input = RingBuffer(data=s_pattern_inp) + + nvl_det = NoveltyDetector(t_wait=t_wait) + + allocator = Allocator(n_protos=n_protos) + + readout = Readout(n_protos=n_protos) + + label_in = RingBuffer(data=s_user_label) + + # Prototype Lif Process + prototypes = PrototypeLIF(du=du, + dv=dv, + bias_mant=0, + bias_exp=0, + vth=vth, + shape=(n_protos,), + name='lif_prototypes', + sign_mode=SignMode.EXCITATORY, + learning_rule=learning_rule) + + dense_proto = LearningDense(weights=weights_proto, + learning_rule=learning_rule, + name="proto_weights", + num_message_bits=8, + graded_spike_cfg=graded_spike_cfg) + + dense_in_aval = Dense(weights=weights_in_aval) + dense_out_aval = Dense(weights=weights_out_aval) + dense_alloc_weight = Dense(weights=np.ones(shape=(1, 1))) + dense_3rd_factor = Dense(weights=np.ones(shape=(n_protos, 1)), + num_message_bits=8) + + monitor_nvl = Monitor() + monitor_protos = Monitor() + monitor_alloc = Monitor() + + # Connections + + data_input.s_out.connect(dense_proto.s_in) + dense_proto.a_out.connect(prototypes.a_in) + + data_input.s_out.connect(dense_in_aval.s_in) + dense_in_aval.a_out.connect(nvl_det.input_aval_in) + + prototypes.s_out.connect(dense_out_aval.s_in) + dense_out_aval.a_out.connect(nvl_det.output_aval_in) + + # Novelty detector -> Allocator -> Dense -> PrototypeLIF connection + nvl_det.novelty_detected_out.connect(allocator.trigger_in) + allocator.allocate_out.connect(dense_3rd_factor.s_in) + dense_3rd_factor.a_out.connect(prototypes.a_third_factor_in) + + prototypes.s_out_bap.connect(dense_proto.s_in_bap) + + # Sending y1 spike + prototypes.s_out_y1.connect(dense_proto.s_in_y1) + + # Prototype Neurons' outputs connect to the inference input of the + # Readout process + prototypes.s_out.connect(readout.inference_in) + + # Label input to the Readout proces + label_in.s_out.connect(readout.label_in) + + # Readout trigger to the Allocator + readout.trigger_alloc.connect(dense_alloc_weight.s_in) + dense_alloc_weight.a_out.connect(allocator.trigger_in) + + # Probe novelty detector and prototypes + monitor_nvl.probe(target=nvl_det.novelty_detected_out, num_steps=t_run) + monitor_protos.probe(target=prototypes.s_out, num_steps=t_run) + monitor_alloc.probe(target=allocator.allocate_out, num_steps=t_run) + + # Run + exception_map = { + LearningDense: PyLearningDenseModelBitApproximate + } + run_cond = RunSteps(num_steps=t_run) + run_cfg = Loihi2SimCfg(select_tag="fixed_pt", + exception_proc_model_map=exception_map) + + prototypes.run(condition=run_cond, run_cfg=run_cfg) + + # Get results + result_nvl = monitor_nvl.get_data() + result_nvl = result_nvl[nvl_det.name][ + nvl_det.novelty_detected_out.name].T + + result_protos = monitor_protos.get_data() + result_protos = result_protos[prototypes.name][prototypes.s_out.name].T + + result_alloc = monitor_alloc.get_data() + result_alloc = result_alloc[allocator.name][ + allocator.allocate_out.name].T + + # Stop the run + prototypes.stop() + + # Do the tests + expected_nvl = np.zeros((1, t_run)) + expected_nvl[0, [9, 19]] = [1, 1] + + expected_alloc = np.zeros((1, t_run)) + expected_alloc[0, 9] = 1 + expected_alloc[0, 19] = 2 + expected_alloc[0, 30] = 3 + + expected_proto_out = np.zeros((n_protos, t_run)) + # 1) novelty-based allocation triggered, 2) erroneous prediction + expected_proto_out[0, [10, 24]] = 1 + expected_proto_out[1, 20] = 1 # Novelty-based allocation triggered + 
expected_proto_out[2, 31] = 1 # Error-based allocation triggered + + np.testing.assert_array_equal(result_nvl, expected_nvl) + np.testing.assert_array_equal(result_alloc, expected_alloc) + np.testing.assert_array_equal(result_protos, expected_proto_out) diff --git a/tests/lava/proc/clp/novelty_detector/__init__.py b/tests/lava/proc/clp/novelty_detector/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/lava/proc/clp/novelty_detector/test_models.py b/tests/lava/proc/clp/novelty_detector/test_models.py new file mode 100644 index 000000000..057555673 --- /dev/null +++ b/tests/lava/proc/clp/novelty_detector/test_models.py @@ -0,0 +1,127 @@ +# Copyright (C) 2023 Intel Corporation +# SPDX-License-Identifier: BSD-3-Clause +# See: https://spdx.org/licenses/ + +import unittest +import numpy as np + +from lava.magma.core.run_configs import Loihi2SimCfg +from lava.magma.core.run_conditions import RunSteps +from lava.proc.clp.novelty_detector.process import NoveltyDetector +from lava.proc.io.source import RingBuffer as Source +from lava.proc.monitor.process import Monitor + + +class TestNoveltyDetectorPyModel(unittest.TestCase): + def test_detecting_novelty_in_time_window(self): + # Params + t_wait = 10 + t_run = 20 + + # Input spikes + spike_inp_in_aval = np.zeros((1, t_run)) + spike_inp_out_aval = np.zeros((1, t_run)) + + spike_inp_in_aval[0, 3] = 1 + + # Processes + in_aval = Source(data=spike_inp_in_aval) + out_aval = Source(data=spike_inp_out_aval) + + nvl_det = NoveltyDetector(t_wait=t_wait) + monitor = Monitor() + + # Connections + in_aval.s_out.connect(nvl_det.input_aval_in) + out_aval.s_out.connect(nvl_det.output_aval_in) + monitor.probe(target=nvl_det.novelty_detected_out, num_steps=t_run) + + # Run + run_cond = RunSteps(num_steps=t_run) + run_cfg = Loihi2SimCfg(select_tag="fixed_pt") + + in_aval.run(condition=run_cond, run_cfg=run_cfg) + + result = monitor.get_data() + result = result[nvl_det.name][nvl_det.novelty_detected_out.name].T + + in_aval.stop() + # Validate the novelty detection output + expected_result = np.zeros((1, t_run)) + expected_result[0, 14] = 1 + np.testing.assert_array_almost_equal(result, expected_result) + + def test_non_activity_if_output_available(self): + # Params + t_wait = 10 + t_run = 20 + + # Input spikes + spike_inp_in_aval = np.zeros((1, t_run)) + spike_inp_out_aval = np.zeros((1, t_run)) + + spike_inp_in_aval[0, 3] = 1 + spike_inp_out_aval[0, 7] = 1 + # Processes + in_aval = Source(data=spike_inp_in_aval) + out_aval = Source(data=spike_inp_out_aval) + + nvl_det = NoveltyDetector(t_wait=t_wait) + monitor = Monitor() + + # Connections + in_aval.s_out.connect(nvl_det.input_aval_in) + out_aval.s_out.connect(nvl_det.output_aval_in) + monitor.probe(target=nvl_det.novelty_detected_out, num_steps=t_run) + + # Run + run_cond = RunSteps(num_steps=t_run) + run_cfg = Loihi2SimCfg(select_tag="fixed_pt") + + in_aval.run(condition=run_cond, run_cfg=run_cfg) + + result = monitor.get_data() + result = result[nvl_det.name][nvl_det.novelty_detected_out.name].T + + in_aval.stop() + # Validate the novelty detection output + expected_result = np.zeros((1, t_run)) + np.testing.assert_array_almost_equal(result, expected_result) + + def test_novelty_detection_if_output_comes_too_late(self): + # Params + t_wait = 10 + t_run = 20 + + # Input spikes + spike_inp_in_aval = np.zeros((1, t_run)) + spike_inp_out_aval = np.zeros((1, t_run)) + + spike_inp_in_aval[0, 3] = 1 + spike_inp_out_aval[0, 15] = 1 + # Processes + in_aval = Source(data=spike_inp_in_aval) + 
out_aval = Source(data=spike_inp_out_aval) + + nvl_det = NoveltyDetector(t_wait=t_wait) + monitor = Monitor() + + # Connections + in_aval.s_out.connect(nvl_det.input_aval_in) + out_aval.s_out.connect(nvl_det.output_aval_in) + monitor.probe(target=nvl_det.novelty_detected_out, num_steps=t_run) + + # Run + run_cond = RunSteps(num_steps=t_run) + run_cfg = Loihi2SimCfg(select_tag="fixed_pt") + + in_aval.run(condition=run_cond, run_cfg=run_cfg) + + result = monitor.get_data() + result = result[nvl_det.name][nvl_det.novelty_detected_out.name].T + + in_aval.stop() + # Validate the novelty detection output + expected_result = np.zeros((1, t_run)) + expected_result[0, 14] = 1 + np.testing.assert_array_almost_equal(result, expected_result) diff --git a/tests/lava/proc/clp/nsm/__init__.py b/tests/lava/proc/clp/nsm/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/lava/proc/clp/nsm/test_nsm.py b/tests/lava/proc/clp/nsm/test_nsm.py new file mode 100644 index 000000000..adf84b8a9 --- /dev/null +++ b/tests/lava/proc/clp/nsm/test_nsm.py @@ -0,0 +1,236 @@ +# Copyright (C) 2023 Intel Corporation +# SPDX-License-Identifier: BSD-3-Clause +# See: https://spdx.org/licenses/ + +import unittest +import numpy as np + +from lava.magma.core.run_configs import Loihi2SimCfg +from lava.magma.core.run_conditions import RunSteps +from lava.proc.clp.nsm.process import Readout +from lava.proc.clp.nsm.process import Allocator +from lava.proc.io.source import RingBuffer as Source +from lava.proc.monitor.process import Monitor + + +class TestReadoutPyModel(unittest.TestCase): + def test_pseudo_labeling_if_no_user_label(self): + # Params + n_protos = 2 + t_run = 20 + + # Input spikes + s_infer_in = np.zeros((n_protos, t_run)) + s_label_in = np.zeros((1, t_run)) + + # Prototype 0 is active at t=3 + s_infer_in[0, 3] = 1 + + # Processes + infer_in = Source(data=s_infer_in) + label_in = Source(data=s_label_in) + + # Readout process is initialized without any labels, so they are all + # zero + readout_layer = Readout(n_protos=n_protos) + monitor = Monitor() + + # Connections + infer_in.s_out.connect(readout_layer.inference_in) + label_in.s_out.connect(readout_layer.label_in) + monitor.probe(target=readout_layer.user_output, num_steps=t_run) + + # Run + run_cond = RunSteps(num_steps=t_run) + run_cfg = Loihi2SimCfg(select_tag="fixed_pt") + + infer_in.run(condition=run_cond, run_cfg=run_cfg) + + result = monitor.get_data() + result = result[readout_layer.name][readout_layer.user_output.name].T + + infer_in.stop() + # Validate the user outputs + expected_result = np.zeros((1, t_run)) + expected_result[0, 3] = -1 # we expect a pseudo-label + np.testing.assert_array_almost_equal(result, expected_result) + + def test_readout_from_labelled_winner(self): + # Params + n_protos = 2 + t_run = 20 + + # Input spikes + s_infer_in = np.zeros((n_protos, t_run)) + s_label_in = np.zeros((1, t_run)) + + # Prototype 0 is active at t=3 + s_infer_in[0, 3] = 1 + + # Processes + infer_in = Source(data=s_infer_in) + label_in = Source(data=s_label_in) + + # Prototype 0 has the label "1", Prototype 1 has no label, because + # the proto_labels are initialized with [1,0] + readout_layer = Readout(n_protos=n_protos, + proto_labels=np.array([1, 0])) + monitor = Monitor() + + # Connections + infer_in.s_out.connect(readout_layer.inference_in) + label_in.s_out.connect(readout_layer.label_in) + monitor.probe(target=readout_layer.user_output, num_steps=t_run) + + # Run + run_cond = RunSteps(num_steps=t_run) + run_cfg = 
Loihi2SimCfg(select_tag="fixed_pt") + + infer_in.run(condition=run_cond, run_cfg=run_cfg) + + result = monitor.get_data() + result = result[readout_layer.name][readout_layer.user_output.name].T + + infer_in.stop() + # Validate the user outputs + expected_result = np.zeros((1, t_run)) + expected_result[0, 3] = 1 # We expect the label "1" + np.testing.assert_array_almost_equal(result, expected_result) + + def test_labelling_a_pseudo_labelled_winner(self): + # Params + n_protos = 2 + t_run = 20 + + # Input spikes + s_infer_in = np.zeros((n_protos, t_run)) + s_label_in = np.zeros((1, t_run)) + + s_infer_in[0, 3] = 1 # Winner is the 0'th prototype + s_label_in[0, 6] = 2 # Label provided by the user + + # Processes + infer_in = Source(data=s_infer_in) + label_in = Source(data=s_label_in) + + readout_layer = Readout(n_protos=n_protos, + proto_labels=np.array([-1, 0])) + monitor = Monitor() + + # Connections + infer_in.s_out.connect(readout_layer.inference_in) + label_in.s_out.connect(readout_layer.label_in) + monitor.probe(target=readout_layer.user_output, num_steps=t_run) + + # Run + run_cond = RunSteps(num_steps=t_run) + run_cfg = Loihi2SimCfg(select_tag="fixed_pt") + + infer_in.run(condition=run_cond, run_cfg=run_cfg) + + result = monitor.get_data() + result = result[readout_layer.name][readout_layer.user_output.name].T + + infer_in.stop() + # Validate the novelty detection output + expected_result = np.zeros((1, t_run)) + + # The predicted label is first -1, but then it is updated by the + # user-provided label + expected_result[0, 3] = -1 + expected_result[0, 6] = 2 + + np.testing.assert_array_almost_equal(result, expected_result) + + def test_feedback_and_allocation_output(self): + # Params + n_protos = 2 + t_run = 20 + + # Input spikes + s_infer_in = np.zeros((n_protos, t_run)) + s_label_in = np.zeros((1, t_run)) + + s_infer_in[0, [3, 12]] = 1 + s_label_in[0, 6] = 2 # Label that match the inference + s_label_in[0, 17] = 1 # Label that does not match the inference + # Processes + infer_in = Source(data=s_infer_in) + label_in = Source(data=s_label_in) + + readout_layer = Readout(n_protos=n_protos, + proto_labels=np.array([2, 0])) + monitor_fb = Monitor() + monitor_alloc = Monitor() + + # Connections + infer_in.s_out.connect(readout_layer.inference_in) + label_in.s_out.connect(readout_layer.label_in) + + monitor_fb.probe(target=readout_layer.feedback, num_steps=t_run) + monitor_alloc.probe(target=readout_layer.trigger_alloc, num_steps=t_run) + + # Run + run_cond = RunSteps(num_steps=t_run) + run_cfg = Loihi2SimCfg(select_tag="fixed_pt") + + infer_in.run(condition=run_cond, run_cfg=run_cfg) + + result_fb = monitor_fb.get_data() + result_fb = result_fb[readout_layer.name][readout_layer.feedback.name].T + + result_alloc = monitor_alloc.get_data() + result_alloc = result_alloc[readout_layer.name][ + readout_layer.trigger_alloc.name].T + + infer_in.stop() + # Validate the novelty detection output + expected_fb = np.zeros(shape=(1, t_run)) + expected_fb[0, 6] = 1 # We expect a match for the first input + expected_fb[0, 17] = -1 # We expect mismatch for the second input + np.testing.assert_array_equal(result_fb, expected_fb) + + expected_alloc = np.zeros(shape=(1, t_run)) + # We expect allocation trigger output when there is a mismatch + expected_alloc[0, 17] = 1 + np.testing.assert_array_equal(result_alloc, expected_alloc) + + +class TestAllocatorPyModel(unittest.TestCase): + def test_allocator_output(self): + # Params + n_protos = 4 + t_run = 10 + + # Input spikes + s_trigger_in = np.zeros((1, 
t_run)) + + s_trigger_in[0, [3, 7]] = [1, 1] # Trigger input + # Processes + alloc_trigger_in = Source(data=s_trigger_in) + + allocator = Allocator(n_protos=n_protos) + monitor_alloc = Monitor() + + # Connections + alloc_trigger_in.s_out.connect(allocator.trigger_in) + + monitor_alloc.probe(target=allocator.allocate_out, num_steps=t_run) + + # Run + run_cond = RunSteps(num_steps=t_run) + run_cfg = Loihi2SimCfg(select_tag="fixed_pt") + + allocator.run(condition=run_cond, run_cfg=run_cfg) + + result_alloc = monitor_alloc.get_data() + result_alloc = result_alloc[allocator.name][ + allocator.allocate_out.name].T + + allocator.stop() + + # Validate the allocation output + expected_alloc = np.zeros(shape=(1, t_run)) + expected_alloc[0, 3] = 1 + expected_alloc[0, 7] = 2 + np.testing.assert_array_equal(result_alloc, expected_alloc) diff --git a/tests/lava/proc/clp/prototype_lif/__init__.py b/tests/lava/proc/clp/prototype_lif/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/lava/proc/clp/prototype_lif/test_models.py b/tests/lava/proc/clp/prototype_lif/test_models.py new file mode 100644 index 000000000..2c685cad0 --- /dev/null +++ b/tests/lava/proc/clp/prototype_lif/test_models.py @@ -0,0 +1,244 @@ +# Copyright (C) 2023 Intel Corporation +# SPDX-License-Identifier: BSD-3-Clause +# See: https://spdx.org/licenses/ + +import unittest +import numpy as np + +from lava.magma.core.run_configs import Loihi2SimCfg +from lava.magma.core.run_conditions import RunSteps +from lava.proc.clp.prototype_lif.process import PrototypeLIF +from lava.proc.dense.process import Dense +from lava.proc.io.source import RingBuffer as Source +from lava.proc.monitor.process import Monitor + + +class TestPrototypeLIFBitAccModel(unittest.TestCase): + def test_s_out_bap_corresponds_to_presence_of_third_factor_in(self): + # Params + n_protos = 2 + t_run = 20 + + # The array for 3rd factor input spike times + s_third_factor_in = np.zeros((n_protos, t_run)) + + # Inject a 3rd factor signal at the t=3, with id=1, hence targeting + # the first prototype + s_third_factor_in[:, 3] = [1, 1] + + # Processes + # 3rd factor source input process (RingBuffer) + third_factor_input = Source(data=s_third_factor_in) + + # Prototype Lif Process + prototypes = PrototypeLIF(du=4095, + dv=4095, + bias_mant=0, + bias_exp=0, + vth=1, + shape=(n_protos,), + name='lif_prototypes', + ) + + monitor = Monitor() + + # Connections + third_factor_input.s_out.connect(prototypes.a_third_factor_in) + # Probe the bAP signal of the neurons + monitor.probe(target=prototypes.s_out_bap, num_steps=t_run) + + # Run + run_cond = RunSteps(num_steps=t_run) + run_cfg = Loihi2SimCfg(select_tag="fixed_pt") + + monitor.run(condition=run_cond, run_cfg=run_cfg) + + # Get results + result = monitor.get_data() + result = result[prototypes.name][prototypes.s_out_bap.name].T + + monitor.stop() + + # Validate the bAP signal + expected_result = np.zeros((n_protos, t_run)) + expected_result[0, 3] = 1 + np.testing.assert_array_almost_equal(result, expected_result) + + def test_y1_is_equal_to_third_factor_in_times_learning_rate(self): + # Params + n_protos = 2 + t_run = 20 + + # The array for 3rd factor input spike times + s_third_factor_in = np.zeros((n_protos, t_run)) + + # Inject a 3rd factor signal at the t=3, with id=1, hence targeting + # the first prototype + s_third_factor_in[:, 3] = [1, 1] + + # Processes + # 3rd factor source input process (RingBuffer) + third_factor_input = Source(data=s_third_factor_in) + + # Prototype Lif Process + prototypes = 
PrototypeLIF(du=4095, + dv=4095, + bias_mant=0, + bias_exp=0, + vth=1, + shape=(n_protos,), + name='lif_prototypes', + ) + monitor = Monitor() + + # Connections + third_factor_input.s_out.connect(prototypes.a_third_factor_in) + + # Probe the y1 (post-synaptic trace) + monitor.probe(target=prototypes.s_out_y1, num_steps=t_run) + + # Run + run_cond = RunSteps(num_steps=t_run) + run_cfg = Loihi2SimCfg(select_tag="fixed_pt") + + monitor.run(condition=run_cond, run_cfg=run_cfg) + + # Get the probed data + result = monitor.get_data() + result = result[prototypes.name][prototypes.s_out_y1.name].T + + monitor.stop() + # Validate the post-synaptic trace: it gets updated to the value of + # the 3rd factor signal and then stays same + expected_result = np.zeros((n_protos, t_run)) + expected_result[0, 3:] = 127 + np.testing.assert_array_almost_equal(result, expected_result) + + def test_neuron_outputs_spike_if_received_3rd_factor(self): + # Params + n_protos = 2 + t_run = 20 + + # The array for 3rd factor input spike times + s_third_factor_in = np.zeros((n_protos, t_run)) + + # Inject a 3rd factor signal at the t=3, with id=1, hence targeting + # the first prototype + s_third_factor_in[:, 3] = [1, 1] + + # Processes + # 3rd factor source input process (RingBuffer) + third_factor_input = Source(data=s_third_factor_in) + + # Prototype Lif Process + prototypes = PrototypeLIF(du=4095, + dv=4095, + bias_mant=0, + bias_exp=0, + vth=1, + shape=(n_protos,), + name='lif_prototypes', + ) + monitor = Monitor() + + # Connections + third_factor_input.s_out.connect(prototypes.a_third_factor_in) + + # Probe the y1 (post-synaptic trace) + monitor.probe(target=prototypes.s_out, num_steps=t_run) + + # Run + run_cond = RunSteps(num_steps=t_run) + run_cfg = Loihi2SimCfg(select_tag="fixed_pt") + + monitor.run(condition=run_cond, run_cfg=run_cfg) + + # Get the probed data + result = monitor.get_data() + result = result[prototypes.name][prototypes.s_out.name].T + + monitor.stop() + # Validate the post-synaptic trace: it gets updated to the value of + # the 3rd factor signal and then stays same + expected_result = np.zeros((n_protos, t_run)) + expected_result[0, 3] = 1 + np.testing.assert_array_equal(result, expected_result) + + def test_neuron_vars_reset_when_reset_spike_received(self): + # Params + n_protos = 2 + n_features = 2 + t_run = 20 + b_fraction = 7 + + # PrototypeLIF neural dynamics parameters + du = 50 + dv = 700 + vth = 48000 + + # No pattern is stored yet. 
None of the prototypes are allocated + weights_proto = np.array([[0.6, 0.8], [0.8, 0.6]]) + weights_proto = weights_proto * 2 ** b_fraction + + # The graded spike array for input + s_pattern_inp = np.zeros((n_features, t_run)) + # Original input pattern + inp_pattern = np.array([[0.6, 0.8], [0.8, 0.6]]) + # Normalize the input pattern + inp_pattern = inp_pattern / np.expand_dims(np.linalg.norm( + inp_pattern, axis=1), axis=1) + # Convert this to 8-bit fixed-point pattern + inp_pattern = (inp_pattern * 2 ** b_fraction).astype(np.int32) + # And inject it at the t=3 + s_pattern_inp[:, 3] = inp_pattern[0, :] + s_pattern_inp[:, 13] = inp_pattern[1, :] + + # Processes + data_input = Source(data=s_pattern_inp) + + # Prototype Lif Process + prototypes = PrototypeLIF(du=du, + dv=dv, + bias_mant=0, + bias_exp=0, + vth=vth, + shape=(n_protos,), + name='lif_prototypes', + ) + + dense_proto = Dense(weights=weights_proto, num_message_bits=8) + + # WTA weights and Dense proc to be put on Prototype population + dense_wta = Dense(weights=np.ones(shape=(n_protos, n_protos))) + + monitor = Monitor() + + # Connections + data_input.s_out.connect(dense_proto.s_in) + dense_proto.a_out.connect(prototypes.a_in) + + # WTA of prototypes + prototypes.s_out.connect(dense_wta.s_in) + dense_wta.a_out.connect(prototypes.reset_in) + + # Probe the y1 (post-synaptic trace) + monitor.probe(target=prototypes.s_out, num_steps=t_run) + + # Run + run_cond = RunSteps(num_steps=t_run) + run_cfg = Loihi2SimCfg(select_tag="fixed_pt") + + monitor.run(condition=run_cond, run_cfg=run_cfg) + + # Get the probed data + result = monitor.get_data() + result = result[prototypes.name][prototypes.s_out.name].T + + monitor.stop() + # Validate the post-synaptic trace: it gets updated to the value of + # the 3rd factor signal and then stays same + expected_result = np.zeros((n_protos, t_run)) + expected_result[0, 7] = 1 + expected_result[1, 17] = 1 + + np.testing.assert_array_equal(result, expected_result) diff --git a/tests/lava/proc/conv/test_utils.py b/tests/lava/proc/conv/test_utils.py index ac1e8b2f5..8481eff92 100644 --- a/tests/lava/proc/conv/test_utils.py +++ b/tests/lava/proc/conv/test_utils.py @@ -18,29 +18,28 @@ # in this case, the test only checks for error during # utils.conv calculation -np.random.seed(8534) - class TestConv(unittest.TestCase): def test_conv(self) -> None: """Test convolution implementation""" + prng = np.random.RandomState(8534) for _ in range(10): # testing with 10 random combinations - groups = np.random.randint(4) + 1 - in_channels = (np.random.randint(8) + 1) * groups - out_channels = (np.random.randint(8) + 1) * groups - kernel_size = np.random.randint([9, 9]) + 1 - stride = np.random.randint([5, 5]) + 1 - padding = np.random.randint([5, 5]) - dilation = np.random.randint([4, 4]) + 1 + groups = prng.randint(4) + 1 + in_channels = (prng.randint(8) + 1) * groups + out_channels = (prng.randint(8) + 1) * groups + kernel_size = prng.randint([9, 9]) + 1 + stride = prng.randint([5, 5]) + 1 + padding = prng.randint([5, 5]) + dilation = prng.randint([4, 4]) + 1 weight_dims = [out_channels, kernel_size[0], kernel_size[1], in_channels // groups] - weights = np.random.randint(256, size=weight_dims) - 128 - input_ = np.random.random( + weights = prng.randint(256, size=weight_dims) - 128 + input_ = prng.random( ( # input needs to be a certain size # to make sure the output dimension is never negative - np.random.randint([128, 128]) + kernel_size * dilation + prng.randint([128, 128]) + kernel_size * dilation ).tolist() + 
[in_channels] ) @@ -89,6 +88,7 @@ def test_conv(self) -> None: def test_conv_saved_data(self) -> None: """Test convolution implementation against saved data.""" + prng = np.random.RandomState(8534) for i in range(10): # testing with 10 random combinations gt_data = np.load(os.path.dirname(os.path.abspath(__file__)) + f'/ground_truth/gt_conv_paris_{i}.npz') @@ -111,23 +111,24 @@ def test_conv_saved_data(self) -> None: def test_conv_to_sparse(self) -> None: """Tests translation of a conv to a sparse layer""" + prng = np.random.RandomState(8534) for _ in range(10): - groups = np.random.randint(4) + 1 - in_channels = (np.random.randint(4) + 1) * groups - out_channels = (np.random.randint(8) + 1) * groups - kernel_size = np.random.randint([5, 5]) + 1 - stride = np.random.randint([2, 2]) + 1 - padding = np.random.randint([2, 2]) - dilation = np.random.randint([2, 2]) + 1 + groups = prng.randint(4) + 1 + in_channels = (prng.randint(4) + 1) * groups + out_channels = (prng.randint(8) + 1) * groups + kernel_size = prng.randint([5, 5]) + 1 + stride = prng.randint([2, 2]) + 1 + padding = prng.randint([2, 2]) + dilation = prng.randint([2, 2]) + 1 weight_dims = [out_channels, kernel_size[0], kernel_size[1], in_channels // groups] - weights = np.random.randint(256, size=weight_dims) - 128 - input = np.random.random( + weights = prng.randint(256, size=weight_dims) - 128 + input = prng.random( ( # input needs to be a certain size # to make sure the output dimension is never negative - np.random.randint([16, 16]) + kernel_size * dilation + prng.randint([16, 16]) + kernel_size * dilation ).tolist() + [in_channels] )
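The test_utils.py hunk above swaps the module-level np.random.seed(8534) for a generator owned by each test. A short sketch of the pattern and why it matters for thread safety; the shapes and values are illustrative:

import numpy as np

# Before: all tests drew from one hidden global RandomState, so draws
# interleave when tests run concurrently and results stop being
# reproducible.
np.random.seed(8534)
a = np.random.randint(256, size=(2, 2)) - 128

# After: each test owns its generator; same seed, same sequence, and no
# state is shared across threads.
prng = np.random.RandomState(8534)
b = prng.randint(256, size=(2, 2)) - 128

assert (a == b).all()  # identical draws, without the shared global state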