Welcome! flink-jpmml is a new library for dynamic real-time machine learning predictions, built on top of PMML standard models and the Apache Flink streaming engine.
flink-jpmml is easy to use, runs at serious scale, is backend independent and is naturally shaped for streaming scenarios.
To get started, you only need:
- any well-known version of a PMML model (3.2 or above)
- flink-jpmml is tested with the latest Flink (i.e. 1.3.2), but any working Apache Flink version (repo) should work properly.
- If you use sbt, add the following dependency to your project:
- Snapshot:
"io.radicalbit" %% "flink-jpmml-scala" % "0.7.0-SNAPSHOT"
- Stable:
"io.radicalbit" %% "flink-jpmml-scala" % "0.6.1"
- For Maven users instead:
- Snapshot:
<dependencies>
  <dependency>
    <groupId>io.radicalbit</groupId>
    <artifactId>flink-jpmml-scala</artifactId>
    <version>0.7.0-SNAPSHOT</version>
  </dependency>
</dependencies>
- Stable:
<dependencies>
  <dependency>
    <groupId>io.radicalbit</groupId>
    <artifactId>flink-jpmml-scala</artifactId>
    <version>0.6.1</version>
  </dependency>
</dependencies>
Alternatively, you can publish flink-jpmml to your local repository. To do so:
- execute within the flink-jpmml root
> sbt
- select flink-jpmml-scala project
> project flink-jpmml-scala
- publish the library to your local repo
> publishLocal
Keep in mind that you will also need the Flink scala-core, flink-streaming and flink-clients libraries.
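As a rough sketch, the requirements above could be put together in a `build.sbt` like the one below. The Flink artifact names (`flink-scala`, `flink-streaming-scala`, `flink-clients`) are assumptions about which artifacts the libraries mentioned above correspond to; check them against the Flink version you actually run.

```scala
// build.sbt (sketch, not an official template)
val flinkVersion = "1.3.2" // the Flink version this README reports as tested

libraryDependencies ++= Seq(
  "io.radicalbit"    %% "flink-jpmml-scala"     % "0.6.1",
  // Assumed artifact names for the Flink libraries mentioned above
  "org.apache.flink" %% "flink-scala"           % flinkVersion,
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
  "org.apache.flink" %% "flink-clients"         % flinkVersion
)
```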
Let's start.
flink-jpmml enables Flink users to execute real-time predictions based on machine learning models trained by any system supporting the PMML standard; this allows efficient streaming model serving along with the powerful Flink engine features.
Since 0.6.0, the project supports dynamic streaming model serving. For more information, we suggest watching the related talk presented at Flink Forward 2017 in Berlin.
First of all, models are uniquely identified by the ModelId abstraction, made of an applicationName and a version.
For example, suppose you have two PMML-based ensemble models A and B, where A has depth 10 and width 100 and B has depth 5 and width 200, and you want to compare them; you could then identify them with applicationName SVM and versions A and B.
flink-jpmml does not store models within its operator state, only the related metadata. The operator retrieves models from your distributed backend by means of a metadata table. Therefore, your PMML models have to be persisted in a backend system (see here for supported systems).
If you want to use dynamic model evaluation, you need to define the following streams:
- DataStream[ServingMessage]: this stream is the main tool for feeding the operator with the necessary model information. You are not required to send the PMML models themselves through the stream, only the descriptive metadata. Use the ServingMessage ADT to feed this stream. For now, two messages are available:
  - AddMessage: requires an applicationName formatted as a Java UUID, a version, the model source path and a timestamp
  - DelMessage: requires an applicationName formatted as a Java UUID, a version and a timestamp
- DataStream[BaseEvent]: the events of your input stream should extend the BaseEvent trait, defining a string modelId formatted as "<modelApplication>_<modelVersion>" and a timestamp.
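To make the shape of this information concrete, here is a minimal plain-Scala sketch. It does not use the library's own classes: `ControlMessage`, `Add`, `Del`, `modelId` and `isUuid` are hypothetical names, and the `Long` type for the version and timestamp is an assumption, since the text above does not state it.

```scala
import java.util.UUID
import scala.util.Try

object ServingSketch {
  // Hypothetical mirror of the control messages described above;
  // these are NOT the library's ServingMessage classes.
  sealed trait ControlMessage {
    def applicationName: String // expected to be a UUID-formatted string
    def version: Long           // assumption: the version type is not stated in the text
    def timestamp: Long
  }
  final case class Add(applicationName: String, version: Long, path: String, timestamp: Long)
      extends ControlMessage
  final case class Del(applicationName: String, version: Long, timestamp: Long)
      extends ControlMessage

  // Builds the "<modelApplication>_<modelVersion>" identifier carried by input events.
  def modelId(applicationName: String, version: Long): String =
    s"${applicationName}_$version"

  // Returns true when the name can be parsed as a java.util.UUID.
  def isUuid(name: String): Boolean = Try(UUID.fromString(name)).isSuccess
}
```

An event meant for the model registered by `Add(name, 1L, path, ts)` would then carry `ServingSketch.modelId(name, 1L)` as its identifier.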
Given the streams above, you can obtain predictions easily.
import io.radicalbit.flink.pmml.scala._
import org.apache.flink.ml.math.Vector
import org.apache.flink.streaming.api.scala._
import io.radicalbit.flink.pmml.scala.models.control.ServingMessage
...
val inputStream: DataStream[_ <: BaseEvent] = yourInputStream
val controlStream: DataStream[ServingMessage] = yourControlStream
val predictions =
  inputStream
    .withSupportStream(controlStream)
    .evaluate { (event, model) =>
      val vector = event.toVector
      val prediction = model.predict(vector)
      prediction
    }
The features of flink-jpmml PMML models are discussed in more detail here: you will find several ways to handle your predictions. All the concepts introduced in the first version of flink-jpmml, i.e. how the model is built within the operator, the operator configuration and so forth, have been retained and are described below. We also kept the single operator model, explained later, in case you want to bind a specific model to an operator instance.
When an event arrives, its modelId declares which model it needs to be evaluated against. If that model has not been loaded within the operator yet, the operator uses the metadata information to lazily retrieve the targeted model from the underlying distributed backend.
The control stream is the right tool to provide the global picture of the models available to your platform (this fits a model repository server concept well). You use this stream to feed the operator with the information your input events need in order to reach their models easily.
If an event finds its targeted model, the prediction is computed and a Prediction (ADT based) outcome is returned; otherwise the outcome will be an EmptyPrediction.
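The flow just described (control messages update the metadata, events trigger a lazy load, unknown models yield an empty outcome) can be simulated in plain Scala. This is only a sketch of the idea, not the operator's implementation: `Operator`, `Outcome`, `Scored`, `Empty` and the `load` function are hypothetical, and a model is reduced to a simple scoring function.

```scala
import scala.collection.mutable

object LazyServingSketch {
  sealed trait Outcome
  final case class Scored(value: Double) extends Outcome
  case object Empty extends Outcome

  // Stand-in for a loaded PMML model: just a scoring function.
  type Model = Seq[Double] => Double

  // `load` is a hypothetical loader from a backend path to a model.
  final class Operator(load: String => Model) {
    private val metadata = mutable.Map.empty[String, String] // modelId -> source path
    private val loaded   = mutable.Map.empty[String, Model]  // models fetched so far

    // Control stream side: only metadata is recorded, nothing is loaded yet.
    def add(modelId: String, path: String): Unit = metadata(modelId) = path

    def delete(modelId: String): Unit = {
      metadata.remove(modelId)
      loaded.remove(modelId)
      ()
    }

    // Event stream side: load the model on first use, or return Empty if unknown.
    def evaluate(modelId: String, features: Seq[Double]): Outcome =
      metadata.get(modelId) match {
        case None => Empty
        case Some(path) =>
          val model = loaded.getOrElseUpdate(modelId, load(path))
          Scored(model(features))
      }

    def loadedCount: Int = loaded.size
  }
}
```

Note that `add` never touches the backend: the model is fetched only when the first matching event is evaluated.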
Suppose you have an InputStream and you want to score its data:
import org.apache.flink.streaming.api.scala._
// Array needs a type parameter; Double is assumed here
case class InputEvent(data: Array[Double])
val env = StreamExecutionEnvironment.getExecutionEnvironment
val events: DataStream[InputEvent] = env.yourInputStream
You can achieve this easily with the following:
// This will be all that you need
import io.radicalbit.flink.pmml.scala._
import org.apache.flink.ml.math.Vector
import org.apache.flink.streaming.api.scala._
object FlinkJpmmlExample {
  def main(args: Array[String]): Unit = {
    // your model can reside in any Flink supported backend
    val pathToPmml = "/even/to/distributed/systems"
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val events = env.yourInputStream
    // a lazy reader implementation
    val reader = ModelReader(pathToPmml)
    // let's go with predictions
    events.evaluate(reader) { (event, model) =>
      // FlinkML Vector abstraction is used
      val toBePredicted = vectorize(event)
      // and here we are
      val prediction: Prediction = model.predict(toBePredicted)
      val result = prediction.value
      // finally custom returns
      (event, result)
    }
    env.execute("Awesome predictions with flink-jpmml")
  }
}
Some useful insights from the code:
- to load the PMML model, you only need to specify the PMML source path
- ModelReader is a lazy reader and provides the right reading abstraction to TaskManagers
- the resulting PmmlModel is loaded once by a factory for each TaskManager running on your architecture, at construction time
- the PmmlModel.predict method expects Flink Vectors as input events and, if you want to manage NaNs, an optional replace value
- Prediction provides the result of the input event evaluation against the PMML model as a Prediction[Double] ADT, so if the model can't manage a prediction it will return an EmptyPrediction value.
flink-jpmml also provides a quick prediction function for streams of Flink Vectors:
...
val vectorStream: DataStream[Vector] = events.map(event => vectorize(event))
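// DataStream takes a single type parameter, so each element is a (Prediction, Vector) pair
val predictions: DataStream[(Prediction, Vector)] = vectorStream.quickEvaluate(reader)
// print the Prediction part of each pair
predictions.map(_._1).print()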
env.execute("flink-jpmml quick predictions")
These basic examples show you how to interact with the library. The following sections address some interesting details which are worth a deeper analysis.
The main effort of flink-jpmml is to retain all the streaming concepts:
- since Flink is able to run against several distributed backends, users only need to specify the PMML source path: the library takes care of loading the model in full compliance with the underlying distributed system (e.g. HDFS, Alluxio, S3, local FS)
- ModelReader is the object implementing the previous behavior; it provides the loading methods but reads the model lazily, i.e. only when the transformation is applied
- the PmmlModel is loaded by a singleton model factory for each TaskManager running on your architecture; that means if you have an active TaskManager A made up of 4 TaskSlots, your TM will load the model from a single loader entity; this is crucial to let the system scale in a thread-safe way (even simple PMML models can grow to several hundred megabytes, meaning a big load in memory terms)
- the PmmlModel.predict method expects Flink Vectors as input events; this choice lets us leverage the underlying Breeze implementation, and no reflection is applied at all; moreover, the user doesn't have to specify any key-value structure: you have data matching a feature vector, so the former will be used against the latter (see the input discussion section for further details)
- flink-jpmml can also handle sparse data: just pass the desired replace value as an argument to the method discussed above (here you will need a SparseVector)
  val prediction = model.predict(sparseData, replaceValue)
- Prediction is the output case class: it returns the result of the input event evaluation against the PMML model as a Score[Double] ADT, so if the model can't manage a prediction it will return an EmptyScore.
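The "single loader entity per TaskManager" idea can be illustrated with a small plain-Scala sketch. It is not the library's factory: `ModelFactorySketch`, `Model` and the `loads` counter are hypothetical, and a Scala `object` (initialized once per classloader) is used here as a stand-in for one TaskManager process shared by several slots.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import java.util.function.{Function => JFunction}

// One instance per classloader: here a stand-in for "one per TaskManager".
object ModelFactorySketch {
  final case class Model(path: String) // placeholder for a parsed PMML model

  val loads = new AtomicInteger(0) // counts real loads, for illustration only

  private val cache = new ConcurrentHashMap[String, Model]()

  private val loader = new JFunction[String, Model] {
    override def apply(path: String): Model = {
      loads.incrementAndGet()
      Model(path) // a real loader would read and parse the PMML source here
    }
  }

  // computeIfAbsent runs the loader at most once per key, even under concurrent access,
  // so every thread (slot) asking for the same path shares a single model instance.
  def get(path: String): Model = cache.computeIfAbsent(path, loader)
}
```

With four threads calling `ModelFactorySketch.get` on the same path, the potentially large model is built once and shared, which is the memory argument made above.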
The design deserves a bit more attention: the choice of a UDF as the prediction entry point is justified by the need to handle a machine learning task (a prediction task) within a pure streaming application; in this way the user can manage predictions in the body of the function, together with the original event.
Assume this is the considered PMML feature vector
["sepal_width", "sepal_length", "petal_width", "petal_length"]
and you have values for all these fields; so, just create a DenseVector
val vector: DenseVector = DenseVector(value1, value2, value3, value4)
Suppose value2 is missing; then you will need a SparseVector that holds only the values at indices 0, 2 and 3
val vector: SparseVector = SparseVector(4, Array(0, 2, 3), Array(value1, value3, value4))
flink-jpmml will recognize missing values and replace them with replaceValue, if specified (as the second argument of the PmmlModel.predict method); otherwise the NaN handling is delegated to the PMML model.
Note also that when you employ a DenseVector, flink-jpmml assumes that the dense instance size is your model size; if the two differ, it will not predict anything (i.e. it returns Score.Empty). In case of sparse instances, the library reads the sparse size value.
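The replace-value behavior can be pictured with a few lines of plain Scala. This is a sketch of the idea only, not the library's code: `expand` and `prepare` are hypothetical helpers, and `Vector` below is the Scala collection, not the Flink ML vector type.

```scala
object MissingValuesSketch {
  // Expands (size, indices, values) into one slot per feature;
  // indices that are absent from the sparse input become None.
  def expand(size: Int, indices: Array[Int], values: Array[Double]): Vector[Option[Double]] = {
    val present = indices.zip(values).toMap
    Vector.tabulate(size)(i => present.get(i))
  }

  // Fills missing slots with replaceValue when one is given;
  // otherwise leaves them missing, to be handled by the model itself.
  def prepare(slots: Vector[Option[Double]], replaceValue: Option[Double]): Vector[Option[Double]] =
    slots.map(_.orElse(replaceValue))
}
```

For a four-feature model with the second value missing, `expand(4, Array(0, 2, 3), ...)` leaves slot 1 empty, and `prepare(slots, Some(0.0))` fills it with `0.0`.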
flink-jpmml won't break your job if something goes wrong, except in the model loading step: loading is such a crucial action (it's mandatory for predictions) that in case of failure a ModelLoadingException is raised. Every other issue is reported through log messages.
The handled failures are:
- Input Validation failure - The input does not match the feature vector in size (either too long or too short)
- Input Preparation failure - JPMML library fails to prepare internal data format
- Evaluation step failure - JPMML fails to evaluate the input against the PMML
- JPMML Result Extraction failure - The job fails to retrieve the result from the PMML model
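The overall policy (fail hard on loading, log and return nothing on every other step) can be sketched with `scala.util.Try`. Everything below is hypothetical and only mimics the behavior described above: `ModelLoadingError` stands in for the library's exception, the model is a plain function, and logging is reduced to a line on standard error.

```scala
import scala.util.{Failure, Success, Try}

object FailureHandlingSketch {
  // Stand-in for the loading exception described above.
  final case class ModelLoadingError(msg: String) extends RuntimeException(msg)

  // Loading is the only step allowed to fail the job: the exception propagates.
  def loadModel(path: String): Seq[Double] => Double =
    if (path.isEmpty) throw ModelLoadingError("cannot load a model from an empty path")
    else (xs: Seq[Double]) => xs.sum

  // Every other step is wrapped in Try; a failure is logged and mapped to None.
  def score(model: Seq[Double] => Double, expectedSize: Int, input: Seq[Double]): Option[Double] = {
    val result = for {
      valid <- Try {
        require(input.size == expectedSize, s"expected $expectedSize features, got ${input.size}")
        input
      }
      value <- Try(model(valid))
    } yield value

    result match {
      case Success(v) => Some(v)
      case Failure(e) =>
        Console.err.println(s"prediction skipped: ${e.getMessage}")
        None
    }
  }
}
```

An input of the wrong size yields `None` and a log line, while an unloadable model stops everything, mirroring the split between the handled failures and the loading step.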
If you want to contribute to the project, send an email to [email protected] or just open an issue here. The flink-jpmml community is looking for you!
- Andrea Spina - [email protected] @spi-x-i
- Francesco Frontera - [email protected] @francescofrontera
- Riccardo Diomedi - [email protected] @riccardo14
- Mauro Cortellazzi - [email protected] @maocorte
- Simone Robutti - Initial prototype [email protected] @chobeat
- Stefano Baghino - Initial prototype @stefanobaghino
- Apache®, Apache Flink™, Flink™, and the Apache feather logo are trademarks of The Apache Software Foundation.
- PMML standard is a trademark of The Data Mining Group. All rights reserved.
- JPMML-Evaluator is licensed under the GNU Affero General Public License (AGPL) version 3.0. Other licenses are available on request.
This library has been published under the GNU Affero General Public License (AGPL) version 3.0, following and respecting the official advice from the Apache Software Foundation about the compatibility between the Apache License, Version 2.0 and the GNU General Public License, Version 3.0.
THIS STANDARD IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS RELEASE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.