Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix concurrent error in InMemoryMetricsRepository of the dashboard #488

Merged
merged 40 commits into from
Feb 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
faac56d
doc: add blog with Sentinel in action (#392)
all4you Jan 7, 2019
4c4cb34
dashboard: make fallbackToLocalWhenFail be configurable in dashboard …
cdfive Jan 7, 2019
d3b3d65
dashboard: add healthCount/total information in sidebar of Sentinel d…
jz0630 Jan 7, 2019
471bdf7
dashboard: structure rearrangement and polish related code
sczyh30 Jan 8, 2019
883565a
dashboard: change text of p_qps and b_qps to be more intuitive in mon…
Arlmls Jan 9, 2019
4118cae
dashboard: frontend refinement of monitoring page and flow rule dialo…
sczyh30 Jan 10, 2019
219877d
Add adapter support for Zuul 1.x (#188)
tigerMoon Jan 12, 2019
95ca72c
Update document and pom for sentinel-zuul-adapter
sczyh30 Jan 14, 2019
cca5849
Update cluster demo README and remove unused demo
sczyh30 Jan 14, 2019
5759397
Remove redundant semicolon in MetricWriter (#412)
kangyl Jan 14, 2019
f089afd
Fix incorrect comment in NodeSelectorSlot javadoc (#419)
kexianjun Jan 15, 2019
d352f41
dashboard: when click the first-level menu of sidebar, don't jump to …
cdfive Jan 15, 2019
aacb4d6
Bug fix: fix 420, fix negative waitTime in RateLimiterController and …
CarpenterLee Jan 16, 2019
dd11dc3
Improve cluster embedded demo
sczyh30 Jan 16, 2019
9465fee
doc: fix typo in README of Sentinel Dubbo Demo (#425)
m11y Jan 16, 2019
636ba05
Upgrade Dubbo version in sentinel-demo-dubbo (#436)
Leishunyu Jan 21, 2019
829fde2
Rename dashboard package name from com.taobao to com.alibaba (#435)
CarpenterLee Jan 21, 2019
b760ad9
Add logback dependency in sentinel-demo-nacos-datasource to avoid log…
lltx Jan 21, 2019
1f8236a
dashboard: update WebConfig and improve token server list page
sczyh30 Jan 21, 2019
9e62b68
Add catch throwable logic in ClusterStateManager to detect fatal erro…
sczyh30 Jan 25, 2019
b1238f8
Fixes #453: Support tracing exception count for specific entry or con…
sczyh30 Jan 25, 2019
c1c9367
Remove slf4j dependency in sentinel-annotation-aspectj
sczyh30 Jan 25, 2019
9e257f1
Fix error value type and rename variable in EntranceNode class (#457)
mjaow Jan 26, 2019
75eb58d
Remove duplicate comment in WarmUpController (#464)
mjaow Jan 28, 2019
8059408
Fix zero-count divide overflow bug in RateLimiterController (#461)
mjaow Jan 28, 2019
4364339
HashMap init optimize when adding new ClusterNode to cluster node map…
luoxn28 Jan 28, 2019
146dee9
Code and javadoc refinement
sczyh30 Jan 28, 2019
abad82a
Update dependency version of fastjson and jacoco-maven-plugin
sczyh30 Jan 28, 2019
45b4f97
Make build faster by reducing fixed waiting time in tests (#449)
aalmiray Jan 29, 2019
da5e1cd
Add some unit test for StatisticNode, ClusterNode and DefaultNodeBuil…
cdfive Jan 29, 2019
7155634
Refinement for test cases
sczyh30 Jan 29, 2019
cb2cb2f
Add back thread count metric type support for parameter flow control
sczyh30 Jan 30, 2019
c0d9318
Add volatile in double-checked locking field in ClusterBuilderSlot (#…
mjaow Jan 30, 2019
5f98bf0
Improve ClusterServerConfigManager in sentinel-cluster-server-default…
sczyh30 Jan 30, 2019
9ea2d7c
Update Nacos SDK version to 0.8 and update Nacos namespace demo (#474)
yanlinly Jan 31, 2019
0095762
Carry the triggered rule in subclasses of BlockException (#469)
sczyh30 Jan 31, 2019
7fb4ebf
Fix NPE bug when creating connection group in ConnectionManager
sczyh30 Jan 28, 2019
9b9c502
Improve and fix bugs for ConnectionManager and add test cases
sczyh30 Jan 28, 2019
04fc367
Merge pull request #2 from alibaba/master
nick-tan Feb 17, 2019
2109367
fix dashboard listResourcesOfApp concurrent error
nick-tan Feb 17, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.alibaba.csp.sentinel.dashboard.datasource.entity.MetricEntity;
import com.alibaba.csp.sentinel.util.StringUtil;

import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
import org.springframework.stereotype.Component;

/**
Expand All @@ -43,21 +44,23 @@ public class InMemoryMetricsRepository implements MetricsRepository<MetricEntity
/**
* {@code app -> resource -> timestamp -> metric}
*/
private Map<String, Map<String, LinkedHashMap<Long, MetricEntity>>> allMetrics = new ConcurrentHashMap<>();
private Map<String, Map<String, ConcurrentLinkedHashMap<Long, MetricEntity>>> allMetrics = new ConcurrentHashMap<>();



@Override
public synchronized void save(MetricEntity entity) {
if (entity == null || StringUtil.isBlank(entity.getApp())) {
return;
}
allMetrics.computeIfAbsent(entity.getApp(), e -> new HashMap<>(16))
.computeIfAbsent(entity.getResource(), e -> new LinkedHashMap<Long, MetricEntity>() {
@Override
protected boolean removeEldestEntry(Entry<Long, MetricEntity> eldest) {
allMetrics.computeIfAbsent(entity.getApp(), e -> new ConcurrentHashMap<>(16))
.computeIfAbsent(entity.getResource(), e -> new ConcurrentLinkedHashMap.Builder<Long, MetricEntity>()
.maximumWeightedCapacity(MAX_METRIC_LIVE_TIME_MS).weigher((key, value) -> {
// Metric older than {@link #MAX_METRIC_LIVE_TIME_MS} will be removed.
return eldest.getKey() < System.currentTimeMillis() - MAX_METRIC_LIVE_TIME_MS;
}
}).put(entity.getTimestamp().getTime(), entity);
int weight = (int)(System.currentTimeMillis() - key);
// weight must be a number greater than or equal to one
return Math.max(weight, 1);
}).build()).put(entity.getTimestamp().getTime(), entity);
}

@Override
Expand All @@ -75,11 +78,11 @@ public synchronized List<MetricEntity> queryByAppAndResourceBetween(String app,
if (StringUtil.isBlank(app)) {
return results;
}
Map<String, LinkedHashMap<Long, MetricEntity>> resourceMap = allMetrics.get(app);
Map<String, ConcurrentLinkedHashMap<Long, MetricEntity>> resourceMap = allMetrics.get(app);
if (resourceMap == null) {
return results;
}
LinkedHashMap<Long, MetricEntity> metricsMap = resourceMap.get(resource);
ConcurrentLinkedHashMap<Long, MetricEntity> metricsMap = resourceMap.get(resource);
if (metricsMap == null) {
return results;
}
Expand All @@ -98,14 +101,14 @@ public List<String> listResourcesOfApp(String app) {
return results;
}
// resource -> timestamp -> metric
Map<String, LinkedHashMap<Long, MetricEntity>> resourceMap = allMetrics.get(app);
Map<String, ConcurrentLinkedHashMap<Long, MetricEntity>> resourceMap = allMetrics.get(app);
if (resourceMap == null) {
return results;
}
final long minTimeMs = System.currentTimeMillis() - 1000 * 60;
Map<String, MetricEntity> resourceCount = new HashMap<>(32);
Map<String, MetricEntity> resourceCount = new ConcurrentHashMap<>(32);

for (Entry<String, LinkedHashMap<Long, MetricEntity>> resourceMetrics : resourceMap.entrySet()) {
for (Entry<String, ConcurrentLinkedHashMap<Long, MetricEntity>> resourceMetrics : resourceMap.entrySet()) {
for (Entry<Long, MetricEntity> metrics : resourceMetrics.getValue().entrySet()) {
if (metrics.getKey() < minTimeMs) {
continue;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package com.alibaba.csp.sentinel.dashboard.repository.metric;

import com.alibaba.csp.sentinel.dashboard.datasource.entity.MetricEntity;
import org.assertj.core.util.Lists;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.springframework.util.CollectionUtils;

import java.util.ConcurrentModificationException;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;

import static org.junit.Assert.*;

/**
* InMemoryMetricsRepository Test
*
* @author Nick Tan
*/
public class InMemoryMetricsRepositoryTest {

private static final String DEFAULT_APP = "default";
private static final String DEFAULT_EXPIRE_APP = "default_expire_app";
private static final String DEFAULT_RESOURCE = "test";
private static final long EXPIRE_TIME = 1000 * 60 * 5L;
private InMemoryMetricsRepository inMemoryMetricsRepository;

private static final int AVAILABLE_CPU_PROCESSORS = Runtime.getRuntime().availableProcessors();

private ExecutorService executorService = Executors.newFixedThreadPool(AVAILABLE_CPU_PROCESSORS);

@Before
public void setUp() throws Exception {

inMemoryMetricsRepository = new InMemoryMetricsRepository();
}

@Test
public void save() throws InterruptedException {

for (int i = 0; i < 1000000; i++) {

MetricEntity entry = new MetricEntity();
entry.setApp(DEFAULT_APP);
entry.setResource(DEFAULT_RESOURCE);
entry.setTimestamp(new Date(System.currentTimeMillis()));
entry.setPassQps(1L);
entry.setExceptionQps(1L);
entry.setBlockQps(0L);
entry.setSuccessQps(1L);
inMemoryMetricsRepository.save(entry);

}

}

@Test
public void testExpireMetric() throws InterruptedException {

long now = System.currentTimeMillis();
MetricEntity expireEntry = new MetricEntity();
expireEntry.setApp(DEFAULT_EXPIRE_APP);
expireEntry.setResource(DEFAULT_RESOURCE);
expireEntry.setTimestamp(new Date(now - EXPIRE_TIME - 10L));
expireEntry.setPassQps(1L);
expireEntry.setExceptionQps(1L);
expireEntry.setBlockQps(0L);
expireEntry.setSuccessQps(1L);
inMemoryMetricsRepository.save(expireEntry);

MetricEntity entry = new MetricEntity();
entry.setApp(DEFAULT_EXPIRE_APP);
entry.setResource(DEFAULT_RESOURCE);
entry.setTimestamp(new Date(now));
entry.setPassQps(1L);
entry.setExceptionQps(1L);
entry.setBlockQps(0L);
entry.setSuccessQps(1L);
inMemoryMetricsRepository.save(entry);

List<MetricEntity> list = inMemoryMetricsRepository.queryByAppAndResourceBetween(
DEFAULT_EXPIRE_APP, DEFAULT_RESOURCE, now - 2 * EXPIRE_TIME, now + EXPIRE_TIME);

Assert.assertEquals(false, CollectionUtils.isEmpty(list));

assertTrue(list.size() == 1);

}

@Test
public void listResourcesOfApp() throws InterruptedException {
// prepare basic test data
save();
System.out.println(System.currentTimeMillis() + "[basic test data ready]");

List<CompletableFuture> futures = Lists.newArrayList();

// concurrent query resources of app
final CyclicBarrier cyclicBarrier = new CyclicBarrier(AVAILABLE_CPU_PROCESSORS);
for (int j = 0; j < 10000; j++) {

futures.add(
CompletableFuture.runAsync(() -> {
try {
cyclicBarrier.await();
inMemoryMetricsRepository.listResourcesOfApp(DEFAULT_APP);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}, executorService
));
}

// batch add metric entity
for (int i = 0; i < 10000; i++) {

MetricEntity entry = new MetricEntity();
entry.setApp(DEFAULT_APP);
entry.setResource(DEFAULT_RESOURCE);
entry.setTimestamp(new Date(System.currentTimeMillis() - EXPIRE_TIME - 1000L));
entry.setPassQps(1L);
entry.setExceptionQps(1L);
entry.setBlockQps(0L);
entry.setSuccessQps(1L);
inMemoryMetricsRepository.save(entry);

}

CompletableFuture all = CompletableFuture.allOf(futures.toArray((new CompletableFuture[futures.size()])));
try {
all.join();
} catch (ConcurrentModificationException e) {
e.printStackTrace();
assertFalse("concurrent error", e instanceof ConcurrentModificationException);
}
}

}