Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add NxSDKRuntimeService #182

Merged
merged 22 commits into from
Feb 25, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 17 additions & 7 deletions src/lava/magma/compiler/builders/builder.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (C) 2021 Intel Corporation
# Copyright (C) 2022 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause
# See: https://spdx.org/licenses/

Expand All @@ -10,8 +10,11 @@
from lava.magma.core.sync.protocol import AbstractSyncProtocol
from lava.magma.runtime.message_infrastructure.message_infrastructure_interface\
import MessageInfrastructureInterface
from lava.magma.runtime.runtime_service import PyRuntimeService, \
AbstractRuntimeService
from lava.magma.runtime.runtime_services.enums import LoihiVersion
from lava.magma.runtime.runtime_services.runtime_service import (
AbstractRuntimeService,
NxSDKRuntimeService
)

if ty.TYPE_CHECKING:
from lava.magma.core.process.process import AbstractProcess
Expand Down Expand Up @@ -487,14 +490,21 @@ def set_csp_proc_ports(self, csp_ports: ty.List[AbstractCspPort]):
if isinstance(port, CspRecvPort):
self.csp_proc_recv_port.update({port.name: port})

def build(self) -> PyRuntimeService:
"""Build Runtime Service
def build(self,
loihi_version: LoihiVersion = LoihiVersion.N3
) -> AbstractRuntimeService:
"""Build the runtime service

Returns
-------
PyRuntimeService
A concreate instance of AbstractRuntimeService
[PyRuntimeService or NxSDKRuntimeService]
"""
rs = self.rs_class(protocol=self.sync_protocol)
if isinstance(self.rs_class, NxSDKRuntimeService):
rs = self.rs_class(protocol=self.sync_protocol,
loihi_version=loihi_version)
else:
rs = self.rs_class(protocol=self.sync_protocol)
rs.runtime_service_id = self._runtime_service_id
rs.model_ids = self._model_ids

Expand Down
5 changes: 3 additions & 2 deletions src/lava/magma/compiler/builders/interfaces.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (C) 2021 Intel Corporation
# Copyright (C) 2022 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause
# See: https://spdx.org/licenses/

Expand All @@ -9,7 +9,8 @@
from lava.magma.compiler.channels.interfaces import AbstractCspPort
from lava.magma.core.model.model import AbstractProcessModel
from lava.magma.core.sync.protocol import AbstractSyncProtocol
from lava.magma.runtime.runtime_service import AbstractRuntimeService
from lava.magma.runtime.runtime_services.runtime_service import \
AbstractRuntimeService


class AbstractProcessBuilder(ABC):
Expand Down
18 changes: 11 additions & 7 deletions src/lava/magma/compiler/compiler.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Copyright (C) 2021 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause
# See: https://spdx.org/licenses/
import logging
import importlib
import importlib.util as import_utils
import inspect
Expand Down Expand Up @@ -47,7 +48,10 @@

# ToDo: (AW) Document all class methods and class
class Compiler:
def __init__(self, compile_cfg: ty.Optional[ty.Dict[str, ty.Any]] = None):
def __init__(self, compile_cfg: ty.Optional[ty.Dict[str, ty.Any]] = None,
loglevel=logging.WARNING):
self.log = logging.getLogger(__name__)
self.log.setLevel(loglevel)
self._compile_config = {"pypy_channel_size": 64}
if compile_cfg:
self._compile_config.update(compile_cfg)
Expand Down Expand Up @@ -195,12 +199,12 @@ def _select_proc_models(
run_cfg: RunConfig) -> ty.Type[AbstractProcessModel]:
"""Selects a ProcessModel from list of provided models given RunCfg."""
selected_proc_model = run_cfg.select(proc, models)
err_msg = f"RunConfig {run_cfg.__class__.__qualname__}.select() must " \
f"return a sub-class of AbstractProcessModel. Got" \
f" {type(selected_proc_model)} instead."
if not isinstance(selected_proc_model, type):
raise AssertionError(err_msg)
if not issubclass(selected_proc_model, AbstractProcessModel):

if not isinstance(selected_proc_model, type) \
or not issubclass(selected_proc_model, AbstractProcessModel):
err_msg = f"RunConfig {run_cfg.__class__.__qualname__}.select()" \
f" must return a sub-class of AbstractProcessModel. Got" \
f" {type(selected_proc_model)} instead."
raise AssertionError(err_msg)

return selected_proc_model
Expand Down
6 changes: 5 additions & 1 deletion src/lava/magma/core/model/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# See: https://spdx.org/licenses/
from __future__ import annotations
import typing as ty
import logging
from abc import ABC

if ty.TYPE_CHECKING:
Expand Down Expand Up @@ -56,7 +57,10 @@ class level attributes with the same name if they exist.
required_resources: ty.List[ty.Type[AbstractResource]] = []
tags: ty.List[str] = []

def __init__(self, proc_params: ty.Dict[str, ty.Any]) -> None:
def __init__(self, proc_params: ty.Dict[str, ty.Any],
loglevel=logging.WARNING) -> None:
self.log = logging.getLogger(__name__)
self.log.setLevel(loglevel)
self.proc_params: ty.Dict[str, ty.Any] = proc_params

def __repr__(self):
Expand Down
209 changes: 208 additions & 1 deletion src/lava/magma/core/model/nc/model.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,25 @@
# Copyright (C) 2021 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause
from abc import ABC, abstractmethod
import logging
import numpy as np
import typing as ty

from lava.magma.core.model.model import AbstractProcessModel
from lava.magma.core.model.nc.ports import AbstractNcPort, NcVarPort
from lava.magma.compiler.channels.pypychannel import CspSelector, CspSendPort, CspRecvPort
from lava.magma.runtime.mgmt_token_enums import (
MGMT_COMMAND,
MGMT_RESPONSE,
enum_equal,
enum_to_np
)

try:
from nxsdk.arch.base.nxboard import NxBoard
except(ImportError):
class NxBoard():
pass


# ToDo: Move somewhere else. Just created for typing
Expand All @@ -12,6 +29,17 @@ def alloc(self, *args, **kwargs):


class Net(ABC):
"""Represents a collection of logical entities (Attribute Groups)
that consume resources on a NeuroCore.

