
...

First, Kafka builds parallelism into its core abstraction: a partitioned topic. Fixing Kafka as one half of each Copycat connector leverages and builds upon this parallelism: sources are expected to handle many parallel input sequences of data that produce data to many partitions, and sinks are expected to consume from many Kafka partitions, or even many topics, and generate many output sequences that are sent to or stored in the destination system. In contrast, most frameworks operate at the level of individual sequences of records (equivalent to a Kafka partition), both for input and output (examples: fluentd, Flume, Logstash, Morphlines, Heka). While you can achieve parallelism in these systems, it requires defining a task for each individual input and output partition. This becomes especially problematic when the number of partitions is very large; Copycat expects this use case and allows connectors to efficiently group a large number of input partitions, mapping them to a smaller set of worker tasks. Some specialized cases may not use this parallelism, e.g. importing a database changelog, but it is critical for the most common use cases.

Second, Copycat can take advantage of Kafka’s built-in fault tolerance and scalability to simplify Copycat, both operationally and in its implementation. More general frameworks usually standardize on the lowest common denominator abstraction -- a single partition that may be persisted to disk but is not fault tolerant in a distributed sense -- because the burden of implementing connectors for many systems would be too high if they did not (examples: Logstash, Heka, Fluentd; Flume's storage is configurable and can be backed by Kafka). By requiring Kafka as one endpoint, Copycat can leverage Kafka features such as consumer groups, which provide automatic partition balancing and fault tolerance, without any additional code. Connectors must be aware of the semantics provided by Kafka, but their code can remain very simple.

...

Copycat’s data model only assumes an in-memory runtime format, leaving the details of (de)serialization to a pluggable component, much like the existing serializer interface. The data model must be able to represent complex data (object/record/dictionary types), but must also allow for standalone primitive types that will commonly be used as keys for events. Except for primitive types, all records include schemas which describe the format of their data. Including this schema facilitates the translation of the runtime data format to serialization formats that require schemas.

 

Records

...

Records are the core data type that Copycat handles. All records, whether for source or sink connectors, include the fields relevant to Kafka: topic, partition, key, and value.

Records generated by source connectors for delivery into Kafka may use null key and partition fields and may include two additional fields to indicate their location in the source system: a source partition and offset. The source partition and offset can both have complex types; they are not tied to the topic-partition or offset formats used in Kafka. For example, the source partition identifier might be an object with fields for the database name and table name; an offset might be a logical commit log offset in a database change log, which might be represented as a UUID. This representation assumes that the input data can be thought of as a set of partitions with unique identifiers for each record, but assumes very little beyond that.

Records that are loaded from Kafka and passed to sink connectors for export to another system are guaranteed to have a non-null partition and have one additional field, the offset (type long) of the message in the Kafka topic partition.
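
To make these record shapes concrete, here is a minimal, illustrative sketch of the fields described above. The class and field names (and the use of Java record syntax) are assumptions for illustration only, not the Copycat API.

    // Illustrative sketch of the record fields; names are assumptions, not the Copycat API.
    // Base fields shared by all records: where the data lives (or will live) in Kafka.
    record CopycatRecord(String topic, Integer kafkaPartition, Object key, Object value) {}

    // Source records may also carry their position in the source system; both fields can be
    // arbitrarily structured (e.g. sourcePartition = {database, table}, sourceOffset = a UUID).
    record SourceRecord(String topic, Integer kafkaPartition, Object key, Object value,
                        Object sourcePartition, Object sourceOffset) {}

    // Sink records are read from Kafka, so the partition is always known and the offset is
    // Kafka's long offset within that topic partition.
    record SinkRecord(String topic, int kafkaPartition, Object key, Object value,
                      long kafkaOffset) {}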


...

Partitioned Streams

The basic structure that all Copycat source and sink systems must be mapped to is the partitioned stream of records. This is a generalization of Kafka's concept of topic partitions. The stream is the complete set of records, which is split into independent, possibly infinite, sequences of records (the partitions). Each record can contain:

  • key and value - the event data, which after serialization are both byte[] (or null)
  • partition - a partition identifier. This can be arbitrarily structured (again, after serialization it will be a byte[]). This is a generalization of Kafka's integer partition IDs.
  • offset - a unique identifier indicating the position of the event in the partition. This can be arbitrarily structured (also a byte[] after serialization). This is a generalization of Kafka's long offsets. Offsets only need to allow a partition to be rewound to a given position; they do not need to be integer values, nor even comparable with < or >.

[Image: a partitioned stream of records]

As a concrete example, we might model a collection of databases we want to import via JDBC as a stream. Each table is assigned its own partition, and each record in a partition will contain one update to one row of the table. As shown below, since the stream covers many databases, partitions are labeled by the combination of the database and table.

[Image: a JDBC stream with partitions labeled by database and table]
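
To make the JDBC example concrete, here is a minimal sketch of how structured partition IDs and offsets might be represented; the Map-based shapes, field names, and values are illustrative assumptions only.

    import java.util.Map;

    // Sketch: structured partition IDs and offsets for a hypothetical JDBC source.
    public class JdbcPartitionExample {
        public static void main(String[] args) {
            // The partition is identified by database and table, not by an integer.
            Map<String, Object> partitionId = Map.of(
                "database", "db2",
                "table", "table1");
            // The offset only needs to let a task resume from a known position; here,
            // a last-seen timestamp plus autoincrement ID.
            Map<String, Object> offset = Map.of(
                "timestamp", 1431507330000L,
                "id", 4207L);
            System.out.println(partitionId + " @ " + offset);
        }
    }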

Streams may have a dynamic number of partitions. Over time, a stream may grow to add more partitions or shrink to remove them:

[Image: a stream adding and removing partitions over time]

Although adding partitions is possible in Kafka, it is not very common; in Copycat it may be more common. For example, in the JDBC example, new partitions will be added when new tables are created, as shown here where DB2 has added Table3 and Table4.

[Image: new partitions added after DB2 creates Table3 and Table4]

The number of partitions may also be very large and Copycat does not require that all partitions be enumerated. A somewhat extreme example of this would be metrics collected from a large number of hosts, e.g. application stats collected via JMX or OS-level stats collected via ganglia. Logically, we can represent these as a very large number of partitions, with one partition per host per metric (perhaps hundreds of thousands or even millions across a data center) and Unix timestamps as offsets.

[Image: metrics modeled as one partition per host per metric, with timestamps as offsets]

Connector Model

The connector model defines how third-party developers create connector plugins which import or export data from another system. The model has two key concepts: a Connector, which divides up the work, and Tasks, which are responsible for producing or consuming records.

Connectors

Connectors are the largest logical unit of work in Copycat and define where data should be copied to and from. This might cover copying a whole database or collection of databases into Kafka, or a set of topics matching a regex into HDFS. However, the connector does not perform any copying itself. Instead, it is responsible for using the configuration to map the source or sink system to the partitioned stream model described above and for grouping partitions into coarser-grained groups called tasks. This is an ongoing responsibility because the partitioning may be dynamic: the connector must monitor the input or output system for changes that require updating the distribution of partitions (e.g., a new table is created and needs to be assigned to a task).

Tasks

Tasks are responsible for producing or consuming sequences of CopycatRecords in order to copy data. They are assigned a subset of the partitions in the stream and copy those partitions to/from Kafka. Tasks do not monitor the source or sink system for changes themselves; that is the Connector’s job. Tasks should be “dumb” in that their only job is to copy records. Tasks also provide control over the degree of parallelism when copying data: each task is given one thread and the user can configure the maximum number of tasks each connector can create. The following image shows the logical organization of a source connector, its tasks, and the data flow into Kafka. (Note that this is not the physical organization; tasks do not execute inside Connectors.)

[Image: logical organization of a source connector, its tasks, and the data flow into Kafka]

Partitions are balanced evenly across tasks. Each task reads from its partitions, translates the data to Copycat's format, and decides the destination topic (and possibly partition) in Kafka. Note that partitions in the input system do not need to be mapped to a single topic or partition in Kafka; the connector may perform arbitrary shuffling and data transformation.

In this simple case, we can easily enumerate all the partitions and assign them simply by dividing the list between the tasks. However, this approach is not required. The Connector is responsible for assigning partitions to tasks and it can use any approach that makes sense.
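
As a rough illustration of the enumerate-and-divide approach, a connector might group a list of table partitions round-robin into at most max.tasks groups, one group per task. The class and method names below are assumptions for illustration, not part of the Copycat API.

    import java.util.ArrayList;
    import java.util.List;

    // Sketch: round-robin a fully enumerated partition list into task-sized groups.
    public class PartitionGrouping {
        public static List<List<String>> groupPartitions(List<String> partitions, int maxTasks) {
            int numGroups = Math.min(maxTasks, partitions.size());
            List<List<String>> groups = new ArrayList<>();
            for (int i = 0; i < numGroups; i++)
                groups.add(new ArrayList<>());
            for (int i = 0; i < partitions.size(); i++)
                groups.get(i % numGroups).add(partitions.get(i));
            return groups;
        }

        public static void main(String[] args) {
            // e.g. four tables across two tasks -> two tables per task
            System.out.println(groupPartitions(
                List.of("db1.table1", "db1.table2", "db2.table1", "db2.table2"), 2));
        }
    }

A connector is not obligated to enumerate partitions at all, however. For example, the metrics example from earlier might look like this: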

 

[Image: metrics connector assigning ranges of hosts to tasks]

Instead of enumerating every single metric across all hosts or application processes, the connector might only divide work between tasks at the granularity of hosts, and might even specify this as a range of hosts rather than actually listing the full set of hosts. So in the example, the connector could generate configs that specify the range of hosts each task should handle (e.g. server.range=a-m and server.range=n-z), and the tasks, which are implemented as part of the same connector plugin, know to handle all metrics for all servers with hostnames in that range. It is important that each metric be its own partition so that offsets can be tracked for each individually (enabling correct handling of failures), but the Copycat framework does not need to know the full list of partitions or exactly how they are assigned to tasks.
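
A minimal sketch of this range-based approach is shown below: the connector splits the hostname space into contiguous ranges and hands each task a config describing its range. The server.range key follows the example above; the class and method names are illustrative assumptions.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    // Sketch: generate per-task configs that describe ranges of hosts rather than
    // enumerating every host or metric.
    public class MetricsRangeConnector {
        public static List<Map<String, String>> taskConfigs(int maxTasks) {
            List<Map<String, String>> configs = new ArrayList<>();
            int numTasks = Math.min(maxTasks, 26);
            for (int i = 0; i < numTasks; i++) {
                // Split the a-z hostname space into numTasks contiguous ranges.
                char start = (char) ('a' + (26 * i) / numTasks);
                char end = (char) ('a' + (26 * (i + 1)) / numTasks - 1);
                configs.add(Map.of("server.range", start + "-" + end));
            }
            return configs;
        }

        public static void main(String[] args) {
            System.out.println(taskConfigs(2)); // [{server.range=a-m}, {server.range=n-z}]
        }
    }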

 

Aside from lifecycle events, source and sink tasks have different APIs. Source tasks are pull-based and provide an API for polling for new input records from the source system. The framework will call this method continuously, then handle serializing the records and producing them to Kafka. The Copycat framework will also track the input partition offsets included with the records and manage the offset commit process, ensuring data has been fully flushed to Kafka.
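
One possible shape for this pull-based API is sketched below; the interface, method names, and record fields are assumptions for illustration. The key point is that the task only returns records, while the framework owns serialization, producing to Kafka, and committing the source offsets carried on each record.

    import java.util.List;
    import java.util.Map;

    // Hypothetical shape of a pull-based source task API; names are illustrative.
    public class SourceTaskSketch {
        record SourceRecord(Object sourcePartition, Object sourceOffset,
                            String topic, Object key, Object value) {}

        interface SourceTask {
            void start(Map<String, String> config);                 // task-specific config from the connector
            List<SourceRecord> poll() throws InterruptedException;  // called continuously by the framework
            void stop();
        }
    }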

Sink tasks are push-based and provide an API for accepting input records that have been read and parsed from a Kafka consumer. They may either use Kafka’s built-in offset commit combined with a flush method they provide to have the framework manage offset commits, or they can manage offsets entirely in the output data store.
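
The corresponding push-based API for sink tasks might look roughly like the sketch below; again, the names and signatures are assumptions. put() receives records already read and deserialized from Kafka, and flush() lets the framework commit consumer offsets only after data up to those offsets has been written durably to the destination system.

    import java.util.Collection;
    import java.util.Map;

    // Hypothetical shape of a push-based sink task API; names are illustrative.
    public class SinkTaskSketch {
        record SinkRecord(String topic, int kafkaPartition, long kafkaOffset,
                          Object key, Object value) {}

        interface SinkTask {
            void start(Map<String, String> config);
            void put(Collection<SinkRecord> records);       // buffer or write to the destination system
            void flush(Map<String, Long> offsetsToCommit);  // ensure data up to these offsets is durable
            void stop();
        }
    }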

 

 

Worker Model

The worker model represents the runtime in which connectors and tasks execute. This layer decouples the logical work (connectors) from the physical execution (workers executing tasks) and abstracts all the management of that scheduling and coordination between workers.

Workers

Workers are processes that execute connectors and tasks. Each worker is a single process, and how these processes are spawned or managed is outside the scope of Copycat. They do not do any resource management -- they just run the work they are assigned by instantiating the appropriate classes, passing in configuration, and managing the components' lifecycle. Workers provide a way to run connectors and tasks without assuming any particular process or cluster management tool (although workers themselves may be running under any one of these tools). They also allow many tasks to share the overhead of a single process, since many tasks may be lightweight in terms of CPU and memory usage. Workers are assumed to have homogeneous resources so that balancing tasks across them can be kept simple.

Balancing Work

Since connectors and tasks may be lightweight, they are not executed in their own processes. Instead, they are assigned to workers, which may execute many of them in the same process. The framework manages this by tracking which workers are currently running and balancing work across them. The exact implementation is not specified here. However, this does require some coordination between a collection of worker processes, both to distribute the work and to share some persistent state to track the progress of each connector (i.e. offset commits).

Here is a simple example of a cluster of 3 workers (processes launched via any mechanism you choose) running two connectors. The worker processes have balanced the connectors and tasks across themselves. Note that although the partition assignments are labeled in this image, the workers are not aware of them.

[Image: three workers running two connectors with connectors and tasks balanced across them]

If a connector adds partitions, this causes it to regenerate task configurations. Although this can create new tasks in some cases, usually it will just require the connector to change the assignment of partitions to tasks. In this case, as shown below, the tasks need to be reconfigured, but no changes are made in the mapping of tasks to workers.

[Image: tasks reconfigured after partitions are added, with no change to the mapping of tasks to workers]

If one of the workers fails, the remaining workers rebalance the connectors and tasks so the work previously handled by the failed worker is moved to other workers:

[Image: connectors and tasks rebalanced onto the remaining workers after a worker failure]

The same mechanism is used to add or remove capacity by starting or stopping worker processes; in those cases the rebalance is handled gracefully.

This load balancing process performed by the workers can use a simple assignment policy (e.g. uniform distribution) and should be relatively infrequent (comparable to consumer group partition rebalancing), only occurring when:

  • A new worker process is started
  • A worker process shuts down gracefully or fails
  • A user reconfigures the maximum number of tasks for a connector
  • The set of partitions for a connector changes and the resulting assignment of partitions to tasks generated by the connector results in a different number of tasks than the previous configuration. This should be unusual since it requires that the number of partitions be smaller than the user's setting for the maximum number of tasks. For example, if a user specifies max.tasks=20 for a JDBC connector where there are initially only two tables to copy, it would generate only two tasks, each assigned one table. If a third table were added, the new configuration would have 3 tasks. In most cases, since partitions can be fine-grained, the number of partitions will be much larger than the maximum number of tasks.
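
The uniform assignment policy mentioned above could be as simple as the following sketch, which recomputes a round-robin mapping of tasks to the currently live workers whenever one of the events in the list occurs. The names are illustrative and the coordination mechanism itself is left unspecified, as in the text above.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Sketch: uniformly distribute connector tasks across live workers.
    public class UniformAssignment {
        public static Map<String, List<String>> assign(List<String> workers, List<String> tasks) {
            Map<String, List<String>> assignment = new HashMap<>();
            for (String worker : workers)
                assignment.put(worker, new ArrayList<>());
            for (int i = 0; i < tasks.size(); i++)
                assignment.get(workers.get(i % workers.size())).add(tasks.get(i));
            return assignment;
        }

        public static void main(String[] args) {
            System.out.println(assign(
                List.of("worker1", "worker2", "worker3"),
                List.of("connectorA-task0", "connectorA-task1", "connectorB-task0", "connectorB-task1")));
        }
    }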


Although the implementation is not specified here, it must provide an interface that can be used in embedded mode or by the REST API to accomplish the following operations:

...

Because Kafka already provides offset storage for consumers, the need for a new storage system for offset data warrants further discussion. Copycat extends the idea of offsets to a more general form, allowing partition IDs and offsets for source connectors to have arbitrary structure. These cannot easily be mapped directly to the topic-partition offsets that Kafka provides for consumers, even with the recent addition of metadata that can be attached to each of the offsets. Therefore, at least for source connectors, another more general mechanism will be necessary. However, sink tasks may commonly be able to use Kafka’s existing offset commit, and will not need to consider offsets at all since this process can be managed by the Copycat framework.
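
One possible shape for this more general mechanism is an interface that treats both partition IDs and offsets as opaque, serialized values, so arbitrarily structured positions can be stored and restored. This is only a sketch under that assumption; the interface and method names are not part of any committed design.

    import java.nio.ByteBuffer;
    import java.util.Collection;
    import java.util.Map;

    // Hypothetical general-purpose offset storage: keys are serialized partition IDs,
    // values are serialized offsets, both with arbitrary internal structure.
    interface OffsetStorage {
        void commit(Map<ByteBuffer, ByteBuffer> partitionOffsets);             // durably record positions
        Map<ByteBuffer, ByteBuffer> read(Collection<ByteBuffer> partitionIds); // restore on restart
    }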

...

A JDBC connector would provide a generic way to import data from relational databases. The default scope for copying would be an entire collection of databases, and by default each table would be associated with a partition (or a query would be a partition if, for example, the user wants the input to be a join query). The connector can poll the database periodically to check for new tables, resulting in new partitions.

To generate records, tasks can poll each table for updates, using timestamps, autoincrement columns, or a combination of both to detect additions and changes. Schemas are derived from the table’s columns and the records will have a trivial, flat record structure. The partition ID for records will be a combination of the database name and the table name, and the offsets will be the timestamp, autoincrement ID, or a tuple containing both. These offsets are used to construct a query on each poll of the table that restricts the query results to new data.
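
For instance, with a timestamp column plus an autoincrement column as the offset, each poll might issue a query along the lines of the sketch below. The column names ("modified" and "id") and the SQL shown are illustrative assumptions, not a specified implementation.

    // Sketch: build the incremental query for one poll of a table.
    public class JdbcQuerySketch {
        static String incrementalQuery(String table) {
            // The parameters are bound from the last committed offset
            // (lastTimestamp, lastTimestamp, lastId), so only new or changed rows are returned.
            return "SELECT * FROM " + table +
                   " WHERE modified > ? OR (modified = ? AND id > ?)" +
                   " ORDER BY modified, id";
        }
    }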

...

Log file import is a common use case and some applications may not be able to deliver logs directly to Kafka. A log file connector would run in standalone mode since it needs to run on each server that logs are collected from. The configuration would provide a list of files or regexes for files to load. Each file would become an input partition and offsets could be recorded as byte offsets into that file. The simplest implementation would create one record for each line (and records would have trivial structure). A more complex implementation might allow for a simple regex specification of the format of the logs.
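
A minimal sketch of how such a task could resume from a committed byte offset and emit one record per line is shown below; the class and method names are illustrative, and a real implementation would also need to handle log rotation and partial lines.

    import java.io.IOException;
    import java.io.RandomAccessFile;

    // Sketch: resume reading a log file from a committed byte offset, one record per line.
    public class LogFileTail {
        static void readFrom(String path, long committedOffset) throws IOException {
            try (RandomAccessFile file = new RandomAccessFile(path, "r")) {
                file.seek(committedOffset);                  // rewind to the last committed position
                String line;
                while ((line = file.readLine()) != null) {
                    long offset = file.getFilePointer();     // byte offset to record with this line
                    // Emit a record: partition = the file path, offset = byte position, value = the line.
                    System.out.println(path + "@" + offset + ": " + line);
                }
            }
        }
    }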

...