From 6972e08a635235e04bfe8d7dbf9f25a38b0d1a33 Mon Sep 17 00:00:00 2001 From: Jared Rhizor Date: Tue, 22 Feb 2022 09:58:44 -0800 Subject: [PATCH 01/20] test time ranges for cancellations --- .../test/acceptance/AcceptanceTests.java | 866 +----------------- 1 file changed, 7 insertions(+), 859 deletions(-) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java index e7703f41bc7d..12c64e3f31dd 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java @@ -4,18 +4,11 @@ package io.airbyte.test.acceptance; -import static io.airbyte.api.client.model.ConnectionSchedule.TimeUnitEnum.MINUTES; import static java.lang.Thread.sleep; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.io.Resources; @@ -24,12 +17,7 @@ import io.airbyte.api.client.invoker.ApiClient; import io.airbyte.api.client.invoker.ApiException; import io.airbyte.api.client.model.AirbyteCatalog; -import io.airbyte.api.client.model.AirbyteStream; -import io.airbyte.api.client.model.AirbyteStreamAndConfiguration; -import io.airbyte.api.client.model.AirbyteStreamConfiguration; import io.airbyte.api.client.model.AttemptInfoRead; -import io.airbyte.api.client.model.AttemptStatus; -import io.airbyte.api.client.model.CheckConnectionRead; import io.airbyte.api.client.model.ConnectionCreate; import io.airbyte.api.client.model.ConnectionIdRequestBody; import io.airbyte.api.client.model.ConnectionRead; @@ -37,12 +25,9 @@ import io.airbyte.api.client.model.ConnectionState; import io.airbyte.api.client.model.ConnectionStatus; import io.airbyte.api.client.model.ConnectionUpdate; -import io.airbyte.api.client.model.DataType; import io.airbyte.api.client.model.DestinationCreate; -import io.airbyte.api.client.model.DestinationDefinitionCreate; import io.airbyte.api.client.model.DestinationDefinitionIdRequestBody; import io.airbyte.api.client.model.DestinationDefinitionRead; -import io.airbyte.api.client.model.DestinationDefinitionSpecificationRead; import io.airbyte.api.client.model.DestinationIdRequestBody; import io.airbyte.api.client.model.DestinationRead; import io.airbyte.api.client.model.DestinationSyncMode; @@ -50,8 +35,6 @@ import io.airbyte.api.client.model.JobInfoRead; import io.airbyte.api.client.model.JobRead; import io.airbyte.api.client.model.JobStatus; -import io.airbyte.api.client.model.LogType; -import io.airbyte.api.client.model.LogsRequestBody; import io.airbyte.api.client.model.NamespaceDefinitionType; import io.airbyte.api.client.model.OperationCreate; import io.airbyte.api.client.model.OperationIdRequestBody; @@ -61,20 +44,16 @@ import io.airbyte.api.client.model.OperatorNormalization.OptionEnum; import io.airbyte.api.client.model.OperatorType; import io.airbyte.api.client.model.SourceCreate; -import 
io.airbyte.api.client.model.SourceDefinitionCreate; import io.airbyte.api.client.model.SourceDefinitionIdRequestBody; import io.airbyte.api.client.model.SourceDefinitionRead; -import io.airbyte.api.client.model.SourceDefinitionSpecificationRead; import io.airbyte.api.client.model.SourceIdRequestBody; import io.airbyte.api.client.model.SourceRead; import io.airbyte.api.client.model.SyncMode; import io.airbyte.commons.features.EnvVariableFeatureFlags; import io.airbyte.commons.features.FeatureFlags; import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.lang.MoreBooleans; import io.airbyte.commons.resources.MoreResources; import io.airbyte.commons.util.MoreProperties; -import io.airbyte.container_orchestrator.ContainerOrchestratorApp; import io.airbyte.db.Database; import io.airbyte.db.Databases; import io.airbyte.test.airbyte_test_container.AirbyteTestContainer; @@ -84,7 +63,6 @@ import java.io.File; import java.io.IOException; import java.net.Inet4Address; -import java.net.URI; import java.net.URISyntaxException; import java.net.UnknownHostException; import java.nio.charset.Charset; @@ -94,17 +72,14 @@ import java.time.Duration; import java.time.Instant; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import org.apache.commons.lang3.RandomUtils; import org.jooq.JSONB; import org.jooq.Record; import org.jooq.Result; @@ -112,17 +87,13 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.MethodOrderer; import org.junit.jupiter.api.Order; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.TestMethodOrder; -import org.junit.jupiter.api.Timeout; -import org.junit.jupiter.api.condition.DisabledIfEnvironmentVariable; import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.slf4j.MDC; import org.testcontainers.containers.PostgreSQLContainer; import org.testcontainers.utility.MountableFile; @@ -308,687 +279,8 @@ public void tearDown() throws ApiException, SQLException { } } - @Test - @Order(-2) - @DisabledIfEnvironmentVariable(named = "KUBE", - matches = "true") - public void testGetDestinationSpec() throws ApiException { - final UUID destinationDefinitionId = getDestinationDefId(); - final DestinationDefinitionSpecificationRead spec = apiClient.getDestinationDefinitionSpecificationApi() - .getDestinationDefinitionSpecification(new DestinationDefinitionIdRequestBody().destinationDefinitionId(destinationDefinitionId)); - assertEquals(destinationDefinitionId, spec.getDestinationDefinitionId()); - assertNotNull(spec.getConnectionSpecification()); - } - - @Test - @Order(-1) - @DisabledIfEnvironmentVariable(named = "KUBE", - matches = "true") - public void testFailedGet404() { - final var e = assertThrows(ApiException.class, () -> apiClient.getDestinationDefinitionSpecificationApi() - .getDestinationDefinitionSpecification(new DestinationDefinitionIdRequestBody().destinationDefinitionId(UUID.randomUUID()))); - assertEquals(404, e.getCode()); - } - - @Test - @Order(0) - @DisabledIfEnvironmentVariable(named = "KUBE", - 
matches = "true") - public void testGetSourceSpec() throws ApiException { - final UUID sourceDefId = getPostgresSourceDefinitionId(); - final SourceDefinitionSpecificationRead spec = apiClient.getSourceDefinitionSpecificationApi() - .getSourceDefinitionSpecification(new SourceDefinitionIdRequestBody().sourceDefinitionId(sourceDefId)); - assertEquals(sourceDefId, spec.getSourceDefinitionId()); - assertNotNull(spec.getConnectionSpecification()); - } - - @Test - @Order(1) - @DisabledIfEnvironmentVariable(named = "KUBE", - matches = "true") - public void testCreateDestination() throws ApiException { - final UUID destinationDefId = getDestinationDefId(); - final JsonNode destinationConfig = getDestinationDbConfig(); - final String name = "AccTestDestinationDb-" + UUID.randomUUID(); - - final DestinationRead createdDestination = createDestination( - name, - workspaceId, - destinationDefId, - destinationConfig); - - assertEquals(name, createdDestination.getName()); - assertEquals(destinationDefId, createdDestination.getDestinationDefinitionId()); - assertEquals(workspaceId, createdDestination.getWorkspaceId()); - assertEquals(getDestinationDbConfigWithHiddenPassword(), createdDestination.getConnectionConfiguration()); - } - - @Test - @Order(2) - @DisabledIfEnvironmentVariable(named = "KUBE", - matches = "true") - public void testDestinationCheckConnection() throws ApiException { - final UUID destinationId = createDestination().getDestinationId(); - - final CheckConnectionRead.StatusEnum checkOperationStatus = apiClient.getDestinationApi() - .checkConnectionToDestination(new DestinationIdRequestBody().destinationId(destinationId)) - .getStatus(); - - assertEquals(CheckConnectionRead.StatusEnum.SUCCEEDED, checkOperationStatus); - } - - @Test - @Order(3) - @DisabledIfEnvironmentVariable(named = "KUBE", - matches = "true") - public void testCreateSource() throws ApiException { - final String dbName = "acc-test-db"; - final UUID postgresSourceDefinitionId = getPostgresSourceDefinitionId(); - final JsonNode sourceDbConfig = getSourceDbConfig(); - - final SourceRead response = createSource( - dbName, - workspaceId, - postgresSourceDefinitionId, - sourceDbConfig); - - final JsonNode expectedConfig = Jsons.jsonNode(sourceDbConfig); - // expect replacement of secret with magic string. 
- ((ObjectNode) expectedConfig).put("password", "**********"); - assertEquals(dbName, response.getName()); - assertEquals(workspaceId, response.getWorkspaceId()); - assertEquals(postgresSourceDefinitionId, response.getSourceDefinitionId()); - assertEquals(expectedConfig, response.getConnectionConfiguration()); - } - - @Test - @Order(4) - @DisabledIfEnvironmentVariable(named = "KUBE", - matches = "true") - public void testSourceCheckConnection() throws ApiException { - final UUID sourceId = createPostgresSource().getSourceId(); - - final CheckConnectionRead checkConnectionRead = apiClient.getSourceApi().checkConnectionToSource(new SourceIdRequestBody().sourceId(sourceId)); - - assertEquals( - CheckConnectionRead.StatusEnum.SUCCEEDED, - checkConnectionRead.getStatus(), - checkConnectionRead.getMessage()); - } - - @Test - @Order(5) - public void testDiscoverSourceSchema() throws ApiException { - final UUID sourceId = createPostgresSource().getSourceId(); - - final AirbyteCatalog actual = discoverSourceSchema(sourceId); - - final Map> fields = ImmutableMap.of( - COLUMN_ID, ImmutableMap.of("type", DataType.NUMBER), - COLUMN_NAME, ImmutableMap.of("type", DataType.STRING)); - final JsonNode jsonSchema = Jsons.jsonNode(ImmutableMap.builder() - .put("type", "object") - .put("properties", fields) - .build()); - final AirbyteStream stream = new AirbyteStream() - .name(STREAM_NAME) - .namespace("public") - .jsonSchema(jsonSchema) - .defaultCursorField(Collections.emptyList()) - .sourceDefinedPrimaryKey(Collections.emptyList()) - .supportedSyncModes(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)); - final AirbyteStreamConfiguration streamConfig = new AirbyteStreamConfiguration() - .syncMode(SyncMode.FULL_REFRESH) - .cursorField(Collections.emptyList()) - .destinationSyncMode(DestinationSyncMode.APPEND) - .primaryKey(Collections.emptyList()) - .aliasName(STREAM_NAME.replace(".", "_")) - .selected(true); - final AirbyteCatalog expected = new AirbyteCatalog() - .streams(Lists.newArrayList(new AirbyteStreamAndConfiguration() - .stream(stream) - .config(streamConfig))); - - assertEquals(expected, actual); - } - - @Test - @Order(6) - @DisabledIfEnvironmentVariable(named = "KUBE", - matches = "true") - public void testCreateConnection() throws ApiException { - final UUID sourceId = createPostgresSource().getSourceId(); - final AirbyteCatalog catalog = discoverSourceSchema(sourceId); - final UUID destinationId = createDestination().getDestinationId(); - final UUID operationId = createOperation().getOperationId(); - final String name = "test-connection-" + UUID.randomUUID(); - final ConnectionSchedule schedule = new ConnectionSchedule().timeUnit(MINUTES).units(100L); - final SyncMode syncMode = SyncMode.FULL_REFRESH; - final DestinationSyncMode destinationSyncMode = DestinationSyncMode.OVERWRITE; - catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode)); - final ConnectionRead createdConnection = createConnection(name, sourceId, destinationId, List.of(operationId), catalog, schedule); - - assertEquals(sourceId, createdConnection.getSourceId()); - assertEquals(destinationId, createdConnection.getDestinationId()); - assertEquals(1, createdConnection.getOperationIds().size()); - assertEquals(operationId, createdConnection.getOperationIds().get(0)); - assertEquals(catalog, createdConnection.getSyncCatalog()); - assertEquals(schedule, createdConnection.getSchedule()); - assertEquals(name, createdConnection.getName()); - } - - @Test - @Order(7) - public void 
testManualSync() throws Exception { - final String connectionName = "test-connection"; - final UUID sourceId = createPostgresSource().getSourceId(); - final UUID destinationId = createDestination().getDestinationId(); - final UUID operationId = createOperation().getOperationId(); - final AirbyteCatalog catalog = discoverSourceSchema(sourceId); - final SyncMode syncMode = SyncMode.FULL_REFRESH; - final DestinationSyncMode destinationSyncMode = DestinationSyncMode.OVERWRITE; - catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode)); - final UUID connectionId = - createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); - // Avoid Race condition with the new scheduler - if (featureFlags.usesNewScheduler()) { - waitForTemporalWorkflow(connectionId); - } - final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); - waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead.getJob()); - assertSourceAndDestinationDbInSync(false); - } - - @Test - @Order(8) - public void testCancelSync() throws Exception { - final String connectionName = "test-connection"; - final UUID sourceId = createPostgresSource().getSourceId(); - final UUID destinationId = createDestination().getDestinationId(); - final UUID operationId = createOperation().getOperationId(); - final AirbyteCatalog catalog = discoverSourceSchema(sourceId); - final SyncMode syncMode = SyncMode.FULL_REFRESH; - final DestinationSyncMode destinationSyncMode = DestinationSyncMode.OVERWRITE; - catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode)); - final UUID connectionId = - createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); - // Avoid Race condition with the new scheduler - if (featureFlags.usesNewScheduler()) { - waitForTemporalWorkflow(connectionId); - } - final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); - waitForJob(apiClient.getJobsApi(), connectionSyncRead.getJob(), Set.of(JobStatus.RUNNING)); - - final var resp = apiClient.getJobsApi().cancelJob(new JobIdRequestBody().id(connectionSyncRead.getJob().getId())); - assertEquals(JobStatus.CANCELLED, resp.getJob().getStatus()); - } - - @Test - @Order(9) - public void testIncrementalSync() throws Exception { - LOGGER.info("Starting testIncrementalSync()"); - final String connectionName = "test-connection"; - final UUID sourceId = createPostgresSource().getSourceId(); - final UUID destinationId = createDestination().getDestinationId(); - final UUID operationId = createOperation().getOperationId(); - final AirbyteCatalog catalog = discoverSourceSchema(sourceId); - final AirbyteStream stream = catalog.getStreams().get(0).getStream(); - - assertEquals(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL), stream.getSupportedSyncModes()); - // instead of assertFalse to avoid NPE from unboxed. 
- assertNull(stream.getSourceDefinedCursor()); - assertTrue(stream.getDefaultCursorField().isEmpty()); - assertTrue(stream.getSourceDefinedPrimaryKey().isEmpty()); - - final SyncMode syncMode = SyncMode.INCREMENTAL; - final DestinationSyncMode destinationSyncMode = DestinationSyncMode.APPEND; - catalog.getStreams().forEach(s -> s.getConfig() - .syncMode(syncMode) - .cursorField(List.of(COLUMN_ID)) - .destinationSyncMode(destinationSyncMode)); - final UUID connectionId = - createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); - // Avoid Race condition with the new scheduler - if (featureFlags.usesNewScheduler()) { - waitForTemporalWorkflow(connectionId); - } - LOGGER.info("Beginning testIncrementalSync() sync 1"); - final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi() - .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); - waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead1.getJob()); - LOGGER.info("state after sync 1: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId))); - - assertSourceAndDestinationDbInSync(false); - - // add new records and run again. - final Database source = getSourceDatabase(); - // get contents of source before mutating records. - final List expectedRecords = retrieveSourceRecords(source, STREAM_NAME); - expectedRecords.add(Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 6).put(COLUMN_NAME, "geralt").build())); - // add a new record - source.query(ctx -> ctx.execute("INSERT INTO id_and_name(id, name) VALUES(6, 'geralt')")); - // mutate a record that was already synced with out updating its cursor value. if we are actually - // full refreshing, this record will appear in the output and cause the test to fail. if we are, - // correctly, doing incremental, we will not find this value in the destination. - source.query(ctx -> ctx.execute("UPDATE id_and_name SET name='yennefer' WHERE id=2")); - source.close(); - - LOGGER.info("Starting testIncrementalSync() sync 2"); - final JobInfoRead connectionSyncRead2 = apiClient.getConnectionApi() - .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); - waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead2.getJob()); - LOGGER.info("state after sync 2: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId))); - - assertRawDestinationContains(expectedRecords, new SchemaTableNamePair("public", STREAM_NAME)); - - // reset back to no data. - - LOGGER.info("Starting testIncrementalSync() reset"); - final JobInfoRead jobInfoRead = apiClient.getConnectionApi().resetConnection(new ConnectionIdRequestBody().connectionId(connectionId)); - FeatureFlags featureFlags = new EnvVariableFeatureFlags(); - if (featureFlags.usesNewScheduler()) { - waitForJob(apiClient.getJobsApi(), jobInfoRead.getJob(), - Sets.newHashSet(JobStatus.PENDING, JobStatus.RUNNING, JobStatus.INCOMPLETE, JobStatus.FAILED)); - } else { - waitForSuccessfulJob(apiClient.getJobsApi(), jobInfoRead.getJob()); - } - - LOGGER.info("state after reset: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId))); - - assertRawDestinationContains(Collections.emptyList(), new SchemaTableNamePair("public", - STREAM_NAME)); - - // sync one more time. verify it is the equivalent of a full refresh. 
- LOGGER.info("Starting testIncrementalSync() sync 3"); - final JobInfoRead connectionSyncRead3 = - apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); - waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead3.getJob()); - LOGGER.info("state after sync 3: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId))); - - assertSourceAndDestinationDbInSync(false); - - } - - @Test - @Order(10) - @DisabledIfEnvironmentVariable(named = "KUBE", - matches = "true") - public void testScheduledSync() throws Exception { - final String connectionName = "test-connection"; - final UUID sourceId = createPostgresSource().getSourceId(); - final UUID destinationId = createDestination().getDestinationId(); - final UUID operationId = createOperation().getOperationId(); - final AirbyteCatalog catalog = discoverSourceSchema(sourceId); - - final ConnectionSchedule connectionSchedule = new ConnectionSchedule().units(1L).timeUnit(MINUTES); - final SyncMode syncMode = SyncMode.FULL_REFRESH; - final DestinationSyncMode destinationSyncMode = DestinationSyncMode.OVERWRITE; - catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode)); - - createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, connectionSchedule); - - // When a new connection is created, Airbyte might sync it immediately (before the sync interval). - // Then it will wait the sync interval. - // todo: wait for two attempts in the UI - // if the wait isn't long enough, failures say "Connection refused" because the assert kills the - // syncs in progress - sleep(Duration.ofMinutes(4).toMillis()); - assertSourceAndDestinationDbInSync(false); - } - - @Test - @Order(11) - @DisabledIfEnvironmentVariable(named = "KUBE", - matches = "true") - public void testMultipleSchemasAndTablesSync() throws Exception { - // create tables in another schema - PostgreSQLContainerHelper.runSqlScript(MountableFile.forClasspathResource("postgres_second_schema_multiple_tables.sql"), sourcePsql); - - final String connectionName = "test-connection"; - final UUID sourceId = createPostgresSource().getSourceId(); - final UUID destinationId = createDestination().getDestinationId(); - final UUID operationId = createOperation().getOperationId(); - final AirbyteCatalog catalog = discoverSourceSchema(sourceId); - - final SyncMode syncMode = SyncMode.FULL_REFRESH; - final DestinationSyncMode destinationSyncMode = DestinationSyncMode.OVERWRITE; - catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode)); - final UUID connectionId = - createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); - // Avoid Race condition with the new scheduler - if (featureFlags.usesNewScheduler()) { - waitForTemporalWorkflow(connectionId); - } - final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); - waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead.getJob()); - assertSourceAndDestinationDbInSync(false); - } - - @Test - @Order(12) - @DisabledIfEnvironmentVariable(named = "KUBE", - matches = "true") - public void testMultipleSchemasSameTablesSync() throws Exception { - // create tables in another schema - PostgreSQLContainerHelper.runSqlScript(MountableFile.forClasspathResource("postgres_separate_schema_same_table.sql"), 
sourcePsql); - - final String connectionName = "test-connection"; - final UUID sourceId = createPostgresSource().getSourceId(); - final UUID destinationId = createDestination().getDestinationId(); - final UUID operationId = createOperation().getOperationId(); - final AirbyteCatalog catalog = discoverSourceSchema(sourceId); - - final SyncMode syncMode = SyncMode.FULL_REFRESH; - final DestinationSyncMode destinationSyncMode = DestinationSyncMode.OVERWRITE; - catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode)); - final UUID connectionId = - createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); - - final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); - waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead.getJob()); - assertSourceAndDestinationDbInSync(false); - } - - @Test - @Order(13) - @DisabledIfEnvironmentVariable(named = "KUBE", - matches = "true") - public void testIncrementalDedupeSync() throws Exception { - final String connectionName = "test-connection"; - final UUID sourceId = createPostgresSource().getSourceId(); - final UUID destinationId = createDestination().getDestinationId(); - final UUID operationId = createOperation().getOperationId(); - final AirbyteCatalog catalog = discoverSourceSchema(sourceId); - final SyncMode syncMode = SyncMode.INCREMENTAL; - final DestinationSyncMode destinationSyncMode = DestinationSyncMode.APPEND_DEDUP; - catalog.getStreams().forEach(s -> s.getConfig() - .syncMode(syncMode) - .cursorField(List.of(COLUMN_ID)) - .destinationSyncMode(destinationSyncMode) - .primaryKey(List.of(List.of(COLUMN_NAME)))); - final UUID connectionId = - createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); - - // sync from start - final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi() - .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); - waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead1.getJob()); - - assertSourceAndDestinationDbInSync(true); - - // add new records and run again. 
- final Database source = getSourceDatabase(); - final List expectedRawRecords = retrieveSourceRecords(source, STREAM_NAME); - expectedRawRecords.add(Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 6).put(COLUMN_NAME, "sherif").build())); - expectedRawRecords.add(Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 7).put(COLUMN_NAME, "chris").build())); - source.query(ctx -> ctx.execute("UPDATE id_and_name SET id=6 WHERE name='sherif'")); - source.query(ctx -> ctx.execute("INSERT INTO id_and_name(id, name) VALUES(7, 'chris')")); - // retrieve latest snapshot of source records after modifications; the deduplicated table in - // destination should mirror this latest state of records - final List expectedNormalizedRecords = retrieveSourceRecords(source, STREAM_NAME); - source.close(); - - final JobInfoRead connectionSyncRead2 = apiClient.getConnectionApi() - .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); - waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead2.getJob()); - - assertRawDestinationContains(expectedRawRecords, new SchemaTableNamePair("public", STREAM_NAME)); - assertNormalizedDestinationContains(expectedNormalizedRecords); - } - - @Test - @Order(14) - public void testCheckpointing() throws Exception { - final SourceDefinitionRead sourceDefinition = apiClient.getSourceDefinitionApi().createSourceDefinition(new SourceDefinitionCreate() - .name("E2E Test Source") - .dockerRepository("airbyte/source-e2e-test") - .dockerImageTag(SOURCE_E2E_TEST_CONNECTOR_VERSION) - .documentationUrl(URI.create("https://example.com"))); - - final DestinationDefinitionRead destinationDefinition = apiClient.getDestinationDefinitionApi() - .createDestinationDefinition(new DestinationDefinitionCreate() - .name("E2E Test Destination") - .dockerRepository("airbyte/destination-e2e-test") - .dockerImageTag(DESTINATION_E2E_TEST_CONNECTOR_VERSION) - .documentationUrl(URI.create("https://example.com"))); - - final SourceRead source = createSource( - "E2E Test Source -" + UUID.randomUUID(), - workspaceId, - sourceDefinition.getSourceDefinitionId(), - Jsons.jsonNode(ImmutableMap.builder() - .put("type", "EXCEPTION_AFTER_N") - .put("throw_after_n_records", 100) - .build())); - - final DestinationRead destination = createDestination( - "E2E Test Destination -" + UUID.randomUUID(), - workspaceId, - destinationDefinition.getDestinationDefinitionId(), - Jsons.jsonNode(ImmutableMap.of("type", "SILENT"))); - - final String connectionName = "test-connection"; - final UUID sourceId = source.getSourceId(); - final UUID destinationId = destination.getDestinationId(); - final AirbyteCatalog catalog = discoverSourceSchema(sourceId); - final AirbyteStream stream = catalog.getStreams().get(0).getStream(); - - assertEquals( - Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL), - stream.getSupportedSyncModes()); - assertTrue(MoreBooleans.isTruthy(stream.getSourceDefinedCursor())); - - final SyncMode syncMode = SyncMode.INCREMENTAL; - final DestinationSyncMode destinationSyncMode = DestinationSyncMode.APPEND; - catalog.getStreams().forEach(s -> s.getConfig() - .syncMode(syncMode) - .cursorField(List.of(COLUMN_ID)) - .destinationSyncMode(destinationSyncMode)); - final UUID connectionId = - createConnection(connectionName, sourceId, destinationId, Collections.emptyList(), catalog, null).getConnectionId(); - // Avoid Race condition with the new scheduler - if (featureFlags.usesNewScheduler()) { - waitForTemporalWorkflow(connectionId); - } - final JobInfoRead connectionSyncRead1 = 
apiClient.getConnectionApi() - .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); - - // wait to get out of pending. - final JobRead runningJob = waitForJob(apiClient.getJobsApi(), connectionSyncRead1.getJob(), Sets.newHashSet(JobStatus.PENDING)); - // wait to get out of running. - waitForJob(apiClient.getJobsApi(), runningJob, Sets.newHashSet(JobStatus.RUNNING)); - // now cancel it so that we freeze state! - apiClient.getJobsApi().cancelJob(new JobIdRequestBody().id(connectionSyncRead1.getJob().getId())); - - final ConnectionState connectionState = waitForConnectionState(apiClient, connectionId); - - // the source is set to emit a state message every 5th message. because of the multi threaded - // nature, we can't guarantee exactly what checkpoint will be registered. what we can do is send - // enough messages to make sure that we checkpoint at least once. - assertNotNull(connectionState.getState()); - assertTrue(connectionState.getState().get("column1").isInt()); - LOGGER.info("state value: {}", connectionState.getState().get("column1").asInt()); - assertTrue(connectionState.getState().get("column1").asInt() > 0); - assertEquals(0, connectionState.getState().get("column1").asInt() % 5); - } - - @Test - @Order(15) - public void testRedactionOfSensitiveRequestBodies() throws Exception { - // check that the source password is not present in the logs - final List serverLogLines = java.nio.file.Files.readAllLines( - apiClient.getLogsApi().getLogs(new LogsRequestBody().logType(LogType.SERVER)).toPath(), - Charset.defaultCharset()); - - assertTrue(serverLogLines.size() > 0); - - boolean hasRedacted = false; - - for (final String line : serverLogLines) { - assertFalse(line.contains(SOURCE_PASSWORD)); - - if (line.contains("REDACTED")) { - hasRedacted = true; - } - } - - assertTrue(hasRedacted); - } - - // verify that when the worker uses backpressure from pipes that no records are lost. 
- @Test - @Order(16) - @DisabledIfEnvironmentVariable(named = "KUBE", - matches = "true") - public void testBackpressure() throws Exception { - final SourceDefinitionRead sourceDefinition = apiClient.getSourceDefinitionApi().createSourceDefinition(new SourceDefinitionCreate() - .name("E2E Test Source") - .dockerRepository("airbyte/source-e2e-test") - .dockerImageTag(SOURCE_E2E_TEST_CONNECTOR_VERSION) - .documentationUrl(URI.create("https://example.com"))); - - final DestinationDefinitionRead destinationDefinition = apiClient.getDestinationDefinitionApi() - .createDestinationDefinition(new DestinationDefinitionCreate() - .name("E2E Test Destination") - .dockerRepository("airbyte/destination-e2e-test") - .dockerImageTag(DESTINATION_E2E_TEST_CONNECTOR_VERSION) - .documentationUrl(URI.create("https://example.com"))); - - final SourceRead source = createSource( - "E2E Test Source -" + UUID.randomUUID(), - workspaceId, - sourceDefinition.getSourceDefinitionId(), - Jsons.jsonNode(ImmutableMap.builder() - .put("type", "INFINITE_FEED") - .put("max_records", 5000) - .build())); - - final DestinationRead destination = createDestination( - "E2E Test Destination -" + UUID.randomUUID(), - workspaceId, - destinationDefinition.getDestinationDefinitionId(), - Jsons.jsonNode(ImmutableMap.builder() - .put("type", "THROTTLED") - .put("millis_per_record", 1) - .build())); - - final String connectionName = "test-connection"; - final UUID sourceId = source.getSourceId(); - final UUID destinationId = destination.getDestinationId(); - final AirbyteCatalog catalog = discoverSourceSchema(sourceId); - - final UUID connectionId = - createConnection(connectionName, sourceId, destinationId, Collections.emptyList(), catalog, null) - .getConnectionId(); - // Avoid Race condition with the new scheduler - if (featureFlags.usesNewScheduler()) { - waitForTemporalWorkflow(connectionId); - } - final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi() - .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); - - // wait to get out of pending. - final JobRead runningJob = waitForJob(apiClient.getJobsApi(), connectionSyncRead1.getJob(), Sets.newHashSet(JobStatus.PENDING)); - // wait to get out of running. - waitForJob(apiClient.getJobsApi(), runningJob, Sets.newHashSet(JobStatus.RUNNING)); - - final JobInfoRead jobInfo = apiClient.getJobsApi().getJobInfo(new JobIdRequestBody().id(runningJob.getId())); - final AttemptInfoRead attemptInfoRead = jobInfo.getAttempts().get(jobInfo.getAttempts().size() - 1); - assertNotNull(attemptInfoRead); - - int expectedMessageNumber = 0; - final int max = 10_000; - for (final String logLine : attemptInfoRead.getLogs().getLogLines()) { - if (expectedMessageNumber > max) { - break; - } - - if (logLine.contains("received record: ") && logLine.contains("\"type\": \"RECORD\"")) { - assertTrue( - logLine.contains(String.format("\"column1\": \"%s\"", expectedMessageNumber)), - String.format("Expected %s but got: %s", expectedMessageNumber, logLine)); - expectedMessageNumber++; - } - } - } - - // This test is disabled because it takes a couple minutes to run, as it is testing timeouts. - // It should be re-enabled when the @SlowIntegrationTest can be applied to it. 
- // See relevant issue: https://github.com/airbytehq/airbyte/issues/8397 - @Test - @Order(17) - @Disabled - public void testFailureTimeout() throws Exception { - final SourceDefinitionRead sourceDefinition = apiClient.getSourceDefinitionApi().createSourceDefinition(new SourceDefinitionCreate() - .name("E2E Test Source") - .dockerRepository("airbyte/source-e2e-test") - .dockerImageTag(SOURCE_E2E_TEST_CONNECTOR_VERSION) - .documentationUrl(URI.create("https://example.com"))); - - final DestinationDefinitionRead destinationDefinition = apiClient.getDestinationDefinitionApi() - .createDestinationDefinition(new DestinationDefinitionCreate() - .name("E2E Test Destination") - .dockerRepository("airbyte/destination-e2e-test") - .dockerImageTag(DESTINATION_E2E_TEST_CONNECTOR_VERSION) - .documentationUrl(URI.create("https://example.com"))); - - final SourceRead source = createSource( - "E2E Test Source -" + UUID.randomUUID(), - workspaceId, - sourceDefinition.getSourceDefinitionId(), - Jsons.jsonNode(ImmutableMap.builder() - .put("type", "INFINITE_FEED") - .put("max_records", 1000) - .put("message_interval", 100) - .build())); - - // Destination fails after processing 5 messages, so the job should fail after the graceful close - // timeout of 1 minute - final DestinationRead destination = createDestination( - "E2E Test Destination -" + UUID.randomUUID(), - workspaceId, - destinationDefinition.getDestinationDefinitionId(), - Jsons.jsonNode(ImmutableMap.builder() - .put("type", "FAILING") - .put("num_messages", 5) - .build())); - - final String connectionName = "test-connection"; - final UUID sourceId = source.getSourceId(); - final UUID destinationId = destination.getDestinationId(); - final AirbyteCatalog catalog = discoverSourceSchema(sourceId); - - final UUID connectionId = - createConnection(connectionName, sourceId, destinationId, Collections.emptyList(), catalog, null) - .getConnectionId(); - // Avoid Race condition with the new scheduler - if (featureFlags.usesNewScheduler()) { - waitForTemporalWorkflow(connectionId); - } - - final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi() - .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); - - // wait to get out of pending. 
- final JobRead runningJob = waitForJob(apiClient.getJobsApi(), connectionSyncRead1.getJob(), Sets.newHashSet(JobStatus.PENDING)); - - // wait for job for max of 3 minutes, by which time the job attempt should have failed - waitForJob(apiClient.getJobsApi(), runningJob, Sets.newHashSet(JobStatus.RUNNING), Duration.ofMinutes(3)); - - final JobIdRequestBody jobId = new JobIdRequestBody().id(runningJob.getId()); - final JobInfoRead jobInfo = apiClient.getJobsApi().getJobInfo(jobId); - final AttemptInfoRead attemptInfoRead = jobInfo.getAttempts().get(jobInfo.getAttempts().size() - 1); - - // assert that the job attempt failed, and cancel the job regardless of status to prevent retries - try { - assertEquals(AttemptStatus.FAILED, attemptInfoRead.getAttempt().getStatus()); - } finally { - apiClient.getJobsApi().cancelJob(jobId); - } - } - - @Test - @Order(18) + @RepeatedTest(20) + @Order(-1800000) @EnabledIfEnvironmentVariable(named = "CONTAINER_ORCHESTRATOR", matches = "true") public void testDowntimeDuringSync() throws Exception { @@ -1010,7 +302,9 @@ public void testDowntimeDuringSync() throws Exception { final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); - Thread.sleep(5000); + final int millisToSleep = RandomUtils.nextInt() % 10000; + Thread.sleep(millisToSleep); + LOGGER.info("millisToSleep = " + millisToSleep); LOGGER.info("Scaling down workers..."); kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(0); @@ -1031,152 +325,6 @@ public void testDowntimeDuringSync() throws Exception { assertEquals(1, numAttempts); } - @Test - @Order(19) - @EnabledIfEnvironmentVariable(named = "CONTAINER_ORCHESTRATOR", - matches = "true") - public void testCancelSyncWithInterruption() throws Exception { - final String connectionName = "test-connection"; - final UUID sourceId = createPostgresSource().getSourceId(); - final UUID destinationId = createDestination().getDestinationId(); - final UUID operationId = createOperation().getOperationId(); - final AirbyteCatalog catalog = discoverSourceSchema(sourceId); - final SyncMode syncMode = SyncMode.FULL_REFRESH; - final DestinationSyncMode destinationSyncMode = DestinationSyncMode.OVERWRITE; - catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode)); - final UUID connectionId = - createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); - - // Avoid Race condition with the new scheduler - if (featureFlags.usesNewScheduler()) { - waitForTemporalWorkflow(connectionId); - } - final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); - waitForJob(apiClient.getJobsApi(), connectionSyncRead.getJob(), Set.of(JobStatus.RUNNING)); - - Thread.sleep(5000); - - kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(0); - Thread.sleep(1000); - kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(1); - - final var resp = apiClient.getJobsApi().cancelJob(new JobIdRequestBody().id(connectionSyncRead.getJob().getId())); - assertEquals(JobStatus.CANCELLED, resp.getJob().getStatus()); - } - - @Test - @Order(20) - @Timeout(value = 5, - unit = TimeUnit.MINUTES) - @EnabledIfEnvironmentVariable(named = "CONTAINER_ORCHESTRATOR", - matches = "true") - public void 
testCuttingOffPodBeforeFilesTransfer() throws Exception { - final String connectionName = "test-connection"; - final UUID sourceId = createPostgresSource().getSourceId(); - final UUID destinationId = createDestination().getDestinationId(); - final UUID operationId = createOperation().getOperationId(); - final AirbyteCatalog catalog = discoverSourceSchema(sourceId); - final SyncMode syncMode = SyncMode.FULL_REFRESH; - final DestinationSyncMode destinationSyncMode = DestinationSyncMode.OVERWRITE; - catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode)); - - LOGGER.info("Creating connection..."); - final UUID connectionId = - createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); - - LOGGER.info("Waiting for connection to be available in Temporal..."); - // Avoid Race condition with the new scheduler - if (featureFlags.usesNewScheduler()) { - waitForTemporalWorkflow(connectionId); - } - - LOGGER.info("Run manual sync..."); - final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); - - LOGGER.info("Waiting for job to run..."); - waitForJob(apiClient.getJobsApi(), connectionSyncRead.getJob(), Set.of(JobStatus.RUNNING)); - - LOGGER.info("Scale down workers..."); - kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(0); - - LOGGER.info("Wait for worker scale down..."); - Thread.sleep(1000); - - LOGGER.info("Scale up workers..."); - kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(1); - - LOGGER.info("Waiting for worker timeout..."); - Thread.sleep(ContainerOrchestratorApp.MAX_SECONDS_TO_WAIT_FOR_FILE_COPY * 1000 + 1000); - - LOGGER.info("Waiting for job to retry and succeed..."); - waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead.getJob()); - } - - @Test - @Order(21) - @Timeout(value = 5, - unit = TimeUnit.MINUTES) - @EnabledIfEnvironmentVariable(named = "CONTAINER_ORCHESTRATOR", - matches = "true") - public void testCancelSyncWhenCancelledWhenWorkerIsNotRunning() throws Exception { - final String connectionName = "test-connection"; - final UUID sourceId = createPostgresSource().getSourceId(); - final UUID destinationId = createDestination().getDestinationId(); - final UUID operationId = createOperation().getOperationId(); - final AirbyteCatalog catalog = discoverSourceSchema(sourceId); - final SyncMode syncMode = SyncMode.FULL_REFRESH; - final DestinationSyncMode destinationSyncMode = DestinationSyncMode.OVERWRITE; - catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode)); - - LOGGER.info("Creating connection..."); - final UUID connectionId = - createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); - - LOGGER.info("Waiting for connection to be available in Temporal..."); - // Avoid Race condition with the new scheduler - if (featureFlags.usesNewScheduler()) { - waitForTemporalWorkflow(connectionId); - } - - LOGGER.info("Run manual sync..."); - final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); - - LOGGER.info("Waiting for job to run..."); - waitForJob(apiClient.getJobsApi(), connectionSyncRead.getJob(), Set.of(JobStatus.RUNNING)); - - LOGGER.info("Waiting for job to run a little..."); - 
Thread.sleep(5000); - - LOGGER.info("Scale down workers..."); - kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(0); - - LOGGER.info("Waiting for worker shutdown..."); - Thread.sleep(2000); - - LOGGER.info("Starting background cancellation request..."); - final var pool = Executors.newSingleThreadExecutor(); - final var mdc = MDC.getCopyOfContextMap(); - final Future resp = - pool.submit(() -> { - MDC.setContextMap(mdc); - try { - final JobInfoRead jobInfoRead = apiClient.getJobsApi().cancelJob(new JobIdRequestBody().id(connectionSyncRead.getJob().getId())); - LOGGER.info("jobInfoRead = " + jobInfoRead); - return jobInfoRead; - } catch (ApiException e) { - LOGGER.error("Failed to read from api", e); - throw e; - } - }); - Thread.sleep(2000); - - LOGGER.info("Scaling up workers..."); - kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(1); - - LOGGER.info("Waiting for cancellation to go into effect..."); - assertEquals(JobStatus.CANCELLED, resp.get().getJob().getStatus()); - } - private AirbyteCatalog discoverSourceSchema(final UUID sourceId) throws ApiException { return apiClient.getSourceApi().discoverSchemaForSource(new SourceIdRequestBody().sourceId(sourceId)).getCatalog(); } From 0346ca7fc79b65a95402eec9a822cc2fa4e4f516 Mon Sep 17 00:00:00 2001 From: Jared Rhizor Date: Tue, 22 Feb 2022 11:51:33 -0800 Subject: [PATCH 02/20] try with wait --- .../java/io/airbyte/test/acceptance/AcceptanceTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java index 12c64e3f31dd..45308c3c4218 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java @@ -307,7 +307,7 @@ public void testDowntimeDuringSync() throws Exception { LOGGER.info("millisToSleep = " + millisToSleep); LOGGER.info("Scaling down workers..."); - kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(0); + kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(0, true); Thread.sleep(1000); LOGGER.info("Scaling up workers..."); From 39211c2c3c5830978bfd382d95997184c24efb60 Mon Sep 17 00:00:00 2001 From: Jared Rhizor Date: Tue, 22 Feb 2022 15:37:09 -0800 Subject: [PATCH 03/20] fix cancellation on worker restart --- .../main/java/io/airbyte/config/Configs.java | 26 ++ .../java/io/airbyte/config/EnvConfigs.java | 31 +++ .../test/acceptance/AcceptanceTests.java | 20 +- .../java/io/airbyte/workers/WorkerApp.java | 174 +++++++------ .../ConnectionManagerWorkflowImpl.java | 2 +- .../shared/ActivityConfiguration.java | 14 +- .../kustomization.yaml | 4 + .../sync-only-worker.yaml | 230 ++++++++++++++++++ .../temporal-ui.yaml | 37 +++ .../worker-patch.yaml | 2 + 10 files changed, 450 insertions(+), 90 deletions(-) create mode 100644 kube/overlays/dev-integration-test-schedulerv2/sync-only-worker.yaml create mode 100644 kube/overlays/dev-integration-test-schedulerv2/temporal-ui.yaml diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java b/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java index 384084d6cf04..d496d4edd340 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java +++ 
b/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java @@ -401,6 +401,32 @@ public interface Configs { */ MaxWorkersConfig getMaxWorkers(); + /** + * Define if the worker should run get spec workflows. Defaults to true. Internal-use only. + */ + boolean shouldRunGetSpecWorkflows(); + + /** + * Define if the worker should run check connection workflows. Defaults to true. Internal-use only. + */ + boolean shouldRunCheckConnectionWorkflows(); + + /** + * Define if the worker should run discover workflows. Defaults to true. Internal-use only. + */ + boolean shouldRunDiscoverWorkflows(); + + /** + * Define if the worker should run sync workflows. Defaults to true. Internal-use only. + */ + boolean shouldRunSyncWorkflows(); + + /** + * Define if the worker should run connection manager workflows. Defaults to true. Internal-use + * only. + */ + boolean shouldRunConnectionManagerWorkflows(); + // Worker - Kube only /** * Define the local ports the Airbyte Worker pod uses to connect to the various Job pods. diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java b/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java index 3d4f9fb3ebfd..353b7c300177 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java @@ -110,6 +110,12 @@ public class EnvConfigs implements Configs { public static final String ACTIVITY_MAX_ATTEMPT = "ACTIVITY_MAX_ATTEMPT"; public static final String ACTIVITY_DELAY_IN_SECOND_BETWEEN_ATTEMPTS = "ACTIVITY_DELAY_IN_SECOND_BETWEEN_ATTEMPTS"; + private static final String SHOULD_RUN_GET_SPEC_WORKFLOWS = "SHOULD_RUN_GET_SPEC_WORKFLOWS"; + private static final String SHOULD_RUN_CHECK_CONNECTION_WORKFLOWS = "SHOULD_RUN_CHECK_CONNECTION_WORKFLOWS"; + private static final String SHOULD_RUN_DISCOVER_WORKFLOWS = "SHOULD_RUN_DISCOVER_WORKFLOWS"; + private static final String SHOULD_RUN_SYNC_WORKFLOWS = "SHOULD_RUN_SYNC_WORKFLOWS"; + private static final String SHOULD_RUN_CONNECTION_MANAGER_WORKFLOWS = "SHOULD_RUN_CONNECTION_MANAGER_WORKFLOWS"; + // job-type-specific overrides public static final String SPEC_JOB_KUBE_NODE_SELECTORS = "SPEC_JOB_KUBE_NODE_SELECTORS"; public static final String CHECK_JOB_KUBE_NODE_SELECTORS = "CHECK_JOB_KUBE_NODE_SELECTORS"; @@ -692,6 +698,31 @@ public MaxWorkersConfig getMaxWorkers() { Math.toIntExact(getEnvOrDefault(MAX_SYNC_WORKERS, DEFAULT_MAX_SYNC_WORKERS))); } + @Override + public boolean shouldRunGetSpecWorkflows() { + return getEnvOrDefault(SHOULD_RUN_GET_SPEC_WORKFLOWS, true); + } + + @Override + public boolean shouldRunCheckConnectionWorkflows() { + return getEnvOrDefault(SHOULD_RUN_CHECK_CONNECTION_WORKFLOWS, true); + } + + @Override + public boolean shouldRunDiscoverWorkflows() { + return getEnvOrDefault(SHOULD_RUN_DISCOVER_WORKFLOWS, true); + } + + @Override + public boolean shouldRunSyncWorkflows() { + return getEnvOrDefault(SHOULD_RUN_SYNC_WORKFLOWS, true); + } + + @Override + public boolean shouldRunConnectionManagerWorkflows() { + return getEnvOrDefault(SHOULD_RUN_CONNECTION_MANAGER_WORKFLOWS, true); + } + @Override public Set getTemporalWorkerPorts() { final var ports = getEnvOrDefault(TEMPORAL_WORKER_PORTS, ""); diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java index 45308c3c4218..644669b37e74 100644 --- 
a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java @@ -79,7 +79,6 @@ import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; -import org.apache.commons.lang3.RandomUtils; import org.jooq.JSONB; import org.jooq.Record; import org.jooq.Result; @@ -279,7 +278,8 @@ public void tearDown() throws ApiException, SQLException { } } - @RepeatedTest(20) + // todo: test with both types of workers getting scaled down, just one of each + @RepeatedTest(1) @Order(-1800000) @EnabledIfEnvironmentVariable(named = "CONTAINER_ORCHESTRATOR", matches = "true") @@ -295,23 +295,21 @@ public void testDowntimeDuringSync() throws Exception { final UUID connectionId = createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); - // Avoid Race condition with the new scheduler + // Avoid race condition with the new scheduler if (featureFlags.usesNewScheduler()) { waitForTemporalWorkflow(connectionId); } final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); - final int millisToSleep = RandomUtils.nextInt() % 10000; - Thread.sleep(millisToSleep); - LOGGER.info("millisToSleep = " + millisToSleep); + // todo: ideally this waits a couple seconds after the orchestrator pod starts + Thread.sleep(10000); - LOGGER.info("Scaling down workers..."); - kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(0, true); - Thread.sleep(1000); + LOGGER.info("Scaling down sync workers..."); + kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-sync-worker").scale(0, true); - LOGGER.info("Scaling up workers..."); - kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(1); + LOGGER.info("Scaling up sync workers..."); + kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-sync-worker").scale(1); waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead.getJob()); assertSourceAndDestinationDbInSync(false); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java index c7ec5d63630e..ebe51956154a 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java @@ -134,67 +134,31 @@ public void start() { final WorkerFactory factory = WorkerFactory.newInstance(WorkflowClient.newInstance(temporalService)); - final Worker specWorker = factory.newWorker(TemporalJobType.GET_SPEC.name(), getWorkerOptions(maxWorkers.getMaxSpecWorkers())); - specWorker.registerWorkflowImplementationTypes(SpecWorkflowImpl.class); - specWorker.registerActivitiesImplementations( - new SpecActivityImpl(specWorkerConfigs, specProcessFactory, workspaceRoot, workerEnvironment, logConfigs, jobPersistence, - airbyteVersion)); + if (configs.shouldRunGetSpecWorkflows()) { + registerGetSpec(factory); + } - final Worker checkConnectionWorker = - factory.newWorker(TemporalJobType.CHECK_CONNECTION.name(), getWorkerOptions(maxWorkers.getMaxCheckWorkers())); - checkConnectionWorker.registerWorkflowImplementationTypes(CheckConnectionWorkflowImpl.class); - checkConnectionWorker - .registerActivitiesImplementations( - new CheckConnectionActivityImpl(checkWorkerConfigs, checkProcessFactory, 
secretsHydrator, workspaceRoot, workerEnvironment, logConfigs, - jobPersistence, airbyteVersion)); + if (configs.shouldRunCheckConnectionWorkflows()) { + registerCheckConnection(factory); + } - final Worker discoverWorker = factory.newWorker(TemporalJobType.DISCOVER_SCHEMA.name(), getWorkerOptions(maxWorkers.getMaxDiscoverWorkers())); - discoverWorker.registerWorkflowImplementationTypes(DiscoverCatalogWorkflowImpl.class); - discoverWorker - .registerActivitiesImplementations( - new DiscoverCatalogActivityImpl(discoverWorkerConfigs, discoverProcessFactory, secretsHydrator, workspaceRoot, workerEnvironment, - logConfigs, - jobPersistence, airbyteVersion)); + if (configs.shouldRunDiscoverWorkflows()) { + registerDiscover(factory); + } - final NormalizationActivityImpl normalizationActivity = - new NormalizationActivityImpl( - containerOrchestratorConfig, - defaultWorkerConfigs, - defaultProcessFactory, - secretsHydrator, - workspaceRoot, - workerEnvironment, - logConfigs, - jobPersistence, - airbyteVersion); - final DbtTransformationActivityImpl dbtTransformationActivity = - new DbtTransformationActivityImpl( - containerOrchestratorConfig, - defaultWorkerConfigs, - defaultProcessFactory, - secretsHydrator, - workspaceRoot, - workerEnvironment, - logConfigs, - jobPersistence, - airbyteVersion); - new PersistStateActivityImpl(workspaceRoot, configRepository); - final PersistStateActivityImpl persistStateActivity = new PersistStateActivityImpl(workspaceRoot, configRepository); - final Worker syncWorker = factory.newWorker(TemporalJobType.SYNC.name(), getWorkerOptions(maxWorkers.getMaxSyncWorkers())); - final ReplicationActivityImpl replicationActivity = getReplicationActivityImpl( - containerOrchestratorConfig, - replicationWorkerConfigs, - replicationProcessFactory, - secretsHydrator, - workspaceRoot, - workerEnvironment, - logConfigs, - jobPersistence, - airbyteVersion); - syncWorker.registerWorkflowImplementationTypes(SyncWorkflowImpl.class); + if (configs.shouldRunSyncWorkflows() || configs.shouldRunConnectionManagerWorkflows()) { + if (configs.shouldRunSyncWorkflows()) { + registerSync(factory); + } - syncWorker.registerActivitiesImplementations(replicationActivity, normalizationActivity, dbtTransformationActivity, persistStateActivity); + if (configs.shouldRunConnectionManagerWorkflows()) { + registerConnectionManager(factory); + } + } + factory.start(); + } + private void registerConnectionManager(final WorkerFactory factory) { final JobCreator jobCreator = new DefaultJobCreator(jobPersistence, configRepository, defaultWorkerConfigs.getResourceRequirements()); final Worker connectionUpdaterWorker = @@ -214,29 +178,57 @@ public void start() { configRepository, jobCreator), new ConfigFetchActivityImpl(configRepository, jobPersistence, configs, () -> Instant.now().getEpochSecond()), - new ConnectionDeletionActivityImpl(connectionHelper), - replicationActivity, - normalizationActivity, - dbtTransformationActivity, - persistStateActivity); + new ConnectionDeletionActivityImpl(connectionHelper)); + } - factory.start(); + private void registerSync(final WorkerFactory factory) { + final ReplicationActivityImpl replicationActivity = getReplicationActivityImpl(replicationWorkerConfigs, replicationProcessFactory); + + final NormalizationActivityImpl normalizationActivity = getNormalizationActivityImpl( + defaultWorkerConfigs, + defaultProcessFactory); + + final DbtTransformationActivityImpl dbtTransformationActivity = getDbtActivityImpl( + defaultWorkerConfigs, + defaultProcessFactory); + + final 
PersistStateActivityImpl persistStateActivity = new PersistStateActivityImpl(workspaceRoot, configRepository); + + final Worker syncWorker = factory.newWorker(TemporalJobType.SYNC.name(), getWorkerOptions(maxWorkers.getMaxSyncWorkers())); + syncWorker.registerWorkflowImplementationTypes(SyncWorkflowImpl.class); + syncWorker.registerActivitiesImplementations(replicationActivity, normalizationActivity, dbtTransformationActivity, persistStateActivity); + } + + private void registerDiscover(final WorkerFactory factory) { + final Worker discoverWorker = factory.newWorker(TemporalJobType.DISCOVER_SCHEMA.name(), getWorkerOptions(maxWorkers.getMaxDiscoverWorkers())); + discoverWorker.registerWorkflowImplementationTypes(DiscoverCatalogWorkflowImpl.class); + discoverWorker + .registerActivitiesImplementations( + new DiscoverCatalogActivityImpl(discoverWorkerConfigs, discoverProcessFactory, secretsHydrator, workspaceRoot, workerEnvironment, + logConfigs, + jobPersistence, airbyteVersion)); + } + + private void registerCheckConnection(final WorkerFactory factory) { + final Worker checkConnectionWorker = + factory.newWorker(TemporalJobType.CHECK_CONNECTION.name(), getWorkerOptions(maxWorkers.getMaxCheckWorkers())); + checkConnectionWorker.registerWorkflowImplementationTypes(CheckConnectionWorkflowImpl.class); + checkConnectionWorker + .registerActivitiesImplementations( + new CheckConnectionActivityImpl(checkWorkerConfigs, checkProcessFactory, secretsHydrator, workspaceRoot, workerEnvironment, logConfigs, + jobPersistence, airbyteVersion)); + } + + public void registerGetSpec(final WorkerFactory factory) { + final Worker specWorker = factory.newWorker(TemporalJobType.GET_SPEC.name(), getWorkerOptions(maxWorkers.getMaxSpecWorkers())); + specWorker.registerWorkflowImplementationTypes(SpecWorkflowImpl.class); + specWorker.registerActivitiesImplementations( + new SpecActivityImpl(specWorkerConfigs, specProcessFactory, workspaceRoot, workerEnvironment, logConfigs, jobPersistence, + airbyteVersion)); } - /** - * Switches behavior based on containerOrchestratorEnabled to decide whether to use new container - * launching or not. 
- */ - private ReplicationActivityImpl getReplicationActivityImpl( - final Optional containerOrchestratorConfig, - final WorkerConfigs workerConfigs, - final ProcessFactory jobProcessFactory, - final SecretsHydrator secretsHydrator, - final Path workspaceRoot, - final WorkerEnvironment workerEnvironment, - final LogConfigs logConfigs, - final JobPersistence jobPersistence, - final String airbyteVersion) { + private ReplicationActivityImpl getReplicationActivityImpl(final WorkerConfigs workerConfigs, + final ProcessFactory jobProcessFactory) { return new ReplicationActivityImpl( containerOrchestratorConfig, @@ -250,6 +242,36 @@ private ReplicationActivityImpl getReplicationActivityImpl( airbyteVersion); } + private NormalizationActivityImpl getNormalizationActivityImpl(final WorkerConfigs workerConfigs, + final ProcessFactory jobProcessFactory) { + + return new NormalizationActivityImpl( + containerOrchestratorConfig, + workerConfigs, + jobProcessFactory, + secretsHydrator, + workspaceRoot, + workerEnvironment, + logConfigs, + jobPersistence, + airbyteVersion); + } + + private DbtTransformationActivityImpl getDbtActivityImpl(final WorkerConfigs workerConfigs, + final ProcessFactory jobProcessFactory) { + + return new DbtTransformationActivityImpl( + containerOrchestratorConfig, + workerConfigs, + jobProcessFactory, + secretsHydrator, + workspaceRoot, + workerEnvironment, + logConfigs, + jobPersistence, + airbyteVersion); + } + private static ProcessFactory getJobProcessFactory(final Configs configs, final WorkerConfigs workerConfigs) throws IOException { if (configs.getWorkerEnvironment() == Configs.WorkerEnvironment.KUBERNETES) { final KubernetesClient fabricClient = new DefaultKubernetesClient(); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java index de9c6ba5bc7e..a0c2c579eb35 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java @@ -455,7 +455,7 @@ private StandardSyncOutput runChildWorkflow(GeneratedJobInput jobInputs) { final SyncWorkflow childSync = Workflow.newChildWorkflowStub(SyncWorkflow.class, ChildWorkflowOptions.newBuilder() .setWorkflowId("sync_" + workflowInternalState.getJobId()) - .setTaskQueue(TemporalJobType.CONNECTION_UPDATER.name()) + .setTaskQueue(TemporalJobType.SYNC.name()) // This will cancel the child workflow when the parent is terminated .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_REQUEST_CANCEL) .build()); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java index 10a379d665c5..69cde0603974 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java @@ -6,15 +6,17 @@ import io.airbyte.config.Configs; import io.airbyte.config.EnvConfigs; +import io.airbyte.workers.WorkerException; import io.airbyte.workers.temporal.TemporalUtils; import io.temporal.activity.ActivityCancellationType; import io.temporal.activity.ActivityOptions; +import io.temporal.common.RetryOptions; import 
java.time.Duration; /** * Shared temporal workflow configuration in order to ensure that * {@link io.airbyte.workers.temporal.scheduling.ConnectionManagerWorkflow} and - * {@link io.airbyte.workers.temporal.sync.SyncWorkflow} configurations are on sync, expecialy for + * {@link io.airbyte.workers.temporal.sync.SyncWorkflow} configurations are on sync, especially for * the grace period. */ public class ActivityConfiguration { @@ -24,12 +26,20 @@ public class ActivityConfiguration { private static final int MAX_SYNC_TIMEOUT_DAYS = configs.getSyncJobMaxTimeoutDays(); private static final Duration DB_INTERACTION_TIMEOUT = Duration.ofSeconds(configs.getMaxActivityTimeoutSecond()); + // retry infinitely if the worker is killed without exceptions and dies due to timeouts + // but fail for everything thrown by the call itself which is rethrown as runtime exceptions + private static final RetryOptions ORCHESTRATOR_RETRY = RetryOptions.newBuilder() + .setDoNotRetry(RuntimeException.class.getName(), WorkerException.class.getName()) + .build(); + + private static final RetryOptions RETRY_POLICY = new EnvConfigs().getContainerOrchestratorEnabled() ? ORCHESTRATOR_RETRY : TemporalUtils.NO_RETRY; + public static final ActivityOptions LONG_RUN_OPTIONS = ActivityOptions.newBuilder() .setScheduleToCloseTimeout(Duration.ofDays(MAX_SYNC_TIMEOUT_DAYS)) .setStartToCloseTimeout(Duration.ofDays(MAX_SYNC_TIMEOUT_DAYS)) .setScheduleToStartTimeout(Duration.ofDays(MAX_SYNC_TIMEOUT_DAYS)) .setCancellationType(ActivityCancellationType.WAIT_CANCELLATION_COMPLETED) - .setRetryOptions(TemporalUtils.NO_RETRY) + .setRetryOptions(RETRY_POLICY) .setHeartbeatTimeout(TemporalUtils.HEARTBEAT_TIMEOUT) .build(); diff --git a/kube/overlays/dev-integration-test-schedulerv2/kustomization.yaml b/kube/overlays/dev-integration-test-schedulerv2/kustomization.yaml index c6a280178193..06c4a8222a5c 100644 --- a/kube/overlays/dev-integration-test-schedulerv2/kustomization.yaml +++ b/kube/overlays/dev-integration-test-schedulerv2/kustomization.yaml @@ -6,6 +6,10 @@ namespace: default bases: - ../../resources +resources: + - sync-only-worker.yaml + - temporal-ui.yaml + images: - name: airbyte/db newTag: dev diff --git a/kube/overlays/dev-integration-test-schedulerv2/sync-only-worker.yaml b/kube/overlays/dev-integration-test-schedulerv2/sync-only-worker.yaml new file mode 100644 index 000000000000..c9547abdbc91 --- /dev/null +++ b/kube/overlays/dev-integration-test-schedulerv2/sync-only-worker.yaml @@ -0,0 +1,230 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: airbyte-sync-worker +spec: + replicas: 1 + selector: + matchLabels: + airbyte: sync-worker + template: + metadata: + labels: + airbyte: sync-worker + spec: + serviceAccountName: airbyte-admin + automountServiceAccountToken: true + containers: + - name: airbyte-worker-container + image: airbyte/worker + env: + - name: AIRBYTE_VERSION + valueFrom: + configMapKeyRef: + name: airbyte-env + key: AIRBYTE_VERSION + - name: CONFIG_ROOT + valueFrom: + configMapKeyRef: + name: airbyte-env + key: CONFIG_ROOT + - name: DATABASE_HOST + valueFrom: + configMapKeyRef: + name: airbyte-env + key: DATABASE_HOST + - name: DATABASE_PORT + valueFrom: + configMapKeyRef: + name: airbyte-env + key: DATABASE_PORT + - name: DATABASE_PASSWORD + valueFrom: + secretKeyRef: + name: airbyte-secrets + key: DATABASE_PASSWORD + - name: DATABASE_URL + valueFrom: + configMapKeyRef: + name: airbyte-env + key: DATABASE_URL + - name: DATABASE_USER + valueFrom: + secretKeyRef: + name: airbyte-secrets + key: 
DATABASE_USER + - name: TRACKING_STRATEGY + valueFrom: + configMapKeyRef: + name: airbyte-env + key: TRACKING_STRATEGY + - name: WORKSPACE_DOCKER_MOUNT + value: workspace + - name: WORKSPACE_ROOT + valueFrom: + configMapKeyRef: + name: airbyte-env + key: WORKSPACE_ROOT + - name: WORKER_ENVIRONMENT + valueFrom: + configMapKeyRef: + name: airbyte-env + key: WORKER_ENVIRONMENT + - name: LOCAL_ROOT + valueFrom: + configMapKeyRef: + name: airbyte-env + key: LOCAL_ROOT + - name: WEBAPP_URL + valueFrom: + configMapKeyRef: + name: airbyte-env + key: WEBAPP_URL + - name: TEMPORAL_HOST + valueFrom: + configMapKeyRef: + name: airbyte-env + key: TEMPORAL_HOST + - name: TEMPORAL_WORKER_PORTS + valueFrom: + configMapKeyRef: + name: airbyte-env + key: TEMPORAL_WORKER_PORTS + - name: LOG_LEVEL + valueFrom: + configMapKeyRef: + name: airbyte-env + key: LOG_LEVEL + - name: JOB_KUBE_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: SUBMITTER_NUM_THREADS + valueFrom: + configMapKeyRef: + name: airbyte-env + key: SUBMITTER_NUM_THREADS + - name: JOB_MAIN_CONTAINER_CPU_REQUEST + valueFrom: + configMapKeyRef: + name: airbyte-env + key: JOB_MAIN_CONTAINER_CPU_REQUEST + - name: JOB_MAIN_CONTAINER_CPU_LIMIT + valueFrom: + configMapKeyRef: + name: airbyte-env + key: JOB_MAIN_CONTAINER_CPU_LIMIT + - name: JOB_MAIN_CONTAINER_MEMORY_REQUEST + valueFrom: + configMapKeyRef: + name: airbyte-env + key: JOB_MAIN_CONTAINER_MEMORY_REQUEST + - name: JOB_MAIN_CONTAINER_MEMORY_LIMIT + valueFrom: + configMapKeyRef: + name: airbyte-env + key: JOB_MAIN_CONTAINER_MEMORY_LIMIT + - name: S3_LOG_BUCKET + valueFrom: + configMapKeyRef: + name: airbyte-env + key: S3_LOG_BUCKET + - name: S3_LOG_BUCKET_REGION + valueFrom: + configMapKeyRef: + name: airbyte-env + key: S3_LOG_BUCKET_REGION + - name: AWS_ACCESS_KEY_ID + valueFrom: + secretKeyRef: + name: airbyte-secrets + key: AWS_ACCESS_KEY_ID + - name: AWS_SECRET_ACCESS_KEY + valueFrom: + secretKeyRef: + name: airbyte-secrets + key: AWS_SECRET_ACCESS_KEY + - name: S3_MINIO_ENDPOINT + valueFrom: + configMapKeyRef: + name: airbyte-env + key: S3_MINIO_ENDPOINT + - name: S3_PATH_STYLE_ACCESS + valueFrom: + configMapKeyRef: + name: airbyte-env + key: S3_PATH_STYLE_ACCESS + - name: GOOGLE_APPLICATION_CREDENTIALS + valueFrom: + secretKeyRef: + name: airbyte-secrets + key: GOOGLE_APPLICATION_CREDENTIALS + - name: GCS_LOG_BUCKET + valueFrom: + configMapKeyRef: + name: airbyte-env + key: GCS_LOG_BUCKET + - name: INTERNAL_API_HOST + valueFrom: + configMapKeyRef: + name: airbyte-env + key: INTERNAL_API_HOST + - name: JOB_KUBE_TOLERATIONS + valueFrom: + configMapKeyRef: + name: airbyte-env + key: JOB_KUBE_TOLERATIONS + - name: JOB_KUBE_NODE_SELECTORS + valueFrom: + configMapKeyRef: + name: airbyte-env + key: JOB_KUBE_NODE_SELECTORS + - name: JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_POLICY + valueFrom: + configMapKeyRef: + name: airbyte-env + key: JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_POLICY + # todo: add other state storage keys + - name: STATE_STORAGE_MINIO_BUCKET_NAME + valueFrom: + configMapKeyRef: + name: airbyte-env + key: STATE_STORAGE_MINIO_BUCKET_NAME + - name: STATE_STORAGE_MINIO_ENDPOINT + valueFrom: + configMapKeyRef: + name: airbyte-env + key: STATE_STORAGE_MINIO_ENDPOINT + - name: STATE_STORAGE_MINIO_ACCESS_KEY + valueFrom: + secretKeyRef: + name: airbyte-secrets + key: STATE_STORAGE_MINIO_ACCESS_KEY + - name: STATE_STORAGE_MINIO_SECRET_ACCESS_KEY + valueFrom: + secretKeyRef: + name: airbyte-secrets + key: STATE_STORAGE_MINIO_SECRET_ACCESS_KEY + - name: 
CONTAINER_ORCHESTRATOR_ENABLED + valueFrom: + configMapKeyRef: + name: airbyte-env + key: CONTAINER_ORCHESTRATOR_ENABLED + - name: SHOULD_RUN_GET_SPEC_WORKFLOWS + value: "false" + - name: SHOULD_RUN_CHECK_CONNECTION_WORKFLOWS + value: "false" + - name: SHOULD_RUN_DISCOVER_WORKFLOWS + value: "false" + - name: SHOULD_RUN_SYNC_WORKFLOWS + value: "true" + - name: SHOULD_RUN_CONNECTION_MANAGER_WORKFLOWS + value: "false" + volumeMounts: + - name: gcs-log-creds-volume + mountPath: /secrets/gcs-log-creds + readOnly: true + volumes: + - name: gcs-log-creds-volume + secret: + secretName: gcs-log-creds diff --git a/kube/overlays/dev-integration-test-schedulerv2/temporal-ui.yaml b/kube/overlays/dev-integration-test-schedulerv2/temporal-ui.yaml new file mode 100644 index 000000000000..541c3ad2ca12 --- /dev/null +++ b/kube/overlays/dev-integration-test-schedulerv2/temporal-ui.yaml @@ -0,0 +1,37 @@ +apiVersion: v1 +kind: Service +metadata: + name: airbyte-temporal-ui +spec: + type: NodePort + ports: + - port: 8088 + protocol: TCP + selector: + airbyte: temporal-ui +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: airbyte-temporal-ui +spec: + replicas: 1 + selector: + matchLabels: + airbyte: temporal-ui + template: + metadata: + labels: + airbyte: temporal-ui + spec: + containers: + - name: airbyte-temporal-ui-container + image: temporalio/web:1.13.0 + env: + - name: TEMPORAL_GRPC_ENDPOINT + valueFrom: + configMapKeyRef: + name: airbyte-env + key: TEMPORAL_HOST + - name: TEMPORAL_PERMIT_WRITE_API + value: "true" diff --git a/kube/overlays/dev-integration-test-schedulerv2/worker-patch.yaml b/kube/overlays/dev-integration-test-schedulerv2/worker-patch.yaml index 8e790435392e..17e02669f730 100644 --- a/kube/overlays/dev-integration-test-schedulerv2/worker-patch.yaml +++ b/kube/overlays/dev-integration-test-schedulerv2/worker-patch.yaml @@ -18,3 +18,5 @@ spec: configMapKeyRef: name: airbyte-env key: REPLICATION_WORKER_STATUS_CHECK_INTERVAL + - name: SHOULD_RUN_SYNC_WORKFLOWS + value: "false" From 366bb049006f3b1769f084800d967424e1130bfc Mon Sep 17 00:00:00 2001 From: Jared Rhizor Date: Tue, 22 Feb 2022 15:37:48 -0800 Subject: [PATCH 04/20] revert for CI testing that the test fails without the retry policy --- .../temporal/scheduling/shared/ActivityConfiguration.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java index 69cde0603974..c56d1704a9cc 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java @@ -39,7 +39,7 @@ public class ActivityConfiguration { .setStartToCloseTimeout(Duration.ofDays(MAX_SYNC_TIMEOUT_DAYS)) .setScheduleToStartTimeout(Duration.ofDays(MAX_SYNC_TIMEOUT_DAYS)) .setCancellationType(ActivityCancellationType.WAIT_CANCELLATION_COMPLETED) - .setRetryOptions(RETRY_POLICY) + .setRetryOptions(TemporalUtils.NO_RETRY) .setHeartbeatTimeout(TemporalUtils.HEARTBEAT_TIMEOUT) .build(); From a0ab765161d10c199bf1035bba83d130e5d8a106 Mon Sep 17 00:00:00 2001 From: Jared Rhizor Date: Tue, 22 Feb 2022 15:38:05 -0800 Subject: [PATCH 05/20] revert testing change --- .../temporal/scheduling/shared/ActivityConfiguration.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
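[Editor's note: the RETRY_POLICY introduced in the ActivityConfiguration change above, and toggled back and forth in the next few CI-verification patches, builds on Temporal's RetryOptions. A minimal self-contained sketch of the same pattern follows; the class name and the three-day timeout are illustrative assumptions, not Airbyte's actual values.]

import io.temporal.activity.ActivityOptions;
import io.temporal.common.RetryOptions;
import java.time.Duration;

public final class RetryPolicySketch {

  // Retry activities whose worker was killed out from under them (which surfaces
  // as a timeout, not an exception), but fail fast on anything the activity code
  // itself throws, since those errors are rethrown as RuntimeExceptions.
  public static ActivityOptions longRunOptions() {
    final RetryOptions retryOnWorkerDeathOnly = RetryOptions.newBuilder()
        .setDoNotRetry(RuntimeException.class.getName())
        .build();

    return ActivityOptions.newBuilder()
        .setStartToCloseTimeout(Duration.ofDays(3)) // illustrative timeout
        .setRetryOptions(retryOnWorkerDeathOnly)
        .build();
  }

}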
a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java index c56d1704a9cc..69cde0603974 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java @@ -39,7 +39,7 @@ public class ActivityConfiguration { .setStartToCloseTimeout(Duration.ofDays(MAX_SYNC_TIMEOUT_DAYS)) .setScheduleToStartTimeout(Duration.ofDays(MAX_SYNC_TIMEOUT_DAYS)) .setCancellationType(ActivityCancellationType.WAIT_CANCELLATION_COMPLETED) - .setRetryOptions(TemporalUtils.NO_RETRY) + .setRetryOptions(RETRY_POLICY) .setHeartbeatTimeout(TemporalUtils.HEARTBEAT_TIMEOUT) .build(); From 8de3f2e26ca6f8392cd2e57a47bbd17afe1b6be0 Mon Sep 17 00:00:00 2001 From: Jared Rhizor Date: Tue, 22 Feb 2022 15:49:05 -0800 Subject: [PATCH 06/20] matrix test the different possible cases --- .../test/acceptance/AcceptanceTests.java | 54 ++++++++++++++++--- .../shared/ActivityConfiguration.java | 2 +- 2 files changed, 48 insertions(+), 8 deletions(-) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java index 644669b37e74..e2ec252d3087 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java @@ -88,9 +88,10 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.MethodOrderer; import org.junit.jupiter.api.Order; -import org.junit.jupiter.api.RepeatedTest; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestMethodOrder; import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable; +import org.junit.jupiter.params.provider.ValueSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.PostgreSQLContainer; @@ -279,11 +280,12 @@ public void tearDown() throws ApiException, SQLException { } // todo: test with both types of workers getting scaled down, just one of each - @RepeatedTest(1) + @Test @Order(-1800000) + @ValueSource(strings = {"KILL_BOTH_SAME_TIME", "KILL_SYNC_FIRST", "KILL_NON_SYNC_FIRST", "KILL_ONLY_SYNC", "KILL_ONLY_NON_SYNC"}) @EnabledIfEnvironmentVariable(named = "CONTAINER_ORCHESTRATOR", matches = "true") - public void testDowntimeDuringSync() throws Exception { + public void testDowntimeDuringSync(String type) throws Exception { final String connectionName = "test-connection"; final UUID sourceId = createPostgresSource().getSourceId(); final UUID destinationId = createDestination().getDestinationId(); @@ -305,11 +307,49 @@ public void testDowntimeDuringSync() throws Exception { // todo: ideally this waits a couple seconds after the orchestrator pod starts Thread.sleep(10000); - LOGGER.info("Scaling down sync workers..."); - kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-sync-worker").scale(0, true); + switch (type) { + case "KILL_BOTH_SAME_TIME" -> { + LOGGER.info("Scaling down both workers at roughly the same time..."); + kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(0); + kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-sync-worker").scale(0, true); - 
LOGGER.info("Scaling up sync workers..."); - kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-sync-worker").scale(1); + LOGGER.info("Scaling up both workers..."); + kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(1); + kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-sync-worker").scale(1); + } + case "KILL_SYNC_FIRST" -> { + LOGGER.info("Scaling down sync worker first..."); + kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-sync-worker").scale(0, true); + kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(0, true); + + LOGGER.info("Scaling up both workers..."); + kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(1); + kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-sync-worker").scale(1); + } + case "KILL_NON_SYNC_FIRST" -> { + LOGGER.info("Scaling down non-sync worker first..."); + kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(0, true); + kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-sync-worker").scale(0, true); + + LOGGER.info("Scaling up both workers..."); + kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(1); + kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-sync-worker").scale(1); + } + case "KILL_ONLY_SYNC" -> { + LOGGER.info("Scaling down only sync worker..."); + kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-sync-worker").scale(0, true); + + LOGGER.info("Scaling up sync worker..."); + kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-sync-worker").scale(1); + } + case "KILL_ONLY_NON_SYNC" -> { + LOGGER.info("Scaling down only non-sync worker..."); + kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(0, true); + + LOGGER.info("Scaling up non-sync worker..."); + kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(1); + } + } waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead.getJob()); assertSourceAndDestinationDbInSync(false); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java index 69cde0603974..c56d1704a9cc 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java @@ -39,7 +39,7 @@ public class ActivityConfiguration { .setStartToCloseTimeout(Duration.ofDays(MAX_SYNC_TIMEOUT_DAYS)) .setScheduleToStartTimeout(Duration.ofDays(MAX_SYNC_TIMEOUT_DAYS)) .setCancellationType(ActivityCancellationType.WAIT_CANCELLATION_COMPLETED) - .setRetryOptions(RETRY_POLICY) + .setRetryOptions(TemporalUtils.NO_RETRY) .setHeartbeatTimeout(TemporalUtils.HEARTBEAT_TIMEOUT) .build(); From 9bf2141fa828db6ce8e6930dde3bd45c38843980 Mon Sep 17 00:00:00 2001 From: Jared Rhizor Date: Tue, 22 Feb 2022 15:49:23 -0800 Subject: [PATCH 07/20] re-enable new retry policy --- .../temporal/scheduling/shared/ActivityConfiguration.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java index c56d1704a9cc..69cde0603974 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java @@ -39,7 +39,7 @@ public class ActivityConfiguration { .setStartToCloseTimeout(Duration.ofDays(MAX_SYNC_TIMEOUT_DAYS)) .setScheduleToStartTimeout(Duration.ofDays(MAX_SYNC_TIMEOUT_DAYS)) .setCancellationType(ActivityCancellationType.WAIT_CANCELLATION_COMPLETED) - .setRetryOptions(TemporalUtils.NO_RETRY) + .setRetryOptions(RETRY_POLICY) .setHeartbeatTimeout(TemporalUtils.HEARTBEAT_TIMEOUT) .build(); From 1a3ac0a9158390980523794145218291cb848034 Mon Sep 17 00:00:00 2001 From: Jared Rhizor Date: Tue, 22 Feb 2022 15:49:43 -0800 Subject: [PATCH 08/20] switch to no_retry --- .../temporal/scheduling/shared/ActivityConfiguration.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java index 69cde0603974..c56d1704a9cc 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java @@ -39,7 +39,7 @@ public class ActivityConfiguration { .setStartToCloseTimeout(Duration.ofDays(MAX_SYNC_TIMEOUT_DAYS)) .setScheduleToStartTimeout(Duration.ofDays(MAX_SYNC_TIMEOUT_DAYS)) .setCancellationType(ActivityCancellationType.WAIT_CANCELLATION_COMPLETED) - .setRetryOptions(RETRY_POLICY) + .setRetryOptions(TemporalUtils.NO_RETRY) .setHeartbeatTimeout(TemporalUtils.HEARTBEAT_TIMEOUT) .build(); From fec31dbdf35a919d65073626662c2d9d342fd1d8 Mon Sep 17 00:00:00 2001 From: Jared Rhizor Date: Tue, 22 Feb 2022 15:49:54 -0800 Subject: [PATCH 09/20] switch back to new retry --- .../temporal/scheduling/shared/ActivityConfiguration.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java index c56d1704a9cc..69cde0603974 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java @@ -39,7 +39,7 @@ public class ActivityConfiguration { .setStartToCloseTimeout(Duration.ofDays(MAX_SYNC_TIMEOUT_DAYS)) .setScheduleToStartTimeout(Duration.ofDays(MAX_SYNC_TIMEOUT_DAYS)) .setCancellationType(ActivityCancellationType.WAIT_CANCELLATION_COMPLETED) - .setRetryOptions(TemporalUtils.NO_RETRY) + .setRetryOptions(RETRY_POLICY) .setHeartbeatTimeout(TemporalUtils.HEARTBEAT_TIMEOUT) .build(); From 61fe39c2df2ffd93512944521b16615ac7d28156 Mon Sep 17 00:00:00 2001 From: Jared Rhizor Date: Tue, 22 Feb 2022 15:58:08 -0800 Subject: [PATCH 10/20] parameterize correctly --- .../io/airbyte/test/acceptance/AcceptanceTests.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git
a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java index e2ec252d3087..55746d30d3e2 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java @@ -88,9 +88,9 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.MethodOrderer; import org.junit.jupiter.api.Order; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestMethodOrder; import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable; +import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -280,12 +280,12 @@ public void tearDown() throws ApiException, SQLException { } // todo: test with both types of workers getting scaled down, just one of each - @Test + @ParameterizedTest @Order(-1800000) - @ValueSource(strings = {"KILL_BOTH_SAME_TIME", "KILL_SYNC_FIRST", "KILL_NON_SYNC_FIRST", "KILL_ONLY_SYNC", "KILL_ONLY_NON_SYNC"}) @EnabledIfEnvironmentVariable(named = "CONTAINER_ORCHESTRATOR", matches = "true") - public void testDowntimeDuringSync(String type) throws Exception { + @ValueSource(strings = {"KILL_BOTH_SAME_TIME", "KILL_SYNC_FIRST", "KILL_NON_SYNC_FIRST", "KILL_ONLY_SYNC", "KILL_ONLY_NON_SYNC"}) + public void testDowntimeDuringSync(String input) throws Exception { final String connectionName = "test-connection"; final UUID sourceId = createPostgresSource().getSourceId(); final UUID destinationId = createDestination().getDestinationId(); @@ -307,7 +307,7 @@ public void testDowntimeDuringSync(String type) throws Exception { // todo: ideally this waits a couple seconds after the orchestrator pod starts Thread.sleep(10000); - switch (type) { + switch (input) { case "KILL_BOTH_SAME_TIME" -> { LOGGER.info("Scaling down both workers at roughly the same time..."); kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(0); From c6de6ebca3ad479bb7e3fc7a41112ec11446ee3a Mon Sep 17 00:00:00 2001 From: Jared Rhizor Date: Tue, 22 Feb 2022 15:58:32 -0800 Subject: [PATCH 11/20] revert to no-retry --- .../temporal/scheduling/shared/ActivityConfiguration.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java index 69cde0603974..c56d1704a9cc 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java @@ -39,7 +39,7 @@ public class ActivityConfiguration { .setStartToCloseTimeout(Duration.ofDays(MAX_SYNC_TIMEOUT_DAYS)) .setScheduleToStartTimeout(Duration.ofDays(MAX_SYNC_TIMEOUT_DAYS)) .setCancellationType(ActivityCancellationType.WAIT_CANCELLATION_COMPLETED) - .setRetryOptions(RETRY_POLICY) + .setRetryOptions(TemporalUtils.NO_RETRY) .setHeartbeatTimeout(TemporalUtils.HEARTBEAT_TIMEOUT) .build(); From f407393105c8d1f28a73126cc1be5fe37a9f412d Mon Sep 17 00:00:00 2001 From: Jared Rhizor Date: Tue, 22 Feb 2022 16:27:47 -0800 Subject: [PATCH 12/20] re-enable new retry policy --- .../temporal/scheduling/shared/ActivityConfiguration.java 
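[Editor's note: the @ParameterizedTest/@ValueSource combination adopted in the patch above comes from the junit-jupiter-params artifact rather than junit-jupiter-api, hence the new import package. A minimal sketch of the mechanism, with an illustrative test body:]

import static org.junit.jupiter.api.Assertions.assertFalse;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

class DowntimeScenarioSketch {

  // JUnit 5 invokes this method once per string in the ValueSource,
  // reporting each invocation as a separate test case.
  @ParameterizedTest
  @ValueSource(strings = {"KILL_ONLY_SYNC", "KILL_ONLY_NON_SYNC"})
  void scenarioNameIsWellFormed(final String scenario) {
    assertFalse(scenario.isBlank());
  }

}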
| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java index c56d1704a9cc..69cde0603974 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java @@ -39,7 +39,7 @@ public class ActivityConfiguration { .setStartToCloseTimeout(Duration.ofDays(MAX_SYNC_TIMEOUT_DAYS)) .setScheduleToStartTimeout(Duration.ofDays(MAX_SYNC_TIMEOUT_DAYS)) .setCancellationType(ActivityCancellationType.WAIT_CANCELLATION_COMPLETED) - .setRetryOptions(TemporalUtils.NO_RETRY) + .setRetryOptions(RETRY_POLICY) .setHeartbeatTimeout(TemporalUtils.HEARTBEAT_TIMEOUT) .build(); From 055e2a46c13866361c2dc262ba48b2c6b022bdd6 Mon Sep 17 00:00:00 2001 From: Jared Rhizor Date: Wed, 23 Feb 2022 15:44:48 -0800 Subject: [PATCH 13/20] speed up test + fixes --- .../test/acceptance/AcceptanceTests.java | 139 ++++++++++-------- 1 file changed, 75 insertions(+), 64 deletions(-) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java index 55746d30d3e2..21510d27fcb9 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java @@ -88,10 +88,9 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.MethodOrderer; import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestMethodOrder; import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.PostgreSQLContainer; @@ -279,88 +278,100 @@ public void tearDown() throws ApiException, SQLException { } } - // todo: test with both types of workers getting scaled down, just one of each - @ParameterizedTest + @Test @Order(-1800000) @EnabledIfEnvironmentVariable(named = "CONTAINER_ORCHESTRATOR", matches = "true") - @ValueSource(strings = {"KILL_BOTH_SAME_TIME", "KILL_SYNC_FIRST", "KILL_NON_SYNC_FIRST", "KILL_ONLY_SYNC", "KILL_ONLY_NON_SYNC"}) - public void testDowntimeDuringSync(String input) throws Exception { + public void testDowntimeDuringSync() throws Exception { final String connectionName = "test-connection"; final UUID sourceId = createPostgresSource().getSourceId(); final UUID destinationId = createDestination().getDestinationId(); - final UUID operationId = createOperation().getOperationId(); final AirbyteCatalog catalog = discoverSourceSchema(sourceId); final SyncMode syncMode = SyncMode.FULL_REFRESH; final DestinationSyncMode destinationSyncMode = DestinationSyncMode.OVERWRITE; catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode)); - final UUID connectionId = - createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); - // Avoid race condition with the new scheduler - if (featureFlags.usesNewScheduler()) { - waitForTemporalWorkflow(connectionId); - } + for (final var input :
List.of("KILL_BOTH_SAME_TIME", "KILL_SYNC_FIRST", "KILL_NON_SYNC_FIRST", "KILL_ONLY_SYNC", "KILL_ONLY_NON_SYNC")) { + for (int i = 0; i < 3; i++) { + LOGGER.info("Checking " + input + " #" + i); - final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); + final UUID connectionId = + createConnection(connectionName, sourceId, destinationId, List.of(), catalog, null).getConnectionId(); - // todo: ideally this waits a couple seconds after the orchestrator pod starts - Thread.sleep(10000); + // todo: remove after merging master + // Avoid race condition with the new scheduler + if (featureFlags.usesNewScheduler()) { + waitForTemporalWorkflow(connectionId); + } - switch (input) { - case "KILL_BOTH_SAME_TIME" -> { - LOGGER.info("Scaling down both workers at roughly the same time..."); - kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(0); - kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-sync-worker").scale(0, true); + JobInfoRead connectionSyncRead = null; - LOGGER.info("Scaling up both workers..."); - kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(1); - kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-sync-worker").scale(1); - } - case "KILL_SYNC_FIRST" -> { - LOGGER.info("Scaling down sync worker first..."); - kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-sync-worker").scale(0, true); - kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(0, true); - - LOGGER.info("Scaling up both workers..."); - kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(1); - kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-sync-worker").scale(1); - } - case "KILL_NON_SYNC_FIRST" -> { - LOGGER.info("Scaling down non-sync worker first..."); - kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(0, true); - kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-sync-worker").scale(0, true); - - LOGGER.info("Scaling up both workers..."); - kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(1); - kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-sync-worker").scale(1); - } - case "KILL_ONLY_SYNC" -> { - LOGGER.info("Scaling down only sync worker..."); - kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-sync-worker").scale(0, true); + while (connectionSyncRead == null) { - LOGGER.info("Scaling up sync worker..."); - kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-sync-worker").scale(1); - } - case "KILL_ONLY_NON_SYNC" -> { - LOGGER.info("Scaling down only non-sync worker..."); - kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(0, true); + try { + connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); + } catch (Exception e) { + LOGGER.error("retrying after error", e); + } + } - LOGGER.info("Scaling up non-sync worker..."); - kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(1); - } - } + Thread.sleep(10000); + + switch (input) { + case "KILL_BOTH_SAME_TIME" -> { + LOGGER.info("Scaling down both 
workers at roughly the same time..."); + kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(0); + kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-sync-worker").scale(0, true); + + LOGGER.info("Scaling up both workers..."); + kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(1); + kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-sync-worker").scale(1); + } + case "KILL_SYNC_FIRST" -> { + LOGGER.info("Scaling down sync worker first..."); + kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-sync-worker").scale(0, true); + kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(0, true); + + LOGGER.info("Scaling up both workers..."); + kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(1); + kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-sync-worker").scale(1); + } + case "KILL_NON_SYNC_FIRST" -> { + LOGGER.info("Scaling down non-sync worker first..."); + kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(0, true); + kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-sync-worker").scale(0, true); + + LOGGER.info("Scaling up both workers..."); + kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(1); + kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-sync-worker").scale(1); + } + case "KILL_ONLY_SYNC" -> { + LOGGER.info("Scaling down only sync worker..."); + kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-sync-worker").scale(0, true); + + LOGGER.info("Scaling up sync worker..."); + kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-sync-worker").scale(1); + } + case "KILL_ONLY_NON_SYNC" -> { + LOGGER.info("Scaling down only non-sync worker..."); + kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(0, true); + + LOGGER.info("Scaling up non-sync worker..."); + kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(1); + } + } - waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead.getJob()); - assertSourceAndDestinationDbInSync(false); + waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead.getJob()); - final long numAttempts = apiClient.getJobsApi() - .getJobInfo(new JobIdRequestBody().id(connectionSyncRead.getJob().getId())) - .getAttempts() - .size(); + final long numAttempts = apiClient.getJobsApi() + .getJobInfo(new JobIdRequestBody().id(connectionSyncRead.getJob().getId())) + .getAttempts() + .size(); - // it should be able to accomplish the resume without an additional attempt! - assertEquals(1, numAttempts); + // it should be able to accomplish the resume without an additional attempt! 
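[Editor's note: every case in the switch above repeats the same fabric8 scale-down/scale-up pair. A hypothetical helper condensing that pair might look like the sketch below; the blocking scale(0, true) call mirrors the test code, while the helper class itself is an assumption, not part of this PR.]

import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;

public final class WorkerBouncer {

  private final KubernetesClient client = new DefaultKubernetesClient();

  // Scales the named deployment to zero replicas, blocking until the rollout
  // completes (the boolean argument), then immediately scales it back to one.
  public void bounce(final String deploymentName) {
    client.apps().deployments().inNamespace("default").withName(deploymentName).scale(0, true);
    client.apps().deployments().inNamespace("default").withName(deploymentName).scale(1);
  }

}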
+ assertEquals(1, numAttempts); + } + } } private AirbyteCatalog discoverSourceSchema(final UUID sourceId) throws ApiException { From 070fb127fed35fa3408bc0565eca82db888c994a Mon Sep 17 00:00:00 2001 From: Jared Rhizor Date: Wed, 23 Feb 2022 16:46:53 -0800 Subject: [PATCH 14/20] significantly speed up test --- .../test/acceptance/AcceptanceTests.java | 100 +++++++----------- 1 file changed, 40 insertions(+), 60 deletions(-) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java index ef28bf2c1628..6f73a06cb8e2 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java @@ -970,80 +970,60 @@ public void testDowntimeDuringSync() throws Exception { final DestinationSyncMode destinationSyncMode = DestinationSyncMode.OVERWRITE; catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode)); - for (final var input : List.of("KILL_BOTH_SAME_TIME", "KILL_SYNC_FIRST", "KILL_NON_SYNC_FIRST", "KILL_ONLY_SYNC", "KILL_ONLY_NON_SYNC")) { - for (int i = 0; i < 3; i++) { - LOGGER.info("Checking " + input + " #" + i); + for (final var input : List.of("KILL_BOTH_NON_SYNC_SLIGHTLY_FIRST", "KILL_ONLY_SYNC", "KILL_ONLY_NON_SYNC")) { + LOGGER.info("Checking " + input); - final UUID connectionId = - createConnection(connectionName, sourceId, destinationId, List.of(), catalog, null).getConnectionId(); + final UUID connectionId = + createConnection(connectionName, sourceId, destinationId, List.of(), catalog, null).getConnectionId(); - JobInfoRead connectionSyncRead = null; + JobInfoRead connectionSyncRead = null; - while (connectionSyncRead == null) { + while (connectionSyncRead == null) { - try { - connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); - } catch (Exception e) { - LOGGER.error("retrying after error", e); - } + try { + connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); + } catch (Exception e) { + LOGGER.error("retrying after error", e); } + } - Thread.sleep(10000); + Thread.sleep(10000); - switch (input) { - case "KILL_BOTH_SAME_TIME" -> { - LOGGER.info("Scaling down both workers at roughly the same time..."); - kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(0); - kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-sync-worker").scale(0, true); + switch (input) { + case "KILL_BOTH_NON_SYNC_SLIGHTLY_FIRST" -> { + LOGGER.info("Scaling down both workers at roughly the same time..."); + kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(0); + kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-sync-worker").scale(0, true); - LOGGER.info("Scaling up both workers..."); - kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(1); - kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-sync-worker").scale(1); - } - case "KILL_SYNC_FIRST" -> { - LOGGER.info("Scaling down sync worker first..."); - kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-sync-worker").scale(0, true); - 
kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(0, true); - - LOGGER.info("Scaling up both workers..."); - kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(1); - kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-sync-worker").scale(1); - } - case "KILL_NON_SYNC_FIRST" -> { - LOGGER.info("Scaling down non-sync worker first..."); - kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(0, true); - kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-sync-worker").scale(0, true); - - LOGGER.info("Scaling up both workers..."); - kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(1); - kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-sync-worker").scale(1); - } - case "KILL_ONLY_SYNC" -> { - LOGGER.info("Scaling down only sync worker..."); - kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-sync-worker").scale(0, true); + LOGGER.info("Scaling up both workers..."); + kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(1); + kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-sync-worker").scale(1); + } + case "KILL_ONLY_SYNC" -> { + LOGGER.info("Scaling down only sync worker..."); + kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-sync-worker").scale(0, true); - LOGGER.info("Scaling up sync worker..."); - kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-sync-worker").scale(1); - } - case "KILL_ONLY_NON_SYNC" -> { - LOGGER.info("Scaling down only non-sync worker..."); - kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(0, true); + LOGGER.info("Scaling up sync worker..."); + kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-sync-worker").scale(1); + } + case "KILL_ONLY_NON_SYNC" -> { + LOGGER.info("Scaling down only non-sync worker..."); + kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(0, true); - LOGGER.info("Scaling up non-sync worker..."); - kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(1); - } + LOGGER.info("Scaling up non-sync worker..."); + kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(1); } + } - waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead.getJob()); + waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead.getJob()); - final long numAttempts = apiClient.getJobsApi() - .getJobInfo(new JobIdRequestBody().id(connectionSyncRead.getJob().getId())) - .getAttempts() - .size(); + final long numAttempts = apiClient.getJobsApi() + .getJobInfo(new JobIdRequestBody().id(connectionSyncRead.getJob().getId())) + .getAttempts() + .size(); - // it should be able to accomplish the resume without an additional attempt! - assertEquals(1, numAttempts); - } + // it should be able to accomplish the resume without an additional attempt! 
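[Editor's note: the while-loop above keeps retrying syncConnection until the API call stops throwing, since the server may still be coming up when the test starts. A generic form of that loop, sketched with an assumed checked-exception-friendly functional interface:]

public final class Retries {

  @FunctionalInterface
  public interface ThrowingSupplier<T> {

    T get() throws Exception;

  }

  // Keep invoking the call until it returns normally, logging each failure.
  // Like the test's loop, this spins forever if the API never recovers.
  public static <T> T untilSuccess(final ThrowingSupplier<T> call) {
    while (true) {
      try {
        return call.get();
      } catch (final Exception e) {
        System.err.println("retrying after error: " + e.getMessage());
      }
    }
  }

}

Usage would then be, for example, untilSuccess(() -> apiClient.getConnectionApi().syncConnection(requestBody)).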
+ assertEquals(1, numAttempts); } } From 0eae99171e07a49611c1ef6ba682c9ca4eabf85f Mon Sep 17 00:00:00 2001 From: Jared Rhizor Date: Wed, 23 Feb 2022 16:47:18 -0800 Subject: [PATCH 15/20] fix ordering --- .../java/io/airbyte/test/acceptance/AcceptanceTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java index 6f73a06cb8e2..d904fec044d1 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java @@ -958,7 +958,7 @@ public void testFailureTimeout() throws Exception { } @Test - @Order(-1800000) + @Order(18) @EnabledIfEnvironmentVariable(named = "CONTAINER_ORCHESTRATOR", matches = "true") public void testDowntimeDuringSync() throws Exception { From 582f0f9396534aaa42fde4e9c16ef9c9c76a15a8 Mon Sep 17 00:00:00 2001 From: Jared Rhizor Date: Wed, 23 Feb 2022 17:46:11 -0800 Subject: [PATCH 16/20] use multiple task queues in connection manager test --- .../ConnectionManagerWorkflowTest.java | 68 ++++++++++++------- 1 file changed, 42 insertions(+), 26 deletions(-) diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java index dc1daef75858..566b1c318935 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java @@ -86,7 +86,6 @@ public class ConnectionManagerWorkflowTest { Mockito.mock(JobCreationAndStatusUpdateActivity.class, Mockito.withSettings().withoutAnnotations()); private TestWorkflowEnvironment testEnv; - private Worker worker; private WorkflowClient client; private ConnectionManagerWorkflow workflow; @@ -131,14 +130,16 @@ class AsynchronousWorkflow { @BeforeEach public void setup() { testEnv = TestWorkflowEnvironment.newInstance(); - worker = testEnv.newWorker(TemporalJobType.CONNECTION_UPDATER.name()); - // Register your workflow implementations - worker.registerWorkflowImplementationTypes(ConnectionManagerWorkflowImpl.class, EmptySyncWorkflow.class); - client = testEnv.getWorkflowClient(); + final Worker syncWorker = testEnv.newWorker(TemporalJobType.SYNC.name()); + syncWorker.registerWorkflowImplementationTypes(EmptySyncWorkflow.class); - worker.registerActivitiesImplementations(mConfigFetchActivity, mConnectionDeletionActivity, + final Worker managerWorker = testEnv.newWorker(TemporalJobType.CONNECTION_UPDATER.name()); + managerWorker.registerWorkflowImplementationTypes(ConnectionManagerWorkflowImpl.class); + managerWorker.registerActivitiesImplementations(mConfigFetchActivity, mConnectionDeletionActivity, mGenerateInputActivityImpl, mJobCreationAndStatusUpdateActivity); + + client = testEnv.getWorkflowClient(); testEnv.start(); workflow = client @@ -414,14 +415,16 @@ class SynchronousWorkflow { @BeforeEach public void setup() { testEnv = TestWorkflowEnvironment.newInstance(); - worker = testEnv.newWorker(TemporalJobType.CONNECTION_UPDATER.name()); - // Register your workflow implementations - worker.registerWorkflowImplementationTypes(ConnectionManagerWorkflowImpl.class, SleepingSyncWorkflow.class); - client = 
testEnv.getWorkflowClient(); + final Worker syncWorker = testEnv.newWorker(TemporalJobType.SYNC.name()); + syncWorker.registerWorkflowImplementationTypes(SleepingSyncWorkflow.class); - worker.registerActivitiesImplementations(mConfigFetchActivity, mConnectionDeletionActivity, + final Worker managerWorker = testEnv.newWorker(TemporalJobType.CONNECTION_UPDATER.name()); + managerWorker.registerWorkflowImplementationTypes(ConnectionManagerWorkflowImpl.class); + managerWorker.registerActivitiesImplementations(mConfigFetchActivity, mConnectionDeletionActivity, mGenerateInputActivityImpl, mJobCreationAndStatusUpdateActivity); + + client = testEnv.getWorkflowClient(); testEnv.start(); workflow = client @@ -711,10 +714,13 @@ class SyncWorkflowReplicationFailuresRecorded { @BeforeEach public void setup() { testEnv = TestWorkflowEnvironment.newInstance(); - worker = testEnv.newWorker(TemporalJobType.CONNECTION_UPDATER.name()); + + final Worker managerWorker = testEnv.newWorker(TemporalJobType.CONNECTION_UPDATER.name()); + managerWorker.registerWorkflowImplementationTypes(ConnectionManagerWorkflowImpl.class); + managerWorker.registerActivitiesImplementations(mConfigFetchActivity, mConnectionDeletionActivity, + mGenerateInputActivityImpl, mJobCreationAndStatusUpdateActivity); + client = testEnv.getWorkflowClient(); - worker.registerActivitiesImplementations(mConfigFetchActivity, mConnectionDeletionActivity, mGenerateInputActivityImpl, - mJobCreationAndStatusUpdateActivity); workflow = client.newWorkflowStub(ConnectionManagerWorkflow.class, WorkflowOptions.newBuilder().setTaskQueue(TemporalJobType.CONNECTION_UPDATER.name()).build()); @@ -726,7 +732,9 @@ public void setup() { unit = TimeUnit.SECONDS) @DisplayName("Test that source and destination failures are recorded") public void testSourceAndDestinationFailuresRecorded() { - worker.registerWorkflowImplementationTypes(ConnectionManagerWorkflowImpl.class, SourceAndDestinationFailureSyncWorkflow.class); + final Worker syncWorker = testEnv.newWorker(TemporalJobType.SYNC.name()); + syncWorker.registerWorkflowImplementationTypes(SourceAndDestinationFailureSyncWorkflow.class); + testEnv.start(); final UUID testId = UUID.randomUUID(); @@ -753,7 +761,9 @@ public void testSourceAndDestinationFailuresRecorded() { unit = TimeUnit.SECONDS) @DisplayName("Test that normalization failure is recorded") public void testNormalizationFailure() { - worker.registerWorkflowImplementationTypes(ConnectionManagerWorkflowImpl.class, NormalizationFailureSyncWorkflow.class); + final Worker syncWorker = testEnv.newWorker(TemporalJobType.SYNC.name()); + syncWorker.registerWorkflowImplementationTypes(NormalizationFailureSyncWorkflow.class); + testEnv.start(); final UUID testId = UUID.randomUUID(); @@ -779,7 +789,9 @@ public void testNormalizationFailure() { unit = TimeUnit.SECONDS) @DisplayName("Test that dbt failure is recorded") public void testDbtFailureRecorded() { - worker.registerWorkflowImplementationTypes(ConnectionManagerWorkflowImpl.class, DbtFailureSyncWorkflow.class); + final Worker syncWorker = testEnv.newWorker(TemporalJobType.SYNC.name()); + syncWorker.registerWorkflowImplementationTypes(DbtFailureSyncWorkflow.class); + testEnv.start(); final UUID testId = UUID.randomUUID(); @@ -805,7 +817,9 @@ public void testDbtFailureRecorded() { unit = TimeUnit.SECONDS) @DisplayName("Test that persistence failure is recorded") public void testPersistenceFailureRecorded() { - worker.registerWorkflowImplementationTypes(ConnectionManagerWorkflowImpl.class, 
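[Editor's note: the test setup changes above split the single test worker into two, one per task queue, so the child-workflow dispatch onto the SYNC queue can be exercised in-process. Distilled into a standalone sketch, with queue names written out literally for brevity:]

import io.temporal.client.WorkflowClient;
import io.temporal.testing.TestWorkflowEnvironment;
import io.temporal.worker.Worker;

public final class TwoQueueTestEnvSketch {

  // One in-memory Temporal test environment hosting two workers, one per task
  // queue, mirroring the production split between sync and manager processes.
  public static WorkflowClient startTwoQueueEnv() {
    final TestWorkflowEnvironment testEnv = TestWorkflowEnvironment.newInstance();

    final Worker syncWorker = testEnv.newWorker("SYNC");
    // register the sync workflow stubs on this worker

    final Worker managerWorker = testEnv.newWorker("CONNECTION_UPDATER");
    // register the connection manager workflow and its activities on this worker

    testEnv.start();
    return testEnv.getWorkflowClient();
  }

}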
PersistFailureSyncWorkflow.class); + final Worker syncWorker = testEnv.newWorker(TemporalJobType.SYNC.name()); + syncWorker.registerWorkflowImplementationTypes(PersistFailureSyncWorkflow.class); + testEnv.start(); final UUID testId = UUID.randomUUID(); @@ -831,7 +845,9 @@ public void testPersistenceFailureRecorded() { unit = TimeUnit.SECONDS) @DisplayName("Test that replication worker failure is recorded") public void testReplicationFailureRecorded() { - worker.registerWorkflowImplementationTypes(ConnectionManagerWorkflowImpl.class, ReplicateFailureSyncWorkflow.class); + final Worker syncWorker = testEnv.newWorker(TemporalJobType.SYNC.name()); + syncWorker.registerWorkflowImplementationTypes(ReplicateFailureSyncWorkflow.class); + testEnv.start(); final UUID testId = UUID.randomUUID(); @@ -861,14 +877,16 @@ class StuckWorkflow { @BeforeEach public void setup() { testEnv = TestWorkflowEnvironment.newInstance(); - worker = testEnv.newWorker(TemporalJobType.CONNECTION_UPDATER.name()); - // Register your workflow implementations - worker.registerWorkflowImplementationTypes(ConnectionManagerWorkflowImpl.class, SleepingSyncWorkflow.class); - client = testEnv.getWorkflowClient(); + final Worker syncWorker = testEnv.newWorker(TemporalJobType.SYNC.name()); + syncWorker.registerWorkflowImplementationTypes(SleepingSyncWorkflow.class); - worker.registerActivitiesImplementations(mConfigFetchActivity, mConnectionDeletionActivity, + final Worker managerWorker = testEnv.newWorker(TemporalJobType.CONNECTION_UPDATER.name()); + managerWorker.registerWorkflowImplementationTypes(ConnectionManagerWorkflowImpl.class); + managerWorker.registerActivitiesImplementations(mConfigFetchActivity, mConnectionDeletionActivity, mGenerateInputActivityImpl, mJobCreationAndStatusUpdateActivity); + + client = testEnv.getWorkflowClient(); testEnv.start(); workflow = client @@ -880,7 +898,6 @@ public void setup() { } public static Stream getSetupFailingFailingActivityBeforeRun() { - Thread.currentThread().run(); return Stream.of( Arguments.of(new Thread(() -> Mockito.when(mJobCreationAndStatusUpdateActivity.createNewJob(Mockito.any())) .thenThrow(ApplicationFailure.newNonRetryableFailure("", "")))), @@ -969,7 +986,6 @@ void testCanGetUnstuck() { } public static Stream getSetupFailingFailingActivityAfterRun() { - Thread.currentThread().run(); return Stream.of( Arguments.of((Consumer) ((ConnectionManagerWorkflow workflow) -> System.out.println("do Nothing")), new Thread(() -> Mockito.doThrow(ApplicationFailure.newNonRetryableFailure("", "")) From ffda146f22230a79ca3fda9fd6dea3bed4c9fdb9 Mon Sep 17 00:00:00 2001 From: Jared Rhizor Date: Wed, 23 Feb 2022 18:19:51 -0800 Subject: [PATCH 17/20] use versioning for task queue change --- .../scheduling/ConnectionManagerWorkflowImpl.java | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java index 5d9bf2cd5e96..400f325ae498 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java @@ -54,6 +54,8 @@ public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow public static final long NON_RUNNING_JOB_ID = -1; public static final int NON_RUNNING_ATTEMPT_ID = -1; + private static 
From ffda146f22230a79ca3fda9fd6dea3bed4c9fdb9 Mon Sep 17 00:00:00 2001
From: Jared Rhizor
Date: Wed, 23 Feb 2022 18:19:51 -0800
Subject: [PATCH 17/20] use versioning for task queue change

---
 .../scheduling/ConnectionManagerWorkflowImpl.java  | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java
index 5d9bf2cd5e96..400f325ae498 100644
--- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java
+++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java
@@ -54,6 +54,8 @@ public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow
   public static final long NON_RUNNING_JOB_ID = -1;
   public static final int NON_RUNNING_ATTEMPT_ID = -1;
 
+  private static final int TASK_QUEUE_CHANGE_CURRENT_VERSION = 1;
+
   private WorkflowState workflowState = new WorkflowState(UUID.randomUUID(), new NoopStateListener());
 
   private final WorkflowInternalState workflowInternalState = new WorkflowInternalState();
@@ -453,10 +455,18 @@ private void reportJobStarting() {
    * make sense.
    >>>>>>> 76e969f2e5e1b869648142c3565b7375b1892999 */
   private StandardSyncOutput runChildWorkflow(GeneratedJobInput jobInputs) {
+    int taskQueueChangeVersion = Workflow.getVersion("task_queue_change", Workflow.DEFAULT_VERSION, TASK_QUEUE_CHANGE_CURRENT_VERSION);
+
+    String taskQueue = TemporalJobType.SYNC.name();
+
+    if (taskQueueChangeVersion < TASK_QUEUE_CHANGE_CURRENT_VERSION) {
+      taskQueue = TemporalJobType.CONNECTION_UPDATER.name();
+    }
+
     final SyncWorkflow childSync = Workflow.newChildWorkflowStub(SyncWorkflow.class,
         ChildWorkflowOptions.newBuilder()
             .setWorkflowId("sync_" + workflowInternalState.getJobId())
-            .setTaskQueue(TemporalJobType.SYNC.name())
+            .setTaskQueue(taskQueue)
             // This will cancel the child workflow when the parent is terminated
             .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_REQUEST_CANCEL)
             .build());

From bb2c303ab01c427038b329366aaa1e08faea8236 Mon Sep 17 00:00:00 2001
From: Jared Rhizor
Date: Wed, 23 Feb 2022 18:20:12 -0800
Subject: [PATCH 18/20] remove sync workflow registration for the connection
 manager queue

---
 airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java
index e0b0a9d089cb..7f1bb97d4f8c 100644
--- a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java
+++ b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java
@@ -163,7 +163,7 @@ private void registerConnectionManager(final WorkerFactory factory) {
     final Worker connectionUpdaterWorker =
         factory.newWorker(TemporalJobType.CONNECTION_UPDATER.toString(), getWorkerOptions(maxWorkers.getMaxSyncWorkers()));
-    connectionUpdaterWorker.registerWorkflowImplementationTypes(ConnectionManagerWorkflowImpl.class, SyncWorkflowImpl.class);
+    connectionUpdaterWorker.registerWorkflowImplementationTypes(ConnectionManagerWorkflowImpl.class);
     connectionUpdaterWorker.registerActivitiesImplementations(
         new GenerateInputActivityImpl(
             jobPersistence),
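Patch 17 above is what keeps the queue move deterministic for workflows that are already running: Workflow.getVersion writes a version marker into the workflow's event history the first time an execution reaches this code, and replays return whatever was recorded, so an old execution keeps its old task queue. A condensed sketch of the branch, using the constants from the patch (the change id shown is the more specific one patch 19 switches to; Workflow.DEFAULT_VERSION is -1 in the Temporal Java SDK):

// Histories recorded before this change replay as Workflow.DEFAULT_VERSION;
// new executions are stamped with TASK_QUEUE_CHANGE_CURRENT_VERSION.
final int taskQueueChangeVersion = Workflow.getVersion(
    "task_queue_change_from_connection_updater_to_sync",
    Workflow.DEFAULT_VERSION,
    TASK_QUEUE_CHANGE_CURRENT_VERSION);

// In-flight workflows keep scheduling the child sync on the manager's queue;
// everything started after the change uses the dedicated SYNC queue.
final String taskQueue = taskQueueChangeVersion < TASK_QUEUE_CHANGE_CURRENT_VERSION
    ? TemporalJobType.CONNECTION_UPDATER.name()
    : TemporalJobType.SYNC.name();

This pairs with patch 18, which stops registering SyncWorkflowImpl on the CONNECTION_UPDATER worker now that new executions schedule their syncs on the SYNC queue.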
From 38553f5f8df7838bebf31940f359a10facf2770b Mon Sep 17 00:00:00 2001
From: Jared Rhizor
Date: Wed, 23 Feb 2022 18:21:29 -0800
Subject: [PATCH 19/20] use more specific example

---
 .../temporal/scheduling/ConnectionManagerWorkflowImpl.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java
index 400f325ae498..f631fd762c95 100644
--- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java
+++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java
@@ -455,7 +455,8 @@ private void reportJobStarting() {
    * make sense.
    >>>>>>> 76e969f2e5e1b869648142c3565b7375b1892999 */
   private StandardSyncOutput runChildWorkflow(GeneratedJobInput jobInputs) {
-    int taskQueueChangeVersion = Workflow.getVersion("task_queue_change", Workflow.DEFAULT_VERSION, TASK_QUEUE_CHANGE_CURRENT_VERSION);
+    int taskQueueChangeVersion =
+        Workflow.getVersion("task_queue_change_from_connection_updater_to_sync", Workflow.DEFAULT_VERSION, TASK_QUEUE_CHANGE_CURRENT_VERSION);
 
     String taskQueue = TemporalJobType.SYNC.name();
 

From d73c466a94e8dc0b59bb566f7ce5cdda2b1b2a0f Mon Sep 17 00:00:00 2001
From: Jared Rhizor
Date: Wed, 23 Feb 2022 21:15:28 -0800
Subject: [PATCH 20/20] respond to parker's comments

---
 .../main/java/io/airbyte/workers/WorkerApp.java    | 17 ++++++++---------
 1 file changed, 8 insertions(+), 9 deletions(-)

diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java
index 7f1bb97d4f8c..22a4444445b2 100644
--- a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java
+++ b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java
@@ -146,15 +146,14 @@ public void start() {
       registerDiscover(factory);
     }
 
-    if (configs.shouldRunSyncWorkflows() || configs.shouldRunConnectionManagerWorkflows()) {
-      if (configs.shouldRunSyncWorkflows()) {
-        registerSync(factory);
-      }
-
-      if (configs.shouldRunConnectionManagerWorkflows()) {
-        registerConnectionManager(factory);
-      }
+    if (configs.shouldRunSyncWorkflows()) {
+      registerSync(factory);
     }
+
+    if (configs.shouldRunConnectionManagerWorkflows()) {
+      registerConnectionManager(factory);
+    }
+
     factory.start();
   }
 
@@ -219,7 +218,7 @@ private void registerCheckConnection(final WorkerFactory factory) {
         jobPersistence,
         airbyteVersion));
   }
 
-  public void registerGetSpec(final WorkerFactory factory) {
+  private void registerGetSpec(final WorkerFactory factory) {
     final Worker specWorker = factory.newWorker(TemporalJobType.GET_SPEC.name(), getWorkerOptions(maxWorkers.getMaxSpecWorkers()));
     specWorker.registerWorkflowImplementationTypes(SpecWorkflowImpl.class);
     specWorker.registerActivitiesImplementations(