Skip to content

Commit

Permalink
Messaging refactor develop (#310)
Browse files Browse the repository at this point in the history
* Copy the python file from depricated_message_infrastructure/*.py

* change multiprocessing.py to adapt interface

* fix cpplint error

* Update cpplib side MultiProcessing:
1. message_infrastructure_logging.h: logging for development
2. abstract_actor.h&posix_actor.cc: achieve stop function for signal
3. shm.h: Add SharedMemoryManager to cpplib
4. others: Update CMake, test and fix some type error'

fix pylint error

* Fix MultiProcessing Error and Cpplint Error

* Remove Actor interface for python

* Fix package path error

* Update API Interface
1. multiprocessing.py: Fix bug for sharedmemmanager name
2. remove lava package dependency for message_infrastructure
3. multiprocessing.cc: Add shmm and actors function
4. abstract_actor.h: Modify the posix actor and interface design
5. add and update test for multiprocessing

Signed-off-by: Miao, Hongda <[email protected]>

* Update import from message_infrastructure:
1. Move message_interface_enum.py to message_infrastructure
2. Update import syntax
3. Others: change multiprocessing init

Signed-off-by: Miao, Hongda <[email protected]>

* Update test_runtime.py import

Signed-off-by: Miao, Hongda <[email protected]>

* Update bash setenv.sh to add message_infrastructure

Signed-off-by: Miao, Hongda <[email protected]>
  • Loading branch information
hongdami committed Aug 17, 2022
1 parent 5ca249f commit fc15bbd
Show file tree
Hide file tree
Showing 22 changed files with 459 additions and 48 deletions.
2 changes: 1 addition & 1 deletion src/lava/magma/compiler/builders/channel_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
ChannelType,
)
from lava.magma.compiler.utils import PortInitializer
from lava.magma.runtime.message_infrastructure \
from message_infrastructure \
.message_infrastructure_interface import (MessageInfrastructureInterface)

if ty.TYPE_CHECKING:
Expand Down
4 changes: 2 additions & 2 deletions src/lava/magma/compiler/channels/pypychannel.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
)

if ty.TYPE_CHECKING:
from lava.magma.runtime.message_infrastructure \
.message_infrastructure_interface import (
from message_infrastructure.message_infrastructure_interface \
import (
MessageInfrastructureInterface)


Expand Down
2 changes: 1 addition & 1 deletion src/lava/magma/core/process/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from lava.magma.core.process.interfaces import \
AbstractProcessMember, IdGeneratorSingleton
from lava.magma.core.process.message_interface_enum import ActorType
from message_infrastructure.message_interface_enum import ActorType
from lava.magma.core.process.ports.ports import \
InPort, OutPort, RefPort, VarPort
from lava.magma.core.process.variable import Var
Expand Down
5 changes: 4 additions & 1 deletion src/lava/magma/runtime/message_infrastructure/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ project(message_passing)
set (CMAKE_CXX_STANDARD 17)
include_directories(message_infrastructure/csrc)

file (GLOB MULTI_PROC_SRCS "message_infrastructure/csrc/multiprocessing.cc")
set(MULTI_PROC_SRCS
"message_infrastructure/csrc/multiprocessing.cc"
"message_infrastructure/csrc/posix_actor.cc"
)

file (GLOB PY_WRAPPER "message_infrastructure/csrc/message_infrastructure_py_wrapper.cc")

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from MessageInfrastructurePywrapper import CppMultiProcessing
from MessageInfrastructurePywrapper import SharedMemManager
from MessageInfrastructurePywrapper import Actor
from MessageInfrastructurePywrapper import ProcessType
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,41 @@
#ifndef ABSTRACT_ACTOR_H_
#define ABSTRACT_ACTOR_H_

#include <functional>

namespace message_infrastructure {

enum ActorStatus {
StatsError = -1,
StatsRuning = 0,
StatsStopped = 1
};

class AbstractActor {
public:
virtual int GetPid() = 0;
virtual int Stop() = 0;
virtual int Pause() = 0;

int pid_;
};

class PosixActor : public AbstractActor {
public:
explicit PosixActor(int pid) {
explicit PosixActor(int pid, std::function<void()> target_fn) {
this->pid_ = pid;
this->target_fn_ = target_fn;
}
int GetPid() {
return this->pid_;
}
int Stop() {
return 0;
}
int Pause() {
return 0;
int Wait();
int Stop();
int GetStatus() {
return this->status_;
}
// int Trace();
private:
std::function<void()> target_fn_ = NULL;
int status_ = StatsStopped;
};

using ActorPtr = AbstractActor *;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (C) 2021 Intel Corporation
// SPDX-License-Identifier: BSD-3-Clause
// See: https://spdx.org/licenses/

#ifndef MESSAGE_INFRASTRUCTURE_LOGGING_H_
#define MESSAGE_INFRASTRUCTURE_LOGGING_H_

#include <stdio.h>
#define LOG_MP (1) // log for multiprocessing


#define LAVA_LOG(_cond, _fmt, ...) { \
if ((_cond)) { \
printf("[CPP INFO]" _fmt, ## __VA_ARGS__); \
} \
}

#define LAVA_LOG_WARN(_cond, _fmt, ...) { \
if ((_cond)) { \
printf("[CPP WARNING]" _fmt, __FUNCTION__, ## __VA_ARGS__); \
} \
}

#define LAVA_LOG_ERR(_fmt, ...) { \
printf("[CPP ERROR]" _fmt, __FUNCTION__, ## __VA_ARGS__); \
}

#endif // MESSAGE_INFRASTRUCTURE_LOGGING_H_
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@

#include <pybind11/functional.h>
#include <pybind11/pybind11.h>
#include <pybind11/stl.h>

#include "multiprocessing.h"
// #include "shm.h"
#include "abstract_actor.h"
#include "shm.h"
// #include "shmem_channel.h"
// #include "shmem_port.h"

Expand All @@ -15,11 +17,27 @@ namespace message_infrastructure {
namespace py = pybind11;

PYBIND11_MODULE(MessageInfrastructurePywrapper, m) {
py::class_<MultiProcessing> (m, "MultiProcessing")
py::class_<MultiProcessing> (m, "CppMultiProcessing")
.def(py::init<>())
.def("build_actor", &MultiProcessing::BuildActor)
.def("check_actor", &MultiProcessing::CheckActor)
.def("get_actors", &MultiProcessing::GetActors)
.def("get_shmm", &MultiProcessing::GetSharedMemManager)
.def("stop", &MultiProcessing::Stop);
py::enum_<ProcessType> (m, "ProcessType")
.value("ErrorProcess", ErrorProcess)
.value("ChildProcess", ChildProcess)
.value("ParentProcess", ParentProcess);
py::class_<SharedMemManager> (m, "SharedMemManager")
.def(py::init<>())
.def("alloc_mem", &SharedMemManager::AllocSharedMemory)
.def("stop", &SharedMemManager::Stop);
py::class_<PosixActor> (m, "Actor")
.def("wait", &PosixActor::Wait)
.def("stop", &PosixActor::Stop)
.def("get_status", &PosixActor::GetStatus);
// .def("trace", &PosixActor::Trace);

/*
py::class_<ShmemSendPort> (m, "ShmemSendPort")
.def(py::init<std::string, SharedMemory*, Proto*, size_t, sem_t*, sem_t*>())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,43 +3,61 @@
// See: https://spdx.org/licenses/

#include "multiprocessing.h"
#include "message_infrastructure_logging.h"

#include <sys/wait.h>
#include <unistd.h>
#include <iostream>

namespace message_infrastructure {

#define CPP_INFO "[CPP_INFO] "
MultiProcessing::MultiProcessing() {
shmm_ = new SharedMemManager();
}

void MultiProcessing::BuildActor(std::function<void()> target_fn) {
int MultiProcessing::BuildActor(std::function<void()> target_fn) {
pid_t pid = fork();

if (pid < 0) {
std::cout << CPP_INFO << "cannot allocate pid\n";
}

if (pid > 0) {
std::cout << CPP_INFO << "parent, create actor\n";
ActorPtr actor = new PosixActor(pid);
LAVA_LOG(LOG_MP, "Parent Process, create child process %d\n", pid);
ActorPtr actor = new PosixActor(pid, target_fn);
actors_.push_back(actor);
return ParentProcess;
}

if (pid == 0) {
std::cout << CPP_INFO << "child, new process\n";
LAVA_LOG(LOG_MP, "child, new process\n");
target_fn();
exit(0);
}

LAVA_LOG_ERR("Cannot allocate new pid for the process\n");
return ErrorProcess;

}

void MultiProcessing::Stop() {
int MultiProcessing::Stop() {
int error_cnts = 0;

for (auto actor : actors_) {
error_cnts += actor->Stop();
}

LAVA_LOG(LOG_MP, "Stop Actors, error: %d\n", error_cnts);
return error_cnts;
}

void MultiProcessing::CheckActor() {
for(auto actor : actors_){
std::cout << CPP_INFO << actor->pid_ << std::endl;
for (auto actor : actors_) {
LAVA_LOG(LOG_MP, "Actor info %d\n", actor->GetPid());
}
}

SharedMemManager* MultiProcessing::GetSharedMemManager() {
return this->shmm_;
}

std::vector<ActorPtr>& MultiProcessing::GetActors() {
return this->actors_;
}

} // namespace message_infrastructure
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,24 @@

namespace message_infrastructure {

enum ProcessType {
ErrorProcess = -1,
ChildProcess = 0,
ParentProcess = 1
};

class MultiProcessing {
public:
// stop each actor in vector actors;
void Stop();
void BuildActor(std::function<void()>);
MultiProcessing();
int Stop();
int BuildActor(std::function<void()>);
void CheckActor();
std::vector<ActorPtr>& GetActors();
SharedMemManager* GetSharedMemManager();

private:
std::vector<ActorPtr> actors_;
// SharedMemManager shmm_;
SharedMemManager *shmm_;
};

} // namespace message_infrastructure
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright (C) 2021 Intel Corporation
// SPDX-License-Identifier: BSD-3-Clause
// See: https://spdx.org/licenses/

#include "abstract_actor.h"
#include "message_infrastructure_logging.h"

#include <sys/wait.h>
#include <sys/types.h>

namespace message_infrastructure {

int PosixActor::Stop() {
int status;
// TODO: Add the options can be as args of the function
int options = 0;
int ret = waitpid(this->pid_, &status, options);

if (ret < 0) {
LAVA_LOG_ERR("process %d waitpid error\n", this->pid_);
return -1;
}

// Check the status
return 0;

}

int PosixActor::Wait() {
return 0;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,48 @@
#include <semaphore.h>
#include <unistd.h>

#include <vector>

namespace message_infrastructure {

class SharedMemory {
};

class SharedMemManager {
public:
int AllocSharedMemory(size_t mem_size) {
int shmid = shmget(key_, mem_size, 0644|IPC_CREAT);
if (shmid < 0)
return -1;

shms_.push_back(shmid);
key_++;
return shmid;
}

int DeleteSharedMemory(int shmid) {
// Release specific shared memory
int del_cnt = 0;
for (auto it = shms_.begin(); it != shms_.end(); it++) {
if ((*it) == shmid) {
shms_.erase(it);
del_cnt++;
}
}
return del_cnt;
}

int Stop() {
int stop_cnt = shms_.size();
shms_.clear();
return stop_cnt;
}

private:
key_t key_ = 0xdead;
std::vector<int> shms_;
};

} // namespace message_infrastructure

#endif // SHM_H_
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Copyright (C) 2021-22 Intel Corporation
# SPDX-License-Identifier: LGPL 2.1 or later
# See: https://spdx.org/licenses/
from message_infrastructure.message_interface_enum import ActorType
from message_infrastructure.multiprocessing import MultiProcessing

"""Factory class to create the messaging infrastructure"""


class MessageInfrastructureFactory:
"""Creates the message infrastructure instance based on type"""
@staticmethod
def create(factory_type: ActorType):
"""type of actor framework being chosen"""
if factory_type == ActorType.MultiProcessing:
return MultiProcessing()
else:
raise Exception("Unsupported factory_type")
Loading

0 comments on commit fc15bbd

Please sign in to comment.