Skip to content

Commit

Permalink
Update:grpc nacos discovery config & complete grpc router code
Browse files Browse the repository at this point in the history
  • Loading branch information
shichaoyang committed Sep 27, 2024
1 parent abc0738 commit 95e5fb1
Show file tree
Hide file tree
Showing 11 changed files with 718 additions and 162 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ grpc:
# gRPC 客户端配置,对应 GrpcChannelsProperties 配置类的映射
client:
user-provider:
address: 'static://127.0.0.1:9898'
#服务发现,注意本地host配置 127.0.0.1 user-provider
address: 'discovery:///user-provider'
#静态地址
#address: 'static://127.0.0.1:9898'
enableKeepAlive: true
keepAliveWithoutCalls: true
negotiationType: plaintext
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
///*
// * Copyright © ${year} ${owner} (${email})
// *
// * Licensed under the Apache License, Version 2.0 (the "License");
// * you may not use this file except in compliance with the License.
// * You may obtain a copy of the License at
// *
// * http://www.apache.org/licenses/LICENSE-2.0
// *
// * Unless required by applicable law or agreed to in writing, software
// * distributed under the License is distributed on an "AS IS" BASIS,
// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// * See the License for the specific language governing permissions and
// * limitations under the License.
// */
//package com.jd.live.agent.plugin.router.gprc.cluster;
//
//import com.jd.live.agent.bootstrap.exception.RejectException;
//import com.jd.live.agent.bootstrap.exception.RejectException.RejectCircuitBreakException;
//import com.jd.live.agent.bootstrap.exception.RejectException.RejectLimitException;
//import com.jd.live.agent.bootstrap.exception.RejectException.RejectNoProviderException;
//import com.jd.live.agent.core.util.http.HttpMethod;
//import com.jd.live.agent.governance.exception.RetryException.RetryExhaustedException;
//import com.jd.live.agent.governance.invoke.OutboundInvocation;
//import com.jd.live.agent.governance.invoke.cluster.ClusterInvoker;
//import com.jd.live.agent.governance.invoke.cluster.LiveCluster;
//import com.jd.live.agent.governance.policy.service.cluster.ClusterPolicy;
//import com.jd.live.agent.governance.policy.service.cluster.RetryPolicy;
//import com.jd.live.agent.governance.response.ServiceResponse.OutboundResponse;
//import com.jd.live.agent.plugin.router.gprc.instance.GrpcEndpoint;
//import com.jd.live.agent.plugin.router.gprc.request.AbstractClusterRequest;
//import org.springframework.cloud.client.ServiceInstance;
//import org.springframework.cloud.client.loadbalancer.CompletionContext;
//import org.springframework.cloud.client.loadbalancer.DefaultResponse;
//import org.springframework.cloud.client.loadbalancer.LoadBalancerProperties;
//import org.springframework.core.NestedRuntimeException;
//import java.util.*;
//import java.util.concurrent.CompletableFuture;
//import java.util.concurrent.CompletionStage;
//
///**
// * Provides an abstract base for implementing client clusters that can send requests and receive responses from
// * various endpoints. This class serves as a foundation for managing a cluster of client endpoints and handling
// * common operations such as creating HTTP headers and exceptions.
// *
// * @param <R> the type of outbound requests the cluster handles
// * @param <O> the type of outbound responses the cluster can expect
// */
//public abstract class AbstractClientCluster<
// R extends AbstractClusterRequest,
// O extends OutboundResponse>
// implements LiveCluster<R, O, GrpcEndpoint, NestedRuntimeException> {
//
// @Override
// public ClusterPolicy getDefaultPolicy(R request) {
// if (isRetryable()) {
// RetryPolicy retryPolicy = null;
// LoadBalancerProperties properties = request.getProperties();
// LoadBalancerProperties.Retry retry = properties == null ? null : properties.getRetry();
// if (retry != null && retry.isEnabled() && (request.getHttpMethod() == HttpMethod.GET || retry.isRetryOnAllOperations())) {
// Set<String> statuses = new HashSet<>(retry.getRetryableStatusCodes().size());
// retry.getRetryableStatusCodes().forEach(status -> statuses.add(String.valueOf(status)));
// retryPolicy = new RetryPolicy();
// retryPolicy.setRetry(retry.getMaxRetriesOnNextServiceInstance());
// retryPolicy.setRetryInterval(retry.getBackoff().getMinBackoff().toMillis());
// retryPolicy.setRetryStatuses(statuses);
// }
// return new ClusterPolicy(retryPolicy == null ? ClusterInvoker.TYPE_FAILFAST : ClusterInvoker.TYPE_FAILOVER, retryPolicy);
// }
// return new ClusterPolicy(ClusterInvoker.TYPE_FAILFAST);
// }
//
// /**
// * Determines if the current cluster support for retry.
// *
// * @return {@code true} if the operation is retryable; {@code false} otherwise.
// */
// protected abstract boolean isRetryable();
//
// /**
// * Discover the service instances for the requested service.
// *
// * @param request The outbound request to be routed.
// * @return ServiceInstance list
// */
// @Override
// public CompletionStage<List<GrpcEndpoint>> route(R request) {
// CompletableFuture<List<GrpcEndpoint>> future = new CompletableFuture<>();
// ServiceInstanceListSupplier supplier = request.getInstanceSupplier();
// if (supplier == null) {
// future.complete(new ArrayList<>());
// } else {
// Mono<List<ServiceInstance>> mono = supplier.get(request.getLbRequest()).next();
// mono.subscribe(
// v -> {
// List<GrpcEndpoint> endpoints = new ArrayList<>();
// if (v != null) {
// v.forEach(i -> endpoints.add(new GrpcEndpoint(i)));
// }
// future.complete(endpoints);
// },
// future::completeExceptionally
// );
// }
// return future;
// }
//
// @Override
// public NestedRuntimeException createUnReadyException(String message, R request) {
// return createException(HttpStatus.FORBIDDEN, message);
// }
//
// @Override
// public NestedRuntimeException createUnReadyException(R request) {
// return createUnReadyException("The cluster is not ready. ", request);
// }
//
// @Override
// public NestedRuntimeException createException(Throwable throwable, R request, GrpcEndpoint endpoint) {
// if (throwable instanceof NestedRuntimeException) {
// return (NestedRuntimeException) throwable;
// } else if (throwable instanceof RejectException) {
// return createRejectException((RejectException) throwable, request);
// }
// return createException(HttpStatus.INTERNAL_SERVER_ERROR, throwable.getMessage(), throwable);
// }
//
// @Override
// public NestedRuntimeException createNoProviderException(R request) {
// return createException(HttpStatus.SERVICE_UNAVAILABLE,
// "LoadBalancer does not contain an instance for the service " + request.getService());
// }
//
// @Override
// public NestedRuntimeException createLimitException(RejectException exception, R request) {
// return createException(HttpStatus.SERVICE_UNAVAILABLE, exception.getMessage());
// }
//
// @Override
// public NestedRuntimeException createCircuitBreakException(RejectException exception, R request) {
// return createException(HttpStatus.SERVICE_UNAVAILABLE, exception.getMessage(), exception);
// }
//
// @Override
// public NestedRuntimeException createRejectException(RejectException exception, R request) {
// if (exception instanceof RejectNoProviderException) {
// return createNoProviderException(request);
// } else if (exception instanceof RejectLimitException) {
// return createLimitException(exception, request);
// } else if (exception instanceof RejectCircuitBreakException) {
// return createCircuitBreakException(exception, request);
// }
// return createException(HttpStatus.REQUESTED_RANGE_NOT_SATISFIABLE, exception.getMessage());
// }
//
// @Override
// public NestedRuntimeException createRetryExhaustedException(RetryExhaustedException exception,
// OutboundInvocation<R> invocation) {
// return createException(exception, invocation.getRequest(), null);
// }
//
// @SuppressWarnings("unchecked")
// @Override
// public void onStart(R request) {
// request.lifecycles(l -> l.onStart(request.getLbRequest()));
// }
//
// @SuppressWarnings("unchecked")
// @Override
// public void onStartRequest(R request, GrpcEndpoint endpoint) {
// request.lifecycles(l -> l.onStartRequest(request.getLbRequest(),
// endpoint == null ? new DefaultResponse(null) : endpoint.getResponse()));
// }
//
// @SuppressWarnings("unchecked")
// @Override
// public void onError(Throwable throwable, R request, GrpcEndpoint endpoint) {
// request.lifecycles(l -> l.onComplete(new CompletionContext<>(
// CompletionContext.Status.FAILED,
// throwable,
// request.getLbRequest(),
// endpoint == null ? new DefaultResponse(null) : endpoint.getResponse())));
// }
//
// /**
// * Creates an {@link HttpHeaders} instance from a map of header names to lists of header values.
// * If the input map is {@code null}, this method returns an empty {@link HttpHeaders} instance.
// *
// * @param headers a map of header names to lists of header values
// * @return an {@link HttpHeaders} instance representing the provided headers
// */
// protected HttpHeaders getHttpHeaders(Map<String, List<String>> headers) {
// return headers == null ? new HttpHeaders() : new HttpHeaders(new MultiValueMapAdapter<>(headers));
// }
//
// /**
// * Creates an {@link NestedRuntimeException} using the provided status, message, and headers map.
// *
// * @param status the HTTP status code of the error
// * @param message the error message
// * @return an {@link NestedRuntimeException} instance with the specified details
// */
// public static NestedRuntimeException createException(HttpStatus status, String message) {
// return createException(status, message, null);
// }
//
// /**
// * Creates an {@link NestedRuntimeException} using the provided status, message, and {@link HttpHeaders}.
// *
// * @param status the HTTP status code of the error
// * @param message the error message
// * @param throwable the exception
// * @return an {@link NestedRuntimeException} instance with the specified details
// */
// public static NestedRuntimeException createException(HttpStatus status, String message, Throwable throwable) {
// return new ResponseStatusException(status.value(), message, throwable);
// }
//}
//
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.jd.live.agent.governance.invoke.OutboundInvocation;
import com.jd.live.agent.governance.invoke.cluster.LiveCluster;
import com.jd.live.agent.plugin.router.gprc.instance.GrpcEndpoint;
import java.net.SocketAddress;
import java.util.List;
import java.util.concurrent.CompletionStage;
import com.jd.live.agent.plugin.router.gprc.response.GrpcResponse.GrpcOutboundResponse;
Expand All @@ -13,6 +14,9 @@

