From 70cf165c1d7188fdc36826bd0b828f8d6c1cadd3 Mon Sep 17 00:00:00 2001 From: idris52 Date: Fri, 25 Aug 2023 13:42:19 +0200 Subject: [PATCH 1/6] feat: added wrappers for datahub rest emitter as sub-project in extensions/control-plane/management-api/dataspace-catalog-api --- .../api/management-api/build.gradle.kts | 1 + .../dataspace-catalog-api/build.gradle.kts | 35 ++++++ .../ds/catalog/DatasetEntityIngestor.java | 111 ++++++++++++++++++ .../edit/ds/catalog/DomainEntityIngestor.java | 20 ++++ .../edit/ds/catalog/PolicyEntityIngestor.java | 20 ++++ .../edit/ds/catalog/RelationshipIngestor.java | 20 ++++ .../edit/ds/catalog/UserEntityIngestor.java | 20 ++++ .../common/DataSpaceCatalogIngestorBase.java | 31 +++++ .../edit/ds/catalog/DataSpaceCatalogTest.java | 39 ++++++ .../src/test/resources/Asset.json | 17 +++ settings.gradle.kts | 4 +- 11 files changed, 317 insertions(+), 1 deletion(-) create mode 100644 extensions/control-plane/api/management-api/dataspace-catalog-api/build.gradle.kts create mode 100644 extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/DatasetEntityIngestor.java create mode 100644 extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/DomainEntityIngestor.java create mode 100644 extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/PolicyEntityIngestor.java create mode 100644 extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/RelationshipIngestor.java create mode 100644 extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/UserEntityIngestor.java create mode 100644 extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/common/DataSpaceCatalogIngestorBase.java create mode 100644 
extensions/control-plane/api/management-api/dataspace-catalog-api/src/test/java/be/imec/edit/ds/catalog/DataSpaceCatalogTest.java create mode 100644 extensions/control-plane/api/management-api/dataspace-catalog-api/src/test/resources/Asset.json diff --git a/extensions/control-plane/api/management-api/build.gradle.kts b/extensions/control-plane/api/management-api/build.gradle.kts index e137acadde4..4f9a613564d 100644 --- a/extensions/control-plane/api/management-api/build.gradle.kts +++ b/extensions/control-plane/api/management-api/build.gradle.kts @@ -26,6 +26,7 @@ dependencies { api(project(":extensions:control-plane:api:management-api:contract-negotiation-api")) api(project(":extensions:control-plane:api:management-api:policy-definition-api")) api(project(":extensions:control-plane:api:management-api:transfer-process-api")) + api(project(":extensions:control-plane:api:management-api:dataspace-catalog-api")) } diff --git a/extensions/control-plane/api/management-api/dataspace-catalog-api/build.gradle.kts b/extensions/control-plane/api/management-api/dataspace-catalog-api/build.gradle.kts new file mode 100644 index 00000000000..5a7cf6e7c0d --- /dev/null +++ b/extensions/control-plane/api/management-api/dataspace-catalog-api/build.gradle.kts @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2022 ZF Friedrichshafen AG + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * ZF Friedrichshafen AG - Initial API and Implementation + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + */ + + +plugins { + `java-library` + id("io.swagger.core.v3.swagger-gradle-plugin") +} + +dependencies { + api(project(":extensions:control-plane:api:management-api:asset-api")) + implementation("io.acryl:datahub-client:0.10.5-5") + implementation("org.apache.httpcomponents:httpclient:4.5") 
+ implementation("org.apache.httpcomponents:httpasyncclient:4.1.5") + +} + +edcBuild { + swagger { + apiGroup.set("management-api") + } +} + + diff --git a/extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/DatasetEntityIngestor.java b/extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/DatasetEntityIngestor.java new file mode 100644 index 00000000000..ef30bcc1a08 --- /dev/null +++ b/extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/DatasetEntityIngestor.java @@ -0,0 +1,111 @@ +package be.imec.edit.ds.catalog; + +import be.imec.edit.ds.catalog.common.DataSpaceCatalogIngestorBase; +import com.linkedin.common.FabricType; +import com.linkedin.common.urn.DatasetUrn; +import com.linkedin.common.urn.Urn; +import com.linkedin.dataset.DatasetProperties; +import com.linkedin.dataset.EditableDatasetProperties; +import com.linkedin.schema.SchemaField; +import com.linkedin.schema.SchemaFieldArray; +import com.linkedin.schema.SchemaMetadata; +import datahub.client.MetadataWriteResponse; +import datahub.client.rest.RestEmitter; +import datahub.shaded.org.apache.kafka.common.errors.ApiException; +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import org.eclipse.edc.spi.types.domain.asset.Asset; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/*** + * The class to ingest metadata about data asset (dataset) to data space catalog. 
+ * +* */ + +public class DatasetEntityIngestor extends DataSpaceCatalogIngestorBase { + Logger log = LoggerFactory.getLogger(this.getClass().getName()); + RestEmitter emitter = RestEmitter.createWithDefaults(); + + final String entityType = "dataset"; +/*** + * editableDatasetProperties aspect of dataset entity + * */ + private DatasetProperties _datasetProperties(Asset asset) { + var createdAt=new com.linkedin.common.TimeStamp(); + createdAt.setTime(asset.getCreatedAt()); + return new DatasetProperties() + .setDescription(asset.getDescription()) + .setCreated(createdAt); + + } + + /*** + * editableDatasetProperties aspect of dataset entity + * */ + private EditableDatasetProperties _editableDatasetProperties(Asset asset) { + return new EditableDatasetProperties(); + } + /*** + * schemaMetadata aspect of dataset entity + * */ + public SchemaMetadata _schemaMetadata(Asset asset) { //todo: This should not be handcrafted, rather should come (if any) from an avro + SchemaFieldArray fields = new SchemaFieldArray(); + fields.add(new SchemaField()); + return new SchemaMetadata().setFields(fields); + } + + public Urn _urn(Asset asset) throws URISyntaxException { + return new DatasetUrn(_platformUrn(entityType), asset.getName()+asset.getId(), FabricType.DEV); + } + + /*** + * This method emits whole dataset, with all aspects (defined within) to dataspace catalog. To only ingest/emit a single aspect, see specs. 
+ * */ + public Urn emitMetadataChangeProposal(Asset asset) + throws URISyntaxException, IOException, ExecutionException, InterruptedException { + Urn datasetUrn = _urn(asset); + log.info("Pushing dataset to data space catalog"); + Future responseFuture = emitter.emit(_metadataChangeProposalWrapper(_datasetProperties(asset), entityType, datasetUrn)); + if(responseFuture.isDone() && responseFuture.get().isSuccess()){ + Future editablePropsFut = emitter.emit(_metadataChangeProposalWrapper(_editableDatasetProperties(asset), entityType, datasetUrn)); + Future schemaPropsFut = emitter.emit(_metadataChangeProposalWrapper(_schemaMetadata(asset), entityType, datasetUrn)); + int numThreads = 2; // Number of threads in the thread pool + ExecutorService executor = Executors.newFixedThreadPool(numThreads); + List> futures = List.of(editablePropsFut, schemaPropsFut); + List>> responses = new ArrayList<>(); + for (Future future: futures){ + Callable> callable = () -> future; + Future> resp = executor.submit(callable); + responses.add(resp); + } + for (Future> response: responses){ + try{ + Future fresp = response.get(); + if(response.isDone() && fresp.isDone() && fresp.get().isSuccess()){ + log.info("Success ingesting "+ fresp.get().getResponseContent()); + } + else { + log.error(fresp.get().getResponseContent(), ApiException.class); + } + } + catch (Exception e) { + e.printStackTrace(); + log.error(e.getMessage()); + return null; + } + } + } + return datasetUrn; + } + + +} diff --git a/extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/DomainEntityIngestor.java b/extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/DomainEntityIngestor.java new file mode 100644 index 00000000000..5b3f7f4384c --- /dev/null +++ b/extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/DomainEntityIngestor.java @@ -0,0 +1,20 @@ +package 
be.imec.edit.ds.catalog; + +import be.imec.edit.ds.catalog.common.DataSpaceCatalogIngestorBase; +import com.linkedin.common.urn.Urn; +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.concurrent.ExecutionException; +import org.eclipse.edc.spi.types.domain.asset.Asset; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class DomainEntityIngestor extends DataSpaceCatalogIngestorBase { + Logger log = LoggerFactory.getLogger(this.getClass().getName()); + @Override + public Urn emitMetadataChangeProposal(Asset asset) + throws URISyntaxException, IOException, ExecutionException, InterruptedException { + return null; + } +} diff --git a/extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/PolicyEntityIngestor.java b/extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/PolicyEntityIngestor.java new file mode 100644 index 00000000000..ac5dd5446b8 --- /dev/null +++ b/extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/PolicyEntityIngestor.java @@ -0,0 +1,20 @@ +package be.imec.edit.ds.catalog; + +import be.imec.edit.ds.catalog.common.DataSpaceCatalogIngestorBase; +import com.linkedin.common.urn.Urn; +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.concurrent.ExecutionException; +import org.eclipse.edc.spi.types.domain.asset.Asset; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class PolicyEntityIngestor extends DataSpaceCatalogIngestorBase { + Logger log = LoggerFactory.getLogger(this.getClass().getName()); + @Override + public Urn emitMetadataChangeProposal(Asset asset) + throws URISyntaxException, IOException, ExecutionException, InterruptedException { + return null; + } +} diff --git 
a/extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/RelationshipIngestor.java b/extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/RelationshipIngestor.java new file mode 100644 index 00000000000..d770c648582 --- /dev/null +++ b/extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/RelationshipIngestor.java @@ -0,0 +1,20 @@ +package be.imec.edit.ds.catalog; + +import be.imec.edit.ds.catalog.common.DataSpaceCatalogIngestorBase; +import com.linkedin.common.urn.Urn; +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.concurrent.ExecutionException; +import org.eclipse.edc.spi.types.domain.asset.Asset; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class RelationshipIngestor extends DataSpaceCatalogIngestorBase { + Logger log = LoggerFactory.getLogger(this.getClass().getName()); + @Override + public Urn emitMetadataChangeProposal(Asset asset) + throws URISyntaxException, IOException, ExecutionException, InterruptedException { + return null; + } +} diff --git a/extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/UserEntityIngestor.java b/extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/UserEntityIngestor.java new file mode 100644 index 00000000000..f16224aacd6 --- /dev/null +++ b/extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/UserEntityIngestor.java @@ -0,0 +1,20 @@ +package be.imec.edit.ds.catalog; + +import be.imec.edit.ds.catalog.common.DataSpaceCatalogIngestorBase; +import com.linkedin.common.urn.Urn; +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.concurrent.ExecutionException; +import org.eclipse.edc.spi.types.domain.asset.Asset; +import org.slf4j.Logger; 
+import org.slf4j.LoggerFactory; + + +public class UserEntityIngestor extends DataSpaceCatalogIngestorBase { + Logger log = LoggerFactory.getLogger(this.getClass().getName()); + @Override + public Urn emitMetadataChangeProposal(Asset asset) + throws URISyntaxException, IOException, ExecutionException, InterruptedException { + return null; + } +} diff --git a/extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/common/DataSpaceCatalogIngestorBase.java b/extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/common/DataSpaceCatalogIngestorBase.java new file mode 100644 index 00000000000..4ef68bfc8a9 --- /dev/null +++ b/extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/common/DataSpaceCatalogIngestorBase.java @@ -0,0 +1,31 @@ +package be.imec.edit.ds.catalog.common; + +import com.linkedin.common.urn.DataPlatformUrn; +import com.linkedin.common.urn.Urn; +import com.linkedin.data.template.RecordTemplate; +import datahub.event.MetadataChangeProposalWrapper; +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.concurrent.ExecutionException; +import org.eclipse.edc.spi.types.domain.asset.Asset; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +abstract public class DataSpaceCatalogIngestorBase { + Logger log = LoggerFactory.getLogger(this.getClass().getName()); + public MetadataChangeProposalWrapper _metadataChangeProposalWrapper(RecordTemplate aspect, String entityType, Urn urn) { + return MetadataChangeProposalWrapper.builder() + .entityType(entityType) + .entityUrn(urn) + .upsert() + .aspect(aspect) + .build(); + } + public DataPlatformUrn _platformUrn(String entityType) throws URISyntaxException { + return DataPlatformUrn.createFromUrn(DataPlatformUrn.createFromTuple(entityType, "test")); + } + public abstract Urn emitMetadataChangeProposal(Asset asset) throws 
URISyntaxException, IOException, ExecutionException, + InterruptedException; + +} diff --git a/extensions/control-plane/api/management-api/dataspace-catalog-api/src/test/java/be/imec/edit/ds/catalog/DataSpaceCatalogTest.java b/extensions/control-plane/api/management-api/dataspace-catalog-api/src/test/java/be/imec/edit/ds/catalog/DataSpaceCatalogTest.java new file mode 100644 index 00000000000..2e26c135989 --- /dev/null +++ b/extensions/control-plane/api/management-api/dataspace-catalog-api/src/test/java/be/imec/edit/ds/catalog/DataSpaceCatalogTest.java @@ -0,0 +1,39 @@ +package be.imec.edit.ds.catalog; + +import jakarta.json.Json; +import jakarta.json.JsonObject; +import jakarta.json.JsonReader; +import java.io.IOException; +import java.io.StringReader; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.concurrent.ExecutionException; +import org.eclipse.edc.spi.result.Result; +import org.eclipse.edc.spi.types.domain.asset.Asset; +import org.eclipse.edc.transform.spi.TypeTransformerRegistry; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.mockito.Mockito.*; + + +public class DataSpaceCatalogTest extends DatasetEntityIngestor { + Logger log = LoggerFactory.getLogger(this.getClass().getName()); + private final TypeTransformerRegistry transformerRegistry = mock(TypeTransformerRegistry.class); + + @Test + void ingestDataSetEntity() throws URISyntaxException, IOException { + URL resource = null; // getClass().getClassLoader().getResource("asset.json"); + try (JsonReader jsonReader = Json.createReader(new StringReader(Files.readString(Paths.get(resource.toURI()))))) { + + JsonObject readObject = jsonReader.readObject(); + Result asset = transformerRegistry.transform(readObject, Asset.class); + emitMetadataChangeProposal(asset.getContent()); + } catch (ExecutionException | InterruptedException e) { + e.printStackTrace(); + 
} + } +} diff --git a/extensions/control-plane/api/management-api/dataspace-catalog-api/src/test/resources/Asset.json b/extensions/control-plane/api/management-api/dataspace-catalog-api/src/test/resources/Asset.json new file mode 100644 index 00000000000..d7bba0dd8e6 --- /dev/null +++ b/extensions/control-plane/api/management-api/dataspace-catalog-api/src/test/resources/Asset.json @@ -0,0 +1,17 @@ +{ + "@context": { + "edc": "https://w3id.org/edc/v0.0.1/ns/" + }, + "asset": { + "@id": "${assetId}", + "properties": { + "name": "product description", + "contenttype": "application/json" + } + }, + "dataAddress": { + "name": "Test asset", + "baseUrl": "${source}", + "type": "HttpData" + } +} \ No newline at end of file diff --git a/settings.gradle.kts b/settings.gradle.kts index ba9aed1df1c..f4a4151049b 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -66,7 +66,7 @@ include(":core:data-plane-selector:data-plane-selector-core") // modules that provide implementations for data ingress/egress ------------------------------------ include(":data-protocols:dsp:dsp-api-configuration") -include(":data-protocols:dsp:dsp-catalog") +include(":data-protocols:dsp:dsp-catalog:dsp-catalog-api") include(":data-protocols:dsp:dsp-catalog:dsp-catalog-api") include(":data-protocols:dsp:dsp-catalog:dsp-catalog-http-dispatcher") include(":data-protocols:dsp:dsp-catalog:dsp-catalog-transform") @@ -129,6 +129,7 @@ include(":extensions:control-plane:api:control-plane-api") include(":extensions:control-plane:api:control-plane-api-client") include(":extensions:control-plane:api:management-api") include(":extensions:control-plane:api:management-api:asset-api") +include(":extensions:control-plane:api:management-api:dataspace-catalog-api") include(":extensions:control-plane:api:management-api:catalog-api") include(":extensions:control-plane:api:management-api:contract-agreement-api") include(":extensions:control-plane:api:management-api:contract-definition-api") @@ -189,6 +190,7 @@ 
include(":spi:common:validator-spi") include(":spi:common:web-spi") include(":spi:control-plane:asset-spi") +include(":spi:control-plane:dataspace-catalog-api") include(":spi:control-plane:contract-spi") include(":spi:control-plane:control-plane-spi") include(":spi:control-plane:transfer-data-plane-spi") From c9684a4926e3d76788a76dfb935d177c1c407255 Mon Sep 17 00:00:00 2001 From: idris52 Date: Fri, 25 Aug 2023 15:56:20 +0200 Subject: [PATCH 2/6] doc: added comments --- .../ds/catalog/DatasetEntityIngestor.java | 12 +++++++++-- .../common/DataSpaceCatalogIngestorBase.java | 20 +++++++++++++++++++ 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/DatasetEntityIngestor.java b/extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/DatasetEntityIngestor.java index ef30bcc1a08..ac6135a450a 100644 --- a/extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/DatasetEntityIngestor.java +++ b/extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/DatasetEntityIngestor.java @@ -33,11 +33,11 @@ public class DatasetEntityIngestor extends DataSpaceCatalogIngestorBase { Logger log = LoggerFactory.getLogger(this.getClass().getName()); - RestEmitter emitter = RestEmitter.createWithDefaults(); + final String entityType = "dataset"; /*** - * editableDatasetProperties aspect of dataset entity + * editableDatasetProperties aspect of dataset entity - see details on datahub documentation for dataset entity aspects * */ private DatasetProperties _datasetProperties(Asset asset) { var createdAt=new com.linkedin.common.TimeStamp(); @@ -63,12 +63,20 @@ public SchemaMetadata _schemaMetadata(Asset asset) { //todo: This should not be return new SchemaMetadata().setFields(fields); } + /*** + * Returns datahub style urn for an asset - 
includes `test` as platform, and includes EDC asset name and id within the urn. + * The FabricType is the environment type such as Dev, Prod, etc. + * */ public Urn _urn(Asset asset) throws URISyntaxException { return new DatasetUrn(_platformUrn(entityType), asset.getName()+asset.getId(), FabricType.DEV); } /*** * This method emits whole dataset, with all aspects (defined within) to dataspace catalog. To only ingest/emit a single aspect, see specs. + * In this method, we first create a dataset with a single aspect - datasetProperties Aspect. Then, we create other aspects such as + * schemaMetadata and editableProperties aspects, and ingest them in parallel. + * Usually it can be done sequentially, but this is to show that, if an entity already exists, then aspects can be pushed in parallel as well. + * Since the calls are asynchronous, the datahub api at the receiving end will respond asynchronously. * */ public Urn emitMetadataChangeProposal(Asset asset) throws URISyntaxException, IOException, ExecutionException, InterruptedException { diff --git a/extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/common/DataSpaceCatalogIngestorBase.java b/extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/common/DataSpaceCatalogIngestorBase.java index 4ef68bfc8a9..e7a7e301b36 100644 --- a/extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/common/DataSpaceCatalogIngestorBase.java +++ b/extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/common/DataSpaceCatalogIngestorBase.java @@ -3,9 +3,11 @@ import com.linkedin.common.urn.DataPlatformUrn; import com.linkedin.common.urn.Urn; import com.linkedin.data.template.RecordTemplate; +import datahub.client.rest.RestEmitter; import datahub.event.MetadataChangeProposalWrapper; import java.io.IOException; import 
java.net.URISyntaxException; +import java.util.Collections; import java.util.concurrent.ExecutionException; import org.eclipse.edc.spi.types.domain.asset.Asset; import org.slf4j.Logger; @@ -14,6 +16,16 @@ abstract public class DataSpaceCatalogIngestorBase { Logger log = LoggerFactory.getLogger(this.getClass().getName()); + /*** + * To create an emitter that is pushing to datahub instance that is remote (though an IP or url), see RestEmitter class. It has examples for creating emitter with external urls. An example is shown below commented out + * + * */ + protected RestEmitter emitter = RestEmitter.createWithDefaults(); + //protected RestEmitter emitter = RestEmitter.create(b -> b.server("http://localhost:8080")); // todo: replace the `localhost:8080` with ip address or address of the dathub gms. + /*** + * Method to build change proposal for any entity. aspect represents an aspect, e.g. dataSetProperties or Ownership Aspect of dataset, entityType is e.g. dathahub entity `dataset` etc. + * And the urn is `datahub` style urn. + * */ public MetadataChangeProposalWrapper _metadataChangeProposalWrapper(RecordTemplate aspect, String entityType, Urn urn) { return MetadataChangeProposalWrapper.builder() .entityType(entityType) @@ -22,9 +34,17 @@ public MetadataChangeProposalWrapper _metadataChangeProposalWrapper(RecordTempla .aspect(aspect) .build(); } + /** + * At the moment, edc connectors are experimental. The data platforms (bigquery, snowflake, hudi etc.) are unknown and not provided, so we use `test`. But we can + * Use `conf` files to configure this. 
+ * */ public DataPlatformUrn _platformUrn(String entityType) throws URISyntaxException { return DataPlatformUrn.createFromUrn(DataPlatformUrn.createFromTuple(entityType, "test")); } + + /*** + * A method used by subclasses to implement entity specific changeproposals and emit to datahub (data space catalog) + * */ public abstract Urn emitMetadataChangeProposal(Asset asset) throws URISyntaxException, IOException, ExecutionException, InterruptedException; From 83a820126ddba7f2182a3171556d6007b0052bde Mon Sep 17 00:00:00 2001 From: Konstantin Kostov Date: Mon, 28 Aug 2023 14:50:05 +0200 Subject: [PATCH 3/6] fix: load asset from json file --- .../java/be/imec/edit/ds/catalog/DataSpaceCatalogTest.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/extensions/control-plane/api/management-api/dataspace-catalog-api/src/test/java/be/imec/edit/ds/catalog/DataSpaceCatalogTest.java b/extensions/control-plane/api/management-api/dataspace-catalog-api/src/test/java/be/imec/edit/ds/catalog/DataSpaceCatalogTest.java index 2e26c135989..f6fb7cab1b7 100644 --- a/extensions/control-plane/api/management-api/dataspace-catalog-api/src/test/java/be/imec/edit/ds/catalog/DataSpaceCatalogTest.java +++ b/extensions/control-plane/api/management-api/dataspace-catalog-api/src/test/java/be/imec/edit/ds/catalog/DataSpaceCatalogTest.java @@ -26,7 +26,9 @@ public class DataSpaceCatalogTest extends DatasetEntityIngestor { @Test void ingestDataSetEntity() throws URISyntaxException, IOException { - URL resource = null; // getClass().getClassLoader().getResource("asset.json"); + ClassLoader classloader = Thread.currentThread().getContextClassLoader(); + URL resource = classloader.getResource("asset.json"); + try (JsonReader jsonReader = Json.createReader(new StringReader(Files.readString(Paths.get(resource.toURI()))))) { JsonObject readObject = jsonReader.readObject(); From 95bcd0de057e6b26add50ef40813c0e3adcb8318 Mon Sep 17 00:00:00 2001 From: XD Date: Tue, 29 Aug 2023 12:05:14 
+0200 Subject: [PATCH 4/6] Runnable parameter --- build.gradle.kts | 3 +++ gradle.properties | 1 + 2 files changed, 4 insertions(+) diff --git a/build.gradle.kts b/build.gradle.kts index 74633e3db7b..fc0584a0614 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -70,3 +70,6 @@ allprojects { } } +dependencies { + implementation("junit:junit:4.13.1") +} diff --git a/gradle.properties b/gradle.properties index f939fa2bf35..7b821127eb8 100644 --- a/gradle.properties +++ b/gradle.properties @@ -7,3 +7,4 @@ edcGradlePluginsVersion=0.2.2-SNAPSHOT metaModelVersion=0.2.2-SNAPSHOT edcScmUrl=https://github.com/eclipse-edc/Connector.git edcScmConnection=scm:git:git@github.com:eclipse-edc/Connector.git +org.gradle.jvmargs=-Xmx4096M From b0d0eb1faac2b2e386fc2a6b57585dcf69f3755e Mon Sep 17 00:00:00 2001 From: XD Date: Wed, 30 Aug 2023 16:20:50 +0200 Subject: [PATCH 5/6] Post Sample dataset @Asset.json to local run datahub endpoint --- .../dataspace-catalog-api/build.gradle.kts | 12 ++++++++ .../ds/catalog/DatasetEntityIngestor.java | 19 ++++++++++-- .../common/DataSpaceCatalogIngestorBase.java | 2 +- .../edit/ds/catalog/DataSpaceCatalogTest.java | 29 +++++++++++++++++-- .../src/test/resources/Asset.json | 13 +++++---- 5 files changed, 64 insertions(+), 11 deletions(-) diff --git a/extensions/control-plane/api/management-api/dataspace-catalog-api/build.gradle.kts b/extensions/control-plane/api/management-api/dataspace-catalog-api/build.gradle.kts index 5a7cf6e7c0d..33aa779a9e4 100644 --- a/extensions/control-plane/api/management-api/dataspace-catalog-api/build.gradle.kts +++ b/extensions/control-plane/api/management-api/dataspace-catalog-api/build.gradle.kts @@ -23,6 +23,18 @@ dependencies { implementation("io.acryl:datahub-client:0.10.5-5") implementation("org.apache.httpcomponents:httpclient:4.5") implementation("org.apache.httpcomponents:httpasyncclient:4.1.5") + testImplementation("org.junit.jupiter:junit-jupiter-api:5.7.2") + 
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine") + testImplementation(project(":core:common:transform-core")) + testImplementation(project(":core:control-plane:control-plane-core")) + testImplementation(project(":core:common:transform-core")) + testImplementation(project(":core:control-plane:control-plane-core")) + testImplementation(project(":core:data-plane-selector:data-plane-selector-core")) + testImplementation(project(":extensions:common:http")) + testImplementation(project(":core:common:junit")) + testImplementation(testFixtures(project(":extensions:common:http:jersey-core"))) + testImplementation(libs.restAssured) + testImplementation(libs.awaitility) } diff --git a/extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/DatasetEntityIngestor.java b/extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/DatasetEntityIngestor.java index ac6135a450a..d42097fa0d2 100644 --- a/extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/DatasetEntityIngestor.java +++ b/extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/DatasetEntityIngestor.java @@ -16,6 +16,7 @@ import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -42,8 +43,9 @@ public class DatasetEntityIngestor extends DataSpaceCatalogIngestorBase { private DatasetProperties _datasetProperties(Asset asset) { var createdAt=new com.linkedin.common.TimeStamp(); createdAt.setTime(asset.getCreatedAt()); + Map assetProps = (Map) ((Map) asset.getProperties().get("asset")).get("properties"); return new DatasetProperties() - .setDescription(asset.getDescription()) + .setDescription(assetProps.get("name").toString()) 
.setCreated(createdAt); } @@ -80,8 +82,21 @@ public Urn _urn(Asset asset) throws URISyntaxException { * */ public Urn emitMetadataChangeProposal(Asset asset) throws URISyntaxException, IOException, ExecutionException, InterruptedException { - Urn datasetUrn = _urn(asset); + //Urn datasetUrn = _urn(asset); + + //Extract EDC assets json properties. + Map assets = (Map) asset.getProperties().get("asset"); + Map dataAddress = (Map) asset.getProperties().get("dataAddress"); + + // creat proper urn which can be feed to datahub api. + // need to be a data platfrom urn. + // Make asset id of edc data model become a part of the urn. + + var urn = "urn:li:dataset:(urn:li:dataPlatform:EDIT_EDC_Platform,asset: "+(String) assets.get("@id")+",TEST)"; + Urn datasetUrn = new Urn(urn); log.info("Pushing dataset to data space catalog"); + + // Emit datasetProperties aspect Future responseFuture = emitter.emit(_metadataChangeProposalWrapper(_datasetProperties(asset), entityType, datasetUrn)); if(responseFuture.isDone() && responseFuture.get().isSuccess()){ Future editablePropsFut = emitter.emit(_metadataChangeProposalWrapper(_editableDatasetProperties(asset), entityType, datasetUrn)); diff --git a/extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/common/DataSpaceCatalogIngestorBase.java b/extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/common/DataSpaceCatalogIngestorBase.java index e7a7e301b36..e144c69b5ac 100644 --- a/extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/common/DataSpaceCatalogIngestorBase.java +++ b/extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/common/DataSpaceCatalogIngestorBase.java @@ -39,7 +39,7 @@ public MetadataChangeProposalWrapper _metadataChangeProposalWrapper(RecordTempla * Use `conf` files to configure this. 
* */ public DataPlatformUrn _platformUrn(String entityType) throws URISyntaxException { - return DataPlatformUrn.createFromUrn(DataPlatformUrn.createFromTuple(entityType, "test")); + return DataPlatformUrn.createFromUrn(DataPlatformUrn.createFromTuple(entityType, "test:PROD")); } /*** diff --git a/extensions/control-plane/api/management-api/dataspace-catalog-api/src/test/java/be/imec/edit/ds/catalog/DataSpaceCatalogTest.java b/extensions/control-plane/api/management-api/dataspace-catalog-api/src/test/java/be/imec/edit/ds/catalog/DataSpaceCatalogTest.java index f6fb7cab1b7..0061ba0ecf8 100644 --- a/extensions/control-plane/api/management-api/dataspace-catalog-api/src/test/java/be/imec/edit/ds/catalog/DataSpaceCatalogTest.java +++ b/extensions/control-plane/api/management-api/dataspace-catalog-api/src/test/java/be/imec/edit/ds/catalog/DataSpaceCatalogTest.java @@ -1,6 +1,9 @@ package be.imec.edit.ds.catalog; +import com.fasterxml.jackson.databind.ObjectMapper; import jakarta.json.Json; +import java.util.LinkedHashMap; +import java.util.Map; import jakarta.json.JsonObject; import jakarta.json.JsonReader; import java.io.IOException; @@ -10,9 +13,19 @@ import java.nio.file.Files; import java.nio.file.Paths; import java.util.concurrent.ExecutionException; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.UUID; +import org.eclipse.edc.jsonld.util.JacksonJsonLd; +import org.eclipse.edc.core.transform.transformer.to.JsonObjectToAssetTransformer; +import org.eclipse.edc.core.transform.transformer.to.JsonObjectToDataAddressTransformer; +import org.eclipse.edc.core.transform.transformer.to.JsonValueToGenericTypeTransformer; import org.eclipse.edc.spi.result.Result; import org.eclipse.edc.spi.types.domain.asset.Asset; import org.eclipse.edc.transform.spi.TypeTransformerRegistry; +import org.eclipse.edc.core.transform.TypeTransformerRegistryImpl; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import 
org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -21,8 +34,18 @@ public class DataSpaceCatalogTest extends DatasetEntityIngestor { + private final ObjectMapper objectMapper = JacksonJsonLd.createObjectMapper(); Logger log = LoggerFactory.getLogger(this.getClass().getName()); - private final TypeTransformerRegistry transformerRegistry = mock(TypeTransformerRegistry.class); + //private final TypeTransformerRegistry transformerRegistry = mock(TypeTransformerRegistry.class); + private final TypeTransformerRegistry transformer = new TypeTransformerRegistryImpl(); + + + @BeforeEach + void setUp() { + transformer.register(new JsonObjectToAssetTransformer()); + transformer.register(new JsonValueToGenericTypeTransformer(objectMapper)); + transformer.register(new JsonObjectToDataAddressTransformer()); + } @Test void ingestDataSetEntity() throws URISyntaxException, IOException { @@ -32,7 +55,9 @@ void ingestDataSetEntity() throws URISyntaxException, IOException { try (JsonReader jsonReader = Json.createReader(new StringReader(Files.readString(Paths.get(resource.toURI()))))) { JsonObject readObject = jsonReader.readObject(); - Result asset = transformerRegistry.transform(readObject, Asset.class); + + //Transform the json object to Asset object + Result asset = transformer.transform(readObject, Asset.class); emitMetadataChangeProposal(asset.getContent()); } catch (ExecutionException | InterruptedException e) { e.printStackTrace(); diff --git a/extensions/control-plane/api/management-api/dataspace-catalog-api/src/test/resources/Asset.json b/extensions/control-plane/api/management-api/dataspace-catalog-api/src/test/resources/Asset.json index d7bba0dd8e6..46f3ad26340 100644 --- a/extensions/control-plane/api/management-api/dataspace-catalog-api/src/test/resources/Asset.json +++ b/extensions/control-plane/api/management-api/dataspace-catalog-api/src/test/resources/Asset.json @@ -3,15 +3,16 @@ "edc": "https://w3id.org/edc/v0.0.1/ns/" }, "asset": { - "@id": "${assetId}", + "@id": 
"VSDS-IMEC-Test-Asset-ttl", "properties": { - "name": "product description", - "contenttype": "application/json" + "name": "LDES Server endpoint for blue-bilke data", + "contenttype": "text/turtle" } }, "dataAddress": { - "name": "Test asset", - "baseUrl": "${source}", - "type": "HttpData" + "type": "HttpData", + "name": "VSDS-IMEC-Test", + "baseUrl": "https://openplanner.ilabt.imec.be/ldes/blue-bike", + "proxyPath": "true" } } \ No newline at end of file From a244034cfd1f260a56c6d3b5e66d866e04c25d68 Mon Sep 17 00:00:00 2001 From: XD Date: Thu, 31 Aug 2023 17:33:16 +0200 Subject: [PATCH 6/6] Use proper function to create urn --- .../ds/catalog/DatasetEntityIngestor.java | 39 ++++++++++++------- .../common/DataSpaceCatalogIngestorBase.java | 4 +- 2 files changed, 26 insertions(+), 17 deletions(-) diff --git a/extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/DatasetEntityIngestor.java b/extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/DatasetEntityIngestor.java index d42097fa0d2..914c0799afd 100644 --- a/extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/DatasetEntityIngestor.java +++ b/extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/DatasetEntityIngestor.java @@ -37,6 +37,10 @@ public class DatasetEntityIngestor extends DataSpaceCatalogIngestorBase { final String entityType = "dataset"; + final String platformType = "dataPlatform"; + + + /*** * editableDatasetProperties aspect of dataset entity - see details on datahub documentation for dataset entity aspects * */ @@ -47,15 +51,19 @@ private DatasetProperties _datasetProperties(Asset asset) { return new DatasetProperties() .setDescription(assetProps.get("name").toString()) .setCreated(createdAt); - } /*** * editableDatasetProperties aspect of dataset entity * */ private EditableDatasetProperties 
_editableDatasetProperties(Asset asset) { - return new EditableDatasetProperties(); + Map assetProps = (Map) ((Map) asset.getProperties().get("asset")).get("properties"); + return new EditableDatasetProperties() + .setDescription(assetProps.get("name").toString()); + } + + /*** * schemaMetadata aspect of dataset entity * */ @@ -70,7 +78,8 @@ public SchemaMetadata _schemaMetadata(Asset asset) { //todo: This should not be * The FabricType is the environment type such as Dev, Prod, etc. * */ public Urn _urn(Asset asset) throws URISyntaxException { - return new DatasetUrn(_platformUrn(entityType), asset.getName()+asset.getId(), FabricType.DEV); + var assetName = ((Map) asset.getProperties().get("asset")).get("@id"); + return new DatasetUrn(_platformUrn(platformType), "Data Asset: "+assetName, FabricType.DEV); } /*** @@ -82,25 +91,25 @@ public Urn _urn(Asset asset) throws URISyntaxException { * */ public Urn emitMetadataChangeProposal(Asset asset) throws URISyntaxException, IOException, ExecutionException, InterruptedException { - //Urn datasetUrn = _urn(asset); - - //Extract EDC assets json properties. - Map assets = (Map) asset.getProperties().get("asset"); - Map dataAddress = (Map) asset.getProperties().get("dataAddress"); - // creat proper urn which can be feed to datahub api. - // need to be a data platfrom urn. - // Make asset id of edc data model become a part of the urn. 
- - var urn = "urn:li:dataset:(urn:li:dataPlatform:EDIT_EDC_Platform,asset: "+(String) assets.get("@id")+",TEST)"; - Urn datasetUrn = new Urn(urn); + //Get the urn of the asset + Urn datasetUrn = _urn(asset); log.info("Pushing dataset to data space catalog"); // Emit datasetProperties aspect Future responseFuture = emitter.emit(_metadataChangeProposalWrapper(_datasetProperties(asset), entityType, datasetUrn)); - if(responseFuture.isDone() && responseFuture.get().isSuccess()){ + var emitResponse = responseFuture.get(); + // Emit other aspects in parallel + if(responseFuture.isDone() && emitResponse.isSuccess()){ + + //set and emit editable properties aspect to metadata model of datahub Future editablePropsFut = emitter.emit(_metadataChangeProposalWrapper(_editableDatasetProperties(asset), entityType, datasetUrn)); + + + //set and emit data schema aspect to metadata model of datahub Future schemaPropsFut = emitter.emit(_metadataChangeProposalWrapper(_schemaMetadata(asset), entityType, datasetUrn)); + var a = _editableDatasetProperties(asset); + int numThreads = 2; // Number of threads in the thread pool ExecutorService executor = Executors.newFixedThreadPool(numThreads); List> futures = List.of(editablePropsFut, schemaPropsFut); diff --git a/extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/common/DataSpaceCatalogIngestorBase.java b/extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/common/DataSpaceCatalogIngestorBase.java index e144c69b5ac..0b53b77e50f 100644 --- a/extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/common/DataSpaceCatalogIngestorBase.java +++ b/extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/common/DataSpaceCatalogIngestorBase.java @@ -35,11 +35,11 @@ public MetadataChangeProposalWrapper _metadataChangeProposalWrapper(RecordTempla 
.build(); } /** - * At the moment, edc connectors are experimental. The data platforms (bigquery, snowflake, hudi etc.) are unknown and not provided, so we use `test`. But we can + * At the moment, edc connectors are experimental. The data platforms (bigquery, snowflake, hudi etc.) are unknown and not provided, so we use `IMEC_EDC_PLATFORM`. But we can * Use `conf` files to configure this. * */ public DataPlatformUrn _platformUrn(String entityType) throws URISyntaxException { - return DataPlatformUrn.createFromUrn(DataPlatformUrn.createFromTuple(entityType, "test:PROD")); + return DataPlatformUrn.createFromUrn(DataPlatformUrn.createFromTuple(entityType, "IMEC_EDC_PLATFORM")); } /***