Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add NxSDKRuntimeService #182

Merged
merged 22 commits into from
Feb 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
278 changes: 208 additions & 70 deletions src/lava/magma/compiler/builders/builder.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# Copyright (C) 2021 Intel Corporation
# Copyright (C) 2022 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause
# See: https://spdx.org/licenses/

import logging
import typing as ty

import numpy as np
Expand All @@ -10,12 +11,14 @@
from lava.magma.core.sync.protocol import AbstractSyncProtocol
from lava.magma.runtime.message_infrastructure.message_infrastructure_interface\
import MessageInfrastructureInterface
from lava.magma.runtime.runtime_service import PyRuntimeService, \
AbstractRuntimeService
from lava.magma.runtime.runtime_services.enums import LoihiVersion
from lava.magma.runtime.runtime_services.runtime_service import (
AbstractRuntimeService,
NxSdkRuntimeService
)

if ty.TYPE_CHECKING:
from lava.magma.core.process.process import AbstractProcess
from lava.magma.core.model.model import AbstractProcessModel
from lava.magma.runtime.runtime import Runtime

from lava.magma.compiler.channels.pypychannel import CspSendPort, CspRecvPort
Expand All @@ -24,6 +27,9 @@
AbstractRuntimeServiceBuilder,
AbstractChannelBuilder
)
from lava.magma.core.model.model import AbstractProcessModel
from lava.magma.core.model.nc.model import AbstractNcProcessModel
from lava.magma.core.model.nc.type import LavaNcType
from lava.magma.core.model.py.model import AbstractPyProcessModel
from lava.magma.core.model.py.type import LavaPyType
from lava.magma.compiler.utils import VarInitializer, PortInitializer, \
Expand All @@ -39,49 +45,21 @@
ChannelType


class PyProcessBuilder(AbstractProcessBuilder):
"""A PyProcessBuilder instantiates and initializes a PyProcessModel.

The compiler creates a PyProcessBuilder for each PyProcessModel. In turn,
the runtime, loads a PyProcessBuilder onto a compute node where it builds
the PyProcessModel and its associated ports.

In order to build the PyProcessModel, the builder inspects all LavaType
class variables of a PyProcessModel, creates the corresponding data type
with the specified properties, the shape and the initial value provided by
the Lava Var. In addition, the builder creates the required PyPort
instances. Finally, the builder assigns both port and variable
implementations to the PyProcModel.

Once the PyProcessModel is built, it is the RuntimeService's job to
connect channels to ports and start the process.

Note: For unit testing it should be possible to build processes locally
instead of on a remote node. For pure atomic unit testing a ProcessModel
locally, PyInPorts and PyOutPorts must be fed manually with data.
"""
class _AbstractProcessBuilder(AbstractProcessBuilder):
"""A _AbstractProcessBuilder instantiates and initializes
an AbstractProcessModel but is not meant to be used
directly but inherited from"""

def __init__(
self, proc_model: ty.Type[AbstractPyProcessModel],
model_id: int,
proc_params: ty.Dict[str, ty.Any] = None):
super(PyProcessBuilder, self).__init__(
self, proc_model: ty.Type[AbstractProcessModel],
model_id: int):
super(_AbstractProcessBuilder, self).__init__(
proc_model=proc_model,
model_id=model_id
)
if not issubclass(proc_model, AbstractPyProcessModel):
raise AssertionError("Is not a subclass of AbstractPyProcessModel")
self.vars: ty.Dict[str, VarInitializer] = {}
self.py_ports: ty.Dict[str, PortInitializer] = {}
self.ref_ports: ty.Dict[str, PortInitializer] = {}
self.var_ports: ty.Dict[str, VarPortInitializer] = {}
self.csp_ports: ty.Dict[str, ty.List[AbstractCspPort]] = {}
self.csp_rs_send_port: ty.Dict[str, CspSendPort] = {}
self.csp_rs_recv_port: ty.Dict[str, CspRecvPort] = {}
self.proc_params = proc_params

@property
def proc_model(self) -> ty.Type[AbstractPyProcessModel]:
def proc_model(self) -> ty.Type[AbstractProcessModel]:
return self._proc_model

# ToDo: Perhaps this should even be done in Compiler?
Expand Down Expand Up @@ -137,6 +115,68 @@ def _check_not_assigned_yet(
f"Member '{key}' already found in {m_type}."
)

# ToDo: Also check that Vars are initializable with var.value provided
def set_variables(self, variables: ty.List[VarInitializer]):
"""Appends the given list of variables to the ProcessModel. Used by the
compiler to create a ProcessBuilder during the compilation of
ProcessModels.

Parameters
----------
variables : ty.List[VarInitializer]

"""
self._check_members_exist(variables, "Var")
new_vars = {v.name: v for v in variables}
self._check_not_assigned_yet(self.vars, new_vars.keys(), "vars")
self.vars.update(new_vars)