public class GrpcCluster implements LiveCluster<GrpcOutboundRequest, GrpcOutboundResponse, GrpcEndpoint, RuntimeException> {

public GrpcCluster(SocketAddress socketAddress) {
}

@Override
public CompletionStage<List<GrpcEndpoint>> route(GrpcOutboundRequest request) {
System.out.println("---->route");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,66 +1,66 @@
///*
// * Copyright © ${year} ${owner} (${email})
// *
// * Licensed under the Apache License, Version 2.0 (the "License");
// * you may not use this file except in compliance with the License.
// * You may obtain a copy of the License at
// *
// * http://www.apache.org/licenses/LICENSE-2.0
// *
// * Unless required by applicable law or agreed to in writing, software
// * distributed under the License is distributed on an "AS IS" BASIS,
// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// * See the License for the specific language governing permissions and
// * limitations under the License.
// */
//package com.jd.live.agent.plugin.router.gprc.definition;
//
//import com.jd.live.agent.core.bytekit.matcher.MatcherBuilder;
////import com.jd.live.agent.core.extension.annotation.ConditionalOnClass;
////import com.jd.live.agent.core.extension.annotation.ConditionalOnProperty;
//import com.jd.live.agent.core.extension.annotation.Extension;
//import com.jd.live.agent.core.inject.annotation.Inject;
//import com.jd.live.agent.core.inject.annotation.Injectable;
//import com.jd.live.agent.core.parser.ObjectParser;
//import com.jd.live.agent.core.plugin.definition.InterceptorDefinition;
//import com.jd.live.agent.core.plugin.definition.InterceptorDefinitionAdapter;
//import com.jd.live.agent.core.plugin.definition.PluginDefinitionAdapter;
////import com.jd.live.agent.governance.config.GovernanceConfig;
//import com.jd.live.agent.governance.invoke.InvocationContext;
//import com.jd.live.agent.plugin.router.gprc.interceptor.ClusterInterceptor;
//
//@Injectable
//@Extension(value = "GrpcClusterDefinition")
////@ConditionalOnProperty(name = GovernanceConfig.CONFIG_FLOW_CONTROL_ENABLED, matchIfMissing = true)
////@ConditionalOnProperty(name = GovernanceConfig.CONFIG_LIVE_SOFARPC_ENABLED, matchIfMissing = true)
////@ConditionalOnClass(ClusterDefinition.TYPE_ABSTRACT_CLUSTER)
//public class ClusterDefinition extends PluginDefinitionAdapter {
//
// protected static final String TYPE_ABSTRACT_CLUSTER = "com.alipay.sofa.rpc.client.AbstractCluster";
//
// private static final String METHOD_DO_INVOKE = "doInvoke";
//
// private static final String[] ARGUMENT_DO_INVOKE = new String[]{
// "com.alipay.sofa.rpc.core.request.SofaRequest"
// };
//
// @Inject(InvocationContext.COMPONENT_INVOCATION_CONTEXT)
// private InvocationContext context;
//
// @Inject(ObjectParser.JSON)
// private ObjectParser parser;
//
// public ClusterDefinition() {
// System.out.println("----> ClusterDefinition");
// this.matcher = () -> MatcherBuilder.isSubTypeOf(TYPE_ABSTRACT_CLUSTER)
// .and(MatcherBuilder.not(MatcherBuilder.isAbstract()));
// this.interceptors = new InterceptorDefinition[]{
// new InterceptorDefinitionAdapter(
// MatcherBuilder.named(METHOD_DO_INVOKE)
// .and(MatcherBuilder.arguments(ARGUMENT_DO_INVOKE)),
// () -> new ClusterInterceptor(context, parser)
// )
// };
// }
//
//}
/*
* Copyright © ${year} ${owner} (${email})
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jd.live.agent.plugin.router.gprc.definition;

import com.jd.live.agent.core.bytekit.matcher.MatcherBuilder;
import com.jd.live.agent.core.extension.annotation.ConditionalOnClass;
import com.jd.live.agent.core.extension.annotation.ConditionalOnProperty;
import com.jd.live.agent.core.extension.annotation.Extension;
import com.jd.live.agent.core.inject.annotation.Inject;
import com.jd.live.agent.core.inject.annotation.Injectable;
import com.jd.live.agent.core.parser.ObjectParser;
import com.jd.live.agent.core.plugin.definition.InterceptorDefinition;
import com.jd.live.agent.core.plugin.definition.InterceptorDefinitionAdapter;
import com.jd.live.agent.core.plugin.definition.PluginDefinitionAdapter;
import com.jd.live.agent.governance.config.GovernanceConfig;
import com.jd.live.agent.governance.invoke.InvocationContext;
import com.jd.live.agent.plugin.router.gprc.interceptor.ClusterInterceptor;

@Injectable
@Extension(value = "GrpcClusterDefinition")
@ConditionalOnProperty(name = GovernanceConfig.CONFIG_FLOW_CONTROL_ENABLED, matchIfMissing = true)
@ConditionalOnProperty(name = GovernanceConfig.CONFIG_LIVE_SOFARPC_ENABLED, matchIfMissing = true)
@ConditionalOnClass(ClusterDefinition.TYPE_ABSTRACT_CLUSTER)
public class ClusterDefinition extends PluginDefinitionAdapter {

protected static final String TYPE_ABSTRACT_CLUSTER = "net.devh.boot.grpc.client.nameresolver.DiscoveryClientNameResolver$Resolve";

private static final String METHOD_DO_INVOKE = "Resolve";

private static final String[] ARGUMENT_DO_INVOKE = new String[]{

};

@Inject(InvocationContext.COMPONENT_INVOCATION_CONTEXT)
private InvocationContext context;

@Inject(ObjectParser.JSON)
private ObjectParser parser;

public ClusterDefinition() {
System.out.println("----> ClusterDefinition");
this.matcher = () -> MatcherBuilder.isSubTypeOf(TYPE_ABSTRACT_CLUSTER)
.and(MatcherBuilder.not(MatcherBuilder.isAbstract()));
this.interceptors = new InterceptorDefinition[]{
new InterceptorDefinitionAdapter(
MatcherBuilder.named(METHOD_DO_INVOKE)
.or(MatcherBuilder.arguments(ARGUMENT_DO_INVOKE)),
() -> new ClusterInterceptor(context, parser)
)
};
}

}
Loading

0 comments on commit 95e5fb1

Please sign in to comment.