* InputAxons
* Synapses
* DendriticAccumulator
* Compartments
* OutputAxons
* Synaptic pre traces
* Synaptic post traces
"""
def __init__(self):
self.out_ax = AbstractNodeGroup()
self.cx = AbstractNodeGroup()
Expand All @@ -29,7 +57,43 @@ def connect(self, from_thing, to_thing):


class AbstractNcProcessModel(AbstractProcessModel, ABC):
"""Abstract interface for a NeuroCore ProcessModels."""
"""Abstract interface for a NeuroCore ProcessModels

Example for how variables and ports might be initialized:
a_in: NcInPort = LavaNcType(NcInPort.VEC_DENSE, float)
s_out: NcInPort = LavaNcType(NcOutPort.VEC_DENSE, bool, precision=1)
u: np.ndarray = LavaNcType(np.ndarray, np.int32, precision=24)
v: np.ndarray = LavaNcType(np.ndarray, np.int32, precision=24)
bias: np.ndarray = LavaNcType(np.ndarray, np.int16, precision=12)
du: int = LavaNcType(int, np.uint16, precision=12)
"""
def __init__(self, proc_params: ty.Dict[str, ty.Any],
loglevel=logging.WARNING) -> None:
super().__init__(proc_params, loglevel=loglevel)
self.model_id: ty.Optional[int] = None
self.service_to_process: ty.Optional[CspRecvPort] = None
self.process_to_service: ty.Optional[CspSendPort] = None
self.nc_ports: ty.List[AbstractNcPort] = []
self.var_ports: ty.List[NcVarPort] = []
self.var_id_to_var_map: ty.Dict[int, ty.Any] = {}

def __setattr__(self, key: str, value: ty.Any):
self.__dict__[key] = value
if isinstance(value, AbstractNcPort):
self.nc_ports.append(value)
# Store all VarPorts for efficient RefPort -> VarPort handling
if isinstance(value, NcVarPort):
self.var_ports.append(value)

@abstractmethod
def run(self):
pass

def join(self):
self.service_to_process.join()
self.process_to_service.join()
for p in self.nc_ports:
p.join()

@abstractmethod
def allocate(self, net: Net):
Expand All @@ -38,3 +102,146 @@ def allocate(self, net: Net):
Note: This should work as before.
"""
pass


class NcProcessModel(AbstractNcProcessModel):
mgkwill marked this conversation as resolved.
Show resolved Hide resolved
def __init__(self, proc_params: ty.Dict[str, ty.Any],
board: ty.Type[NxBoard],
loglevel=logging.WARNING):
super(AbstractNcProcessModel, self).__init__(proc_params,
loglevel=loglevel)
self.board = board
mgkwill marked this conversation as resolved.
Show resolved Hide resolved

def start(self):
self.service_to_process.start()
self.process_to_service.start()
for p in self.nc_ports:
p.start()
# self.board.start()
mgkwill marked this conversation as resolved.
Show resolved Hide resolved
self.run()

