Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix grpc-adapter bug,replace the Entry by AsyncEntry #289 #291

Merged
merged 2 commits into from
Aug 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,20 @@
*/
package com.alibaba.csp.sentinel.adapter.grpc;

import javax.annotation.Nullable;

import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.AsyncEntry;
import com.alibaba.csp.sentinel.EntryType;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.Tracer;
import com.alibaba.csp.sentinel.context.ContextUtil;
import com.alibaba.csp.sentinel.slots.block.BlockException;

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.*;
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;

import javax.annotation.Nullable;

/**
* <p>gRPC client interceptor for Sentinel. Currently it only works with unary methods.</p>
*
* <p>
* Example code:
* <pre>
* public class ServiceClient {
Expand All @@ -52,26 +44,28 @@
*
* }
* </pre>
*
* <p>
* For server interceptor, see {@link SentinelGrpcServerInterceptor}.
*
* @author Eric Zhao
*/
public class SentinelGrpcClientInterceptor implements ClientInterceptor {

private static final Status FLOW_CONTROL_BLOCK = Status.UNAVAILABLE.withDescription(
"Flow control limit exceeded (client side)");
"Flow control limit exceeded (client side)");

@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> methodDescriptor,
CallOptions callOptions, Channel channel) {
String resourceName = methodDescriptor.getFullMethodName();
Entry entry = null;
AsyncEntry asyncEntry = null;
try {
entry = SphU.entry(resourceName, EntryType.OUT);
asyncEntry = SphU.asyncEntry(resourceName, EntryType.OUT);

final AsyncEntry tempEntry = asyncEntry;
// Allow access, forward the call.
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
channel.newCall(methodDescriptor, callOptions)) {
channel.newCall(methodDescriptor, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
super.start(new SimpleForwardingClientCallListener<RespT>(responseListener) {
Expand All @@ -85,18 +79,14 @@ public void onClose(Status status, Metadata trailers) {
super.onClose(status, trailers);
// Record the exception metrics.
if (!status.isOk()) {
recordException(status.asRuntimeException());
recordException(status.asRuntimeException(), tempEntry);
}
tempEntry.exit();
}
}, headers);
}

@Override
public void cancel(@Nullable String message, @Nullable Throwable cause) {
super.cancel(message, cause);
// Record the exception metrics.
recordException(cause);
}

};
} catch (BlockException e) {
// Flow control threshold exceeded, block the call.
Expand Down Expand Up @@ -126,14 +116,25 @@ public void sendMessage(ReqT message) {

}
};
} finally {
if (entry != null) {
entry.exit();

} catch (RuntimeException e) {
//catch the RuntimeException newCall throws,
// entry is guaranteed to exit
if (asyncEntry != null) {
asyncEntry.exit();
}
throw e;


}
}

