Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Messaging refactor develop #310

Merged
2 changes: 1 addition & 1 deletion src/lava/magma/compiler/builders/channel_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
ChannelType,
)
from lava.magma.compiler.utils import PortInitializer
from lava.magma.runtime.message_infrastructure \
from message_infrastructure \
.message_infrastructure_interface import (MessageInfrastructureInterface)

if ty.TYPE_CHECKING:
Expand Down
4 changes: 2 additions & 2 deletions src/lava/magma/compiler/channels/pypychannel.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
)

if ty.TYPE_CHECKING:
from lava.magma.runtime.message_infrastructure \
.message_infrastructure_interface import (
from message_infrastructure.message_infrastructure_interface \
import (
MessageInfrastructureInterface)


Expand Down
2 changes: 1 addition & 1 deletion src/lava/magma/core/process/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from lava.magma.core.process.interfaces import \
AbstractProcessMember, IdGeneratorSingleton
from lava.magma.core.process.message_interface_enum import ActorType
from message_infrastructure.message_interface_enum import ActorType
from lava.magma.core.process.ports.ports import \
InPort, OutPort, RefPort, VarPort
from lava.magma.core.process.variable import Var
Expand Down
5 changes: 4 additions & 1 deletion src/lava/magma/runtime/message_infrastructure/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ project(message_passing)
set (CMAKE_CXX_STANDARD 17)
include_directories(message_infrastructure/csrc)

file (GLOB MULTI_PROC_SRCS "message_infrastructure/csrc/multiprocessing.cc")
set(MULTI_PROC_SRCS
"message_infrastructure/csrc/multiprocessing.cc"
"message_infrastructure/csrc/posix_actor.cc"
)

file (GLOB PY_WRAPPER "message_infrastructure/csrc/message_infrastructure_py_wrapper.cc")

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from MessageInfrastructurePywrapper import CppMultiProcessing
from MessageInfrastructurePywrapper import SharedMemManager
from MessageInfrastructurePywrapper import Actor
from MessageInfrastructurePywrapper import ProcessType
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,41 @@
#ifndef ABSTRACT_ACTOR_H_
#define ABSTRACT_ACTOR_H_

#include <functional>

namespace message_infrastructure {

enum ActorStatus {
StatsError = -1,
StatsRuning = 0,
StatsStopped = 1
};

class AbstractActor {
public:
virtual int GetPid() = 0;
virtual int Stop() = 0;
virtual int Pause() = 0;

int pid_;
};

class PosixActor : public AbstractActor {
public:
explicit PosixActor(int pid) {
explicit PosixActor(int pid, std::function<void()> target_fn) {
this->pid_ = pid;
this->target_fn_ = target_fn;
}
int GetPid() {
return this->pid_;
}
int Stop() {
return 0;
}
int Pause() {
return 0;
int Wait();
int Stop();
int GetStatus() {
return this->status_;
}
// int Trace();
private:
std::function<void()> target_fn_ = NULL;
int status_ = StatsStopped;
};

using ActorPtr = AbstractActor *;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (C) 2021 Intel Corporation
// SPDX-License-Identifier: BSD-3-Clause
// See: https://spdx.org/licenses/

#ifndef MESSAGE_INFRASTRUCTURE_LOGGING_H_
#define MESSAGE_INFRASTRUCTURE_LOGGING_H_

#include <stdio.h>
#define LOG_MP (1) // log for multiprocessing


#define LAVA_LOG(_cond, _fmt, ...) { \
if ((_cond)) { \
printf("[CPP INFO]" _fmt, ## __VA_ARGS__); \
} \
}

#define LAVA_LOG_WARN(_cond, _fmt, ...) { \
if ((_cond)) { \
printf("[CPP WARNING]" _fmt, __FUNCTION__, ## __VA_ARGS__); \
} \
}

#define LAVA_LOG_ERR(_fmt, ...) { \
printf("[CPP ERROR]" _fmt, __FUNCTION__, ## __VA_ARGS__); \
}

#endif // MESSAGE_INFRASTRUCTURE_LOGGING_H_
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@

#include <pybind11/functional.h>
#include <pybind11/pybind11.h>
#include <pybind11/stl.h>

#include "multiprocessing.h"
// #include "shm.h"
#include "abstract_actor.h"
#include "shm.h"
// #include "shmem_channel.h"
// #include "shmem_port.h"

Expand All @@ -15,11 +17,27 @@ namespace message_infrastructure {
namespace py = pybind11;

PYBIND11_MODULE(MessageInfrastructurePywrapper, m) {
py::class_<MultiProcessing> (m, "MultiProcessing")
py::class_<MultiProcessing> (m, "CppMultiProcessing")
.def(py::init<>())
.def("build_actor", &MultiProcessing::BuildActor)
.def("check_actor", &MultiProcessing::CheckActor)
.def("get_actors", &MultiProcessing::GetActors)
.def("get_shmm", &MultiProcessing::GetSharedMemManager)
.def("stop", &MultiProcessing::Stop);
py::enum_<ProcessType> (m, "ProcessType")
.value("ErrorProcess", ErrorProcess)
.value("ChildProcess", ChildProcess)
.value("ParentProcess", ParentProcess);
py::class_<SharedMemManager> (m, "SharedMemManager")
.def(py::init<>())
.def("alloc_mem", &SharedMemManager::AllocSharedMemory)
.def("stop", &SharedMemManager::Stop);
py::class_<PosixActor> (m, "Actor")
.def("wait", &PosixActor::Wait)
.def("stop", &PosixActor::Stop)
.def("get_status", &PosixActor::GetStatus);
// .def("trace", &PosixActor::Trace);

/*
py::class_<ShmemSendPort> (m, "ShmemSendPort")
.def(py::init<std::string, SharedMemory*, Proto*, size_t, sem_t*, sem_t*>())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,43 +3,61 @@
// See: https://spdx.org/licenses/

#include "multiprocessing.h"
#include "message_infrastructure_logging.h"

#include <sys/wait.h>
#include <unistd.h>
#include <iostream>

namespace message_infrastructure {

#define CPP_INFO "[CPP_INFO] "
MultiProcessing::MultiProcessing() {
shmm_ = new SharedMemManager();
}

void MultiProcessing::BuildActor(std::function<void()> target_fn) {
int MultiProcessing::BuildActor(std::function<void()> target_fn) {
pid_t pid = fork();

if (pid < 0) {
std::cout << CPP_INFO << "cannot allocate pid\n";
}

if (pid > 0) {
std::cout << CPP_INFO << "parent, create actor\n";
ActorPtr actor = new PosixActor(pid);
LAVA_LOG(LOG_MP, "Parent Process, create child process %d\n", pid);
ActorPtr actor = new PosixActor(pid, target_fn);
actors_.push_back(actor);
return ParentProcess;
}

if (pid == 0) {
std::cout << CPP_INFO << "child, new process\n";
LAVA_LOG(LOG_MP, "child, new process\n");
target_fn();
exit(0);
}

LAVA_LOG_ERR("Cannot allocate new pid for the process\n");
return ErrorProcess;

}

void MultiProcessing::Stop() {
int MultiProcessing::Stop() {
int error_cnts = 0;

for (auto actor : actors_) {
error_cnts += actor->Stop();
}

LAVA_LOG(LOG_MP, "Stop Actors, error: %d\n", error_cnts);
return error_cnts;
}

void MultiProcessing::CheckActor() {
for(auto actor : actors_){
std::cout << CPP_INFO << actor->pid_ << std::endl;
for (auto actor : actors_) {
LAVA_LOG(LOG_MP, "Actor info %d\n", actor->GetPid());
}
}

SharedMemManager* MultiProcessing::GetSharedMemManager() {
return this->shmm_;
}

std::vector<ActorPtr>& MultiProcessing::GetActors() {
return this->actors_;
}

} // namespace message_infrastructure
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,24 @@

namespace message_infrastructure {

enum ProcessType {
ErrorProcess = -1,
ChildProcess = 0,
ParentProcess = 1
};

class MultiProcessing {
public:
// stop each actor in vector actors;
void Stop();
void BuildActor(std::function<void()>);
MultiProcessing();
int Stop();
int BuildActor(std::function<void()>);
void CheckActor();
std::vector<ActorPtr>& GetActors();
SharedMemManager* GetSharedMemManager();

private:
std::vector<ActorPtr> actors_;
// SharedMemManager shmm_;
SharedMemManager *shmm_;
};

} // namespace message_infrastructure
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright (C) 2021 Intel Corporation
// SPDX-License-Identifier: BSD-3-Clause
// See: https://spdx.org/licenses/

#include "abstract_actor.h"
#include "message_infrastructure_logging.h"

#include <sys/wait.h>
#include <sys/types.h>

namespace message_infrastructure {

int PosixActor::Stop() {
int status;
// TODO: Add the options can be as args of the function
int options = 0;
int ret = waitpid(this->pid_, &status, options);

if (ret < 0) {
LAVA_LOG_ERR("process %d waitpid error\n", this->pid_);
return -1;
}

// Check the status
return 0;

}

int PosixActor::Wait() {
return 0;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,48 @@
#include <semaphore.h>
#include <unistd.h>

#include <vector>

namespace message_infrastructure {

class SharedMemory {
};

class SharedMemManager {
public:
int AllocSharedMemory(size_t mem_size) {
int shmid = shmget(key_, mem_size, 0644|IPC_CREAT);
if (shmid < 0)
return -1;

shms_.push_back(shmid);
key_++;
return shmid;
}

int DeleteSharedMemory(int shmid) {
// Release specific shared memory
int del_cnt = 0;
for (auto it = shms_.begin(); it != shms_.end(); it++) {
if ((*it) == shmid) {
shms_.erase(it);
del_cnt++;
}
}
return del_cnt;
}

int Stop() {
int stop_cnt = shms_.size();
shms_.clear();
return stop_cnt;
}

private:
key_t key_ = 0xdead;
std::vector<int> shms_;
};

} // namespace message_infrastructure

#endif // SHM_H_
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Copyright (C) 2021-22 Intel Corporation
# SPDX-License-Identifier: LGPL 2.1 or later
# See: https://spdx.org/licenses/
from message_infrastructure.message_interface_enum import ActorType
from message_infrastructure.multiprocessing import MultiProcessing

"""Factory class to create the messaging infrastructure"""


class MessageInfrastructureFactory:
"""Creates the message infrastructure instance based on type"""
@staticmethod
def create(factory_type: ActorType):
"""type of actor framework being chosen"""
if factory_type == ActorType.MultiProcessing:
return MultiProcessing()
else:
raise Exception("Unsupported factory_type")
Loading