From fc03550a3645c3dfaca8371c272f7cd2b0ff6d21 Mon Sep 17 00:00:00 2001 From: liuyou2 <405240259@qq.com> Date: Wed, 18 Aug 2021 19:03:54 +0800 Subject: [PATCH] Defines the basic class of the orchestration module. #354 --- .../dss-orchestrator-core/pom.xml | 116 +++++++++++ .../orchestrator/core/DSSOrchestrator.java | 68 +++++++ .../core/DSSOrchestratorContext.java | 41 ++++ .../DSSOrchestratorErrorException.java | 30 +++ .../impl/AbstractDSSOrchestratorContext.java | 47 +++++ .../core/impl/AbstractOrchestrator.java | 49 +++++ .../core/impl/DSSOrchestratorContextImpl.java | 39 ++++ .../core/impl/DefaultOrchestrator.java | 82 ++++++++ .../plugin/AbstractDSSOrchestratorPlugin.java | 40 ++++ .../core/plugin/DSSOrchestratorPlugin.java | 28 +++ .../core/type/OrchestratorKindEnum.java | 62 ++++++ .../core/utils/OrchestratorUtils.java | 38 ++++ .../core/service/BMLService.scala | 183 ++++++++++++++++++ 13 files changed, 823 insertions(+) create mode 100644 dss-orchestrator/dss-orchestrator-core/pom.xml create mode 100644 dss-orchestrator/dss-orchestrator-core/src/main/java/com/webank/wedatasphere/dss/orchestrator/core/DSSOrchestrator.java create mode 100644 dss-orchestrator/dss-orchestrator-core/src/main/java/com/webank/wedatasphere/dss/orchestrator/core/DSSOrchestratorContext.java create mode 100644 dss-orchestrator/dss-orchestrator-core/src/main/java/com/webank/wedatasphere/dss/orchestrator/core/exception/DSSOrchestratorErrorException.java create mode 100644 dss-orchestrator/dss-orchestrator-core/src/main/java/com/webank/wedatasphere/dss/orchestrator/core/impl/AbstractDSSOrchestratorContext.java create mode 100644 dss-orchestrator/dss-orchestrator-core/src/main/java/com/webank/wedatasphere/dss/orchestrator/core/impl/AbstractOrchestrator.java create mode 100644 dss-orchestrator/dss-orchestrator-core/src/main/java/com/webank/wedatasphere/dss/orchestrator/core/impl/DSSOrchestratorContextImpl.java create mode 100644 dss-orchestrator/dss-orchestrator-core/src/main/java/com/webank/wedatasphere/dss/orchestrator/core/impl/DefaultOrchestrator.java create mode 100644 dss-orchestrator/dss-orchestrator-core/src/main/java/com/webank/wedatasphere/dss/orchestrator/core/plugin/AbstractDSSOrchestratorPlugin.java create mode 100644 dss-orchestrator/dss-orchestrator-core/src/main/java/com/webank/wedatasphere/dss/orchestrator/core/plugin/DSSOrchestratorPlugin.java create mode 100644 dss-orchestrator/dss-orchestrator-core/src/main/java/com/webank/wedatasphere/dss/orchestrator/core/type/OrchestratorKindEnum.java create mode 100644 dss-orchestrator/dss-orchestrator-core/src/main/java/com/webank/wedatasphere/dss/orchestrator/core/utils/OrchestratorUtils.java create mode 100644 dss-orchestrator/dss-orchestrator-core/src/main/scala/com/webank/wedatasphere/dss/orchestrator/core/service/BMLService.scala diff --git a/dss-orchestrator/dss-orchestrator-core/pom.xml b/dss-orchestrator/dss-orchestrator-core/pom.xml new file mode 100644 index 0000000000..ec531646bf --- /dev/null +++ b/dss-orchestrator/dss-orchestrator-core/pom.xml @@ -0,0 +1,116 @@ + + + + + + dss + com.webank.wedatasphere.dss + 1.0.0 + + 4.0.0 + + dss-orchestrator-core + + + com.webank.wedatasphere.linkis + linkis-module + ${linkis.version} + provided + + + com.webank.wedatasphere.dss + dss-appconn-core + ${dss.version} + + + + linkis-bml-client + + + gson + com.google.code.gson + + + com.webank.wedatasphere.linkis + ${linkis.version} + + + + com.webank.wedatasphere.dss + dss-contextservice + ${dss.version} + + + + com.webank.wedatasphere.linkis + linkis-rpc + ${linkis.version} + provided + + + com.webank.wedatasphere.dss + dss-orchestrator-common + ${dss.version} + compile + + + + com.webank.wedatasphere.dss + dss-common + ${dss.version} + provided + + + + + + + + org.apache.maven.plugins + maven-deploy-plugin + + + + net.alchim31.maven + scala-maven-plugin + + + org.apache.maven.plugins + maven-jar-plugin + + + + + src/main/java + + **/*.xml + + + + src/main/resources + + **/*.xml + **/*.properties + **/*.yml + + + + + + \ No newline at end of file diff --git a/dss-orchestrator/dss-orchestrator-core/src/main/java/com/webank/wedatasphere/dss/orchestrator/core/DSSOrchestrator.java b/dss-orchestrator/dss-orchestrator-core/src/main/java/com/webank/wedatasphere/dss/orchestrator/core/DSSOrchestrator.java new file mode 100644 index 0000000000..899346e23e --- /dev/null +++ b/dss-orchestrator/dss-orchestrator-core/src/main/java/com/webank/wedatasphere/dss/orchestrator/core/DSSOrchestrator.java @@ -0,0 +1,68 @@ +/* + * Copyright 2019 WeBank + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.webank.wedatasphere.dss.orchestrator.core; + +import com.webank.wedatasphere.dss.appconn.core.AppConn; +import com.webank.wedatasphere.dss.common.label.DSSLabel; +import java.util.List; + + + +public interface DSSOrchestrator { + + /** + * 返回Orchestrator的名称,如workflow + * @return + */ + String getName(); + + + /** + * 返回编排关联的AppConn + * @return + */ + AppConn getAppConn(); + + DSSOrchestratorContext getDSSOrchestratorContext(); + + void setAppConn(AppConn appConn); + + /** + *添加当前编排需要使用到在appconn + * @param appconn + */ + void addLinkedAppConn(AppConn appconn); + + + /** + * 为编排提供标签说明,如DEV + * @param dssLabel + */ + void addLinkedDssLabels(DSSLabel dssLabel); + + /** + * 返回所有已经关联到的AppConn + * @return + */ + List getLinkedAppConn(); + + /** + * 用于工具条功能按钮展示,可以查到该模式可以提供的功能按钮 + * @return + */ + List getToolBars(); +} diff --git a/dss-orchestrator/dss-orchestrator-core/src/main/java/com/webank/wedatasphere/dss/orchestrator/core/DSSOrchestratorContext.java b/dss-orchestrator/dss-orchestrator-core/src/main/java/com/webank/wedatasphere/dss/orchestrator/core/DSSOrchestratorContext.java new file mode 100644 index 0000000000..bef0e6cf9f --- /dev/null +++ b/dss-orchestrator/dss-orchestrator-core/src/main/java/com/webank/wedatasphere/dss/orchestrator/core/DSSOrchestratorContext.java @@ -0,0 +1,41 @@ +/* + * Copyright 2019 WeBank + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.webank.wedatasphere.dss.orchestrator.core; + +import com.webank.wedatasphere.dss.common.exception.DSSRuntimeException; +import com.webank.wedatasphere.dss.orchestrator.core.plugin.DSSOrchestratorPlugin; +import java.io.Closeable; +import java.util.List; +import java.util.Map; + + +public interface DSSOrchestratorContext extends Closeable { + + void initialize(); + + Map getConfigMap(); + + List getOrchestratorPlugins(); + + default T getDSSOrchestratorPlugin(Class clazz) { + return (T) getOrchestratorPlugins().stream().filter(clazz::isInstance) + .findFirst().orElseThrow(() -> new DSSRuntimeException(50321, "Cannot find " + clazz.getSimpleName())); + } + + boolean isActive(); + +} diff --git a/dss-orchestrator/dss-orchestrator-core/src/main/java/com/webank/wedatasphere/dss/orchestrator/core/exception/DSSOrchestratorErrorException.java b/dss-orchestrator/dss-orchestrator-core/src/main/java/com/webank/wedatasphere/dss/orchestrator/core/exception/DSSOrchestratorErrorException.java new file mode 100644 index 0000000000..601ca00742 --- /dev/null +++ b/dss-orchestrator/dss-orchestrator-core/src/main/java/com/webank/wedatasphere/dss/orchestrator/core/exception/DSSOrchestratorErrorException.java @@ -0,0 +1,30 @@ +/* + * Copyright 2019 WeBank + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.webank.wedatasphere.dss.orchestrator.core.exception; + +import com.webank.wedatasphere.dss.common.exception.DSSErrorException; + + +public class DSSOrchestratorErrorException extends DSSErrorException { + public DSSOrchestratorErrorException(int errCode, String desc) { + super(errCode, desc); + } + + public DSSOrchestratorErrorException(int errCode, String desc, String ip, int port, String serviceKind) { + super(errCode, desc, ip, port, serviceKind); + } +} diff --git a/dss-orchestrator/dss-orchestrator-core/src/main/java/com/webank/wedatasphere/dss/orchestrator/core/impl/AbstractDSSOrchestratorContext.java b/dss-orchestrator/dss-orchestrator-core/src/main/java/com/webank/wedatasphere/dss/orchestrator/core/impl/AbstractDSSOrchestratorContext.java new file mode 100644 index 0000000000..b2f1799d35 --- /dev/null +++ b/dss-orchestrator/dss-orchestrator-core/src/main/java/com/webank/wedatasphere/dss/orchestrator/core/impl/AbstractDSSOrchestratorContext.java @@ -0,0 +1,47 @@ +/* + * Copyright 2019 WeBank + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.webank.wedatasphere.dss.orchestrator.core.impl; + +import com.webank.wedatasphere.dss.common.utils.DSSExceptionUtils; +import com.webank.wedatasphere.dss.orchestrator.core.DSSOrchestratorContext; +import com.webank.wedatasphere.dss.orchestrator.core.plugin.DSSOrchestratorPlugin; +import java.util.HashMap; +import java.util.Map; + + +public abstract class AbstractDSSOrchestratorContext implements DSSOrchestratorContext { + + private boolean isClosed = false; + private Map configMap = new HashMap<>(); + + + @Override + public Map getConfigMap() { + return configMap; + } + + @Override + public boolean isActive() { + return !isClosed; + } + + @Override + public void close() { + isClosed = true; + getOrchestratorPlugins().forEach(DSSExceptionUtils.handling(DSSOrchestratorPlugin::close)); + } +} diff --git a/dss-orchestrator/dss-orchestrator-core/src/main/java/com/webank/wedatasphere/dss/orchestrator/core/impl/AbstractOrchestrator.java b/dss-orchestrator/dss-orchestrator-core/src/main/java/com/webank/wedatasphere/dss/orchestrator/core/impl/AbstractOrchestrator.java new file mode 100644 index 0000000000..f11c5f1eeb --- /dev/null +++ b/dss-orchestrator/dss-orchestrator-core/src/main/java/com/webank/wedatasphere/dss/orchestrator/core/impl/AbstractOrchestrator.java @@ -0,0 +1,49 @@ +/* + * Copyright 2019 WeBank + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.webank.wedatasphere.dss.orchestrator.core.impl; + +import com.webank.wedatasphere.dss.orchestrator.core.DSSOrchestrator; + +import com.webank.wedatasphere.dss.orchestrator.core.DSSOrchestratorContext; +import java.util.Arrays; +import java.util.List; + + +abstract class AbstractOrchestrator implements DSSOrchestrator { + + private volatile DSSOrchestratorContext dssOrchestratorContext; + + @Override + public List getToolBars() { + String[] toolNames = {"参数", "资源", "执行", "发布","保存"}; + return Arrays.asList(toolNames); + } + + protected abstract DSSOrchestratorContext createOrchestratorContext(); + + @Override + public DSSOrchestratorContext getDSSOrchestratorContext() { + if(dssOrchestratorContext == null) { + synchronized (this) { + if(dssOrchestratorContext == null) { + dssOrchestratorContext = createOrchestratorContext(); + } + } + } + return dssOrchestratorContext; + } +} diff --git a/dss-orchestrator/dss-orchestrator-core/src/main/java/com/webank/wedatasphere/dss/orchestrator/core/impl/DSSOrchestratorContextImpl.java b/dss-orchestrator/dss-orchestrator-core/src/main/java/com/webank/wedatasphere/dss/orchestrator/core/impl/DSSOrchestratorContextImpl.java new file mode 100644 index 0000000000..6cc3f940d6 --- /dev/null +++ b/dss-orchestrator/dss-orchestrator-core/src/main/java/com/webank/wedatasphere/dss/orchestrator/core/impl/DSSOrchestratorContextImpl.java @@ -0,0 +1,39 @@ +/* + * Copyright 2019 WeBank + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.webank.wedatasphere.dss.orchestrator.core.impl; + +import com.webank.wedatasphere.dss.common.utils.ClassUtils; +import com.webank.wedatasphere.dss.orchestrator.core.plugin.DSSOrchestratorPlugin; +import java.util.List; + + +public class DSSOrchestratorContextImpl extends AbstractDSSOrchestratorContext { + + private List plugins; + + @Override + public void initialize() { + plugins = ClassUtils.getInstances(DSSOrchestratorPlugin.class); + plugins.forEach(DSSOrchestratorPlugin::init); + } + + @Override + public List getOrchestratorPlugins() { + return plugins; + } + +} diff --git a/dss-orchestrator/dss-orchestrator-core/src/main/java/com/webank/wedatasphere/dss/orchestrator/core/impl/DefaultOrchestrator.java b/dss-orchestrator/dss-orchestrator-core/src/main/java/com/webank/wedatasphere/dss/orchestrator/core/impl/DefaultOrchestrator.java new file mode 100644 index 0000000000..af8c7bd45a --- /dev/null +++ b/dss-orchestrator/dss-orchestrator-core/src/main/java/com/webank/wedatasphere/dss/orchestrator/core/impl/DefaultOrchestrator.java @@ -0,0 +1,82 @@ +/* + * Copyright 2019 WeBank + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.webank.wedatasphere.dss.orchestrator.core.impl; + +import com.webank.wedatasphere.dss.appconn.core.AppConn; +import com.webank.wedatasphere.dss.common.label.DSSLabel; +import com.webank.wedatasphere.dss.orchestrator.core.DSSOrchestratorContext; +import java.util.ArrayList; +import java.util.List; + + +public class DefaultOrchestrator extends AbstractOrchestrator { + + private static volatile DSSOrchestratorContext orchestratorContext; + + private static void initDSSOrchestratorContext() { + if(orchestratorContext == null) { + synchronized (DefaultOrchestrator.class) { + if(orchestratorContext == null) { + orchestratorContext = new DSSOrchestratorContextImpl(); + orchestratorContext.initialize(); + } + } + } + } + + private List linkedAppConn = new ArrayList<>(); + + private List labels = new ArrayList<>(); + + private AppConn appConn; + + @Override + public void setAppConn(AppConn appConn){ + this.appConn = appConn; + } + + @Override + public String getName() { + return "DefaultOrchestrator"; + } + + @Override + public AppConn getAppConn() { + return this.appConn; + } + + @Override + public void addLinkedAppConn(AppConn appconn) { + linkedAppConn.add(appconn); + } + + @Override + public void addLinkedDssLabels(DSSLabel dssLabel) { + labels.add(dssLabel); + } + + @Override + public List getLinkedAppConn() { + return linkedAppConn; + } + + @Override + protected DSSOrchestratorContext createOrchestratorContext() { + initDSSOrchestratorContext(); + return orchestratorContext; + } +} diff --git a/dss-orchestrator/dss-orchestrator-core/src/main/java/com/webank/wedatasphere/dss/orchestrator/core/plugin/AbstractDSSOrchestratorPlugin.java b/dss-orchestrator/dss-orchestrator-core/src/main/java/com/webank/wedatasphere/dss/orchestrator/core/plugin/AbstractDSSOrchestratorPlugin.java new file mode 100644 index 0000000000..7bff434787 --- /dev/null +++ b/dss-orchestrator/dss-orchestrator-core/src/main/java/com/webank/wedatasphere/dss/orchestrator/core/plugin/AbstractDSSOrchestratorPlugin.java @@ -0,0 +1,40 @@ +/* + * Copyright 2019 WeBank + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.webank.wedatasphere.dss.orchestrator.core.plugin; + +import java.io.IOException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class AbstractDSSOrchestratorPlugin implements DSSOrchestratorPlugin { + + protected final Logger LOGGER = LoggerFactory.getLogger(this.getClass()); + + @Override + public void init() { + } + + @Override + public boolean isReady() { + return true; + } + + @Override + public void close() throws IOException { + } +} diff --git a/dss-orchestrator/dss-orchestrator-core/src/main/java/com/webank/wedatasphere/dss/orchestrator/core/plugin/DSSOrchestratorPlugin.java b/dss-orchestrator/dss-orchestrator-core/src/main/java/com/webank/wedatasphere/dss/orchestrator/core/plugin/DSSOrchestratorPlugin.java new file mode 100644 index 0000000000..fde7a4117a --- /dev/null +++ b/dss-orchestrator/dss-orchestrator-core/src/main/java/com/webank/wedatasphere/dss/orchestrator/core/plugin/DSSOrchestratorPlugin.java @@ -0,0 +1,28 @@ +/* + * Copyright 2019 WeBank + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.webank.wedatasphere.dss.orchestrator.core.plugin; + +import java.io.Closeable; + + +public interface DSSOrchestratorPlugin extends Closeable { + + void init(); + + boolean isReady(); + +} diff --git a/dss-orchestrator/dss-orchestrator-core/src/main/java/com/webank/wedatasphere/dss/orchestrator/core/type/OrchestratorKindEnum.java b/dss-orchestrator/dss-orchestrator-core/src/main/java/com/webank/wedatasphere/dss/orchestrator/core/type/OrchestratorKindEnum.java new file mode 100644 index 0000000000..8e5aab864d --- /dev/null +++ b/dss-orchestrator/dss-orchestrator-core/src/main/java/com/webank/wedatasphere/dss/orchestrator/core/type/OrchestratorKindEnum.java @@ -0,0 +1,62 @@ +/* + * Copyright 2019 WeBank + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.webank.wedatasphere.dss.orchestrator.core.type; + + +public enum OrchestratorKindEnum { + + /** + * 编排模式的多种类型 + */ + WORKFLOW(1, "workflow","工作流"), + SINGLE_TASK(2, "singleTask", "单任务"), + COMBINED(3,"combined", "组合编排"); + + private OrchestratorKindEnum(int index, String name, String chName){ + this.index = index; + this.name = name; + this.chName = chName; + } + + private int index; + + private String name; + + private String chName; + + public int getIndex() { + return index; + } + + public String getName() { + return name; + } + + public String getChName() { + return chName; + } + + public static OrchestratorKindEnum getType(int index){ + for (OrchestratorKindEnum type : values()) { + if (type.getIndex() == index) { + return type; + } + } + return WORKFLOW; + } + +} diff --git a/dss-orchestrator/dss-orchestrator-core/src/main/java/com/webank/wedatasphere/dss/orchestrator/core/utils/OrchestratorUtils.java b/dss-orchestrator/dss-orchestrator-core/src/main/java/com/webank/wedatasphere/dss/orchestrator/core/utils/OrchestratorUtils.java new file mode 100644 index 0000000000..37120a4dcf --- /dev/null +++ b/dss-orchestrator/dss-orchestrator-core/src/main/java/com/webank/wedatasphere/dss/orchestrator/core/utils/OrchestratorUtils.java @@ -0,0 +1,38 @@ +/* + * Copyright 2019 WeBank + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.webank.wedatasphere.dss.orchestrator.core.utils; + + +public class OrchestratorUtils { + + public static String generateNewVersion() { + return "v000001"; + } + + /** + * 注意: flow版本更新需要同步更新ContextID + * @param oldVersion + * @return + */ + public static String increaseVersion(String oldVersion) { + int num = Integer.parseInt(oldVersion.substring(1)) + 1; + String tmp = "00000" + num; + return "v" + tmp.substring(tmp.length() - 6); + } + + +} diff --git a/dss-orchestrator/dss-orchestrator-core/src/main/scala/com/webank/wedatasphere/dss/orchestrator/core/service/BMLService.scala b/dss-orchestrator/dss-orchestrator-core/src/main/scala/com/webank/wedatasphere/dss/orchestrator/core/service/BMLService.scala new file mode 100644 index 0000000000..312b64cfa1 --- /dev/null +++ b/dss-orchestrator/dss-orchestrator-core/src/main/scala/com/webank/wedatasphere/dss/orchestrator/core/service/BMLService.scala @@ -0,0 +1,183 @@ +/* + * Copyright 2019 WeBank + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.webank.wedatasphere.dss.orchestrator.core.service + +import java.io.{ByteArrayInputStream, InputStream} +import java.util +import java.util.UUID + +import com.webank.wedatasphere.dss.common.exception.DSSErrorException +import com.webank.wedatasphere.dss.common.utils.IoUtils +import com.webank.wedatasphere.linkis.bml.client.{BmlClient, BmlClientFactory} +import com.webank.wedatasphere.linkis.bml.protocol.{BmlDownloadResponse, BmlUpdateResponse, BmlUploadResponse} +import com.webank.wedatasphere.linkis.common.utils.{JavaLog, Utils} +import org.apache.commons.io.IOUtils +import org.springframework.stereotype.Component + +import scala.collection.JavaConversions._ + + +@Component +class BMLService extends JavaLog{ + + def upload(userName: String, content: String, fileName: String, projectName:String): util.Map[String, Object] = { + val inputStream = new ByteArrayInputStream(content.getBytes("utf-8")) + val client: BmlClient = createBMLClient(userName) + val resource: BmlUploadResponse = client.uploadShareResource(userName, projectName, fileName, inputStream) + if (!resource.isSuccess) throw new DSSErrorException(911113, "上传失败") + val map = new util.HashMap[String, Object] + map += "resourceId" -> resource.resourceId + map += "version" -> resource.version + } + + def upload(userName: String, inputStream: InputStream, fileName: String, projectName:String): util.Map[String, Object] = { + val client: BmlClient = createBMLClient(userName) + val resource: BmlUploadResponse = client.uploadShareResource(userName, projectName, fileName, inputStream) + if (!resource.isSuccess) throw new DSSErrorException(911113, "上传失败") + val map = new util.HashMap[String, Object] + map += "resourceId" -> resource.resourceId + map += "version" -> resource.version + } + + def update(userName: String, resourceId: String, inputStream: InputStream): util.Map[String, Object] = { + val client: BmlClient = createBMLClient(userName) + val resource: BmlUpdateResponse = client.updateShareResource(userName, resourceId, "", inputStream) + if (!resource.isSuccess) throw new DSSErrorException(911114, "更新失败") + val map = new util.HashMap[String, Object] + map += "resourceId" -> resource.resourceId + map += "version" -> resource.version + } + + def update(userName: String, resourceId: String, content: String): util.Map[String, Object] = { + val inputStream = new ByteArrayInputStream(content.getBytes("utf-8")) + val client: BmlClient = createBMLClient(userName) + val resource: BmlUpdateResponse = client.updateShareResource(userName, resourceId, UUID.randomUUID().toString+".json", inputStream) + if (!resource.isSuccess) throw new DSSErrorException(911114, "更新失败") + val map = new util.HashMap[String, Object] + map += "resourceId" -> resource.resourceId + map += "version" -> resource.version + } + + def query(userName: String, resourceId: String, version: String): util.Map[String, Object] = { + val client: BmlClient = createBMLClient(userName) + var resource: BmlDownloadResponse = null + if (version == null) { + resource = client.downloadShareResource(userName, resourceId) + } else { + resource = client.downloadShareResource(userName, resourceId, version) + } + if (!resource.isSuccess) throw new DSSErrorException(911115, "下载失败") + val map = new util.HashMap[String, Object] + map += "path" -> resource.fullFilePath + map += "string" -> inputstremToString(resource.inputStream) + } + + def download(userName: String, resourceId: String, version: String): util.Map[String, Object] = { + val client: BmlClient = createBMLClient(userName) + var resource: BmlDownloadResponse = null + if (version == null) { + resource = client.downloadShareResource(userName, resourceId) + } else { + resource = client.downloadShareResource(userName, resourceId, version) + } + if (!resource.isSuccess) throw new DSSErrorException(911115, "下载失败") + val map = new util.HashMap[String, Object] + map += "path" -> resource.fullFilePath + map += "is" -> resource.inputStream + } + + def downloadToLocalPath(userName: String, resourceId: String, version: String, path: String): String = { + val result = download(userName,resourceId,version) + val is = result.get("is").asInstanceOf[InputStream] + val os = IoUtils.generateExportOutputStream(path) + Utils.tryFinally(IOUtils.copy(is,os)){ + IOUtils.closeQuietly(os) + IOUtils.closeQuietly(is) + } + path + } + + def downloadAndGetFlowJson(userName: String, resourceId: String, version: String, path: String): String = { + downloadToLocalPath(userName,resourceId,version,path) + //因为下载到指定目录后返回的resource的Stream为null,只能从文件重新读取。 + val is = IoUtils.generateInputInputStream(path) + Utils.tryFinally(inputstremToString(is))(IOUtils.closeQuietly(is)) + } + + def readLocalResourceFile(userName: String,readPath: String): InputStream ={ + IoUtils.generateInputInputStream(readPath) + } + + def readLocalFlowJsonFile(userName: String,readPath: String): String ={ + var inputStream:InputStream = null + var flowJson:String = null + Utils.tryFinally{ + inputStream = IoUtils.generateInputInputStream(readPath) + flowJson = inputstremToString(inputStream) + }{ + IOUtils.closeQuietly(inputStream) + } + flowJson + } + + private def inputstremToString(inputStream: InputStream): String = { + scala.io.Source.fromInputStream(inputStream).mkString + } + + private def createBMLClient(userName: String): BmlClient = { + if (userName == null) + BmlClientFactory.createBmlClient() + else + BmlClientFactory.createBmlClient(userName) + } + + def createBmlProject(username:String, projectName:String, editUsers:util.List[String], + accessUsers:util.List[String] ): Unit ={ + val client = createBMLClient(username) + val response = client.createBmlProject(username, projectName, accessUsers, editUsers) + if (response.isSuccess){ + logger.info(s"for user $username create bml project $projectName success") + }else{ + logger.error(s"for user $username create bml project $projectName failed") + } + } + + def attachResourceAndProject(username:String, projectName:String, resourceId:String):Unit = { + val client = createBMLClient(username) + val response = client.attachResourceAndProject(projectName, resourceId) + if (response.isSuccess){ + logger.info(s"attach $username and $projectName success") + }else{ + logger.error(s"attach $username and $projectName failed") + } + } + + + def updateProjectPriv(projectName:String, username:String, editUsers:util.List[String], + accessUsers:util.List[String]): Unit ={ + val client = createBMLClient(username) + val response = client.updateProjectPriv(username, projectName, editUsers, accessUsers) + if (response.isSuccess){ + logger.info(s"attach $username and $projectName success") + }else{ + logger.error(s"attach $username and $projectName failed") + } + } + + + +}