Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Generate single JSON with all types of plans in the server #1606

Merged
merged 10 commits into from
Feb 7, 2024
222 changes: 220 additions & 2 deletions ydb/core/kqp/opt/kqp_query_plan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
#include <ydb/library/yql/utils/plan/plan_utils.h>
#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h>

#include <ydb/public/lib/ydb_cli/common/format.h>

#include <library/cpp/json/writer/json.h>
#include <library/cpp/json/json_reader.h>
#include <library/cpp/protobuf/json/proto2json.h>
Expand Down Expand Up @@ -1811,6 +1813,217 @@ void SetNonZero(NJson::TJsonValue& node, const TStringBuf& name, T value) {
}
}

// Recursively walks a plan tree and fills two lookup tables with copies of its nodes:
//  * planIndex   - every node that has a "PlanNodeId" field, keyed by that id;
//  * precomputes - every node whose "Subplan Name" contains the substring "precompute",
//                  keyed by the part of the name that starts at that substring.
// Children are taken from the "Plans" array when the node has one.
// NOTE(review): the *Safe accessors are presumed to throw when a value has an unexpected
// JSON type (e.g. the node is not a map) - confirm against the NJson API.
void BuildPlanIndex(NJson::TJsonValue& plan, THashMap<int, NJson::TJsonValue>& planIndex, THashMap<TString, NJson::TJsonValue>& precomputes) {
    auto& planMap = plan.GetMapSafe();

    if (planMap.contains("PlanNodeId")) {
        auto id = planMap.at("PlanNodeId").GetIntegerSafe();
        planIndex[id] = plan;
    }

    if (planMap.contains("Subplan Name")) {
        const auto& precomputeName = planMap.at("Subplan Name").GetStringSafe();

        auto pos = precomputeName.find("precompute");
        if (pos != TString::npos) {
            // Key by the suffix beginning at "precompute", dropping any prefix of the subplan name
            precomputes[precomputeName.substr(pos)] = plan;
        }
    }

    if (planMap.contains("Plans")) {
        // Iterate by reference: a by-value loop variable would deep-copy each child subtree
        // on every level of the recursion.
        for (auto& child : planMap.at("Plans").GetArraySafe()) {
            BuildPlanIndex(child, planIndex, precomputes);
        }
    }
}

// Prunes a plan tree in place, bottom-up, and returns the list of nodes that should take
// this node's place in its parent:
//  * the node itself (as a one-element vector) when it is kept;
//  * its already-pruned children when its "Node Type" is listed in redundantNodes or
//    contains the substring "Precompute", i.e. the children are spliced into the parent.
// The node's "Plans" field is always rebuilt from the surviving children and is dropped
// entirely when none survive.
TVector<NJson::TJsonValue> RemoveRedundantNodes(NJson::TJsonValue& plan, const THashSet<TString>& redundantNodes) {
    auto& node = plan.GetMapSafe();

    // Post-order traversal: prune each child subtree first and gather whatever survives
    TVector<NJson::TJsonValue> keptChildren;
    if (node.contains("Plans") && node.at("Plans").IsArray()) {
        for (auto& subPlan : node.at("Plans").GetArraySafe()) {
            auto survivors = RemoveRedundantNodes(subPlan, redundantNodes);
            keptChildren.insert(keptChildren.end(), survivors.begin(), survivors.end());
        }
    }

    // Replace the old child list with the pruned one (or with nothing at all)
    node.erase("Plans");
    if (!keptChildren.empty()) {
        auto& rebuilt = node["Plans"];
        for (auto& kept : keptChildren) {
            rebuilt.AppendValue(kept);
        }
    }

    const auto nodeType = node.at("Node Type").GetStringSafe();
    const bool isRedundant = redundantNodes.contains(nodeType) || nodeType.find("Precompute") != TString::npos;
    if (!isRedundant) {
        return {plan};
    }

    return keptChildren;
}

