Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Command metrics modeled as a stream #981

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions hystrix-contrib/hystrix-servo-metrics-publisher/build.gradle
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
dependencies {
compile project(':hystrix-core')
compile 'com.netflix.servo:servo-core:0.7.5'
testCompile 'junit:junit-dep:4.10'
testCompile 'org.mockito:mockito-all:1.9.5'
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ protected final HystrixRollingNumberEvent getRollingNumberTypeFromEventType(Hyst
return HystrixRollingNumberEvent.from(eventType);
}

protected Monitor<?> getCumulativeMonitor(final String name, final HystrixEventType event) {
protected Monitor<Number> getCumulativeMonitor(final String name, final HystrixEventType event) {
return new CounterMetric(MonitorConfig.builder(name).withTag(getServoTypeTag()).withTag(getServoInstanceTag()).build()) {
@Override
public Long getValue() {
Expand All @@ -165,7 +165,7 @@ public Long getValue() {
};
}

protected Monitor<?> getRollingMonitor(final String name, final HystrixEventType event) {
protected Monitor<Number> getRollingMonitor(final String name, final HystrixEventType event) {
return new GaugeMetric(MonitorConfig.builder(name).withTag(DataSourceLevel.DEBUG).withTag(getServoTypeTag()).withTag(getServoInstanceTag()).build()) {
@Override
public Long getValue() {
Expand All @@ -174,7 +174,7 @@ public Long getValue() {
};
}

protected Monitor<?> getExecutionLatencyMeanMonitor(final String name) {
protected Monitor<Number> getExecutionLatencyMeanMonitor(final String name) {
return new GaugeMetric(MonitorConfig.builder(name).build()) {
@Override
public Number getValue() {
Expand All @@ -183,7 +183,7 @@ public Number getValue() {
};
}

protected Monitor<?> getExecutionLatencyPercentileMonitor(final String name, final double percentile) {
protected Monitor<Number> getExecutionLatencyPercentileMonitor(final String name, final double percentile) {
return new GaugeMetric(MonitorConfig.builder(name).build()) {
@Override
public Number getValue() {
Expand All @@ -192,7 +192,7 @@ public Number getValue() {
};
}

protected Monitor<?> getTotalLatencyMeanMonitor(final String name) {
protected Monitor<Number> getTotalLatencyMeanMonitor(final String name) {
return new GaugeMetric(MonitorConfig.builder(name).build()) {
@Override
public Number getValue() {
Expand All @@ -201,7 +201,7 @@ public Number getValue() {
};
}

protected Monitor<?> getTotalLatencyPercentileMonitor(final String name, final double percentile) {
protected Monitor<Number> getTotalLatencyPercentileMonitor(final String name, final double percentile) {
return new GaugeMetric(MonitorConfig.builder(name).build()) {
@Override
public Number getValue() {
Expand All @@ -210,7 +210,7 @@ public Number getValue() {
};
}

protected Monitor<?> getCurrentValueMonitor(final String name, final Func0<Number> metricToEvaluate) {
protected Monitor<Number> getCurrentValueMonitor(final String name, final Func0<Number> metricToEvaluate) {
return new GaugeMetric(MonitorConfig.builder(name).build()) {
@Override
public Number getValue() {
Expand All @@ -219,7 +219,7 @@ public Number getValue() {
};
}

protected Monitor<?> getCurrentValueMonitor(final String name, final Func0<Number> metricToEvaluate, final Tag tag) {
protected Monitor<Number> getCurrentValueMonitor(final String name, final Func0<Number> metricToEvaluate, final Tag tag) {
return new GaugeMetric(MonitorConfig.builder(name).withTag(tag).build()) {
@Override
public Number getValue() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
/**
* Copyright 2015 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.hystrix.contrib.servopublisher;

import com.netflix.hystrix.HystrixCircuitBreaker;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandKey;
import com.netflix.hystrix.HystrixCommandMetrics;
import com.netflix.hystrix.HystrixCommandProperties;
import com.netflix.hystrix.HystrixEventType;
import com.netflix.hystrix.strategy.properties.HystrixPropertiesCommandDefault;
import org.junit.Test;
import org.mockito.Mockito;
import rx.Observable;
import rx.observers.TestSubscriber;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class HystrixServoMetricsPublisherCommandTest {

private static HystrixCommandKey key = HystrixCommandKey.Factory.asKey("COMMAND");
private static HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("GROUP");

private static HystrixCircuitBreaker circuitBreaker = HystrixCircuitBreaker.Factory.getInstance(key);
private static HystrixCommandProperties.Setter propertiesSetter = HystrixCommandProperties.Setter()
.withCircuitBreakerEnabled(true)
.withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD)
.withExecutionTimeoutInMilliseconds(25)
.withMetricsRollingStatisticalWindowInMilliseconds(1000)
.withMetricsRollingPercentileWindowInMilliseconds(1000)
.withMetricsRollingPercentileWindowBuckets(10);
private static HystrixCommandProperties properties = new HystrixPropertiesCommandDefault(key, propertiesSetter);
private static HystrixCommandMetrics metrics = HystrixCommandMetrics.getInstance(key, groupKey, properties);

@Test
public void testCumulativeCounters() throws Exception {
//execute 10 commands/sec (8 SUCCESS, 1 FAILURE, 1 TIMEOUT).
//after 5 seconds, cumulative counters should have observed 50 commands (40 SUCCESS, 5 FAILURE, 5 TIMEOUT)

HystrixServoMetricsPublisherCommand servoPublisher = new HystrixServoMetricsPublisherCommand(key, groupKey, metrics, circuitBreaker, properties);
servoPublisher.initialize();

final int NUM_SECONDS = 5;

for (int i = 0; i < NUM_SECONDS; i++) {
long startTime = System.currentTimeMillis();
new SuccessCommand().execute();
new SuccessCommand().execute();
new SuccessCommand().execute();
Thread.sleep(50);
new TimeoutCommand().execute();
new SuccessCommand().execute();
new FailureCommand().execute();
new SuccessCommand().execute();
new SuccessCommand().execute();
new SuccessCommand().execute();
Thread.sleep(100);
new SuccessCommand().execute();
long endTime = System.currentTimeMillis();
Thread.sleep(1000 - (endTime - startTime)); //sleep the remainder of the 1000ms allotted
}

Thread.sleep(100);

assertEquals(40L, servoPublisher.getCumulativeMonitor("success", HystrixEventType.SUCCESS).getValue());
assertEquals(5L, servoPublisher.getCumulativeMonitor("timeout", HystrixEventType.TIMEOUT).getValue());
assertEquals(5L, servoPublisher.getCumulativeMonitor("failure", HystrixEventType.FAILURE).getValue());
assertEquals(10L, servoPublisher.getCumulativeMonitor("fallback_success", HystrixEventType.FALLBACK_SUCCESS).getValue());
}

@Test
public void testRollingCounters() throws Exception {
//execute 10 commands, then sleep for 2000ms to let these age out
//execute 10 commands again, these should show up in rolling count

HystrixServoMetricsPublisherCommand servoPublisher = new HystrixServoMetricsPublisherCommand(key, groupKey, metrics, circuitBreaker, properties);
servoPublisher.initialize();

new SuccessCommand().execute();
new SuccessCommand().execute();
new SuccessCommand().execute();
new TimeoutCommand().execute();
new SuccessCommand().execute();
new FailureCommand().execute();
new SuccessCommand().execute();
new SuccessCommand().execute();
new SuccessCommand().execute();
new SuccessCommand().execute();

Thread.sleep(2000);

new SuccessCommand().execute();
new SuccessCommand().execute();
new SuccessCommand().execute();
new TimeoutCommand().execute();
new SuccessCommand().execute();
new FailureCommand().execute();
new TimeoutCommand().execute();
new TimeoutCommand().execute();
new TimeoutCommand().execute();
new TimeoutCommand().execute();

Thread.sleep(100); //time for 1 bucket roll

assertEquals(4L, servoPublisher.getRollingMonitor("success", HystrixEventType.SUCCESS).getValue());
assertEquals(5L, servoPublisher.getRollingMonitor("timeout", HystrixEventType.TIMEOUT).getValue());
assertEquals(1L, servoPublisher.getRollingMonitor("failure", HystrixEventType.FAILURE).getValue());
assertEquals(6L, servoPublisher.getRollingMonitor("falback_success", HystrixEventType.FALLBACK_SUCCESS).getValue());
}

@Test
public void testRollingLatencies() throws Exception {
//execute 10 commands, then sleep for 2000ms to let these age out
//execute 10 commands again, these should show up in rolling count

HystrixServoMetricsPublisherCommand servoPublisher = new HystrixServoMetricsPublisherCommand(key, groupKey, metrics, circuitBreaker, properties);
servoPublisher.initialize();

new SuccessCommand(5).execute();
new SuccessCommand(5).execute();
new SuccessCommand(5).execute();
new TimeoutCommand().execute();
new SuccessCommand(5).execute();
new FailureCommand(5).execute();
new SuccessCommand(5).execute();
new SuccessCommand(5).execute();
new SuccessCommand(5).execute();
new SuccessCommand(5).execute();

Thread.sleep(2000);

List<Observable<Integer>> os = new ArrayList<Observable<Integer>>();
TestSubscriber<Integer> testSubscriber = new TestSubscriber<Integer>();

os.add(new SuccessCommand(10).observe());
os.add(new SuccessCommand(20).observe());
os.add(new SuccessCommand(10).observe());
os.add(new TimeoutCommand().observe());
os.add(new SuccessCommand(15).observe());
os.add(new FailureCommand(10).observe());
os.add(new TimeoutCommand().observe());
os.add(new TimeoutCommand().observe());
os.add(new TimeoutCommand().observe());
os.add(new TimeoutCommand().observe());

Observable.merge(os).subscribe(testSubscriber);

testSubscriber.awaitTerminalEvent(300, TimeUnit.MILLISECONDS);
testSubscriber.assertCompleted();
testSubscriber.assertNoErrors();

Thread.sleep(100); //1 bucket roll

int meanExecutionLatency = servoPublisher.getExecutionLatencyMeanMonitor("meanExecutionLatency").getValue().intValue();
int p50ExecutionLatency = servoPublisher.getExecutionLatencyPercentileMonitor("p50ExecutionLatency", 50).getValue().intValue();
int p99ExecutionLatency = servoPublisher.getExecutionLatencyPercentileMonitor("p99ExecutionLatency", 99).getValue().intValue();
System.out.println("Execution: Mean : " + meanExecutionLatency + ", p50 : " + p50ExecutionLatency + ", p99 : " + p99ExecutionLatency);

int meanTotalLatency = servoPublisher.getTotalLatencyMeanMonitor("meanTotalLatency").getValue().intValue();
int p50TotalLatency = servoPublisher.getTotalLatencyPercentileMonitor("p50TotalLatency", 50).getValue().intValue();
int p99TotalLatency = servoPublisher.getTotalLatencyPercentileMonitor("p99TotalLatency", 99).getValue().intValue();
System.out.println("Total (User-Thread): Mean : " + meanTotalLatency +", p50 : " + p50TotalLatency + ", p99 : " + p99TotalLatency);

assertTrue(meanExecutionLatency > 10);
assertTrue(p50ExecutionLatency < p99ExecutionLatency);
assertTrue(meanExecutionLatency <= meanTotalLatency);
assertTrue(p50ExecutionLatency <= p50TotalLatency);
assertTrue(p99ExecutionLatency <= p99TotalLatency);
}

static class SampleCommand extends HystrixCommand<Integer> {
boolean shouldFail;
int latencyToAdd;

protected SampleCommand(boolean shouldFail, int latencyToAdd) {
super(Setter.withGroupKey(groupKey).andCommandKey(key).andCommandPropertiesDefaults(propertiesSetter));
this.shouldFail = shouldFail;
this.latencyToAdd = latencyToAdd;
}

@Override
protected Integer run() throws Exception {
if (shouldFail) {
throw new RuntimeException("command failure");
} else {
Thread.sleep(latencyToAdd);
return 1;
}
}

@Override
protected Integer getFallback() {
return 99;
}
}

static class SuccessCommand extends SampleCommand {
protected SuccessCommand() {
super(false, 0);
}

public SuccessCommand(int latencyToAdd) {
super(false, latencyToAdd);
}
}

static class FailureCommand extends SampleCommand {
protected FailureCommand() {
super(true, 0);
}

public FailureCommand(int latencyToAdd) {
super(true, latencyToAdd);
}
}

static class TimeoutCommand extends SampleCommand {
protected TimeoutCommand() {
super(false, 100); //exceeds 25ms timeout
}
}
}
2 changes: 2 additions & 0 deletions hystrix-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ dependencies {
compile 'com.netflix.archaius:archaius-core:0.4.1'
compile 'io.reactivex:rxjava:1.0.14'
compile 'org.slf4j:slf4j-api:1.7.0'
compile 'org.hdrhistogram:HdrHistogram:2.1.7'
testCompile 'junit:junit-dep:4.10'
}

Expand Down Expand Up @@ -43,4 +44,5 @@ jmh {
warmup = '1s'
warmupBatchSize = 1
warmupIterations = 5
include = '.*Multi.*'
}
Loading