Skip to content

Commit

Permalink
[ISSUE #570] ASoC connect runtime optimization: CLI (#622)
Browse files Browse the repository at this point in the history
feature(rocketmq-runtime) add CLI support for rocketmq-connect-runtime

* Add CLI

* Fix checkstyle

* Optimize CLI structure

* Add README.md

* Rename CLI

* Update pom.xml

* Optimize the connectors and tasks format

* Fix newline format
  • Loading branch information
Dreaouth authored Sep 20, 2020
1 parent 1a49e60 commit b425260
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public String verifyAndSetConfig(KeyValue config) {

@Override
public void start() {

log.info("JdbcSourceConnector start");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,23 @@ private List<KeyValue> divideSourceTaskByTopic(DbConnectorConfig dbConnectorConf
int parallelism = tdc.getTaskParallelism();
int id = -1;
Map<String, String> topicRouteMap = ((SourceDbConnectorConfig)dbConnectorConfig).getWhiteTopics();
Map<Integer, Map<String, Map<String, String>>> taskTopicList = new HashMap<>();
Map<Integer, String> taskTopicList = new HashMap<>();
Map<Integer, Map<String, Map<String, String>>> taskWhiteList = new HashMap<>();
for (Map.Entry<String, String> entry : topicRouteMap.entrySet()) {
int ind = ++id % parallelism;
if (!taskTopicList.containsKey(ind)) {
taskTopicList.put(ind, new HashMap<>());
if (!taskWhiteList.containsKey(ind)) {
taskWhiteList.put(ind, new HashMap<>());
}
String dbKey = entry.getKey().split("-")[0];
String tableKey = entry.getKey().split("-")[1];
taskTopicList.put(ind, tableKey);
String filter = entry.getValue();
Map<String, String> tableMap = new HashMap<>();
tableMap.put(tableKey, filter);
if(!taskTopicList.get(ind).containsKey(dbKey)){
taskTopicList.get(ind).put(dbKey, tableMap);
if(!taskWhiteList.get(ind).containsKey(dbKey)){
taskWhiteList.get(ind).put(dbKey, tableMap);
}else {
taskTopicList.get(ind).get(dbKey).putAll(tableMap);
taskWhiteList.get(ind).get(dbKey).putAll(tableMap);
}
}

Expand All @@ -66,7 +68,8 @@ private List<KeyValue> divideSourceTaskByTopic(DbConnectorConfig dbConnectorConf
keyValue.put(Config.CONN_DB_PORT, tdc.getDbPort());
keyValue.put(Config.CONN_DB_USERNAME, tdc.getDbUserName());
keyValue.put(Config.CONN_DB_PASSWORD, tdc.getDbPassword());
keyValue.put(Config.CONN_WHITE_LIST, JSONObject.toJSONString(taskTopicList.get(i)));
keyValue.put(Config.CONN_WHITE_LIST, JSONObject.toJSONString(taskWhiteList.get(i)));
keyValue.put(Config.CONN_TOPIC_NAMES, taskTopicList.get(i));
keyValue.put(Config.CONN_DATA_TYPE, tdc.getDataType());
keyValue.put(Config.CONN_SOURCE_RECORD_CONVERTER, tdc.getSrcRecordConverter());
keyValue.put(Config.CONN_DB_MODE, tdc.getMode());
Expand Down

0 comments on commit b425260

Please sign in to comment.