diff --git a/bkmonitor/metadata/migrations/0189_auto_20240909_1134.py b/bkmonitor/metadata/migrations/0189_auto_20240909_1134.py new file mode 100644 index 000000000..91ade2f80 --- /dev/null +++ b/bkmonitor/metadata/migrations/0189_auto_20240909_1134.py @@ -0,0 +1,26 @@ +# Generated by Django 3.2.15 on 2024-09-09 03:34 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ('metadata', '0188_esstorage_need_create_index'), + ] + + operations = [ + migrations.RenameField( + model_name='bcsfederalclusterinfo', + old_name='fed_buildin_event_table_id', + new_name='fed_builtin_metric_table_id', + ), + migrations.RemoveField( + model_name='bcsfederalclusterinfo', + name='fed_buildin_metric_table_id', + ), + migrations.AddField( + model_name='bcsfederalclusterinfo', + name='fed_builtin_event_table_id', + field=models.CharField(blank=True, max_length=128, null=True, verbose_name='内置事件结果表'), + ), + ] diff --git a/bkmonitor/metadata/models/__init__.py b/bkmonitor/metadata/models/__init__.py index 14972ff18..3c5b8eb99 100644 --- a/bkmonitor/metadata/models/__init__.py +++ b/bkmonitor/metadata/models/__init__.py @@ -9,7 +9,13 @@ specific language governing permissions and limitations under the License. """ -from .bcs import BCSClusterInfo, PodMonitorInfo, ReplaceConfig, ServiceMonitorInfo +from .bcs import ( + BCSClusterInfo, + BcsFederalClusterInfo, + PodMonitorInfo, + ReplaceConfig, + ServiceMonitorInfo, +) from .common import Label from .custom_report import ( CustomReportSubscription, @@ -119,6 +125,7 @@ "Label", # bcs "BCSClusterInfo", + "BcsFederalClusterInfo", "ServiceMonitorInfo", "PodMonitorInfo", "ReplaceConfig", diff --git a/bkmonitor/metadata/models/bcs/cluster.py b/bkmonitor/metadata/models/bcs/cluster.py index c8b0278cf..631d7b138 100644 --- a/bkmonitor/metadata/models/bcs/cluster.py +++ b/bkmonitor/metadata/models/bcs/cluster.py @@ -211,8 +211,6 @@ def register_cluster( # 3. 注册6个必要的data_id和自定义事件及自定义时序上报内容 for usage, register_info in cluster.DATASOURCE_REGISTER_INFO.items(): - if is_fed_cluster and usage != cls.DATA_TYPE_CUSTOM_METRIC: - continue # 注册data_id data_source = cluster.create_datasource( usage=usage, @@ -481,8 +479,8 @@ class BcsFederalClusterInfo(common.BaseModelWithTime): host_cluster_id = models.CharField("HOST 集群 ID", max_length=32) sub_cluster_id = models.CharField("子集群 ID", max_length=32) fed_namespaces = JsonField("命名空间列表", default=[]) - fed_buildin_metric_table_id = models.CharField("内置指标结果表", max_length=128, null=True, blank=True) - fed_buildin_event_table_id = models.CharField("内置指标结果表", max_length=128, null=True, blank=True) + fed_builtin_metric_table_id = models.CharField("内置指标结果表", max_length=128, null=True, blank=True) + fed_builtin_event_table_id = models.CharField("内置事件结果表", max_length=128, null=True, blank=True) class Meta: verbose_name = "BCS联邦集群拓扑信息" diff --git a/bkmonitor/metadata/models/data_link/service.py b/bkmonitor/metadata/models/data_link/service.py index 97bfd6716..620a80042 100644 --- a/bkmonitor/metadata/models/data_link/service.py +++ b/bkmonitor/metadata/models/data_link/service.py @@ -167,12 +167,15 @@ def create_fed_vm_data_link( # 判断是否为联邦集群的子集群,如果不是,则直接返回 objs = BcsFederalClusterInfo.objects.filter(sub_cluster_id=bcs_cluster_id) if not (objs.exists() and utils.is_k8s_metric_data_id(data_name)): - logger.info("not federal sub cluster or not buildin datasource") + logger.info("not federal sub cluster or not builtin datasource") return # 获取数据源名称对应的资源,便于后续组装整个链路 logger.info( - "create vm data link for table_id: %s, data_name: %s, vm_cluster_name: %s", table_id, data_name, vm_cluster_name + "create_fed_vm_data_link for table_id: %s, data_name: %s, vm_cluster_name: %s", + table_id, + data_name, + vm_cluster_name, ) data_id_name = utils.get_bkdata_data_id_name(data_name) @@ -182,26 +185,30 @@ def create_fed_vm_data_link( # 下发资源 config_list, data_links, conditions = [], [], [] for obj in objs: - buildin_name = utils.get_bkdata_table_id(obj.fed_buildin_table_id) + builtin_name = utils.get_bkdata_table_id(obj.fed_builtin_metric_table_id) match_labels = [{"name": "namespace", "value": ns} for ns in obj.fed_namespaces] relabels = [{"name": "bcs_cluster_id", "value": obj.fed_cluster_id}] sinks = [ - {"kind": "VmStorageBinding", "name": buildin_name, "namespace": settings.DEFAULT_VM_DATA_LINK_NAMESPACE} + {"kind": "VmStorageBinding", "name": builtin_name, "namespace": settings.DEFAULT_VM_DATA_LINK_NAMESPACE} ] conditions.append({"match_labels": match_labels, "relabels": relabels, "sinks": sinks}) - + logger.info( + "composed datalink config,name->{},builtin_name->{},match_labels->{},relabels->{},sinks->{}".format( + name, builtin_name, match_labels, relabels, sinks + ) + ) # 添加rt及和存储的关联 - rt_config = DataLinkResourceConfig.compose_vm_table_id_config(buildin_name) + rt_config = DataLinkResourceConfig.compose_vm_table_id_config(builtin_name) vm_storage_binding_config = DataLinkResourceConfig.compose_vm_storage_binding( - buildin_name, buildin_name, vm_cluster_name + builtin_name, builtin_name, vm_cluster_name ) config_list.extend([rt_config, vm_storage_binding_config]) data_links.append( { - "raw_rt_id": obj.fed_buildin_table_id, - "rt_name": buildin_name, + "raw_rt_id": obj.fed_builtin_metric_table_id, + "rt_name": builtin_name, "rt_config": rt_config, - "vm_storage_binding_name": buildin_name, + "vm_storage_binding_name": builtin_name, "vm_storage_binding_config": vm_storage_binding_config, } ) @@ -217,7 +224,12 @@ def create_fed_vm_data_link( config_list.extend([vm_conditional_sink_config, vm_data_bus_config]) # 下发资源 data = {"config": config_list} - api.bkdata.apply_data_link(data) + try: + logger.info("create_fed_vm_data_link start to apply data link") + api.bkdata.apply_data_link(data) + except Exception as e: + logger.error("create_fed_vm_data_link apply data link error: %s", e) + return # 根据存储创建多条记录 records, vm_records = [], [] # 如果 vm 集群ID不存在,则通过集群名称获取 @@ -282,12 +294,12 @@ def create_fed_vm_data_link( ) DataLinkResourceConfig.objects.bulk_create(records) # 创建 vm 记录 - from metadata.models import AccessVMRecord, ClusterInfo + from metadata.models import AccessVMRecord AccessVMRecord.objects.bulk_create(vm_records) logger.info( - "create vm data link for table_id: %s, data_name: %s, vm_cluster_name: %s success", + "create_fed_vm_data_link for table_id: %s, data_name: %s, vm_cluster_name: %s success", table_id, data_name, vm_cluster_name, diff --git a/bkmonitor/metadata/task/bcs.py b/bkmonitor/metadata/task/bcs.py index e4719460a..a51eca3e0 100644 --- a/bkmonitor/metadata/task/bcs.py +++ b/bkmonitor/metadata/task/bcs.py @@ -35,6 +35,7 @@ @share_lock(ttl=PERIODIC_TASK_DEFAULT_TTL, identify="metadata_refreshBCSMonitorInfo") def refresh_bcs_monitor_info(): + fed_clusters = {} try: fed_clusters = api.bcs.get_federation_clusters() fed_cluster_id_list = list(fed_clusters.keys()) @@ -42,10 +43,17 @@ def refresh_bcs_monitor_info(): fed_cluster_id_list = [] logger.error("get federation clusters failed: {}".format(e)) + bcs_clusters = list( + BCSClusterInfo.objects.filter( + status__in=[models.BCSClusterInfo.CLUSTER_STATUS_RUNNING, models.BCSClusterInfo.CLUSTER_RAW_STATUS_RUNNING], + ) + ) + + # 对 bcs_clusters 进行排序,确保 fed_cluster_id_list 中的集群优先 + bcs_clusters = sorted(bcs_clusters, key=lambda x: x.cluster_id not in fed_cluster_id_list) + # 拉取所有cluster,遍历刷新monitorinfo信息 - for cluster in BCSClusterInfo.objects.filter( - status__in=[models.BCSClusterInfo.CLUSTER_STATUS_RUNNING, models.BCSClusterInfo.CLUSTER_RAW_STATUS_RUNNING], - ): + for cluster in bcs_clusters: try: is_fed_cluster = cluster.cluster_id in fed_cluster_id_list # 刷新集群内置公共dataid resource @@ -63,6 +71,13 @@ def refresh_bcs_monitor_info(): logger.debug("refresh bcs service monitor custom resource in cluster:{} done".format(cluster.cluster_id)) PodMonitorInfo.refresh_custom_resource(cluster_id=cluster.cluster_id) logger.debug("refresh bcs pod monitor custom resource in cluster:{} done".format(cluster.cluster_id)) + if is_fed_cluster: + # 更新联邦集群记录 + try: + sync_federation_clusters(fed_clusters) + except Exception as e: # pylint: disable=broad-except + logger.error("sync_federation_clusters failed, error:{}".format(e)) + except Exception: # noqa logger.exception("refresh bcs monitor info failed, cluster_id(%s)", cluster.cluster_id) @@ -142,6 +157,7 @@ def discover_bcs_clusters(): return cluster_list = [] # 获取所有联邦集群 ID + fed_clusters = {} try: fed_clusters = api.bcs.get_federation_clusters() fed_cluster_id_list = list(fed_clusters.keys()) @@ -149,6 +165,9 @@ def discover_bcs_clusters(): fed_cluster_id_list = [] logger.error("get federation clusters failed, error:{}".format(e)) + # 联邦集群顺序调整到前面,因为创建链路时依赖联邦关系记录 + bcs_clusters = sorted(bcs_clusters, key=lambda x: x["cluster_id"] not in fed_cluster_id_list) + # bcs 集群中的正常状态 for bcs_cluster in bcs_clusters: logger.info("get bcs cluster:{},start to register".format(bcs_cluster["cluster_id"])) @@ -193,6 +212,12 @@ def discover_bcs_clusters(): creator="admin", is_fed_cluster=is_fed_cluster, ) + if is_fed_cluster: + # 创建联邦集群记录 + try: + sync_federation_clusters(fed_clusters) + except Exception as e: # pylint: disable=broad-except + logger.error("sync_federation_clusters failed, error:{}".format(e)) logger.info( "cluster_id:{},project_id:{},bk_biz_id:{} registered".format( cluster.cluster_id, cluster.project_id, cluster.bk_biz_id @@ -323,3 +348,42 @@ def update_bcs_cluster_cloud_id_config(bk_biz_id=None, cluster_id=None): # 更新云区域 for bk_cloud_id, bcs_cluster_ids in update_params.items(): BCSClusterInfo.objects.filter(cluster_id__in=bcs_cluster_ids).update(bk_cloud_id=bk_cloud_id) + + +def sync_federation_clusters(fed_clusters): + """ + 同步联邦集群信息,创建对应数据记录 + """ + logger.info("sync_federation_clusters started insert to db") + try: + fed_cluster_id_list = list(fed_clusters.keys()) + + for fed_cluster_id in fed_cluster_id_list: + logger.info("Syncing federation cluster->{}".format(fed_cluster_id)) + host_cluster_id = fed_clusters[fed_cluster_id]['host_cluster_id'] + sub_clusters = fed_clusters[fed_cluster_id]['sub_clusters'] + + # 获取代理集群的对应RT + cluster = models.BCSClusterInfo.objects.get(cluster_id=fed_cluster_id) + fed_builtin_k8s_metric_data_id = cluster.K8sMetricDataID + fed_builtin_k8s_event_data_id = cluster.K8sEventDataID + fed_builtin_metric_table_id = models.DataSourceResultTable.objects.get( + bk_data_id=fed_builtin_k8s_metric_data_id + ).table_id + fed_builtin_event_table_id = models.DataSourceResultTable.objects.get( + bk_data_id=fed_builtin_k8s_event_data_id + ).table_id + + for sub_cluster_id, namespaces in sub_clusters.items(): + # 同步至DB + models.BcsFederalClusterInfo.objects.update_or_create( + fed_cluster_id=fed_cluster_id, + host_cluster_id=host_cluster_id, + sub_cluster_id=sub_cluster_id, + fed_namespaces=namespaces, # 直接存储完整的命名空间列表 + fed_builtin_metric_table_id=fed_builtin_metric_table_id, + fed_builtin_event_table_id=fed_builtin_event_table_id, + ) + logger.info("sync_federation_clusters run successfully.") + except Exception as e: # pylint: disable=broad-except + logger.error("sync_federation_clusters failed, error: {}".format(e))