This document outlines the approach that MicroProfile will use to adopting the reactive paradigm for programming and architecting microservices.
Reactive systems are described in The Reactive Manifesto. This calls for a coherent approach for architecting systems that allows them to be responsive, by creating loosely coupled components that are resilient to failure and elastic in the face of changing load. At the heart of the approach to loosely coupling components is the idea of being non blocking, using asynchronous message passing to ensure components are not impacted by each others availability.
Reactive is associated with asynchronous programming, using non blocking IO. While asynchronous programming is certainly an important part of reactive, as it applies to a microservice platform such as MicroProfile, it is also much bigger than just asynchronous programming. Reactive impacts the way we architect our systems, and this most prominently features in the way services communicate with each other - services that communicate asynchronously, and hence are able to run autonomously, are able to scale, fail and evolve independently. Not only does this help a system to be reactive, it allows a microservice based system to realise the full agility of the microservice architecture, and overcome many of the problems that microservices introduce when compared to monoliths.
Hence there are two primary concerns when applying reactive to MicroProfile. The first is the application of reactive programming to the APIs offered by MicroProfile. The second is in deciding what new features, APIs or projects, if any, are needed to allow developers to create reactive system architectures.
In order to provide reactive programming APIs, abstractions are required to allow developers to compose multiple asynchronous operations together.
A naïve approach to offering an asynchronous API called Foo
may be to offer a way to register a callback to receive an asynchronously provided value. For example, that callback interface might look like:
interface FooEventListener {
void onFooEvent(FooEvent fooEvent);
}
Just considering the above API, there are a number of problems with this:
-
This specific API require/s specific code to implement it. If the developer wants to connect this API to an asynchronous API offered by another component, say
Bar
, the developer needs to write an adapter that adapts the API offered byBar
to the API offered byFoo
. And then someone needs to build another adapter for every permutation of APIs that we might want to integrate each other. Of course, this isn’t so bad given thatFooEventListener
is a SAM (Single Abstract Method) and so I can use method references, but… -
The above listener offers no way to receive errors. There are multiple ways that this can be addressed, but none of them are ideal. For example, we might add another method,
onError(Throwable)
. The addition of this method means it’s no longer a SAM, and so can no longer exploit the convenience of Java 8 method references, and also means when connecting two APIs, We now need to adapt the error handling semantics of each API, which may be subtly different. -
This API offers no mechanism for exerting backpressure asynchronously. The best we can do is to synchronously block the thread before returning from
onFooEvent
. If in our processing of the event we want to use an asynchronous database API to persist data, there’s no way to tell the invoker of the listener to wait until the database call is finished. If the database operation is slower than the incoming events are arriving, we have two options, buffer the events we receive and risk running out of memory, or drop the events. Even if it did offer a backpressure mechanism, there are many different ways that backpressure can be implemented, and adapting them to each other is non trivial. -
This API offers no mechanism for ensuring thread safety, especially if we are integrating multiple APIs with callbacks registered with all of them. If we are updating some state using this API, and we also have a
BarEventListener
registered that could execute at the same time to update the state, how do we ensure that these two callbacks execute with exclusion? How do we ensure that the right memory fences are in place before they are executed? The use of thesynchronized
keyword results in blocking, and opens a risk of deadlocks, the use ofAtomicReference
and otherjava.util.concurrent
constructs is overly burdensome for an application developer who just wants to solve their domain problem. -
When using Java 8 lambdas to implement an API like this, this API is susceptible to callback hell. This is a phenomenon that shows itself in two primary ways, firstly the code becomes unreadable due to sequential operations being spread out through deeply nested lambdas. Secondly, its very easy to drop signals, particularly error signals, by forgetting to handle them correctly. The result is that a process just stops mid way through, leaving the developer with no idea why it stopped, with no way to find out, not even with a thread dump.
-
There is no explicitly defined execution context for this API, and no way to customise that context. If thread locals are needed, there’s no way to ensure that this API executes its callbacks with those thread locals and that the stored objects have not be erased by another call using the same thread.
-
Any monitoring solutions that want to trace the flow of control through this API need to be explicitly aware of the API and what it does, so they can explicitly instrument it to carry correlation identifiers. Otherwise, explicit propagation of monitoring structures is required.
And that’s just the problems with one possible trivial API for asynchronous programming. In order to safely provide asynchronous programming features, high level, generally applicable patterns and components are needed to ensure compatibility, correct error propagation, backpressure propagation, thread safety, and clean code when using these features. Unfortunately, there already exist APIs in Java EE (such as JSR 356 WebSockets) that suffer from many of these problems (and I should point out that APIs like JSR 356 were created before standard solutions to these problems existed, so they shouldn’t be blamed for this). It’s important that going forward, we ensure that these same problems aren’t reintroduced.
In general, there are two different types of operations that we want to do when doing asynchronous programming. The first is an operation that produces exactly one value or signals, or fails. Examples of such operations include:
-
Producing an asynchronous response for an incoming HTTP request
-
Making asynchronous requests on remote systems using async and non-blocking IOs
-
Asynchronous database queries and updates using an async and non-blocking client
-
Executing an asynchronous operation and waiting for its completion
The second is an operation that produces many, or a stream of values, with failure and completion at the stream level. Examples of such operations include:
-
Sending and receiving messages to/from a WebSocket
-
Streaming bytes to/from an HTTP request/response body
-
Streaming results from a large database result set
-
Sending and receiving messages to/from a message broker
There exist two standard APIs in the JDK that provide abstractions for representing these operations, they are java.util.concurrent.CompletionStage
, and Reactive Streams à la java.util.concurrent.Flow
.
CompletionStage
is a great abstraction for handling a single value or signal. It offers the following features:
-
A standard interface that can be shared/passed between many asynchronous APIs
-
Values can be transformed using
thenApply
operator. -
Sequential composition can be achieved using the
thenCompose
operator allowing to chain asynchronous operations -
Parallel composition can be achieved using the
allOf
method -
Error handling is well defined - errors propagate through chains of completion stages, with features for recovering from and handling errors at any point in the chain.
-
Thread safety is well defined, with callbacks running between memory fences.
-
Backpressure inherent in the redemption of the value.
-
Customizable execution contexts, allowed by passing an explicit executor into methods like
thenApplyAsync
,thenComposeAsync
.
CompletionStage
should be used for all APIs that asynchronously produce or consume exactly one value or signal. In some places the CompletionStage
may be returned by an API, like so:
CompletionStage<Response> response = someApi.makeRequest();
In other places, a CompletionStage
may be returned by application code, like so:
@Path("/")
class MyResource {
@GET
CompletionStage<Response> handleRequest() {
return someOtherApi.doOperation()
.thenApply(result -> Response.ok());
}
}
The above examples are similar to the way JAX-RS 2.1 handles asynchronous calls. Sometimes, an API might have an existing blocking variant, and the asynchronous API is being added alongside it. To support that, it is recommended to append the Async
suffix to the asynchronous variant of the method, for example:
Response response1 = someApi.makeRequest();
CompletionStage<Response> response2 = someApi.makeRequestAsync();
CompletionStage
may also be used to signal zero or one values, by wrapping the value in an Optional
. For example, a database API may offer a way to get a single row, if one exists. In which case, CompletionStage<Optional<Row>>
. may be used. This is preferred over returning null
, as it makes clear that the value may not be present.
CompletionStage<Optional<Person>> maybe = someApi.findByName("john");
Sometimes there is a need to signify completion, either successfully or with an error, with no value. For example, a database update may not return any value. In such case, CompletionStage<Void>
should be used, which gets redeemed with null
when successful. This may be a little unfamiliar to users, and the requirement to use null
is certainly not immediately obvious. An alternative might be to introduce a unit type or a sentinel object, for example Akka has a singleton Done
type for this purpose. The problem with this is though that it’s not clear where Done
should live, the JDK would probably make the most sense, but that depends on changes to the JDK.
There are also some cases where a finite number of values will be made available in the future, all at once. For example, a query for paged results from a database or a REST API will return a finite number of values at once. In this case, a CompletionStage
of an appropriate collection type, such as List
, should be used.
CompletionStage<List<Person>> maybe = someApi.retrieveTop10();
Reactive Streams is an asynchronous streaming API, produced by a collaboration of engineers representing Netflix, Red Hat, Pivotal, Oracle, Lightbend and others. It was adopted by the JDK in JDK9.
Reactive Streams provides well defined semantics for data flow, backpressure, error propagation, completion and cancelling, thread safety, infinite recursion prevention, and other things, allowing two implementations of Reactive Streams to integrate seamlessly with no specific support beyond the Reactive Streams specification in either of them. To get a feel for how well defined the semantics are, read through the spec. It also has a TCK that does a thorough job of ensuring implementations implement the spec correctly and completely.
Reactive Streams should be used for all cases when multiple values are being received asynchronously over time. A distinction here should be made between receiving multiple values at once (such as the paged results use case discussed above with CompletionStage
) and receiving values asynchronously over time. Reactive Streams should not be used for the former use case, only the latter.
It should be stressed that Reactive Streams is intended to be used as an integration API, not an application developer API. Libraries are meant to implement Reactive Streams interfaces, not application developers, the most that application developers should do is pass around instances of Publisher and Subscriber, and perhaps plumb them together via the subscribe method. This blog post does a good job of demonstrating why application developers should never implement their own publishers or subscribers, showing how just implementing an incredibly simple publisher is incredibly difficult to get right, not just to implement the requirements of the spec, but to get the thread safety and concurrency concerns correct.
When offering byte streams, eg, request/response bodies, or database blobs, then Publisher<ByteBuffer>
/Subscriber<ByteBuffer>
should be offered as the API. The byte buffers passed to application developer code should be unpooled, non reusable, unmodifiable buffers, and byte buffers received from application developer code should not be mutated by the library.
There currently exist two Reactive Streams APIs. The first is provided by http://www.reactive-streams.org/, and lives in the org.reactivestreams
package. The second is provided by JDK9, and lives as inner interfaces of the java.util.concurrent.Flow
class. Both APIs are identical in everything but namespace. The JDK9 would require MicroProfile to move to a baseline supported JDK version of JDK9 before it can be adopted.
For APIs that are introduced before that happens, we need a strategy for how to support Reactive Streams using the org.reactivestreams
version that will be backwards compatible with adding support for the JDK9 version in future, while giving us a path to phase out, rather than breaking, the org.reactivestreams
support.
There are a number of strategies that should be used, depending on the use case:
-
Some CDI based APIs are not strongly typed, eg. a user might implement a method that returns a
Publisher
, and annotates it to indicate that it’s a messaging stream. The framework interacts with this method using reflection, and so can transparently add support for JDK9 flows later, with no impact on user code. -
An API that accepts a
Publisher
orSubscriber
can be overloaded to support the JDK9 types in future. -
When an API has to return a
Publisher
orSubscriber
, or accept aPublisher
orSubscriber
type parameter, a way to future proof this is to decide on a way to disambiguate these methods with different names. It’s recommended that theorg.reactivestreams
variant addsRs
to the name, for examplegetRsPublisher
. When JDK9 support is added, theFlow
types can drop theRs
, allowing the existing methods to coexist for backwards compatibility.
The reason behind this last strategy are the following:
-
An API that accepts a
Publisher
orSubscriber
as a generic type of another type can’t be overloaded, since they will have the same binary signature after erasure. For example, something accepts aSupplier<Subscriber>
. A possible option here would be to accept purpose built SAMs, this solves the binary problem, however in practice this often doesn’t work well with Java type inference with lambdas, it’s far too easy for developers to run into edge cases that javac can’t resolve. -
An API that returns a
Publisher
orSubscriber
can’t be overloaded, as the Java compiler doesn’t allow overloading by return type.
As a result, an API that returns an org.reactivestreams.Subscriber
might be written like this:
org.reactivestreams.Subscriber<T> getRsSubscriber();
When that API migrates to Flow
, it will be changed to this:
java.util.concurrent.Flow.Subscriber<T> getSubscriber();
@Deprecated
default org.reactivestreams.Subscriber<T> getRsSubscriber() {
return new RsSubscriberWrapper(getSubscriber());
}
One of the major shortcomings of adopting Reactive Streams at present is the lack of a standard API for manipulating them unlike CompletableFuture/CompletionStage
. Consider a use case where we want to connect a source, Publisher<Foo>
to a sink, Subscriber<Bar>
, and we have a function, Function<Foo, Bar>
to do the transformation. There doesn’t exist any method in Reactive Streams that allows a developer to apply that transformation function to each element. Instead, they would have to write their own Publisher
/Subscriber
that wrapped the provided publisher/subscriber to do the transformation, which not only is a lot of boilerplate, it’s strongly discouraged that users write their own implementations of Publisher
and Subscriber
. A simple map
transformation may be trivial to write, but it gets far more complex with things like filter, where you drop elements and so need to work with demand, and then substreams, asynchronous mappings, etc, get even worse.
Of course, this isn’t a problem for most existing users of Reactive Streams, because there exist a number of third party libraries that provide these transformations, such as map
/filter
/flatMap
. These libraries include Akka Streams, RxJava 2 or Reactor. Each of these libraries has its differences, advantages and disadvantages, such as using an lifted API to declare the stream vs directly manipulating the stream, the execution model, and whether JDK types for functions and futures are embraced or not. However, for a Java standard like MicroProfile, requiring developers to bring in a third party library to do these elementary operations is not acceptable.
The MicroProfile Reactive Streams Operators seeks to fill this gap. This specification is intended to eventually be proposed as an API for the JDK, but for now it is incubating in MicroProfile.
A reactive system architecture allows different services to function autonomously. This means they must be able to be deployed independently, they must be able to fail and scale independently, they must, as much as possible, not depend on other services to fulfil their business process correctly.
There are a number of common pitfalls that application developers can fall into when architecting their systems that technology can help with. These pitfalls include:
-
Chains of synchronous communication. In this context, synchronous communication is communication where the initiator has to wait for the receiver to finish and perhaps respond in order for the communication to receive. It requires both the iniiator and the receive to be running at the same time, hence why it is considered synchronous. The reason this is a problem is that it only takes one service in the chain to be unavailable, and every service in the chain is unable to fulfill its business purpose. It only takes one service in the chain to be responding slowly, and every service in the chain will resond slowly. And so systems that chain synchronous communication together end up with resilience and performance characteristics that are the sum of the worst resilience and performance characteristics in the system. The primary way in which this pitfall manifests is through chains of REST calls.
-
Relying on non existent transactions. Where in a monolith, developers could perform updates on many tables and rely on ACID transactions from their database to ensure that these were done safely, in microservices, each service has its own database and so updates to multiple services can’t be done in a single ACID transaction. The result is that services can become inconsistent with partial updates due to failure, uncommitted reads, and concurrency is left unhandled.
It is important that a technology for implementing microservices gives the tools to developers necessary to solve these pitfalls - and not only that, but it should do its best to ensure developers are aware of these pitfalls, and encourage them to adopt practices and architectural patterns that avoid them.
Asynchronous messaging plays an important part in solving the pitfalls of implementing microservices. It allows chains of synchronous communication to be broken, by ensuring that each service in the chain doesn’t depend on the next service being available or responsive. It also helps to implement eventual consistency, by providing at least once messaging guarantees, working around the lack of ACID transactions for operations on a microservices system.
In order for asynchronous messaging to achieve this goal of at least once messaging, it’s important that the developer doesn’t need to worry about ensuring messages are delivered to the message broker, but rather that the container guarantees that for them. Traditional imperative APIs that send messages via mechanism like invoking a method called send
make it impossible to achieve this, since if the send fails, it will be left up to the developer to either retry or rollback the current transaction, and indeed it is impossible to both update a database and send a message to a message broker atomically.
Hence, an API where not just receiving messages, but sending messages are managed by the implementation, rather than the developer, is needed. The MicroProfile Reactive Messaging specification seeks to provide this.
It’s impossible to both update a database and publish to a message broker atomically, yet this is a very common requirement when using asynchronous messaging. Distributed transactions based on two-phase commit (2PC) unfortunately don’t solve the problem (since the problem is an instance of the unsolveable Two Generals Problem), 2PC only reduces the window in which failure can cause non convergence, however that window is generally widened when system resources are strained, eg due to GC pauses from memory starvation and dropped packets from network congestion, and its under these scenarios that failure is most likely to occur. The result is that when you most need 2PC, it is of least help. It also introduces performance and scaling bottlenecks, and introduces strong coupling of the availability of all components inovlved in the transaction.
An alternative to solving the problem is, rather than publishing messages directly to a broker, publish them to an event log. In doing this, we introduce a segregation between the responsibility of updating the database in response to the command, and the responsibility for the effective querying of the results of the command to publish to the message broker. This is known as Command Query Responsibility Segragation (CQRS), and through its implementation, the message broker can be allowed to fail independently of the database, and so atomic update and publish is not needed.
In a typical setup, when a service wants to update the database and publish an event to another service, it will do the update and append the event to the event log in the same database, in the same transaction, ensuring that these two operations are atomic. Then, a background process will poll the database for new events, and publish them to the message broker, using an offset based tracking mechanism for ensuring at least once delivery of these events to the broker.
There is an impedence mismatch between modelling your domain using CRUD operations, and propagating events through an event log. The problem is that you effectively have two sources of truth, the data is duplicated between tables updated using CRUD, and the events in the log. Ensuring that these are consistent puts a heavy burden on application developers to be very careful about persisting the right evets with the right data at the right times, and resolving a conflict can be difficult.
An alternative approach is to remove the CRUD maintained tables, and just store the data in the events. When the current state needs to be calculated, eg, for validating the next command, the events can be replayed to rebuild the current state. Efficient queries can be implemented using the CQRS mechanism to build a read side view of the data - this is a duplication of the data, but unlike before, it is clear which is the source of truth, the event log is the source of truth, and if the read side view disagrees, then it must be invalidated, which might mean fixing the bug that caused the inconsistency, then dropping the read side view tables, and allow it to be recreated by replaying all the events from the start.
Event souring has another advantage in that it means that the persistence approach maps nicely to the architecture of a system that is based on asynchronous propagation of events. There is a strong correlation between what is stored in the database, and what is being communicated between services, which facilitates communication between teams and developers as they describe the behaviour and interfaces of their systems to each other.