...

Current state: "Under Discussion"

Discussion thread: here

JIRA: here

Sample implementation: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

JBOD support is coming to KRaft in KIP-858: Handle JBOD broker disk failure in KRaft! It has been a feature in Kafka since KIP-112: Handle disk failure for JBOD. The evolution of disk-failure handling in KIP-858 concerns the notification and persistence mechanisms rather than the sequence of steps the controller and broker take to prevent future interactions with the affected disk. What we propose in this KIP is to treat a subset of disk failures - a disk becoming full - in such a way as to allow space to be reclaimed using Kafka functionality such as modifying retention periods or deleting problematic topics. The examples in this KIP use a Zookeeper-backed Kafka cluster, but we believe the functionality will be easily implementable once KIP-858 is code-complete.

...

Briefly list any new interfaces that will be introduced as part of this proposal or any existing interfaces that will be removed or changed. The purpose of this section is to concisely call out the public contract that will come along with this feature.

A public interface is any change to the following:

  • Binary log format

  • The network protocol and api behavior

  • Any class in the public packages under clients

    • org/apache/kafka/common/serialization

    • org/apache/kafka/common

    • org/apache/kafka/common/errors

    • org/apache/kafka/clients/producer

    • org/apache/kafka/clients/consumer (eventually, once stable)

  • Configuration, especially client configuration

  • Monitoring

  • Command line tools and arguments

  • Anything else that will likely break existing users in some way when they upgrade

No foreseen changes to public-facing interfaces.

Proposed Changes

Current state

...

A Kafka broker still has a big try-catch statement around all interactions with a log directory. If an IOException is raised because there is no space left on the device (we will check the remaining space at that point in time rather than parsing the exception message), the broker will stop all operations on logs located in that directory, remove all fetchers, and stop compaction. Retention will continue to be respected. The same Zookeeper node used for the current behaviour will be written to. All other IOExceptions will continue to be treated the way they are treated today and will result in the log directory going offline.
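
As a rough illustration of the check described above - a minimal sketch, assuming a hypothetical helper class and threshold that are not part of the Kafka codebase - the decision between a saturated and an offline log directory could be based on the free space observed when the IOException is caught, rather than on the exception message:

```java
import java.io.File;
import java.io.IOException;

// Hypothetical sketch: classify an IOException from a log directory as
// "saturated" (disk full) or "offline" (any other I/O failure). The class,
// method, and threshold are illustrative, not actual Kafka code.
public final class LogDirFailureClassifier {

    // Treat the directory as saturated if fewer than this many bytes remain.
    private static final long SATURATION_THRESHOLD_BYTES = 1024 * 1024; // 1 MiB

    public enum Verdict { SATURATED, OFFLINE }

    public static Verdict classify(File logDir, IOException cause) {
        // Check the remaining space at the time of the failure instead of
        // parsing the "No space left on device" message in `cause`.
        long usable = logDir.getUsableSpace();
        if (usable < SATURATION_THRESHOLD_BYTES) {
            return Verdict.SATURATED; // stop fetchers/compaction, keep retention
        }
        return Verdict.OFFLINE;       // existing behaviour: take the log dir offline
    }
}
```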

The Kafka controller will be notified by Zookeeper that there was a problem with a log directory on a particular broker. The controller will then reach out to that broker to understand the state of its partition replicas. The broker responds with which partition replicas are offline because a log directory has become saturated. The controller determines the new leaders of those partitions and issues new leader and in-sync replica (LeaderAndIsr) requests.
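
The shape of the broker's reply could look roughly like the following - purely illustrative, with invented type names rather than the actual request/response classes:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch of the broker's reply to the controller's query after a
// log-directory event: each affected partition replica is reported together
// with the reason it is unavailable. Names are illustrative only.
final class LogDirEventReply {

    enum ReplicaCondition { OFFLINE_DIR_FAILED, OFFLINE_DIR_SATURATED }

    record PartitionReport(String topic, int partition, ReplicaCondition condition) {}

    // Group the report so the controller can pick new leaders for every
    // partition whose replica lives on the failed or saturated directory.
    static Map<ReplicaCondition, List<PartitionReport>> byCondition(List<PartitionReport> reports) {
        return reports.stream().collect(Collectors.groupingBy(PartitionReport::condition));
    }
}
```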

In addition to the above, upon Kafka server startup we will write and flush to disk a 40MB file of random bytes (from tests: ~10KB per partition for approximately 4000 partitions) in each log directory, which Kafka will delete whenever the broker goes into a saturated state. Since the file will be written before any of the log recovery processes are started, any problem writing it will cause Kafka to shut down. This reserved space is space of last resort for any admin operations that require disk space while the broker is in a saturated state. For example, if all segments of a partition are marked for deletion, Kafka rolls a new segment before deleting any old ones. If we do not have some space put aside for such operations, we would have to change their ordering.
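
A minimal sketch of the reserved-space file lifecycle described above, assuming hypothetical file and helper names (the real implementation may differ):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch of the reserved-space file: written and flushed at
// startup, before log recovery; deleted when the broker enters the saturated
// state so admin operations still have room to run.
public final class ReservedSpaceFile {

    static final String FILE_NAME = "kafka-reserved-space"; // illustrative name
    static final int SIZE_BYTES = 40 * 1024 * 1024;          // ~40 MB

    public static void writeAtStartup(Path logDir) throws IOException {
        Path file = logDir.resolve(FILE_NAME);
        byte[] random = new byte[SIZE_BYTES];
        ThreadLocalRandom.current().nextBytes(random);
        try (FileChannel ch = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE,
                StandardOpenOption.TRUNCATE_EXISTING)) {
            ByteBuffer buf = ByteBuffer.wrap(random);
            while (buf.hasRemaining()) {
                ch.write(buf);
            }
            ch.force(true); // flush to disk; a failure here aborts broker startup
        }
    }

    public static void releaseOnSaturation(Path logDir) throws IOException {
        // Deleting the file reclaims the reserved space so that segment rolls
        // needed by retention/deletion can still succeed.
        Files.deleteIfExists(logDir.resolve(FILE_NAME));
    }
}
```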


The controller will forward delete topic requests to all brokers hosting replicas of partitions of the topic under deletion. Requests targeting saturated log directories will be respected and will succeed. Requests targeting offline log directories will fail, which is expected.

...

We will add a new state (saturated) to the broker's state machines for both a log directory and a partition replica. The partition-replica state machine is known only to the broker and will not be replicated on the controller. We need these additional states in order to restrict which background tasks operate on saturated log directories and replicas. Without a separate state we have no way to tell Kafka that we would like deletion and retention to continue working on saturated log directories and partitions.
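
To make the intent concrete, here is a small sketch of the extra state and the task gating it enables; the enum and method names are hypothetical and do not mirror Kafka's internals:

```java
// Hypothetical sketch of the additional "saturated" state and how background
// tasks could be gated on it. Names do not match the actual Kafka internals.
public final class LogDirectoryStates {

    public enum LogDirectoryState { ONLINE, SATURATED, OFFLINE }

    // Fetchers and compaction stop once a directory is no longer fully online.
    public static boolean allowFetchersAndCompaction(LogDirectoryState state) {
        return state == LogDirectoryState.ONLINE;
    }

    // Retention and topic deletion keep running while the directory is merely
    // saturated, so that users can reclaim space; they stop only when offline.
    public static boolean allowRetentionAndDeletion(LogDirectoryState state) {
        return state != LogDirectoryState.OFFLINE;
    }
}
```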

Notification mechanism - Zookeeper/KRaft Controller
No changes will be introduced to Zookeeper. We continue to use it only as a notification mechanism.

Controller
Instead of sending a delete topic request only to replicas we know to be online, we will allow delete topic requests to be sent to all replicas regardless of their state. Previously the controller did not send delete topic requests to brokers with offline replicas because it knew they would fail. In the future, topic deletions targeting saturated log directories will succeed, while deletions targeting offline log directories will continue to fail.
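
Illustratively - this is not the actual controller code, and the types are invented - the change amounts to dropping the "online replicas only" filter when fanning out a delete topic request:

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch of the delete-topic fan-out change. Types and fields are
// invented for illustration and do not mirror the controller's classes.
final class DeleteTopicFanOut {

    enum ReplicaState { ONLINE, SATURATED, OFFLINE }

    record Replica(int brokerId, ReplicaState state) {}

    // Before: only replicas known to be online received the request.
    static List<Replica> targetsBefore(List<Replica> replicas) {
        return replicas.stream()
                .filter(r -> r.state() == ReplicaState.ONLINE)
                .collect(Collectors.toList());
    }

    // After: every replica is targeted; saturated ones succeed, offline ones
    // are expected to fail as they do today.
    static List<Replica> targetsAfter(List<Replica> replicas) {
        return List.copyOf(replicas);
    }
}
```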

...

  • What impact (if any) will there be on existing users? Currently users cannot interact with a Kafka broker at all when its log directories become full. In the future, users will be able to reclaim space by deleting topics or modifying retention settings to be more aggressive.
  • If we are changing behavior how will we phase out the older behavior? We won't phase out the old behaviour as we are building on top of it.
  • If we need special migration tools, describe them here. N/A
  • When will we remove the existing behavior? N/A

Test Plan

Describe in few sentences how the KIP will be tested. We are mostly interested in system tests (since unit-tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?

We will implement new integration and system tests which artificially constrain the space available to the Kafka log directories in order to gain confidence in the behaviour of the system.

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.

...

  • This is a reactive proposal; a proactive approach could keep us from getting into this situation in the first place. In our opinion, a proactive approach is simply a reactive approach with the boundary moved (i.e. instead of putting a broker into a saturated state when a log directory reaches 100% usage and we start getting IOExceptions, we would put the broker into a saturated state when usage reaches X%, with a background thread checking that condition - see the sketch below). The problem lies not with where the boundary is, but with defining the behaviour at that boundary, and we believe this KIP defines that behaviour. One limitation that a proactive approach would alleviate is that compaction could continue working (since there would still be space for new segments) - in our experience, however, compacted topics are rarely the cause of disks becoming full.
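
For illustration, a minimal sketch of such a proactive check, with an invented class name and an arbitrary threshold:

```java
import java.io.File;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the rejected "proactive" variant: a background task
// that flips a directory into the saturated state once usage crosses X%. The
// behaviour at the boundary is the same as in this KIP; only the point at
// which the boundary is hit moves.
final class ProactiveSaturationMonitor {

    private static final double USAGE_THRESHOLD = 0.95; // "X%" is illustrative

    static void start(File logDir, Runnable markSaturated) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            double used = 1.0 - (double) logDir.getUsableSpace() / logDir.getTotalSpace();
            if (used >= USAGE_THRESHOLD) {
                markSaturated.run(); // enter the saturated state before 100% usage
            }
        }, 1, 1, TimeUnit.MINUTES);
    }
}
```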

...