Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Message Infrastructure #29

Merged
merged 7 commits into from
Nov 11, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 61 additions & 68 deletions src/lava/magma/compiler/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,37 +5,29 @@
import typing as ty

from lava.magma.core.sync.protocol import AbstractSyncProtocol
from lava.magma.runtime.runtime_service import (
PyRuntimeService,
AbstractRuntimeService,
)
from lava.magma.runtime.message_infrastructure.message_infrastructure_interface\
import MessageInfrastructureInterface
from lava.magma.runtime.runtime_service import PyRuntimeService, \
AbstractRuntimeService

if ty.TYPE_CHECKING:
from lava.magma.core.process.process import AbstractProcess
from lava.magma.core.model.model import AbstractProcessModel
from lava.magma.runtime.runtime import Runtime

from abc import ABC, abstractmethod
from multiprocessing.managers import SharedMemoryManager

import numpy as np
from dataclasses import dataclass

from lava.magma.compiler.channels.pypychannel import (
PyPyChannel,
CspSendPort,
CspRecvPort,
)
from lava.magma.compiler.channels.pypychannel import CspSendPort, CspRecvPort
from lava.magma.core.model.py.model import AbstractPyProcessModel
from lava.magma.core.model.py.type import LavaPyType
from lava.magma.compiler.utils import VarInitializer, PortInitializer
from lava.magma.core.model.py.ports import (
AbstractPyPort,
PyInPort,
PyOutPort,
PyRefPort,
)
from lava.magma.compiler.channels.interfaces import AbstractCspPort, Channel
from lava.magma.core.model.py.ports import AbstractPyPort, \
PyInPort, PyOutPort, PyRefPort
from lava.magma.compiler.channels.interfaces import AbstractCspPort, Channel, \
ChannelType


class AbstractProcessBuilder(ABC):
Expand Down Expand Up @@ -497,19 +489,19 @@ class ChannelBuilderMp(AbstractChannelBuilder):
"""A ChannelBuilder assuming Python multi-processing is used as messaging
and multi processing backbone.
"""

channel_type: ty.Type[Channel]
channel_type: ChannelType
src_process: "AbstractProcess"
dst_process: "AbstractProcess"
src_port_initializer: PortInitializer
dst_port_initializer: PortInitializer

def build(self, messaging_infrastructure: SharedMemoryManager) -> Channel:
def build(self, messaging_infrastructure: MessageInfrastructureInterface) \
-> Channel:
"""Given the message passing framework builds a channel
Parameters
----------
messaging_infrastructure : SharedMemoryManager
messaging_infrastructure : MessageInfrastructureInterface
Returns
-------
Expand All @@ -521,38 +513,35 @@ def build(self, messaging_infrastructure: SharedMemoryManager) -> Channel:
Exception
Can't build channel of type specified
"""
if self.channel_type == PyPyChannel:
return PyPyChannel(
messaging_infrastructure,
self.src_port_initializer.name,
self.dst_port_initializer.name,
self.src_port_initializer.shape,
self.src_port_initializer.d_type,
self.src_port_initializer.size,
)
else:
raise Exception(f"Can't build channel of type {self.channel_type}")
channel_class = messaging_infrastructure.channel_class(
channel_type=self.channel_type)
return channel_class(
messaging_infrastructure,
self.src_port_initializer.name,
self.dst_port_initializer.name,
self.src_port_initializer.shape,
self.src_port_initializer.d_type,
self.src_port_initializer.size,
)


@dataclass
class ServiceChannelBuilderMp(AbstractChannelBuilder):
"""A RuntimeServiceChannelBuilder assuming Python multi-processing is used
as messaging and multi processing backbone.
"""

channel_type: ty.Type[Channel]
src_process: ty.Union[AbstractRuntimeServiceBuilder,
"AbstractProcessModel"]
dst_process: ty.Union[AbstractRuntimeServiceBuilder,
"AbstractProcessModel"]
channel_type: ChannelType
src_process: ty.Union[AbstractRuntimeServiceBuilder, "AbstractProcessModel"]
dst_process: ty.Union[AbstractRuntimeServiceBuilder, "AbstractProcessModel"]
port_initializer: PortInitializer

def build(self, messaging_infrastructure: SharedMemoryManager) -> Channel:
def build(self, messaging_infrastructure: MessageInfrastructureInterface) \
-> Channel:
"""Given the message passing framework builds a channel
Parameters
----------
messaging_infrastructure : SharedMemoryManager
messaging_infrastructure : MessageInfrastructureInterface
Returns
-------
Expand All @@ -564,37 +553,39 @@ def build(self, messaging_infrastructure: SharedMemoryManager) -> Channel:
Exception
Can't build channel of type specified
"""
if self.channel_type == PyPyChannel:
channel_name: str = self.port_initializer.name
return PyPyChannel(
messaging_infrastructure,
channel_name + "_src",
channel_name + "_dst",
self.port_initializer.shape,
self.port_initializer.d_type,
self.port_initializer.size,
)
else:
raise Exception(f"Can't build channel of type {self.channel_type}")
channel_class = messaging_infrastructure.channel_class(
channel_type=self.channel_type)

channel_name: str = (
self.port_initializer.name
)
return channel_class(
messaging_infrastructure,
channel_name + "_src",
channel_name + "_dst",
self.port_initializer.shape,
self.port_initializer.d_type,
self.port_initializer.size,
)


