From c8dd3ede9d14ec5839b109fe1571efba7d3cd926 Mon Sep 17 00:00:00 2001 From: "Miao, Hongda" Date: Fri, 12 Aug 2022 16:45:02 +0800 Subject: [PATCH 01/11] Copy the python file from depricated_message_infrastructure/*.py --- .../message_infrastructure/factory.py | 19 +++ .../message_infrastructure_interface.py | 51 ++++++++ .../message_infrastructure/multiprocessing.py | 113 ++++++++++++++++++ .../message_infrastructure/nx.py | 44 +++++++ .../runtime/message_infrastructure/setenv.sh | 5 +- 5 files changed, 230 insertions(+), 2 deletions(-) create mode 100644 src/lava/magma/runtime/message_infrastructure/message_infrastructure/factory.py create mode 100644 src/lava/magma/runtime/message_infrastructure/message_infrastructure/message_infrastructure_interface.py create mode 100644 src/lava/magma/runtime/message_infrastructure/message_infrastructure/multiprocessing.py create mode 100644 src/lava/magma/runtime/message_infrastructure/message_infrastructure/nx.py diff --git a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/factory.py b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/factory.py new file mode 100644 index 000000000..5588e1aaf --- /dev/null +++ b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/factory.py @@ -0,0 +1,19 @@ +# Copyright (C) 2021-22 Intel Corporation +# SPDX-License-Identifier: LGPL 2.1 or later +# See: https://spdx.org/licenses/ +from lava.magma.core.process.message_interface_enum import ActorType +from lava.magma.runtime.message_infrastructure.multiprocessing import \ + MultiProcessing + +"""Factory class to create the messaging infrastructure""" + + +class MessageInfrastructureFactory: + """Creates the message infrastructure instance based on type""" + @staticmethod + def create(factory_type: ActorType): + """type of actor framework being chosen""" + if factory_type == ActorType.MultiProcessing: + return MultiProcessing() + else: + raise Exception("Unsupported factory_type") diff --git a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/message_infrastructure_interface.py b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/message_infrastructure_interface.py new file mode 100644 index 000000000..ac88850da --- /dev/null +++ b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/message_infrastructure_interface.py @@ -0,0 +1,51 @@ +# Copyright (C) 2021-22 Intel Corporation +# SPDX-License-Identifier: LGPL 2.1 or later +# See: https://spdx.org/licenses/ +import typing as ty +if ty.TYPE_CHECKING: + from lava.magma.core.process.process import AbstractProcess + from lava.magma.compiler.builders.py_builder import PyProcessBuilder + from lava.magma.compiler.builders.runtimeservice_builder import \ + RuntimeServiceBuilder +from abc import ABC, abstractmethod + +from lava.magma.compiler.channels.interfaces import ChannelType, Channel +from lava.magma.core.sync.domain import SyncDomain + +"""A Message Infrastructure Interface which can create actors which would +participate in message passing/exchange, start and stop them as well as +declare the underlying Channel Infrastructure Class to be used for message +passing implementation.""" + + +class MessageInfrastructureInterface(ABC): + """Interface to provide the ability to create actors which can + communicate via message passing""" + @abstractmethod + def start(self): + """Starts the messaging infrastructure""" + pass + + @abstractmethod + def stop(self): + """Stops the messaging infrastructure""" + pass + + @abstractmethod + def build_actor(self, target_fn: ty.Callable, builder: ty.Union[ + ty.Dict['AbstractProcess', 'PyProcessBuilder'], ty.Dict[ + SyncDomain, 'RuntimeServiceBuilder']]): + """Given a target_fn starts a system process""" + pass + + @property + @abstractmethod + def actors(self) -> ty.List[ty.Any]: + """Returns a list of actors""" + pass + + @abstractmethod + def channel_class(self, channel_type: ChannelType) -> ty.Type[Channel]: + """Given the Channel Type, Return the Channel Implementation to + be used during execution""" + pass diff --git a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/multiprocessing.py b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/multiprocessing.py new file mode 100644 index 000000000..5b374afe2 --- /dev/null +++ b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/multiprocessing.py @@ -0,0 +1,113 @@ +# Copyright (C) 2021-22 Intel Corporation +# SPDX-License-Identifier: LGPL 2.1 or later +# See: https://spdx.org/licenses/ +import typing as ty +if ty.TYPE_CHECKING: + from lava.magma.core.process.process import AbstractProcess + from lava.magma.compiler.builders.py_builder import PyProcessBuilder + from lava.magma.compiler.builders.runtimeservice_builder import \ + RuntimeServiceBuilder + +import multiprocessing as mp +import os +from multiprocessing.managers import SharedMemoryManager +import traceback + +from lava.magma.compiler.channels.interfaces import ChannelType, Channel +from lava.magma.compiler.channels.pypychannel import PyPyChannel +try: + from lava.magma.compiler.channels.cpychannel import \ + CPyChannel, PyCChannel +except ImportError: + class CPyChannel: + pass + + class PyCChannel: + pass + +from lava.magma.core.sync.domain import SyncDomain +from lava.magma.runtime.message_infrastructure.message_infrastructure_interface\ + import MessageInfrastructureInterface + + +"""Implements the Message Infrastructure Interface using Python +MultiProcessing Library. The MultiProcessing API is used to create actors +which will participate in exchanging messages. The Channel Infrastructure +further uses the SharedMemoryManager from MultiProcessing Library to +implement the communication backend in this implementation.""" + + +class SystemProcess(mp.Process): + """Wraps a process so that the exceptions can be collected if present""" + + def __init__(self, *args, **kwargs): + mp.Process.__init__(self, *args, **kwargs) + self._pconn, self._cconn = mp.Pipe() + self._exception = None + + def run(self): + try: + mp.Process.run(self) + self._cconn.send(None) + except Exception as e: + tb = traceback.format_exc() + self._cconn.send((e, tb)) + + @property + def exception(self): + if self._pconn.poll(): + self._exception = self._pconn.recv() + return self._exception + + +class MultiProcessing(MessageInfrastructureInterface): + """Implements message passing using shared memory and multiprocessing""" + + def __init__(self): + self._smm: ty.Optional[SharedMemoryManager] = None + self._actors: ty.List[SystemProcess] = [] + + @property + def actors(self): + """Returns a list of actors""" + return self._actors + + @property + def smm(self): + """Returns the underlying shared memory manager""" + return self._smm + + def start(self): + """Starts the shared memory manager""" + self._smm = SharedMemoryManager() + self._smm.start() + + def build_actor(self, target_fn: ty.Callable, builder: ty.Union[ + ty.Dict['AbstractProcess', 'PyProcessBuilder'], ty.Dict[ + SyncDomain, 'RuntimeServiceBuilder']]) -> ty.Any: + """Given a target_fn starts a system (os) process""" + system_process = SystemProcess(target=target_fn, + args=(), + kwargs={"builder": builder}) + system_process.start() + self._actors.append(system_process) + return system_process + + def stop(self): + """Stops the shared memory manager""" + for actor in self._actors: + if actor._parent_pid == os.getpid(): + actor.join() + self._smm.shutdown() + + def channel_class(self, channel_type: ChannelType) -> ty.Type[Channel]: + """Given a channel type, returns the shared memory based class + implementation for the same""" + if channel_type == ChannelType.PyPy: + return PyPyChannel + elif channel_type == ChannelType.PyC: + return PyCChannel + elif channel_type == ChannelType.CPy: + return CPyChannel + else: + raise Exception(f"Unsupported channel type {channel_type}") diff --git a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/nx.py b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/nx.py new file mode 100644 index 000000000..164c4d819 --- /dev/null +++ b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/nx.py @@ -0,0 +1,44 @@ +# Copyright (C) 2021-22 Intel Corporation +# SPDX-License-Identifier: LGPL 2.1 or later +# See: https://spdx.org/licenses/ + +import typing as ty + +from lava.magma.compiler.channels.interfaces import ChannelType +from lava.magma.core.sync.domain import SyncDomain +from lava.magma.runtime.message_infrastructure \ + .message_infrastructure_interface import \ + MessageInfrastructureInterface + + +class NxBoardMsgInterface(MessageInfrastructureInterface): + """Implements message passing using nx board""" + + @property + def actors(self): + """Returns a list of actors""" + pass + + def start(self): + """Starts the shared memory manager""" + pass + + def build_actor(self, target_fn: ty.Callable, builder: ty.Union[ + ty.Dict['AbstractProcess', 'PyProcessBuilder'], ty.Dict[ + SyncDomain, 'RuntimeServiceBuilder']]) -> ty.Any: + """Given a target_fn starts a system (os) process""" + pass + + def stop(self): + """Stops the shared memory manager""" + pass + + def channel_class(self, channel_type: ChannelType) -> ty.Type[Channel]: + """Given a channel type, returns the shared memory based class + implementation for the same""" + if channel_type == ChannelType.CNc: + return CNcChannel + elif channel_type == ChannelType.NcC: + return NcCChannel + else: + raise Exception(f"Unsupported channel type {channel_type}") diff --git a/src/lava/magma/runtime/message_infrastructure/setenv.sh b/src/lava/magma/runtime/message_infrastructure/setenv.sh index f8d8b5786..b21e3babd 100644 --- a/src/lava/magma/runtime/message_infrastructure/setenv.sh +++ b/src/lava/magma/runtime/message_infrastructure/setenv.sh @@ -1,2 +1,3 @@ -export PYTHONPATH=${PWD}/build -export LD_LIBRARY_PATH=${PWD}/build +SCRIPTPATH=$(cd `dirname -- $0` && pwd) +export PYTHONPATH="${SCRIPTPATH}/build:$PYTHONPATH" +export LD_LIBRARY_PATH="${SCRIPTPATH}/build:$LD_LIBRARY_PATH" From 97e9a8f8df1eef91f8b064aa5304bc97d9f25935 Mon Sep 17 00:00:00 2001 From: "Miao, Hongda" Date: Fri, 12 Aug 2022 17:12:31 +0800 Subject: [PATCH 02/11] change multiprocessing.py to adapt interface --- .../csrc/message_infrastructure_py_wrapper.cc | 2 +- .../message_infrastructure/factory.py | 2 +- .../message_infrastructure/multiprocessing.py | 63 ++++++++----------- 3 files changed, 28 insertions(+), 39 deletions(-) diff --git a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/message_infrastructure_py_wrapper.cc b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/message_infrastructure_py_wrapper.cc index fed050245..7c7bf2883 100644 --- a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/message_infrastructure_py_wrapper.cc +++ b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/message_infrastructure_py_wrapper.cc @@ -15,7 +15,7 @@ namespace message_infrastructure { namespace py = pybind11; PYBIND11_MODULE(MessageInfrastructurePywrapper, m) { - py::class_ (m, "MultiProcessing") + py::class_ (m, "CppMultiProcessing") .def(py::init<>()) .def("build_actor", &MultiProcessing::BuildActor) .def("check_actor", &MultiProcessing::CheckActor) diff --git a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/factory.py b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/factory.py index 5588e1aaf..b072ef195 100644 --- a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/factory.py +++ b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/factory.py @@ -2,7 +2,7 @@ # SPDX-License-Identifier: LGPL 2.1 or later # See: https://spdx.org/licenses/ from lava.magma.core.process.message_interface_enum import ActorType -from lava.magma.runtime.message_infrastructure.multiprocessing import \ +from lava.magma.runtime.message_infrastructure.message_infrastructure.multiprocessing import \ MultiProcessing """Factory class to create the messaging infrastructure""" diff --git a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/multiprocessing.py b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/multiprocessing.py index 5b374afe2..d28845cb8 100644 --- a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/multiprocessing.py +++ b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/multiprocessing.py @@ -1,6 +1,7 @@ # Copyright (C) 2021-22 Intel Corporation # SPDX-License-Identifier: LGPL 2.1 or later # See: https://spdx.org/licenses/ +from pty import CHILD import typing as ty if ty.TYPE_CHECKING: from lava.magma.core.process.process import AbstractProcess @@ -8,13 +9,15 @@ from lava.magma.compiler.builders.runtimeservice_builder import \ RuntimeServiceBuilder -import multiprocessing as mp -import os -from multiprocessing.managers import SharedMemoryManager -import traceback +from MessageInfrastructurePywrapper import CppMultiProcessing +from MessageInfrastructurePywrapper import SharedMemoryManager +from MessageInfrastructurePywrapper import Actor + +from enum import Enum from lava.magma.compiler.channels.interfaces import ChannelType, Channel from lava.magma.compiler.channels.pypychannel import PyPyChannel + try: from lava.magma.compiler.channels.cpychannel import \ CPyChannel, PyCChannel @@ -26,8 +29,8 @@ class PyCChannel: pass from lava.magma.core.sync.domain import SyncDomain -from lava.magma.runtime.message_infrastructure.message_infrastructure_interface\ - import MessageInfrastructureInterface +from lava.magma.runtime.message_infrastructure.message_infrastructure\ + .message_infrastructure_interface import MessageInfrastructureInterface """Implements the Message Infrastructure Interface using Python @@ -37,40 +40,24 @@ class PyCChannel: implement the communication backend in this implementation.""" -class SystemProcess(mp.Process): - """Wraps a process so that the exceptions can be collected if present""" - - def __init__(self, *args, **kwargs): - mp.Process.__init__(self, *args, **kwargs) - self._pconn, self._cconn = mp.Pipe() - self._exception = None - - def run(self): - try: - mp.Process.run(self) - self._cconn.send(None) - except Exception as e: - tb = traceback.format_exc() - self._cconn.send((e, tb)) - - @property - def exception(self): - if self._pconn.poll(): - self._exception = self._pconn.recv() - return self._exception +class ProcessType(Enum): + ERR_PROC = -1 + PARENT_PROC = 0 + CHILD_PROC = 1 class MultiProcessing(MessageInfrastructureInterface): """Implements message passing using shared memory and multiprocessing""" def __init__(self): + self._mp: ty.Optional[CppMultiProcessing] = None self._smm: ty.Optional[SharedMemoryManager] = None - self._actors: ty.List[SystemProcess] = [] + self._actors: ty.List[Actor] = [] @property def actors(self): """Returns a list of actors""" - return self._actors + return self._mp.get_actors() @property def smm(self): @@ -79,26 +66,28 @@ def smm(self): def start(self): """Starts the shared memory manager""" + self._mp = CppMultiProcessing() self._smm = SharedMemoryManager() - self._smm.start() def build_actor(self, target_fn: ty.Callable, builder: ty.Union[ ty.Dict['AbstractProcess', 'PyProcessBuilder'], ty.Dict[ SyncDomain, 'RuntimeServiceBuilder']]) -> ty.Any: """Given a target_fn starts a system (os) process""" + system_process = SystemProcess(target=target_fn, args=(), kwargs={"builder": builder}) - system_process.start() - self._actors.append(system_process) - return system_process + ret = self._mp.build_actor() + if ret == ProcessType.ERR_PROC: + exit(-1) + if ret == ProcessType.CHILD_PROC: + target_fn(args=(), kwargs={"builder": builder}) + exit(0) def stop(self): """Stops the shared memory manager""" - for actor in self._actors: - if actor._parent_pid == os.getpid(): - actor.join() - self._smm.shutdown() + self._mp.stop() + self._smm.stop() def channel_class(self, channel_type: ChannelType) -> ty.Type[Channel]: """Given a channel type, returns the shared memory based class From 8ef9c5da80cf58e01e2b67c1b59372edb9b4521d Mon Sep 17 00:00:00 2001 From: "Miao, Hongda" Date: Mon, 15 Aug 2022 14:54:37 +0800 Subject: [PATCH 03/11] fix cpplint error --- .../message_infrastructure/factory.py | 4 ++-- .../message_infrastructure/multiprocessing.py | 8 +++----- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/factory.py b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/factory.py index b072ef195..510a1c733 100644 --- a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/factory.py +++ b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/factory.py @@ -2,8 +2,8 @@ # SPDX-License-Identifier: LGPL 2.1 or later # See: https://spdx.org/licenses/ from lava.magma.core.process.message_interface_enum import ActorType -from lava.magma.runtime.message_infrastructure.message_infrastructure.multiprocessing import \ - MultiProcessing +from lava.magma.runtime.message_infrastructure.message_infrastructure \ + .multiprocessing import MultiProcessing """Factory class to create the messaging infrastructure""" diff --git a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/multiprocessing.py b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/multiprocessing.py index d28845cb8..4307519b9 100644 --- a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/multiprocessing.py +++ b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/multiprocessing.py @@ -13,7 +13,7 @@ from MessageInfrastructurePywrapper import SharedMemoryManager from MessageInfrastructurePywrapper import Actor -from enum import Enum +from enum import Enum from lava.magma.compiler.channels.interfaces import ChannelType, Channel from lava.magma.compiler.channels.pypychannel import PyPyChannel @@ -74,14 +74,12 @@ def build_actor(self, target_fn: ty.Callable, builder: ty.Union[ SyncDomain, 'RuntimeServiceBuilder']]) -> ty.Any: """Given a target_fn starts a system (os) process""" - system_process = SystemProcess(target=target_fn, - args=(), - kwargs={"builder": builder}) ret = self._mp.build_actor() if ret == ProcessType.ERR_PROC: exit(-1) if ret == ProcessType.CHILD_PROC: - target_fn(args=(), kwargs={"builder": builder}) + kwargs = {"builder": builder} + target_fn(**kwargs) exit(0) def stop(self): From 8eadce5d2ff1a76780c937841a6b32597ac97040 Mon Sep 17 00:00:00 2001 From: "Miao, Hongda" Date: Mon, 15 Aug 2022 18:10:14 +0800 Subject: [PATCH 04/11] Update cpplib side MultiProcessing: 1. message_infrastructure_logging.h: logging for development 2. abstract_actor.h&posix_actor.cc: achieve stop function for signal 3. shm.h: Add SharedMemoryManager to cpplib 4. others: Update CMake, test and fix some type error' fix pylint error --- .../message_infrastructure/CMakeLists.txt | 5 ++- .../csrc/abstract_actor.h | 9 +---- .../csrc/message_infrastructure_logging.h | 28 +++++++++++++++ .../csrc/message_infrastructure_py_wrapper.cc | 6 +++- .../csrc/multiprocessing.cc | 35 +++++++++++-------- .../csrc/multiprocessing.h | 10 ++++-- .../csrc/posix_actor.cc | 29 +++++++++++++++ .../message_infrastructure/csrc/shm.h | 24 +++++++++++++ .../message_infrastructure/multiprocessing.py | 2 +- .../test/test_example.py | 21 ++++++----- src/lava/magma/runtime/runtime.py | 8 ++--- 11 files changed, 136 insertions(+), 41 deletions(-) create mode 100644 src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/message_infrastructure_logging.h create mode 100644 src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/posix_actor.cc diff --git a/src/lava/magma/runtime/message_infrastructure/CMakeLists.txt b/src/lava/magma/runtime/message_infrastructure/CMakeLists.txt index 83bd84834..0c8a56eab 100644 --- a/src/lava/magma/runtime/message_infrastructure/CMakeLists.txt +++ b/src/lava/magma/runtime/message_infrastructure/CMakeLists.txt @@ -4,7 +4,10 @@ project(message_passing) set (CMAKE_CXX_STANDARD 17) include_directories(message_infrastructure/csrc) -file (GLOB MULTI_PROC_SRCS "message_infrastructure/csrc/multiprocessing.cc") +set(MULTI_PROC_SRCS + "message_infrastructure/csrc/multiprocessing.cc" + "message_infrastructure/csrc/posix_actor.cc" +) file (GLOB PY_WRAPPER "message_infrastructure/csrc/message_infrastructure_py_wrapper.cc") diff --git a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/abstract_actor.h b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/abstract_actor.h index 91bf29767..a99bfa2df 100644 --- a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/abstract_actor.h +++ b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/abstract_actor.h @@ -11,8 +11,6 @@ class AbstractActor { public: virtual int GetPid() = 0; virtual int Stop() = 0; - virtual int Pause() = 0; - int pid_; }; @@ -24,12 +22,7 @@ class PosixActor : public AbstractActor { int GetPid() { return this->pid_; } - int Stop() { - return 0; - } - int Pause() { - return 0; - } + int Stop(); }; using ActorPtr = AbstractActor *; diff --git a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/message_infrastructure_logging.h b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/message_infrastructure_logging.h new file mode 100644 index 000000000..674e81855 --- /dev/null +++ b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/message_infrastructure_logging.h @@ -0,0 +1,28 @@ +// Copyright (C) 2021 Intel Corporation +// SPDX-License-Identifier: BSD-3-Clause +// See: https://spdx.org/licenses/ + +#ifndef MESSAGE_INFRASTRUCTURE_LOGGING_H_ +#define MESSAGE_INFRASTRUCTURE_LOGGING_H_ + +#include +#define LOG_MP (1) // log for multiprocessing + + +#define LAVA_LOG(_cond, _fmt, ...) { \ + if ((_cond)) { \ + printf("[CPP INFO]" _fmt, ## __VA_ARGS__); \ + } \ +} + +#define LAVA_LOG_WARN(_cond, _fmt, ...) { \ + if ((_cond)) { \ + printf("[CPP WARNING]" _fmt, __FUNCTION__, ## __VA_ARGS__); \ + } \ +} + +#define LAVA_LOG_ERR(_fmt, ...) { \ + printf("[CPP ERROR]" _fmt, __FUNCTION__, ## __VA_ARGS__); \ +} + +#endif // MESSAGE_INFRASTRUCTURE_LOGGING_H_ diff --git a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/message_infrastructure_py_wrapper.cc b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/message_infrastructure_py_wrapper.cc index 7c7bf2883..0d19d8f29 100644 --- a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/message_infrastructure_py_wrapper.cc +++ b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/message_infrastructure_py_wrapper.cc @@ -6,7 +6,7 @@ #include #include "multiprocessing.h" -// #include "shm.h" +#include "shm.h" // #include "shmem_channel.h" // #include "shmem_port.h" @@ -20,6 +20,10 @@ PYBIND11_MODULE(MessageInfrastructurePywrapper, m) { .def("build_actor", &MultiProcessing::BuildActor) .def("check_actor", &MultiProcessing::CheckActor) .def("stop", &MultiProcessing::Stop); + py::class_ (m, "SharedMemManager") + .def(py::init<>()) + .def("alloc_mem", &SharedMemManager::AllocSharedMemory); + /* py::class_ (m, "ShmemSendPort") .def(py::init()) diff --git a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/multiprocessing.cc b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/multiprocessing.cc index b52f1b7e2..9bf4b9a9f 100644 --- a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/multiprocessing.cc +++ b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/multiprocessing.cc @@ -3,42 +3,47 @@ // See: https://spdx.org/licenses/ #include "multiprocessing.h" +#include "message_infrastructure_logging.h" #include #include -#include namespace message_infrastructure { -#define CPP_INFO "[CPP_INFO] " - -void MultiProcessing::BuildActor(std::function target_fn) { +int MultiProcessing::BuildActor() { pid_t pid = fork(); - if (pid < 0) { - std::cout << CPP_INFO << "cannot allocate pid\n"; - } - if (pid > 0) { - std::cout << CPP_INFO << "parent, create actor\n"; + LAVA_LOG(LOG_MP, "Parent Process, create child process %d\n", pid); ActorPtr actor = new PosixActor(pid); actors_.push_back(actor); + return PARENT_PROCESS; } if (pid == 0) { - std::cout << CPP_INFO << "child, new process\n"; - target_fn(); - exit(0); + LAVA_LOG(LOG_MP, "child, new process\n"); + return CHILD_PROCESS; } + LAVA_LOG_ERR("Cannot allocate new pid for the process\n"); + return ERROR_PROCESS; + } -void MultiProcessing::Stop() { +int MultiProcessing::Stop() { + int error_cnts = 0; + + for (auto actor : actors_) { + error_cnts += actor->Stop(); + } + + LAVA_LOG(LOG_MP, "Stop Actors, error: %d\n", error_cnts); + return error_cnts; } void MultiProcessing::CheckActor() { - for(auto actor : actors_){ - std::cout << CPP_INFO << actor->pid_ << std::endl; + for (auto actor : actors_) { + LAVA_LOG(LOG_MP, "Actor info %d\n", actor->GetPid()); } } diff --git a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/multiprocessing.h b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/multiprocessing.h index 86ee1b7c6..aa19690f1 100644 --- a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/multiprocessing.h +++ b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/multiprocessing.h @@ -13,11 +13,17 @@ namespace message_infrastructure { +enum ChannelType { + ERROR_PROCESS = -1, + CHILD_PROCESS = 0, + PARENT_PROCESS = 1 +}; + class MultiProcessing { public: // stop each actor in vector actors; - void Stop(); - void BuildActor(std::function); + int Stop(); + int BuildActor(); void CheckActor(); private: std::vector actors_; diff --git a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/posix_actor.cc b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/posix_actor.cc new file mode 100644 index 000000000..315fcea48 --- /dev/null +++ b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/posix_actor.cc @@ -0,0 +1,29 @@ +// Copyright (C) 2021 Intel Corporation +// SPDX-License-Identifier: BSD-3-Clause +// See: https://spdx.org/licenses/ + +#include "abstract_actor.h" +#include "message_infrastructure_logging.h" + +#include +#include + +namespace message_infrastructure { + +int PosixActor::Stop() { + int status; + // TODO: Add the options can be as args of the function + int options = 0; + int ret = waitpid(this->pid_, &status, options); + + if (ret < 0) { + LAVA_LOG_ERR("process %d waitpid error\n", this->pid_); + return -1; + } + + // Check the status + return 0; + +} + +} \ No newline at end of file diff --git a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/shm.h b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/shm.h index 3ded37910..7ee89f021 100644 --- a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/shm.h +++ b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/shm.h @@ -13,11 +13,35 @@ #include #include +#include + namespace message_infrastructure { class SharedMemory { }; +class SharedMemManager { + public: + int AllocSharedMemory(size_t mem_size) { + int shmid = shmget(key_, mem_size, 0644|IPC_CREAT); + if (shmid < 0) + return -1; + + shms_.push_back(shmid); + key_++; + return shmid; + } + + int DeleteSharedMemory(int key) { + // Release specific shared memroy + return 0; + } + + private: + key_t key_ = 0xdead; + std::vector shms_; +}; + } // namespace message_infrastructure #endif // SHM_H_ diff --git a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/multiprocessing.py b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/multiprocessing.py index 4307519b9..d96928a99 100644 --- a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/multiprocessing.py +++ b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/multiprocessing.py @@ -57,7 +57,7 @@ def __init__(self): @property def actors(self): """Returns a list of actors""" - return self._mp.get_actors() + return self._actors @property def smm(self): diff --git a/src/lava/magma/runtime/message_infrastructure/test/test_example.py b/src/lava/magma/runtime/message_infrastructure/test/test_example.py index 40ba6aacf..973e85c01 100644 --- a/src/lava/magma/runtime/message_infrastructure/test/test_example.py +++ b/src/lava/magma/runtime/message_infrastructure/test/test_example.py @@ -2,22 +2,25 @@ # SPDX-License-Identifier: BSD-3-Clause # See: https://spdx.org/licenses/ -from MessageInfrastructurePywrapper import MultiProcessing +from MessageInfrastructurePywrapper import CppMultiProcessing +from MessageInfrastructurePywrapper import SharedMemManager import time -def print_hello(): - print("hello") - - def main(): - mp = MultiProcessing() + mp = CppMultiProcessing() for i in range(5): - mp.build_actor(print_hello) + ret = mp.build_actor() + if ret == 0 : + print("child process, exit") + exit(0) mp.check_actor() + mp.stop() + + shm = SharedMemManager() + for i in range(5): + print("shared id:", shm.alloc_mem(10)) main() -print("sleep 5") -time.sleep(5) diff --git a/src/lava/magma/runtime/runtime.py b/src/lava/magma/runtime/runtime.py index cd4712c7d..ddeaf6040 100644 --- a/src/lava/magma/runtime/runtime.py +++ b/src/lava/magma/runtime/runtime.py @@ -13,10 +13,10 @@ from lava.magma.compiler.channels.pypychannel import CspRecvPort, CspSendPort from lava.magma.compiler.var_model import AbstractVarModel from lava.magma.core.process.message_interface_enum import ActorType -from lava.magma.runtime.message_infrastructure.factory import \ - MessageInfrastructureFactory -from lava.magma.runtime.message_infrastructure. \ - message_infrastructure_interface import \ +from lava.magma.runtime.message_infrastructure.message_infrastructure \ + .factory import MessageInfrastructureFactory +from lava.magma.runtime.message_infrastructure.message_infrastructure \ + .message_infrastructure_interface import \ MessageInfrastructureInterface from lava.magma.runtime.mgmt_token_enums import (MGMT_COMMAND, MGMT_RESPONSE, enum_equal, enum_to_np) From 63acc749855ae54ed49124726b0a55fb49af3766 Mon Sep 17 00:00:00 2001 From: "Miao, Hongda" Date: Mon, 15 Aug 2022 18:38:46 +0800 Subject: [PATCH 05/11] Fix MultiProcessing Error and Cpplint Error --- .../message_infrastructure/message_infrastructure/csrc/shm.h | 1 - .../message_infrastructure/multiprocessing.py | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/shm.h b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/shm.h index 7ee89f021..9a71d84dc 100644 --- a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/shm.h +++ b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/shm.h @@ -36,7 +36,6 @@ class SharedMemManager { // Release specific shared memroy return 0; } - private: key_t key_ = 0xdead; std::vector shms_; diff --git a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/multiprocessing.py b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/multiprocessing.py index d96928a99..b56c1c2e7 100644 --- a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/multiprocessing.py +++ b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/multiprocessing.py @@ -10,7 +10,7 @@ RuntimeServiceBuilder from MessageInfrastructurePywrapper import CppMultiProcessing -from MessageInfrastructurePywrapper import SharedMemoryManager +from MessageInfrastructurePywrapper import SharedMemManager from MessageInfrastructurePywrapper import Actor from enum import Enum From 81c2e1d8a8f2a9f5a3b281fd0b28c4a57d2e5e2e Mon Sep 17 00:00:00 2001 From: "Miao, Hongda" Date: Mon, 15 Aug 2022 18:44:21 +0800 Subject: [PATCH 06/11] Remove Actor interface for python --- .../message_infrastructure/multiprocessing.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/multiprocessing.py b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/multiprocessing.py index b56c1c2e7..63286e85b 100644 --- a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/multiprocessing.py +++ b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/multiprocessing.py @@ -11,7 +11,6 @@ from MessageInfrastructurePywrapper import CppMultiProcessing from MessageInfrastructurePywrapper import SharedMemManager -from MessageInfrastructurePywrapper import Actor from enum import Enum From 967f707557621e2d7707d720b6c9af7749d58453 Mon Sep 17 00:00:00 2001 From: "Miao, Hongda" Date: Mon, 15 Aug 2022 19:01:52 +0800 Subject: [PATCH 07/11] Fix package path error --- src/lava/magma/compiler/builders/channel_builder.py | 2 +- src/lava/magma/compiler/channels/pypychannel.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/lava/magma/compiler/builders/channel_builder.py b/src/lava/magma/compiler/builders/channel_builder.py index 5ffdc76ce..57aa6b340 100644 --- a/src/lava/magma/compiler/builders/channel_builder.py +++ b/src/lava/magma/compiler/builders/channel_builder.py @@ -15,7 +15,7 @@ ChannelType, ) from lava.magma.compiler.utils import PortInitializer -from lava.magma.runtime.message_infrastructure \ +from lava.magma.runtime.message_infrastructure.message_infrastructure \ .message_infrastructure_interface import (MessageInfrastructureInterface) if ty.TYPE_CHECKING: diff --git a/src/lava/magma/compiler/channels/pypychannel.py b/src/lava/magma/compiler/channels/pypychannel.py index 80fa608b7..754955c4b 100644 --- a/src/lava/magma/compiler/channels/pypychannel.py +++ b/src/lava/magma/compiler/channels/pypychannel.py @@ -17,7 +17,8 @@ if ty.TYPE_CHECKING: from lava.magma.runtime.message_infrastructure \ - .message_infrastructure_interface import ( + .message_infrastructure.message_infrastructure_interface \ + import ( MessageInfrastructureInterface) From 4cd04edf2d9fa27aa997c5190108dd95ae34f237 Mon Sep 17 00:00:00 2001 From: "Miao, Hongda" Date: Tue, 16 Aug 2022 22:01:20 +0800 Subject: [PATCH 08/11] Update API Interface 1. multiprocessing.py: Fix bug for sharedmemmanager name 2. remove lava package dependency for message_infrastructure 3. multiprocessing.cc: Add shmm and actors function 4. abstract_actor.h: Modify the posix actor and interface design 5. add and update test for multiprocessing Signed-off-by: Miao, Hongda --- .../message_infrastructure/__init__.py | 4 + .../csrc/abstract_actor.h | 19 ++++- .../csrc/message_infrastructure_py_wrapper.cc | 16 +++- .../csrc/multiprocessing.cc | 23 ++++-- .../csrc/multiprocessing.h | 17 +++-- .../csrc/posix_actor.cc | 4 + .../message_infrastructure/csrc/shm.h | 20 ++++- .../message_infrastructure_interface.py | 18 +---- .../message_infrastructure/multiprocessing.py | 74 +++---------------- .../test/test_example.py | 49 +++++++++++- .../test/test_multiprocessing.py | 62 ++++++++++++++++ 11 files changed, 206 insertions(+), 100 deletions(-) create mode 100644 src/lava/magma/runtime/message_infrastructure/test/test_multiprocessing.py diff --git a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/__init__.py b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/__init__.py index e69de29bb..6b8b8c436 100644 --- a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/__init__.py +++ b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/__init__.py @@ -0,0 +1,4 @@ +from MessageInfrastructurePywrapper import CppMultiProcessing +from MessageInfrastructurePywrapper import SharedMemManager +from MessageInfrastructurePywrapper import Actor +from MessageInfrastructurePywrapper import ProcessType diff --git a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/abstract_actor.h b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/abstract_actor.h index a99bfa2df..85ce96a9a 100644 --- a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/abstract_actor.h +++ b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/abstract_actor.h @@ -5,8 +5,16 @@ #ifndef ABSTRACT_ACTOR_H_ #define ABSTRACT_ACTOR_H_ +#include + namespace message_infrastructure { +enum ActorStatus { + StatsError = -1, + StatsRuning = 0, + StatsStopped = 1 +}; + class AbstractActor { public: virtual int GetPid() = 0; @@ -16,13 +24,22 @@ class AbstractActor { class PosixActor : public AbstractActor { public: - explicit PosixActor(int pid) { + explicit PosixActor(int pid, std::function target_fn) { this->pid_ = pid; + this->target_fn_ = target_fn; } int GetPid() { return this->pid_; } + int Wait(); int Stop(); + int GetStatus() { + return this->status_; + } + // int Trace(); + private: + std::function target_fn_ = NULL; + int status_ = StatsStopped; }; using ActorPtr = AbstractActor *; diff --git a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/message_infrastructure_py_wrapper.cc b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/message_infrastructure_py_wrapper.cc index 0d19d8f29..989558e88 100644 --- a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/message_infrastructure_py_wrapper.cc +++ b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/message_infrastructure_py_wrapper.cc @@ -4,8 +4,10 @@ #include #include +#include #include "multiprocessing.h" +#include "abstract_actor.h" #include "shm.h" // #include "shmem_channel.h" // #include "shmem_port.h" @@ -19,10 +21,22 @@ PYBIND11_MODULE(MessageInfrastructurePywrapper, m) { .def(py::init<>()) .def("build_actor", &MultiProcessing::BuildActor) .def("check_actor", &MultiProcessing::CheckActor) + .def("get_actors", &MultiProcessing::GetActors) + .def("get_shmm", &MultiProcessing::GetSharedMemManager) .def("stop", &MultiProcessing::Stop); + py::enum_ (m, "ProcessType") + .value("ErrorProcess", ErrorProcess) + .value("ChildProcess", ChildProcess) + .value("ParentProcess", ParentProcess); py::class_ (m, "SharedMemManager") .def(py::init<>()) - .def("alloc_mem", &SharedMemManager::AllocSharedMemory); + .def("alloc_mem", &SharedMemManager::AllocSharedMemory) + .def("stop", &SharedMemManager::Stop); + py::class_ (m, "Actor") + .def("wait", &PosixActor::Wait) + .def("stop", &PosixActor::Stop) + .def("get_status", &PosixActor::GetStatus); + // .def("trace", &PosixActor::Trace); /* py::class_ (m, "ShmemSendPort") diff --git a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/multiprocessing.cc b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/multiprocessing.cc index 9bf4b9a9f..538994741 100644 --- a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/multiprocessing.cc +++ b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/multiprocessing.cc @@ -10,23 +10,28 @@ namespace message_infrastructure { -int MultiProcessing::BuildActor() { +MultiProcessing::MultiProcessing() { + shmm_ = new SharedMemManager(); +} + +int MultiProcessing::BuildActor(std::function target_fn) { pid_t pid = fork(); if (pid > 0) { LAVA_LOG(LOG_MP, "Parent Process, create child process %d\n", pid); - ActorPtr actor = new PosixActor(pid); + ActorPtr actor = new PosixActor(pid, target_fn); actors_.push_back(actor); - return PARENT_PROCESS; + return ParentProcess; } if (pid == 0) { LAVA_LOG(LOG_MP, "child, new process\n"); - return CHILD_PROCESS; + target_fn(); + exit(0); } LAVA_LOG_ERR("Cannot allocate new pid for the process\n"); - return ERROR_PROCESS; + return ErrorProcess; } @@ -47,4 +52,12 @@ void MultiProcessing::CheckActor() { } } +SharedMemManager* MultiProcessing::GetSharedMemManager() { + return this->shmm_; +} + +std::vector& MultiProcessing::GetActors() { + return this->actors_; +} + } // namespace message_infrastructure diff --git a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/multiprocessing.h b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/multiprocessing.h index aa19690f1..1552803d6 100644 --- a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/multiprocessing.h +++ b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/multiprocessing.h @@ -13,21 +13,24 @@ namespace message_infrastructure { -enum ChannelType { - ERROR_PROCESS = -1, - CHILD_PROCESS = 0, - PARENT_PROCESS = 1 +enum ProcessType { + ErrorProcess = -1, + ChildProcess = 0, + ParentProcess = 1 }; class MultiProcessing { public: - // stop each actor in vector actors; + MultiProcessing(); int Stop(); - int BuildActor(); + int BuildActor(std::function); void CheckActor(); + std::vector& GetActors(); + SharedMemManager* GetSharedMemManager(); + private: std::vector actors_; - // SharedMemManager shmm_; + SharedMemManager *shmm_; }; } // namespace message_infrastructure diff --git a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/posix_actor.cc b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/posix_actor.cc index 315fcea48..bdc4697b7 100644 --- a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/posix_actor.cc +++ b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/posix_actor.cc @@ -26,4 +26,8 @@ int PosixActor::Stop() { } +int PosixActor::Wait() { + return 0; +} + } \ No newline at end of file diff --git a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/shm.h b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/shm.h index 9a71d84dc..8ba9953aa 100644 --- a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/shm.h +++ b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/csrc/shm.h @@ -32,10 +32,24 @@ class SharedMemManager { return shmid; } - int DeleteSharedMemory(int key) { - // Release specific shared memroy - return 0; + int DeleteSharedMemory(int shmid) { + // Release specific shared memory + int del_cnt = 0; + for (auto it = shms_.begin(); it != shms_.end(); it++) { + if ((*it) == shmid) { + shms_.erase(it); + del_cnt++; + } + } + return del_cnt; } + + int Stop() { + int stop_cnt = shms_.size(); + shms_.clear(); + return stop_cnt; + } + private: key_t key_ = 0xdead; std::vector shms_; diff --git a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/message_infrastructure_interface.py b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/message_infrastructure_interface.py index ac88850da..07047421c 100644 --- a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/message_infrastructure_interface.py +++ b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/message_infrastructure_interface.py @@ -2,16 +2,8 @@ # SPDX-License-Identifier: LGPL 2.1 or later # See: https://spdx.org/licenses/ import typing as ty -if ty.TYPE_CHECKING: - from lava.magma.core.process.process import AbstractProcess - from lava.magma.compiler.builders.py_builder import PyProcessBuilder - from lava.magma.compiler.builders.runtimeservice_builder import \ - RuntimeServiceBuilder from abc import ABC, abstractmethod -from lava.magma.compiler.channels.interfaces import ChannelType, Channel -from lava.magma.core.sync.domain import SyncDomain - """A Message Infrastructure Interface which can create actors which would participate in message passing/exchange, start and stop them as well as declare the underlying Channel Infrastructure Class to be used for message @@ -32,9 +24,7 @@ def stop(self): pass @abstractmethod - def build_actor(self, target_fn: ty.Callable, builder: ty.Union[ - ty.Dict['AbstractProcess', 'PyProcessBuilder'], ty.Dict[ - SyncDomain, 'RuntimeServiceBuilder']]): + def build_actor(self, target_fn: ty.Callable, builder): """Given a target_fn starts a system process""" pass @@ -43,9 +33,3 @@ def build_actor(self, target_fn: ty.Callable, builder: ty.Union[ def actors(self) -> ty.List[ty.Any]: """Returns a list of actors""" pass - - @abstractmethod - def channel_class(self, channel_type: ChannelType) -> ty.Type[Channel]: - """Given the Channel Type, Return the Channel Implementation to - be used during execution""" - pass diff --git a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/multiprocessing.py b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/multiprocessing.py index 63286e85b..bcfbe3b42 100644 --- a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/multiprocessing.py +++ b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/multiprocessing.py @@ -1,35 +1,15 @@ # Copyright (C) 2021-22 Intel Corporation # SPDX-License-Identifier: LGPL 2.1 or later # See: https://spdx.org/licenses/ -from pty import CHILD import typing as ty -if ty.TYPE_CHECKING: - from lava.magma.core.process.process import AbstractProcess - from lava.magma.compiler.builders.py_builder import PyProcessBuilder - from lava.magma.compiler.builders.runtimeservice_builder import \ - RuntimeServiceBuilder +from functools import partial -from MessageInfrastructurePywrapper import CppMultiProcessing -from MessageInfrastructurePywrapper import SharedMemManager +from message_infrastructure import CppMultiProcessing +from message_infrastructure import SharedMemManager +from message_infrastructure import Actor -from enum import Enum - -from lava.magma.compiler.channels.interfaces import ChannelType, Channel -from lava.magma.compiler.channels.pypychannel import PyPyChannel - -try: - from lava.magma.compiler.channels.cpychannel import \ - CPyChannel, PyCChannel -except ImportError: - class CPyChannel: - pass - - class PyCChannel: - pass - -from lava.magma.core.sync.domain import SyncDomain -from lava.magma.runtime.message_infrastructure.message_infrastructure\ - .message_infrastructure_interface import MessageInfrastructureInterface +from message_infrastructure.message_infrastructure_interface \ + import MessageInfrastructureInterface """Implements the Message Infrastructure Interface using Python @@ -39,61 +19,31 @@ class PyCChannel: implement the communication backend in this implementation.""" -class ProcessType(Enum): - ERR_PROC = -1 - PARENT_PROC = 0 - CHILD_PROC = 1 - - class MultiProcessing(MessageInfrastructureInterface): """Implements message passing using shared memory and multiprocessing""" def __init__(self): self._mp: ty.Optional[CppMultiProcessing] = None - self._smm: ty.Optional[SharedMemoryManager] = None - self._actors: ty.List[Actor] = [] @property def actors(self): """Returns a list of actors""" - return self._actors + return self._mp.get_actors() @property def smm(self): """Returns the underlying shared memory manager""" - return self._smm + return self._mp.get_shmm() def start(self): - """Starts the shared memory manager""" + """Init the MultiProcessing""" self._mp = CppMultiProcessing() - self._smm = SharedMemoryManager() - def build_actor(self, target_fn: ty.Callable, builder: ty.Union[ - ty.Dict['AbstractProcess', 'PyProcessBuilder'], ty.Dict[ - SyncDomain, 'RuntimeServiceBuilder']]) -> ty.Any: + def build_actor(self, target_fn: ty.Callable, builder) -> ty.Any: """Given a target_fn starts a system (os) process""" - - ret = self._mp.build_actor() - if ret == ProcessType.ERR_PROC: - exit(-1) - if ret == ProcessType.CHILD_PROC: - kwargs = {"builder": builder} - target_fn(**kwargs) - exit(0) + bound_target_fn = partial(target_fn, builder=builder) + ret = self._mp.build_actor(bound_target_fn) def stop(self): """Stops the shared memory manager""" self._mp.stop() - self._smm.stop() - - def channel_class(self, channel_type: ChannelType) -> ty.Type[Channel]: - """Given a channel type, returns the shared memory based class - implementation for the same""" - if channel_type == ChannelType.PyPy: - return PyPyChannel - elif channel_type == ChannelType.PyC: - return PyCChannel - elif channel_type == ChannelType.CPy: - return CPyChannel - else: - raise Exception(f"Unsupported channel type {channel_type}") diff --git a/src/lava/magma/runtime/message_infrastructure/test/test_example.py b/src/lava/magma/runtime/message_infrastructure/test/test_example.py index 973e85c01..921ddbe21 100644 --- a/src/lava/magma/runtime/message_infrastructure/test/test_example.py +++ b/src/lava/magma/runtime/message_infrastructure/test/test_example.py @@ -1,26 +1,67 @@ # Copyright (C) 2021 Intel Corporation # SPDX-License-Identifier: BSD-3-Clause # See: https://spdx.org/licenses/ +import traceback +from functools import partial from MessageInfrastructurePywrapper import CppMultiProcessing from MessageInfrastructurePywrapper import SharedMemManager -import time +from MessageInfrastructurePywrapper import ProcessType +from MessageInfrastructurePywrapper import Actor + + +class Builder(): + def build(self): + print("Builder run build") + + def start(self, *args, **kwargs): + print("Builder run start") + + +def target_fn(*args, **kwargs): + """ + Function to build and attach a system process to + + :param args: List Parameters to be passed onto the process + :param kwargs: Dict Parameters to be passed onto the process + :return: None + """ + try: + builder = kwargs.pop("builder") + actor = builder.build() + except Exception as e: + print("Encountered Fatal Exception: " + str(e)) + print("Traceback: ") + print(traceback.format_exc()) + raise e def main(): + builder = Builder() mp = CppMultiProcessing() + bound_target_fn = partial(target_fn, builder=builder) + bound_target_fn() for i in range(5): - ret = mp.build_actor() - if ret == 0 : + ret = mp.build_actor(bound_target_fn) + if ret == ProcessType.ChildProcess : print("child process, exit") exit(0) mp.check_actor() mp.stop() - shm = SharedMemManager() + actors = mp.get_actors() + print(actors) + print("actor status: ", actors[0].get_status()) + + shm = mp.get_shmm() + for i in range(5): print("shared id:", shm.alloc_mem(10)) + shm2 = mp.get_shmm() + print("stop num: ", shm2.stop()) + print("stop num: ", shm.stop()) + main() diff --git a/src/lava/magma/runtime/message_infrastructure/test/test_multiprocessing.py b/src/lava/magma/runtime/message_infrastructure/test/test_multiprocessing.py new file mode 100644 index 000000000..0dcb83c78 --- /dev/null +++ b/src/lava/magma/runtime/message_infrastructure/test/test_multiprocessing.py @@ -0,0 +1,62 @@ +# Copyright (C) 2021 Intel Corporation +# SPDX-License-Identifier: BSD-3-Clause +# See: https://spdx.org/licenses/ +import traceback +from functools import partial + +from message_infrastructure import CppMultiProcessing +from message_infrastructure import SharedMemManager +from message_infrastructure import ProcessType +from message_infrastructure import Actor +from message_infrastructure.multiprocessing import MultiProcessing + + +class Builder(): + def build(self, i): + print("Builder run build ", i) + + +def target_fn(*args, **kwargs): + """ + Function to build and attach a system process to + + :param args: List Parameters to be passed onto the process + :param kwargs: Dict Parameters to be passed onto the process + :return: None + """ + try: + builder = kwargs.pop("builder") + idx = kwargs.pop("idx") + builder.build(idx) + except Exception as e: + print("Encountered Fatal Exception: " + str(e)) + print("Traceback: ") + print(traceback.format_exc()) + raise e + + +def test_multiprocessing(): + mp = MultiProcessing() + mp.start() + builder = Builder() + for i in range(5): + bound_target_fn = partial(target_fn, idx=i) + ret = mp.build_actor(bound_target_fn, builder) + if ret == ProcessType.ChildProcess : + print("child process, exit") + exit(0) + + shmm = mp.smm + for i in range(5): + print("shared memory id: ", shmm.alloc_mem(8)) + + actors = mp.actors + print(actors) + print("actor status: ", actors[0].get_status()) + print("stop num: ", shmm.stop()) + print("stop num: ", shmm.stop()) + + mp.stop() + + +test_multiprocessing() From 024fde23dc1bb580951b74ac043c13963d8162ef Mon Sep 17 00:00:00 2001 From: "Miao, Hongda" Date: Wed, 17 Aug 2022 10:33:23 +0800 Subject: [PATCH 09/11] Update import from message_infrastructure: 1. Move message_interface_enum.py to message_infrastructure 2. Update import syntax 3. Others: change multiprocessing init Signed-off-by: Miao, Hongda --- src/lava/magma/compiler/builders/channel_builder.py | 2 +- src/lava/magma/compiler/channels/pypychannel.py | 3 +-- src/lava/magma/core/process/process.py | 2 +- .../message_infrastructure/factory.py | 5 ++--- .../message_infrastructure}/message_interface_enum.py | 0 .../message_infrastructure/multiprocessing.py | 4 ++-- src/lava/magma/runtime/runtime.py | 8 +++----- 7 files changed, 10 insertions(+), 14 deletions(-) rename src/lava/magma/{core/process => runtime/message_infrastructure/message_infrastructure}/message_interface_enum.py (100%) diff --git a/src/lava/magma/compiler/builders/channel_builder.py b/src/lava/magma/compiler/builders/channel_builder.py index 57aa6b340..7c1bb157d 100644 --- a/src/lava/magma/compiler/builders/channel_builder.py +++ b/src/lava/magma/compiler/builders/channel_builder.py @@ -15,7 +15,7 @@ ChannelType, ) from lava.magma.compiler.utils import PortInitializer -from lava.magma.runtime.message_infrastructure.message_infrastructure \ +from message_infrastructure \ .message_infrastructure_interface import (MessageInfrastructureInterface) if ty.TYPE_CHECKING: diff --git a/src/lava/magma/compiler/channels/pypychannel.py b/src/lava/magma/compiler/channels/pypychannel.py index 754955c4b..015f1b07f 100644 --- a/src/lava/magma/compiler/channels/pypychannel.py +++ b/src/lava/magma/compiler/channels/pypychannel.py @@ -16,8 +16,7 @@ ) if ty.TYPE_CHECKING: - from lava.magma.runtime.message_infrastructure \ - .message_infrastructure.message_infrastructure_interface \ + from message_infrastructure.message_infrastructure_interface \ import ( MessageInfrastructureInterface) diff --git a/src/lava/magma/core/process/process.py b/src/lava/magma/core/process/process.py index 7e2da39d3..410ce1203 100644 --- a/src/lava/magma/core/process/process.py +++ b/src/lava/magma/core/process/process.py @@ -11,7 +11,7 @@ from lava.magma.core.process.interfaces import \ AbstractProcessMember, IdGeneratorSingleton -from lava.magma.core.process.message_interface_enum import ActorType +from message_infrastructure.message_interface_enum import ActorType from lava.magma.core.process.ports.ports import \ InPort, OutPort, RefPort, VarPort from lava.magma.core.process.variable import Var diff --git a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/factory.py b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/factory.py index 510a1c733..8746012e3 100644 --- a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/factory.py +++ b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/factory.py @@ -1,9 +1,8 @@ # Copyright (C) 2021-22 Intel Corporation # SPDX-License-Identifier: LGPL 2.1 or later # See: https://spdx.org/licenses/ -from lava.magma.core.process.message_interface_enum import ActorType -from lava.magma.runtime.message_infrastructure.message_infrastructure \ - .multiprocessing import MultiProcessing +from message_infrastructure.message_interface_enum import ActorType +from message_infrastructure.multiprocessing import MultiProcessing """Factory class to create the messaging infrastructure""" diff --git a/src/lava/magma/core/process/message_interface_enum.py b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/message_interface_enum.py similarity index 100% rename from src/lava/magma/core/process/message_interface_enum.py rename to src/lava/magma/runtime/message_infrastructure/message_infrastructure/message_interface_enum.py diff --git a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/multiprocessing.py b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/multiprocessing.py index bcfbe3b42..fdd590f93 100644 --- a/src/lava/magma/runtime/message_infrastructure/message_infrastructure/multiprocessing.py +++ b/src/lava/magma/runtime/message_infrastructure/message_infrastructure/multiprocessing.py @@ -23,7 +23,7 @@ class MultiProcessing(MessageInfrastructureInterface): """Implements message passing using shared memory and multiprocessing""" def __init__(self): - self._mp: ty.Optional[CppMultiProcessing] = None + self._mp: ty.Optional[CppMultiProcessing] = CppMultiProcessing() @property def actors(self): @@ -37,7 +37,7 @@ def smm(self): def start(self): """Init the MultiProcessing""" - self._mp = CppMultiProcessing() + pass def build_actor(self, target_fn: ty.Callable, builder) -> ty.Any: """Given a target_fn starts a system (os) process""" diff --git a/src/lava/magma/runtime/runtime.py b/src/lava/magma/runtime/runtime.py index ddeaf6040..a7731ef13 100644 --- a/src/lava/magma/runtime/runtime.py +++ b/src/lava/magma/runtime/runtime.py @@ -12,11 +12,9 @@ import numpy as np from lava.magma.compiler.channels.pypychannel import CspRecvPort, CspSendPort from lava.magma.compiler.var_model import AbstractVarModel -from lava.magma.core.process.message_interface_enum import ActorType -from lava.magma.runtime.message_infrastructure.message_infrastructure \ - .factory import MessageInfrastructureFactory -from lava.magma.runtime.message_infrastructure.message_infrastructure \ - .message_infrastructure_interface import \ +from message_infrastructure.message_interface_enum import ActorType +from message_infrastructure.factory import MessageInfrastructureFactory +from message_infrastructure.message_infrastructure_interface import \ MessageInfrastructureInterface from lava.magma.runtime.mgmt_token_enums import (MGMT_COMMAND, MGMT_RESPONSE, enum_equal, enum_to_np) From f502f810b816d1a9ffdc223b9f8a27b88152f769 Mon Sep 17 00:00:00 2001 From: "Miao, Hongda" Date: Wed, 17 Aug 2022 10:49:32 +0800 Subject: [PATCH 10/11] Update test_runtime.py import Signed-off-by: Miao, Hongda --- tests/lava/magma/runtime/test_runtime.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/lava/magma/runtime/test_runtime.py b/tests/lava/magma/runtime/test_runtime.py index e25295db1..d65564f02 100644 --- a/tests/lava/magma/runtime/test_runtime.py +++ b/tests/lava/magma/runtime/test_runtime.py @@ -3,7 +3,7 @@ from unittest.mock import Mock, PropertyMock from lava.magma.compiler.executable import Executable -from lava.magma.core.process.message_interface_enum import ActorType +from message_infrastructure.message_interface_enum import ActorType from lava.magma.core.resources import HeadNode, Loihi2System from lava.magma.compiler.node import Node, NodeConfig from lava.magma.runtime.runtime import Runtime From 2caf1ba57befae92f5f8d6f9c2651524d2217f9b Mon Sep 17 00:00:00 2001 From: "Miao, Hongda" Date: Wed, 17 Aug 2022 11:23:37 +0800 Subject: [PATCH 11/11] Update bash setenv.sh to add message_infrastructure Signed-off-by: Miao, Hongda --- src/lava/magma/runtime/message_infrastructure/setenv.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/lava/magma/runtime/message_infrastructure/setenv.sh b/src/lava/magma/runtime/message_infrastructure/setenv.sh index b21e3babd..2a77fe3ee 100644 --- a/src/lava/magma/runtime/message_infrastructure/setenv.sh +++ b/src/lava/magma/runtime/message_infrastructure/setenv.sh @@ -1,3 +1,3 @@ SCRIPTPATH=$(cd `dirname -- $0` && pwd) -export PYTHONPATH="${SCRIPTPATH}/build:$PYTHONPATH" -export LD_LIBRARY_PATH="${SCRIPTPATH}/build:$LD_LIBRARY_PATH" +export PYTHONPATH="${SCRIPTPATH}/build:${SCRIPTPATH}:$PYTHONPATH" +export LD_LIBRARY_PATH="${SCRIPTPATH}/build:${SCRIPTPATH}:$LD_LIBRARY_PATH"