diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/RefreshSchemaActivity.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/RefreshSchemaActivity.java index aa283703b844..3da403ea19ed 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/RefreshSchemaActivity.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/RefreshSchemaActivity.java @@ -4,6 +4,9 @@ package io.airbyte.workers.temporal.sync; +import io.airbyte.api.client.invoker.generated.ApiException; +import io.airbyte.config.persistence.ConfigNotFoundException; +import io.airbyte.validation.json.JsonValidationException; import io.temporal.activity.ActivityInterface; import io.temporal.activity.ActivityMethod; import java.io.IOException; @@ -15,4 +18,6 @@ public interface RefreshSchemaActivity { @ActivityMethod boolean shouldRefreshSchema(UUID sourceCatalogId) throws IOException; + public void refreshSchema(UUID sourceCatalogId) throws JsonValidationException, ConfigNotFoundException, IOException, ApiException; + } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/RefreshSchemaActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/RefreshSchemaActivityImpl.java index 9def0b60ce1a..bc12f37d0ff6 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/RefreshSchemaActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/RefreshSchemaActivityImpl.java @@ -7,6 +7,9 @@ import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME; import datadog.trace.api.Trace; +import io.airbyte.api.client.generated.SourceApi; +import io.airbyte.api.client.invoker.generated.ApiException; +import io.airbyte.api.client.model.generated.SourceDiscoverSchemaRequestBody; import io.airbyte.config.ActorCatalogFetchEvent; import io.airbyte.config.persistence.ConfigRepository; import java.io.IOException; @@ -18,8 +21,11 @@ public class RefreshSchemaActivityImpl implements RefreshSchemaActivity { private final Optional configRepository; - public RefreshSchemaActivityImpl(Optional configRepository) { + private final SourceApi sourceApi; + + public RefreshSchemaActivityImpl(Optional configRepository, SourceApi sourceApi) { this.configRepository = configRepository; + this.sourceApi = sourceApi; } @Override @@ -33,6 +39,13 @@ public boolean shouldRefreshSchema(UUID sourceCatalogId) throws IOException { return !schemaRefreshRanRecently(sourceCatalogId); } + @Override + public void refreshSchema(UUID sourceCatalogId) throws ApiException { + SourceDiscoverSchemaRequestBody requestBody = + new SourceDiscoverSchemaRequestBody().sourceId(sourceCatalogId).disableCache(true); + sourceApi.discoverSchemaForSource(requestBody); + } + private boolean schemaRefreshRanRecently(UUID sourceCatalogId) throws IOException { Optional mostRecentFetchEvent = configRepository.get().getMostRecentActorCatalogFetchEventForSource(sourceCatalogId); diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/RefreshSchemaActivityTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/RefreshSchemaActivityTest.java index d7bbdb739761..26086b1dd0d9 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/RefreshSchemaActivityTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/RefreshSchemaActivityTest.java @@ -5,8 +5,13 @@ package io.airbyte.workers.temporal.scheduling.activities; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import io.airbyte.api.client.generated.SourceApi; +import io.airbyte.api.client.invoker.generated.ApiException; +import io.airbyte.api.client.model.generated.SourceDiscoverSchemaRequestBody; import io.airbyte.config.ActorCatalogFetchEvent; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.workers.temporal.sync.RefreshSchemaActivityImpl; @@ -24,6 +29,9 @@ class RefreshSchemaActivityTest { static private ConfigRepository mConfigRepository; + + static private SourceApi mSourceApi; + static private RefreshSchemaActivityImpl refreshSchemaActivity; static private final UUID SOURCE_ID = UUID.randomUUID(); @@ -31,7 +39,8 @@ class RefreshSchemaActivityTest { @BeforeEach void setUp() { mConfigRepository = mock(ConfigRepository.class); - refreshSchemaActivity = new RefreshSchemaActivityImpl(Optional.of(mConfigRepository)); + mSourceApi = mock(SourceApi.class); + refreshSchemaActivity = new RefreshSchemaActivityImpl(Optional.of(mConfigRepository), mSourceApi); } @Test @@ -56,4 +65,13 @@ void testShouldRefreshSchemaRecentRefreshLessThan24HoursAgo() throws IOException Assertions.assertThat(false).isEqualTo(refreshSchemaActivity.shouldRefreshSchema(SOURCE_ID)); } + @Test + void testRefreshSchema() throws ApiException { + UUID sourceId = UUID.randomUUID(); + refreshSchemaActivity.refreshSchema(sourceId); + SourceDiscoverSchemaRequestBody requestBody = + new SourceDiscoverSchemaRequestBody().sourceId(sourceId).disableCache(true); + verify(mSourceApi, times(1)).discoverSchemaForSource(requestBody); + } + }