Status

...

Discussion thread

...

JIRA

Released: 

...


Motivation

More details can be found in the Flink ML Roadmap Document and in the Flink Model Serving effort-specific document. A related discussion on the mailing list can be found here.

Model Serving Use Cases and Solution Architecture

...

This pattern fits well into the overall architecture that we want to implement. 


Flink provides two ways of implementing low-level joins: a key-based join built on CoProcessFunction and a partition-based join built on RichCoFlatMapFunction. Although both can be used for the required implementation, they provide different SLAs and are applicable to slightly different use cases.

...

Unfortunately, side inputs were not available at the time of implementation, so the implementation leverages low-level joins, which do the job.
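
To make the key-based join concrete, here is a minimal, hedged Scala sketch of a CoProcessFunction that keeps the current model in keyed state and scores incoming records against it. The Record and SimpleModel types are illustrative stand-ins, not the FLIP's actual classes:

    import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.co.CoProcessFunction
    import org.apache.flink.util.Collector

    // Illustrative types: a data record and a trivial linear model.
    case class Record(dataType: String, features: Array[Double])
    case class SimpleModel(dataType: String, weights: Array[Double]) {
      def score(r: Record): Double =
        weights.zip(r.features).map { case (w, f) => w * f }.sum
    }

    // Both streams are keyed by dataType; the latest model lives in keyed state.
    class ModelDataJoin extends CoProcessFunction[Record, SimpleModel, Double] {

      private var modelState: ValueState[SimpleModel] = _

      override def open(parameters: Configuration): Unit = {
        modelState = getRuntimeContext.getState(
          new ValueStateDescriptor[SimpleModel]("model", classOf[SimpleModel]))
      }

      // Data side: score the record if a model has already arrived.
      override def processElement1(data: Record,
          ctx: CoProcessFunction[Record, SimpleModel, Double]#Context,
          out: Collector[Double]): Unit = {
        val model = modelState.value()
        if (model != null) out.collect(model.score(data))
      }

      // Model side: replace the current model for this key.
      override def processElement2(model: SimpleModel,
          ctx: CoProcessFunction[Record, SimpleModel, Double]#Context,
          out: Collector[Double]): Unit = modelState.update(model)
    }

In the actual library, this role is played by DataProcessorKeyed, described below.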

Speculative Model Serving

The main limitation of the solution presented above is a single model per data type, which is rarely the case in real-life deployments. As Ted Dunning describes in detail in his Machine Learning Logistics book, real-life deployments typically run an ensemble of models scoring the same data item in parallel, with a decision block then deciding which result to use. Here we describe an extension of the proposed solution that supports speculative model serving.

Why speculative model serving?

According to Wikipedia, speculative execution is:

“an optimization technique where a computer system performs some task that may not be needed. Work is done before it is known whether it is actually needed, so as to prevent a delay that would have to be incurred by doing the work after it is known that it is needed. If it turns out the work was not needed after all, most changes made by the work are reverted and the results are ignored.

The objective is to provide more concurrency if extra resources are available. This approach is employed in a variety of areas, including branch prediction in pipelined processors, value prediction for exploiting value locality, prefetching memory and files” etc.

In the case of model serving, speculative execution means scoring data in parallel with a set of models and then selecting the best score based on some metric.

Speculative model serving implementation

The generic implementation of speculative model serving can be presented as follows:

[Figure: generic speculative model serving architecture]

Here, instead of sending requests directly to the executor (model server), we send them to a special component, the Starter. The Starter's responsibility is to distribute these requests to known Executors, which process them (implementing model serving, in our case) in parallel. Execution results are then sent to the Collector, which is responsible for determining the final result based on the results calculated by the individual Executors.

Such a simple architecture allows us to implement many important model serving patterns, including the following:

  • Guaranteed execution time. Assuming that we have several models, with the fastest providing a fixed execution time, it is possible to provide a model serving implementation with a fixed upper limit on execution time, as long as that limit is larger than the execution time of the simplest model.
  • Consensus-based model serving. Assuming that we have several models, we can implement model serving where the prediction is the one returned by the majority of the models.
  • Quality-based model serving. Assuming that we have a metric allowing us to evaluate the quality of model serving results, this approach allows us to pick the result with the best quality. It is, of course, possible to combine multiple features, for example consensus-based model serving with guaranteed execution time, where the consensus result is used when it completes within a given time interval.
  • "Canary" deployment, where some of the requests are routed to the "new" executors.

Combining speculative execution with the "real-time updatable" model serving described earlier leads to the following overall architecture:

[Figure: speculative model serving combined with real-time model updates]

It turns out that such an architecture can be easily implemented in Flink (leveraging key-based joins), as presented below:

[Figure: Flink implementation of speculative model serving]

The implementation leverages three CoProcessFunction classes:

  • Router processor implements the starter. It receives both model and data streams (see above) and routes them to the appropriate Model processor (leveraging side outputs), based on the appropriate key. It also notifies the Speculative processor that processing of a new data request has started. As an optimization, if no models are available for a given data type, the Router processor sends the result ("no models available") directly to the Speculative processor. The current implementation forwards a request to all available Model processors, but this can be overridden to implement any required request distribution; a sketch of this fan-out follows the list.
  • Model processor directly follows the implementation for key-based joins described above.
  • Speculative processor implements the collector presented above. Its implementation relies on a "decider", an interface (see below) that has to be implemented based on the specific set of requirements.
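
For illustration, a minimal sketch (assumed names and types, not the FLIP's code) of how such fan-out can be expressed with Flink side outputs:

    import org.apache.flink.streaming.api.functions.ProcessFunction
    import org.apache.flink.streaming.api.scala.OutputTag
    import org.apache.flink.util.Collector

    // Illustrative request type: a unique id plus the payload to score.
    case class Request(guid: String, data: Double)

    // Fans each request out to all model processors via side outputs and
    // notifies the collector (main output) that processing has started.
    class RouterSketch(modelTags: Seq[OutputTag[Request]])
        extends ProcessFunction[Request, String] {
      override def processElement(req: Request,
          ctx: ProcessFunction[Request, String]#Context,
          out: Collector[String]): Unit = {
        modelTags.foreach(tag => ctx.output(tag, req)) // one side output per model
        out.collect(req.guid)                          // "request started" notification
      }
    }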

In addition to the above, the following extensions to the model are introduced for this implementation:

  • Additional case classes describing additional messages:

    // A single serving request with a unique id
    case class ServingRequest(GUID : String, data : AnyVal)
    // Additional key/value metadata attached to a response
    case class ServingQualifier(key : String, value : String)
    // Result of one executor, with optional confidence and qualifiers
    case class ServingResponse(GUID : String, result : ServingResult, confidence : Option[Double] = None, qualifiers : List[ServingQualifier] = List.empty)
    // Request routed to a specific model processor
    case class SpeculativeRequest(dataModel : String, dataType : String, GUID: String, data: AnyVal)
    // Model update targeted at a specific model processor
    case class SpeculativeModel(dataModel : String, model : ModelToServe)
    // Notification/result message for the speculative (collector) processor
    case class SpeculativeServiceRequest(dataType : String, dataModel : String, GUID: String, result : Option[ServingResult], models: Int = 0)

  • Decider interface used for selecting results:
    import scala.collection.mutable.ListBuffer

    trait Decider {
      def decideResult(results: CurrentProcessing): ServingResult
    }
    // Snapshot of one in-flight request: the number of models asked, the
    // processing start time, and the responses collected so far.
    case class CurrentProcessing(dataType : String, models : Int, start : Long, results : ListBuffer[ServingResponse])
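
As an illustration, a minimal sketch (an assumption, not part of the FLIP) of a quality-based decider that picks the response with the highest reported confidence, presuming it is invoked only once at least one response has been collected:

    object BestConfidenceDecider extends Decider {
      override def decideResult(processing: CurrentProcessing): ServingResult =
        // Treat a missing confidence as 0.0 and return the best-scoring result.
        processing.results.maxBy(_.confidence.getOrElse(0.0)).result
    }

A guaranteed-execution-time decider would instead be driven by a timer that calls decideResult when the time budget expires, using whatever responses have arrived by then.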

Monitoring

This implementation leverages a monitoring approach based on queryable state (the same as above). In addition to ModelServiceStats (per model), this implementation also exposes two additional pieces of information:

  • List of available models (per data type) - List[String]
  • SpeculativeExecutionStats describing statistics on the speculative execution:

    final case class SpeculativeExecutionStats(
      var name: String,
      decider: String,
      tmout: Long,
      since: Long = System.currentTimeMillis(),
      usage: Long = 0,
      duration: Double = 0.0,
      min: Long = Long.MaxValue,
      max: Long = Long.MinValue)
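
A hypothetical helper (not part of the FLIP) illustrating how these statistics might be maintained after a speculative request that took execution milliseconds:

    // Returns updated stats; min/max track the fastest and slowest requests.
    def recordExecution(s: SpeculativeExecutionStats, execution: Long): SpeculativeExecutionStats =
      s.copy(
        usage = s.usage + 1,               // one more request served
        duration = s.duration + execution, // cumulative time, enabling averages
        min = math.min(s.min, execution),
        max = math.max(s.max, execution))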

Implementation

An initial implementation and examples for this FLIP include just the basic model serving implementation, without the speculative piece. The implementation is provided in both Scala and Java. It implements both key-based and partition-based joins and TensorFlow support for both "optimized" and saved model formats. The implementation is split into two parts:

Library implementation


This is the implementation of the base library, independent of the type of messages that a particular solution uses. It is strongly typed and implemented using generics. The library implementation includes three modules:

Here, Flink Model Serving shared contains the protobuf definition (see Pipeline Metadata Definition above), while Flink Model Serving Java and Flink Model Serving Scala provide the same implementation in Java and Scala respectively. Theoretically it is possible to combine the two, but Java and Scala syntax are sufficiently different that two parallel implementations are provided.

In addition, both the Java and Scala implementations contain a set of unit tests validating the implementation.

Flink Model Serving Java

The implementation is contained in the namespace org.apache.flink.modelserving.java, which contains three namespaces:

  • model - code containing definition of model and its transformation implementation
  • query - code containing base implementation for the Flink queryable state query
  • server - code containing basic implementation of the Flink specific components of the overall implementation 

The model implementation is split into a generic and a TensorFlow-based implementation; this split allows adding support for other model types in the future without disrupting the base code. The generic model implementation includes the following classes (a sketch of the Model and ModelFactory abstractions follows the list):

  • DataConverter - a set of model transformation methods
  • DataToServe - an interface defining a generic container for data used for model serving and its behavior
  • Model - an interface defining a generic model and its behavior (see above)
  • ModelFactory - an interface defining a generic model factory and its behavior (see above)
  • ModelFactoriesResolver - an interface defining a generic model factory resolver and its behavior. The purpose of this interface is to get the model factory based on the model type.
  • ModelToServe - an intermediate representation of the model
  • ModelToServeStats - model usage statistics
  • ModelWithType - a combined container for a model and its type, used by the Flink implementation
  • ServingResult - a generic representation of a model serving result
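
For illustration, a hedged Scala sketch of what the Model and ModelFactory abstractions might look like (method names and the shape of ModelToServe are assumptions; the Java version expresses these as interfaces, the Scala version as traits):

    // Assumed shape of the intermediate model representation, for this sketch.
    case class ModelToServe(modelType: Long, data: Array[Byte])

    // Generic model: scoring plus lifecycle hooks for state and resources.
    trait Model {
      def score(input: AnyVal): AnyVal  // apply the model to a data record
      def cleanup(): Unit               // free resources (e.g. a TensorFlow graph)
      def toBytes: Array[Byte]          // serialize the model for checkpointing
      def getType: Long                 // model type id used by the factory resolver
    }

    // Factory for creating models from the intermediate representation or
    // restoring them from checkpointed bytes.
    trait ModelFactory {
      def create(input: ModelToServe): Option[Model]
      def restore(bytes: Array[Byte]): Model
    }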

The tensorflow namespace inside the model namespace contains four classes:

  • TensorFlowModel extends Model, adding TensorFlow-specific functionality for the case of an optimized TensorFlow model
  • TensorBundelFlowModel extends Model, adding TensorFlow-specific functionality for the case of a bundled TensorFlow model
  • TField - a definition of a field in the TensorFlow saved model
  • TSignature - a definition of a signature in the TensorFlow saved model


The query namespace contains a single class, ModelStateQuery, implementing the Flink queryable state client for the model state.

The server namespace contains three namespaces:

  • Keyed - contains DataProcessorKeyed - an implementation of model serving using a key-based join (see above), based on Flink's CoProcessFunction
  • Partitioned - contains DataProcessorMap - an implementation of model serving using a partition-based join (see above), based on Flink's RichCoFlatMapFunction
  • Typeshema - contains support classes used for type manipulation, including the following:
    • ByteArraySchema - a deserialization schema for byte arrays, used for reading protobuf-based data from Kafka
    • ModelTypeSerializer - a type serializer used for checkpointing
    • ModelSerializerConfigSnapshot - the type serializer snapshot configuration for ModelTypeSerializer
    • ModelWithTypeSerializer - a type serializer used for checkpointing
    • ModelWithTypeSerializerConfigSnapshot - the type serializer snapshot configuration for ModelWithTypeSerializer

Flink Model Serving Scala

The implementation provides functionality identical to the Java one and is contained in the namespace org.apache.flink.modelserving.scala, which contains three namespaces:

  • model - code containing definition of model and its transformation implementation
  • query - code containing base implementation for the Flink queryable state query
  • server - code containing basic implementation of the Flink specific components of the overall implementation 

The model implementation is split into a generic and a TensorFlow-based implementation; this split allows adding support for other model types in the future without disrupting the base code. The generic model implementation includes the following classes:

  • DataToServe - a trait defining generic container for data used for model serving and its behavior
  • Model - a trait defining generic model and its behavior (see above)
  • ModelFactory - a trait defining generic model factory and its behavior (see above)
  • ModelFactoryResolver - a trait defining generic model factory resolver and its behavior. The purpose of this trait is to get the model factory based on the model type. 
  • ModelToServe - defines additional case classes used by the overall implementation and a set of data transformations used for model manipulation and for converting models between different representations. Additional classes included here are ServingResult - a container for a model serving result; ModelToServeStats - model serving statistics (see above); and ModelToServe - an internal generic representation of the model
  • ModelWithType - a combined container for a model and its type, used by the Flink implementation

The tensorflow namespace inside the model namespace contains two abstract classes:

  • TensorFlowModel extends Model, adding TensorFlow-specific functionality for the case of an optimized TensorFlow model
  • TensorBundelFlowModel extends Model, adding TensorFlow-specific functionality for the case of a bundled TensorFlow model

The query namespace contains a single class, ModelStateQuery, implementing the Flink queryable state client for the model state; a sketch of what such a client looks like follows.
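
For illustration, a hedged sketch of how a queryable state client like ModelStateQuery might fetch per-model state; the host, port, state name, key, and state value type here are assumptions, not the FLIP's actual values:

    import org.apache.flink.api.common.JobID
    import org.apache.flink.api.common.state.ValueStateDescriptor
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    import org.apache.flink.queryablestate.client.QueryableStateClient

    object ModelStateQuerySketch {
      def main(args: Array[String]): Unit = {
        // Connect to the queryable state proxy of a task manager.
        val client = new QueryableStateClient("localhost", 9069)
        val future = client.getKvState(
          JobID.fromHexString(args(0)),   // job id of the running serving job
          "currentModelState",            // assumed queryable state name
          "wine",                         // key: the data type being served
          BasicTypeInfo.STRING_TYPE_INFO,
          // The value type is simplified to String for this sketch.
          new ValueStateDescriptor[String]("currentModelState", classOf[String]))
        // Print the state once the asynchronous query completes.
        future.thenAccept(state => println(s"Model state: ${state.value()}")).join()
      }
    }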

The server namespace contains three namespaces:

  • Keyed - contains DataProcessorKeyed - an implementation of model serving using a key-based join (see above), based on Flink's CoProcessFunction
  • Partitioned - contains DataProcessorMap - an implementation of model serving using a partition-based join (see above), based on Flink's RichCoFlatMapFunction
  • Typeshema - contains support classes used for type manipulation, including the following:
    • ByteArraySchema - a deserialization schema for byte arrays, used for reading protobuf-based data from Kafka
    • ModelTypeSerializer - a type serializer used for checkpointing
    • ModelWithTypeSerializer - a type serializer used for checkpointing

Example implementation

This is the implementation of a wine quality example based on the above library. The implementation includes three modules:

The implementation demonstrates how to use the library to build a Flink model serving implementation.

When building a new implementation, you first need to define the data used for model serving. This example uses wine quality data; the data definition for wine is provided in the model serving example shared module. We are using protobuf encoding for the data here, but other encodings can be used as well. Additionally, the shared namespace contains an implementation of an embedded Kafka server (for local testing) and a Kafka provider that periodically publishes both data and models, which can be used for testing the example.

There are two implementations of the example, Java and Scala, that work exactly the same but use the corresponding version of the library.

Let's walk through the Scala implementation. It is located in the namespace org.apache.flink.examples.modelserving.scala and is composed of three namespaces:

  • Model
  • Query
  • Server

The model namespace contains three classes that extend the library and implement operations specific to a given data type:

  • WineTensorFlowModel extends TensorFlowModel class by implementing processing specific to Wine quality data
  • WineTensorFlowBundeledModel extends TensorFlowBundelModel class by implementing processing specific to Wine quality data
  • WineFactoryResolver extends ModelFactoryResolver class by specifying above two classes as available factories

The server namespace implements two supporting classes: DataRecord, implementing the DataToServe trait for the wine type (sketched below), and BadDataHandler, a simple data error handler implementation.
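
A hedged sketch of what such a wrapper might look like (the trait's exact method names are assumptions; WineRecord stands in for the protobuf-generated class):

    // Assumed shape of the library's DataToServe trait.
    trait DataToServe {
      def getType: String  // data type, used to route the record to its model
      def getRecord: Any   // the underlying record
    }

    // Stand-in for the protobuf-generated wine record.
    case class WineRecord(dataType: String, features: Array[Double])

    // Wraps a WineRecord so the generic serving code can handle it.
    case class DataRecord(record: WineRecord) extends DataToServe {
      override def getType: String = record.dataType
      override def getRecord: Any = record
    }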

It also provides a complete Flink implementation for both the key-based join (ModelServingKeyedJob) and the partition-based join (ModelServingFlatJob).

To run the example, first start the org.apache.flink.examples.modelserving.client.client.DataProvider class, which will publish both data and model messages to Kafka (to verify that publication works correctly, you can use org.apache.flink.examples.modelserving.client.client.DataReader; do not forget to stop it before you proceed). Once the data provider is running, you can start the actual model serving job (ModelServingKeyedJob or ModelServingFlatJob). If you are using the keyed version, you can also run ModelStateQueryJob from the query namespace to see execution statistics (Flink only supports queryable state for the keyed join). When running the query, do not forget to update jobID with the actual ID of your run.