diff --git a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/dao/ApplicationHostDAO.java b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/dao/ApplicationHostDAO.java index 7ff1e7e608..a44d818696 100644 --- a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/dao/ApplicationHostDAO.java +++ b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/dao/ApplicationHostDAO.java @@ -77,9 +77,7 @@ Long countHostInfoBySearchContents(Collection bizIds, Collection mod PageData listHostInfoByPage(ApplicationHostDTO applicationHostInfoCondition, BaseSearchCondition baseSearchCondition); - int insertAppHostWithoutTopo(DSLContext dslContext, ApplicationHostDTO applicationHostDTO); - - int insertAppHostInfo(DSLContext dslContext, ApplicationHostDTO applicationHostDTO); + int insertHostWithoutTopo(DSLContext dslContext, ApplicationHostDTO applicationHostDTO); int insertOrUpdateHost(DSLContext dslContext, ApplicationHostDTO hostDTO); @@ -132,7 +130,7 @@ int updateBizHostInfoByHostId(DSLContext dslContext, long countHostsByOsType(String osType); - long syncHostTopo(DSLContext dslContext, Long hostId); + int syncHostTopo(DSLContext dslContext, Long hostId); /** * 根据ip查询主机 diff --git a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/dao/impl/ApplicationHostDAOImpl.java b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/dao/impl/ApplicationHostDAOImpl.java index 9b04233660..9fe8ed1a5f 100644 --- a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/dao/impl/ApplicationHostDAOImpl.java +++ b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/dao/impl/ApplicationHostDAOImpl.java @@ -429,24 +429,18 @@ private List genHostTopoDTOList(ApplicationHostDTO applicationHostD } @Override - public int insertAppHostWithoutTopo(DSLContext dslContext, ApplicationHostDTO applicationHostDTO) { - return insertOrUpdateHost(dslContext, applicationHostDTO, false, false); - } - - @Override - public int insertAppHostInfo(DSLContext dslContext, ApplicationHostDTO applicationHostDTO) { - return insertOrUpdateHost(dslContext, applicationHostDTO, true, false); + public int insertHostWithoutTopo(DSLContext dslContext, ApplicationHostDTO applicationHostDTO) { + return insertOrUpdateHost(dslContext, applicationHostDTO, false); } @Override public int insertOrUpdateHost(DSLContext dslContext, ApplicationHostDTO hostDTO) { - return insertOrUpdateHost(dslContext, hostDTO, true, true); + return insertOrUpdateHost(dslContext, hostDTO, true); } private int insertOrUpdateHost(DSLContext dslContext, ApplicationHostDTO applicationHostDTO, - Boolean insertTopo, - boolean onConflictUpdate) { + Boolean insertTopo) { setDefaultValue(applicationHostDTO); int[] result = new int[]{-1}; String finalSetIdsStr = applicationHostDTO.getSetIdsStr(); @@ -493,24 +487,20 @@ private int insertOrUpdateHost(DSLContext dslContext, cloudIp ); try { - if (onConflictUpdate) { - result[0] = query.onDuplicateKeyUpdate() - .set(TABLE.APP_ID, bizId) - .set(TABLE.IP, ip) - .set(TABLE.IP_DESC, ipDesc) - .set(TABLE.SET_IDS, finalSetIdsStr) - .set(TABLE.MODULE_IDS, finalModuleIdsStr) - .set(TABLE.CLOUD_AREA_ID, cloudAreaId) - .set(TABLE.DISPLAY_IP, displayIp) - .set(TABLE.OS, os) - .set(TABLE.OS_TYPE, osType) - .set(TABLE.MODULE_TYPE, finalModuleTypeStr) - .set(TABLE.IS_AGENT_ALIVE, gseAgentAlive) - .set(TABLE.CLOUD_IP, cloudIp) - .execute(); - } else { - result[0] = query.execute(); - } + result[0] = query.onDuplicateKeyUpdate() + .set(TABLE.APP_ID, bizId) + .set(TABLE.IP, ip) + .set(TABLE.IP_DESC, ipDesc) + .set(TABLE.SET_IDS, finalSetIdsStr) + .set(TABLE.MODULE_IDS, finalModuleIdsStr) + .set(TABLE.CLOUD_AREA_ID, cloudAreaId) + .set(TABLE.DISPLAY_IP, displayIp) + .set(TABLE.OS, os) + .set(TABLE.OS_TYPE, osType) + .set(TABLE.MODULE_TYPE, finalModuleTypeStr) + .set(TABLE.IS_AGENT_ALIVE, gseAgentAlive) + .set(TABLE.CLOUD_IP, cloudIp) + .execute(); } catch (Throwable t) { log.info("SQL=" + query.getSQL(ParamType.INLINED)); throw t; @@ -899,7 +889,7 @@ public long countHostsByOsType(String osType) { } @Override - public long syncHostTopo(DSLContext dslContext, Long hostId) { + public int syncHostTopo(DSLContext dslContext, Long hostId) { ApplicationHostDTO hostInfoDTO = getHostById(hostId); if (hostInfoDTO != null) { List hostTopoDTOList = hostTopoDAO.listHostTopoByHostId(dslContext, hostId); @@ -918,7 +908,7 @@ public long syncHostTopo(DSLContext dslContext, Long hostId) { hostInfoDTO.setModuleType(moduleTypes); return updateBizHostInfoByHostId(dslContext, null, hostInfoDTO, false); } - return -1L; + return -1; } /** diff --git a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/HostEventsHandler.java b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/HostEventsHandler.java new file mode 100644 index 0000000000..a2959d8b57 --- /dev/null +++ b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/HostEventsHandler.java @@ -0,0 +1,178 @@ +/* + * Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available. + * + * Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-JOB蓝鲸智云作业平台 is licensed under the MIT License. + * + * License for BK-JOB蓝鲸智云作业平台: + * -------------------------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and + * to permit persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of + * the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO + * THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF + * CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +package com.tencent.bk.job.manage.service.impl.sync; + +import com.tencent.bk.job.common.cc.model.req.ResourceWatchReq; +import com.tencent.bk.job.common.cc.model.result.HostEventDetail; +import com.tencent.bk.job.common.cc.model.result.ResourceEvent; +import com.tencent.bk.job.common.constant.JobConstants; +import com.tencent.bk.job.common.gse.service.QueryAgentStatusClient; +import com.tencent.bk.job.common.model.dto.ApplicationHostDTO; +import com.tencent.bk.job.common.util.json.JsonUtils; +import com.tencent.bk.job.manage.dao.ApplicationHostDAO; +import com.tencent.bk.job.manage.manager.host.HostCache; +import com.tencent.bk.job.manage.service.ApplicationService; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.jooq.DSLContext; + +import java.util.concurrent.BlockingQueue; + +@Slf4j +public class HostEventsHandler extends EventsHandler { + + private final DSLContext dslContext; + private final ApplicationService applicationService; + private final ApplicationHostDAO applicationHostDAO; + private final QueryAgentStatusClient queryAgentStatusClient; + private final HostCache hostCache; + + HostEventsHandler(BlockingQueue> queue, + DSLContext dslContext, + ApplicationService applicationService, + ApplicationHostDAO applicationHostDAO, + QueryAgentStatusClient queryAgentStatusClient, + HostCache hostCache) { + super(queue); + this.dslContext = dslContext; + this.applicationService = applicationService; + this.applicationHostDAO = applicationHostDAO; + this.queryAgentStatusClient = queryAgentStatusClient; + this.hostCache = hostCache; + } + + @Override + void handleEvent(ResourceEvent event) { + handleOneEventRelatedToApp(event); + } + + private void handleOneEventRelatedToApp(ResourceEvent event) { + try { + log.info("start to handle event:{}", JsonUtils.toJson(event)); + handleOneEventIndeed(event); + } catch (Throwable t) { + log.error(String.format("Fail to handle hostEvent:%s", event), t); + } finally { + log.info("end to handle event"); + } + } + + private void handleOneEventIndeed(ResourceEvent event) { + String eventType = event.getEventType(); + ApplicationHostDTO hostInfoDTO = HostEventDetail.toHostInfoDTO(event.getDetail()); + switch (eventType) { + case ResourceWatchReq.EVENT_TYPE_CREATE: + case ResourceWatchReq.EVENT_TYPE_UPDATE: + // 去除没有IP的主机信息 + if (StringUtils.isBlank(hostInfoDTO.getDisplayIp())) { + deleteHostWithoutIp(hostInfoDTO); + break; + } + // 找出Agent有效的IP,并设置Agent状态 + updateIpAndAgentStatus(hostInfoDTO); + // 更新DB中的主机数据 + createOrUpdateHostInDB(hostInfoDTO); + // 更新缓存中的主机数据 + updateHostCache(hostInfoDTO); + break; + case ResourceWatchReq.EVENT_TYPE_DELETE: + handleHostDelete(hostInfoDTO); + break; + default: + break; + } + } + + private void deleteHostWithoutIp(ApplicationHostDTO hostInfoDTO) { + int affectedRowNum = applicationHostDAO.deleteBizHostInfoById( + dslContext, + null, + hostInfoDTO.getHostId() + ); + log.info( + "{} host deleted, id={} ,ip={}", + affectedRowNum, + hostInfoDTO.getHostId(), + hostInfoDTO.getIp() + ); + } + + private void updateIpAndAgentStatus(ApplicationHostDTO hostInfoDTO) { + Long cloudAreaId = hostInfoDTO.getCloudAreaId(); + String ip = queryAgentStatusClient.getHostIpByAgentStatus(hostInfoDTO.getDisplayIp(), cloudAreaId); + hostInfoDTO.setIp(ip); + if (!ip.contains(":")) { + String cloudIp = cloudAreaId + ":" + ip; + hostInfoDTO.setGseAgentAlive(queryAgentStatusClient.getAgentStatus(cloudIp).status == 1); + } else { + hostInfoDTO.setGseAgentAlive(queryAgentStatusClient.getAgentStatus(ip).status == 1); + } + } + + private void createOrUpdateHostInDB(ApplicationHostDTO hostInfoDTO) { + try { + if (applicationHostDAO.existAppHostInfoByHostId(dslContext, hostInfoDTO.getHostId())) { + // 只更新事件中的主机属性与agent状态 + applicationHostDAO.updateHostAttrsById(dslContext, hostInfoDTO); + } else { + hostInfoDTO.setBizId(JobConstants.PUBLIC_APP_ID); + int affectedNum = applicationHostDAO.insertHostWithoutTopo(dslContext, hostInfoDTO); + log.info("insert host: id={}, affectedNum={}", hostInfoDTO.getHostId(), affectedNum); + } + } catch (Throwable t) { + log.error("handle host event fail", t); + } finally { + // 从拓扑表向主机表同步拓扑数据 + int affectedNum = applicationHostDAO.syncHostTopo(dslContext, hostInfoDTO.getHostId()); + log.info("hostTopo synced: hostId={}, affectedNum={}", hostInfoDTO.getHostId(), affectedNum); + } + } + + private void updateHostCache(ApplicationHostDTO hostInfoDTO) { + hostInfoDTO = applicationHostDAO.getHostById(hostInfoDTO.getHostId()); + if (hostInfoDTO.getBizId() != null && hostInfoDTO.getBizId() > 0) { + // 只更新常规业务的主机到缓存 + if (applicationService.existBiz(hostInfoDTO.getBizId())) { + hostCache.addOrUpdateHost(hostInfoDTO); + log.info("host cache updated: hostId:{}", hostInfoDTO.getHostId()); + } + } + } + + private void handleHostDelete(ApplicationHostDTO hostInfoDTO) { + int affectedRowNum = applicationHostDAO.deleteBizHostInfoById( + dslContext, + null, + hostInfoDTO.getHostId() + ); + log.info( + "{} host deleted, id={} ,ip={}", + affectedRowNum, + hostInfoDTO.getHostId(), + hostInfoDTO.getIp() + ); + hostCache.deleteHost(hostInfoDTO); + } +} diff --git a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/HostRelationWatchThread.java b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/HostRelationWatchThread.java index ab5ee622f1..b3941a978c 100644 --- a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/HostRelationWatchThread.java +++ b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/HostRelationWatchThread.java @@ -176,7 +176,7 @@ private void handleOneEvent(ResourceEvent event) { */ private void updateTopoToHost(HostTopoDTO hostTopoDTO) { // 若主机存在需将拓扑信息同步至主机信息冗余字段 - long affectedNum = applicationHostDAO.syncHostTopo(dslContext, hostTopoDTO.getHostId()); + int affectedNum = applicationHostDAO.syncHostTopo(dslContext, hostTopoDTO.getHostId()); if (affectedNum == 0) { log.info("no host topo synced"); } else if (affectedNum < 0) { diff --git a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/HostWatchThread.java b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/HostWatchThread.java index 8458efbdd3..d0d082b9bc 100644 --- a/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/HostWatchThread.java +++ b/src/backend/job-manage/service-job-manage/src/main/java/com/tencent/bk/job/manage/service/impl/sync/HostWatchThread.java @@ -24,13 +24,11 @@ package com.tencent.bk.job.manage.service.impl.sync; -import com.tencent.bk.job.common.cc.model.req.ResourceWatchReq; import com.tencent.bk.job.common.cc.model.result.HostEventDetail; import com.tencent.bk.job.common.cc.model.result.ResourceEvent; import com.tencent.bk.job.common.cc.model.result.ResourceWatchResult; import com.tencent.bk.job.common.cc.sdk.CmdbClientFactory; import com.tencent.bk.job.common.cc.sdk.IBizCmdbClient; -import com.tencent.bk.job.common.constant.JobConstants; import com.tencent.bk.job.common.gse.service.QueryAgentStatusClient; import com.tencent.bk.job.common.model.dto.ApplicationHostDTO; import com.tencent.bk.job.common.redis.util.LockUtils; @@ -45,7 +43,6 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.jooq.DSLContext; -import org.jooq.exception.DataAccessException; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.util.StopWatch; @@ -79,10 +76,11 @@ public class HostWatchThread extends Thread { private final RedisTemplate redisTemplate; private final HostCache hostCache; private final String REDIS_KEY_RESOURCE_WATCH_HOST_JOB_RUNNING_MACHINE = "resource-watch-host-job-running-machine"; - private final List eventsHandlerList; + private final List eventsHandlerList; private final BlockingQueue> appHostEventQueue = new LinkedBlockingQueue<>(10000); private final Integer MAX_HANDLER_NUM = 1; private final AtomicBoolean hostWatchFlag = new AtomicBoolean(true); + private String cursor = null; public HostWatchThread(DSLContext dslContext, ApplicationService applicationService, @@ -100,14 +98,25 @@ public HostWatchThread(DSLContext dslContext, this.eventsHandlerList = new ArrayList<>(); // 初始内置1个Handler for (int i = 0; i < 1; i++) { - AppHostEventsHandler handler = new AppHostEventsHandler(appHostEventQueue); - handler.setName("[" + handler.getId() + "]-AppHostEventsHandler-" + (i + 1)); + HostEventsHandler handler = buildHostEventsHandler(); + handler.setName("[" + handler.getId() + "]-HostEventsHandler-" + (i + 1)); eventsHandlerList.add(handler); } } + private HostEventsHandler buildHostEventsHandler() { + return new HostEventsHandler( + appHostEventQueue, + dslContext, + applicationService, + applicationHostDAO, + queryAgentStatusClient, + hostCache + ); + } + private void init() { - for (AppHostEventsHandler appHostEventsHandler : eventsHandlerList) { + for (HostEventsHandler appHostEventsHandler : eventsHandlerList) { appHostEventsHandler.start(); } } @@ -121,8 +130,8 @@ private void dispatchEvent(ResourceEvent event) { Long hostId = hostInfoDTO.getHostId(); ApplicationHostDTO oldHostInfoDTO = applicationHostDAO.getHostById(hostId); Long appId = oldHostInfoDTO.getBizId(); - List idleHandlerList = new ArrayList<>(); - for (AppHostEventsHandler handler : eventsHandlerList) { + List idleHandlerList = new ArrayList<>(); + for (HostEventsHandler handler : eventsHandlerList) { if (appId.equals(handler.getAppId())) { handler.commitEvent(appId, event); return; @@ -131,127 +140,28 @@ private void dispatchEvent(ResourceEvent event) { } } if (!idleHandlerList.isEmpty()) { - AppHostEventsHandler handler = idleHandlerList.get((int) (Math.random() * idleHandlerList.size())); + HostEventsHandler handler = idleHandlerList.get((int) (Math.random() * idleHandlerList.size())); handler.commitEvent(appId, event); } else if (eventsHandlerList.size() < MAX_HANDLER_NUM) { - AppHostEventsHandler handler = new AppHostEventsHandler(appHostEventQueue); - handler.setName("[" + handler.getId() + "]-AppHostEventsHandler-" + (eventsHandlerList.size() + 1)); + HostEventsHandler handler = buildHostEventsHandler(); + handler.setName("[" + handler.getId() + "]-HostEventsHandler-" + (eventsHandlerList.size() + 1)); handler.start(); eventsHandlerList.add(handler); handler.commitEvent(appId, event); } else { - AppHostEventsHandler handler = eventsHandlerList.get((int) (Math.random() * eventsHandlerList.size())); + HostEventsHandler handler = eventsHandlerList.get((int) (Math.random() * eventsHandlerList.size())); handler.commitEvent(appId, event); } } - private void handleOneEventRelatedToApp(ResourceEvent event) { - try { - handleOneEventIndeed(event); - } catch (Throwable t) { - log.error(String.format("Fail to handle hostEvent:%s", event), t); - } - } - - private void handleOneEvent(ResourceEvent event) { - ApplicationHostDTO hostInfoDTO = HostEventDetail.toHostInfoDTO(event.getDetail()); - Long hostId = hostInfoDTO.getHostId(); - ApplicationHostDTO oldHostInfoDTO = applicationHostDAO.getHostById(hostId); - if (oldHostInfoDTO != null && oldHostInfoDTO.getBizId() != null) { - dispatchEvent(event); - } else { - handleOneEventIndeed(event); - } - } - - private void handleOneEventIndeed(ResourceEvent event) { - String eventType = event.getEventType(); - ApplicationHostDTO hostInfoDTO = HostEventDetail.toHostInfoDTO(event.getDetail()); - switch (eventType) { - case ResourceWatchReq.EVENT_TYPE_CREATE: - case ResourceWatchReq.EVENT_TYPE_UPDATE: - //去除没有IP的主机信息 - if (StringUtils.isBlank(hostInfoDTO.getDisplayIp())) { - int affectedRowNum = applicationHostDAO.deleteBizHostInfoById( - dslContext, - null, - hostInfoDTO.getHostId() - ); - log.info( - "{} host deleted, id={} ,ip={}", - affectedRowNum, - hostInfoDTO.getHostId(), - hostInfoDTO.getIp() - ); - break; - } - //找出Agent有效的IP,并设置Agent状态 - Long cloudAreaId = hostInfoDTO.getCloudAreaId(); - String ip = queryAgentStatusClient.getHostIpByAgentStatus(hostInfoDTO.getDisplayIp(), cloudAreaId); - hostInfoDTO.setIp(ip); - if (!ip.contains(":")) { - String cloudIp = cloudAreaId + ":" + ip; - hostInfoDTO.setGseAgentAlive(queryAgentStatusClient.getAgentStatus(cloudIp).status == 1); - } else { - hostInfoDTO.setGseAgentAlive(queryAgentStatusClient.getAgentStatus(ip).status == 1); - } - try { - if (applicationHostDAO.existAppHostInfoByHostId(dslContext, hostInfoDTO.getHostId())) { - // 只更新事件中的主机属性与agent状态 - applicationHostDAO.updateHostAttrsById(dslContext, hostInfoDTO); - } else { - hostInfoDTO.setBizId(JobConstants.PUBLIC_APP_ID); - try { - applicationHostDAO.insertAppHostWithoutTopo(dslContext, hostInfoDTO); - } catch (DataAccessException e) { - String errorMessage = e.getMessage(); - if (errorMessage.contains("Duplicate entry") && errorMessage.contains("PRIMARY")) { - // 若已存在则忽略 - } else { - log.error("insertHost fail:hostInfo=" + hostInfoDTO, e); - } - } - } - } catch (Throwable t) { - log.error("handle host event fail", t); - } finally { - // 从拓扑表向主机表同步拓扑数据 - applicationHostDAO.syncHostTopo(dslContext, hostInfoDTO.getHostId()); - } - if (hostInfoDTO.getBizId() != null && hostInfoDTO.getBizId() > 0) { - // 只更新常规业务的主机到缓存 - if (applicationService.existBiz(hostInfoDTO.getBizId())) { - hostCache.addOrUpdateHost(hostInfoDTO); - } - } - break; - case ResourceWatchReq.EVENT_TYPE_DELETE: - int affectedRowNum = applicationHostDAO.deleteBizHostInfoById( - dslContext, - null, - hostInfoDTO.getHostId() - ); - log.info( - "{} host deleted, id={} ,ip={}", - affectedRowNum, - hostInfoDTO.getHostId(), - hostInfoDTO.getIp() - ); - hostCache.deleteHost(hostInfoDTO); - break; - default: - break; - } - } - public String handleHostWatchResult(ResourceWatchResult hostWatchResult) { String cursor = null; boolean isWatched = hostWatchResult.getWatched(); + List> events = hostWatchResult.getEvents(); if (isWatched) { - List> events = hostWatchResult.getEvents(); //解析事件,进行处理 for (ResourceEvent event : events) { - handleOneEvent(event); + dispatchEvent(event); } if (events.size() > 0) { log.info("events.size={},events={}", events.size(), JsonUtils.toJson(events)); @@ -262,7 +172,6 @@ public String handleHostWatchResult(ResourceWatchResult hostWat } } else { // 只有一个无实际意义的事件,用于换取bk_cursor - List> events = hostWatchResult.getEvents(); if (events != null && events.size() > 0) { cursor = events.get(0).getCursor(); log.info("refresh cursor(fail):{}", cursor); @@ -273,20 +182,16 @@ public String handleHostWatchResult(ResourceWatchResult hostWat return cursor; } + @SuppressWarnings("InfiniteLoopStatement") @Override public void run() { log.info("hostWatch arranged"); init(); while (true) { - String cursor = null; // 从10分钟前开始watch long startTime = System.currentTimeMillis() / 1000 - 10 * 60; try { - boolean lockGotten = LockUtils.tryGetDistributedLock(REDIS_KEY_RESOURCE_WATCH_HOST_JOB_LOCK, - machineIp, 50); - if (!lockGotten) { - log.info("hostWatch lock not gotten, wait 100ms and retry"); - ThreadUtils.sleep(100); + if (!getRedisLockOrWait100ms()) { continue; } String runningMachine = @@ -298,35 +203,13 @@ public void run() { continue; } // 开一个心跳子线程,维护当前机器正在WatchResource的状态 - RedisKeyHeartBeatThread hostWatchRedisKeyHeartBeatThread = new RedisKeyHeartBeatThread( - redisTemplate, - REDIS_KEY_RESOURCE_WATCH_HOST_JOB_RUNNING_MACHINE, - machineIp, - 3000L, - 2000L - ); - hostWatchRedisKeyHeartBeatThread.setName("[" + hostWatchRedisKeyHeartBeatThread.getId() + - "]-hostWatchRedisKeyHeartBeatThread"); - hostWatchRedisKeyHeartBeatThread.start(); + RedisKeyHeartBeatThread hostWatchRedisKeyHeartBeatThread = startRedisKeyHeartBeatThread(); log.info("start watch host resource at {},{}", TimeUtil.getCurrentTimeStr("HH:mm:ss"), System.currentTimeMillis()); StopWatch watch = new StopWatch("hostWatch"); watch.start("total"); try { - IBizCmdbClient bizCmdbClient = CmdbClientFactory.getCmdbClient(); - ResourceWatchResult hostWatchResult; - while (hostWatchFlag.get()) { - if (cursor == null) { - log.info("Start watch from startTime:{}", TimeUtil.formatTime(startTime * 1000)); - hostWatchResult = bizCmdbClient.getHostEvents(startTime, cursor); - } else { - hostWatchResult = bizCmdbClient.getHostEvents(null, cursor); - } - log.info("hostWatchResult={}", JsonUtils.toJson(hostWatchResult)); - cursor = handleHostWatchResult(hostWatchResult); - // 1s/watch一次 - ThreadUtils.sleep(1000); - } + watchInLoop(startTime); } catch (Throwable t) { log.error("hostWatch thread fail", t); } finally { @@ -337,26 +220,56 @@ public void run() { } catch (Throwable t) { log.error("HostWatchThread quit unexpectedly", t); } finally { - do { - // 5s/重试一次 - ThreadUtils.sleep(5000); - } while (!hostWatchFlag.get()); + waitUtilHostWatchFlagSet(); } } } - /** - * 处理同一个业务的多个事件 - */ - class AppHostEventsHandler extends EventsHandler { - - AppHostEventsHandler(BlockingQueue> queue) { - super(queue); + private boolean getRedisLockOrWait100ms() { + boolean lockGotten = LockUtils.tryGetDistributedLock(REDIS_KEY_RESOURCE_WATCH_HOST_JOB_LOCK, + machineIp, 50); + if (!lockGotten) { + log.info("hostWatch lock not gotten, wait 100ms"); + ThreadUtils.sleep(100); } + return lockGotten; + } + + private RedisKeyHeartBeatThread startRedisKeyHeartBeatThread() { + RedisKeyHeartBeatThread hostWatchRedisKeyHeartBeatThread = new RedisKeyHeartBeatThread( + redisTemplate, + REDIS_KEY_RESOURCE_WATCH_HOST_JOB_RUNNING_MACHINE, + machineIp, + 3000L, + 2000L + ); + hostWatchRedisKeyHeartBeatThread.setName("[" + hostWatchRedisKeyHeartBeatThread.getId() + + "]-hostWatchRedisKeyHeartBeatThread"); + hostWatchRedisKeyHeartBeatThread.start(); + return hostWatchRedisKeyHeartBeatThread; + } - @Override - void handleEvent(ResourceEvent event) { - handleOneEventRelatedToApp(event); + private void watchInLoop(long startTime) { + IBizCmdbClient bizCmdbClient = CmdbClientFactory.getCmdbClient(); + ResourceWatchResult hostWatchResult; + while (hostWatchFlag.get()) { + if (cursor == null) { + log.info("Start watch from startTime:{}", TimeUtil.formatTime(startTime * 1000)); + hostWatchResult = bizCmdbClient.getHostEvents(startTime, cursor); + } else { + hostWatchResult = bizCmdbClient.getHostEvents(null, cursor); + } + log.info("hostWatchResult={}", JsonUtils.toJson(hostWatchResult)); + cursor = handleHostWatchResult(hostWatchResult); + // 1s/watch一次 + ThreadUtils.sleep(1000); } } + + private void waitUtilHostWatchFlagSet() { + do { + // 5s/重试一次 + ThreadUtils.sleep(5000); + } while (!hostWatchFlag.get()); + } }