Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: bcs 容器采集项延迟创建 --story=119600530 #2919

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
267 changes: 200 additions & 67 deletions bklog/apps/log_databus/handlers/collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@
CACHE_KEY_CLUSTER_INFO,
CHECK_TASK_READY_NOTE_FOUND_EXCEPTION_CODE,
CONTAINER_CONFIGS_TO_YAML_EXCLUDE_FIELDS,
DEFAULT_COLLECTOR_LENGTH,
DEFAULT_RETENTION,
INTERNAL_TOPO_INDEX,
META_DATA_ENCODING,
Expand Down Expand Up @@ -3139,49 +3138,82 @@ def create_bcs_container_config(self, data, bk_app_code="bk_bcs"):
collector_config_name_en=data["collector_config_name_en"],
)
bcs_rule = BcsRule.objects.create(rule_name=data["collector_config_name"], bcs_project_id=data["project_id"])
# 创建路径采集项
path_collector_config = self.create_bcs_collector(
{
"bk_biz_id": data["bk_biz_id"],
"collector_config_name": bcs_collector_config_name["bcs_path_collector"]["collector_config_name"],
"collector_config_name_en": bcs_collector_config_name["bcs_path_collector"]["collector_config_name_en"],
"collector_scenario_id": CollectorScenarioEnum.ROW.value,
"custom_type": data["custom_type"],
"category_id": data["category_id"],
"description": data["description"],
"data_link_id": int(conf["data_link_id"]),
"bk_app_code": bk_app_code,
"environment": Environment.CONTAINER,
"bcs_cluster_id": data["bcs_cluster_id"],
"add_pod_label": data["add_pod_label"],
"extra_labels": data["extra_labels"],
"rule_id": bcs_rule.id,
},
conf=conf,
async_bkdata=False,
)

# 创建标准输出采集项
std_collector_config = self.create_bcs_collector(
{
"bk_biz_id": data["bk_biz_id"],
"collector_config_name": bcs_collector_config_name["bcs_std_collector"]["collector_config_name"],
"collector_config_name_en": bcs_collector_config_name["bcs_std_collector"]["collector_config_name_en"],
"collector_scenario_id": CollectorScenarioEnum.ROW.value,
"custom_type": data["custom_type"],
"category_id": data["category_id"],
"description": data["description"],
"data_link_id": int(conf["data_link_id"]),
"bk_app_code": bk_app_code,
"environment": Environment.CONTAINER,
"bcs_cluster_id": data["bcs_cluster_id"],
"add_pod_label": data["add_pod_label"],
"extra_labels": data["extra_labels"],
"rule_id": bcs_rule.id,
},
conf=conf,
async_bkdata=False,
)
# 默认设置为空,做为一个标识
path_collector_config = std_collector_config = ""
parent_container_config_id = 0
# 注入索引集标签
tag_id = IndexSetTag.get_tag_id(data["bcs_cluster_id"])
is_send_create_notify = False
for config in data["config"]:
if config["paths"]:
# 创建路径采集项
path_collector_config = self.create_bcs_collector(
{
"bk_biz_id": data["bk_biz_id"],
"collector_config_name": bcs_collector_config_name["bcs_path_collector"][
"collector_config_name"
],
"collector_config_name_en": bcs_collector_config_name["bcs_path_collector"][
"collector_config_name_en"
],
"collector_scenario_id": CollectorScenarioEnum.ROW.value,
"custom_type": data["custom_type"],
"category_id": data["category_id"],
"description": data["description"],
"data_link_id": int(conf["data_link_id"]),
"bk_app_code": bk_app_code,
"environment": Environment.CONTAINER,
"bcs_cluster_id": data["bcs_cluster_id"],
"add_pod_label": data["add_pod_label"],
"extra_labels": data["extra_labels"],
"rule_id": bcs_rule.id,
},
conf=conf,
async_bkdata=False,
)
is_send_create_notify = True
# 注入索引集标签
IndexSetHandler(path_collector_config.index_set_id).add_tag(tag_id=tag_id)

