Skip to content

Commit

Permalink
Cassandra: add methods for new actors API (Akka 2.6.4) (#2195)
Browse files Browse the repository at this point in the history
  • Loading branch information
ennru authored Mar 17, 2020
1 parent 4fa78b6 commit 1a49144
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ package akka.stream.alpakka.cassandra.javadsl
import java.util.concurrent.CompletionStage

import akka.Done
import akka.actor.ActorSystem
import akka.actor.ClassicActorSystemProvider
import akka.stream.alpakka.cassandra.{scaladsl, CassandraSessionSettings}
import com.datastax.oss.driver.api.core.CqlSession

Expand All @@ -21,9 +21,15 @@ import scala.compat.java8.FutureConverters._
object CassandraSessionRegistry {

/**
* Java API: get the session registry
* Get the session registry with new actors API.
*/
def get(system: ActorSystem): CassandraSessionRegistry =
def get(system: ClassicActorSystemProvider): CassandraSessionRegistry =
new CassandraSessionRegistry(scaladsl.CassandraSessionRegistry(system.classicSystem))

/**
* Get the session registry with the classic actors API.
*/
def get(system: akka.actor.ActorSystem): CassandraSessionRegistry =
new CassandraSessionRegistry(scaladsl.CassandraSessionRegistry(system))

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@

package akka.stream.alpakka.cassandra.scaladsl

import akka.actor.{ActorSystem, NoSerializationVerificationNeeded}
import akka.actor.NoSerializationVerificationNeeded
import akka.annotation.InternalApi
import akka.event.LoggingAdapter
import akka.stream.ActorMaterializer
import akka.stream.{Materializer, SystemMaterializer}
import akka.stream.alpakka.cassandra.{CassandraMetricsRegistry, CassandraServerMetaData, CqlSessionProvider}
import akka.stream.scaladsl.{Sink, Source}
import akka.util.OptionVal
Expand All @@ -33,7 +33,7 @@ import scala.util.control.NonFatal
*
* All methods are non-blocking.
*/
final class CassandraSession(system: ActorSystem,
final class CassandraSession(system: akka.actor.ActorSystem,
sessionProvider: CqlSessionProvider,
executionContext: ExecutionContext,
log: LoggingAdapter,
Expand All @@ -42,8 +42,8 @@ final class CassandraSession(system: ActorSystem,
onClose: () => Unit)
extends NoSerializationVerificationNeeded {

implicit private[akka] val ec = executionContext
private lazy implicit val materializer = ActorMaterializer()(system)
implicit private[akka] val ec: ExecutionContext = executionContext
private lazy implicit val materializer: Materializer = SystemMaterializer(system).materializer

log.debug("Starting CassandraSession [{}]", metricsCategory)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext
import scala.concurrent.Future

import akka.Done
import akka.actor.ActorSystem
import akka.actor.ExtendedActorSystem
import akka.actor.Extension
import akka.actor.ExtensionId
import akka.actor.ExtensionIdProvider
import akka.actor.{
ActorSystem,
ClassicActorSystemProvider,
ExtendedActorSystem,
Extension,
ExtensionId,
ExtensionIdProvider
}
import akka.annotation.InternalStableApi
import akka.event.Logging
import akka.stream.alpakka.cassandra.{CassandraSessionSettings, CqlSessionProvider}
Expand All @@ -32,11 +34,11 @@ object CassandraSessionRegistry extends ExtensionId[CassandraSessionRegistry] wi
def createExtension(system: ExtendedActorSystem): CassandraSessionRegistry =
new CassandraSessionRegistry(system)

/**
* Java API: get the session registry
*/
override def get(system: ActorSystem): CassandraSessionRegistry =
super.get(system)
override def apply(system: ActorSystem): CassandraSessionRegistry = super.apply(system)

// This is not source compatible with Akka 2.6 as it lacks `overrride`
def apply(system: ClassicActorSystemProvider): CassandraSessionRegistry =
apply(system.classicSystem)

override def lookup(): ExtensionId[CassandraSessionRegistry] = this

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import org.scalatest.wordspec.AnyWordSpecLike
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}

import scala.concurrent.ExecutionContext
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.{Materializer, SystemMaterializer}

/**
* All the tests must be run with a local Cassandra running on default port 9042.
Expand All @@ -24,9 +24,9 @@ abstract class CassandraSpecBase(_system: ActorSystem)
with Matchers
with CassandraLifecycle {

implicit val materializer: Materializer = ActorMaterializer()(_system)
implicit val materializer: Materializer = SystemMaterializer(_system).materializer
implicit val ec: ExecutionContext = system.dispatcher

lazy val sessionRegistry: CassandraSessionRegistry = CassandraSessionRegistry.get(system)
lazy val sessionRegistry: CassandraSessionRegistry = CassandraSessionRegistry(system)

}
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ object Dependencies {
val ScalaVersions = Seq(Scala212, Scala211, Scala213).filterNot(_ == Scala211 && Nightly)

val Akka25Version = "2.5.30"
val Akka26Version = "2.6.1"
val Akka26Version = "2.6.4"
val AkkaVersion = if (Nightly) Akka26Version else Akka25Version
val AkkaBinaryVersion = if (Nightly) "2.6" else "2.5"

Expand Down

0 comments on commit 1a49144

Please sign in to comment.