Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remote scheme provider #53

Merged
merged 8 commits into from
Aug 29, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions core/src/main/scala/schema/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ package object schema {
case class SchemaDefinitionProviderError(msg: String, maybeCause: Option[Throwable] = None)
extends SchemaError(msg, maybeCause)

object SchemaDefinitionProviderError {
  /** Lifts a caught Throwable into a provider error, keeping it as the cause. */
  def apply(e: Throwable): SchemaDefinitionProviderError =
    new SchemaDefinitionProviderError(e.getMessage, Some(e))
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe you could add a type alias for Either[SchemaDefinitionProviderError, ?] here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely

// Alias for schema computations that either fail with a SchemaDefinitionProviderError or yield a T.
type SchemaResult[T] = Either[SchemaDefinitionProviderError, T]

// Error type for schema validation failures, part of the SchemaError hierarchy.
case class SchemaValidatorError(msg: String, maybeCause: Option[Throwable] = None)
extends SchemaError(msg, maybeCause)

Expand Down
97 changes: 97 additions & 0 deletions core/src/main/scala/schema/provider/MetadataSchemaProvider.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright 2017 47 Degrees, LLC. <http://www.47deg.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package freestyle.cassandra
package schema.provider

import cats.implicits._
import cats.{~>, MonadError}
import com.datastax.driver.core._
import freestyle.{FreeS, _}
import freestyle.cassandra.schema.provider.metadata.SchemaConversions
import freestyle.cassandra.schema.{SchemaDefinition, SchemaDefinitionProviderError, SchemaResult}
import troy.cql.ast.DataDefinition

import scala.collection.JavaConverters._
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.language.postfixOps

/**
 * SchemaDefinitionProvider that derives the schema from the metadata of a
 * live Cassandra cluster, converting driver metadata into CQL AST statements
 * through the SchemaConversions trait.
 */
class MetadataSchemaProvider(cluster: Cluster)
extends SchemaDefinitionProvider
with SchemaConversions {

// Lists the tables declared in the given keyspace.
def extractTables(keyspaceMetadata: KeyspaceMetadata): List[AbstractTableMetadata] =
keyspaceMetadata.getTables.asScala.toList

// Collects the indexes of every concrete TableMetadata in the list; other
// AbstractTableMetadata subtypes contribute no indexes.
def extractIndexes(tableMetadataList: List[AbstractTableMetadata]): List[IndexMetadata] =
tableMetadataList.collect {
  case t: TableMetadata => t.getIndexes.asScala.toList
}.flatten

// Lists the user-defined types (UDTs) declared in the given keyspace.
def extractUserTypes(keyspaceMetadata: KeyspaceMetadata): List[UserType] =
keyspaceMetadata.getUserTypes.asScala.toList

/**
 * Reads the schema definition from the live cluster metadata.
 *
 * NOTE(review): this blocks the calling thread via Await.result with a
 * hard-coded 10 second timeout and uses the global ExecutionContext;
 * making this asynchronous is tracked separately (ticket #52).
 */
override def schemaDefinition: SchemaResult[SchemaDefinition] = {

import freestyle.async.implicits._
import freestyle.cassandra.api._
import freestyle.cassandra.handlers.implicits._

import scala.concurrent.ExecutionContext.Implicits.global

// Interpreter that runs ClusterAPI operations against `cluster` on Futures.
implicit val clusterAPIInterpreter: ClusterAPI.Op ~> Future =
clusterAPIHandler[Future] andThen apiInterpreter[Future, Cluster](cluster)

// Runs `fa` and always runs `finalizer` afterwards, re-raising any error
// from `fa` once the finalizer has completed (bracket/guarantee pattern).
def guarantee[F[_], A](fa: F[A], finalizer: F[Unit])(
implicit M: MonadError[F, Throwable]): F[A] =
M.flatMap(M.attempt(fa)) { e =>
M.flatMap(finalizer)(_ => e.fold(M.raiseError, M.pure))
}

// Connects to the cluster and then fetches its metadata.
def metadataF[F[_]](implicit clusterAPI: ClusterAPI[F]): FreeS[F, Metadata] =
clusterAPI.connect *> clusterAPI.metadata

// Releases the cluster connection.
def closeF[F[_]](implicit clusterAPI: ClusterAPI[F]): FreeS[F, Unit] =
clusterAPI.close

// Fetch the metadata (always closing the connection afterwards), then convert
// keyspaces, tables, indexes and user types into CQL AST statements, combining
// the four traversal results with |+|.
val fut: Future[SchemaResult[SchemaDefinition]] =
guarantee(
metadataF[ClusterAPI.Op].interpret[Future],
closeF[ClusterAPI.Op].interpret[Future]).attempt.map {
_.leftMap(SchemaDefinitionProviderError(_)) flatMap { metadata =>
val keyspaceList: List[KeyspaceMetadata] = metadata.getKeyspaces.asScala.toList
val tableList: List[AbstractTableMetadata] = keyspaceList.flatMap(extractTables)
val indexList: List[IndexMetadata] = extractIndexes(tableList)
val userTypeList: List[UserType] = keyspaceList.flatMap(extractUserTypes)

keyspaceList.traverse[SchemaResult, DataDefinition](toCreateKeyspace) |+|
tableList.traverse(toCreateTable) |+|
indexList.traverse(toCreateIndex(_)) |+|
userTypeList.traverse(toUserType)
}
}
// NOTE(review): blocking call at the edge; see ticket #52.
Await.result(fut, 10.seconds)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a need to block if instead of all the methods in this class return a M[Whatever] where M[_] : AsyncContext and in this case is M[SchemaResult[SchemaDefinition]]? all the calls such as extractIndexes(tableList) etc.. would be part of a monadic chain instead of direct style.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I created ticket #52
Should I solve this in this PR? I'm OK with that; I was trying to keep this PR as short as possible

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In a new ticket is fine 👍

}
}

object MetadataSchemaProvider {

/** Implicitly provides a metadata-backed SchemaDefinitionProvider for a given cluster. */
implicit def metadataSchemaProvider(implicit cluster: Cluster): SchemaDefinitionProvider =
new MetadataSchemaProvider(cluster)

}
37 changes: 28 additions & 9 deletions core/src/main/scala/schema/provider/TroySchemaProvider.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,37 @@
package freestyle.cassandra
package schema.provider

import java.io.InputStream

import cats.syntax.either._
import freestyle.cassandra.schema._
import troy.cql.ast.CqlParser

class TroySchemaProvider(cql: String) extends SchemaDefinitionProvider {
class TroySchemaProvider(cqlF: => SchemaResult[String]) extends SchemaDefinitionProvider {

override def schemaDefinition: Either[SchemaDefinitionProviderError, SchemaDefinition] =
CqlParser.parseSchema(cql) match {
case CqlParser.Success(res, _) => Right(res)
case CqlParser.Failure(msg, next) =>
Left(
SchemaDefinitionProviderError(
s"Parse Failure: $msg, line = ${next.pos.line}, column = ${next.pos.column}"))
case CqlParser.Error(msg, _) => Left(SchemaDefinitionProviderError(msg))
/** Parses the CQL produced by `cqlF` into a schema definition, mapping parser failures to errors. */
override def schemaDefinition: SchemaResult[SchemaDefinition] =
  cqlF.flatMap { cql =>
    CqlParser.parseSchema(cql) match {
      case CqlParser.Success(schema, _) =>
        schema.asRight
      case CqlParser.Failure(msg, next) =>
        SchemaDefinitionProviderError(
          s"Parse Failure: $msg, line = ${next.pos.line}, column = ${next.pos.column}").asLeft
      case CqlParser.Error(msg, _) =>
        SchemaDefinitionProviderError(msg).asLeft
    }
  }
}

object TroySchemaProvider {

  /** Provider for an in-memory CQL string; producing the CQL cannot fail. */
  def apply(cql: String): TroySchemaProvider = new TroySchemaProvider(Right(cql))

  /**
   * Provider that reads the CQL from an InputStream.
   *
   * Evaluation is deferred (the constructor parameter is by-name); any
   * non-fatal failure while reading is captured as a
   * SchemaDefinitionProviderError via its Throwable companion constructor.
   * The Source is closed after reading so the underlying stream handle is
   * not leaked.
   */
  def apply(is: InputStream): TroySchemaProvider = new TroySchemaProvider(
    Either.catchNonFatal {
      val source = scala.io.Source.fromInputStream(is)
      try source.mkString
      finally source.close()
    } leftMap (SchemaDefinitionProviderError(_))
  )

}
200 changes: 200 additions & 0 deletions core/src/main/scala/schema/provider/metadata/SchemaConversions.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
/*
* Copyright 2017 47 Degrees, LLC. <http://www.47deg.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package freestyle.cassandra
package schema.provider.metadata

import cats.implicits._
import com.datastax.driver.core.{
AbstractTableMetadata,
ColumnMetadata,
IndexMetadata,
KeyspaceMetadata,
TupleType,
UserType,
DataType => DatastaxDataType
}
import freestyle.cassandra.schema.{SchemaDefinitionProviderError, SchemaResult}
import troy.cql.ast._
import troy.cql.ast.ddl.Keyspace.Replication
import troy.cql.ast.ddl.Table.PrimaryKey
import troy.cql.ast.ddl.{Field, Index, Table}

import scala.collection.JavaConverters._
import scala.language.postfixOps

/**
 * Conversions from Datastax driver metadata classes into troy CQL AST
 * statements, used by the metadata-based schema provider.
 */
trait SchemaConversions {

/**
 * Builds a CREATE KEYSPACE statement from driver keyspace metadata.
 * A null keyspace name is rejected; an absent or empty replication map
 * yields a keyspace with no properties. Any thrown error is captured as
 * a SchemaDefinitionProviderError.
 */
def toCreateKeyspace(keyspaceMetadata: KeyspaceMetadata): SchemaResult[CreateKeyspace] =
Either.catchNonFatal {
  val name: String = Option(keyspaceMetadata.getName)
    .getOrElse(throw new IllegalArgumentException("Schema name is null"))
  // Sort replication entries by key for a deterministic property order.
  val replicationProps: Seq[Replication] =
    Option(keyspaceMetadata.getReplication)
      .map(_.asScala.toSeq)
      .filter(_.nonEmpty)
      .map(entries => Replication(entries.sortBy(_._1)))
      .toSeq
  CreateKeyspace(
    ifNotExists = false,
    keyspaceName = KeyspaceName(name),
    properties = replicationProps)
} leftMap (SchemaDefinitionProviderError(_))

// Builds a CREATE TABLE statement from driver table metadata. The outer
// catchNonFatal guards against exceptions thrown by the driver getters;
// joinRight flattens the SchemaResult produced inside it with the
// column/primary-key conversion results.
def toCreateTable(metadata: AbstractTableMetadata): SchemaResult[CreateTable] =
Either.catchNonFatal {
for {
columns <- metadata.getColumns.asScala.toList.traverse(toTableColumn)
primaryKey <- toPrimaryKey(
metadata.getPartitionKey.asScala.toList,
metadata.getClusteringColumns.asScala.toList)
} yield
CreateTable(
ifNotExists = false,
tableName = TableName(Some(KeyspaceName(metadata.getKeyspace.getName)), metadata.getName),
columns = columns,
primaryKey = Some(primaryKey),
options = Seq.empty
)
} leftMap (SchemaDefinitionProviderError(_)) joinRight

/** Resolves the fully-qualified name of the table an index belongs to. */
def readTable(metadata: IndexMetadata): TableName = {
  val table = metadata.getTable
  TableName(Some(KeyspaceName(table.getKeyspace.getName)), table.getName)
}

/**
 * Builds a CREATE INDEX statement from driver index metadata.
 *
 * Declared with the `SchemaResult` alias for consistency with the other
 * conversion methods in this trait (it is the same type as the previously
 * spelled-out Either, so callers are unaffected).
 *
 * @param metadata  the driver index metadata
 * @param readTable resolver for the owning table's name; defaults to
 *                  [[readTable]], overridable (e.g. for testing)
 */
def toCreateIndex(
    metadata: IndexMetadata,
    readTable: (IndexMetadata) => TableName = readTable): SchemaResult[CreateIndex] =
  Either.catchNonFatal {
    CreateIndex(
      isCustom = metadata.isCustomIndex,
      ifNotExists = false,
      indexName = Option(metadata.getName),
      tableName = readTable(metadata),
      identifier = Index.Identifier(metadata.getTarget),
      using =
        if (metadata.isCustomIndex)
          // The options are not visible in the IndexMetadata class
          Some(Index.Using(metadata.getIndexClassName, None))
        else None
    )
  } leftMap (SchemaDefinitionProviderError(_))

// Builds a CREATE TYPE statement from a driver UserType: each field name is
// converted via toField with its declared field type. joinRight flattens the
// nested Either produced by wrapping a SchemaResult in catchNonFatal.
def toUserType(userType: UserType): SchemaResult[CreateType] =
Either.catchNonFatal {
userType.getFieldNames.asScala.toList.traverse { fieldName =>
toField(fieldName, userType.getFieldType(fieldName))
} map { list =>
CreateType(
ifNotExists = false,
typeName = TypeName(Some(KeyspaceName(userType.getKeyspace)), userType.getTypeName),
fields = list)
}
} leftMap (SchemaDefinitionProviderError(_)) joinRight

// Converts a single UDT field, delegating the type conversion to toDataType.
private[this] def toField(
    name: String,
    datastaxDataType: DatastaxDataType): SchemaResult[Field] =
  toDataType(datastaxDataType).map(Field(name, _))

// Converts driver column metadata into a troy table column definition.
// Primary-key membership is derived separately in toPrimaryKey, so
// isPrimaryKey is always false here.
private[this] def toTableColumn(metadata: ColumnMetadata): SchemaResult[Table.Column] =
  toDataType(metadata.getType) map { columnType =>
    Table.Column(
      name = metadata.getName,
      dataType = columnType,
      isStatic = metadata.isStatic,
      isPrimaryKey = false)
  }

// Converts a Datastax runtime DataType into the troy CQL AST DataType.
private[this] def toDataType(dataType: DatastaxDataType): SchemaResult[DataType] = {

import DatastaxDataType._

// Maps native (primitive) Cassandra types; unsupported names yield a Left.
def toDataTypeNative(dataType: DatastaxDataType): SchemaResult[DataType.Native] =
dataType.getName match {
case Name.ASCII => DataType.Ascii.asRight
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is asRight defined? as usually denotes an internal cast, and if this is just lifting the value into Either.Right we should use toRight or the right() syntax from cats.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

asRight is the new syntax in Cats (see PR typelevel/cats#1454).

asRight and asLeft were chosen so they can't be confused with the .right and .left methods on Either for RightProjection and LeftProjection.

case Name.BIGINT => DataType.BigInt.asRight
case Name.BLOB => DataType.Blob.asRight
case Name.BOOLEAN => DataType.Boolean.asRight
case Name.COUNTER => DataType.Counter.asRight
case Name.DATE => DataType.Date.asRight
case Name.DECIMAL => DataType.Decimal.asRight
case Name.DOUBLE => DataType.Double.asRight
case Name.FLOAT => DataType.Float.asRight
case Name.INET => DataType.Inet.asRight
case Name.INT => DataType.Int.asRight
case Name.SMALLINT => DataType.Smallint.asRight
case Name.TEXT => DataType.Text.asRight
case Name.TIME => DataType.Time.asRight
case Name.TIMESTAMP => DataType.Timestamp.asRight
case Name.TIMEUUID => DataType.Timeuuid.asRight
case Name.TINYINT => DataType.Tinyint.asRight
case Name.UUID => DataType.Uuid.asRight
case Name.VARCHAR => DataType.Varchar.asRight
case Name.VARINT => DataType.Varint.asRight
// Any other native name is not representable in the troy AST.
case _ =>
Left(SchemaDefinitionProviderError(s"Native DataType ${dataType.getName} not supported"))
}

// Maps LIST/SET/MAP collection types; element types must be native types.
def toCollectionType(collectionType: CollectionType): SchemaResult[DataType] = {

val typeArgs: List[DatastaxDataType] = collectionType.getTypeArguments.asScala.toList

val maybeCol = collectionType.getName match {
case Name.LIST =>
typeArgs.headOption map { typeArg =>
toDataTypeNative(typeArg) map DataType.List
}
case Name.SET =>
typeArgs.headOption map { typeArg =>
toDataTypeNative(typeArg) map DataType.Set
}
case Name.MAP =>
// Maps need both a key and a value type argument.
for {
t1 <- typeArgs.headOption
t2 <- typeArgs.tail.headOption
} yield (toDataTypeNative(t1) |@| toDataTypeNative(t2)).map(DataType.Map)
case _ => None
}

// Missing type arguments or an unexpected collection name is an error.
maybeCol getOrElse {
Left(
SchemaDefinitionProviderError(
s"Error parsing collection DataType '${collectionType.asFunctionParameterString()}'"))
}
}

// Tuples: every component type must convert as a native type.
def toTupleType(tupleType: TupleType): SchemaResult[DataType] =
tupleType.getComponentTypes.asScala.toList traverse toDataTypeNative map DataType.Tuple

// Dispatch on the concrete runtime subtype of the Datastax DataType.
dataType match {
case nativeType: NativeType => toDataTypeNative(nativeType)
case customType: CustomType => Right(DataType.Custom(customType.getCustomTypeClassName))
case collectionType: CollectionType => toCollectionType(collectionType)
case tupleType: TupleType => toTupleType(tupleType)
case userType: UserType =>
Right(DataType.UserDefined(KeyspaceName(userType.getKeyspace), userType.getTypeName))
}
}

// Builds the primary key from the partition-key and clustering columns.
// Construction cannot fail, so the result is always a Right.
private[this] def toPrimaryKey(
    partitionKeys: List[ColumnMetadata],
    clusteringColumns: List[ColumnMetadata]): SchemaResult[PrimaryKey] =
  Right(PrimaryKey(partitionKeys.map(_.getName), clusteringColumns.map(_.getName)))

}
2 changes: 1 addition & 1 deletion core/src/main/scala/schema/provider/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package schema
package object provider {

trait SchemaDefinitionProvider {
def schemaDefinition: Either[SchemaDefinitionProviderError, SchemaDefinition]
def schemaDefinition: SchemaResult[SchemaDefinition]
}

}
Loading