Skip to content

Commit

Permalink
Enable inbound Open Lineage Events
Browse files Browse the repository at this point in the history
Signed-off-by: Mandy Chessell <[email protected]>
  • Loading branch information
mandy-chessell committed Sep 15, 2024
1 parent 5a2f477 commit 96aee42
Show file tree
Hide file tree
Showing 53 changed files with 4,157 additions and 352 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ component-id-report.md
survey-report*.md
valid-values-report*.md
surveys/*
X*GUIDMap.json
Used*GUIDMap.json

# Ignore any json files generated during junit
open-metadata-resources/open-metadata-archives/design-model-archives/*.json
Expand Down
1 change: 0 additions & 1 deletion CocoComboGUIDMap.json

This file was deleted.

1 change: 0 additions & 1 deletion CoreContentPackGUIDMap.json

This file was deleted.

1 change: 1 addition & 0 deletions EgeriaContentPacksGUIDMap.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion content-packs/CocoBusinessSystemsArchive.omarchive

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion content-packs/CocoClinicalTrialsTemplatesArchive.omarchive

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion content-packs/CocoComboArchive.omarchive

Large diffs are not rendered by default.

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion content-packs/CocoGovernanceProgramArchive.omarchive

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion content-packs/CocoOrganizationArchive.omarchive

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion content-packs/CocoSustainabilityArchive.omarchive

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion content-packs/CocoTypesArchive.omarchive

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion content-packs/CoreContentPack.omarchive

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ public void processEvent(AssetManagerOutTopicEvent event)
integrationContext.setMetadataSourceQualifiedName(catalogTarget.getMetadataSourceQualifiedName());
integrationContext.setExternalSourceIsHome(true);

RequestedCatalogTarget requestedCatalogTarget = new RequestedCatalogTarget(catalogTarget);
RequestedCatalogTarget requestedCatalogTarget = new RequestedCatalogTarget(catalogTarget, null);

Map<String, Object> configurationProperties = connectionProperties.getConfigurationProperties();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,7 @@

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.*;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -171,7 +166,14 @@ private void initializeTopic()
* If group.id explicitly set using the standard kafka property, then use that.
*/

serverId = (String) configurationProperties.get(KafkaOpenMetadataTopicProvider.serverIdPropertyName);
if (configurationProperties.get(KafkaOpenMetadataTopicProvider.serverIdPropertyName) != null)
{
serverId = configurationProperties.get(KafkaOpenMetadataTopicProvider.serverIdPropertyName).toString();
}
else
{
serverId = UUID.randomUUID().toString();
}

