Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Quick Start Input flow and synching #37

Merged
merged 15 commits into from
Feb 17, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 211 additions & 0 deletions data-hub/src/main/java/com/marklogic/hub/Mlcp.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
/*
* Copyright 2012-2016 MarkLogic Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.marklogic.hub;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.marklogic.hub.util.IOUtil;
import com.marklogic.hub.util.IOUtil.LogLevel;

public class Mlcp {
private static final Logger LOGGER = LoggerFactory.getLogger(Mlcp.class);

private List<MlcpSource> sources = new ArrayList<>();

private String mlcpPath;

private String host;

private String port;

private String user;

private String password;

public Mlcp(String mlcpHome, String host, String port, String user, String password) {
this.host = host;
this.port = port;
this.user = user;
this.password = password;

// set the mlcp executable path based on OS
this.mlcpPath = mlcpHome;
String osName = System.getProperty("os.name");
if (osName != null && osName.toLowerCase().startsWith("windows")) {
mlcpPath += "/bin/mlcp.bat";
}
else {
mlcpPath += "/bin/mlcp.sh";
}
}

public void addSourceDirectory(String directoryPath, SourceOptions options) {
MlcpSource source = new MlcpSource(directoryPath, options);
sources.add(source);
}

public void loadContent() {
for (MlcpSource source : sources) {
Thread inputThread = null;
Thread errorThread = null;
try {
List<String> arguments = new ArrayList<>();

arguments.add(mlcpPath);
arguments.add("import");
arguments.add("-mode");
arguments.add("local");
arguments.add("-host");
arguments.add(host);
arguments.add("-port");
arguments.add(port);
arguments.add("-username");
arguments.add(user);
arguments.add("-password");
arguments.add(password);

// add arguments related to the source
List<String> sourceArguments = source.getMlcpArguments();
arguments.addAll(sourceArguments);

ProcessBuilder pb = new ProcessBuilder(arguments.toArray(new String[0]));
Process process = pb.start();

inputThread = IOUtil.createInputStreamSink(process.getInputStream(), LOGGER, LogLevel.DEBUG);
errorThread = IOUtil.createInputStreamSink(process.getErrorStream(), LOGGER, LogLevel.ERROR);

inputThread.start();
errorThread.start();

process.waitFor();
}
catch (Exception e) {
LOGGER.error("Failed to load {}", source.getSourcePath(), e);
}
finally {
if (inputThread != null) {
inputThread.interrupt();
}
if (errorThread != null) {
errorThread.interrupt();
}
}
}
}

private static class MlcpSource {
private String sourcePath;
private SourceOptions sourceOptions;

public MlcpSource(String sourcePath, SourceOptions sourceOptions) {
this.sourcePath = sourcePath;
this.sourceOptions = sourceOptions;
}

public String getSourcePath() {
return sourcePath;
}

public List<String> getMlcpArguments() throws IOException {
File file = new File(sourcePath);
String canonicalPath = file.getCanonicalPath();

List<String> arguments = new ArrayList<>();
arguments.add("-input_file_path");
arguments.add(canonicalPath);
arguments.add("-input_file_type");
if (sourceOptions.getInputFileType() == null) {
arguments.add("documents");
}
else {
arguments.add(sourceOptions.getInputFileType());
}

if (sourceOptions.getInputFilePattern() != null) {
arguments.add("-input_file_pattern");
arguments.add(sourceOptions.getInputFilePattern());
}

// by default, cut the source directory path to make URIs shorter
String uriReplace = "/" + canonicalPath + ",''";
uriReplace = uriReplace.replaceAll("\\\\", "/");

arguments.add("-output_uri_replace");
arguments.add(uriReplace);

arguments.add("-transform_module");
arguments.add("/com.marklogic.hub/mlcp-flow-transform.xqy");
arguments.add("-transform_namespace");
arguments.add("http://marklogic.com/hub-in-a-box/mlcp-flow-transform");
arguments.add("-transform_param");
arguments.add("\"" + sourceOptions.getTransformParams() + "\"");

return arguments;
}
}

public static class SourceOptions {
private String domainName;
private String flowName;
private String flowType;
private String inputFileType;
private String inputFilePattern;

public SourceOptions(String domainName, String flowName, String flowType) {
this.domainName = domainName;
this.flowName = flowName;
this.flowType = flowType;
}

public String getDomainName() {
return domainName;
}

public String getFlowName() {
return flowName;
}

public String getFlowType() {
return flowType;
}

public String getInputFileType() {
return inputFileType;
}

public void setInputFileType(String inputFileType) {
this.inputFileType = inputFileType;
}

public String getInputFilePattern() {
return inputFilePattern;
}

public void setInputFilePattern(String inputFilePattern) {
this.inputFilePattern = inputFilePattern;
}

protected String getTransformParams() {
return String.format("<params><domain-name>%s</domain-name><flow-name>%s</flow-name><flow-type>%s</flow-type></params>", domainName, flowName, flowType);
}
}
}
23 changes: 15 additions & 8 deletions data-hub/src/main/java/com/marklogic/hub/Scaffolding.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,29 +29,36 @@ public static void createDomain(String domainName, File userlandPath) {
domainDir.mkdirs();
}

public static void createFlow(String name, String type, File domainPath) throws IOException {
public static void createFlow(String name, String type, File domainPath)
throws IOException {
File typeDir = new File(domainPath, type);
File flowDir = new File(typeDir, name);

File collectorDir = new File(flowDir, "collector");
collectorDir.mkdirs();
writeFile("scaffolding/collector.xqy", Paths.get(collectorDir.getPath(), "collector.xqy"));
writeFile("scaffolding/collector.xqy",
Paths.get(collectorDir.getPath(), "collector.xqy"));

File contentDir = new File(flowDir, "content");
contentDir.mkdirs();
writeFile("scaffolding/content.xqy", Paths.get(contentDir.getPath(), "content.xqy"));
writeFile("scaffolding/content.xqy",
Paths.get(contentDir.getPath(), "content.xqy"));

File headerDir = new File(flowDir, "header");
File headerDir = new File(flowDir, "headers");
headerDir.mkdirs();
writeFile("scaffolding/header.xqy", Paths.get(contentDir.getPath(), "header.xqy"));
writeFile("scaffolding/headers.xqy",
Paths.get(headerDir.getPath(), "headers.xqy"));

File triplesDir = new File(flowDir, "triples");
triplesDir.mkdirs();
writeFile("scaffolding/triples.xqy", Paths.get(contentDir.getPath(), "triples.xqy"));
writeFile("scaffolding/triples.xqy",
Paths.get(triplesDir.getPath(), "triples.xqy"));
}

private static void writeFile(String srcFile, Path dstFile) throws IOException {
InputStream inputStream = Scaffolding.class.getClassLoader().getResourceAsStream(srcFile);
private static void writeFile(String srcFile, Path dstFile)
throws IOException {
InputStream inputStream = Scaffolding.class.getClassLoader()
.getResourceAsStream(srcFile);
Files.copy(inputStream, dstFile);
}
}
84 changes: 84 additions & 0 deletions data-hub/src/main/java/com/marklogic/hub/util/IOUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright 2012-2016 MarkLogic Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.marklogic.hub.util;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

import org.slf4j.Logger;

public class IOUtil {

public static Thread createInputStreamSink(InputStream inputStream) {
return IOUtil.createInputStreamSink(inputStream, null, LogLevel.DEBUG);
}

public static Thread createInputStreamSink(InputStream inputStream, Logger logger, LogLevel logLevel) {
return new InputStreamSinkThread(inputStream, logger, logLevel);
}

public static enum LogLevel {
WARN
,INFO
,DEBUG
,ERROR
}

private static class InputStreamSinkThread extends Thread {

private InputStream inputStream;
private Logger logger;
private LogLevel logLevel;

public InputStreamSinkThread(InputStream inputStream, Logger logger, LogLevel logLevel) {
super("InputStreamSinkThread(" + inputStream + ")");

this.inputStream = inputStream;
this.logger = logger;
this.logLevel = logLevel;
}

@Override
public void run() {
BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
String line = null;
try {
while ((line = br.readLine()) != null) {
if (logger != null) {
if (logLevel == LogLevel.DEBUG) {
logger.debug(line);
}
else if (logLevel == LogLevel.ERROR) {
logger.error(line);
}
else if (logLevel == LogLevel.WARN) {
logger.error(line);
}
else if (logLevel == LogLevel.INFO) {
logger.info(line);
}
}
}
} catch (IOException e) {
if (logger != null) {
logger.error("Error encountered while reading stream", e);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,9 @@ declare function flow:run-flow(
map:get($content, "triple"),
$options)
return
map:put($content, $destination, $resp)
if (fn:empty($destination))
then ()
else map:put($content, $destination, $resp)
},
map:new((
map:entry("isolation", "different-transaction"),
Expand Down Expand Up @@ -456,12 +458,17 @@ declare function flow:run-plugin(
$options as map:map)
{
let $module-uri := $plugin/@module
let $destination := $plugin/@dest
let $module-name := hul:get-module-name($module-uri)
let $ns := $PLUGIN-NS || fn:lower-case($module-name)
let $func := xdmp:function(fn:QName($ns, "create-" || $destination), $module-uri)
return
$func($identifier, $content, $headers, $triples, $options)
if (fn:empty($module-uri))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than this fn:empty check and the one above on line 408, I'd rather you check in the caller:

line 396:

    xdmp:invoke-function(function() {
      for $plugin in $flow/hub:plugins/hub:plugin[fn:not(hub:type = 'null')]
      let $destination := $plugin/@dest

Note the change:

[fn:not(hub:type = 'null')]

then
()
else
let $destination := $plugin/@dest
let $module-name := hul:get-module-name($module-uri)
let $ns := $PLUGIN-NS || fn:lower-case($module-name)
let $func := xdmp:function(fn:QName($ns, "create-" || $destination), $module-uri)
return
$func($identifier, $content, $headers, $triples, $options)
};

(:~
Expand Down
Loading