Skip to content

Commit

Permalink
Add extended interface for metric extension hook to support distingui…
Browse files Browse the repository at this point in the history
…shing traffic type (#1665)

- Add EntryType args to all hook methods
  • Loading branch information
Billzaifei authored Aug 18, 2020
1 parent dae4621 commit 8643dac
Show file tree
Hide file tree
Showing 6 changed files with 285 additions and 35 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package com.alibaba.csp.sentinel.metric.extension;

import com.alibaba.csp.sentinel.EntryType;
import com.alibaba.csp.sentinel.slots.block.BlockException;

/**
* Advanced {@link MetricExtension} extending input parameters of each metric
* collection method with the name of {@link EntryType}.
*
* @author bill_yip
* @since 1.8.0
*/
public interface AdvancedMetricExtension extends MetricExtension {
/**
* Add current pass count of the resource name.
*
* @param n count to add
* @param resource resource name
* @param entryType {@link EntryType} name, [IN] as provider, [OUT] as consumer.
* @param args additional arguments of the resource, eg. if the resource is
* a method name, the args will be the parameters of the
* method.
*/
void addPass(String resource, String entryType, int n, Object... args);

/**
* Add current block count of the resource name.
*
* @param n count to add
* @param resource resource name
* @param entryType {@link EntryType} name, [IN] as provider, [OUT] as
* consumer.
* @param origin the original invoker.
* @param blockException block exception related.
* @param args additional arguments of the resource, eg. if the
* resource is a method name, the args will be the
* parameters of the method.
*/
void addBlock(String resource, String entryType, int n, String origin, BlockException blockException,
Object... args);

/**
* Add current completed count of the resource name.
*
* @param n count to add
* @param resource resource name
* @param entryType {@link EntryType} name, [IN] as provider, [OUT] as consumer.
* @param args additional arguments of the resource, eg. if the resource is
* a method name, the args will be the parameters of the
* method.
*/
void addSuccess(String resource, String entryType, int n, Object... args);

/**
* Add current exception count of the resource name.
*
* @param n count to add
* @param resource resource name
* @param entryType {@link EntryType} name, [IN] as provider, [OUT] as consumer.
* @param throwable exception related.
*/
void addException(String resource, String entryType, int n, Throwable throwable);

/**
* Add response time of the resource name.
*
* @param rt response time in millisecond
* @param resource resource name
* @param entryType {@link EntryType} name, [IN] as provider, [OUT] as consumer.
* @param args additional arguments of the resource, eg. if the resource is
* a method name, the args will be the parameters of the
* method.
*/
void addRt(String resource, String entryTypeTag, long rt, Object... args);

/**
* Increase current thread count of the resource name.
*
* @param resource resource name
* @param entryType {@link EntryType} name, [IN] as provider, [OUT] as consumer.
* @param args additional arguments of the resource, eg. if the resource is
* a method name, the args will be the parameters of the
* method.
*/
void increaseThreadNum(String resource, String entryType, Object... args);

/**
* Decrease current thread count of the resource name.
*
* @param resource resource name
* @param entryType {@link EntryType} name, [IN] as provider, [OUT] as consumer.
* @param args additional arguments of the resource, eg. if the resource is
* a method name, the args will be the parameters of the
* method.
*/
void decreaseThreadNum(String resource, String entryType, Object... args);
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package com.alibaba.csp.sentinel.metric.extension.callback;

import com.alibaba.csp.sentinel.context.Context;
import com.alibaba.csp.sentinel.metric.extension.MetricExtensionProvider;
import com.alibaba.csp.sentinel.metric.extension.AdvancedMetricExtension;
import com.alibaba.csp.sentinel.metric.extension.MetricExtension;
import com.alibaba.csp.sentinel.metric.extension.MetricExtensionProvider;
import com.alibaba.csp.sentinel.node.DefaultNode;
import com.alibaba.csp.sentinel.slotchain.ProcessorSlotEntryCallback;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
Expand All @@ -15,20 +16,32 @@
* @since 1.6.1
*/
public class MetricEntryCallback implements ProcessorSlotEntryCallback<DefaultNode> {
@Override
public void onPass(Context context, ResourceWrapper resourceWrapper, DefaultNode param,
int count, Object... args) throws Exception {
for (MetricExtension m : MetricExtensionProvider.getMetricExtensions()) {
m.increaseThreadNum(resourceWrapper.getName(), args);
m.addPass(resourceWrapper.getName(), count, args);
}
}
@Override
public void onPass(Context context, ResourceWrapper resourceWrapper, DefaultNode param, int count, Object... args)
throws Exception {
for (MetricExtension m : MetricExtensionProvider.getMetricExtensions()) {
if (m instanceof AdvancedMetricExtension) {
((AdvancedMetricExtension) m).increaseThreadNum(resourceWrapper.getName(),
resourceWrapper.getEntryType().name(), args);
((AdvancedMetricExtension) m).addPass(resourceWrapper.getName(), resourceWrapper.getEntryType().name(),
count, args);
} else {
m.increaseThreadNum(resourceWrapper.getName(), args);
m.addPass(resourceWrapper.getName(), count, args);
}
}
}

@Override
public void onBlocked(BlockException ex, Context context, ResourceWrapper resourceWrapper,
DefaultNode param, int count, Object... args) {
for (MetricExtension m : MetricExtensionProvider.getMetricExtensions()) {
m.addBlock(resourceWrapper.getName(), count, context.getOrigin(), ex, args);
}
}
@Override
public void onBlocked(BlockException ex, Context context, ResourceWrapper resourceWrapper, DefaultNode param,
int count, Object... args) {
for (MetricExtension m : MetricExtensionProvider.getMetricExtensions()) {
if (m instanceof AdvancedMetricExtension) {
((AdvancedMetricExtension) m).addBlock(resourceWrapper.getName(), resourceWrapper.getEntryType().name(),
count, context.getOrigin(), ex, args);
} else {
m.addBlock(resourceWrapper.getName(), count, context.getOrigin(), ex, args);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package com.alibaba.csp.sentinel.metric.extension.callback;

import com.alibaba.csp.sentinel.context.Context;
import com.alibaba.csp.sentinel.metric.extension.MetricExtensionProvider;
import com.alibaba.csp.sentinel.metric.extension.AdvancedMetricExtension;
import com.alibaba.csp.sentinel.metric.extension.MetricExtension;
import com.alibaba.csp.sentinel.metric.extension.MetricExtensionProvider;
import com.alibaba.csp.sentinel.slotchain.ProcessorSlotExitCallback;
import com.alibaba.csp.sentinel.slotchain.ResourceWrapper;
import com.alibaba.csp.sentinel.util.TimeUtil;
Expand All @@ -15,22 +16,31 @@
*/
public class MetricExitCallback implements ProcessorSlotExitCallback {

@Override
public void onExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
for (MetricExtension m : MetricExtensionProvider.getMetricExtensions()) {
if (context.getCurEntry().getBlockError() != null) {
continue;
}
String resource = resourceWrapper.getName();
long realRt = TimeUtil.currentTimeMillis() - context.getCurEntry().getCreateTimestamp();
m.addRt(resource, realRt, args);
m.addSuccess(resource, count, args);
m.decreaseThreadNum(resource, args);

Throwable ex = context.getCurEntry().getError();
if (ex != null) {
m.addException(resource, count, ex);
}
}
}
@Override
public void onExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
for (MetricExtension m : MetricExtensionProvider.getMetricExtensions()) {
if (context.getCurEntry().getBlockError() != null) {
continue;
}
String resource = resourceWrapper.getName();
String entryType = resourceWrapper.getEntryType().name();
Throwable ex = context.getCurEntry().getError();
long realRt = TimeUtil.currentTimeMillis() - context.getCurEntry().getCreateTimestamp();
if (m instanceof AdvancedMetricExtension) {
((AdvancedMetricExtension) m).addRt(resource, entryType, realRt, args);
((AdvancedMetricExtension) m).addSuccess(resource, entryType, count, args);
((AdvancedMetricExtension) m).decreaseThreadNum(resource, entryType, args);
if (null != ex) {
((AdvancedMetricExtension) m).addException(resource, entryType, count, ex);
}
} else {
m.addRt(resource, realRt, args);
m.addSuccess(resource, count, args);
m.decreaseThreadNum(resource, args);
if (null != ex) {
m.addException(resource, count, ex);
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package com.alibaba.csp.sentinel.metric.extension.callback;

import com.alibaba.csp.sentinel.metric.extension.AdvancedMetricExtension;
import com.alibaba.csp.sentinel.slots.block.BlockException;

class FakeAdvancedMetricExtension implements AdvancedMetricExtension {
long pass = 0;
long block = 0;
long success = 0;
long exception = 0;
long rt = 0;
long thread = 0;

@Override
public void addPass(String resource, int n, Object... args) {
// Do nothing because of using the enhanced one
}

@Override
public void addBlock(String resource, int n, String origin, BlockException blockException, Object... args) {
// Do nothing because of using the enhanced one
}

@Override
public void addSuccess(String resource, int n, Object... args) {
// Do nothing because of using the enhanced one
}

@Override
public void addException(String resource, int n, Throwable throwable) {
// Do nothing because of using the enhanced one
}

@Override
public void addRt(String resource, long rt, Object... args) {
// Do nothing because of using the enhanced one
}

@Override
public void increaseThreadNum(String resource, Object... args) {
// Do nothing because of using the enhanced one
}

@Override
public void decreaseThreadNum(String resource, Object... args) {
// Do nothing because of using the enhanced one
}

@Override
public void addPass(String resource, String entryType, int n, Object... args) {
pass += n;
}

@Override
public void addBlock(String resource, String entryType, int n, String origin, BlockException blockException,
Object... args) {
block += n;
}

@Override
public void addSuccess(String resource, String entryType, int n, Object... args) {
success += n;
}

@Override
public void addException(String resource, String entryType, int n, Throwable throwable) {
exception += n;
}

@Override
public void addRt(String resource, String entryTypeTag, long rt, Object... args) {
this.rt += rt;
}

@Override
public void increaseThreadNum(String resource, String entryType, Object... args) {
thread ++;
}

@Override
public void decreaseThreadNum(String resource, String entryType, Object... args) {
thread --;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,30 @@ public class MetricEntryCallbackTest {
@Test
public void onPass() throws Exception {
FakeMetricExtension extension = new FakeMetricExtension();
FakeAdvancedMetricExtension advancedExtension = new FakeAdvancedMetricExtension();
MetricExtensionProvider.addMetricExtension(extension);
MetricExtensionProvider.addMetricExtension(advancedExtension);

MetricEntryCallback entryCallback = new MetricEntryCallback();
StringResourceWrapper resourceWrapper = new StringResourceWrapper("resource", EntryType.OUT);
int count = 2;
Object[] args = {"args1", "args2"};
entryCallback.onPass(null, resourceWrapper, null, count, args);
// assert extension
Assert.assertEquals(extension.pass, count);
Assert.assertEquals(extension.thread, 1);

// assert advancedExtension
Assert.assertEquals(advancedExtension.pass, count);
Assert.assertEquals(advancedExtension.thread, 1);
}

@Test
public void onBlocked() throws Exception {
FakeMetricExtension extension = new FakeMetricExtension();
FakeAdvancedMetricExtension advancedExtension = new FakeAdvancedMetricExtension();
MetricExtensionProvider.addMetricExtension(extension);
MetricExtensionProvider.addMetricExtension(advancedExtension);

MetricEntryCallback entryCallback = new MetricEntryCallback();
StringResourceWrapper resourceWrapper = new StringResourceWrapper("resource", EntryType.OUT);
Expand All @@ -43,6 +52,9 @@ public void onBlocked() throws Exception {
int count = 2;
Object[] args = {"args1", "args2"};
entryCallback.onBlocked(new FlowException("xx"), context, resourceWrapper, null, count, args);
// assert extension
Assert.assertEquals(extension.block, count);
// assert advancedExtension
Assert.assertEquals(advancedExtension.block, count);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,37 @@ public void onExit() {
Assert.assertEquals(extension.success, 6 + count);
Assert.assertEquals(extension.thread, 10 - 1);
}

/**
* @author bill_yip
*/
@Test
public void advancedExtensionOnExit() {
FakeAdvancedMetricExtension extension = new FakeAdvancedMetricExtension();
MetricExtensionProvider.addMetricExtension(extension);

MetricExitCallback exitCallback = new MetricExitCallback();
StringResourceWrapper resourceWrapper = new StringResourceWrapper("resource", EntryType.OUT);
int count = 2;
Object[] args = {"args1", "args2"};
long prevRt = 20;
extension.rt = prevRt;
extension.success = 6;
extension.thread = 10;
Context context = mock(Context.class);
Entry entry = mock(Entry.class);

// Mock current time
long curMillis = System.currentTimeMillis();
setCurrentMillis(curMillis);

int deltaMs = 100;
when(entry.getError()).thenReturn(null);
when(entry.getCreateTimestamp()).thenReturn(curMillis - deltaMs);
when(context.getCurEntry()).thenReturn(entry);
exitCallback.onExit(context, resourceWrapper, count, args);
Assert.assertEquals(prevRt + deltaMs, extension.rt);
Assert.assertEquals(extension.success, 6 + count);
Assert.assertEquals(extension.thread, 10 - 1);
}
}

0 comments on commit 8643dac

Please sign in to comment.