You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 2 Next »

The idea here is for topics to have associated a plug-able set of schema's it will validate against.

To best facilitate this we could store <TBD> a list of schemaIdHash + schema.  Every topic would have a class file associated with it to run the management of the schemes for the topic. The plugin could hold the schemas compiled or in another repository.  The storage of the schemas should be key/value based [schemaIdHash] = schema.

1) We are going to need a some cli tool for the crud.

2) Besides validation of the schema on the producer side and keeping client compatibility with that we also need a way for consumers once subscribed to a topic (from a group perspective) to read in the key/value schema information.  This could just be part of the OffsetResponse A Guide To The Kafka Protocol#OffsetResponse

3) We need to get the client compatibility kit (some thoughts on that https://github.com/stealthly/kafka-clients/wiki/Compatibility-Test-Kit ) working too.  

4) The topic needs to also contain encoder/decoder being used and NOT let the API override that, ever.

5) This structure for the topic schema (encoder, decoder, schemas[id]=schema) could just be a new ByteBuffer structure which we can store and transport the same. the schema list could get large and dynamic so this should be plug able too. Out of the box we need to support something functional for development and testing.

6) This design also can work into an implement (attributed set of schemas for security) in regards ACL.  The Kafka server can do whatever it is trying to-do with the data.  The expectation here is that it can reach into a message and grab out the fields it knows about without having to know anything about the message.  The plugable schema storage layer should handle this nuance of mapping the calling applications understanding of the map and what the schema has stored.  The relationship between the callers mapped name == the schema mapped name is 1:many. e.g. the server wants to .get("identity") which has a structure it is expecting.  The kafka server should be able to get this ByteBuffer so it can read it regardless of the implementation.  Having this feature allow that to be more seamless.

7) I think built in initial support for Avro would be awesome and probably account for the largest percentage of existing Kafka installations.  We could use Camus encoders/decoders for avro https://github.com/linkedin/camus/blob/master/camus-kafka-coders/src/main/java/com/linkedin/camus/etl/kafka/coders/KafkaAvroMessageEncoder.java / https://github.com/linkedin/camus/blob/master/camus-kafka-coders/src/main/java/com/linkedin/camus/etl/kafka/coders/KafkaAvroMessageDecoder.java and json https://github.com/linkedin/camus/blob/master/camus-kafka-coders/src/main/java/com/linkedin/camus/etl/kafka/coders/JsonStringMessageDecoder.java but with a layer of faster xml databind over it e.g. 

 

import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
object JsonUtil {
  val mapper = new ObjectMapper() with ScalaObjectMapper
  mapper.registerModule(DefaultScalaModule)
  mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
  def toJson(value: Map[Symbol, Any]): String = {
    toJson(value map { case (k,v) => k.name -> v})
  }
  def toJson(value: Any): String = {
    mapper.writeValueAsString(value)
  }
  def toMap[V](json:String)(implicit m: Manifest[V]) = fromJson[Map[String,V]](json)
  def toObj[V](json:String)(implicit m: Manifest[V]) = fromJson[V](json)
  def toSeq[V](json:String)(implicit m: Manifest[V]) = fromJson[Seq[V]](json)
  def fromJson[T](json: String)(implicit m : Manifest[T]): T = {
    mapper.readValue[T](json)
  }
}
object MarshallableImplicits {
  implicit class Unmarshallable(unMarshallMe: String) {
    def toMapFrom: Map[String,Any] = JsonUtil.toMap(unMarshallMe)
    def toMapOf[V]()(implicit m: Manifest[V]): Map[String,V] = JsonUtil.toMap[V](unMarshallMe)
    def toObj[V]()(implicit m: Manifest[V]): V = JsonUtil.toObj[V](unMarshallMe)
    def toSeq[V]()(implicit m: Manifest[V]): Seq[V] = JsonUtil.toSeq[V](unMarshallMe)
    def fromJson[T]()(implicit m: Manifest[T]): T =  JsonUtil.fromJson[T](unMarshallMe)
  }
  implicit class Marshallable[T](marshallMe: T) {
    def toJson: String = JsonUtil.toJson(marshallMe)
  }
}

Then we can make our objects from json easily.  Starting with this first is another approach it would definitly get things work better faster and we can refactor in avro to make sure it does support plugins.

  • No labels