Determining whether a Kafka Connect worker has completed startup and is healthy (or at least, capable of handling REST requests) is surprisingly tricky.

Readiness

In distributed mode, there are several operations a worker completes during startup:

  • Creating internal topics if they do not already exist
  • Reading to the end of each internal topic
  • Joining the cluster (which itself involves participating in a rebalance)

And in standalone mode:

  • Creating the offsets file if it does not already exist
  • Reading and parsing the offsets file
  • Starting all connectors whose configs were specified on the command line
  • Generating task configs for all of these connectors
  • Starting tasks for all of these connectors

Liveness

In distributed mode, it is also sometimes possible for a worker that has completed startup to still be unhealthy. If any of these operations cannot be performed, the worker will be unable to handle some or all REST requests:

In standalone, there is one condition that may also make a completely-started-up worker partially or completely unavailable:

  • Deadlock

Existing means

There is no one-size-fits-all approach to monitor the readiness and liveness of Kafka Connect workers today, but there is one less-than-elegant option available to cover a subset of the cases.

To check the readiness and liveness of a distributed worker, or the liveness of a standalone worker, a request can be issued to the GET /connectors/{connector} endpoint. If a response with either a 200 or 404 status code is received, the worker can be considered healthy. This has the drawback of either requiring a connector to exist with the expected name (which may be inconvenient for cluster administrators to enforce), or requiring any automated tooling that interacts with the endpoint to count a 404 response as "healthy", which is highly abnormal. This also does not sufficiently confirm the readiness of a standalone worker.
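As a rough illustration of why this workaround is awkward, the following Python sketch shows a probe that has to treat a 404 as success. The function names, the base URL, and the timeout value are hypothetical and chosen only for this example; only the endpoint path and the 200/404 rule come from the text above.

```python
import urllib.error
import urllib.request


def status_indicates_healthy(status_code: int) -> bool:
    """Interpret a GET /connectors/{connector} status code for this probe."""
    # 200: the connector exists. 404: it does not exist, but the worker
    # still processed the request. Both count as "healthy" here.
    return status_code in (200, 404)


def probe_worker(base_url: str, connector: str, timeout_s: float = 5.0) -> bool:
    """Hypothetical helper: probe a worker via GET /connectors/{connector}."""
    url = f"{base_url}/connectors/{connector}"
    try:
        with urllib.request.urlopen(url, timeout=timeout_s) as resp:
            return status_indicates_healthy(resp.status)
    except urllib.error.HTTPError as e:
        # urllib raises for 4XX/5XX responses; the status code is on the error.
        return status_indicates_healthy(e.code)
    except (urllib.error.URLError, OSError):
        # Connection failures and timeouts are treated as unhealthy.
        return False
```

A call such as probe_worker("http://localhost:8083", "any-name") would report a worker as healthy even when the named connector does not exist, which is exactly the abnormal 404 handling described above. The host and port here are assumptions for illustration.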

Other approaches, such as querying the GET / or GET /connectors endpoints, do not actually test for the readiness of either distributed or standalone workers at all. In standalone mode, these requests can be completed before the worker has finished starting all of the command-line-supplied connectors, and in distributed mode, they can be completed before the worker has read to the end of the config topic, joined the cluster, etc.

It's also possible to read worker logs and deduce based on those whether a worker has completed startup, but no sane maintainer of a REST-based project should expect its users to parse log files as a means of checking for node readiness (or at least, to be happy doing so).

Goals

  • Establish a single, simple REST endpoint that can be used to check for worker health
  • This endpoint should cover both readiness and liveness (i.e., a 2XX response from this endpoint indicates that the worker has both completed startup and is capable of serving other REST requests)
  • This endpoint should be available in both standalone and distributed mode, and its semantics should vary as little as possible across the two modes

Public Interfaces

A new REST endpoint, GET /health, will be added to the Kafka Connect REST API. If the worker is healthy, the response will have a 200 status code and its body will be a simple JSON object:

{
  "message": "Worker is healthy"
}

This endpoint will only return the above if the worker has completed startup and is capable of serving REST requests.

If the worker is unhealthy, the request may hang for a while before possibly receiving a 4XX or 5XX response. The exact behavior is deliberately left unspecified here, as anything other than a 200 response should be taken to mean that the worker is not healthy.
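A client of this endpoint might look like the following Python sketch. It treats anything other than a 200 response, including a hung request that exceeds the client-side timeout, as unhealthy, and it polls until a deadline so that it can also serve as a readiness wait. The function names, timeouts, and polling interval are assumptions made for this example and are not part of the proposal.

```python
import time
import urllib.error
import urllib.request
from typing import Callable


def worker_is_healthy(base_url: str, timeout_s: float = 10.0) -> bool:
    """Return True only if GET /health answers with a 200 within the timeout."""
    try:
        with urllib.request.urlopen(f"{base_url}/health", timeout=timeout_s) as resp:
            return resp.status == 200
    except (urllib.error.URLError, OSError):
        # Covers 4XX/5XX responses (HTTPError), refused connections, and
        # requests that hang past the client-side timeout.
        return False


def wait_until_healthy(check: Callable[[], bool], deadline_s: float,
                       interval_s: float = 1.0) -> bool:
    """Poll `check` until it succeeds or the deadline would be exceeded."""
    end = time.monotonic() + deadline_s
    while True:
        if check():
            return True
        if time.monotonic() + interval_s > end:
            return False
        time.sleep(interval_s)
```

For example, wait_until_healthy(lambda: worker_is_healthy("http://localhost:8083"), deadline_s=60) would block for up to a minute while a worker starts; the address is an assumption for illustration.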

Proposed Changes

Distributed mode

Requests to this endpoint will go through the worker's tick thread. Copying from the current trunk, a comment in that part of the code base explains the purpose of the tick thread:

The main loop does two primary things: 1) drive the group membership protocol, responding to rebalance events as they occur, and 2) handle external requests targeted at the leader. All the "real" work of the herder is performed in this thread, which keeps synchronization straightforward at the cost of some operations possibly blocking up this thread (especially those in callbacks due to rebalance events).

Frequently, bugs and other poor external conditions will block the worker's tick thread, which in turn prevents the worker from being able to handle some REST requests. Affected requests include connector creation/reconfiguration/deletion, reading connector/task configs, restarting connectors/tasks, and reading/resetting/altering connector offsets.

This thread is also responsible for handling worker startup logic.

By handling requests to the health check endpoint on this thread, we can guarantee that the ability to correctly handle a request means that the worker has completed startup, and is capable of handling other REST requests.
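The reasoning above can be illustrated with a toy model in Python. This is not the actual herder implementation; the class and method names are hypothetical, and the model only captures the idea that a single thread first runs startup and then serves queued requests, so a health request completes only once startup has finished and the thread is not blocked.

```python
import queue
import threading
from concurrent.futures import Future


class TickThreadSketch:
    """Toy model: one thread runs startup, then serves queued requests."""

    def __init__(self, startup):
        self._requests = queue.Queue()
        self._startup = startup
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self):
        self._thread.start()

    def _run(self):
        # Startup runs on the same thread that later serves requests.
        self._startup()
        while True:
            callback, future = self._requests.get()
            try:
                future.set_result(callback())
            except Exception as e:
                future.set_exception(e)

    def submit(self, callback) -> Future:
        """Queue a request to be run on the tick thread."""
        future = Future()
        self._requests.put((callback, future))
        return future

    def health(self, timeout_s: float) -> dict:
        # The wait times out if startup is still running or if the tick
        # thread is blocked on an earlier request.
        future = self.submit(lambda: {"message": "Worker is healthy"})
        return future.result(timeout=timeout_s)
```

In this model, a call to health() made while startup is still in progress raises a timeout error on the caller's side, which a REST layer could translate into a non-200 response.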

Standalone mode

Requests to this endpoint will require synchronization on the StandaloneHerder instance, which has been susceptible to deadlock in the past.

They will also only be completed after all command-line-supplied connectors have been successfully created by the herder.
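A minimal Python sketch of these two conditions follows. It is an analogy rather than the real StandaloneHerder: the class name, the lock standing in for Java synchronization on the herder, and the event signalling that startup is complete are all assumptions made for illustration.

```python
import threading


class StandaloneHerderSketch:
    """Toy model of the two conditions a standalone health check waits on."""

    def __init__(self):
        # Stands in for synchronization on the herder instance.
        self._lock = threading.Lock()
        # Set once all command-line-supplied connectors have been created.
        self._startup_done = threading.Event()

    def mark_startup_complete(self):
        self._startup_done.set()

    def health(self, timeout_s: float) -> dict:
        # Condition 1: startup must have completed.
        if not self._startup_done.wait(timeout_s):
            raise TimeoutError("worker has not finished starting connectors")
        # Condition 2: the herder lock must be obtainable; failing to get it
        # within the timeout may indicate a deadlock.
        if not self._lock.acquire(timeout=timeout_s):
            raise TimeoutError("could not acquire herder lock")
        try:
            return {"message": "Worker is healthy"}
        finally:
            self._lock.release()
```

In this sketch each condition gets its own timeout, so a caller may wait up to twice timeout_s in the worst case; a real implementation would likely share a single deadline across both waits.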

Compatibility, Deprecation, and Migration Plan

This change should be fully backwards compatible. Users who already have their own strategies for monitoring worker health/liveness can continue to employ them. Users who would like to use the endpoint introduced by this KIP need only upgrade to a newer version of Kafka Connect.

Test Plan


Rejected Alternatives