Overview

Kafka Streams allows for stateful stream processing, i.e., operators that have internal state. This internal state is managed in so-called state stores. A state store can be ephemeral (lost on failure) or fault-tolerant (restored after a failure). The default implementation used by the Kafka Streams DSL is a fault-tolerant state store that uses (1) an internally created and compacted changelog topic (for fault tolerance) and (2) one or more RocksDB instances (for cached key-value lookups). Thus, when applications are started or stopped, or when data is rewound and reprocessed, this internal data needs to be managed correctly.
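To make the interplay between the local store and the changelog topic more concrete, here is a minimal sketch in plain Java. It is a toy model and not the Kafka Streams API: the class ChangelogBackedStore, its nested Change record type, and the compact helper are hypothetical names, a HashMap stands in for the RocksDB instance, and a List stands in for the changelog topic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a fault-tolerant key-value store; NOT the Kafka Streams API. */
final class ChangelogBackedStore {

    /** One changelog record; a null value marks a deleted key. */
    static final class Change {
        final String key;
        final Long value;

        Change(String key, Long value) {
            this.key = key;
            this.value = value;
        }
    }

    // Stand-in for the local RocksDB instance: lost when the instance fails.
    private final Map<String, Long> local = new HashMap<>();
    // Stand-in for the changelog topic: survives the failure.
    private final List<Change> changelog;

    /** Opens the store and restores its content by replaying the changelog. */
    ChangelogBackedStore(List<Change> changelog) {
        this.changelog = changelog;
        for (Change change : changelog) {
            apply(change);
        }
    }

    private void apply(Change change) {
        if (change.value == null) {
            local.remove(change.key);
        } else {
            local.put(change.key, change.value);
        }
    }

    /** Logs every update to the changelog before applying it locally. */
    void put(String key, Long value) {
        Change change = new Change(key, value);
        changelog.add(change);
        apply(change);
    }

    Long get(String key) {
        return local.get(key);
    }

    /** Models log compaction: keeps only the latest record per key. */
    static List<Change> compact(List<Change> log) {
        Map<String, Change> latest = new LinkedHashMap<>();
        for (Change change : log) {
            latest.remove(change.key); // re-insert so the newest record moves to the end
            latest.put(change.key, change);
        }
        return new ArrayList<>(latest.values());
    }
}
```

Creating a second ChangelogBackedStore from the same (or a compacted) changelog models a restart after failure: the new instance ends up with the same key-value content as the lost one, and compaction bounds how many records have to be replayed.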

Current State

We first give an overview of the current implementation details of Kafka Streams with regard to (internally) created topics and the usage of RocksDB. The available transformations for KStream and KTable can be categorized as shown below. All operators within a category use the same internal state management mechanism, so the categorization gives an overview of the state management strategy for each transformation.

...

State management details are given below.

KStream API

KStream currently offers the following methods, which have different implications with regard to (internally) created topics and RocksDB usage.

 

Single stream, data transformation:

  • tuple-by-tuple (KStream -> KStream), same key: filter (1:[0,1]), filterNot (1:[0,1]), mapValues (1:1), flatMapValues (1:n), transformValues (1:1 + state), process (1:n + state)
  • tuple-by-tuple (KStream -> KStream), new key: selectKey (1:1), map (1:1), flatMap (1:n), transform (1:1 + state)
  • aggregation, non-windowed (KStream -> KTable): aggregateByKey, reduceByKey, countByKey
  • aggregation, windowed (KStream -> KTable<Windowed<K>, V>): aggregateByKey, reduceByKey, countByKey

Single stream, other:

  • print, writeAsText, foreach, through, to, branch

Multiple streams, data transformation, tuple-by-tuple (i.e., joins):

  • non-windowed (KStream-KTable): leftJoin
  • windowed (KStream-KStream): join, leftJoin, outerJoin

KTable API

KTable currently offers the following methods, which have different implications with regard to (internally) created topics and RocksDB usage.

Jira: KAFKA-3576

 

Single stream, data transformation:

  • tuple-by-tuple, same key (KTable -> KTable): filter (1:[0,1]), filterNot (1:[0,1]), mapValues (1:1)
  • tuple-by-tuple, new key (KTable -> KGroupedTable): groupBy (1:1) [internally simple map]
  • aggregation (KGroupedTable -> KTable): aggregate, reduce, count

Single stream, other:

  • print, writeAsText, foreach, through, to, toStream

Multiple streams, data transformation, tuple-by-tuple (KTable-KTable joins):

  • join, leftJoin, outerJoin

Data Management

Overview

There are four methods to explicitly deal with user topics:

...

Related work on state stores: KAFKA-3909 (Jira)

Commits

Kafka Streams commits the current processing progress at regular intervals (parameter commit.interval.ms). When a commit is triggered, all state stores need to flush their data to disk, i.e., all internal topics need to be flushed to Kafka. Furthermore, all user topics are flushed, too. Finally, all current topic offsets are committed to Kafka. In case of failure and restart, the application can resume processing from its last commit point (providing at-least-once processing guarantees).
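The commit sequence described above can be sketched as a small toy model in plain Java. This is an illustration under stated assumptions, not the actual implementation: the class CommitSketch and all of its members are hypothetical, the flush steps are only recorded as strings, and the 30000 ms interval used in the usage note is an arbitrary example value for commit.interval.ms.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of the commit sequence; all names are hypothetical. */
final class CommitSketch {
    /** Actions performed so far, in order. */
    final List<String> steps = new ArrayList<>();
    /** Number of records processed so far, per input topic. */
    final Map<String, Long> position = new HashMap<>();
    /** Progress as of the last commit, per input topic. */
    final Map<String, Long> committed = new HashMap<>();

    private final long commitIntervalMs; // models commit.interval.ms
    private long lastCommitMs;

    CommitSketch(long commitIntervalMs, long nowMs) {
        this.commitIntervalMs = commitIntervalMs;
        this.lastCommitMs = nowMs;
    }

    /** Records that one more record of the given topic has been processed. */
    void processed(String topic) {
        position.merge(topic, 1L, Long::sum);
    }

    /** Commits if the interval has elapsed; returns whether a commit happened. */
    boolean maybeCommit(long nowMs) {
        if (nowMs - lastCommitMs < commitIntervalMs) {
            return false;
        }
        steps.add("flush state stores to disk");
        steps.add("flush internal topics to Kafka");
        steps.add("flush user topics to Kafka");
        committed.clear();
        committed.putAll(position);
        steps.add("commit offsets");
        lastCommitMs = nowMs;
        return true;
    }

    /** After a failure, processing resumes from the last commit point. */
    void restart() {
        position.clear();
        position.putAll(committed);
    }
}
```

For example, with new CommitSketch(30000L, 0L), two processed records, a commit at time 30000, one more processed record, and then restart(), the position falls back to the committed value of two. The third record is processed again after the restart, which is the at-least-once behavior mentioned above.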

Internal Topics and State Store Names

Currently, the Streams DSL tries to abstract the auto-generated internal topic and state store names as "KTable names" and "window names"; however, in a future release all state store names will be exposed to the user (Jira: KAFKA-3870).

Internal topics follow the naming convention <application.id>-<operatorName>-<suffix>; this naming convention might change at any time.
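As a small illustration of the naming convention, the following plain-Java sketch assembles such a name. The class InternalTopicNames and its methods are hypothetical helpers, not part of Kafka Streams, and the values "my-app", "Counts", and "changelog" in the usage note are assumed example values only. Because the convention might change, code like this should not be relied upon beyond illustration.

```java
/** Hypothetical helper illustrating <application.id>-<operatorName>-<suffix>. */
final class InternalTopicNames {

    private InternalTopicNames() {
    }

    /** Joins the three parts of the convention with '-' as the separator. */
    static String internalTopic(String applicationId, String operatorName, String suffix) {
        return applicationId + "-" + operatorName + "-" + suffix;
    }

    /** Checks whether a topic name starts with the prefix of the given application. */
    static boolean hasApplicationPrefix(String applicationId, String topic) {
        return topic.startsWith(applicationId + "-");
    }
}
```

For instance, InternalTopicNames.internalTopic("my-app", "Counts", "changelog") yields "my-app-Counts-changelog". Note that the prefix check is only a heuristic: a user topic that happens to start with the application id would match as well.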

...