Skip to content

Commit

Permalink
Add logging interceptor and error augmentation interceptor
Browse files Browse the repository at this point in the history
  • Loading branch information
yihanzhen committed May 31, 2018
1 parent bc647b1 commit fcf3a0e
Show file tree
Hide file tree
Showing 3 changed files with 194 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,9 @@
import com.google.api.gax.core.GaxProperties;
import com.google.api.gax.grpc.GaxGrpcProperties;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.rpc.ApiClientHeaderProvider;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.api.gax.rpc.HeaderProvider;
import com.google.api.gax.rpc.ServerStream;
import com.google.api.gax.rpc.StatusCode;
Expand All @@ -52,6 +50,7 @@
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableSet;
import com.google.longrunning.GetOperationRequest;
import com.google.longrunning.Operation;
import com.google.protobuf.Empty;
import com.google.protobuf.FieldMask;
import com.google.spanner.admin.database.v1.CreateDatabaseMetadata;
Expand Down Expand Up @@ -83,10 +82,10 @@
import com.google.spanner.v1.CreateSessionRequest;
import com.google.spanner.v1.DeleteSessionRequest;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.PartialResultSet;
import com.google.spanner.v1.PartitionQueryRequest;
import com.google.spanner.v1.PartitionReadRequest;
import com.google.spanner.v1.PartitionResponse;
import com.google.spanner.v1.PartialResultSet;
import com.google.spanner.v1.ReadRequest;
import com.google.spanner.v1.RollbackRequest;
import com.google.spanner.v1.Session;
Expand All @@ -100,8 +99,6 @@
import java.util.concurrent.Future;
import javax.annotation.Nullable;

import com.google.longrunning.Operation;

