Skip to content
This repository has been archived by the owner on Oct 8, 2020. It is now read-only.

Commit

Permalink
fix(sql): PostgreSQL compatibility for keiko-sql (#176)
Browse files Browse the repository at this point in the history
This change includes the following:

- Removing bigint display qualifiers - these are MySQL specific, and display-only
- A number of queries used MySQL-specific upsert features.  These have been expanded with PostgreSQL-compatible varieties.
- A small helper has been added to handle the dynamic Keiko queue table creation for PostgreSQL
- A helper has been added to make use of Postgres's "excluded" conflict feature for inserts

Co-authored-by: Adam Jordens <[email protected]>
  • Loading branch information
jcavanagh and ajordens committed Jul 14, 2020
1 parent c3dfdfa commit 8781046
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import com.netflix.spinnaker.kork.sql.config.SqlRetryProperties
import com.netflix.spinnaker.q.DeadMessageCallback
import com.netflix.spinnaker.q.Message
import com.netflix.spinnaker.q.Queue
import com.netflix.spinnaker.q.sql.util.createTableLike
import com.netflix.spinnaker.q.sql.util.excluded
import de.huxhorn.sulky.ulid.ULID
import io.github.resilience4j.retry.Retry
import io.github.resilience4j.retry.RetryConfig
Expand All @@ -16,6 +18,7 @@ import java.nio.charset.StandardCharsets
import java.time.Clock
import java.time.Duration
import org.jooq.DSLContext
import org.jooq.SQLDialect
import org.jooq.exception.SQLDialectNotSupportedException
import org.jooq.impl.DSL
import org.jooq.util.mysql.MySQLDSL
Expand Down Expand Up @@ -70,17 +73,29 @@ class SqlDeadMessageHandler(
.set(fingerprintField, fingerprint)
.set(updatedAtField, clock.millis())
.set(bodyField, json)
.onDuplicateKeyUpdate()
.set(updatedAtField, MySQLDSL.values(updatedAtField) as Any)
.set(bodyField, MySQLDSL.values(bodyField) as Any)
.run {
when (jooq.dialect()) {
SQLDialect.POSTGRES ->
onConflict(fingerprintField)
.doUpdate()
.set(updatedAtField, clock.millis())
.set(bodyField, excluded(bodyField) as Any)
.execute()
else ->
onDuplicateKeyUpdate()
.set(updatedAtField, MySQLDSL.values(updatedAtField) as Any)
.set(bodyField, MySQLDSL.values(bodyField) as Any)
.execute()
}
}
}
} catch (e: Exception) {
log.error("Failed to deadLetter message, fingerprint: $fingerprint, message: $json", e)
}
}

private fun initTables() {
jooq.execute("CREATE TABLE IF NOT EXISTS $dlqTableName LIKE ${dlqBase}_template")
createTableLike(dlqTableName, "${dlqBase}_template", jooq)
}

