Skip to content

Commit

Permalink
perf: 主机与主机关系事件处理优化 TencentBlueKing#1145
Browse files Browse the repository at this point in the history
主机缓存更新策略优化;
主机事件监听代码重构。
  • Loading branch information
jsonwan committed Jul 26, 2022
1 parent f83aec2 commit 39f9a02
Show file tree
Hide file tree
Showing 5 changed files with 273 additions and 194 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,7 @@ Long countHostInfoBySearchContents(Collection<Long> bizIds, Collection<Long> mod
PageData<ApplicationHostDTO> listHostInfoByPage(ApplicationHostDTO applicationHostInfoCondition,
BaseSearchCondition baseSearchCondition);

int insertAppHostWithoutTopo(DSLContext dslContext, ApplicationHostDTO applicationHostDTO);

int insertAppHostInfo(DSLContext dslContext, ApplicationHostDTO applicationHostDTO);
int insertHostWithoutTopo(DSLContext dslContext, ApplicationHostDTO applicationHostDTO);

int insertOrUpdateHost(DSLContext dslContext, ApplicationHostDTO hostDTO);

Expand Down Expand Up @@ -132,7 +130,7 @@ int updateBizHostInfoByHostId(DSLContext dslContext,

long countHostsByOsType(String osType);

long syncHostTopo(DSLContext dslContext, Long hostId);
int syncHostTopo(DSLContext dslContext, Long hostId);

/**
* 根据ip查询主机
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,24 +429,18 @@ private List<HostTopoDTO> genHostTopoDTOList(ApplicationHostDTO applicationHostD
}

@Override
public int insertAppHostWithoutTopo(DSLContext dslContext, ApplicationHostDTO applicationHostDTO) {
return insertOrUpdateHost(dslContext, applicationHostDTO, false, false);
}

@Override
public int insertAppHostInfo(DSLContext dslContext, ApplicationHostDTO applicationHostDTO) {
return insertOrUpdateHost(dslContext, applicationHostDTO, true, false);
public int insertHostWithoutTopo(DSLContext dslContext, ApplicationHostDTO applicationHostDTO) {
return insertOrUpdateHost(dslContext, applicationHostDTO, false);
}

@Override
public int insertOrUpdateHost(DSLContext dslContext, ApplicationHostDTO hostDTO) {
return insertOrUpdateHost(dslContext, hostDTO, true, true);
return insertOrUpdateHost(dslContext, hostDTO, true);
}

private int insertOrUpdateHost(DSLContext dslContext,
ApplicationHostDTO applicationHostDTO,
Boolean insertTopo,
boolean onConflictUpdate) {
Boolean insertTopo) {
setDefaultValue(applicationHostDTO);
int[] result = new int[]{-1};
String finalSetIdsStr = applicationHostDTO.getSetIdsStr();
Expand Down Expand Up @@ -493,24 +487,20 @@ private int insertOrUpdateHost(DSLContext dslContext,
cloudIp
);
try {
if (onConflictUpdate) {
result[0] = query.onDuplicateKeyUpdate()
.set(TABLE.APP_ID, bizId)
.set(TABLE.IP, ip)
.set(TABLE.IP_DESC, ipDesc)
.set(TABLE.SET_IDS, finalSetIdsStr)
.set(TABLE.MODULE_IDS, finalModuleIdsStr)
.set(TABLE.CLOUD_AREA_ID, cloudAreaId)
.set(TABLE.DISPLAY_IP, displayIp)
.set(TABLE.OS, os)
.set(TABLE.OS_TYPE, osType)
.set(TABLE.MODULE_TYPE, finalModuleTypeStr)
.set(TABLE.IS_AGENT_ALIVE, gseAgentAlive)
.set(TABLE.CLOUD_IP, cloudIp)
.execute();
} else {
result[0] = query.execute();
}
result[0] = query.onDuplicateKeyUpdate()
.set(TABLE.APP_ID, bizId)
.set(TABLE.IP, ip)
.set(TABLE.IP_DESC, ipDesc)
.set(TABLE.SET_IDS, finalSetIdsStr)
.set(TABLE.MODULE_IDS, finalModuleIdsStr)
.set(TABLE.CLOUD_AREA_ID, cloudAreaId)
.set(TABLE.DISPLAY_IP, displayIp)
.set(TABLE.OS, os)
.set(TABLE.OS_TYPE, osType)
.set(TABLE.MODULE_TYPE, finalModuleTypeStr)
.set(TABLE.IS_AGENT_ALIVE, gseAgentAlive)
.set(TABLE.CLOUD_IP, cloudIp)
.execute();
} catch (Throwable t) {
log.info("SQL=" + query.getSQL(ParamType.INLINED));
throw t;
Expand Down Expand Up @@ -899,7 +889,7 @@ public long countHostsByOsType(String osType) {
}

@Override
public long syncHostTopo(DSLContext dslContext, Long hostId) {
public int syncHostTopo(DSLContext dslContext, Long hostId) {
ApplicationHostDTO hostInfoDTO = getHostById(hostId);
if (hostInfoDTO != null) {
List<HostTopoDTO> hostTopoDTOList = hostTopoDAO.listHostTopoByHostId(dslContext, hostId);
Expand All @@ -918,7 +908,7 @@ public long syncHostTopo(DSLContext dslContext, Long hostId) {
hostInfoDTO.setModuleType(moduleTypes);
return updateBizHostInfoByHostId(dslContext, null, hostInfoDTO, false);
}
return -1L;
return -1;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/*
* Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available.
*
* Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved.
*
* BK-JOB蓝鲸智云作业平台 is licensed under the MIT License.
*
* License for BK-JOB蓝鲸智云作业平台:
* --------------------------------------------------------------------
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
* to permit persons to whom the Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of
* the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
* THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
* CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/

package com.tencent.bk.job.manage.service.impl.sync;

import com.tencent.bk.job.common.cc.model.req.ResourceWatchReq;
import com.tencent.bk.job.common.cc.model.result.HostEventDetail;
import com.tencent.bk.job.common.cc.model.result.ResourceEvent;
import com.tencent.bk.job.common.constant.JobConstants;
import com.tencent.bk.job.common.gse.service.QueryAgentStatusClient;
import com.tencent.bk.job.common.model.dto.ApplicationHostDTO;
import com.tencent.bk.job.common.util.json.JsonUtils;
import com.tencent.bk.job.manage.dao.ApplicationHostDAO;
import com.tencent.bk.job.manage.manager.host.HostCache;
import com.tencent.bk.job.manage.service.ApplicationService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.jooq.DSLContext;

import java.util.concurrent.BlockingQueue;

@Slf4j
public class HostEventsHandler extends EventsHandler<HostEventDetail> {

private final DSLContext dslContext;
private final ApplicationService applicationService;
private final ApplicationHostDAO applicationHostDAO;
private final QueryAgentStatusClient queryAgentStatusClient;
private final HostCache hostCache;

HostEventsHandler(BlockingQueue<ResourceEvent<HostEventDetail>> queue,
DSLContext dslContext,
ApplicationService applicationService,
ApplicationHostDAO applicationHostDAO,
QueryAgentStatusClient queryAgentStatusClient,
HostCache hostCache) {
super(queue);
this.dslContext = dslContext;
this.applicationService = applicationService;
this.applicationHostDAO = applicationHostDAO;
this.queryAgentStatusClient = queryAgentStatusClient;
this.hostCache = hostCache;
}

@Override
void handleEvent(ResourceEvent<HostEventDetail> event) {
handleOneEventRelatedToApp(event);
}

private void handleOneEventRelatedToApp(ResourceEvent<HostEventDetail> event) {
try {
log.info("start to handle event:{}", JsonUtils.toJson(event));
handleOneEventIndeed(event);
} catch (Throwable t) {
log.error(String.format("Fail to handle hostEvent:%s", event), t);
} finally {
log.info("end to handle event");
}
}

private void handleOneEventIndeed(ResourceEvent<HostEventDetail> event) {
String eventType = event.getEventType();
ApplicationHostDTO hostInfoDTO = HostEventDetail.toHostInfoDTO(event.getDetail());
switch (eventType) {
case ResourceWatchReq.EVENT_TYPE_CREATE:
case ResourceWatchReq.EVENT_TYPE_UPDATE:
// 去除没有IP的主机信息
if (StringUtils.isBlank(hostInfoDTO.getDisplayIp())) {
deleteHostWithoutIp(hostInfoDTO);
break;
}
// 找出Agent有效的IP,并设置Agent状态
updateIpAndAgentStatus(hostInfoDTO);
// 更新DB中的主机数据
createOrUpdateHostInDB(hostInfoDTO);
// 更新缓存中的主机数据
updateHostCache(hostInfoDTO);
break;
case ResourceWatchReq.EVENT_TYPE_DELETE:
handleHostDelete(hostInfoDTO);
break;
default:
break;
}
}

private void deleteHostWithoutIp(ApplicationHostDTO hostInfoDTO) {
int affectedRowNum = applicationHostDAO.deleteBizHostInfoById(
dslContext,
null,
hostInfoDTO.getHostId()
);
log.info(
"{} host deleted, id={} ,ip={}",
affectedRowNum,
hostInfoDTO.getHostId(),
hostInfoDTO.getIp()
);
}

private void updateIpAndAgentStatus(ApplicationHostDTO hostInfoDTO) {
Long cloudAreaId = hostInfoDTO.getCloudAreaId();
String ip = queryAgentStatusClient.getHostIpByAgentStatus(hostInfoDTO.getDisplayIp(), cloudAreaId);
hostInfoDTO.setIp(ip);
if (!ip.contains(":")) {
String cloudIp = cloudAreaId + ":" + ip;
hostInfoDTO.setGseAgentAlive(queryAgentStatusClient.getAgentStatus(cloudIp).status == 1);
} else {
hostInfoDTO.setGseAgentAlive(queryAgentStatusClient.getAgentStatus(ip).status == 1);
}
}

private void createOrUpdateHostInDB(ApplicationHostDTO hostInfoDTO) {
try {
if (applicationHostDAO.existAppHostInfoByHostId(dslContext, hostInfoDTO.getHostId())) {
// 只更新事件中的主机属性与agent状态
applicationHostDAO.updateHostAttrsById(dslContext, hostInfoDTO);
} else {
hostInfoDTO.setBizId(JobConstants.PUBLIC_APP_ID);
int affectedNum = applicationHostDAO.insertHostWithoutTopo(dslContext, hostInfoDTO);
log.info("insert host: id={}, affectedNum={}", hostInfoDTO.getHostId(), affectedNum);
}
} catch (Throwable t) {
log.error("handle host event fail", t);
} finally {
// 从拓扑表向主机表同步拓扑数据
int affectedNum = applicationHostDAO.syncHostTopo(dslContext, hostInfoDTO.getHostId());
log.info("hostTopo synced: hostId={}, affectedNum={}", hostInfoDTO.getHostId(), affectedNum);
}
}

private void updateHostCache(ApplicationHostDTO hostInfoDTO) {
hostInfoDTO = applicationHostDAO.getHostById(hostInfoDTO.getHostId());
if (hostInfoDTO.getBizId() != null && hostInfoDTO.getBizId() > 0) {
// 只更新常规业务的主机到缓存
if (applicationService.existBiz(hostInfoDTO.getBizId())) {
hostCache.addOrUpdateHost(hostInfoDTO);
log.info("host cache updated: hostId:{}", hostInfoDTO.getHostId());
}
}
}

private void handleHostDelete(ApplicationHostDTO hostInfoDTO) {
int affectedRowNum = applicationHostDAO.deleteBizHostInfoById(
dslContext,
null,
hostInfoDTO.getHostId()
);
log.info(
"{} host deleted, id={} ,ip={}",
affectedRowNum,
hostInfoDTO.getHostId(),
hostInfoDTO.getIp()
);
hostCache.deleteHost(hostInfoDTO);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ private void handleOneEvent(ResourceEvent<HostRelationEventDetail> event) {
*/
private void updateTopoToHost(HostTopoDTO hostTopoDTO) {
// 若主机存在需将拓扑信息同步至主机信息冗余字段
long affectedNum = applicationHostDAO.syncHostTopo(dslContext, hostTopoDTO.getHostId());
int affectedNum = applicationHostDAO.syncHostTopo(dslContext, hostTopoDTO.getHostId());
if (affectedNum == 0) {
log.info("no host topo synced");
} else if (affectedNum < 0) {
Expand Down
Loading

0 comments on commit 39f9a02

Please sign in to comment.