Skip to content

Commit

Permalink
[Improve] Add custom job id arg in client (#6943)
Browse files Browse the repository at this point in the history
  • Loading branch information
Hisoka-X authored Jun 6, 2024
1 parent 84480f0 commit 068bbf7
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ public class ClientCommandArgs extends AbstractCommandArgs {
description = "Get job metrics by JobId")
private String metricsJobId;

@Parameter(
names = {"--set-job-id"},
description = "Set custom job id for job")
private String customJobId;

@Parameter(
names = {"--get_running_job_metrics"},
description = "Gets metrics for running jobs")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,10 @@ public void execute() throws CommandExecuteException {
configFile.toString(),
clientCommandArgs.getVariables(),
jobConfig,
seaTunnelConfig);
seaTunnelConfig,
clientCommandArgs.getCustomJobId() != null
? Long.parseLong(clientCommandArgs.getCustomJobId())
: null);
}

// get job start time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,15 @@ public void testExecuteClientCommandArgsWithPluginName()
Assertions.assertDoesNotThrow(() -> SeaTunnel.run(clientCommandArgs.buildCommand()));
}

@Test
public void testSetJobId() throws FileNotFoundException, URISyntaxException {
String configurePath = "/config/fake_to_inmemory.json";
String configFile = MultiTableSinkTest.getTestConfigFile(configurePath);
long jobId = 999;
ClientCommandArgs clientCommandArgs = buildClientCommandArgs(configFile, jobId);
Assertions.assertDoesNotThrow(() -> SeaTunnel.run(clientCommandArgs.buildCommand()));
}

@Test
public void testExecuteClientCommandArgsWithoutPluginName()
throws FileNotFoundException, URISyntaxException {
Expand All @@ -58,12 +67,19 @@ public void testExecuteClientCommandArgsWithoutPluginName()
commandExecuteException.getCause().getMessage());
}

private static ClientCommandArgs buildClientCommandArgs(String configFile) {
private static ClientCommandArgs buildClientCommandArgs(String configFile, Long jobId) {
ClientCommandArgs clientCommandArgs = new ClientCommandArgs();
clientCommandArgs.setVariables(new ArrayList<>());
clientCommandArgs.setConfigFile(configFile);
clientCommandArgs.setMasterType(MasterType.LOCAL);
clientCommandArgs.setCheckConfig(false);
if (jobId != null) {
clientCommandArgs.setCustomJobId(String.valueOf(jobId));
}
return clientCommandArgs;
}

private static ClientCommandArgs buildClientCommandArgs(String configFile) {
return buildClientCommandArgs(configFile, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,18 @@ public ClientJobExecutionEnvironment createExecutionContext(
@NonNull JobConfig jobConfig,
@NonNull SeaTunnelConfig seaTunnelConfig) {
return new ClientJobExecutionEnvironment(
jobConfig, filePath, variables, hazelcastClient, seaTunnelConfig);
jobConfig, filePath, variables, hazelcastClient, seaTunnelConfig, null);
}

@Override
public ClientJobExecutionEnvironment createExecutionContext(
@NonNull String filePath,
List<String> variables,
@NonNull JobConfig jobConfig,
@NonNull SeaTunnelConfig seaTunnelConfig,
Long jobId) {
return new ClientJobExecutionEnvironment(
jobConfig, filePath, variables, hazelcastClient, seaTunnelConfig, jobId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ ClientJobExecutionEnvironment createExecutionContext(
@NonNull JobConfig config,
@NonNull SeaTunnelConfig seaTunnelConfig);

ClientJobExecutionEnvironment createExecutionContext(
@NonNull String filePath,
List<String> variables,
@NonNull JobConfig config,
@NonNull SeaTunnelConfig seaTunnelConfig,
Long jobId);

ClientJobExecutionEnvironment restoreExecutionContext(
@NonNull String filePath,
@NonNull JobConfig config,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,13 @@ public ClientJobExecutionEnvironment(
this.seaTunnelHazelcastClient = seaTunnelHazelcastClient;
this.jobClient = new JobClient(seaTunnelHazelcastClient);
this.seaTunnelConfig = seaTunnelConfig;
this.jobConfig.setJobContext(
new JobContext(isStartWithSavePoint ? jobId : jobClient.getNewJobId()));
Long finalJobId;
if (isStartWithSavePoint || jobId != null) {
finalJobId = jobId;
} else {
finalJobId = jobClient.getNewJobId();
}
this.jobConfig.setJobContext(new JobContext(finalJobId));
this.connectorPackageClient = new ConnectorPackageClient(seaTunnelHazelcastClient);
}

Expand All @@ -77,15 +82,16 @@ public ClientJobExecutionEnvironment(
String jobFilePath,
List<String> variables,
SeaTunnelHazelcastClient seaTunnelHazelcastClient,
SeaTunnelConfig seaTunnelConfig) {
SeaTunnelConfig seaTunnelConfig,
Long jobId) {
this(
jobConfig,
jobFilePath,
variables,
seaTunnelHazelcastClient,
seaTunnelConfig,
false,
null);
jobId);
}

/** Search all jars in SEATUNNEL_HOME/plugins */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import com.hazelcast.instance.impl.HazelcastInstanceFactory;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -312,6 +313,44 @@ public void testCancelJob() throws ExecutionException, InterruptedException {
}
}

@Test
public void testSetJobId() throws ExecutionException, InterruptedException {
Common.setDeployMode(DeployMode.CLIENT);
String filePath = TestUtils.getResource("/streaming_fake_to_console.conf");
JobConfig jobConfig = new JobConfig();
jobConfig.setName("testSetJobId");
long jobId = 12345;
SeaTunnelClient seaTunnelClient = createSeaTunnelClient();
JobClient jobClient = seaTunnelClient.getJobClient();
try {
ClientJobExecutionEnvironment jobExecutionEnv =
seaTunnelClient.createExecutionContext(
filePath, new ArrayList<>(), jobConfig, SEATUNNEL_CONFIG, jobId);

final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();

Assertions.assertEquals(jobId, clientJobProxy.getJobId());

await().atMost(30000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertEquals(
"RUNNING", jobClient.getJobStatus(jobId)));

jobClient.cancelJob(jobId);

await().atMost(30000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertEquals(
"CANCELED", jobClient.getJobStatus(jobId)));
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
seaTunnelClient.close();
}
}

@Test
public void testGetJobInfo() {
Common.setDeployMode(DeployMode.CLIENT);
Expand Down

0 comments on commit 068bbf7

Please sign in to comment.