feature: jobv3 upgrade #312

Closed
1 change: 1 addition & 0 deletions apps/api/base.py
@@ -361,6 +361,7 @@ def _send_request(self, params, timeout, request_id, request_cookies):
                 "response_errors": response_errors,
                 "cost_time": (end_time - start_time),
                 "request_id": request_id,
+                "esb_request_id": raw_response.headers.get("X-Bkapi-Request-Id"),
                 "request_user": bk_username,
             }
 
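The added esb_request_id field captures the gateway's X-Bkapi-Request-Id response header, so a failed call can be correlated with the gateway's own access logs. A minimal sketch of the same idea using plain requests (the helper name is hypothetical):

    import requests

    def post_with_trace(url, payload):
        # Issue the call and keep the gateway-assigned trace ID for correlation.
        resp = requests.post(url, json=payload, timeout=30)
        trace_id = resp.headers.get("X-Bkapi-Request-Id")  # None if the header is absent
        body = resp.json()
        if not body.get("result"):
            raise RuntimeError("API call failed, esb_request_id=%s: %s" % (trace_id, body.get("message")))
        return body.get("data")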
26 changes: 20 additions & 6 deletions apps/api/modules/job.py
@@ -20,7 +20,7 @@
 from django.utils.translation import ugettext_lazy as _
 
 from apps.api.base import DataAPI
-from config.domains import JOB_APIGATEWAY_ROOT_V2
+from config.domains import JOB_APIGATEWAY_ROOT_V3
 
 
 def get_job_request_before(params):
@@ -33,25 +33,39 @@ class _JobApi:
     def __init__(self):
         self.fast_execute_script = DataAPI(
             method="POST",
-            url=JOB_APIGATEWAY_ROOT_V2 + "fast_execute_script",
+            url=JOB_APIGATEWAY_ROOT_V3 + "fast_execute_script/",
             description=_("快速执行脚本"),
             module=self.MODULE,
             before_request=get_job_request_before,
         )
-        self.fast_push_file = DataAPI(
+        self.fast_transfer_file = DataAPI(
             method="POST",
-            url=JOB_APIGATEWAY_ROOT_V2 + "fast_push_file",
+            url=JOB_APIGATEWAY_ROOT_V3 + "fast_transfer_file/",
             description=_("快速分发文件"),
             module=self.MODULE,
             before_request=get_job_request_before,
         )
         self.get_job_instance_log = DataAPI(
-            method="POST",
-            url=JOB_APIGATEWAY_ROOT_V2 + "get_job_instance_log",
+            method="GET",
+            url=JOB_APIGATEWAY_ROOT_V3 + "get_job_instance_log",
             description=_("根据作业id获取执行日志"),
             module=self.MODULE,
             before_request=get_job_request_before,
         )
+        self.get_job_instance_status = DataAPI(
+            method="GET",
+            url=JOB_APIGATEWAY_ROOT_V3 + "get_job_instance_status/",
+            description=_("根据作业实例 ID 查询作业执行状态"),
+            module=self.MODULE,
+            before_request=get_job_request_before,
+        )
+        self.batch_get_job_instance_ip_log = DataAPI(
+            method="POST",
+            url=JOB_APIGATEWAY_ROOT_V3 + "batch_get_job_instance_ip_log/",
+            description=_("根据ip列表批量查询作业执行日志"),
+            module=self.MODULE,
+            before_request=get_job_request_before,
+        )
 
 
 JobApi = _JobApi()
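With the wrappers above in place, a V3 call looks roughly like this. This is a sketch: the business ID, IPs, and base64-encoded script body are placeholders, and the payload field names (target_server, account_alias, script_language) follow the V3 shapes used later in this PR. Reading job_instance_id off the return value matches how other call sites in this PR use it.

    from apps.api.modules.job import JobApi

    # Placeholder values for illustration only.
    result = JobApi.fast_execute_script(
        {
            "bk_username": "admin",
            "bk_biz_id": 2,
            "script_content": "IyEvYmluL2Jhc2gKZWNobyBvawo=",  # base64-encoded script body
            "script_language": 1,
            "account_alias": "root",
            "target_server": {"ip_list": [{"bk_cloud_id": 0, "ip": "10.0.0.1"}]},
            "task_name": "jobv3-demo",
        },
        request_cookies=False,
    )
    # Assumes the wrapper returns the response's data section directly,
    # as other call sites in this PR do.
    job_instance_id = result["job_instance_id"]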
@@ -94,7 +94,9 @@ def _schedule(self, data, parent_data, callback_data=None):
 
         for item in FileServer.get_detail_for_ips(query_result):
             if item["exit_code"] != 0:
-                raise Exception(_("上传网盘异常: {}").format(FileServer.get_job_tag(query_result)))
+                raise Exception(
+                    _("上传网盘异常: {}, status: {}").format(FileServer.get_job_tag(item), item.get("status", ""))
+                )
 
         task.download_status = DownloadStatus.DOWNLOADABLE.value
         task.cos_file_name = data.get_one_of_outputs("pack_file_name")
@@ -115,9 +115,9 @@ def _schedule(self, data, parent_data, callback_data=None):
             return True
 
         # 判断文件分发是否成功
-        ip_status = FileServer.get_ip_status(query_result)
-        if ip_status != constants.JOB_SUCCESS_STATUS:
-            raise Exception(_("文件分发异常({})".format(ip_status)))
+        job_status = FileServer.get_job_instance_status(query_result)
+        if job_status != constants.JOB_SUCCESS_STATUS:
+            raise Exception(_("文件分发异常({})".format(job_status)))
 
         self.finish_schedule()
         return True
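The schedule pattern used by these components, polling the V3 status endpoint until it reports finished and then comparing against JOB_SUCCESS_STATUS, condenses to a short sketch (helper name hypothetical; the FileServer methods are the ones defined in apps/log_extract/fileserver.py below):

    from apps.log_extract import constants
    from apps.log_extract.fileserver import FileServer

    def wait_job_result(task_instance_id, operator, bk_biz_id):
        # Poll the V3 status API; callers retry until the job reports finished.
        query_result = FileServer.query_task_result(task_instance_id, operator, bk_biz_id)
        if not FileServer.is_finished_for_single_ip(query_result):
            return None  # not done yet; schedule another poll
        job_status = FileServer.get_job_instance_status(query_result)
        if job_status != constants.JOB_SUCCESS_STATUS:
            raise Exception("job failed, status={}".format(job_status))
        return query_result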
46 changes: 31 additions & 15 deletions apps/log_extract/components/collections/packing_component.py
@@ -23,7 +23,10 @@
 from pipeline.component_framework.component import Component
 from pipeline.core.flow.activity import Service, StaticIntervalGenerator
 from apps.log_extract import constants
-from apps.log_extract.constants import PACK_TASK_SCRIPT_NOT_HAVE_ENOUGH_CAP_ERROR_CODE
+from apps.log_extract.constants import (
+    PACK_TASK_SCRIPT_NOT_HAVE_ENOUGH_CAP_ERROR_CODE,
+    BATCH_GET_JOB_INSTANCE_IP_LOG_IP_LIST_SIZE,
+)
 from apps.log_extract.fileserver import FileServer
 from apps.log_extract.models import Tasks
 from apps.log_extract.utils.packing import (
@@ -32,6 +35,8 @@
     get_filter_content,
 )
 from apps.log_extract.components.collections.base_component import BaseService
+from apps.utils.db import array_chunk
+from apps.utils.log import logger
 
 
 class FilePackingService(BaseService):
@@ -130,12 +135,11 @@ def _execute(self, data, parent_data):
             ip_list = [ip_list]
         distribution_source_file_list = [
             {
-                "group_ids": "",
-                "account": data.get_one_of_inputs("account"),
+                "account": {"alias": data.get_one_of_inputs("account")},
                 # 转换IP格式
-                "ip_list": [ip],
+                "server": {"ip_list": [ip]},
                 # 这里是直接分发目录
-                "files": [f"{distribution_path}{packed_file_name}"],
+                "file_list": [f"{distribution_path}{packed_file_name}"],
             }
             for ip in ip_list
         ]
@@ -144,10 +148,12 @@
         return True
 
     def _schedule(self, data, parent_data, callback_data=None):
+        task_instance_id = data.get_one_of_outputs("task_instance_id")
+        bk_biz_id = data.get_one_of_inputs("bk_biz_id")
         query_result = self._poll_status(
-            task_instance_id=data.get_one_of_outputs("task_instance_id"),
+            task_instance_id=task_instance_id,
             operator=data.get_one_of_inputs("operator"),
-            bk_biz_id=data.get_one_of_inputs("bk_biz_id"),
+            bk_biz_id=bk_biz_id,
         )
 
         # 判断脚本是否执行结束
@@ -157,25 +163,35 @@ def _schedule(self, data, parent_data, callback_data=None):
         # 输出脚本内容, 如果所有IP都失败了,则返回异常
         has_success = False
         job_message = ""
-        for item in FileServer.get_detail_for_ips(query_result):
+        step_ip_result_list = FileServer.get_detail_for_ips(query_result)
+        for item in step_ip_result_list:
             if item["exit_code"] == 0:
                 has_success = True
                 break
             elif item["exit_code"] == PACK_TASK_SCRIPT_NOT_HAVE_ENOUGH_CAP_ERROR_CODE:
                 job_message = _("目标机器没有足够的储存")
             else:
-                job_message = FileServer.get_job_tag(query_result)
+                job_message = FileServer.get_job_tag(item)
 
         if not has_success:
             raise Exception(_("任务打包异常: {}").format(job_message))
 
-        data.outputs.task_script_output = FileServer.get_log_content_for_single_ip(query_result)
+        ip_list_group = array_chunk(
+            [{"ip": item["ip"], "bk_cloud_id": item["bk_cloud_id"]} for item in step_ip_result_list],
+            BATCH_GET_JOB_INSTANCE_IP_LOG_IP_LIST_SIZE,
+        )
         ip_log_output_kv = {}
-        for ip_log_content in FileServer.get_detail_for_ips(query_result):
-            content = html.unescape(ip_log_content["log_content"])
-            log_output_kv = FileServer.get_bk_kv_log(content)
-            log_output_kv = {kv[constants.BKLOG_LOG_KEY]: kv[constants.BKLOG_LOG_VALUE] for kv in log_output_kv}
-            ip_log_output_kv[ip_log_content["ip"]] = log_output_kv
+        step_instance_id = FileServer.get_step_instance_id(query_result)
+        try:
+            for ip_list in ip_list_group:
+                ip_list_log = FileServer.get_ip_list_log(ip_list, task_instance_id, step_instance_id, bk_biz_id)
+                for log_content in ip_list_log.get("script_task_logs", []):
+                    content = html.unescape(log_content.get("log_content"))
+                    log_output_kv = FileServer.get_bk_kv_log(content)
+                    log_output_kv = {kv[constants.BKLOG_LOG_KEY]: kv[constants.BKLOG_LOG_VALUE] for kv in log_output_kv}
+                    ip_log_output_kv[log_content["ip"]] = log_output_kv
+        except Exception as e:
+            logger.exception(f"[packing get bklog] get log content failed => {e}")
 
         task = Tasks.objects.get(task_id=data.get_one_of_inputs("task_id"))
         task.ex_data.update(ip_log_output_kv)
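batch_get_job_instance_ip_log is paged in groups of BATCH_GET_JOB_INSTANCE_IP_LOG_IP_LIST_SIZE (500) IPs per request. The real array_chunk lives in apps.utils.db; as a mental model, a minimal sketch of such a helper, assuming it splits a list into fixed-size groups:

    def array_chunk(items, size):
        # Split `items` into consecutive sublists of at most `size` elements.
        return [items[i:i + size] for i in range(0, len(items), size)]

    # array_chunk([1, 2, 3, 4, 5], 2) -> [[1, 2], [3, 4], [5]]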
4 changes: 3 additions & 1 deletion apps/log_extract/constants.py
@@ -174,7 +174,9 @@ class PreDateMode(ChoicesEnum):
 JOB_SCRIPT_TYPE = 1
 
 # 作业执行成功标识
-JOB_SUCCESS_STATUS = 9
+JOB_SUCCESS_STATUS = 3
+
+BATCH_GET_JOB_INSTANCE_IP_LOG_IP_LIST_SIZE = 500
 
 # windows系统名称
 WINDOWS_OS_NAME_LIST = settings.WINDOWS_OS_NAME_LIST
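Note that the success sentinel changes meaning, not just value: under the V2 get_job_instance_log response the per-IP ip_status success code was 9, while the V3 get_job_instance_status response carries a job-level status field where 3 means success. A hedged sketch of the new check (helper name hypothetical):

    from apps.log_extract import constants

    def assert_job_success(query_result):
        # query_result is a V3 get_job_instance_status payload (see fileserver.py below).
        status = query_result["job_instance"]["status"]
        if status != constants.JOB_SUCCESS_STATUS:  # 3 == finished successfully
            raise Exception("job did not succeed, status=%s" % status)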
61 changes: 49 additions & 12 deletions apps/log_extract/fileserver.py
@@ -44,13 +44,15 @@ def execute_script(cls, content, ip, bk_biz_id, operator, account, task_name, sc
             "bk_username": operator,
             "bk_biz_id": bk_biz_id,
             "script_content": content,
-            "ip_list": ip,
+            "target_server": {
+                "ip_list": ip,
+            },
-            "script_type": JOB_SCRIPT_TYPE,
-            "account": account,
+            "script_language": JOB_SCRIPT_TYPE,
+            "account_alias": account,
             "task_name": task_name,
         }
         if script_params:
-            kwargs["script_params"] = script_params
+            kwargs["script_param"] = script_params
         return JobApi.fast_execute_script(kwargs, request_cookies=False)
 
@@ -60,28 +62,60 @@ def get_task_id(task_result):
 
     @staticmethod
     def is_finished_for_single_ip(query_result):
-        return query_result[0]["is_finished"]
+        return query_result["finished"]
 
     @staticmethod
     def get_detail_for_ips(query_result):
-        return query_result[0]["step_results"][0]["ip_logs"]
+        step_instance, *_ = query_result["step_instance_list"]
+        return step_instance["step_ip_result_list"]
 
+    @staticmethod
+    def get_step_instance(query_result):
+        step_instance, *_ = query_result["step_instance_list"]
+        return step_instance
+
+    @staticmethod
+    def get_step_instance_id(query_result):
+        step_instance, *_ = query_result["step_instance_list"]
+        return step_instance["step_instance_id"]
+
+    @staticmethod
+    def get_job_instance_status(query_result):
+        return query_result["job_instance"]["status"]
+
     @staticmethod
     def get_log_content_for_single_ip(query_result):
         return query_result[0]["step_results"][0]["ip_logs"][0]["log_content"]
 
     @staticmethod
     def get_ip_status(query_result):
-        return query_result[0]["step_results"][0]["ip_status"]
+        return query_result["status"]
 
     @staticmethod
     def get_job_tag(query_result):
-        return query_result[0]["step_results"][0]["tag"]
+        return query_result.get("tag", "")
 
+    @staticmethod
+    def get_ip_list_log(ip_list, job_instance_id, step_instance_id, bk_biz_id):
+        return JobApi.batch_get_job_instance_ip_log(
+            params={
+                "bk_biz_id": bk_biz_id,
+                "ip_list": ip_list,
+                "job_instance_id": job_instance_id,
+                "step_instance_id": step_instance_id,
+            },
+            request_cookies=False,
+        )
+
     @classmethod
     def query_task_result(cls, task_instance_id, operator, bk_biz_id):
-        result = JobApi.get_job_instance_log(
-            params={"bk_biz_id": bk_biz_id, "job_instance_id": task_instance_id, "bk_username": operator},
+        result = JobApi.get_job_instance_status(
+            params={
+                "bk_biz_id": bk_biz_id,
+                "job_instance_id": task_instance_id,
+                "bk_username": operator,
+                "return_ip_result": True,
+            },
             request_cookies=False,
         )
         return result
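Taken together, the accessors above pin down the response shape this code expects from the V3 status query. An illustrative, trimmed example (values are placeholders; only the fields read by FileServer are shown):

    query_result = {
        "finished": True,                      # polled by is_finished_for_single_ip
        "job_instance": {"status": 3},         # 3 == JOB_SUCCESS_STATUS
        "step_instance_list": [
            {
                "step_instance_id": 20201201,  # fed into batch_get_job_instance_ip_log
                "status": 3,
                "step_ip_result_list": [
                    {"ip": "10.0.0.1", "bk_cloud_id": 0, "exit_code": 0, "status": 9, "tag": ""},
                ],
            },
        ],
    }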
@@ -97,13 +131,16 @@ def file_distribution(
         kwargs = {
             "bk_username": operator,
             "bk_biz_id": bk_biz_id,
-            "file_source": file_source_list,
+            "file_source_list": file_source_list,
-            "account": account,
+            "account_alias": account,
             "file_target_path": file_target_path,
-            "ip_list": target_ip_list,
+            "target_server": {
+                "ip_list": target_ip_list,
+            },
             "task_name": task_name,
         }
-        task_result = JobApi.fast_push_file(kwargs, raw=True, request_cookies=False)
+        task_result = JobApi.fast_transfer_file(kwargs, raw=True, request_cookies=False)
         if not task_result["result"]:
             raise PipelineApiFailed(PipelineApiFailed.MESSAGE.format(message=task_result["message"]))
         return task_result["data"]
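Combined with the file_source_list built in packing_component.py, the assembled V3 transfer payload ends up shaped roughly like this (all values illustrative):

    from apps.api.modules.job import JobApi

    kwargs = {
        "bk_username": "admin",
        "bk_biz_id": 2,
        "file_source_list": [
            {
                "account": {"alias": "root"},
                "server": {"ip_list": [{"bk_cloud_id": 0, "ip": "10.0.0.1"}]},
                "file_list": ["/data/packed/extract_20201201.tar.gz"],
            }
        ],
        "account_alias": "root",
        "file_target_path": "/data/download/",
        "target_server": {"ip_list": [{"bk_cloud_id": 0, "ip": "10.0.0.2"}]},
        "task_name": "distribute packed file",
    }
    task_result = JobApi.fast_transfer_file(kwargs, raw=True, request_cookies=False)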
32 changes: 22 additions & 10 deletions apps/log_extract/handlers/explorer.py
@@ -25,6 +25,7 @@
 from django.conf import settings
 from django.utils.translation import ugettext_lazy as _
 from apps.api import CCApi
+from apps.utils.db import array_chunk
 from apps.utils.log import logger
 from apps.iam import ActionEnum, Permission
 from apps.log_search.handlers.biz import BizHandler
@@ -35,7 +36,7 @@
 from apps.log_extract.models import Strategies
 from apps.utils.local import get_request_username
 from apps.exceptions import ApiResultError
-from apps.log_extract.constants import JOB_API_PERMISSION_CODE
+from apps.log_extract.constants import JOB_API_PERMISSION_CODE, BATCH_GET_JOB_INSTANCE_IP_LOG_IP_LIST_SIZE
 
 
 class ExplorerHandler(object):
@@ -97,7 +98,13 @@ def list_files(self, bk_biz_id, ip, request_dir, is_search_child, time_range, st
 
         query_result = self.get_finished_result(task_result["job_instance_id"], operator, bk_biz_id)
         success_step = self.get_success_step(query_result)
-        res = self.job_log_to_file_list(success_step["ip_logs"], allowed_dir_file_list)
+        ip_list = [
+            {"ip": ip["ip"], "bk_cloud_id": ip["bk_cloud_id"]} for ip in success_step.get("step_ip_result_list", [])
+        ]
+        ip_logs = self.get_all_ip_logs(
+            ip_list, task_result["job_instance_id"], success_step["step_instance_id"], bk_biz_id
+        )
+        res = self.job_log_to_file_list(ip_logs, allowed_dir_file_list)
         res = sorted(res, key=lambda k: k["mtime"], reverse=True)
         return res
 
@@ -118,14 +125,19 @@ def get_finished_result(task_instance_id, operator, bk_biz_id):
 
     @staticmethod
     def get_success_step(query_result):
-        task_result, *__ = query_result
-        ip_status_list = []
-        for step_result in task_result["step_results"]:
-            ip_status = step_result["ip_status"]
-            if ip_status == constants.JOB_SUCCESS_STATUS:
-                return step_result
-            ip_status_list.append(str(ip_status))
-        raise exceptions.ExplorerException(_("文件预览异常({})".format(",".join(ip_status_list))))
+        step_result = FileServer.get_step_instance(query_result)
+        ip_status = step_result["status"]
+        if ip_status == constants.JOB_SUCCESS_STATUS:
+            return step_result
+        raise exceptions.ExplorerException(_("文件预览异常({})".format(ip_status)))
+
+    def get_all_ip_logs(self, ip_list, job_instance_id, step_instance_id, bk_biz_id):
+        ip_list_group = array_chunk(ip_list, BATCH_GET_JOB_INSTANCE_IP_LOG_IP_LIST_SIZE)
+        ip_logs = []
+        for ip_list in ip_list_group:
+            ip_list_log = FileServer.get_ip_list_log(ip_list, job_instance_id, step_instance_id, bk_biz_id)
+            ip_logs.extend(ip_list_log.get("script_task_logs", []))
+        return ip_logs
 
     def job_log_to_file_list(self, ip_logs, allowed_dir_file_list):
         res = []
2 changes: 2 additions & 0 deletions config/domains.py
@@ -51,6 +51,8 @@
     "CMSI_APIGATEWAY_ROOT_V2",
     # JOB
     "JOB_APIGATEWAY_ROOT_V2",
+    # JOBv3
+    "JOB_APIGATEWAY_ROOT_V3",
 ]
 
 env_domains = load_domains(settings)
2 changes: 1 addition & 1 deletion dev.env.yml
@@ -83,7 +83,7 @@ domains:
   CMSI_APIGATEWAY_ROOT_V2: "{settings.PAAS_API_HOST}/api/c/compapi/v2/cmsi/"
   GSE_APIGATEWAY_ROOT_V2: "{settings.PAAS_API_HOST}/api/c/compapi/v2/gse/"
   IAM_APIGATEWAY_ROOT_V2: "{settings.PAAS_API_HOST}/api/c/compapi/v2/iam/"
-  JOB_APIGATEWAY_ROOT_V2: "{settings.PAAS_API_HOST}/api/c/compapi/v2/job/"
+  JOB_APIGATEWAY_ROOT_V3: "{settings.PAAS_API_HOST}/api/c/compapi/v2/jobv3/"
   MONITOR_APIGATEWAY_ROOT: "{settings.PAAS_API_HOST}/api/c/compapi/v2/monitor_v3/"
2 changes: 1 addition & 1 deletion prod.env.yml
@@ -83,5 +83,5 @@ domains:
   CMSI_APIGATEWAY_ROOT_V2: "{settings.PAAS_API_HOST}/api/c/compapi/v2/cmsi/"
   GSE_APIGATEWAY_ROOT_V2: "{settings.PAAS_API_HOST}/api/c/compapi/v2/gse/"
   IAM_APIGATEWAY_ROOT_V2: "{settings.PAAS_API_HOST}/api/c/compapi/v2/iam/"
-  JOB_APIGATEWAY_ROOT_V2: "{settings.PAAS_API_HOST}/api/c/compapi/v2/job/"
+  JOB_APIGATEWAY_ROOT_V3: "{settings.PAAS_API_HOST}/api/c/compapi/v2/jobv3/"
   MONITOR_APIGATEWAY_ROOT: "{settings.PAAS_API_HOST}/api/c/compapi/v2/monitor_v3/"
2 changes: 1 addition & 1 deletion stag.env.yml
@@ -83,5 +83,5 @@ domains:
   CMSI_APIGATEWAY_ROOT_V2: "{settings.PAAS_API_HOST}/api/c/compapi/v2/cmsi/"
   GSE_APIGATEWAY_ROOT_V2: "{settings.PAAS_API_HOST}/api/c/compapi/v2/gse/"
   IAM_APIGATEWAY_ROOT_V2: "{settings.PAAS_API_HOST}/api/c/compapi/v2/iam/"
-  JOB_APIGATEWAY_ROOT_V2: "{settings.PAAS_API_HOST}/api/c/compapi/v2/job/"
+  JOB_APIGATEWAY_ROOT_V3: "{settings.PAAS_API_HOST}/api/c/compapi/v2/jobv3/"
   MONITOR_APIGATEWAY_ROOT: "{settings.PAAS_API_HOST}/api/c/compapi/v2/monitor_v3/"