Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DE Proxy - use ProcessingState classification for saving sync time in DS connector #7009

Merged
merged 13 commits into from
Nov 9, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@
@JsonSubTypes.Type(value = RelationalTableEvent.class, name = "RelationalTableEvent"),
@JsonSubTypes.Type(value = DataFileEvent.class, name = "DataFileEvent"),
@JsonSubTypes.Type(value = TopicEvent.class, name = "TopicEvent"),
@JsonSubTypes.Type(value = EventTypeEvent.class, name = "EventTypeEvent")
@JsonSubTypes.Type(value = EventTypeEvent.class, name = "EventTypeEvent"),
@JsonSubTypes.Type(value = ProcessingStateEvent.class, name = "ProcessingStateEvent")
})
@Getter
@Setter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,30 +32,31 @@ public enum DataEngineEventType implements Serializable {
DATA_ENGINE_REGISTRATION_EVENT (1, "DataEngineRegistrationEvent", "An event that register a data engine as external source."),
LINEAGE_MAPPINGS_EVENT (2, "LineageMappingsEvent", "An event that add or update lineage mappings."),
PORT_ALIAS_EVENT (3, "PortAliasEvent", "An event that add or update port alias"),
PORT_IMPLEMENTATION_EVENT (4, "PortImplementationEvent", "An event that create or update port implementations."),
SCHEMA_TYPE_EVENT (5, "SchemaTypeEvent", "An event that create or update schema types."),
PORT_IMPLEMENTATION_EVENT (4, "PortImplementationEvent", "An event that creates or updates port implementations."),
SCHEMA_TYPE_EVENT (5, "SchemaTypeEvent", "An event that creates or updates schema types."),
PROCESS_HIERARCHY_EVENT (6, "ProcessHierarchyEvent", "An event to setup a process hierarchy."),
DELETE_DATA_ENGINE_EVENT (7, "DeleteDataEngineEvent", "An event that deletes an external data engine."),
DELETE_SCHEMA_TYPE_EVENT (8, "DeleteSchemaTypeEvent", "An event that deletes a schema type."),
DELETE_PORT_IMPLEMENTATION_EVENT (9, "DeletePortImplementationEvent", "An event that deletes a port implementation."),
DELETE_PORT_ALIAS_EVENT (10, "DeletePortAliasesEvent", "An event that deletes a port alias."),
DATABASE_EVENT (11, "DatabaseEvent", "An event that create or update databases."),
DATABASE_SCHEMA_EVENT (12, "DatabaseSchemaEvent", "An event that create or update database schemas."),
RELATIONAL_TABLE_EVENT (13, "RelationalTableEvent", "An event that create or update relational tables."),
DATA_FILE_EVENT (14, "DataFileEvent", "An event that create or update data files."),
DATABASE_EVENT (11, "DatabaseEvent", "An event that creates or updates databases."),
DATABASE_SCHEMA_EVENT (12, "DatabaseSchemaEvent", "An event that creates or updates database schemas."),
RELATIONAL_TABLE_EVENT (13, "RelationalTableEvent", "An event that creates or updates relational tables."),
DATA_FILE_EVENT (14, "DataFileEvent", "An event that creates or updates data files."),
DELETE_DATABASE_EVENT (15, "DeleteDatabaseEvent", "An event that deletes a database."),
DELETE_DATABASE_SCHEMA_EVENT (16, "DeleteDatabaseSchemaEvent", "An event that deletes a database schema."),
DELETE_RELATIONAL_TABLE_EVENT (17, "DeleteRelationalTableEvent", "An event that deletes a relational table."),
DELETE_DATA_FILE_EVENT (18, "DeleteDataFileEvent", "An event that deletes a data file."),
DELETE_FOLDER_EVENT (19, "DeleteFolderEvent", "An event that deletes a folder."),
DELETE_CONNECTION_EVENT (20, "DeleteConnectionEvent", "An event that deletes a connection."),
DELETE_ENDPOINT_EVENT (21, "DeleteEndpointEvent", "An event that deletes an endpoint."),
PROCESS_EVENT (22, "ProcessEvent", "An event that create or update a process."),
PROCESS_EVENT (22, "ProcessEvent", "An event that creates or updates a process."),
DELETE_PROCESS_EVENT (23, "DeleteProcessEvent", "An event that deletes a process."),
TOPIC_EVENT (24, "TopicEvent", "An event that create or update topics."),
EVENT_TYPE_EVENT (25, "EventTypeEvent", "An event that create or update event types."),
TOPIC_EVENT (24, "TopicEvent", "An event that creates or updates topics."),
EVENT_TYPE_EVENT (25, "EventTypeEvent", "An event that creates or updates event types."),
DELETE_TOPIC_EVENT (26, "DeleteTopicEvent", "An event that deletes a topic."),
DELETE_EVENT_TYPE_EVENT (27, "DeleteEventTypeEvent", "An event that deletes an event type.");
DELETE_EVENT_TYPE_EVENT (27, "DeleteEventTypeEvent", "An event that deletes an event type."),
PROCESSING_STATE_TYPE_EVENT (28, "ProcessingStateEvent", "An event that creates or updates the processing state classification of an engine.");

@Getter(AccessLevel.NONE)
@Setter(AccessLevel.NONE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.odpi.openmetadata.accessservices.dataengine.model.SoftwareServerCapability;
import org.odpi.openmetadata.accessservices.dataengine.model.Engine;

import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;

/**
* The Data engine registration event for registering external source as software server capability.
* The Data engine registration event for registering external source as engine.
*/
@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
@JsonInclude(JsonInclude.Include.NON_NULL)
Expand All @@ -27,14 +27,14 @@
public class DataEngineRegistrationEvent extends DataEngineEventHeader {

/**
* Software server capability
* Engine
* -- GETTER --
* Gets the software server capability
* @return the software server capability
* Gets the engine
* @return the engine
* -- SETTER --
* Sets the software server capability
* @param softwareServerCapability the software server capability
* Sets the engine
* @param engine the engine
*/
private SoftwareServerCapability softwareServerCapability;
private Engine engine;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/* SPDX-License-Identifier: Apache-2.0 */
/* Copyright Contributors to the ODPi Egeria project. */
package org.odpi.openmetadata.accessservices.dataengine.event;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.odpi.openmetadata.accessservices.dataengine.model.ProcessingState;

import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;

@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
@Getter
@Setter
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
public class ProcessingStateEvent extends DataEngineEventHeader {
alexandra-bucur marked this conversation as resolved.
Show resolved Hide resolved

/**
* The processing state.
*
* -- GETTER --
* Return the processing state
*
* @return processing state
* -- SETTER --
* Set up the processing state
* @param processingState processing state
*/
private ProcessingState processingState;
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,10 @@ public enum DataEngineErrorCode implements ExceptionMessageSet {
"Topic with qualifiedName {0} was not found",
"The system is unable to create a new event type attached to a topic",
"Correct the code in the caller to provide the correct topic qualified name."),
SOFTWARE_SERVER_CAPABILITY_NOT_FOUND(400, "OMAS-DATA-ENGINE-400-012",
"Software Server Capability with qualifiedName {0} was not found",
"The system is unable to find the searched Software Server Capability",
"Correct the code in the caller to provide the correct Software Server Capability qualified name.");
ENGINE_NOT_FOUND(400, "OMAS-DATA-ENGINE-400-012",
"Engine with qualifiedName {0} was not found",
"The system is unable to find the searched Engine",
"Correct the code in the caller to provide the correct Engine qualified name.");

private static final long serialVersionUID = 1L;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;

/**
* The type Software server capability.
* The type Engine.
*/
@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
@JsonInclude(JsonInclude.Include.NON_NULL)
Expand All @@ -28,7 +28,7 @@
@ToString
@Getter
@Setter
public class SoftwareServerCapability implements Serializable {
public class Engine implements Serializable {

@Getter(AccessLevel.NONE)
@Setter(AccessLevel.NONE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;

Expand All @@ -27,6 +29,8 @@
@ToString(callSuper = true)
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class ProcessingState extends Referenceable {

@Getter(AccessLevel.NONE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.odpi.openmetadata.accessservices.dataengine.model.SoftwareServerCapability;
import org.odpi.openmetadata.accessservices.dataengine.model.Engine;

import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
Expand All @@ -25,15 +25,15 @@
public class DataEngineRegistrationRequestBody extends DataEngineOMASAPIRequestBody {

/**
* Software server capability
* Engine
* -- GETTER --
* Gets the software server capability
* @return the software server capability
* Gets the engine
* @return the engine
* -- SETTER --
* Sets the software server capability
* @param softwareServerCapability the software server capability
* Sets the engine
* @param engine the engine
*/
@JsonProperty("dataEngine")
private SoftwareServerCapability softwareServerCapability;
private Engine engine;

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import org.odpi.openmetadata.accessservices.dataengine.model.ProcessingState;
Expand All @@ -22,6 +24,8 @@
@Setter
@EqualsAndHashCode(callSuper = true)
@ToString
@NoArgsConstructor
@AllArgsConstructor
public class ProcessingStateRequestBody extends DataEngineOMASAPIRequestBody
{
private static final long serialVersionUID = 1L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ dependencies {
implementation project(':open-metadata-implementation:repository-services:repository-services-apis')
implementation project(':open-metadata-implementation:adapters:open-connectors:rest-client-connectors:rest-client-connectors-api')
implementation project(':open-metadata-implementation:repository-services:repository-services-apis')
implementation 'org.apache.commons:commons-collections4'
alexandra-bucur marked this conversation as resolved.
Show resolved Hide resolved
implementation 'org.springframework:spring-core'
implementation 'org.apache.commons:commons-collections4'
compileOnly 'com.fasterxml.jackson.core:jackson-annotations'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@
<artifactId>rest-client-connectors-api</artifactId>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
</dependency>

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import org.odpi.openmetadata.accessservices.dataengine.model.ProcessHierarchy;
import org.odpi.openmetadata.accessservices.dataengine.model.RelationalTable;
import org.odpi.openmetadata.accessservices.dataengine.model.SchemaType;
import org.odpi.openmetadata.accessservices.dataengine.model.SoftwareServerCapability;
import org.odpi.openmetadata.accessservices.dataengine.model.Engine;
import org.odpi.openmetadata.accessservices.dataengine.model.Topic;
import org.odpi.openmetadata.accessservices.dataengine.rest.FindRequestBody;
import org.odpi.openmetadata.commonservices.ffdc.rest.GUIDListResponse;
Expand All @@ -24,6 +24,7 @@
import org.odpi.openmetadata.repositoryservices.ffdc.exception.FunctionNotSupportedException;

import java.util.List;
import java.util.Map;

/**
* DataEngineClient provides the client-side interface for a data engine tool to create processes with ports,
Expand Down Expand Up @@ -68,10 +69,10 @@ void deleteProcess(String userId, String qualifiedName, String guid) throws Inva


/**
* Create or update the software server capability entity
* Create or update the engine entity
*
* @param userId the name of the calling user
* @param softwareServerCapability the software server capability bean
* @param userId the name of the calling user
* @param engine the engine bean
*
* @return unique identifier of the server in the repository
*
Expand All @@ -80,7 +81,7 @@ void deleteProcess(String userId, String qualifiedName, String guid) throws Inva
* @throws PropertyServerException problem accessing the property server
* @throws ConnectorCheckedException problem with the underlying connector (if used)
*/
String createExternalDataEngine(String userId, SoftwareServerCapability softwareServerCapability) throws InvalidParameterException,
String createExternalDataEngine(String userId, Engine engine) throws InvalidParameterException,
UserNotAuthorizedException,
PropertyServerException,
ConnectorCheckedException;
Expand Down Expand Up @@ -540,4 +541,19 @@ void deleteEventType(String userId, String qualifiedName, String guid) throws In
PropertyServerException,
UserNotAuthorizedException,
ConnectorCheckedException;

/**
* Create or update the engine's processing state classification with the provided properties
* @param userId the name of the calling user
* @param properties properties of the processing state
*/
void upsertProcessingState(String userId, Map<String, Long> properties) throws PropertyServerException,
InvalidParameterException, UserNotAuthorizedException, ConnectorCheckedException;

/**
* Get the engine's processing state classification's properties
*
* @param userId the name of the calling user
*/
public Map<String, Long> getProcessingState(String userId) throws PropertyServerException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.odpi.openmetadata.accessservices.dataengine.event.PortImplementationEvent;
import org.odpi.openmetadata.accessservices.dataengine.event.ProcessEvent;
import org.odpi.openmetadata.accessservices.dataengine.event.ProcessHierarchyEvent;
import org.odpi.openmetadata.accessservices.dataengine.event.ProcessingStateEvent;
import org.odpi.openmetadata.accessservices.dataengine.event.RelationalTableEvent;
import org.odpi.openmetadata.accessservices.dataengine.event.SchemaTypeEvent;
import org.odpi.openmetadata.accessservices.dataengine.event.TopicEvent;
Expand All @@ -29,9 +30,10 @@
import org.odpi.openmetadata.accessservices.dataengine.model.PortImplementation;
import org.odpi.openmetadata.accessservices.dataengine.model.Process;
import org.odpi.openmetadata.accessservices.dataengine.model.ProcessHierarchy;
import org.odpi.openmetadata.accessservices.dataengine.model.ProcessingState;
import org.odpi.openmetadata.accessservices.dataengine.model.RelationalTable;
import org.odpi.openmetadata.accessservices.dataengine.model.SchemaType;
import org.odpi.openmetadata.accessservices.dataengine.model.SoftwareServerCapability;
import org.odpi.openmetadata.accessservices.dataengine.model.Engine;
import org.odpi.openmetadata.accessservices.dataengine.model.Topic;
import org.odpi.openmetadata.accessservices.dataengine.rest.FindRequestBody;
import org.odpi.openmetadata.commonservices.ffdc.rest.GUIDListResponse;
Expand All @@ -40,6 +42,7 @@
import org.odpi.openmetadata.repositoryservices.ffdc.exception.FunctionNotSupportedException;

import java.util.List;
import java.util.Map;


/***
Expand Down Expand Up @@ -103,13 +106,13 @@ public void deleteProcess(String userId, String qualifiedName, String guid) thro
* @throws ConnectorCheckedException problem with the underlying connector (if used)
*/
@Override
public String createExternalDataEngine(String userId, SoftwareServerCapability softwareServerCapability) throws InvalidParameterException,
public String createExternalDataEngine(String userId, Engine engine) throws InvalidParameterException,
ConnectorCheckedException {
DataEngineRegistrationEvent event = new DataEngineRegistrationEvent();
event.setUserId(userId);
event.setExternalSourceName(externalSource);
event.setDataEngineEventType(DataEngineEventType.DATA_ENGINE_REGISTRATION_EVENT);
event.setSoftwareServerCapability(softwareServerCapability);
event.setEngine(engine);

topicConnector.sendEvent(event);

Expand Down Expand Up @@ -507,6 +510,27 @@ public void deleteEventType(String userId, String qualifiedName, String guid) th
topicConnector.sendEvent(event);
}

@Override
public void upsertProcessingState(String userId, Map<String, Long> properties) throws InvalidParameterException, ConnectorCheckedException {

ProcessingState processingState = new ProcessingState(properties);

ProcessingStateEvent event = new ProcessingStateEvent();

event.setUserId(userId);
event.setExternalSourceName(externalSource);
event.setDataEngineEventType(DataEngineEventType.PROCESSING_STATE_TYPE_EVENT);
event.setProcessingState(processingState);

topicConnector.sendEvent(event);
}

@Override
public Map<String, Long> getProcessingState(String userId) {
//async interaction
return null;
}

private DeleteEvent getDeleteEvent(String userId, String qualifiedName, String guid) {
DeleteEvent event = new DeleteEvent();
event.setUserId(userId);
Expand Down
Loading