-
Notifications
You must be signed in to change notification settings - Fork 998
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Structured Audit Logging #891
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* Copyright 2018-2019 The Feast Authors | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package feast.common.interceptors; | ||
|
||
import com.google.protobuf.Empty; | ||
import com.google.protobuf.Message; | ||
import feast.common.logging.AuditLogger; | ||
import feast.common.logging.entry.MessageAuditLogEntry; | ||
import io.grpc.ForwardingServerCall.SimpleForwardingServerCall; | ||
import io.grpc.ForwardingServerCallListener.SimpleForwardingServerCallListener; | ||
import io.grpc.Metadata; | ||
import io.grpc.ServerCall; | ||
import io.grpc.ServerCall.Listener; | ||
import io.grpc.ServerCallHandler; | ||
import io.grpc.ServerInterceptor; | ||
import io.grpc.Status; | ||
import org.slf4j.event.Level; | ||
import org.springframework.security.core.Authentication; | ||
import org.springframework.security.core.context.SecurityContextHolder; | ||
|
||
/** | ||
* GrpcMessageInterceptor intercepts a GRPC calls to log handling of GRPC messages to the Audit Log. | ||
* Intercepts the incoming and outgoing messages logs them to the audit log, together with method | ||
* name and assumed authenticated identity (if authentication is enabled). NOTE: | ||
* GrpcMessageInterceptor assumes that all service calls are unary (ie single request/response). | ||
*/ | ||
public class GrpcMessageInterceptor implements ServerInterceptor { | ||
@Override | ||
public <ReqT, RespT> Listener<ReqT> interceptCall( | ||
ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) { | ||
MessageAuditLogEntry.Builder entryBuilder = MessageAuditLogEntry.newBuilder(); | ||
// default response message to empty proto in log entry. | ||
entryBuilder.setResponse(Empty.newBuilder().build()); | ||
|
||
// Unpack service & method name from call | ||
// full method name is in format <classpath>.<Service>/<Method> | ||
String fullMethodName = call.getMethodDescriptor().getFullMethodName(); | ||
entryBuilder.setService( | ||
fullMethodName.substring(fullMethodName.lastIndexOf(".") + 1, fullMethodName.indexOf("/"))); | ||
entryBuilder.setMethod(fullMethodName.substring(fullMethodName.indexOf("/") + 1)); | ||
|
||
// Attempt Extract current authenticated identity. | ||
Authentication authentication = SecurityContextHolder.getContext().getAuthentication(); | ||
String identity = (authentication == null) ? "" : authentication.getName(); | ||
entryBuilder.setIdentity(identity); | ||
|
||
// Register forwarding call to intercept outgoing response and log to audit log | ||
call = | ||
new SimpleForwardingServerCall<ReqT, RespT>(call) { | ||
@Override | ||
public void sendMessage(RespT message) { | ||
// 2. Track the response & Log entry to audit logger | ||
super.sendMessage(message); | ||
entryBuilder.setResponse((Message) message); | ||
} | ||
|
||
@Override | ||
public void close(Status status, Metadata trailers) { | ||
super.close(status, trailers); | ||
// 3. Log the message log entry to the audit log | ||
Level logLevel = (status.isOk()) ? Level.INFO : Level.ERROR; | ||
entryBuilder.setStatusCode(status.getCode()); | ||
AuditLogger.logMessage(logLevel, entryBuilder); | ||
} | ||
}; | ||
|
||
ServerCall.Listener<ReqT> listener = next.startCall(call, headers); | ||
return new SimpleForwardingServerCallListener<ReqT>(listener) { | ||
@Override | ||
// Register listener to intercept incoming request messages and log to audit log | ||
public void onMessage(ReqT message) { | ||
super.onMessage(message); | ||
// 1. Track the request. | ||
entryBuilder.setRequest((Message) message); | ||
} | ||
}; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,140 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* Copyright 2018-2020 The Feast Authors | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package feast.common.logging; | ||
|
||
import feast.common.logging.config.LoggingProperties; | ||
import feast.common.logging.config.LoggingProperties.AuditLogProperties; | ||
import feast.common.logging.entry.ActionAuditLogEntry; | ||
import feast.common.logging.entry.AuditLogEntry; | ||
import feast.common.logging.entry.AuditLogEntryKind; | ||
import feast.common.logging.entry.LogResource; | ||
import feast.common.logging.entry.LogResource.ResourceType; | ||
import feast.common.logging.entry.MessageAuditLogEntry; | ||
import feast.common.logging.entry.TransitionAuditLogEntry; | ||
import lombok.extern.slf4j.Slf4j; | ||
import org.slf4j.Marker; | ||
import org.slf4j.MarkerFactory; | ||
import org.slf4j.event.Level; | ||
import org.springframework.beans.factory.annotation.Autowired; | ||
import org.springframework.boot.info.BuildProperties; | ||
import org.springframework.stereotype.Component; | ||
|
||
@Slf4j | ||
woop marked this conversation as resolved.
Show resolved
Hide resolved
|
||
@Component | ||
public class AuditLogger { | ||
private static final Marker AUDIT_MARKER = MarkerFactory.getMarker("AUDIT_MARK"); | ||
private static AuditLogProperties properties; | ||
private static BuildProperties buildProperties; | ||
|
||
@Autowired | ||
public AuditLogger(LoggingProperties loggingProperties, BuildProperties buildProperties) { | ||
// Spring runs this constructor when creating the AuditLogger bean, | ||
// which allows us to populate the AuditLogger class with dependencies. | ||
// This allows us to use the dependencies in the AuditLogger's static methods | ||
AuditLogger.properties = loggingProperties.getAudit(); | ||
AuditLogger.buildProperties = buildProperties; | ||
} | ||
|
||
/** | ||
* Log the handling of a Protobuf message by a service call. | ||
* | ||
* @param entryBuilder with all fields set except instance. | ||
*/ | ||
public static void logMessage(Level level, MessageAuditLogEntry.Builder entryBuilder) { | ||
log( | ||
level, | ||
entryBuilder | ||
.setComponent(buildProperties.getArtifact()) | ||
.setVersion(buildProperties.getVersion()) | ||
.build()); | ||
} | ||
|
||
/** | ||
* Log an action being taken on a specific resource | ||
* | ||
* @param level describing the severity of the log. | ||
* @param action name of the action being taken on specific resource. | ||
* @param resourceType the type of resource being logged. | ||
* @param resourceId resource specific identifier identifing the instance of the resource. | ||
*/ | ||
public static void logAction( | ||
Level level, String action, ResourceType resourceType, String resourceId) { | ||
log( | ||
level, | ||
ActionAuditLogEntry.of( | ||
buildProperties.getArtifact(), | ||
buildProperties.getArtifact(), | ||
LogResource.of(resourceType, resourceId), | ||
action)); | ||
} | ||
|
||
/** | ||
* Log a transition in state/status in a specific resource. | ||
* | ||
* @param level describing the severity of the log. | ||
* @param status name of end status which the resource transition to. | ||
* @param resourceType the type of resource being logged. | ||
* @param resourceId resource specific identifier identifing the instance of the resource. | ||
*/ | ||
public static void logTransition( | ||
Level level, String status, ResourceType resourceType, String resourceId) { | ||
log( | ||
level, | ||
TransitionAuditLogEntry.of( | ||
buildProperties.getArtifact(), | ||
buildProperties.getArtifact(), | ||
LogResource.of(resourceType, resourceId), | ||
status)); | ||
} | ||
|
||
/** | ||
* Log given {@link AuditLogEntry} at the given logging {@link Level} to the Audit log. | ||
* | ||
* @param level describing the severity of the log. | ||
* @param entry the {@link AuditLogEntry} to push to the audit log. | ||
*/ | ||
private static void log(Level level, AuditLogEntry entry) { | ||
// Check if audit logging is of this specific log entry enabled. | ||
if (!properties.isEnabled()) { | ||
return; | ||
} | ||
if (entry.getKind().equals(AuditLogEntryKind.MESSAGE) | ||
&& !properties.isMessageLoggingEnabled()) { | ||
return; | ||
} | ||
|
||
// Log event to audit log through enabled formats | ||
String entryJSON = entry.toJSON(); | ||
switch (level) { | ||
case TRACE: | ||
log.trace(AUDIT_MARKER, entryJSON); | ||
break; | ||
case DEBUG: | ||
log.debug(AUDIT_MARKER, entryJSON); | ||
break; | ||
case INFO: | ||
log.info(AUDIT_MARKER, entryJSON); | ||
break; | ||
case WARN: | ||
log.warn(AUDIT_MARKER, entryJSON); | ||
break; | ||
case ERROR: | ||
log.error(AUDIT_MARKER, entryJSON); | ||
break; | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,6 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* Copyright 2018-2020 The Feast Authors | ||
* Copyright 2018-2019 The Feast Authors | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
|
@@ -14,30 +14,24 @@ | |
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package feast.core.job; | ||
package feast.common.logging.config; | ||
|
||
import feast.core.log.Action; | ||
import feast.core.model.Job; | ||
import lombok.Builder; | ||
import javax.validation.constraints.NotNull; | ||
import lombok.Getter; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. there was kinda agreement to move from lombok to autovalue, and delete former completely There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Auto value only provides immutable classes. Spring requires mutable POJOs with setters to inject values into these properties. Without lombok we will have setter hell ala Serving's FeastProperties. We can enforce a policy of using AutoValue whenever possible, lombok in cases where AutoValue does not make sense. |
||
import lombok.Setter; | ||
|
||
/** Task to terminate given {@link Job} by using {@link JobManager} */ | ||
@Getter | ||
@Setter | ||
@Builder(setterPrefix = "set") | ||
public class TerminateJobTask implements JobTask { | ||
private Job job; | ||
private JobManager jobManager; | ||
public class LoggingProperties { | ||
@NotNull private AuditLogProperties audit; | ||
|
||
@Override | ||
public Job call() { | ||
JobTask.logAudit( | ||
Action.ABORT, | ||
job, | ||
"Aborting job %s for runner %s", | ||
job.getId(), | ||
jobManager.getRunnerType().toString()); | ||
return jobManager.abortJob(job); | ||
@Getter | ||
@Setter | ||
public static class AuditLogProperties { | ||
// Whether to enable/disable audit logging entirely. | ||
private boolean enabled; | ||
|
||
// Whether to enable/disable message level (ie request/response) audit logging. | ||
private boolean messageLoggingEnabled; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this supposed to apply to GetOnlineFeatures as well, because in that case we will probably experience a serious performance hit. Audit logging is normally distinct from request/response logging, since with the former you expect durability and consistency but with the latter you want availability and performance. Logging the complete request/response is still useful as part of the audit log, but we need to have a way to deal with the latency sensitive case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
AuditLogger
can be configured viaapplication.yml
viaAuditLogProperties
and thus can be disabled for the Online Serving instance fromapplication.yml
for cases where latency is more important that visibility.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, but we need request/response logging in online serving as well. We just want to make sure that this logging happens in async fashion and doesn't affect latency of the method