Skip to content

Commit

Permalink
Merge pull request #7583 from lcpopa/main
Browse files Browse the repository at this point in the history
Add Lineage Mappings to asset lineage omas and janus connector
  • Loading branch information
popa-raluca committed Apr 19, 2023
2 parents 29f5d28 + f0e923d commit 152aadb
Show file tree
Hide file tree
Showing 13 changed files with 53 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ The Glossary Term Context Event contains the description of the term plus the fu
Glossary Category or Schema Elements that are involved in lineage relationship with the processed term.
This is send only when the Semantic Assigment or Term Categorization relationships are created.

The event sent for Process entities includes information about the lineage process, the data flows and context of the Schema Elements.
The event sent for Process entities includes information about the lineage process, the data flows, the lineage mappings and context of the Schema Elements.
The full context of the Process is first time sent when the status of the entity is changed from DRAFT to ACTIVE, for the rest of the lineage entity types
the Lineage Event sent contains only the entity that has been changed.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public enum AssetLineageEventType implements Serializable {
COLUMN_CONTEXT_EVENT(13, "ColumnContextEvent", "Has the context for a column"),
ASSET_CONTEXT_EVENT(14, "AssetContextEvent", "Has the asset context for a relational table or a data file"),
DATA_FLOWS_EVENT(15, "DataFlowsEvent", "Has the data flows for a column"),
LINEAGE_MAPPINGS_EVENT(16, "LineageMappingsEvent", "Has the lineage mappings for a column"),
LINEAGE_SYNC_EVENT(99, "LineageSyncEvent","AssetLineage internal processing information shared with external software components like governance servers."),
UNKNOWN_ASSET_LINEAGE_EVENT(100, "UnknownAssetLineageEvent", "An AssetLineage OMAS event that is not recognized by the local handlers.");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.ATTRIBUTE_FOR_SCHEMA;
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.COLLECTION_MEMBERSHIP;
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.DATA_FLOW;
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.LINEAGE_MAPPING;
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.PORT_ALIAS;
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.PORT_DELEGATION;
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.PORT_IMPLEMENTATION;
Expand Down Expand Up @@ -102,33 +103,39 @@ public Multimap<String, RelationshipsContext> buildProcessContext(String userId,
.map(GraphContext::getToVertex).collect(Collectors.toSet());

for (LineageEntity tabularColumn : tabularColumns) {
addDataFlowsContextForColumn(userId, context, tabularColumn.getGuid(), tabularColumn.getTypeDefName());
addContextForColumn(userId, context, tabularColumn.getGuid(), tabularColumn.getTypeDefName(),
DATA_FLOW, AssetLineageEventType.DATA_FLOWS_EVENT);
addContextForColumn(userId, context, tabularColumn.getGuid(), tabularColumn.getTypeDefName(),
LINEAGE_MAPPING, AssetLineageEventType.LINEAGE_MAPPINGS_EVENT);
}
}
return context;
}

/**
* Adds data flows context for the tabular column. It adds the data flows for the column and the column context for all the technical assets
* that have data flows to it.
* Adds context for the tabular column. It adds the specified relationships for the column and the column context for
* all the technical assets that have specified relationships to it.
*
* @param userId userId of user making request.
* @param context the context to be updated
* @param columnGUID the column GUID
* @param typeDefName the column type name
* @param userId userId of user making request.
* @param context the context to be updated
* @param columnGUID the column GUID
* @param typeDefName the column type name
* @param relationshipTypeDefName the relationship type name
* @param assetLineageEventType the asset lineage event type
*
* @throws OCFCheckedExceptionBase checked exception for reporting errors found when using OCF connectors
*/
private void addDataFlowsContextForColumn(String userId, Multimap<String, RelationshipsContext> context, String columnGUID,
String typeDefName) throws OCFCheckedExceptionBase {
List<Relationship> dataFlows = handlerHelper.getRelationshipsByType(userId, columnGUID, DATA_FLOW, typeDefName);
private void addContextForColumn(String userId, Multimap<String, RelationshipsContext> context, String columnGUID,
String typeDefName, String relationshipTypeDefName, AssetLineageEventType assetLineageEventType)
throws OCFCheckedExceptionBase {
List<Relationship> relationships = handlerHelper.getRelationshipsByType(userId, columnGUID, relationshipTypeDefName, typeDefName);

context.put(AssetLineageEventType.DATA_FLOWS_EVENT.getEventTypeName(),
handlerHelper.buildContextForRelationships(userId, columnGUID, dataFlows));
context.put(assetLineageEventType.getEventTypeName(),
handlerHelper.buildContextForRelationships(userId, columnGUID, relationships));

for (Relationship dataFlow : dataFlows) {
for (Relationship relationship : relationships) {
context.putAll(Multimaps.forMap(assetContextHandler.buildSchemaElementContext(userId, handlerHelper.getEntityAtTheEnd(userId,
columnGUID, dataFlow))));
columnGUID, relationship))));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.slf4j.LoggerFactory;