@dataclass
class RuntimeChannelBuilderMp(AbstractChannelBuilder):
"""A RuntimeChannelBuilder assuming Python multi-processing is
used as messaging and multi processing backbone.
"""

channel_type: ty.Type[Channel]
channel_type: ChannelType
src_process: ty.Union[AbstractRuntimeServiceBuilder, ty.Type["Runtime"]]
dst_process: ty.Union[AbstractRuntimeServiceBuilder, ty.Type["Runtime"]]
port_initializer: PortInitializer

def build(self, messaging_infrastructure: SharedMemoryManager) -> Channel:
def build(self, messaging_infrastructure: MessageInfrastructureInterface) \
-> Channel:
"""Given the message passing framework builds a channel
Parameters
----------
messaging_infrastructure : SharedMemoryManager
messaging_infrastructure : MessageInfrastructureInterface
Returns
-------
Expand All @@ -606,15 +597,17 @@ def build(self, messaging_infrastructure: SharedMemoryManager) -> Channel:
Exception
Can't build channel of type specified
"""
if self.channel_type == PyPyChannel:
channel_name: str = self.port_initializer.name
return PyPyChannel(
messaging_infrastructure,
channel_name + "_src",
channel_name + "_dst",
self.port_initializer.shape,
self.port_initializer.d_type,
self.port_initializer.size,
)
else:
raise Exception(f"Can't build channel of type {self.channel_type}")
channel_class = messaging_infrastructure.channel_class(
channel_type=self.channel_type)

channel_name: str = (
self.port_initializer.name
)
return channel_class(
messaging_infrastructure,
channel_name + "_src",
channel_name + "_dst",
self.port_initializer.shape,
self.port_initializer.d_type,
self.port_initializer.size,
)
9 changes: 9 additions & 0 deletions src/lava/magma/compiler/channels/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
# SPDX-License-Identifier: BSD-3-Clause
# See: https://spdx.org/licenses/
import typing as ty
from enum import IntEnum

import numpy as np
from abc import ABC, abstractmethod

Expand Down Expand Up @@ -61,3 +63,10 @@ def src_port(self) -> AbstractCspSendPort:
@abstractmethod
def dst_port(self) -> AbstractCspRecvPort:
pass


class ChannelType(IntEnum):
"""Type of a channel given the two process models"""
PyPy = 0
CPy = 1
PyC = 2
59 changes: 56 additions & 3 deletions src/lava/magma/compiler/channels/pypychannel.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
AbstractCspSendPort,
AbstractCspRecvPort,
)
if ty.TYPE_CHECKING:
from lava.magma.runtime.message_infrastructure\
.message_infrastructure_interface import MessageInfrastructureInterface


@dataclass
Expand All @@ -32,6 +35,23 @@ class CspSendPort(AbstractCspSendPort):
"""

def __init__(self, name, shm, proto, size, req, ack):
"""[summary]

Parameters
----------
name : str
[description]
shm : [type]
[description]
proto : [type]
[description]
size : [type]
[description]
req : [type]
[description]
ack : [type]
[description]
"""
self._name = name
self._shm = shm
self._shape = proto.shape
Expand All @@ -42,6 +62,8 @@ def __init__(self, name, shm, proto, size, req, ack):
self._size = size
self._idx = 0
self._done = False
self._array = []
self._semaphore = None
self.thread = None

@property
Expand All @@ -66,7 +88,7 @@ def start(self):
np.ndarray(
shape=self._shape,
dtype=self._dtype,
buffer=self._shm.buf[self._nbytes * i : self._nbytes * (i + 1)],
buffer=self._shm.buf[self._nbytes * i: self._nbytes * (i + 1)],
)
for i in range(self._size)
]
Expand Down Expand Up @@ -152,6 +174,17 @@ class CspRecvPort(AbstractCspRecvPort):
"""

def __init__(self, name, shm, proto, size, req, ack):
"""[summary]

Parameters
----------
name : str
shm : SharedMemory
proto : [type]
size : int
req : [type]
ack : [type]
"""
self._name = name
self._shm = shm
self._shape = proto.shape
Expand All @@ -162,6 +195,8 @@ def __init__(self, name, shm, proto, size, req, ack):
self._ack = ack
self._idx = 0
self._done = False
self._array = []
self._queue = None
self.thread = None

@property
Expand All @@ -186,7 +221,7 @@ def start(self):
np.ndarray(
shape=self._shape,
dtype=self._dtype,
buffer=self._shm.buf[self._nbytes * i : self._nbytes * (i + 1)],
buffer=self._shm.buf[self._nbytes * i: self._nbytes * (i + 1)],
)
for i in range(self._size)
]
Expand Down Expand Up @@ -241,8 +276,26 @@ class PyPyChannel(Channel):
"""Helper class to create the set of send and recv port and encapsulate
them inside a common structure. We call this a PyPyChannel"""

def __init__(self, smm, src_name, dst_name, shape, dtype, size):
def __init__(self,
message_infrastructure: 'MessageInfrastructureInterface',
src_name,
dst_name,
shape,
dtype,
size):
"""[summary]

Parameters
----------
message_infrastructure: MessageInfrastructureInterface
src_name : str
dst_name : str
shape : ty.Tuple[int, ...]
dtype : ty.Type[np.intc]
size : int
"""
nbytes = np.prod(shape) * np.dtype(dtype).itemsize
smm = message_infrastructure.smm
shm = smm.SharedMemory(int(nbytes * size))
req = Pipe(duplex=False)
ack = Pipe(duplex=False)
Expand Down
Loading