// Rebuilds a plan subtree so that every operator becomes a plan node of its own.
//
// Parameters:
//   plan          - source plan node (a JSON map).
//   operatorIndex - index into plan's "Operators" array of the operator to turn into a node;
//                   only used when the node has an "Operators" field.
//   planIndex     - nodes of the original plan keyed by "PlanNodeId"; used to resolve
//                   "ExternalPlanNodeId" operator inputs.
//   precomputes   - precompute subplans keyed by name; used to resolve "CTE Name",
//                   "Input" and "ToFlow" references.
//   nodeCounter   - running counter; each call takes the next value as its "PlanNodeId".
//                   Ids are therefore assigned in call order, so the order of the recursive
//                   calls below is significant.
//
// Returns the newly built node. "PlanNodeType" and "Stats" are carried over when present.
//
// NOTE(review): operatorIndex is used to index "Operators" without a bounds check; it comes
// either from the caller (0) or from an "InternalOperatorId" field of the plan itself.
NJson::TJsonValue ReconstructQueryPlanRec(const NJson::TJsonValue& plan,
    int operatorIndex,
    const THashMap<int, NJson::TJsonValue>& planIndex,
    const THashMap<TString, NJson::TJsonValue>& precomputes,
    int& nodeCounter) {

    int currentNodeId = nodeCounter++;

    const auto& planMap = plan.GetMapSafe();

    NJson::TJsonValue result;
    result["PlanNodeId"] = currentNodeId;
    if (planMap.contains("PlanNodeType")) {
        result["PlanNodeType"] = planMap.at("PlanNodeType").GetStringSafe();
    }

    if (planMap.contains("Stats")) {
        result["Stats"] = planMap.at("Stats");
    }

    // Nodes without operators: keep the node type and rebuild the children
    if (!planMap.contains("Operators")) {
        NJson::TJsonValue planInputs;

        result["Node Type"] = planMap.at("Node Type").GetStringSafe();

        if (!planMap.contains("Plans")) {
            return result;
        }

        if (planMap.at("Node Type") == "TableLookup") {
            // Synthesize a single operator from the node's own properties; the node's
            // children are not carried over in this case.
            NJson::TJsonValue newOps;
            NJson::TJsonValue op;

            op["Name"] = "TableLookup";
            op["Columns"] = planMap.at("Columns");
            op["LookupKeyColumns"] = planMap.at("LookupKeyColumns");
            op["Table"] = planMap.at("Table");

            newOps.AppendValue(op);

            result["Operators"] = newOps;
            return result;
        }

        // Children are visited by reference to avoid deep-copying every subtree.
        // Precompute children are skipped here.
        for (const auto& child : planMap.at("Plans").GetArraySafe()) {
            if (child.GetMapSafe().at("Node Type").GetStringSafe().find("Precompute") == TString::npos) {
                planInputs.AppendValue(ReconstructQueryPlanRec(child, 0, planIndex, precomputes, nodeCounter));
            }
        }
        // When every child was skipped, planInputs is still an undefined JSON value here
        result["Plans"] = planInputs;
        return result;
    }

    // A ConstantExpr that names a CTE is replaced by the referenced precompute subplan, if known
    if (planMap.contains("CTE Name") && planMap.at("Node Type") == "ConstantExpr") {
        const auto& precompute = planMap.at("CTE Name").GetStringSafe();
        if (!precomputes.contains(precompute)) {
            result["Node Type"] = "ConstantExpr";
            return result;
        }
        return ReconstructQueryPlanRec(precomputes.at(precompute), 0, planIndex, precomputes, nodeCounter);
    }

    const auto& ops = planMap.at("Operators").GetArraySafe();
    // Mutable copy of just this operator: its "Inputs" field is stripped before it is emitted
    auto op = ops[operatorIndex];
    auto& opMap = op.GetMapSafe();

    TVector<NJson::TJsonValue> planInputs;

    auto opName = opMap.at("Name").GetStringSafe();

    for (const auto& opInput : opMap.at("Inputs").GetArraySafe()) {
        const auto& inputMap = opInput.GetMapSafe();
        if (inputMap.contains("ExternalPlanNodeId")) {
            // Input produced by another plan node: look it up in the index (no copy)
            auto inputPlanKey = inputMap.at("ExternalPlanNodeId").GetIntegerSafe();
            const auto& inputPlan = planIndex.at(inputPlanKey);
            planInputs.push_back(ReconstructQueryPlanRec(inputPlan, 0, planIndex, precomputes, nodeCounter));
        } else if (inputMap.contains("InternalOperatorId")) {
            // Input produced by another operator of this same plan node
            auto inputPlanId = inputMap.at("InternalOperatorId").GetIntegerSafe();
            planInputs.push_back(ReconstructQueryPlanRec(plan, inputPlanId, planIndex, precomputes, nodeCounter));
        }
        // Temporary hack: only the first input of a Filter operator is followed
        if (opName == "Filter") {
            break;
        }
    }

    // "Inputs" is guaranteed to exist here (at() above would have thrown otherwise);
    // the inputs are now represented by child nodes instead.
    opMap.erase("Inputs");

    if (opMap.contains("Input") || opMap.contains("ToFlow")) {
        TString maybePrecompute = "";
        if (opMap.contains("Input")) {
            maybePrecompute = opMap.at("Input").GetStringSafe();
        } else if (opMap.contains("ToFlow")) {
            maybePrecompute = opMap.at("ToFlow").GetStringSafe();
        }

        // If the operator reads from a known precompute, attach that subplan as an extra input
        if (precomputes.contains(maybePrecompute)) {
            planInputs.push_back(ReconstructQueryPlanRec(precomputes.at(maybePrecompute), 0, planIndex, precomputes, nodeCounter));
        }
    }

    result["Node Type"] = opName;
    NJson::TJsonValue newOps;
    newOps.AppendValue(op);
    result["Operators"] = newOps;

    if (!planInputs.empty()) {
        NJson::TJsonValue plans;
        for (const auto& input : planInputs) {
            plans.AppendValue(input);
        }
        result["Plans"] = plans;
    }

    return result;
}

