Status
Current state: "Under Discussion"
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The Kafka Connect framework stores the most recent offsets for source and sink connectors in Kafka topics. While offsets for sink connectors are managed by the Kafka consumers, the offsets for source connectors are managed entirely by the Connect runtime. When operating a source connector, it is sometimes desirable to manually change or override the persisted offsets. For example, if a connector fails to produce a message at a particular offset, an operator may choose to skip over that source-specific offset and have the connector restart at the next message. The Connect API allows source connectors to specify a source offset for each source partition, where the partitions and offsets are defined by the source connectors as sets of key-value pairs (e.g., maps). The proposed tool will allow users to read the existing partition-offset pairs for the connectors, update/modify them, and then store the partition-offset pairs back to the Kafka topic.
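To illustrate the shape of these key-value pairs, the following sketch uses plain Java maps. The keys "filename" and "position" are hypothetical examples for a file-reading connector; each real connector defines its own partition and offset keys.

```java
import java.util.Map;

public class SourceOffsetShape {
    public static void main(String[] args) {
        // Hypothetical example: a file-reading source connector might
        // identify each source partition by the file it reads ...
        Map<String, Object> sourcePartition = Map.of("filename", "a");
        // ... and record its offset as the position reached in that file.
        Map<String, Object> sourceOffset = Map.of("position", 10);
        // The connector attaches maps like these to the records it emits,
        // and the Connect runtime persists the latest offset per partition.
        System.out.println(sourcePartition + " -> " + sourceOffset);
    }
}
```

The proposed tool operates purely on these persisted maps; it does not need to understand what the keys mean to a particular connector.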
Public Interfaces
A new command line tool called kafka-connect-source-offsets.sh will accept several command line parameters. The tool will have two main modes:
- Exporting the existing partition-offset pairs in JSON form to standard out or to a specified file; and
- Importing partition-offset pairs in JSON form read from standard in or from a specified file.

The mode is selected with either the --export or --import command line parameter. If neither --export nor --import is specified, or if any parameter is used incorrectly, the tool will display a usage/help statement that lists the parameters and their descriptions.
Exporting
The tool will read all of the persisted partition-offset pairs and write them out in JSON form to standard out. The following parameters can be used with the tool's export mode:
Parameter | Required for "export" mode | Description |
---|---|---|
--config | yes | Specifies the path to the Connect worker configuration. The configuration specifies how to access the offset storage and which converters to use to serialize and deserialize the offset information. |
--export | yes | Specifies the export mode. May not be used with --import. |
--offsets-file | no | Specifies the path to the file to which the partition-offset pairs will be written in JSON form. If not provided, the JSON is written to standard out. |
--connector-names | no | Specifies a comma-separated list of connector names used to filter which partition-offset pairs are exported. |
For example, the following command will read the Connect worker configuration in the my-worker-config.properties file and write all persisted partition-offset pairs to standard out:

```
bin/kafka-connect-source-offsets.sh --config=my-worker-config.properties --export
```
A sample of this output might be:

```
[{
    "connector": "MyConnector",
    "partition": {
        "filename": "a"
    },
    "offset": {
        "position": 10
    }
}]
```
Note how the response is a JSON array of one or more partition-offset pairs, where each partition-offset pair includes the connector name, the partition information, and the offset information.
The tool can also write the JSON to a file:

```
bin/kafka-connect-source-offsets.sh --config=my-worker-config.properties --export --offsets-file=my-offsets.json
```

and output only those partition-offset pairs for a named connector:

```
bin/kafka-connect-source-offsets.sh --config=my-worker-config.properties --export --offsets-file=my-offsets.json --connector-names=MyConnector
```
Importing
The following parameters can be used with the tool's import mode:
Parameter | Required for "import" mode | Description |
---|---|---|
--config | yes | Specifies the path to the Connect worker configuration. The configuration specifies how to access the offset storage and which converters to use to serialize and deserialize the offset information. |
--import | yes | Specifies the import mode. May not be used with --export. |
--offsets-file | no | Specifies the path to the file from which the partition-offset pairs will be read in JSON form. If not provided, the JSON is read from standard in. |
--dry-run | no | Defaults to "false". When set to "true", the tool will write out the actions that would be performed but will not actually modify any persisted partition-offset pairs. |
Override or remove the source offset for one or more source partitions
A user can use the tool as described above to obtain a copy of the partition-offset pairs, modify the JSON, and then import the updated JSON to overwrite the persisted partition-offset pairs that appear in the file. For example, the following command will read the partition-offset pairs in the specified file and update only those partition-offset pairs:

```
bin/kafka-connect-source-offsets.sh --config=my-worker-config.properties --import --offsets-file=my-offsets.json
```
Any persisted partition-offset pair that is not overwritten by this operation will remain unmodified.
Note that in addition to modifying the persisted partition-offset pairs, the tool can also be used to remove partition-offset pairs when the "offset" object in the JSON is null. For example, if the tool reads the following JSON representation of the partition-offset pairs, it will update the offset for the partition with file "a" but will remove the offset for the partition with file "b":

```
[{
    "connector": "MyConnector",
    "partition": {
        "filename": "a"
    },
    "offset": {
        "position": 123
    }
}, {
    "connector": "MyConnector",
    "partition": {
        "filename": "b"
    },
    "offset": null
}]
```
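To make the import semantics concrete, here is a minimal sketch of the overwrite/remove/keep rules; it is not the actual tool implementation, and it keys partitions by a plain string for brevity rather than the full partition map:

```java
import java.util.HashMap;
import java.util.Map;

public class ImportMergeSketch {
    // Apply imported pairs onto the persisted map: a non-null offset
    // overwrites, a null offset removes the pair, and any persisted
    // partition not mentioned in the import is left untouched.
    static Map<String, Map<String, Object>> applyImport(
            Map<String, Map<String, Object>> persisted,
            Map<String, Map<String, Object>> imported) {
        Map<String, Map<String, Object>> result = new HashMap<>(persisted);
        imported.forEach((partition, offset) -> {
            if (offset == null) {
                result.remove(partition);      // null offset => remove the pair
            } else {
                result.put(partition, offset); // otherwise overwrite it
            }
        });
        return result;
    }

    public static void main(String[] args) {
        Map<String, Map<String, Object>> persisted = new HashMap<>();
        persisted.put("a", Map.of("position", 10L));
        persisted.put("b", Map.of("position", 20L));
        persisted.put("c", Map.of("position", 30L));

        Map<String, Map<String, Object>> imported = new HashMap<>();
        imported.put("a", Map.of("position", 123L)); // overwrite "a"
        imported.put("b", null);                     // remove "b"

        Map<String, Map<String, Object>> result = applyImport(persisted, imported);
        System.out.println(result.get("a"));       // updated offset
        System.out.println(result.containsKey("b")); // false: removed
        System.out.println(result.get("c"));       // untouched offset
    }
}
```

The --dry-run parameter would report these same actions without applying the final map back to the offset storage.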
Proposed Changes
Add the tool as described in the Public Interfaces section.
Compatibility, Deprecation, and Migration Plan
This proposal adds a new tool and changes no existing functionality.
Rejected Alternatives
One option considered was to have the tool parameters specify the group ID, broker bootstrap servers, topic name, and other connection information. However, this would only have worked for Connect's distributed mode, whereas the proposed approach will work for both Standalone and Distributed mode configurations.