Skip to content

Commit

Permalink
add task-plugin and spi
Browse files Browse the repository at this point in the history
  • Loading branch information
cyjseagull committed Aug 27, 2024
1 parent ab06f4a commit 4e81c6c
Show file tree
Hide file tree
Showing 39 changed files with 1,518 additions and 1 deletion.
3 changes: 3 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ ext {
springfoxVersion = "3.0.0"
guavaVersion = "32.0.1-jre"
bcprovVersion = "1.78.1"
googleAutoServiceVersion = "1.1.1"
}

// check.dependsOn integrationTest
Expand Down Expand Up @@ -135,6 +136,7 @@ allprojects {
implementation logger, spring, spring_boot

implementation("org.projectlombok:lombok:${lombokVersion}")
implementation("com.google.auto.service:auto-service:${googleAutoServiceVersion}")
implementation("org.apache.commons:commons-lang3:${apacheCommonLangVersion}")
implementation("io.springfox:springfox-boot-starter:${springfoxVersion}")
implementation("com.google.guava:guava:${guavaVersion}")
Expand All @@ -144,6 +146,7 @@ allprojects {
testImplementation ("junit:junit:${junitVersion}")

annotationProcessor("org.projectlombok:lombok:${lombokVersion}")
annotationProcessor("com.google.auto.service:auto-service:${googleAutoServiceVersion}")
}

clean.doLast {
Expand Down
17 changes: 16 additions & 1 deletion db/wedpr_ddl.sql
Original file line number Diff line number Diff line change
Expand Up @@ -279,4 +279,19 @@ CREATE TABLE IF NOT EXISTS wedpr_permission (
create_by VARCHAR(20) NOT NULL DEFAULT '',
update_by VARCHAR(20) NOT NULL DEFAULT '',
PRIMARY KEY (permission_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin ROW_FORMAT=DYNAMIC;
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin ROW_FORMAT=DYNAMIC;

-- the jupyter table
create table if not exists `wedpr_jupyter_table`(
`id` varchar(64) not null comment "Jupyter资源的ID",
`owner` varchar(255) not null comment "Jupyter属主",
`agency` varchar(255) not null comment "Jupyter所属机构",
`access_entrypoint` text comment "Jupyter访问入口",
`setting` longtext comment "Jupyter配置",
`status` int comment "Jupyter状态",
`create_time` DATETIME DEFAULT CURRENT_TIMESTAMP comment "创建时间",
`last_update_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP comment "更新时间",
PRIMARY KEY (id),
index owner_index(`owner`(128)),
index status_index(`status`)
)ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin ROW_FORMAT=DYNAMIC;
9 changes: 9 additions & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,15 @@ project(":wedpr-components-scheduler").projectDir=file("wedpr-components/schedul
include "wedpr-components-security"
project(":wedpr-components-security").projectDir=file("wedpr-components/security")

include ":wedpr-components-task-plugin-api"
project(":wedpr-components-task-plugin-api").projectDir=file("wedpr-components/task-plugin/api")

include ":wedpr-components-task-plugin-shell"
project(":wedpr-components-task-plugin-shell").projectDir=file("wedpr-components/task-plugin/shell")

include ":wedpr-components-spi"
project(":wedpr-components-spi").projectDir=file("wedpr-components/spi")

include "wedpr-components-user"
project(":wedpr-components-user").projectDir=file("wedpr-components/user")

Expand Down
15 changes: 15 additions & 0 deletions wedpr-components/spi/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Apply the java-library plugin to add support for Java Library
plugins {
id 'java'
id 'com.github.sherter.google-java-format'
}
dependencies{
compile project(":wedpr-core-utils")
}
googleJavaFormat {
//toolVersion = '1.7'
options style: 'AOSP'
source = sourceSets*.allJava
include '**/*.java'
//source = *.allJava
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2017-2025 [webank-wedpr]
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*
*/
package com.webank.wedpr.components.spi.plugin;

import lombok.Data;

@Data
public class SPIInfo {
private String name;
private Integer priority = 0;

public SPIInfo(String name) {
this.name = name;
}

public SPIInfo(String name, Integer priority) {
this.name = name;
this.priority = priority;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright 2017-2025 [webank-wedpr]
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*
*/
package com.webank.wedpr.components.spi.plugin;

import com.webank.wedpr.core.utils.WeDPRException;
import java.util.HashMap;
import java.util.Map;
import java.util.ServiceLoader;
import lombok.SneakyThrows;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SPILoader<T extends SPIObject> {
private static final Logger logger = LoggerFactory.getLogger(SPILoader.class);
private final Map<String, T> spiObjectMap = new HashMap<>();

@SneakyThrows(Exception.class)
public SPILoader(Class<T> spiClass) {
for (T spiObject : ServiceLoader.load(spiClass)) {
// load new spi object
if (!spiObjectMap.containsKey(spiObject.getSpiInfo().getName())) {
spiObjectMap.put(spiObject.getSpiInfo().getName(), spiObject);
continue;
}
// spi conflict, load the higher priority object
T existedSPIObject = spiObjectMap.get(spiObject.getSpiInfo().getName());
if (existedSPIObject.equals(spiObject)) {
String errorMsg =
String.format(
"load spi failed for conflict, there are two spi objects with same name '%s' and priority '%s'",
spiObject.getSpiInfo().getName(),
spiObject.getSpiInfo().getPriority());
logger.error(errorMsg);
throw new WeDPRException(errorMsg);
}
if (existedSPIObject.compareTo(spiObject) > 0) {
logger.info(
"load new spi object with higher priority, name: {}, oldPriority: {}, newPriority: {}",
spiObject.getSpiInfo().getName(),
existedSPIObject.getSpiInfo().getPriority(),
spiObject.getSpiInfo().getPriority());
spiObjectMap.put(spiObject.getSpiInfo().getName(), spiObject);
continue;
}
logger.info(
"Ignore spi object with lower priority, name: {}, priority: {}, currentPriority: {}",
spiObject.getSpiInfo().getName(),
spiObject.getSpiInfo().getPriority(),
existedSPIObject.getSpiInfo().getPriority());
}
}

public Map<String, T> getSpiObjectMap() {
return spiObjectMap;
}

public T getSPIObjectByName(String spiName) {
if (!spiObjectMap.containsKey(spiName)) {
return null;
}
return spiObjectMap.get(spiName);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2017-2025 [webank-wedpr]
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*
*/
package com.webank.wedpr.components.spi.plugin;

public class SPIObject implements Comparable<SPIObject> {
private SPIInfo spiInfo;

public SPIObject(SPIInfo spiInfo) {
this.spiInfo = spiInfo;
}

public SPIObject() {}

public SPIInfo getSpiInfo() {
return spiInfo;
}

public void setSpiInfo(SPIInfo spiInfo) {
this.spiInfo = spiInfo;
}

@Override
public int compareTo(SPIObject o) {
return spiInfo.getPriority().compareTo(o.getSpiInfo().getPriority());
}
}
16 changes: 16 additions & 0 deletions wedpr-components/task-plugin/api/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Apply the java-library plugin to add support for Java Library
plugins {
id 'java'
id 'com.github.sherter.google-java-format'
}
dependencies{
compile project(":wedpr-core-utils")
compile project(":wedpr-components-spi")
}
googleJavaFormat {
//toolVersion = '1.7'
options style: 'AOSP'
source = sourceSets*.allJava
include '**/*.java'
//source = *.allJava
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright 2017-2025 [webank-wedpr]
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*
*/
package com.webank.wedpr.components.task.plugin.api;

import com.webank.wedpr.components.task.plugin.api.model.CommandTaskConfig;
import com.webank.wedpr.components.task.plugin.api.model.CommandTaskExecutionContext;
import com.webank.wedpr.components.task.plugin.api.model.CommandTaskResponse;
import com.webank.wedpr.components.task.plugin.api.shell.ShellBuilder;
import com.webank.wedpr.core.utils.Common;
import com.webank.wedpr.core.utils.Constant;
import com.webank.wedpr.core.utils.WeDPRException;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CommandExecutor implements WorkerExecutor {
private static final Logger logger = LoggerFactory.getLogger(CommandExecutor.class);
protected final CommandTaskExecutionContext context;

public CommandExecutor(CommandTaskExecutionContext taskExecutionContext) {
this.context = taskExecutionContext;
}

@Override
public CommandTaskResponse run(Object builder) throws Exception {
ShellBuilder shellBuilder = (ShellBuilder) builder;
// set the builder information
shellBuilder.context(context);
// set the system envs
if (!CommandTaskConfig.getSystemEnvFiles().isEmpty()) {
CommandTaskConfig.getSystemEnvFiles().forEach(shellBuilder::appendSystemEnv);
}
// set the environment
if (StringUtils.isNotBlank(context.getEnvironmentConfig())) {
shellBuilder.appendCustomEnv(context.getEnvironmentConfig());
}
long remainTime = -1;
if (this.context.getTaskTimeoutMs() > 0) {
remainTime = (System.currentTimeMillis() - this.context.getStartTime());
if (remainTime < 0) {
throw new WeDPRException("task execution timeout");
}
}
// build the launcher
CommandTaskResponse taskResponse = new CommandTaskResponse(this.context.getTaskID());
Process process = shellBuilder.build().execute();
taskResponse.setProcess(process);
this.context.setProcess(process);
int processId = Common.getProcessId(process);
taskResponse.setProcessId(processId);
this.context.setProcessId(processId);

logger.info("bootstrap process start, process id: {}", processId);
// wait for finish
boolean exitNormally = Boolean.FALSE;
if (remainTime > 0) {
exitNormally = process.waitFor(remainTime, TimeUnit.MILLISECONDS);
} else {
exitNormally = (process.waitFor() == 0 ? true : false);
}
if (exitNormally) {
taskResponse.setExitCode(process.exitValue());
} else {
// kill the command
logger.error(
"process has failure, over the task timeout configuration {}, processId: {}, ready to kill",
context.getTaskTimeoutMs(),
processId);
kill();
taskResponse.setExitCode(Constant.WEDPR_FAILED);
}
logger.info(
"execute process finished, executePath: {}, process: {}, exitCode: {}, exitNormally: {}",
this.context.getExecutePath(),
processId,
process.exitValue(),
exitNormally);
return taskResponse;
}

@Override
public void kill() throws Exception {
if (this.context.getProcess() == null) {
return;
}
logger.info("Ready to kill process: {}", this.context.getProcessId());
this.context.getProcess().destroy();
if (!this.context
.getProcess()
.waitFor(CommandTaskConfig.getKillDefaultTimeoutSeconds(), TimeUnit.SECONDS)) {
this.context.getProcess().destroyForcibly();
}
logger.info("Success kill process: {}", this.context.getProcessId());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright 2017-2025 [webank-wedpr]
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*
*/
package com.webank.wedpr.components.task.plugin.api;

public interface TaskBuilder {
public abstract TaskInterface createTask(TaskExecutionContext taskExecutionContext);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 2017-2025 [webank-wedpr]
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*
*/
package com.webank.wedpr.components.task.plugin.api;

import com.webank.wedpr.components.spi.plugin.SPIObject;

public abstract class TaskBuilderFactory extends SPIObject {

public abstract String getName();

public abstract TaskBuilder createTaskBuilder();
}
Loading

0 comments on commit 4e81c6c

Please sign in to comment.