The following is a list of potential improvements to the log cleaner.

Integrate with system test framework

We have a good integration test; it would be nice to hook it into the nightly test run.

Add a tool to measure the duplication in a log

It would be nice to have an operational tool to check the duplication within a log. This could be built as a simple consumer that takes a particular topic/partition, consumes that log sequentially, and estimates the duplication. Each key consumed would be checked against a bloom filter. If it is present we would count a duplicate; otherwise we would add it to the filter. A large enough bloom filter could probably produce an accurate-enough estimate of the duplication rate.

KAFKA-1336
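
A minimal sketch of what the core loop of such a tool could look like, assuming Guava's BloomFilter and leaving out the consumer plumbing (the DuplicationEstimator class and its method names are purely illustrative):

  import com.google.common.hash.BloomFilter;
  import com.google.common.hash.Funnels;

  // Illustrative only: estimate the duplication rate of a log by streaming its
  // record keys through a bloom filter. Keys the filter has (probably) seen
  // before count as duplicates.
  public class DuplicationEstimator {
    private final BloomFilter<byte[]> seen;
    private long total = 0;
    private long duplicates = 0;

    public DuplicationEstimator(long expectedKeys, double falsePositiveRate) {
      this.seen = BloomFilter.create(Funnels.byteArrayFunnel(), expectedKeys, falsePositiveRate);
    }

    // Feed each key consumed sequentially from the topic/partition.
    public void observe(byte[] key) {
      total++;
      if (seen.mightContain(key))
        duplicates++;        // may occasionally be a false positive, hence an estimate
      else
        seen.put(key);
    }

    // Fraction of keys that (probably) appeared at least once before.
    public double duplicationRate() {
      return total == 0 ? 0.0 : (double) duplicates / total;
    }
  }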

Improve dedupe buffer efficiency

Currently we use a fairly naive approach to approximate deduplication. There are several things that could be improved.

Currently we will only process up to N messages of dirty log in one cleaning, where N = buffer_size/24 * collision_rate. This may actually be a little bit conservative: the dirty section of the log may itself have many duplicates, in which case it actually uses up much less space in the dedupe buffer. We could check whether the key is already present in the dedupe buffer and only increment the entry count if it is not.
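
For illustration, a hypothetical sketch of that counting rule (the real dedupe buffer is the cleaner's offset map; the CountingOffsetMap name and the plain HashMap used here are stand-ins):

  import java.nio.ByteBuffer;
  import java.util.HashMap;
  import java.util.Map;

  // Illustrative only: a key counts against the buffer's capacity only if it is
  // not already present, so duplicate keys never consume an additional slot.
  public class CountingOffsetMap {
    private final int maxEntries;                 // roughly buffer_size / 24 in the real map
    private final Map<ByteBuffer, Long> offsets = new HashMap<>();

    public CountingOffsetMap(int maxEntries) {
      this.maxEntries = maxEntries;
    }

    // Returns false only once the buffer holds maxEntries *distinct* keys, so a
    // highly duplicated dirty section lets the cleaner scan far more than
    // maxEntries messages in a single pass.
    public boolean put(byte[] key, long offset) {
      ByteBuffer k = ByteBuffer.wrap(key);
      if (offsets.containsKey(k)) {
        offsets.put(k, offset);                   // update in place, no new slot used
        return true;
      }
      if (offsets.size() >= maxEntries)
        return false;                             // genuinely out of space
      offsets.put(k, offset);
      return true;
    }
  }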

Drive-aware scheduling and throttling

Currently we have a global throttle in place on I/O and we use only the ratio of dirty to clean bytes to choose the log to clean. This works well for a single drive, for multiple drives in a RAID configuration, or if you have only one cleaner thread. However, if you have a JBOD configuration with multiple data directories AND are using multiple cleaner threads, this is not ideal.

The problem is that you could end up scheduling many concurrent cleanings for logs that all reside on the same drive. Setting the throttle conservatively enough to handle this case could result in over-throttling when you are cleaning logs on different drives.

A simple fix would be to make the throttling per-drive. However, this is not ideal either, since you would still schedule concurrent cleanings on a single drive, which might result in no more cleaning than having only a single thread (because they would all be throttled).

A more sophisticated scheduling approach would be aware of the per-disk throttle rate and choose logs appropriately.
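
As a rough illustration, here is a hypothetical scheduler that only hands a cleaner thread a log whose data directory has no cleaning in flight (the class and field names are invented for the sketch, not taken from the cleaner code):

  import java.util.*;

  // Illustrative only: avoid piling concurrent cleanings onto one disk by
  // tracking which data directories already have a cleaning in progress.
  public class DriveAwareScheduler {
    // Minimal view of a cleanable log for scheduling purposes.
    public static class CandidateLog {
      final String topicPartition;
      final String dataDir;          // the drive / data directory the log lives on
      final double dirtyRatio;       // dirty_bytes / (clean_bytes + dirty_bytes)
      public CandidateLog(String tp, String dir, double ratio) {
        this.topicPartition = tp; this.dataDir = dir; this.dirtyRatio = ratio;
      }
    }

    private final Set<String> busyDrives = new HashSet<>();

    // Choose the dirtiest log among drives that are currently idle.
    public synchronized Optional<CandidateLog> grabNext(List<CandidateLog> candidates) {
      Optional<CandidateLog> choice = candidates.stream()
          .filter(c -> !busyDrives.contains(c.dataDir))
          .max(Comparator.comparingDouble((CandidateLog c) -> c.dirtyRatio));
      choice.ifPresent(c -> busyDrives.add(c.dataDir));
      return choice;
    }

    // Called by a cleaner thread when it finishes (or aborts) a cleaning.
    public synchronized void release(CandidateLog log) {
      busyDrives.remove(log.dataDir);
    }
  }

A per-drive throttle rate could then be layered on top of this, since at most one cleaning would be in flight per data directory at a time.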

Estimate Survivorship Ratio

The log cleaner works by successively estimating the "dirtiest" log and cleaning that. Intuitively we want to avoid cleaning logs that have no garbage, as that will generate lots of I/O with no reduction in size. Since we can't know the amount of garbage in the log without actually performing something like the cleaning, this is inherently a prediction problem. The better we make this choice, the more efficient the cleaner will be.

I went through several iterations thinking about the right way to rank logs. Originally I thought maybe I would rank them by the total amount of garbage, but using the absolute amount doesn't make sense. Consider a case where you have a 1GB log that is 90% garbage and a 1TB log that is 10% garbage. It is true that cleaning the 1TB log will free up 100GB of space while cleaning the 1GB log will only free up 900MB; however, cleaning the 1TB log will also take 1000 times longer. So clearly what we want is for the cleaner to free the most space per second of operation (or per byte of cleaner I/O). This points to the right metric to use, which is

  shrinkage = size_after_cleaning / size_before_cleaning

The problem is that we don't really know the size_after_cleaning without actually doing the cleaning.

Clearly

  size_after_cleaning = clean_size + survivorship_percentage * dirty_size 

That is, if 100% of the dirty segment of the log is "updates" (survivorship_percentage = 0%) then after cleaning all that will be left is clean_size bytes. If, on the other hand, all the messages in the dirty segment are new and unique and there is no overlap with the clean segment (survivorship_percentage = 100%) then the resulting size will be clean_size + dirty_size.
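
Putting the two formulas together, here is a small illustrative helper (not existing cleaner code) that predicts the shrinkage of a cleaning from the clean and dirty byte counts and an assumed survivorship percentage:

  // Illustrative only: lower shrinkage means more space freed per byte of cleaner I/O.
  public class ShrinkageEstimate {
    public static double estimate(long cleanBytes, long dirtyBytes, double survivorshipPct) {
      double sizeBefore = cleanBytes + dirtyBytes;
      double sizeAfter  = cleanBytes + survivorshipPct * dirtyBytes;
      return sizeAfter / sizeBefore;
    }

    public static void main(String[] args) {
      // Roughly mirrors the 1GB vs 1TB example above under the survivorship_percentage = 0 assumption:
      System.out.println(estimate(100L << 20, 900L << 20, 0.0));   // ~0.1: the small, mostly-dirty log shrinks a lot
      System.out.println(estimate(900L << 30, 100L << 30, 0.0));   // ~0.9: the large, mostly-clean log barely shrinks
    }
  }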

Current Code

So how can we estimate the survivorship_percentage? Well, in the current code I avoided this entirely with the following assumption: most data sets are more or less in a steady state, in other words they are mostly taking updates, not inserts. As a result I simply assume that survivorship_percentage = 0% and just use the following ratio for ranking:

  dirty_bytes / (clean_bytes + dirty_bytes)

Specifically clean_bytes is the size of the log between the last cleaner point and the beginning of the active segment and dirty_bytes is the remaining tail of the log from the last cleaner end point to the beginning of the log.

A Better Approach

This heuristic has obvious drawbacks. A log which is being "bootstrapped" and is taking only inserts would get cleaned with the same frequency as a log which is in steady state, even though these cleanings would result in no size reduction.

A better approach would be to use the survivorship ratio from previous cleanings to estimate the size_after_cleaning for the current cleaning. We already calculate these statistics in CleanerStats.

The simplest implementation would just keep an in-memory map of topic/partition=>survivorship_ratio. This would be initialized with survivorship_percentage = 0% (the current heuristic) and each time we do a cleaning we would update this percentage to be the observed survivorship from the last cleaning.

There are two further improvements.

First we could save out this percentage to a file periodically so that on restart we would not reduce the cleaner efficiency by starting over with a bad estimate.

Second, using only the survivorship_percentage from the previous cleaning might leave us open to some corner cases. One case worth considering is some kind of oscillating behavior where the true survivorship ratio ranges between 10% and 90%. To handle this it would be nice to use an exponentially weighted average of previous ratios:

  survivorship_ratio = alpha * observed_ratio + (1 - alpha) * survivorship_ratio

Here alpha is a "learning rate" between 0 and 1.0 that controls how quickly we move away from older ratios and towards the observed ratio (the simple proposal above is basically alpha = 100%). This would help smooth out oscillations at the cost of taking a few cleanings to fully adapt to the observed ratio.
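
A hypothetical sketch of such an estimator, combining the in-memory map with the exponentially weighted update (the class name and method signatures are invented; the observed ratio would come from the statistics already gathered in CleanerStats):

  import java.util.concurrent.ConcurrentHashMap;

  // Illustrative only: per-partition survivorship estimates, seeded at 0%
  // (the current heuristic) and smoothed with a learning rate alpha.
  public class SurvivorshipEstimator {
    private final double alpha;
    private final ConcurrentHashMap<String, Double> ratios = new ConcurrentHashMap<>();

    public SurvivorshipEstimator(double alpha) {
      this.alpha = alpha;
    }

    // Ratio to use when ranking this partition; 0.0 until we have observed a cleaning.
    public double estimate(String topicPartition) {
      return ratios.getOrDefault(topicPartition, 0.0);
    }

    // Called after each cleaning with the observed ratio
    // (surviving dirty bytes / total dirty bytes cleaned).
    public void record(String topicPartition, double observedRatio) {
      double previous = ratios.getOrDefault(topicPartition, 0.0);
      ratios.put(topicPartition, alpha * observedRatio + (1 - alpha) * previous);
    }
  }

Periodically writing the map out to a small file and reloading it on startup would cover the restart case mentioned above.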

Generational Collection

The current code does a full recopy of the log with each cleaning. I believe this is an optimal strategy if the probability of update is uniform for all log entries--that is if garbage is equally likely to be at the end of the log as anywhere else. However the update probability is almost always non-uniform (this is why we have caches and memory hierarchies).

If this sounds abstract, consider a practical example. Let's say you have a log containing user account records. Some active users will update their account daily; some inactive users have forgotten their password and changed their email and can never update their account again. What will happen to the log? Well, every time we clean the log, records which have been updated will be removed from segments. As a result the log will become sorted by the time since the last update (things at the beginning of the log will be things updated least recently, and things at the end will be the things just updated). So now we can see the inefficiency in our cleaning--the oldest segments at the beginning of the log will be full of the accounts of users who will never update their account again, yet we persist in optimistically recopying these segments looking for garbage.

In other words the same survivorship ratio we used in the above section was actually more like the average survivorship ratio over the whole log, but in reality it would vary significantly within the log, with older segments (usually) having a much higher survivorship.

This is exactly the intuition behind generational collection in both log-structured storage (e.g. leveldb and cassandra) and in programming language garbage collection. You can think of our dirty and clean segments as being two generations, but you could add additional generations beyond that. Typically for these systems the following would be true:

  1. Each successive generation increases in size (e.g. maybe a generation is 2x the size in bytes of its predecessor)
  2. All generations but the newest are approximately deduplicated internally, but generations may duplicate each other.
  3. Collection proceeds by collecting a generation into the generation that succeeds it

You can view the generations as an attempt to approximate the histogram of the update frequency by recency.
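
Purely as an illustration of point 1 in the list above, here is a toy helper (not a proposal for real code) that lays out generation byte budgets with a fixed growth factor; a data-driven approach, discussed below, would instead derive the boundaries from the observed survivorship-by-recency distribution:

  // Illustrative only: generation budgets, newest first, where each generation
  // is growthFactor times the size of its predecessor.
  public class GenerationLayout {
    public static long[] generationSizes(long newestGenBytes, double growthFactor, long totalLogBytes) {
      if (newestGenBytes <= 0 || growthFactor < 1.0)
        throw new IllegalArgumentException("need a positive base size and growthFactor >= 1");
      java.util.List<Long> sizes = new java.util.ArrayList<>();
      long covered = 0;
      double size = newestGenBytes;
      while (covered < totalLogBytes) {
        long s = (long) size;
        sizes.add(s);
        covered += s;
        size *= growthFactor;
      }
      return sizes.stream().mapToLong(Long::longValue).toArray();
    }

    public static void main(String[] args) {
      // e.g. a 100GB log with a 1GB newest generation and 2x growth
      long[] gens = generationSizes(1L << 30, 2.0, 100L << 30);
      System.out.println(java.util.Arrays.toString(gens));  // budgets of 1, 2, 4, ... GB
    }
  }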

A few open questions I have considered but not fully answered:

The first obvious question is: if you do have generations, how many should you have and what size should they be? Existing systems seem to hard-code this or have you directly configure it. However, it would be better to do this in a data-driven way, that is, to measure the survivorship ratio and choose generations appropriately. Intuitively, if the distribution is uniform with respect to recency then you should just have one big generation, but otherwise you should choose generation boundaries that optimally approximate the observed distribution.

Second, though it is true that most existing systems use the concept of generations, I wonder if there is another way that produces a more continuous estimation. I have not fully considered this, but I wonder if it would be possible to somehow use a memory-mapped offset map that would be saved between runs and dynamically choose how far back in the log to collect.


1 Comment

  1. Shrinkage with survivorship_percentage = 0 is clean_bytes / (clean_bytes + dirty_bytes) by equation 1 (the percentage of the log that is clean). What is used in the code is dirty_bytes / (dirty_bytes + clean_bytes) (the percentage of the log that is unclean). Either one can be used (the comparison function would differ), but I wanted to ensure this is intentionally specified that way.

  2. The calculations assume cleanup time grows linearly with the amount of dirty bytes remaining in the file. A sequential read of a 1 GB file is much faster than five 200 MB reads of the file. Should this not be factored in while calculating "space cleaned per second"?

  3. Is the definition of clean_bytes and dirty_bytes reversed in the statement below?

     "Specifically clean_bytes is the size of the log between the last cleaner point and the beginning of the active segment and dirty_bytes is the remaining tail of the log from the last cleaner end point to the beginning of the log."