Status

Current state: Under Discussion

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Kafka has become a standard storage system for large scale, streaming data. However, the current user experience when trying to adopt Kafka is poor because Kafka provides little support for getting data into or out of Kafka.

...

Copycat addresses these problems by providing a standard framework for Kafka connectors. It abstracts the common problems these connectors need to solve: fault tolerance, partitioning, offset management and delivery semantics, operations, and monitoring. This makes developing connectors much simpler and adopting connectors easier, especially making each incremental addition trivial.

Proposed Changes

This KIP proposes a standardized framework for handling import and export of data from Kafka called Copycat. This framework can address a variety of use cases including those described in the motivation section; would make adopting Kafka much simpler for users with existing data pipelines; would encourage an ecosystem of tools for integration of other systems with Kafka using a unified interface; and would provide a better user experience, guarantees, and scalability than other frameworks that are not Kafka-specific are able to.

The rest of this section covers some design goals derived from the motivation & use cases, discusses why this tool should be made part of Kafka rather than a third party tool, why it should be a Kafka-specific tool, and provides a rough, high-level system design.

Design Goals

The goal of adding the Copycat framework is to make it as easy as possible to connect Kafka to other systems by copying data between Kafka and these systems. The Copycat tool needs to support a variety of input/output systems and use cases, but they all share the common need of copying data, often at scale. To that end, there are a few important design goals any such framework should strive to achieve:

  • Focus on copying data only – Focus on reliable, scalable data copying; leave transformation, enrichment, and other modifications of the data up to frameworks that focus solely on that process. Because some very simple transformations may be broadly applicable (e.g. drop a field, obfuscate sensitive data), it may make sense to include a very minimal hook or settings to perform these modifications. However, these additions must be added carefully and not come at the cost of other key design goals.

  • Copy broadly by default – Endless configuration tweaking can quickly destroy the effectiveness of a tool like Copycat. Connectors should be quick to define and be able to copy vast quantities of data between systems. For example, the default unit of work should be an entire database, even if it is possible to define connectors that copy individual tables.

  • Parallel – Parallelism should be included in the core abstractions, providing a clear avenue for the framework to provide automatic scalability. Although some sources or sinks may naturally have no parallelism (e.g. a database change log), many others have significant parallelism (e.g. metrics, logs), and the framework should be capable of -- and encourage -- taking advantage of that parallelism.

  • Strong semantics – If possible, it is better to provide exactly-once delivery than weaker semantics, and at-least-once or at-most-once delivery is preferable to best effort. The framework should make it easy to provide stronger semantics when the connected system supports it.

  • Capture metadata – Many systems provide data with a well-defined structure and types. The framework should be able to capture this metadata and preserve it through an entire data pipeline as long as connectors also preserve it. However, the framework should also be able to handle systems which do not include or do not provide this metadata.

  • Accessible connector API – It must be easy to develop new connectors. The API and runtime model for implementing new connectors should make it simple to use the best library for the job, quickly get data flowing between systems, and still get all the benefits of the framework. Where the framework requires support from the connector, e.g. for recovering from faults, all the tools required should be included in the Copycat APIs.

  • Streaming and batch – Copycat must be able to integrate well with both streaming and batch-oriented systems. Kafka’s ability to interact efficiently with both these types of systems is one of its unique features and one which Copycat can take advantage of to make integrating these types of systems seamless and easy.

  • Scale to the application – Although Copycat should support copying large scale data, it should also scale easily to the application or environment. It should be easy to run a single process with a single connector in development, testing or a small production environment, but also scale up to an organization-wide service for copying data between a wide variety of large scale systems.

Why make Copycat Kafka-specific?

Copycat is designed specifically for Kafka and one endpoint in every Copycat connector is always Kafka. In contrast, there are already a variety of frameworks for copying and processing data that provide highly generic interfaces and already have plugins for Kafka (examples: fluentd, Flume, Logstash, Heka, Apache Camel). However, this generic approach misses out on a lot of important features of Kafka.

...

Besides leveraging Kafka-specific functionality, there are drawbacks to adopting any of the numerous existing general-purpose frameworks. Most are not actually general purpose because they were initially designed around a specific use case (e.g. logs, ETL) and later generalized; however, their designs -- and limitations -- clearly highlight their origins. Many of these systems also grew broader in scope over time, making them more complex to learn and deploy, and making it harder for them to provide useful guarantees, such as data delivery guarantees. Another issue is that many tools, especially those focused on ETL, tend to require a specific runtime model/environment, e.g. they require YARN. Such a requirement is impractical for a general-purpose tool because the number of cluster resource management strategies is growing quickly, to say nothing of traditional process management strategies. It is better to be agnostic to the use of these tools than to depend on them. Finally, many of these tools do not fit in well with the technology stack Kafka requires. For some users they may be preferable if they match their existing stack and infrastructure (e.g. Ruby for fluentd), but for many users a tool that fits well with the stack (and knowledge) that Kafka already requires would be preferable.

Why add Copycat to Kafka?

Copycat as described should not rely on any Kafka internals. While there is no technical requirement that Copycat be included directly with Kafka, doing so has a number of benefits.

...

Finally, Copycat connectors will generally be better (e.g. better parallelism, delivery guarantees, fault tolerance, scalability) than plugins in other frameworks because Copycat takes advantage of being Kafka-specific, as described in the previous subsection. Copycat will benefit by being closely tied to Kafka development, and vice versa, by ensuring Kafka APIs, abstractions, and features coevolve with Copycat, which represents an important use case.

Design

Copycat’s design can be broken into three key components: the data model that defines the structure of data that Copycat manages, the connector model which defines the interface to external systems, and the worker model which defines how connectors are executed and how the system implements various aspects such as coordination, configuration storage, offset storage, and offset commit management for different delivery guarantees.

Data Model

Copycat’s job is to copy events (or messages, records, changes, or your preferred terminology) from one system to another. To do so, it needs a generic representation for structured data that is not dependent on any particular system (data storage system or data serialization system). This is a significant departure from Kafka which is completely agnostic to data formats and, aside from the serializer interfaces provided for convenience, only operates on byte arrays.

Copycat’s data model only assumes an in-memory runtime format, leaving the details of (de)serialization to a pluggable component, much like the existing serializer interface. The data model must be able to represent complex data (object/record/dictionary types), but must also allow for standalone primitive types that will commonly be used as keys for events. Except for primitive types, all records include schemas which describe the format of their data. Including this schema facilitates the translation of the runtime data format to serialization formats that require schemas.

Records

Records are the core data type that Copycat handles. All records, whether for source or sink connectors, include the fields relevant to Kafka: topic, partition, key, and value.

...

Records that are loaded from Kafka and passed to sink connectors for export to another system are guaranteed to have a non-null partition and have one additional field, the offset (type long) of the message in the Kafka topic partition.
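
For illustration only (this KIP does not fix class names or exact signatures), the record types might look roughly like the following sketch, with the additional offset field appearing only on records delivered to sinks:

Code Block
// Hypothetical sketch of the record types; names and signatures are illustrative only.
class CopycatRecord {
    private final String topic;
    private final Integer partition;   // may be null for source records
    private final Object key;          // runtime data format, see Data Model
    private final Object value;

    CopycatRecord(String topic, Integer partition, Object key, Object value) {
        this.topic = topic;
        this.partition = partition;
        this.key = key;
        this.value = value;
    }
}

// Records passed to sink connectors additionally carry the Kafka offset and are
// guaranteed to have a non-null partition.
class SinkRecord extends CopycatRecord {
    private final long offset;

    SinkRecord(String topic, int partition, Object key, Object value, long offset) {
        super(topic, partition, key, value);
        this.offset = offset;
    }
}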

Connector Model

The connector model defines how third-party developers create connector plugins which import or export data from another system. The model has two key concepts: a Connector, which divides up the work, and Tasks, which are responsible for producing or consuming records.

Connectors

Connectors are the largest logical unit of work in Copycat and define where data should be copied to and from. This might cover copying a whole database into Kafka or a set of topics matching a regex into HDFS. However, the connector does not perform any copying itself. Instead, it is responsible for accepting its configuration and figuring out how to partition work across multiple copy tasks (e.g. splitting up the database by table across the target number of tasks). It also monitors the input or output system for changes that require updating that distribution of work (e.g., a new table is created and needs to be assigned to a worker).

Tasks

Tasks are responsible for producing or consuming streams of CopycatRecords in order to copy data. They are responsible for accepting a configuration generated by their parent connector and copying the data assigned to them (a set of input streams for sources or a set of input topic-partitions associated with their consumer for sinks). Even when inputs and outputs are dynamic, tasks do not need to monitor the external system for changes; that monitoring is handled by the Connector. Tasks should be “dumb” in that their only job is to copy records.

...

Sink tasks are push based and provide an API for accepting input records that have been read and parsed from a Kafka consumer. They may either use Kafka’s built-in offset commit combined with a flush method they provide to have the framework manage offset commits, or they can manage offsets entirely in the output data store.
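
To make the task model concrete, here is a rough sketch of what the source and sink task APIs could look like. These interfaces are purely illustrative and are not the APIs proposed by this KIP: source tasks are shown producing batches of records for Kafka, while sink tasks accept pushed records and expose the flush hook described above.

Code Block
import java.util.Collection;
import java.util.List;
import java.util.Properties;

// Hypothetical sketch of the task APIs; names and signatures are illustrative only.
// CopycatRecord and SinkRecord are as sketched in the Records section.
interface SourceTask {
    void start(Properties config);       // configuration generated by the parent Connector
    List<CopycatRecord> poll();          // next batch of records to deliver to Kafka
    void stop();
}

interface SinkTask {
    void start(Properties config);
    void put(Collection<SinkRecord> records);  // records read and parsed from the Kafka consumer
    void flush();                              // flush pending writes so the framework can commit offsets
    void stop();
}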

 

Streams

Streams are the finest granularity of partitioning that Copycat works with. They represent a single, possibly infinite, sequence of records with associated offsets. Each stream must be uniquely identified (within the scope of a connector) by an ID, but that ID may be arbitrarily structured. Offsets can also have arbitrary structure and their only requirement is that they can be used to rewind a stream to a given position; they do not need to be integer values nor even be comparable with < or >. Streams are not explicitly represented in code in the Copycat API. Rather, records include a stream ID and offset which the framework can use to track the current position in the stream and, upon graceful restarts or failures, reset the stream to the last known-good position.

While connectors are responsible for dividing streams across a number of tasks, the framework never needs to know about the complete set of streams. In fact, the full set of stream IDs may never be generated; tasks only need to know which set of streams to manage (e.g. a range of stream IDs could be specified).
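
As a purely illustrative example of this flexibility (none of these values are prescribed by the framework), a JDBC source task might identify a stream by table name and track a timestamp offset, while a log file task might use a file path and a byte offset:

Code Block
import java.util.Collections;
import java.util.Map;

// Illustrative only: stream IDs and offsets are arbitrary, connector-defined values.
class StreamIdentifierExamples {
    static final Map<String, String> JDBC_STREAM_ID = Collections.singletonMap("table", "orders");
    static final Map<String, Long>   JDBC_OFFSET    = Collections.singletonMap("timestamp", 1437020400000L);

    static final Map<String, String> LOG_STREAM_ID  = Collections.singletonMap("file", "/var/log/app.log");
    static final Map<String, Long>   LOG_OFFSET     = Collections.singletonMap("byteOffset", 4096L);
}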

Worker Model

The worker model represents the runtime in which connectors and tasks execute. This layer decouples the logical work (connectors) from the physical execution (workers executing tasks) and abstracts all the management of that scheduling and coordination between workers.

Workers

Workers are processes that execute connectors and tasks. Each worker is a single process, and how these processes are spawned or managed is outside the scope of Copycat. They do not do any resource management -- they just run the work they are assigned by instantiating the appropriate classes, passing in configuration, and managing their lifecycle. Workers provide a way to run connectors and tasks without assuming any particular process or cluster management tool (although workers themselves may be running under any one of these tools). They also allow many tasks to share the overhead of a single process since many tasks may be lightweight in terms of CPU and memory usage. Workers are assumed to have homogeneous resources so that balancing tasks across them can be kept simple.

Balancing Work

Since connectors and tasks may be lightweight, they are not executed in their own processes. Instead, they are assigned to workers, which execute many in the same process. The framework manages this process by tracking which workers are currently running and balancing work across them. The exact implementation is not specified here. However, this does require some coordination between a collection of worker processes, both to distribute the work and to share some persistent state to track the progress of each connector (i.e. offset commit).

Although the implementation is not specified here, it must provide an interface that can be used in embedded mode or by the REST API to accomplish the following operations:

  • Get a list of connectors.

  • Create a new connector with a provided configuration.

  • Get the current state of a connector and its tasks.

  • Get a connector configuration.

  • Update a connector configuration.

  • Get the configuration and state of a connector task.

  • Delete a connector.

  • Get a list of active workers and their state & tasks.

At least two implementations should be provided: a trivial standalone implementation (single process, no coordination) for agent-style applications and a distributed implementation which is required for any non-agent mode deployment.
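
As an illustration of the operations listed above, the shared interface backing both embedded mode and the REST API might look roughly like the following; all names and types here are hypothetical, since the KIP deliberately leaves the implementation unspecified:

Code Block
import java.util.List;
import java.util.Properties;

// Hypothetical sketch of the management interface used by embedded mode and the REST layer.
interface CopycatManagement {
    List<String> listConnectors();                              // get a list of connectors
    void addConnector(String name, Properties config);          // create a new connector
    String getConnectorState(String name);                      // current state of a connector and its tasks
    Properties getConnectorConfig(String name);
    void updateConnectorConfig(String name, Properties config);
    String getTaskState(String name, int taskId);               // configuration and state of a connector task
    void deleteConnector(String name);
    List<String> listWorkers();                                 // active workers and their state & tasks
}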

Data Storage

In order to be fault tolerant, Copycat needs to store three types of data: connector configurations (provided by the user), task configurations (generated by connectors), and offset data.

This KIP will not specify the implementation of this storage. However, to support the different execution modes, there are likely to be two implementations. The first will use local file system storage and is useful for standalone mode to store offset data. This mode does not require connector or task configuration storage because there is only one process and it is expected that the connector configuration is provided each time Copycat is executed. The second implementation is distributed and will likely store data in Zookeeper or Kafka since both are readily available. The Copycat implementation can cleanly isolate this functionality to an interface so alternative implementations may be possible.

Because Kafka already provides offset storage for consumers, the need for a new storage system for offset data warrants further discussion. Copycat extends the idea of offsets to a more general form, allowing stream IDs and offsets for source connectors to have arbitrary structure. These cannot easily be mapped directly to the topic-partition offsets that Kafka provides for consumers, even with the recent addition of metadata that can be attached to each of the offsets. Therefore, at least for source connectors, another more general mechanism will be necessary. However, sink tasks may commonly be able to use Kafka’s existing offset commit, and will not need to consider offsets at all since this process can be managed by the Copycat framework.
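
A minimal sketch of what such a general offset storage interface could look like, assuming the arbitrarily structured stream IDs and offsets described above (this is an illustration of the requirement, not a proposed API):

Code Block
import java.util.Map;

// Hypothetical sketch: offset storage for source connectors, keyed by connector-defined
// stream IDs and holding connector-defined offset values.
interface OffsetStorage {
    // Atomically record the latest offset for each stream a task has processed.
    void commit(String connectorName, Map<Object, Object> offsetsByStreamId);

    // Return the last committed offset for a stream, or null if nothing has been committed.
    Object lastCommitted(String connectorName, Object streamId);
}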

Delivery Guarantees

Copycat can support three different levels of delivery guarantees between a source and sink system: at least once, at most once, and exactly once. Each of these requires a certain set of functionality from the input and output systems.

At least once: each message is delivered to the output system at least once, but possibly multiple times. This requires the output system to support a flush operation, which is followed by an offset commit in the input system. For Copycat sources, the KafkaProducer is the output and already supports flush, and the Copycat framework provides storage and offset commit for the general-form offsets generated from the source system. For sinks, the task is the sink and must implement a flush method, and the source is the KafkaConsumer, which already supports offset commit stored in Kafka.

At most once: each message is delivered to the output system at most once, but may not be delivered at all. This requires buffering of records in the framework so that offsets can be committed in the input system (where the offsets may not be available until the data is read) and then records are sent to the output system. Offset commit is provided by the same components as described in the at least once section.

Exactly once: each message is guaranteed to be delivered to (or stored in) the output system exactly once. The implementation will depend on the output system, requiring either idempotency or that input system offsets are tracked in the output system. For source connectors, the output system is Kafka and should be able to achieve exactly once semantics with the idempotent producer; for sink connectors, it can be achieved in a system-specific way, e.g. using <topic>-<partition>-<offset> as a document ID in a document store like Elasticsearch or atomically committing data & offset using a file rename in HDFS.
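
For example, a hypothetical document-store sink could derive an idempotent document ID from the record's Kafka coordinates so that a record redelivered after a failure simply overwrites the same document (a sketch, not part of any proposed connector):

Code Block
// Sketch: an idempotent document ID built from Kafka coordinates; writing the same
// record twice overwrites the same document instead of creating a duplicate.
class ExactlyOnceSinkSketch {
    static String documentId(String topic, int partition, long offset) {
        return topic + "-" + partition + "-" + offset;
    }
}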

 

Integration with Cluster/Resource Managers

Process and resource management is orthogonal to Copycat's runtime model; workers can be deployed in any of the following ways:


  • Run Copycat as a service - for example, run 10 Copycat workers. Users can just submit connectors and they will be balanced over the workers. No per-connector resource constraint configs are required. As demand increases, simply increase the number of processes and the cluster manager can start them up automatically, restart them upon failures, etc.
  • Manage resources on a per-connector basis - create separate processes in the cluster manager for each connector, allowing resource constraints to be configured per connector. For example, in Mesos, use Marathon to run N Copycat workers that only run one connector.
  • No cluster manager - start your processes however you normally do, e.g. through devops tools like Chef and Puppet. When there are failures, this is still detected and connectors/tasks are moved to other workers, which pick up the slack. If the worker returns, rebalancing is performed again.


Examples

To make some of these concepts more concrete, this section describes a few possible connectors, their implementations, and applications.

JDBC Import

A JDBC connector would provide a generic way to import data from relational databases. The default scope for copying would be an entire database, and one input stream would be associated with each table (or user query if, for example, they want the input to be a join query). With one stream per table, this connector could be split into multiple tasks and run in distributed mode. The connector can poll the database periodically to check for new tables.

To generate records, tasks can poll each table for updates, using timestamps, autoincrement columns, or a combination of both to detect changes. Schemas can be derived from the table’s columns and the records will have a trivial, flat record structure. The stream ID for records will be the table name and the offsets will be the timestamp, autoincrement ID, or a tuple containing both. These offsets are used to construct a query on each poll of the table that restricts the query results to new data.
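
As a sketch of how a task might use its offsets, the per-table query could be constructed roughly as follows using plain JDBC (the table and column names here are hypothetical):

Code Block
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;

class JdbcPollSketch {
    // Poll one table for rows newer than the last committed offset, here a
    // (timestamp, autoincrement ID) pair. Illustrative only.
    static ResultSet pollTable(Connection conn, String table,
                               Timestamp lastTimestamp, long lastId) throws SQLException {
        PreparedStatement stmt = conn.prepareStatement(
                "SELECT * FROM " + table +
                " WHERE updated_at > ? OR (updated_at = ? AND id > ?)" +
                " ORDER BY updated_at, id");
        stmt.setTimestamp(1, lastTimestamp);
        stmt.setTimestamp(2, lastTimestamp);
        stmt.setLong(3, lastId);
        return stmt.executeQuery();
    }
}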

HDFS Export

HDFS export is a very common use case for long-term storage and batch analysis of data. The input for this connector would be a list of topics or a regex subscription. Since this will usually cover a large quantity of data, this will generally be used in clustered mode. Dynamic inputs are handled automatically by the Kafka consumer. This connector can provide exactly once delivery by managing offsets with the data stored in HDFS. It can write to temporary files as the data streams in. To commit data, it closes the file and performs an atomic rename. Offset information is included with the file (or in the filename) to ensure atomicity of committing both the data and the offsets. After faults, instead of resuming wherever the Kafka consumer left off, it will check the offsets in HDFS and seek the underlying consumer to the correct position before writing new data.
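
A sketch of the commit step using the Hadoop FileSystem API; the paths and naming scheme are illustrative, with the offset range encoded in the final filename so that the atomic rename commits data and offsets together:

Code Block
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

class HdfsCommitSketch {
    // The temporary file has already been closed; the rename is the atomic commit point,
    // and the offset range in the destination name doubles as the committed offset.
    static void commit(Configuration conf, String topic, int partition,
                       long startOffset, long endOffset) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path tmp = new Path("/copycat/tmp/" + topic + "-" + partition + ".tmp");
        Path dst = new Path("/copycat/data/" + topic + "/" + partition + "/" +
                            startOffset + "-" + endOffset);
        if (!fs.rename(tmp, dst)) {
            throw new IOException("Failed to commit " + dst);
        }
    }
}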

Log Import

Log file import is a common use case and some applications may not be able to deliver logs directly to Kafka. A log file connector would run in standalone mode since it needs to run on each server that logs are collected from. The configuration would provide a list of files or regexes for files to load. Each file would become an input stream and offsets could be recorded as byte offsets into that file. The simplest implementation would create one record for each line (and records would have trivial structure). A more complex implementation might allow for a simple regex specification of the format of the logs.
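
A sketch of resuming a file from a committed byte offset and producing one record per line (illustrative; a real task would also handle partial lines, rotation, and similar details):

Code Block
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.List;

class LogTailSketch {
    // Seek to the last committed byte offset and read complete lines; the file position
    // after each line is the offset to record for that line's record.
    static List<String> readFrom(String path, long committedByteOffset) throws IOException {
        List<String> lines = new ArrayList<>();
        try (RandomAccessFile file = new RandomAccessFile(path, "r")) {
            file.seek(committedByteOffset);
            String line;
            while ((line = file.readLine()) != null) {
                lines.add(line);   // file.getFilePointer() is now the offset for this record
            }
        }
        return lines;
    }
}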

Mirror Maker

The existing mirror maker tool can be thought of as a special case where rather than just one endpoint being Kafka, both the input and output are Kafka. This could potentially be implemented as either a source or a sink connector. One of the connections, managed by the framework, would use the Kafka cluster that is set globally for the entire Copycat cluster; the second connection would be managed by the connector itself and use a remote Kafka cluster. If the implementations are sufficiently compatible, it might make sense to eventually deprecate the original mirror maker tool and provide a compatible Copycat wrapper script. A mirror maker connector would be a good candidate for a built-in connector since it is a commonly needed tool and requires no additional dependencies.

Public Interfaces

This section describes the different public, user-facing interfaces that Copycat will include, but these are not intended to necessarily represent the final interface. Rather, the goal of this section is to give a sense of the scope and usage of the Copycat tool.

CLI

The primary interface for Copycat is the REST interface. However, many users’ first exposure to Copycat will be via the command line because that is the natural interface to run a simple, standalone Copycat agent.

This KIP introduces two command line tools. However, since these do not provide access to all Copycat functionality, it is likely that we will want to add more command line utilities to query and modify Copycat. The commands included are the minimum required to provide baseline functionality in conjunction with the REST interface, but additional commands providing CLI access to functionality only exposed by the REST interface in this KIP can be added in subsequent KIPs.

copycat

The copycat command runs Copycat in standalone (agent) mode. This mode runs one or more connectors in a single process:

copycat [...storage options...] connector.properties [connector2.properties …]

Functionally, this behaves as if you started a single node cluster and submitted each connector properties file as a new connector, however:

  • It acts completely standalone and does not need to coordinate with other copycat workers. This allows it to run on any machine without extra configuration or any more access than any other Kafka client would normally have.

  • The connector configuration is not placed in persistent storage. It is expected that if you kill the process, you will provide the connector definition again if you want to resume it.

  • Offset storage is local and unreplicated. The data will be persisted to disk by default, allowing resumption in the case of failure, reboots, or other causes of process restarts. Extra command line arguments can be passed to control how offset storage is managed, e.g. to disable it entirely for testing purposes.

  • A trivial implementation of work balancing is used because there is only one worker process and no real coordination is required.
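
As an illustration, a connector properties file for something like the log import example might look as follows; the property names shown here are hypothetical, since this KIP does not define the configuration keys:

Code Block
# Hypothetical standalone connector configuration; keys are illustrative only.
name=local-log-import
connector.class=org.example.copycat.LogFileSourceConnector
tasks.max=1
files=/var/log/app.log
topic=app-logs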

copycat-worker

The copycat-worker command starts a worker in cluster mode:

copycat-worker <... config options for distributed mode ...>

In contrast to the standalone mode, this mode:

  • Requires some configuration to set up work balancing and shared storage of connector/task configs and offsets.

  • The command does not include a connector; it only starts a worker. All workers will be running a REST interface which can be used to submit/modify/destroy persistent connectors.

REST Interface

All workers running in cluster mode run a REST interface that allows users to submit, get the status of, modify, and destroy connectors. API calls can be made on any worker, but may require coordination with other workers to execute. The following is a sketch of the key API endpoints:

GET /connectors

Get a list of connectors.

POST /connectors

Create a new connector by passing its configuration as a set of string key-value pairs.
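
For example (an illustrative request only; the exact payload format is not specified by this KIP):

Code Block
POST /connectors
{
  "name": "log-import",
  "connector.class": "org.example.copycat.LogFileSourceConnector",
  "files": "/var/log/app.log",
  "topic": "app-logs"
}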

GET /connectors/<id>

Get the current state of a connector, including the current list of tasks for the connector.

GET /connectors/<id>/config

Get the connector configuration.

PUT /connectors/<id>/config

Update the connector configuration. Updating requires reconfiguring all tasks and this call will block until reconfiguration is complete or timed out. Because this may require flushing data, this call may block for a long time.

GET /connectors/<id>/tasks/<tid>

Get the configuration and state of a connector task.

DELETE /connectors/<id>

Destroy a connector. This will try to shut down cleanly and so may block waiting for data to flush from connectors.

Embedded API

Copycat can also be run in an embedded mode, allowing you to adopt it quickly for a specific application without complicating deployment & operations. Embedded mode also supports distributed operation, so it can scale with your application. A simple example using all the defaults:

Code Block
// Start embedded Copycat worker with a unique ID for the copycat
// cluster
final Copycat copycat = new Copycat("app-id");
copycat.start();

// Start import to get data into “input” topic
copycat.addConnector(importProperties);

// Create consumer & worker threads to read/transform/process data 
// from topic “input”; use producer to store results in another
// topic “output”

// Start export to get data from “output” topic to, e.g., HDFS
copycat.addConnector(exportProperties);

copycat.stop();
copycat.awaitStop();

This uses the default distributed mode. It can create connectors (if they don’t already exist) to perform data import and export. The other component of the application processes data that was imported using a regular Kafka consumer and produces the output to a new output topic. With this embedded API, a single application binary can be used to implement a complete, scalable, fault-tolerant, real-time data processing pipeline.

Compatibility, Deprecation, and Migration Plan

This KIP only proposes additions. There should be no compatibility issues.

Rejected Alternatives

Make Copycat an external third-party framework

Copycat is a new framework which should not rely on any internals of Kafka. Therefore, it could be maintained as an external framework which happens to be Kafka-specific. We’ve rejected this because:

  • One of the primary goals of Copycat is to ease adoption of Kafka by making it easier to get data into and out of Kafka. Shipping Copycat as part of a normal Kafka release makes it immediately available to every user that downloads Kafka.

  • Perhaps more importantly, it encourages deep integration of Copycat with the Kafka documentation.

  • Feature/mission creep in the project is a danger in adding new tools like this. However, besides custom integrations or consumer applications, which the client libraries serve well, importing data that already exists in other systems is a popular enough need that the project should directly provide some support for it. The impact of this change is kept minimal by only including the core Copycat framework in Kafka, leaving connectors to be developed as third-party plugins.

Maintain connectors in the project along with framework

We could potentially maintain some or all connectors within Kafka alongside the framework. However, this has a number of drawbacks:

...

The major drawback to this approach is that all use cases require using some third-party code, which will require getting the appropriate JARs on the classpath. Additionally, it will not be possible to provide centralized, unified documentation. However, documentation for connector plugins will mostly be limited to configuration options.

Use existing stream processing framework

Some people are already using stream processing frameworks as connectors for other systems. For example, some users do light processing using something like Spark Streaming and then save the output directly to HDFS. There are a couple of problems with this approach.

  • Lack of separation of concerns. Stream processing frameworks should be focused on transformation and processing of data. Philosophically, import/export and transformation should be separate, but they effectively are not in these frameworks.

  • Stream processing frameworks face a similar problem as frameworks like fluentd, flume, logstash, and others: in order to support a wide variety of systems, and arbitrary combinations of systems, they have to make some sacrifices in their abstractions and implementations. Copycat can avoid making these sacrifices because it is specific to Kafka, and since Kafka is a de facto standard data store for stream processing and supported by all the major stream processing frameworks, it will quickly benefit all frameworks.

  • There are multiple good stream processing systems, and having Kafka choose one external project to support does not make sense. Further, by providing a set of high quality connectors, Kafka can alleviate the development and integration burden of these other projects because they can leverage these connectors instead of writing their own. Instead, they will be able to focus on their core contribution, a framework for stream processing.

Support push-based source connectors

Some systems push data to a service which collects and logs that information. For example, many metrics systems such as statsd work this way: applications generate the data, send it to the collector, and likely do not save the data locally in any way. To support this use case, Copycat would have to act as a server in a stable location (set of servers and fixed ports). In contrast, most source connectors pull data from the data sources they’re assigned, and so can easily be placed on any worker node. This flexibility keeps the Copycat distributed runtime much simpler.

...