Skip to content

Commit

Permalink
fix query to get most recent actor catalog fetch event for some sourc…
Browse files Browse the repository at this point in the history
…es (#21726)

* fix query to get most recent actor catalog fetch event for some sources

* fix format

* fix pmd

* handle case where we fetch sources for an empty list
  • Loading branch information
mfsiega-airbyte authored Jan 23, 2023
1 parent eb2d980 commit 7b57968
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1348,19 +1348,22 @@ public Optional<ActorCatalogFetchEvent> getMostRecentActorCatalogFetchEventForSo
return records.stream().findFirst().map(DbConverter::buildActorCatalogFetchEvent);
}

// todo (cgardens) - following up on why this arg is not used in this comment:
// https://github.com/airbytehq/airbyte/pull/18125/files#r1027377700
@SuppressWarnings({"unused", "SqlNoDataSourceInspection"})
public Map<UUID, ActorCatalogFetchEvent> getMostRecentActorCatalogFetchEventForSources(final List<UUID> sourceIds) throws IOException {
// noinspection SqlResolve
if (sourceIds.isEmpty()) {
return Collections.emptyMap();
}
return database.query(ctx -> ctx.fetch(
"""
select actor_catalog_id, actor_id, created_at from
(select actor_catalog_id, actor_id, created_at, rank() over (partition by actor_id order by created_at desc) as creation_order_rank
select distinct actor_catalog_id, actor_id, created_at from
(select actor_catalog_id, actor_id, created_at, row_number() over (partition by actor_id order by created_at desc) as creation_order_row_number
from public.actor_catalog_fetch_event
where actor_id in ({0})
) table_with_rank
where creation_order_rank = 1;
"""))
where creation_order_row_number = 1;
""",
DSL.list(sourceIds.stream().map(DSL::value).collect(Collectors.toList()))))
.stream().map(DbConverter::buildActorCatalogFetchEvent)
.collect(Collectors.toMap(ActorCatalogFetchEvent::getActorId, record -> record));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,13 @@ void testGetMostRecentActorCatalogFetchEventForSource() throws SQLException, IOE
fetchEvent2.getActorId(),
fetchEvent2.getActorCatalogId(),
now);
// Insert a second identical copy to verify that the query can handle duplicates since the records
// are not guaranteed to be unique.
insertCatalogFetchEvent(
ctx,
fetchEvent2.getActorId(),
fetchEvent2.getActorCatalogId(),
now);

return null;
});
Expand All @@ -590,6 +597,40 @@ void testGetMostRecentActorCatalogFetchEventForSources() throws SQLException, IO
return null;
});

final Map<UUID, ActorCatalogFetchEvent> result =
configRepository.getMostRecentActorCatalogFetchEventForSources(List.of(MockData.SOURCE_ID_1,
MockData.SOURCE_ID_2));

assertEquals(MockData.ACTOR_CATALOG_ID_1, result.get(MockData.SOURCE_ID_1).getActorCatalogId());
assertEquals(MockData.ACTOR_CATALOG_ID_3, result.get(MockData.SOURCE_ID_2).getActorCatalogId());
assertEquals(0, configRepository.getMostRecentActorCatalogFetchEventForSources(Collections.emptyList()).size());
}

@Test
void testGetMostRecentActorCatalogFetchEventWithDuplicates() throws SQLException, IOException {
// Tests that we can handle two fetch events in the db with the same actor id, actor catalog id, and
// timestamp e.g., from duplicate discoveries.
for (final ActorCatalog actorCatalog : MockData.actorCatalogs()) {
writeActorCatalog(database, Collections.singletonList(actorCatalog));
}

database.transaction(ctx -> {
// Insert the fetch events twice.
MockData.actorCatalogFetchEventsForAggregationTest().forEach(actorCatalogFetchEvent -> {
insertCatalogFetchEvent(
ctx,
actorCatalogFetchEvent.getActorCatalogFetchEvent().getActorId(),
actorCatalogFetchEvent.getActorCatalogFetchEvent().getActorCatalogId(),
actorCatalogFetchEvent.getCreatedAt());
insertCatalogFetchEvent(
ctx,
actorCatalogFetchEvent.getActorCatalogFetchEvent().getActorId(),
actorCatalogFetchEvent.getActorCatalogFetchEvent().getActorCatalogId(),
actorCatalogFetchEvent.getCreatedAt());
});
return null;
});

final Map<UUID, ActorCatalogFetchEvent> result =
configRepository.getMostRecentActorCatalogFetchEventForSources(List.of(MockData.SOURCE_ID_1,
MockData.SOURCE_ID_2));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ public class MockData {
private static final UUID WEBHOOK_CONFIG_ID = UUID.randomUUID();
private static final String WEBHOOK_OPERATION_EXECUTION_URL = "test-webhook-url";
private static final String WEBHOOK_OPERATION_EXECUTION_BODY = "test-webhook-body";
public static final String CONFIG_HASH = "1394";
public static final String CONNECTOR_VERSION = "1.2.0";

public static List<StandardWorkspace> standardWorkspaces() {
final Notification notification = new Notification()
Expand Down Expand Up @@ -656,8 +658,8 @@ public static List<ActorCatalogFetchEvent> actorCatalogFetchEventsSameSource() {
.withId(ACTOR_CATALOG_FETCH_EVENT_ID_2)
.withActorCatalogId(ACTOR_CATALOG_ID_2)
.withActorId(SOURCE_ID_1)
.withConfigHash("1394")
.withConnectorVersion("1.2.0");
.withConfigHash(CONFIG_HASH)
.withConnectorVersion(CONNECTOR_VERSION);
return Arrays.asList(actorCatalogFetchEvent1, actorCatalogFetchEvent2);
}

Expand All @@ -683,18 +685,25 @@ public static List<ActorCatalogFetchEventWithCreationDate> actorCatalogFetchEven
.withId(ACTOR_CATALOG_FETCH_EVENT_ID_2)
.withActorCatalogId(ACTOR_CATALOG_ID_2)
.withActorId(SOURCE_ID_2)
.withConfigHash("1394")
.withConnectorVersion("1.2.0");
.withConfigHash(CONFIG_HASH)
.withConnectorVersion(CONNECTOR_VERSION);
final ActorCatalogFetchEvent actorCatalogFetchEvent3 = new ActorCatalogFetchEvent()
.withId(ACTOR_CATALOG_FETCH_EVENT_ID_3)
.withActorCatalogId(ACTOR_CATALOG_ID_3)
.withActorId(SOURCE_ID_2)
.withConfigHash("1394")
.withConnectorVersion("1.2.0");
.withConfigHash(CONFIG_HASH)
.withConnectorVersion(CONNECTOR_VERSION);
final ActorCatalogFetchEvent actorCatalogFetchEvent4 = new ActorCatalogFetchEvent()
.withId(ACTOR_CATALOG_FETCH_EVENT_ID_3)
.withActorCatalogId(ACTOR_CATALOG_ID_3)
.withActorId(SOURCE_ID_3)
.withConfigHash(CONFIG_HASH)
.withConnectorVersion(CONNECTOR_VERSION);
return Arrays.asList(
new ActorCatalogFetchEventWithCreationDate(actorCatalogFetchEvent1, now),
new ActorCatalogFetchEventWithCreationDate(actorCatalogFetchEvent2, yesterday),
new ActorCatalogFetchEventWithCreationDate(actorCatalogFetchEvent3, now));
new ActorCatalogFetchEventWithCreationDate(actorCatalogFetchEvent3, now),
new ActorCatalogFetchEventWithCreationDate(actorCatalogFetchEvent4, now));
}

public static List<WorkspaceServiceAccount> workspaceServiceAccounts() {
Expand Down

0 comments on commit 7b57968

Please sign in to comment.