Skip to content

Commit

Permalink
feat: 上下文 context 接口改为直查 ES --story=119687755 (#2941)
Browse files Browse the repository at this point in the history
  • Loading branch information
wencong1724427771 authored Sep 17, 2024
1 parent 11fb30a commit 87c68b7
Showing 1 changed file with 105 additions and 41 deletions.
146 changes: 105 additions & 41 deletions bklog/apps/log_search/handlers/search/search_handlers_esquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@
from apps.log_desensitize.models import DesensitizeConfig, DesensitizeFieldConfig
from apps.log_desensitize.utils import expand_nested_data, merge_nested_data
from apps.log_esquery.esquery.esquery import EsQuery
from apps.log_esquery.serializers import EsQuerySearchAttrSerializer
from apps.log_esquery.serializers import (
EsQueryDslAttrSerializer,
EsQuerySearchAttrSerializer,
)
from apps.log_search.constants import (
ASYNC_SORTED,
CHECK_FIELD_LIST,
Expand Down Expand Up @@ -643,6 +646,11 @@ def direct_esquery_search(cls, params):
data = custom_params_valid(EsQuerySearchAttrSerializer, params)
return EsQuery(data).search()

@classmethod
def direct_esquery_dsl(cls, params):
data = custom_params_valid(EsQueryDslAttrSerializer, params)
return EsQuery(data).dsl()

def _multi_search(self, once_size: int):
"""
根据存储集群切换记录多线程请求 BkLogApi.search
Expand Down Expand Up @@ -923,7 +931,65 @@ def pre_get_result(self, sorted_fields: list, size: int):
@return:
"""
if self.scenario_id == Scenario.ES:
result = BkLogApi.search(
if FeatureToggleObject.switch(DIRECT_ESQUERY_SEARCH, self.search_dict.get("bk_biz_id")):
result = self.direct_esquery_search(
{
"indices": self.indices,
"scenario_id": self.scenario_id,
"storage_cluster_id": self.storage_cluster_id,
"start_time": self.start_time,
"end_time": self.end_time,
"query_string": self.query_string,
"filter": self.filter,
"sort_list": self.sort_list,
"start": self.start,
"size": size,
"aggs": self.aggs,
"highlight": self.highlight,
"time_zone": self.time_zone,
"time_range": self.time_range,
"use_time_range": self.use_time_range,
"time_field": self.time_field,
"time_field_type": self.time_field_type,
"time_field_unit": self.time_field_unit,
"scroll": SCROLL,
"collapse": self.collapse,
}
)
else:
result = BkLogApi.search(
{
"indices": self.indices,
"scenario_id": self.scenario_id,
"storage_cluster_id": self.storage_cluster_id,
"start_time": self.start_time,
"end_time": self.end_time,
"query_string": self.query_string,
"filter": self.filter,
"sort_list": self.sort_list,
"start": self.start,
"size": size,
"aggs": self.aggs,
"highlight": self.highlight,
"time_zone": self.time_zone,
"time_range": self.time_range,
"use_time_range": self.use_time_range,
"time_field": self.time_field,
"time_field_type": self.time_field_type,
"time_field_unit": self.time_field_unit,
"scroll": SCROLL,
"collapse": self.collapse,
},
data_api_retry_cls=DataApiRetryClass.create_retry_obj(
exceptions=[BaseException],
stop_max_attempt_number=MAX_EXPORT_REQUEST_RETRY,
),
)
return result

sorted_list = self._get_user_sorted_list(sorted_fields)
if FeatureToggleObject.switch(DIRECT_ESQUERY_SEARCH, self.search_dict.get("bk_biz_id")):
result = self.direct_esquery_search(
{
"indices": self.indices,
"scenario_id": self.scenario_id,
Expand All @@ -932,56 +998,49 @@ def pre_get_result(self, sorted_fields: list, size: int):
"end_time": self.end_time,
"query_string": self.query_string,
"filter": self.filter,
"sort_list": self.sort_list,
"sort_list": sorted_list,
"start": self.start,
"size": size,
"aggs": self.aggs,
"highlight": self.highlight,
"time_zone": self.time_zone,
"time_range": self.time_range,
"time_field": self.time_field,
"use_time_range": self.use_time_range,
"time_field_type": self.time_field_type,
"time_field_unit": self.time_field_unit,
"scroll": None,
"collapse": self.collapse,
}
)
else:
result = BkLogApi.search(
{
"indices": self.indices,
"scenario_id": self.scenario_id,
"storage_cluster_id": self.storage_cluster_id,
"start_time": self.start_time,
"end_time": self.end_time,
"query_string": self.query_string,
"filter": self.filter,
"sort_list": sorted_list,
"start": self.start,
"size": size,
"aggs": self.aggs,
"highlight": self.highlight,
"time_zone": self.time_zone,
"time_range": self.time_range,
"time_field": self.time_field,
"use_time_range": self.use_time_range,
"time_field_type": self.time_field_type,
"time_field_unit": self.time_field_unit,
"scroll": SCROLL,
"scroll": None,
"collapse": self.collapse,
},
data_api_retry_cls=DataApiRetryClass.create_retry_obj(
exceptions=[BaseException],
stop_max_attempt_number=MAX_EXPORT_REQUEST_RETRY,
exceptions=[BaseException], stop_max_attempt_number=MAX_EXPORT_REQUEST_RETRY
),
)
return result

sorted_list = self._get_user_sorted_list(sorted_fields)

result = BkLogApi.search(
{
"indices": self.indices,
"scenario_id": self.scenario_id,
"storage_cluster_id": self.storage_cluster_id,
"start_time": self.start_time,
"end_time": self.end_time,
"query_string": self.query_string,
"filter": self.filter,
"sort_list": sorted_list,
"start": self.start,
"size": size,
"aggs": self.aggs,
"highlight": self.highlight,
"time_zone": self.time_zone,
"time_range": self.time_range,
"time_field": self.time_field,
"use_time_range": self.use_time_range,
"time_field_type": self.time_field_type,
"time_field_unit": self.time_field_unit,
"scroll": None,
"collapse": self.collapse,
},
data_api_retry_cls=DataApiRetryClass.create_retry_obj(
exceptions=[BaseException], stop_max_attempt_number=MAX_EXPORT_REQUEST_RETRY
),
)
return result

def search_after_result(self, search_result, sorted_fields):
Expand Down Expand Up @@ -1376,12 +1435,17 @@ def search_context(self):
if record_obj:
dsl_params_base.update({"storage_cluster_id": record_obj.storage_cluster_id})

if FeatureToggleObject.switch(DIRECT_ESQUERY_SEARCH, self.search_dict.get("bk_biz_id")):
exec_func = self.direct_esquery_dsl
else:
exec_func = BkLogApi.dsl

if self.zero:
# up
body: dict = self._get_context_body("-")
dsl_params_up = copy.deepcopy(dsl_params_base)
dsl_params_up.update({"body": body})
result_up: dict = BkLogApi.dsl(dsl_params_up)
result_up: dict = exec_func(dsl_params_up)
result_up: dict = self._deal_query_result(result_up)
result_up.update(
{
Expand All @@ -1395,7 +1459,7 @@ def search_context(self):

dsl_params_down = copy.deepcopy(dsl_params_base)
dsl_params_down.update({"body": body})
result_down: Dict = BkLogApi.dsl(dsl_params_down)
result_down: Dict = exec_func(dsl_params_down)

result_down: dict = self._deal_query_result(result_down)
result_down.update({"list": result_down.get("list"), "origin_log_list": result_down.get("origin_log_list")})
Expand Down Expand Up @@ -1432,7 +1496,7 @@ def search_context(self):

dsl_params_up = copy.deepcopy(dsl_params_base)
dsl_params_up.update({"body": body})
result_up = BkLogApi.dsl(dsl_params_up)
result_up = exec_func(dsl_params_up)

result_up: dict = self._deal_query_result(result_up)
result_up.update(
Expand All @@ -1453,7 +1517,7 @@ def search_context(self):

dsl_params_down = copy.deepcopy(dsl_params_base)
dsl_params_down.update({"body": body})
result_down = BkLogApi.dsl(dsl_params_down)
result_down = exec_func(dsl_params_down)

result_down = self._deal_query_result(result_down)
result_down.update({"list": result_down.get("list"), "origin_log_list": result_down.get("origin_log_list")})
Expand Down

0 comments on commit 87c68b7

Please sign in to comment.