Status

Current state: "Under Discussion"

Discussion thread

JIRA

Released: 

 

Motivation

More details can be found in the Flink ML Roadmap Document and in the Flink Model Serving effort specific document.

A related discussion on the list can be found here.

Model Serving Use Cases and solution Architecture

We describe here the requirements for the core part of a model serving system. The architecture should cover the use cases described below.

Model Deployment Cases

There are two basic approaches to machine learning - online and offline. This proposal refers only to the offline model training case, although as a next step we might consider the online case as well. The served model is fully trained at the point of deployment.

Basic Use Case


The simplest use case is as follows: a user deploys a single ML model (e.g., a regression model) in the model serving system, and it then becomes accessible for scoring. In the general case the user runs N models. There might be more than one instance per model for performance reasons.

...

By introducing the idea of an ML pipeline as described below (Figure 2), the above statements apply to the pipeline concept as well. As a result, the ML pipeline can be seen as the unit of deployment for model serving purposes.

Multiple Models Use Case

Now within the same pipeline it is also possible to run multiple models:

...

For the purpose of this implementation, we consider versions of a model as different models. If versioning is important, we recommend an external system that manages model versions and assigns them unique IDs, so that we can treat the versions as different models for the purpose of serving.

Online Models

This proposal does not address online learning.

Other System Dimensions

Filtering and transformation

Filtering and transformation of both input and output data is included in the pipeline definition, as described below.

Operations

The model serving system should be easy to operate: a user can add or remove any pipeline with a specific request, and can also suspend further pipeline deployments.

This requirement is supported through the use of the model stream.

Logging - Monitoring

The user is able to retrieve any related logs for a particular model or stage of the pipeline.

...

a model/pipeline. We propose using queryable state (see below) to obtain all relevant monitoring information.

Prediction Results

The implementation provides model serving as a functional transformation on the input data stream. The serving can also be subject to functional composition of models.

Non Functional

Every served pipeline creates certain demands for resources to the model serving system.

...

Last but not least, users should be able to serve the ML pipelines locally at the development phase, while transition to production should require either minimal or no changes.

Overall implementation architecture

Currently, there are multiple tools available to data scientists, who tend to use different tools for solving different problems; as a result, they are not very keen on tool standardization.

...

Although the overall architecture above is showing a single model, a single streaming engine could score multiple models simultaneously.

Machine Learning pipeline

For the longest time, model building was implemented ad hoc - people would transform source data any way they saw fit, do some feature extraction, and then train their models based on these features. The problem with this approach is that when someone wants to serve such a model, it is necessary to discover all of those intermediate transformations and reimplement them in the serving application.

...

From now on, when we are talking about model serving, we will mean serving of the complete pipeline.


Public Interfaces

Model

At the heart of model serving in Flink is an abstraction of a model. The question here is whether it is necessary to introduce special abstractions to simplify usage of the model in Flink.

...

Although the trait is called Model, what we actually mean here is not just a model but a complete machine learning pipeline.
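As a rough sketch, the model abstraction could look like the following. This is an illustration under assumptions, not the proposal's final API: the interface name, method names, and the toy linear model are all made up here to show the shape of the contract (score an input, serialize for checkpointing, report the model type, release resources).

```java
public class ModelSketch {
    // Hypothetical Model abstraction; names are illustrative only.
    interface Model<RECORD, RESULT> {
        RESULT score(RECORD input); // run the full pipeline on one record
        byte[] toBytes();           // serialize for checkpointing
        long getType();             // model implementation type (e.g. PMML vs. TensorFlow)
        void cleanup();             // release any native resources
    }

    // Toy linear regression model implementing the trait, for illustration only.
    static class LinearModel implements Model<double[], Double> {
        private final double[] weights;
        LinearModel(double[] weights) { this.weights = weights; }
        public Double score(double[] x) {
            double s = 0.0;
            for (int i = 0; i < weights.length; i++) s += weights[i] * x[i];
            return s;
        }
        public byte[] toBytes() {
            java.nio.ByteBuffer b = java.nio.ByteBuffer.allocate(8 * weights.length);
            for (double w : weights) b.putDouble(w);
            return b.array();
        }
        public long getType() { return 0L; }
        public void cleanup() { }
    }

    public static void main(String[] args) {
        Model<double[], Double> m = new LinearModel(new double[]{2.0, 0.5});
        System.out.println(m.score(new double[]{1.0, 4.0})); // 2*1 + 0.5*4 = 4.0
    }
}
```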

Implementation options

The model trait can be implemented using a wide range of approaches and technologies. In order to limit the implementation options, we propose to start with the standards introduced by

...

In addition to these two standards, we also want to support TensorFlow due to its popularity for model building and serving.

Pipeline Metadata Definition

Because we are planning to support multiple implementation options for a model, it is necessary to define a format for representing models in the stream that supports different kinds of model representations. We decided to use Google protocol buffers for model metadata representation. We start with this option as an efficient, generic way to integrate with the system, but we might need to support other methods to improve the user experience:

...

We decided to use ScalaPB for protobuf marshalling, generation, and processing. We might need to revise this choice, as we want to make the system accessible to Java users as well.
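To give a feel for the kind of metadata such a descriptor carries, here is a plain-Java mirror of a hypothetical ModelDescriptor message. All field names are assumptions for illustration; the real message would be defined in .proto form and generated with ScalaPB. The key point is the "oneof" choice between inline model bytes and an external location.

```java
public class DescriptorSketch {
    // Hypothetical Java mirror of a protobuf ModelDescriptor message.
    // Field names are assumptions; the real message is defined in .proto form.
    static final class ModelDescriptor {
        final String name;       // human-readable model name
        final String dataType;   // links the model to a data stream type
        final int modelType;     // e.g. 0 = PMML, 1 = TensorFlow
        final byte[] data;       // inline model bytes ("oneof" option 1)
        final String location;   // external model location ("oneof" option 2)

        private ModelDescriptor(String name, String dataType, int modelType,
                                byte[] data, String location) {
            this.name = name; this.dataType = dataType; this.modelType = modelType;
            this.data = data; this.location = location;
        }
        // Exactly one of the two "oneof" variants is set:
        static ModelDescriptor inline(String name, String dataType,
                                      int modelType, byte[] data) {
            return new ModelDescriptor(name, dataType, modelType, data, null);
        }
        static ModelDescriptor reference(String name, String dataType,
                                         int modelType, String location) {
            return new ModelDescriptor(name, dataType, modelType, null, location);
        }
        boolean isInline() { return data != null; }
    }

    public static void main(String[] args) {
        ModelDescriptor d = ModelDescriptor.reference(
            "wine-quality", "wine", 0, "hdfs:///models/wine.pmml");
        System.out.println(d.isInline()); // false: model is fetched from its location
    }
}
```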

Model Factory

As defined above, a model in the stream is delivered in the form of a protobuf message, which can contain either a complete representation of the model or a reference to the model's location. In order to generalize model creation from a message, we introduce an additional trait - ModelFactory - supporting building models out of a ModelDescriptor. An additional use for this interface is serialization/deserialization in support of checkpointing. The model factory can be described using the following trait:

...

  • create - a method creating a model based on the ModelDescriptor

  • restore - a method restoring a model from a byte array emitted by the model's toByte method. These two methods need to cooperate to ensure proper functionality of ModelSerializer/ModelDeserializer
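A minimal sketch of the create/restore contract is given below. The interface and factory names are assumptions, and the single-coefficient model is a stand-in for a real pipeline; what matters is the round-trip property the text describes: restore applied to toBytes must yield an equivalent model, so that checkpointing works.

```java
public class FactorySketch {
    // Minimal model + factory sketch following the trait description above;
    // names are illustrative, not the final API.
    interface Model { double score(double x); byte[] toBytes(); }

    interface ModelFactory {
        // Build a model from the payload of a ModelDescriptor message;
        // empty if the payload is malformed.
        java.util.Optional<Model> create(byte[] descriptorPayload);
        // Restore a model from bytes emitted by Model.toBytes() (checkpointing).
        Model restore(byte[] bytes);
    }

    // Toy factory for a single-coefficient model serialized as one double.
    static class ScaleModelFactory implements ModelFactory {
        public java.util.Optional<Model> create(byte[] payload) {
            if (payload == null || payload.length != 8) return java.util.Optional.empty();
            return java.util.Optional.of(restore(payload));
        }
        public Model restore(byte[] bytes) {
            double k = java.nio.ByteBuffer.wrap(bytes).getDouble();
            return new Model() {
                public double score(double x) { return k * x; }
                public byte[] toBytes() { return bytes; }
            };
        }
    }

    public static void main(String[] args) {
        byte[] payload = java.nio.ByteBuffer.allocate(8).putDouble(3.0).array();
        Model m = new ScaleModelFactory().create(payload).get();
        // restore(toBytes()) must yield an equivalent model:
        Model restored = new ScaleModelFactory().restore(m.toBytes());
        System.out.println(m.score(2.0) == restored.score(2.0)); // true
    }
}
```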

Data stream

Similar to the model stream, protobufs are used for the data feed definition and encoding. A specific definition obviously depends on the actual data stream you are working with.

...

The proposed data type based linkage between data and model feeds works well when a given record is scored with a single model. If this relationship is one to many, where each record needs to be scored by multiple models, a composite key (data type with model id) can be generated for every received record.
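The composite key construction for the one-to-many case can be sketched as follows. The key format and separator are assumptions for illustration; the point is that each incoming record is fanned out once per model it must be scored by.

```java
public class CompositeKeySketch {
    // Hypothetical composite key (data type + model id) used to fan one
    // record out to several models, as described above. The "::" separator
    // is an arbitrary choice for this sketch.
    static String compositeKey(String dataType, String modelId) {
        return dataType + "::" + modelId;
    }

    public static void main(String[] args) {
        // One "wine" record scored by two models yields two keyed copies:
        String[] modelsForWine = {"model_A", "model_B"};
        for (String id : modelsForWine) {
            System.out.println(compositeKey("wine", id));
        }
        // prints:
        // wine::model_A
        // wine::model_B
    }
}
```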

Flink implementation Architecture

Flink provides a low-level stream processing operation, ProcessFunction, which provides access to the basic building blocks of all (acyclic) streaming applications:

...

Flink provides two ways of implementing low-level joins: a key-based join based on CoProcessFunction and a partition-based join based on RichCoFlatMapFunction. Although both can be used for the required implementation, they provide different SLAs and are applicable to slightly different use cases.

Key-based joins

Flink’s CoProcessFunction allows key-based merging of two streams. When using this API, data is key-partitioned across multiple Flink executors. Records from both streams are routed (based on the key) to the appropriate executor, which is responsible for the actual processing.

...

  • Each individual model’s scoring is implemented by a separate executor (a single executor can score multiple models), which means that scaling Flink leads to a better distribution of individual models and consequently better parallelization of scoring.

  • A given model is always scored by a given executor, which means that, depending on the distribution of input record types, this approach can lead to “hot” executors.
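The "hot executor" behavior follows directly from deterministic key routing, which the following sketch illustrates. The hash function here is a simplification (Flink internally assigns keys to key groups via a murmur-based hash); the invariant that matters is that the same key always maps to the same executor.

```java
public class KeyRoutingSketch {
    // Simplified key-based routing: every record with the same key lands on
    // the same executor. Real Flink uses murmur-hash key groups; floorMod of
    // hashCode is an assumption made for this sketch.
    static int executorFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 4;
        // All "wine" records, however many arrive, hit one executor - if
        // "wine" dominates the input, that executor becomes "hot":
        int target = executorFor("wine", parallelism);
        for (int i = 0; i < 3; i++) {
            System.out.println("wine record -> executor " + target);
        }
    }
}
```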

Partition-based joins

Flink’s RichCoFlatMapFunction allows merging of two streams in parallel (based on the parallelization parameter). When using this API on a partitioned stream, data from different partitions is processed by a dedicated Flink executor. Records from the model stream are broadcast to all executors. As can be seen from the figure below, each partition of the input stream is in this case routed to its own instance of the model server. If the number of partitions of the input stream is less than Flink’s parallelization factor, only some of the model server instances will be utilized; if it is greater, some of the model server instances will serve more than one partition.

...

  • The same model can be scored in one of several executors based on the partitioning of the data streams, which means that scaling of Flink (and input data partitioning) leads to a better scoring throughput.

  • Because the model stream is broadcast to all executors, which operate independently, some race conditions during model updates can exist, meaning that at the point of a model switch, some model jitter can occur.

  • Additionally, because the same model is in this case deployed to all executors, the amount of memory required for model deployment will grow proportionally as the number of executors grows.
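The memory implication of broadcasting can be made concrete with simple arithmetic. The model size below is a made-up illustration; the linear growth is the point.

```java
public class BroadcastMemorySketch {
    // Partition-based joins broadcast each model to every executor, so the
    // total footprint of one deployed model grows linearly with parallelism.
    static long totalBytes(long modelSizeBytes, int parallelism) {
        return modelSizeBytes * parallelism;
    }

    public static void main(String[] args) {
        long modelSizeBytes = 200L * 1024 * 1024; // e.g. a 200 MB model (illustrative)
        for (int parallelism : new int[]{1, 4, 16}) {
            long totalMb = totalBytes(modelSizeBytes, parallelism) >> 20;
            System.out.println(parallelism + " executors -> " + totalMb + " MB");
        }
        // prints:
        // 1 executors -> 200 MB
        // 4 executors -> 800 MB
        // 16 executors -> 3200 MB
    }
}
```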

Selecting appropriate join type

When deciding on the appropriate join type consider the following:

  • Based on its implementation, a key-based join is an appropriate approach for situations where we need to score multiple data types with a relatively even distribution.

  • Based on its implementation, a partition-based join is an appropriate approach for situations where we need to score one model (or a small number of models) under heavy data load, assuming that the data source is evenly partitioned. Also keep in mind that this approach requires more memory for model deployment.

Monitoring

Any streaming application, and model serving is no exception, requires a well-defined monitoring solution. Examples of information that we might want to see for a model serving application include:

...

 This seems like an ideal solution for monitoring, but unfortunately it works only for key-based joins.

Optimizations

There is room for optimizing the key-based solution to avoid the hot-executor issue, where the entire scoring load is processed by one executor. One solution would be to serve more instances of a model. This can be done by sending down the model stream keys of the form data_type_model_A_key_1, data_type_model_A_key_2, etc. (we serve the same model here on different instances). We assume the first instance has the key data_type_model_A_key_0. So when we scale, we send new keys. Each executor that receives the full key for the first time will fetch/create the model as usual and store it locally.

...

Note: For this to work we need to be careful with the partitioning done by Flink, so that the composite key actually distributes records to the appropriate models when hashing takes place.


Proposed Changes

The target is to add a new library on top of Flink for model serving. This should be a useful tool that adds value to the Flink ecosystem. An implementation of queryable operator state (for partition-based joins) is required for the monitoring implementation (currently covered by https://issues.apache.org/jira/browse/FLINK-7771).

Compatibility, Deprecation, and Migration Plan

There are no backwards compatibility/migration/deprecation concerns, since this proposal only adds a new API.

Rejected Alternatives

We discussed the usage of Side Inputs for stream merging. It has two potential benefits:

...