// Produces the simplified form of a query plan. The argument is overwritten with the
// simplified plan, and a copy of it is returned.
//
// Steps: index the original plan, rebuild it with one node per operator (node ids are
// renumbered from 0), then prune node types that carry no information for the reader.
NJson::TJsonValue SimplifyQueryPlan(NJson::TJsonValue& plan) {
    // Node types that are dropped from the simplified plan; their children are re-attached
    // to the nearest kept ancestor by RemoveRedundantNodes.
    static const THashSet<TString> redundantNodeTypes = {
        "UnionAll",
        "Broadcast",
        "Map",
        "HashShuffle",
        "Merge",
        "Collect",
        "Stage",
        "Iterator",
        "PartitionByKey",
        "ToFlow"
    };

    THashMap<TString, NJson::TJsonValue> precomputeByName;
    THashMap<int, NJson::TJsonValue> nodeById;
    BuildPlanIndex(plan, nodeById, precomputeByName);

    int nextNodeId = 0;
    plan = ReconstructQueryPlanRec(plan, 0, nodeById, precomputeByName, nextNodeId);

    // The returned replacement list is ignored, so the root node itself is always kept
    RemoveRedundantNodes(plan, redundantNodeTypes);

    return plan;
}

// Parses a serialized plan and, if it has a top-level "Plan" field, adds a "SimplifiedPlan"
// field built from it. Returns the re-serialized JSON, or planText unchanged when there is
// no "Plan" field. The original "Plan" subtree is left untouched.
//
// analyzeMode is currently unused; it is only referenced by the disabled OLAP printer below.
// ReadJsonTree is called with its third argument set to true.
// NOTE(review): that argument is presumably throwOnError, which would make malformed
// input raise instead of being ignored - confirm against the NJson API.
TString AddSimplifiedPlan(const TString& planText, bool analyzeMode) {
    Y_UNUSED(analyzeMode);
    NJson::TJsonValue planJson;
    NJson::ReadJsonTree(planText, &planJson, true);
    if (!planJson.GetMapSafe().contains("Plan")) {
        return planText;
    }

    // SimplifyQueryPlan overwrites its argument, so work on a copy of the "Plan" subtree.
    // Copying the already-parsed subtree avoids parsing the whole text a second time.
    NJson::TJsonValue planCopy = planJson.GetMapSafe().at("Plan");

    planJson["SimplifiedPlan"] = SimplifyQueryPlan(planCopy);

    // Don't print the OLAP plan yet, there are some non UTF-8 symbols there that need to be fixed
    //TTempBufOutput stringStream;
    //NYdb::NConsoleClient::TQueryPlanPrinter printer(NYdb::NConsoleClient::EOutputFormat::PrettyTable, analyzeMode, stringStream);
    //printer.Print(planJson.GetStringRobust());
    //planJson["OLAPText"] = stringStream.Data();
    return planJson.GetStringRobust();
}

TString SerializeTxPlans(const TVector<const TString>& txPlans, const TString commonPlanInfo = "") {
NJsonWriter::TBuf writer;
writer.SetIndentSpaces(2);
Expand Down Expand Up @@ -1862,7 +2075,8 @@ TString SerializeTxPlans(const TVector<const TString>& txPlans, const TString co
writer.EndObject();
writer.EndObject();

return writer.Str();
auto resultPlan = writer.Str();
return AddSimplifiedPlan(resultPlan, false);
}

} // namespace
Expand Down Expand Up @@ -2250,7 +2464,8 @@ TString AddExecStatsToTxPlan(const TString& txPlanJson, const NYql::NDqProto::TD

NJsonWriter::TBuf txWriter;
txWriter.WriteJsonValue(&root, true);
return txWriter.Str();
auto resultPlan = txWriter.Str();
return AddSimplifiedPlan(resultPlan, true);
}

TString SerializeAnalyzePlan(const NKqpProto::TKqpStatsQuery& queryStats) {
Expand Down Expand Up @@ -2294,6 +2509,9 @@ TString SerializeScriptPlan(const TVector<const TString>& queryPlans) {
if (auto dqPlan = planMap.FindPtr("Plan")) {
writer.WriteKey("Plan");
writer.WriteJsonValue(dqPlan);
writer.WriteKey("SimplifiedPlan");
auto simplifiedPlan = SimplifyQueryPlan(*dqPlan);
writer.WriteJsonValue(&simplifiedPlan);
}
writer.EndObject();
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/ut/common/kqp_ut_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1169,7 +1169,7 @@ std::vector<NJson::TJsonValue> FindPlanNodes(const NJson::TJsonValue& plan, cons

std::vector<NJson::TJsonValue> FindPlanStages(const NJson::TJsonValue& plan) {
std::vector<NJson::TJsonValue> stages;
FindPlanStagesImpl(plan, stages);
FindPlanStagesImpl(plan.GetMapSafe().at("Plan"), stages);
return stages;
}

Expand Down
Loading
Loading