class PyProcessBuilder(_AbstractProcessBuilder):
"""A PyProcessBuilder instantiates and initializes a PyProcessModel.

The compiler creates a PyProcessBuilder for each PyProcessModel. In turn,
the runtime, loads a PyProcessBuilder onto a compute node where it builds
the PyProcessModel and its associated ports.

In order to build the PyProcessModel, the builder inspects all LavaType
class variables of a PyProcessModel, creates the corresponding data type
with the specified properties, the shape and the initial value provided by
the Lava Var. In addition, the builder creates the required PyPort
instances. Finally, the builder assigns both port and variable
implementations to the PyProcModel.

Once the PyProcessModel is built, it is the RuntimeService's job to
connect channels to ports and start the process.

Note: For unit testing it should be possible to build processes locally
instead of on a remote node. For pure atomic unit testing a ProcessModel
locally, PyInPorts and PyOutPorts must be fed manually with data.
"""

def __init__(
self, proc_model: ty.Type[AbstractPyProcessModel],
model_id: int,
proc_params: ty.Dict[str, ty.Any] = None):
super(PyProcessBuilder, self).__init__(
proc_model=proc_model,
model_id=model_id
)
if not issubclass(proc_model, AbstractPyProcessModel):
raise AssertionError("Is not a subclass of AbstractPyProcessModel")
self.vars: ty.Dict[str, VarInitializer] = {}
self.py_ports: ty.Dict[str, PortInitializer] = {}
self.ref_ports: ty.Dict[str, PortInitializer] = {}
self.var_ports: ty.Dict[str, VarPortInitializer] = {}
self.csp_ports: ty.Dict[str, ty.List[AbstractCspPort]] = {}
self.csp_rs_send_port: ty.Dict[str, CspSendPort] = {}
self.csp_rs_recv_port: ty.Dict[str, CspRecvPort] = {}
self.proc_params = proc_params

@property
def proc_model(self) -> ty.Type[AbstractPyProcessModel]:
return self._proc_model

def check_all_vars_and_ports_set(self):
"""Checks that Vars and PyPorts assigned from Process have a
corresponding LavaPyType.
Expand Down Expand Up @@ -191,22 +231,6 @@ def check_lava_py_types(self):
f"PyRefPort in '{self.proc_model.__name__}'."
)

# ToDo: Also check that Vars are initializable with var.value provided
def set_variables(self, variables: ty.List[VarInitializer]):
"""Appends the given list of variables to the ProcessModel. Used by the
compiler to create a ProcessBuilder during the compilation of
ProcessModels.

Parameters
----------
variables : ty.List[VarInitializer]

"""
self._check_members_exist(variables, "Var")
new_vars = {v.name: v for v in variables}
self._check_not_assigned_yet(self.vars, new_vars.keys(), "vars")
self.vars.update(new_vars)

def set_py_ports(self, py_ports: ty.List[PortInitializer], check=True):
"""Appends the given list of PyPorts to the ProcessModel. Used by the
compiler to create a ProcessBuilder during the compilation of
Expand Down Expand Up @@ -425,16 +449,107 @@ def build(self):
return pm


class CProcessBuilder(AbstractProcessBuilder):
class CProcessBuilder(_AbstractProcessBuilder):
"""C Process Builder"""

pass


class NcProcessBuilder(AbstractProcessBuilder):
"""Neuromorphic Core Process Builder"""
class NcProcessBuilder(_AbstractProcessBuilder):
"""NcProcessBuilder instantiates and initializes an NcProcessModel.

pass
The compiler creates a NcProcessBuilder for each NcProcessModel. In turn,
the runtime, loads a NcProcessBuilder onto a compute node where it builds
the NcProcessModel and its associated vars.

In order to build the NcProcessModel, the builder inspects all LavaType
class variables of a NcProcessModel, creates the corresponding data type
with the specified properties, the shape and the initial value provided by
the Lava Var. Finally, the builder assigns variable
implementations to the NcProcModel."""
def __init__(
self, proc_model: ty.Type[AbstractNcProcessModel],
model_id: int,
proc_params: ty.Dict[str, ty.Any] = None):
super(NcProcessBuilder, self).__init__(
proc_model=proc_model,
model_id=model_id
)
if not issubclass(proc_model, AbstractNcProcessModel):
raise AssertionError("Is not a subclass of AbstractNcProcessModel")
self.vars: ty.Dict[str, VarInitializer] = {}
self.proc_params = proc_params

def _get_lava_type(self, name: str) -> LavaNcType:
return getattr(self.proc_model, name)

