
Status

Current state: Under Discussion

Discussion thread: here 

JIRA:

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Kafka Streams has grown to augment KTables (i.e., changelog streams) with state stores. These state stores can be seen as a way to materialize KTables, and they can be queried through the Interactive Queries (IQ) APIs introduced in KIP-67. This has led to some confusion, for example:

  • Not all KTables can be materialized, and thus some data cannot be queried through interactive queries. For example, the second KTable below cannot be materialized:

KTable table1 = builder.table(..., topic, stateStoreName); // materialized

KTable table2 = table1.filter(...); // not materialized, i.e., not queryable by IQ

  • Certain operations on KTables, such as print and foreach, are confusing and redundant, since users can already print the values of a materialized view. Users can also convert a KTable to a KStream and use the equivalent KStream methods. As a result, there are three ways to print and to apply foreach.


Apart from these confusions, there is also a semantic gap with late-arriving data. Because a changelog topic is compacted in Kafka, a late-arriving record with timestamp T1 can overwrite a newer record with timestamp T2 (where T1 < T2). This happens because log compaction retains the most recently appended record for each key and does not look at record timestamps.
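The overwrite described above can be reproduced with a small in-memory model. This is a toy sketch, assuming a log is simply a list of (key, value, timestamp) records whose list position plays the role of the offset; `OffsetCompactionDemo`, `Rec` and `compactByOffset` are hypothetical names, not Kafka classes, and the code models the "last append per key wins" rule rather than Kafka's actual log cleaner.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of offset-based log compaction. All names are hypothetical;
// this mirrors the retention rule only, not Kafka's log cleaner code.
public class OffsetCompactionDemo {

    // A changelog record: key, value and the record's (event) timestamp.
    static final class Rec {
        final String key;
        final String value;
        final long timestamp;

        Rec(String key, String value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // For each key, keep the record with the highest offset, i.e. the one
    // appended last. Record timestamps are never consulted.
    static Map<String, Rec> compactByOffset(List<Rec> log) {
        Map<String, Rec> kept = new LinkedHashMap<>();
        for (Rec r : log) {        // list index stands in for the offset
            kept.put(r.key, r);    // a later offset always replaces an earlier one
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Rec> log = new ArrayList<>();
        log.add(new Rec("k", "newer", 200L)); // offset 0, timestamp T2 = 200
        log.add(new Rec("k", "late", 100L));  // offset 1, late arrival with T1 = 100
        Rec kept = compactByOffset(log).get("k");
        System.out.println(kept.value + " @ " + kept.timestamp); // late @ 100
    }
}
```

The newer value (timestamp 200) is lost because the late record with timestamp 100 was appended after it.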

Proposal

We propose to clean up the KTable API and make the KTable semantics clearer through API improvements and associated JavaDoc improvements.

Interfaces / API Changes (DSL only)

1. Add a materialized() method to the KTable abstraction:

/**
 * Materialize this KTable by creating a state store with the given name.
 * Throws an exception if this KTable is already materialized.
 */
KTable<K, V> materialized(final String stateStoreName);
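To illustrate the intended contract, here is a toy, in-memory sketch showing how a derived table such as table2 from the Motivation section could be made queryable. `ToyKTable`, its `filter` and `query` methods, and the use of `IllegalStateException` are hypothetical assumptions (the proposal only says "an exception" is thrown), as is returning the same table from `materialized()`; this is not Kafka Streams code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiPredicate;

// Toy, in-memory model of the proposed materialized(String) contract.
// ToyKTable, filter(), query() and the exception type are hypothetical.
public class MaterializedSketch {

    static final class ToyKTable<K, V> {
        private final Map<K, V> data;   // current table contents
        private String storeName;       // null until materialized
        private Map<K, V> store;        // the queryable "state store"

        ToyKTable(Map<K, V> data) {
            this.data = data;
        }

        // Derived tables start out unmaterialized, like table2 above.
        ToyKTable<K, V> filter(BiPredicate<K, V> predicate) {
            Map<K, V> out = new HashMap<>();
            data.forEach((k, v) -> {
                if (predicate.test(k, v)) {
                    out.put(k, v);
                }
            });
            return new ToyKTable<>(out);
        }

        // Create a store with the given name; refuse a second materialization.
        ToyKTable<K, V> materialized(final String stateStoreName) {
            if (storeName != null) {
                throw new IllegalStateException("already materialized as " + storeName);
            }
            storeName = stateStoreName;
            store = new HashMap<>(data);
            return this;
        }

        // Stand-in for an Interactive Queries lookup against a named store.
        V query(String name, K key) {
            if (storeName == null || !storeName.equals(name)) {
                throw new IllegalArgumentException("no queryable store named " + name);
            }
            return store.get(key);
        }
    }

    public static void main(String[] args) {
        Map<String, Long> counts = new HashMap<>();
        counts.put("alice", 3L);
        counts.put("bob", 0L);
        ToyKTable<String, Long> table1 = new ToyKTable<>(counts);
        ToyKTable<String, Long> table2 =
            table1.filter((k, v) -> v > 0).materialized("positive-counts");
        System.out.println(table2.query("positive-counts", "alice")); // 3
        System.out.println(table2.query("positive-counts", "bob"));   // null
        try {
            table2.materialized("another-store");
        } catch (IllegalStateException e) {
            System.out.println("second materialized() call rejected");
        }
    }
}
```

Making the store name an explicit, one-time choice keeps "changelog" and "queryable view" separate: a KTable is only backed by a store when the user asks for one.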

2. Remove methods: print(), writeAsText(), foreach() and their overloads.

3. Rename toStream() to toKStream() for consistency.

Changes to Kafka

Add the option to do log compaction based on timestamps:

log.cleanup.policy could now be [compact, delete, compact_timestamp]
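As a rough illustration of what compact_timestamp would retain, the sketch below keeps, for each key, the record with the highest timestamp instead of the highest offset. It is a toy model, assuming the log is a list of (key, value, timestamp) records in offset order; the class and method names are hypothetical, and so is the tie-breaking rule (on equal timestamps the later offset wins), which the proposal does not specify.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of the proposed compact_timestamp retention rule. Names and the
// tie-breaking rule are hypothetical; this is not a log cleaner implementation.
public class TimestampCompactionSketch {

    // A changelog record: key, value and the record's (event) timestamp.
    static final class Rec {
        final String key;
        final String value;
        final long timestamp;

        Rec(String key, String value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // For each key, keep the record with the highest timestamp. On equal
    // timestamps the later offset wins (assumed tie-breaking rule).
    static Map<String, Rec> compactByTimestamp(List<Rec> log) {
        Map<String, Rec> kept = new LinkedHashMap<>();
        for (Rec r : log) {                          // iterate in offset order
            Rec current = kept.get(r.key);
            if (current == null || r.timestamp >= current.timestamp) {
                kept.put(r.key, r);                  // newer (or equal) timestamp wins
            }
            // otherwise r arrived late and is discarded
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Rec> log = new ArrayList<>();
        log.add(new Rec("k", "newer", 200L)); // offset 0, T2 = 200
        log.add(new Rec("k", "late", 100L));  // offset 1, late arrival, T1 = 100
        Rec kept = compactByTimestamp(log).get("k");
        System.out.println(kept.value + " @ " + kept.timestamp); // newer @ 200
    }
}
```

With the same two records, the existing compact policy would keep the late record instead, because it was appended last.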


Compatibility, Deprecation, and Migration Plan

  • No impact on existing users

Rejected Alternatives

Have the KTable be the materialized view. In this alternative, the KTable would no longer be a changelog stream, it would be a materialized view. So we would collapse two things (the existing KTable and state store) into one abstraction. All KTable methods would need to take a state store name.

  • There are performance implications: every KTable would always be materialized, which is expensive. We could mitigate this penalty by introducing “virtual” stores, whose data is not written to a topic but is computed on the fly. For example, the KTable obtained from filter() would not have a changelog topic, but could still have a virtual store queryable through Interactive Queries.

  • It is not clear that collapsing two abstractions helps. A KTable models a changelog, which is a useful abstraction in itself. A state store is a materialized view, a distinct abstraction with parallels in the database world.
