
...

Exactly once: each message is guaranteed to be delivered to (or stored in) the output system exactly once. The implementation will depend on the output system, requiring either idempotency or that input system offsets are tracked in the output system. For source connectors, the output system is Kafka and should be able to achieve exactly once semantics with the idempotent producer; for sink connectors, it can be achieved in a system-specific way, e.g. using <topic>-<partition>-<offset> as a document ID in a document store like Elasticsearch or atomically committing data & offset using a file rename in HDFS.
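
For instance, a sink connector targeting a document store could build its document IDs from the Kafka coordinates of each record so that a redelivered record simply overwrites the earlier write. The following is a minimal sketch of that idea; the class and method names are illustrative and not part of any proposed API:

Code Block
// Build a deterministic document ID from a record's Kafka coordinates so that
// redelivery overwrites the same document instead of creating a duplicate.
public final class IdempotentDocumentIds {
    // e.g. topic "logs", partition 3, offset 42 -> "logs-3-42"
    public static String documentId(String topic, int partition, long offset) {
        return topic + "-" + partition + "-" + offset;
    }
}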

 

Integration with Process Management & Cluster Resource Managers

Process and resource management is orthogonal to Copycat's runtime model.

The framework is not responsible for starting/stopping/restarting processes and can be used with any process management strategy, including YARN, Mesos, Kubernetes, or none if you prefer to manage processes manually or through other orchestration tools.

Here are some examples of how Copycat might be deployed and how resource management frameworks can be applied:

  • Copycat as a service: Run a collection of Copycat worker processes (via any mechanism, e.g. Chef/Puppet/Ansible/Salt). Users submit connectors via the REST API. Internally, the worker processes instantiate the connectors and tasks and balance them over the available workers. A resource manager can be used to make the cluster self-healing (respawning workers if they die), but even if workers are not automatically restarted, connectors continue to run properly when a worker fails because its connectors and tasks are relocated to the remaining live workers.
  • Resource constrained connectors: To apply resource constraints to specific connectors, start the desired number of Copycat workers with resource constraints via YARN, Mesos, or Kubernetes (or an associated framework such as Slider or Marathon) and submit only one connector to this cluster. Internally, tasks are still balanced across the worker processes, and failure of one of the processes is handled as usual. The restriction to a single connector is only needed so that the resource manager's constraints apply to that connector alone.
  • Small resource constrained connectors: If the data to be copied is small enough to be handled by one process, you want to rely on a resource manager to handle failures via restart, and you want to apply resource constraints, then you can run Copycat in standalone mode with a single connector configuration under a resource manager. Note, however, that offset storage, which is kept in a local file in standalone mode, must remain available across restarts.
  • Embedded application: Using the embedded API, the application starts a source connector to import data to Kafka and is returned a list of topics where the data will be stored. Using Kafka's client APIs or a higher-level stream processing API, it consumes from those topics, transforms the data, and produces the results to a new topic. Many instances of the application can be executed in parallel, e.g. by deploying with Chef/Puppet/Ansible/Salt. The embedded API internally works just like a worker process: connectors and tasks are automatically balanced across the active set of application processes, and the consumer independently balances its partitions across those same processes.

Examples

To make some of these concepts more concrete, this section describes a few possible connectors, their implementations, and applications.

JDBC Import

A JDBC connector would provide a generic way to import data from relational databases. The default scope for copying would be an entire database, with one input stream associated with each table (or with a user-specified query, for example if the input should be a join). With one stream per table, this connector could be split into multiple tasks and run in distributed mode. The connector can poll the database periodically to check for new tables.

To generate records, tasks can poll each table for updates, using timestamps, autoincrement columns, or a combination of both to detect changes. Schemas can be derived from the table’s columns and the records will have a trivial, flat record structure. The stream ID for records will be the table name and the offsets will be the timestamp, autoincrement ID, or a tuple containing both. These offsets are used to construct a query on each poll of the table that restricts the query results to new data.
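
As a rough sketch of the incremental query this implies, a task could combine the stored timestamp and autoincrement offsets as shown below. The table and column names are only illustrative, not part of any proposed API:

Code Block
import java.sql.*;

// Sketch: turn a stored offset (a timestamp plus an autoincrement ID) into a
// query that only returns rows added or updated since the last poll.
public class IncrementalTableQuery {
    public static PreparedStatement newRowsQuery(Connection conn, String table,
                                                 Timestamp lastTimestamp, long lastId)
            throws SQLException {
        // Use both columns so rows sharing a timestamp are still totally ordered.
        String sql = "SELECT * FROM " + table +
                     " WHERE updated_at > ? OR (updated_at = ? AND id > ?)" +
                     " ORDER BY updated_at, id";
        PreparedStatement stmt = conn.prepareStatement(sql);
        stmt.setTimestamp(1, lastTimestamp);
        stmt.setTimestamp(2, lastTimestamp);
        stmt.setLong(3, lastId);
        return stmt;
    }
}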

HDFS Export

HDFS export is a very common use case for long-term storage and batch analysis of data. The input for this connector would be a list of topics or a regex subscription. Since this will usually cover a large quantity of data, this will generally be used in clustered mode. Dynamic inputs are handled automatically by the Kafka consumer. This connector can provide exactly once delivery by managing offsets with the data stored in HDFS. It can write to temporary files as the data streams in. To commit data, it closes the file and performs an atomic rename. Offset information is included with the file (or in the filename) to ensure atomicity of committing both the data and the offsets. After faults, instead of resuming wherever the Kafka consumer left off, it will check the offsets in HDFS and seek the underlying consumer to the correct position before writing new data.
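
A minimal sketch of the commit step using the Hadoop FileSystem API is shown below; the directory layout and the file naming scheme that encodes the offset range are assumptions for illustration:

Code Block
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch: data streams into a temporary file, then is committed by atomically
// renaming it into place with the covered offset range encoded in the file name.
public class HdfsCommitter {
    private final FileSystem fs;

    public HdfsCommitter(FileSystem fs) {
        this.fs = fs;
    }

    public void commit(Path tempFile, String topic, int partition,
                       long firstOffset, long lastOffset) throws IOException {
        // Encoding the offsets in the name lets recovery code determine the
        // last committed offset simply by listing the directory.
        Path committed = new Path("/data/" + topic + "/" + partition + "/" +
                                  firstOffset + "-" + lastOffset + ".avro");
        if (!fs.rename(tempFile, committed)) {
            throw new IOException("Failed to commit " + tempFile + " to " + committed);
        }
    }
}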

Log Import

Log file import is a common use case and some applications may not be able to deliver logs directly to Kafka. A log file connector would run in standalone mode since it needs to run on each server that logs are collected from. The configuration would provide a list of files or regexes for files to load. Each file would become an input stream and offsets could be recorded as byte offsets into that file. The simplest implementation would create one record for each line (and records would have trivial structure). A more complex implementation might allow for a simple regex specification of the format of the logs.
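
A minimal sketch of the read loop, assuming byte offsets are stored per file; the record emission hook is illustrative and partial-line handling is omitted:

Code Block
import java.io.IOException;
import java.io.RandomAccessFile;

// Sketch: resume from a stored byte offset, emit one record per line, and use
// the post-read file position as the new offset.
public class LogFileReader {
    public static void readFrom(String path, long startOffset) throws IOException {
        try (RandomAccessFile file = new RandomAccessFile(path, "r")) {
            file.seek(startOffset);
            String line;
            while ((line = file.readLine()) != null) {
                long offset = file.getFilePointer(); // byte offset after this line
                emitRecord(path, offset, line);      // stream ID = file, offset = bytes
            }
        }
    }

    private static void emitRecord(String stream, long offset, String value) {
        // A real task would produce a Copycat record here; we just print for the sketch.
        System.out.printf("%s@%d: %s%n", stream, offset, value);
    }
}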

Mirror Maker

The existing mirror maker tool can be thought of as a special case in which both the input and the output are Kafka, rather than just one endpoint. This could potentially be implemented as either a source or a sink connector. One of the connections, managed by the framework, would use the Kafka cluster that is configured globally for the entire Copycat cluster; the second connection would be managed by the connector itself and use a remote Kafka cluster. If the implementations are sufficiently compatible, it might make sense to eventually deprecate the original mirror maker tool and provide a compatible Copycat wrapper script. A mirror maker connector would be a good candidate for a built-in connector since it is a commonly needed tool and requires no additional dependencies.
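
As a rough sketch of the sink variant, the task could hold its own producer configured for the remote cluster while the framework-managed consumer reads from the locally configured Kafka; the class shape and the put() hook below are illustrative, not the proposed Copycat connector API:

Code Block
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Sketch: the framework supplies records consumed from the local cluster; the task
// manages its own producer connection to the remote (mirror target) cluster.
public class MirrorSinkTaskSketch {
    private final KafkaProducer<byte[], byte[]> remoteProducer;

    public MirrorSinkTaskSketch(String remoteBootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", remoteBootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        remoteProducer = new KafkaProducer<>(props);
    }

    // Called with each record consumed from the local cluster.
    public void put(String topic, byte[] key, byte[] value) {
        remoteProducer.send(new ProducerRecord<>(topic, key, value));
    }
}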

Public Interfaces

This section describes the different public, user-facing interfaces that Copycat will include, but these are not necessarily intended to represent the final interface. Rather, the goal of this section is to give a sense of the scope and usage of the Copycat tool.

CLI

The primary interface for Copycat is the REST interface. However, many users’ first exposure to Copycat will be via the command line because that is the natural interface to run a simple, standalone Copycat agent.

This KIP introduces two command line tools. However, since these do not provide access to all Copycat functionality, it is likely that we will want to add more command line utilities to query and modify Copycat. The commands included are the minimum required to provide baseline functionality in conjunction with the REST interface, but additional commands providing CLI access to functionality only exposed by the REST interface in this KIP can be added in subsequent KIPs.

copycat

The copycat command runs Copycat in standalone (agent) mode. This mode runs one or more Copycat jobs in a single process:

copycat [...storage options...] connector.properties [connector2.properties …]

Functionally, this behaves as if you started a single-node cluster and submitted each connector properties file as a new connector. However:

  • It acts completely standalone and does not need to coordinate with other copycat workers. This allows it to run on any machine without extra configuration or any more access than any other Kafka client would normally have.

  • The connector configuration is not placed in persistent storage. It is expected that if you kill the process, you will provide the connector definition again if you want to resume it.

  • Offset storage is local and unreplicated. The data will be persisted to disk by default, allowing resumption in the case of failure, reboots, or other causes of process restarts. Extra command line arguments can be passed to control how offset storage is managed, e.g. to disable it entirely for testing purposes.

  • A trivial implementation of work balancing is used because there is only one worker process and no real coordination is required.

copycat-worker

The copycat-worker command starts a worker in cluster mode:

copycat-worker <... config options for distributed mode ...>

In contrast to the standalone mode, this mode:

  • Requires some configuration to set up work balancing and shared storage of connector/task configs and offsets.

  • The command does not include a connector; it only starts a worker. All workers will be running a REST interface which can be used to submit/modify/destroy persistent connectors.

REST Interface

All workers running in cluster mode run a REST interface that allows users to submit, get the status of, modify, and destroy connectors. API calls can be made on any worker, but may require coordination with other workers to execute. The following is a sketch of the key API endpoints, with an example request shown after the list:

GET /connectors

Get a list of connectors.

POST /connectors

Create a new connector by passing its configuration as a set of string key-value pairs.

GET /connectors/<id>

Get the current state of a connector, including the current list of tasks for the connector.

GET /connectors/<id>/config

Get the connector configuration.

PUT /connectors/<id>/config

Update the connector configuration. Updating requires reconfiguring all tasks and this call will block until reconfiguration is complete or timed out. Because this may require flushing data, this call may block for a long time.

GET /connectors/<id>/tasks/<tid>

Get the configuration and state of a connector task.

DELETE /connectors/<id>

Destroy a connector. This will try to shut down cleanly and so may block while waiting for data to flush from connectors.
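
As an illustration of how a connector might be submitted through this API, the sketch below POSTs a configuration to a worker using Java's built-in HTTP client. The worker host/port and the exact JSON layout of the key-value configuration are assumptions, since this KIP does not fix the payload format:

Code Block
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Sketch: create a new connector by POSTing its string key-value configuration
// to any worker's REST interface.
public class SubmitConnector {
    public static void main(String[] args) throws Exception {
        String config = "{\"name\": \"jdbc-import\", " +
                        "\"connector.class\": \"org.apache.kafka.copycat.JDBCSourceConnector\", " +
                        "\"connector.tasks.max\": \"20\"}";

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://copycat-worker:8083/connectors"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(config))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + ": " + response.body());
    }
}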

Embedded API

Copycat can also be run in an embedded mode, allowing you to adopt it quickly for a specific application without complicating deployment & operations. Embedded mode supports distributed operation, so it can scale with your application and automatically balance work across the active set of processes. Although this KIP does not specify the complete API, here is a simple example of what it might look like:

Code Block: EmbeddedSample.java
// Start embedded Copycat worker with a unique ID for the Copycat
// cluster. Any application that uses the embedded API with the same
// ID will join the same group of Copycat processes, across which
// connectors and tasks will be balanced.
final Copycat copycat = new Copycat("app-id");
copycat.start();

// Start import to load data into Kafka. Here we show how this could return
// information about where the data is being produced. For simple cases you
// simply know the output (here there is a single output topic). In other
// cases it may be dynamic and connector-specific, so you may need to get the
// information from Copycat after the connector has been started.
Properties importConfig = new Properties();
importConfig.setProperty("connector.class", "org.apache.kafka.copycat.JDBCSourceConnector");
importConfig.setProperty("connector.tasks.max", "20");
importConfig.setProperty("jdbc.connection", "...");
importConfig.setProperty("jdbc.table.whitelist", "some_table"); // Only load a single table
importConfig.setProperty("jdbc.topic.prefix", "my-database-"); // Prefix for output Kafka topics, i.e. the one table's output topic will now be my-database-some_table
// ... other connector settings ...
String[] inputTopics = copycat.addConnector(importConfig);


// Now we can use a stream processing layer to do some transformations and
// then write the resulting data back into Kafka. These APIs are not from
// an existing stream processing framework and are only intended to demonstrate
// how embedded Copycat could combine nicely with an embeddable stream
// processing layer built on Kafka's existing clients. We could also do this
// using the consumer and producer directly.
Streaming streaming = new Streaming(config);
streaming.start();
Stream<K,V> inputStream = streaming.from(inputTopics);
inputStream.map((key,value) -> new KeyValue<K,V>(key, value*2))
           .filter((key,value) -> value != null)
           .sendTo("output");

// Finally, we can start the export to move data from the "output" topic to, e.g., HDFS.
// Since this is a sink connector, we don't need to capture any information that
// is returned when we add the connector like we did with the source connector -- the
// set of input topics and the destination (e.g. HDFS folder) are both specified in
// the config.
Properties exportConfig = new Properties();
exportConfig.setProperty("connector.class", "org.apache.kafka.copycat.HDFSSinkConnector");
exportConfig.setProperty("connector.tasks.max", "20");
exportConfig.setProperty("hdfs.url", "hdfs://my-hdfs-server:9001/export/"); // Where to store data
// ... other connector settings ...
copycat.addConnector(exportConfig);

// Handle shutdown cleanly.
Runtime.getRuntime().addShutdownHook(new Thread() {
  @Override
  public void run() {
    copycat.stop();
    streaming.stop();
  }
});

copycat.join();
streaming.join(); 

In this way, you can easily and intuitively set up an entire application pipeline and contain everything in one set of processes, even though it combines functionality from multiple frameworks. The embedded version of Copycat supports all the same functionality (and under the hood is the same implementation) as the distributed version. You can start as many (or as few) of these processes as desired, and each of the components scales up and down automatically, rebalancing work as needed. Note that in embedded mode we still specify a setting for the maximum number of tasks each connector should use. This allows the user to control parallelism (number of threads) regardless of the number of application processes currently running. It may also be useful to have a special setting which always creates one task per server, which more closely matches the new consumer model.

This example uses the default distributed mode. It creates connectors (if they don't already exist) to perform data import and export, while the other component of the application processes the imported data using a regular Kafka consumer and produces its output to a new topic. With this embedded API, a single application binary can be used to implement a complete, scalable, fault-tolerant, real-time data processing pipeline.

Compatibility, Deprecation, and Migration Plan

...