The idea here is for topics to have an associated, pluggable set of schemas that the Kafka broker validates against when messages are produced. The broker also executes the plugged-in logic configured in that topic's plug-in field. A topic should allow >= 1 plug-in so that we can run multiple passes over the data before it is saved (e.g. security authorizations).

To best facilitate this we could store <TBD> a list of schemaIdHash + schema. Every topic would have a class associated with it that manages the schemas for that topic. The plugin could hold the schemas compiled in, or keep them in another repository. Schema storage should be key/value based: [schemaIdHash] = schema. We may also want to order and prioritize the plug-ins so that certain ones can iterate over the message before others (e.g. authorizations should run first).
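
As a very rough, hypothetical sketch of what such a per-topic plug-in could look like (none of the trait or method names below exist in Kafka; they are only meant to make the idea concrete):

// Hypothetical sketch only -- not an existing Kafka interface.
trait TopicSchemaPlugin {
  // Lower priority runs first, so e.g. an authorization plug-in can inspect
  // the message before a schema-validation plug-in does.
  def priority: Int

  // Schemas registered for this topic, keyed by schemaIdHash.
  def schemas: Map[String, String]

  // Called by the broker before the message is saved. Returning None rejects
  // the message; returning a new value lets a plug-in transform it in place.
  def onProduce(topic: String, key: Array[Byte], value: Array[Byte]): Option[Array[Byte]]
}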

1) We are going to need a CLI tool for the CRUD operations (https://issues.apache.org/jira/browse/KAFKA-1694) and the other administrative tasks that exist today.

2) Besides validating the schema on the producer side and keeping clients compatible with it, we also need a way for consumers, once subscribed to a topic (from a group perspective), to read in the key/value schema information. This could just be part of the OffsetResponse (A Guide To The Kafka Protocol#OffsetResponse).

3) We should fold the client compatibility kit work (some thoughts on that: https://github.com/stealthly/kafka-clients/wiki/Compatibility) into this effort too.

4) This design can also be used to implement authorizations for the data with regard to ACLs (at least validating the security bits on the data).

5) I think built-in initial support for Avro would be awesome and would probably cover the largest percentage of existing Kafka installations. We could use the Camus encoders/decoders for Avro (https://github.com/linkedin/camus/blob/master/camus-kafka-coders/src/main/java/com/linkedin/camus/etl/kafka/coders/KafkaAvroMessageEncoder.java and https://github.com/linkedin/camus/blob/master/camus-kafka-coders/src/main/java/com/linkedin/camus/etl/kafka/coders/KafkaAvroMessageDecoder.java) and for JSON (https://github.com/linkedin/camus/blob/master/camus-kafka-coders/src/main/java/com/linkedin/camus/etl/kafka/coders/JsonStringMessageDecoder.java), but with a layer of FasterXML databind over it, e.g.:

 

import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

object JsonUtil {
  // Single shared Jackson mapper configured for Scala types.
  val mapper = new ObjectMapper() with ScalaObjectMapper
  mapper.registerModule(DefaultScalaModule)
  mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)

  // Serialize a Symbol-keyed map by converting the keys to strings first.
  def toJson(value: Map[Symbol, Any]): String = {
    toJson(value map { case (k, v) => k.name -> v })
  }

  // Serialize any value to a JSON string.
  def toJson(value: Any): String = {
    mapper.writeValueAsString(value)
  }

  // Deserialization helpers for the common target shapes.
  def toMap[V](json: String)(implicit m: Manifest[V]) = fromJson[Map[String, V]](json)
  def toObj[V](json: String)(implicit m: Manifest[V]) = fromJson[V](json)
  def toSeq[V](json: String)(implicit m: Manifest[V]) = fromJson[Seq[V]](json)

  def fromJson[T](json: String)(implicit m: Manifest[T]): T = {
    mapper.readValue[T](json)
  }
}

object MarshallableImplicits {
  // Enriches any String with JSON-to-object conversions.
  implicit class Unmarshallable(unMarshallMe: String) {
    def toMapFrom: Map[String, Any] = JsonUtil.toMap(unMarshallMe)
    def toMapOf[V]()(implicit m: Manifest[V]): Map[String, V] = JsonUtil.toMap[V](unMarshallMe)
    def toObj[V]()(implicit m: Manifest[V]): V = JsonUtil.toObj[V](unMarshallMe)
    def toSeq[V]()(implicit m: Manifest[V]): Seq[V] = JsonUtil.toSeq[V](unMarshallMe)
    def fromJson[T]()(implicit m: Manifest[T]): T = JsonUtil.fromJson[T](unMarshallMe)
  }

  // Enriches any value with an object-to-JSON conversion.
  implicit class Marshallable[T](marshallMe: T) {
    def toJson: String = JsonUtil.toJson(marshallMe)
  }
}

Then we can build our objects from JSON easily, for example as shown below. Starting with JSON first is another approach; it would definitely get things working sooner, and we can refactor Avro in afterwards to make sure it does support plugins.
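
For example, a round trip with the helpers above could look like this (Sample is just an illustrative case class, not anything defined in Kafka):

import MarshallableImplicits._

// An illustrative case class for the round trip below.
case class Sample(a: String, b: List[String])

object JsonRoundTrip extends App {
  // Object -> JSON string via the implicit Marshallable.
  val json: String = Sample("hello", List("x", "y")).toJson

  // JSON string -> object via the implicit Unmarshallable.
  val sample: Sample = """{"a":"hello","b":["x","y"]}""".toObj[Sample]

  println(json)
  println(sample)
}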


5 Comments

  1. Jay Kreps I keep going back and forth on whether to start with JSON or Avro. I think we should support both so that we know plug-in development is going to be clean and usable. I think this is also the first step toward the ACL work, and as we discussed it knocks out the encryption and non-repudiation requirements cleanly and succinctly. Right now I am leaning towards starting with JSON, especially if we use the FasterXML approach. We also need to work a new JSON parser into the broker, which may fix this Kafka JIRA too: https://issues.apache.org/jira/browse/KAFKA-1595.

  2. Joe,

    Thanks for creating the wiki. Yes, schema-ed topics can be useful. Some questions for you.

    1. Regarding your comments on ACL and security: if the broker understands the schema, it can potentially support fine-grained ACLs (e.g., at the field level). Is that your point? Could you comment on why the schema also helps with encryption and non-repudiation?
    2. JSON is probably a bit easier to understand than Avro. How standard is JSON schema? Do you know any good libraries that support JSON schemas?
    1. 1) Fine-grained, attribute-based access control at the field level, yes. It helps with encryption and non-repudiation because to do those things you just need a few more elements added to the schema (e.g. encrypted key [list]: type, name, value; and signature [list]: type, name, value). With the encrypted key, you can use your private key to decrypt the encrypted key value; that recovered symmetric key then unlocks the message, so you can do the digital envelope pattern. For non-repudiation you sign over the message with a list of public keys and their signed values. You need to sign and encrypt for multiple parties at the time of entry, since the encryption is at rest and multiple parties with different credentials need to be able to read the message and verify its integrity at the message level. (A rough sketch of these extra schema elements follows point 2 below.)

      2) I have been using https://github.com/FasterXML/jackson-databind for a while now and like how it builds up case classes. The sample code above uses it. You create a case class Sample(a: String, b: List[String]) and then on a string you just call .toObj[Sample] and it does the deserialization; if you have a Sample object you just call toJson on it. The conversions are implicit classes, so we can use the functions anywhere (as long as we import the implicits). The library is also usable natively from Java for the same kinds of use cases.
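
      To make point 1) above a bit more concrete, the extra envelope elements could be modeled roughly like this (all class and field names here are illustrative only, not part of any existing schema):

      // Illustrative only: extra envelope fields carried alongside a message for
      // encryption at rest (digital envelope) and non-repudiation.
      case class EncryptedKey(keyType: String, name: String, value: Array[Byte])     // symmetric key wrapped with one recipient's public key
      case class MessageSignature(sigType: String, name: String, value: Array[Byte]) // one signer's identity and signed value

      case class EnvelopedMessage(
        encryptedKeys: List[EncryptedKey],   // one entry per party allowed to decrypt the payload
        signatures: List[MessageSignature],  // one entry per signer, for message-level non-repudiation
        payload: Array[Byte]                 // the encrypted message body
      )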

  3. Joe,

    Thanks for the reply.

    Since the schema has to be language independent, we probably want to store each schema as a string instead of a Java class. Avro makes it easy to get a schema string from an Avro object; it doesn't seem that Jackson provides that. Also, Avro schemas are very standard. I am not sure there is a standard JSON schema. There is http://json-schema.org/, but I am not sure if it's widely adopted or has good implementations in different languages. Do you know what most people use for managing schemas for JSON?

    1. I would very much prefer to do the initial implementation in Avro. After Avro, I think Protocol Buffers are the next most widely used serde with Kafka (at least that I have seen personally). I only brought up JSON to present a complete, unbiased picture of the feature in case folks felt strongly against Avro for some reason.
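
       For reference, this is the part Avro makes easy: once you have an Avro object you can get the language-independent schema string straight off of it (the Sample record below is just an illustration):

       import org.apache.avro.Schema
       import org.apache.avro.generic.{GenericData, GenericRecord}

       object AvroSchemaString extends App {
         // Parse a schema and build a record against it.
         val schema: Schema = new Schema.Parser().parse(
           """{"type":"record","name":"Sample","fields":[
             |  {"name":"a","type":"string"},
             |  {"name":"b","type":{"type":"array","items":"string"}}
             |]}""".stripMargin)

         val record: GenericRecord = new GenericData.Record(schema)
         record.put("a", "hello")
         record.put("b", java.util.Arrays.asList("x", "y"))

         // The schema string comes straight off the object; this is the kind of
         // string a broker could store per topic as [schemaIdHash] = schema.
         println(record.getSchema.toString(true))
       }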