Skip to content

Commit

Permalink
[#11346] Refactory AgentEventQuery
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Aug 19, 2024
1 parent d72edb5 commit 01f50d4
Show file tree
Hide file tree
Showing 14 changed files with 310 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,21 @@
import com.navercorp.pinpoint.common.server.util.time.Range;
import com.navercorp.pinpoint.web.alarm.DataCollectorCategory;
import com.navercorp.pinpoint.web.dao.AgentEventDao;
import com.navercorp.pinpoint.web.service.component.AgentEventQuery;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* @author Taejin Koo
*/
public class AgentEventDataCollector extends DataCollector {

private static final AgentEventQuery DEADLOCK = AgentEventQuery.include(Set.of(AgentEventType.AGENT_DEADLOCK_DETECTED));

private final AgentEventDao agentEventDao;

private final List<String> agentIds;
Expand Down Expand Up @@ -67,7 +70,7 @@ public void collect() {
Range range = Range.between(timeSlotEndTime - slotInterval, timeSlotEndTime);

for (String agentId : agentIds) {
List<AgentEventBo> agentEventBoList = agentEventDao.getAgentEvents(agentId, range, Collections.emptySet());
List<AgentEventBo> agentEventBoList = agentEventDao.getAgentEvents(agentId, range, DEADLOCK);
if (hasDeadlockEvent(agentEventBoList)) {
agentDeadlockEventDetected.put(agentId, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.navercorp.pinpoint.web.alarm.DataCollectorCategory;
import com.navercorp.pinpoint.web.alarm.vo.Rule;
import com.navercorp.pinpoint.web.dao.AgentEventDao;
import com.navercorp.pinpoint.web.service.component.AgentEventQuery;
import org.apache.commons.lang3.RandomUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
Expand All @@ -33,7 +34,6 @@
import org.mockito.junit.jupiter.MockitoExtension;

import java.util.List;
import java.util.Set;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -68,9 +68,9 @@ public void checkTest1() {
Rule rule = new Rule(APPLICATION_NAME, SERVICE_TYPE, CheckerCategory.ERROR_COUNT.getName(), 50, "testGroup", false, false, false, "");

Range range = Range.between(START_TIME_MILLIS, CURRENT_TIME_MILLIS);
when(mockAgentEventDao.getAgentEvents(AGENT_ID_1, range, Set.of())).thenReturn(List.of(createAgentEvent(AGENT_ID_1, createEventTimestamp(), AgentEventType.AGENT_CLOSED_BY_SERVER)));
when(mockAgentEventDao.getAgentEvents(AGENT_ID_2, range, Set.of())).thenReturn(List.of(createAgentEvent(AGENT_ID_2, createEventTimestamp(), AgentEventType.AGENT_DEADLOCK_DETECTED)));
when(mockAgentEventDao.getAgentEvents(AGENT_ID_3, range, Set.of())).thenReturn(List.of(createAgentEvent(AGENT_ID_3, createEventTimestamp(), AgentEventType.AGENT_PING)));
when(mockAgentEventDao.getAgentEvents(AGENT_ID_1, range, allQuery())).thenReturn(List.of(createAgentEvent(AGENT_ID_1, createEventTimestamp(), AgentEventType.AGENT_CLOSED_BY_SERVER)));
when(mockAgentEventDao.getAgentEvents(AGENT_ID_2, range, allQuery())).thenReturn(List.of(createAgentEvent(AGENT_ID_2, createEventTimestamp(), AgentEventType.AGENT_DEADLOCK_DETECTED)));
when(mockAgentEventDao.getAgentEvents(AGENT_ID_3, range, allQuery())).thenReturn(List.of(createAgentEvent(AGENT_ID_3, createEventTimestamp(), AgentEventType.AGENT_PING)));

AgentEventDataCollector dataCollector = new AgentEventDataCollector(DataCollectorCategory.AGENT_EVENT, mockAgentEventDao, mockAgentIds, CURRENT_TIME_MILLIS, INTERVAL_MILLIS);
DeadlockChecker checker = new DeadlockChecker(dataCollector, rule);
Expand All @@ -89,9 +89,9 @@ public void checkTest2() {
Rule rule = new Rule(APPLICATION_NAME, SERVICE_TYPE, CheckerCategory.ERROR_COUNT.getName(), 50, "testGroup", false, false, false, "");

Range range = Range.between(START_TIME_MILLIS, CURRENT_TIME_MILLIS);
when(mockAgentEventDao.getAgentEvents(AGENT_ID_1, range, Set.of())).thenReturn(List.of(createAgentEvent(AGENT_ID_1, createEventTimestamp(), AgentEventType.AGENT_CLOSED_BY_SERVER)));
when(mockAgentEventDao.getAgentEvents(AGENT_ID_2, range, Set.of())).thenReturn(List.of(createAgentEvent(AGENT_ID_2, createEventTimestamp(), AgentEventType.AGENT_SHUTDOWN)));
when(mockAgentEventDao.getAgentEvents(AGENT_ID_3, range, Set.of())).thenReturn(List.of(createAgentEvent(AGENT_ID_3, createEventTimestamp(), AgentEventType.AGENT_PING)));
when(mockAgentEventDao.getAgentEvents(AGENT_ID_1, range, allQuery())).thenReturn(List.of(createAgentEvent(AGENT_ID_1, createEventTimestamp(), AgentEventType.AGENT_CLOSED_BY_SERVER)));
when(mockAgentEventDao.getAgentEvents(AGENT_ID_2, range, allQuery())).thenReturn(List.of(createAgentEvent(AGENT_ID_2, createEventTimestamp(), AgentEventType.AGENT_SHUTDOWN)));
when(mockAgentEventDao.getAgentEvents(AGENT_ID_3, range, allQuery())).thenReturn(List.of(createAgentEvent(AGENT_ID_3, createEventTimestamp(), AgentEventType.AGENT_PING)));

AgentEventDataCollector dataCollector = new AgentEventDataCollector(DataCollectorCategory.AGENT_EVENT, mockAgentEventDao, mockAgentIds, CURRENT_TIME_MILLIS, INTERVAL_MILLIS);
DeadlockChecker checker = new DeadlockChecker(dataCollector, rule);
Expand All @@ -105,6 +105,10 @@ public void checkTest2() {
assertThat(smsMessage).isEmpty();
}

private AgentEventQuery allQuery() {
return AgentEventQuery.all();
}

private AgentEventBo createAgentEvent(String agentId, long eventTimestamp, AgentEventType agentEventType) {
return new AgentEventBo(agentId, START_TIME_MILLIS, eventTimestamp, agentEventType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.navercorp.pinpoint.web.response.CodeResult;
import com.navercorp.pinpoint.web.service.AgentEventService;
import com.navercorp.pinpoint.web.service.AgentInfoService;
import com.navercorp.pinpoint.web.service.component.AgentEventQuery;
import com.navercorp.pinpoint.web.view.tree.SimpleTreeView;
import com.navercorp.pinpoint.web.view.tree.TreeNode;
import com.navercorp.pinpoint.web.view.tree.TreeView;
Expand Down Expand Up @@ -185,7 +186,8 @@ public List<AgentEvent> getAgentEvents(
@RequestParam(value = "exclude", defaultValue = "") int[] excludeEventTypeCodes) {
final Range range = Range.between(from, to);
final Set<AgentEventType> excludeEventTypes = getAgentEventTypes(excludeEventTypeCodes);
return this.agentEventService.getAgentEvents(agentId, range, excludeEventTypes);
AgentEventQuery exclude = AgentEventQuery.exclude(excludeEventTypes);
return this.agentEventService.getAgentEvents(agentId, range, exclude);
}

private static Set<AgentEventType> getAgentEventTypes(int[] excludeEventTypeCodes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
import com.navercorp.pinpoint.common.server.bo.event.AgentEventBo;
import com.navercorp.pinpoint.common.server.util.AgentEventType;
import com.navercorp.pinpoint.common.server.util.time.Range;
import com.navercorp.pinpoint.web.service.component.AgentEventQuery;

import java.util.List;
import java.util.Set;

/**
* @author HyunGil Jeong
Expand All @@ -30,6 +30,6 @@ public interface AgentEventDao {

AgentEventBo getAgentEvent(String agentId, long eventTimestamp, AgentEventType eventType);

List<AgentEventBo> getAgentEvents(String agentId, Range range, Set<AgentEventType> excludeEventTypes);
List<AgentEventBo> getAgentEvents(String agentId, Range range, AgentEventQuery filter);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package com.navercorp.pinpoint.web.dao.hbase;

import com.navercorp.pinpoint.common.server.util.AgentEventType;
import com.navercorp.pinpoint.common.util.CollectionUtils;
import com.navercorp.pinpoint.web.service.component.AgentEventQuery;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.util.Bytes;

import java.util.Set;

public class AgentEventFilterBuilder {

public AgentEventFilterBuilder() {

}

public Filter queryToFilter(AgentEventQuery query) {
return switch (query.getQueryType()) {
case INCLUDE -> includeFilter(query.getEventTypes());
case EXCLUDE -> excludeFilter(query.getEventTypes());
case ALL -> null;
};
}

public Filter excludeFilter(Set<AgentEventType> excludeEventTypes) {
if (CollectionUtils.isEmpty(excludeEventTypes)) {
return null;
}
if (excludeEventTypes.size() == 1) {
AgentEventType event = excludeEventTypes.iterator().next();
return excludeQualifierFilter(event);
}

final FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
for (AgentEventType excludeEventType : excludeEventTypes) {
Filter filter = excludeQualifierFilter(excludeEventType);
filterList.addFilter(filter);
}
return filterList;
}

private Filter excludeQualifierFilter(AgentEventType excludeEventType) {
byte[] excludeQualifier = Bytes.toBytes(excludeEventType.getCode());
return new QualifierFilter(CompareOperator.NOT_EQUAL, new BinaryComparator(excludeQualifier));
}

public Filter includeFilter(Set<AgentEventType> includeEventTypes) {
if (CollectionUtils.isEmpty(includeEventTypes)) {
return null;
}
if (includeEventTypes.size() == 1) {
AgentEventType event = includeEventTypes.iterator().next();
return includeFilter(event);
}

final FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
for (AgentEventType excludeEventType : includeEventTypes) {
Filter filter = includeFilter(excludeEventType);
filterList.addFilter(filter);
}
return filterList;
}

private Filter includeFilter(AgentEventType excludeEventType) {
byte[] excludeQualifier = Bytes.toBytes(excludeEventType.getCode());
return new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(excludeQualifier));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,11 @@
import com.navercorp.pinpoint.common.server.util.time.Range;
import com.navercorp.pinpoint.common.util.CollectionUtils;
import com.navercorp.pinpoint.web.dao.AgentEventDao;
import org.apache.hadoop.hbase.CompareOperator;
import com.navercorp.pinpoint.web.service.component.AgentEventQuery;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -44,7 +42,6 @@

import java.util.List;
import java.util.Objects;
import java.util.Set;

/**
* @author HyunGil Jeong
Expand All @@ -68,6 +65,8 @@ public class HbaseAgentEventDao implements AgentEventDao {

private final AgentIdRowKeyEncoder rowKeyEncoder = new AgentIdRowKeyEncoder();

private final AgentEventFilterBuilder filterBuilder = new AgentEventFilterBuilder();

public HbaseAgentEventDao(HbaseOperations hbaseOperations,
TableNameProvider tableNameProvider,
@Qualifier("agentEventMapper")
Expand All @@ -79,31 +78,37 @@ public HbaseAgentEventDao(HbaseOperations hbaseOperations,
}

@Override
public List<AgentEventBo> getAgentEvents(String agentId, Range range, Set<AgentEventType> excludeEventTypes) {
public List<AgentEventBo> getAgentEvents(String agentId, Range range, AgentEventQuery query) {
Objects.requireNonNull(agentId, "agentId");
Objects.requireNonNull(range, "range");
Objects.requireNonNull(query, "query");

Scan scan = createScan(agentId, range, query);


TableName table = tableNameProvider.getTableName(DESCRIPTOR.getTable());
List<AgentEventBo> agentEvents = this.hbaseOperations.find(table, scan, agentEventResultsExtractor);
logger.debug("getAgentEvents() query:{} agentEvents:{}", query, agentEvents);
return agentEvents;
}


private Scan createScan(String agentId, Range range, AgentEventQuery query) {
Scan scan = new Scan();
scan.readVersions(1);
scan.setCaching(SCANNER_CACHE_SIZE);

scan.withStartRow(createRowKey(agentId, range.getTo()));
scan.withStopRow(createRowKey(agentId, range.getFrom()));
scan.addFamily(DESCRIPTOR.getName());

if (CollectionUtils.hasLength(excludeEventTypes)) {
FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
for (AgentEventType excludeEventType : excludeEventTypes) {
byte[] excludeQualifier = Bytes.toBytes(excludeEventType.getCode());
filterList.addFilter(new QualifierFilter(CompareOperator.NOT_EQUAL, new BinaryComparator(excludeQualifier)));
}
scan.setFilter(filterList);
Filter filter = filterBuilder.queryToFilter(query);
if (filter != null) {
scan.setFilter(filter);
}

TableName agentEventTableName = tableNameProvider.getTableName(DESCRIPTOR.getTable());
List<AgentEventBo> agentEvents = this.hbaseOperations.find(agentEventTableName, scan, agentEventResultsExtractor);
logger.debug("agentEvents found. {}", agentEvents);
return agentEvents;
if (query.isOneRowScan()) {
scan.setOneRowLimit();
}
return scan;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@

package com.navercorp.pinpoint.web.service;

import java.util.List;
import java.util.Set;

import com.navercorp.pinpoint.common.server.util.AgentEventType;
import com.navercorp.pinpoint.web.vo.AgentEvent;
import com.navercorp.pinpoint.common.server.util.time.Range;
import com.navercorp.pinpoint.web.service.component.AgentEventQuery;
import com.navercorp.pinpoint.web.vo.AgentEvent;

import java.util.List;

/**
* @author HyunGil Jeong
Expand All @@ -32,6 +32,6 @@ public interface AgentEventService {

List<AgentEvent> getAgentEvents(String agentId, Range range);

List<AgentEvent> getAgentEvents(String agentId, Range range, Set<AgentEventType> excludeEventTypeCodes);
List<AgentEvent> getAgentEvents(String agentId, Range range, AgentEventQuery query);

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.navercorp.pinpoint.common.server.util.time.Range;
import com.navercorp.pinpoint.common.util.ArrayUtils;
import com.navercorp.pinpoint.web.dao.AgentEventDao;
import com.navercorp.pinpoint.web.service.component.AgentEventQuery;
import com.navercorp.pinpoint.web.vo.AgentEvent;
import com.navercorp.pinpoint.web.vo.DurationalAgentEvent;
import org.apache.commons.collections4.CollectionUtils;
Expand All @@ -37,7 +38,6 @@
import java.util.List;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;

/**
* @author HyunGil Jeong
Expand All @@ -54,23 +54,26 @@ public class AgentEventServiceImpl implements AgentEventService {

private final AgentEventMessageDeserializerV1 agentEventMessageDeserializerV1;

public AgentEventServiceImpl(AgentEventDao agentEventDao, AgentEventMessageDeserializer agentEventMessageDeserializer, AgentEventMessageDeserializerV1 agentEventMessageDeserializerV1) {
public AgentEventServiceImpl(AgentEventDao agentEventDao,
AgentEventMessageDeserializer agentEventMessageDeserializer,
AgentEventMessageDeserializerV1 agentEventMessageDeserializerV1) {
this.agentEventDao = Objects.requireNonNull(agentEventDao, "agentEventDao");
this.agentEventMessageDeserializer = Objects.requireNonNull(agentEventMessageDeserializer, "agentEventMessageDeserializer");
this.agentEventMessageDeserializerV1 = Objects.requireNonNull(agentEventMessageDeserializerV1, "agentEventMessageDeserializerV1");
}

@Override
public List<AgentEvent> getAgentEvents(String agentId, Range range) {
return getAgentEvents(agentId, range, Collections.emptySet());
return getAgentEvents(agentId, range, AgentEventQuery.all());
}

@Override
public List<AgentEvent> getAgentEvents(String agentId, Range range, Set<AgentEventType> excludeEventTypeCodes) {
public List<AgentEvent> getAgentEvents(String agentId, Range range, AgentEventQuery query) {
Objects.requireNonNull(agentId, "agentId");
Objects.requireNonNull(excludeEventTypeCodes, "excludeEventTypeCodes");
Objects.requireNonNull(query, "query");

List<AgentEventBo> agentEventBos = this.agentEventDao.getAgentEvents(agentId, range, query);

List<AgentEventBo> agentEventBos = this.agentEventDao.getAgentEvents(agentId, range, excludeEventTypeCodes);
List<AgentEvent> agentEvents = createAgentEvents(agentEventBos);
agentEvents.sort(AgentEvent.EVENT_TIMESTAMP_ASC_COMPARATOR);
return agentEvents;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,18 +187,18 @@ private boolean isActiveAgentPredicate(AgentAndStatus agentAndStatus,
logger.trace("isActiveAgentPredicate {}", agentAndStatus);
AgentInfo agentInfo = agentAndStatus.getAgentInfo();
if (agentInfoPredicate.test(agentInfo)) {
logger.trace("agentInfoPredicate=true");
logger.trace("agentInfoPredicate=true {}", agentAndStatus);
}
if (agentStatusFilter.test(agentAndStatus.getStatus())) {
logger.trace("agentStatusFilter=true");
logger.trace("agentStatusFilter=true {}", agentAndStatus);
return true;
}
Application agent = new Application(agentInfo.getAgentId(), agentInfo.getServiceType());
String agentVersion = agentInfo.getAgentVersion();
if (activeAgentValidator.isActiveAgent(agent, agentVersion, range)) {
return true;
}
logger.trace("isActiveAgentPredicate=false");
logger.trace("isActiveAgentPredicate=false {}", agentAndStatus);
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@ public interface ActiveAgentValidator {

boolean isActiveAgent(Application agent, String version, List<Range> ranges);

boolean isActiveAgentByPing(String agentId, Range range);
boolean isActiveAgentByEvent(String agentId, Range range);
}
Loading

0 comments on commit 01f50d4

Please sign in to comment.