From 8781046cf2bc749ffe7195ba4a6ef755c4b20bf6 Mon Sep 17 00:00:00 2001 From: jcavanagh Date: Tue, 14 Jul 2020 10:56:22 -0500 Subject: [PATCH] fix(sql): PostgreSQL compatibility for keiko-sql (#176) This change includes the following: - Removing bigint display qualifiers - these are MySQL specific, and display-only - A number of queries used MySQL-specific upsert features. These have been expanded with PostgreSQL-compatible varieties. - A small helper has been added to handle the dynamic Keiko queue table creation for PostgreSQL - A helper has been added to make use of Postgres's "excluded" conflict feature for inserts Co-authored-by: Adam Jordens --- .../spinnaker/q/sql/SqlDeadMessageHandler.kt | 23 +++++- .../com/netflix/spinnaker/q/sql/SqlQueue.kt | 81 +++++++++++++++---- .../com/netflix/spinnaker/q/sql/util/Util.kt | 44 ++++++++++ .../db/changelog/20190822-initial-schema.yml | 15 +++- 4 files changed, 140 insertions(+), 23 deletions(-) create mode 100644 keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/util/Util.kt diff --git a/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlDeadMessageHandler.kt b/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlDeadMessageHandler.kt index 47bd514..013c2d3 100644 --- a/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlDeadMessageHandler.kt +++ b/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlDeadMessageHandler.kt @@ -7,6 +7,8 @@ import com.netflix.spinnaker.kork.sql.config.SqlRetryProperties import com.netflix.spinnaker.q.DeadMessageCallback import com.netflix.spinnaker.q.Message import com.netflix.spinnaker.q.Queue +import com.netflix.spinnaker.q.sql.util.createTableLike +import com.netflix.spinnaker.q.sql.util.excluded import de.huxhorn.sulky.ulid.ULID import io.github.resilience4j.retry.Retry import io.github.resilience4j.retry.RetryConfig @@ -16,6 +18,7 @@ import java.nio.charset.StandardCharsets import java.time.Clock import java.time.Duration import org.jooq.DSLContext +import org.jooq.SQLDialect import org.jooq.exception.SQLDialectNotSupportedException import org.jooq.impl.DSL import org.jooq.util.mysql.MySQLDSL @@ -70,9 +73,21 @@ class SqlDeadMessageHandler( .set(fingerprintField, fingerprint) .set(updatedAtField, clock.millis()) .set(bodyField, json) - .onDuplicateKeyUpdate() - .set(updatedAtField, MySQLDSL.values(updatedAtField) as Any) - .set(bodyField, MySQLDSL.values(bodyField) as Any) + .run { + when (jooq.dialect()) { + SQLDialect.POSTGRES -> + onConflict(fingerprintField) + .doUpdate() + .set(updatedAtField, clock.millis()) + .set(bodyField, excluded(bodyField) as Any) + .execute() + else -> + onDuplicateKeyUpdate() + .set(updatedAtField, MySQLDSL.values(updatedAtField) as Any) + .set(bodyField, MySQLDSL.values(bodyField) as Any) + .execute() + } + } } } catch (e: Exception) { log.error("Failed to deadLetter message, fingerprint: $fingerprint, message: $json", e) @@ -80,7 +95,7 @@ class SqlDeadMessageHandler( } private fun initTables() { - jooq.execute("CREATE TABLE IF NOT EXISTS $dlqTableName LIKE ${dlqBase}_template") + createTableLike(dlqTableName, "${dlqBase}_template", jooq) } @Suppress("UnstableApiUsage") diff --git a/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt b/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt index d4a69dc..c8ec32a 100644 --- a/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt +++ b/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt @@ -28,6 +28,8 @@ import com.netflix.spinnaker.q.metrics.RetryPolled import com.netflix.spinnaker.q.migration.SerializationMigrator import com.netflix.spinnaker.q.sql.SqlQueue.RetryCategory.READ import com.netflix.spinnaker.q.sql.SqlQueue.RetryCategory.WRITE +import com.netflix.spinnaker.q.sql.util.createTableLike +import com.netflix.spinnaker.q.sql.util.excluded import de.huxhorn.sulky.ulid.ULID import io.github.resilience4j.retry.Retry import io.github.resilience4j.retry.RetryConfig @@ -47,6 +49,7 @@ import kotlin.math.min import kotlin.random.Random.Default.nextLong import org.funktionale.partials.partially1 import org.jooq.DSLContext +import org.jooq.SQLDialect import org.jooq.SortOrder import org.jooq.exception.SQLDialectNotSupportedException import org.jooq.impl.DSL @@ -371,8 +374,17 @@ class SqlQueue( .set(idField, ulid.toString()) .set(fingerprintField, m.fingerprint) .set(expiryField, m.expiry) - .onDuplicateKeyIgnore() - .execute() + .run { + when (jooq.dialect()) { + SQLDialect.POSTGRES -> + onConflict(fingerprintField) + .doNothing() + .execute() + else -> + onDuplicateKeyIgnore() + .execute() + } + } when (changed) { 0 -> toRelease.add(m.queueId) @@ -444,25 +456,46 @@ class SqlQueue( withRetry(WRITE) { jooq.transaction { config -> val txn = DSL.using(config) + val bodyVal = mapper.writeValueAsString(message) txn.insertInto(messagesTable) .set(idField, ulid.toString()) .set(fingerprintField, fingerprint) - .set(bodyField, mapper.writeValueAsString(message)) + .set(bodyField, bodyVal) .set(updatedAtField, clock.millis()) - .onDuplicateKeyUpdate() - .set(idField, MySQLDSL.values(idField) as Any) - .set(bodyField, MySQLDSL.values(bodyField) as Any) - .execute() + .run { + when (jooq.dialect()) { + SQLDialect.POSTGRES -> + onConflict(fingerprintField) + .doUpdate() + .set(bodyField, excluded(bodyField) as Any) + .execute() + else -> + onDuplicateKeyUpdate() + .set(idField, MySQLDSL.values(idField) as Any) + .set(bodyField, MySQLDSL.values(bodyField) as Any) + .execute() + } + } txn.insertInto(queueTable) .set(idField, ULID.nextMonotonicValue(ulid).toString()) .set(fingerprintField, fingerprint) .set(deliveryField, deliveryTime) .set(lockedField, "0") - .onDuplicateKeyUpdate() - .set(deliveryField, MySQLDSL.values(deliveryField) as Any) - .execute() + .run { + when (jooq.dialect()) { + SQLDialect.POSTGRES -> + onConflict(fingerprintField) + .doUpdate() + .set(deliveryField, deliveryTime) + .execute() + else -> + onDuplicateKeyUpdate() + .set(deliveryField, MySQLDSL.values(deliveryField) as Any) + .execute() + } + } } } @@ -680,9 +713,19 @@ class SqlQueue( .set(fingerprintField, fingerprint) .set(deliveryField, atTime(lockTtlDuration)) .set(lockedField, "0") - .onDuplicateKeyUpdate() - .set(deliveryField, MySQLDSL.values(deliveryField) as Any) - .execute() + .run { + when (jooq.dialect()) { + SQLDialect.POSTGRES -> + onConflict(fingerprintField) + .doUpdate() + .set(deliveryField, atTime(lockTtlDuration)) + .execute() + else -> + onDuplicateKeyUpdate() + .set(deliveryField, MySQLDSL.values(deliveryField) as Any) + .execute() + } + } } } @@ -808,11 +851,17 @@ class SqlQueue( } private fun initTables() { + val tables = listOf( + Pair(queueTableName, queueBase), + Pair(unackedTableName, unackedBase), + Pair(messagesTableName, messagesBase) + ) + withPool(poolName) { withRetry(WRITE) { - jooq.execute("CREATE TABLE IF NOT EXISTS $queueTableName LIKE ${queueBase}_template") - jooq.execute("CREATE TABLE IF NOT EXISTS $unackedTableName LIKE ${unackedBase}_template") - jooq.execute("CREATE TABLE IF NOT EXISTS $messagesTableName LIKE ${messagesBase}_template") + for (tablePair in tables) { + createTableLike(tablePair.first, "${tablePair.second}_template", jooq) + } } } } diff --git a/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/util/Util.kt b/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/util/Util.kt new file mode 100644 index 0000000..c7ecdb3 --- /dev/null +++ b/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/util/Util.kt @@ -0,0 +1,44 @@ +/* + * Copyright 2020 Apple, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netflix.spinnaker.q.sql.util + +import org.jooq.DSLContext +import org.jooq.Field +import org.jooq.SQLDialect +import org.jooq.impl.DSL + +fun currentSchema(context: DSLContext): String { + return context.fetch("select current_schema()") + .getValue(0, DSL.field("current_schema")).toString() +} + +fun createTableLike(newTable: String, templateTable: String, context: DSLContext) { + var sql = "CREATE TABLE IF NOT EXISTS " + sql += when (context.dialect()) { + SQLDialect.POSTGRES -> { + val cs = currentSchema(context) + "$cs.$newTable ( LIKE $cs.$templateTable INCLUDING ALL )" + } + else -> "$newTable LIKE $templateTable" + } + context.execute(sql) +} + +// Allows insertion of virtual Postgres values on conflict, similar to MySQLDSL.values +fun excluded(values: Field): Field { + return DSL.field("excluded.{0}", values.dataType, values) +} diff --git a/keiko-sql/src/main/resources/db/changelog/20190822-initial-schema.yml b/keiko-sql/src/main/resources/db/changelog/20190822-initial-schema.yml index 2d3d315..1059615 100644 --- a/keiko-sql/src/main/resources/db/changelog/20190822-initial-schema.yml +++ b/keiko-sql/src/main/resources/db/changelog/20190822-initial-schema.yml @@ -2,6 +2,9 @@ databaseChangeLog: - changeSet: id: create-keiko-queue-table-v1 author: afeldman + validCheckSum: + # Original changeset checksum + - 8:551ab623136d3c624d7fd4928d0f4c51 changes: - createTable: tableName: keiko_v1_queue_template @@ -19,7 +22,7 @@ databaseChangeLog: nullable: false - column: name: delivery - type: bigint(13) + type: bigint constraints: nullable: false - column: @@ -175,6 +178,9 @@ databaseChangeLog: - changeSet: id: create-keiko-dlq-table-v1 author: afeldman + validCheckSum: + # Original changeset checksum + - 8:7b2c146b7f0ff7de56fdb729c8e87277 changes: - createTable: tableName: keiko_v1_dlq_template @@ -192,7 +198,7 @@ databaseChangeLog: nullable: false - column: name: updated_at - type: bigint(13) + type: bigint constraints: nullable: false - column: @@ -233,13 +239,16 @@ databaseChangeLog: - changeSet: id: add-keiko-queue-messages-updated-col author: afeldman + validCheckSum: + # Original changeset checksum + - 8:53370973bc79095573dfe46d41cffae7 changes: - addColumn: tableName: keiko_v1_messages_template columns: - column: name: updated_at - type: bigint(13) + type: bigint defaultValue: 0 constraints: nullable: false