diff --git a/apps/api/base.py b/apps/api/base.py index a3f22c536..303264076 100644 --- a/apps/api/base.py +++ b/apps/api/base.py @@ -361,6 +361,7 @@ def _send_request(self, params, timeout, request_id, request_cookies): "response_errors": response_errors, "cost_time": (end_time - start_time), "request_id": request_id, + "esb_request_id": raw_response.headers.get("X-Bkapi-Request-Id"), "request_user": bk_username, } diff --git a/apps/api/modules/job.py b/apps/api/modules/job.py index c3a882678..d24ccba7c 100644 --- a/apps/api/modules/job.py +++ b/apps/api/modules/job.py @@ -20,7 +20,7 @@ from django.utils.translation import ugettext_lazy as _ from apps.api.base import DataAPI -from config.domains import JOB_APIGATEWAY_ROOT_V2 +from config.domains import JOB_APIGATEWAY_ROOT_V3 def get_job_request_before(params): @@ -33,25 +33,39 @@ class _JobApi: def __init__(self): self.fast_execute_script = DataAPI( method="POST", - url=JOB_APIGATEWAY_ROOT_V2 + "fast_execute_script", + url=JOB_APIGATEWAY_ROOT_V3 + "fast_execute_script/", description=_("快速执行脚本"), module=self.MODULE, before_request=get_job_request_before, ) - self.fast_push_file = DataAPI( + self.fast_transfer_file = DataAPI( method="POST", - url=JOB_APIGATEWAY_ROOT_V2 + "fast_push_file", + url=JOB_APIGATEWAY_ROOT_V3 + "fast_transfer_file/", description=_("快速分发文件"), module=self.MODULE, before_request=get_job_request_before, ) self.get_job_instance_log = DataAPI( - method="POST", - url=JOB_APIGATEWAY_ROOT_V2 + "get_job_instance_log", + method="GET", + url=JOB_APIGATEWAY_ROOT_V3 + "get_job_instance_log", description=_("根据作业id获取执行日志"), module=self.MODULE, before_request=get_job_request_before, ) + self.get_job_instance_status = DataAPI( + method="GET", + url=JOB_APIGATEWAY_ROOT_V3 + "get_job_instance_status/", + description=_("根据作业实例 ID 查询作业执行状态"), + module=self.MODULE, + before_request=get_job_request_before, + ) + self.batch_get_job_instance_ip_log = DataAPI( + method="POST", + url=JOB_APIGATEWAY_ROOT_V3 + "batch_get_job_instance_ip_log/", + description=_("根据ip列表批量查询作业执行日志"), + module=self.MODULE, + before_request=get_job_request_before, + ) JobApi = _JobApi() diff --git a/apps/log_extract/components/collections/cos_upload_component.py b/apps/log_extract/components/collections/cos_upload_component.py index 71a2088fb..1c7ab2d29 100644 --- a/apps/log_extract/components/collections/cos_upload_component.py +++ b/apps/log_extract/components/collections/cos_upload_component.py @@ -94,7 +94,9 @@ def _schedule(self, data, parent_data, callback_data=None): for item in FileServer.get_detail_for_ips(query_result): if item["exit_code"] != 0: - raise Exception(_("上传网盘异常: {}").format(FileServer.get_job_tag(query_result))) + raise Exception( + _("上传网盘异常: {}, status: {}").format(FileServer.get_job_tag(item), item.get("status", "")) + ) task.download_status = DownloadStatus.DOWNLOADABLE.value task.cos_file_name = data.get_one_of_outputs("pack_file_name") diff --git a/apps/log_extract/components/collections/distribution_component.py b/apps/log_extract/components/collections/distribution_component.py index 304e3aa97..13058cbe7 100644 --- a/apps/log_extract/components/collections/distribution_component.py +++ b/apps/log_extract/components/collections/distribution_component.py @@ -115,9 +115,9 @@ def _schedule(self, data, parent_data, callback_data=None): return True # 判断文件分发是否成功 - ip_status = FileServer.get_ip_status(query_result) - if ip_status != constants.JOB_SUCCESS_STATUS: - raise Exception(_("文件分发异常({})".format(ip_status))) + job_status = FileServer.get_job_instance_status(query_result) + if job_status != constants.JOB_SUCCESS_STATUS: + raise Exception(_("文件分发异常({})".format(job_status))) self.finish_schedule() return True diff --git a/apps/log_extract/components/collections/packing_component.py b/apps/log_extract/components/collections/packing_component.py index ae79d988b..5a85ab4d8 100644 --- a/apps/log_extract/components/collections/packing_component.py +++ b/apps/log_extract/components/collections/packing_component.py @@ -23,7 +23,10 @@ from pipeline.component_framework.component import Component from pipeline.core.flow.activity import Service, StaticIntervalGenerator from apps.log_extract import constants -from apps.log_extract.constants import PACK_TASK_SCRIPT_NOT_HAVE_ENOUGH_CAP_ERROR_CODE +from apps.log_extract.constants import ( + PACK_TASK_SCRIPT_NOT_HAVE_ENOUGH_CAP_ERROR_CODE, + BATCH_GET_JOB_INSTANCE_IP_LOG_IP_LIST_SIZE, +) from apps.log_extract.fileserver import FileServer from apps.log_extract.models import Tasks from apps.log_extract.utils.packing import ( @@ -32,6 +35,8 @@ get_filter_content, ) from apps.log_extract.components.collections.base_component import BaseService +from apps.utils.db import array_chunk +from apps.utils.log import logger class FilePackingService(BaseService): @@ -130,12 +135,11 @@ def _execute(self, data, parent_data): ip_list = [ip_list] distribution_source_file_list = [ { - "group_ids": "", - "account": data.get_one_of_inputs("account"), + "account": {"alias": data.get_one_of_inputs("account")}, # 转换IP格式 - "ip_list": [ip], + "server": {"ip_list": [ip]}, # 这里是直接分发目录 - "files": [f"{distribution_path}{packed_file_name}"], + "file_list": [f"{distribution_path}{packed_file_name}"], } for ip in ip_list ] @@ -144,10 +148,12 @@ def _execute(self, data, parent_data): return True def _schedule(self, data, parent_data, callback_data=None): + task_instance_id = data.get_one_of_outputs("task_instance_id") + bk_biz_id = data.get_one_of_inputs("bk_biz_id") query_result = self._poll_status( - task_instance_id=data.get_one_of_outputs("task_instance_id"), + task_instance_id=task_instance_id, operator=data.get_one_of_inputs("operator"), - bk_biz_id=data.get_one_of_inputs("bk_biz_id"), + bk_biz_id=bk_biz_id, ) # 判断脚本是否执行结束 @@ -157,25 +163,35 @@ def _schedule(self, data, parent_data, callback_data=None): # 输出脚本内容, 如果所有IP都失败了,则返回异常 has_success = False job_message = "" - for item in FileServer.get_detail_for_ips(query_result): + step_ip_result_list = FileServer.get_detail_for_ips(query_result) + for item in step_ip_result_list: if item["exit_code"] == 0: has_success = True break elif item["exit_code"] == PACK_TASK_SCRIPT_NOT_HAVE_ENOUGH_CAP_ERROR_CODE: job_message = _("目标机器没有足够的储存") else: - job_message = FileServer.get_job_tag(query_result) + job_message = FileServer.get_job_tag(item) if not has_success: raise Exception(_("任务打包异常: {}").format(job_message)) - data.outputs.task_script_output = FileServer.get_log_content_for_single_ip(query_result) + ip_list_group = array_chunk( + [{"ip": item["ip"], "bk_cloud_id": item["bk_cloud_id"]} for item in step_ip_result_list], + BATCH_GET_JOB_INSTANCE_IP_LOG_IP_LIST_SIZE, + ) ip_log_output_kv = {} - for ip_log_content in FileServer.get_detail_for_ips(query_result): - content = html.unescape(ip_log_content["log_content"]) - log_output_kv = FileServer.get_bk_kv_log(content) - log_output_kv = {kv[constants.BKLOG_LOG_KEY]: kv[constants.BKLOG_LOG_VALUE] for kv in log_output_kv} - ip_log_output_kv[ip_log_content["ip"]] = log_output_kv + step_instance_id = FileServer.get_step_instance_id(query_result) + try: + for ip_list in ip_list_group: + ip_list_log = FileServer.get_ip_list_log(ip_list, task_instance_id, step_instance_id, bk_biz_id) + for log_content in ip_list_log.get("script_task_logs", []): + content = html.unescape(log_content.get("log_content")) + log_output_kv = FileServer.get_bk_kv_log(content) + log_output_kv = {kv[constants.BKLOG_LOG_KEY]: kv[constants.BKLOG_LOG_VALUE] for kv in log_output_kv} + ip_log_output_kv[log_content["ip"]] = log_output_kv + except Exception as e: + logger.exception(f"[packing get bklog] get log content failed => {e}") task = Tasks.objects.get(task_id=data.get_one_of_inputs("task_id")) task.ex_data.update(ip_log_output_kv) diff --git a/apps/log_extract/constants.py b/apps/log_extract/constants.py index b242e9b5e..0ed92c85f 100644 --- a/apps/log_extract/constants.py +++ b/apps/log_extract/constants.py @@ -174,7 +174,9 @@ class PreDateMode(ChoicesEnum): JOB_SCRIPT_TYPE = 1 # 作业执行成功标识 -JOB_SUCCESS_STATUS = 9 +JOB_SUCCESS_STATUS = 3 + +BATCH_GET_JOB_INSTANCE_IP_LOG_IP_LIST_SIZE = 500 # windows系统名称 WINDOWS_OS_NAME_LIST = settings.WINDOWS_OS_NAME_LIST diff --git a/apps/log_extract/fileserver.py b/apps/log_extract/fileserver.py index 6f2716fc9..97754b4b7 100644 --- a/apps/log_extract/fileserver.py +++ b/apps/log_extract/fileserver.py @@ -44,13 +44,15 @@ def execute_script(cls, content, ip, bk_biz_id, operator, account, task_name, sc "bk_username": operator, "bk_biz_id": bk_biz_id, "script_content": content, - "ip_list": ip, + "target_server": { + "ip_list": ip, + }, "script_type": JOB_SCRIPT_TYPE, - "account": account, + "script_language": JOB_SCRIPT_TYPE, + "account_alias": account, "task_name": task_name, } if script_params: - kwargs["script_params"] = script_params kwargs["script_param"] = script_params return JobApi.fast_execute_script(kwargs, request_cookies=False) @@ -60,11 +62,26 @@ def get_task_id(task_result): @staticmethod def is_finished_for_single_ip(query_result): - return query_result[0]["is_finished"] + return query_result["finished"] @staticmethod def get_detail_for_ips(query_result): - return query_result[0]["step_results"][0]["ip_logs"] + step_instance, *_ = query_result["step_instance_list"] + return step_instance["step_ip_result_list"] + + @staticmethod + def get_step_instance(query_result): + step_instance, *_ = query_result["step_instance_list"] + return step_instance + + @staticmethod + def get_step_instance_id(query_result): + step_instance, *_ = query_result["step_instance_list"] + return step_instance["step_instance_id"] + + @staticmethod + def get_job_instance_status(query_result): + return query_result["job_instance"]["status"] @staticmethod def get_log_content_for_single_ip(query_result): @@ -72,16 +89,33 @@ def get_log_content_for_single_ip(query_result): @staticmethod def get_ip_status(query_result): - return query_result[0]["step_results"][0]["ip_status"] + return query_result["status"] @staticmethod def get_job_tag(query_result): - return query_result[0]["step_results"][0]["tag"] + return query_result.get("tag", "") + + @staticmethod + def get_ip_list_log(ip_list, job_instance_id, step_instance_id, bk_biz_id): + return JobApi.batch_get_job_instance_ip_log( + params={ + "bk_biz_id": bk_biz_id, + "ip_list": ip_list, + "job_instance_id": job_instance_id, + "step_instance_id": step_instance_id, + }, + request_cookies=False, + ) @classmethod def query_task_result(cls, task_instance_id, operator, bk_biz_id): - result = JobApi.get_job_instance_log( - params={"bk_biz_id": bk_biz_id, "job_instance_id": task_instance_id, "bk_username": operator}, + result = JobApi.get_job_instance_status( + params={ + "bk_biz_id": bk_biz_id, + "job_instance_id": task_instance_id, + "bk_username": operator, + "return_ip_result": True, + }, request_cookies=False, ) return result @@ -97,13 +131,16 @@ def file_distribution( kwargs = { "bk_username": operator, "bk_biz_id": bk_biz_id, - "file_source": file_source_list, + "file_source_list": file_source_list, "account": account, + "account_alias": account, "file_target_path": file_target_path, - "ip_list": target_ip_list, + "target_server": { + "ip_list": target_ip_list, + }, "task_name": task_name, } - task_result = JobApi.fast_push_file(kwargs, raw=True, request_cookies=False) + task_result = JobApi.fast_transfer_file(kwargs, raw=True, request_cookies=False) if not task_result["result"]: raise PipelineApiFailed(PipelineApiFailed.MESSAGE.format(message=task_result["message"])) return task_result["data"] diff --git a/apps/log_extract/handlers/explorer.py b/apps/log_extract/handlers/explorer.py index 6bf08b18c..4aad5ef54 100644 --- a/apps/log_extract/handlers/explorer.py +++ b/apps/log_extract/handlers/explorer.py @@ -25,6 +25,7 @@ from django.conf import settings from django.utils.translation import ugettext_lazy as _ from apps.api import CCApi +from apps.utils.db import array_chunk from apps.utils.log import logger from apps.iam import ActionEnum, Permission from apps.log_search.handlers.biz import BizHandler @@ -35,7 +36,7 @@ from apps.log_extract.models import Strategies from apps.utils.local import get_request_username from apps.exceptions import ApiResultError -from apps.log_extract.constants import JOB_API_PERMISSION_CODE +from apps.log_extract.constants import JOB_API_PERMISSION_CODE, BATCH_GET_JOB_INSTANCE_IP_LOG_IP_LIST_SIZE class ExplorerHandler(object): @@ -97,7 +98,13 @@ def list_files(self, bk_biz_id, ip, request_dir, is_search_child, time_range, st query_result = self.get_finished_result(task_result["job_instance_id"], operator, bk_biz_id) success_step = self.get_success_step(query_result) - res = self.job_log_to_file_list(success_step["ip_logs"], allowed_dir_file_list) + ip_list = [ + {"ip": ip["ip"], "bk_cloud_id": ip["bk_cloud_id"]} for ip in success_step.get("step_ip_result_list", []) + ] + ip_logs = self.get_all_ip_logs( + ip_list, task_result["job_instance_id"], success_step["step_instance_id"], bk_biz_id + ) + res = self.job_log_to_file_list(ip_logs, allowed_dir_file_list) res = sorted(res, key=lambda k: k["mtime"], reverse=True) return res @@ -118,14 +125,19 @@ def get_finished_result(task_instance_id, operator, bk_biz_id): @staticmethod def get_success_step(query_result): - task_result, *__ = query_result - ip_status_list = [] - for step_result in task_result["step_results"]: - ip_status = step_result["ip_status"] - if ip_status == constants.JOB_SUCCESS_STATUS: - return step_result - ip_status_list.append(str(ip_status)) - raise exceptions.ExplorerException(_("文件预览异常({})".format(",".join(ip_status_list)))) + step_result = FileServer.get_step_instance(query_result) + ip_status = step_result["status"] + if ip_status == constants.JOB_SUCCESS_STATUS: + return step_result + raise exceptions.ExplorerException(_("文件预览异常({})".format(",".join(ip_status)))) + + def get_all_ip_logs(self, ip_list, job_instance_id, step_instance_id, bk_biz_id): + ip_list_group = array_chunk(ip_list, BATCH_GET_JOB_INSTANCE_IP_LOG_IP_LIST_SIZE) + ip_logs = [] + for ip_list in ip_list_group: + ip_list_log = FileServer.get_ip_list_log(ip_list, job_instance_id, step_instance_id, bk_biz_id) + ip_logs.extend(ip_list_log.get("script_task_logs", [])) + return ip_logs def job_log_to_file_list(self, ip_logs, allowed_dir_file_list): res = [] diff --git a/config/domains.py b/config/domains.py index ceae12349..eed5714e1 100644 --- a/config/domains.py +++ b/config/domains.py @@ -51,6 +51,8 @@ "CMSI_APIGATEWAY_ROOT_V2", # JOB "JOB_APIGATEWAY_ROOT_V2", + # JOBv3 + "JOB_APIGATEWAY_ROOT_V3", ] env_domains = load_domains(settings) diff --git a/dev.env.yml b/dev.env.yml index 737bfc84e..0744b3894 100644 --- a/dev.env.yml +++ b/dev.env.yml @@ -83,7 +83,7 @@ domains: CMSI_APIGATEWAY_ROOT_V2: "{settings.PAAS_API_HOST}/api/c/compapi/v2/cmsi/" GSE_APIGATEWAY_ROOT_V2: "{settings.PAAS_API_HOST}/api/c/compapi/v2/gse/" IAM_APIGATEWAY_ROOT_V2: "{settings.PAAS_API_HOST}/api/c/compapi/v2/iam/" - JOB_APIGATEWAY_ROOT_V2: "{settings.PAAS_API_HOST}/api/c/compapi/v2/job/" + JOB_APIGATEWAY_ROOT_V3: "{settings.PAAS_API_HOST}/api/c/compapi/v2/jobv3/" MONITOR_APIGATEWAY_ROOT: "{settings.PAAS_API_HOST}/api/c/compapi/v2/monitor_v3/" diff --git a/prod.env.yml b/prod.env.yml index a6a745acb..16b4daacc 100644 --- a/prod.env.yml +++ b/prod.env.yml @@ -83,5 +83,5 @@ domains: CMSI_APIGATEWAY_ROOT_V2: "{settings.PAAS_API_HOST}/api/c/compapi/v2/cmsi/" GSE_APIGATEWAY_ROOT_V2: "{settings.PAAS_API_HOST}/api/c/compapi/v2/gse/" IAM_APIGATEWAY_ROOT_V2: "{settings.PAAS_API_HOST}/api/c/compapi/v2/iam/" - JOB_APIGATEWAY_ROOT_V2: "{settings.PAAS_API_HOST}/api/c/compapi/v2/job/" + JOB_APIGATEWAY_ROOT_V3: "{settings.PAAS_API_HOST}/api/c/compapi/v2/jobv3/" MONITOR_APIGATEWAY_ROOT: "{settings.PAAS_API_HOST}/api/c/compapi/v2/monitor_v3/" diff --git a/stag.env.yml b/stag.env.yml index a6a745acb..16b4daacc 100644 --- a/stag.env.yml +++ b/stag.env.yml @@ -83,5 +83,5 @@ domains: CMSI_APIGATEWAY_ROOT_V2: "{settings.PAAS_API_HOST}/api/c/compapi/v2/cmsi/" GSE_APIGATEWAY_ROOT_V2: "{settings.PAAS_API_HOST}/api/c/compapi/v2/gse/" IAM_APIGATEWAY_ROOT_V2: "{settings.PAAS_API_HOST}/api/c/compapi/v2/iam/" - JOB_APIGATEWAY_ROOT_V2: "{settings.PAAS_API_HOST}/api/c/compapi/v2/job/" + JOB_APIGATEWAY_ROOT_V3: "{settings.PAAS_API_HOST}/api/c/compapi/v2/jobv3/" MONITOR_APIGATEWAY_ROOT: "{settings.PAAS_API_HOST}/api/c/compapi/v2/monitor_v3/"