Skip to content

Commit

Permalink
Add support of upsert (insert or update, on conflict) for Postgres an…
Browse files Browse the repository at this point in the history
…d MySQL
  • Loading branch information
mentegy committed Apr 9, 2018
1 parent f6585ef commit 75d1efd
Show file tree
Hide file tree
Showing 32 changed files with 911 additions and 93 deletions.
69 changes: 68 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,74 @@ val a = quote {
ctx.run(a)
// DELETE FROM Person WHERE name = ''
```


### insert or update (upsert, on conflict)

Upsert is only supported by Postgres and MySQL

#### Postgres
Ignore conflict
```scala
val a = quote {
query[Product].insert(_.id -> 1, _.sku -> 10).onConflictIgnore
}

// INSERT INTO Product AS t (id,sku) VALUES (1, 10) ON CONFLICT DO NOTHING
```

Ignore conflict by explicitly setting conflict target
```scala
val a = quote {
query[Product].insert(_.id -> 1, _.sku -> 10).onConflictIgnore(_.id)
}

// INSERT INTO Product AS t (id,sku) VALUES (1, 10) ON CONFLICT (id) DO NOTHING
```

Resolve conflict by updating existing row if needed. In `onConflictUpdate(target)((t, e) => assignment)`: `target` refers to
conflict target, `t` - to existing row and `e` - to excluded, e.g. row proposed for insert.
```scala
val a = quote {
query[Product]
.insert(_.id -> 1, _.sku -> 10)
.onConflictUpdate(_.id)((t, e) => t.sku -> (t.sku + e.sku))
}

// INSERT INTO Product AS t (id,sku) VALUES (1, 10) ON CONFLICT (id) DO UPDATE SET sku = (t.sku + EXCLUDED.sku)
```

#### MySQL

Ignore any conflict, e.g. `insert ignore`
```scala
val a = quote {
query[Product].insert(_.id -> 1, _.sku -> 10).onConflictIgnore
}

// INSERT IGNORE INTO Product (id,sku) VALUES (1, 10)
```

Ignore duplicate key conflict by explicitly setting it
```scala
val a = quote {
query[Product].insert(_.id -> 1, _.sku -> 10).onConflictIgnore(_.id)
}

// INSERT INTO Product (id,sku) VALUES (1, 10) ON DUPLICATE KEY UPDATE id=id
```

Resolve duplicate key by updating existing row if needed. In `onConflictUpdate((t, e) => assignment)`: `t` refers to
existing row and `e` - to values, e.g. values proposed for insert.
```scala
val a = quote {
query[Product]
.insert(_.id -> 1, _.sku -> 10)
.onConflictUpdate((t, e) => t.sku -> (t.sku + e.sku))
}

// INSERT INTO Product (id,sku) VALUES (1, 10) ON DUPLICATE KEY UPDATE sku = (sku + VALUES(sku))
```

## IO Monad

Quill provides an IO monad that allows the user to express multiple computations and execute them separately. This mechanism is also known as a free monad, which provides a way of expressing computations as referentially-transparent values and isolates the unsafe IO operations into a single operation. For instance:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package io.getquill.context.async.mysql

import io.getquill.context.sql.OnConflictSpec

import scala.concurrent.ExecutionContext.Implicits.global

class OnConflictAsyncSpec extends OnConflictSpec {
val ctx = testContext
import ctx._

override protected def beforeAll(): Unit = {
await(ctx.run(qr1.delete))
()
}

"INSERT IGNORE" in {
import `onConflictIgnore`._
await(ctx.run(testQuery1)) mustEqual res1
await(ctx.run(testQuery2)) mustEqual res2
await(ctx.run(testQuery3)) mustEqual res3
}

"ON DUPLICATE KEY UPDATE i=i " in {
import `onConflictIgnore(_.i)`._
await(ctx.run(testQuery1)) mustEqual res1
await(ctx.run(testQuery2)) mustEqual res2
await(ctx.run(testQuery3)) mustEqual res3
}

"ON DUPLICATE KEY UPDATE ..." in {
import `onConflictUpdate((t, e) => ...)`._
await(ctx.run(testQuery(e1))) mustEqual res1
await(ctx.run(testQuery(e2))) mustEqual res2 + 1
await(ctx.run(testQuery(e3))) mustEqual res3 + 1
await(ctx.run(testQuery4)) mustEqual res4
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package io.getquill.context.async.postgres

import io.getquill.context.sql.OnConflictSpec

import scala.concurrent.ExecutionContext.Implicits.global

class OnConflictAsyncSpec extends OnConflictSpec {
val ctx = testContext
import ctx._

override protected def beforeAll(): Unit = {
await(ctx.run(qr1.delete))
()
}

"ON CONFLICT DO NOTHING" in {
import `onConflictIgnore`._
await(ctx.run(testQuery1)) mustEqual res1
await(ctx.run(testQuery2)) mustEqual res2
await(ctx.run(testQuery3)) mustEqual res3
}

"ON CONFLICT (i) DO NOTHING" in {
import `onConflictIgnore(_.i)`._
await(ctx.run(testQuery1)) mustEqual res1
await(ctx.run(testQuery2)) mustEqual res2
await(ctx.run(testQuery3)) mustEqual res3
}

"ON CONFLICT (i) DO UPDATE ..." in {
import `onConflictUpdate(_.i)((t, e) => ...)`._
await(ctx.run(testQuery(e1))) mustEqual res1
await(ctx.run(testQuery(e2))) mustEqual res2
await(ctx.run(testQuery(e3))) mustEqual res3
await(ctx.run(testQuery4)) mustEqual res4
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ trait CqlIdiom extends Idiom {
case a: TraversableOperation => a.token
case a @ (
_: Function | _: FunctionApply | _: Dynamic | _: OptionOperation | _: Block |
_: Val | _: Ordering | _: QuotedReference | _: If
_: Val | _: Ordering | _: QuotedReference | _: If | _: OnConflict.Excluded | _: OnConflict.Existing
) =>
fail(s"Invalid cql: '$a'")
}
Expand Down
35 changes: 35 additions & 0 deletions quill-core/src/main/scala/io/getquill/MirrorIdiom.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class MirrorIdiom extends Idiom {
case ast: QuotedReference => ast.ast.token
case ast: Lift => ast.token
case ast: Assignment => ast.token
case ast: OnConflict.Excluded => ast.token
case ast: OnConflict.Existing => ast.token
}

implicit def ifTokenizer(implicit liftTokenizer: Tokenizer[Lift]): Tokenizer[If] = Tokenizer[If] {
Expand Down Expand Up @@ -181,12 +183,45 @@ class MirrorIdiom extends Idiom {
case e => stmt"${e.name.token}"
}

implicit val excludedTokenizer: Tokenizer[OnConflict.Excluded] = Tokenizer[OnConflict.Excluded] {
case OnConflict.Excluded(ident) => stmt"${ident.token}"
}

implicit val existingTokenizer: Tokenizer[OnConflict.Existing] = Tokenizer[OnConflict.Existing] {
case OnConflict.Existing(ident) => stmt"${ident.token}"
}

implicit def actionTokenizer(implicit liftTokenizer: Tokenizer[Lift]): Tokenizer[Action] = Tokenizer[Action] {
case Update(query, assignments) => stmt"${query.token}.update(${assignments.token})"
case Insert(query, assignments) => stmt"${query.token}.insert(${assignments.token})"
case Delete(query) => stmt"${query.token}.delete"
case Returning(query, alias, body) => stmt"${query.token}.returning((${alias.token}) => ${body.token})"
case Foreach(query, alias, body) => stmt"${query.token}.foreach((${alias.token}) => ${body.token})"
case c: OnConflict => stmt"${c.token}"
}

implicit def conflictTokenizer(implicit liftTokenizer: Tokenizer[Lift]): Tokenizer[OnConflict] = {

def targetProps(l: List[Property]) = l.map(p => Transform(p) {
case Ident(_) => Ident("_")
})

implicit val conflictTargetTokenizer = Tokenizer[OnConflict.Target] {
case OnConflict.NoTarget => stmt""
case OnConflict.Properties(props) => stmt"(${targetProps(props).token})"
}

val updateAssignsTokenizer = Tokenizer[Assignment] {
case Assignment(i, p, v) =>
stmt"(${i.token}, e) => ${p.token} -> ${scopedTokenizer(v)}"
}

Tokenizer[OnConflict] {
case OnConflict(i, t, OnConflict.Update(assign)) =>
stmt"${i.token}.onConflictUpdate${t.token}(${assign.map(updateAssignsTokenizer.token).mkStmt()})"
case OnConflict(i, t, OnConflict.Ignore) =>
stmt"${i.token}.onConflictIgnore${t.token}"
}
}

implicit def assignmentTokenizer(implicit liftTokenizer: Tokenizer[Lift]): Tokenizer[Assignment] = Tokenizer[Assignment] {
Expand Down
14 changes: 14 additions & 0 deletions quill-core/src/main/scala/io/getquill/ast/Ast.scala
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,20 @@ case class Returning(action: Ast, alias: Ident, property: Ast) extends Action

case class Foreach(query: Ast, alias: Ident, body: Ast) extends Action

case class OnConflict(insert: Ast, target: OnConflict.Target, action: OnConflict.Action) extends Action
object OnConflict {

case class Excluded(alias: Ident) extends Ast
case class Existing(alias: Ident) extends Ast

sealed trait Target
case object NoTarget extends Target
case class Properties(props: List[Property]) extends Target

sealed trait Action
case object Ignore extends Action
case class Update(assignments: List[Assignment]) extends Action
}
//************************************************************

case class Dynamic(tree: Any) extends Ast
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,14 @@ trait StatefulTransformer[T] {
case e: Ident => (e, this)
case e: OptionOperation => apply(e)
case e: TraversableOperation => apply(e)
case e: Property => apply(e)
case e: OnConflict.Existing => (e, this)
case e: OnConflict.Excluded => (e, this)

case Function(a, b) =>
val (bt, btt) = apply(b)
(Function(a, bt), btt)

case Property(a, b) =>
val (at, att) = apply(a)
(Property(at, b), att)

case Infix(a, b) =>
val (bt, btt) = apply(b)(_.apply)
(Infix(a, bt), btt)
Expand Down Expand Up @@ -179,6 +178,13 @@ trait StatefulTransformer[T] {
(Assignment(a, bt, ct), ctt)
}

def apply(e: Property): (Property, StatefulTransformer[T]) =
e match {
case Property(a, b) =>
val (at, att) = apply(a)
(Property(at, b), att)
}

def apply(e: Operation): (Operation, StatefulTransformer[T]) =
e match {
case UnaryOperation(o, a) =>
Expand Down Expand Up @@ -228,6 +234,27 @@ trait StatefulTransformer[T] {
val (at, att) = apply(a)
val (ct, ctt) = att.apply(c)
(Foreach(at, b, ct), ctt)
case OnConflict(a, b, c) =>
val (at, att) = apply(a)
val (bt, btt) = att.apply(b)
val (ct, ctt) = btt.apply(c)
(OnConflict(at, bt, ct), ctt)
}

def apply(e: OnConflict.Target): (OnConflict.Target, StatefulTransformer[T]) =
e match {
case OnConflict.NoTarget => (e, this)
case OnConflict.Properties(a) =>
val (at, att) = apply(a)(_.apply)
(OnConflict.Properties(at), att)
}

def apply(e: OnConflict.Action): (OnConflict.Action, StatefulTransformer[T]) =
e match {
case OnConflict.Ignore => (e, this)
case OnConflict.Update(a) =>
val (at, att) = apply(a)(_.apply)
(OnConflict.Update(at), att)
}

def apply[U, R](list: List[U])(f: StatefulTransformer[T] => U => (R, StatefulTransformer[T])) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ trait StatelessTransformer {
case e: Assignment => apply(e)
case Function(params, body) => Function(params, apply(body))
case e: Ident => e
case Property(a, name) => Property(apply(a), name)
case e: Property => apply(e)
case Infix(a, b) => Infix(a, b.map(apply))
case e: OptionOperation => apply(e)
case e: TraversableOperation => apply(e)
Expand All @@ -22,6 +22,8 @@ trait StatelessTransformer {
case Block(statements) => Block(statements.map(apply))
case Val(name, body) => Val(name, apply(body))
case o: Ordering => o
case e: OnConflict.Excluded => e
case e: OnConflict.Existing => e
}

def apply(o: OptionOperation): OptionOperation =
Expand Down Expand Up @@ -72,6 +74,11 @@ trait StatelessTransformer {
case Assignment(a, b, c) => Assignment(a, apply(b), apply(c))
}

def apply(e: Property): Property =
e match {
case Property(a, name) => Property(apply(a), name)
}

def apply(e: Operation): Operation =
e match {
case UnaryOperation(o, a) => UnaryOperation(o, apply(a))
Expand All @@ -97,6 +104,19 @@ trait StatelessTransformer {
case Delete(query) => Delete(apply(query))
case Returning(query, alias, property) => Returning(apply(query), alias, apply(property))
case Foreach(query, alias, body) => Foreach(apply(query), alias, apply(body))
case OnConflict(query, target, action) => OnConflict(apply(query), apply(target), apply(action))
}

def apply(e: OnConflict.Target): OnConflict.Target =
e match {
case OnConflict.NoTarget => e
case OnConflict.Properties(props) => OnConflict.Properties(props.map(apply))
}

def apply(e: OnConflict.Action): OnConflict.Action =
e match {
case OnConflict.Ignore => e
case OnConflict.Update(assigns) => OnConflict.Update(assigns.map(apply))
}

}
29 changes: 29 additions & 0 deletions quill-core/src/main/scala/io/getquill/dsl/QueryDsl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,35 @@ private[dsl] trait QueryDsl {
sealed trait Insert[E] extends Action[E] {
@compileTimeOnly(NonQuotedException.message)
def returning[R](f: E => R): ActionReturning[E, R] = NonQuotedException()

@compileTimeOnly(NonQuotedException.message)
def onConflictIgnore: Insert[E] = NonQuotedException()

@compileTimeOnly(NonQuotedException.message)
def onConflictIgnore(target: E => Any, targets: (E => Any)*): Insert[E] = NonQuotedException()

@compileTimeOnly(NonQuotedException.message)
def onConflictUpdate(assign: ((E, E) => (Any, Any)), assigns: ((E, E) => (Any, Any))*): Insert[E] = NonQuotedException()

/**
* Generates an atomic INSERT or UPDATE (upsert) action if supported.
*
* @param targets - conflict target
* @param assigns - update statement, declared as function: `(table, excluded) => (assign, result)`
* `table` - is used to extract column for update assignment and reference existing row
* `excluded` - aliases excluded table, e.g. row proposed for insertion.
* `assign` - left hand side of assignment. Should be accessed from `table` argument
* `result` - right hand side of assignment.
*
* Example usage:
* {{{
* insert.onConflictUpdate(_.id)((t, e) => t.col -> (e.col + t.col))
* }}}
* If insert statement violates conflict target then the column `col` of row will be updated with sum of
* existing value and and proposed `col` in insert.
*/
@compileTimeOnly(NonQuotedException.message)
def onConflictUpdate(target: E => Any, targets: (E => Any)*)(assign: ((E, E) => (Any, Any)), assigns: ((E, E) => (Any, Any))*): Insert[E] = NonQuotedException()
}

sealed trait ActionReturning[E, Output] extends Action[E]
Expand Down
Loading

0 comments on commit 75d1efd

Please sign in to comment.