def allocate(self):
pass

def run(self):
"""Retrieves commands from the runtime service calls
their corresponding methods of the ProcessModels. The phase
is retrieved from runtime service (service_to_process). After
calling the method of a phase of all ProcessModels the runtime
service is informed about completion. The loop ends when the
STOP command is received."""
selector = CspSelector()
channel_actions = [(self.service_to_process, lambda: 'cmd')]
action = 'cmd'
while True:
if action == 'cmd':
cmd = self.service_to_process.recv()
if enum_equal(cmd, MGMT_COMMAND.STOP):
self.board.stop()
mgkwill marked this conversation as resolved.
Show resolved Hide resolved
self.process_to_service.send(MGMT_RESPONSE.TERMINATED)
self.join()
return
if enum_equal(cmd, MGMT_COMMAND.PAUSE):
self.board.pause()
self.process_to_service.send(MGMT_RESPONSE.PAUSED)
self.join()
return
try:
if enum_equal(cmd, MGMT_COMMAND.RUN):
self.process_to_service.send(MGMT_RESPONSE.DONE)
num_steps = self.service_to_process.recv()
if num_steps > 0:
# self.board.run(numSteps=num_steps, aSync=False)
mgkwill marked this conversation as resolved.
Show resolved Hide resolved
self.process_to_service.send(MGMT_RESPONSE.DONE)
else:
self.log.error(f"Exception: number of time steps"
f"not greater than 0, cannot invoke "
f"run(num_steps) in {self.__class__}")
self.process_to_service.send(MGMT_RESPONSE.ERROR)
elif enum_equal(cmd, MGMT_COMMAND.GET_DATA):
# Handle get/set Var requests from runtime service
self._handle_get_var()
elif enum_equal(cmd, MGMT_COMMAND.SET_DATA):
# Handle get/set Var requests from runtime service
self._handle_set_var()
else:
raise ValueError(
f"Wrong Phase Info Received : {cmd}")
except Exception as inst:
self.log.error(f"Exception {inst} occured while"
f" running command {cmd} in {self.__class__}")
# Inform runtime service about termination
self.process_to_service.send(MGMT_RESPONSE.ERROR)
self.join()
raise inst
else:
# Handle VarPort requests from RefPorts
self._handle_var_port(action)

for var_port in self.var_ports:
for csp_port in var_port.csp_ports:
if isinstance(csp_port, CspRecvPort):
channel_actions.append(
(csp_port, lambda: var_port))
action = selector.select(*channel_actions)

def _handle_get_var(self):
"""Handles the get Var command from runtime service."""
# 1. Receive Var ID and retrieve the Var
var_id = int(self.service_to_process.recv()[0].item())
var_name = self.var_id_to_var_map[var_id]
var = getattr(self, var_name)

# Here get the var from Loihi

# 2. Send Var data
data_port = self.process_to_service
# Header corresponds to number of values
# Data is either send once (for int) or one by one (array)
if isinstance(var, int) or isinstance(var, np.integer):
data_port.send(enum_to_np(1))
data_port.send(enum_to_np(var))
elif isinstance(var, np.ndarray):
# FIXME: send a whole vector (also runtime_service.py)
var_iter = np.nditer(var)
num_items: np.integer = np.prod(var.shape)
data_port.send(enum_to_np(num_items))
for value in var_iter:
data_port.send(enum_to_np(value, np.float64))

def _handle_set_var(self):
"""Handles the set Var command from runtime service."""
# 1. Receive Var ID and retrieve the Var
var_id = int(self.service_to_process.recv()[0].item())
var_name = self.var_id_to_var_map[var_id]
var = getattr(self, var_name)

# 2. Receive Var data
data_port = self.service_to_process
if isinstance(var, int) or isinstance(var, np.integer):
# First item is number of items (1) - not needed
data_port.recv()
# Data to set
buffer = data_port.recv()[0]
if isinstance(var, int):
setattr(self, var_name, buffer.item())
else:
setattr(self, var_name, buffer.astype(var.dtype))
elif isinstance(var, np.ndarray):
# First item is number of items
num_items = data_port.recv()[0]
var_iter = np.nditer(var, op_flags=['readwrite'])
# Set data one by one
for i in var_iter:
if num_items == 0:
break
num_items -= 1
i[...] = data_port.recv()[0]
else:
raise RuntimeError("Unsupported type")

# Here set var in Loihi?

def _handle_var_port(self, var_port):
"""Handles read/write requests on the given VarPort."""
var_port.service()
Loading