Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve] Add custom job id arg in client #6943

Merged
merged 2 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ public class ClientCommandArgs extends AbstractCommandArgs {
description = "Get job metrics by JobId")
private String metricsJobId;

@Parameter(
names = {"--set-job-id"},
description = "Set custom job id for job")
private String customJobId;

@Parameter(
names = {"--get_running_job_metrics"},
description = "Gets metrics for running jobs")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,10 @@ public void execute() throws CommandExecuteException {
configFile.toString(),
clientCommandArgs.getVariables(),
jobConfig,
seaTunnelConfig);
seaTunnelConfig,
clientCommandArgs.getCustomJobId() != null
? Long.parseLong(clientCommandArgs.getCustomJobId())
: null);
}

// get job start time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,15 @@ public void testExecuteClientCommandArgsWithPluginName()
Assertions.assertDoesNotThrow(() -> SeaTunnel.run(clientCommandArgs.buildCommand()));
}

@Test
public void testSetJobId() throws FileNotFoundException, URISyntaxException {
String configurePath = "/config/fake_to_inmemory.json";
String configFile = MultiTableSinkTest.getTestConfigFile(configurePath);
long jobId = 999;
ClientCommandArgs clientCommandArgs = buildClientCommandArgs(configFile, jobId);
Assertions.assertDoesNotThrow(() -> SeaTunnel.run(clientCommandArgs.buildCommand()));
}

@Test
public void testExecuteClientCommandArgsWithoutPluginName()
throws FileNotFoundException, URISyntaxException {
Expand All @@ -58,12 +67,19 @@ public void testExecuteClientCommandArgsWithoutPluginName()
commandExecuteException.getCause().getMessage());
}

private static ClientCommandArgs buildClientCommandArgs(String configFile) {
private static ClientCommandArgs buildClientCommandArgs(String configFile, Long jobId) {
ClientCommandArgs clientCommandArgs = new ClientCommandArgs();
clientCommandArgs.setVariables(new ArrayList<>());
clientCommandArgs.setConfigFile(configFile);
clientCommandArgs.setMasterType(MasterType.LOCAL);
clientCommandArgs.setCheckConfig(false);
if (jobId != null) {
clientCommandArgs.setCustomJobId(String.valueOf(jobId));
}
return clientCommandArgs;
}

private static ClientCommandArgs buildClientCommandArgs(String configFile) {
return buildClientCommandArgs(configFile, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,18 @@ public ClientJobExecutionEnvironment createExecutionContext(
@NonNull JobConfig jobConfig,
@NonNull SeaTunnelConfig seaTunnelConfig) {
return new ClientJobExecutionEnvironment(
jobConfig, filePath, variables, hazelcastClient, seaTunnelConfig);
jobConfig, filePath, variables, hazelcastClient, seaTunnelConfig, null);
}

@Override
public ClientJobExecutionEnvironment createExecutionContext(
@NonNull String filePath,
List<String> variables,
@NonNull JobConfig jobConfig,
@NonNull SeaTunnelConfig seaTunnelConfig,
Long jobId) {
return new ClientJobExecutionEnvironment(
jobConfig, filePath, variables, hazelcastClient, seaTunnelConfig, jobId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ ClientJobExecutionEnvironment createExecutionContext(
@NonNull JobConfig config,
@NonNull SeaTunnelConfig seaTunnelConfig);

ClientJobExecutionEnvironment createExecutionContext(
@NonNull String filePath,
List<String> variables,
@NonNull JobConfig config,
@NonNull SeaTunnelConfig seaTunnelConfig,
Long jobId);

ClientJobExecutionEnvironment restoreExecutionContext(
@NonNull String filePath,
@NonNull JobConfig config,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,13 @@ public ClientJobExecutionEnvironment(
this.seaTunnelHazelcastClient = seaTunnelHazelcastClient;
this.jobClient = new JobClient(seaTunnelHazelcastClient);
this.seaTunnelConfig = seaTunnelConfig;
this.jobConfig.setJobContext(
new JobContext(isStartWithSavePoint ? jobId : jobClient.getNewJobId()));
Long finalJobId;
if (isStartWithSavePoint || jobId != null) {
finalJobId = jobId;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Hisoka-X hi, i relealize a problem, if user pass same job id with the running jobs, will it has issue?
now i story some status in IMAP by job id. if use same job id, the status maybe impact?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the key point is that the ID cannot be repeated, which currently needs to be ensured by the user

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix by #7021

} else {
finalJobId = jobClient.getNewJobId();
}
this.jobConfig.setJobContext(new JobContext(finalJobId));
this.connectorPackageClient = new ConnectorPackageClient(seaTunnelHazelcastClient);
}

Expand All @@ -77,15 +82,16 @@ public ClientJobExecutionEnvironment(
String jobFilePath,
List<String> variables,
SeaTunnelHazelcastClient seaTunnelHazelcastClient,
SeaTunnelConfig seaTunnelConfig) {
SeaTunnelConfig seaTunnelConfig,
Long jobId) {
this(
jobConfig,
jobFilePath,
variables,
seaTunnelHazelcastClient,
seaTunnelConfig,
false,
null);
jobId);
}

/** Search all jars in SEATUNNEL_HOME/plugins */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import com.hazelcast.instance.impl.HazelcastInstanceFactory;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -312,6 +313,44 @@ public void testCancelJob() throws ExecutionException, InterruptedException {
}
}

@Test
public void testSetJobId() throws ExecutionException, InterruptedException {
Common.setDeployMode(DeployMode.CLIENT);
String filePath = TestUtils.getResource("/streaming_fake_to_console.conf");
JobConfig jobConfig = new JobConfig();
jobConfig.setName("testSetJobId");
long jobId = 12345;
SeaTunnelClient seaTunnelClient = createSeaTunnelClient();
JobClient jobClient = seaTunnelClient.getJobClient();
try {
ClientJobExecutionEnvironment jobExecutionEnv =
seaTunnelClient.createExecutionContext(
filePath, new ArrayList<>(), jobConfig, SEATUNNEL_CONFIG, jobId);

final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();

Assertions.assertEquals(jobId, clientJobProxy.getJobId());

await().atMost(30000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertEquals(
"RUNNING", jobClient.getJobStatus(jobId)));

jobClient.cancelJob(jobId);

await().atMost(30000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertEquals(
"CANCELED", jobClient.getJobStatus(jobId)));
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
seaTunnelClient.close();
}
}

@Test
public void testGetJobInfo() {
Common.setDeployMode(DeployMode.CLIENT);
Expand Down
Loading