Skip to content

Commit

Permalink
[native] Retrieve Json function metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
pramodsatya committed Aug 2, 2024
1 parent 052c5d9 commit 27d780d
Show file tree
Hide file tree
Showing 12 changed files with 658 additions and 3 deletions.
11 changes: 11 additions & 0 deletions presto-docs/src/main/sphinx/develop/worker-protocol.rst
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,17 @@ output buffer zero.
.. image:: worker-protocol-results.png
:width: 600

Presto Native Sidecar Endpoints
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The following HTTP methods are used by the coordinator to fetch data from the
Presto Native Sidecar process.

* A ``GET`` on ``/v1/info/workerFunctionSignatures`` returns a json containing
the function metadata for all functions registered in the sidecar. The json
has the function names as keys, and a json array of function metadata, each
conforming to the ``protocol::JsonBasedUdfFunctionMetadata`` format.

Output Buffers
~~~~~~~~~~~~~~

Expand Down
16 changes: 16 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "presto_cpp/main/operators/PartitionAndSerialize.h"
#include "presto_cpp/main/operators/ShuffleRead.h"
#include "presto_cpp/main/operators/UnsafeRowExchangeSource.h"
#include "presto_cpp/main/types/FunctionMetadata.h"
#include "presto_cpp/main/types/PrestoToVeloxConnector.h"
#include "presto_cpp/main/types/PrestoToVeloxQueryPlan.h"
#include "velox/common/base/Counters.h"
Expand Down Expand Up @@ -375,6 +376,16 @@ void PrestoServer::run() {
http::kMimeTypeApplicationJson)
.sendWithEOM();
});
if (systemConfig->prestoNativeSidecar()) {
httpServer_->registerGet(
"/v1/info/workerFunctionSignatures",
[server = this](
proxygen::HTTPMessage* /*message*/,
const std::vector<std::unique_ptr<folly::IOBuf>>& /*body*/,
proxygen::ResponseHandler* downstream) {
server->getFunctionSignatures(downstream);
});
}

