Skip to content

Commit

Permalink
Factory function that converts PyLoihi process models to PyAsync mode…
Browse files Browse the repository at this point in the history
…ls on the fly (#595)

* update refport unittest to always wait when it writes to port for consistent behavior

Signed-off-by: bamsumit <[email protected]>

* Removed pyproject changes

Signed-off-by: bamsumit <[email protected]>

* Fix to convolution tests. Fixed imcompatible mnist_pretrained for old python versions.

Signed-off-by: bamsumit <[email protected]>

* Missing moudle parent fix

Signed-off-by: bamsumit <[email protected]>

* Added ConvVarModel

* Loihi 2 Async implemented

Signed-off-by: bamsumit <[email protected]>

* Removed vestigial run_spk in LIF models

Signed-off-by: bamsumit <[email protected]>

* unittest added

Signed-off-by: bamsumit <[email protected]>

* Forcing multiprocessing to always fork to avoid pikcling error  on Windows

Signed-off-by: bamsumit <[email protected]>

* Forcing multiprocessing to always fork to avoid pikcling error on Windows

Signed-off-by: bamsumit <[email protected]>

* ForRemoved forced  multiprocessing fork

Signed-off-by: bamsumit <[email protected]>

* Linting fix

Signed-off-by: bamsumit <[email protected]>

* Forcing multiprocessing to always fork to avoid pikcling error on Mac

Signed-off-by: bamsumit <[email protected]>

* Removed support for windows

Signed-off-by: bamsumit <[email protected]>

---------

Signed-off-by: bamsumit <[email protected]>
Co-authored-by: Joyesh Mishra <[email protected]>
  • Loading branch information
bamsumit and joyeshmishra authored Feb 2, 2023
1 parent aa84ac2 commit 83df172
Show file tree
Hide file tree
Showing 4 changed files with 289 additions and 2 deletions.
128 changes: 127 additions & 1 deletion src/lava/magma/core/model/py/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from abc import ABC, abstractmethod
import logging
import numpy as np
import platform

from lava.magma.compiler.channels.pypychannel import CspSendPort, CspRecvPort, \
CspSelector
Expand All @@ -16,6 +17,7 @@
enum_equal,
MGMT_COMMAND,
MGMT_RESPONSE, )
from lava.magma.core.sync.protocols.async_protocol import AsyncProtocol


class AbstractPyProcessModel(AbstractProcessModel, ABC):
Expand Down Expand Up @@ -404,7 +406,8 @@ def _pause(self):
"""
Command handler for Pause Command.
"""
self.process_to_service.send(PyLoihiProcessModel.Response.STATUS_PAUSED)
self.process_to_service.send(
PyLoihiProcessModel.Response.STATUS_PAUSED)

def _handle_pause_or_stop_req(self):
"""
Expand Down Expand Up @@ -537,3 +540,126 @@ def add_ports_for_polling(self):
Add various ports to poll for communication on ports
"""
pass


def _get_attr_dict(
model_class: ty.Type[PyLoihiProcessModel]
) -> ty.Dict[str, ty.Any]:
"""Get a dictionary of non-callable public attributes of a class.
Parameters
----------
model_class : ty.Type[PyLoihiProcessModel]
A class inherited from PyLoihiProcessModel.
Returns
-------
ty.Dict[str, ty.Any]
Dictionary of attribute name and it's value.
"""
var_names = [v for v, m in vars(model_class).items()
if not (v.startswith('_') or callable(m))]
var_dict = {var_name: getattr(model_class, var_name)
for var_name in var_names}
if model_class == PyLoihiProcessModel:
return {}
for base in model_class.__bases__:
var_dict = {**_get_attr_dict(base), **var_dict}
return var_dict


def _get_callable_dict(
model_class: ty.Type[PyLoihiProcessModel]
) -> ty.Dict[str, ty.Callable]:
"""Get a dictionary of callable public members of a class.
Parameters
----------
model_class : ty.Type[PyLoihiProcessModel]
A class inherited from PyLoihiProcessModel.
Returns
-------
ty.Dict[str, ty.Callable]
Dictionary of callable name and it's pointer.
"""
callable_names = [v for v, m in vars(model_class).items()
if callable(m) and not v.startswith('_')]
callable_dict = {callable_name: getattr(model_class, callable_name)
for callable_name in callable_names}
if model_class == PyLoihiProcessModel:
return {}
for base in model_class.__bases__:
callable_dict = {**_get_callable_dict(base), **callable_dict}
return callable_dict


