Status

Current state: "Under discussion"

Discussion thread: HERE

JIRA: KAFKA-4144

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

 

At the moment, the timestamp extractor is configured through a single StreamsConfig value passed to KafkaStreams. As a result, an application can have only one timestamp extractor, even though it may join multiple streams or tables that each require a different timestamp extraction method.

Ideally, the user should be able to specify a timestamp extractor per source via KStreamBuilder.stream/table, in the same way that key and value serdes can be specified there to override the StreamsConfig defaults.
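
The intended behavior can be illustrated with a minimal, self-contained sketch. It deliberately does not use any Kafka classes: Extractor, ExtractorRegistry, and the topic names are hypothetical stand-ins chosen for illustration, and the real lookup inside Kafka Streams may be organized differently. It only shows the idea that a source with its own extractor uses it, while every other source falls back to the app-wide default.

```java
import java.util.HashMap;
import java.util.Map;

final class PerSourceExtractorSketch {

    /** Hypothetical stand-in for a timestamp extractor (not the Kafka interface). */
    interface Extractor {
        long extract(String value, long recordTimestamp);
    }

    /** Hypothetical registry: one app-wide default plus optional per-topic overrides. */
    static final class ExtractorRegistry {
        private final Extractor defaultExtractor;
        private final Map<String, Extractor> perTopic = new HashMap<>();

        ExtractorRegistry(Extractor defaultExtractor) {
            this.defaultExtractor = defaultExtractor;
        }

        /** Registers an extractor that applies only to the given topic. */
        void register(String topic, Extractor extractor) {
            perTopic.put(topic, extractor);
        }

        /** Returns the per-topic extractor if one was registered, otherwise the default. */
        Extractor forTopic(String topic) {
            return perTopic.getOrDefault(topic, defaultExtractor);
        }
    }

    public static void main(String[] args) {
        // App-wide default: trust the timestamp already attached to the record.
        ExtractorRegistry registry = new ExtractorRegistry((value, ts) -> ts);

        // Assumption: "clicks" records carry their event time as the first CSV field.
        registry.register("clicks", (value, ts) -> Long.parseLong(value.split(",")[0]));

        // "clicks" uses its own extractor; "users" falls back to the default.
        System.out.println(registry.forTopic("clicks").extract("1500000000000,click", 42L));
        System.out.println(registry.forTopic("users").extract("alice", 42L));
    }
}
```

This mirrors how per-source serdes already behave: an explicitly supplied value wins, and the configured default applies only when nothing was supplied for that source.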

Public Changes

 

We propose adding the following overloaded methods to KStreamBuilder.java and TopologyBuilder.java:

KStreamBuilder.stream(final TimestampExtractor timestampExtractor, final Serde<K> keySerde, final Serde<V> valSerde, final String... topics)

KStreamBuilder.stream(final AutoOffsetReset offsetReset, final TimestampExtractor timestampExtractor, final Serde<K> keySerde, final Serde<V> valSerde, final String... topics)

KStreamBuilder.stream(final TimestampExtractor timestampExtractor, final Serde<K> keySerde, final Serde<V> valSerde, final Pattern topicPattern) 

KStreamBuilder.stream(final AutoOffsetReset offsetReset, final TimestampExtractor timestampExtractor, final Serde<K> keySerde, final Serde<V> valSerde, final Pattern topicPattern) 

KStreamBuilder.table(final TimestampExtractor timestampExtractor, final String topic, final String storeName) 

KStreamBuilder.table(final AutoOffsetReset offsetReset, final TimestampExtractor timestampExtractor, final String topic, final String storeName) 

KStreamBuilder.table(final TimestampExtractor timestampExtractor, final Serde<K> keySerde, final Serde<V> valSerde, final String topic, final String storeName) 

KStreamBuilder.table(final AutoOffsetReset offsetReset, final TimestampExtractor timestampExtractor, final Serde<K> keySerde, final Serde<V> valSerde, final String topic, final String storeName) 

KStreamBuilder.globalTable(final TimestampExtractor timestampExtractor, final Serde<K> keySerde, final Serde<V> valSerde, final String topic, final String storeName) 

 

TopologyBuilder.addSource(final TimestampExtractor timestampExtractor, final String name, final String... topics) 

TopologyBuilder.addSource(final AutoOffsetReset offsetReset, final TimestampExtractor timestampExtractor, final String name, final String... topics) 

TopologyBuilder.addSource(final TimestampExtractor timestampExtractor, final String name, final Pattern topicPattern) 

TopologyBuilder.addSource(final AutoOffsetReset offsetReset, final TimestampExtractor timestampExtractor, final String name, final Pattern topicPattern) 

TopologyBuilder.addSource(final AutoOffsetReset offsetReset, final String name, final TimestampExtractor timestampExtractor, final Deserializer keyDeserializer, final Deserializer valDeserializer, final String... topics) 

TopologyBuilder.addGlobalStore(final StateStore store, final String sourceName, final TimestampExtractor timestampExtractor, final Deserializer keyDeserializer, final Deserializer valueDeserializer, final String topic, final String processorName, final ProcessorSupplier stateUpdateSupplier) 

TopologyBuilder.addSource(final AutoOffsetReset offsetReset, final String name, final TimestampExtractor timestampExtractor, final Deserializer keyDeserializer, final Deserializer valDeserializer, final Pattern topicPattern) 
