diff --git a/cli/src/main/scala/laserdisc/cli/CLI.scala b/cli/src/main/scala/laserdisc/cli/CLI.scala index 780ebed7..4d528d83 100644 --- a/cli/src/main/scala/laserdisc/cli/CLI.scala +++ b/cli/src/main/scala/laserdisc/cli/CLI.scala @@ -71,7 +71,7 @@ object CLI extends IOApp.WithContext { self => val maybePort = Either.catchNonFatal(arg2.toInt).flatMap(Port.from).toOption (maybeHost, maybePort) match { - case (Some(ip), Some(port)) => IO(println(logo)) *> impl.mkStream(ip, port).compile.drain.as(ExitCode.Success) + case (Some(ip), Some(port)) => IO(println(logo)) >> impl.mkStream(ip, port).compile.drain.as(ExitCode.Success) case _ => IO(println("please supply valid host and port (space separated)")).as(ExitCode.Error) } diff --git a/fs2/src/main/scala/laserdisc/fs2/RedisClient.scala b/fs2/src/main/scala/laserdisc/fs2/RedisClient.scala index 23ae7d35..e83965d0 100644 --- a/fs2/src/main/scala/laserdisc/fs2/RedisClient.scala +++ b/fs2/src/main/scala/laserdisc/fs2/RedisClient.scala @@ -241,13 +241,13 @@ object RedisClient { new Connection[F] { override final def run: F[Unit] = - logger.info("Starting connection") *> + logger.info("Starting connection") >> runner(serverStream).interruptWhen(termSignal).compile.drain.attempt.flatMap { r => logger.info(s"Connection terminated: $r") } override final def shutdown: F[Unit] = - logger.info("Shutting down connection") *> termSignal.set(true) + logger.info("Shutting down connection") >> termSignal.set(true) override final def send[In <: HList, Out <: HList](in: In, timeout: FiniteDuration)( implicit protocolHandler: ProtocolHandler.Aux[F, In, Out] diff --git a/fs2/src/main/scala/laserdisc/fs2/RedisConnection.scala b/fs2/src/main/scala/laserdisc/fs2/RedisConnection.scala index 83362594..750f73f9 100644 --- a/fs2/src/main/scala/laserdisc/fs2/RedisConnection.scala +++ b/fs2/src/main/scala/laserdisc/fs2/RedisConnection.scala @@ -6,10 +6,10 @@ import java.nio.channels.AsynchronousChannelGroup import _root_.fs2._ import _root_.fs2.io.tcp.Socket -import cats.Applicative +import cats.{Applicative, FlatMap} import cats.effect.{ConcurrentEffect, ContextShift, Effect} import cats.syntax.applicative._ -import cats.syntax.apply._ +import cats.syntax.flatMap._ import laserdisc.protocol._ import log.effect.LogWriter import scodec.Codec @@ -38,8 +38,8 @@ object RedisConnection { private[fs2] final object impl { - def send[F[_]: Applicative](sink: Sink[F, Byte])(implicit log: LogWriter[F]): Sink[F, RESP] = - _.evalMap(resp => log.debug(s"sending $resp") *> resp.pure) + def send[F[_]: Applicative: FlatMap](sink: Sink[F, Byte])(implicit log: LogWriter[F]): Sink[F, RESP] = + _.evalMap(resp => log.debug(s"sending $resp") >> resp.pure) .through(streamEncoder.encode) .flatMap(bits => Stream.chunk(Chunk.array(bits.toByteArray))) .to(sink) @@ -71,7 +71,7 @@ object RedisConnection { _.through(framing) .flatMap(complete => streamDecoder.decode(complete.bits)) - .evalMap(resp => log.debug(s"receiving $resp") *> resp.pure) + .evalMap(resp => log.debug(s"receiving $resp") >> resp.pure) } }