def PyLoihiModelToPyAsyncModel(
py_loihi_model: ty.Type[PyLoihiProcessModel]
) -> ty.Type[PyAsyncProcessModel]:
"""Factory function that converts Py-Loihi process models
to equivalent Py-Async definition.
Parameters
----------
py_loihi_model : ty.Type[PyLoihiProcessModel]
Py-Loihi model that describes the functional behavior of a process
using Loihi Protocol.
Returns
-------
ty.Type[PyAsyncProcessModel]
Equivalent Py-Async protocol model. The async process model
class name is the original loihi process model class name with Async
postfix.
"""
# The exclude_vars and exclude_callables are
# based on the constructor of PyLoihiProcessModel and PyAsyncProcModel
if platform.system() == 'Windows':
raise OSError('Conversion of process models on the fly is not '
'supported on Windows system. It will result in '
'pickling error when lava threads for model execution '
'are spawned. The fundamental reason is Windows OS '
'does not support forking and needs to use pickle.')

exclude_vars = ['time_step', 'phase']
exclude_callables = ['run_spk',
'pre_guard', 'run_pre_mgmt',
'post_guard', 'run_post_mgmt',
'implements_process', 'implements_protocol']
name = py_loihi_model.__name__ + 'Async'
var_dict = _get_attr_dict(py_loihi_model)
var_dict['implements_process'] = py_loihi_model.implements_process
var_dict['implements_protocol'] = AsyncProtocol
callable_dict = {k: v for k, v in _get_callable_dict(py_loihi_model).items()
if k not in exclude_callables}

def __init__(self, proc_params: dict):
# New constructor of the PyAsyncModel implementation.
PyAsyncProcessModel.__init__(self, proc_params)
ref_model = py_loihi_model(proc_params)
attributes = [v for v, m in vars(ref_model).items()
if not (v.startswith('_') or callable(m))
and v not in var_dict.keys()
and v not in vars(self)
and v not in exclude_vars]
for attr in attributes:
setattr(self, attr, getattr(ref_model, attr))
self.time_step = 1

def run_async(self) -> None:
# New run_async behavior of PyAsyncModel implementation.
while self.time_step != self.num_steps + 1:
if py_loihi_model.pre_guard(self):
py_loihi_model.run_pre_mgmt(self)
py_loihi_model.run_spk(self)
if py_loihi_model.post_guard(self):
py_loihi_model.run_post_mgmt(self)
self.time_step += 1

py_async_model = type(name, (PyAsyncProcessModel,),
{'__init__': __init__,
'run_async': run_async,
**var_dict,
**callable_dict})
return py_async_model
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ class PyCChannel:
import MessageInfrastructureInterface


import platform
if platform.system() != 'Windows':
mp.set_start_method('fork')


"""Implements the Message Infrastructure Interface using Python
MultiProcessing Library. The MultiProcessing API is used to create actors
which will participate in exchanging messages. The Channel Infrastructure
Expand Down
1 change: 0 additions & 1 deletion src/lava/proc/lif/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,6 @@ def run_spk(self):
execution orchestrated by a PyLoihiProcessModel using the
LoihiProtocol.
"""
super().run_spk()
# Receive synaptic input
a_in_data = self.a_in.recv()

Expand Down
157 changes: 157 additions & 0 deletions tests/lava/magma/core/model/py/test_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
# Copyright (C) 2023 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause
# See: https://spdx.org/licenses/

import unittest
import platform
import numpy as np

from lava.magma.core.run_conditions import RunSteps
from lava.magma.core.run_configs import Loihi2SimCfg
from lava.magma.core.sync.protocols.async_protocol import AsyncProtocol
from lava.magma.core.model.py.model import (
PyLoihiProcessModel, PyLoihiModelToPyAsyncModel
)

from lava.proc import io
from lava.proc.lif.process import LIF
from lava.proc.sdn.process import SigmaDelta
from lava.proc.dense.process import Dense

from lava.proc.lif.models import PyLifModelFloat, PyLifModelBitAcc
from lava.proc.sdn.models import PySigmaDeltaModelFixed
from lava.proc.dense.models import PyDenseModelFloat


py_loihi_models = [PyLifModelFloat, PyLifModelBitAcc, PySigmaDeltaModelFixed,
PyDenseModelFloat]


class CustomRunConfig(Loihi2SimCfg):
"""Custom run config that converts PyLoihi models to PyAsync models."""

