Skip to content

Commit

Permalink
Refactor Message Infrastructure (lava-nc#29)
Browse files Browse the repository at this point in the history
* Refactor Message Infrastructure
  • Loading branch information
joyeshmishra authored Nov 11, 2021
1 parent 78d1f69 commit 1f04f8d
Show file tree
Hide file tree
Showing 16 changed files with 349 additions and 124 deletions.
129 changes: 61 additions & 68 deletions src/lava/magma/compiler/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,37 +5,29 @@
import typing as ty

from lava.magma.core.sync.protocol import AbstractSyncProtocol
from lava.magma.runtime.runtime_service import (
PyRuntimeService,
AbstractRuntimeService,
)
from lava.magma.runtime.message_infrastructure.message_infrastructure_interface\
import MessageInfrastructureInterface
from lava.magma.runtime.runtime_service import PyRuntimeService, \
AbstractRuntimeService

if ty.TYPE_CHECKING:
from lava.magma.core.process.process import AbstractProcess
from lava.magma.core.model.model import AbstractProcessModel
from lava.magma.runtime.runtime import Runtime

from abc import ABC, abstractmethod
from multiprocessing.managers import SharedMemoryManager

import numpy as np
from dataclasses import dataclass

from lava.magma.compiler.channels.pypychannel import (
PyPyChannel,
CspSendPort,
CspRecvPort,
)
from lava.magma.compiler.channels.pypychannel import CspSendPort, CspRecvPort
from lava.magma.core.model.py.model import AbstractPyProcessModel
from lava.magma.core.model.py.type import LavaPyType
from lava.magma.compiler.utils import VarInitializer, PortInitializer
from lava.magma.core.model.py.ports import (
AbstractPyPort,
PyInPort,
PyOutPort,
PyRefPort,
)
from lava.magma.compiler.channels.interfaces import AbstractCspPort, Channel
from lava.magma.core.model.py.ports import AbstractPyPort, \
PyInPort, PyOutPort, PyRefPort
from lava.magma.compiler.channels.interfaces import AbstractCspPort, Channel, \
ChannelType


class AbstractProcessBuilder(ABC):
Expand Down Expand Up @@ -497,19 +489,19 @@ class ChannelBuilderMp(AbstractChannelBuilder):
"""A ChannelBuilder assuming Python multi-processing is used as messaging
and multi processing backbone.
"""

channel_type: ty.Type[Channel]
channel_type: ChannelType
src_process: "AbstractProcess"
dst_process: "AbstractProcess"
src_port_initializer: PortInitializer
dst_port_initializer: PortInitializer

def build(self, messaging_infrastructure: SharedMemoryManager) -> Channel:
def build(self, messaging_infrastructure: MessageInfrastructureInterface) \
-> Channel:
"""Given the message passing framework builds a channel
Parameters
----------
messaging_infrastructure : SharedMemoryManager
messaging_infrastructure : MessageInfrastructureInterface
Returns
-------
Expand All @@ -521,38 +513,35 @@ def build(self, messaging_infrastructure: SharedMemoryManager) -> Channel:
Exception
Can't build channel of type specified
"""
if self.channel_type == PyPyChannel:
return PyPyChannel(
messaging_infrastructure,
self.src_port_initializer.name,
self.dst_port_initializer.name,
self.src_port_initializer.shape,
self.src_port_initializer.d_type,
self.src_port_initializer.size,
)
else:
raise Exception(f"Can't build channel of type {self.channel_type}")
channel_class = messaging_infrastructure.channel_class(
channel_type=self.channel_type)
return channel_class(
messaging_infrastructure,
self.src_port_initializer.name,
self.dst_port_initializer.name,
self.src_port_initializer.shape,
self.src_port_initializer.d_type,
self.src_port_initializer.size,
)


@dataclass
class ServiceChannelBuilderMp(AbstractChannelBuilder):
"""A RuntimeServiceChannelBuilder assuming Python multi-processing is used
as messaging and multi processing backbone.
"""

channel_type: ty.Type[Channel]
src_process: ty.Union[AbstractRuntimeServiceBuilder,
"AbstractProcessModel"]
dst_process: ty.Union[AbstractRuntimeServiceBuilder,
"AbstractProcessModel"]
channel_type: ChannelType
src_process: ty.Union[AbstractRuntimeServiceBuilder, "AbstractProcessModel"]
dst_process: ty.Union[AbstractRuntimeServiceBuilder, "AbstractProcessModel"]
port_initializer: PortInitializer

def build(self, messaging_infrastructure: SharedMemoryManager) -> Channel:
def build(self, messaging_infrastructure: MessageInfrastructureInterface) \
-> Channel:
"""Given the message passing framework builds a channel
Parameters
----------
messaging_infrastructure : SharedMemoryManager
messaging_infrastructure : MessageInfrastructureInterface
Returns
-------
Expand All @@ -564,37 +553,39 @@ def build(self, messaging_infrastructure: SharedMemoryManager) -> Channel:
Exception
Can't build channel of type specified
"""
if self.channel_type == PyPyChannel:
channel_name: str = self.port_initializer.name
return PyPyChannel(
messaging_infrastructure,
channel_name + "_src",
channel_name + "_dst",
self.port_initializer.shape,
self.port_initializer.d_type,
self.port_initializer.size,
)
else:
raise Exception(f"Can't build channel of type {self.channel_type}")
channel_class = messaging_infrastructure.channel_class(
channel_type=self.channel_type)

channel_name: str = (
self.port_initializer.name
)
return channel_class(
messaging_infrastructure,
channel_name + "_src",
channel_name + "_dst",
self.port_initializer.shape,
self.port_initializer.d_type,
self.port_initializer.size,
)


@dataclass
class RuntimeChannelBuilderMp(AbstractChannelBuilder):
"""A RuntimeChannelBuilder assuming Python multi-processing is
used as messaging and multi processing backbone.
"""

channel_type: ty.Type[Channel]
channel_type: ChannelType
src_process: ty.Union[AbstractRuntimeServiceBuilder, ty.Type["Runtime"]]
dst_process: ty.Union[AbstractRuntimeServiceBuilder, ty.Type["Runtime"]]
port_initializer: PortInitializer

def build(self, messaging_infrastructure: SharedMemoryManager) -> Channel:
def build(self, messaging_infrastructure: MessageInfrastructureInterface) \
-> Channel:
"""Given the message passing framework builds a channel
Parameters
----------
messaging_infrastructure : SharedMemoryManager
messaging_infrastructure : MessageInfrastructureInterface
Returns
-------
Expand All @@ -606,15 +597,17 @@ def build(self, messaging_infrastructure: SharedMemoryManager) -> Channel:
Exception
Can't build channel of type specified
"""
if self.channel_type == PyPyChannel:
channel_name: str = self.port_initializer.name
return PyPyChannel(
messaging_infrastructure,
channel_name + "_src",
channel_name + "_dst",
self.port_initializer.shape,
self.port_initializer.d_type,
self.port_initializer.size,
)
else:
raise Exception(f"Can't build channel of type {self.channel_type}")
channel_class = messaging_infrastructure.channel_class(
channel_type=self.channel_type)

channel_name: str = (
self.port_initializer.name
)
return channel_class(
messaging_infrastructure,
channel_name + "_src",
channel_name + "_dst",
self.port_initializer.shape,
self.port_initializer.d_type,
self.port_initializer.size,
)
9 changes: 9 additions & 0 deletions src/lava/magma/compiler/channels/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
# SPDX-License-Identifier: BSD-3-Clause
# See: https://spdx.org/licenses/
import typing as ty
from enum import IntEnum

import numpy as np
from abc import ABC, abstractmethod

Expand Down Expand Up @@ -61,3 +63,10 @@ def src_port(self) -> AbstractCspSendPort:
@abstractmethod
def dst_port(self) -> AbstractCspRecvPort:
pass


class ChannelType(IntEnum):
"""Type of a channel given the two process models"""
PyPy = 0
CPy = 1
PyC = 2
54 changes: 50 additions & 4 deletions src/lava/magma/compiler/channels/pypychannel.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
AbstractCspSendPort,
AbstractCspRecvPort,
)
if ty.TYPE_CHECKING:
from lava.magma.runtime.message_infrastructure\
.message_infrastructure_interface import MessageInfrastructureInterface