/** Implementation of Cloud Spanner remote calls using Gapic libraries. */
public class GapicSpannerRpc implements SpannerRpc {

Expand Down Expand Up @@ -145,9 +142,7 @@ public GapicSpannerRpc(SpannerOptions options) throws IOException {
mergedHeaderProvider.getHeaders(),
internalHeaderProviderBuilder.getResourceHeaderKey());

// TODO(pongad): make RPC logging work (formerly LoggingInterceptor)
// TODO(pongad): add watchdog
// TODO(pongad): make error augmentation work (formerly SpannerErrorInterceptor)

// TODO(hzyi): make this channelProvider configurable through SpannerOptions
TransportChannelProvider channelProvider =
Expand All @@ -156,60 +151,61 @@ public GapicSpannerRpc(SpannerOptions options) throws IOException {
.setEndpoint(options.getEndpoint())
.setMaxInboundMessageSize(MAX_MESSAGE_SIZE)
.setPoolSize(options.getNumChannels())
.setInterceptorProvider(new SpannerInterceptorProvider())
.build();

CredentialsProvider credentialsProvider =
GrpcTransportOptions.setUpCredentialsProvider(options);

// Disabling retry for now because spanner handles retry in SpannerImpl.
// We will finally want to improve gax but for smooth transitioning we
// preserve the retry in SpannerImpl
try {
// TODO: bump the version of gax and remove this try-catch block
// applyToAllUnaryMethods does not throw exception in the latest version
this.stub =
GrpcSpannerStub.create(
SpannerStubSettings.newBuilder()
.setTransportChannelProvider(channelProvider)
.setCredentialsProvider(credentialsProvider)
.applyToAllUnaryMethods(
new ApiFunction<UnaryCallSettings.Builder<?, ?>, Void>() {
@Override
public Void apply(UnaryCallSettings.Builder<?,?> builder) {
builder.setRetryableCodes(ImmutableSet.<StatusCode.Code>of());
return null;
}
})
.build());
this.instanceStub =
GrpcInstanceAdminStub.create(
InstanceAdminStubSettings.newBuilder()
.setTransportChannelProvider(channelProvider)
.setCredentialsProvider(credentialsProvider)
.applyToAllUnaryMethods(
new ApiFunction<UnaryCallSettings.Builder<?, ?>, Void>() {
@Override
public Void apply(UnaryCallSettings.Builder<?,?> builder) {
builder.setRetryableCodes(ImmutableSet.<StatusCode.Code>of());
return null;
}
})
.build());
this.databaseStub =
GrpcDatabaseAdminStub.create(
DatabaseAdminStubSettings.newBuilder()
.setTransportChannelProvider(channelProvider)
.setCredentialsProvider(credentialsProvider)
.applyToAllUnaryMethods(
new ApiFunction<UnaryCallSettings.Builder<?, ?>, Void>() {
@Override
public Void apply(UnaryCallSettings.Builder<?,?> builder) {
builder.setRetryableCodes(ImmutableSet.<StatusCode.Code>of());
return null;
}
})
.build());
GrpcSpannerStub.create(
SpannerStubSettings.newBuilder()
.setTransportChannelProvider(channelProvider)
.setCredentialsProvider(credentialsProvider)
.applyToAllUnaryMethods(
new ApiFunction<UnaryCallSettings.Builder<?, ?>, Void>() {
@Override
public Void apply(UnaryCallSettings.Builder<?, ?> builder) {
builder.setRetryableCodes(ImmutableSet.<StatusCode.Code>of());
return null;
}
})
.build());

this.instanceStub =
GrpcInstanceAdminStub.create(
InstanceAdminStubSettings.newBuilder()
.setTransportChannelProvider(channelProvider)
.setCredentialsProvider(credentialsProvider)
.applyToAllUnaryMethods(
new ApiFunction<UnaryCallSettings.Builder<?, ?>, Void>() {
@Override
public Void apply(UnaryCallSettings.Builder<?, ?> builder) {
builder.setRetryableCodes(ImmutableSet.<StatusCode.Code>of());
return null;
}
})
.build());
this.databaseStub =
GrpcDatabaseAdminStub.create(
DatabaseAdminStubSettings.newBuilder()
.setTransportChannelProvider(channelProvider)
.setCredentialsProvider(credentialsProvider)
.applyToAllUnaryMethods(
new ApiFunction<UnaryCallSettings.Builder<?, ?>, Void>() {
@Override
public Void apply(UnaryCallSettings.Builder<?, ?> builder) {
builder.setRetryableCodes(ImmutableSet.<StatusCode.Code>of());
return null;
}
})
.build());
} catch (Exception e) {
throw SpannerExceptionFactory.newSpannerException(e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.spanner.spi.v1;

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.ForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;

/** Adds logging to rpc calls */
class LoggingInterceptor implements ClientInterceptor {

private final Logger logger;
private final Level level;

LoggingInterceptor(Logger logger, Level level) {
this.logger = logger;
this.level = level;
}

private class CallLogger {

private final MethodDescriptor<?, ?> method;

CallLogger(MethodDescriptor<?, ?> method) {
this.method = method;
}

void log(String message) {
logger.log(
level,
"{0}[{1}]: {2}",
new Object[] {
method.getFullMethodName(), Integer.toHexString(System.identityHashCode(this)), message
});
}

void logfmt(String message, Object... params) {
log(String.format(message, params));
}
}

@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
if (!logger.isLoggable(level)) {
return next.newCall(method, callOptions);
}

final CallLogger callLogger = new CallLogger(method);
callLogger.log("Start");
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
next.newCall(method, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
super.start(
new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(
responseListener) {
@Override
public void onMessage(RespT message) {
callLogger.logfmt("Received:\n%s", message);
super.onMessage(message);
}

@Override
public void onClose(Status status, Metadata trailers) {
callLogger.logfmt("Closed with status %s and trailers %s", status, trailers);
super.onClose(status, trailers);
}
},
headers);
}

@Override
public void sendMessage(ReqT message) {
callLogger.logfmt("Send:\n%s", message);
super.sendMessage(message);
}

@Override
public void cancel(@Nullable String message, @Nullable Throwable cause) {
callLogger.logfmt("Cancelled with message %s", message);
super.cancel(message, cause);
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.spanner.spi.v1;

import com.google.api.gax.grpc.GrpcInterceptorProvider;
import com.google.common.collect.ImmutableList;
import io.grpc.ClientInterceptor;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* For internal use only.
*/
class SpannerInterceptorProvider implements GrpcInterceptorProvider {

private static final List<ClientInterceptor> clientInterceptors =
ImmutableList.of(
new SpannerErrorInterceptor(),
new LoggingInterceptor(Logger.getLogger(GrpcSpannerRpc.class.getName()), Level.FINER));

@Override
public List<ClientInterceptor> getInterceptors() {
return clientInterceptors;
}

}

0 comments on commit fcf3a0e

Please sign in to comment.