Skip to content

Commit

Permalink
Make version service return unavailable on standby masters
Browse files Browse the repository at this point in the history
### What changes are proposed in this pull request?

Make version service return unavailable on standby masters

### Why are the changes needed?

In this #16839 PR, we adds the
capability to run grpc services on standby masters. However, if one uses
an old alluxio client and connects to the new master with standby master
enabled, it will get some errors. This PR is used to address the issue
to make the change backward compatible.

### Does this PR introduce any user facing changes?

N/A

pr-link: #16854
change-id: cid-6c82bce1b1d6cb2649658ec09f0f8ed4e243917a
  • Loading branch information
elega authored Feb 13, 2023
1 parent f3eca21 commit 7399c38
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 3 deletions.
14 changes: 13 additions & 1 deletion core/common/src/main/java/alluxio/grpc/GrpcServerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import javax.annotation.Nullable;

/**
Expand Down Expand Up @@ -262,7 +263,18 @@ public GrpcServerBuilder sslContext(SslContext sslContext) {
* @return the built {@link GrpcServer}
*/
public GrpcServer build() {
addService(new GrpcService(new ServiceVersionClientServiceHandler(mServices))
return build(null);
}

/**
* Build the server.
* It attaches required services and interceptors for authentication.
*
* @param nodeStateSupplier a supplier to provide the node state (PRIMARY/STANDBY)
* @return the built {@link GrpcServer}
*/
public GrpcServer build(@Nullable Supplier<NodeState> nodeStateSupplier) {
addService(new GrpcService(new ServiceVersionClientServiceHandler(mServices, nodeStateSupplier))
.disableAuthentication());
if (mGrpcReflectionEnabled) {
// authentication needs to be disabled so that the grpc command line tools can call
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,17 @@

import alluxio.Constants;
import alluxio.annotation.SuppressFBWarnings;
import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;

import com.google.common.collect.ImmutableSet;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;

import java.util.Objects;
import java.util.Set;
import java.util.function.Supplier;
import javax.annotation.Nullable;

/**
* This class is a gRPC handler that serves Alluxio service versions.
Expand All @@ -28,19 +32,32 @@ public final class ServiceVersionClientServiceHandler
extends ServiceVersionClientServiceGrpc.ServiceVersionClientServiceImplBase {
/** Set of services that are going to be recognized by this versioning service. */
private final Set<ServiceType> mServices;
@Nullable private final Supplier<NodeState> mNodeStateSupplier;
private final boolean mStandbyRpcEnabled =
Configuration.getBoolean(PropertyKey.STANDBY_MASTER_GRPC_ENABLED);

/**
* Creates service version handler that allows given services.
* @param services services to allow
* @param nodeStateSupplier the supplier to get the node state
*/
public ServiceVersionClientServiceHandler(Set<ServiceType> services) {
public ServiceVersionClientServiceHandler(
Set<ServiceType> services, @Nullable Supplier<NodeState> nodeStateSupplier) {
mServices = ImmutableSet.copyOf(Objects.requireNonNull(services, "services is null"));
mNodeStateSupplier = nodeStateSupplier;
}

@Override
@SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES")
public void getServiceVersion(GetServiceVersionPRequest request,
StreamObserver<GetServiceVersionPResponse> responseObserver) {
if (mStandbyRpcEnabled
&& mNodeStateSupplier != null && mNodeStateSupplier.get() == NodeState.STANDBY) {
responseObserver.onError(Status.UNAVAILABLE
.withDescription("GetServiceVersion is not supported on standby master")
.asException());
return;
}

ServiceType serviceType = request.getServiceType();
if (serviceType != ServiceType.UNKNOWN_SERVICE && !mServices.contains(serviceType)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,4 +276,11 @@ private boolean pollFor(String message, Supplier<Boolean> waitFor, int timeoutMs
public boolean waitForReady(int timeoutMs) {
return waitForGrpcServerReady(timeoutMs);
}

/**
* @return the primary selector
*/
public PrimarySelector getPrimarySelector() {
return mLeaderSelector;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ protected synchronized void startGrpcServer(
LOG.info("registered service {}", type.name());
});
});
mGrpcServer = builder.build();
mGrpcServer = builder.build(() -> mMasterProcess.getPrimarySelector().getStateUnsafe());
try {
mGrpcServer.start();
mMasterProcess.getSafeModeManager().ifPresent(SafeModeManager::notifyRpcServerStarted);
Expand Down

0 comments on commit 7399c38

Please sign in to comment.