From fc15bbdfc3049e3929b78ca6b64298a2727cdd93 Mon Sep 17 00:00:00 2001 From: hongdami Date: Wed, 17 Aug 2022 11:53:10 +0800 Subject: [PATCH] Messaging refactor develop (#310) * Copy the python file from depricated_message_infrastructure/*.py * change multiprocessing.py to adapt interface * fix cpplint error * Update cpplib side MultiProcessing: 1. message_infrastructure_logging.h: logging for development 2. abstract_actor.h&posix_actor.cc: achieve stop function for signal 3. shm.h: Add SharedMemoryManager to cpplib 4. others: Update CMake, test and fix some type error' fix pylint error * Fix MultiProcessing Error and Cpplint Error * Remove Actor interface for python * Fix package path error * Update API Interface 1. multiprocessing.py: Fix bug for sharedmemmanager name 2. remove lava package dependency for message_infrastructure 3. multiprocessing.cc: Add shmm and actors function 4. abstract_actor.h: Modify the posix actor and interface design 5. add and update test for multiprocessing Signed-off-by: Miao, Hongda * Update import from message_infrastructure: 1. Move message_interface_enum.py to message_infrastructure 2. Update import syntax 3. Others: change multiprocessing init Signed-off-by: Miao, Hongda * Update test_runtime.py import Signed-off-by: Miao, Hongda * Update bash setenv.sh to add message_infrastructure Signed-off-by: Miao, Hongda --- .../compiler/builders/channel_builder.py | 2 +- .../magma/compiler/channels/pypychannel.py | 4 +- src/lava/magma/core/process/process.py | 2 +- .../message_infrastructure/CMakeLists.txt | 5 +- .../message_infrastructure/__init__.py | 4 ++ .../csrc/abstract_actor.h | 26 +++++--- .../csrc/message_infrastructure_logging.h | 28 +++++++++ .../csrc/message_infrastructure_py_wrapper.cc | 22 ++++++- .../csrc/multiprocessing.cc | 44 +++++++++---- .../csrc/multiprocessing.h | 17 +++-- .../csrc/posix_actor.cc | 33 ++++++++++ .../message_infrastructure/csrc/shm.h | 37 +++++++++++ .../message_infrastructure/factory.py | 18 ++++++ .../message_infrastructure_interface.py | 35 +++++++++++ .../message_interface_enum.py | 0 .../message_infrastructure/multiprocessing.py | 49 +++++++++++++++ .../message_infrastructure/nx.py | 44 +++++++++++++ .../runtime/message_infrastructure/setenv.sh | 5 +- .../test/test_example.py | 60 +++++++++++++++--- .../test/test_multiprocessing.py | 62 +++++++++++++++++++ src/lava/magma/runtime/runtime.py | 8 +-- tests/lava/magma/runtime/test_runtime.py | 2 +- 22 files changed, 459 insertions(+), 48 deletions(-) create mode 100644 src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/message_infrastructure_logging.h create mode 100644 src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/posix_actor.cc create mode 100644 src/lava/magma/runtime/message_infrastructure/message_infrastructure/factory.py create mode 100644 src/lava/magma/runtime/message_infrastructure/message_infrastructure/message_infrastructure_interface.py rename src/lava/magma/{core/process => runtime/message_infrastructure/message_infrastructure}/message_interface_enum.py (100%) create mode 100644 src/lava/magma/runtime/message_infrastructure/message_infrastructure/multiprocessing.py create mode 100644 src/lava/magma/runtime/message_infrastructure/message_infrastructure/nx.py create mode 100644 src/lava/magma/runtime/message_infrastructure/test/test_multiprocessing.py diff --git a/src/lava/magma/compiler/builders/channel_builder.py b/src/lava/magma/compiler/builders/channel_builder.py index 5ffdc76ce..7c1bb157d 100644 --- a/src/lava/magma/compiler/builders/channel_builder.py +++ b/src/lava/magma/compiler/builders/channel_builder.py @@ -15,7 +15,7 @@ ChannelType, ) from lava.magma.compiler.utils import PortInitializer -from lava.magma.runtime.message_infrastructure \ +from message_infrastructure \ .message_infrastructure_interface import (MessageInfrastructureInterface) if ty.TYPE_CHECKING: diff --git a/src/lava/magma/compiler/channels/pypychannel.py b/src/lava/magma/compiler/channels/pypychannel.py index 80fa608b7..015f1b07f 100644 --- a/src/lava/magma/compiler/channels/pypychannel.py +++ b/src/lava/magma/compiler/channels/pypychannel.py @@ -16,8 +16,8 @@ ) if ty.TYPE_CHECKING: - from lava.magma.runtime.message_infrastructure \ - .message_infrastructure_interface import ( + from message_infrastructure.message_infrastructure_interface \ + import ( MessageInfrastructureInterface) diff --git a/src/lava/magma/core/process/process.py b/src/lava/magma/core/process/process.py index 7e2da39d3..410ce1203 100644 --- a/src/lava/magma/core/process/process.py +++ b/src/lava/magma/core/process/process.py @@ -11,7 +11,7 @@ from lava.magma.core.process.interfaces import \ AbstractProcessMember, IdGeneratorSingleton -from lava.magma.core.process.message_interface_enum import ActorType +from message_infrastructure.message_interface_enum import ActorType from lava.magma.core.process.ports.ports import \ InPort, OutPort, RefPort, VarPort from lava.magma.core.process.variable import Var diff --git a/src/lava/magma/runtime/message_infrastructure/CMakeLists.txt b/src/lava/magma/runtime/message_infrastructure/CMakeLists.txt index 83bd84834..0c8a56eab 100644 --- a/src/lava/magma/runtime/message_infrastructure/CMakeLists.txt +++ b/src/lava/magma/runtime/message_infrastructure/CMakeLists.txt @@ -4,7 +4,10 @@ project(message_passing) set (CMAKE_CXX_STANDARD 17) include_directories(message_infrastructure/csrc) -file (GLOB MULTI_PROC_SRCS "message_infrastructure/csrc/multiprocessing.cc") +set(MULTI_PROC_SRCS + "message_infrastructure/csrc/multiprocessing.cc" + "message_infrastructure/csrc/posix_actor.cc" +) file (GLOB PY_WRAPPER "message_infrastructure/csrc/message_infrastructure_py_wrapper.cc") diff --git a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/__init__.py b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/__init__.py index e69de29bb..6b8b8c436 100644 --- a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/__init__.py +++ b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/__init__.py @@ -0,0 +1,4 @@ +from MessageInfrastructurePywrapper import CppMultiProcessing +from MessageInfrastructurePywrapper import SharedMemManager +from MessageInfrastructurePywrapper import Actor +from MessageInfrastructurePywrapper import ProcessType diff --git a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/abstract_actor.h b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/abstract_actor.h index 91bf29767..85ce96a9a 100644 --- a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/abstract_actor.h +++ b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/abstract_actor.h @@ -5,31 +5,41 @@ #ifndef ABSTRACT_ACTOR_H_ #define ABSTRACT_ACTOR_H_ +#include + namespace message_infrastructure { +enum ActorStatus { + StatsError = -1, + StatsRuning = 0, + StatsStopped = 1 +}; + class AbstractActor { public: virtual int GetPid() = 0; virtual int Stop() = 0; - virtual int Pause() = 0; - int pid_; }; class PosixActor : public AbstractActor { public: - explicit PosixActor(int pid) { + explicit PosixActor(int pid, std::function target_fn) { this->pid_ = pid; + this->target_fn_ = target_fn; } int GetPid() { return this->pid_; } - int Stop() { - return 0; - } - int Pause() { - return 0; + int Wait(); + int Stop(); + int GetStatus() { + return this->status_; } + // int Trace(); + private: + std::function target_fn_ = NULL; + int status_ = StatsStopped; }; using ActorPtr = AbstractActor *; diff --git a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/message_infrastructure_logging.h b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/message_infrastructure_logging.h new file mode 100644 index 000000000..674e81855 --- /dev/null +++ b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/message_infrastructure_logging.h @@ -0,0 +1,28 @@ +// Copyright (C) 2021 Intel Corporation +// SPDX-License-Identifier: BSD-3-Clause +// See: https://spdx.org/licenses/ + +#ifndef MESSAGE_INFRASTRUCTURE_LOGGING_H_ +#define MESSAGE_INFRASTRUCTURE_LOGGING_H_ + +#include +#define LOG_MP (1) // log for multiprocessing + + +#define LAVA_LOG(_cond, _fmt, ...) { \ + if ((_cond)) { \ + printf("[CPP INFO]" _fmt, ## __VA_ARGS__); \ + } \ +} + +#define LAVA_LOG_WARN(_cond, _fmt, ...) { \ + if ((_cond)) { \ + printf("[CPP WARNING]" _fmt, __FUNCTION__, ## __VA_ARGS__); \ + } \ +} + +#define LAVA_LOG_ERR(_fmt, ...) { \ + printf("[CPP ERROR]" _fmt, __FUNCTION__, ## __VA_ARGS__); \ +} + +#endif // MESSAGE_INFRASTRUCTURE_LOGGING_H_ diff --git a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/message_infrastructure_py_wrapper.cc b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/message_infrastructure_py_wrapper.cc index fed050245..989558e88 100644 --- a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/message_infrastructure_py_wrapper.cc +++ b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/message_infrastructure_py_wrapper.cc @@ -4,9 +4,11 @@ #include #include +#include #include "multiprocessing.h" -// #include "shm.h" +#include "abstract_actor.h" +#include "shm.h" // #include "shmem_channel.h" // #include "shmem_port.h" @@ -15,11 +17,27 @@ namespace message_infrastructure { namespace py = pybind11; PYBIND11_MODULE(MessageInfrastructurePywrapper, m) { - py::class_ (m, "MultiProcessing") + py::class_ (m, "CppMultiProcessing") .def(py::init<>()) .def("build_actor", &MultiProcessing::BuildActor) .def("check_actor", &MultiProcessing::CheckActor) + .def("get_actors", &MultiProcessing::GetActors) + .def("get_shmm", &MultiProcessing::GetSharedMemManager) .def("stop", &MultiProcessing::Stop); + py::enum_ (m, "ProcessType") + .value("ErrorProcess", ErrorProcess) + .value("ChildProcess", ChildProcess) + .value("ParentProcess", ParentProcess); + py::class_ (m, "SharedMemManager") + .def(py::init<>()) + .def("alloc_mem", &SharedMemManager::AllocSharedMemory) + .def("stop", &SharedMemManager::Stop); + py::class_ (m, "Actor") + .def("wait", &PosixActor::Wait) + .def("stop", &PosixActor::Stop) + .def("get_status", &PosixActor::GetStatus); + // .def("trace", &PosixActor::Trace); + /* py::class_ (m, "ShmemSendPort") .def(py::init()) diff --git a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/multiprocessing.cc b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/multiprocessing.cc index b52f1b7e2..538994741 100644 --- a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/multiprocessing.cc +++ b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/multiprocessing.cc @@ -3,43 +3,61 @@ // See: https://spdx.org/licenses/ #include "multiprocessing.h" +#include "message_infrastructure_logging.h" #include #include -#include namespace message_infrastructure { -#define CPP_INFO "[CPP_INFO] " +MultiProcessing::MultiProcessing() { + shmm_ = new SharedMemManager(); +} -void MultiProcessing::BuildActor(std::function target_fn) { +int MultiProcessing::BuildActor(std::function target_fn) { pid_t pid = fork(); - if (pid < 0) { - std::cout << CPP_INFO << "cannot allocate pid\n"; - } - if (pid > 0) { - std::cout << CPP_INFO << "parent, create actor\n"; - ActorPtr actor = new PosixActor(pid); + LAVA_LOG(LOG_MP, "Parent Process, create child process %d\n", pid); + ActorPtr actor = new PosixActor(pid, target_fn); actors_.push_back(actor); + return ParentProcess; } if (pid == 0) { - std::cout << CPP_INFO << "child, new process\n"; + LAVA_LOG(LOG_MP, "child, new process\n"); target_fn(); exit(0); } + LAVA_LOG_ERR("Cannot allocate new pid for the process\n"); + return ErrorProcess; + } -void MultiProcessing::Stop() { +int MultiProcessing::Stop() { + int error_cnts = 0; + + for (auto actor : actors_) { + error_cnts += actor->Stop(); + } + + LAVA_LOG(LOG_MP, "Stop Actors, error: %d\n", error_cnts); + return error_cnts; } void MultiProcessing::CheckActor() { - for(auto actor : actors_){ - std::cout << CPP_INFO << actor->pid_ << std::endl; + for (auto actor : actors_) { + LAVA_LOG(LOG_MP, "Actor info %d\n", actor->GetPid()); } } +SharedMemManager* MultiProcessing::GetSharedMemManager() { + return this->shmm_; +} + +std::vector& MultiProcessing::GetActors() { + return this->actors_; +} + } // namespace message_infrastructure diff --git a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/multiprocessing.h b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/multiprocessing.h index 86ee1b7c6..1552803d6 100644 --- a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/multiprocessing.h +++ b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/multiprocessing.h @@ -13,15 +13,24 @@ namespace message_infrastructure { +enum ProcessType { + ErrorProcess = -1, + ChildProcess = 0, + ParentProcess = 1 +}; + class MultiProcessing { public: - // stop each actor in vector actors; - void Stop(); - void BuildActor(std::function); + MultiProcessing(); + int Stop(); + int BuildActor(std::function); void CheckActor(); + std::vector& GetActors(); + SharedMemManager* GetSharedMemManager(); + private: std::vector actors_; - // SharedMemManager shmm_; + SharedMemManager *shmm_; }; } // namespace message_infrastructure diff --git a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/posix_actor.cc b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/posix_actor.cc new file mode 100644 index 000000000..bdc4697b7 --- /dev/null +++ b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/posix_actor.cc @@ -0,0 +1,33 @@ +// Copyright (C) 2021 Intel Corporation +// SPDX-License-Identifier: BSD-3-Clause +// See: https://spdx.org/licenses/ + +#include "abstract_actor.h" +#include "message_infrastructure_logging.h" + +#include +#include + +namespace message_infrastructure { + +int PosixActor::Stop() { + int status; + // TODO: Add the options can be as args of the function + int options = 0; + int ret = waitpid(this->pid_, &status, options); + + if (ret < 0) { + LAVA_LOG_ERR("process %d waitpid error\n", this->pid_); + return -1; + } + + // Check the status + return 0; + +} + +int PosixActor::Wait() { + return 0; +} + +} \ No newline at end of file diff --git a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/shm.h b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/shm.h index 3ded37910..8ba9953aa 100644 --- a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/shm.h +++ b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/shm.h @@ -13,11 +13,48 @@ #include #include +#include + namespace message_infrastructure { class SharedMemory { }; +class SharedMemManager { + public: + int AllocSharedMemory(size_t mem_size) { + int shmid = shmget(key_, mem_size, 0644|IPC_CREAT); + if (shmid < 0) + return -1; + + shms_.push_back(shmid); + key_++; + return shmid; + } + + int DeleteSharedMemory(int shmid) { + // Release specific shared memory + int del_cnt = 0; + for (auto it = shms_.begin(); it != shms_.end(); it++) { + if ((*it) == shmid) { + shms_.erase(it); + del_cnt++; + } + } + return del_cnt; + } + + int Stop() { + int stop_cnt = shms_.size(); + shms_.clear(); + return stop_cnt; + } + + private: + key_t key_ = 0xdead; + std::vector shms_; +}; + } // namespace message_infrastructure #endif // SHM_H_ diff --git a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/factory.py b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/factory.py new file mode 100644 index 000000000..8746012e3 --- /dev/null +++ b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/factory.py @@ -0,0 +1,18 @@ +# Copyright (C) 2021-22 Intel Corporation +# SPDX-License-Identifier: LGPL 2.1 or later +# See: https://spdx.org/licenses/ +from message_infrastructure.message_interface_enum import ActorType +from message_infrastructure.multiprocessing import MultiProcessing + +"""Factory class to create the messaging infrastructure""" + + +class MessageInfrastructureFactory: + """Creates the message infrastructure instance based on type""" + @staticmethod + def create(factory_type: ActorType): + """type of actor framework being chosen""" + if factory_type == ActorType.MultiProcessing: + return MultiProcessing() + else: + raise Exception("Unsupported factory_type") diff --git a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/message_infrastructure_interface.py b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/message_infrastructure_interface.py new file mode 100644 index 000000000..07047421c --- /dev/null +++ b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/message_infrastructure_interface.py @@ -0,0 +1,35 @@ +# Copyright (C) 2021-22 Intel Corporation +# SPDX-License-Identifier: LGPL 2.1 or later +# See: https://spdx.org/licenses/ +import typing as ty +from abc import ABC, abstractmethod + +"""A Message Infrastructure Interface which can create actors which would +participate in message passing/exchange, start and stop them as well as +declare the underlying Channel Infrastructure Class to be used for message +passing implementation.""" + + +class MessageInfrastructureInterface(ABC): + """Interface to provide the ability to create actors which can + communicate via message passing""" + @abstractmethod + def start(self): + """Starts the messaging infrastructure""" + pass + + @abstractmethod + def stop(self): + """Stops the messaging infrastructure""" + pass + + @abstractmethod + def build_actor(self, target_fn: ty.Callable, builder): + """Given a target_fn starts a system process""" + pass + + @property + @abstractmethod + def actors(self) -> ty.List[ty.Any]: + """Returns a list of actors""" + pass diff --git a/src/lava/magma/core/process/message_interface_enum.py b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/message_interface_enum.py similarity index 100% rename from src/lava/magma/core/process/message_interface_enum.py rename to src/lava/magma/runtime/message_infrastructure/message_infrastructure/message_interface_enum.py diff --git a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/multiprocessing.py b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/multiprocessing.py new file mode 100644 index 000000000..fdd590f93 --- /dev/null +++ b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/multiprocessing.py @@ -0,0 +1,49 @@ +# Copyright (C) 2021-22 Intel Corporation +# SPDX-License-Identifier: LGPL 2.1 or later +# See: https://spdx.org/licenses/ +import typing as ty +from functools import partial + +from message_infrastructure import CppMultiProcessing +from message_infrastructure import SharedMemManager +from message_infrastructure import Actor + +from message_infrastructure.message_infrastructure_interface \ + import MessageInfrastructureInterface + + +"""Implements the Message Infrastructure Interface using Python +MultiProcessing Library. The MultiProcessing API is used to create actors +which will participate in exchanging messages. The Channel Infrastructure +further uses the SharedMemoryManager from MultiProcessing Library to +implement the communication backend in this implementation.""" + + +class MultiProcessing(MessageInfrastructureInterface): + """Implements message passing using shared memory and multiprocessing""" + + def __init__(self): + self._mp: ty.Optional[CppMultiProcessing] = CppMultiProcessing() + + @property + def actors(self): + """Returns a list of actors""" + return self._mp.get_actors() + + @property + def smm(self): + """Returns the underlying shared memory manager""" + return self._mp.get_shmm() + + def start(self): + """Init the MultiProcessing""" + pass + + def build_actor(self, target_fn: ty.Callable, builder) -> ty.Any: + """Given a target_fn starts a system (os) process""" + bound_target_fn = partial(target_fn, builder=builder) + ret = self._mp.build_actor(bound_target_fn) + + def stop(self): + """Stops the shared memory manager""" + self._mp.stop() diff --git a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/nx.py b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/nx.py new file mode 100644 index 000000000..164c4d819 --- /dev/null +++ b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/nx.py @@ -0,0 +1,44 @@ +# Copyright (C) 2021-22 Intel Corporation +# SPDX-License-Identifier: LGPL 2.1 or later +# See: https://spdx.org/licenses/ + +import typing as ty + +from lava.magma.compiler.channels.interfaces import ChannelType +from lava.magma.core.sync.domain import SyncDomain +from lava.magma.runtime.message_infrastructure \ + .message_infrastructure_interface import \ + MessageInfrastructureInterface + + +class NxBoardMsgInterface(MessageInfrastructureInterface): + """Implements message passing using nx board""" + + @property + def actors(self): + """Returns a list of actors""" + pass + + def start(self): + """Starts the shared memory manager""" + pass + + def build_actor(self, target_fn: ty.Callable, builder: ty.Union[ + ty.Dict['AbstractProcess', 'PyProcessBuilder'], ty.Dict[ + SyncDomain, 'RuntimeServiceBuilder']]) -> ty.Any: + """Given a target_fn starts a system (os) process""" + pass + + def stop(self): + """Stops the shared memory manager""" + pass + + def channel_class(self, channel_type: ChannelType) -> ty.Type[Channel]: + """Given a channel type, returns the shared memory based class + implementation for the same""" + if channel_type == ChannelType.CNc: + return CNcChannel + elif channel_type == ChannelType.NcC: + return NcCChannel + else: + raise Exception(f"Unsupported channel type {channel_type}") diff --git a/src/lava/magma/runtime/message_infrastructure/setenv.sh b/src/lava/magma/runtime/message_infrastructure/setenv.sh index f8d8b5786..2a77fe3ee 100644 --- a/src/lava/magma/runtime/message_infrastructure/setenv.sh +++ b/src/lava/magma/runtime/message_infrastructure/setenv.sh @@ -1,2 +1,3 @@ -export PYTHONPATH=${PWD}/build -export LD_LIBRARY_PATH=${PWD}/build +SCRIPTPATH=$(cd `dirname -- $0` && pwd) +export PYTHONPATH="${SCRIPTPATH}/build:${SCRIPTPATH}:$PYTHONPATH" +export LD_LIBRARY_PATH="${SCRIPTPATH}/build:${SCRIPTPATH}:$LD_LIBRARY_PATH" diff --git a/src/lava/magma/runtime/message_infrastructure/test/test_example.py b/src/lava/magma/runtime/message_infrastructure/test/test_example.py index 40ba6aacf..921ddbe21 100644 --- a/src/lava/magma/runtime/message_infrastructure/test/test_example.py +++ b/src/lava/magma/runtime/message_infrastructure/test/test_example.py @@ -1,23 +1,67 @@ # Copyright (C) 2021 Intel Corporation # SPDX-License-Identifier: BSD-3-Clause # See: https://spdx.org/licenses/ +import traceback +from functools import partial -from MessageInfrastructurePywrapper import MultiProcessing -import time +from MessageInfrastructurePywrapper import CppMultiProcessing +from MessageInfrastructurePywrapper import SharedMemManager +from MessageInfrastructurePywrapper import ProcessType +from MessageInfrastructurePywrapper import Actor -def print_hello(): - print("hello") +class Builder(): + def build(self): + print("Builder run build") + + def start(self, *args, **kwargs): + print("Builder run start") + + +def target_fn(*args, **kwargs): + """ + Function to build and attach a system process to + + :param args: List Parameters to be passed onto the process + :param kwargs: Dict Parameters to be passed onto the process + :return: None + """ + try: + builder = kwargs.pop("builder") + actor = builder.build() + except Exception as e: + print("Encountered Fatal Exception: " + str(e)) + print("Traceback: ") + print(traceback.format_exc()) + raise e def main(): - mp = MultiProcessing() + builder = Builder() + mp = CppMultiProcessing() + bound_target_fn = partial(target_fn, builder=builder) + bound_target_fn() for i in range(5): - mp.build_actor(print_hello) + ret = mp.build_actor(bound_target_fn) + if ret == ProcessType.ChildProcess : + print("child process, exit") + exit(0) mp.check_actor() + mp.stop() + + actors = mp.get_actors() + print(actors) + print("actor status: ", actors[0].get_status()) + + shm = mp.get_shmm() + + for i in range(5): + print("shared id:", shm.alloc_mem(10)) + + shm2 = mp.get_shmm() + print("stop num: ", shm2.stop()) + print("stop num: ", shm.stop()) main() -print("sleep 5") -time.sleep(5) diff --git a/src/lava/magma/runtime/message_infrastructure/test/test_multiprocessing.py b/src/lava/magma/runtime/message_infrastructure/test/test_multiprocessing.py new file mode 100644 index 000000000..0dcb83c78 --- /dev/null +++ b/src/lava/magma/runtime/message_infrastructure/test/test_multiprocessing.py @@ -0,0 +1,62 @@ +# Copyright (C) 2021 Intel Corporation +# SPDX-License-Identifier: BSD-3-Clause +# See: https://spdx.org/licenses/ +import traceback +from functools import partial + +from message_infrastructure import CppMultiProcessing +from message_infrastructure import SharedMemManager +from message_infrastructure import ProcessType +from message_infrastructure import Actor +from message_infrastructure.multiprocessing import MultiProcessing + + +class Builder(): + def build(self, i): + print("Builder run build ", i) + + +def target_fn(*args, **kwargs): + """ + Function to build and attach a system process to + + :param args: List Parameters to be passed onto the process + :param kwargs: Dict Parameters to be passed onto the process + :return: None + """ + try: + builder = kwargs.pop("builder") + idx = kwargs.pop("idx") + builder.build(idx) + except Exception as e: + print("Encountered Fatal Exception: " + str(e)) + print("Traceback: ") + print(traceback.format_exc()) + raise e + + +def test_multiprocessing(): + mp = MultiProcessing() + mp.start() + builder = Builder() + for i in range(5): + bound_target_fn = partial(target_fn, idx=i) + ret = mp.build_actor(bound_target_fn, builder) + if ret == ProcessType.ChildProcess : + print("child process, exit") + exit(0) + + shmm = mp.smm + for i in range(5): + print("shared memory id: ", shmm.alloc_mem(8)) + + actors = mp.actors + print(actors) + print("actor status: ", actors[0].get_status()) + print("stop num: ", shmm.stop()) + print("stop num: ", shmm.stop()) + + mp.stop() + + +test_multiprocessing() diff --git a/src/lava/magma/runtime/runtime.py b/src/lava/magma/runtime/runtime.py index cd4712c7d..a7731ef13 100644 --- a/src/lava/magma/runtime/runtime.py +++ b/src/lava/magma/runtime/runtime.py @@ -12,11 +12,9 @@ import numpy as np from lava.magma.compiler.channels.pypychannel import CspRecvPort, CspSendPort from lava.magma.compiler.var_model import AbstractVarModel -from lava.magma.core.process.message_interface_enum import ActorType -from lava.magma.runtime.message_infrastructure.factory import \ - MessageInfrastructureFactory -from lava.magma.runtime.message_infrastructure. \ - message_infrastructure_interface import \ +from message_infrastructure.message_interface_enum import ActorType +from message_infrastructure.factory import MessageInfrastructureFactory +from message_infrastructure.message_infrastructure_interface import \ MessageInfrastructureInterface from lava.magma.runtime.mgmt_token_enums import (MGMT_COMMAND, MGMT_RESPONSE, enum_equal, enum_to_np) diff --git a/tests/lava/magma/runtime/test_runtime.py b/tests/lava/magma/runtime/test_runtime.py index e25295db1..d65564f02 100644 --- a/tests/lava/magma/runtime/test_runtime.py +++ b/tests/lava/magma/runtime/test_runtime.py @@ -3,7 +3,7 @@ from unittest.mock import Mock, PropertyMock from lava.magma.compiler.executable import Executable -from lava.magma.core.process.message_interface_enum import ActorType +from message_infrastructure.message_interface_enum import ActorType from lava.magma.core.resources import HeadNode, Loihi2System from lava.magma.compiler.node import Node, NodeConfig from lava.magma.runtime.runtime import Runtime