import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.DATA_FLOW;
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.LINEAGE_MAPPING;
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.PROCESS;
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.PROCESS_HIERARCHY;
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.SEMANTIC_ASSIGNMENT;
Expand Down Expand Up @@ -314,8 +315,8 @@ private void processNewRelationshipEvent(Relationship relationship) throws OCFCh
publisher.publishLineageRelationshipEvent(converter.createLineageRelationship(relationship),
AssetLineageEventType.NEW_RELATIONSHIP_EVENT);
break;
case DATA_FLOW:
publisher.publishDataFlowRelationshipEvent(converter.createLineageRelationship(relationship),
case DATA_FLOW, LINEAGE_MAPPING:
publisher.publishDataFlowOrLineageMappingRelationshipEvent(converter.createLineageRelationship(relationship),
AssetLineageEventType.NEW_RELATIONSHIP_EVENT);
break;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,34 +355,34 @@ public void publishLineageEntityEvent(LineageEntity lineageEntity, AssetLineageE
}

/**
* Publishes a {@link LineageRelationshipEvent} containing a {@link LineageRelationship}. For each end of the relationship it publishes a
* {@link LineageRelationshipsEvent} containing the column context if available
* Publishes a {@link LineageRelationshipEvent} containing a {@link LineageRelationship}. For each end of the
* relationship it publishes a {@link LineageRelationshipsEvent} containing the column context if available
*
* @param lineageRelationship the LineageRelationship to be published
* @param eventType the type on the event
*
* @throws ConnectorCheckedException unable to send the event due to connectivity issue
* @throws JsonProcessingException exception parsing the event json
*/
public void publishDataFlowRelationshipEvent(LineageRelationship lineageRelationship, AssetLineageEventType eventType) throws
OCFCheckedExceptionBase,
JsonProcessingException {
public void publishDataFlowOrLineageMappingRelationshipEvent(LineageRelationship lineageRelationship, AssetLineageEventType eventType)
throws OCFCheckedExceptionBase, JsonProcessingException {

publishLineageRelationshipEvent(lineageRelationship, eventType);

publishDataFlowContext(lineageRelationship.getSourceEntity());
publishDataFlowContext(lineageRelationship.getTargetEntity());
publishDataFlowOrLineageMappingContext(lineageRelationship.getSourceEntity());
publishDataFlowOrLineageMappingContext(lineageRelationship.getTargetEntity());
}

/**
* Publishes the context for an entity involved in a data flow. If the entity is of type column, it will publish the column context.
* If the entity is of type asset, it will publish the asset context.
* Publishes the context for an entity involved in a data flow or lineage mapping. If the entity is of type column,
* it will publish the column context. If the entity is of type asset, it will publish the asset context.
*
* @param lineageEntity the lineage entity
*
* @throws ConnectorCheckedException unable to send the event due to connectivity issue
* @throws JsonProcessingException exception parsing the event json
*/
private void publishDataFlowContext(LineageEntity lineageEntity) throws JsonProcessingException, OCFCheckedExceptionBase {
private void publishDataFlowOrLineageMappingContext(LineageEntity lineageEntity) throws JsonProcessingException, OCFCheckedExceptionBase {
publishLineageRelationshipsEvents(Multimaps.forMap(assetContextHandler.buildColumnContext(serverUserName, lineageEntity)));
publishLineageRelationshipsEvents(Multimaps.forMap(assetContextHandler.buildAssetContext(serverUserName, lineageEntity)));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public final class AssetLineageConstants {
public static final String PROCESS_PORT = "ProcessPort";
public static final String COLLECTION_MEMBERSHIP = "CollectionMembership";
public static final String DATA_FLOW = "DataFlow";
public static final String LINEAGE_MAPPING = "LineageMapping";
public static final String PORT_SCHEMA = "PortSchema";
public static final String NESTED_FILE = "NestedFile";
public static final String FOLDER_HIERARCHY = "FolderHierarchy";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.FOLDER_HIERARCHY;
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.GLOSSARY_TERM;
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.DATA_FLOW;
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.LINEAGE_MAPPING;
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.NESTED_FILE;
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.NESTED_SCHEMA_ATTRIBUTE;
import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.PORT;
Expand Down Expand Up @@ -77,7 +78,8 @@ public AssetLineageTypesValidator(OMRSRepositoryHelper repositoryHelper, Map<Str
final Set<String> defaultTopicRelationships =
Set.of(ATTRIBUTE_FOR_SCHEMA, SCHEMA_TYPE_OPTION, ASSET_SCHEMA_TYPE);
final Set<String> defaultProcessRelationships =
Set.of(ATTRIBUTE_FOR_SCHEMA, ASSET_SCHEMA_TYPE, PORT_SCHEMA, PORT_DELEGATION, PROCESS_PORT, PROCESS_HIERARCHY, DATA_FLOW);
Set.of(ATTRIBUTE_FOR_SCHEMA, ASSET_SCHEMA_TYPE, PORT_SCHEMA, PORT_DELEGATION, PROCESS_PORT,
PROCESS_HIERARCHY, DATA_FLOW, LINEAGE_MAPPING);
final Set<String> defaultGlossaryTermRelationships =
Set.of(SEMANTIC_ASSIGNMENT, TERM_CATEGORIZATION);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ void processInstanceEvent_newRelationship_NotSupportedRelationshipType() throws
AssetLineageEventType.NEW_RELATIONSHIP_EVENT);
verify(assetLineagePublisher, times(0)).publishGlossaryContext(GUID);
verify(assetLineagePublisher, times(0))
.publishDataFlowRelationshipEvent(lineageRelationship, AssetLineageEventType.NEW_RELATIONSHIP_EVENT);
.publishDataFlowOrLineageMappingRelationshipEvent(lineageRelationship, AssetLineageEventType.NEW_RELATIONSHIP_EVENT);
}

@Test
Expand All @@ -347,7 +347,7 @@ void processInstanceEvent_newRelationship_DataFlow() throws OCFCheckedExceptionB
assetLineageOMRSTopicListener.processInstanceEvent(instanceEvent);

verify(assetLineagePublisher, times(1))
.publishDataFlowRelationshipEvent(lineageRelationship, AssetLineageEventType.NEW_RELATIONSHIP_EVENT);
.publishDataFlowOrLineageMappingRelationshipEvent(lineageRelationship, AssetLineageEventType.NEW_RELATIONSHIP_EVENT);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.JSON_FILE;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.KEYSTORE_FILE;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.DATA_FLOW;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.LINEAGE_MAPPING;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.LOG_FILE;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.MEDIA_FILE;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.NESTED_FILE;
Expand Down Expand Up @@ -536,13 +537,15 @@ private Optional<List<String>> getEdgeLabelsForDataFlow(String label) {
List<String> edgeLabels = new ArrayList<>();
if (ASSETS.contains(label)) {
edgeLabels.add(DATA_FLOW);
edgeLabels.add(LINEAGE_MAPPING);
}
switch (label) {
case TABULAR_FILE_COLUMN:
case TABULAR_COLUMN:
case RELATIONAL_COLUMN:
case EVENT_SCHEMA_ATTRIBUTE:
edgeLabels.add(EDGE_LABEL_COLUMN_DATA_FLOW);
edgeLabels.add(LINEAGE_MAPPING);
break;
case DATA_FILE:
case AVRO_FILE:
Expand Down Expand Up @@ -774,7 +777,7 @@ private List<LineageVertex> getSearchResult(GraphTraversalSource g, LineageSearc
if (GLOSSARY_TERM.equalsIgnoreCase(queriedNode.getType())) {
edges = Arrays.asList(SEMANTIC_ASSIGNMENT, TERM_CATEGORIZATION);
} else {
edges = List.of(DATA_FLOW);
edges = List.of(DATA_FLOW, LINEAGE_MAPPING);
}
searchTraversal = buildQueryWithRelatedNodes(searchTraversal, relatedNodes, edges);
List<Vertex> results = searchTraversal.toList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.DATA_FILE_AND_SUBTYPES;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.EVENT_SCHEMA_ATTRIBUTE;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.DATA_FLOW;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.LINEAGE_MAPPING;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.NESTED_SCHEMA_ATTRIBUTE;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.PORT_DELEGATION;
import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.PORT_IMPLEMENTATION;
Expand Down Expand Up @@ -113,7 +114,7 @@ private Vertex getNodeByGuid(GraphTraversalSource g, String guid) {
private List<Vertex> getInputPathsForColumns(GraphTraversalSource g, String guid) {
List<Vertex> inputPathsForColumns = g.V().has(PROPERTY_KEY_ENTITY_GUID, guid).out(PROCESS_PORT).out(PORT_DELEGATION)
.has(PORT_IMPLEMENTATION, PROPERTY_NAME_PORT_TYPE, INPUT_PORT)
.out(PORT_SCHEMA).out(ATTRIBUTE_FOR_SCHEMA).in(DATA_FLOW)
.out(PORT_SCHEMA).out(ATTRIBUTE_FOR_SCHEMA).in(DATA_FLOW, LINEAGE_MAPPING)
.or(__.in(ATTRIBUTE_FOR_SCHEMA).in(ASSET_SCHEMA_TYPE).has(PROPERTY_KEY_LABEL, P.within(DATA_FILE_AND_SUBTYPES)),
__.in(NESTED_SCHEMA_ATTRIBUTE).has(PROPERTY_KEY_LABEL, RELATIONAL_TABLE),
__.in(ATTRIBUTE_FOR_SCHEMA).in(SCHEMA_TYPE_OPTION).in(ASSET_SCHEMA_TYPE).has(PROPERTY_KEY_LABEL, TOPIC)).toList();
Expand Down Expand Up @@ -147,7 +148,7 @@ private void findOutputColumns(Vertex columnIn, Vertex process) {
private List<Vertex> getSchemaElementVertices(GraphTraversalSource g, Vertex columnIn) {
List<Vertex> schemaElementVertices = g.V()
.has(PROPERTY_KEY_ENTITY_GUID, g.V(columnIn.id()).elementMap(PROPERTY_KEY_ENTITY_GUID).toList().get(0).get(PROPERTY_KEY_ENTITY_GUID))
.out(DATA_FLOW)
.out(DATA_FLOW, LINEAGE_MAPPING)
.toList();
return schemaElementVertices;
}
Expand Down Expand Up @@ -195,7 +196,7 @@ private List<Vertex> findPathForOutputAsset(GraphTraversalSource g, Vertex endin
if (isEndColumn(g, endingVertex)) {
endVertices.add(endingVertex);
} else {
List<Vertex> nextVertices = g.V(endingVertex.id()).out(DATA_FLOW).toList();
List<Vertex> nextVertices = g.V(endingVertex.id()).out(DATA_FLOW, LINEAGE_MAPPING).toList();

for (Vertex vertex : nextVertices) {
if (vertex.equals(startingVertex)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ private Constants() {
public static final String PROCESS_PORT = "ProcessPort";
public static final String PORT_IMPLEMENTATION = "PortImplementation";
public static final String DATA_FLOW = "DataFlow";
public static final String LINEAGE_MAPPING = "LineageMapping";
public static final String SCHEMA_TYPE = "SchemaType";
public static final String PORT_SCHEMA = "PortSchema";
public static final String NESTED_FILE = "NestedFile";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ public enum EdgeLabelsLineageGraph {
SCHEMA_TYPE,
SCHEMA_ATTRIBUTE_TYPE,
DATA_FLOW,
LINEAGE_MAPPING,
NESTED_FILE,
FOLDER_HIERARCHY,
ASSET_TO_CONNECTION,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ private void processEventBasedOnType(String assetLineageEvent) throws IOExceptio
storingServices.upsertEntityContext(lineageRelationshipsEvent);
break;
case CLASSIFICATION_CONTEXT_EVENT:
case DATA_FLOWS_EVENT:
case DATA_FLOWS_EVENT, LINEAGE_MAPPINGS_EVENT:
case PROCESS_CONTEXT_EVENT:
lineageRelationshipsEvent = OBJECT_READER.readValue(assetLineageEvent, LineageRelationshipsEvent.class);
storingServices.addEntityContext(lineageRelationshipsEvent);
Expand Down

0 comments on commit 152aadb

Please sign in to comment.