Skip to content

Commit

Permalink
[native] Retrieve Json function metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
pramodsatya committed Sep 10, 2024
1 parent a08e427 commit 568e1cf
Show file tree
Hide file tree
Showing 23 changed files with 1,693 additions and 61 deletions.
1 change: 1 addition & 0 deletions presto-native-execution/presto_cpp/main/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ target_link_libraries(
$<TARGET_OBJECTS:presto_protocol>
presto_common
presto_exception
presto_function_metadata
presto_http
presto_operators
velox_aggregates
Expand Down
16 changes: 16 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "presto_cpp/main/operators/PartitionAndSerialize.h"
#include "presto_cpp/main/operators/ShuffleRead.h"
#include "presto_cpp/main/operators/UnsafeRowExchangeSource.h"
#include "presto_cpp/main/types/FunctionMetadata.h"
#include "presto_cpp/main/types/PrestoToVeloxQueryPlan.h"
#include "velox/common/base/Counters.h"
#include "velox/common/base/StatsReporter.h"
Expand Down Expand Up @@ -385,6 +386,16 @@ void PrestoServer::run() {
http::kMimeTypeApplicationJson)
.sendWithEOM();
});
if (systemConfig->prestoNativeSidecar()) {
httpServer_->registerGet(
"/v1/functions",
[server = this](
proxygen::HTTPMessage* /*message*/,
const std::vector<std::unique_ptr<folly::IOBuf>>& /*body*/,
proxygen::ResponseHandler* downstream) {
server->getFunctionSignatures(downstream);
});
}

