
Status

Current state: "Under Discussion"

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The Kafka Connect framework stores the most recent offsets for source and sink connectors in Kafka topics. While offsets for sink connectors are managed by the Kafka consumers, the offsets for source connectors are managed entirely by the Connect runtime. When operating a source connector, it is sometimes desirable to manually change or override the persisted offsets. For example, if a connector fails to produce a message at a particular offset, an operator may choose to skip over that source-specific offset and have the connector restart at the next message. The Connect API allows source connectors to specify a source offset for each source partition, where the partitions and offsets are defined by the source connectors as sets of key-value pairs (e.g., maps). The proposed tool will allow users to read the existing partition-offset pairs for the connectors, update or modify them, and then store the partition-offset pairs back to the Kafka topic.
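For context, the following is a minimal sketch of how a source connector supplies these partition and offset maps through the Connect API; the keys, values, and topic name are hypothetical:

import java.util.Collections;
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;

public class SourceOffsetExample {
    public static SourceRecord recordWithOffset() {
        // The source partition identifies where the data came from, e.g. a file
        // name for a file-based connector (the "file" key here is hypothetical).
        Map<String, ?> sourcePartition = Collections.singletonMap("file", "a");

        // The source offset records how far the connector has read within that
        // partition, again as an arbitrary key-value map chosen by the connector.
        Map<String, ?> sourceOffset = Collections.singletonMap("position", 42L);

        // The Connect runtime persists these partition-offset pairs on behalf of
        // the connector; they are the pairs the proposed tool exports and imports.
        return new SourceRecord(sourcePartition, sourceOffset,
                                "my-topic", Schema.STRING_SCHEMA, "record value");
    }
}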

Public Interfaces

A new command line tool called kafka-connect-source-offset-reset.sh will accept several command line parameters. The tool will have two main modes:

  1. Exporting the existing partition-offset pairs and writing them in JSON format to standard out or to a specified file; and
  2. Importing the partition-offset pairs specified in JSON format read from standard in or from a specified file.

Exporting

The tool will read all of the persisted partition-offset pairs and write them out in JSON form to standard out. The following parameters can be used with the tool's export mode:

Parameter         | Required | Description
--config          | yes      | Specifies the path to the Connect worker configuration. The configuration is used to determine how to access the offset storage and which converters are used to serialize and deserialize the offset information.
--export          | yes      | Specifies the export mode.
--offsets-file    | no       | Specifies the path to the file where the partition-offset pairs will be written in JSON form. If omitted, the pairs are written to standard out.
--connector-names | no       | Specifies a comma-separated list of connector names used to filter which partition-offset pairs are exported.



For example, the following command will read the Connect worker configuration in the my-worker-config.properties file and write out all persisted partition-offset pairs to standard out:

bin/kafka-connect-source-offset-reset.sh --config=my-worker-config.properties --export

A sample of this output might be:

[
  {
    "connector": "MyConnector",
    "partition": {
      "file": "a"
    },
    "offset": {
      "offsetKey1": "offsetValue1",
      "offsetKey2": "offsetValue2"
    }
  },
  {
    "connector": "MyConnector",
    "partition": {
      "file": "b"
    },
    "offset": {
      "offsetKey1": "offsetValue3",
      "offsetKey2": "offsetValue4"
    }
  }
]

Note how the output is a JSON array of one or more partition-offset pairs, where each pair includes the connector name, the partition information, and the offset information.
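Purely as an illustration of this format (not part of the proposed public interface), the exported JSON could be deserialized with Jackson into a simple structure such as the following:

import java.util.List;
import java.util.Map;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ExportedOffsets {
    // Each element of the exported JSON array carries the connector name plus
    // the partition and offset maps, so a simple POJO is enough to model it.
    public static class Entry {
        public String connector;
        public Map<String, Object> partition;
        public Map<String, Object> offset;
    }

    public static List<Entry> parse(String json) throws Exception {
        return new ObjectMapper().readValue(json, new TypeReference<List<Entry>>() {});
    }
}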


The tool can also write the JSON to a file:

bin/kafka-connect-source-offset-reset.sh --config=my-worker-config.properties --export --offsets-file=my-offsets.json


and output only those partition-offset pairs for a named connector:

bin/kafka-connect-source-offset-reset.sh --config=my-worker-config.properties --export --offsets-file=my-offsets.json --connector-names=MyConnector


Importing

The following parameters can be used with the tool's import mode:

Parameter      | Required | Description
--config       | yes      | Specifies the path to the Connect worker configuration. The configuration is used to determine how to access the offset storage and which converters are used to serialize and deserialize the offset information.
--import       | yes      | Specifies the import mode.
--offsets-file | no       | Specifies the path to the file from which the partition-offset pairs will be read in JSON form. If omitted, the pairs are read from standard in.
--dry-run      | no       | Defaults to "false". When set to "true", the tool writes out the actions that would be performed but does not modify any persisted partition-offset pairs.

Override or remove the source offset for one or more source partitions

A user can first run the tool in export mode, as described above, to obtain a copy of the partition-offset pairs. The user can then modify the JSON and pass the updated file back to the tool, which will update only the persisted partition-offset pairs that appear in the file. For example, the following command will read the partition-offset pairs in the specified file and update those pairs:

bin/kafka-connect-source-offset-reset.sh --config=my-worker-config.properties --import --offsets-file=my-offsets.json

Any persisted partition-offset pair that is not overwritten by this operation will remain unmodified. For example, consider the following offsets file:

[
  {
    "connector": "MyConnector",
    "partition": {
      "file": "a"
    },
    "offset": {
      "offsetKey1": "offsetValue1",
      "offsetKey2": "offsetValue2"
    }
  },
  {
    "connector": "MyConnector",
    "partition": {
      "file": "b"
    },
    "offset": null
  }
]

Note that in addition to modifying persisted partition-offset pairs, the tool can also remove them: when the "offset" object in the JSON is null, the corresponding pair is deleted. Given the JSON above, the tool will update the offsets for the partition with file "a" but will remove the offsets for the partition with file "b".
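To make these update-and-remove semantics concrete, here is a minimal sketch, assuming the tool materializes the persisted pairs into an in-memory map keyed by partition (connector namespacing omitted; the actual tool would read and write through Connect's offset storage rather than a plain map):

import java.util.HashMap;
import java.util.Map;

public class ImportSemantics {
    /**
     * Applies one imported partition-offset pair to the in-memory view of
     * persisted offsets: a non-null offset overwrites the entry for that
     * partition, a null offset removes it, and any partition not mentioned
     * in the import file is left untouched.
     */
    public static void apply(Map<Map<String, Object>, Map<String, Object>> persisted,
                             Map<String, Object> partition,
                             Map<String, Object> offset) {
        if (offset == null) {
            persisted.remove(partition);                      // "offset": null => remove the pair
        } else {
            persisted.put(partition, new HashMap<>(offset));  // overwrite or add the offset
        }
    }
}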


Proposed Changes

Add the tool as described in the Public Interfaces section above.

Compatibility, Deprecation, and Migration Plan

This proposal adds a new tool and changes no existing functionality.

Rejected Alternatives

One option considered was to have the tool parameters specify the group ID, broker bootstrap servers, topic name, and other connection information. However, this would only have worked for Connect's distributed mode, whereas the proposed approach works for both standalone and distributed mode configurations.

