Status

Current state: Adopted

Discussion thread: here

Vote thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Key format: status-topic-${topic-name}:connector-${connector-name}

Key example: status-topic-foo:connector-some-source

Value format:

{
  "topic": {
    "name": string,
    "connector": string,
    "task": int32,
    "discoverTimestamp": int64
  }
}

Value example:

{
  "topic": {
    "name": "foo",
    "connector": "some-source",
    "task": 0,
    "discoverTimestamp": 1579297899
  }
}

The topic name can be safely separated from the connector name because the delimiter : (colon character) is not a valid topic name character in Kafka.
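The key format and the role of the colon delimiter can be illustrated with a minimal sketch. The function names below are hypothetical and this is not Connect's own implementation; it only assumes the key layout shown above and the fact that Kafka topic names are limited to ASCII letters, digits, '.', '_' and '-'.

```python
import re
from typing import Tuple

TOPIC_PREFIX = "status-topic-"
CONNECTOR_SEPARATOR = ":connector-"

# Characters Kafka accepts in a topic name; ':' is not among them.
VALID_TOPIC_NAME = re.compile(r"^[a-zA-Z0-9._-]+$")


def build_topic_status_key(topic: str, connector: str) -> str:
    """Build a key of the form status-topic-${topic-name}:connector-${connector-name}."""
    if not VALID_TOPIC_NAME.match(topic):
        raise ValueError(f"invalid Kafka topic name: {topic!r}")
    return f"{TOPIC_PREFIX}{topic}{CONNECTOR_SEPARATOR}{connector}"


def parse_topic_status_key(key: str) -> Tuple[str, str]:
    """Split a key back into (topic, connector)."""
    if not key.startswith(TOPIC_PREFIX):
        raise ValueError(f"not a topic status key: {key!r}")
    # Split at the FIRST separator: the topic name cannot contain ':',
    # so everything after the first ':connector-' is the connector name.
    remainder = key[len(TOPIC_PREFIX):]
    topic, separator, connector = remainder.partition(CONNECTOR_SEPARATOR)
    if not separator or not topic or not connector:
        raise ValueError(f"malformed topic status key: {key!r}")
    return topic, connector
```

Splitting at the first delimiter keeps the parse unambiguous even if a connector name happens to contain a colon itself.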

...

The value of the Kafka record includes the topic name, the connector name, the ID of the task that last reported that the topic is used by the connector, and a timestamp marking when this topic was detected as active. The record value is not essential for deciding whether a topic is used by a connector, and it is partially redundant with what is stored in the key, but it makes these entries easier to read and follow, and more useful at runtime or when troubleshooting the topics used by a connector. Whereas the record key includes only the topic name and the connector name, the record value additionally stores the ID of the task that last succeeded in storing a topic status record (in case more than one task produces a record concurrently for a short period of time) and the detection timestamp. In the future, the record value can easily be extended to include additional information.
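As a rough illustration of the record value, the sketch below serializes and reads back the four fields described above. The helper names are hypothetical, the timestamp field is assumed to be named discoverTimestamp, and the timestamp is passed in by the caller because its unit is not stated here.

```python
import json


def build_topic_status_value(topic: str, connector: str, task: int,
                             discover_timestamp: int) -> bytes:
    """Serialize a topic status value as UTF-8 encoded JSON."""
    value = {
        "topic": {
            "name": topic,
            "connector": connector,
            "task": task,
            "discoverTimestamp": discover_timestamp,
        }
    }
    return json.dumps(value).encode("utf-8")


def read_topic_status_value(raw: bytes) -> dict:
    """Return the inner 'topic' object of a serialized topic status value."""
    return json.loads(raw.decode("utf-8"))["topic"]
```

Readers that only need to know whether a topic is active can ignore the value entirely and rely on the key.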

Compared to the existing keys for the status topic, which currently have prefixes status-connector- and status-task-, the new key format extends the set of status topic record keys in a readable and intuitive way by adding the prefix status-topic- to the keys of the new Kafka records. 

Recording active topics

When a connector is configured to run in a Connect cluster for the first time, its set of active topics is empty. When its tasks start processing their first records (records of type SourceRecord for source tasks and records of type SinkRecord for sink tasks), the worker starts inspecting the Kafka topic of each of these records. If the worker detects that a topic does not belong to the set of active topics for this specific connector, it produces a record to the status topic with the format described in the previous section. Each worker in the Connect cluster maintains an up-to-date view of all the connectors' active topics by continuously reading the status.storage.topic.

Multiple Connect workers are expected to compete occasionally and append more than one record to the status.storage.topic for the same topic. These records will have the same key and, because the topic is compacted, all the Kafka records with a given key will eventually collapse into a single entry. As soon as a worker detects the addition of a topic to a connector's set of active topics, it stops posting update records to the status.storage.topic for that connector and newly detected active topic.
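The recording behavior described above can be sketched with a small in-memory simulation. Everything here is hypothetical: StatusTopicLog stands in for the compacted status.storage.topic, Worker for a Connect worker, and keys are plain (topic, connector) tuples rather than formatted strings. It is a sketch of the idea, not of Connect's code.

```python
from typing import Dict, List, Set, Tuple

Key = Tuple[str, str]  # (topic, connector); the real key is a formatted string


class StatusTopicLog:
    """In-memory stand-in for the compacted status.storage.topic."""

    def __init__(self) -> None:
        self.records: List[Tuple[Key, dict]] = []

    def append(self, key: Key, value: dict) -> None:
        self.records.append((key, value))

    def compacted(self) -> Dict[Key, dict]:
        """What remains after compaction: the latest value per key."""
        latest: Dict[Key, dict] = {}
        for key, value in self.records:
            latest[key] = value
        return latest


class Worker:
    """Tracks active topics by reading the log; posts only unseen topics."""

    def __init__(self, log: StatusTopicLog) -> None:
        self.log = log
        self.position = 0
        self.active_topics: Dict[str, Set[str]] = {}

    def poll_status_topic(self) -> None:
        """Catch up with records appended since the last poll."""
        for (topic, connector), _value in self.log.records[self.position:]:
            self.active_topics.setdefault(connector, set()).add(topic)
        self.position = len(self.log.records)

    def on_record(self, connector: str, task: int, topic: str, now: int) -> bool:
        """Called per processed record; returns True if a status record was posted."""
        if topic in self.active_topics.get(connector, set()):
            return False
        self.log.append((topic, connector), {"task": task, "discoverTimestamp": now})
        return True
```

If two workers each see topic foo before either has read the other's status record, both post a record with the same key, compaction later keeps one of them, and once each worker has polled the log neither posts again for that connector and topic.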

Resetting a connector's set of active topics

Tracking the set of active topics for a connector as described above implies that such sets are monotonically increasing in size over time. However, during the lifetime of a connector, some topics might stop being actively used (e.g. source tasks no longer produce to that topic, or sink tasks don't have any new records to consume from a topic). For this reason, this KIP proposes to introduce an explicit Connect REST API request to reset a connector's set of active topics.

When a Connect worker receives this request, it sends a tombstone message for each topic in the connector's set of active topics. It's worth noting that this operation is not atomic with respect to the whole set of topics. Kafka provides atomicity at the record level: it guarantees that each individual record (e.g. a tombstone record or a topic status record) is appended to the log atomically. Therefore, if a request to reset a connector's set of active topics is interleaved with actual production or consumption of records by the connector's tasks, the order in which these records are appended is not characterized by a happens-before relationship between reset (production of a tombstone message) and recording (production of a non-tombstone message) actions.

Resetting the set of active topics of a connector while this connector is running is fine, as long as the intention is to reset any topics that are no longer used by the connector and retain the ones that are active. Topic reset is a composable operation with respect to a connector's normal execution. Soon after the reset, the worker tasks will populate the status.storage.topic with new topic status messages for the topics that the connector is currently using.
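The effect of a reset, and why its interleaving with recording is harmless, can be sketched by replaying an ordered log of status records. The functions and the (topic, connector, value) tuple layout are hypothetical; a value of None stands for a tombstone.

```python
from typing import Dict, List, Optional, Set, Tuple

Record = Tuple[str, str, Optional[dict]]  # (topic, connector, value or None)


def replay(records: List[Record]) -> Dict[str, Set[str]]:
    """Rebuild connector -> active topics from an ordered log of records."""
    view: Dict[str, Set[str]] = {}
    for topic, connector, value in records:
        topics = view.setdefault(connector, set())
        if value is None:
            topics.discard(topic)  # tombstone: the topic is no longer active
        else:
            topics.add(topic)      # topic status record: the topic is active
    return view


def reset_tombstones(view: Dict[str, Set[str]], connector: str) -> List[Record]:
    """One tombstone per known active topic; each is appended separately."""
    return [(topic, connector, None) for topic in sorted(view.get(connector, set()))]
```

Depending on whether a task's status record for a topic lands before or after that topic's tombstone, the topic ends up absent or present right after the reset. In the first case, the next record processed for that topic finds it missing from the set and records it again, which is why a reset composes with normal execution.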

Restarting, reconfiguring or deleting a connector

...

Get the set of active topics from a connector called 'some-source':

$ curl -s 'http://localhost:8083/connectors/some-source/topics' | jq
{
  "some-source": {
    "topics": [
      "foo",
      "bar",
      "baz"
    ]
  }
}
$
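A client could consume this response along the following lines. This is a minimal sketch using only Python's standard library; the function names are hypothetical, and the URL layout and response shape are taken from the example above.

```python
import json
from typing import List
from urllib.parse import quote
from urllib.request import urlopen


def active_topics_url(base_url: str, connector: str) -> str:
    """URL of the active topics resource for a connector."""
    return f"{base_url.rstrip('/')}/connectors/{quote(connector, safe='')}/topics"


def parse_active_topics(body, connector: str) -> List[str]:
    """Extract the topic list from a response shaped like the example above."""
    return json.loads(body)[connector]["topics"]


def get_active_topics(base_url: str, connector: str) -> List[str]:
    """Fetch and parse the active topics; requires a reachable Connect worker."""
    with urlopen(active_topics_url(base_url, connector)) as response:
        return parse_active_topics(response.read(), connector)
```

For example, get_active_topics('http://localhost:8083', 'some-source') would return the list of topic names if a worker is listening on that address.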

...

Successful reset of the set of active topics of a connector called 'some-source':

$ curl -X PUT -s 'http://localhost:8083/connectors/some-source/topics/reset' | jq
$

...

Attempt to reset the set of active topics when reset is not allowed:

$ curl -X PUT -s 'http://localhost:8083/connectors/some-source/topics/reset' | jq
{
  "error_code": 403,
  "message": "Topic tracking reset is disabled"
}
$
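The two reset outcomes shown above could be handled from a client roughly as follows. This is a sketch with hypothetical function names. It assumes the worker answers a disallowed reset with a non-2xx HTTP status whose body is the JSON error object shown above; the examples only show the body, so the status code is an assumption.

```python
import json
from typing import Optional
from urllib.error import HTTPError
from urllib.request import Request, urlopen


def build_reset_request(base_url: str, connector: str) -> Request:
    """PUT request against the reset resource of a connector."""
    url = f"{base_url.rstrip('/')}/connectors/{connector}/topics/reset"
    return Request(url, method="PUT")


def describe_error(body) -> str:
    """Render an error body like the one above as 'code: message'."""
    error = json.loads(body)
    return f"{error['error_code']}: {error['message']}"


def reset_active_topics(base_url: str, connector: str) -> Optional[str]:
    """Return None on success, or the error text reported by the worker."""
    try:
        with urlopen(build_reset_request(base_url, connector)):
            return None  # a successful reset returns an empty body
    except HTTPError as err:
        # Assumption: the error body is JSON, as in the example above.
        return describe_error(err.read())
```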

...