if config["enable_stdout"]:
# 创建标准输出采集项
std_collector_config = self.create_bcs_collector(
{
"bk_biz_id": data["bk_biz_id"],
"collector_config_name": bcs_collector_config_name["bcs_std_collector"][
"collector_config_name"
],
"collector_config_name_en": bcs_collector_config_name["bcs_std_collector"][
"collector_config_name_en"
],
"collector_scenario_id": CollectorScenarioEnum.ROW.value,
"custom_type": data["custom_type"],
"category_id": data["category_id"],
"description": data["description"],
"data_link_id": int(conf["data_link_id"]),
"bk_app_code": bk_app_code,
"environment": Environment.CONTAINER,
"bcs_cluster_id": data["bcs_cluster_id"],
"add_pod_label": data["add_pod_label"],
"extra_labels": data["extra_labels"],
"rule_id": bcs_rule.id,
},
conf=conf,
async_bkdata=False,
)
# 注入索引集标签
IndexSetHandler(std_collector_config.index_set_id).add_tag(tag_id=tag_id)
# 获取父配置id
collector_config_obj = CollectorConfig.objects.filter(
rule_id=bcs_rule.id,
collector_config_name_en=bcs_collector_config_name["bcs_path_collector"][
"collector_config_name_en"
],
).first()
if collector_config_obj:
parent_container_config_id = collector_config_obj.collector_config_id

container_collector_config_list = []
for config in data["config"]:
workload_type = config["container"].get("workload_type", "")
Expand Down Expand Up @@ -3240,29 +3272,27 @@ def create_bcs_container_config(self, data, bk_app_code="bk_bcs"):
match_expressions=match_expressions,
all_container=is_all_container,
rule_id=bcs_rule.id,
parent_container_config_id=path_collector_config.collector_config_id,
parent_container_config_id=parent_container_config_id,
)
)

ContainerCollectorConfig.objects.bulk_create(container_collector_config_list)

# 注入索引集标签
tag_id = IndexSetTag.get_tag_id(data["bcs_cluster_id"])
IndexSetHandler(path_collector_config.index_set_id).add_tag(tag_id=tag_id)
IndexSetHandler(std_collector_config.index_set_id).add_tag(tag_id=tag_id)

self.send_create_notify(path_collector_config)
if is_send_create_notify:
self.send_create_notify(path_collector_config)

return {
"rule_id": bcs_rule.id,
"rule_file_index_set_id": path_collector_config.index_set_id,
"rule_file_collector_config_id": path_collector_config.collector_config_id,
"rule_std_index_set_id": std_collector_config.index_set_id,
"rule_std_collector_config_id": std_collector_config.collector_config_id,
"file_index_set_id": path_collector_config.index_set_id, # TODO: 兼容代码4.8需删除
"std_index_set_id": std_collector_config.index_set_id, # TODO: 兼容代码4.8需删除
"bk_data_id": path_collector_config.bk_data_id,
"stdout_conf": {"bk_data_id": std_collector_config.bk_data_id},
"rule_file_index_set_id": path_collector_config.index_set_id if path_collector_config else "",
"rule_file_collector_config_id": path_collector_config.collector_config_id if path_collector_config else "",
"rule_std_index_set_id": std_collector_config.index_set_id if std_collector_config else "",
"rule_std_collector_config_id": std_collector_config.collector_config_id if std_collector_config else "",
"file_index_set_id": path_collector_config.index_set_id
if path_collector_config
else "", # TODO: 兼容代码4.8需删除
"std_index_set_id": std_collector_config.index_set_id if std_collector_config else "", # TODO: 兼容代码4.8需删除
"bk_data_id": path_collector_config.bk_data_id if path_collector_config else "",
"stdout_conf": {"bk_data_id": std_collector_config.bk_data_id if std_collector_config else ""},
}

def sync_bcs_container_task(self, data: Dict[str, Any]):
Expand All @@ -3274,6 +3304,8 @@ def sync_bcs_container_task(self, data: Dict[str, Any]):
file_collector_config_id = data["rule_file_collector_config_id"]
std_collector_config_id = data["rule_std_collector_config_id"]
for collector_config_id in [file_collector_config_id, std_collector_config_id]:
if not collector_config_id:
continue
collector_config = CollectorConfig.objects.filter(
collector_config_id=collector_config_id,
).first()
Expand Down Expand Up @@ -3403,9 +3435,101 @@ def check_collector_config(self, collector_config_params):
)

