Skip to content

Commit

Permalink
Migrate StateDB to support per stream states (#13731)
Browse files Browse the repository at this point in the history
* Update StateDB to support per Stream states.
* Add `StateType` type
* Add `stream_name`, `namespace` and `type` to `state` table.
* Set the default StateType to LEGACY
  • Loading branch information
gosusnp authored Jun 14, 2022
1 parent 7ce8b49 commit a600f6a
Show file tree
Hide file tree
Showing 4 changed files with 334 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ void testBootloaderAppBlankDb() throws Exception {
val configsMigrator = new ConfigsDatabaseMigrator(configDatabase, configsFlyway);
// this line should change with every new migration
// to show that you meant to make a new migration to the prod database
assertEquals("0.39.1.001", configsMigrator.getLatestMigration().getVersion().getVersion());
assertEquals("0.39.17.001", configsMigrator.getLatestMigration().getVersion().getVersion());

val jobsPersistence = new DefaultJobPersistence(jobDatabase);
assertEquals(version, jobsPersistence.getVersion().get());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.db.instance.configs.migrations;

import com.google.common.annotations.VisibleForTesting;
import java.util.Arrays;
import org.flywaydb.core.api.migration.BaseJavaMigration;
import org.flywaydb.core.api.migration.Context;
import org.jooq.Catalog;
import org.jooq.DSLContext;
import org.jooq.EnumType;
import org.jooq.Schema;
import org.jooq.impl.DSL;
import org.jooq.impl.SQLDataType;
import org.jooq.impl.SchemaImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Flyway Java migration that prepares the {@code state} table for per-stream states.
 *
 * <p>
 * It performs two steps, in order:
 * <ol>
 * <li>creates the {@code state_type} enum type from the literals of {@link StateType};</li>
 * <li>adds the {@code stream_name}, {@code namespace} and {@code type} columns to {@code state},
 * together with a unique constraint over {@code (connection_id, stream_name, namespace)}.</li>
 * </ol>
 */
public class V0_39_17_001__AddStreamDescriptorsToStateTable extends BaseJavaMigration {

  private static final Logger LOGGER = LoggerFactory.getLogger(V0_39_17_001__AddStreamDescriptorsToStateTable.class);

  /**
   * Flyway entry point: logs the migration name, then runs the migration on the connection
   * supplied by Flyway.
   *
   * @param context Flyway migration context providing the JDBC connection
   * @throws Exception declared by the Flyway contract
   */
  @Override
  public void migrate(final Context context) throws Exception {
    LOGGER.info("Running migration: {}", this.getClass().getSimpleName());

    // Warning: please do not use any jOOQ generated code to write a migration.
    // As database schema changes, the generated jOOQ code can be deprecated. So
    // old migration may not compile if there is any generated code.
    migrate(DSL.using(context.getConnection()));
  }

  /**
   * Runs the schema changes against the given jOOQ context. The enum type is created first
   * because the new {@code type} column refers to it.
   *
   * @param ctx jOOQ context used to execute the DDL statements
   */
  @VisibleForTesting
  public static void migrate(final DSLContext ctx) {
    createStateTypeEnum(ctx);
    addStreamDescriptorFieldsToStateTable(ctx);
  }

  /**
   * Creates the {@code state_type} enum type with one value per {@link StateType} constant, in
   * declaration order.
   */
  private static void createStateTypeEnum(final DSLContext ctx) {
    final var literals = Arrays.stream(StateType.values())
        .map(StateType::getLiteral)
        .toList();

    ctx.createType(StateType.NAME).asEnum(literals).execute();
  }

  /**
   * Adds the stream descriptor columns, the state type column and the uniqueness constraint to
   * the {@code state} table in a single {@code ALTER TABLE} statement.
   */
  private static void addStreamDescriptorFieldsToStateTable(final DSLContext ctx) {
    final var streamNameColumn = DSL.field("stream_name", SQLDataType.CLOB.nullable(true));
    final var namespaceColumn = DSL.field("namespace", SQLDataType.CLOB.nullable(true));
    // type defaults to LEGACY to first set the expected type of all existing states
    final var typeColumn = DSL.field(
        "type",
        SQLDataType.VARCHAR.asEnumDataType(StateType.class).nullable(false).defaultValue(StateType.LEGACY));
    // NOTE(review): stream_name and namespace are nullable; confirm how the target database
    // treats NULLs inside a unique constraint matches the intended semantics.
    final var streamDescriptorUniqueness = DSL.constraint("state__connection_id__stream_name__namespace__uq")
        .unique(DSL.field("connection_id"), DSL.field("stream_name"), DSL.field("namespace"));

    ctx.alterTable("state")
        .add(Arrays.asList(streamNameColumn, namespaceColumn, typeColumn, streamDescriptorUniqueness))
        .execute();
  }

  /**
   * Kind of state stored in a row of the {@code state} table. Declared here, rather than taken
   * from generated jOOQ code, so that the migration stays self-contained.
   */
  public enum StateType implements EnumType {

    GLOBAL("GLOBAL"),
    STREAM("STREAM"),
    LEGACY("LEGACY");

    /** Name of the database enum type backing this Java enum. */
    public static final String NAME = "state_type";

    private final String literal;

    StateType(String literal) {
      this.literal = literal;
    }

    /** @return the literal stored in the database for this constant */
    @Override
    public String getLiteral() {
      return literal;
    }

    /** @return the catalog reported by {@link #getSchema()} */
    @Override
    public Catalog getCatalog() {
      return getSchema().getCatalog();
    }

    /** @return a schema reference named {@code public} */
    @Override
    public Schema getSchema() {
      return new SchemaImpl(DSL.name("public"));
    }

    /** @return {@link #NAME} */
    @Override
    public String getName() {
      return NAME;
    }

  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ create table "public"."state"(
"state" jsonb null,
"created_at" timestamptz(35) not null default null,
"updated_at" timestamptz(35) not null default null,
"stream_name" text null,
"namespace" text null,
"type" state_type not null default null,
constraint "state_pkey"
primary key (
"id",
Expand Down Expand Up @@ -276,6 +279,11 @@ create unique index "connection_operation_pkey" on "public"."connection_operatio
"operation_id" asc
);
create unique index "operation_pkey" on "public"."operation"("id" asc);
create unique index "state__connection_id__stream_name__namespace__uq" on "public"."state"(
"connection_id" asc,
"stream_name" asc,
"namespace" asc
);
create unique index "state_pkey" on "public"."state"(
"id" asc,
"connection_id" asc
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.db.instance.configs.migrations;

import io.airbyte.db.factory.FlywayFactory;
import io.airbyte.db.instance.configs.AbstractConfigsDatabaseTest;
import io.airbyte.db.instance.configs.ConfigsDatabaseMigrator;
import io.airbyte.db.instance.configs.migrations.V0_32_8_001__AirbyteConfigDatabaseDenormalization.ActorType;
import io.airbyte.db.instance.configs.migrations.V0_32_8_001__AirbyteConfigDatabaseDenormalization.NamespaceDefinitionType;
import io.airbyte.db.instance.configs.migrations.V0_39_17_001__AddStreamDescriptorsToStateTable.StateType;
import io.airbyte.db.instance.development.DevDatabaseMigrator;
import java.util.UUID;
import org.flywaydb.core.Flyway;
import org.jooq.DSLContext;
import org.jooq.JSONB;
import org.jooq.exception.DataAccessException;
import org.jooq.impl.DSL;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link V0_39_17_001__AddStreamDescriptorsToStateTable}.
 *
 * <p>
 * Each test starts from the baseline configs schema populated with one workspace, one actor
 * definition, one actor and two connections; the {@code public} schema is dropped and recreated
 * after every test.
 */
public class V0_39_17_001__AddStreamDescriptorsToStateTableTest extends AbstractConfigsDatabaseTest {

  private static final String STATE_TABLE = "State";

  private DevDatabaseMigrator devConfigsDbMigrator;

  private UUID connection1;
  private UUID connection2;

  /**
   * States that exist before the migration, and states inserted afterwards without an explicit
   * type, must all end up with the {@code LEGACY} type.
   */
  @Test
  public void testSimpleMigration() {
    final DSLContext context = getDslContext();

    // Adding a couple of states
    context.insertInto(DSL.table(STATE_TABLE))
        .columns(
            DSL.field("id"),
            DSL.field("connection_id"))
        .values(UUID.randomUUID(), connection1)
        .values(UUID.randomUUID(), connection2)
        .execute();

    // Preconditions check: we should have two rows in state
    Assertions.assertEquals(2, context.select().from(STATE_TABLE).execute());

    // Applying the migration
    devConfigsDbMigrator.migrate();

    // A row inserted after the migration without a type relies on the column default
    final UUID newState = UUID.randomUUID();
    context.insertInto(DSL.table(STATE_TABLE))
        .columns(
            DSL.field("id"),
            DSL.field("connection_id"),
            DSL.field("stream_name"))
        .values(newState, connection1, "new_stream")
        .execute();

    // Our two initial rows and the new row should be LEGACY
    Assertions.assertEquals(3,
        context.select()
            .from(STATE_TABLE)
            .where(DSL.field("type").equal(StateType.LEGACY))
            .execute());

    // There should be no STREAM or GLOBAL
    Assertions.assertEquals(0,
        context.select()
            .from(STATE_TABLE)
            .where(DSL.field("type").in(StateType.GLOBAL, StateType.STREAM))
            .execute());
  }

  /**
   * After the migration, rows of one connection may differ by stream name or by namespace, but a
   * second row with the same (connection, stream name, namespace) triple must be rejected.
   */
  @Test
  public void testUniquenessConstraint() {
    devConfigsDbMigrator.migrate();

    final DSLContext context = getDslContext();

    // Three distinct (stream_name, namespace) pairs for the same connection are accepted
    insertState(context, connection1, StateType.GLOBAL, "stream1", "ns2");
    insertState(context, connection1, StateType.GLOBAL, "stream1", "ns1");
    insertState(context, connection1, StateType.GLOBAL, "stream2", "ns2");

    // Re-inserting the first triple must fail
    Assertions.assertThrows(DataAccessException.class,
        () -> insertState(context, connection1, StateType.GLOBAL, "stream1", "ns2"));
  }

  /**
   * Builds a migrator dedicated to this test, creates the baseline schema and inserts the rows
   * the state table depends on.
   */
  @BeforeEach
  public void beforeEach() {
    final Flyway flyway = FlywayFactory.create(dataSource, "V0_39_17_001__AddStreamDescriptorsToStateTableTest",
        ConfigsDatabaseMigrator.DB_IDENTIFIER,
        ConfigsDatabaseMigrator.MIGRATION_FILE_LOCATION);
    final ConfigsDatabaseMigrator configsDbMigrator = new ConfigsDatabaseMigrator(database, flyway);
    devConfigsDbMigrator = new DevDatabaseMigrator(configsDbMigrator);

    devConfigsDbMigrator.createBaseline();
    injectMockData();
  }

  @AfterEach
  public void afterEach() {
    // Making sure we reset between tests
    dslContext.dropSchemaIfExists("public").cascade().execute();
    dslContext.createSchema("public").execute();
    dslContext.setSchema("public").execute();
  }

  /**
   * Inserts a state row with a freshly generated id and the given connection, type and stream
   * descriptor.
   */
  private void insertState(final DSLContext context,
                           final UUID connectionId,
                           final StateType type,
                           final String streamName,
                           final String namespace) {
    context.insertInto(DSL.table(STATE_TABLE))
        .columns(
            DSL.field("id"),
            DSL.field("connection_id"),
            DSL.field("type"),
            DSL.field("stream_name"),
            DSL.field("namespace"))
        .values(UUID.randomUUID(), connectionId, type, streamName, namespace)
        .execute();
  }

  /**
   * Inserts one workspace, one actor definition, one actor and two connections, and records the
   * connection ids in {@code connection1} and {@code connection2}.
   */
  private void injectMockData() {
    final DSLContext context = getDslContext();

    final UUID workspaceId = UUID.randomUUID();
    final UUID actorId = UUID.randomUUID();
    final UUID actorDefinitionId = UUID.randomUUID();
    connection1 = UUID.randomUUID();
    connection2 = UUID.randomUUID();

    context.insertInto(DSL.table("workspace"))
        .columns(
            DSL.field("id"),
            DSL.field("name"),
            DSL.field("slug"),
            DSL.field("initial_setup_complete"))
        .values(
            workspaceId,
            "base workspace",
            "base_workspace",
            true)
        .execute();
    context.insertInto(DSL.table("actor_definition"))
        .columns(
            DSL.field("id"),
            DSL.field("name"),
            DSL.field("docker_repository"),
            DSL.field("docker_image_tag"),
            DSL.field("actor_type"),
            DSL.field("spec"))
        .values(
            actorDefinitionId,
            "Jenkins",
            "farosai/airbyte-jenkins-source",
            "0.1.23",
            ActorType.source,
            JSONB.valueOf("{}"))
        .execute();
    context.insertInto(DSL.table("actor"))
        .columns(
            DSL.field("id"),
            DSL.field("workspace_id"),
            DSL.field("actor_definition_id"),
            DSL.field("name"),
            DSL.field("configuration"),
            DSL.field("actor_type"))
        .values(
            actorId,
            workspaceId,
            actorDefinitionId,
            "ActorName",
            JSONB.valueOf("{}"),
            ActorType.source)
        .execute();

    // The same actor is used as both source and destination of each connection
    insertConnection(context, connection1, actorId);
    insertConnection(context, connection2, actorId);
  }

  /**
   * Inserts a manual connection whose source and destination are both {@code actorId}.
   */
  private void insertConnection(final DSLContext context, final UUID connectionId, final UUID actorId) {
    context.insertInto(DSL.table("connection"))
        .columns(
            DSL.field("id"),
            DSL.field("namespace_definition"),
            DSL.field("source_id"),
            DSL.field("destination_id"),
            DSL.field("name"),
            DSL.field("catalog"),
            DSL.field("manual"))
        .values(
            connectionId,
            NamespaceDefinitionType.source,
            actorId,
            actorId,
            "Connection" + connectionId.toString(),
            JSONB.valueOf("{}"),
            true)
        .execute();
  }

}

0 comments on commit a600f6a

Please sign in to comment.