From 7b57968cc110cd8d9294dcd39c437a20d4b61f8d Mon Sep 17 00:00:00 2001 From: Michael Siega <109092231+mfsiega-airbyte@users.noreply.github.com> Date: Mon, 23 Jan 2023 21:18:58 +0100 Subject: [PATCH] fix query to get most recent actor catalog fetch event for some sources (#21726) * fix query to get most recent actor catalog fetch event for some sources * fix format * fix pmd * handle case where we fetch sources for an empty list --- .../config/persistence/ConfigRepository.java | 15 ++++--- .../ConfigRepositoryE2EReadWriteTest.java | 41 +++++++++++++++++++ .../airbyte/config/persistence/MockData.java | 23 +++++++---- 3 files changed, 66 insertions(+), 13 deletions(-) diff --git a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java index 6865993a0126..19f810f6d6cd 100644 --- a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java +++ b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java @@ -1348,19 +1348,22 @@ public Optional getMostRecentActorCatalogFetchEventForSo return records.stream().findFirst().map(DbConverter::buildActorCatalogFetchEvent); } - // todo (cgardens) - following up on why this arg is not used in this comment: - // https://github.com/airbytehq/airbyte/pull/18125/files#r1027377700 @SuppressWarnings({"unused", "SqlNoDataSourceInspection"}) public Map getMostRecentActorCatalogFetchEventForSources(final List sourceIds) throws IOException { // noinspection SqlResolve + if (sourceIds.isEmpty()) { + return Collections.emptyMap(); + } return database.query(ctx -> ctx.fetch( """ - select actor_catalog_id, actor_id, created_at from - (select actor_catalog_id, actor_id, created_at, rank() over (partition by actor_id order by created_at desc) as creation_order_rank + select distinct actor_catalog_id, actor_id, created_at from + (select actor_catalog_id, actor_id, created_at, row_number() over (partition by actor_id order by created_at desc) as creation_order_row_number from public.actor_catalog_fetch_event + where actor_id in ({0}) ) table_with_rank - where creation_order_rank = 1; - """)) + where creation_order_row_number = 1; + """, + DSL.list(sourceIds.stream().map(DSL::value).collect(Collectors.toList())))) .stream().map(DbConverter::buildActorCatalogFetchEvent) .collect(Collectors.toMap(ActorCatalogFetchEvent::getActorId, record -> record)); } diff --git a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryE2EReadWriteTest.java b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryE2EReadWriteTest.java index d7cbf6f56c45..ba3d6be057e8 100644 --- a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryE2EReadWriteTest.java +++ b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryE2EReadWriteTest.java @@ -564,6 +564,13 @@ void testGetMostRecentActorCatalogFetchEventForSource() throws SQLException, IOE fetchEvent2.getActorId(), fetchEvent2.getActorCatalogId(), now); + // Insert a second identical copy to verify that the query can handle duplicates since the records + // are not guaranteed to be unique. + insertCatalogFetchEvent( + ctx, + fetchEvent2.getActorId(), + fetchEvent2.getActorCatalogId(), + now); return null; }); @@ -590,6 +597,40 @@ void testGetMostRecentActorCatalogFetchEventForSources() throws SQLException, IO return null; }); + final Map result = + configRepository.getMostRecentActorCatalogFetchEventForSources(List.of(MockData.SOURCE_ID_1, + MockData.SOURCE_ID_2)); + + assertEquals(MockData.ACTOR_CATALOG_ID_1, result.get(MockData.SOURCE_ID_1).getActorCatalogId()); + assertEquals(MockData.ACTOR_CATALOG_ID_3, result.get(MockData.SOURCE_ID_2).getActorCatalogId()); + assertEquals(0, configRepository.getMostRecentActorCatalogFetchEventForSources(Collections.emptyList()).size()); + } + + @Test + void testGetMostRecentActorCatalogFetchEventWithDuplicates() throws SQLException, IOException { + // Tests that we can handle two fetch events in the db with the same actor id, actor catalog id, and + // timestamp e.g., from duplicate discoveries. + for (final ActorCatalog actorCatalog : MockData.actorCatalogs()) { + writeActorCatalog(database, Collections.singletonList(actorCatalog)); + } + + database.transaction(ctx -> { + // Insert the fetch events twice. + MockData.actorCatalogFetchEventsForAggregationTest().forEach(actorCatalogFetchEvent -> { + insertCatalogFetchEvent( + ctx, + actorCatalogFetchEvent.getActorCatalogFetchEvent().getActorId(), + actorCatalogFetchEvent.getActorCatalogFetchEvent().getActorCatalogId(), + actorCatalogFetchEvent.getCreatedAt()); + insertCatalogFetchEvent( + ctx, + actorCatalogFetchEvent.getActorCatalogFetchEvent().getActorId(), + actorCatalogFetchEvent.getActorCatalogFetchEvent().getActorCatalogId(), + actorCatalogFetchEvent.getCreatedAt()); + }); + return null; + }); + final Map result = configRepository.getMostRecentActorCatalogFetchEventForSources(List.of(MockData.SOURCE_ID_1, MockData.SOURCE_ID_2)); diff --git a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/MockData.java b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/MockData.java index a6117a08af06..80d2be2d1a4a 100644 --- a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/MockData.java +++ b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/MockData.java @@ -140,6 +140,8 @@ public class MockData { private static final UUID WEBHOOK_CONFIG_ID = UUID.randomUUID(); private static final String WEBHOOK_OPERATION_EXECUTION_URL = "test-webhook-url"; private static final String WEBHOOK_OPERATION_EXECUTION_BODY = "test-webhook-body"; + public static final String CONFIG_HASH = "1394"; + public static final String CONNECTOR_VERSION = "1.2.0"; public static List standardWorkspaces() { final Notification notification = new Notification() @@ -656,8 +658,8 @@ public static List actorCatalogFetchEventsSameSource() { .withId(ACTOR_CATALOG_FETCH_EVENT_ID_2) .withActorCatalogId(ACTOR_CATALOG_ID_2) .withActorId(SOURCE_ID_1) - .withConfigHash("1394") - .withConnectorVersion("1.2.0"); + .withConfigHash(CONFIG_HASH) + .withConnectorVersion(CONNECTOR_VERSION); return Arrays.asList(actorCatalogFetchEvent1, actorCatalogFetchEvent2); } @@ -683,18 +685,25 @@ public static List actorCatalogFetchEven .withId(ACTOR_CATALOG_FETCH_EVENT_ID_2) .withActorCatalogId(ACTOR_CATALOG_ID_2) .withActorId(SOURCE_ID_2) - .withConfigHash("1394") - .withConnectorVersion("1.2.0"); + .withConfigHash(CONFIG_HASH) + .withConnectorVersion(CONNECTOR_VERSION); final ActorCatalogFetchEvent actorCatalogFetchEvent3 = new ActorCatalogFetchEvent() .withId(ACTOR_CATALOG_FETCH_EVENT_ID_3) .withActorCatalogId(ACTOR_CATALOG_ID_3) .withActorId(SOURCE_ID_2) - .withConfigHash("1394") - .withConnectorVersion("1.2.0"); + .withConfigHash(CONFIG_HASH) + .withConnectorVersion(CONNECTOR_VERSION); + final ActorCatalogFetchEvent actorCatalogFetchEvent4 = new ActorCatalogFetchEvent() + .withId(ACTOR_CATALOG_FETCH_EVENT_ID_3) + .withActorCatalogId(ACTOR_CATALOG_ID_3) + .withActorId(SOURCE_ID_3) + .withConfigHash(CONFIG_HASH) + .withConnectorVersion(CONNECTOR_VERSION); return Arrays.asList( new ActorCatalogFetchEventWithCreationDate(actorCatalogFetchEvent1, now), new ActorCatalogFetchEventWithCreationDate(actorCatalogFetchEvent2, yesterday), - new ActorCatalogFetchEventWithCreationDate(actorCatalogFetchEvent3, now)); + new ActorCatalogFetchEventWithCreationDate(actorCatalogFetchEvent3, now), + new ActorCatalogFetchEventWithCreationDate(actorCatalogFetchEvent4, now)); } public static List workspaceServiceAccounts() {