diff --git a/open-metadata-implementation/access-services/asset-lineage/asset-lineage-api/docs/events/lineage-event.md b/open-metadata-implementation/access-services/asset-lineage/asset-lineage-api/docs/events/lineage-event.md index 7a97ced5863..d5e7eddc604 100644 --- a/open-metadata-implementation/access-services/asset-lineage/asset-lineage-api/docs/events/lineage-event.md +++ b/open-metadata-implementation/access-services/asset-lineage/asset-lineage-api/docs/events/lineage-event.md @@ -22,7 +22,7 @@ The Glossary Term Context Event contains the description of the term plus the fu Glossary Category or Schema Elements that are involved in lineage relationship with the processed term. This is send only when the Semantic Assigment or Term Categorization relationships are created. -The event sent for Process entities includes information about the lineage process, the data flows and context of the Schema Elements. +The event sent for Process entities includes information about the lineage process, the data flows, the lineage mappings and context of the Schema Elements. The full context of the Process is first time sent when the status of the entity is changed from DRAFT to ACTIVE, for the rest of the lineage entity types the Lineage Event sent contains only the entity that has been changed. diff --git a/open-metadata-implementation/access-services/asset-lineage/asset-lineage-api/src/main/java/org/odpi/openmetadata/accessservices/assetlineage/event/AssetLineageEventType.java b/open-metadata-implementation/access-services/asset-lineage/asset-lineage-api/src/main/java/org/odpi/openmetadata/accessservices/assetlineage/event/AssetLineageEventType.java index de7d784193a..bea57131f84 100644 --- a/open-metadata-implementation/access-services/asset-lineage/asset-lineage-api/src/main/java/org/odpi/openmetadata/accessservices/assetlineage/event/AssetLineageEventType.java +++ b/open-metadata-implementation/access-services/asset-lineage/asset-lineage-api/src/main/java/org/odpi/openmetadata/accessservices/assetlineage/event/AssetLineageEventType.java @@ -38,6 +38,7 @@ public enum AssetLineageEventType implements Serializable { COLUMN_CONTEXT_EVENT(13, "ColumnContextEvent", "Has the context for a column"), ASSET_CONTEXT_EVENT(14, "AssetContextEvent", "Has the asset context for a relational table or a data file"), DATA_FLOWS_EVENT(15, "DataFlowsEvent", "Has the data flows for a column"), + LINEAGE_MAPPINGS_EVENT(16, "LineageMappingsEvent", "Has the lineage mappings for a column"), LINEAGE_SYNC_EVENT(99, "LineageSyncEvent","AssetLineage internal processing information shared with external software components like governance servers."), UNKNOWN_ASSET_LINEAGE_EVENT(100, "UnknownAssetLineageEvent", "An AssetLineage OMAS event that is not recognized by the local handlers."); diff --git a/open-metadata-implementation/access-services/asset-lineage/asset-lineage-server/src/main/java/org/odpi/openmetadata/accessservices/assetlineage/handlers/ProcessContextHandler.java b/open-metadata-implementation/access-services/asset-lineage/asset-lineage-server/src/main/java/org/odpi/openmetadata/accessservices/assetlineage/handlers/ProcessContextHandler.java index 0b05dc83d67..65c88f1cfce 100644 --- a/open-metadata-implementation/access-services/asset-lineage/asset-lineage-server/src/main/java/org/odpi/openmetadata/accessservices/assetlineage/handlers/ProcessContextHandler.java +++ b/open-metadata-implementation/access-services/asset-lineage/asset-lineage-server/src/main/java/org/odpi/openmetadata/accessservices/assetlineage/handlers/ProcessContextHandler.java @@ -23,6 +23,7 @@ import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.ATTRIBUTE_FOR_SCHEMA; import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.COLLECTION_MEMBERSHIP; import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.DATA_FLOW; +import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.LINEAGE_MAPPING; import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.PORT_ALIAS; import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.PORT_DELEGATION; import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.PORT_IMPLEMENTATION; @@ -102,33 +103,39 @@ public Multimap buildProcessContext(String userId, .map(GraphContext::getToVertex).collect(Collectors.toSet()); for (LineageEntity tabularColumn : tabularColumns) { - addDataFlowsContextForColumn(userId, context, tabularColumn.getGuid(), tabularColumn.getTypeDefName()); + addContextForColumn(userId, context, tabularColumn.getGuid(), tabularColumn.getTypeDefName(), + DATA_FLOW, AssetLineageEventType.DATA_FLOWS_EVENT); + addContextForColumn(userId, context, tabularColumn.getGuid(), tabularColumn.getTypeDefName(), + LINEAGE_MAPPING, AssetLineageEventType.LINEAGE_MAPPINGS_EVENT); } } return context; } /** - * Adds data flows context for the tabular column. It adds the data flows for the column and the column context for all the technical assets - * that have data flows to it. + * Adds context for the tabular column. It adds the specified relationships for the column and the column context for + * all the technical assets that have specified relationships to it. * - * @param userId userId of user making request. - * @param context the context to be updated - * @param columnGUID the column GUID - * @param typeDefName the column type name + * @param userId userId of user making request. + * @param context the context to be updated + * @param columnGUID the column GUID + * @param typeDefName the column type name + * @param relationshipTypeDefName the relationship type name + * @param assetLineageEventType the asset lineage event type * * @throws OCFCheckedExceptionBase checked exception for reporting errors found when using OCF connectors */ - private void addDataFlowsContextForColumn(String userId, Multimap context, String columnGUID, - String typeDefName) throws OCFCheckedExceptionBase { - List dataFlows = handlerHelper.getRelationshipsByType(userId, columnGUID, DATA_FLOW, typeDefName); + private void addContextForColumn(String userId, Multimap context, String columnGUID, + String typeDefName, String relationshipTypeDefName, AssetLineageEventType assetLineageEventType) + throws OCFCheckedExceptionBase { + List relationships = handlerHelper.getRelationshipsByType(userId, columnGUID, relationshipTypeDefName, typeDefName); - context.put(AssetLineageEventType.DATA_FLOWS_EVENT.getEventTypeName(), - handlerHelper.buildContextForRelationships(userId, columnGUID, dataFlows)); + context.put(assetLineageEventType.getEventTypeName(), + handlerHelper.buildContextForRelationships(userId, columnGUID, relationships)); - for (Relationship dataFlow : dataFlows) { + for (Relationship relationship : relationships) { context.putAll(Multimaps.forMap(assetContextHandler.buildSchemaElementContext(userId, handlerHelper.getEntityAtTheEnd(userId, - columnGUID, dataFlow)))); + columnGUID, relationship)))); } } diff --git a/open-metadata-implementation/access-services/asset-lineage/asset-lineage-server/src/main/java/org/odpi/openmetadata/accessservices/assetlineage/listeners/AssetLineageOMRSTopicListener.java b/open-metadata-implementation/access-services/asset-lineage/asset-lineage-server/src/main/java/org/odpi/openmetadata/accessservices/assetlineage/listeners/AssetLineageOMRSTopicListener.java index ff66d4131fe..acfa3e2fa95 100644 --- a/open-metadata-implementation/access-services/asset-lineage/asset-lineage-server/src/main/java/org/odpi/openmetadata/accessservices/assetlineage/listeners/AssetLineageOMRSTopicListener.java +++ b/open-metadata-implementation/access-services/asset-lineage/asset-lineage-server/src/main/java/org/odpi/openmetadata/accessservices/assetlineage/listeners/AssetLineageOMRSTopicListener.java @@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory; import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.DATA_FLOW; +import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.LINEAGE_MAPPING; import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.PROCESS; import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.PROCESS_HIERARCHY; import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.SEMANTIC_ASSIGNMENT; @@ -314,8 +315,8 @@ private void processNewRelationshipEvent(Relationship relationship) throws OCFCh publisher.publishLineageRelationshipEvent(converter.createLineageRelationship(relationship), AssetLineageEventType.NEW_RELATIONSHIP_EVENT); break; - case DATA_FLOW: - publisher.publishDataFlowRelationshipEvent(converter.createLineageRelationship(relationship), + case DATA_FLOW, LINEAGE_MAPPING: + publisher.publishDataFlowOrLineageMappingRelationshipEvent(converter.createLineageRelationship(relationship), AssetLineageEventType.NEW_RELATIONSHIP_EVENT); break; default: diff --git a/open-metadata-implementation/access-services/asset-lineage/asset-lineage-server/src/main/java/org/odpi/openmetadata/accessservices/assetlineage/outtopic/AssetLineagePublisher.java b/open-metadata-implementation/access-services/asset-lineage/asset-lineage-server/src/main/java/org/odpi/openmetadata/accessservices/assetlineage/outtopic/AssetLineagePublisher.java index 479fcde2a48..4d6485ccc98 100644 --- a/open-metadata-implementation/access-services/asset-lineage/asset-lineage-server/src/main/java/org/odpi/openmetadata/accessservices/assetlineage/outtopic/AssetLineagePublisher.java +++ b/open-metadata-implementation/access-services/asset-lineage/asset-lineage-server/src/main/java/org/odpi/openmetadata/accessservices/assetlineage/outtopic/AssetLineagePublisher.java @@ -355,8 +355,8 @@ public void publishLineageEntityEvent(LineageEntity lineageEntity, AssetLineageE } /** - * Publishes a {@link LineageRelationshipEvent} containing a {@link LineageRelationship}. For each end of the relationship it publishes a - * {@link LineageRelationshipsEvent} containing the column context if available + * Publishes a {@link LineageRelationshipEvent} containing a {@link LineageRelationship}. For each end of the + * relationship it publishes a {@link LineageRelationshipsEvent} containing the column context if available * * @param lineageRelationship the LineageRelationship to be published * @param eventType the type on the event @@ -364,25 +364,25 @@ public void publishLineageEntityEvent(LineageEntity lineageEntity, AssetLineageE * @throws ConnectorCheckedException unable to send the event due to connectivity issue * @throws JsonProcessingException exception parsing the event json */ - public void publishDataFlowRelationshipEvent(LineageRelationship lineageRelationship, AssetLineageEventType eventType) throws - OCFCheckedExceptionBase, - JsonProcessingException { + public void publishDataFlowOrLineageMappingRelationshipEvent(LineageRelationship lineageRelationship, AssetLineageEventType eventType) + throws OCFCheckedExceptionBase, JsonProcessingException { + publishLineageRelationshipEvent(lineageRelationship, eventType); - publishDataFlowContext(lineageRelationship.getSourceEntity()); - publishDataFlowContext(lineageRelationship.getTargetEntity()); + publishDataFlowOrLineageMappingContext(lineageRelationship.getSourceEntity()); + publishDataFlowOrLineageMappingContext(lineageRelationship.getTargetEntity()); } /** - * Publishes the context for an entity involved in a data flow. If the entity is of type column, it will publish the column context. - * If the entity is of type asset, it will publish the asset context. + * Publishes the context for an entity involved in a data flow or lineage mapping. If the entity is of type column, + * it will publish the column context. If the entity is of type asset, it will publish the asset context. * * @param lineageEntity the lineage entity * * @throws ConnectorCheckedException unable to send the event due to connectivity issue * @throws JsonProcessingException exception parsing the event json */ - private void publishDataFlowContext(LineageEntity lineageEntity) throws JsonProcessingException, OCFCheckedExceptionBase { + private void publishDataFlowOrLineageMappingContext(LineageEntity lineageEntity) throws JsonProcessingException, OCFCheckedExceptionBase { publishLineageRelationshipsEvents(Multimaps.forMap(assetContextHandler.buildColumnContext(serverUserName, lineageEntity))); publishLineageRelationshipsEvents(Multimaps.forMap(assetContextHandler.buildAssetContext(serverUserName, lineageEntity))); diff --git a/open-metadata-implementation/access-services/asset-lineage/asset-lineage-server/src/main/java/org/odpi/openmetadata/accessservices/assetlineage/util/AssetLineageConstants.java b/open-metadata-implementation/access-services/asset-lineage/asset-lineage-server/src/main/java/org/odpi/openmetadata/accessservices/assetlineage/util/AssetLineageConstants.java index 2929df72826..c38fe9e1db9 100644 --- a/open-metadata-implementation/access-services/asset-lineage/asset-lineage-server/src/main/java/org/odpi/openmetadata/accessservices/assetlineage/util/AssetLineageConstants.java +++ b/open-metadata-implementation/access-services/asset-lineage/asset-lineage-server/src/main/java/org/odpi/openmetadata/accessservices/assetlineage/util/AssetLineageConstants.java @@ -52,6 +52,7 @@ public final class AssetLineageConstants { public static final String PROCESS_PORT = "ProcessPort"; public static final String COLLECTION_MEMBERSHIP = "CollectionMembership"; public static final String DATA_FLOW = "DataFlow"; + public static final String LINEAGE_MAPPING = "LineageMapping"; public static final String PORT_SCHEMA = "PortSchema"; public static final String NESTED_FILE = "NestedFile"; public static final String FOLDER_HIERARCHY = "FolderHierarchy"; diff --git a/open-metadata-implementation/access-services/asset-lineage/asset-lineage-server/src/main/java/org/odpi/openmetadata/accessservices/assetlineage/util/AssetLineageTypesValidator.java b/open-metadata-implementation/access-services/asset-lineage/asset-lineage-server/src/main/java/org/odpi/openmetadata/accessservices/assetlineage/util/AssetLineageTypesValidator.java index 90861a29a14..a8cd78e6c4c 100644 --- a/open-metadata-implementation/access-services/asset-lineage/asset-lineage-server/src/main/java/org/odpi/openmetadata/accessservices/assetlineage/util/AssetLineageTypesValidator.java +++ b/open-metadata-implementation/access-services/asset-lineage/asset-lineage-server/src/main/java/org/odpi/openmetadata/accessservices/assetlineage/util/AssetLineageTypesValidator.java @@ -35,6 +35,7 @@ import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.FOLDER_HIERARCHY; import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.GLOSSARY_TERM; import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.DATA_FLOW; +import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.LINEAGE_MAPPING; import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.NESTED_FILE; import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.NESTED_SCHEMA_ATTRIBUTE; import static org.odpi.openmetadata.accessservices.assetlineage.util.AssetLineageConstants.PORT; @@ -77,7 +78,8 @@ public AssetLineageTypesValidator(OMRSRepositoryHelper repositoryHelper, Map defaultTopicRelationships = Set.of(ATTRIBUTE_FOR_SCHEMA, SCHEMA_TYPE_OPTION, ASSET_SCHEMA_TYPE); final Set defaultProcessRelationships = - Set.of(ATTRIBUTE_FOR_SCHEMA, ASSET_SCHEMA_TYPE, PORT_SCHEMA, PORT_DELEGATION, PROCESS_PORT, PROCESS_HIERARCHY, DATA_FLOW); + Set.of(ATTRIBUTE_FOR_SCHEMA, ASSET_SCHEMA_TYPE, PORT_SCHEMA, PORT_DELEGATION, PROCESS_PORT, + PROCESS_HIERARCHY, DATA_FLOW, LINEAGE_MAPPING); final Set defaultGlossaryTermRelationships = Set.of(SEMANTIC_ASSIGNMENT, TERM_CATEGORIZATION); diff --git a/open-metadata-implementation/access-services/asset-lineage/asset-lineage-server/src/test/java/org/odpi/openmetadata/accessservices/assetlineage/listeners/AssetLineageOMRSTopicListenerTest.java b/open-metadata-implementation/access-services/asset-lineage/asset-lineage-server/src/test/java/org/odpi/openmetadata/accessservices/assetlineage/listeners/AssetLineageOMRSTopicListenerTest.java index 86368113c0f..42eb878577a 100644 --- a/open-metadata-implementation/access-services/asset-lineage/asset-lineage-server/src/test/java/org/odpi/openmetadata/accessservices/assetlineage/listeners/AssetLineageOMRSTopicListenerTest.java +++ b/open-metadata-implementation/access-services/asset-lineage/asset-lineage-server/src/test/java/org/odpi/openmetadata/accessservices/assetlineage/listeners/AssetLineageOMRSTopicListenerTest.java @@ -334,7 +334,7 @@ void processInstanceEvent_newRelationship_NotSupportedRelationshipType() throws AssetLineageEventType.NEW_RELATIONSHIP_EVENT); verify(assetLineagePublisher, times(0)).publishGlossaryContext(GUID); verify(assetLineagePublisher, times(0)) - .publishDataFlowRelationshipEvent(lineageRelationship, AssetLineageEventType.NEW_RELATIONSHIP_EVENT); + .publishDataFlowOrLineageMappingRelationshipEvent(lineageRelationship, AssetLineageEventType.NEW_RELATIONSHIP_EVENT); } @Test @@ -347,7 +347,7 @@ void processInstanceEvent_newRelationship_DataFlow() throws OCFCheckedExceptionB assetLineageOMRSTopicListener.processInstanceEvent(instanceEvent); verify(assetLineagePublisher, times(1)) - .publishDataFlowRelationshipEvent(lineageRelationship, AssetLineageEventType.NEW_RELATIONSHIP_EVENT); + .publishDataFlowOrLineageMappingRelationshipEvent(lineageRelationship, AssetLineageEventType.NEW_RELATIONSHIP_EVENT); } @Test diff --git a/open-metadata-implementation/adapters/open-connectors/governance-daemon-connectors/open-lineage-connectors/open-lineage-janus-connector/src/main/java/org/odpi/openmetadata/openconnectors/governancedaemonconnectors/openlineageconnectors/janusconnector/graph/LineageGraphQueryService.java b/open-metadata-implementation/adapters/open-connectors/governance-daemon-connectors/open-lineage-connectors/open-lineage-janus-connector/src/main/java/org/odpi/openmetadata/openconnectors/governancedaemonconnectors/openlineageconnectors/janusconnector/graph/LineageGraphQueryService.java index af2f5017c81..60fece8a29d 100644 --- a/open-metadata-implementation/adapters/open-connectors/governance-daemon-connectors/open-lineage-connectors/open-lineage-janus-connector/src/main/java/org/odpi/openmetadata/openconnectors/governancedaemonconnectors/openlineageconnectors/janusconnector/graph/LineageGraphQueryService.java +++ b/open-metadata-implementation/adapters/open-connectors/governance-daemon-connectors/open-lineage-connectors/open-lineage-janus-connector/src/main/java/org/odpi/openmetadata/openconnectors/governancedaemonconnectors/openlineageconnectors/janusconnector/graph/LineageGraphQueryService.java @@ -89,6 +89,7 @@ import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.JSON_FILE; import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.KEYSTORE_FILE; import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.DATA_FLOW; +import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.LINEAGE_MAPPING; import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.LOG_FILE; import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.MEDIA_FILE; import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.NESTED_FILE; @@ -536,6 +537,7 @@ private Optional> getEdgeLabelsForDataFlow(String label) { List edgeLabels = new ArrayList<>(); if (ASSETS.contains(label)) { edgeLabels.add(DATA_FLOW); + edgeLabels.add(LINEAGE_MAPPING); } switch (label) { case TABULAR_FILE_COLUMN: @@ -543,6 +545,7 @@ private Optional> getEdgeLabelsForDataFlow(String label) { case RELATIONAL_COLUMN: case EVENT_SCHEMA_ATTRIBUTE: edgeLabels.add(EDGE_LABEL_COLUMN_DATA_FLOW); + edgeLabels.add(LINEAGE_MAPPING); break; case DATA_FILE: case AVRO_FILE: @@ -774,7 +777,7 @@ private List getSearchResult(GraphTraversalSource g, LineageSearc if (GLOSSARY_TERM.equalsIgnoreCase(queriedNode.getType())) { edges = Arrays.asList(SEMANTIC_ASSIGNMENT, TERM_CATEGORIZATION); } else { - edges = List.of(DATA_FLOW); + edges = List.of(DATA_FLOW, LINEAGE_MAPPING); } searchTraversal = buildQueryWithRelatedNodes(searchTraversal, relatedNodes, edges); List results = searchTraversal.toList(); diff --git a/open-metadata-implementation/adapters/open-connectors/governance-daemon-connectors/open-lineage-connectors/open-lineage-janus-connector/src/main/java/org/odpi/openmetadata/openconnectors/governancedaemonconnectors/openlineageconnectors/janusconnector/graph/LineageJobHelper.java b/open-metadata-implementation/adapters/open-connectors/governance-daemon-connectors/open-lineage-connectors/open-lineage-janus-connector/src/main/java/org/odpi/openmetadata/openconnectors/governancedaemonconnectors/openlineageconnectors/janusconnector/graph/LineageJobHelper.java index a32b9be2d52..f82cefb087a 100644 --- a/open-metadata-implementation/adapters/open-connectors/governance-daemon-connectors/open-lineage-connectors/open-lineage-janus-connector/src/main/java/org/odpi/openmetadata/openconnectors/governancedaemonconnectors/openlineageconnectors/janusconnector/graph/LineageJobHelper.java +++ b/open-metadata-implementation/adapters/open-connectors/governance-daemon-connectors/open-lineage-connectors/open-lineage-janus-connector/src/main/java/org/odpi/openmetadata/openconnectors/governancedaemonconnectors/openlineageconnectors/janusconnector/graph/LineageJobHelper.java @@ -31,6 +31,7 @@ import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.DATA_FILE_AND_SUBTYPES; import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.EVENT_SCHEMA_ATTRIBUTE; import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.DATA_FLOW; +import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.LINEAGE_MAPPING; import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.NESTED_SCHEMA_ATTRIBUTE; import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.PORT_DELEGATION; import static org.odpi.openmetadata.openconnectors.governancedaemonconnectors.openlineageconnectors.janusconnector.utils.Constants.PORT_IMPLEMENTATION; @@ -113,7 +114,7 @@ private Vertex getNodeByGuid(GraphTraversalSource g, String guid) { private List getInputPathsForColumns(GraphTraversalSource g, String guid) { List inputPathsForColumns = g.V().has(PROPERTY_KEY_ENTITY_GUID, guid).out(PROCESS_PORT).out(PORT_DELEGATION) .has(PORT_IMPLEMENTATION, PROPERTY_NAME_PORT_TYPE, INPUT_PORT) - .out(PORT_SCHEMA).out(ATTRIBUTE_FOR_SCHEMA).in(DATA_FLOW) + .out(PORT_SCHEMA).out(ATTRIBUTE_FOR_SCHEMA).in(DATA_FLOW, LINEAGE_MAPPING) .or(__.in(ATTRIBUTE_FOR_SCHEMA).in(ASSET_SCHEMA_TYPE).has(PROPERTY_KEY_LABEL, P.within(DATA_FILE_AND_SUBTYPES)), __.in(NESTED_SCHEMA_ATTRIBUTE).has(PROPERTY_KEY_LABEL, RELATIONAL_TABLE), __.in(ATTRIBUTE_FOR_SCHEMA).in(SCHEMA_TYPE_OPTION).in(ASSET_SCHEMA_TYPE).has(PROPERTY_KEY_LABEL, TOPIC)).toList(); @@ -147,7 +148,7 @@ private void findOutputColumns(Vertex columnIn, Vertex process) { private List getSchemaElementVertices(GraphTraversalSource g, Vertex columnIn) { List schemaElementVertices = g.V() .has(PROPERTY_KEY_ENTITY_GUID, g.V(columnIn.id()).elementMap(PROPERTY_KEY_ENTITY_GUID).toList().get(0).get(PROPERTY_KEY_ENTITY_GUID)) - .out(DATA_FLOW) + .out(DATA_FLOW, LINEAGE_MAPPING) .toList(); return schemaElementVertices; } @@ -195,7 +196,7 @@ private List findPathForOutputAsset(GraphTraversalSource g, Vertex endin if (isEndColumn(g, endingVertex)) { endVertices.add(endingVertex); } else { - List nextVertices = g.V(endingVertex.id()).out(DATA_FLOW).toList(); + List nextVertices = g.V(endingVertex.id()).out(DATA_FLOW, LINEAGE_MAPPING).toList(); for (Vertex vertex : nextVertices) { if (vertex.equals(startingVertex)) { diff --git a/open-metadata-implementation/adapters/open-connectors/governance-daemon-connectors/open-lineage-connectors/open-lineage-janus-connector/src/main/java/org/odpi/openmetadata/openconnectors/governancedaemonconnectors/openlineageconnectors/janusconnector/utils/Constants.java b/open-metadata-implementation/adapters/open-connectors/governance-daemon-connectors/open-lineage-connectors/open-lineage-janus-connector/src/main/java/org/odpi/openmetadata/openconnectors/governancedaemonconnectors/openlineageconnectors/janusconnector/utils/Constants.java index a8d955315ca..2748970ee96 100644 --- a/open-metadata-implementation/adapters/open-connectors/governance-daemon-connectors/open-lineage-connectors/open-lineage-janus-connector/src/main/java/org/odpi/openmetadata/openconnectors/governancedaemonconnectors/openlineageconnectors/janusconnector/utils/Constants.java +++ b/open-metadata-implementation/adapters/open-connectors/governance-daemon-connectors/open-lineage-connectors/open-lineage-janus-connector/src/main/java/org/odpi/openmetadata/openconnectors/governancedaemonconnectors/openlineageconnectors/janusconnector/utils/Constants.java @@ -91,6 +91,7 @@ private Constants() { public static final String PROCESS_PORT = "ProcessPort"; public static final String PORT_IMPLEMENTATION = "PortImplementation"; public static final String DATA_FLOW = "DataFlow"; + public static final String LINEAGE_MAPPING = "LineageMapping"; public static final String SCHEMA_TYPE = "SchemaType"; public static final String PORT_SCHEMA = "PortSchema"; public static final String NESTED_FILE = "NestedFile"; diff --git a/open-metadata-implementation/adapters/open-connectors/governance-daemon-connectors/open-lineage-connectors/open-lineage-janus-connector/src/main/java/org/odpi/openmetadata/openconnectors/governancedaemonconnectors/openlineageconnectors/janusconnector/utils/EdgeLabelsLineageGraph.java b/open-metadata-implementation/adapters/open-connectors/governance-daemon-connectors/open-lineage-connectors/open-lineage-janus-connector/src/main/java/org/odpi/openmetadata/openconnectors/governancedaemonconnectors/openlineageconnectors/janusconnector/utils/EdgeLabelsLineageGraph.java index ce43cfa8dc0..0ab09ecad63 100644 --- a/open-metadata-implementation/adapters/open-connectors/governance-daemon-connectors/open-lineage-connectors/open-lineage-janus-connector/src/main/java/org/odpi/openmetadata/openconnectors/governancedaemonconnectors/openlineageconnectors/janusconnector/utils/EdgeLabelsLineageGraph.java +++ b/open-metadata-implementation/adapters/open-connectors/governance-daemon-connectors/open-lineage-connectors/open-lineage-janus-connector/src/main/java/org/odpi/openmetadata/openconnectors/governancedaemonconnectors/openlineageconnectors/janusconnector/utils/EdgeLabelsLineageGraph.java @@ -11,6 +11,7 @@ public enum EdgeLabelsLineageGraph { SCHEMA_TYPE, SCHEMA_ATTRIBUTE_TYPE, DATA_FLOW, + LINEAGE_MAPPING, NESTED_FILE, FOLDER_HIERARCHY, ASSET_TO_CONNECTION, diff --git a/open-metadata-implementation/governance-servers/open-lineage-services/open-lineage-services-server/src/main/java/org/odpi/openmetadata/governanceservers/openlineage/listeners/OpenLineageInTopicListener.java b/open-metadata-implementation/governance-servers/open-lineage-services/open-lineage-services-server/src/main/java/org/odpi/openmetadata/governanceservers/openlineage/listeners/OpenLineageInTopicListener.java index f8b1348b97a..723c2f2cc02 100644 --- a/open-metadata-implementation/governance-servers/open-lineage-services/open-lineage-services-server/src/main/java/org/odpi/openmetadata/governanceservers/openlineage/listeners/OpenLineageInTopicListener.java +++ b/open-metadata-implementation/governance-servers/open-lineage-services/open-lineage-services-server/src/main/java/org/odpi/openmetadata/governanceservers/openlineage/listeners/OpenLineageInTopicListener.java @@ -95,7 +95,7 @@ private void processEventBasedOnType(String assetLineageEvent) throws IOExceptio storingServices.upsertEntityContext(lineageRelationshipsEvent); break; case CLASSIFICATION_CONTEXT_EVENT: - case DATA_FLOWS_EVENT: + case DATA_FLOWS_EVENT, LINEAGE_MAPPINGS_EVENT: case PROCESS_CONTEXT_EVENT: lineageRelationshipsEvent = OBJECT_READER.readValue(assetLineageEvent, LineageRelationshipsEvent.class); storingServices.addEntityContext(lineageRelationshipsEvent);