private void recordException(Throwable t) {
Tracer.trace(t);
private void recordException(final Throwable t, AsyncEntry asyncEntry) {
ContextUtil.runOnContext(asyncEntry.getAsyncContext(), new Runnable() {
@Override
public void run() {
Tracer.trace(t);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,76 +15,91 @@
*/
package com.alibaba.csp.sentinel.adapter.grpc;

import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.AsyncEntry;
import com.alibaba.csp.sentinel.EntryType;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.Tracer;
import com.alibaba.csp.sentinel.context.ContextUtil;
import com.alibaba.csp.sentinel.slots.block.BlockException;

import io.grpc.ForwardingServerCall;
import io.grpc.ForwardingServerCallListener;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCall.Listener;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status;
import io.grpc.*;

/**
* <p>gRPC server interceptor for Sentinel. Currently it only works with unary methods.</p>
*
* <p>
* Example code:
* <pre>
* Server server = ServerBuilder.forPort(port)
* .addService(new MyServiceImpl()) // Add your service.
* .intercept(new SentinelGrpcServerInterceptor()) // Add the server interceptor.
* .build();
* </pre>
*
* <p>
* For client interceptor, see {@link SentinelGrpcClientInterceptor}.
*
* @author Eric Zhao
*/
public class SentinelGrpcServerInterceptor implements ServerInterceptor {

private static final Status FLOW_CONTROL_BLOCK = Status.UNAVAILABLE.withDescription(
"Flow control limit exceeded (server side)");
"Flow control limit exceeded (server side)");

@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall, Metadata metadata,
ServerCallHandler<ReqT, RespT> serverCallHandler) {
String resourceName = serverCall.getMethodDescriptor().getFullMethodName();
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
String resourceName = call.getMethodDescriptor().getFullMethodName();
// Remote address: serverCall.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
Entry entry = null;
AsyncEntry entry = null;
try {
ContextUtil.enter(resourceName);
entry = SphU.entry(resourceName, EntryType.IN);
entry = SphU.asyncEntry(resourceName, EntryType.IN);
// Allow access, forward the call.
final AsyncEntry tempEntry = entry;
return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(
serverCallHandler.startCall(
new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(serverCall) {
@Override
public void close(Status status, Metadata trailers) {
super.close(status, trailers);
// Record the exception metrics.
if (!status.isOk()) {
recordException(status.asRuntimeException());
}
}
}, metadata)) {};
next.startCall(
new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) {
@Override
public void close(Status status, Metadata trailers) {
super.close(status, trailers);
// Record the exception metrics.
if (!status.isOk()) {
recordException(status.asException(), tempEntry);
}
//entry exit when the call be closed
tempEntry.exit();
}
}, headers)) {

/**
* if call was canceled, onCancel will be called. and the close will not be called
* so the server is encouraged to abort processing to save resources by onCancel
* @see ServerCall.Listener#onCancel()
*/
@Override
public void onCancel() {
super.onCancel();
// request has be canceled, entry should exit
tempEntry.exit();
}
};
} catch (BlockException e) {
serverCall.close(FLOW_CONTROL_BLOCK, new Metadata());
return new ServerCall.Listener<ReqT>() {};
} finally {
call.close(FLOW_CONTROL_BLOCK, new Metadata());
return new ServerCall.Listener<ReqT>() {
};
} catch (RuntimeException e) {
//catch the RuntimeException startCall throws,
// entry is guaranteed to exit
if (entry != null) {
entry.exit();
}
ContextUtil.exit();
throw e;
}
}

private void recordException(Throwable t) {
Tracer.trace(t);
private void recordException(final Throwable t, AsyncEntry tempEntry) {
ContextUtil.runOnContext(tempEntry.getAsyncContext(), new Runnable() {
@Override
public void run() {
Tracer.trace(t);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,16 @@ final class FooServiceClient {

FooServiceClient(String host, int port) {
this.channel = ManagedChannelBuilder.forAddress(host, port)
.usePlaintext()
.build();
.usePlaintext()
.build();
this.blockingStub = FooServiceGrpc.newBlockingStub(this.channel);
}

FooServiceClient(String host, int port, ClientInterceptor interceptor) {
this.channel = ManagedChannelBuilder.forAddress(host, port)
.usePlaintext()
.intercept(interceptor)
.build();
.usePlaintext()
.intercept(interceptor)
.build();
this.blockingStub = FooServiceGrpc.newBlockingStub(this.channel);
}

Expand All @@ -57,13 +57,29 @@ FooResponse sayHello(FooRequest request) {
return blockingStub.sayHello(request);
}


FooResponse anotherHello(FooRequest request) {
if (request == null) {
throw new IllegalArgumentException("Request cannot be null");
}
return blockingStub.anotherHello(request);
}

FooResponse helloWithEx(FooRequest request) {
if (request == null) {
throw new IllegalArgumentException("Request cannot be null");
}
return blockingStub.helloWithEx(request);
}


FooResponse anotherHelloWithEx(FooRequest request) {
if (request == null) {
throw new IllegalArgumentException("Request cannot be null");
}
return blockingStub.anotherHelloWithEx(request);
}

void shutdown() throws InterruptedException {
channel.shutdown().awaitTermination(1, TimeUnit.SECONDS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import com.alibaba.csp.sentinel.adapter.grpc.gen.FooRequest;
import com.alibaba.csp.sentinel.adapter.grpc.gen.FooResponse;
import com.alibaba.csp.sentinel.adapter.grpc.gen.FooServiceGrpc;

import io.grpc.stub.StreamObserver;

/**
Expand All @@ -28,14 +27,39 @@ class FooServiceImpl extends FooServiceGrpc.FooServiceImplBase {

@Override
public void sayHello(FooRequest request, StreamObserver<FooResponse> responseObserver) {

String message = String.format("Hello %s! Your ID is %d.", request.getName(), request.getId());

FooResponse response = FooResponse.newBuilder().setMessage(message).build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}

@Override
public void anotherHello(FooRequest request, StreamObserver<FooResponse> responseObserver) {

String message = String.format("Good day, %s (%d)", request.getName(), request.getId());
FooResponse response = FooResponse.newBuilder().setMessage(message).build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
@Override
public void helloWithEx(FooRequest request, StreamObserver<FooResponse> responseObserver) {
if (request.getId() == -1) {
responseObserver.onError(new IllegalAccessException("The id is error"));
return;
}
String message = String.format("Good day, %s (%d)", request.getName(), request.getId());
FooResponse response = FooResponse.newBuilder().setMessage(message).build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
@Override
public void anotherHelloWithEx(FooRequest request, StreamObserver<FooResponse> responseObserver) {
if (request.getId() == -1) {
responseObserver.onError(new IllegalAccessException("The id is error"));
return;
}
String message = String.format("Good day, %s (%d)", request.getName(), request.getId());
FooResponse response = FooResponse.newBuilder().setMessage(message).build();
responseObserver.onNext(response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.grpc.ServerBuilder;

import java.io.IOException;
import java.util.concurrent.Executors;

class GrpcTestServer {

Expand Down
Loading