if (systemConfig->enableRuntimeMetricsCollection()) {
enableWorkerStatsReporting();
Expand Down Expand Up @@ -1317,6 +1328,11 @@ void PrestoServer::reportNodeStatus(proxygen::ResponseHandler* downstream) {
http::sendOkResponse(downstream, json(fetchNodeStatus()));
}

void PrestoServer::getFunctionSignatures(
proxygen::ResponseHandler* downstream) {
http::sendOkResponse(downstream, getJsonFunctionMetadata());
}

protocol::NodeStatus PrestoServer::fetchNodeStatus() {
auto systemConfig = SystemConfig::instance();
const int64_t nodeMemoryGb = systemConfig->systemMemoryGb();
Expand Down
2 changes: 2 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ class PrestoServer {

void reportNodeStatus(proxygen::ResponseHandler* downstream);

void getFunctionSignatures(proxygen::ResponseHandler* downstream);

protocol::NodeStatus fetchNodeStatus();

void populateMemAndCPUInfo();
Expand Down
4 changes: 4 additions & 0 deletions presto-native-execution/presto_cpp/main/types/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ target_link_libraries(presto_types presto_type_converter velox_type_fbhive

set_property(TARGET presto_types PROPERTY JOB_POOL_LINK presto_link_job_pool)

add_library(presto_function_metadata OBJECT FunctionMetadata.cpp)

target_link_libraries(presto_function_metadata velox_function_registry)

if(PRESTO_ENABLE_TESTING)
add_subdirectory(tests)
endif()
210 changes: 210 additions & 0 deletions presto-native-execution/presto_cpp/main/types/FunctionMetadata.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "velox/exec/Aggregate.h"
#include "velox/exec/AggregateFunctionRegistry.h"
#include "velox/exec/WindowFunction.h"
#include "velox/functions/FunctionRegistry.h"

#include "presto_cpp/main/types/FunctionMetadata.h"
#include "presto_cpp/presto_protocol/presto_protocol.h"

using namespace facebook::velox;
using namespace facebook::velox::exec;

namespace facebook::presto {

namespace {

constexpr char const* kDefaultSchema = "default";

// The keys in velox function maps are of the format
// catalog.schema.function_name. This utility function extracts the
// function_name from this string.
const std::string getFunctionName(const std::string& registeredFunctionName) {
std::vector<std::string> pieces;
folly::split('.', registeredFunctionName, pieces, true);
return pieces.back();
}

const protocol::AggregationFunctionMetadata getAggregationFunctionMetadata(
const std::string& name,
const AggregateFunctionSignature& signature) {
protocol::AggregationFunctionMetadata metadata;
metadata.intermediateType = signature.intermediateType().toString();
metadata.isOrderSensitive =
getAggregateFunctionEntry(name)->metadata.orderSensitive;
return metadata;
}

const protocol::RoutineCharacteristics getRoutineCharacteristics(
const FunctionSignature& signature,
const std::string& name,
const protocol::FunctionKind& kind) {
protocol::Determinism determinism;
protocol::NullCallClause nullCallClause;
if (kind == protocol::FunctionKind::SCALAR) {
auto metadata = getFunctionMetadata(name);
VELOX_USER_CHECK(metadata.has_value());
determinism = metadata.value().deterministic
? protocol::Determinism::DETERMINISTIC
: protocol::Determinism::NOT_DETERMINISTIC;
nullCallClause = metadata.value().defaultNullBehavior
? protocol::NullCallClause::RETURNS_NULL_ON_NULL_INPUT
: protocol::NullCallClause::CALLED_ON_NULL_INPUT;
} else {
// Default metadata values of DETERMINISTIC and CALLED_ON_NULL_INPUT for
// non-scalar functions.
determinism = protocol::Determinism::DETERMINISTIC;
nullCallClause = protocol::NullCallClause::CALLED_ON_NULL_INPUT;
}

protocol::RoutineCharacteristics routineCharacteristics;
routineCharacteristics.language =
std::make_shared<protocol::Language>(protocol::Language({"CPP"}));
routineCharacteristics.determinism =
std::make_shared<protocol::Determinism>(determinism);
routineCharacteristics.nullCallClause =
std::make_shared<protocol::NullCallClause>(nullCallClause);
return routineCharacteristics;
}

void updateFunctionMetadata(
const std::string& name,
const FunctionSignature& signature,
protocol::JsonBasedUdfFunctionMetadata& jsonMetadata) {
jsonMetadata.docString = name;
jsonMetadata.schema = kDefaultSchema;
jsonMetadata.outputType = signature.returnType().toString();

const std::vector<TypeSignature> types = signature.argumentTypes();
std::vector<std::string> paramTypes;
for (const auto& type : types) {
paramTypes.emplace_back(type.toString());
}
jsonMetadata.paramTypes = paramTypes;
}

const std::vector<protocol::JsonBasedUdfFunctionMetadata>
buildAggregateJsonMetadata(
const std::string& name,
const std::vector<AggregateFunctionSignaturePtr>& signatures) {
std::vector<protocol::JsonBasedUdfFunctionMetadata> jsonMetadataList;
jsonMetadataList.reserve(signatures.size());
const protocol::FunctionKind kind = protocol::FunctionKind::AGGREGATE;

for (const auto& signature : signatures) {
protocol::JsonBasedUdfFunctionMetadata jsonMetadata;
jsonMetadata.functionKind = kind;
jsonMetadata.routineCharacteristics =
getRoutineCharacteristics(*signature, name, kind);
jsonMetadata.aggregateMetadata =
std::make_shared<protocol::AggregationFunctionMetadata>(
getAggregationFunctionMetadata(name, *signature));

updateFunctionMetadata(name, *signature, jsonMetadata);
jsonMetadataList.emplace_back(jsonMetadata);
}
return jsonMetadataList;
}

const std::vector<protocol::JsonBasedUdfFunctionMetadata>
buildScalarJsonMetadata(
const std::string& name,
const std::vector<const FunctionSignature*>& signatures) {
std::vector<protocol::JsonBasedUdfFunctionMetadata> jsonMetadataList;
jsonMetadataList.reserve(signatures.size());
const protocol::FunctionKind kind = protocol::FunctionKind::SCALAR;

for (const auto& signature : signatures) {
protocol::JsonBasedUdfFunctionMetadata jsonMetadata;
jsonMetadata.functionKind = kind;
jsonMetadata.routineCharacteristics =
getRoutineCharacteristics(*signature, name, kind);

updateFunctionMetadata(name, *signature, jsonMetadata);
jsonMetadataList.emplace_back(jsonMetadata);
}
return jsonMetadataList;
}

const std::vector<protocol::JsonBasedUdfFunctionMetadata>
buildWindowJsonMetadata(
const std::string& name,
const std::vector<FunctionSignaturePtr>& signatures) {
std::vector<protocol::JsonBasedUdfFunctionMetadata> jsonMetadataList;
jsonMetadataList.reserve(signatures.size());
const protocol::FunctionKind kind = protocol::FunctionKind::WINDOW;

for (const auto& signature : signatures) {
protocol::JsonBasedUdfFunctionMetadata jsonMetadata;
jsonMetadata.functionKind = kind;
jsonMetadata.routineCharacteristics =
getRoutineCharacteristics(*signature, name, kind);

updateFunctionMetadata(name, *signature, jsonMetadata);
jsonMetadataList.emplace_back(jsonMetadata);
}
return jsonMetadataList;
}

const std::vector<protocol::JsonBasedUdfFunctionMetadata> getFunctionMetadata(
const std::string& name) {
if (auto aggregateSignatures = getAggregateFunctionSignatures(name)) {
return buildAggregateJsonMetadata(name, aggregateSignatures.value());
} else if (auto windowSignatures = getWindowFunctionSignatures(name)) {
return buildWindowJsonMetadata(name, windowSignatures.value());
} else {
auto signatures = getFunctionSignatures();
if (signatures.find(name) != signatures.end()) {
return buildScalarJsonMetadata(name, signatures.at(name));
}
}
VELOX_UNREACHABLE("Function kind for {} cannot be determined", name);
}

} // namespace

json getJsonMetadataForFunction(const std::string& name) {
auto metadataList = getFunctionMetadata(name);
auto functionName = getFunctionName(name);
json j = json::array();
json tj;
for (const auto& metadata : metadataList) {
protocol::to_json(tj, metadata);
j.emplace_back(tj);
}
return j;
}

json getJsonFunctionMetadata() {
// Get all registered aggregate, window, and scalar function names from
// FunctionRegistry.
auto functions = getSortedAggregateNames();
auto scalarFunctions = getSortedScalarNames();
for (const auto& scalarFunction : scalarFunctions) {
functions.emplace_back(scalarFunction);
}
auto windowFunctions = getSortedWindowNames();
for (const auto& windowFunction : windowFunctions) {
functions.emplace_back(windowFunction);
}

nlohmann::json j;
for (const auto& function : functions) {
j[function] = getJsonMetadataForFunction(function);
}
return j;
}

} // namespace facebook::presto
25 changes: 25 additions & 0 deletions presto-native-execution/presto_cpp/main/types/FunctionMetadata.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "presto_cpp/external/json/nlohmann/json.hpp"

namespace facebook::presto {

nlohmann::json getJsonMetadataForFunction(const std::string& name);

nlohmann::json getJsonFunctionMetadata();

} // namespace facebook::presto
21 changes: 21 additions & 0 deletions presto-native-execution/presto_cpp/main/types/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,24 @@ target_link_libraries(
velox_tpch_connector
gtest
gtest_main)

add_executable(presto_function_metadata_test FunctionMetadataTest.cpp)

add_test(
NAME presto_function_metadata_test
COMMAND presto_function_metadata_test
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR})

target_link_libraries(
presto_function_metadata_test
gtest
gtest_main
presto_function_metadata
presto_protocol
velox_aggregates
velox_coverage_util
velox_exec
velox_functions_prestosql
presto_type_converter
velox_window
${ANTLR4_RUNTIME})
Loading

0 comments on commit 27d780d

Please sign in to comment.