Akka SSE adds support for Server-Sent Events (SSE), a lightweight and standardized technology for pushing notifications from an HTTP server to an HTTP client, to akka-http. In contrast to WebSocket, which enables two-way communication, SSE only allows one-way communication from the server to the client. If that's all you need, SSE has the advantage of being much simpler and relying on HTTP only.
The latest release of Akka SSE is version 1.6.1, which depends on Akka 2.4.2-RC1 and hence on Java 8. There is also version 1.5.0, which depends on akka-http 2.0.1 and Akka 2.3.x, as well as version 1.1.0, which depends on akka-http 1.0.
Akka SSE is published to Bintray and Maven Central.
// All releases including intermediate ones are published here,
// final ones are also published to Maven Central.
resolvers += Resolver.bintrayRepo("hseeberger", "maven")
libraryDependencies ++= List(
  "de.heikoseeberger" %% "akka-sse" % "1.6.1",
  ...
)
Akka SSE models server-sent events as Source[ServerSentEvent, Any], with Source from Akka Streams and ServerSentEvent from Akka SSE. ServerSentEvent is a case class with the following fields:
- data of type String: payload, may be empty
- eventType of type Option[String] with default None: handler to be invoked, e.g. "message", "added", etc.
- id of type Option[String] with default None: sets the client's last event ID string
- retry of type Option[Int] with default None: sets the client's reconnection time in milliseconds
More info about the above fields can be found in the specification.
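To make the fields above concrete, here is a minimal sketch of how such an event could be written in the text/event-stream wire format described by the specification. It is not Akka SSE's own rendering code: Event is a hypothetical stand-in for ServerSentEvent, and SseWireFormat.render is a name made up for this illustration.

```scala
// Hypothetical stand-in for Akka SSE's ServerSentEvent (assumption, not the library class).
final case class Event(
    data: String,
    eventType: Option[String] = None,
    id: Option[String] = None,
    retry: Option[Int] = None
)

object SseWireFormat {
  // Renders one event as a text/event-stream block terminated by a blank line.
  def render(e: Event): String = {
    // Every line of a multi-line payload gets its own "data" field.
    val dataLines = e.data.split("\n", -1).toList.map(line => s"data: $line")
    val eventLine = e.eventType.map(t => s"event: $t").toList
    val idLine    = e.id.map(i => s"id: $i").toList
    val retryLine = e.retry.map(r => s"retry: $r").toList
    (dataLines ++ eventLine ++ idLine ++ retryLine).mkString("", "\n", "\n\n")
  }
}
```

For example, SseWireFormat.render(Event("12:00", eventType = Some("time"))) yields a data line, an event line, and the terminating blank line.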
In order to produce server-sent events on the server as a response to an HTTP request, you have to bring the implicit toResponseMarshaller defined by the EventStreamMarshalling trait or object into scope where you define your respective route. Then you complete the HTTP request with a Source[ServerSentEvent, Any]:
object TimeServer {
  ...
  def route(system: ActorSystem)(implicit ec: ExecutionContext, mat: Materializer) = {
    import Directives._
    import EventStreamMarshalling._
    get {
      complete {
        Source.tick(2.seconds, 2.seconds, ())
          .map(_ => LocalTime.now())
          .map(dateTimeToServerSentEvent)
      }
    }
  }
}
If you need periodic heartbeats, simply use the keepAlive standard stage with a ServerSentEvent.heartbeat:
Source.tick(2.seconds, 2.seconds, ())
  .map(_ => LocalTime.now())
  .map(dateTimeToServerSentEvent)
  .keepAlive(1.second, () => ServerSentEvent.heartbeat)
In order to consume server-sent events on the client as part of an HTTP response, you have to bring the implicit fromEntityUnmarshaller defined by the EventStreamUnmarshalling trait or object into scope where you define your response handling:
object TimeClient {
  import EventStreamUnmarshalling._
  ...
  Source.single(Get())
    .via(Http().outgoingConnection("127.0.0.1", 8000))
    .mapAsync(1)(Unmarshal(_).to[Source[ServerSentEvent, Any]])
    .runForeach(_.runForeach(event => println(s"${LocalTime.now()} $event")))
}
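The unmarshaller does the decoding for you. Purely as an illustration of what arrives on the wire, the following simplified sketch splits a complete text/event-stream body into events. ParsedEvent and SseParser are hypothetical names, not part of Akka SSE, and the sketch assumes LF line endings, ignores the retry field, and does not carry the last event ID over between events.

```scala
// Hypothetical result type for this sketch (assumption, not Akka SSE's ServerSentEvent).
final case class ParsedEvent(data: String, eventType: Option[String], id: Option[String])

object SseParser {
  // Parses a complete event-stream body; events are separated by blank lines.
  def parse(stream: String): List[ParsedEvent] =
    stream.split("\n\n").toList.flatMap { block =>
      // Lines starting with ':' are comments and carry no field.
      val lines = block.split("\n").toList.filterNot(l => l.isEmpty || l.startsWith(":"))
      val fields = lines.map { line =>
        line.split(":", 2) match {
          // A single leading space after the colon is not part of the value.
          case Array(name, value) => name -> value.stripPrefix(" ")
          case other              => other.head -> ""
        }
      }
      def last(name: String): Option[String] =
        fields.collect { case (n, v) if n == name => v }.lastOption
      val data = fields.collect { case ("data", v) => v }
      // A block without any data field produces no event.
      if (data.isEmpty) None
      else Some(ParsedEvent(data.mkString("\n"), last("event"), last("id")))
    }
}
```

Several data fields within one block are joined with newlines, which mirrors how a multi-line payload is transmitted.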
Contributions via GitHub pull requests are gladly accepted from their original author. Along with any pull requests, please state that the contribution is your original work and that you license the work to the project under the project's open source license. Whether or not you state this explicitly, by submitting any copyrighted material via pull request, email, or other means you agree to license the material under the project's open source license and warrant that you have the legal authority to do so.
This code is open source software licensed under the Apache 2.0 License.