Harmonize #207

Merged: 11 commits, Apr 15, 2016
10 changes: 5 additions & 5 deletions examples/healthcare/README.md
@@ -31,9 +31,9 @@ The sample data is located in the input/ folder.

1. At this point you have loaded the sample data. You can browse the data via [QConsole](http://localhost:8000/qconsole) or by searching the REST endpoint on the Staging Http Server [http://localhost:8010/v1/search](http://localhost:8010/v1/search). *Your port may be different if you changed it during setup*

1. To run the conformance flows simply press the **Run** button next to the final flow.
1. To run the harmonize flows simply press the **Run** button next to the final flow.

1. Now you have conformed the data into your final database. You can browse the data via [QConsole](http://localhost:8000/qconsole) or by searching the REST endpoint on the Final Http Server [http://localhost:8011/v1/search](http://localhost:8011/v1/search). *Your port may be different if you changed it during setup*
1. Now you have harmonized the data into your final database. You can browse the data via [QConsole](http://localhost:8000/qconsole) or by searching the REST endpoint on the Final Http Server [http://localhost:8011/v1/search](http://localhost:8011/v1/search). *Your port may be different if you changed it during setup*


# Entities
@@ -43,7 +43,7 @@ Entities represent the data you are modeling. For this example we provide the **
Flows are sets of plugins that work together to create an envelope document.

- [Input Flows](#input-flows) work on incoming data and store it in the Staging database.
- [Conformance Flows](#conformance-flows) work on staged data and transform and store it into the Final database.
- [Harmonize Flows](#harmonize-flows) work on staged data and transform and store it into the Final database.

## Input Flows

@@ -53,9 +53,9 @@ The hl7 Flow is intended to ingest C-CDA C32 Hl7 XML files. When running the hl7
### nppes
The nppes Flow is intended to ingest NPPES CSV files. This flow will split each row of the NPPES file into a separate XML document in the staging database. When running the nppes flow, simply point it at input/nppes. Set the collection to **nppes** and set the document type to **Delimited Text**.

## Conformance Flows
## Harmonize Flows

There is only one conformance flow provided. This final flow will create a conformed XML document that contains the original C32 xml as the content of an envelope. It will also extract various data from th3 C32 into the header section for easier queryability.
There is only one harmonize flow provided. This final flow will create a harmonized XML document that contains the original C32 XML as the content of an envelope. It will also extract various data from the C32 into the header section for easier querying.

## Final REST services

2 changes: 1 addition & 1 deletion examples/hr-hub/README.md
@@ -23,7 +23,7 @@ Global Corp has exported the Employee data from a relational database. They are
Acme Tech uses a SAAS for managing employees. The data is provided as JSON documents that came straight out of the SAAS REST API.

# What is this example?
In this example we are loading the CSV table dumps from Global Corp and the JSON documents from Acme Tech into the Hub staging area. We then conform the two data sources into the final area by extracting common header fields. The header fields we extract are:
In this example we are loading the CSV table dumps from Global Corp and the JSON documents from Acme Tech into the Hub staging area. We then harmonize the two data sources into the final area by extracting common header fields. The header fields we extract are:

- Employee ID
- Employee Hire Date
12 changes: 6 additions & 6 deletions examples/tweets/README.md
@@ -22,7 +22,7 @@ The tweets are zipped into a .zip file in the input/ folder.

**Entity Name:** Tweet
**Ingest Flow Name:** ingest-tweets
**Conformance Flow Name:** conform-tweets
**Harmonize Flow Name:** harmonize-tweets
**Plugin Type:** Javascript
**Data Format:** JSON

@@ -32,9 +32,9 @@

1. At this point you have loaded the sample tweets. You can browse the data via [QConsole](http://localhost:8000/qconsole) in the data-hub-STAGING database or by searching the REST endpoint on the Staging Http Server [http://localhost:8010/v1/search](http://localhost:8010/v1/search). *Your port may be different if you changed it during setup*

1. To run the conformance flow simply press the **Run** button next to the "conform-tweets" flow.
1. To run the harmonize flow simply press the **Run** button next to the "harmonize-tweets" flow.

1. Now you have conformed the data into your final database. You can browse the data via [QConsole](http://localhost:8000/qconsole) or by searching the REST endpoint on the Final Http Server [http://localhost:8011/v1/search](http://localhost:8011/v1/search). *Your port may be different if you changed it during setup*
1. Now you have harmonized the data into your final database. You can browse the data via [QConsole](http://localhost:8000/qconsole) or by searching the REST endpoint on the Final Http Server [http://localhost:8011/v1/search](http://localhost:8011/v1/search). *Your port may be different if you changed it during setup*


# Entities
@@ -44,13 +44,13 @@ Entities represent the data you are modeling. For this example you created the **
Flows are sets of plugins that work together to create an envelope document.

- [Input Flows](#input-flows) work on incoming data and store it in the Staging database.
- [Conformance Flows](#conformance-flows) work on staged data and transform and store it into the Final database.
- [Harmonize Flows](#harmonize-flows) work on staged data and transform and store it into the Final database.

## Input Flows

### input-tweets
The auto-generated input Flow will ingest the compressed tweets as JSON files.

## Conformance Flows
## Harmonize Flows

This "conform-tweets" flow will create a conformed JSON document that contains the original tweet JSON as the content of an envelope.
This "harmonize-tweets" flow will create a harmonized JSON document that contains the original tweet JSON as the content of an envelope.
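The harmonize flows described in these READMEs wrap the original document in an envelope. A rough sketch of that shape (hypothetical standalone code, not hub internals; the extracted header field is invented for illustration):

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch (hypothetical, not hub code): the envelope shape a harmonize
// flow produces -- the original source kept intact under "content",
// with extracted "headers" and "triples" alongside.
public class EnvelopeDemo {

    static Map<String, Object> envelope(Map<String, Object> headers, Object content) {
        Map<String, Object> env = new LinkedHashMap<>();
        env.put("headers", headers);                 // fields extracted for easier querying
        env.put("triples", Collections.emptyList()); // semantic triples, empty in this sketch
        env.put("content", content);                 // untouched original document
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("envelope", env);
        return doc;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put("user", "example_user"); // hypothetical extracted field
        System.out.println(envelope(headers, "{\"text\":\"hello\"}"));
    }
}
```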
@@ -38,8 +38,6 @@
import com.marklogic.appdeployer.AppConfig;
import com.marklogic.appdeployer.ConfigDir;
import com.marklogic.appdeployer.command.Command;
import com.marklogic.appdeployer.command.databases.DeploySchemasDatabaseCommand;
import com.marklogic.appdeployer.command.databases.DeployTriggersDatabaseCommand;
import com.marklogic.appdeployer.command.modules.AllButAssetsModulesFinder;
import com.marklogic.appdeployer.command.modules.AssetModulesFinder;
import com.marklogic.appdeployer.command.security.DeployRolesCommand;
@@ -267,12 +265,12 @@ public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs)

String dirStr = dir.toString();
boolean isInputDir = dirStr.matches(".*[/\\\\]input[/\\\\].*");
boolean isConformanceDir = dirStr.matches(".*[/\\\\]conformance[/\\\\].*");
boolean isHarmonizeDir = dirStr.matches(".*[/\\\\]harmonize[/\\\\].*");
if (isRest) {
if (isInputDir) {
loadedFiles.addAll(hubModulesLoader.loadModules(dir.normalize().toAbsolutePath().toFile(), new AllButAssetsModulesFinder(), stagingClient));
}
else if (isConformanceDir) {
else if (isHarmonizeDir) {
loadedFiles.addAll(hubModulesLoader.loadModules(dir.normalize().toAbsolutePath().toFile(), new AllButAssetsModulesFinder(), finalClient));
}
return FileVisitResult.SKIP_SUBTREE;
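The hunk above renames the module-directory check from `conformance` to `harmonize`. The pattern matches the path segment with either `/` or `\` as separator; a minimal sketch using the same regex from the diff (class and paths here are illustrative):

```java
// Sketch (not from the PR): the renamed directory check matches a
// "harmonize" path segment with either / or \ as separator.
public class HarmonizeDirCheck {

    // Same pattern as the PR's isHarmonizeDir check; in Java source,
    // "[/\\\\]" is a character class containing '/' and '\'.
    static final String HARMONIZE_DIR = ".*[/\\\\]harmonize[/\\\\].*";

    static boolean isHarmonizeDir(String dirStr) {
        return dirStr.matches(HARMONIZE_DIR);
    }

    public static void main(String[] args) {
        System.out.println(isHarmonizeDir("plugins/entities/patient/harmonize/final"));     // true
        System.out.println(isHarmonizeDir("plugins\\entities\\patient\\harmonize\\final")); // true
        System.out.println(isHarmonizeDir("plugins/entities/patient/input/hl7"));           // false
    }
}
```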
@@ -237,6 +237,9 @@ public void runInputFlow(Flow flow, HubConfig config) {
flow.getEntityName(), flow.getName(),
FlowType.INPUT.toString(),
flow.getDataFormat());
sourceOptions.setInputFileType("documents");
sourceOptions.setTransformModule("/com.marklogic.hub/mlcp-flow-transform.xqy");
sourceOptions.setTransformNamespace("http://marklogic.com/data-hub/mlcp-flow-transform");
mlcp.addSourceDirectory(config.modulesPath, sourceOptions);
mlcp.loadContent();
}
62 changes: 50 additions & 12 deletions marklogic-data-hub/src/main/java/com/marklogic/hub/Mlcp.java
@@ -30,7 +30,7 @@
import com.marklogic.client.io.Format;

public class Mlcp {

public static final String DOCUMENT_TYPE_KEY = "-document_type";
public static final String INPUT_FILE_PATH_KEY = "-input_file_path";
public static final String INPUT_FILE_TYPE_KEY = "-input_file_type";
@@ -40,7 +40,7 @@ public class Mlcp {
public static final String PORT_KEY = "-port";
public static final String USERNAME_KEY = "-username";
public static final String PASSWORD_KEY = "-password";

private static final Logger LOGGER = LoggerFactory.getLogger(Mlcp.class);
private static final String DEFAULT_HADOOP_HOME_DIR = "./hadoop/";

@@ -109,24 +109,38 @@ public List<String> getMlcpArguments() throws IOException, JSONException {
String canonicalPath = file.getCanonicalPath();

List<String> arguments = new ArrayList<>();

arguments.add(INPUT_FILE_PATH_KEY);
arguments.add(canonicalPath);

arguments.add(OUTPUT_URI_REPLACE_KEY);
arguments.add("\""+canonicalPath+",''\"");

arguments.add(INPUT_FILE_TYPE_KEY);
arguments.add(sourceOptions.getInputFileType());

addOtherArguments(arguments, sourceOptions.getOtherOptions());


String other = sourceOptions.getOtherOptions();
if (other != null) {
addOtherArguments(arguments, other);
}

//add document type only if it does not exist in the list
if(!arguments.contains(DOCUMENT_TYPE_KEY)) {
arguments.add(DOCUMENT_TYPE_KEY);
arguments.add(sourceOptions.getDataFormat());
}


String transformModule = sourceOptions.getTransformModule();
if (transformModule != null) {
arguments.add("-transform_module");
arguments.add("\"" + transformModule +"\"");
arguments.add("-transform_namespace");
arguments.add("\"" + sourceOptions.getTransformNamespace() +"\"");
arguments.add("-transform_param");
arguments.add("\"" + sourceOptions.getTransformParams() + "\"");
}

return arguments;
}

@@ -142,9 +156,9 @@ private void addOtherArguments(List<String> arguments,
arguments.add(key);
arguments.add(jsonObject.getString(key));
}

}

}
}

@@ -155,6 +169,8 @@ public static class SourceOptions {
private String dataFormat = "json";
private String inputFileType;
private String otherOptions;
private String transformModule;
private String transformNamespace;

public SourceOptions(String entityName, String flowName, String flowType, Format dataFormat) {
this.entityName = entityName;
@@ -191,14 +207,36 @@ public String getInputFileType() {
public void setInputFileType(String inputFileType) {
this.inputFileType = inputFileType;
}

public String getOtherOptions() {
return otherOptions;
}

public void setOtherOptions(String otherOptions) {
this.otherOptions = otherOptions;
}

protected String getTransformParams() {
return String.format(
"<params><entity-name>%s</entity-name><flow-name>%s</flow-name><flow-type>%s</flow-type></params>",
entityName, flowName, flowType);
}

public String getTransformModule() {
return transformModule;
}

public void setTransformModule(String transformModule) {
this.transformModule = transformModule;
}

public String getTransformNamespace() {
return transformNamespace;
}

public void setTransformNamespace(String transformNamespace) {
this.transformNamespace = transformNamespace;
}
}

public List<String> getMlcpOptions(MlcpSource source) throws IOException, JSONException {
@@ -218,7 +256,7 @@ public List<String> getMlcpOptions(MlcpSource source) throws IOException, JSONEx

List<String> sourceArguments = source.getMlcpArguments();
mlcpOptions.addAll(sourceArguments);

return mlcpOptions;
}
}
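The new transform wiring in `Mlcp.java` passes entity and flow metadata to mlcp as a `-transform_param` XML string. A standalone sketch mirroring the `getTransformParams()` helper added above (the class and method names here are illustrative, but the format string is the one from the diff):

```java
// Sketch (standalone copy for illustration, not the PR class): shows the
// -transform_param XML that SourceOptions.getTransformParams() builds.
public class TransformParamsDemo {

    static String transformParams(String entityName, String flowName, String flowType) {
        // Same format string as the PR's getTransformParams()
        return String.format(
            "<params><entity-name>%s</entity-name><flow-name>%s</flow-name><flow-type>%s</flow-type></params>",
            entityName, flowName, flowType);
    }

    public static void main(String[] args) {
        // For an input flow named "ingest-tweets" on the "Tweet" entity:
        System.out.println(transformParams("Tweet", "ingest-tweets", "input"));
        // -> <params><entity-name>Tweet</entity-name><flow-name>ingest-tweets</flow-name><flow-type>input</flow-type></params>
    }
}
```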
@@ -63,7 +63,7 @@ public static void createFlow(String entityName, String flowName,
throws IOException {
File flowDir = getFlowDir(userlandDir, entityName, flowName, flowType);

if (flowType.equals(FlowType.CONFORMANCE)) {
if (flowType.equals(FlowType.HARMONIZE)) {
File collectorDir = new File(flowDir, "collector");
collectorDir.mkdirs();
writeFile("scaffolding/" + flowType + "/" + pluginFormat + "/collector." + pluginFormat,
@@ -9,12 +9,12 @@ public class ScaffoldingValidator {

public static boolean isUniqueRestServiceExtension(File pluginsDir, String name) {
String entityNameFilter = "[a-zA-Z0-9_.-]+";
String flowTypeFilter = "(" + FlowType.INPUT + "|" + FlowType.CONFORMANCE + ")";
String flowTypeFilter = "(" + FlowType.INPUT + "|" + FlowType.HARMONIZE + ")";
String pluginFormatFilter = "(" + PluginFormat.XQUERY + "|" + PluginFormat.JAVASCRIPT + ")";
String absoluteFilePathFilter = Scaffolding.getAbsolutePath(pluginsDir.getAbsolutePath(), "entities", entityNameFilter, flowTypeFilter, "REST", "services", name + "." + pluginFormatFilter);
return !checkIfFileExists(pluginsDir, absoluteFilePathFilter);
}

private static boolean checkIfFileExists(File rootDirectory, String absoluteFilePathFilter) {
File[] list = rootDirectory.listFiles();
if (list != null) {
@@ -32,7 +32,7 @@ private static boolean checkIfFileExists(File rootDirectory, String absoluteFile
}
return false;
}

public boolean checkIfFolderExists(File rootDirectory, String absoluteFilePathFilter) {
File[] list = rootDirectory.listFiles();
if (list != null) {
@@ -47,10 +47,10 @@ public boolean checkIfFolderExists(File rootDirectory, String absoluteFilePathFi
}
return false;
}

public static boolean isUniqueRestTransform(File pluginsDir, String name) {
String entityNameFilter = "[a-zA-Z0-9_.-]+";
String flowTypeFilter = "(" + FlowType.INPUT + "|" + FlowType.CONFORMANCE + ")";
String flowTypeFilter = "(" + FlowType.INPUT + "|" + FlowType.HARMONIZE + ")";
String pluginFormatFilter = "(" + PluginFormat.XQUERY + "|" + PluginFormat.JAVASCRIPT + ")";
String absoluteFilePathFilter = Scaffolding.getAbsolutePath(pluginsDir.getAbsolutePath(), "entities", entityNameFilter, flowTypeFilter, "REST", "transforms", name + "." + pluginFormatFilter);
return !checkIfFileExists(pluginsDir, absoluteFilePathFilter);
@@ -1,7 +1,7 @@
package com.marklogic.hub.flow;

public enum FlowType {
INPUT("input"), CONFORMANCE("conformance");
INPUT("input"), HARMONIZE("harmonize");

private String type;
FlowType(String type) {
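With the enum renamed, flow-type strings serialize as `harmonize` instead of `conformance`, which is what the directory paths and regexes elsewhere in this PR rely on. A sketch of the enum's likely shape (the `toString` override is an assumption; the diff truncates the class body):

```java
// Sketch (assumed shape; the PR truncates the class body): a FlowType-style
// enum whose string form is the lowercase directory name.
public class FlowTypeDemo {

    enum FlowType {
        INPUT("input"), HARMONIZE("harmonize");

        private final String type;

        FlowType(String type) {
            this.type = type;
        }

        @Override
        public String toString() {
            return type; // paths and mlcp params use these lowercase names
        }
    }

    public static void main(String[] args) {
        System.out.println(FlowType.HARMONIZE); // harmonize
        System.out.println(FlowType.INPUT);     // input
    }
}
```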
@@ -314,10 +314,10 @@ declare function flow:get-flows(
cts:uri-match($ENTITIES-DIR || "*")
})
let $flows :=
for $flow in $uris[fn:matches(., $ENTITIES-DIR || $entity-name || "/(input|conformance)/[^/]+/$")]
let $name := fn:replace($flow, $ENTITIES-DIR || $entity-name || "/(input|conformance)/([^/]+)/$", "$2")
for $flow in $uris[fn:matches(., $ENTITIES-DIR || $entity-name || "/(input|harmonize)/[^/]+/$")]
let $name := fn:replace($flow, $ENTITIES-DIR || $entity-name || "/(input|harmonize)/([^/]+)/$", "$2")
return
flow:get-flow($entity-name, $name, (), $uris[fn:matches(., $ENTITIES-DIR || $entity-name || "/(input|conformance)/" || $name || "/.+")])
flow:get-flow($entity-name, $name, (), $uris[fn:matches(., $ENTITIES-DIR || $entity-name || "/(input|harmonize)/" || $name || "/.+")])
return
<flows xmlns="http://marklogic.com/data-hub">
{
@@ -415,7 +415,7 @@ declare function flow:run-collector(
(),
$module-uri,
"collector",
"conformance",
"harmonize",
$ex,
(),
xdmp:elapsed-time() - $before,
@@ -430,7 +430,7 @@ declare function flow:run-collector(
null-node {},
$module-uri,
"collector",
"conformance",
"harmonize",
null-node {},
json:to-array($resp),
xdmp:elapsed-time() - $before,
@@ -50,21 +50,21 @@ public void testInstallUserModules() throws IOException, ParserConfigurationExce
dataHub.installUserModules(path);

assertEquals(
getResource("data-hub-test/entities/test-entity/conformance/final/collector/collector.xqy"),
getModulesFile("/entities/test-entity/conformance/final/collector/collector.xqy"));
getResource("data-hub-test/entities/test-entity/harmonize/final/collector/collector.xqy"),
getModulesFile("/entities/test-entity/harmonize/final/collector/collector.xqy"));
assertEquals(
getResource("data-hub-test/entities/test-entity/conformance/final/content/content.xqy"),
getModulesFile("/entities/test-entity/conformance/final/content/content.xqy"));
getResource("data-hub-test/entities/test-entity/harmonize/final/content/content.xqy"),
getModulesFile("/entities/test-entity/harmonize/final/content/content.xqy"));
assertEquals(
getResource("data-hub-test/entities/test-entity/conformance/final/headers/headers.xqy"),
getModulesFile("/entities/test-entity/conformance/final/headers/headers.xqy"));
getResource("data-hub-test/entities/test-entity/harmonize/final/headers/headers.xqy"),
getModulesFile("/entities/test-entity/harmonize/final/headers/headers.xqy"));
assertEquals(
getResource("data-hub-test/entities/test-entity/conformance/final/triples/triples.xqy"),
getModulesFile("/entities/test-entity/conformance/final/triples/triples.xqy"));
getResource("data-hub-test/entities/test-entity/harmonize/final/triples/triples.xqy"),
getModulesFile("/entities/test-entity/harmonize/final/triples/triples.xqy"));

assertXMLEqual(
getXmlFromResource("data-hub-test/entities/test-entity/conformance/final/final.xml"),
getModulesDocument("/entities/test-entity/conformance/final/final.xml"));
getXmlFromResource("data-hub-test/entities/test-entity/harmonize/final/final.xml"),
getModulesDocument("/entities/test-entity/harmonize/final/final.xml"));


assertEquals(
@@ -85,14 +85,14 @@
getModulesDocument("/Default/" + HubConfig.DEFAULT_STAGING_NAME + "/rest-api/options/doctors.xml"));

assertXMLEqual(
getXmlFromResource("data-hub-test/entities/test-entity/conformance/REST/options/patients.xml"),
getXmlFromResource("data-hub-test/entities/test-entity/harmonize/REST/options/patients.xml"),
getModulesDocument("/Default/" + HubConfig.DEFAULT_FINAL_NAME + "/rest-api/options/patients.xml"));

assertXMLEqual(
getXmlFromResource("data-hub-helpers/test-conf-metadata.xml"),
getModulesDocument("/marklogic.rest.transform/test-conf-transform/assets/metadata.xml"));
assertEquals(
getResource("data-hub-test/entities/test-entity/conformance/REST/transforms/test-conf-transform.xqy"),
getResource("data-hub-test/entities/test-entity/harmonize/REST/transforms/test-conf-transform.xqy"),
getModulesFile("/marklogic.rest.transform/test-conf-transform/assets/transform.xqy"));

assertXMLEqual(