if (systemConfig->enableRuntimeMetricsCollection()) {
enableWorkerStatsReporting();
Expand Down Expand Up @@ -1419,6 +1430,11 @@ void PrestoServer::reportNodeStatus(proxygen::ResponseHandler* downstream) {
http::sendOkResponse(downstream, json(fetchNodeStatus()));
}

// Replies to `downstream` with an OK response whose payload is the JSON
// returned by getFunctionsMetadata().
void PrestoServer::getFunctionSignatures(
    proxygen::ResponseHandler* downstream) {
  const auto functionsMetadata = getFunctionsMetadata();
  http::sendOkResponse(downstream, functionsMetadata);
}

protocol::NodeStatus PrestoServer::fetchNodeStatus() {
auto systemConfig = SystemConfig::instance();
const int64_t nodeMemoryGb = systemConfig->systemMemoryGb();
Expand Down
2 changes: 2 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ class PrestoServer {

void reportNodeStatus(proxygen::ResponseHandler* downstream);

void getFunctionSignatures(proxygen::ResponseHandler* downstream);

protocol::NodeStatus fetchNodeStatus();

void populateMemAndCPUInfo();
Expand Down
4 changes: 4 additions & 0 deletions presto-native-execution/presto_cpp/main/types/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ target_link_libraries(presto_types presto_type_converter velox_type_fbhive

set_property(TARGET presto_types PROPERTY JOB_POOL_LINK presto_link_job_pool)

# Object library built from FunctionMetadata.cpp.
add_library(presto_function_metadata OBJECT FunctionMetadata.cpp)

# FunctionMetadata.cpp is linked against the velox function registry.
target_link_libraries(presto_function_metadata velox_function_registry)

if(PRESTO_ENABLE_TESTING)
add_subdirectory(tests)
endif()
262 changes: 262 additions & 0 deletions presto-native-execution/presto_cpp/main/types/FunctionMetadata.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,262 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "presto_cpp/main/types/FunctionMetadata.h"
#include "presto_cpp/presto_protocol/presto_protocol.h"
#include "velox/exec/Aggregate.h"
#include "velox/exec/AggregateFunctionRegistry.h"
#include "velox/exec/WindowFunction.h"
#include "velox/expression/SimpleFunctionRegistry.h"
#include "velox/functions/FunctionRegistry.h"

using namespace facebook::velox;
using namespace facebook::velox::exec;

namespace facebook::presto {

namespace {

// Keys in the velox function maps have the form
// `catalog.schema.function_name`. Splits such a key on '.' and returns its
// components in order: {catalog, schema, function_name}. The check below fails
// unless the split yields exactly three components.
const std::vector<std::string> getFunctionNameParts(
    const std::string& registeredFunction) {
  std::vector<std::string> parts;
  folly::split('.', registeredFunction, parts, /*ignoreEmpty=*/true);
  VELOX_USER_CHECK(
      parts.size() == 3,
      "Prefix missing for function {}",
      registeredFunction);
  return parts;
}

// TODO: Companion function information will subsequently be retrieved from the
// function metadata once this velox PR is merged:
// https://github.com/facebookincubator/velox/pull/9250.
//
// Returns true if `name` looks like an aggregate companion function, i.e. the
// text preceding the last occurrence of a companion suffix is a key of
// `aggregateFunctions`. Only the first suffix (in the order listed below) that
// occurs anywhere in `name` is considered.
bool isCompanionFunctionName(
    const std::string& name,
    const std::unordered_map<std::string, exec::AggregateFunctionEntry>&
        aggregateFunctions) {
  // Probe order is significant: "_merge_extract" is tried before "_extract" so
  // that the longer suffix is the one stripped from the name.
  static constexpr const char* kCompanionSuffixes[] = {
      "_partial", "_merge_extract", "_merge", "_extract"};

  for (const auto* suffix : kCompanionSuffixes) {
    const auto suffixOffset = name.rfind(suffix);
    if (suffixOffset != std::string::npos) {
      const auto baseName = name.substr(0, suffixOffset);
      return aggregateFunctions.count(baseName) > 0;
    }
  }
  return false;
}

// Builds the aggregation-specific metadata for one signature of the aggregate
// function `name`: the intermediate type comes from `signature`, the
// order-sensitivity flag from the aggregate function entry looked up by name.
const protocol::AggregationFunctionMetadata getAggregationFunctionMetadata(
    const std::string& name,
    const AggregateFunctionSignature& signature) {
  protocol::AggregationFunctionMetadata aggregationMetadata;
  aggregationMetadata.intermediateType =
      signature.intermediateType().toString();

  // The lookup result is dereferenced without a check, so `name` is expected
  // to refer to a registered aggregate function.
  const auto functionEntry = getAggregateFunctionEntry(name);
  aggregationMetadata.isOrderSensitive =
      functionEntry->metadata.orderSensitive;
  return aggregationMetadata;
}

// Returns the metadata of the scalar function `name`. The simple function
// registry is consulted first; if it has entries for `name`, the metadata of
// the last one is returned. Otherwise the vector function metadata is used.
//
// Functions like abs are registered as simple functions for primitive types,
// and as a vector function for complex types like DECIMAL, so a miss in the
// simple function signature map is not an error by itself. Only a miss in both
// places is treated as unreachable.
const exec::VectorFunctionMetadata getScalarMetadata(const std::string& name) {
  const auto simpleEntries =
      exec::simpleFunctions().getFunctionSignaturesAndMetadata(name);
  if (!simpleEntries.empty()) {
    return simpleEntries.back().first;
  }

  if (const auto vectorMetadata = exec::getVectorFunctionMetadata(name);
      vectorMetadata.has_value()) {
    return vectorMetadata.value();
  }

  VELOX_UNREACHABLE("Metadata for function {} not found", name);
}

// Builds the routine characteristics (language, determinism, null-call
// clause) for function `name` of the given `kind`. The language is always
// "CPP". For scalar functions determinism and null handling are derived from
// the function's metadata; every other kind is reported as DETERMINISTIC and
// CALLED_ON_NULL_INPUT.
const protocol::RoutineCharacteristics getRoutineCharacteristics(
    const std::string& name,
    const protocol::FunctionKind& kind) {
  // Defaults used for non-scalar functions.
  protocol::Determinism determinism = protocol::Determinism::DETERMINISTIC;
  protocol::NullCallClause nullCallClause =
      protocol::NullCallClause::CALLED_ON_NULL_INPUT;

  if (kind == protocol::FunctionKind::SCALAR) {
    const auto scalarMetadata = getScalarMetadata(name);
    if (!scalarMetadata.deterministic) {
      determinism = protocol::Determinism::NOT_DETERMINISTIC;
    }
    if (scalarMetadata.defaultNullBehavior) {
      nullCallClause = protocol::NullCallClause::RETURNS_NULL_ON_NULL_INPUT;
    }
  }

  protocol::RoutineCharacteristics characteristics;
  characteristics.language =
      std::make_shared<protocol::Language>(protocol::Language({"CPP"}));
  characteristics.determinism =
      std::make_shared<protocol::Determinism>(determinism);
  characteristics.nullCallClause =
      std::make_shared<protocol::NullCallClause>(nullCallClause);
  return characteristics;
}

// Assembles the metadata record for a single signature of function `name`.
//   name               - registered function name; also used as the doc string
//                        and to look up routine characteristics.
//   schema             - schema component of the registered name.
//   kind               - function kind reported for this record.
//   signature          - supplies the return type and the argument types.
//   aggregateSignature - when non-null, aggregation metadata derived from it
//                        is attached to the record.
const protocol::JsonBasedUdfFunctionMetadata buildFunctionMetadata(
    const std::string& name,
    const std::string& schema,
    const protocol::FunctionKind& kind,
    const FunctionSignature& signature,
    const AggregateFunctionSignaturePtr& aggregateSignature = nullptr) {
  protocol::JsonBasedUdfFunctionMetadata functionMetadata;
  functionMetadata.docString = name;
  functionMetadata.functionKind = kind;
  functionMetadata.schema = schema;
  functionMetadata.outputType = signature.returnType().toString();

  // Argument types are recorded in declaration order, as strings.
  const auto& argumentTypes = signature.argumentTypes();
  functionMetadata.paramTypes.reserve(argumentTypes.size());
  for (const auto& argumentType : argumentTypes) {
    functionMetadata.paramTypes.emplace_back(argumentType.toString());
  }

  functionMetadata.routineCharacteristics =
      getRoutineCharacteristics(name, kind);

  if (aggregateSignature) {
    functionMetadata.aggregateMetadata =
        std::make_shared<protocol::AggregationFunctionMetadata>(
            getAggregationFunctionMetadata(name, *aggregateSignature));
  }
  return functionMetadata;
}

// Returns a JSON array with one SCALAR metadata record per entry of
// `signatures`, in the same order, for the function registered as `name`.
json buildScalarMetadata(
    const std::string& name,
    const std::string& schema,
    const std::vector<const FunctionSignature*>& signatures) {
  const auto kind = protocol::FunctionKind::SCALAR;
  json records = json::array();
  // Scratch value that receives each serialized record before it is appended.
  json record;
  for (const auto* scalarSignature : signatures) {
    protocol::to_json(
        record, buildFunctionMetadata(name, schema, kind, *scalarSignature));
    records.push_back(record);
  }
  return records;
}

// Returns a JSON array of metadata records for the aggregate function
// registered as `name`. Every signature is emitted twice: first all signatures
// with kind AGGREGATE, then all signatures with kind WINDOW. Each record
// carries aggregation metadata. Fails the check below if `name` has no window
// function signatures.
json buildAggregateMetadata(
    const std::string& name,
    const std::string& schema,
    const std::vector<AggregateFunctionSignaturePtr>& signatures) {
  // All aggregate functions can be used as window functions.
  VELOX_USER_CHECK(
      getWindowFunctionSignatures(name).has_value(),
      "Aggregate function {} not registered as a window function",
      name);

  const std::vector<protocol::FunctionKind> reportedKinds = {
      protocol::FunctionKind::AGGREGATE, protocol::FunctionKind::WINDOW};
  json records = json::array();
  // Scratch value that receives each serialized record before it is appended.
  json record;
  for (const auto& reportedKind : reportedKinds) {
    for (const auto& aggregateSignature : signatures) {
      protocol::to_json(
          record,
          buildFunctionMetadata(
              name,
              schema,
              reportedKind,
              *aggregateSignature,
              aggregateSignature));
      records.push_back(record);
    }
  }
  return records;
}

// Returns a JSON array with one WINDOW metadata record per entry of
// `signatures`, in the same order, for the function registered as `name`.
json buildWindowMetadata(
    const std::string& name,
    const std::string& schema,
    const std::vector<FunctionSignaturePtr>& signatures) {
  const auto kind = protocol::FunctionKind::WINDOW;
  json records = json::array();
  // Scratch value that receives each serialized record before it is appended.
  json record;
  for (const auto& windowSignature : signatures) {
    protocol::to_json(
        record, buildFunctionMetadata(name, schema, kind, *windowSignature));
    records.push_back(record);
  }
  return records;
}

} // namespace

// Collects metadata for every scalar, aggregate and window function registered
// in velox and returns it as a JSON object keyed by the unqualified function
// name (the third component of `catalog.schema.function_name`). Each value is
// the JSON array produced by buildScalarMetadata, buildAggregateMetadata or
// buildWindowMetadata for that function.
//
// Assigning to a key that is already present replaces the earlier value; the
// passes run in the order scalar, aggregate, window.
//
// Throws if a non-skipped registered name does not consist of exactly three
// '.'-separated components.
json getFunctionsMetadata() {
  // Start from an explicit empty object rather than a default-constructed
  // (null) json, so the result is always a JSON object even when nothing is
  // registered.
  json j = json::object();

  // Get metadata for all registered scalar functions in velox.
  const auto signatures = getFunctionSignatures();
  static const std::unordered_set<std::string> kBlockList = {
      "row_constructor", "in", "is_null"};
  // Snapshot of the aggregate registry. It is used to exclude aggregate
  // companion functions (extract aggregate companion functions are registered
  // as vector functions) and is iterated in the aggregate pass below.
  const auto aggregateFunctions = exec::aggregateFunctions().copy();
  for (const auto& [name, scalarSignatures] : signatures) {
    // Skip internal functions. They don't have any prefix.
    if (kBlockList.count(name) != 0 ||
        name.find("$internal$") != std::string::npos ||
        isCompanionFunctionName(name, aggregateFunctions)) {
      continue;
    }

    const auto parts = getFunctionNameParts(name);
    const auto& schema = parts[1];
    const auto& function = parts[2];
    j[function] = buildScalarMetadata(name, schema, scalarSignatures);
  }

  // Get metadata for all registered aggregate functions in velox.
  for (const auto& [name, aggregateEntry] : aggregateFunctions) {
    if (isCompanionFunctionName(name, aggregateFunctions)) {
      continue;
    }

    const auto parts = getFunctionNameParts(name);
    const auto& schema = parts[1];
    const auto& function = parts[2];
    j[function] =
        buildAggregateMetadata(name, schema, aggregateEntry.signatures);
  }

  // Get metadata for all registered window functions in velox. Skip aggregates
  // as they have been processed.
  const auto& functions = exec::windowFunctions();
  for (const auto& [name, windowEntry] : functions) {
    if (aggregateFunctions.count(name) != 0) {
      continue;
    }

    const auto parts = getFunctionNameParts(name);
    const auto& schema = parts[1];
    const auto& function = parts[2];
    j[function] = buildWindowMetadata(name, schema, windowEntry.signatures);
  }

  return j;
}

} // namespace facebook::presto
24 changes: 24 additions & 0 deletions presto-native-execution/presto_cpp/main/types/FunctionMetadata.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "presto_cpp/external/json/nlohmann/json.hpp"

namespace facebook::presto {

// Returns metadata for all registered functions as json. The result is keyed
// by function name; each value is a json array of metadata entries for that
// function.
nlohmann::json getFunctionsMetadata();

} // namespace facebook::presto
21 changes: 21 additions & 0 deletions presto-native-execution/presto_cpp/main/types/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

add_library(presto_type_test_utils OBJECT TestUtils.cpp)

add_executable(presto_velox_split_test PrestoToVeloxSplitTest.cpp)

add_test(presto_velox_split_test presto_velox_split_test)
Expand Down Expand Up @@ -48,6 +50,7 @@ target_link_libraries(
$<TARGET_OBJECTS:presto_type_converter>
$<TARGET_OBJECTS:presto_types>
presto_operators
presto_type_test_utils
velox_core
velox_dwio_common_exception
velox_encode
Expand Down Expand Up @@ -89,3 +92,21 @@ target_link_libraries(
velox_tpch_connector
GTest::gtest
GTest::gtest_main)

# Test executable built from FunctionMetadataTest.cpp.
add_executable(presto_function_metadata_test FunctionMetadataTest.cpp)

# Register the executable as a test. WORKING_DIRECTORY makes it run from this
# source directory.
add_test(
NAME presto_function_metadata_test
COMMAND presto_function_metadata_test
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR})

# Libraries linked into the function metadata test. GoogleTest is referenced
# through the GTest:: imported targets, matching the other test target in this
# file.
target_link_libraries(
  presto_function_metadata_test
  presto_function_metadata
  presto_protocol
  presto_type_test_utils
  velox_aggregates
  velox_functions_prestosql
  velox_window
  GTest::gtest
  GTest::gtest_main)
Loading

0 comments on commit 568e1cf

Please sign in to comment.