You are viewing an old version of this page. View the current version.

Compare with Current View Page History

Version 1 Next »

Copycat needs a runtime data API to represent the data it is importing or exporting from Kafka. Connectors use this API to either convert data from a source system into a format for Copycat to store in Kafka or to take data loaded from Kafka by Copycat and convert it for storage in a sink system. The serialization of the data is handled by the framework using this API, so it abstracts away the details of serialization from connector developers and allows a single connector implementation to work with multiple serialization formats.

Here is how data is currently processed on the import path:

The export path is equivalent but in reverse. Note that Converter and Serializer are separated to enable reuse of existing Serializer implementations and because converting directly from the Copycat data API to serialized form, although possibly faster and more efficient, requires significantly more implementation effort than reusing existing serialization libraries.

This document explores what features are needed in this API and what will be needed to be compatible with a variety of serialization libraries and source/sink systems. JSON, Avro, Thrift, and Protocol Buffers are used as examples that should be representative enough to ensure the API can work with a variety of data.

Data Types

There is a tension in selecting the set of types to include in Copycat's API:

  • Try to be a superset of all types available in all serialization libraries and source/sink. This ensures that converting from any serialization format to Copycat will not lose any information. However, conversions from Copycat to a serialized form may lose information as types need to be cast and this results in a large, complex API. Additionally, differences between how different systems represent complex types (e.g. datetime) may be difficult or impossible to unify into a standard representation in Copycat.
  • Use only the common subset of types. This ensures converting from Copycat to the serialization format never loses information. However, converting from any serialization format to Copycat can lose data do to type casts.

Here's an overview of what's supported in a few different serialization formats. Note that JSON is a bit different because it doesn't have a schema, so some field types can be represented but the type is not necessarily known (e.g., the number type covers floats and ints).

 JSONAvroThriftProtocol Buffers
booleanYYYY
byte  Y 
unsigned int16    

signed int16

  Y 
unsigned int32   Y (with encoding variants)
signed int32 YYY (with encoding variants)
unsigned int64   Y (with encoding variants)
signed int64 YYY (with encoding variants)
float Y Y
double YYY
number typeY   
stringYYYY
bytes YYY
fixed-size bytes Y  
struct/record/objectYYYY
list/arrayYYYY (via repeated modifier)

set (unordered)

  Y 
map w/ string keys YYY (optional, based on structs)
map w/ primitive keys  YY (optional, based on structs)
enum YYY
union Y Y (oneof)
null type Y  
     
optional modifier Y (via union with null type) Y
repeated modifier   Y
default values YYY
aliases Y  
schema names YYY
namespaces/packages YYY
bare primitive typesYY  

 

Schemas

Copycat will need some representation of data's schema to be provided with the data in order to reason about it. This is a strict requirement for a lot of connectors that write data to structured data store, where schema changes are potentially expensive (e.g. databases, HDFS for Hive). Additionally, this is a requirement for any serialization format that needs a schema/IDL spec description of the message format to work properly. Unlike the schema formats in many of the serialization libraries, Copycat schemas have a different set of requirements:

  • It must be easy to construct them dynamically. The main way we expect to construct them is dynamically: source connectors generate a schema by reading from the source system (e.g. get table schema for a database), and sink connectors will load them before reading the data from input topics.
  • Copycat schemas do not need to be saved/parsed directly (they are always converted to/from a schema for the serializer being used). There is no need to support loading them from a file since it generally isn't possible to write down the schema for a connector ahead of time.
  • In order to support interaction with schemaless systems, Copycat needs a way to express this format (e.g. lack of schema, or a catch-all schema)
    • Using this schema will hurt compatibility/usefulness of some connectors (e.g. there is very little a JDBC sink can do with unstructured data aside from store it as a blob in a single column). However, it is better to support some workflows that can still handle this data (e.g. something like MongoDB to Kafka to Elasticsearch) than to limit to systems that provide good schemas.

Schema Projection

