Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ols retrieve al topic #5847

Merged
merged 11 commits into from
Nov 9, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -1063,7 +1063,8 @@ else if (ServerTypeClassification.INTEGRATION_DAEMON.equals(serverTypeClassifica
else if (ServerTypeClassification.OPEN_LINEAGE_SERVER.equals(serverTypeClassification))
{
OpenLineageServerOperationalServices
operationalOpenLineageServer = new OpenLineageServerOperationalServices(configuration.getLocalServerName(),
operationalOpenLineageServer = new OpenLineageServerOperationalServices(configuration.getLocalServerId(),
configuration.getLocalServerName(),
configuration.getLocalServerUserId(),
configuration.getLocalServerPassword(),
configuration.getMaxPageSize());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ For example payloads and endpoints, see the [Postman samples](../samples/OLS.pos
[open-lineage-janus-connector](../../../../adapters/open-connectors/governance-daemon-connectors/open-lineage-connectors/open-lineage-janus-connector/README.md)
- `accessServiceConfig.serverName` - the name of the server where Asset Lineage is running (mandatory value - exception will be thrown during configuration if null)
- `accessServiceConfig.serverPlatformUrlRoot` - the base URL where the Asset Lineage is running (mandatory value - exception will be thrown during configuration if null)
- `accessServiceConfig.user` - the user needed for authentication in Asset Lineage (not used at the moment)
- `accessServiceConfig.user` - the user needed for authentication in Asset Lineage (mandatory value - exception will be thrown during configuration if null)
- `accessServiceConfig.password` - the password needed for authentication in Asset Lineage (not user at the moment)
- `backgroundJobs.jobName` - should be the name of the job class name
- `backgroundJobs.jobInterval` - interval for Open Lineage Services background processing job. The default is 120 if not specified
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ public enum OpenLineageServerErrorCode {
"The server is not able to retrieve its configuration. It fails to start.",
"Add the Open Lineage configuration to the Open Lineage server's configuration document."),

BAD_ACCESS_SERVICE_CONFIG(400, "OPEN-LINEAGE-SERVER-400-002 ",
"Open Lineage access service configuration field {0} does not have a valid value.",
"The server does not have a proper access service configuration. It fails to start.",
"Update the Open Lineage configuration with a correct access service configuration."),

SERVICE_INSTANCE_FAILURE(400, "OPEN-LINEAGE-SERVER-400-005 ",
"The open lineage services are unable to initialize a new instance of open lineage server {0};" +
" error message is {1}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,13 @@ public class OpenLineageServerOperationalServices {
private static final Logger log = LoggerFactory.getLogger(OpenLineageServerOperationalServices.class);

private static final String EMPTY_STRING = "";
private static final int RETRIEVE_OUT_TOPIC_CONNECTION_TIMEOUT = 60_000;

private final String localServerName;
private final String localServerUserId;
private final String localServerPassword;
private final int maxPageSize;
private final String localServerId;

private OpenLineageServerConfig openLineageServerConfig;
private OpenLineageServerInstance openLineageServerInstance;
Expand All @@ -74,10 +76,12 @@ public class OpenLineageServerOperationalServices {
* @param localServerPassword password for this server to use if sending REST requests.
* @param maxPageSize maximum number of records that can be requested on the pageSize parameter
*/
public OpenLineageServerOperationalServices(String localServerName,
public OpenLineageServerOperationalServices(String localServerId,
String localServerName,
String localServerUserId,
String localServerPassword,
int maxPageSize) {
this.localServerId = localServerId;
this.localServerName = localServerName;
this.localServerUserId = localServerUserId;
this.localServerPassword = localServerPassword;
Expand All @@ -98,9 +102,10 @@ public void initialize(OpenLineageServerConfig openLineageServerConfig, OMRSAudi
final String actionDescription = "Initialize Open lineage Services";

logRecord(OpenLineageServerAuditCode.SERVER_INITIALIZING, actionDescription);
if (openLineageServerConfig == null)
throwOMAGConfigurationErrorException(OpenLineageServerErrorCode.NO_CONFIG_DOC, methodName, OpenLineageServerAuditCode.NO_CONFIG_DOC, actionDescription);

if (openLineageServerConfig == null) {
throwOMAGConfigurationErrorException(OpenLineageServerErrorCode.NO_CONFIG_DOC, localServerName,methodName, OpenLineageServerAuditCode.NO_CONFIG_DOC, actionDescription);
}
validateAccessServiceConfig(openLineageServerConfig.getAccessServiceConfig(), methodName);
try {
initializeOLS(openLineageServerConfig);
} catch (OMAGConfigurationErrorException e) {
Expand All @@ -110,8 +115,23 @@ public void initialize(OpenLineageServerConfig openLineageServerConfig, OMRSAudi
}
}

private void initializeOLS(OpenLineageServerConfig openLineageServerConfig) throws OMAGConfigurationErrorException, InvalidParameterException,
org.odpi.openmetadata.frameworks.connectors.ffdc.PropertyServerException, UserNotAuthorizedException {
private void validateAccessServiceConfig(OLSSimplifiedAccessServiceConfig accessServiceConfig, String methodName) throws OMAGConfigurationErrorException {
String actionDescription = "Verify the access service configuration";
if (accessServiceConfig.getServerName() == null || accessServiceConfig.getServerName().isEmpty()) {
throwOMAGConfigurationErrorException(OpenLineageServerErrorCode.BAD_ACCESS_SERVICE_CONFIG, "serverName",methodName,
OpenLineageServerAuditCode.BAD_ACCESS_SERVICE_CONFIG, actionDescription);
}
if (accessServiceConfig.getServerPlatformUrlRoot() == null || accessServiceConfig.getServerPlatformUrlRoot().isEmpty()) {
throwOMAGConfigurationErrorException(OpenLineageServerErrorCode.BAD_ACCESS_SERVICE_CONFIG, "serverPlatformUrlRoot", methodName,
OpenLineageServerAuditCode.BAD_ACCESS_SERVICE_CONFIG, actionDescription);
}
if (accessServiceConfig.getUser() == null || accessServiceConfig.getUser().isEmpty()) {
throwOMAGConfigurationErrorException(OpenLineageServerErrorCode.BAD_ACCESS_SERVICE_CONFIG, "user", methodName,
OpenLineageServerAuditCode.BAD_ACCESS_SERVICE_CONFIG, actionDescription);
}
}

private void initializeOLS(OpenLineageServerConfig openLineageServerConfig) throws OMAGConfigurationErrorException, InvalidParameterException, InterruptedException {
final String methodName = "initializeOLS";
final String actionDescription = "Initialize Open lineage Services";
Connection lineageGraphConnection = openLineageServerConfig.getLineageGraphConnection();
Expand Down Expand Up @@ -140,10 +160,8 @@ private void initializeOLS(OpenLineageServerConfig openLineageServerConfig) thro
logRecord(OpenLineageServerAuditCode.SERVER_INITIALIZED, actionDescription);
}

private Connection getAssetLineageOutTopicConnection(String methodName, OLSSimplifiedAccessServiceConfig accessServiceConfig) throws
InvalidParameterException, org.odpi.openmetadata.frameworks.connectors.ffdc.PropertyServerException, UserNotAuthorizedException {
private Connection getAssetLineageOutTopicConnection(String methodName, OLSSimplifiedAccessServiceConfig accessServiceConfig) throws InvalidParameterException, InterruptedException {

final String urlTemplate = "/servers/{0}/open-metadata/access-services/asset-lineage/users/{1}/topics/out-topic-connection/{2}";
OCFRESTClient restClient;
String serverName = accessServiceConfig.getServerName();
String serverPlatformURLRoot = accessServiceConfig.getServerPlatformUrlRoot();
Expand All @@ -154,17 +172,31 @@ private Connection getAssetLineageOutTopicConnection(String methodName, OLSSimpl
} else {
restClient = new OCFRESTClient(serverName, serverPlatformURLRoot, serverUserId, serverPassword, auditLog);
}
ConnectionResponse restResult = restClient.callConnectionGetRESTCall(methodName,
serverPlatformURLRoot + urlTemplate,
serverName,
serverUserId,
serverUserId);

Connection inTopicConnection = null;
if (restResult != null) {
inTopicConnection = restResult.getConnection();
ConnectionResponse restResult;
do {
restResult = getConnection(methodName, restClient, accessServiceConfig);
Thread.sleep(RETRIEVE_OUT_TOPIC_CONNECTION_TIMEOUT);
} while (restResult == null);
return restResult.getConnection();
}

private ConnectionResponse getConnection(String methodName, OCFRESTClient restClient, OLSSimplifiedAccessServiceConfig accessServiceConfig) {
final String actionDescription = "Retrieve topic Asset Lineage out topic connection";
final String urlTemplate = "/servers/{0}/open-metadata/access-services/asset-lineage/users/{1}/topics/out-topic-connection/{2}";
String serverName = accessServiceConfig.getServerName();
String serverPlatformURLRoot = accessServiceConfig.getServerPlatformUrlRoot();
String serverUserId = accessServiceConfig.getUser();
ConnectionResponse restResult = null;
try {
restResult = restClient.callConnectionGetRESTCall(methodName,
serverPlatformURLRoot + urlTemplate,
serverName,
serverUserId,
localServerId);
} catch (InvalidParameterException | UserNotAuthorizedException | org.odpi.openmetadata.frameworks.connectors.ffdc.PropertyServerException e) {
logException(OpenLineageServerAuditCode.COULD_NOT_RETRIEVE_TOPIC_CONNECTOR, actionDescription, e);
}
return inTopicConnection;
return restResult;
}

private void initializeAndStartBackgroundJobs() {
Expand Down Expand Up @@ -331,8 +363,10 @@ private void startIntopicConnector() throws OMAGConfigurationErrorException, Inv
* @param actionDescription The action that was taking place when the error occurred.
* @throws OMAGConfigurationErrorException
*/
private void throwOMAGConfigurationErrorException(OpenLineageServerErrorCode errorCode, String methodName, OpenLineageServerAuditCode auditCode, String actionDescription) throws OMAGConfigurationErrorException {
String errorMessage = errorCode.getErrorMessageId() + errorCode.getFormattedErrorMessage(localServerName);
private void throwOMAGConfigurationErrorException(OpenLineageServerErrorCode errorCode, String errorDetails, String methodName,
OpenLineageServerAuditCode auditCode, String actionDescription)
throws OMAGConfigurationErrorException {
String errorMessage = errorCode.getErrorMessageId() + errorCode.getFormattedErrorMessage(errorDetails);
OMAGConfigurationErrorException e = new OMAGConfigurationErrorException(errorCode.getHTTPErrorCode(),
this.getClass().getName(),
methodName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,18 @@ public enum OpenLineageServerAuditCode {
"Asset Context for entity {0} was requested from Asset Lineage and will be expected as event." +
" Entities in the context will be: {1}",
"Asset Context was requested from Asset Lineage.",
"No action is required.");
"No action is required."),

COULD_NOT_RETRIEVE_TOPIC_CONNECTOR("OPEN-LINEAGE-SERVICES-0021", OMRSAuditLogRecordSeverity.ERROR,
"The Open Lineage Services server encountered an error and could not retrieve the in topic connection.",
"An unexpected error occurred while initializing the Open Lineage Services. The server will try to retrieve the configuration again.",
"Make sure the Asset Lineage out topic is available at the configured location"),

BAD_ACCESS_SERVICE_CONFIG("OPEN-LINEAGE-SERVICES-0022", OMRSAuditLogRecordSeverity.ERROR,
"The Open Lineage Services encountered an error while verifying the access service configuration",
"The configuration for the access services is not valid.",
"Make sure the access service configuration is correct."),
;

private static final Logger log = LoggerFactory.getLogger(OpenLineageServerAuditCode.class);
private final String logMessageId;
Expand Down