Status

Current state: Under Discussion

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Since its creation, the Kafka Connect Java API has provided support for offsets, which track the progress of both source and sink connectors. Tasks can use these offsets on startup to begin reading from the last-consumed point in their upstream system (i.e., Kafka for sink tasks and the external system for source tasks), instead of starting over at the beginning (causing duplicate data to be written) or at the end (causing data to be missed).

However, Kafka Connect does not yet provide an official API for cluster administrators to view or modify the offsets of connectors on the cluster. Some cases where this may be useful include:

  • Resetting the offsets for a connector while iterating rapidly in a development environment (so that the connector does not have to be renamed each time)
  • Viewing the in-memory offsets for source connectors on the cluster in order to recover from accidental deletion of the Connect source offsets topic (this is currently possible with the config topic, and can be a life-saver)
  • Monitoring the progress of running connectors on a per-source-partition basis
  • Skipping records that cause issues with a connector and cannot be addressed using existing error-handling features

Public Interfaces

Endpoints

Several new endpoints will be added to the Kafka Connect REST API:

Verb | Path | Summary
GET | /connectors/{connector}/offsets | Retrieve the offsets for a connector; the connector must exist
GET | /offsets/source/{connector} | Retrieve the source offsets for a connector; the connector may or may not exist
GET | /offsets/sink/{connector}?rawGroupId={true/false} | Retrieve the sink offsets for a connector; the connector may or may not exist
DELETE | /offsets/source/{connector} | Delete the source offsets for a connector; the connector must not exist
DELETE | /offsets/sink/{connector}?rawGroupId={true/false} | Delete the sink offsets for a connector; the connector must not exist
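
As a rough illustration of how a client might address these endpoints, the following Python sketch builds the request path for each of them. The helper name offsets_path and its arguments are hypothetical and not part of the proposal; only the paths and the rawGroupId query parameter come from the table above.

```python
from urllib.parse import quote, urlencode


def offsets_path(connector, kind=None, raw_group_id=None):
    """Build the path of a proposed offsets endpoint (hypothetical helper).

    kind=None     -> /connectors/{connector}/offsets (connector must exist)
    kind="source" -> /offsets/source/{connector}
    kind="sink"   -> /offsets/sink/{connector}, optionally with ?rawGroupId=...
    """
    name = quote(connector, safe="")  # escape characters that are unsafe in a path segment
    if kind is None:
        return f"/connectors/{name}/offsets"
    if kind not in ("source", "sink"):
        raise ValueError(f"kind must be 'source' or 'sink', got {kind!r}")
    path = f"/offsets/{kind}/{name}"
    # rawGroupId only applies to the sink endpoints
    if kind == "sink" and raw_group_id is not None:
        path += "?" + urlencode({"rawGroupId": "true" if raw_group_id else "false"})
    return path
```

The same path is used for both GET and DELETE on the /offsets/source and /offsets/sink endpoints; only the HTTP verb differs.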

Response format

All responses that return offsets will use one of these formats, depending on whether the offsets are for a source or a sink connector. Care is taken to keep the two formats symmetrical, and suitable for use with both endpoints that require the connector to exist and endpoints that do not.

Source offsets
{
	"source": {
		"offsets": [
			{
				"partition": {
					// Connector-defined source partition
				},
				"offset": {
					// Connector-defined source offset
				}
			}
		]
	}
}
Sink offsets
{
	"sink": {
		"offsets": [
			{
				"partition": {
					"topic": // Kafka topic
					"partition": // Kafka partition
				},
				"offset": {
					"offset": // Kafka offset
				}
			}
		]
	}
}
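
Because the two formats are symmetrical, a client can handle both with the same code. The sketch below is one possible way to do that in Python; the function name extract_offsets is hypothetical, and the sample topic name and offset value in the usage note are made up for illustration.

```python
import json


def extract_offsets(body):
    """Return (kind, [(partition, offset), ...]) from a response body.

    kind is "source" or "sink", taken from the single top-level key.
    """
    doc = json.loads(body)
    kinds = [k for k in ("source", "sink") if k in doc]
    if len(kinds) != 1:
        raise ValueError("expected exactly one of 'source' or 'sink' at the top level")
    kind = kinds[0]
    # Each entry pairs a partition object with an offset object
    pairs = [(entry["partition"], entry["offset"]) for entry in doc[kind]["offsets"]]
    return kind, pairs
```

For a sink response containing one entry for partition 0 of a topic named orders at offset 42, this would return ("sink", [({"topic": "orders", "partition": 0}, {"offset": 42})]).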


Proposed Changes

The GET /connectors/{connector}/offsets endpoint will be useful for examining the offsets of a currently-running connector. It will automatically deduce whether the connector is a source or sink based on the connector's configuration, and then return the offsets for the connector. The request will fail with a 404 response if the connector does not exist on the cluster.

The GET /offsets/source/{connector} and GET /offsets/sink/{connector} endpoints will be useful for examining the offsets of deleted connectors, possibly in preparation for deleting those offsets. Requests will be accepted regardless of whether the connector exists on the cluster. The request will fail with a 404 response if no offsets can be found for the connector.

The DELETE /offsets/source/{connector} and DELETE /offsets/sink/{connector} endpoints will be useful for resetting the offsets of deleted connectors. Requests will be rejected if the connector still exists on the cluster (based on the config topic), or if a rebalance is pending. If successful, the request will be met with a 204 ("no content") response and an empty response body. The request will fail with a 404 response if no offsets can be found for the connector.
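
The rules for the DELETE endpoints can be summarized as a small decision function. This is only a sketch: the names Outcome and delete_offsets_outcome are hypothetical, the order in which the checks are applied is an assumption, and the proposal does not state which status code accompanies a rejected request, so that case is modeled without one.

```python
from enum import Enum


class Outcome(Enum):
    REJECTED = "rejected"   # status code not specified in the proposal
    NOT_FOUND = 404         # no offsets can be found for the connector
    NO_CONTENT = 204        # offsets were reset; empty response body


def delete_offsets_outcome(connector_exists, rebalance_pending, offsets_found):
    """Decide how a DELETE /offsets/... request is answered (assumed check order)."""
    if connector_exists or rebalance_pending:
        return Outcome.REJECTED
    if not offsets_found:
        return Outcome.NOT_FOUND
    return Outcome.NO_CONTENT
```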

The rawGroupId parameter, whose default value will be false, can be used to control the consumer group ID that is used to look up and reset offsets for a sink connector. By default, Connect uses the group ID connect-{connector} for sink connectors, but this can be overridden using the consumer.override.group.id property in a sink connector's configuration. When rawGroupId is false, the connect- prefix followed by the connector name will be used as the consumer group ID. However, when rawGroupId is set to true, only the connector name will be used as the consumer group ID. (In this case, the {connector} part of the URL need not actually be the connector name; it should be the consumer group ID used for the connector.)
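
The mapping from the URL to a consumer group ID is simple enough to write down directly. The function name sink_group_id below is hypothetical; the connect- prefix and the meaning of rawGroupId are taken from the paragraph above.

```python
def sink_group_id(connector, raw_group_id=False):
    """Return the consumer group ID to use for a sink offsets request.

    connector is the {connector} part of the URL. With raw_group_id=True it is
    used verbatim as the group ID; otherwise the default "connect-" prefix is added.
    """
    return connector if raw_group_id else f"connect-{connector}"
```

For example, a sink connector whose configuration sets consumer.override.group.id to custom-group would be addressed as /offsets/sink/custom-group?rawGroupId=true, which resolves to the group ID custom-group rather than connect-custom-group.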

A source offset will only be considered successfully deleted if the Connect worker is able to emit a tombstone to the offsets topic for its partition, and then read to the end of the offsets topic. A request to reset offsets for a source connector will only be considered successful if the worker is able to delete all known offsets for that connector.
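
To make the tombstone semantics concrete, here is a simplified in-memory model in Python. It is not how the worker is implemented: the offsets topic is represented as a plain list of (key, value) records, keys are assumed to be (connector, partition) tuples, and a tombstone is a record whose value is None. The function names are hypothetical. In a real cluster the worker would also have to produce each tombstone to Kafka and read to the end of the topic before reporting success.

```python
def known_partitions(log, connector):
    """Return the keys of this connector whose latest record is not a tombstone."""
    latest = {}
    for key, value in log:
        latest[key] = value  # later records override earlier ones for the same key
    return [key for key, value in latest.items()
            if key[0] == connector and value is not None]


def reset_source_offsets(log, connector):
    """Append a tombstone for every known offset of the connector.

    Returns True only if, after re-reading the whole log, no offsets remain.
    """
    for key in known_partitions(log, connector):
        log.append((key, None))  # tombstone: same key, null value
    return not known_partitions(log, connector)
```

Offsets that belong to other connectors are left untouched, since only keys matching the requested connector receive a tombstone.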

Requests to reset sink connector offsets will be satisfied by deleting the consumer group for that sink connector (as opposed to deleting all known offsets for that consumer group).

All offset reset requests will be forwarded to the leader of the cluster.

Compatibility, Deprecation, and Migration Plan

This feature is fully backwards-compatible with existing Kafka Connect releases. Migration should occur automatically whenever a Kafka Connect cluster is upgraded to a version that supports this feature.

Test Plan

Integration and/or system tests will be added for these cases:

  • Cannot view the offsets of a nonexistent connector via the GET /connectors/{connector}/offsets endpoint
  • Cannot view the offsets of a connector for which no offsets exist via the GET /offsets/source/{connector} and GET /offsets/sink/{connector} endpoints
  • Cannot delete the offsets of a connector for which no offsets exist via the DELETE /offsets/source/{connector} and DELETE /offsets/sink/{connector} endpoints
  • Can view the offsets of a running sink and source connector via the GET /connectors/{connector}/offsets endpoint, and verify that those offsets reflect an expected level of progress for each connector (i.e., they are greater than or equal to a certain value depending on how the connectors are configured and how long they have been running)
  • Can view the offsets of a running sink and source connector via the GET /offsets/source/{connector} and GET /offsets/sink/{connector} endpoints, making similar assertions about an expected level of progress
  • After deleting those connectors, can still view their offsets via the GET /offsets/source/{connector} and GET /offsets/sink/{connector} endpoints
  • Can reset the offsets for those connectors after they are deleted via the DELETE /offsets/source/{connector} and DELETE /offsets/sink/{connector} endpoints
  • Follow-up requests to reset offsets for those connectors, after their offsets have already been reset once, are met with 404 responses
  • Can view/reset the offsets for a sink connector that uses an overridden consumer group ID

Future work

Overwriting offsets

Summary: We may add finer-grained support for modifying connector offsets beyond resetting all of the offsets for a connector. This could include resetting offsets for a subset of partitions, or writing new offsets for partitions.

Delayed because: We may refine our error-handling features instead of augmenting offsets support this way, and the cost of implementing this feature is not high enough to justify its inclusion in the first draft of offsets support.

Automatically delete offsets with connectors

Summary: Add a mechanism (such as an includeOffsets URL query parameter) to allow offsets to be reset at the same time a connector is deleted.

Delayed because: The cost of implementing this feature is not high enough to justify its inclusion in the first draft of offsets support, though it may be a nice quality-of-life improvement later on. The difficulty here is finding a way to ensure that the connector and all of its tasks are stopped before its offsets are reset, since the existing endpoint to delete connectors does not synchronously wait for the connector to actually be deleted (only for the deletion request to be persisted in the config topic). One option might be to add some kind of offsets reset request record to the config topic directly after the tombstone for the connector config.

Rejected Alternatives

Source-only support

Summary: Only provide support for source connector offsets instead of both source and sink connector offsets, since CLI tooling for sink connector offsets already exists.

Rejected because: The additional cost of supporting sink connector offsets, at least with the APIs proposed here, is negligible, and it will be easier for Connect cluster administrators to interact with a single unified API for both source and sink connectors.

Resetting offsets for active connectors

Summary: Allow offsets to be reset for currently-running connectors.

Rejected because: This would be significantly harder to implement, and the benefits are negligible compared to the workaround of deleting the connector, resetting its offsets, and then recreating it.
