Skip to content

Commit

Permalink
Merge pull request #1129 from WeDataSphere/master
Browse files Browse the repository at this point in the history
Update branch-1.2.1
  • Loading branch information
zqburde authored Jun 27, 2024
2 parents f873293 + 6cfb704 commit 66d949e
Show file tree
Hide file tree
Showing 398 changed files with 17,831 additions and 1,139 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ dss-apps/dss-data-governance/dss-data-warehouse-dao/target
dss-apps/dss-data-governance/dss-data-warehouse-service/target
dss-apps/dss-data-governance/dss-data-warehouse-server/target

#dss-git
dss-git/dss-git-common/target
dss-git/dss-git-server/target

# plugins
plugins/azkaban/linkis-jobtype/target
Expand Down
2 changes: 1 addition & 1 deletion assembly/dss-package/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<artifactId>dss</artifactId>
<groupId>com.webank.wedatasphere.dss</groupId>
<version>1.1.0.20-SNAPSHOT</version>
<version>1.5.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<artifactId>dss</artifactId>
<groupId>com.webank.wedatasphere.dss</groupId>
<version>1.1.0.20-SNAPSHOT</version>
<version>1.5.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<packaging>pom</packaging>
Expand Down
1 change: 0 additions & 1 deletion conf/dss-framework-orchestrator-server.properties
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ wds.linkis.server.mybatis.typeAliasesPackage=com.webank.wedatasphere.dss.server.

wds.linkis.server.mybatis.BasePackage=com.webank.wedatasphere.dss.framework.appconn.dao,com.webank.wedatasphere.dss.orchestrator.core.dao,com.webank.wedatasphere.dss.server.dao,com.webank.wedatasphere.dss.application.dao,com.webank.wedatasphere.dss.workspace.mapper,com.webank.wedatasphere.dss.workspace.common.dao,com.webank.wedatasphere.dss.workspace.common.dao,com.webank.wedatasphere.dss.orchestrator.db.dao,com.webank.wedatasphere.dss.workflow.dao,com.webank.wedatasphere.dss.framework.appconn.dao,com.webank.wedatasphere.dss.flow.execution.entrance.dao

wds.dss.server.scheduling.clear.cs.cron=0 0 3 * * ?

wds.dss.publish.max.remain.version=3

