
Lb4/datahub rest emitter init #1

Closed
wants to merge 10 commits into from
@@ -26,6 +26,7 @@ dependencies {
api(project(":extensions:control-plane:api:management-api:contract-negotiation-api"))
api(project(":extensions:control-plane:api:management-api:policy-definition-api"))
api(project(":extensions:control-plane:api:management-api:transfer-process-api"))
api(project(":extensions:control-plane:api:management-api:dataspace-catalog-api"))
}


@@ -0,0 +1,35 @@
/*
* Copyright (c) 2022 ZF Friedrichshafen AG
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* ZF Friedrichshafen AG - Initial API and Implementation
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*/


plugins {
`java-library`
id("io.swagger.core.v3.swagger-gradle-plugin")
}

dependencies {
api(project(":extensions:control-plane:api:management-api:asset-api"))
implementation("io.acryl:datahub-client:0.10.5-5")
implementation("org.apache.httpcomponents:httpclient:4.5")
implementation("org.apache.httpcomponents:httpasyncclient:4.1.5")

}

edcBuild {
swagger {
apiGroup.set("management-api")
}
}


@@ -0,0 +1,119 @@
package be.imec.edit.ds.catalog;

import be.imec.edit.ds.catalog.common.DataSpaceCatalogIngestorBase;
import com.linkedin.common.FabricType;
import com.linkedin.common.urn.DatasetUrn;
import com.linkedin.common.urn.Urn;
import com.linkedin.dataset.DatasetProperties;
import com.linkedin.dataset.EditableDatasetProperties;
import com.linkedin.schema.SchemaField;
import com.linkedin.schema.SchemaFieldArray;
import com.linkedin.schema.SchemaMetadata;
import datahub.client.MetadataWriteResponse;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.eclipse.edc.spi.types.domain.asset.Asset;


/**
 * Ingests metadata about a data asset (dataset) into the data space catalog.
 */

public class DatasetEntityIngestor extends DataSpaceCatalogIngestorBase {
  // The logger is inherited from DataSpaceCatalogIngestorBase; redeclaring it here would
  // shadow the superclass field (as CodeQL flagged on an earlier revision).
  final String entityType = "dataset";
  /**
   * datasetProperties aspect of the dataset entity - see the datahub documentation for dataset entity aspects.
   */
  private DatasetProperties _datasetProperties(Asset asset) {
    var createdAt = new com.linkedin.common.TimeStamp();
    createdAt.setTime(asset.getCreatedAt());
    return new DatasetProperties()
        .setDescription(asset.getDescription())
        .setCreated(createdAt);
  }

  /**
   * editableDatasetProperties aspect of the dataset entity.
   */
  private EditableDatasetProperties _editableDatasetProperties(Asset asset) {
    return new EditableDatasetProperties();
  }
  /**
   * schemaMetadata aspect of the dataset entity.
   */
  public SchemaMetadata _schemaMetadata(Asset asset) { // todo: this should not be handcrafted; it should come (if any) from an Avro schema
    SchemaFieldArray fields = new SchemaFieldArray();
    fields.add(new SchemaField());
    return new SchemaMetadata().setFields(fields);
  }

  /**
   * Returns a datahub-style urn for an asset - it includes `test` as the platform and embeds the EDC asset name and id.
   * The FabricType is the environment type, such as DEV, PROD, etc.
   */
  public Urn _urn(Asset asset) throws URISyntaxException {
    return new DatasetUrn(_platformUrn(entityType), asset.getName() + asset.getId(), FabricType.DEV);
  }

  /**
   * Emits the whole dataset, with all aspects defined within, to the dataspace catalog. To ingest/emit only a single aspect, see the specs.
   * We first create the dataset with a single aspect - the datasetProperties aspect. Once that succeeds, the remaining aspects
   * (editableDatasetProperties and schemaMetadata) are emitted in parallel. This could be done sequentially; the parallel emission
   * demonstrates that, once an entity exists, its aspects can be pushed concurrently. The emitter calls are asynchronous,
   * so the datahub api at the receiving end also responds asynchronously.
   */
  public Urn emitMetadataChangeProposal(Asset asset)
      throws URISyntaxException, IOException, ExecutionException, InterruptedException {
    Urn datasetUrn = _urn(asset);
    log.info("Pushing dataset to data space catalog");
    // emit() is asynchronous; block on the future so the entity exists before the remaining aspects are pushed.
    // (Checking isDone() immediately after emit() would almost always skip the block below.)
    Future<MetadataWriteResponse> responseFuture =
        emitter.emit(_metadataChangeProposalWrapper(_datasetProperties(asset), entityType, datasetUrn));
    if (responseFuture.get().isSuccess()) {
      // The emitter already returns futures, so the two aspect emissions below run concurrently
      // without needing an extra thread pool.
      List<Future<MetadataWriteResponse>> futures = List.of(
          emitter.emit(_metadataChangeProposalWrapper(_editableDatasetProperties(asset), entityType, datasetUrn)),
          emitter.emit(_metadataChangeProposalWrapper(_schemaMetadata(asset), entityType, datasetUrn)));
      for (Future<MetadataWriteResponse> future : futures) {
        try {
          MetadataWriteResponse response = future.get();
          if (response.isSuccess()) {
            log.info("Success ingesting {}", response.getResponseContent());
          } else {
            log.error("Failed ingesting aspect: {}", response.getResponseContent());
          }
        } catch (Exception e) {
          log.error(e.getMessage(), e);
          return null;
        }
      }
    }
    return datasetUrn;
  }


}
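For reviewers, a minimal usage sketch (not part of this diff): it assumes a locally reachable datahub gms (the emitter default) and uses hypothetical asset values mirroring the sample asset.json further down.

package be.imec.edit.ds.catalog;

import org.eclipse.edc.spi.types.domain.asset.Asset;

public class DatasetIngestExample {
  public static void main(String[] args) throws Exception {
    DatasetEntityIngestor ingestor = new DatasetEntityIngestor();
    // Hypothetical sample values; a real Asset would come from the EDC management api.
    Asset asset = Asset.Builder.newInstance()
        .id("asset-1")
        .name("product description")
        .description("demo asset")
        .build();
    // Returns the dataset urn, or null if one of the aspect emissions failed.
    System.out.println("Ingested: " + ingestor.emitMetadataChangeProposal(asset));
  }
}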
@@ -0,0 +1,20 @@
package be.imec.edit.ds.catalog;

import be.imec.edit.ds.catalog.common.DataSpaceCatalogIngestorBase;
import com.linkedin.common.urn.Urn;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.concurrent.ExecutionException;
import org.eclipse.edc.spi.types.domain.asset.Asset;


public class DomainEntityIngestor extends DataSpaceCatalogIngestorBase {
  // Uses the logger inherited from DataSpaceCatalogIngestorBase rather than shadowing it.
  @Override
  public Urn emitMetadataChangeProposal(Asset asset)
      throws URISyntaxException, IOException, ExecutionException, InterruptedException {
    return null; // todo: implement domain entity ingestion
  }
}
@@ -0,0 +1,20 @@
package be.imec.edit.ds.catalog;

import be.imec.edit.ds.catalog.common.DataSpaceCatalogIngestorBase;
import com.linkedin.common.urn.Urn;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.concurrent.ExecutionException;
import org.eclipse.edc.spi.types.domain.asset.Asset;


public class PolicyEntityIngestor extends DataSpaceCatalogIngestorBase {
  // Uses the logger inherited from DataSpaceCatalogIngestorBase rather than shadowing it.
  @Override
  public Urn emitMetadataChangeProposal(Asset asset)
      throws URISyntaxException, IOException, ExecutionException, InterruptedException {
    return null; // todo: implement policy entity ingestion
  }
}
@@ -0,0 +1,20 @@
package be.imec.edit.ds.catalog;

import be.imec.edit.ds.catalog.common.DataSpaceCatalogIngestorBase;
import com.linkedin.common.urn.Urn;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.concurrent.ExecutionException;
import org.eclipse.edc.spi.types.domain.asset.Asset;


public class RelationshipIngestor extends DataSpaceCatalogIngestorBase {
  // Uses the logger inherited from DataSpaceCatalogIngestorBase rather than shadowing it.
  @Override
  public Urn emitMetadataChangeProposal(Asset asset)
      throws URISyntaxException, IOException, ExecutionException, InterruptedException {
    return null; // todo: implement relationship ingestion
  }
}
@@ -0,0 +1,20 @@
package be.imec.edit.ds.catalog;

import be.imec.edit.ds.catalog.common.DataSpaceCatalogIngestorBase;
import com.linkedin.common.urn.Urn;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.concurrent.ExecutionException;
import org.eclipse.edc.spi.types.domain.asset.Asset;


public class UserEntityIngestor extends DataSpaceCatalogIngestorBase {
  // Uses the logger inherited from DataSpaceCatalogIngestorBase rather than shadowing it.
  @Override
  public Urn emitMetadataChangeProposal(Asset asset)
      throws URISyntaxException, IOException, ExecutionException, InterruptedException {
    return null; // todo: implement user entity ingestion
  }
}
@@ -0,0 +1,51 @@
package be.imec.edit.ds.catalog.common;

import com.linkedin.common.urn.DataPlatformUrn;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import datahub.client.rest.RestEmitter;
import datahub.event.MetadataChangeProposalWrapper;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import org.eclipse.edc.spi.types.domain.asset.Asset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


abstract public class DataSpaceCatalogIngestorBase {
  // protected so that subclasses in other packages can reuse it instead of shadowing it.
  protected final Logger log = LoggerFactory.getLogger(getClass());
  /**
   * To create an emitter that pushes to a remote datahub instance (through an IP or URL), see the RestEmitter class;
   * it has examples of creating an emitter with external urls. An example is shown, commented out, below.
   */
  protected RestEmitter emitter = RestEmitter.createWithDefaults();
  //protected RestEmitter emitter = RestEmitter.create(b -> b.server("http://localhost:8080")); // todo: replace `localhost:8080` with the address of the datahub gms.
  /**
   * Builds a change proposal for any entity. `aspect` is an aspect, e.g. the datasetProperties or ownership aspect of a dataset;
   * `entityType` is a datahub entity type such as `dataset`; and `urn` is a datahub-style urn.
   */
public MetadataChangeProposalWrapper _metadataChangeProposalWrapper(RecordTemplate aspect, String entityType, Urn urn) {
return MetadataChangeProposalWrapper.builder()
.entityType(entityType)
.entityUrn(urn)
.upsert()
.aspect(aspect)
.build();
}
  /**
   * At the moment, edc connectors are experimental. The data platforms (bigquery, snowflake, hudi, etc.) are unknown and not provided,
   * so we use `test`. This could later be made configurable via `conf` files.
   */
  public DataPlatformUrn _platformUrn(String entityType) throws URISyntaxException {
    // The tuple's entity type must be `dataPlatform` for DataPlatformUrn.createFromUrn to accept it;
    // passing the dataset entity type here would fail the cast with a URISyntaxException.
    return DataPlatformUrn.createFromUrn(Urn.createFromTuple("dataPlatform", "test"));
  }

  /**
   * Implemented by subclasses to build entity-specific change proposals and emit them to datahub (the data space catalog).
   */
public abstract Urn emitMetadataChangeProposal(Asset asset) throws URISyntaxException, IOException, ExecutionException,
InterruptedException;

}
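A hedged sketch of the remote-emitter setup mentioned in the comment above - the gms address and token are placeholders; `server` and `token` are builder options documented for datahub's Java RestEmitter.

package be.imec.edit.ds.catalog.common;

import datahub.client.rest.RestEmitter;

public class RemoteEmitterExample {
  // Placeholder endpoint and token; a token is only needed when gms authentication is enabled.
  static RestEmitter remoteEmitter() {
    return RestEmitter.create(b -> b
        .server("http://datahub-gms.example.org:8080")
        .token("<personal-access-token>"));
  }
}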
@@ -0,0 +1,39 @@
package be.imec.edit.ds.catalog;

import jakarta.json.Json;
import jakarta.json.JsonObject;
import jakarta.json.JsonReader;
import java.io.IOException;
import java.io.StringReader;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.concurrent.ExecutionException;
import org.eclipse.edc.spi.result.Result;
import org.eclipse.edc.spi.types.domain.asset.Asset;
import org.eclipse.edc.transform.spi.TypeTransformerRegistry;
import org.junit.jupiter.api.Test;

import static org.mockito.Mockito.*;


public class DataSpaceCatalogTest extends DatasetEntityIngestor {
  private final TypeTransformerRegistry transformerRegistry = mock(TypeTransformerRegistry.class);

  @Test
  void ingestDataSetEntity() throws URISyntaxException, IOException {
    URL resource = getClass().getClassLoader().getResource("asset.json");
    // transform() must be stubbed on the mock registry; the default mock answer is null, which would NPE below.
    // The stubbed asset values are samples only.
    when(transformerRegistry.transform(any(JsonObject.class), eq(Asset.class)))
        .thenReturn(Result.success(Asset.Builder.newInstance().id("test-asset").name("product description").build()));
    try (JsonReader jsonReader = Json.createReader(new StringReader(Files.readString(Paths.get(resource.toURI()))))) {
      JsonObject readObject = jsonReader.readObject();
      Result<Asset> asset = transformerRegistry.transform(readObject, Asset.class);
      emitMetadataChangeProposal(asset.getContent());
    } catch (ExecutionException | InterruptedException e) {
      log.error("Ingestion failed", e);
    }
  }
}
@@ -0,0 +1,17 @@
{
"@context": {
"edc": "https://w3id.org/edc/v0.0.1/ns/"
},
"asset": {
"@id": "${assetId}",
"properties": {
"name": "product description",
"contenttype": "application/json"
}
},
"dataAddress": {
"name": "Test asset",
"baseUrl": "${source}",
"type": "HttpData"
}
}
4 changes: 3 additions & 1 deletion settings.gradle.kts
@@ -66,7 +66,7 @@ include(":core:data-plane-selector:data-plane-selector-core")

// modules that provide implementations for data ingress/egress ------------------------------------
include(":data-protocols:dsp:dsp-api-configuration")
include(":data-protocols:dsp:dsp-catalog")
include(":data-protocols:dsp:dsp-catalog:dsp-catalog-api")
include(":data-protocols:dsp:dsp-catalog:dsp-catalog-api")
include(":data-protocols:dsp:dsp-catalog:dsp-catalog-http-dispatcher")
include(":data-protocols:dsp:dsp-catalog:dsp-catalog-transform")
@@ -129,6 +129,7 @@ include(":extensions:control-plane:api:control-plane-api")
include(":extensions:control-plane:api:control-plane-api-client")
include(":extensions:control-plane:api:management-api")
include(":extensions:control-plane:api:management-api:asset-api")
include(":extensions:control-plane:api:management-api:dataspace-catalog-api")
include(":extensions:control-plane:api:management-api:catalog-api")
include(":extensions:control-plane:api:management-api:contract-agreement-api")
include(":extensions:control-plane:api:management-api:contract-definition-api")
@@ -189,6 +190,7 @@ include(":spi:common:validator-spi")
include(":spi:common:web-spi")

include(":spi:control-plane:asset-spi")
include(":spi:control-plane:dataspace-catalog-api")
include(":spi:control-plane:contract-spi")
include(":spi:control-plane:control-plane-spi")
include(":spi:control-plane:transfer-data-plane-spi")