Skip to content

Commit

Permalink
[native] Retrieve Json function metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
pramodsatya committed Aug 12, 2024
1 parent dac530d commit 396ecd2
Show file tree
Hide file tree
Showing 25 changed files with 1,719 additions and 62 deletions.
1 change: 1 addition & 0 deletions presto-docs/src/main/sphinx/presto-cpp.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Note: Presto C++ is in active development. See :doc:`Limitations </presto_cpp/li
presto_cpp/features
presto_cpp/limitations
presto_cpp/properties
presto_cpp/sidecar

Overview
========
Expand Down
35 changes: 35 additions & 0 deletions presto-docs/src/main/sphinx/presto_cpp/sidecar.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
==================
Presto C++ Sidecar
==================

In a Presto C++ cluster, the coordinator communicates with the Presto C++ sidecar
process to better support Presto C++ specific functionality. This chapter documents
the REST API used in these communications and the configuration properties
pertaining to the Presto C++ sidecar.

.. contents::
    :local:
    :backlinks: none
    :depth: 1

Endpoints
---------

The following HTTP methods are used by the Presto coordinator to fetch data from
the Presto C++ sidecar.

* A ``GET`` on ``/v1/functions`` returns JSON containing the function metadata for
  all functions registered in the sidecar. The JSON object has function names as
  keys; each value is a JSON array of function metadata entries, each conforming
  to the ``protocol::JsonBasedUdfFunctionMetadata`` format.


Properties
----------

Set the following configuration properties for the Presto C++ sidecar.

.. code-block:: none

    native-sidecar=true
1 change: 1 addition & 0 deletions presto-native-execution/presto_cpp/main/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ target_link_libraries(
$<TARGET_OBJECTS:presto_protocol>
presto_common
presto_exception
presto_function_metadata
presto_http
presto_operators
velox_aggregates
Expand Down
16 changes: 16 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "presto_cpp/main/operators/PartitionAndSerialize.h"
#include "presto_cpp/main/operators/ShuffleRead.h"
#include "presto_cpp/main/operators/UnsafeRowExchangeSource.h"
#include "presto_cpp/main/types/FunctionMetadata.h"
#include "presto_cpp/main/types/PrestoToVeloxConnector.h"
#include "presto_cpp/main/types/PrestoToVeloxQueryPlan.h"
#include "velox/common/base/Counters.h"
Expand Down Expand Up @@ -375,6 +376,16 @@ void PrestoServer::run() {
http::kMimeTypeApplicationJson)
.sendWithEOM();
});
if (systemConfig->prestoNativeSidecar()) {
httpServer_->registerGet(
"/v1/functions",
[server = this](
proxygen::HTTPMessage* /*message*/,
const std::vector<std::unique_ptr<folly::IOBuf>>& /*body*/,
proxygen::ResponseHandler* downstream) {
server->getFunctionSignatures(downstream);
});
}

if (systemConfig->enableRuntimeMetricsCollection()) {
enableWorkerStatsReporting();
Expand Down Expand Up @@ -1317,6 +1328,11 @@ void PrestoServer::reportNodeStatus(proxygen::ResponseHandler* downstream) {
http::sendOkResponse(downstream, json(fetchNodeStatus()));
}

// Handler for `GET /v1/functions`: collects the metadata of all registered
// functions as json and sends it to `downstream` as an OK response.
void PrestoServer::getFunctionSignatures(
    proxygen::ResponseHandler* downstream) {
  const auto functionsMetadata = getFunctionsMetadata();
  http::sendOkResponse(downstream, functionsMetadata);
}

protocol::NodeStatus PrestoServer::fetchNodeStatus() {
auto systemConfig = SystemConfig::instance();
const int64_t nodeMemoryGb = systemConfig->systemMemoryGb();
Expand Down
2 changes: 2 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ class PrestoServer {

void reportNodeStatus(proxygen::ResponseHandler* downstream);

void getFunctionSignatures(proxygen::ResponseHandler* downstream);

protocol::NodeStatus fetchNodeStatus();

void populateMemAndCPUInfo();
Expand Down
4 changes: 4 additions & 0 deletions presto-native-execution/presto_cpp/main/types/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ target_link_libraries(presto_types presto_type_converter velox_type_fbhive

set_property(TARGET presto_types PROPERTY JOB_POOL_LINK presto_link_job_pool)

add_library(presto_function_metadata OBJECT FunctionMetadata.cpp)

target_link_libraries(presto_function_metadata velox_function_registry)

if(PRESTO_ENABLE_TESTING)
add_subdirectory(tests)
endif()
258 changes: 258 additions & 0 deletions presto-native-execution/presto_cpp/main/types/FunctionMetadata.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "presto_cpp/main/types/FunctionMetadata.h"

#include <utility>

#include "presto_cpp/presto_protocol/presto_protocol.h"
#include "velox/exec/Aggregate.h"
#include "velox/exec/AggregateFunctionRegistry.h"
#include "velox/exec/WindowFunction.h"
#include "velox/expression/SimpleFunctionRegistry.h"
#include "velox/functions/FunctionRegistry.h"

using namespace facebook::velox;
using namespace facebook::velox::exec;

namespace facebook::presto {

namespace {

// The keys in velox function maps are of the format
// `catalog.schema.function_name`. This utility function splits the registered
// function name on '.' and returns the three parts,
// {catalog, schema, function_name}.
//
// Fails with a Velox user error if the name does not split into exactly three
// parts, e.g. when the `catalog.schema` prefix is missing.
const std::vector<std::string> getFunctionNameParts(
const std::string& registeredFunction) {
std::vector<std::string> parts;
// The trailing `true` asks folly::split to drop empty pieces, so a name such
// as `a..b` yields two parts and is rejected by the check below.
folly::split('.', registeredFunction, parts, true);
VELOX_USER_CHECK(
parts.size() == 3,
fmt::format("Prefix missing for function {}", registeredFunction));
return parts;
}

// Returns true if `name` looks like a companion function of an aggregate in
// `aggregateFunctions`: the text preceding the last occurrence of a companion
// suffix must itself be a registered aggregate function name.
//
// The suffix is located with rfind, so it does not have to be the very end of
// `name`; anything following it is ignored.
bool isCompanionFunctionName(
    const std::string& name,
    const std::unordered_map<std::string, exec::AggregateFunctionEntry>&
        aggregateFunctions) {
  // Probed in this exact order; the first suffix present in `name` decides
  // where the base name ends. `_merge_extract` must be tried before `_merge`
  // and `_extract`, both of which it contains.
  static constexpr const char* kCompanionSuffixes[] = {
      "_partial", "_merge_extract", "_merge", "_extract"};

  for (const char* suffix : kCompanionSuffixes) {
    const auto offset = name.rfind(suffix);
    if (offset != std::string::npos) {
      const auto baseName = name.substr(0, offset);
      return aggregateFunctions.count(baseName) > 0;
    }
  }
  return false;
}

// Builds the aggregation specific part of the function metadata for the
// aggregate function registered as `name`: the intermediate type is taken from
// `signature`, the order sensitivity from the function's registry entry.
const protocol::AggregationFunctionMetadata getAggregationFunctionMetadata(
    const std::string& name,
    const AggregateFunctionSignature& signature) {
  protocol::AggregationFunctionMetadata aggregationMetadata;
  aggregationMetadata.intermediateType =
      signature.intermediateType().toString();

  // NOTE(review): the registry entry is dereferenced without a null check;
  // this presumably relies on `name` always coming from the aggregate function
  // registry - confirm against callers.
  const auto orderSensitive =
      getAggregateFunctionEntry(name)->metadata.orderSensitive;
  aggregationMetadata.isOrderSensitive = orderSensitive;
  return aggregationMetadata;
}

// Returns the velox function metadata for the scalar function `name`. The
// simple function registry is consulted first, then the vector function
// registry. Fails as unreachable if neither registry knows the function.
const exec::VectorFunctionMetadata getScalarMetadata(const std::string& name) {
  // Functions like abs are registered as simple functions for primitive
  // types, and as a vector function for complex types like DECIMAL. A miss in
  // the simple function registry is therefore not an error; fall through to
  // the vector function registry instead.
  const auto simpleEntries =
      exec::simpleFunctions().getFunctionSignaturesAndMetadata(name);
  if (!simpleEntries.empty()) {
    // The metadata of the last registered entry is used.
    return simpleEntries.back().first;
  }

  if (const auto vectorMetadata = exec::getVectorFunctionMetadata(name)) {
    return *vectorMetadata;
  }
  VELOX_UNREACHABLE("Metadata for function {} not found", name);
}

// Builds the routine characteristics (language, determinism and null call
// clause) reported for the function `name` of the given `kind`.
//
// The language is always "CPP". Determinism and null behavior are read from
// the velox function metadata for scalar functions; every other kind is
// reported as DETERMINISTIC and CALLED_ON_NULL_INPUT.
const protocol::RoutineCharacteristics getRoutineCharacteristics(
    const std::string& name,
    const protocol::FunctionKind& kind) {
  // Defaults used for non-scalar functions.
  auto determinism = protocol::Determinism::DETERMINISTIC;
  auto nullCallClause = protocol::NullCallClause::CALLED_ON_NULL_INPUT;

  if (kind == protocol::FunctionKind::SCALAR) {
    const auto scalarMetadata = getScalarMetadata(name);
    if (!scalarMetadata.deterministic) {
      determinism = protocol::Determinism::NOT_DETERMINISTIC;
    }
    if (scalarMetadata.defaultNullBehavior) {
      nullCallClause = protocol::NullCallClause::RETURNS_NULL_ON_NULL_INPUT;
    }
  }

  protocol::RoutineCharacteristics characteristics;
  characteristics.language =
      std::make_shared<protocol::Language>(protocol::Language({"CPP"}));
  characteristics.determinism =
      std::make_shared<protocol::Determinism>(determinism);
  characteristics.nullCallClause =
      std::make_shared<protocol::NullCallClause>(nullCallClause);
  return characteristics;
}

// Builds the protocol metadata describing one signature of a function.
//
// `name` is the registered function name; it is used as the doc string and to
// look up the routine characteristics (and the aggregate registry entry when
// `aggregateSignature` is set).
// `schema` is the schema part of the registered name.
// `kind` is the function kind to report for this entry.
// `signature` supplies the return type and the argument types.
// `aggregateSignature`, when set, additionally populates the aggregation
// metadata; pass std::nullopt for non-aggregate functions.
const protocol::JsonBasedUdfFunctionMetadata buildFunctionMetadata(
    const std::string& name,
    const std::string& schema,
    const protocol::FunctionKind& kind,
    const FunctionSignature& signature,
    std::optional<AggregateFunctionSignature> aggregateSignature) {
  protocol::JsonBasedUdfFunctionMetadata metadata;
  metadata.docString = name;
  metadata.functionKind = kind;
  metadata.outputType = signature.returnType().toString();

  // Bind by reference and move the result into place so that neither the
  // argument type list nor the converted strings are copied.
  const auto& argumentTypes = signature.argumentTypes();
  std::vector<std::string> paramTypes;
  paramTypes.reserve(argumentTypes.size());
  for (const auto& type : argumentTypes) {
    paramTypes.emplace_back(type.toString());
  }
  metadata.paramTypes = std::move(paramTypes);
  metadata.schema = schema;
  metadata.routineCharacteristics = getRoutineCharacteristics(name, kind);

  if (aggregateSignature.has_value()) {
    metadata.aggregateMetadata =
        std::make_shared<protocol::AggregationFunctionMetadata>(
            getAggregationFunctionMetadata(name, *aggregateSignature));
  }
  return metadata;
}

// Returns a json array holding one SCALAR function metadata entry for each of
// the given `signatures` of the scalar function registered as `name`.
json buildScalarMetadata(
    const std::string& name,
    const std::string& schema,
    const std::vector<const FunctionSignature*>& signatures) {
  json metadataList = json::array();
  // Scratch value that receives each serialized entry before it is appended.
  json serialized;
  for (const FunctionSignature* signature : signatures) {
    const auto functionMetadata = buildFunctionMetadata(
        name,
        schema,
        protocol::FunctionKind::SCALAR,
        *signature,
        std::nullopt);
    protocol::to_json(serialized, functionMetadata);
    metadataList.push_back(serialized);
  }
  return metadataList;
}

// Returns a json array with the function metadata of the aggregate function
// registered as `name`. Each signature is emitted twice: all entries of kind
// AGGREGATE first, followed by all entries of kind WINDOW.
//
// Fails with a Velox user error if the aggregate is not also registered as a
// window function.
json buildAggregateMetadata(
    const std::string& name,
    const std::string& schema,
    const std::vector<AggregateFunctionSignaturePtr>& signatures) {
  // All aggregate functions can be used as window functions.
  VELOX_USER_CHECK(
      getWindowFunctionSignatures(name).has_value(),
      "Aggregate function {} not registered as a window function",
      name);

  json metadataList = json::array();
  // Scratch value that receives each serialized entry before it is appended.
  json serialized;
  for (const auto kind :
       {protocol::FunctionKind::AGGREGATE, protocol::FunctionKind::WINDOW}) {
    for (const auto& aggregateSignature : signatures) {
      // The aggregate signature is passed both as the plain function
      // signature and as the source of the aggregation metadata.
      const auto functionMetadata = buildFunctionMetadata(
          name, schema, kind, *aggregateSignature, *aggregateSignature);
      protocol::to_json(serialized, functionMetadata);
      metadataList.push_back(serialized);
    }
  }
  return metadataList;
}

// Returns a json array holding one WINDOW function metadata entry for each of
// the given `signatures` of the window function registered as `name`.
json buildWindowMetadata(
    const std::string& name,
    const std::string& schema,
    const std::vector<FunctionSignaturePtr>& signatures) {
  json metadataList = json::array();
  // Scratch value that receives each serialized entry before it is appended.
  json serialized;
  for (const auto& windowSignature : signatures) {
    const auto functionMetadata = buildFunctionMetadata(
        name,
        schema,
        protocol::FunctionKind::WINDOW,
        *windowSignature,
        std::nullopt);
    protocol::to_json(serialized, functionMetadata);
    metadataList.push_back(serialized);
  }
  return metadataList;
}

} // namespace

// Returns the metadata of all scalar, aggregate and window functions
// registered in velox as json. The result maps the unqualified function name
// (the last part of `catalog.schema.function_name`) to a json array of
// function metadata entries.
//
// If the same unqualified name is registered more than once (for example under
// different schemas, or as different function kinds), the entries are appended
// to the existing array instead of replacing it, so no registration is
// silently dropped.
//
// Fails with a Velox user error if a registered function that is not skipped
// lacks the `catalog.schema` prefix.
json getFunctionsMetadata() {
  json j;

  // Adds `entries` (a json array) under `function`, appending to the array
  // already stored for that name, if any.
  const auto addEntries = [&j](const std::string& function, json entries) {
    auto& slot = j[function];
    if (slot.is_null()) {
      slot = std::move(entries);
      return;
    }
    for (auto& entry : entries) {
      slot.push_back(std::move(entry));
    }
  };

  // Get metadata for all registered scalar functions in velox.
  // NOTE(review): aggregate companion functions may also show up in the scalar
  // registry; confirm whether they should be filtered out here as well.
  const auto signatures = getFunctionSignatures();
  static const std::unordered_set<std::string> kBlockList = {
      "row_constructor", "in", "is_null"};
  for (const auto& entry : signatures) {
    const auto& name = entry.first;
    // Skip internal functions. They don't have any prefix.
    if ((kBlockList.count(name) != 0) ||
        (name.find("$internal$") != std::string::npos)) {
      continue;
    }

    const auto parts = getFunctionNameParts(name);
    addEntries(parts[2], buildScalarMetadata(name, parts[1], entry.second));
  }

  // Get metadata for all registered aggregate functions in velox. Companion
  // functions of an aggregate are not reported.
  const auto aggregateFunctions = exec::aggregateFunctions().copy();
  for (const auto& entry : aggregateFunctions) {
    const auto& name = entry.first;
    if (isCompanionFunctionName(name, aggregateFunctions)) {
      continue;
    }

    const auto parts = getFunctionNameParts(name);
    addEntries(
        parts[2],
        buildAggregateMetadata(name, parts[1], entry.second.signatures));
  }

  // Get metadata for all registered window functions in velox. Skip aggregates
  // as they have been processed.
  const auto& functions = exec::windowFunctions();
  for (const auto& entry : functions) {
    const auto& name = entry.first;
    if (aggregateFunctions.count(name) != 0) {
      continue;
    }

    const auto parts = getFunctionNameParts(name);
    addEntries(
        parts[2],
        buildWindowMetadata(name, parts[1], entry.second.signatures));
  }

  return j;
}

} // namespace facebook::presto
24 changes: 24 additions & 0 deletions presto-native-execution/presto_cpp/main/types/FunctionMetadata.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "presto_cpp/external/json/nlohmann/json.hpp"

namespace facebook::presto {

// Returns metadata for all registered scalar, aggregate and window functions
// as json. The result is keyed by function name without the catalog and schema
// prefix; each value is a json array of serialized
// protocol::JsonBasedUdfFunctionMetadata entries.
nlohmann::json getFunctionsMetadata();

} // namespace facebook::presto
Loading

0 comments on commit 396ecd2

Please sign in to comment.