def __init__(self, **kwargs):
super().__init__(**kwargs)

def select(self, proc, proc_models):
pm = super().select(proc, proc_models)
if issubclass(pm, PyLoihiProcessModel):
return PyLoihiModelToPyAsyncModel(pm)
return pm


class TestPyLoihiToPyAsync(unittest.TestCase):
@unittest.skipIf(platform.system() == 'Windows',
'Model conversion is not supported on Windows.')
def test_model_conversion(self):
"""Test model conversion"""
for py_loihi_model in py_loihi_models:
py_async_model = PyLoihiModelToPyAsyncModel(py_loihi_model)
self.assertTrue(py_loihi_model.implements_process
== py_async_model.implements_process)
self.assertTrue(
py_async_model.implements_protocol == AsyncProtocol)
self.assertTrue(hasattr(py_async_model, 'run_async'))

@unittest.skipIf(platform.system() == 'Windows',
'Model conversion is not supported on Windows.')
def test_lif_dense_lif(self):
"""Test LIF-Dense-LIF equivalency."""
in_size = 10
out_size = 8
weights = np.arange(in_size * out_size).reshape(out_size, in_size) - 25
weights *= 2
bias = 20 + np.arange(in_size)

input_lif_params = {'shape': (in_size,),
'du': 0,
'dv': 0,
'bias_mant': bias,
'bias_exp': 6,
'vth': 20}
output_lif_params = {'shape': (out_size,),
'du': 4096,
'dv': 4096,
'vth': 1024}
dense_params = {'weights': weights}

input_lif = LIF(**input_lif_params)
output_lif = LIF(**output_lif_params)
dense = Dense(**dense_params)
input_lif.s_out.connect(dense.s_in)
dense.a_out.connect(output_lif.a_in)

run_cnd = RunSteps(num_steps=2)
# simply run the pyproc model
output_lif.run(condition=run_cnd,
run_cfg=Loihi2SimCfg(select_tag='fixed_pt'))
current_gt = output_lif.u.get()
voltage_gt = output_lif.v.get()
output_lif.stop()

# Run the same network in async mode.
# Currently we don't allow the same process to run twice
# Copy the model used for pyproc model
input_lif = LIF(**input_lif_params)
output_lif = LIF(**output_lif_params)
dense = Dense(**dense_params)
input_lif.s_out.connect(dense.s_in)
dense.a_out.connect(output_lif.a_in)
output_lif.run(condition=run_cnd,
run_cfg=CustomRunConfig(select_tag='fixed_pt'))
current = output_lif.u.get()
voltage = output_lif.v.get()
output_lif.stop()

self.assertTrue(np.array_equal(current, current_gt))
self.assertTrue(np.array_equal(voltage, voltage_gt))

@unittest.skipIf(platform.system() == 'Windows',
'Model conversion is not supported on Windows.')
def test_sdn_dense_sdn(self):
"""Test LIF-Dense-LIF equivalency."""
in_size = 10
out_size = 8
weights = np.arange(in_size * out_size).reshape(out_size, in_size) - 25
weights *= 2
bias = 20 + np.arange(in_size)

input_params = {'shape': (in_size,),
'vth': 22,
'bias': bias,
'spike_exp': 6,
'state_exp': 6}
output_params = {'shape': (out_size,),
'vth': 2,
'spike_exp': 6,
'state_exp': 6}
dense_params = {'weights': weights, 'num_message_bits': 16}

input = SigmaDelta(**input_params)
output = SigmaDelta(**output_params)
dense = Dense(**dense_params)
input.s_out.connect(dense.s_in)
dense.a_out.connect(output.a_in)

run_cnd = RunSteps(num_steps=2)
# simply run the pyproc model
output.run(condition=run_cnd,
run_cfg=Loihi2SimCfg(select_tag='fixed_pt'))
sigma_gt = output.sigma.get()
output.stop()

# Run the same network in async mode.
# Currently we don't allow the same process to run twice
# Copy the model used for pyproc model
input = SigmaDelta(**input_params)
output = SigmaDelta(**output_params)
dense = Dense(**dense_params)
input.s_out.connect(dense.s_in)
dense.a_out.connect(output.a_in)

output.run(condition=run_cnd,
run_cfg=CustomRunConfig(select_tag='fixed_pt'))
sigma = output.sigma.get()
output.stop()

self.assertTrue(np.array_equal(sigma, sigma_gt))

0 comments on commit 83df172

Please sign in to comment.