When two schemas are compatible, it is possible to convert a message with one schema to a message using the other schema. It is important for some connectors to be able to project between different schemas that may appear in the same set of data. For example, if a topic contains data generated by an application and the application added a field to the message type and ran a rolling upgrade, then there will be a period where there are mixed messaged formats. Adding a field is backwards compatible. When a connector consumes this data, if it is to deliver it to structured data storage (e.g. a database table), it might make the table schema change in the database the first time it encounters the new format. However, it may still encounter messages with the old format. It needs to be able to project these to the new format in order to add them to the database table since it is now only compatible with the new schema.

We could handle this in 3 places:

  1. Deserializer: Make this part of the serializer interface and require each serializer to perform projection. We would need serializers to implement the following methods:

    ValueType deserialize(byte[]);
    ValueType deserialize(byte[], Schema targetSchema);

    This adds to the complexity of deserializers and requires them all to implement similar code. On the other hand, this allows reuse of existing code if the deserializer already supports this operation (e.g. Avro, Protobufs), allows for format-specific compatibility rules (see, e.g., Protobufs rules for compatible type changes)

  2. Copycat Framework: We could implement projection ourselves within Copycat and just provide it as a method used by connectors. This adds to Copycat's complexity and requires clear explanation of compatibility rules in the data API spec.
  3. Connector: If connectors need this functionality, they could implement it themselves. Further, they may do this without using two Copycat schemas, instead just coercing the data to the format of the output system. This is similar to (1) in that many connectors duplicate effort.

Proposal

 

Serializer Implementation Sketches

All implementations of serializer share a common concern: tracking schemas across multiple hops. For example, if we use Copycat to import data from a SQL database into Kafka, then a downstream consumer (whether Copycat or some other system) needs the schema in order to be able to read and understand the structure of this data. This can be accomplished by:

  • Transmitting the schema with every message (high overhead)
  • Centrally register schemas and tag each message with an ID. Readers can look up schemas on demand.

This isn't an issue for many applications because they don't work with dynamic schemas. Normally they compile their applications using a specific schema (or auto-generated code for that schema) because they are only handling that one type of data. They can avoid any central registration service because it is assumed both the reader and writer have a copy of the schema (or at least compatible schemas).

Copycat is different because connectors need to be able to handle data with a variety of schemas. For example, an HDFS sink connector that puts data into Hive tables may require input data to be a flat record (or it may flatten the data into table form), but it should be able to handle any schema with the appropriate structure. This means the schema cannot be available at compile time and must be determined dynamically.

JSON

  • No built-in schema system. Use a simple serialization of Copycat's schema class or the appropriate subset of JSON Schema.
  • Use envelope format to include schemas with each message:

    { 
      "schema": { "type": ...},
      "payload": { ... }
    }
  • Provide a mode to toggle the envelope vs. using only the payload.

    • This a) allows compatibility for loading existing JSON data (which will not have an attached schema) and writing the data to topics when that topic will only be consumed by applications that do not need the schema, and b) avoids the overhead of including the schema with every message.

    • Reading this data in a sink connector requires attaching a schema, but there isn't one. We'll need to be able to define a catch-all schema (or imply that by omitting a schema)

  • Converter does straightforward translation to the JSON library format. For example, a Jackson-based implementation would generate a JsonNode representation.
  • Serializer does trivial conversion to byte[] with JSON library.
  • Note that we could, of course, use a schema registry instead of using this custom format. One reason for this design is to provide a built-in implementation that doesn't require any additional services that aren't included with Kafka.

Avro

  • Use one of the existing Avro (or generic) schema registries. This is already incorporated into some existing serializers (e.g. Confluent's)
  • Converter translates between Copycat types and Avro types (primitive types, GenericRecord, Map, Collection). This implementation is straightforward because Avro has good built-in support for handling schemas dynamically (GenericRecord).

Thrift

  • Requires schema registry.
  • Converter can be a nop. Thrift doesn't have an intermediate format that supports dynamic schemas. (Alternatively, one could build a TBase implementation similar to Avro's GenericRecord)
  • Serializer implementation will need to be custom, but can reuse TProtocol implementations. Thrift doesn't have built-in support for parsing schemas, but third-party libraries have implemented this (e.g., this one from Facebook). By combining these, the serializer should be a straightforward implementation.

Protocol Buffers

  • Requires schema registry.
  • Converter translates between copycat types and Descriptor/DynamicMessage.

 

  • No labels