@dataclass
Expand All @@ -24,14 +27,24 @@ class Proto:
nbytes: int


# ToDo: (AW) Do not create any class attributes outside of __init__
class CspSendPort(AbstractCspSendPort):
"""
CspSendPort is a low level send port implementation based on CSP
semantics. It can be understood as the input port of a CSP channel.
"""

def __init__(self, name, shm, proto, size, req, ack):
"""Instantiates CspSendPort object and class attributes
Parameters
----------
name : str
shm : SharedMemory
proto : Proto
size : int
req : Pipe
ack : Pipe
"""
self._name = name
self._shm = shm
self._shape = proto.shape
Expand All @@ -42,6 +55,8 @@ def __init__(self, name, shm, proto, size, req, ack):
self._size = size
self._idx = 0
self._done = False
self._array = []
self._semaphore = None
self.thread = None

@property
Expand All @@ -66,7 +81,7 @@ def start(self):
np.ndarray(
shape=self._shape,
dtype=self._dtype,
buffer=self._shm.buf[self._nbytes * i : self._nbytes * (i + 1)],
buffer=self._shm.buf[self._nbytes * i: self._nbytes * (i + 1)],
)
for i in range(self._size)
]
Expand Down Expand Up @@ -152,6 +167,17 @@ class CspRecvPort(AbstractCspRecvPort):
"""

def __init__(self, name, shm, proto, size, req, ack):
"""Instantiates CspRecvPort object and class attributes
Parameters
----------
name : str
shm : SharedMemory
proto : Proto
size : int
req : Pipe
ack : Pipe
"""
self._name = name
self._shm = shm
self._shape = proto.shape
Expand All @@ -162,6 +188,8 @@ def __init__(self, name, shm, proto, size, req, ack):
self._ack = ack
self._idx = 0
self._done = False
self._array = []
self._queue = None
self.thread = None

@property
Expand All @@ -186,7 +214,7 @@ def start(self):
np.ndarray(
shape=self._shape,
dtype=self._dtype,
buffer=self._shm.buf[self._nbytes * i : self._nbytes * (i + 1)],
buffer=self._shm.buf[self._nbytes * i: self._nbytes * (i + 1)],
)
for i in range(self._size)
]
Expand Down Expand Up @@ -241,8 +269,26 @@ class PyPyChannel(Channel):
"""Helper class to create the set of send and recv port and encapsulate
them inside a common structure. We call this a PyPyChannel"""

def __init__(self, smm, src_name, dst_name, shape, dtype, size):
def __init__(self,
message_infrastructure: 'MessageInfrastructureInterface',
src_name,
dst_name,
shape,
dtype,
size):
"""Instantiates PyPyChannel object and class attributes
Parameters
----------
message_infrastructure: MessageInfrastructureInterface
src_name : str
dst_name : str
shape : ty.Tuple[int, ...]
dtype : ty.Type[np.intc]
size : int
"""
nbytes = np.prod(shape) * np.dtype(dtype).itemsize
smm = message_infrastructure.smm
shm = smm.SharedMemory(int(nbytes * size))
req = Pipe(duplex=False)
ack = Pipe(duplex=False)
Expand Down
Loading

0 comments on commit 1f04f8d

Please sign in to comment.