@transaction.atomic
def update_bcs_container_config(self, data, rule_id):
def update_bcs_container_config(self, data, rule_id, bk_app_code="bk_bcs"):
conf = self.get_bcs_config(
bk_biz_id=data["bk_biz_id"],
bcs_cluster_id=data["bcs_cluster_id"],
storage_cluster_id=data.get("storage_cluster_id"),
)
bcs_collector_config_name = self.generate_collector_config_name(
bcs_cluster_id=data["bcs_cluster_id"],
collector_config_name=data["collector_config_name"],
collector_config_name_en=data["collector_config_name_en"],
)
bcs_path_collector_config_name_en = bcs_collector_config_name["bcs_path_collector"]["collector_config_name_en"]
bcs_std_collector_config_name_en = bcs_collector_config_name["bcs_std_collector"]["collector_config_name_en"]

# 默认设置为空,做为一个标识
path_collector = std_collector = ""
# 注入索引集标签
tag_id = IndexSetTag.get_tag_id(data["bcs_cluster_id"])
is_send_create_notify = False
# 容器配置是否创建标识
is_exist_bcs_path = False
is_exist_bcs_std = False
for config in data["config"]:
collector_config_name_en_list = CollectorConfig.objects.filter(
rule_id=rule_id,
collector_config_name_en__in=[bcs_path_collector_config_name_en, bcs_std_collector_config_name_en],
).values_list("collector_config_name_en", flat=True)

for collector_config_name_en in collector_config_name_en_list:
if collector_config_name_en.endswith("_path"):
is_exist_bcs_path = True
elif collector_config_name_en.endswith("_std"):
is_exist_bcs_std = True

# 如果还没有创建容器配置,那么当config["paths"]或config["enable_stdout"]存在时需要创建容器配置
if config["paths"] and not is_exist_bcs_path:
# 创建路径采集项
path_collector_config = self.create_bcs_collector(
{
"bk_biz_id": data["bk_biz_id"],
"collector_config_name": bcs_collector_config_name["bcs_path_collector"][
"collector_config_name"
],
"collector_config_name_en": bcs_collector_config_name["bcs_path_collector"][
"collector_config_name_en"
],
"collector_scenario_id": CollectorScenarioEnum.ROW.value,
"custom_type": data["custom_type"],
"category_id": data["category_id"],
"description": data["description"],
"data_link_id": int(conf["data_link_id"]),
"bk_app_code": bk_app_code,
"environment": Environment.CONTAINER,
"bcs_cluster_id": data["bcs_cluster_id"],
"add_pod_label": data["add_pod_label"],
"extra_labels": data["extra_labels"],
"rule_id": rule_id,
},
conf=conf,
async_bkdata=False,
)
is_send_create_notify = True
# 注入索引集标签
IndexSetHandler(path_collector_config.index_set_id).add_tag(tag_id=tag_id)
if config["enable_stdout"] and not is_exist_bcs_std:
# 创建标准输出采集项
std_collector_config = self.create_bcs_collector(
{
"bk_biz_id": data["bk_biz_id"],
"collector_config_name": bcs_collector_config_name["bcs_std_collector"][
"collector_config_name"
],
"collector_config_name_en": bcs_collector_config_name["bcs_std_collector"][
"collector_config_name_en"
],
"collector_scenario_id": CollectorScenarioEnum.ROW.value,
"custom_type": data["custom_type"],
"category_id": data["category_id"],
"description": data["description"],
"data_link_id": int(conf["data_link_id"]),
"bk_app_code": bk_app_code,
"environment": Environment.CONTAINER,
"bcs_cluster_id": data["bcs_cluster_id"],
"add_pod_label": data["add_pod_label"],
"extra_labels": data["extra_labels"],
"rule_id": rule_id,
},
conf=conf,
async_bkdata=False,
)
# 注入索引集标签
IndexSetHandler(std_collector_config.index_set_id).add_tag(tag_id=tag_id)

