Status
Current state: "accepted"
Discussion thread: HERE
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
At the moment, the timestamp extractor is configured via a StreamsConfig value passed to KafkaStreams. That means you can only have a single timestamp extractor per application, even though you may be joining multiple streams/tables that require different timestamp extraction methods.
Ideally, the user should be able to specify a timestamp extractor via KStreamBuilder.stream/table, just as you can specify key and value serdes that override the StreamsConfig defaults.
Public Changes
We propose to add the following overloaded methods to KStreamBuilder.java and TopologyBuilder.java:
KStreamBuilder.stream(final TimestampExtractor timestampExtractor, final Serde<K> keySerde, final Serde<V> valSerde, final String... topics)
KStreamBuilder.stream(final AutoOffsetReset offsetReset, final TimestampExtractor timestampExtractor, final Serde<K> keySerde, final Serde<V> valSerde, final String... topics)
KStreamBuilder.stream(final TimestampExtractor timestampExtractor, final Serde<K> keySerde, final Serde<V> valSerde, final Pattern topicPattern)
KStreamBuilder.stream(final AutoOffsetReset offsetReset, final TimestampExtractor timestampExtractor, final Serde<K> keySerde, final Serde<V> valSerde, final Pattern topicPattern)
KStreamBuilder.table(final TimestampExtractor timestampExtractor, final String topic, final String storeName)
KStreamBuilder.table(final AutoOffsetReset offsetReset, final TimestampExtractor timestampExtractor, final String topic, final String storeName)
KStreamBuilder.table(final TimestampExtractor timestampExtractor, final Serde<K> keySerde, final Serde<V> valSerde, final String topic, final String storeName)
KStreamBuilder.table(final AutoOffsetReset offsetReset, final TimestampExtractor timestampExtractor, final Serde<K> keySerde, final Serde<V> valSerde, final String topic, final String storeName)
KStreamBuilder.globalTable(final TimestampExtractor timestampExtractor, final Serde<K> keySerde, final Serde<V> valSerde, final String topic, final String storeName)
KStreamBuilder.globalTable(final Serde<K> keySerde, final Serde<V> valSerde, final String topic, final String storeName)
TopologyBuilder.addSource(final TimestampExtractor timestampExtractor, final String name, final String... topics)
TopologyBuilder.addSource(final AutoOffsetReset offsetReset, final TimestampExtractor timestampExtractor, final String name, final String... topics)
TopologyBuilder.addSource(final TimestampExtractor timestampExtractor, final String name, final Pattern topicPattern)
TopologyBuilder.addSource(final AutoOffsetReset offsetReset, final TimestampExtractor timestampExtractor, final String name, final Pattern topicPattern)
TopologyBuilder.addSource(final AutoOffsetReset offsetReset, final String name, final TimestampExtractor timestampExtractor, final Deserializer keyDeserializer, final Deserializer valDeserializer, final String... topics)
TopologyBuilder.addGlobalStore(final StateStore store, final String sourceName, final TimestampExtractor timestampExtractor, final Deserializer keyDeserializer, final Deserializer valueDeserializer, final String topic, final String processorName, final ProcessorSupplier stateUpdateSupplier)
TopologyBuilder.addSource(final AutoOffsetReset offsetReset, final String name, final TimestampExtractor timestampExtractor, final Deserializer keyDeserializer, final Deserializer valDeserializer, final Pattern topicPattern)
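The overloads above imply a simple resolution rule: a source created with its own TimestampExtractor uses it, and every other source falls back to the application-wide default. To keep the example self-contained, the sketch below does not use Kafka's real TimestampExtractor interface; SimpleRecord, SimpleTimestampExtractor and PerSourceExtractors are hypothetical, simplified stand-ins that model only this rule.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a consumed record; not Kafka's ConsumerRecord.
final class SimpleRecord {
    final String topic;
    final long embeddedTimestamp; // timestamp carried in the record metadata
    final String value;           // payload; here of the form "ts=<event time>"

    SimpleRecord(String topic, long embeddedTimestamp, String value) {
        this.topic = topic;
        this.embeddedTimestamp = embeddedTimestamp;
        this.value = value;
    }
}

// Hypothetical, simplified version of a timestamp extractor.
@FunctionalInterface
interface SimpleTimestampExtractor {
    long extract(SimpleRecord record);
}

// Models the rule implied by the new overloads: a source-level extractor,
// when one is given, overrides the application-wide default.
final class PerSourceExtractors {
    private final SimpleTimestampExtractor defaultExtractor;
    private final Map<String, SimpleTimestampExtractor> extractorsByTopic = new HashMap<>();

    PerSourceExtractors(SimpleTimestampExtractor defaultExtractor) {
        this.defaultExtractor = defaultExtractor;
    }

    // Registers a source; passing null means "use the default",
    // like calling an overload without a TimestampExtractor argument.
    void addSource(String topic, SimpleTimestampExtractor extractorOrNull) {
        if (extractorOrNull != null) {
            extractorsByTopic.put(topic, extractorOrNull);
        }
    }

    // Picks the source-specific extractor if present, else the default.
    long timestampOf(SimpleRecord record) {
        return extractorsByTopic.getOrDefault(record.topic, defaultExtractor).extract(record);
    }
}
```

For example, with a default that reads embeddedTimestamp, a "clicks" source registered without an extractor, and an "orders" source registered with `r -> Long.parseLong(r.value.substring(3))`, a record on "orders" with value "ts=999" gets timestamp 999, while a record on "clicks" keeps its embedded timestamp.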
Moreover, we propose the following changes to the StreamsConfig class:
Deprecate:
StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG
StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_DOC
StreamsConfig.KEY_SERDE_CLASS_CONFIG
StreamsConfig.KEY_SERDE_CLASS_DOC
StreamsConfig.VALUE_SERDE_CLASS_CONFIG
StreamsConfig.VALUE_SERDE_CLASS_DOC
StreamsConfig.keySerde()
StreamsConfig.valueSerde()
Add:
StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG
StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC
StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG
StreamsConfig.DEFAULT_KEY_SERDE_CLASS_DOC
StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG
StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_DOC
StreamsConfig.defaultKeySerde()
StreamsConfig.defaultValueSerde()
All changes to the StreamsConfig class are backward compatible: the existing members are deprecated rather than removed.
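One way a renamed setting can stay backward compatible is to read the new key first and fall back to the deprecated one. This is a minimal sketch under stated assumptions: the property names "default.timestamp.extractor" (new) and "timestamp.extractor" (deprecated) are assumed, and the precedence rule (new key, then deprecated key with a warning, then a built-in default) is an assumption this page does not specify. DeprecatedConfigResolver and resolve are hypothetical helpers, not part of StreamsConfig.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper: resolves a setting that was renamed from deprecatedKey to newKey.
final class DeprecatedConfigResolver {
    // Collected deprecation warnings (a real implementation might log them instead).
    final List<String> warnings = new ArrayList<>();

    String resolve(Map<String, String> props, String newKey, String deprecatedKey, String builtInDefault) {
        // 1. The new key always wins when it is set.
        if (props.containsKey(newKey)) {
            return props.get(newKey);
        }
        // 2. Otherwise honor the deprecated key, but warn about it.
        if (props.containsKey(deprecatedKey)) {
            warnings.add("Config '" + deprecatedKey + "' is deprecated; use '" + newKey + "' instead.");
            return props.get(deprecatedKey);
        }
        // 3. Neither key is set: use the built-in default.
        return builtInDefault;
    }
}
```

With this ordering, existing configurations that only set the deprecated key keep their behavior, and setting the new key alongside it silently takes precedence.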
Proposed Changes
- First, we add a TimestampExtractor property to the SourceNode class and the TopologyBuilder.SourceNodeFactory inner class.
- Second, in the StreamTask class, rather than passing the single TimestampExtractor defined in StreamsConfig, we pass each SourceNode's timestamp extractor.
There is a small issue to consider here. If a source is defined with a Pattern, looking up the source by exact topic name in StreamTask returns null. For example, if we define the source by the Pattern "t.*" and the topic name is "topic", then topology.source(partition.topic()) returns null, because in the topology the name of that SourceNode is "Pattern [t.*]". To overcome this issue, we make two changes. First, when a source is defined with a Pattern, its SourceNode name becomes exactly the pattern string: a source defined with "t.*" is named "t.*" (until now it was named "Pattern [t.*]"). Second, we change how StreamTask discovers the SourceNode for a particular partition.topic(): if topology.source(partition.topic()) finds a SourceNode, we return it immediately; otherwise, we search all SourceNodes in the topology for one whose Pattern matches partition.topic().
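The lookup order described above can be sketched as follows. SourceNodeSketch and SourceLookup are hypothetical, simplified stand-ins, not Kafka's SourceNode or StreamTask; the sketch only models two points from the proposal: a pattern source is named by its pattern string, and an exact topic-name match is tried before scanning the pattern sources.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical, simplified source node (not Kafka's SourceNode).
final class SourceNodeSketch {
    final String name;     // topic name, or the pattern string for pattern-based sources
    final Pattern pattern; // null for sources defined with explicit topic names

    SourceNodeSketch(String name, Pattern pattern) {
        this.name = name;
        this.pattern = pattern;
    }
}

// Hypothetical model of the source lookup performed for partition.topic().
final class SourceLookup {
    private final Map<String, SourceNodeSketch> sourcesByName = new LinkedHashMap<>();

    void addTopicSource(String topic) {
        sourcesByName.put(topic, new SourceNodeSketch(topic, null));
    }

    // Named exactly by the pattern string, e.g. "t.*" rather than "Pattern [t.*]".
    void addPatternSource(Pattern pattern) {
        sourcesByName.put(pattern.pattern(), new SourceNodeSketch(pattern.pattern(), pattern));
    }

    // Returns the source for a topic, or null if no source covers it.
    SourceNodeSketch sourceFor(String topic) {
        // Step 1: exact lookup by name, like topology.source(partition.topic()).
        SourceNodeSketch exact = sourcesByName.get(topic);
        if (exact != null) {
            return exact;
        }
        // Step 2: scan pattern-based sources for one whose Pattern matches the topic.
        for (SourceNodeSketch node : sourcesByName.values()) {
            if (node.pattern != null && node.pattern.matcher(topic).matches()) {
                return node;
            }
        }
        return null;
    }
}
```

The sketch uses Matcher.matches(), so a pattern must match the whole topic name: "t.*" matches "topic" but not "clicks".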
The PR can be found here.
Test Plan
Unit tests are added to the TopologyBuilderTest and KStreamBuilderTest classes.
Rejected Alternatives
None yet.