From 34bd14a811e37c9eae9d69581b5cb577c2e6dbed Mon Sep 17 00:00:00 2001 From: alovew Date: Thu, 19 Jan 2023 13:52:44 -0800 Subject: [PATCH] tests --- .../activities/ConfigFetchActivityTest.java | 74 +++++++++---------- 1 file changed, 37 insertions(+), 37 deletions(-) diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityTest.java index 320df0ac0af4..d372d91b3377 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityTest.java @@ -8,6 +8,7 @@ import static org.mockito.Mockito.when; import io.airbyte.api.client.generated.ConnectionApi; +import io.airbyte.api.client.generated.JobsApi; import io.airbyte.api.client.generated.WorkspaceApi; import io.airbyte.api.client.invoker.generated.ApiException; import io.airbyte.api.client.model.generated.ConnectionRead; @@ -18,10 +19,10 @@ import io.airbyte.api.client.model.generated.ConnectionScheduleDataCron; import io.airbyte.api.client.model.generated.ConnectionScheduleType; import io.airbyte.api.client.model.generated.ConnectionStatus; +import io.airbyte.api.client.model.generated.JobOptionalRead; +import io.airbyte.api.client.model.generated.JobRead; import io.airbyte.api.client.model.generated.WorkspaceRead; import io.airbyte.config.persistence.ConfigNotFoundException; -import io.airbyte.persistence.job.JobPersistence; -import io.airbyte.persistence.job.models.Job; import io.airbyte.validation.json.JsonValidationException; import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity.ScheduleRetrieverInput; import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity.ScheduleRetrieverOutput; @@ -29,7 +30,6 @@ import java.time.Duration; import java.time.Instant; import java.util.Calendar; -import java.util.Optional; import java.util.TimeZone; import java.util.UUID; import org.assertj.core.api.Assertions; @@ -47,12 +47,12 @@ class ConfigFetchActivityTest { private static final Integer SYNC_JOB_MAX_ATTEMPTS = 3; @Mock - private JobPersistence mJobPersistence; + private JobsApi mJobsApi; @Mock private WorkspaceApi mWorkspaceApi; @Mock - private Job mJob; + private JobRead mJobRead; @Mock private ConnectionApi mConnectionApi; @@ -103,7 +103,7 @@ class ConfigFetchActivityTest { @BeforeEach void setup() { configFetchActivity = - new ConfigFetchActivityImpl(mJobPersistence, mWorkspaceApi, SYNC_JOB_MAX_ATTEMPTS, + new ConfigFetchActivityImpl(mJobsApi, mWorkspaceApi, SYNC_JOB_MAX_ATTEMPTS, () -> Instant.now().getEpochSecond(), mConnectionApi); } @@ -113,8 +113,8 @@ class TimeToWaitTest { @Test @DisplayName("Test that the job gets scheduled if it is not manual and if it is the first run with legacy schedule schema") void testFirstJobNonManual() throws IOException, JsonValidationException, ConfigNotFoundException, ApiException { - when(mJobPersistence.getLastReplicationJob(connectionId)) - .thenReturn(Optional.empty()); + when(mJobsApi.getLastReplicationJob(any())) + .thenReturn(new JobOptionalRead()); when(mConnectionApi.getConnection(any())) .thenReturn(connectionReadWithLegacySchedule); @@ -173,13 +173,13 @@ void testDeleted() throws ApiException { @DisplayName("Test we will wait the required amount of time with legacy config") void testWait() throws IOException, JsonValidationException, ConfigNotFoundException, ApiException { configFetchActivity = - new ConfigFetchActivityImpl(mJobPersistence, mWorkspaceApi, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 3, mConnectionApi); + new ConfigFetchActivityImpl(mJobsApi, mWorkspaceApi, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 3, mConnectionApi); - when(mJob.getStartedAtInSecond()) - .thenReturn(Optional.of(60L)); + when(mJobRead.getStartedAt()) + .thenReturn(60L); - when(mJobPersistence.getLastReplicationJob(connectionId)) - .thenReturn(Optional.of(mJob)); + when(mJobsApi.getLastReplicationJob(any())) + .thenReturn(new JobOptionalRead().job(mJobRead)); when(mConnectionApi.getConnection(any())) .thenReturn(connectionReadWithLegacySchedule); @@ -196,13 +196,13 @@ void testWait() throws IOException, JsonValidationException, ConfigNotFoundExcep @DisplayName("Test we will not wait if we are late in the legacy schedule schema") void testNotWaitIfLate() throws IOException, ApiException { configFetchActivity = - new ConfigFetchActivityImpl(mJobPersistence, mWorkspaceApi, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 10, mConnectionApi); + new ConfigFetchActivityImpl(mJobsApi, mWorkspaceApi, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 10, mConnectionApi); - when(mJob.getStartedAtInSecond()) - .thenReturn(Optional.of(60L)); + when(mJobRead.getStartedAt()) + .thenReturn(60L); - when(mJobPersistence.getLastReplicationJob(connectionId)) - .thenReturn(Optional.of(mJob)); + when(mJobsApi.getLastReplicationJob(any())) + .thenReturn(new JobOptionalRead().job(mJobRead)); when(mConnectionApi.getConnection(any())) .thenReturn(connectionReadWithLegacySchedule); @@ -234,8 +234,8 @@ void testManualScheduleType() throws ApiException { @Test @DisplayName("Test that the job will be immediately scheduled if it is a BASIC_SCHEDULE type on the first run") void testBasicScheduleTypeFirstRun() throws IOException, ApiException { - when(mJobPersistence.getLastReplicationJob(connectionId)) - .thenReturn(Optional.empty()); + when(mJobsApi.getLastReplicationJob(any())) + .thenReturn(new JobOptionalRead()); when(mConnectionApi.getConnection(any())) .thenReturn(connectionReadWithBasicScheduleType); @@ -251,13 +251,13 @@ void testBasicScheduleTypeFirstRun() throws IOException, ApiException { @Test @DisplayName("Test that we will wait the required amount of time with a BASIC_SCHEDULE type on a subsequent run") void testBasicScheduleSubsequentRun() throws IOException, ApiException { - configFetchActivity = new ConfigFetchActivityImpl(mJobPersistence, mWorkspaceApi, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 3, mConnectionApi); + configFetchActivity = new ConfigFetchActivityImpl(mJobsApi, mWorkspaceApi, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 3, mConnectionApi); - when(mJob.getStartedAtInSecond()) - .thenReturn(Optional.of(60L)); + when(mJobRead.getStartedAt()) + .thenReturn(60L); - when(mJobPersistence.getLastReplicationJob(connectionId)) - .thenReturn(Optional.of(mJob)); + when(mJobsApi.getLastReplicationJob(any())) + .thenReturn(new JobOptionalRead().job(mJobRead)); when(mConnectionApi.getConnection(any())) .thenReturn(connectionReadWithBasicScheduleType); @@ -282,11 +282,11 @@ void testCronScheduleSubsequentRun() throws IOException, JsonValidationException when(mWorkspaceApi.getWorkspaceByConnectionId(any())).thenReturn(new WorkspaceRead().workspaceId(UUID.randomUUID())); configFetchActivity = - new ConfigFetchActivityImpl(mJobPersistence, mWorkspaceApi, SYNC_JOB_MAX_ATTEMPTS, + new ConfigFetchActivityImpl(mJobsApi, mWorkspaceApi, SYNC_JOB_MAX_ATTEMPTS, () -> mockRightNow.getTimeInMillis() / 1000L, mConnectionApi); - when(mJobPersistence.getLastReplicationJob(connectionId)) - .thenReturn(Optional.of(mJob)); + when(mJobsApi.getLastReplicationJob(any())) + .thenReturn(new JobOptionalRead().job(mJobRead)); when(mConnectionApi.getConnection(any())) .thenReturn(connectionReadWithCronScheduleType); @@ -311,12 +311,12 @@ void testCronScheduleMinimumInterval() throws IOException, JsonValidationExcepti when(mWorkspaceApi.getWorkspaceByConnectionId(any())).thenReturn(new WorkspaceRead().workspaceId(UUID.randomUUID())); configFetchActivity = - new ConfigFetchActivityImpl(mJobPersistence, mWorkspaceApi, SYNC_JOB_MAX_ATTEMPTS, + new ConfigFetchActivityImpl(mJobsApi, mWorkspaceApi, SYNC_JOB_MAX_ATTEMPTS, () -> mockRightNow.getTimeInMillis() / 1000L, mConnectionApi); - when(mJob.getStartedAtInSecond()).thenReturn(Optional.of(mockRightNow.getTimeInMillis() / 1000L)); - when(mJobPersistence.getLastReplicationJob(connectionId)) - .thenReturn(Optional.of(mJob)); + when(mJobRead.getStartedAt()).thenReturn(mockRightNow.getTimeInMillis() / 1000L); + when(mJobsApi.getLastReplicationJob(any())) + .thenReturn(new JobOptionalRead().job(mJobRead)); when(mConnectionApi.getConnection(any())) .thenReturn(connectionReadWithCronScheduleType); @@ -342,12 +342,12 @@ void testCronSchedulingNoise() throws IOException, JsonValidationException, Conf .thenReturn(new WorkspaceRead().workspaceId(UUID.fromString("226edbc1-4a9c-4401-95a9-90435d667d9d"))); configFetchActivity = - new ConfigFetchActivityImpl(mJobPersistence, mWorkspaceApi, SYNC_JOB_MAX_ATTEMPTS, + new ConfigFetchActivityImpl(mJobsApi, mWorkspaceApi, SYNC_JOB_MAX_ATTEMPTS, () -> mockRightNow.getTimeInMillis() / 1000L, mConnectionApi); - when(mJob.getStartedAtInSecond()).thenReturn(Optional.of(mockRightNow.getTimeInMillis() / 1000L)); - when(mJobPersistence.getLastReplicationJob(connectionId)) - .thenReturn(Optional.of(mJob)); + when(mJobRead.getStartedAt()).thenReturn(mockRightNow.getTimeInMillis() / 1000L); + when(mJobsApi.getLastReplicationJob(any())) + .thenReturn(new JobOptionalRead().job(mJobRead)); when(mConnectionApi.getConnection(any())) .thenReturn(connectionReadWithCronScheduleType); @@ -368,7 +368,7 @@ class TestGetMaxAttempt { void testGetMaxAttempt() { final int maxAttempt = 15031990; configFetchActivity = - new ConfigFetchActivityImpl(mJobPersistence, mWorkspaceApi, maxAttempt, () -> Instant.now().getEpochSecond(), mConnectionApi); + new ConfigFetchActivityImpl(mJobsApi, mWorkspaceApi, maxAttempt, () -> Instant.now().getEpochSecond(), mConnectionApi); Assertions.assertThat(configFetchActivity.getMaxAttempt().getMaxAttempt()) .isEqualTo(maxAttempt); }