Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cassandra: add methods for new actors API (Akka 2.6.4) #2195

Merged
merged 1 commit into from
Mar 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ package akka.stream.alpakka.cassandra.javadsl
import java.util.concurrent.CompletionStage

import akka.Done
import akka.actor.ActorSystem
import akka.actor.ClassicActorSystemProvider
import akka.stream.alpakka.cassandra.{scaladsl, CassandraSessionSettings}
import com.datastax.oss.driver.api.core.CqlSession

Expand All @@ -21,9 +21,15 @@ import scala.compat.java8.FutureConverters._
object CassandraSessionRegistry {

/**
* Java API: get the session registry
* Get the session registry with new actors API.
*/
def get(system: ActorSystem): CassandraSessionRegistry =
def get(system: ClassicActorSystemProvider): CassandraSessionRegistry =
new CassandraSessionRegistry(scaladsl.CassandraSessionRegistry(system.classicSystem))

/**
* Get the session registry with the classic actors API.
*/
def get(system: akka.actor.ActorSystem): CassandraSessionRegistry =
new CassandraSessionRegistry(scaladsl.CassandraSessionRegistry(system))

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@

package akka.stream.alpakka.cassandra.scaladsl

import akka.actor.{ActorSystem, NoSerializationVerificationNeeded}
import akka.actor.NoSerializationVerificationNeeded
import akka.annotation.InternalApi
import akka.event.LoggingAdapter
import akka.stream.ActorMaterializer
import akka.stream.{Materializer, SystemMaterializer}
import akka.stream.alpakka.cassandra.{CassandraMetricsRegistry, CassandraServerMetaData, CqlSessionProvider}
import akka.stream.scaladsl.{Sink, Source}
import akka.util.OptionVal
Expand All @@ -33,7 +33,7 @@ import scala.util.control.NonFatal
*
* All methods are non-blocking.
*/
final class CassandraSession(system: ActorSystem,
final class CassandraSession(system: akka.actor.ActorSystem,
sessionProvider: CqlSessionProvider,
executionContext: ExecutionContext,
log: LoggingAdapter,
Expand All @@ -42,8 +42,8 @@ final class CassandraSession(system: ActorSystem,
onClose: () => Unit)
extends NoSerializationVerificationNeeded {

implicit private[akka] val ec = executionContext
private lazy implicit val materializer = ActorMaterializer()(system)
implicit private[akka] val ec: ExecutionContext = executionContext
private lazy implicit val materializer: Materializer = SystemMaterializer(system).materializer

log.debug("Starting CassandraSession [{}]", metricsCategory)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext
import scala.concurrent.Future

import akka.Done
import akka.actor.ActorSystem
import akka.actor.ExtendedActorSystem
import akka.actor.Extension
import akka.actor.ExtensionId
import akka.actor.ExtensionIdProvider
import akka.actor.{
ActorSystem,
ClassicActorSystemProvider,
ExtendedActorSystem,
Extension,
ExtensionId,
ExtensionIdProvider
}
import akka.annotation.InternalStableApi
import akka.event.Logging
import akka.stream.alpakka.cassandra.{CassandraSessionSettings, CqlSessionProvider}
Expand All @@ -32,11 +34,11 @@ object CassandraSessionRegistry extends ExtensionId[CassandraSessionRegistry] wi
def createExtension(system: ExtendedActorSystem): CassandraSessionRegistry =
new CassandraSessionRegistry(system)

/**
* Java API: get the session registry
*/
override def get(system: ActorSystem): CassandraSessionRegistry =
super.get(system)
override def apply(system: ActorSystem): CassandraSessionRegistry = super.apply(system)

// This is not source compatible with Akka 2.6 as it lacks `overrride`
def apply(system: ClassicActorSystemProvider): CassandraSessionRegistry =
apply(system.classicSystem)

override def lookup(): ExtensionId[CassandraSessionRegistry] = this

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import org.scalatest.wordspec.AnyWordSpecLike
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}

import scala.concurrent.ExecutionContext
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.{Materializer, SystemMaterializer}

/**
* All the tests must be run with a local Cassandra running on default port 9042.
Expand All @@ -24,9 +24,9 @@ abstract class CassandraSpecBase(_system: ActorSystem)
with Matchers
with CassandraLifecycle {

implicit val materializer: Materializer = ActorMaterializer()(_system)
implicit val materializer: Materializer = SystemMaterializer(_system).materializer
implicit val ec: ExecutionContext = system.dispatcher

lazy val sessionRegistry: CassandraSessionRegistry = CassandraSessionRegistry.get(system)
lazy val sessionRegistry: CassandraSessionRegistry = CassandraSessionRegistry(system)

}
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ object Dependencies {
val ScalaVersions = Seq(Scala212, Scala211, Scala213).filterNot(_ == Scala211 && Nightly)

val Akka25Version = "2.5.30"
val Akka26Version = "2.6.1"
val Akka26Version = "2.6.4"
ennru marked this conversation as resolved.
Show resolved Hide resolved
val AkkaVersion = if (Nightly) Akka26Version else Akka25Version
val AkkaBinaryVersion = if (Nightly) "2.6" else "2.5"

Expand Down