diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java index 9504c9cb2c1..67293ba2a32 100644 --- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java +++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java @@ -78,6 +78,11 @@ public class ClientCommandArgs extends AbstractCommandArgs { description = "Get job metrics by JobId") private String metricsJobId; + @Parameter( + names = {"--set-job-id"}, + description = "Set custom job id for job") + private String customJobId; + @Parameter( names = {"--get_running_job_metrics"}, description = "Gets metrics for running jobs") diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java index d1e8b780099..4527d98d660 100644 --- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java +++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java @@ -144,7 +144,10 @@ public void execute() throws CommandExecuteException { configFile.toString(), clientCommandArgs.getVariables(), jobConfig, - seaTunnelConfig); + seaTunnelConfig, + clientCommandArgs.getCustomJobId() != null + ? Long.parseLong(clientCommandArgs.getCustomJobId()) + : null); } // get job start time diff --git a/seatunnel-core/seatunnel-starter/src/test/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgsTest.java b/seatunnel-core/seatunnel-starter/src/test/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgsTest.java index 87d4aa5b724..b40cc6ad3b3 100644 --- a/seatunnel-core/seatunnel-starter/src/test/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgsTest.java +++ b/seatunnel-core/seatunnel-starter/src/test/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgsTest.java @@ -41,6 +41,15 @@ public void testExecuteClientCommandArgsWithPluginName() Assertions.assertDoesNotThrow(() -> SeaTunnel.run(clientCommandArgs.buildCommand())); } + @Test + public void testSetJobId() throws FileNotFoundException, URISyntaxException { + String configurePath = "/config/fake_to_inmemory.json"; + String configFile = MultiTableSinkTest.getTestConfigFile(configurePath); + long jobId = 999; + ClientCommandArgs clientCommandArgs = buildClientCommandArgs(configFile, jobId); + Assertions.assertDoesNotThrow(() -> SeaTunnel.run(clientCommandArgs.buildCommand())); + } + @Test public void testExecuteClientCommandArgsWithoutPluginName() throws FileNotFoundException, URISyntaxException { @@ -58,12 +67,19 @@ public void testExecuteClientCommandArgsWithoutPluginName() commandExecuteException.getCause().getMessage()); } - private static ClientCommandArgs buildClientCommandArgs(String configFile) { + private static ClientCommandArgs buildClientCommandArgs(String configFile, Long jobId) { ClientCommandArgs clientCommandArgs = new ClientCommandArgs(); clientCommandArgs.setVariables(new ArrayList<>()); clientCommandArgs.setConfigFile(configFile); clientCommandArgs.setMasterType(MasterType.LOCAL); clientCommandArgs.setCheckConfig(false); + if (jobId != null) { + clientCommandArgs.setCustomJobId(String.valueOf(jobId)); + } return clientCommandArgs; } + + private static ClientCommandArgs buildClientCommandArgs(String configFile) { + return buildClientCommandArgs(configFile, null); + } } diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java index 2d7508ee2de..0e891710733 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java +++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java @@ -64,7 +64,18 @@ public ClientJobExecutionEnvironment createExecutionContext( @NonNull JobConfig jobConfig, @NonNull SeaTunnelConfig seaTunnelConfig) { return new ClientJobExecutionEnvironment( - jobConfig, filePath, variables, hazelcastClient, seaTunnelConfig); + jobConfig, filePath, variables, hazelcastClient, seaTunnelConfig, null); + } + + @Override + public ClientJobExecutionEnvironment createExecutionContext( + @NonNull String filePath, + List variables, + @NonNull JobConfig jobConfig, + @NonNull SeaTunnelConfig seaTunnelConfig, + Long jobId) { + return new ClientJobExecutionEnvironment( + jobConfig, filePath, variables, hazelcastClient, seaTunnelConfig, jobId); } @Override diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java index 36a3f2e36ee..a275f3cab77 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java +++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClientInstance.java @@ -39,6 +39,13 @@ ClientJobExecutionEnvironment createExecutionContext( @NonNull JobConfig config, @NonNull SeaTunnelConfig seaTunnelConfig); + ClientJobExecutionEnvironment createExecutionContext( + @NonNull String filePath, + List variables, + @NonNull JobConfig config, + @NonNull SeaTunnelConfig seaTunnelConfig, + Long jobId); + ClientJobExecutionEnvironment restoreExecutionContext( @NonNull String filePath, @NonNull JobConfig config, diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java index 18f1a7376f5..6e333543512 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java +++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java @@ -67,8 +67,13 @@ public ClientJobExecutionEnvironment( this.seaTunnelHazelcastClient = seaTunnelHazelcastClient; this.jobClient = new JobClient(seaTunnelHazelcastClient); this.seaTunnelConfig = seaTunnelConfig; - this.jobConfig.setJobContext( - new JobContext(isStartWithSavePoint ? jobId : jobClient.getNewJobId())); + Long finalJobId; + if (isStartWithSavePoint || jobId != null) { + finalJobId = jobId; + } else { + finalJobId = jobClient.getNewJobId(); + } + this.jobConfig.setJobContext(new JobContext(finalJobId)); this.connectorPackageClient = new ConnectorPackageClient(seaTunnelHazelcastClient); } @@ -77,7 +82,8 @@ public ClientJobExecutionEnvironment( String jobFilePath, List variables, SeaTunnelHazelcastClient seaTunnelHazelcastClient, - SeaTunnelConfig seaTunnelConfig) { + SeaTunnelConfig seaTunnelConfig, + Long jobId) { this( jobConfig, jobFilePath, @@ -85,7 +91,7 @@ public ClientJobExecutionEnvironment( seaTunnelHazelcastClient, seaTunnelConfig, false, - null); + jobId); } /** Search all jars in SEATUNNEL_HOME/plugins */ diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java index 0fbbe80572b..f16f61a7f07 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java +++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java @@ -47,6 +47,7 @@ import com.hazelcast.instance.impl.HazelcastInstanceFactory; import lombok.extern.slf4j.Slf4j; +import java.util.ArrayList; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -312,6 +313,44 @@ public void testCancelJob() throws ExecutionException, InterruptedException { } } + @Test + public void testSetJobId() throws ExecutionException, InterruptedException { + Common.setDeployMode(DeployMode.CLIENT); + String filePath = TestUtils.getResource("/streaming_fake_to_console.conf"); + JobConfig jobConfig = new JobConfig(); + jobConfig.setName("testSetJobId"); + long jobId = 12345; + SeaTunnelClient seaTunnelClient = createSeaTunnelClient(); + JobClient jobClient = seaTunnelClient.getJobClient(); + try { + ClientJobExecutionEnvironment jobExecutionEnv = + seaTunnelClient.createExecutionContext( + filePath, new ArrayList<>(), jobConfig, SEATUNNEL_CONFIG, jobId); + + final ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); + + Assertions.assertEquals(jobId, clientJobProxy.getJobId()); + + await().atMost(30000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> + Assertions.assertEquals( + "RUNNING", jobClient.getJobStatus(jobId))); + + jobClient.cancelJob(jobId); + + await().atMost(30000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> + Assertions.assertEquals( + "CANCELED", jobClient.getJobStatus(jobId))); + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + seaTunnelClient.close(); + } + } + @Test public void testGetJobInfo() { Common.setDeployMode(DeployMode.CLIENT);