def check_all_vars_set(self):
"""Checks that Vars assigned from Process have a
corresponding LavaNcType.

Raises
------
AssertionError
No LavaNcType found in ProcModel
"""
for attr_name in dir(self.proc_model):
attr = getattr(self.proc_model, attr_name)
if isinstance(attr, LavaNcType):
if (
attr_name not in self.vars
):
raise AssertionError(
f"No LavaNcType '{attr_name}' found in ProcModel "
f"'{self.proc_model.__name__}'."
)

def set_rs_csp_ports(self, csp_ports: ty.List[AbstractCspPort]):
pass

def build(self):
"""Builds a NcProcessModel at runtime within Runtime.

The Compiler initializes the NcProcBuilder with the ProcModel,
VarInitializers and PortInitializers.

At deployment to a node, the Builder.build(..) gets executed
resulting in the following:
1. ProcModel gets instantiated
2. Vars are initialized and assigned to ProcModel

Returns
-------
AbstractNcProcessModel


Raises
------
NotImplementedError
"""

pm = self.proc_model(self.proc_params)
pm.model_id = self._model_id

# Initialize Vars
for name, v in self.vars.items():
# Build variable
lt = self._get_lava_type(name)
if issubclass(lt.cls, np.ndarray):
var = lt.cls(v.shape, lt.d_type)
var[:] = v.value
elif issubclass(lt.cls, (int, float)):
var = v.value
else:
raise NotImplementedError

# Create dynamic variable attribute on ProcModel
setattr(pm, name, var)
# Create private attribute for variable precision
setattr(pm, "_" + name + "_p", lt.precision)

pm.var_id_to_var_map[v.var_id] = name

return pm


class RuntimeServiceBuilder(AbstractRuntimeServiceBuilder):
Expand All @@ -446,14 +561,19 @@ def __init__(
protocol: ty.Type[AbstractSyncProtocol],
runtime_service_id: int,
model_ids: ty.List[int],
loihi_version: ty.Type[LoihiVersion],
loglevel: int = logging.WARNING
):
super(RuntimeServiceBuilder, self).__init__(rs_class, protocol)
self.log = logging.getLogger(__name__)
self.log.setLevel(loglevel)
self._runtime_service_id = runtime_service_id
self._model_ids: ty.List[int] = model_ids
self.csp_send_port: ty.Dict[str, CspSendPort] = {}
self.csp_recv_port: ty.Dict[str, CspRecvPort] = {}
self.csp_proc_send_port: ty.Dict[str, CspSendPort] = {}
self.csp_proc_recv_port: ty.Dict[str, CspRecvPort] = {}
self.loihi_version: ty.Type[LoihiVersion] = loihi_version

@property
def runtime_service_id(self):
Expand Down Expand Up @@ -487,24 +607,40 @@ def set_csp_proc_ports(self, csp_ports: ty.List[AbstractCspPort]):
if isinstance(port, CspRecvPort):
self.csp_proc_recv_port.update({port.name: port})

def build(self) -> PyRuntimeService:
"""Build Runtime Service
def build(self,
loihi_version: LoihiVersion = LoihiVersion.N3
) -> AbstractRuntimeService:
"""Build the runtime service

Returns
-------
PyRuntimeService
A concreate instance of AbstractRuntimeService
[PyRuntimeService or NxSdkRuntimeService]
"""
rs = self.rs_class(protocol=self.sync_protocol)

self.log.debug("RuntimeService Class: " + str(self.rs_class))
nxsdk_rts = False
if self.rs_class == NxSdkRuntimeService:
rs = self.rs_class(protocol=self.sync_protocol,
loihi_version=loihi_version)
nxsdk_rts = True
self.log.debug("Initilized NxSdkRuntimeService")
else:
rs = self.rs_class(protocol=self.sync_protocol)
self.log.debug("Initilized PyRuntimeService")
rs.runtime_service_id = self._runtime_service_id
rs.model_ids = self._model_ids

for port in self.csp_proc_send_port.values():
if "service_to_process" in port.name:
rs.service_to_process.append(port)
if not nxsdk_rts:
for port in self.csp_proc_send_port.values():
if "service_to_process" in port.name:
rs.service_to_process.append(port)

for port in self.csp_proc_recv_port.values():
if "process_to_service" in port.name:
rs.process_to_service.append(port)
for port in self.csp_proc_recv_port.values():
if "process_to_service" in port.name:
rs.process_to_service.append(port)

self.log.debug("Setup 'RuntimeService <--> Rrocess; ports")

for port in self.csp_send_port.values():
if "service_to_runtime" in port.name:
Expand All @@ -514,6 +650,8 @@ def build(self) -> PyRuntimeService:
if "runtime_to_service" in port.name:
rs.runtime_to_service = port

self.log.debug("Setup 'Runtime <--> RuntimeService' ports")

return rs


Expand Down
Loading