Released: <Kafka Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, Kafka's log compaction gives minimal control over what portion of the log remains uncompacted. There is a setting that prevents compaction until a certain dirty ratio has been reached, but it does not guarantee any minimum amount of the log's head that will remain uncompacted once compaction runs. Although the segment currently being written will never be compacted, this could leave as little as one message uncompacted. As a result, it is impossible to guarantee that a consumer will get every update to a compacted topic. Even if the consumer falls behind by a single message, it might get the compacted version.

One particularly relevant use case is database state replication through change data capture. This use case is specifically called out in the Kafka documentation for the compaction feature under "Database change subscription". It is convenient to produce this data in multiple topics (e.g. one per source table) and/or partitions. However, in order to recreate a database state at a point of transactional consistency, some coordination across topics/partitions is required (e.g. a separate _checkpoint_ topic with the offsets at each transaction commit). If the table topics are all compacting independently, there is currently no way to be assured that any given checkpoint can be materialized, because the checkpointed offset for a topic may already have been compacted, so that some keys have taken on subsequently inserted values. (Details: https://gist.github.com/ewasserman/f8c892c2e7a9cf26ee46)

Public Interfaces

This proposal includes new configurations for controlling compaction. The log cleaner can be configured to retain a minimum amount of the uncompacted head of the log. This is enabled by setting one or more of the compaction lags:



    log.cleaner.min.compaction.lag.ms
    log.cleaner.min.compaction.lag.bytes
    log.cleaner.min.compaction.lag.messages

These set, respectively, the minimum message age in milliseconds, the minimum cumulative size in bytes of subsequent messages, and the minimum number of subsequent messages. Each has a corresponding per-topic configuration:



    min.compaction.lag.ms
    min.compaction.lag.bytes
    min.compaction.lag.messages

Each of these configured lags defaults to zero, so that if none are set, all log segments are eligible for compaction except for the last segment (i.e. the one currently being written). The active segment will not be compacted even if all of the compaction lag constraints are satisfied. This leaves the current behavior unchanged.

If one or more of the compaction lags is greater than zero, then any log segment containing messages that do not satisfy all of the constraints will not be compacted. In particular, this allows for use cases such as: "any consumer that is no more than 1 hour behind will get every message."
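The eligibility rule described above can be sketched as follows. This is only an illustration under stated assumptions, not the log cleaner's implementation: the `Segment` class, its fields, and the `count_compactable_segments` function are hypothetical names, and the sketch assumes that a segment is held back whenever its newest message fails any one of the three lags.

```python
from dataclasses import dataclass
from typing import Sequence


@dataclass
class Segment:
    """Hypothetical per-segment summary; not a Kafka class."""
    newest_message_age_ms: int  # age of the youngest message in the segment
    size_bytes: int             # total size of the messages in the segment
    message_count: int          # number of messages in the segment


def count_compactable_segments(
    segments: Sequence[Segment],
    lag_ms: int = 0,
    lag_bytes: int = 0,
    lag_messages: int = 0,
) -> int:
    """Return how many leading segments are eligible for compaction.

    `segments` is ordered oldest first and the last entry is the active
    segment, which is never eligible. A segment is eligible only if its
    newest message satisfies every configured lag.
    """
    n = len(segments)
    # Bytes and messages written after the end of each segment.
    bytes_after = [0] * n
    messages_after = [0] * n
    for i in range(n - 2, -1, -1):
        bytes_after[i] = bytes_after[i + 1] + segments[i + 1].size_bytes
        messages_after[i] = messages_after[i + 1] + segments[i + 1].message_count

    eligible = 0
    for i, segment in enumerate(segments[:-1]):  # skip the active segment
        satisfied = (
            segment.newest_message_age_ms >= lag_ms
            and bytes_after[i] >= lag_bytes
            and messages_after[i] >= lag_messages
        )
        if not satisfied:
            break  # every later segment is closer to the head, so stop here
        eligible += 1
    return eligible


# Example log: three closed segments followed by the active segment.
example_log = [
    Segment(newest_message_age_ms=7_200_000, size_bytes=1000, message_count=10),
    Segment(newest_message_age_ms=4_000_000, size_bytes=1000, message_count=10),
    Segment(newest_message_age_ms=1_800_000, size_bytes=1000, message_count=10),
    Segment(newest_message_age_ms=60_000, size_bytes=500, message_count=5),
]
```

With all lags left at zero, every segment except the active one is eligible, matching the current behavior. With a one-hour time lag (`lag_ms=3_600_000`), the third segment in `example_log` is held back because its newest message is only 30 minutes old.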

Proposed Changes

Introduce additional configurations to topics that guarantee a minimum portion of the head of the log will remain uncompacted. That is, offer guarantees that a consumer that does not lag too far behind will get every update to a compacted topic. These can be used to set constraints on the minimum _distance_ from the topic head that will remain uncompacted, where distance is defined in terms of:

  • Time since insertion (i.e. message age)
  • Aggregate message size
  • Message count

The basic behavior of the compaction ratio to trigger and prioritize compaction order will not be altered. However, the ratio's definition will be expanded to become the ratio of "compactable" message sizes to the sum of compactable and compacted message sizes. Here, compactable covers log segments that are neither the active segment nor prohibited from being compacted because they contain messages that do not satisfy all of the new lag constraints.
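As a rough illustration of the expanded definition, the sketch below computes the ratio from segment sizes. The function name `compactable_ratio` and its parameters are hypothetical, and the sketch assumes that segments excluded by the lag constraints (and the active segment) contribute to neither the numerator nor the denominator.

```python
from typing import Sequence


def compactable_ratio(
    compacted_bytes: int,
    uncompacted_segment_bytes: Sequence[int],
    eligible_segments: int,
) -> float:
    """Ratio of compactable bytes to compactable plus compacted bytes.

    `uncompacted_segment_bytes` lists the sizes of the not-yet-compacted
    segments, oldest first; only the first `eligible_segments` of them
    count as compactable (assumption: the rest are the active segment or
    are held back by a compaction lag).
    """
    compactable = sum(uncompacted_segment_bytes[:eligible_segments])
    total = compactable + compacted_bytes
    return compactable / total if total > 0 else 0.0


# 6000 bytes already compacted; three closed segments plus the active one.
sizes = [1000, 1000, 1000, 500]
ratio_without_lag = compactable_ratio(6000, sizes, eligible_segments=3)
ratio_with_lag = compactable_ratio(6000, sizes, eligible_segments=2)
```

Under these assumptions, holding one more segment back lowers the ratio (here from 3000/9000 to 2000/8000), so a lag can delay the point at which the configured dirty ratio triggers compaction.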

Compatibility, Deprecation, and Migration Plan

The proposed change is backward compatible.

Rejected Alternatives

The database replication use case could be satisfied using a combination of "snapshot" and "journal" topics for each table. The journal topics could use regular time-based deletion. There would need to be some external process periodically creating new snapshots from the most recent snapshots and the journals.