Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: 调整es路由中的索引集格式 --story=118810611 #2406

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions bkmonitor/metadata/models/space/space_table_id_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from django.utils.timezone import now as tz_now

from metadata import models
from metadata.models.constants import DEFAULT_MEASUREMENT, EsSourceType
from metadata.models.constants import DEFAULT_MEASUREMENT
from metadata.models.space import utils
from metadata.models.space.constants import (
ALL_SPACE_TYPE_TABLE_ID_LIST,
Expand Down Expand Up @@ -224,17 +224,23 @@ def _compose_es_table_id_detail(self, table_id_list: Optional[List[str]] = None)
)
# 查询结果表选项
tid_options = models.ResultTableOption.objects.filter(table_id__in=table_id_list).values(
"table_id", "name", "value"
"table_id", "name", "value", "value_type"
)
else:
table_ids = models.ESStorage.objects.values("table_id", "storage_cluster_id", "source_type", "index_set")
tids = [obj["table_id"] for obj in table_ids]
tid_options = models.ResultTableOption.objects.filter(table_id__in=tids).values("table_id", "name", "value")
tid_options = models.ResultTableOption.objects.filter(table_id__in=tids).values(
"table_id", "name", "value", "value_type"
)

tid_options_map = {}
for option in tid_options:
try:
_option = {option["name"]: json.loads(option["value"])}
_option = (
{option["name"]: option["value"]}
if option["value_type"] == models.ResultTableOption.TYPE_STRING
else {option["name"]: json.loads(option["value"])}
)
except Exception:
_option = {}

Expand All @@ -248,14 +254,8 @@ def _compose_es_table_id_detail(self, table_id_list: Optional[List[str]] = None)
index_set = record["index_set"]
tid = record["table_id"]
table_id_db = index_set
# 三个场景进行查询规则处理
if source_type == EsSourceType.LOG.value:
_index_list = index_set.split(",") if index_set else [tid]
table_id_db = ",".join([f"{index.replace('.', '_')}_*_read" for index in _index_list])
elif source_type == EsSourceType.BKDATA.value:
_index_list = index_set.split(",")
table_id_db = ",".join([f"{index}_*" for index in _index_list])

# 索引集,直接按照存储进行路由
data[tid] = json.dumps(
{
"storage_id": record.get("storage_cluster_id", 0),
Expand Down
2 changes: 1 addition & 1 deletion bkmonitor/metadata/resources/es.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ def perform_request(self, data: OrderedDict):
if data.get("options"):
self.create_or_update_options(table_id, data["options"])
need_refresh_table_id_detail = True
options = list(models.ResultTableOption.objects.filter(table_id=table_id).values("name", "value"))
options = list(models.ResultTableOption.objects.filter(table_id=table_id).values("name", "value", "value_type"))
# 如果别名或者索引集有变动,则需要通知到unify-query
if need_refresh_data_label:
push_and_publish_es_aliases(data_label=data["data_label"])
Expand Down
25 changes: 7 additions & 18 deletions bkmonitor/metadata/service/space_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import requests

from metadata import models
from metadata.models.constants import EsSourceType
from metadata.models.space.constants import (
DATA_LABEL_TO_RESULT_TABLE_CHANNEL,
DATA_LABEL_TO_RESULT_TABLE_KEY,
Expand Down Expand Up @@ -90,38 +89,28 @@ def push_and_publish_es_table_id(
):
"""推送并发布es结果表

下面的处理方式,交由调用接口端,通过 `option` 进行处理
- 自有: 追加时间戳和read后缀
- 数据平台: 追加时间戳
- 第三方: 不追加任何,直接按照规则处理
"""
table_id_db = ""
# 针对内建的索引,如果没有设置索引集,则按照结果表获取查询规则
if source_type == EsSourceType.LOG.value:
_index_list = index_set.split(",") if index_set else [table_id]
table_id_db = ",".join([f"{index.replace('.', '_')}_*_read" for index in _index_list])
elif source_type == EsSourceType.BKDATA.value:
_index_list = index_set.split(",")
table_id_db = ",".join([f"{index}_*" for index in _index_list])
else:
table_id_db = index_set

if not table_id_db:
logger.error("compose table_id_db failed, index_set: %s", index_set)
return

# 组装values,包含 options 字段
values = {
"source_type": source_type,
"storage_id": cluster_id,
"db": table_id_db,
"db": index_set,
"measurement": "__default__",
"options": {},
}
if options:
_options = {}
for option in options:
try:
_options[option["name"]] = json.loads(option["value"])
_options[option["name"]] = (
option["value"]
if option["value_type"] == models.ResultTableOption.TYPE_STRING
else json.loads(option["value"])
)
except Exception:
_options[option["name"]] = {}
values["options"] = _options
Expand Down