Skip to content

Commit

Permalink
Change StreamKey to StreamDescriptor (#13612)
Browse files Browse the repository at this point in the history
* Change StreamKey to StreamDescriptor
  • Loading branch information
alovew authored Jun 9, 2022
1 parent a9c5c4e commit 87515c5
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 24 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
"$schema": http://json-schema.org/draft-07/schema#
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/StreamKey.yaml
title: StreamKey
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/StreamDescriptor.yaml
title: StreamDescriptor
description: Name and namespace of a stream
type: object
required:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import static org.jooq.impl.DSL.noCondition;

import io.airbyte.config.StreamKey;
import io.airbyte.config.StreamDescriptor;
import io.airbyte.config.StreamResetRecord;
import io.airbyte.db.Database;
import io.airbyte.db.ExceptionWrappingDatabase;
Expand Down Expand Up @@ -38,29 +38,29 @@ public StreamResetPersistence(final Database database) {
}

/*
* Get a list of streamKeys for streams that have pending or running resets
* Get a list of streamDescriptors for streams that have pending or running resets
*/
public List<StreamKey> getStreamResets(final UUID connectionId) throws IOException {
public List<StreamDescriptor> getStreamResets(final UUID connectionId) throws IOException {
return database.query(ctx -> ctx.select(DSL.asterisk())
.from(DSL_TABLE_STREAM_RESET))
.where(DSL.field(CONNECTION_ID_COL).eq(connectionId))
.fetch(getStreamResetRecordMapper())
.stream()
.flatMap(row -> Stream.of(new StreamKey().withName(row.streamName()).withNamespace(row.streamNamespace())))
.flatMap(row -> Stream.of(new StreamDescriptor().withName(row.streamName()).withNamespace(row.streamNamespace())))
.toList();
}

/*
* Delete stream resets for a given connection. This is called to delete stream reset records for
* resets that are successfully completed.
*/
public void deleteStreamResets(final UUID connectionId, final List<StreamKey> streamsToDelete) throws IOException {
public void deleteStreamResets(final UUID connectionId, final List<StreamDescriptor> streamsToDelete) throws IOException {
final Condition condition = noCondition();
for (final StreamKey streamKey : streamsToDelete) {
for (final StreamDescriptor streamDescriptor : streamsToDelete) {
condition.or(
DSL.field(CONNECTION_ID_COL).eq(connectionId)
.and(DSL.field(STREAM_NAME_COL).eq(streamKey.getName()))
.and(DSL.field(STREAM_NAMESPACE_COL).eq(streamKey.getNamespace())));
.and(DSL.field(STREAM_NAME_COL).eq(streamDescriptor.getName()))
.and(DSL.field(STREAM_NAMESPACE_COL).eq(streamDescriptor.getNamespace())));
}

database.query(ctx -> ctx.deleteFrom(DSL_TABLE_STREAM_RESET)).where(condition).execute();
Expand All @@ -70,15 +70,15 @@ public void deleteStreamResets(final UUID connectionId, final List<StreamKey> st
* Create stream resets for a given connection. This is called to create stream reset records for
* resets that are going to be run.
*/
public void createStreamResets(final UUID connectionId, final List<StreamKey> streamsToCreate) throws IOException {
for (final StreamKey streamKey : streamsToCreate) {
public void createStreamResets(final UUID connectionId, final List<StreamDescriptor> streamsToCreate) throws IOException {
for (final StreamDescriptor streamDescriptor : streamsToCreate) {
final OffsetDateTime timestamp = OffsetDateTime.now();

database.query(ctx -> ctx.insertInto(DSL_TABLE_STREAM_RESET)
.set(DSL.field(ID_COL), UUID.randomUUID())
.set(DSL.field(CONNECTION_ID_COL), connectionId)
.set(DSL.field(STREAM_NAME_COL), streamKey.getName())
.set(DSL.field(STREAM_NAMESPACE_COL), streamKey.getNamespace())
.set(DSL.field(STREAM_NAME_COL), streamDescriptor.getName())
.set(DSL.field(STREAM_NAMESPACE_COL), streamDescriptor.getNamespace())
.set(DSL.field(CREATED_AT_COL), timestamp)
.set(DSL.field(UPDATED_AT_COL), timestamp)).execute();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.spy;

import io.airbyte.config.StreamKey;
import io.airbyte.config.StreamDescriptor;
import io.airbyte.db.factory.DSLContextFactory;
import io.airbyte.db.factory.DataSourceFactory;
import io.airbyte.db.factory.FlywayFactory;
Expand Down Expand Up @@ -52,23 +52,25 @@ void tearDown() throws Exception {

@Test
void testCreateAndGetAndDeleteStreamResets() throws Exception {
final List<StreamKey> streamResetList = new ArrayList<>();
final StreamKey streamKey1 = new StreamKey().withName("stream_name_1").withNamespace("stream_namespace_1");
final StreamKey streamKey2 = new StreamKey().withName("stream_name_2");
streamResetList.add(streamKey1);
streamResetList.add(streamKey2);
final List<StreamDescriptor> streamResetList = new ArrayList<>();
final StreamDescriptor streamDescriptor1 = new StreamDescriptor().withName("stream_name_1").withNamespace("stream_namespace_1");
final StreamDescriptor streamDescriptor2 = new StreamDescriptor().withName("stream_name_2");
streamResetList.add(streamDescriptor1);
streamResetList.add(streamDescriptor2);
final UUID uuid = UUID.randomUUID();
streamResetPersistence.createStreamResets(uuid, streamResetList);

final List<StreamKey> result = streamResetPersistence.getStreamResets(uuid);
final List<StreamDescriptor> result = streamResetPersistence.getStreamResets(uuid);
assertEquals(2, result.size());
assertTrue(
result.stream().anyMatch(streamKey -> streamKey.getName().equals("stream_name_1") && streamKey.getNamespace().equals("stream_namespace_1")));
assertTrue(result.stream().anyMatch(streamKey -> streamKey.getName().equals("stream_name_2") && streamKey.getNamespace() == null));
result.stream().anyMatch(
streamDescriptor -> streamDescriptor.getName().equals("stream_name_1") && streamDescriptor.getNamespace().equals("stream_namespace_1")));
assertTrue(
result.stream().anyMatch(streamDescriptor -> streamDescriptor.getName().equals("stream_name_2") && streamDescriptor.getNamespace() == null));

streamResetPersistence.deleteStreamResets(uuid, result);

final List<StreamKey> resultAfterDeleting = streamResetPersistence.getStreamResets(uuid);
final List<StreamDescriptor> resultAfterDeleting = streamResetPersistence.getStreamResets(uuid);
assertEquals(0, resultAfterDeleting.size());
}

Expand Down

0 comments on commit 87515c5

Please sign in to comment.