Skip to content

Commit

Permalink
🐛 Destination Redshift: fix switching mode (#12085)
Browse files Browse the repository at this point in the history
* fix switching mode for redshift

* bump version

* format code

* update spec
  • Loading branch information
yurii-bidiuk authored and suhomud committed May 23, 2022
1 parent 906104b commit 7812eb2
Show file tree
Hide file tree
Showing 12 changed files with 149 additions and 129 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@
- name: Redshift
destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
dockerRepository: airbyte/destination-redshift
dockerImageTag: 0.3.31
dockerImageTag: 0.3.32
documentationUrl: https://docs.airbyte.io/integrations/destinations/redshift
icon: redshift.svg
resourceRequirements:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3414,7 +3414,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-redshift:0.3.31"
- dockerImage: "airbyte/destination-redshift:0.3.32"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/redshift"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1,6 @@
select forms from {{ ref('pokemon' )}} where forms != json_parse('[{"name":"ditto","url":"https://pokeapi.co/api/v2/pokemon-form/132/"}]')
-- dbt data test: returns the rows whose `forms` payload differs from the expected
-- single "ditto" form; any returned row marks the test as failed.
SELECT
    forms
FROM
    -- Must be lowercase `ref`: Jinja is case-sensitive and dbt only defines `ref()`;
    -- `REF(...)` fails macro resolution at compile time.
    {{ ref('pokemon') }}
WHERE
    forms != json_parse('[{"name":"ditto","url":"https://pokeapi.co/api/v2/pokemon-form/132/"}]')
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION destination-redshift

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.3.31
LABEL io.airbyte.version=0.3.32
LABEL io.airbyte.name=airbyte/destination-redshift
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,11 @@
package io.airbyte.integrations.destination.redshift;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.destination.jdbc.copy.SwitchingDestination;
import io.airbyte.integrations.destination.redshift.enums.RedshiftDataTmpTableMode;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.util.Map;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -55,17 +51,15 @@ public static DestinationType determineUploadMode(final JsonNode config) {
final var accessKeyIdNode = config.get("access_key_id");
final var secretAccessKeyNode = config.get("secret_access_key");

// Since region is a Json schema enum with an empty string default, we consider the empty string an
// unset field.
final var emptyRegion = regionNode == null || regionNode.asText().equals("");

if (bucketNode == null && emptyRegion && accessKeyIdNode == null && secretAccessKeyNode == null) {
if (isNullOrEmpty(bucketNode) && isNullOrEmpty(regionNode) && isNullOrEmpty(accessKeyIdNode)
&& isNullOrEmpty(secretAccessKeyNode)) {
LOGGER.warn("The \"standard\" upload mode is not performant, and is not recommended for production. " +
"Please use the Amazon S3 upload mode if you are syncing a large amount of data.");
"Please use the Amazon S3 upload mode if you are syncing a large amount of data.");
return DestinationType.INSERT_WITH_SUPER_TMP_TYPE;
}

if (bucketNode == null || regionNode == null || accessKeyIdNode == null || secretAccessKeyNode == null) {
if (isNullOrEmpty(bucketNode) && isNullOrEmpty(regionNode) && isNullOrEmpty(accessKeyIdNode)
&& isNullOrEmpty(secretAccessKeyNode)) {
throw new RuntimeException("Error: Partially missing S3 Configuration.");
}
return DestinationType.COPY_S3_WITH_SUPER_TMP_TYPE;
Expand All @@ -78,4 +72,8 @@ public static void main(final String[] args) throws Exception {
LOGGER.info("completed destination: {}", RedshiftDestination.class);
}

private static boolean isNullOrEmpty(JsonNode jsonNode) {
return jsonNode == null || jsonNode.asText().equals("");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import java.util.Map;
import java.util.Optional;

public class RedshiftInsertDestination extends AbstractJdbcDestination {
public class RedshiftInsertDestination extends AbstractJdbcDestination {

private static final String DRIVER_CLASS = "com.amazon.redshift.jdbc.Driver";
private static final String USERNAME = "username";
Expand Down Expand Up @@ -67,4 +67,5 @@ public static JsonNode getJdbcConfig(final JsonNode redshiftConfig) {
.put(SCHEMA, schema)
.build());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,28 +23,29 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RedshiftSqlOperations extends JdbcSqlOperations{
public class RedshiftSqlOperations extends JdbcSqlOperations {

private static final Logger LOGGER = LoggerFactory.getLogger(RedshiftSqlOperations.class);
protected static final int REDSHIFT_VARCHAR_MAX_BYTE_SIZE = 65535;

private static final String SELECT_ALL_TABLES_WITH_NOT_SUPER_TYPE_SQL_STATEMENT = """
select tablename, schemaname
from pg_table_def
where tablename in (
select tablename as tablename
from pg_table_def
where schemaname = '%1$s'
and tablename like '%%airbyte_raw%%'
and "column" in ('%2$s', '%3$s', '%4$s')
group by tablename
having count(*) = 3)
and schemaname = '%1$s'
and type <> 'super'
and "column" = '_airbyte_data';
""";

private static final String ALTER_TMP_TABLES_WITH_NOT_SUPER_TYPE_TO_SUPER_TYPE = """
select tablename, schemaname
from pg_table_def
where tablename in (
select tablename as tablename
from pg_table_def
where schemaname = '%1$s'
and tablename like '%%airbyte_raw%%'
and "column" in ('%2$s', '%3$s', '%4$s')
group by tablename
having count(*) = 3)
and schemaname = '%1$s'
and type <> 'super'
and "column" = '_airbyte_data';
""";

private static final String ALTER_TMP_TABLES_WITH_NOT_SUPER_TYPE_TO_SUPER_TYPE =
"""
ALTER TABLE %1$s ADD COLUMN %2$s_super super;
ALTER TABLE %1$s ADD COLUMN %3$s_reserve TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP;
UPDATE %1$s SET %2$s_super = JSON_PARSE(%2$s);
Expand All @@ -68,9 +69,9 @@ public String createTableQuery(final JdbcDatabase database, final String schemaN

@Override
public void insertRecordsInternal(final JdbcDatabase database,
final List<AirbyteRecordMessage> records,
final String schemaName,
final String tmpTableName)
final List<AirbyteRecordMessage> records,
final String schemaName,
final String tmpTableName)
throws SQLException {
LOGGER.info("actual size of batch: {}", records.size());

Expand All @@ -97,9 +98,10 @@ public boolean isValidData(final JsonNode data) {
}

/**
* In case of redshift we need to discover all tables with not super type and update them after to SUPER type. This would be done once.
* In case of redshift we need to discover all tables with not super type and update them after to
* SUPER type. This would be done once.
*
* @param database - Database object for interacting with a JDBC connection.
* @param database - Database object for interacting with a JDBC connection.
* @param writeConfigSet - list of write configs.
*/

Expand All @@ -117,11 +119,11 @@ public void onDestinationCloseOperations(final JdbcDatabase database, final Set<
}

/**
* @param database - Database object for interacting with a JDBC connection.
* @param database - Database object for interacting with a JDBC connection.
* @param schemaName - schema to update.
*/
private List<String> discoverNotSuperTables(final JdbcDatabase database,
final String schemaName) {
final String schemaName) {
List<String> schemaAndTableWithNotSuperType = new ArrayList<>();
try {
LOGGER.info("Discovering NOT SUPER table types...");
Expand All @@ -136,8 +138,8 @@ private List<String> discoverNotSuperTables(final JdbcDatabase database,
if (tablesNameWithoutSuperDatatype.isEmpty()) {
return Collections.emptyList();
} else {
tablesNameWithoutSuperDatatype.forEach(e ->
schemaAndTableWithNotSuperType.add(e.get("schemaname").textValue() + "." + e.get("tablename").textValue()));
tablesNameWithoutSuperDatatype
.forEach(e -> schemaAndTableWithNotSuperType.add(e.get("schemaname").textValue() + "." + e.get("tablename").textValue()));
return schemaAndTableWithNotSuperType;
}
} catch (SQLException e) {
Expand All @@ -149,14 +151,15 @@ private List<String> discoverNotSuperTables(final JdbcDatabase database,
/**
* We prepare one query for all tables with not super type for updating.
*
* @param database - Database object for interacting with a JDBC connection.
* @param database - Database object for interacting with a JDBC connection.
* @param schemaAndTableWithNotSuperType - list of tables with not super type.
*/
private void updateVarcharDataColumnToSuperDataColumn(final JdbcDatabase database, final List<String> schemaAndTableWithNotSuperType) {
LOGGER.info("Updating VARCHAR data column to SUPER...");
StringBuilder finalSqlStatement = new StringBuilder();
// To keep the previous data, we need to add next columns: _airbyte_data, _airbyte_emitted_at
// We do such workflow because we can't directly CAST VARCHAR to SUPER column. _airbyte_emitted_at column recreated to keep
// We do such workflow because we can't directly CAST VARCHAR to SUPER column. _airbyte_emitted_at
// column recreated to keep
// the COLUMN order. This order is required to INSERT the values in correct way.
schemaAndTableWithNotSuperType.forEach(schemaAndTable -> {
LOGGER.info("Altering table {} column _airbyte_data to SUPER.", schemaAndTable);
Expand All @@ -172,5 +175,5 @@ private void updateVarcharDataColumnToSuperDataColumn(final JdbcDatabase databas
throw new RuntimeException(e);
}
}
}

}
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.redshift.enums;

import io.airbyte.integrations.base.JavaBaseConstants;
Expand All @@ -6,7 +10,9 @@
* This enum determines the type for _airbyte_data_ column at _airbyte_raw_**some_table_name**
*/
public enum RedshiftDataTmpTableMode {

SUPER {

@Override
public String getTableCreationMode() {
return "SUPER";
Expand All @@ -16,6 +22,7 @@ public String getTableCreationMode() {
public String getInsertRowMode() {
return "(?, JSON_PARSE(?), ?),\n";
}

};

public abstract String getTableCreationMode();
Expand All @@ -24,13 +31,14 @@ public String getInsertRowMode() {

public String getTmpTableSqlStatement(String schemaName, String tableName) {
return String.format("""
CREATE TABLE IF NOT EXISTS %s.%s (
%s VARCHAR PRIMARY KEY,
%s %s,
%s TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP)
""", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_AB_ID,
CREATE TABLE IF NOT EXISTS %s.%s (
%s VARCHAR PRIMARY KEY,
%s %s,
%s TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP)
""", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_AB_ID,
JavaBaseConstants.COLUMN_NAME_DATA,
getTableCreationMode(),
JavaBaseConstants.COLUMN_NAME_EMITTED_AT);
}

}
Loading

0 comments on commit 7812eb2

Please sign in to comment.