Skip to content

Commit

Permalink
UDF based on existing function infra (#4804)
Browse files Browse the repository at this point in the history
Added UDF support using C++.

---------

Co-authored-by: zhaojunnan <[email protected]>
Co-authored-by: Wey Gu <[email protected]>
Co-authored-by: Cheng Xuntao <[email protected]>
  • Loading branch information
4 people authored Feb 13, 2023
1 parent a6d31b3 commit 26bec49
Show file tree
Hide file tree
Showing 11 changed files with 483 additions and 1 deletion.
7 changes: 7 additions & 0 deletions conf/nebula-graphd.conf.default
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,12 @@
# if use balance data feature, only work if enable_experimental_feature is true
--enable_data_balance=true

# enable udf, written in c++ only for now
--enable_udf=true

# set the directory where the .so files of udf are stored, when enable_udf is true
--udf_path=/home/nebula/dev/nebula/udf/

########## session ##########
# Maximum number of sessions that can be created per IP and per user
--max_sessions_per_ip_per_user=300
Expand All @@ -116,3 +122,4 @@
--memory_purge_enabled=true
# memory background purge interval in seconds
--memory_purge_interval_seconds=10

2 changes: 2 additions & 0 deletions src/common/function/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ nebula_add_library(
function_manager_obj OBJECT
FunctionManager.cpp
../geo/GeoFunction.cpp
FunctionUdfManager.cpp
GraphFunction.h
)

nebula_add_library(
Expand Down
13 changes: 13 additions & 0 deletions src/common/function/FunctionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <boost/algorithm/string/replace.hpp>
#include <cstdint>

#include "FunctionUdfManager.h"
#include "common/base/Base.h"
#include "common/datatypes/DataSet.h"
#include "common/datatypes/Edge.h"
Expand All @@ -28,12 +29,18 @@
#include "common/thrift/ThriftTypes.h"
#include "common/time/TimeUtils.h"
#include "common/time/WallClock.h"
#include "graph/service/GraphFlags.h"

DEFINE_bool(enable_udf, false, "enable udf");

namespace nebula {

// static
FunctionManager &FunctionManager::instance() {
static FunctionManager instance;
if (FLAGS_enable_udf) {
static FunctionUdfManager udfManager;
}
return instance;
}

Expand Down Expand Up @@ -440,6 +447,9 @@ StatusOr<Value::Type> FunctionManager::getReturnType(const std::string &funcName
}
auto iter = typeSignature_.find(func);
if (iter == typeSignature_.end()) {
if (FLAGS_enable_udf) {
return FunctionUdfManager::getUdfReturnType(funcName, argsType);
}
return Status::Error("Function `%s' not defined", funcName.c_str());
}

Expand Down Expand Up @@ -2930,6 +2940,9 @@ Status FunctionManager::find(const std::string &func, const size_t arity) {
std::transform(func.begin(), func.end(), func.begin(), ::tolower);
auto iter = functions_.find(func);
if (iter == functions_.end()) {
if (FLAGS_enable_udf) {
return FunctionUdfManager::loadUdfFunction(func, arity);
}
return Status::Error("Function `%s' not defined", func.c_str());
}
// check arity
Expand Down
2 changes: 1 addition & 1 deletion src/common/function/FunctionManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ class FunctionManager final {
static StatusOr<Value::Type> getReturnType(const std::string &funcName,
const std::vector<Value::Type> &argsType);

private:
// The attributes of the function call
struct FunctionAttributes final {
size_t minArity_{0};
Expand All @@ -89,6 +88,7 @@ class FunctionManager final {
}
}

private:
/**
* FunctionManager functions as a singleton, since the dynamic loading is
* process-wide.
Expand Down
204 changes: 204 additions & 0 deletions src/common/function/FunctionUdfManager.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

#include "FunctionUdfManager.h"

#include <dirent.h>
#include <dlfcn.h>

#include <cstring>
#include <iostream>

#include "graph/service/GraphFlags.h"

DEFINE_string(udf_path, "lib/udf", "path to hold the udf");

namespace nebula {

static const char *dlsym_error;
static std::unordered_map<std::string, Value::Type> udfFunReturnType_;
static std::unordered_map<std::string, std::vector<std::vector<nebula::Value::Type>>>
udfFunInputType_;
std::unordered_map<std::string, FunctionManager::FunctionAttributes> udfFunctions_;

std::atomic<bool> expired_{};
std::atomic<bool> try_to_expire_{};
std::mutex mutex_;
std::condition_variable expired_cond_;

FunctionUdfManager &FunctionUdfManager::instance() {
static FunctionUdfManager instance;
return instance;
}

std::vector<std::string> getFilesList(const std::string &path, const char *ftype) {
std::vector<std::string> filenames;
DIR *pDir;
struct dirent *ptr;
if (!(pDir = opendir(path.c_str()))) {
LOG(ERROR) << "UDF Folder doesn't Exist!" << dlsym_error;
return filenames;
}
while ((ptr = readdir(pDir)) != 0) {
if (strcmp(ptr->d_name, ".") != 0 && strcmp(ptr->d_name, "..") != 0 &&
strcmp((ptr->d_name) + strlen(ptr->d_name) - strlen(ftype), ftype) == 0) {
filenames.emplace_back(ptr->d_name);
LOG(INFO) << "Load UDF SO Name: " << ptr->d_name;
}
}
closedir(pDir);
return filenames;
}

FunctionUdfManager::create_f *FunctionUdfManager::getGraphFunctionClass(void *func_handle) {
auto *create_func = reinterpret_cast<create_f *>(dlsym(func_handle, "create"));
dlsym_error = dlerror();
if (dlsym_error) {
LOG(ERROR) << "Cannot load symbol create: " << dlsym_error;
}
return create_func;
}

FunctionUdfManager::destroy_f *FunctionUdfManager::deleteGraphFunctionClass(void *func_handle) {
auto *destroy_func = reinterpret_cast<destroy_f *>(dlsym(func_handle, "destroy"));
dlsym_error = dlerror();
if (dlsym_error) {
LOG(ERROR) << "Cannot load symbol destroy: " << dlsym_error;
}
return destroy_func;
}

FunctionUdfManager::FunctionUdfManager() {
initAndLoadSoFunction();
expired_ = true;
try_to_expire_ = false;

std::thread([this]() {
while (!try_to_expire_) {
std::this_thread::sleep_for(std::chrono::seconds(300));
initAndLoadSoFunction();
}
{
std::lock_guard<std::mutex> locker(mutex_);
expired_ = true;
expired_cond_.notify_one();
}
}).detach();
}

void FunctionUdfManager::initAndLoadSoFunction() {
auto udfPath = FLAGS_udf_path;
LOG(INFO) << "Load UDF so library: " << udfPath;
std::vector<std::string> files = getFilesList(udfPath, ".so");

for (auto &file : files) {
const std::string &path = udfPath;
std::string so_path_string = path + file;
const char *soPath = so_path_string.c_str();
try {
void *func_handle = dlopen(soPath, RTLD_LAZY);
if (!func_handle) {
LOG(ERROR) << "Cannot load udf library: " << dlerror();
}
dlerror();

create_f *create_func = getGraphFunctionClass(func_handle);
destroy_f *destroy_func = deleteGraphFunctionClass(func_handle);
if (create_func == nullptr || destroy_func == nullptr) {
LOG(ERROR) << "GraphFunction Create Or Destroy Error: " << soPath;
break;
}

GraphFunction *gf = create_func();
char *funName = gf->name();
udfFunInputType_.emplace(funName, gf->inputType());
udfFunReturnType_.emplace(funName, gf->returnType());
addSoUdfFunction(funName, soPath, gf->minArity(), gf->maxArity(), gf->isPure());

destroy_func(gf);
dlclose(func_handle);
} catch (...) {
LOG(ERROR) << "load So library Error: " << soPath;
}
}
}

StatusOr<Value::Type> FunctionUdfManager::getUdfReturnType(
std::string func, const std::vector<Value::Type> &argsType) {
if (udfFunReturnType_.find(func) != udfFunReturnType_.end()) {
if (udfFunInputType_.find(func) != udfFunInputType_.end()) {
auto iter = udfFunInputType_.find(func);
for (const auto &args : iter->second) {
if (argsType == args || args[0] == Value::Type::NULLVALUE ||
args[0] == Value::Type::__EMPTY__) {
return udfFunReturnType_[func];
}
}
}
return Status::Error("Parameter's type error");
}
return Status::Error("Function `%s' not defined", func.c_str());
}

StatusOr<const FunctionManager::FunctionAttributes> nebula::FunctionUdfManager::loadUdfFunction(
std::string func, size_t arity) {
auto iter = udfFunctions_.find(func);
if (iter == udfFunctions_.end()) {
return Status::Error("Function `%s' not defined", func.c_str());
}
auto minArity = iter->second.minArity_;
auto maxArity = iter->second.maxArity_;
if (arity < minArity || arity > maxArity) {
if (minArity == maxArity) {
return Status::Error(
"Arity not match for function `%s': "
"provided %lu but %lu expected.",
func.c_str(),
arity,
minArity);
} else {
return Status::Error(
"Arity not match for function `%s': "
"provided %lu but %lu-%lu expected.",
func.c_str(),
arity,
minArity,
maxArity);
}
}
return iter->second;
}

void FunctionUdfManager::addSoUdfFunction(
char *funName, const char *soPath, size_t minArity, size_t maxArity, bool isPure) {
auto &attr = udfFunctions_[funName];
attr.minArity_ = minArity;
attr.maxArity_ = maxArity;
attr.isAlwaysPure_ = isPure;
std::string path = soPath;
attr.body_ = [path](const auto &args) -> Value {
try {
char *soPath2 = const_cast<char *>(path.c_str());
void *func_handle = dlopen(soPath2, RTLD_LAZY);
if (!func_handle) {
LOG(ERROR) << "Cannot load udf library: " << dlerror();
}
dlerror();

create_f *create_func = getGraphFunctionClass(func_handle);
destroy_f *destroy_func = deleteGraphFunctionClass(func_handle);

GraphFunction *gf = create_func();
Value res = gf->body(args);
destroy_func(gf);
dlclose(func_handle);
return res;
} catch (...) {
return Value::kNullBadData;
}
};
}

} // namespace nebula
40 changes: 40 additions & 0 deletions src/common/function/FunctionUdfManager.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

#ifndef COMMON_FUNCTION_FUNCTIONUDFMANAGER_H_
#define COMMON_FUNCTION_FUNCTIONUDFMANAGER_H_

#include "FunctionManager.h"
#include "GraphFunction.h"

namespace nebula {

class FunctionManager;

class FunctionUdfManager {
public:
typedef GraphFunction *(create_f)();
typedef void(destroy_f)(GraphFunction *);

static StatusOr<Value::Type> getUdfReturnType(const std::string functionName,
const std::vector<Value::Type> &argsType);

static StatusOr<const FunctionManager::FunctionAttributes> loadUdfFunction(
std::string functionName, size_t arity);

static FunctionUdfManager &instance();

FunctionUdfManager();

private:
static create_f *getGraphFunctionClass(void *func_handle);
static destroy_f *deleteGraphFunctionClass(void *func_handle);

void addSoUdfFunction(char *funName, const char *soPath, size_t i, size_t i1, bool b);
void initAndLoadSoFunction();
};

} // namespace nebula
#endif // COMMON_FUNCTION_FUNCTIONUDFMANAGER_H_
38 changes: 38 additions & 0 deletions src/common/function/GraphFunction.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

#ifndef COMMON_FUNCTION_GRAPHFUNCTION_H
#define COMMON_FUNCTION_GRAPHFUNCTION_H

#include <vector>

#include "common/datatypes/Value.h"

class GraphFunction;

extern "C" GraphFunction *create();
extern "C" void destroy(GraphFunction *function);

class GraphFunction {
public:
virtual ~GraphFunction() = default;

virtual char *name() = 0;

virtual std::vector<std::vector<nebula::Value::Type>> inputType() = 0;

virtual nebula::Value::Type returnType() = 0;

virtual size_t minArity() = 0;

virtual size_t maxArity() = 0;

virtual bool isPure() = 0;

virtual nebula::Value body(
const std::vector<std::reference_wrapper<const nebula::Value>> &args) = 0;
};

#endif // COMMON_FUNCTION_GRAPHFUNCTION_H
2 changes: 2 additions & 0 deletions src/graph/service/GraphFlags.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ DECLARE_int32(listen_backlog);
DECLARE_string(listen_netdev);
DECLARE_string(local_ip);
DECLARE_string(pid_file);
DECLARE_bool(enable_udf);
DECLARE_string(udf_path);
DECLARE_bool(local_config);
DECLARE_bool(accept_partial_success);
DECLARE_bool(disable_octal_escape_char);
Expand Down
Loading

0 comments on commit 26bec49

Please sign in to comment.