Status

Current state: Accepted

Discussion thread: https://lists.apache.org/thread.html/a5ee2d569448dd57647dca93f288405c13785dec9cb4a0cbddfedd35@%3Cdev.kafka.apache.org%3E

JIRA: KAFKA-7658, KAFKA-9483

Motivation

Users who cannot read their source topic as a changelog stream from the beginning need a way to translate an event stream into a changelog stream. As Guozhang Wang pointed out, this should not be confused with KStream.reduce(). The new functions should:

  1. Completely change the semantics of the input stream from an event stream to a changelog stream. Any null values will still be serialized, and if the resulting bytes are also null they will be interpreted as "deletes" to the materialized KTable (i.e. tombstones in the changelog stream).
  2. Follow the usual process for materializing the KTable:
    1. If optimization is turned off, the KTable will always be materialized (but the store will not be queryable).
    2. If optimization is turned on and .toTable() is used, the KTable may or may not be materialized. The store still cannot be queried.
    3. If .toTable(final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) is used, the KTable will be materialized and queryable irrespective of the optimization strategy.
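
The changelog semantics described in point 1 can be illustrated with a minimal sketch in plain Java. This is not the Kafka Streams API and not how the KTable is implemented: the class name ChangelogSketch and the helpers kv and materialize are hypothetical, and an in-memory map stands in for the materialized store. It only shows the interpretation rule, namely that each record is an upsert for its key and a null value acts as a tombstone that deletes the key.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; this is NOT the Kafka Streams API.
public class ChangelogSketch {

    // Hypothetical helper: builds a key/value record whose value may be null.
    public static Map.Entry<String, String> kv(String key, String value) {
        return new SimpleEntry<String, String>(key, value);
    }

    // Hypothetical helper: applies records in order with changelog semantics.
    // A non-null value upserts the key; a null value is a tombstone (delete).
    public static Map<String, String> materialize(List<Map.Entry<String, String>> records) {
        Map<String, String> table = new LinkedHashMap<>();
        for (Map.Entry<String, String> rec : records) {
            if (rec.getValue() == null) {
                table.remove(rec.getKey());              // tombstone: drop the key
            } else {
                table.put(rec.getKey(), rec.getValue()); // upsert: latest value wins
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> records = List.of(
            kv("alice", "v1"),
            kv("bob", "v1"),
            kv("alice", "v2"),  // overwrites alice's earlier value
            kv("bob", null)     // tombstone: removes bob
        );
        System.out.println(materialize(records)); // prints {alice=v2}
    }
}
```

Read as an event stream, the same four records would be four independent facts. Read as a changelog, they collapse to the latest value per key, which is the change in semantics that toTable() is meant to express.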

...

The following toTable() overloads will need to be added to the KStream interface:

// Java
KTable<K, V> toTable()
KTable<K, V> toTable(Named)
KTable<K, V> toTable(final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized)
KTable<K, V> toTable(Named, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized)

// Scala
def toTable: KTable[K, V]
def toTable(materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V]

Proposed Changes

Add the new toTable() functions to the Kafka Streams DSL.

...