Expand Down
33 changes: 33 additions & 0 deletions db/dss_ddl.sql
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ CREATE TABLE `dss_orchestrator_info` (
`orchestrator_level` varchar(32) DEFAULT NULL COMMENT '工作流级别',
`update_user` varchar(100) DEFAULT NULL COMMENT '更新人',
`update_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '更新时间',
`status` VARCHAR(64),
PRIMARY KEY (`id`) USING BTREE,
UNIQUE KEY `unique_idx_uuid` (`uuid`)
) ENGINE=InnoDB AUTO_INCREMENT=326 DEFAULT CHARSET=utf8mb4 ROW_FORMAT=COMPACT;
Expand All @@ -70,6 +71,7 @@ CREATE TABLE `dss_orchestrator_version_info` (
`content` varchar(255) DEFAULT NULL,
`context_id` varchar(200) DEFAULT NULL COMMENT '上下文ID',
`valid_flag` INT(1) DEFAULT '1' COMMENT '版本有效标示,0:无效;1:有效',
`commit_id` varchar(64),
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=422 DEFAULT CHARSET=utf8mb4 ROW_FORMAT=COMPACT;

Expand Down Expand Up @@ -116,6 +118,7 @@ CREATE TABLE `dss_project` (
`dev_process` varchar(200) COLLATE utf8_bin DEFAULT NULL COMMENT '开发流程,多个以英文逗号分隔,取得的值是dss_workspace_dictionary中的dic_key(parent_key=p_develop_process)',
`orchestrator_mode` varchar(200) COLLATE utf8_bin DEFAULT NULL COMMENT '编排模式,多个以英文逗号分隔,取得的值是dss_workspace_dictionary中的dic_key(parent_key=p_arrangement_mode或下面一级)',
`visible` tinyint(4) DEFAULT '1' COMMENT '0:已删除;1:未删除(默认)',
`associate_git` TINYINT DEFAULT '0' COMMENT '0:未接入git,1:已接入git',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=313 DEFAULT CHARSET=utf8 COLLATE=utf8_bin ROW_FORMAT=COMPACT;

Expand Down Expand Up @@ -639,3 +642,33 @@ key `idx_limit_name` (`limit_name`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COLLATE=utf8mb4_bin COMMENT ='dss用户限制表';

DROP TABLE IF EXISTS `dss_workspace_associate_git`;
CREATE TABLE `dss_workspace_associate_git` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`workspace_id` bigint(20) DEFAULT NULL,
`git_user` varchar(64) DEFAULT NULL COMMENT 'git登录用户名',
`git_password` VARCHAR(255) DEFAULT NULL COMMENT 'git登录密码,用于跳转',
`git_token` varchar(255) COMMENT '用户配置的git token',
`git_url` varchar(255),
`create_time` datetime DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
`create_by` varchar(128) DEFAULT NULL,
`update_by` varchar(128) DEFAULT NULL,
`type` varchar(32) DEFAULT NULL,
`git_user_id` varchar(32) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT='工作空间绑定的git信息';


DROP TABLE IF EXISTS `dss_orchestrator_submit_job_info`;
CREATE TABLE `dss_orchestrator_submit_job_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键',
`orchestrator_id` bigint(20) NOT NULL,
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '修改时间',
`instance_name` varchar(128) DEFAULT NULL COMMENT '提交任务的实例',
`status` varchar(128) DEFAULT NULL COMMENT '提交任务状态',
`error_msg` varchar(2048) DEFAULT NULL COMMENT '提交任务异常信息',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT='dss_orchestrator_submit_job_info表';

4 changes: 2 additions & 2 deletions dss-appconn/appconns/dss-datachecker-appconn/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<artifactId>dss</artifactId>
<groupId>com.webank.wedatasphere.dss</groupId>
<version>1.1.0.20-SNAPSHOT</version>
<version>1.5.0-SNAPSHOT</version>
<relativePath>../../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down Expand Up @@ -54,7 +54,7 @@
<dependency>
<groupId>com.webank.wedatasphere.dss</groupId>
<artifactId>dss-origin-sso-integration-standard</artifactId>
<version>1.1.0.20-SNAPSHOT</version>
<version>1.5.0-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.apache.linkis</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public DataChecker(Properties p, DataCheckerExecutionAction action) {
maxWaitTime = Long.valueOf(p.getProperty(DataChecker.WAIT_TIME, "1")) * 3600 * 1000;
//test over time
// maxWaitTime = Long.valueOf(p.getProperty(DataChecker.WAIT_TIME, "1")) * 120 * 1000;
queryFrequency = Integer.valueOf(p.getProperty(DataChecker.QUERY_FREQUENCY, "30000"));
queryFrequency = Integer.valueOf(p.getProperty(DataChecker.QUERY_FREQUENCY, "60000"));

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,6 @@ public class DataCheckerDao {
private static final String SQL_SOURCE_TYPE_JOB_PARTITION =
"SELECT * FROM DBS d JOIN TBLS t ON t.DB_ID = d.DB_ID JOIN PARTITIONS p ON p.TBL_ID = t.TBL_ID WHERE d.NAME=? AND t.TBL_NAME=? AND p.PART_NAME=?";

private static final String SQL_SOURCE_TYPE_BDP =
"SELECT * FROM desktop_bdapimport WHERE bdap_db_name = ? AND bdap_table_name = ? AND target_partition_name = ? AND status = '1';";

private static final String SQL_SOURCE_TYPE_BDP_WITH_TIME_CONDITION =
"SELECT * FROM desktop_bdapimport WHERE bdap_db_name = ? AND bdap_table_name = ? AND target_partition_name = ? " +
"AND (UNIX_TIMESTAMP() - UNIX_TIMESTAMP(STR_TO_DATE(modify_time, '%Y-%m-%d %H:%i:%s'))) <= ? AND status = '1';";

private static final String SQL_DOPS_CHECK_TABLE =
"SELECT * FROM dops_clean_task_list WHERE db_name = ? AND tb_name = ? AND part_name is null AND task_state NOT IN (10,13) order by order_id desc limit 1";
private static final String SQL_DOPS_CHECK_PARTITION =
Expand All @@ -72,7 +65,6 @@ public class DataCheckerDao {
private static final String MASK_SOURCE_TYPE = "maskdb";

private static DataSource jobDS;
private static DataSource bdpDS;

private static DataSource dopsDS;
private static volatile DataCheckerDao instance;
Expand All @@ -96,13 +88,6 @@ public boolean validateTableStatusFunction(Properties props, Logger log, DataChe
return false;
}
}
if (bdpDS == null) {
bdpDS = DataDruidFactory.getBDPInstance(props, log);
if (bdpDS == null) {
log.warn("Error getting job Druid DataSource instance");
return false;
}
}
boolean systemCheck = Boolean.valueOf(props.getProperty(DataChecker.QUALITIS_SWITCH));
if (systemCheck && dopsDS == null) {
dopsDS = DataDruidFactory.getDopsInstance(props, log);//通过alibaba的druid数据库连接池获取JOB数据库连接
Expand All @@ -122,7 +107,7 @@ public boolean validateTableStatusFunction(Properties props, Logger log, DataChe
}
log.info("(DataChecker info) database table partition info : " + dataCheckerInfo);
long waitTime = Long.valueOf(props.getProperty(DataChecker.WAIT_TIME, "1")) * 3600 * 1000;
int queryFrequency = Integer.valueOf(props.getProperty(DataChecker.QUERY_FREQUENCY, "30000"));
int queryFrequency = Integer.valueOf(props.getProperty(DataChecker.QUERY_FREQUENCY, "60000"));
// String timeScape = props.getProperty(DataChecker.TIME_SCAPE, "NULL");
log.info("(DataChecker info) wait time : " + waitTime);
log.info("(DataChecker info) query frequency : " + queryFrequency);
Expand All @@ -134,13 +119,12 @@ public boolean validateTableStatusFunction(Properties props, Logger log, DataChe
});
QualitisUtil qualitisUtil = new QualitisUtil(props);
try (Connection jobConn = jobDS.getConnection();
Connection bdpConn = bdpDS.getConnection();
Connection dopsConn = dopsDS != null ? dopsDS.getConnection() : null) {
List<Boolean> allCheckRes = dataObjectList
.parallelStream()
.map(proObjectMap -> {
log.info("Begin to Check dataObject:" + proObjectMap.entrySet().toString());
boolean checkRes = getDataCheckResult(proObjectMap, jobConn, bdpConn, dopsConn, props, log,action,qualitisUtil);
boolean checkRes = getDataCheckResult(proObjectMap, jobConn, dopsConn, props, log,action,qualitisUtil);
if (null != action.getExecutionRequestRefContext()) {
if (checkRes) {
action.getExecutionRequestRefContext().appendLog("Database table partition info : " + proObjectMap.get(DataChecker.DATA_OBJECT) + " has arrived");
Expand Down Expand Up @@ -178,7 +162,6 @@ public boolean validateTableStatusFunction(Properties props, Logger log, DataChe

private boolean getDataCheckResult(Map<String, String> proObjectMap,
Connection jobConn,
Connection bdpConn,
Connection dopsConn,
Properties props,
Logger log,
Expand Down Expand Up @@ -231,7 +214,7 @@ private boolean getDataCheckResult(Map<String, String> proObjectMap,
}
log.info("start to check maskis");
proObjectMap.put(DataChecker.SOURCE_TYPE, MASK_SOURCE_TYPE);
normalCheck= (getBdpTotalCount(dataObject, bdpConn, log, props) > 0 || "success".equals(fetchMaskCode(dataObject, log, props).get("maskStatus")));
normalCheck= "success".equals(fetchMaskCode(dataObject, log, props).get("maskStatus"));
if (null != action.getExecutionRequestRefContext()){
action.getExecutionRequestRefContext().appendLog(dataObjectStr+" check maskis end,check result:"+normalCheck);
}
Expand Down Expand Up @@ -316,25 +299,6 @@ private PreparedStatement getJobStatement(Connection conn, CheckDataObject dataO
}
}

/**
* 构造查询maskis的查询
*/
private PreparedStatement getBdpStatement(Connection conn, CheckDataObject dataObject, String timeScape) throws SQLException {
PreparedStatement pstmt = null;
if (timeScape.equals("NULL")) {
pstmt = conn.prepareCall(SQL_SOURCE_TYPE_BDP);
} else {
pstmt = conn.prepareCall(SQL_SOURCE_TYPE_BDP_WITH_TIME_CONDITION);
pstmt.setInt(4, Integer.valueOf(timeScape) * 3600);
}
if (dataObject.getPartitionName() == null) {
dataObject.setPartitionName("");
}
pstmt.setString(1, dataObject.getDbName());
pstmt.setString(2, dataObject.getTableName());
pstmt.setString(3, dataObject.getPartitionName());
return pstmt;
}

/**
* 构造查询dops库的查询
Expand Down Expand Up @@ -414,27 +378,6 @@ private long getJobTotalCount(CheckDataObject dataObject, Connection conn, Logge
}
}

/**
* 查mask db
*/
private long getBdpTotalCount(CheckDataObject dataObject, Connection conn, Logger log, Properties props) {
String timeScape = props.getOrDefault(DataChecker.TIME_SCAPE, "NULL").toString();
log.info("-------------------------------------- search bdp data ");
log.info("-------------------------------------- dataObject: " + dataObject.toString());
try (PreparedStatement pstmt = getBdpStatement(conn, dataObject, timeScape)) {
ResultSet rs = pstmt.executeQuery();
long ret = 0L;
while (rs.next()) {
ret ++;
}
// long ret=rs.last() ? rs.getRow() : 0;
log.info("-------------------------------------- bdp data result:"+ret);
return ret;
} catch (SQLException e) {
log.error("fetch data from bdp error", e);
return 0;
}
}

/**
* - 返回0表示未找到任何记录 ;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@

public class DataDruidFactory {
private static volatile DruidDataSource jobInstance;
private static volatile DruidDataSource bdpInstance;

private static volatile DruidDataSource dopsInstance;

private static volatile DruidDataSource msgInstance;

public static DruidDataSource getJobInstance(Properties props, Logger log) {
Expand All @@ -42,6 +44,7 @@ public static DruidDataSource getJobInstance(Properties props, Logger log) {
}
return jobInstance;
}

public static DruidDataSource getBDPInstance(Properties props, Logger log) {
if (bdpInstance == null ) {
synchronized (DataDruidFactory.class) {
Expand Down
5 changes: 2 additions & 3 deletions dss-appconn/appconns/dss-dolphinscheduler-appconn/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<artifactId>dss</artifactId>
<groupId>com.webank.wedatasphere.dss</groupId>
<version>1.1.0.20-SNAPSHOT</version>
<version>1.5.0-SNAPSHOT</version>
<relativePath>../../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand All @@ -18,7 +18,6 @@
<groupId>com.webank.wedatasphere.dss</groupId>
<artifactId>dss-scheduler-appconn</artifactId>
<version>${dss.version}</version>

<scope>provided</scope>
</dependency>

Expand Down Expand Up @@ -81,7 +80,7 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>28.2-android</version>
<version>33.1.0-jre</version>
</dependency>


Expand Down
2 changes: 1 addition & 1 deletion dss-appconn/appconns/dss-eventchecker-appconn/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<artifactId>dss</artifactId>
<groupId>com.webank.wedatasphere.dss</groupId>
<version>1.1.0.20-SNAPSHOT</version>
<version>1.5.0-SNAPSHOT</version>
<relativePath>../../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ public EventChecker(Properties p, EventCheckerExecutionAction action) {
String waitTime = p.getProperty(EventChecker.WAIT_TIME, "1");
Double doubleWaitTime = Double.valueOf(waitTime) * 3600 * 1000;
maxWaitTime = Long.valueOf(doubleWaitTime.longValue());
String query_frequency = p.getProperty(EventChecker.QUERY_FREQUENCY, "30000");
String query_frequency = p.getProperty(EventChecker.QUERY_FREQUENCY, "60000");
queryFrequency = Integer.valueOf(query_frequency);
if(queryFrequency <10000){
queryFrequency = 10000;
if(queryFrequency <60000){
queryFrequency = 60000;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ void initECParams(Properties props){
runDate = props.getProperty("run_date");
userTime = props.getProperty(EventChecker.USER_TIME);
waitTime = props.getProperty(EventChecker.WAIT_TIME, "1");
query_frequency = props.getProperty(EventChecker.QUERY_FREQUENCY, "30000");
query_frequency = props.getProperty(EventChecker.QUERY_FREQUENCY, "60000");
afterSend = props.getProperty(EventChecker.AFTERSEND);
}

Expand Down
2 changes: 1 addition & 1 deletion dss-appconn/appconns/dss-schedulis-appconn/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<artifactId>dss</artifactId>
<groupId>com.webank.wedatasphere.dss</groupId>
<version>1.1.0.20-SNAPSHOT</version>
<version>1.5.0-SNAPSHOT</version>
<relativePath>../../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,6 @@ public class AzkabanConstant {
public final static String FLOW_CONTEXT_ID = "wds.linkis.flow.contextID=";
public final static String LINKIS_VERSION = "linkis.version";
public final static String JOB_COMMENT = "comment";
public final static String AUTO_DISABLED = "auto.disabled";

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ public class LinkisJob {
private String type;
private String linkistype;
private String proxyUser;

private String autoDisabled;
private String dependencies;
private Map<String, String> conf;
private String command;
Expand Down Expand Up @@ -60,6 +62,14 @@ public void setProxyUser(String proxyUser) {
this.proxyUser = proxyUser;
}

public String getAutoDisabled() {
return autoDisabled;
}

public void setAutoDisabled(String autoDisabled) {
this.autoDisabled = autoDisabled;
}

public String getDependencies() {
return dependencies;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ private String convertJobToString(LinkisJob job){
map.put(WorkflowConstant.PROXY_USER,job.getProxyUser());
map.put(AzkabanConstant.JOB_COMMAND,job.getCommand());
map.put(AzkabanConstant.JOB_COMMENT,job.getComment());
map.put(AzkabanConstant.AUTO_DISABLED,job.getAutoDisabled());
Map<String, Object> labels = new HashMap<>(1);
labels.put("route", SchedulerConf.JOB_LABEL.getValue());
map.put(AzkabanConstant.JOB_LABELS, DSSCommonUtils.COMMON_GSON.toJson(labels));
Expand Down Expand Up @@ -114,7 +115,8 @@ private void convertConfiguration(WorkflowNode workflowNode, LinkisJob job){
configuration.forEach((k,v)-> {
if(null!=v) {
v.forEach((k2, v2) -> {
if(null !=v2) {job.getConf().put(confprefix + k + "." + k2, v2.toString());}
if(AzkabanConstant.AUTO_DISABLED.equals(k2) && null !=v2){job.setAutoDisabled(v2.toString());}
else if(null !=v2) {job.getConf().put(confprefix + k + "." + k2, v2.toString());}
});
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public ProjectResponseRef searchProject(RefProjectContentRequestRef.RefProjectCo
params.put("project", requestRef.getProjectName());
params.put("ajax", "fetchprojectflows");
try {
logger.info("request url from Schedulis is: {}.", queryUrl);
String responseBody = SchedulisHttpUtils.getHttpGetResult(queryUrl, params, ssoRequestOperation, requestRef.getWorkspace());
logger.info("responseBody from Schedulis is: {}.", responseBody);
Map<String,Object> map = DSSCommonUtils.COMMON_GSON.fromJson(responseBody, new TypeToken<Map<String,Object>>(){}.getType());
Expand Down
Loading

0 comments on commit 66d949e

Please sign in to comment.