@Suppress("UnstableApiUsage")
Expand Down
81 changes: 65 additions & 16 deletions keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import com.netflix.spinnaker.q.metrics.RetryPolled
import com.netflix.spinnaker.q.migration.SerializationMigrator
import com.netflix.spinnaker.q.sql.SqlQueue.RetryCategory.READ
import com.netflix.spinnaker.q.sql.SqlQueue.RetryCategory.WRITE
import com.netflix.spinnaker.q.sql.util.createTableLike
import com.netflix.spinnaker.q.sql.util.excluded
import de.huxhorn.sulky.ulid.ULID
import io.github.resilience4j.retry.Retry
import io.github.resilience4j.retry.RetryConfig
Expand All @@ -47,6 +49,7 @@ import kotlin.math.min
import kotlin.random.Random.Default.nextLong
import org.funktionale.partials.partially1
import org.jooq.DSLContext
import org.jooq.SQLDialect
import org.jooq.SortOrder
import org.jooq.exception.SQLDialectNotSupportedException
import org.jooq.impl.DSL
Expand Down Expand Up @@ -371,8 +374,17 @@ class SqlQueue(
.set(idField, ulid.toString())
.set(fingerprintField, m.fingerprint)
.set(expiryField, m.expiry)
.onDuplicateKeyIgnore()
.execute()
.run {
when (jooq.dialect()) {
SQLDialect.POSTGRES ->
onConflict(fingerprintField)
.doNothing()
.execute()
else ->
onDuplicateKeyIgnore()
.execute()
}
}

when (changed) {
0 -> toRelease.add(m.queueId)
Expand Down Expand Up @@ -444,25 +456,46 @@ class SqlQueue(
withRetry(WRITE) {
jooq.transaction { config ->
val txn = DSL.using(config)
val bodyVal = mapper.writeValueAsString(message)

txn.insertInto(messagesTable)
.set(idField, ulid.toString())
.set(fingerprintField, fingerprint)
.set(bodyField, mapper.writeValueAsString(message))
.set(bodyField, bodyVal)
.set(updatedAtField, clock.millis())
.onDuplicateKeyUpdate()
.set(idField, MySQLDSL.values(idField) as Any)
.set(bodyField, MySQLDSL.values(bodyField) as Any)
.execute()
.run {
when (jooq.dialect()) {
SQLDialect.POSTGRES ->
onConflict(fingerprintField)
.doUpdate()
.set(bodyField, excluded(bodyField) as Any)
.execute()
else ->
onDuplicateKeyUpdate()
.set(idField, MySQLDSL.values(idField) as Any)
.set(bodyField, MySQLDSL.values(bodyField) as Any)
.execute()
}
}

txn.insertInto(queueTable)
.set(idField, ULID.nextMonotonicValue(ulid).toString())
.set(fingerprintField, fingerprint)
.set(deliveryField, deliveryTime)
.set(lockedField, "0")
.onDuplicateKeyUpdate()
.set(deliveryField, MySQLDSL.values(deliveryField) as Any)
.execute()
.run {
when (jooq.dialect()) {
SQLDialect.POSTGRES ->
onConflict(fingerprintField)
.doUpdate()
.set(deliveryField, deliveryTime)
.execute()
else ->
onDuplicateKeyUpdate()
.set(deliveryField, MySQLDSL.values(deliveryField) as Any)
.execute()
}
}
}
}

Expand Down Expand Up @@ -680,9 +713,19 @@ class SqlQueue(
.set(fingerprintField, fingerprint)
.set(deliveryField, atTime(lockTtlDuration))
.set(lockedField, "0")
.onDuplicateKeyUpdate()
.set(deliveryField, MySQLDSL.values(deliveryField) as Any)
.execute()
.run {
when (jooq.dialect()) {
SQLDialect.POSTGRES ->
onConflict(fingerprintField)
.doUpdate()
.set(deliveryField, atTime(lockTtlDuration))
.execute()
else ->
onDuplicateKeyUpdate()
.set(deliveryField, MySQLDSL.values(deliveryField) as Any)
.execute()
}
}
}
}

Expand Down Expand Up @@ -808,11 +851,17 @@ class SqlQueue(
}

private fun initTables() {
val tables = listOf(
Pair(queueTableName, queueBase),
Pair(unackedTableName, unackedBase),
Pair(messagesTableName, messagesBase)
)

withPool(poolName) {
withRetry(WRITE) {
jooq.execute("CREATE TABLE IF NOT EXISTS $queueTableName LIKE ${queueBase}_template")
jooq.execute("CREATE TABLE IF NOT EXISTS $unackedTableName LIKE ${unackedBase}_template")
jooq.execute("CREATE TABLE IF NOT EXISTS $messagesTableName LIKE ${messagesBase}_template")
for (tablePair in tables) {
createTableLike(tablePair.first, "${tablePair.second}_template", jooq)
}
}
}
}
Expand Down
44 changes: 44 additions & 0 deletions keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/util/Util.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2020 Apple, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.q.sql.util

import org.jooq.DSLContext
import org.jooq.Field
import org.jooq.SQLDialect
import org.jooq.impl.DSL

fun currentSchema(context: DSLContext): String {
return context.fetch("select current_schema()")
.getValue(0, DSL.field("current_schema")).toString()
}

fun createTableLike(newTable: String, templateTable: String, context: DSLContext) {
var sql = "CREATE TABLE IF NOT EXISTS "
sql += when (context.dialect()) {
SQLDialect.POSTGRES -> {
val cs = currentSchema(context)
"$cs.$newTable ( LIKE $cs.$templateTable INCLUDING ALL )"
}
else -> "$newTable LIKE $templateTable"
}
context.execute(sql)
}

// Allows insertion of virtual Postgres values on conflict, similar to MySQLDSL.values
fun <T> excluded(values: Field<T>): Field<T> {
return DSL.field("excluded.{0}", values.dataType, values)
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ databaseChangeLog:
- changeSet:
id: create-keiko-queue-table-v1
author: afeldman
validCheckSum:
# Original changeset checksum
- 8:551ab623136d3c624d7fd4928d0f4c51
changes:
- createTable:
tableName: keiko_v1_queue_template
Expand All @@ -19,7 +22,7 @@ databaseChangeLog:
nullable: false
- column:
name: delivery
type: bigint(13)
type: bigint
constraints:
nullable: false
- column:
Expand Down Expand Up @@ -175,6 +178,9 @@ databaseChangeLog:
- changeSet:
id: create-keiko-dlq-table-v1
author: afeldman
validCheckSum:
# Original changeset checksum
- 8:7b2c146b7f0ff7de56fdb729c8e87277
changes:
- createTable:
tableName: keiko_v1_dlq_template
Expand All @@ -192,7 +198,7 @@ databaseChangeLog:
nullable: false
- column:
name: updated_at
type: bigint(13)
type: bigint
constraints:
nullable: false
- column:
Expand Down Expand Up @@ -233,13 +239,16 @@ databaseChangeLog:
- changeSet:
id: add-keiko-queue-messages-updated-col
author: afeldman
validCheckSum:
# Original changeset checksum
- 8:53370973bc79095573dfe46d41cffae7
changes:
- addColumn:
tableName: keiko_v1_messages_template
columns:
- column:
name: updated_at
type: bigint(13)
type: bigint
defaultValue: 0
constraints:
nullable: false
Expand Down

0 comments on commit 8781046

Please sign in to comment.