collectors = CollectorConfig.objects.filter(rule_id=rule_id)
if len(collectors) != DEFAULT_COLLECTOR_LENGTH:
if not collectors:
raise RuleCollectorException(RuleCollectorException.MESSAGE.format(rule_id=rule_id))
for collector in collectors:
if collector.collector_config_name_en.endswith("_path"):
Expand Down Expand Up @@ -3439,14 +3563,17 @@ def update_bcs_container_config(self, data, rule_id):
**{"data": {"configs": std_container_config}},
)

if is_send_create_notify:
self.send_create_notify(path_collector_config)

return {
"rule_id": rule_id,
"rule_file_index_set_id": path_collector.index_set_id,
"rule_std_index_set_id": std_collector.index_set_id,
"file_index_set_id": path_collector.index_set_id, # TODO: 兼容代码4.8需删除
"std_index_set_id": std_collector.index_set_id, # TODO: 兼容代码4.8需删除
"bk_data_id": path_collector.bk_data_id,
"stdout_conf": {"bk_data_id": std_collector.bk_data_id},
"rule_file_index_set_id": path_collector.index_set_id if path_collector else "",
"rule_std_index_set_id": std_collector.index_set_id if std_collector else "",
"file_index_set_id": path_collector.index_set_id if path_collector else "", # TODO: 兼容代码4.8需删除
"std_index_set_id": std_collector.index_set_id if std_collector else "", # TODO: 兼容代码4.8需删除
"bk_data_id": path_collector.bk_data_id if path_collector else "",
"stdout_conf": {"bk_data_id": std_collector.bk_data_id if std_collector else ""},
}

def deal_self_call(self, **kwargs):
Expand Down Expand Up @@ -3487,6 +3614,9 @@ def get_container_configs(cls, config, path_collector, rule_id):
"match_labels": conf["label_selector"].get("match_labels", []),
"match_expressions": conf["label_selector"].get("match_expressions", []),
},
"annotation_selector": {
yiqiwang-17 marked this conversation as resolved.
Show resolved Hide resolved
"match_annotations": conf["annotation_selector"].get("match_annotations", []),
},
"rule_id": rule_id,
"parent_container_config_id": 0,
"collector_type": ContainerCollectorType.CONTAINER,
Expand Down Expand Up @@ -3517,8 +3647,11 @@ def get_container_configs(cls, config, path_collector, rule_id):
"match_labels": conf["label_selector"].get("match_labels", []),
"match_expressions": conf["label_selector"].get("match_expressions", []),
},
"annotation_selector": {
"match_annotations": conf["annotation_selector"].get("match_annotations", []),
},
"rule_id": rule_id,
"parent_container_config_id": path_collector.collector_config_id,
"parent_container_config_id": path_collector.collector_config_id if path_collector else 0,
"collector_type": ContainerCollectorType.STDOUT,
}
)
Expand Down
2 changes: 1 addition & 1 deletion bklog/apps/log_databus/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -1374,7 +1374,7 @@ class ConditionSerializer(serializers.Serializer):
exclude_files = serializers.ListField(
label=_("日志采集路径黑名单"), child=serializers.CharField(allow_blank=True), required=False, allow_empty=True
)
encoding = serializers.ChoiceField(label=_("日志字符集"), choices=EncodingsEnum.get_choices(), default="utf-8")
encoding = serializers.ChoiceField(label=_("日志字符集"), choices=EncodingsEnum.get_choices(), default="UTF-8")
multiline = MultilineSerializer(label=_("段日志配置"), required=False)
extMeta = serializers.DictField(label=_("额外的元数据"), required=False, allow_empty=True)
logConfigType = serializers.ChoiceField(
Expand Down
6 changes: 5 additions & 1 deletion bklog/apps/log_databus/views/collector_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -2108,7 +2108,11 @@ def update_bcs_collector(self, request, collector_config_id=None):
raise BkJwtVerifyException()
data = self.params_valid(BCSCollectorSerializer)
rule_id = int(collector_config_id)
return Response(CollectorHandler().update_bcs_container_config(data=data, rule_id=rule_id))
return Response(
CollectorHandler().update_bcs_container_config(
data=data, rule_id=rule_id, bk_app_code=auth_info["bk_app_code"]
)
)

@detail_route(methods=["POST"], url_path="retry_bcs_collector")
def retry_bcs_collector(self, request, collector_config_id=None):
Expand Down
Loading