Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Messaging refactor develop #310

Merged
2 changes: 1 addition & 1 deletion src/lava/magma/compiler/builders/channel_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
ChannelType,
)
from lava.magma.compiler.utils import PortInitializer
from lava.magma.runtime.message_infrastructure \
from lava.magma.runtime.message_infrastructure.message_infrastructure \
killight98 marked this conversation as resolved.
Show resolved Hide resolved
.message_infrastructure_interface import (MessageInfrastructureInterface)

if ty.TYPE_CHECKING:
Expand Down
3 changes: 2 additions & 1 deletion src/lava/magma/compiler/channels/pypychannel.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@

if ty.TYPE_CHECKING:
from lava.magma.runtime.message_infrastructure \
.message_infrastructure_interface import (
.message_infrastructure.message_infrastructure_interface \
killight98 marked this conversation as resolved.
Show resolved Hide resolved
import (
MessageInfrastructureInterface)


Expand Down
5 changes: 4 additions & 1 deletion src/lava/magma/runtime/message_infrastructure/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ project(message_passing)
set (CMAKE_CXX_STANDARD 17)
include_directories(message_infrastructure/csrc)

file (GLOB MULTI_PROC_SRCS "message_infrastructure/csrc/multiprocessing.cc")
set(MULTI_PROC_SRCS
"message_infrastructure/csrc/multiprocessing.cc"
"message_infrastructure/csrc/posix_actor.cc"
)

file (GLOB PY_WRAPPER "message_infrastructure/csrc/message_infrastructure_py_wrapper.cc")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ class AbstractActor {
public:
virtual int GetPid() = 0;
virtual int Stop() = 0;
virtual int Pause() = 0;

int pid_;
};

Expand All @@ -24,12 +22,7 @@ class PosixActor : public AbstractActor {
int GetPid() {
return this->pid_;
}
int Stop() {
return 0;
}
int Pause() {
return 0;
}
int Stop();
};

using ActorPtr = AbstractActor *;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (C) 2021 Intel Corporation
// SPDX-License-Identifier: BSD-3-Clause
// See: https://spdx.org/licenses/

#ifndef MESSAGE_INFRASTRUCTURE_LOGGING_H_
#define MESSAGE_INFRASTRUCTURE_LOGGING_H_

#include <stdio.h>
#define LOG_MP (1) // log for multiprocessing


#define LAVA_LOG(_cond, _fmt, ...) { \
if ((_cond)) { \
printf("[CPP INFO]" _fmt, ## __VA_ARGS__); \
} \
}

#define LAVA_LOG_WARN(_cond, _fmt, ...) { \
if ((_cond)) { \
printf("[CPP WARNING]" _fmt, __FUNCTION__, ## __VA_ARGS__); \
} \
}

#define LAVA_LOG_ERR(_fmt, ...) { \
printf("[CPP ERROR]" _fmt, __FUNCTION__, ## __VA_ARGS__); \
}

#endif // MESSAGE_INFRASTRUCTURE_LOGGING_H_
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#include <pybind11/pybind11.h>

#include "multiprocessing.h"
// #include "shm.h"
#include "shm.h"
// #include "shmem_channel.h"
// #include "shmem_port.h"

Expand All @@ -15,11 +15,15 @@ namespace message_infrastructure {
namespace py = pybind11;

PYBIND11_MODULE(MessageInfrastructurePywrapper, m) {
py::class_<MultiProcessing> (m, "MultiProcessing")
py::class_<MultiProcessing> (m, "CppMultiProcessing")
.def(py::init<>())
.def("build_actor", &MultiProcessing::BuildActor)
.def("check_actor", &MultiProcessing::CheckActor)
.def("stop", &MultiProcessing::Stop);
py::class_<SharedMemManager> (m, "SharedMemManager")
.def(py::init<>())
.def("alloc_mem", &SharedMemManager::AllocSharedMemory);

/*
py::class_<ShmemSendPort> (m, "ShmemSendPort")
.def(py::init<std::string, SharedMemory*, Proto*, size_t, sem_t*, sem_t*>())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,42 +3,47 @@
// See: https://spdx.org/licenses/

#include "multiprocessing.h"
#include "message_infrastructure_logging.h"

#include <sys/wait.h>
#include <unistd.h>
#include <iostream>

namespace message_infrastructure {

#define CPP_INFO "[CPP_INFO] "

void MultiProcessing::BuildActor(std::function<void()> target_fn) {
int MultiProcessing::BuildActor() {
pid_t pid = fork();

if (pid < 0) {
std::cout << CPP_INFO << "cannot allocate pid\n";
}

if (pid > 0) {
std::cout << CPP_INFO << "parent, create actor\n";
LAVA_LOG(LOG_MP, "Parent Process, create child process %d\n", pid);
ActorPtr actor = new PosixActor(pid);
actors_.push_back(actor);
return PARENT_PROCESS;
}

if (pid == 0) {
std::cout << CPP_INFO << "child, new process\n";
target_fn();
exit(0);
LAVA_LOG(LOG_MP, "child, new process\n");
return CHILD_PROCESS;
}

LAVA_LOG_ERR("Cannot allocate new pid for the process\n");
return ERROR_PROCESS;

}

void MultiProcessing::Stop() {
int MultiProcessing::Stop() {
int error_cnts = 0;

for (auto actor : actors_) {
error_cnts += actor->Stop();
}

LAVA_LOG(LOG_MP, "Stop Actors, error: %d\n", error_cnts);
return error_cnts;
}

void MultiProcessing::CheckActor() {
for(auto actor : actors_){
std::cout << CPP_INFO << actor->pid_ << std::endl;
for (auto actor : actors_) {
LAVA_LOG(LOG_MP, "Actor info %d\n", actor->GetPid());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,17 @@

namespace message_infrastructure {

enum ChannelType {
killight98 marked this conversation as resolved.
Show resolved Hide resolved
ERROR_PROCESS = -1,
CHILD_PROCESS = 0,
PARENT_PROCESS = 1
};

class MultiProcessing {
public:
// stop each actor in vector actors;
void Stop();
void BuildActor(std::function<void()>);
int Stop();
int BuildActor();
void CheckActor();
private:
std::vector<ActorPtr> actors_;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright (C) 2021 Intel Corporation
// SPDX-License-Identifier: BSD-3-Clause
// See: https://spdx.org/licenses/

#include "abstract_actor.h"
#include "message_infrastructure_logging.h"

#include <sys/wait.h>
#include <sys/types.h>

namespace message_infrastructure {

int PosixActor::Stop() {
int status;
// TODO: Add the options can be as args of the function
int options = 0;
int ret = waitpid(this->pid_, &status, options);

if (ret < 0) {
LAVA_LOG_ERR("process %d waitpid error\n", this->pid_);
return -1;
}

// Check the status
return 0;

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,34 @@
#include <semaphore.h>
#include <unistd.h>

#include <vector>

namespace message_infrastructure {

class SharedMemory {
};

class SharedMemManager {
public:
int AllocSharedMemory(size_t mem_size) {
int shmid = shmget(key_, mem_size, 0644|IPC_CREAT);
if (shmid < 0)
return -1;

shms_.push_back(shmid);
key_++;
return shmid;
}

int DeleteSharedMemory(int key) {
// Release specific shared memroy
return 0;
}
private:
key_t key_ = 0xdead;
std::vector<int> shms_;
};

} // namespace message_infrastructure

#endif // SHM_H_
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Copyright (C) 2021-22 Intel Corporation
# SPDX-License-Identifier: LGPL 2.1 or later
# See: https://spdx.org/licenses/
from lava.magma.core.process.message_interface_enum import ActorType
killight98 marked this conversation as resolved.
Show resolved Hide resolved
from lava.magma.runtime.message_infrastructure.message_infrastructure \
.multiprocessing import MultiProcessing
killight98 marked this conversation as resolved.
Show resolved Hide resolved

"""Factory class to create the messaging infrastructure"""


class MessageInfrastructureFactory:
"""Creates the message infrastructure instance based on type"""
@staticmethod
def create(factory_type: ActorType):
"""type of actor framework being chosen"""
if factory_type == ActorType.MultiProcessing:
return MultiProcessing()
else:
raise Exception("Unsupported factory_type")
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# Copyright (C) 2021-22 Intel Corporation
# SPDX-License-Identifier: LGPL 2.1 or later
# See: https://spdx.org/licenses/
import typing as ty
if ty.TYPE_CHECKING:
from lava.magma.core.process.process import AbstractProcess
from lava.magma.compiler.builders.py_builder import PyProcessBuilder
from lava.magma.compiler.builders.runtimeservice_builder import \
RuntimeServiceBuilder
from abc import ABC, abstractmethod

from lava.magma.compiler.channels.interfaces import ChannelType, Channel
from lava.magma.core.sync.domain import SyncDomain

"""A Message Infrastructure Interface which can create actors which would
participate in message passing/exchange, start and stop them as well as
declare the underlying Channel Infrastructure Class to be used for message
passing implementation."""


class MessageInfrastructureInterface(ABC):
"""Interface to provide the ability to create actors which can
communicate via message passing"""
@abstractmethod
def start(self):
"""Starts the messaging infrastructure"""
pass

@abstractmethod
def stop(self):
"""Stops the messaging infrastructure"""
pass

@abstractmethod
def build_actor(self, target_fn: ty.Callable, builder: ty.Union[
ty.Dict['AbstractProcess', 'PyProcessBuilder'], ty.Dict[
SyncDomain, 'RuntimeServiceBuilder']]):
"""Given a target_fn starts a system process"""
pass

@property
@abstractmethod
def actors(self) -> ty.List[ty.Any]:
"""Returns a list of actors"""
pass

@abstractmethod
def channel_class(self, channel_type: ChannelType) -> ty.Type[Channel]:
"""Given the Channel Type, Return the Channel Implementation to
be used during execution"""
pass
Loading