Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate StateDB to support per stream states #13731

Merged
merged 5 commits into from
Jun 14, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ void testBootloaderAppBlankDb() throws Exception {
val configsMigrator = new ConfigsDatabaseMigrator(configDatabase, configsFlyway);
// this line should change with every new migration
// to show that you meant to make a new migration to the prod database
assertEquals("0.39.1.001", configsMigrator.getLatestMigration().getVersion().getVersion());
assertEquals("0.39.17.001", configsMigrator.getLatestMigration().getVersion().getVersion());

val jobsPersistence = new DefaultJobPersistence(jobDatabase);
assertEquals(version, jobsPersistence.getVersion().get());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.db.instance.configs.migrations;

import com.google.common.annotations.VisibleForTesting;
import java.util.Arrays;
import org.flywaydb.core.api.migration.BaseJavaMigration;
import org.flywaydb.core.api.migration.Context;
import org.jooq.Catalog;
import org.jooq.DSLContext;
import org.jooq.EnumType;
import org.jooq.Schema;
import org.jooq.impl.DSL;
import org.jooq.impl.SQLDataType;
import org.jooq.impl.SchemaImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class V0_39_17_001__AddStreamDescriptorsToStateTable extends BaseJavaMigration {

private static final Logger LOGGER = LoggerFactory.getLogger(V0_39_17_001__AddStreamDescriptorsToStateTable.class);

@Override
public void migrate(final Context context) throws Exception {
LOGGER.info("Running migration: {}", this.getClass().getSimpleName());

// Warning: please do not use any jOOQ generated code to write a migration.
// As database schema changes, the generated jOOQ code can be deprecated. So
// old migration may not compile if there is any generated code.
final DSLContext ctx = DSL.using(context.getConnection());

migrate(ctx);
}

@VisibleForTesting
public static void migrate(final DSLContext ctx) {
createStateTypeEnum(ctx);
addStreamDescriptorFieldsToStateTable(ctx);
}

private static void createStateTypeEnum(final DSLContext ctx) {
ctx.createType(StateType.NAME)
.asEnum(Arrays.stream(StateType.values()).map(StateType::getLiteral).toList())
.execute();
}

private static void addStreamDescriptorFieldsToStateTable(final DSLContext ctx) {
final String STATE_TABLE = "state";

ctx.alterTable(STATE_TABLE)
.add(Arrays.asList(
DSL.field("stream_name", SQLDataType.CLOB.nullable(true)),
DSL.field("namespace", SQLDataType.CLOB.nullable(true)),
// type defaults to LEGACY to first set the expected type of all existing states
DSL.field("type", SQLDataType.VARCHAR.asEnumDataType(StateType.class).defaultValue(StateType.LEGACY)),
DSL.constraint("state__connection_id__stream_name__namespace__uq")
.unique(DSL.field("connection_id"), DSL.field("stream_name"), DSL.field("namespace"))))
.execute();
}

public enum StateType implements EnumType {

GLOBAL("GLOBAL"),
STREAM("STREAM"),
LEGACY("LEGACY");

public static final String NAME = "state_type";

StateType(String literal) {
this.literal = literal;
}

@Override
public String getLiteral() {
return literal;
}

@Override
public Catalog getCatalog() {
return getSchema().getCatalog();
}

@Override
public Schema getSchema() {
return new SchemaImpl(DSL.name("public"));
}

@Override
public String getName() {
return NAME;
}

private final String literal;

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ create table "public"."state"(
"state" jsonb null,
"created_at" timestamptz(35) not null default null,
"updated_at" timestamptz(35) not null default null,
"stream_name" text null,
"namespace" text null,
"type" state_type null default null,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be not null? might just be an idiosyncrasy of the dump and an actual problem though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, I had a default, added a not nullable constraint.

constraint "state_pkey"
primary key (
"id",
Expand Down Expand Up @@ -276,6 +279,11 @@ create unique index "connection_operation_pkey" on "public"."connection_operatio
"operation_id" asc
);
create unique index "operation_pkey" on "public"."operation"("id" asc);
create unique index "state__connection_id__stream_name__namespace__uq" on "public"."state"(
"connection_id" asc,
"stream_name" asc,
"namespace" asc
);
create unique index "state_pkey" on "public"."state"(
"id" asc,
"connection_id" asc
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.db.instance.configs.migrations;

import io.airbyte.db.factory.FlywayFactory;
import io.airbyte.db.instance.configs.AbstractConfigsDatabaseTest;
import io.airbyte.db.instance.configs.ConfigsDatabaseMigrator;
import io.airbyte.db.instance.configs.migrations.V0_32_8_001__AirbyteConfigDatabaseDenormalization.ActorType;
import io.airbyte.db.instance.configs.migrations.V0_32_8_001__AirbyteConfigDatabaseDenormalization.NamespaceDefinitionType;
import io.airbyte.db.instance.configs.migrations.V0_39_17_001__AddStreamDescriptorsToStateTable.StateType;
import io.airbyte.db.instance.development.DevDatabaseMigrator;
import java.util.UUID;
import org.flywaydb.core.Flyway;
import org.jooq.DSLContext;
import org.jooq.JSONB;
import org.jooq.exception.DataAccessException;
import org.jooq.impl.DSL;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class V0_39_17_001__AddStreamDescriptorsToStateTableTest extends AbstractConfigsDatabaseTest {

private final String STATE_TABLE = "State";

private UUID connection1;
private UUID connection2;

@Test
public void testSimpleMigration() {
final DSLContext context = getDslContext();

// Adding a couple of states
context.insertInto(DSL.table(STATE_TABLE))
.columns(
DSL.field("id"),
DSL.field("connection_id"))
.values(UUID.randomUUID(), connection1)
.values(UUID.randomUUID(), connection2)
.execute();

// Preconditions check: we should have one row in state
Assertions.assertEquals(2, context.select().from(STATE_TABLE).execute());

// Applying the migration
devConfigsDbMigrator.migrate();

final UUID newState = UUID.randomUUID();
context.insertInto(DSL.table(STATE_TABLE))
.columns(
DSL.field("id"),
DSL.field("connection_id"),
DSL.field("stream_name"))
.values(newState, connection1, "new_stream")
.execute();

System.out.println(context.selectFrom("connection").fetch());
System.out.println(context.selectFrom(STATE_TABLE).fetch());

// Our two initial rows and the new row should be LEGACY
Assertions.assertEquals(3,
context.select()
.from(STATE_TABLE)
.where(DSL.field("type").equal(StateType.LEGACY))
.execute());

// There should be no STREAM or GLOBAL
Assertions.assertEquals(0,
context.select()
.from(STATE_TABLE)
.where(DSL.field("type").in(StateType.GLOBAL, StateType.STREAM))
.execute());
}

@Test
public void testUniquenessConstraint() {
devConfigsDbMigrator.migrate();

final DSLContext context = getDslContext();
context.insertInto(DSL.table(STATE_TABLE))
.columns(
DSL.field("id"),
DSL.field("connection_id"),
DSL.field("type"),
DSL.field("stream_name"),
DSL.field("namespace"))
.values(UUID.randomUUID(), connection1, StateType.GLOBAL, "stream1", "ns2")
.execute();

context.insertInto(DSL.table(STATE_TABLE))
.columns(
DSL.field("id"),
DSL.field("connection_id"),
DSL.field("type"),
DSL.field("stream_name"),
DSL.field("namespace"))
.values(UUID.randomUUID(), connection1, StateType.GLOBAL, "stream1", "ns1")
.execute();

context.insertInto(DSL.table(STATE_TABLE))
.columns(
DSL.field("id"),
DSL.field("connection_id"),
DSL.field("type"),
DSL.field("stream_name"),
DSL.field("namespace"))
.values(UUID.randomUUID(), connection1, StateType.GLOBAL, "stream2", "ns2")
.execute();

Assertions.assertThrows(DataAccessException.class, () -> {
context.insertInto(DSL.table(STATE_TABLE))
.columns(
DSL.field("id"),
DSL.field("connection_id"),
DSL.field("type"),
DSL.field("stream_name"),
DSL.field("namespace"))
.values(UUID.randomUUID(), connection1, StateType.GLOBAL, "stream1", "ns2")
.execute();
});
}

@BeforeEach
public void beforeEach() {
Flyway flyway = FlywayFactory.create(dataSource, "V0_39_17_001__AddStreamDescriptorsToStateTableTest", ConfigsDatabaseMigrator.DB_IDENTIFIER,
ConfigsDatabaseMigrator.MIGRATION_FILE_LOCATION);
ConfigsDatabaseMigrator configsDbMigrator = new ConfigsDatabaseMigrator(database, flyway);
devConfigsDbMigrator = new DevDatabaseMigrator(configsDbMigrator);

devConfigsDbMigrator.createBaseline();
injectMockData();
}

@AfterEach
public void afterEach() {
// Making sure we reset between tests
dslContext.dropSchemaIfExists("public").cascade().execute();
dslContext.createSchema("public").execute();
dslContext.setSchema("public").execute();
}

private void injectMockData() {
final DSLContext context = getDslContext();

UUID workspaceId = UUID.randomUUID();
UUID actorId = UUID.randomUUID();
UUID actorDefinitionId = UUID.randomUUID();
connection1 = UUID.randomUUID();
connection2 = UUID.randomUUID();

context.insertInto(DSL.table("workspace"))
.columns(
DSL.field("id"),
DSL.field("name"),
DSL.field("slug"),
DSL.field("initial_setup_complete"))
.values(
workspaceId,
"base workspace",
"base_workspace",
true)
.execute();
context.insertInto(DSL.table("actor_definition"))
.columns(
DSL.field("id"),
DSL.field("name"),
DSL.field("docker_repository"),
DSL.field("docker_image_tag"),
DSL.field("actor_type"),
DSL.field("spec"))
.values(
actorDefinitionId,
"Jenkins",
"farosai/airbyte-jenkins-source",
"0.1.23",
ActorType.source,
JSONB.valueOf("{}"))
.execute();
context.insertInto(DSL.table("actor"))
.columns(
DSL.field("id"),
DSL.field("workspace_id"),
DSL.field("actor_definition_id"),
DSL.field("name"),
DSL.field("configuration"),
DSL.field("actor_type"))
.values(
actorId,
workspaceId,
actorDefinitionId,
"ActorName",
JSONB.valueOf("{}"),
ActorType.source)
.execute();

insertConnection(context, connection1, actorId);
insertConnection(context, connection2, actorId);
}

private void insertConnection(final DSLContext context, final UUID connectionId, final UUID actorId) {
context.insertInto(DSL.table("connection"))
.columns(
DSL.field("id"),
DSL.field("namespace_definition"),
DSL.field("source_id"),
DSL.field("destination_id"),
DSL.field("name"),
DSL.field("catalog"),
DSL.field("manual"))
.values(
connectionId,
NamespaceDefinitionType.source,
actorId,
actorId,
"Connection" + connectionId.toString(),
JSONB.valueOf("{}"),
true)
.execute();
}

private DevDatabaseMigrator devConfigsDbMigrator;

}