Skip to content

Commit

Permalink
Add-support-for-ingesting-schemas-from-schema-registry
Browse files Browse the repository at this point in the history
  • Loading branch information
aabharti-visa committed Jun 3, 2024
1 parent 776e5c4 commit 1a57f3e
Show file tree
Hide file tree
Showing 319 changed files with 9,555 additions and 1,112 deletions.
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ buildscript {
ext.hazelcastVersion = '5.3.6'
ext.ebeanVersion = '12.16.1'
ext.googleJavaFormatVersion = '1.18.1'
ext.openLineageVersion = '1.13.1'
ext.openLineageVersion = '1.14.0'
ext.logbackClassicJava8 = '1.2.12'

ext.docker_registry = 'acryldata'
Expand Down Expand Up @@ -250,6 +250,7 @@ project.ext.externalDependency = [
'springBootStarterValidation': "org.springframework.boot:spring-boot-starter-validation:$springBootVersion",
'springKafka': "org.springframework.kafka:spring-kafka:$springKafkaVersion",
'springActuator': "org.springframework.boot:spring-boot-starter-actuator:$springBootVersion",
'springRetry': "org.springframework.retry:spring-retry:2.0.6",
'swaggerAnnotations': 'io.swagger.core.v3:swagger-annotations:2.2.15',
'swaggerCli': 'io.swagger.codegen.v3:swagger-codegen-cli:3.0.46',
'swaggerCore': 'io.swagger.core.v3:swagger-core:2.2.7',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ private Constants() {}
public static final String LINEAGE_SCHEMA_FILE = "lineage.graphql";
public static final String PROPERTIES_SCHEMA_FILE = "properties.graphql";
public static final String FORMS_SCHEMA_FILE = "forms.graphql";
public static final String ASSERTIONS_SCHEMA_FILE = "assertions.graphql";
public static final String INCIDENTS_SCHEMA_FILE = "incident.graphql";
public static final String CONNECTIONS_SCHEMA_FILE = "connection.graphql";
public static final String BROWSE_PATH_DELIMITER = "/";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.linkedin.datahub.graphql.analytics.resolver.GetMetadataAnalyticsResolver;
import com.linkedin.datahub.graphql.analytics.resolver.IsAnalyticsEnabledResolver;
import com.linkedin.datahub.graphql.analytics.service.AnalyticsService;
import com.linkedin.datahub.graphql.concurrency.GraphQLConcurrencyUtils;
import com.linkedin.datahub.graphql.featureflags.FeatureFlags;
import com.linkedin.datahub.graphql.generated.AccessToken;
import com.linkedin.datahub.graphql.generated.AccessTokenMetadata;
Expand Down Expand Up @@ -117,7 +118,12 @@
import com.linkedin.datahub.graphql.resolvers.assertion.AssertionRunEventResolver;
import com.linkedin.datahub.graphql.resolvers.assertion.DeleteAssertionResolver;
import com.linkedin.datahub.graphql.resolvers.assertion.EntityAssertionsResolver;
import com.linkedin.datahub.graphql.resolvers.auth.*;
import com.linkedin.datahub.graphql.resolvers.auth.CreateAccessTokenResolver;
import com.linkedin.datahub.graphql.resolvers.auth.DebugAccessResolver;
import com.linkedin.datahub.graphql.resolvers.auth.GetAccessTokenMetadataResolver;
import com.linkedin.datahub.graphql.resolvers.auth.GetAccessTokenResolver;
import com.linkedin.datahub.graphql.resolvers.auth.ListAccessTokensResolver;
import com.linkedin.datahub.graphql.resolvers.auth.RevokeAccessTokenResolver;
import com.linkedin.datahub.graphql.resolvers.browse.BrowsePathsResolver;
import com.linkedin.datahub.graphql.resolvers.browse.BrowseResolver;
import com.linkedin.datahub.graphql.resolvers.browse.EntityBrowsePathsResolver;
Expand Down Expand Up @@ -394,7 +400,6 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -814,6 +819,7 @@ public GraphQLEngine.Builder builder() {
.addSchema(fileBasedSchema(PROPERTIES_SCHEMA_FILE))
.addSchema(fileBasedSchema(FORMS_SCHEMA_FILE))
.addSchema(fileBasedSchema(CONNECTIONS_SCHEMA_FILE))
.addSchema(fileBasedSchema(ASSERTIONS_SCHEMA_FILE))
.addSchema(fileBasedSchema(INCIDENTS_SCHEMA_FILE));

for (GmsGraphQLPlugin plugin : this.graphQLPlugins) {
Expand Down Expand Up @@ -2900,7 +2906,7 @@ private <T, K> DataLoader<K, DataFetcherResult<T>> createDataLoader(
DataLoaderOptions.newOptions().setBatchLoaderContextProvider(contextProvider);
return DataLoader.newDataLoader(
(keys, context) ->
CompletableFuture.supplyAsync(
GraphQLConcurrencyUtils.supplyAsync(
() -> {
try {
log.debug(
Expand All @@ -2919,7 +2925,9 @@ private <T, K> DataLoader<K, DataFetcherResult<T>> createDataLoader(
String.format("Failed to retrieve entities of type %s", graphType.name()),
e);
}
}),
},
graphType.getClass().getSimpleName(),
"batchLoad"),
loaderOptions);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static graphql.schema.idl.RuntimeWiring.*;

import com.linkedin.datahub.graphql.exception.DataHubDataFetcherExceptionHandler;
import com.linkedin.datahub.graphql.instrumentation.DataHubFieldComplexityCalculator;
import graphql.ExecutionInput;
import graphql.ExecutionResult;
import graphql.GraphQL;
Expand Down Expand Up @@ -80,7 +81,9 @@ private GraphQLEngine(
List<Instrumentation> instrumentations = new ArrayList<>(3);
instrumentations.add(new TracingInstrumentation());
instrumentations.add(new MaxQueryDepthInstrumentation(graphQLQueryDepthLimit));
instrumentations.add(new MaxQueryComplexityInstrumentation(graphQLQueryComplexityLimit));
instrumentations.add(
new MaxQueryComplexityInstrumentation(
graphQLQueryComplexityLimit, new DataHubFieldComplexityCalculator()));
ChainedInstrumentation chainedInstrumentation = new ChainedInstrumentation(instrumentations);
_graphQL =
new GraphQL.Builder(graphQLSchema)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.linkedin.common.SubTypes;
import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.graphql.concurrency.GraphQLConcurrencyUtils;
import com.linkedin.datahub.graphql.generated.Entity;
import com.linkedin.entity.EntityResponse;
import com.linkedin.entity.client.EntityClient;
Expand All @@ -26,7 +27,7 @@ public class SubTypesResolver implements DataFetcher<CompletableFuture<SubTypes>
@Override
@Nullable
public CompletableFuture<SubTypes> get(DataFetchingEnvironment environment) throws Exception {
return CompletableFuture.supplyAsync(
return GraphQLConcurrencyUtils.supplyAsync(
() -> {
final QueryContext context = environment.getContext();
SubTypes subType = null;
Expand All @@ -50,6 +51,8 @@ public CompletableFuture<SubTypes> get(DataFetchingEnvironment environment) thro
"Failed to fetch aspect " + _aspectName + " for urn " + urnStr + " ", e);
}
return subType;
});
},
this.getClass().getSimpleName(),
"get");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.linkedin.common.urn.Urn;
import com.linkedin.data.DataMap;
import com.linkedin.data.codec.JacksonDataCodec;
import com.linkedin.datahub.graphql.concurrency.GraphQLConcurrencyUtils;
import com.linkedin.datahub.graphql.generated.AspectParams;
import com.linkedin.datahub.graphql.generated.AspectRenderSpec;
import com.linkedin.datahub.graphql.generated.Entity;
Expand Down Expand Up @@ -48,7 +49,7 @@ private boolean shouldReturnAspect(AspectSpec aspectSpec, AspectParams params) {
@Override
public CompletableFuture<List<RawAspect>> get(DataFetchingEnvironment environment)
throws Exception {
return CompletableFuture.supplyAsync(
return GraphQLConcurrencyUtils.supplyAsync(
() -> {
List<RawAspect> results = new ArrayList<>();

Expand Down Expand Up @@ -111,6 +112,8 @@ public CompletableFuture<List<RawAspect>> get(DataFetchingEnvironment environmen
}
});
return results;
});
},
this.getClass().getSimpleName(),
"get");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.linkedin.datahub.graphql.concurrency;

import com.codahale.metrics.MetricRegistry;
import com.linkedin.metadata.utils.metrics.MetricUtils;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;

public class GraphQLConcurrencyUtils {
private GraphQLConcurrencyUtils() {}

private static ExecutorService graphQLExecutorService = null;

public static ExecutorService getExecutorService() {
return GraphQLConcurrencyUtils.graphQLExecutorService;
}

public static void setExecutorService(ExecutorService executorService) {
GraphQLConcurrencyUtils.graphQLExecutorService = executorService;
}

public static <T> CompletableFuture<T> supplyAsync(
Supplier<T> supplier, String caller, String task) {
MetricUtils.counter(
MetricRegistry.name(
GraphQLConcurrencyUtils.class.getSimpleName(), "supplyAsync", caller, task))
.inc();
if (GraphQLConcurrencyUtils.graphQLExecutorService == null) {
return CompletableFuture.supplyAsync(supplier);
} else {
return CompletableFuture.supplyAsync(
supplier, GraphQLConcurrencyUtils.graphQLExecutorService);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.linkedin.datahub.graphql.concurrency;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;

public class GraphQLWorkerPoolThreadFactory implements ThreadFactory {

private static final AtomicLong THREAD_INIT_NUMBER = new AtomicLong();
public static final String GRAPHQL_THREAD_POOL_GROUP_NAME = "graphQLThreadGroup";
public static final ThreadGroup GRAPHQL_THREAD_POOL_GROUP =
new ThreadGroup(GRAPHQL_THREAD_POOL_GROUP_NAME);

private static long nextThreadNum() {
return THREAD_INIT_NUMBER.getAndIncrement();
}

private long stackSize;

public GraphQLWorkerPoolThreadFactory(long stackSize) {
this.stackSize = stackSize;
}

@Override
public final Thread newThread(Runnable runnable) {

return new Thread(
GRAPHQL_THREAD_POOL_GROUP, runnable, "GraphQLWorkerThread-" + nextThreadNum(), stackSize);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package com.linkedin.datahub.graphql.instrumentation;

import graphql.analysis.FieldComplexityCalculator;
import graphql.analysis.FieldComplexityEnvironment;
import graphql.language.Field;
import graphql.language.FragmentSpread;
import graphql.language.Selection;
import graphql.language.SelectionSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class DataHubFieldComplexityCalculator implements FieldComplexityCalculator {

private static final String COUNT_ARG = "count";
private static final String INPUT_ARG = "input";
private static final String SEARCH_RESULTS_FIELD = "searchResults";
private static final String ENTITY_FIELD = "entity";
private static final String SEARCH_RESULT_FIELDS_FIELD = "searchResultFields";
private static final String GRAPHQL_QUERY_TYPE = "Query";

@SuppressWarnings("rawtypes")
@Override
public int calculate(FieldComplexityEnvironment environment, int childComplexity) {
int complexity = 1;
Map<String, Object> args = environment.getArguments();
if (args.containsKey(INPUT_ARG)) {
Map<String, Object> input = (Map<String, Object>) args.get(INPUT_ARG);
if (input.containsKey(COUNT_ARG) && (Integer) input.get(COUNT_ARG) > 1) {
Integer count = (Integer) input.get(COUNT_ARG);
Field field = environment.getField();
complexity += countRecursiveLineageComplexity(count, field);
}
}
if (GRAPHQL_QUERY_TYPE.equals(environment.getParentType().getName())) {
log.info(
"Query complexity for query: {} is {}",
environment.getField().getName(),
complexity + childComplexity);
}
return complexity + childComplexity;
}

private int countRecursiveLineageComplexity(Integer count, Field field) {
List<Selection> subFields = field.getSelectionSet().getSelections();
Optional<FragmentSpread> searchResultsFieldsField =
subFields.stream()
.filter(selection -> selection instanceof Field)
.map(selection -> (Field) selection)
.filter(subField -> SEARCH_RESULTS_FIELD.equals(subField.getName()))
.map(Field::getSelectionSet)
.map(SelectionSet::getSelections)
.flatMap(List::stream)
.filter(selection -> selection instanceof Field)
.map(selection -> (Field) selection)
.filter(subField -> ENTITY_FIELD.equals(subField.getName()))
.map(Field::getSelectionSet)
.map(SelectionSet::getSelections)
.flatMap(List::stream)
.filter(selection -> selection instanceof FragmentSpread)
.map(selection -> (FragmentSpread) selection)
.filter(subField -> SEARCH_RESULT_FIELDS_FIELD.equals(subField.getName()))
.findFirst();
if (searchResultsFieldsField.isPresent()) {
// This fragment includes 2 lineage queries, we account for this additional complexity by
// multiplying
// by the count of entities attempting to be returned
return 2 * count;
}
return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.authorization.AuthorizationUtils;
import com.linkedin.datahub.graphql.concurrency.GraphQLConcurrencyUtils;
import com.linkedin.datahub.graphql.featureflags.FeatureFlags;
import com.linkedin.datahub.graphql.generated.AuthenticatedUser;
import com.linkedin.datahub.graphql.generated.CorpUser;
Expand Down Expand Up @@ -49,7 +50,7 @@ public MeResolver(final EntityClient entityClient, final FeatureFlags featureFla
@Override
public CompletableFuture<AuthenticatedUser> get(DataFetchingEnvironment environment) {
final QueryContext context = environment.getContext();
return CompletableFuture.supplyAsync(
return GraphQLConcurrencyUtils.supplyAsync(
() -> {
try {
// 1. Get currently logged in user profile.
Expand Down Expand Up @@ -100,7 +101,9 @@ public CompletableFuture<AuthenticatedUser> get(DataFetchingEnvironment environm
} catch (URISyntaxException | RemoteInvocationException e) {
throw new RuntimeException("Failed to fetch authenticated user!", e);
}
});
},
this.getClass().getSimpleName(),
"get");
}

/** Returns true if the authenticated user has privileges to view analytics. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import com.google.common.collect.ImmutableList;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.concurrency.GraphQLConcurrencyUtils;
import com.linkedin.datahub.graphql.generated.Assertion;
import com.linkedin.datahub.graphql.generated.AssertionResultType;
import com.linkedin.datahub.graphql.generated.AssertionRunEvent;
Expand Down Expand Up @@ -40,7 +41,7 @@ public AssertionRunEventResolver(final EntityClient client) {

@Override
public CompletableFuture<AssertionRunEventsResult> get(DataFetchingEnvironment environment) {
return CompletableFuture.supplyAsync(
return GraphQLConcurrencyUtils.supplyAsync(
() -> {
final QueryContext context = environment.getContext();

Expand Down Expand Up @@ -97,12 +98,24 @@ public CompletableFuture<AssertionRunEventsResult> get(DataFetchingEnvironment e
&& AssertionResultType.SUCCESS.equals(
runEvent.getResult().getType()))
.count()));
result.setErrored(
Math.toIntExact(
runEvents.stream()
.filter(
runEvent ->
AssertionRunStatus.COMPLETE.equals(runEvent.getStatus())
&& runEvent.getResult() != null
&& AssertionResultType.ERROR.equals(
runEvent.getResult().getType()))
.count()));
result.setRunEvents(runEvents);
return result;
} catch (RemoteInvocationException e) {
throw new RuntimeException("Failed to retrieve Assertion Run Events from GMS", e);
}
});
},
this.getClass().getSimpleName(),
"get");
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.authorization.AuthorizationUtils;
import com.linkedin.datahub.graphql.concurrency.GraphQLConcurrencyUtils;
import com.linkedin.datahub.graphql.exception.AuthorizationException;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.metadata.Constants;
Expand Down Expand Up @@ -38,7 +39,7 @@ public CompletableFuture<Boolean> get(final DataFetchingEnvironment environment)
throws Exception {
final QueryContext context = environment.getContext();
final Urn assertionUrn = Urn.createFromString(environment.getArgument("urn"));
return CompletableFuture.supplyAsync(
return GraphQLConcurrencyUtils.supplyAsync(
() -> {

// 1. check the entity exists. If not, return false.
Expand Down Expand Up @@ -75,7 +76,9 @@ public CompletableFuture<Boolean> get(final DataFetchingEnvironment environment)
}
throw new AuthorizationException(
"Unauthorized to perform this action. Please contact your DataHub administrator.");
});
},
this.getClass().getSimpleName(),
"get");
}

/** Determine whether the current user is allowed to remove an assertion. */
Expand Down
Loading

0 comments on commit 1a57f3e

Please sign in to comment.