Skip to content

Commit

Permalink
Move superseding info fields to root
Browse files Browse the repository at this point in the history
  • Loading branch information
spenes committed Mar 30, 2023
1 parent 78399b3 commit 0a16d84
Show file tree
Hide file tree
Showing 10 changed files with 284 additions and 131 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ END';"""

body JSON NOT NULL,

supersededby VARCHAR(128) NULL
superseded_by VARCHAR(128) NULL
)""")

val draftsCreate = (fr"CREATE TABLE IF NOT EXISTS" ++ Postgres.DraftsTable ++ fr"""(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import doobie.ConnectionIO

object Ninth {

val SchemasTable = "iglu_schemas"
val SupersededByColumn = "supersededby"
val SchemasTable = "iglu_schemas"
val SupersededByColumn = "superseded_by"

def perform: ConnectionIO[Unit] =
for {
Expand All @@ -33,10 +33,14 @@ object Ninth {
} yield ()

def getColumns(tableSchemaName: String, tableName: String) =
Fragment.const(s"SELECT column_name FROM information_schema.columns WHERE table_schema = '$tableSchemaName' AND table_name = '$tableName'")
Fragment
.const(
s"SELECT column_name FROM information_schema.columns WHERE table_schema = '$tableSchemaName' AND table_name = '$tableName'"
)
.queryWithLogHandler[String](LogHandler.jdkLogHandler)

def addColumn(tableName: String, columnName: String) =
Fragment.const(s"ALTER TABLE $tableName ADD COLUMN $columnName VARCHAR(128)")
Fragment
.const(s"ALTER TABLE $tableName ADD COLUMN $columnName VARCHAR(128)")
.updateWithLogHandler(LogHandler.jdkLogHandler)
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ object IgluResponse {
case SupersedingVersionUpdated(schemaKey) =>
Json.fromFields(
List(
"message" -> "Superseding schema version of existing schema is updated".asJson,
"message" -> "Superseding schema version of existing schema is updated".asJson,
"schemaKey" -> schemaKey.toSchemaUri.asJson
)
)
Expand Down
109 changes: 62 additions & 47 deletions src/main/scala/com/snowplowanalytics/iglu/server/model/Schema.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,22 @@ import cats.implicits._

import io.circe._
import io.circe.syntax._
import io.circe.{Decoder, Encoder, Json, FailedCursor}
import io.circe.{Decoder, Encoder, FailedCursor, Json}
import io.circe.generic.semiauto._

import doobie._
import doobie.postgres.circe.json.implicits._
import doobie.postgres.implicits._

import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaMap, SelfDescribingSchema, SchemaVer}
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaMap, SchemaVer, SelfDescribingSchema}
import com.snowplowanalytics.iglu.core.circe.implicits._

import Schema.Metadata

case class Schema(schemaMap: SchemaMap, metadata: Metadata, body: Json, supersededby: Option[SchemaVer.Full]) {
case class Schema(schemaMap: SchemaMap, metadata: Metadata, body: Json, supersededBy: Option[SchemaVer.Full]) {
def withFormat(repr: Schema.Repr.Format): Schema.Repr = repr match {
case Schema.Repr.Format.Canonical =>
Schema.Repr.Canonical(canonical, supersededby)
Schema.Repr.Canonical(canonical, supersededBy)
case Schema.Repr.Format.Uri =>
Schema.Repr.Uri(schemaMap.schemaKey)
case Schema.Repr.Format.Meta =>
Expand All @@ -50,7 +50,9 @@ case class Schema(schemaMap: SchemaMap, metadata: Metadata, body: Json, supersed

object Schema {

val CanonicalUri = "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#"
val CanonicalUri = "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#"
val SupersededByField = "$supersededBy"
val SupersedesField = "$supersedes"

case class Metadata(createdAt: Instant, updatedAt: Instant, isPublic: Boolean)

Expand All @@ -68,17 +70,18 @@ object Schema {
object Repr {

/** Canonical self-describing representation */
case class Canonical(schema: SelfDescribingSchema[Json], supersededby: Option[SchemaVer.Full]) extends Repr
case class Canonical(schema: SelfDescribingSchema[Json], supersededBy: Option[SchemaVer.Full]) extends Repr

/** Non-vanilla representation for UIs/non-validation clients */
case class Full(schema: Schema) extends Repr

/** Just URI string (but schema is on the server) */
case class Uri(schemaKey: SchemaKey) extends Repr

def apply(schema: Schema): Repr = Full(schema)
def apply(uri: SchemaMap): Repr = Uri(uri.schemaKey)
def apply(schema: SelfDescribingSchema[Json], supersededby: Option[SchemaVer.Full]): Repr = Canonical(schema, supersededby)
def apply(schema: Schema): Repr = Full(schema)
def apply(uri: SchemaMap): Repr = Uri(uri.schemaKey)
def apply(schema: SelfDescribingSchema[Json], supersededBy: Option[SchemaVer.Full]): Repr =
Canonical(schema, supersededBy)

sealed trait Format extends Product with Serializable
object Format {
Expand All @@ -98,38 +101,46 @@ object Schema {

sealed trait SchemaBody extends Product with Serializable
object SchemaBody {
case class SelfDescribing(schema: SelfDescribingSchema[Json],
supersedingInfo: Option[SupersedingInfo]) extends SchemaBody
case class BodyOnly(schema: Json) extends SchemaBody
case class SelfDescribing(schema: SelfDescribingSchema[Json], supersedingInfo: Option[SupersedingInfo])
extends SchemaBody
case class BodyOnly(schema: Json, supersedingInfo: Option[SupersedingInfo]) extends SchemaBody

implicit val schemaBodyCirceDecoder: Decoder[SchemaBody] =
Decoder.instance { json =>
Decoder.instance { cursor =>
for {
res <- SelfDescribingSchema.parse(json.value) match {
case Right(schema) => json.as[Option[SupersedingInfo]].map(s => SelfDescribing(schema, s))
case Left(_) => json.as[JsonObject].map(obj => BodyOnly(Json.fromJsonObject(obj)))
removed <- SupersedingInfo.removeSupersedingInfoFields(cursor)
supersedingInfo <- cursor.as[Option[SupersedingInfo]]
res <- SelfDescribingSchema.parse(removed) match {
case Right(schema) => SelfDescribing(schema, supersedingInfo).asRight
case Left(_) => removed.as[JsonObject].map(obj => BodyOnly(Json.fromJsonObject(obj), supersedingInfo))
}
} yield res
}
}

sealed trait SupersedingInfo extends Product with Serializable
object SupersedingInfo {
case class SupersededBy(version: SchemaVer.Full) extends SupersedingInfo
case class SupersededBy(version: SchemaVer.Full) extends SupersedingInfo
case class Superseded(versions: NonEmptyList[SchemaVer.Full]) extends SupersedingInfo

implicit val supersedingInfoDecoder: Decoder[Option[SupersedingInfo]] =
Decoder.instance { json =>
val self = json.downField("self")
self.downField("supersededBy") match {
json.downField(SupersededByField) match {
case _: FailedCursor =>
self.downField("supersedes") match {
json.downField(SupersedesField) match {
case _: FailedCursor => None.asRight
case c => c.as[NonEmptyList[SchemaVer.Full]].map(Superseded(_).some)
case c => c.as[NonEmptyList[SchemaVer.Full]].map(Superseded(_).some)
}
case c => c.as[SchemaVer.Full].map(SupersededBy(_).some)
}
}

def removeSupersedingInfoFields(json: HCursor) =
for {
map <- json.as[JsonObject].map(_.toMap)
r = map - SupersededByField - SupersedesField
j = Json.fromJsonObject(JsonObject.fromMap(r))
} yield j
}

sealed trait Format extends Product with Serializable
Expand All @@ -142,29 +153,35 @@ object Schema {
}
}

private def moveToFront[K, V](key: K, fields: List[(K, V)]) =
fields.span(_._1 != key) match {
case (previous, matches :: next) => matches :: previous ++ next
case _ => fields
private def moveToFront[K, V](keys: List[K], fields: List[(K, V)]): List[(K, V)] =
keys match {
case h :: t =>
fields.span(_._1 != h) match {
case (previous, matches :: next) => matches :: moveToFront(t, previous ++ next)
case _ => moveToFront(t, fields)
}
case Nil => fields
}

private def orderedSchema(schema: Json): Json =
schema.asObject match {
case Some(obj) =>
Json.fromFields(moveToFront(s"$$schema", moveToFront("self", moveToFront("metadata", obj.toList))))
val frontKeys = List(s"$$schema", SupersededByField, SupersedesField, "self", "metadata")
Json.fromFields(moveToFront(frontKeys, obj.toList))
case None => schema
}

private def supersededByJson(supersededBy: Option[SchemaVer.Full]): Json =
supersededBy.map(v => Json.obj(SupersededByField -> v.asString.asJson)).getOrElse(JsonObject.empty.asJson)

implicit val schemaEncoder: Encoder[Schema] =
Encoder.instance { schema =>
val supersededByJson = schema.supersededby.map { v =>
Json.obj("supersededBy" -> v.asString.asJson)
}.getOrElse(JsonObject.empty.asJson)
Json
.obj(
"self" -> schema.schemaMap.asJson.deepMerge(supersededByJson),
"self" -> schema.schemaMap.asJson,
"metadata" -> schema.metadata.asJson(Metadata.metadataEncoder)
)
.deepMerge(supersededByJson(schema.supersededBy))
.deepMerge(schema.body)
}

Expand All @@ -173,18 +190,14 @@ object Schema {
case Repr.Full(s) => orderedSchema(schemaEncoder.apply(s))
case Repr.Uri(u) => Encoder[String].apply(u.toSchemaUri)
case Repr.Canonical(schema, supersededBy) =>
orderedSchema(schema.normalize.asObject match {
case Some(obj) =>
val supersededByJson = supersededBy.map { v =>
JsonObject(
"self" -> Json.obj(
"supersededBy"-> v.asString.asJson
)
)
}.getOrElse(JsonObject.empty)
Json.fromJsonObject((s"$$schema", CanonicalUri.asJson) +: supersededByJson.deepMerge(obj))
case None => schema.normalize
})
orderedSchema {
Json
.obj(
s"$$schema" -> CanonicalUri.asJson
)
.deepMerge(supersededByJson(supersededBy))
.deepMerge(schema.normalize)
}
}

implicit val serverSchemaDecoder: Decoder[Schema] =
Expand All @@ -193,17 +206,19 @@ object Schema {
self <- cursor.value.as[SchemaMap]
meta <- cursor.downField("metadata").as[Metadata]
bodyJson <- cursor.as[JsonObject]
body = bodyJson.toList.filterNot { case (key, _) => key == "self" || key == "metadata" }
supersededby <- cursor.downField("self").downField("supersededBy").as[Option[SchemaVer.Full]]
} yield Schema(self, meta, Json.fromFields(body), supersededby)
body = bodyJson.toList.filterNot {
case (key, _) => List("self", "metadata", SupersededByField, SupersedesField).contains(key)
}
supersededBy <- cursor.downField(SupersededByField).as[Option[SchemaVer.Full]]
} yield Schema(self, meta, Json.fromFields(body), supersededBy)
}

implicit val schemaVerFull: Read[Option[SchemaVer.Full]] =
Read[Option[String]].map(_.flatMap(v => SchemaVer.parseFull(v).toOption))

implicit val schemaDoobieRead: Read[Schema] =
Read[(SchemaMap, Metadata, Json, Option[SchemaVer.Full])].map {
case (schemaMap, meta, body, supersededby) =>
Schema(schemaMap, meta, body, supersededby)
case (schemaMap, meta, body, supersededBy) =>
Schema(schemaMap, meta, body, supersededBy)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package service

import java.util.UUID

import cats.data.{Validated, EitherT, NonEmptyList}
import cats.data.{EitherT, NonEmptyList, Validated}
import cats.effect._
import cats.implicits._

Expand Down Expand Up @@ -160,9 +160,9 @@ class SchemaService[F[+_]: Sync](
permission: Permission,
json: SchemaBody
) = json match {
case SchemaBody.BodyOnly(body) =>
case SchemaBody.BodyOnly(body, supersedingInfo) =>
val schemaMap = SchemaMap(vendor, name, format, version)
publishSchema(isPublic, permission, SelfDescribingSchema(schemaMap, body), None)
publishSchema(isPublic, permission, SelfDescribingSchema(schemaMap, body), supersedingInfo)
case SchemaBody.SelfDescribing(schema, supersedingInfo) =>
val schemaMapUri = SchemaMap(vendor, name, format, version)
if (schemaMapUri == schema.self) publishSchema(isPublic, permission, schema, supersedingInfo)
Expand Down Expand Up @@ -203,9 +203,13 @@ class SchemaService[F[+_]: Sync](
case schemas => Ok(schemas)
}

private def addSchema(schema: SelfDescribingSchema[Json], isPublic: Boolean, supersedingInfo: Option[SupersedingInfo]) =
private def addSchema(
schema: SelfDescribingSchema[Json],
isPublic: Boolean,
supersedingInfo: Option[SupersedingInfo]
) =
for {
allowed <- isSchemaAllowed(db, schema.self, patchesAllowed, isPublic)
allowed <- isSchemaAllowed(db, schema.self, patchesAllowed, isPublic)
supersedingCheck <- checkSupersedingVersion(schema.self, supersedingInfo)
response <- (allowed, supersedingCheck) match {
case (Right(_), Right(s)) =>
Expand All @@ -220,7 +224,7 @@ class SchemaService[F[+_]: Sync](
} yield response
case (Left(Inconsistency.AlreadyExists), Right(Some(s))) =>
for {
_ <- updateSupersedingVersion(schema.self, Some(s))
_ <- updateSupersedingVersion(schema.self, Some(s))
response <- Ok(IgluResponse.SupersedingVersionUpdated(schema.self.schemaKey): IgluResponse)
} yield response
case (_, Left(error)) =>
Expand Down Expand Up @@ -252,13 +256,12 @@ class SchemaService[F[+_]: Sync](
case SupersedingInfo.SupersededBy(v) =>
// In here, we want to make sure that schema that is specified as 'supersededBy' exists in the db.
// Also, it is possible superseding schema is superseded by another schema. Therefore, we are
// trying to return its 'supersededby' field. If it is None, we return the schema's own version
// trying to return its 'supersededBy' field. If it is None, we return the schema's own version
// because it means that it isn't superseded by another schema.
val superseded = NonEmptyList.of(currSchema.schemaKey.version)
val supersededBy = db.getSchema(SchemaMap(currSchema.schemaKey.copy(version = v)))
.map { s: Option[Schema] =>
s.map(_.supersededby.getOrElse(v))
}
val supersededBy = db.getSchema(SchemaMap(currSchema.schemaKey.copy(version = v))).map {
s: Option[Schema] => s.map(_.supersededBy.getOrElse(v))
}
(superseded, supersededBy)
case SupersedingInfo.Superseded(superseded) =>
// In here, currSchema is superseding schema. This case can be reached if the schema is created
Expand All @@ -267,10 +270,9 @@ class SchemaService[F[+_]: Sync](
// Therefore, if it is None, we return the schema's own version. If schema exists, we will follow
// the same procedure as above to find superseding schema version.
val currVersion = currSchema.schemaKey.version
val supersededBy = db.getSchema(SchemaMap(currSchema.schemaKey.copy(version = currVersion)))
.map { s: Option[Schema] =>
s.flatMap(_.supersededby).orElse(currVersion.some)
}
val supersededBy = db.getSchema(SchemaMap(currSchema.schemaKey.copy(version = currVersion))).map {
s: Option[Schema] => s.flatMap(_.supersededBy).orElse(currVersion.some)
}
(superseded, supersededBy)
}
val res = for {
Expand Down Expand Up @@ -317,8 +319,8 @@ object SchemaService {
versions = schemas.map(_.schemaMap.schemaKey.version)
} yield
if ((previousPublic && isPublic) || (!previousPublic && !isPublic) || schemas.isEmpty)
VersionCursor.isAllowed(schemaMap.schemaKey.version, versions, patchesAllowed)
else Inconsistency.Availability(isPublic, previousPublic).asLeft
VersionCursor.isAllowed(schemaMap.schemaKey.version, versions, patchesAllowed)
else Inconsistency.Availability(isPublic, previousPublic).asLeft

/** Extract schemas from database, available for particular permission */
def isReadable(permission: Permission)(schema: Schema): Boolean =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import cats.data.NonEmptyList
import cats.effect.{Bracket, Clock, Sync}
import cats.effect.concurrent.Ref
import io.circe.Json
import com.snowplowanalytics.iglu.core.{SchemaMap, SchemaVer, SchemaKey}
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaMap, SchemaVer}
import com.snowplowanalytics.iglu.server.model.{Permission, Schema, SchemaDraft}
import com.snowplowanalytics.iglu.server.model.SchemaDraft.DraftId

Expand Down Expand Up @@ -115,7 +115,7 @@ case class InMemory[F[_]](ref: Ref[F, InMemory.State]) extends Storage[F] {
val supersededSchemaKey = SchemaMap(SchemaKey(vendor, name, "jsonschema", v))
m.updated(
supersededSchemaKey,
m(supersededSchemaKey).copy(supersededby = Some(supersededBy))
m(supersededSchemaKey).copy(supersededBy = Some(supersededBy))
)
}
_ <- ref.update(_.copy(schemas = schemas))
Expand Down
Loading

0 comments on commit 0a16d84

Please sign in to comment.