From 35a96d7e1436f18079d0ee7d90b3e5096bcc3eaa Mon Sep 17 00:00:00 2001 From: Ooo0oO0o0oO <907709476@qq.com> Date: Fri, 7 Dec 2018 17:40:45 +0800 Subject: [PATCH 1/2] [#289]fix grpc-adapter bug --- .../grpc/SentinelGrpcClientInterceptor.java | 61 +++++----- .../grpc/SentinelGrpcServerInterceptor.java | 85 ++++++++------ .../adapter/grpc/FooServiceClient.java | 3 + .../sentinel/adapter/grpc/FooServiceImpl.java | 10 +- .../sentinel/adapter/grpc/GrpcTestServer.java | 1 + .../SentinelGrpcClientInterceptorTest.java | 66 +++++++++-- .../SentinelGrpcServerInterceptorTest.java | 111 ++++++++++++++++-- 7 files changed, 254 insertions(+), 83 deletions(-) diff --git a/sentinel-adapter/sentinel-grpc-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcClientInterceptor.java b/sentinel-adapter/sentinel-grpc-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcClientInterceptor.java index 4e1686279c..a265c95d5b 100755 --- a/sentinel-adapter/sentinel-grpc-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcClientInterceptor.java +++ b/sentinel-adapter/sentinel-grpc-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcClientInterceptor.java @@ -15,28 +15,20 @@ */ package com.alibaba.csp.sentinel.adapter.grpc; -import javax.annotation.Nullable; - -import com.alibaba.csp.sentinel.Entry; +import com.alibaba.csp.sentinel.AsyncEntry; import com.alibaba.csp.sentinel.EntryType; import com.alibaba.csp.sentinel.SphU; import com.alibaba.csp.sentinel.Tracer; import com.alibaba.csp.sentinel.context.ContextUtil; import com.alibaba.csp.sentinel.slots.block.BlockException; - -import io.grpc.CallOptions; -import io.grpc.Channel; -import io.grpc.ClientCall; -import io.grpc.ClientInterceptor; -import io.grpc.ForwardingClientCall; +import io.grpc.*; import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener; -import io.grpc.Metadata; -import io.grpc.MethodDescriptor; -import io.grpc.Status; + +import javax.annotation.Nullable; /** *

gRPC client interceptor for Sentinel. Currently it only works with unary methods.

- * + *

* Example code: *

  * public class ServiceClient {
@@ -52,7 +44,7 @@
  *
  * }
  * 
- * + *

* For server interceptor, see {@link SentinelGrpcServerInterceptor}. * * @author Eric Zhao @@ -60,18 +52,20 @@ public class SentinelGrpcClientInterceptor implements ClientInterceptor { private static final Status FLOW_CONTROL_BLOCK = Status.UNAVAILABLE.withDescription( - "Flow control limit exceeded (client side)"); + "Flow control limit exceeded (client side)"); @Override public ClientCall interceptCall(MethodDescriptor methodDescriptor, CallOptions callOptions, Channel channel) { String resourceName = methodDescriptor.getFullMethodName(); - Entry entry = null; + AsyncEntry asyncEntry = null; try { - entry = SphU.entry(resourceName, EntryType.OUT); + asyncEntry = SphU.asyncEntry(resourceName, EntryType.OUT); + + final AsyncEntry tempEntry = asyncEntry; // Allow access, forward the call. return new ForwardingClientCall.SimpleForwardingClientCall( - channel.newCall(methodDescriptor, callOptions)) { + channel.newCall(methodDescriptor, callOptions)) { @Override public void start(Listener responseListener, Metadata headers) { super.start(new SimpleForwardingClientCallListener(responseListener) { @@ -85,18 +79,14 @@ public void onClose(Status status, Metadata trailers) { super.onClose(status, trailers); // Record the exception metrics. if (!status.isOk()) { - recordException(status.asRuntimeException()); + recordException(status.asRuntimeException(), tempEntry); } + tempEntry.exit(); } }, headers); } - @Override - public void cancel(@Nullable String message, @Nullable Throwable cause) { - super.cancel(message, cause); - // Record the exception metrics. - recordException(cause); - } + }; } catch (BlockException e) { // Flow control threshold exceeded, block the call. @@ -126,14 +116,25 @@ public void sendMessage(ReqT message) { } }; - } finally { - if (entry != null) { - entry.exit(); + + } catch (RuntimeException e) { + //catch the RuntimeException newCall throws, + // entry is guaranteed to exit + if (asyncEntry != null) { + asyncEntry.exit(); } + throw e; + + } } - private void recordException(Throwable t) { - Tracer.trace(t); + private void recordException(final Throwable t, AsyncEntry asyncEntry) { + ContextUtil.runOnContext(asyncEntry.getAsyncContext(), new Runnable() { + @Override + public void run() { + Tracer.trace(t); + } + }); } } diff --git a/sentinel-adapter/sentinel-grpc-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcServerInterceptor.java b/sentinel-adapter/sentinel-grpc-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcServerInterceptor.java index 1e0d909019..a9b118ba46 100755 --- a/sentinel-adapter/sentinel-grpc-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcServerInterceptor.java +++ b/sentinel-adapter/sentinel-grpc-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcServerInterceptor.java @@ -15,25 +15,17 @@ */ package com.alibaba.csp.sentinel.adapter.grpc; -import com.alibaba.csp.sentinel.Entry; +import com.alibaba.csp.sentinel.AsyncEntry; import com.alibaba.csp.sentinel.EntryType; import com.alibaba.csp.sentinel.SphU; import com.alibaba.csp.sentinel.Tracer; import com.alibaba.csp.sentinel.context.ContextUtil; import com.alibaba.csp.sentinel.slots.block.BlockException; - -import io.grpc.ForwardingServerCall; -import io.grpc.ForwardingServerCallListener; -import io.grpc.Metadata; -import io.grpc.ServerCall; -import io.grpc.ServerCall.Listener; -import io.grpc.ServerCallHandler; -import io.grpc.ServerInterceptor; -import io.grpc.Status; +import io.grpc.*; /** *

gRPC server interceptor for Sentinel. Currently it only works with unary methods.

- * + *

* Example code: *

  * Server server = ServerBuilder.forPort(port)
@@ -41,7 +33,7 @@
  *      .intercept(new SentinelGrpcServerInterceptor()) // Add the server interceptor.
  *      .build();
  * 
- * + *

* For client interceptor, see {@link SentinelGrpcClientInterceptor}. * * @author Eric Zhao @@ -49,42 +41,65 @@ public class SentinelGrpcServerInterceptor implements ServerInterceptor { private static final Status FLOW_CONTROL_BLOCK = Status.UNAVAILABLE.withDescription( - "Flow control limit exceeded (server side)"); + "Flow control limit exceeded (server side)"); @Override - public Listener interceptCall(ServerCall serverCall, Metadata metadata, - ServerCallHandler serverCallHandler) { - String resourceName = serverCall.getMethodDescriptor().getFullMethodName(); + public ServerCall.Listener interceptCall(ServerCall call, Metadata headers, ServerCallHandler next) { + String resourceName = call.getMethodDescriptor().getFullMethodName(); // Remote address: serverCall.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR); - Entry entry = null; + AsyncEntry entry = null; try { ContextUtil.enter(resourceName); - entry = SphU.entry(resourceName, EntryType.IN); + entry = SphU.asyncEntry(resourceName, EntryType.IN); // Allow access, forward the call. + final AsyncEntry tempEntry = entry; return new ForwardingServerCallListener.SimpleForwardingServerCallListener( - serverCallHandler.startCall( - new ForwardingServerCall.SimpleForwardingServerCall(serverCall) { - @Override - public void close(Status status, Metadata trailers) { - super.close(status, trailers); - // Record the exception metrics. - if (!status.isOk()) { - recordException(status.asRuntimeException()); - } - } - }, metadata)) {}; + next.startCall( + new ForwardingServerCall.SimpleForwardingServerCall(call) { + @Override + public void close(Status status, Metadata trailers) { + super.close(status, trailers); + // Record the exception metrics. + if (!status.isOk()) { + recordException(status.asException(), tempEntry); + } + //entry exit when the call be closed + tempEntry.exit(); + } + }, headers)) { + + /** + * if call was canceled, onCancel will be called. and the close will not be called + * so the server is encouraged to abort processing to save resources by onCancel + * @see ServerCall.Listener#onCancel() + */ + @Override + public void onCancel() { + super.onCancel(); + // request has be canceled, entry should exit + tempEntry.exit(); + } + }; } catch (BlockException e) { - serverCall.close(FLOW_CONTROL_BLOCK, new Metadata()); - return new ServerCall.Listener() {}; - } finally { + call.close(FLOW_CONTROL_BLOCK, new Metadata()); + return new ServerCall.Listener() { + }; + } catch (RuntimeException e) { + //catch the RuntimeException startCall throws, + // entry is guaranteed to exit if (entry != null) { entry.exit(); } - ContextUtil.exit(); + throw e; } } - private void recordException(Throwable t) { - Tracer.trace(t); + private void recordException(final Throwable t, AsyncEntry tempEntry) { + ContextUtil.runOnContext(tempEntry.getAsyncContext(), new Runnable() { + @Override + public void run() { + Tracer.trace(t); + } + }); } } diff --git a/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/FooServiceClient.java b/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/FooServiceClient.java index dab36678c9..622a7b6a2b 100755 --- a/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/FooServiceClient.java +++ b/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/FooServiceClient.java @@ -57,6 +57,9 @@ FooResponse sayHello(FooRequest request) { return blockingStub.sayHello(request); } + + + FooResponse anotherHello(FooRequest request) { if (request == null) { throw new IllegalArgumentException("Request cannot be null"); diff --git a/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/FooServiceImpl.java b/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/FooServiceImpl.java index adda5102da..5e2fe489fc 100755 --- a/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/FooServiceImpl.java +++ b/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/FooServiceImpl.java @@ -18,7 +18,6 @@ import com.alibaba.csp.sentinel.adapter.grpc.gen.FooRequest; import com.alibaba.csp.sentinel.adapter.grpc.gen.FooResponse; import com.alibaba.csp.sentinel.adapter.grpc.gen.FooServiceGrpc; - import io.grpc.stub.StreamObserver; /** @@ -28,7 +27,12 @@ class FooServiceImpl extends FooServiceGrpc.FooServiceImplBase { @Override public void sayHello(FooRequest request, StreamObserver responseObserver) { + if (request.getId() == -1) { + responseObserver.onError(new IllegalAccessException("The id is error")); + return; + } String message = String.format("Hello %s! Your ID is %d.", request.getName(), request.getId()); + FooResponse response = FooResponse.newBuilder().setMessage(message).build(); responseObserver.onNext(response); responseObserver.onCompleted(); @@ -36,6 +40,10 @@ public void sayHello(FooRequest request, StreamObserver responseObs @Override public void anotherHello(FooRequest request, StreamObserver responseObserver) { + if (request.getId() == -1) { + responseObserver.onError(new IllegalAccessException("The id is error")); + return; + } String message = String.format("Good day, %s (%d)", request.getName(), request.getId()); FooResponse response = FooResponse.newBuilder().setMessage(message).build(); responseObserver.onNext(response); diff --git a/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/GrpcTestServer.java b/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/GrpcTestServer.java index f46c5718a1..86fe011518 100644 --- a/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/GrpcTestServer.java +++ b/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/GrpcTestServer.java @@ -19,6 +19,7 @@ import io.grpc.ServerBuilder; import java.io.IOException; +import java.util.concurrent.Executors; class GrpcTestServer { diff --git a/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcClientInterceptorTest.java b/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcClientInterceptorTest.java index e61dd7605c..c3c85e945a 100755 --- a/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcClientInterceptorTest.java +++ b/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcClientInterceptorTest.java @@ -15,21 +15,23 @@ */ package com.alibaba.csp.sentinel.adapter.grpc; -import java.util.Collections; - import com.alibaba.csp.sentinel.EntryType; import com.alibaba.csp.sentinel.adapter.grpc.gen.FooRequest; import com.alibaba.csp.sentinel.adapter.grpc.gen.FooResponse; import com.alibaba.csp.sentinel.node.ClusterNode; import com.alibaba.csp.sentinel.slots.block.RuleConstant; +import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule; +import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRuleManager; import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager; import com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot; - import io.grpc.StatusRuntimeException; import org.junit.After; import org.junit.Test; +import java.io.IOException; +import java.util.Collections; + import static org.junit.Assert.*; /** @@ -42,17 +44,33 @@ public class SentinelGrpcClientInterceptorTest { private final String resourceName = "com.alibaba.sentinel.examples.FooService/sayHello"; private final int threshold = 2; private final GrpcTestServer server = new GrpcTestServer(); + private final int timeWindow = 10; private void configureFlowRule(int count) { FlowRule rule = new FlowRule() - .setCount(count) - .setGrade(RuleConstant.FLOW_GRADE_QPS) - .setResource(resourceName) - .setLimitApp("default") - .as(FlowRule.class); + .setCount(count) + .setGrade(RuleConstant.FLOW_GRADE_QPS) + .setResource(resourceName) + .setLimitApp("default") + .as(FlowRule.class); FlowRuleManager.loadRules(Collections.singletonList(rule)); } + + private void configureDegradeRule(int count) { + DegradeRule rule = new DegradeRule() + .setCount(count) + .setGrade(RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT) + .setResource(resourceName) + .setLimitApp("default") + .as(DegradeRule.class) + .setTimeWindow(timeWindow); + DegradeRuleManager.loadRules(Collections.singletonList(rule)); + + + } + + @Test public void testGrpcClientInterceptor() throws Exception { final int port = 19328; @@ -88,6 +106,38 @@ private boolean sendRequest(FooServiceClient client) { } } + + @Test + public void testGrpcClientInterceptor_degrade() throws IOException { + final int port = 19328; + + configureDegradeRule(1); + server.start(port, false); + + FooServiceClient client = new FooServiceClient("localhost", port, new SentinelGrpcClientInterceptor()); + + assertFalse(sendErrorRequest(client)); + ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(resourceName, EntryType.OUT); + assertNotNull(clusterNode); + assertEquals(1, clusterNode.exceptionQps()); + // The second request will be blocked. + assertFalse(sendRequest(client)); + assertEquals(1, clusterNode.blockRequest()); + + server.stop(); + } + + private boolean sendErrorRequest(FooServiceClient client) { + try { + FooResponse response = client.sayHello(FooRequest.newBuilder().setName("Sentinel").setId(-1).build()); + System.out.println("Response: " + response); + return true; + } catch (StatusRuntimeException ex) { + System.out.println("Blocked, cause: " + ex.getMessage()); + return false; + } + } + @After public void cleanUp() { FlowRuleManager.loadRules(null); diff --git a/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcServerInterceptorTest.java b/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcServerInterceptorTest.java index 004a256357..df9b9f7934 100755 --- a/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcServerInterceptorTest.java +++ b/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcServerInterceptorTest.java @@ -15,21 +15,24 @@ */ package com.alibaba.csp.sentinel.adapter.grpc; -import java.util.Collections; - import com.alibaba.csp.sentinel.EntryType; import com.alibaba.csp.sentinel.adapter.grpc.gen.FooRequest; import com.alibaba.csp.sentinel.adapter.grpc.gen.FooResponse; import com.alibaba.csp.sentinel.node.ClusterNode; import com.alibaba.csp.sentinel.slots.block.RuleConstant; +import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule; +import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRuleManager; import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager; import com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot; - import io.grpc.StatusRuntimeException; import org.junit.After; import org.junit.Test; +import java.io.IOException; +import java.util.Collections; +import java.util.concurrent.CountDownLatch; + import static org.junit.Assert.*; /** @@ -42,19 +45,32 @@ public class SentinelGrpcServerInterceptorTest { private final String resourceName = "com.alibaba.sentinel.examples.FooService/anotherHello"; private final int threshold = 4; private final GrpcTestServer server = new GrpcTestServer(); - + private final int timeWindow = 10; private FooServiceClient client; private void configureFlowRule(int count) { FlowRule rule = new FlowRule() - .setCount(count) - .setGrade(RuleConstant.FLOW_GRADE_QPS) - .setResource(resourceName) - .setLimitApp("default") - .as(FlowRule.class); + .setCount(count) + .setGrade(RuleConstant.FLOW_GRADE_QPS) + .setResource(resourceName) + .setLimitApp("default") + .as(FlowRule.class); FlowRuleManager.loadRules(Collections.singletonList(rule)); } + private void configureDegradeRule(int count) { + DegradeRule rule = new DegradeRule() + .setCount(count) + .setGrade(RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT) + .setResource(resourceName) + .setLimitApp("default") + .as(DegradeRule.class) + .setTimeWindow(timeWindow); + DegradeRuleManager.loadRules(Collections.singletonList(rule)); + + + } + @Test public void testGrpcServerInterceptor() throws Exception { final int port = 19329; @@ -89,6 +105,83 @@ private boolean sendRequest() { } } + @Test + public void testGrpcServerInterceptor_degrade_success() throws Exception { + final int port = 19329; + client = new FooServiceClient("localhost", port); + + configureDegradeRule(threshold); + server.start(port, true); + assertFalse(sendErrorRequest()); + ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(resourceName, EntryType.IN); + assertNotNull(clusterNode); + assertEquals(0, clusterNode.totalRequest() - clusterNode.totalException()); + assertTrue(sendRequest()); + + + } + + + @Test + public void testGrpcServerInterceptor_degrade_fail() throws IOException, InterruptedException { + final int port = 19329; + client = new FooServiceClient("localhost", port); + configureDegradeRule(threshold); + server.start(port, true); + // exception count = 1 + configureDegradeRule(1); + + + assertFalse(sendErrorRequest()); + ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(resourceName, EntryType.IN); + + //this request will be blocked + assertFalse(sendRequest()); + assertEquals(1, clusterNode.totalException()); + + server.stop(); + } + + + @Test + public void testGrpcServerInterceptor_degrade_fail_threads() throws IOException, InterruptedException { + final int port = 19329; + client = new FooServiceClient("localhost", port); + configureDegradeRule(threshold); + server.start(port, true); + // exception count = 1 + configureDegradeRule(20); + final CountDownLatch latch = new CountDownLatch(20); + + for (int i = 0; i < 20; i++) { + new Thread(new Runnable() { + @Override + public void run() { + assertFalse(sendErrorRequest()); + latch.countDown(); + } + }).start(); + } + latch.await(); + ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(resourceName, EntryType.IN); + assertEquals(20, clusterNode.totalException()); + assertFalse(sendRequest()); + assertEquals(20, clusterNode.totalException()); + assertEquals(1, clusterNode.blockRequest()); + + } + + private boolean sendErrorRequest() { + try { + FooResponse response = client.anotherHello(FooRequest.newBuilder().setName("Sentinel").setId(-1).build()); + System.out.println("Response: " + response); + return true; + } catch (StatusRuntimeException ex) { + System.out.println("Blocked, cause: " + ex.getMessage()); + return false; + } + } + @After public void cleanUp() { From d0ac442a20b6be4df05f537b3ad3c2954a769882 Mon Sep 17 00:00:00 2001 From: Ooo0oO0o0oO <907709476@qq.com> Date: Fri, 7 Dec 2018 19:57:54 +0800 Subject: [PATCH 2/2] [#289]add junit test for pr --- .../adapter/grpc/FooServiceClient.java | 27 +++-- .../sentinel/adapter/grpc/FooServiceImpl.java | 24 +++- ...tinelGrpcClientInterceptorDegradeTest.java | 98 ++++++++++++++++ .../SentinelGrpcClientInterceptorTest.java | 66 ++--------- ...tinelGrpcServerInterceptorDegradeTest.java | 109 +++++++++++++++++ .../SentinelGrpcServerInterceptorTest.java | 111 ++---------------- .../src/test/proto/example.proto | 7 ++ 7 files changed, 271 insertions(+), 171 deletions(-) create mode 100644 sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcClientInterceptorDegradeTest.java create mode 100644 sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcServerInterceptorDegradeTest.java diff --git a/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/FooServiceClient.java b/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/FooServiceClient.java index 622a7b6a2b..c72e732ac3 100755 --- a/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/FooServiceClient.java +++ b/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/FooServiceClient.java @@ -37,16 +37,16 @@ final class FooServiceClient { FooServiceClient(String host, int port) { this.channel = ManagedChannelBuilder.forAddress(host, port) - .usePlaintext() - .build(); + .usePlaintext() + .build(); this.blockingStub = FooServiceGrpc.newBlockingStub(this.channel); } FooServiceClient(String host, int port, ClientInterceptor interceptor) { this.channel = ManagedChannelBuilder.forAddress(host, port) - .usePlaintext() - .intercept(interceptor) - .build(); + .usePlaintext() + .intercept(interceptor) + .build(); this.blockingStub = FooServiceGrpc.newBlockingStub(this.channel); } @@ -58,8 +58,6 @@ FooResponse sayHello(FooRequest request) { } - - FooResponse anotherHello(FooRequest request) { if (request == null) { throw new IllegalArgumentException("Request cannot be null"); @@ -67,6 +65,21 @@ FooResponse anotherHello(FooRequest request) { return blockingStub.anotherHello(request); } + FooResponse helloWithEx(FooRequest request) { + if (request == null) { + throw new IllegalArgumentException("Request cannot be null"); + } + return blockingStub.helloWithEx(request); + } + + + FooResponse anotherHelloWithEx(FooRequest request) { + if (request == null) { + throw new IllegalArgumentException("Request cannot be null"); + } + return blockingStub.anotherHelloWithEx(request); + } + void shutdown() throws InterruptedException { channel.shutdown().awaitTermination(1, TimeUnit.SECONDS); } diff --git a/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/FooServiceImpl.java b/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/FooServiceImpl.java index 5e2fe489fc..832c6247de 100755 --- a/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/FooServiceImpl.java +++ b/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/FooServiceImpl.java @@ -27,10 +27,7 @@ class FooServiceImpl extends FooServiceGrpc.FooServiceImplBase { @Override public void sayHello(FooRequest request, StreamObserver responseObserver) { - if (request.getId() == -1) { - responseObserver.onError(new IllegalAccessException("The id is error")); - return; - } + String message = String.format("Hello %s! Your ID is %d.", request.getName(), request.getId()); FooResponse response = FooResponse.newBuilder().setMessage(message).build(); @@ -40,6 +37,25 @@ public void sayHello(FooRequest request, StreamObserver responseObs @Override public void anotherHello(FooRequest request, StreamObserver responseObserver) { + + String message = String.format("Good day, %s (%d)", request.getName(), request.getId()); + FooResponse response = FooResponse.newBuilder().setMessage(message).build(); + responseObserver.onNext(response); + responseObserver.onCompleted(); + } + @Override + public void helloWithEx(FooRequest request, StreamObserver responseObserver) { + if (request.getId() == -1) { + responseObserver.onError(new IllegalAccessException("The id is error")); + return; + } + String message = String.format("Good day, %s (%d)", request.getName(), request.getId()); + FooResponse response = FooResponse.newBuilder().setMessage(message).build(); + responseObserver.onNext(response); + responseObserver.onCompleted(); + } + @Override + public void anotherHelloWithEx(FooRequest request, StreamObserver responseObserver) { if (request.getId() == -1) { responseObserver.onError(new IllegalAccessException("The id is error")); return; diff --git a/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcClientInterceptorDegradeTest.java b/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcClientInterceptorDegradeTest.java new file mode 100644 index 0000000000..742ba67659 --- /dev/null +++ b/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcClientInterceptorDegradeTest.java @@ -0,0 +1,98 @@ +package com.alibaba.csp.sentinel.adapter.grpc; + +import com.alibaba.csp.sentinel.EntryType; +import com.alibaba.csp.sentinel.node.ClusterNode; +import com.alibaba.csp.sentinel.slots.block.RuleConstant; +import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule; +import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRuleManager; +import com.alibaba.csp.sentinel.adapter.grpc.gen.FooRequest; +import com.alibaba.csp.sentinel.adapter.grpc.gen.FooResponse; +import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager; +import com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot; +import io.grpc.StatusRuntimeException; +import org.junit.After; +import org.junit.Test; + +import java.io.IOException; +import java.util.Collections; + +import static org.junit.Assert.*; + +/** + * @author zhengzechao + * @date 2018/12/7 + * Email ooczzoo@gmail.com + */ +public class SentinelGrpcClientInterceptorDegradeTest { + + private final String resourceName = "com.alibaba.sentinel.examples.FooService/helloWithEx"; + private final int threshold = 2; + private final GrpcTestServer server = new GrpcTestServer(); + private final int timeWindow = 10; + + + + + private void configureDegradeRule(int count) { + DegradeRule rule = new DegradeRule() + .setCount(count) + .setGrade(RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT) + .setResource(resourceName) + .setLimitApp("default") + .as(DegradeRule.class) + .setTimeWindow(timeWindow); + DegradeRuleManager.loadRules(Collections.singletonList(rule)); + + + } + + + private boolean sendRequest(FooServiceClient client) { + try { + FooResponse response = client.helloWithEx(FooRequest.newBuilder().setName("Sentinel").setId(666).build()); + System.out.println("Response: " + response); + return true; + } catch (StatusRuntimeException ex) { + System.out.println("Blocked, cause: " + ex.getMessage()); + return false; + } + } + + + @Test + public void testGrpcClientInterceptor_degrade() throws IOException { + final int port = 19316; + + configureDegradeRule(1); + server.start(port, false); + + FooServiceClient client = new FooServiceClient("localhost", port, new SentinelGrpcClientInterceptor()); + + assertFalse(sendErrorRequest(client)); + ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(resourceName, EntryType.OUT); + assertNotNull(clusterNode); + assertEquals(1, clusterNode.exceptionQps()); + // The second request will be blocked. + assertFalse(sendRequest(client)); + assertEquals(1, clusterNode.blockRequest()); + + server.stop(); + } + + private boolean sendErrorRequest(FooServiceClient client) { + try { + FooResponse response = client.helloWithEx(FooRequest.newBuilder().setName("Sentinel").setId(-1).build()); + System.out.println("Response: " + response); + return true; + } catch (StatusRuntimeException ex) { + System.out.println("Blocked, cause: " + ex.getMessage()); + return false; + } + } + + @After + public void cleanUp() { + FlowRuleManager.loadRules(null); + ClusterBuilderSlot.getClusterNodeMap().clear(); + } +} diff --git a/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcClientInterceptorTest.java b/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcClientInterceptorTest.java index c3c85e945a..e61dd7605c 100755 --- a/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcClientInterceptorTest.java +++ b/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcClientInterceptorTest.java @@ -15,23 +15,21 @@ */ package com.alibaba.csp.sentinel.adapter.grpc; +import java.util.Collections; + import com.alibaba.csp.sentinel.EntryType; import com.alibaba.csp.sentinel.adapter.grpc.gen.FooRequest; import com.alibaba.csp.sentinel.adapter.grpc.gen.FooResponse; import com.alibaba.csp.sentinel.node.ClusterNode; import com.alibaba.csp.sentinel.slots.block.RuleConstant; -import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule; -import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRuleManager; import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager; import com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot; + import io.grpc.StatusRuntimeException; import org.junit.After; import org.junit.Test; -import java.io.IOException; -import java.util.Collections; - import static org.junit.Assert.*; /** @@ -44,33 +42,17 @@ public class SentinelGrpcClientInterceptorTest { private final String resourceName = "com.alibaba.sentinel.examples.FooService/sayHello"; private final int threshold = 2; private final GrpcTestServer server = new GrpcTestServer(); - private final int timeWindow = 10; private void configureFlowRule(int count) { FlowRule rule = new FlowRule() - .setCount(count) - .setGrade(RuleConstant.FLOW_GRADE_QPS) - .setResource(resourceName) - .setLimitApp("default") - .as(FlowRule.class); + .setCount(count) + .setGrade(RuleConstant.FLOW_GRADE_QPS) + .setResource(resourceName) + .setLimitApp("default") + .as(FlowRule.class); FlowRuleManager.loadRules(Collections.singletonList(rule)); } - - private void configureDegradeRule(int count) { - DegradeRule rule = new DegradeRule() - .setCount(count) - .setGrade(RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT) - .setResource(resourceName) - .setLimitApp("default") - .as(DegradeRule.class) - .setTimeWindow(timeWindow); - DegradeRuleManager.loadRules(Collections.singletonList(rule)); - - - } - - @Test public void testGrpcClientInterceptor() throws Exception { final int port = 19328; @@ -106,38 +88,6 @@ private boolean sendRequest(FooServiceClient client) { } } - - @Test - public void testGrpcClientInterceptor_degrade() throws IOException { - final int port = 19328; - - configureDegradeRule(1); - server.start(port, false); - - FooServiceClient client = new FooServiceClient("localhost", port, new SentinelGrpcClientInterceptor()); - - assertFalse(sendErrorRequest(client)); - ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(resourceName, EntryType.OUT); - assertNotNull(clusterNode); - assertEquals(1, clusterNode.exceptionQps()); - // The second request will be blocked. - assertFalse(sendRequest(client)); - assertEquals(1, clusterNode.blockRequest()); - - server.stop(); - } - - private boolean sendErrorRequest(FooServiceClient client) { - try { - FooResponse response = client.sayHello(FooRequest.newBuilder().setName("Sentinel").setId(-1).build()); - System.out.println("Response: " + response); - return true; - } catch (StatusRuntimeException ex) { - System.out.println("Blocked, cause: " + ex.getMessage()); - return false; - } - } - @After public void cleanUp() { FlowRuleManager.loadRules(null); diff --git a/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcServerInterceptorDegradeTest.java b/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcServerInterceptorDegradeTest.java new file mode 100644 index 0000000000..19b22ae0c1 --- /dev/null +++ b/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcServerInterceptorDegradeTest.java @@ -0,0 +1,109 @@ +package com.alibaba.csp.sentinel.adapter.grpc; + +import com.alibaba.csp.sentinel.EntryType; +import com.alibaba.csp.sentinel.node.ClusterNode; +import com.alibaba.csp.sentinel.adapter.grpc.gen.FooRequest; +import com.alibaba.csp.sentinel.adapter.grpc.gen.FooResponse; +import com.alibaba.csp.sentinel.slots.block.RuleConstant; +import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule; +import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRuleManager; +import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager; +import com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot; +import io.grpc.StatusRuntimeException; +import org.junit.After; +import org.junit.Test; + +import java.io.IOException; +import java.util.Collections; +import java.util.concurrent.CountDownLatch; + +import static org.junit.Assert.*; + +/** + * @author zhengzechao + * @date 2018/12/7 + * Email ooczzoo@gmail.com + */ +public class SentinelGrpcServerInterceptorDegradeTest { + + private final String resourceName = "com.alibaba.sentinel.examples.FooService/anotherHelloWithEx"; + private final int threshold = 4; + private final GrpcTestServer server = new GrpcTestServer(); + private final int timeWindow = 10; + private FooServiceClient client; + + + + private void configureDegradeRule(int count) { + DegradeRule rule = new DegradeRule() + .setCount(count) + .setGrade(RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT) + .setResource(resourceName) + .setLimitApp("default") + .as(DegradeRule.class) + .setTimeWindow(timeWindow); + DegradeRuleManager.loadRules(Collections.singletonList(rule)); + + + } + + + private boolean sendRequest() { + try { + FooResponse response = client.anotherHelloWithEx(FooRequest.newBuilder().setName("Sentinel").setId(666).build()); + System.out.println("Response: " + response); + return true; + } catch (StatusRuntimeException ex) { + System.out.println("Blocked, cause: " + ex.getMessage()); + return false; + } + } + + + + + + @Test + public void testGrpcServerInterceptor_degrade_fail_threads() throws IOException, InterruptedException { + final int port = 19349; + client = new FooServiceClient("localhost", port); + server.start(port, true); + // exception count = 1 + configureDegradeRule(20); + final CountDownLatch latch = new CountDownLatch(20); + + for (int i = 0; i < 20; i++) { + new Thread(new Runnable() { + @Override + public void run() { + assertFalse(sendErrorRequest()); + latch.countDown(); + } + }).start(); + } + latch.await(); + assertFalse(sendRequest()); + ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(resourceName, EntryType.IN); + assertEquals(20, clusterNode.totalException()); + assertEquals(1, clusterNode.blockRequest()); + + } + + private boolean sendErrorRequest() { + try { + FooResponse response = client.anotherHelloWithEx(FooRequest.newBuilder().setName("Sentinel").setId(-1).build()); + System.out.println("Response: " + response); + return true; + } catch (StatusRuntimeException ex) { + System.out.println("Blocked, cause: " + ex.getMessage()); + return false; + } + } + + + @After + public void cleanUp() { + FlowRuleManager.loadRules(null); + ClusterBuilderSlot.getClusterNodeMap().clear(); + } +} diff --git a/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcServerInterceptorTest.java b/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcServerInterceptorTest.java index df9b9f7934..004a256357 100755 --- a/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcServerInterceptorTest.java +++ b/sentinel-adapter/sentinel-grpc-adapter/src/test/java/com/alibaba/csp/sentinel/adapter/grpc/SentinelGrpcServerInterceptorTest.java @@ -15,24 +15,21 @@ */ package com.alibaba.csp.sentinel.adapter.grpc; +import java.util.Collections; + import com.alibaba.csp.sentinel.EntryType; import com.alibaba.csp.sentinel.adapter.grpc.gen.FooRequest; import com.alibaba.csp.sentinel.adapter.grpc.gen.FooResponse; import com.alibaba.csp.sentinel.node.ClusterNode; import com.alibaba.csp.sentinel.slots.block.RuleConstant; -import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule; -import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRuleManager; import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager; import com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot; + import io.grpc.StatusRuntimeException; import org.junit.After; import org.junit.Test; -import java.io.IOException; -import java.util.Collections; -import java.util.concurrent.CountDownLatch; - import static org.junit.Assert.*; /** @@ -45,32 +42,19 @@ public class SentinelGrpcServerInterceptorTest { private final String resourceName = "com.alibaba.sentinel.examples.FooService/anotherHello"; private final int threshold = 4; private final GrpcTestServer server = new GrpcTestServer(); - private final int timeWindow = 10; + private FooServiceClient client; private void configureFlowRule(int count) { FlowRule rule = new FlowRule() - .setCount(count) - .setGrade(RuleConstant.FLOW_GRADE_QPS) - .setResource(resourceName) - .setLimitApp("default") - .as(FlowRule.class); + .setCount(count) + .setGrade(RuleConstant.FLOW_GRADE_QPS) + .setResource(resourceName) + .setLimitApp("default") + .as(FlowRule.class); FlowRuleManager.loadRules(Collections.singletonList(rule)); } - private void configureDegradeRule(int count) { - DegradeRule rule = new DegradeRule() - .setCount(count) - .setGrade(RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT) - .setResource(resourceName) - .setLimitApp("default") - .as(DegradeRule.class) - .setTimeWindow(timeWindow); - DegradeRuleManager.loadRules(Collections.singletonList(rule)); - - - } - @Test public void testGrpcServerInterceptor() throws Exception { final int port = 19329; @@ -105,83 +89,6 @@ private boolean sendRequest() { } } - @Test - public void testGrpcServerInterceptor_degrade_success() throws Exception { - final int port = 19329; - client = new FooServiceClient("localhost", port); - - configureDegradeRule(threshold); - server.start(port, true); - assertFalse(sendErrorRequest()); - ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(resourceName, EntryType.IN); - assertNotNull(clusterNode); - assertEquals(0, clusterNode.totalRequest() - clusterNode.totalException()); - assertTrue(sendRequest()); - - - } - - - @Test - public void testGrpcServerInterceptor_degrade_fail() throws IOException, InterruptedException { - final int port = 19329; - client = new FooServiceClient("localhost", port); - configureDegradeRule(threshold); - server.start(port, true); - // exception count = 1 - configureDegradeRule(1); - - - assertFalse(sendErrorRequest()); - ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(resourceName, EntryType.IN); - - //this request will be blocked - assertFalse(sendRequest()); - assertEquals(1, clusterNode.totalException()); - - server.stop(); - } - - - @Test - public void testGrpcServerInterceptor_degrade_fail_threads() throws IOException, InterruptedException { - final int port = 19329; - client = new FooServiceClient("localhost", port); - configureDegradeRule(threshold); - server.start(port, true); - // exception count = 1 - configureDegradeRule(20); - final CountDownLatch latch = new CountDownLatch(20); - - for (int i = 0; i < 20; i++) { - new Thread(new Runnable() { - @Override - public void run() { - assertFalse(sendErrorRequest()); - latch.countDown(); - } - }).start(); - } - latch.await(); - ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(resourceName, EntryType.IN); - assertEquals(20, clusterNode.totalException()); - assertFalse(sendRequest()); - assertEquals(20, clusterNode.totalException()); - assertEquals(1, clusterNode.blockRequest()); - - } - - private boolean sendErrorRequest() { - try { - FooResponse response = client.anotherHello(FooRequest.newBuilder().setName("Sentinel").setId(-1).build()); - System.out.println("Response: " + response); - return true; - } catch (StatusRuntimeException ex) { - System.out.println("Blocked, cause: " + ex.getMessage()); - return false; - } - } - @After public void cleanUp() { diff --git a/sentinel-adapter/sentinel-grpc-adapter/src/test/proto/example.proto b/sentinel-adapter/sentinel-grpc-adapter/src/test/proto/example.proto index 61858510b6..2b25b1e1e6 100755 --- a/sentinel-adapter/sentinel-grpc-adapter/src/test/proto/example.proto +++ b/sentinel-adapter/sentinel-grpc-adapter/src/test/proto/example.proto @@ -17,7 +17,14 @@ message FooResponse { // Example service definition. service FooService { + + rpc sayHello(FooRequest) returns (FooResponse) {} rpc anotherHello(FooRequest) returns (FooResponse) {} + + rpc helloWithEx(FooRequest) returns (FooResponse) {} + rpc anotherHelloWithEx(FooRequest) returns (FooResponse) {} + + } \ No newline at end of file