diff --git a/bkmonitor/core/prometheus/metrics.py b/bkmonitor/core/prometheus/metrics.py index feb0b8638..7fb38300c 100644 --- a/bkmonitor/core/prometheus/metrics.py +++ b/bkmonitor/core/prometheus/metrics.py @@ -1158,12 +1158,13 @@ def from_exc(cls, expr): name="bkmonitor_metadata_cron_task_cost_seconds", documentation="监控元数据定时任务耗时统计", labelnames=("task_name", "process_target"), + buckets=(0, 1, 5, 10, 30, 60, 120, 180, 240, 300, 600, 900, 1800, 3000, 6000, INF), ) -METADATA_DATA_LINK_ACCESS_INFO = Gauge( - name="bkmonitor_metadata_data_link_access_info", +METADATA_DATA_LINK_ACCESS_TOTAL = Counter( + name="bkmonitor_metadata_data_link_access_total", documentation="监控元数据数据链路接入统计", - labelnames=("version", "biz_id", "data_id", 'table_id', 'strategy'), + labelnames=("version", "biz_id", 'strategy', 'status'), ) diff --git a/bkmonitor/metadata/models/vm/utils.py b/bkmonitor/metadata/models/vm/utils.py index ce7535924..3d9b32f47 100644 --- a/bkmonitor/metadata/models/vm/utils.py +++ b/bkmonitor/metadata/models/vm/utils.py @@ -122,7 +122,6 @@ def access_bkdata(bk_biz_id: int, table_id: str, data_id: int): version=DATA_LINK_V3_VERSION_NAME, data_id=data_id, biz_id=bk_biz_id, - table_id=table_id, status=ACCESS_DATA_LINK_SUCCESS_STATUS, strategy=DataLink.BK_STANDARD_V2_TIME_SERIES, ) @@ -133,7 +132,6 @@ def access_bkdata(bk_biz_id: int, table_id: str, data_id: int): version=DATA_LINK_V3_VERSION_NAME, data_id=data_id, biz_id=bk_biz_id, - table_id=table_id, status=ACCESS_DATA_LINK_FAILURE_STATUS, strategy=DataLink.BK_STANDARD_V2_TIME_SERIES, ) @@ -314,7 +312,6 @@ def report_metadata_data_link_access_metric( status: int, biz_id: int, data_id: int, - table_id: str, strategy: str, ) -> None: """ @@ -323,18 +320,13 @@ def report_metadata_data_link_access_metric( @param status: 接入状态(失败-1/成功1) 以是否成功向bkbase发起请求为准 @param biz_id: 业务ID @param data_id: 数据ID - @param table_id: 结果表ID @param strategy: 链路策略(套餐类型) """ try: logger.info("try to report metadata data link component status metric,data_id->[%s]", data_id) - metrics.METADATA_DATA_LINK_ACCESS_INFO.labels( - version=version, - biz_id=biz_id, - data_id=data_id, - table_id=table_id, - strategy=strategy, - ).set(status) + metrics.METADATA_DATA_LINK_ACCESS_TOTAL.labels( + version=version, biz_id=biz_id, strategy=strategy, status=status + ).inc() metrics.report_all() except Exception as err: # pylint: disable=broad-except logger.error("report metadata data link access metric error->[%s],data_id->[%s]", err, data_id) @@ -553,7 +545,6 @@ def access_v2_bkdata_vm(bk_biz_id: int, table_id: str, data_id: int): version=DATA_LINK_V4_VERSION_NAME, data_id=data_id, biz_id=bk_biz_id, - table_id=table_id, status=ACCESS_DATA_LINK_SUCCESS_STATUS, strategy=DataLink.BK_STANDARD_V2_TIME_SERIES, ) @@ -563,7 +554,6 @@ def access_v2_bkdata_vm(bk_biz_id: int, table_id: str, data_id: int): version=DATA_LINK_V4_VERSION_NAME, data_id=data_id, biz_id=bk_biz_id, - table_id=table_id, status=ACCESS_DATA_LINK_FAILURE_STATUS, strategy=DataLink.BCS_FEDERAL_SUBSET_TIME_SERIES, ) @@ -581,7 +571,6 @@ def access_v2_bkdata_vm(bk_biz_id: int, table_id: str, data_id: int): version=DATA_LINK_V4_VERSION_NAME, data_id=data_id, biz_id=bk_biz_id, - table_id=table_id, status=ACCESS_DATA_LINK_SUCCESS_STATUS, strategy=DataLink.BCS_FEDERAL_SUBSET_TIME_SERIES, ) @@ -591,7 +580,6 @@ def access_v2_bkdata_vm(bk_biz_id: int, table_id: str, data_id: int): version=DATA_LINK_V4_VERSION_NAME, data_id=data_id, biz_id=bk_biz_id, - table_id=table_id, status=ACCESS_DATA_LINK_FAILURE_STATUS, strategy=DataLink.BCS_FEDERAL_SUBSET_TIME_SERIES, ) diff --git a/bkmonitor/metadata/task/refresh_data_link.py b/bkmonitor/metadata/task/refresh_data_link.py index 2ef89de69..c1319a139 100644 --- a/bkmonitor/metadata/task/refresh_data_link.py +++ b/bkmonitor/metadata/task/refresh_data_link.py @@ -33,4 +33,4 @@ def refresh_data_link_status(): bkbase_rt_records = models.BkBaseResultTable.objects.filter(monitor_table_id__in=table_id_list) logger.info("refresh_data_link_status: now try to bulk_refresh_data_link_status,len->[%s] ", len(bkbase_rt_records)) - bulk_refresh_data_link_status.delay(bkbase_rt_records) + bulk_refresh_data_link_status.delay(bkbase_rt_records) # task_id diff --git a/bkmonitor/metadata/task/tasks.py b/bkmonitor/metadata/task/tasks.py index cf38bf224..c38517e77 100644 --- a/bkmonitor/metadata/task/tasks.py +++ b/bkmonitor/metadata/task/tasks.py @@ -169,7 +169,8 @@ def update_time_series_metrics(time_series_metrics): @app.task(ignore_result=True, queue="celery_long_task_cron") def manage_es_storage(es_storages, cluster_id: int = None): """并发管理 ES 存储。""" - + logger.info("manage_es_storage: start to manage_es_storage") + start_time = time.time() # 优先判断集群是否存在于白名单中,如果是,则按照串行的方式实施索引管理 if cluster_id in getattr(settings, "ENABLE_V2_ROTATION_ES_CLUSTER_IDS", []): for es_storage in es_storages: @@ -191,6 +192,14 @@ def manage_es_storage(es_storages, cluster_id: int = None): with ThreadPoolExecutor(max_workers=10) as executor: executor.map(_manage_es_storage, es_storages) + cost_time = time.time() - start_time + # 统计耗时,并上报指标 + metrics.METADATA_CRON_TASK_COST_SECONDS.labels(task_name="manage_es_storage", process_target=None).observe( + cost_time + ) + metrics.report_all() + logger.info("manage_es_storage:manage_es_storage cost time: %s", cost_time) + def _manage_es_storage(es_storage): """ @@ -206,7 +215,7 @@ def _manage_es_storage(es_storage): # es_storage.storage_cluster.domain_name, # ) # return - + start_time = time.time() try: # 先预创建各个时间段的index, # 1. 同时判断各个预创建好的index是否字段与数据库的一致 @@ -249,14 +258,19 @@ def _manage_es_storage(es_storage): # 重新分配索引数据 logger.info("manage_es_storage:table_id->[%s] try to reallocate index", es_storage.table_id) es_storage.reallocate_index() - - logger.info("manage_es_storage:table_id->[%s] create index successfully", es_storage.table_id) - logger.info("manage_es_storage:es_storage->[{}] cron task success.".format(es_storage.table_id)) + logger.info("manage_es_storage:es_storage->[{}] cron task success".format(es_storage.table_id)) except Exception as e: # pylint: disable=broad-except # 记录异常集群的信息 logger.error("manage_es_storage:es_storage index lifecycle failed,table_id->{}".format(es_storage.table_id)) logger.exception(e) + cost_time = time.time() - start_time + # 统计耗时,并上报指标 + metrics.METADATA_CRON_TASK_COST_SECONDS.labels( + task_name="_manage_es_storage", process_target=es_storage.table_id + ).observe(cost_time) + metrics.report_all() + @app.task(ignore_result=True, queue="celery_metadata_task_worker") def push_and_publish_space_router( @@ -414,9 +428,19 @@ def bulk_refresh_data_link_status(bkbase_rt_records): """ 并发刷新链路状态 """ - logger.info("manage_refresh_data_link_status:start to refresh data_link status") + start_time = time.time() # 记录开始时间 + logger.info( + "bulk_refresh_data_link_status: start to refresh data_link status, bkbase_rt_records: %s", bkbase_rt_records + ) with ThreadPoolExecutor(max_workers=10) as executor: executor.map(_refresh_data_link_status, bkbase_rt_records) + cost_time = time.time() - start_time # 总耗时 + logger.info("bulk_refresh_data_link_status: end to refresh data_link status, cost_time: %s", cost_time) + # 统计耗时,并上报指标 + metrics.METADATA_CRON_TASK_COST_SECONDS.labels(task_name="_refresh_data_link_status", process_target=None).observe( + cost_time + ) + metrics.report_all() def _refresh_data_link_status(bkbase_rt_record: BkBaseResultTable): @@ -424,137 +448,138 @@ def _refresh_data_link_status(bkbase_rt_record: BkBaseResultTable): 刷新链路状态(各组件状态+整体状态) @param bkbase_rt_record: BkBaseResultTable 计算平台结果表 """ - # 0. 统计任务耗时 - with metrics.METADATA_CRON_TASK_COST_SECONDS.labels( - task_name="_refresh_data_link_status", process_target=bkbase_rt_record.data_link_name - ).time(): - # 1. 获取基本信息 - start_time = time.time() # 记录开始时间 - bkbase_data_id_name = bkbase_rt_record.bkbase_data_name - data_link_name = bkbase_rt_record.data_link_name - bkbase_rt_name = bkbase_rt_record.bkbase_rt_name - logger.info( - "_refresh_data_link_status: data_link_name->[%s],bkbase_data_id_name->[%s],bkbase_rt_name->[%s]", + # 1. 获取基本信息 + start_time = time.time() # 记录开始时间 + bkbase_data_id_name = bkbase_rt_record.bkbase_data_name + data_link_name = bkbase_rt_record.data_link_name + bkbase_rt_name = bkbase_rt_record.bkbase_rt_name + logger.info( + "_refresh_data_link_status: data_link_name->[%s],bkbase_data_id_name->[%s],bkbase_rt_name->[%s]", + data_link_name, + bkbase_data_id_name, + bkbase_rt_name, + ) + data_link_ins = models.DataLink.objects.get(data_link_name=data_link_name) + data_link_strategy = data_link_ins.data_link_strategy + logger.info( + "_refresh_data_link_status: data_link_name->[%s] data_link_strategy->[%s]", + data_link_name, + data_link_strategy, + ) + + # 2. 刷新数据源状态 + try: + with transaction.atomic(): + data_id_config = models.DataIdConfig.objects.get(name=bkbase_data_id_name) + data_id_status = get_data_link_component_status( + kind=data_id_config.kind, namespace=data_id_config.namespace, component_name=data_id_config.name + ) + # 当和DB中的数据不一致时,才进行变更 + if data_id_config.status != data_id_status: + logger.info( + "_refresh_data_link_status:data_link_name->[%s],data_id_config status->[%s] is different " + "with exist record,will change to->[%s]", + data_link_name, + data_id_config.status, + data_id_status, + ) + data_id_config.status = data_id_status + data_id_config.data_link_name = data_link_name + data_id_config.save() + report_metadata_data_link_status_info( + data_link_name=data_link_name, + biz_id=data_id_config.bk_biz_id, + kind=data_id_config.kind, + status=data_id_config.status, + ) + except models.DataIdConfig.DoesNotExist: + logger.error( + "_refresh_data_link_status: data_link_name->[%s],data_id_config->[%s] does not exist", data_link_name, bkbase_data_id_name, - bkbase_rt_name, - ) - data_link_ins = models.DataLink.objects.get(data_link_name=data_link_name) - data_link_strategy = data_link_ins.data_link_strategy - logger.info( - "_refresh_data_link_status: data_link_name->[%s] data_link_strategy->[%s]", - data_link_name, - data_link_strategy, ) - # 2. 刷新数据源状态 + # 3. 根据链路套餐(类型)获取该链路需要的组件资源种类 + components = models.DataLink.STRATEGY_RELATED_COMPONENTS.get(data_link_strategy) + all_components_ok = True + + # 4. 遍历链路关联的所有类型资源,查询并刷新其状态 + for component in components: try: with transaction.atomic(): - data_id_config = models.DataIdConfig.objects.get(name=bkbase_data_id_name) - data_id_status = get_data_link_component_status( - kind=data_id_config.kind, namespace=data_id_config.namespace, component_name=data_id_config.name + component_ins = component.objects.get(name=bkbase_rt_name) + component_status = get_data_link_component_status( + kind=component_ins.kind, namespace=component_ins.namespace, component_name=component_ins.name + ) + logger.info( + "_refresh_data_link_status: data_link_name->[%s],component->[%s],kind->[%s],status->[%s]", + data_link_name, + component_ins.name, + component_ins.kind, + component_status, ) - # 当和DB中的数据不一致时,才进行变更 - if data_id_config.status != data_id_status: + if component_status != DataLinkResourceStatus.OK.value: + all_components_ok = False + # 和DB中数据不一致时,才进行更新操作 + if component_ins.status != component_status: + component_ins.status = component_status + component_ins.save() logger.info( - "_refresh_data_link_status:data_link_name->[%s],data_id_config status->[%s] is different " - "with exist record,will change to->[%s]", + "_refresh_data_link_status: data_link_name->[%s],component->[%s],kind->[%s]," + "status updated to->[%s]", data_link_name, - data_id_config.status, - data_id_status, + component.name, + component.kind, + component_status, ) - data_id_config.status = data_id_status - data_id_config.data_link_name = data_link_name - data_id_config.save() - report_metadata_data_link_status_info( - data_link_name=data_link_name, - biz_id=data_id_config.bk_biz_id, - kind=data_id_config.kind, - status=data_id_config.status, - ) - except models.DataIdConfig.DoesNotExist: + + report_metadata_data_link_status_info( + data_link_name=data_link_name, + biz_id=component_ins.bk_biz_id, + kind=component_ins.kind, + status=component_ins.status, + ) + except Exception as e: # pylint: disable=broad-except logger.error( - "_refresh_data_link_status: data_link_name->[%s],data_id_config->[%s] does not exist", + "_refresh_data_link_status: data_link_name->[%s],component->[%s],kind->[%s] refresh failed," + "error->[%s]", data_link_name, - bkbase_data_id_name, + component.name, + component.kind, + e, ) - # 3. 根据链路套餐(类型)获取该链路需要的组件资源种类 - components = models.DataLink.STRATEGY_RELATED_COMPONENTS.get(data_link_strategy) - all_components_ok = True - - # 4. 遍历链路关联的所有类型资源,查询并刷新其状态 - for component in components: - try: - with transaction.atomic(): - component_ins = component.objects.get(name=bkbase_rt_name) - component_status = get_data_link_component_status( - kind=component_ins.kind, namespace=component_ins.namespace, component_name=component_ins.name - ) - logger.info( - "_refresh_data_link_status: data_link_name->[%s],component->[%s],kind->[%s],status->[%s]", - data_link_name, - component_ins.name, - component_ins.kind, - component_status, - ) - if component_status != DataLinkResourceStatus.OK.value: - all_components_ok = False - # 和DB中数据不一致时,才进行更新操作 - if component_ins.status != component_status: - component_ins.status = component_status - component_ins.save() - logger.info( - "_refresh_data_link_status: data_link_name->[%s],component->[%s],kind->[%s]," - "status updated to->[%s]", - data_link_name, - component.name, - component.kind, - component_status, - ) - - report_metadata_data_link_status_info( - data_link_name=data_link_name, - biz_id=component_ins.bk_biz_id, - kind=component_ins.kind, - status=component_ins.status, - ) - except Exception as e: # pylint: disable=broad-except - logger.error( - "_refresh_data_link_status: data_link_name->[%s],component->[%s],kind->[%s] refresh failed," - "error->[%s]", - data_link_name, - component.name, - component.kind, - e, - ) + # 5. 如果所有的component_ins状态都为OK,那么BkBaseResultTable也应设置为OK,否则为PENDING + if all_components_ok: + bkbase_rt_record.status = DataLinkResourceStatus.OK.value + else: + bkbase_rt_record.status = DataLinkResourceStatus.PENDING.value + with transaction.atomic(): + bkbase_rt_record.save() + + report_metadata_data_link_status_info( + data_link_name=data_link_name, + biz_id=data_id_config.bk_biz_id, + kind=data_id_config.kind, + status=bkbase_rt_record.status, + ) - # 5. 如果所有的component_ins状态都为OK,那么BkBaseResultTable也应设置为OK,否则为PENDING - if all_components_ok: - bkbase_rt_record.status = DataLinkResourceStatus.OK.value - else: - bkbase_rt_record.status = DataLinkResourceStatus.PENDING.value - with transaction.atomic(): - bkbase_rt_record.save() + cost_time = time.time() - start_time - report_metadata_data_link_status_info( - data_link_name=data_link_name, - biz_id=data_id_config.bk_biz_id, - kind=data_id_config.kind, - status=bkbase_rt_record.status, - ) + logger.info( + "_refresh_data_link_status: data_link_name->[%s],all_components_ok->[%s],status updated to->[%s]", + data_link_name, + all_components_ok, + bkbase_rt_record.status, + ) - logger.info( - "_refresh_data_link_status: data_link_name->[%s],all_components_ok->[%s],status updated to->[%s]", - data_link_name, - all_components_ok, - bkbase_rt_record.status, - ) + # 6. 上报指标 + metrics.METADATA_CRON_TASK_COST_SECONDS.labels( + task_name="_refresh_data_link_status", process_target=bkbase_rt_record.data_link_name + ).observe(cost_time) - logger.info( - "_refresh_data_link_status: data_link_name->[%s] refresh status finished,cost time->[%s]", - data_link_name, - time.time() - start_time, - ) - # 6. 统一上报指标 - metrics.report_all() + logger.info( + "_refresh_data_link_status: data_link_name->[%s] refresh status finished,cost time->[%s]", + data_link_name, + cost_time, + )