diff --git a/src/lava/proc/graded/models.py b/src/lava/proc/graded/models.py
new file mode 100644
index 000000000..b87f925a9
--- /dev/null
+++ b/src/lava/proc/graded/models.py
@@ -0,0 +1,179 @@
+# Copyright (C) 2023 Intel Corporation
+# SPDX-License-Identifier: BSD-3-Clause
+# See: https://spdx.org/licenses/
+
+import numpy as np
+
+from lava.magma.core.sync.protocols.loihi_protocol import LoihiProtocol
+from lava.magma.core.model.py.ports import PyInPort, PyOutPort
+from lava.magma.core.model.py.type import LavaPyType
+from lava.magma.core.resources import CPU
+from lava.magma.core.decorator import implements, requires, tag
+from lava.magma.core.model.py.model import PyLoihiProcessModel
+
+from lava.proc.graded.process import GradedVec, NormVecDelay, InvSqrt
+
+
+class AbstractGradedVecModel(PyLoihiProcessModel):
+    """Implementation of GradedVec"""
+
+    a_in = None
+    s_out = None
+    v = None
+    vth = None
+    exp = None
+
+    def run_spk(self) -> None:
+        """The run function that performs the actual computation during
+        execution orchestrated by a PyLoihiProcessModel using the
+        LoihiProtocol.
+        """
+        a_in_data = self.a_in.recv()
+        self.v += a_in_data
+
+        is_spike = np.abs(self.v) > self.vth
+        sp_out = self.v * is_spike
+
+        self.v[:] = 0
+
+        self.s_out.send(sp_out)
+
+
+@implements(proc=GradedVec, protocol=LoihiProtocol)
+@requires(CPU)
+@tag('fixed_pt')
+class PyGradedVecModelFixed(AbstractGradedVecModel):
+    """Fixed point implementation of GradedVec"""
+    a_in = LavaPyType(PyInPort.VEC_DENSE, np.int32, precision=24)
+    s_out = LavaPyType(PyOutPort.VEC_DENSE, np.int32, precision=24)
+    vth: np.ndarray = LavaPyType(np.ndarray, np.int32, precision=24)
+    v: np.ndarray = LavaPyType(np.ndarray, np.int32, precision=24)
+    exp: np.ndarray = LavaPyType(np.ndarray, np.int32, precision=24)
+
+
+@implements(proc=NormVecDelay, protocol=LoihiProtocol)
+@requires(CPU)
+@tag('fixed_pt')
+class NormVecDelayModel(PyLoihiProcessModel):
+    """Implementation of NormVecDelay. This process is typically part of
+    a network for normalization.
+    """
+    a_in1 = LavaPyType(PyInPort.VEC_DENSE, np.int32, precision=24)
+    a_in2 = LavaPyType(PyInPort.VEC_DENSE, np.int32, precision=24)
+    s_out = LavaPyType(PyOutPort.VEC_DENSE, np.int32, precision=24)
+    s2_out = LavaPyType(PyOutPort.VEC_DENSE, np.int32, precision=24)
+
+    vth: np.ndarray = LavaPyType(np.ndarray, np.int32, precision=24)
+    exp: np.ndarray = LavaPyType(np.ndarray, np.int32, precision=24)
+    v: np.ndarray = LavaPyType(np.ndarray, np.int32, precision=24)
+    v2: np.ndarray = LavaPyType(np.ndarray, np.int32, precision=24)
+
+    def run_spk(self) -> None:
+        """The run function that performs the actual computation during
+        execution orchestrated by a PyLoihiProcessModel using the
+        LoihiProtocol.
+        """
+        a_in_data1 = self.a_in1.recv()
+        a_in_data2 = self.a_in2.recv()
+
+        vsq = a_in_data1 ** 2
+        self.s2_out.send(vsq)
+
+        self.v2 = self.v
+        self.v = a_in_data1
+
+        output = self.v2 * a_in_data2
+
+        is_spike = np.abs(output) > self.vth
+        sp_out = output * is_spike
+
+        self.s_out.send(sp_out)
+
+
+@implements(proc=InvSqrt, protocol=LoihiProtocol)
+@requires(CPU)
+@tag('float')
+class InvSqrtModelFloat(PyLoihiProcessModel):
+    """Implementation of InvSqrt in floating point"""
+    a_in = LavaPyType(PyInPort.VEC_DENSE, float)
+    s_out = LavaPyType(PyOutPort.VEC_DENSE, float)
+
+    fp_base: np.ndarray = LavaPyType(np.ndarray, np.int32, precision=24)
+
+    def run_spk(self) -> None:
+        """The run function that performs the actual computation during
+        execution orchestrated by a PyLoihiProcessModel using the
+        LoihiProtocol.
+        """
+        a_in_data = self.a_in.recv()
+        sp_out = 1 / (a_in_data ** 0.5)
+
+        self.s_out.send(sp_out)
+
+
+def make_fpinv_table(fp_base):
+    """
+    Creates the table for the fixed-point inverse algorithm.
+    """
+    n_bits = 24
+    B = 2**fp_base
+
+    Y_est = np.zeros((n_bits), dtype='int')
+    n_adj = 1.238982962
+
+    for m in range(n_bits):  # span the 24 bits, negate the decimal base
+        Y_est[n_bits - m - 1] = 2 * int(B / (2**((m - fp_base) / 2) * n_adj))
+
+    return Y_est
+
+
+def clz(val):
+    """
+    Count leading zeros.
+    """
+    return (24 - (int(np.log2(val)) + 1))
+
+
+def inv_sqrt(s_fp, n_iters=5, b_fraction=12):
+    """
+    Runs the fixed point inverse square root algorithm
+    """
+    Y_est = make_fpinv_table(b_fraction)
+    m = clz(s_fp)
+    b_i = int(s_fp)
+    Y_i = Y_est[m]
+    y_i = Y_i // 2
+
+    for _ in range(n_iters):
+        b_i = np.right_shift(np.right_shift(b_i * Y_i,
+                                            b_fraction + 1) * Y_i,
+                             b_fraction + 1)
+        Y_i = np.left_shift(3, b_fraction) - b_i
+        y_i = np.right_shift(y_i * Y_i, b_fraction + 1)
+
+    return y_i
+
+
+@implements(proc=InvSqrt, protocol=LoihiProtocol)
+@requires(CPU)
+@tag('fixed_pt')
+class InvSqrtModelFP(PyLoihiProcessModel):
+    """Implementation of InvSqrt in fixed point"""
+
+    a_in = LavaPyType(PyInPort.VEC_DENSE, np.int32, precision=24)
+    s_out = LavaPyType(PyOutPort.VEC_DENSE, np.int32, precision=24)
+    fp_base: np.ndarray = LavaPyType(np.ndarray, np.int32, precision=24)
+
+    def run_spk(self) -> None:
+        """The run function that performs the actual computation during
+        execution orchestrated by a PyLoihiProcessModel using the
+        LoihiProtocol.
+        """
+        a_in_data = self.a_in.recv()
+
+        if not np.any(a_in_data):  # all inputs are zero, emit zeros
+            sp_out = 0 * a_in_data
+        else:
+            sp_out = np.array([inv_sqrt(a_in_data, 5)])
+
+        self.s_out.send(sp_out)
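Note (illustrative, not part of the patch): a quick sanity check of the fixed-point inverse square root helpers defined above. It assumes inv_sqrt is imported from lava.proc.graded.models, as the tests below do, and that input and output are both interpreted as Q12 values, i.e. fp_base = 12.

    import numpy as np
    from lava.proc.graded.models import inv_sqrt

    q12_one = 1 << 12                       # 1.0 in Q12
    approx = inv_sqrt(q12_one, n_iters=5, b_fraction=12)
    exact = 2 ** 12 / np.sqrt(q12_one / 2 ** 12)
    # Both values should land close to 4096 (1.0 in Q12).
    print(int(approx), exact)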
diff --git a/src/lava/proc/graded/process.py b/src/lava/proc/graded/process.py
new file mode 100644
index 000000000..6c5943a46
--- /dev/null
+++ b/src/lava/proc/graded/process.py
@@ -0,0 +1,144 @@
+# Copyright (C) 2023 Intel Corporation
+# SPDX-License-Identifier: BSD-3-Clause
+# See: https://spdx.org/licenses/
+
+import numpy as np
+import typing as ty
+
+from lava.magma.core.process.process import AbstractProcess
+from lava.magma.core.process.variable import Var
+from lava.magma.core.process.ports.ports import InPort, OutPort
+
+
+def loihi2round(vv):
+    """
+    Round values in a numpy array the way Loihi 2
+    performs rounding/truncation.
+    """
+    return np.fix(vv + (vv > 0) - 0.5).astype('int')
+
+
+class GradedVec(AbstractProcess):
+    """GradedVec
+    Graded spike vector layer. Transmits accumulated input as
+    graded spikes with no dynamics.
+
+    v[t] = a_in
+    s_out = v[t] * (|v[t]| > vth)
+
+    Parameters
+    ----------
+    shape: tuple(int)
+        number and topology of neurons
+    vth: int
+        threshold for spiking
+    exp: int
+        fixed point base
+    """
+
+    def __init__(
+            self,
+            shape: ty.Tuple[int, ...],
+            vth: ty.Optional[int] = 1,
+            exp: ty.Optional[int] = 0) -> None:
+
+        super().__init__(shape=shape)
+
+        self.a_in = InPort(shape=shape)
+        self.s_out = OutPort(shape=shape)
+
+        self.v = Var(shape=shape, init=0)
+        self.vth = Var(shape=(1,), init=vth)
+        self.exp = Var(shape=(1,), init=exp)
+
+    @property
+    def shape(self) -> ty.Tuple[int, ...]:
+        """Return shape of the Process."""
+        return self.proc_params['shape']
+
+
+class NormVecDelay(AbstractProcess):
+    """NormVec
+    Normalizable graded spike vector. Used in conjunction with the
+    InvSqrt process to create a normalization layer.
+
+    When configured with InvSqrt, the process will output a normalized
+    vector with graded spike values. The output is delayed by 2 timesteps.
+
+    NormVecDelay has two input and two output channels. The process
+    outputs the square of the input values on the second channel to the
+    InvSqrt neuron. The process waits two timesteps to receive the inverse
+    square root value returned by the InvSqrt process. The value received
+    on the second input channel is multiplied by the primary input
+    value, and the result is output on the primary output channel.
+
+    v[t] = a_in1
+    s2_out = v[t] ** 2
+    s_out = v[t-2] * a_in2
+
+    Parameters
+    ----------
+    shape: tuple(int)
+        number and topology of neurons
+    vth: int
+        threshold for spiking
+    exp: int
+        fixed point base
+    """
+
+    def __init__(
+            self,
+            shape: ty.Tuple[int, ...],
+            vth: ty.Optional[int] = 1,
+            exp: ty.Optional[int] = 0) -> None:
+
+        super().__init__(shape=shape)
+
+        self.a_in1 = InPort(shape=shape)
+        self.a_in2 = InPort(shape=shape)
+
+        self.s_out = OutPort(shape=shape)
+        self.s2_out = OutPort(shape=shape)
+
+        self.vth = Var(shape=(1,), init=vth)
+        self.exp = Var(shape=(1,), init=exp)
+
+        self.v = Var(shape=shape, init=np.zeros(shape, 'int32'))
+        self.v2 = Var(shape=shape, init=np.zeros(shape, 'int32'))
+
+    @property
+    def shape(self) -> ty.Tuple[int, ...]:
+        """Return shape of the Process."""
+        return self.proc_params['shape']
+
+
+class InvSqrt(AbstractProcess):
+    """InvSqrt
+    Neuron model for computing the inverse square root with 24-bit
+    fixed point values. Designed to be used in conjunction with
+    NormVecDelay.
+
+    v[t] = a_in
+    s_out = 1 / sqrt(v[t])
+
+    Parameters
+    ----------
+    fp_base : int
+        Base of the fixed-point representation
+    """
+
+    def __init__(
+            self,
+            shape: ty.Tuple[int, ...],
+            fp_base: ty.Optional[int] = 12) -> None:
+        super().__init__(shape=shape)
+
+        # base of the decimal point
+        self.fp_base = Var(shape=(1,), init=fp_base)
+        self.a_in = InPort(shape=shape)
+        self.s_out = OutPort(shape=shape)
+
+    @property
+    def shape(self) -> ty.Tuple[int, ...]:
+        """Return shape of the Process."""
+        return self.proc_params['shape']
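Note (illustrative, not part of the patch): the NormVecDelay and InvSqrt docstrings above describe combining the two processes into a normalization layer. A minimal wiring sketch under that reading is shown below; the ones-valued Dense layers that sum the squares and broadcast the inverse norm back, and their fixed-point scaling, are assumptions rather than anything defined in this patch.

    import numpy as np
    from lava.proc.graded.process import NormVecDelay, InvSqrt
    from lava.proc.dense.process import Dense

    n = 10
    norm = NormVecDelay(shape=(n,))
    inv = InvSqrt(shape=(1,), fp_base=12)

    # Sum the squared components into the single InvSqrt neuron ...
    sum_sq = Dense(weights=np.ones((1, n), dtype=int), num_message_bits=24)
    # ... and broadcast the inverse norm back to every component.
    fan_out = Dense(weights=np.ones((n, 1), dtype=int), num_message_bits=24)

    norm.s2_out.connect(sum_sq.s_in)
    sum_sq.a_out.connect(inv.a_in)
    inv.s_out.connect(fan_out.s_in)
    fan_out.a_out.connect(norm.a_in2)
    # Drive norm.a_in1 with the vector to normalize; the normalized
    # result appears on norm.s_out after the two-timestep delay.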
diff --git a/tests/lava/proc/graded/test_graded.py b/tests/lava/proc/graded/test_graded.py
new file mode 100644
index 000000000..b4c4b5027
--- /dev/null
+++ b/tests/lava/proc/graded/test_graded.py
@@ -0,0 +1,273 @@
+# Copyright (C) 2023 Intel Corporation
+# SPDX-License-Identifier: BSD-3-Clause
+# See: https://spdx.org/licenses/
+
+import unittest
+import numpy as np
+from scipy.sparse import csr_matrix
+
+from lava.proc.graded.process import GradedVec, NormVecDelay, InvSqrt
+from lava.proc.graded.models import inv_sqrt
+from lava.proc.dense.process import Dense
+from lava.proc.sparse.process import Sparse
+from lava.proc import io
+
+from lava.magma.core.run_conditions import RunSteps
+from lava.magma.core.run_configs import Loihi2SimCfg
+
+
+class TestGradedVecProc(unittest.TestCase):
+    """Tests for GradedVec"""
+
+    def test_gradedvec_dot_dense(self):
+        """Tests that GradedVec and Dense compute the dot product"""
+        num_steps = 10
+        v_thresh = 1
+
+        weights1 = np.zeros((10, 1))
+        weights1[:, 0] = (np.arange(10) - 5) * 0.2
+
+        inp_data = np.zeros((weights1.shape[1], num_steps))
+        inp_data[:, 2] = 1000
+        inp_data[:, 6] = 20000
+
+        weight_exp = 7
+        weights1 *= 2**weight_exp
+        weights1 = weights1.astype('int')
+
+        dense1 = Dense(weights=weights1, num_message_bits=24,
+                       weight_exp=-weight_exp)
+        vec1 = GradedVec(shape=(weights1.shape[0],),
+                         vth=v_thresh)
+
+        generator = io.source.RingBuffer(data=inp_data)
+        logger = io.sink.RingBuffer(shape=(weights1.shape[0],),
+                                    buffer=num_steps)
+
+        generator.s_out.connect(dense1.s_in)
+        dense1.a_out.connect(vec1.a_in)
+        vec1.s_out.connect(logger.a_in)
+
+        vec1.run(condition=RunSteps(num_steps=num_steps),
+                 run_cfg=Loihi2SimCfg(select_tag='fixed_pt'))
+        out_data = logger.data.get().astype('int')
+        vec1.stop()
+
+        ww = np.floor(weights1 / 2) * 2
+        expected_out = np.floor((ww @ inp_data) / 2**weight_exp)
+
+        self.assertTrue(np.all(out_data[:, (3, 7)] == expected_out[:, (2, 6)]))
+
+    def test_gradedvec_dot_sparse(self):
+        """Tests that GradedVec and Sparse compute the dot product"""
+        num_steps = 10
+        v_thresh = 1
+
+        weights1 = np.zeros((10, 1))
+        weights1[:, 0] = (np.arange(10) - 5) * 0.2
+
+        inp_data = np.zeros((weights1.shape[1], num_steps))
+        inp_data[:, 2] = 1000
+        inp_data[:, 6] = 20000
+
+        weight_exp = 7
+        weights1 *= 2**weight_exp
+        weights1 = weights1.astype('int')
+
+        sparse1 = Sparse(weights=csr_matrix(weights1),
+                         num_message_bits=24,
+                         weight_exp=-weight_exp)
+        vec1 = GradedVec(shape=(weights1.shape[0],),
+                         vth=v_thresh)
+
+        generator = io.source.RingBuffer(data=inp_data)
+        logger = io.sink.RingBuffer(shape=(weights1.shape[0],),
+                                    buffer=num_steps)
+
+        generator.s_out.connect(sparse1.s_in)
+        sparse1.a_out.connect(vec1.a_in)
+        vec1.s_out.connect(logger.a_in)
+
+        vec1.run(condition=RunSteps(num_steps=num_steps),
+                 run_cfg=Loihi2SimCfg(select_tag='fixed_pt'))
+        out_data = logger.data.get().astype('int')
+        vec1.stop()
+
+        ww = np.floor(weights1 / 2) * 2
+        expected_out = np.floor((ww @ inp_data) / 2**weight_exp)
+
+        self.assertTrue(np.all(out_data[:, (3, 7)] == expected_out[:, (2, 6)]))
+
+
+class TestInvSqrtProc(unittest.TestCase):
+    """Tests for the inverse square root process."""
+
+    def test_invsqrt_calc(self):
+        """Checks the InvSqrt calculation"""
+        fp_base = 12  # base of the decimal point
+
+        num_steps = 25
+        weights1 = np.zeros((1, 1))
+        weights1[0, 0] = 1
+        weight_exp = 7
+
+        weights1 *= 2**weight_exp
+        weights1 = weights1.astype('int')
+
+        in_vals = [2**(i % 24) for i in range(num_steps)]
+
+        inp_data = np.zeros((weights1.shape[1], num_steps))
+
+        for i in range(num_steps):
+            inp_data[:, i] = in_vals[i]
+
+        dense1 = Dense(weights=weights1,
+                       num_message_bits=24,
+                       weight_exp=-weight_exp)
+        vec1 = InvSqrt(shape=(weights1.shape[0],),
+                       fp_base=fp_base)
+
+        generator = io.source.RingBuffer(data=inp_data)
+        logger = io.sink.RingBuffer(shape=(weights1.shape[0],),
+                                    buffer=num_steps)
+
+        generator.s_out.connect(dense1.s_in)
+        dense1.a_out.connect(vec1.a_in)
+        vec1.s_out.connect(logger.a_in)
+
+        vec1.run(condition=RunSteps(num_steps=num_steps),
+                 run_cfg=Loihi2SimCfg(select_tag='fixed_pt'))
+        out_data = logger.data.get().astype('int')
+        vec1.stop()
+
+        expected_out = np.array([inv_sqrt(inp_data[0, i], 5)
+                                 for i in range(num_steps)])
+
+        self.assertTrue(np.all(expected_out[:-1] == out_data[:, 1:]))
+
+
+class TestNormVecDelayProc(unittest.TestCase):
+    """Tests for NormVecDelay"""
+
+    def test_norm_vec_delay_out1(self):
+        """Checks the first channel output of NormVecDelay"""
+        weight_exp = 7
+        num_steps = 10
+
+        weights1 = np.zeros((1, 1))
+        weights1[0, 0] = 1
+
+        weights1 *= 2**weight_exp
+        weights1 = weights1.astype('int')
+
+        weights2 = np.zeros((1, 1))
+        weights2[0, 0] = 0.5
+
+        weights2 *= 2**weight_exp
+        weights2 = weights2.astype('int')
+
+        inp_data1 = np.zeros((weights1.shape[1], num_steps))
+        inp_data2 = np.zeros((weights2.shape[1], num_steps))
+
+        inp_data1[:, 2] = 10
+        inp_data1[:, 6] = 30
+        inp_data2[:, :] = 20
+
+        dense1 = Dense(weights=weights1,
+                       num_message_bits=24,
+                       weight_exp=-weight_exp)
+        dense2 = Dense(weights=weights2,
+                       num_message_bits=24,
+                       weight_exp=-weight_exp)
+
+        vec1 = NormVecDelay(shape=(weights1.shape[0],))
+
+        generator1 = io.source.RingBuffer(data=inp_data1)
+        generator2 = io.source.RingBuffer(data=inp_data2)
+        logger = io.sink.RingBuffer(shape=(weights1.shape[0],),
+                                    buffer=num_steps)
+
+        generator1.s_out.connect(dense1.s_in)
+        dense1.a_out.connect(vec1.a_in1)
+
+        generator2.s_out.connect(dense2.s_in)
+        dense2.a_out.connect(vec1.a_in2)
+
+        vec1.s_out.connect(logger.a_in)
+
+        vec1.run(condition=RunSteps(num_steps=num_steps),
+                 run_cfg=Loihi2SimCfg(select_tag='fixed_pt'))
+        out_data = logger.data.get().astype('int')
+        vec1.stop()
+
+        ch1 = (weights1 @ inp_data1) / 2**weight_exp
+        ch2 = (weights2 @ inp_data2) / 2**weight_exp
+
+        # np.roll accounts for the two-step delay, but this indexing is
+        # tied to the specific input timing, so be careful if the inputs
+        # change. There still seems to be one delay step unaccounted for
+        # relative to the NcModel; it is unclear where that delay belongs.
+        expected_out = np.roll(ch1, 1) * ch2
+
+        # Then there is one extra timestep from hardware
+        self.assertTrue(np.all(expected_out[:, :-1] == out_data[:, 1:]))
+
+    def test_norm_vec_delay_out2(self):
+        """Checks the second channel output of NormVecDelay"""
+        weight_exp = 7
+        num_steps = 10
+
+        weights1 = np.zeros((1, 1))
+        weights1[0, 0] = 1
+
+        weights1 *= 2**weight_exp
+        weights1 = weights1.astype('int')
+
+        weights2 = np.zeros((1, 1))
+        weights2[0, 0] = 0.5
+
+        weights2 *= 2**weight_exp
+        weights2 = weights2.astype('int')
+
+        inp_data1 = np.zeros((weights1.shape[1], num_steps))
+        inp_data2 = np.zeros((weights2.shape[1], num_steps))
+
+        inp_data1[:, 2] = 10
+        inp_data1[:, 6] = 30
+        inp_data2[:, :] = 20
+
+        dense1 = Dense(weights=weights1,
+                       num_message_bits=24,
+                       weight_exp=-weight_exp)
+        dense2 = Dense(weights=weights2,
+                       num_message_bits=24,
+                       weight_exp=-weight_exp)
+
+        vec1 = NormVecDelay(shape=(weights1.shape[0],))
+
+        generator1 = io.source.RingBuffer(data=inp_data1)
+        generator2 = io.source.RingBuffer(data=inp_data2)
+        logger = io.sink.RingBuffer(shape=(weights1.shape[0],),
+                                    buffer=num_steps)
+
+        generator1.s_out.connect(dense1.s_in)
+        dense1.a_out.connect(vec1.a_in1)
+
+        generator2.s_out.connect(dense2.s_in)
+        dense2.a_out.connect(vec1.a_in2)
+
+        vec1.s2_out.connect(logger.a_in)
+
+        vec1.run(condition=RunSteps(num_steps=num_steps),
+                 run_cfg=Loihi2SimCfg(select_tag='fixed_pt'))
+        out_data = logger.data.get().astype('int')
+        vec1.stop()
+
+        ch1 = (weights1 @ inp_data1) / 2**weight_exp
+        expected_out = ch1 ** 2
+        # then there is one extra timestep from hardware
+        self.assertTrue(np.all(expected_out[:, :-1] == out_data[:, 1:]))
+
+
+if __name__ == '__main__':
+    unittest.main()
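Note (illustrative, not part of the patch): the expected values in the GradedVec tests are computed offline. A standalone recomputation for the first input column, assuming that clearing the weights' least-significant bit mirrors the fixed-point weight truncation Dense applies to mixed-sign weights, and that dividing by 2 ** weight_exp undoes the weight scaling used in the tests:

    import numpy as np

    weight_exp = 7
    weights = ((np.arange(10) - 5) * 0.2 * 2 ** weight_exp).astype(int)
    weights = weights.reshape(10, 1)
    x = np.array([[1000]])                      # input injected at timestep 2

    ww = np.floor(weights / 2) * 2              # drop the LSB of each weight
    expected = np.floor((ww @ x) / 2 ** weight_exp)
    # In the test this matches out_data[:, 3]: the output is read one
    # column after the input because of the Dense -> GradedVec hop.
    print(expected.ravel())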