Skip to content

Commit

Permalink
feat: 联邦集群调整,联邦数据表逻辑变更 --story=119592194 (#2867)
Browse files Browse the repository at this point in the history
  • Loading branch information
EASYGOING45 authored Sep 9, 2024
1 parent c6f64ea commit a3e138c
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 21 deletions.
26 changes: 26 additions & 0 deletions bkmonitor/metadata/migrations/0189_auto_20240909_1134.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Generated by Django 3.2.15 on 2024-09-09 03:34

from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
('metadata', '0188_esstorage_need_create_index'),
]

operations = [
migrations.RenameField(
model_name='bcsfederalclusterinfo',
old_name='fed_buildin_event_table_id',
new_name='fed_builtin_metric_table_id',
),
migrations.RemoveField(
model_name='bcsfederalclusterinfo',
name='fed_buildin_metric_table_id',
),
migrations.AddField(
model_name='bcsfederalclusterinfo',
name='fed_builtin_event_table_id',
field=models.CharField(blank=True, max_length=128, null=True, verbose_name='内置事件结果表'),
),
]
9 changes: 8 additions & 1 deletion bkmonitor/metadata/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,13 @@
specific language governing permissions and limitations under the License.
"""

from .bcs import BCSClusterInfo, PodMonitorInfo, ReplaceConfig, ServiceMonitorInfo
from .bcs import (
BCSClusterInfo,
BcsFederalClusterInfo,
PodMonitorInfo,
ReplaceConfig,
ServiceMonitorInfo,
)
from .common import Label
from .custom_report import (
CustomReportSubscription,
Expand Down Expand Up @@ -119,6 +125,7 @@
"Label",
# bcs
"BCSClusterInfo",
"BcsFederalClusterInfo",
"ServiceMonitorInfo",
"PodMonitorInfo",
"ReplaceConfig",
Expand Down
6 changes: 2 additions & 4 deletions bkmonitor/metadata/models/bcs/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,6 @@ def register_cluster(

# 3. 注册6个必要的data_id和自定义事件及自定义时序上报内容
for usage, register_info in cluster.DATASOURCE_REGISTER_INFO.items():
if is_fed_cluster and usage != cls.DATA_TYPE_CUSTOM_METRIC:
continue
# 注册data_id
data_source = cluster.create_datasource(
usage=usage,
Expand Down Expand Up @@ -481,8 +479,8 @@ class BcsFederalClusterInfo(common.BaseModelWithTime):
host_cluster_id = models.CharField("HOST 集群 ID", max_length=32)
sub_cluster_id = models.CharField("子集群 ID", max_length=32)
fed_namespaces = JsonField("命名空间列表", default=[])
fed_buildin_metric_table_id = models.CharField("内置指标结果表", max_length=128, null=True, blank=True)
fed_buildin_event_table_id = models.CharField("内置指标结果表", max_length=128, null=True, blank=True)
fed_builtin_metric_table_id = models.CharField("内置指标结果表", max_length=128, null=True, blank=True)
fed_builtin_event_table_id = models.CharField("内置事件结果表", max_length=128, null=True, blank=True)

class Meta:
verbose_name = "BCS联邦集群拓扑信息"
Expand Down
38 changes: 25 additions & 13 deletions bkmonitor/metadata/models/data_link/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,12 +167,15 @@ def create_fed_vm_data_link(
# 判断是否为联邦集群的子集群,如果不是,则直接返回
objs = BcsFederalClusterInfo.objects.filter(sub_cluster_id=bcs_cluster_id)
if not (objs.exists() and utils.is_k8s_metric_data_id(data_name)):
logger.info("not federal sub cluster or not buildin datasource")
logger.info("not federal sub cluster or not builtin datasource")
return

# 获取数据源名称对应的资源,便于后续组装整个链路
logger.info(
"create vm data link for table_id: %s, data_name: %s, vm_cluster_name: %s", table_id, data_name, vm_cluster_name
"create_fed_vm_data_link for table_id: %s, data_name: %s, vm_cluster_name: %s",
table_id,
data_name,
vm_cluster_name,
)

data_id_name = utils.get_bkdata_data_id_name(data_name)
Expand All @@ -182,26 +185,30 @@ def create_fed_vm_data_link(
# 下发资源
config_list, data_links, conditions = [], [], []
for obj in objs:
buildin_name = utils.get_bkdata_table_id(obj.fed_buildin_table_id)
builtin_name = utils.get_bkdata_table_id(obj.fed_builtin_metric_table_id)
match_labels = [{"name": "namespace", "value": ns} for ns in obj.fed_namespaces]
relabels = [{"name": "bcs_cluster_id", "value": obj.fed_cluster_id}]
sinks = [
{"kind": "VmStorageBinding", "name": buildin_name, "namespace": settings.DEFAULT_VM_DATA_LINK_NAMESPACE}
{"kind": "VmStorageBinding", "name": builtin_name, "namespace": settings.DEFAULT_VM_DATA_LINK_NAMESPACE}
]
conditions.append({"match_labels": match_labels, "relabels": relabels, "sinks": sinks})

logger.info(
"composed datalink config,name->{},builtin_name->{},match_labels->{},relabels->{},sinks->{}".format(
name, builtin_name, match_labels, relabels, sinks
)
)
# 添加rt及和存储的关联
rt_config = DataLinkResourceConfig.compose_vm_table_id_config(buildin_name)
rt_config = DataLinkResourceConfig.compose_vm_table_id_config(builtin_name)
vm_storage_binding_config = DataLinkResourceConfig.compose_vm_storage_binding(
buildin_name, buildin_name, vm_cluster_name
builtin_name, builtin_name, vm_cluster_name
)
config_list.extend([rt_config, vm_storage_binding_config])
data_links.append(
{
"raw_rt_id": obj.fed_buildin_table_id,
"rt_name": buildin_name,
"raw_rt_id": obj.fed_builtin_metric_table_id,
"rt_name": builtin_name,
"rt_config": rt_config,
"vm_storage_binding_name": buildin_name,
"vm_storage_binding_name": builtin_name,
"vm_storage_binding_config": vm_storage_binding_config,
}
)
Expand All @@ -217,7 +224,12 @@ def create_fed_vm_data_link(
config_list.extend([vm_conditional_sink_config, vm_data_bus_config])
# 下发资源
data = {"config": config_list}
api.bkdata.apply_data_link(data)
try:
logger.info("create_fed_vm_data_link start to apply data link")
api.bkdata.apply_data_link(data)
except Exception as e:
logger.error("create_fed_vm_data_link apply data link error: %s", e)
return
# 根据存储创建多条记录
records, vm_records = [], []
# 如果 vm 集群ID不存在,则通过集群名称获取
Expand Down Expand Up @@ -282,12 +294,12 @@ def create_fed_vm_data_link(
)
DataLinkResourceConfig.objects.bulk_create(records)
# 创建 vm 记录
from metadata.models import AccessVMRecord, ClusterInfo
from metadata.models import AccessVMRecord

AccessVMRecord.objects.bulk_create(vm_records)

logger.info(
"create vm data link for table_id: %s, data_name: %s, vm_cluster_name: %s success",
"create_fed_vm_data_link for table_id: %s, data_name: %s, vm_cluster_name: %s success",
table_id,
data_name,
vm_cluster_name,
Expand Down
70 changes: 67 additions & 3 deletions bkmonitor/metadata/task/bcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,25 @@

@share_lock(ttl=PERIODIC_TASK_DEFAULT_TTL, identify="metadata_refreshBCSMonitorInfo")
def refresh_bcs_monitor_info():
fed_clusters = {}
try:
fed_clusters = api.bcs.get_federation_clusters()
fed_cluster_id_list = list(fed_clusters.keys())
except Exception as e: # pylint: disable=broad-except
fed_cluster_id_list = []
logger.error("get federation clusters failed: {}".format(e))

bcs_clusters = list(
BCSClusterInfo.objects.filter(
status__in=[models.BCSClusterInfo.CLUSTER_STATUS_RUNNING, models.BCSClusterInfo.CLUSTER_RAW_STATUS_RUNNING],
)
)

# 对 bcs_clusters 进行排序,确保 fed_cluster_id_list 中的集群优先
bcs_clusters = sorted(bcs_clusters, key=lambda x: x.cluster_id not in fed_cluster_id_list)

# 拉取所有cluster,遍历刷新monitorinfo信息
for cluster in BCSClusterInfo.objects.filter(
status__in=[models.BCSClusterInfo.CLUSTER_STATUS_RUNNING, models.BCSClusterInfo.CLUSTER_RAW_STATUS_RUNNING],
):
for cluster in bcs_clusters:
try:
is_fed_cluster = cluster.cluster_id in fed_cluster_id_list
# 刷新集群内置公共dataid resource
Expand All @@ -63,6 +71,13 @@ def refresh_bcs_monitor_info():
logger.debug("refresh bcs service monitor custom resource in cluster:{} done".format(cluster.cluster_id))
PodMonitorInfo.refresh_custom_resource(cluster_id=cluster.cluster_id)
logger.debug("refresh bcs pod monitor custom resource in cluster:{} done".format(cluster.cluster_id))
if is_fed_cluster:
# 更新联邦集群记录
try:
sync_federation_clusters(fed_clusters)
except Exception as e: # pylint: disable=broad-except
logger.error("sync_federation_clusters failed, error:{}".format(e))

except Exception: # noqa
logger.exception("refresh bcs monitor info failed, cluster_id(%s)", cluster.cluster_id)

Expand Down Expand Up @@ -142,13 +157,17 @@ def discover_bcs_clusters():
return
cluster_list = []
# 获取所有联邦集群 ID
fed_clusters = {}
try:
fed_clusters = api.bcs.get_federation_clusters()
fed_cluster_id_list = list(fed_clusters.keys())
except Exception as e: # pylint: disable=broad-except
fed_cluster_id_list = []
logger.error("get federation clusters failed, error:{}".format(e))

# 联邦集群顺序调整到前面,因为创建链路时依赖联邦关系记录
bcs_clusters = sorted(bcs_clusters, key=lambda x: x["cluster_id"] not in fed_cluster_id_list)

# bcs 集群中的正常状态
for bcs_cluster in bcs_clusters:
logger.info("get bcs cluster:{},start to register".format(bcs_cluster["cluster_id"]))
Expand Down Expand Up @@ -193,6 +212,12 @@ def discover_bcs_clusters():
creator="admin",
is_fed_cluster=is_fed_cluster,
)
if is_fed_cluster:
# 创建联邦集群记录
try:
sync_federation_clusters(fed_clusters)
except Exception as e: # pylint: disable=broad-except
logger.error("sync_federation_clusters failed, error:{}".format(e))
logger.info(
"cluster_id:{},project_id:{},bk_biz_id:{} registered".format(
cluster.cluster_id, cluster.project_id, cluster.bk_biz_id
Expand Down Expand Up @@ -323,3 +348,42 @@ def update_bcs_cluster_cloud_id_config(bk_biz_id=None, cluster_id=None):
# 更新云区域
for bk_cloud_id, bcs_cluster_ids in update_params.items():
BCSClusterInfo.objects.filter(cluster_id__in=bcs_cluster_ids).update(bk_cloud_id=bk_cloud_id)


def sync_federation_clusters(fed_clusters):
"""
同步联邦集群信息,创建对应数据记录
"""
logger.info("sync_federation_clusters started insert to db")
try:
fed_cluster_id_list = list(fed_clusters.keys())

for fed_cluster_id in fed_cluster_id_list:
logger.info("Syncing federation cluster->{}".format(fed_cluster_id))
host_cluster_id = fed_clusters[fed_cluster_id]['host_cluster_id']
sub_clusters = fed_clusters[fed_cluster_id]['sub_clusters']

# 获取代理集群的对应RT
cluster = models.BCSClusterInfo.objects.get(cluster_id=fed_cluster_id)
fed_builtin_k8s_metric_data_id = cluster.K8sMetricDataID
fed_builtin_k8s_event_data_id = cluster.K8sEventDataID
fed_builtin_metric_table_id = models.DataSourceResultTable.objects.get(
bk_data_id=fed_builtin_k8s_metric_data_id
).table_id
fed_builtin_event_table_id = models.DataSourceResultTable.objects.get(
bk_data_id=fed_builtin_k8s_event_data_id
).table_id

for sub_cluster_id, namespaces in sub_clusters.items():
# 同步至DB
models.BcsFederalClusterInfo.objects.update_or_create(
fed_cluster_id=fed_cluster_id,
host_cluster_id=host_cluster_id,
sub_cluster_id=sub_cluster_id,
fed_namespaces=namespaces, # 直接存储完整的命名空间列表
fed_builtin_metric_table_id=fed_builtin_metric_table_id,
fed_builtin_event_table_id=fed_builtin_event_table_id,
)
logger.info("sync_federation_clusters run successfully.")
except Exception as e: # pylint: disable=broad-except
logger.error("sync_federation_clusters failed, error: {}".format(e))

0 comments on commit a3e138c

Please sign in to comment.