Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current state[One of "Under Discussion", "Accepted", "Rejected"]

Discussion thread: here

JIRA: here KAFKA-1981

Released: <Kafka Version>Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast). 0.10.1.0

Motivation

Currently

...

Kafka's

...

log

...

compaction

...

gives

...

minimal

...

control

...

over

...

what

...

portion

...

of

...

the

...

log

...

remains

...

uncompacted.

...

There

...

is

...

a

...

setting

...

that

...

prevents

...

compaction

...

until

...

a

...

certain

...

dirty

...

ratio

...

has

...

been

...

reached

...

but

...

this

...

does

...

not

...

provide

...

any

...

upper

...

bound

...

on

...

how

...

much

...

of

...

the

...

log's

...

head

...

will

...

remain

...

uncompacted

...

once

...

it

...

runs.

...

Although

...

the

...

segment

...

currently

...

being

...

written

...

will

...

never

...

be

...

compacted,

...

this

...

could

...

leave

...

as

...

little

...

as

...

one

...

message

...

uncompacted.

...

As

...

a

...

result,

...

it

...

is

...

impossible

...

to

...

be

...

guaranteed

...

that

...

a

...

consumer

...

will

...

get

...

every

...

update

...

to

...

a

...

compacted

...

topic.

...

Even

...

if

...

the

...

consumer

...

falls

...

behind

...

by

...

a

...

single

...

message

...

it

...

might

...

get

...

the

...

compacted

...

version.

...

One

...

particularly

...

relevant

...

use

...

case

...

is

...

database

...

state

...

replication

...

through

...

change

...

data

...

capture.

...

This

...

use

...

case

...

is

...

specifically

...

called

...

out

...

in

...

the

...

Kafka

...

documentation

...

for

...

the

...

compaction

...

feature

...

under

...

"Database

...

change

...

subscription".

...

It

...

is

...

convenient

...

to

...

produce

...

this

...

data

...

in

...

multiple

...

topics

...

(e.g.

...

one

...

per

...

source

...

table)

...

and/or

...

partitions.

...

However,

...

in

...

order

...

to

...

be

...

able

...

to

...

recreate

...

a

...

database

...

state

...

at

...

a

...

point

...

of

...

transactional

...

consistency some

...

coordination

...

across

...

topics/partions

...

is

...

required

...

(e.g.

...

a

...

separate

...

_checkpoint_

...

topic

...

with

...

the

...

offsets

...

at

...

each

...

transaction

...

commit).

...

If

...

the

...

table

...

topics

...

are

...

all

...

independently

...

compacting

...

there

...

is

...

currently

...

no

...

way

...

to

...

be

...

assured

...

that

...

any

...

given

...

checkpoint

...

can

...

be

...

materialized

...

as

...

the

...

checkpointed

...

offset

...

for

...

any

...

given

...

topic

...

may

...

have

...

been

...

compacted

...

such

...

that

...

some

...

keys

...

may

...

be

...

taking

...

on

...

some

...

subsequently

...

inserted

...

values.

...

(Details:

...

 https://gist.github.com/ewasserman/f8c892c2e7a9cf26ee46)

Another use case is to handle application mistakes. For example, a compacted topic could be the source of truth for certain type of data. If there is an application error, some incorrect data may be published to the topic. When compaction is triggered, those incorrect data can wipe out the last correct message associated with a key. Being able to delay the compaction based on a configurable amount of time will allow a user to preserve those last known correct messages after an application error is discovered but before compaction destroys the correct data.

Public Interfaces

This

...

proposal

...

includes

...

new

...

configurations

...

for

...

controlling

...

compaction.

...

The

...

log

...

cleaner

...

can

...

be

...

configured

...

retain

...

a

...

minimum

...

amount

...

of

...

the

...

uncompacted

...

head

...

of

...

the

...

log.

...

This

...

is

...

enabled

...

by

...

setting

...

the

...

compaction

...

lag:



    log.cleaner.min.compaction.lag.ms
    log.cleaner.min.compaction.lag.bytes
    log.cleaner.min.compaction.lag.
messages

for setting minimum message age in milliseconds,. cumulativeThis size in bytes of subsequent messages, and subsequent message count respectively. These have has a similar per-topic configurationsconfiguration:


    min.compaction.lag.ms
This lag configuration 
    min.compaction.lag.bytes
    min.compaction.lag.messages

Each of these configured lags defaults to zero so that if none arenot set, all log segments are eligible for compaction except for the last segment (i.e. the one currently being written). The active segment will not be compacted even if all of the compaction lag constraintsconstraint areis satisfied. This leaves unchanged the current behavior. 

If one or more of the compaction lagslag is greater than zero then compaction of the segments of the logs containing any messages that do not satisfy *all*the oflag theconstraint constraints will not be compacted.

 In particular this allows for the example use case like: "any consumer that is no more than 1 hour behind will get every message."

Proposed Changes

Introduce

...

an additional

...

configuration to

...

topics

...

that

...

guarantee

...

a

...

minimum

...

portion

...

of

...

the

...

head

...

of

...

the

...

log

...

will

...

remain

...

uncompacted.

...

That

...

is,

...

offer

...

a guarantee that

...

a

...

consumer

...

that

...

does

...

not

...

lag

...

too

...

far

...

behind

...

will

...

get

...

every

...

update

...

to

...

a

...

compacted

...

topic.

...

This can

...

be

...

used

...

to

...

set

...

constraints

...

on

...

the

...

minimum

...

_distance_

...

from

...

the

...

topic

...

head

...

that

...

will

...

remain

...

uncompacted,

...

where

...

distance

...

is

...

defined

...

in

...

terms

...

of time since insertion (i.e.

...

message

...

age)

...

.

The basic behavior of the compaction ratio to trigger and prioritize compaction order will not be altered. However, the ratio's definition will be expanded to become the ratio of "compactable" to compactable plus compacted message sizes. Where compactable includes log segments that are neither the active segment nor those prohibited from being compacted because they contain messages that do not satisfy the new lag constraint.

The time lag guarantee can be satisfied by preventing compaction of any segment containing a message or messages within the time lag. KIP-33 - Add a time based log index provides a mechanism for this to be accurately computed for a log segment.

Compatibility, Deprecation, and Migration Plan

The

...

proposed

...

change

...

is

...

backward

...

compatible.

Rejected Alternatives

The

...

database

...

replication

...

use

...

case

...

could

...

be

...

satisfied

...

using

...

a

...

combination

...

of

...

"snapshot"

...

and

...

"journal"

...

topics

...

for

...

each

...

table.

...

The

...

journal

...

topics

...

could

...

use

...

regular

...

time-based

...

deletion.

...

There

...

would

...

need

...

to

...

be

...

some

...

external

...

process

...

periodically

...

creating

...

new

...

snapshots

...

from

...

the

...

most

...

recent

...

snapshots

...

and

...

the

...

journals.

While it would be straightforward to introduce other types of "lag" (e.g. aggregate message size, or message count) there were not sufficient motivating use cases to justify their inclusion at this time.