if (StringUtils.isEmpty((String)consumerProperties.get("group.id")))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,16 @@ public class AuditLogDestinationCatalogTargetProcessor extends CatalogTargetProc
* Copy/clone constructor
*
* @param template object to copy
* @param connectorToTarget connector to access the target resource
* @param connectorName name of this integration connector
* @param auditLog logging destination
*/
public AuditLogDestinationCatalogTargetProcessor(CatalogTarget template,
Connector connectorToTarget,
String connectorName,
AuditLog auditLog)
{
super(template, connectorName, auditLog);
super(template, connectorToTarget, connectorName, auditLog);

if (super.getCatalogTargetConnector() instanceof OMRSAuditLogStoreConnectorBase auditLogStoreConnectorBase)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import org.odpi.openmetadata.adapters.connectors.integration.kafkaaudit.ffdc.DistributeKafkaAuditCode;
import org.odpi.openmetadata.frameworks.connectors.Connector;
import org.odpi.openmetadata.frameworks.connectors.ffdc.ConnectorCheckedException;
import org.odpi.openmetadata.frameworks.governanceaction.properties.CatalogTarget;
import org.odpi.openmetadata.frameworks.integration.connectors.CatalogTargetIntegrator;
Expand Down Expand Up @@ -90,14 +91,17 @@ public void integrateCatalogTarget(RequestedCatalogTarget requestedCatalogTarget
* Create a new catalog target processor (typically inherits from CatalogTargetProcessorBase).
*
* @param retrievedCatalogTarget details of the open metadata elements describing the catalog target
* @param connectorToTarget connector to access the target resource
* @return new processor based on the catalog target information
*/
@Override
public RequestedCatalogTarget getNewRequestedCatalogTargetSkeleton(CatalogTarget retrievedCatalogTarget)
public RequestedCatalogTarget getNewRequestedCatalogTargetSkeleton(CatalogTarget retrievedCatalogTarget,
Connector connectorToTarget)
{
if (propertyHelper.isTypeOf(retrievedCatalogTarget.getCatalogTargetElement(), OpenMetadataType.KAFKA_TOPIC.typeName))
{
return new KafkaTopicSourceCatalogTargetProcessor(retrievedCatalogTarget,
connectorToTarget,
connectorName,
auditLog,
this);
Expand All @@ -106,10 +110,11 @@ else if ((propertyHelper.isTypeOf(retrievedCatalogTarget.getCatalogTargetElement
(propertyHelper.isTypeOf(retrievedCatalogTarget.getCatalogTargetElement(), OpenMetadataType.DATABASE.typeName)))
{
return new AuditLogDestinationCatalogTargetProcessor(retrievedCatalogTarget,
connectorToTarget,
connectorName,
auditLog);
}

return new RequestedCatalogTarget(retrievedCatalogTarget);
return new RequestedCatalogTarget(retrievedCatalogTarget, connectorToTarget);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package org.odpi.openmetadata.adapters.connectors.integration.kafkaaudit;

import org.odpi.openmetadata.frameworks.auditlog.AuditLog;
import org.odpi.openmetadata.frameworks.connectors.Connector;
import org.odpi.openmetadata.frameworks.governanceaction.properties.CatalogTarget;
import org.odpi.openmetadata.frameworks.integration.connectors.CatalogTargetProcessorBase;
import org.odpi.openmetadata.repositoryservices.connectors.openmetadatatopic.OpenMetadataTopicConnector;
Expand All @@ -17,16 +18,18 @@ public class KafkaTopicSourceCatalogTargetProcessor extends CatalogTargetProcess
* Copy/clone constructor
*
* @param template object to copy
* @param connectorToTarget connector to access the target resource
* @param connectorName name of this integration connector
* @param auditLog logging destination
* @param listener listener
*/
public KafkaTopicSourceCatalogTargetProcessor(CatalogTarget template,
Connector connectorToTarget,
String connectorName,
AuditLog auditLog,
OpenMetadataTopicListener listener)
{
super(template, connectorName, auditLog);
super(template, connectorToTarget, connectorName, auditLog);

if (super.getCatalogTargetConnector() instanceof OpenMetadataTopicConnector topicConnector)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
*/
public class OpenLineageCataloguerIntegrationConnector extends LineageIntegratorConnector implements OpenLineageEventListener
{
protected String destinationName = "<Unknown";
protected String destinationName = "Unknown";
protected LineageIntegratorContext myContext = null;


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/* SPDX-License-Identifier: Apache-2.0 */
/* Copyright Contributors to the ODPi Egeria project. */
package org.odpi.openmetadata.adapters.connectors.integration.openlineage;

import org.odpi.openmetadata.adapters.connectors.integration.openlineage.ffdc.OpenLineageIntegrationConnectorAuditCode;
import org.odpi.openmetadata.frameworks.auditlog.AuditLog;
import org.odpi.openmetadata.frameworks.connectors.Connector;
import org.odpi.openmetadata.frameworks.connectors.properties.ConnectionProperties;
import org.odpi.openmetadata.frameworks.connectors.properties.EndpointProperties;
import org.odpi.openmetadata.frameworks.governanceaction.properties.CatalogTarget;
import org.odpi.openmetadata.frameworks.integration.connectors.CatalogTargetProcessorBase;
import org.odpi.openmetadata.repositoryservices.connectors.openmetadatatopic.OpenMetadataTopicConnector;
import org.odpi.openmetadata.repositoryservices.connectors.openmetadatatopic.OpenMetadataTopicListener;

/**
* Listens for events from Apache Kafka that contain open lineage events.
*/
public class OpenLineageEventReceiverCatalogTargetProcessor extends CatalogTargetProcessorBase
{
/**
* Copy/clone constructor
*
* @param template object to copy
* @param connectorToTarget connector to access the target resource
* @param connectorName name of this integration connector
* @param auditLog logging destination
* @param listener listener
*/
public OpenLineageEventReceiverCatalogTargetProcessor(CatalogTarget template,
Connector connectorToTarget,
String connectorName,
AuditLog auditLog,
OpenMetadataTopicListener listener)
{
super(template, connectorToTarget, connectorName, auditLog);

if (super.getCatalogTargetConnector() instanceof OpenMetadataTopicConnector topicConnector)
{
this.registerTopicConnector(topicConnector, listener);
}
}


/**
* Add the topic connector to the list of topics this connector is listening on.
*
* @param topicConnector connector
* @param listener event listener
*/
private void registerTopicConnector(OpenMetadataTopicConnector topicConnector,
OpenMetadataTopicListener listener)
{
final String methodName = "registerTopicConnector";

/*
* Register this connector as a listener of the event bus connector.
*/
topicConnector.registerListener(listener);

ConnectionProperties connectionProperties = topicConnector.getConnection();

if (connectionProperties != null)
{
EndpointProperties endpoint = connectionProperties.getEndpoint();

if (endpoint != null)
{
auditLog.logMessage(methodName,
OpenLineageIntegrationConnectorAuditCode.KAFKA_RECEIVER_CONFIGURATION.getMessageDefinition(connectorName,
endpoint.getAddress(),
connectionProperties.getConnectionName()));
}
}
}


/**
* Requests that the connector does a comparison of the metadata in the third party technology and open metadata repositories.
* Refresh is called when the integration connector first starts and then at intervals defined in the connector's configuration
* as well as any external REST API calls to explicitly refresh the connector.
*/
@Override
public void refresh()
{
// nothing to do
}
}
Loading

0 comments on commit 96aee42

Please sign in to comment.