Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UDF based on existing function infra #4804

Merged
merged 8 commits into from
Feb 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions conf/nebula-graphd.conf.default
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,12 @@
# if use balance data feature, only work if enable_experimental_feature is true
--enable_data_balance=true

# enable udf, written in c++ only for now
--enable_udf=true

# set the directory where the .so files of udf are stored, when enable_udf is true
--udf_path=/home/nebula/dev/nebula/udf/

########## session ##########
# Maximum number of sessions that can be created per IP and per user
--max_sessions_per_ip_per_user=300
Expand All @@ -116,3 +122,4 @@
--memory_purge_enabled=true
# memory background purge interval in seconds
--memory_purge_interval_seconds=10

2 changes: 2 additions & 0 deletions src/common/function/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ nebula_add_library(
function_manager_obj OBJECT
FunctionManager.cpp
../geo/GeoFunction.cpp
FunctionUdfManager.cpp
GraphFunction.h
)

nebula_add_library(
Expand Down
13 changes: 13 additions & 0 deletions src/common/function/FunctionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <boost/algorithm/string/replace.hpp>
#include <cstdint>

#include "FunctionUdfManager.h"
#include "common/base/Base.h"
#include "common/datatypes/DataSet.h"
#include "common/datatypes/Edge.h"
Expand All @@ -28,12 +29,18 @@
#include "common/thrift/ThriftTypes.h"
#include "common/time/TimeUtils.h"
#include "common/time/WallClock.h"
#include "graph/service/GraphFlags.h"

DEFINE_bool(enable_udf, false, "enable udf");

namespace nebula {

// static
FunctionManager &FunctionManager::instance() {
static FunctionManager instance;
if (FLAGS_enable_udf) {
static FunctionUdfManager udfManager;
}
return instance;
}

Expand Down Expand Up @@ -440,6 +447,9 @@ StatusOr<Value::Type> FunctionManager::getReturnType(const std::string &funcName
}
auto iter = typeSignature_.find(func);
if (iter == typeSignature_.end()) {
if (FLAGS_enable_udf) {
return FunctionUdfManager::getUdfReturnType(funcName, argsType);
}
return Status::Error("Function `%s' not defined", funcName.c_str());
}

Expand Down Expand Up @@ -2930,6 +2940,9 @@ Status FunctionManager::find(const std::string &func, const size_t arity) {
std::transform(func.begin(), func.end(), func.begin(), ::tolower);
auto iter = functions_.find(func);
if (iter == functions_.end()) {
if (FLAGS_enable_udf) {
return FunctionUdfManager::loadUdfFunction(func, arity);
}
return Status::Error("Function `%s' not defined", func.c_str());
}
// check arity
Expand Down
2 changes: 1 addition & 1 deletion src/common/function/FunctionManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ class FunctionManager final {
static StatusOr<Value::Type> getReturnType(const std::string &funcName,
const std::vector<Value::Type> &argsType);

private:
// The attributes of the function call
struct FunctionAttributes final {
size_t minArity_{0};
Expand All @@ -89,6 +88,7 @@ class FunctionManager final {
}
}

private:
/**
* FunctionManager functions as a singleton, since the dynamic loading is
* process-wide.
Expand Down
204 changes: 204 additions & 0 deletions src/common/function/FunctionUdfManager.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

#include "FunctionUdfManager.h"

#include <dirent.h>
#include <dlfcn.h>

#include <cstring>
#include <iostream>

#include "graph/service/GraphFlags.h"

DEFINE_string(udf_path, "lib/udf", "path to hold the udf");

namespace nebula {

static const char *dlsym_error;
static std::unordered_map<std::string, Value::Type> udfFunReturnType_;
static std::unordered_map<std::string, std::vector<std::vector<nebula::Value::Type>>>
udfFunInputType_;
std::unordered_map<std::string, FunctionManager::FunctionAttributes> udfFunctions_;

std::atomic<bool> expired_{};
std::atomic<bool> try_to_expire_{};
std::mutex mutex_;
std::condition_variable expired_cond_;

FunctionUdfManager &FunctionUdfManager::instance() {
static FunctionUdfManager instance;
return instance;
}

std::vector<std::string> getFilesList(const std::string &path, const char *ftype) {
std::vector<std::string> filenames;
DIR *pDir;
struct dirent *ptr;
if (!(pDir = opendir(path.c_str()))) {
LOG(ERROR) << "UDF Folder doesn't Exist!" << dlsym_error;
return filenames;
}
while ((ptr = readdir(pDir)) != 0) {
if (strcmp(ptr->d_name, ".") != 0 && strcmp(ptr->d_name, "..") != 0 &&
strcmp((ptr->d_name) + strlen(ptr->d_name) - strlen(ftype), ftype) == 0) {
filenames.emplace_back(ptr->d_name);
LOG(INFO) << "Load UDF SO Name: " << ptr->d_name;
}
}
closedir(pDir);
return filenames;
}

FunctionUdfManager::create_f *FunctionUdfManager::getGraphFunctionClass(void *func_handle) {
auto *create_func = reinterpret_cast<create_f *>(dlsym(func_handle, "create"));
dlsym_error = dlerror();
if (dlsym_error) {
LOG(ERROR) << "Cannot load symbol create: " << dlsym_error;
}
return create_func;
}

FunctionUdfManager::destroy_f *FunctionUdfManager::deleteGraphFunctionClass(void *func_handle) {
auto *destroy_func = reinterpret_cast<destroy_f *>(dlsym(func_handle, "destroy"));
dlsym_error = dlerror();
if (dlsym_error) {
LOG(ERROR) << "Cannot load symbol destroy: " << dlsym_error;
}
return destroy_func;
}

FunctionUdfManager::FunctionUdfManager() {
initAndLoadSoFunction();
expired_ = true;
try_to_expire_ = false;

std::thread([this]() {
while (!try_to_expire_) {
std::this_thread::sleep_for(std::chrono::seconds(300));
initAndLoadSoFunction();
}
{
std::lock_guard<std::mutex> locker(mutex_);
expired_ = true;
expired_cond_.notify_one();
}
}).detach();
}

void FunctionUdfManager::initAndLoadSoFunction() {
auto udfPath = FLAGS_udf_path;
LOG(INFO) << "Load UDF so library: " << udfPath;
std::vector<std::string> files = getFilesList(udfPath, ".so");

for (auto &file : files) {
const std::string &path = udfPath;
std::string so_path_string = path + file;
const char *soPath = so_path_string.c_str();
try {
void *func_handle = dlopen(soPath, RTLD_LAZY);
if (!func_handle) {
LOG(ERROR) << "Cannot load udf library: " << dlerror();
}
dlerror();

create_f *create_func = getGraphFunctionClass(func_handle);
destroy_f *destroy_func = deleteGraphFunctionClass(func_handle);
if (create_func == nullptr || destroy_func == nullptr) {
LOG(ERROR) << "GraphFunction Create Or Destroy Error: " << soPath;
break;
}

GraphFunction *gf = create_func();
char *funName = gf->name();
udfFunInputType_.emplace(funName, gf->inputType());
udfFunReturnType_.emplace(funName, gf->returnType());
addSoUdfFunction(funName, soPath, gf->minArity(), gf->maxArity(), gf->isPure());

destroy_func(gf);
dlclose(func_handle);
} catch (...) {
LOG(ERROR) << "load So library Error: " << soPath;
}
}
}

StatusOr<Value::Type> FunctionUdfManager::getUdfReturnType(
std::string func, const std::vector<Value::Type> &argsType) {
if (udfFunReturnType_.find(func) != udfFunReturnType_.end()) {
if (udfFunInputType_.find(func) != udfFunInputType_.end()) {
auto iter = udfFunInputType_.find(func);
for (const auto &args : iter->second) {
if (argsType == args || args[0] == Value::Type::NULLVALUE ||
args[0] == Value::Type::__EMPTY__) {
return udfFunReturnType_[func];
}
}
}
return Status::Error("Parameter's type error");
}
return Status::Error("Function `%s' not defined", func.c_str());
}

StatusOr<const FunctionManager::FunctionAttributes> nebula::FunctionUdfManager::loadUdfFunction(
std::string func, size_t arity) {
auto iter = udfFunctions_.find(func);
if (iter == udfFunctions_.end()) {
return Status::Error("Function `%s' not defined", func.c_str());
}
auto minArity = iter->second.minArity_;
auto maxArity = iter->second.maxArity_;
if (arity < minArity || arity > maxArity) {
if (minArity == maxArity) {
return Status::Error(
"Arity not match for function `%s': "
"provided %lu but %lu expected.",
func.c_str(),
arity,
minArity);
} else {
return Status::Error(
"Arity not match for function `%s': "
"provided %lu but %lu-%lu expected.",
func.c_str(),
arity,
minArity,
maxArity);
}
}
return iter->second;
}

void FunctionUdfManager::addSoUdfFunction(
char *funName, const char *soPath, size_t minArity, size_t maxArity, bool isPure) {
auto &attr = udfFunctions_[funName];
attr.minArity_ = minArity;
attr.maxArity_ = maxArity;
attr.isAlwaysPure_ = isPure;
std::string path = soPath;
attr.body_ = [path](const auto &args) -> Value {
try {
char *soPath2 = const_cast<char *>(path.c_str());
void *func_handle = dlopen(soPath2, RTLD_LAZY);
if (!func_handle) {
LOG(ERROR) << "Cannot load udf library: " << dlerror();
}
dlerror();

create_f *create_func = getGraphFunctionClass(func_handle);
destroy_f *destroy_func = deleteGraphFunctionClass(func_handle);

GraphFunction *gf = create_func();
Value res = gf->body(args);
destroy_func(gf);
dlclose(func_handle);
return res;
} catch (...) {
return Value::kNullBadData;
}
};
}

} // namespace nebula
40 changes: 40 additions & 0 deletions src/common/function/FunctionUdfManager.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

#ifndef COMMON_FUNCTION_FUNCTIONUDFMANAGER_H_
#define COMMON_FUNCTION_FUNCTIONUDFMANAGER_H_

#include "FunctionManager.h"
#include "GraphFunction.h"

namespace nebula {

class FunctionManager;

class FunctionUdfManager {
public:
typedef GraphFunction *(create_f)();
typedef void(destroy_f)(GraphFunction *);

static StatusOr<Value::Type> getUdfReturnType(const std::string functionName,
const std::vector<Value::Type> &argsType);

static StatusOr<const FunctionManager::FunctionAttributes> loadUdfFunction(
std::string functionName, size_t arity);

static FunctionUdfManager &instance();

FunctionUdfManager();

private:
static create_f *getGraphFunctionClass(void *func_handle);
static destroy_f *deleteGraphFunctionClass(void *func_handle);

void addSoUdfFunction(char *funName, const char *soPath, size_t i, size_t i1, bool b);
void initAndLoadSoFunction();
};

} // namespace nebula
#endif // COMMON_FUNCTION_FUNCTIONUDFMANAGER_H_
38 changes: 38 additions & 0 deletions src/common/function/GraphFunction.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

#ifndef COMMON_FUNCTION_GRAPHFUNCTION_H
#define COMMON_FUNCTION_GRAPHFUNCTION_H

#include <vector>

#include "common/datatypes/Value.h"

class GraphFunction;

extern "C" GraphFunction *create();
extern "C" void destroy(GraphFunction *function);

class GraphFunction {
public:
virtual ~GraphFunction() = default;

virtual char *name() = 0;

virtual std::vector<std::vector<nebula::Value::Type>> inputType() = 0;

virtual nebula::Value::Type returnType() = 0;

virtual size_t minArity() = 0;

virtual size_t maxArity() = 0;

virtual bool isPure() = 0;

virtual nebula::Value body(
const std::vector<std::reference_wrapper<const nebula::Value>> &args) = 0;
};

#endif // COMMON_FUNCTION_GRAPHFUNCTION_H
2 changes: 2 additions & 0 deletions src/graph/service/GraphFlags.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ DECLARE_int32(listen_backlog);
DECLARE_string(listen_netdev);
DECLARE_string(local_ip);
DECLARE_string(pid_file);
DECLARE_bool(enable_udf);
DECLARE_string(udf_path);
DECLARE_bool(local_config);
DECLARE_bool(accept_partial_success);
DECLARE_bool(disable_octal_escape_char);
Expand Down
Loading