Skip to content

Commit

Permalink
PyInPort: probe() implementation (#77)
Browse files Browse the repository at this point in the history
* PyInPort probe method + unit tests.
  • Loading branch information
gkarray authored Nov 22, 2021
1 parent 912d0f8 commit b9ecd83
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 9 deletions.
29 changes: 20 additions & 9 deletions src/lava/magma/core/model/py/ports.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,18 @@ def peek(self):
pass

def probe(self) -> bool:
pass
"""Executes probe method of all csp ports and accumulates the returned
bool values with AND operation. The accumulator acc is initialized to
True.
Returns the accumulated bool value.
"""
# Returns True only when probe returns True for all _csp_recv_ports.
return ft.reduce(
lambda acc, csp_port: acc and csp_port.probe(),
self._csp_recv_ports,
True,
)


class PyInPortVectorDense(PyInPort):
Expand Down Expand Up @@ -180,20 +191,20 @@ def csp_ports(self) -> ty.List[AbstractCspPort]:
return []

def read(
self,
self,
) -> ty.Union[
np.ndarray, ty.Tuple[np.ndarray, np.ndarray], int, ty.Tuple[int, int]
]:
pass

def write(
self,
data: ty.Union[
np.ndarray,
ty.Tuple[np.ndarray, np.ndarray],
int,
ty.Tuple[int, int],
],
self,
data: ty.Union[
np.ndarray,
ty.Tuple[np.ndarray, np.ndarray],
int,
ty.Tuple[int, int],
],
):
pass

Expand Down
1 change: 1 addition & 0 deletions tests/lava/magma/core/model/py/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
__import__("pkg_resources").declare_namespace(__name__)
99 changes: 99 additions & 0 deletions tests/lava/magma/core/model/py/test_ports.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# Copyright (C) 2021 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause
# See: https://spdx.org/licenses/
import unittest
import time
from multiprocessing.managers import SharedMemoryManager
import numpy as np

from lava.magma.compiler.channels.interfaces import AbstractCspSendPort, \
AbstractCspRecvPort

from lava.magma.core.model.py.ports import PyInPort, PyInPortVectorDense, \
PyOutPort, PyOutPortVectorDense

from lava.magma.compiler.channels.pypychannel import PyPyChannel


class MockInterface:
def __init__(self, smm):
self.smm = smm


def get_channel(smm, data, size, name="test_channel") -> PyPyChannel:
mock = MockInterface(smm)
return PyPyChannel(
message_infrastructure=mock,
src_name=name,
dst_name=name,
shape=data.shape,
dtype=data.dtype,
size=size
)


class TestPyPorts(unittest.TestCase):
def probe_test_routine(self, cls):
"""Routine that tests probe method on one implementation of PyInPorts.
"""
smm = SharedMemoryManager()

try:
smm.start()

data = np.ones((4, 4))

channel_1 = get_channel(smm, data, data.size)
send_csp_port_1: AbstractCspSendPort = channel_1.src_port
recv_csp_port_1: AbstractCspRecvPort = channel_1.dst_port

channel_2 = get_channel(smm, data, data.size)
send_csp_port_2: AbstractCspSendPort = channel_2.src_port
recv_csp_port_2: AbstractCspRecvPort = channel_2.dst_port

# Create two different PyOutPort
send_py_port_1: PyOutPort = \
PyOutPortVectorDense([send_csp_port_1], None)
send_py_port_2: PyOutPort = \
PyOutPortVectorDense([send_csp_port_2], None)
# Create PyInPort with current implementation
recv_py_port: PyInPort = \
cls([recv_csp_port_1, recv_csp_port_2], None)

recv_py_port.start()
send_py_port_1.start()
send_py_port_2.start()

# Send data through first PyOutPort
send_py_port_1.send(data)
# Send data through second PyOutPort
send_py_port_2.send(data)
# Sleep to let message reach the PyInPort
time.sleep(0.001)
# Probe PyInPort
probe_value = recv_py_port.probe()

# probe_value should be True if message reached the PyInPort
self.assertTrue(probe_value)

# Get data that reached PyInPort to empty buffer
_ = recv_py_port.recv()
# Probe PyInPort
probe_value = recv_py_port.probe()

# probe_value should be False since PyInPort's buffer was emptied
self.assertFalse(probe_value)
finally:
smm.shutdown()

def test_py_in_port_probe(self):
"""Tests PyInPort probe method on all implementations of PyInPorts."""
# TODO: (GK) Add other classes when we support them
classes = [PyInPortVectorDense]

for cls in classes:
self.probe_test_routine(cls)


if __name__ == '__main__':
unittest.main()

0 comments on commit b9ecd83

Please sign in to comment.