diff --git a/bkmonitor/api/bkdata/default.py b/bkmonitor/api/bkdata/default.py index 9e770d2e33..ea643f32cb 100644 --- a/bkmonitor/api/bkdata/default.py +++ b/bkmonitor/api/bkdata/default.py @@ -775,6 +775,7 @@ class RequestSerializer(CommonRequestSerializer): flow_id = serializers.IntegerField(required=True, label="DataFlow的ID") consuming_mode = serializers.CharField(default="continue", label="数据处理模式") cluster_group = serializers.CharField(default="default", label="计算集群组") + check_and_start_clean_task = serializers.BooleanField(default=True, label="是否检查并启动清洗任务") class StopDataFlow(DataAccessAPIResource): diff --git a/bkmonitor/metadata/models/record_rule/rules.py b/bkmonitor/metadata/models/record_rule/rules.py index acb1d2ddce..262bb734de 100644 --- a/bkmonitor/metadata/models/record_rule/rules.py +++ b/bkmonitor/metadata/models/record_rule/rules.py @@ -293,6 +293,7 @@ def start_flow(self, flow_id: int, consuming_mode: Optional[str] = ConsumingMode req_data = { "consuming_mode": consuming_mode, "cluster_group": settings.BK_DATA_FLOW_CLUSTER_GROUP, + "check_and_start_clean_task": True, "flow_id": flow_id, } try: diff --git a/bkmonitor/metadata/models/record_rule/utils.py b/bkmonitor/metadata/models/record_rule/utils.py index 4423f1cfa7..0175d11d72 100644 --- a/bkmonitor/metadata/models/record_rule/utils.py +++ b/bkmonitor/metadata/models/record_rule/utils.py @@ -10,7 +10,9 @@ """ import logging import re -from typing import Dict, List +from typing import Dict, List, Optional + +import yaml from core.drf_resource import api from core.errors.api import BKAPIError @@ -18,6 +20,27 @@ logger = logging.getLogger("metadata") +def generate_rule_config( + expr: str, metric_name: str, record_name: str, interval: Optional[str] = "1m" # 默认间隔为1分钟 +) -> str: + """ + 生成规则配置的YAML格式字符串 + + :param expr: 计算表达式 + :param metric_name: 计算后的指标名 + :param record_name: 预计算记录名 + :param interval: 计算间隔,默认为1分钟 + :return: 规则配置的YAML格式字符串 + """ + rule_config = { + "name": "record/{}".format(record_name), + "rules": [{"expr": expr, "record": metric_name}], + "interval": interval, + } + + return yaml.dump(rule_config, sort_keys=False) + + def generate_table_id(space_type: str, space_id: str, record_name: str) -> str: """生成项目下的结果表""" # 处理预计算的名称 diff --git a/bkmonitor/metadata/resources/record_rule.py b/bkmonitor/metadata/resources/record_rule.py new file mode 100644 index 0000000000..334e783375 --- /dev/null +++ b/bkmonitor/metadata/resources/record_rule.py @@ -0,0 +1,71 @@ +# -*- coding: utf-8 -*- +""" +Tencent is pleased to support the open source community by making 蓝鲸智云 - 监控平台 (BlueKing - Monitor) available. +Copyright (C) 2017-2021 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at http://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" + +import logging + +from rest_framework import serializers + +from core.drf_resource import Resource +from core.errors.api import BKAPIError +from metadata.models.record_rule.rules import RecordRule +from metadata.models.record_rule.service import BkDataFlow, RecordRuleService +from metadata.models.record_rule.utils import generate_rule_config + +logger = logging.getLogger("metadata") + + +class CreatePrecomputationRecordResource(Resource): + """ + 创建并启动预计算任务 + """ + + class RequestSerializer(serializers.Serializer): + space_type = serializers.CharField(required=True, label="空间类型") + space_id = serializers.CharField(required=True, label="空间ID") + record_name = serializers.CharField(required=True, label="指标分组名称") + expr = serializers.CharField(required=True, label="预计算指标表达式") + metric_name = serializers.CharField(required=True, label="指标名称") + interval = serializers.CharField(required=False, label="预计算周期", default="1m") + + def perform_request(self, validated_request_data): + space_type = validated_request_data.get("space_type") + space_id = validated_request_data.get("space_id") + record_name = validated_request_data.get("record_name") + expr = validated_request_data.get("expr") + metric_name = validated_request_data.get("metric_name") + interval = validated_request_data.get("interval") + + # 生成预计算规则 + rule_config = generate_rule_config(expr, metric_name, record_name, interval) + logger.info("CreatePrecomputationRecordResource:generated rule_config: %s" % rule_config) + + # 创建预计算实例 + try: + service = RecordRuleService( + space_type=space_type, space_id=space_id, record_name=record_name, rule_config=rule_config + ) + service.create_record_rule() + except Exception as e: + logger.error("CreatePrecomputationRecordResource: create record rule service error: %s" % e) + raise e + + # 获取预计算创建的table_id + table_id = RecordRule.objects.get(record_name=record_name).table_id + + # 创建并启动预计算任务 + flow = BkDataFlow(space_type, space_id, table_id) + res = flow.start_flow() + logger.info("CreatePrecomputationRecordResource: start flow result: %s" % res) + if not res: # 若res为False,说明预计算任务启动失败 + raise BKAPIError("CreatePrecomputationRecordResource: start flow error") + # 预计算指标检索表达式 + new_expr = f"bkmonitor_{space_type}_{space_id}_tsdb:{table_id}:{metric_name}" + return {"new_expr": new_expr}