Status

Current state: (DRAFT) "Under Discussion"

Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]

JIRA: here [Change the link from KAFKA-1 to your own ticket]

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

As the Kafka Streams DSL has evolved, some of its APIs have become very overload-heavy. For example, there are 8 different overloads of KStream#print. As we add more overloads, it becomes harder for a developer using a modern IDE to discover the right method, which interrupts their flow and becomes an API usability issue.

Further, we'd like to give users a way to override certain StateStore features on a per-operator basis, for example to enable caching or logging for some but not all StateStores. Without a change in approach to the DSL, this would add yet more overloads to every operation. Additionally, it should be simple to use the Kafka Streams caching and logging wrappers with custom StateStores.

Before we add many more overloaded methods, it is worthwhile exploring other options to see if we can provide a more concise and intuitive API.

Public Interfaces

KStream
void print(final PrintOptions<K, V> printOptions);

KStream<K, V> through(final String topic, final TopicOptions<K, V> topicOptions);

void to(final String topic, final TopicOptions<K, V> topicOptions);

KGroupedStream<K, V> groupByKey(final GroupByOptions<K, V> groupByOptions);

<KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> selector, GroupByOptions<KR, V> groupByOptions);

<VO, VR> KStream<K, VR> join(final KStream<K, VO> other, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final JoinWindows windows, final JoinOptions<K, V, VO> options);

<VT, VR> KStream<K, VR> join(final KTable<K, VT> other, final ValueJoiner<? super V, ? super VT, ? extends VR> joiner, final JoinOptions<K, V, VT> options);

<VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> other, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final JoinWindows windows, final JoinOptions<K, V, VO> options);

<VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> other, final ValueJoiner<? super V, ? super VT, ? extends VR> joiner, final JoinOptions<K, V, VT> options);

<VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> other, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final JoinWindows windows, final JoinOptions<K, V, VO> options);

<VT, VR> KStream<K, VR> outerJoin(final KTable<K, VT> other, final ValueJoiner<? super V, ? super VT, ? extends VR> joiner, final JoinWindows windows, final JoinOptions<K, V, VT> options);
KTable
<KR, VR> KGroupedTable<KR, VR> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector, GroupByOptions<KR, VR> groupByOptions);

KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final Materialized<K, V, KeyValueStore<K, V>> materialized);

KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final Materialized<K, V, KeyValueStore<K, V>> materialized);

<VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper, final Materialized<K, VR, KeyValueStore<K, VR>> materialized);

void to(final String topic, final TopicOptions<K, V> options);

KTable<K, V> through(final String topic, final Materialized<K, V, KeyValueStore<K, V>> options);

<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
                            final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                            final Materialized<K, VR, KeyValueStore<K, VR>> materialized);

<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
                                final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                final Materialized<K, VR, KeyValueStore<K, VR>> materialized);

<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
                                 final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                 final Materialized<K, VR, KeyValueStore<K, VR>> materialized);

KGroupedStream
<W extends Window> WindowedKStream<K, V> windowedBy(Windows<W> timeWindows);

SessionWindowedKStream<K, V> sessionWindowedBy(SessionWindows sessionWindows);

KTable<K, Long> count(final Materialized<K, Long, KeyValueStore<K, Long>> materialized);

KTable<K, V> reduce(final Reducer<V> reducer, final Materialized<K, V, KeyValueStore<K, V>> materialized);

<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final Aggregator<? super K, ? super V, VR> aggregator,
                             final Serde<VR> aggValueSerde,
                             final Materialized<K, VR, KeyValueStore<K, VR>> materialized);
WindowedKStream
public interface WindowedKStream<K, V> {

    KTable<Windowed<K>, Long> count();

    KTable<Windowed<K>, Long> count(final Materialized<K, Long, WindowStore<K, Long>> materializedAs);

    <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                           final Aggregator<? super K, ? super V, VR> aggregator);

    <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                           final Aggregator<? super K, ? super V, VR> aggregator,
                                           final Materialized<K, VR, WindowStore<K, VR>> materializedAs);


    KTable<Windowed<K>, V> reduce(final Reducer<V> reducer);

    KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                  final Materialized<K, V, WindowStore<K, V>> materializedAs);


} 
SessionWindowedKStream
public interface SessionWindowedKStream<K, V> {

    KTable<Windowed<K>, Long> count();

    KTable<Windowed<K>, Long> count(final Materialized<K, Long, SessionStore<K, Long>> materializedAs);

    <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                           final Aggregator<? super K, ? super V, VR> aggregator,
                                           final Merger<? super K, VR> sessionMerger);

    <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                           final Aggregator<? super K, ? super V, VR> aggregator,
                                           final Merger<? super K, VR> sessionMerger,
                                           final Materialized<K, VR, SessionStore<K, VR>> materializedAs);


    KTable<Windowed<K>, V> reduce(final Reducer<V> reducer);

    KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                  final Materialized<K, V, SessionStore<K, V>> materializedAs);
}
Materialized
/**
 * Used when materializing a state store
 */
public class Materialized<K, V, S extends StateStore> {
    public static <K, V, S extends StateStore> Materialized<K, V, S> as(final String storeName)

    public static <K, V, S extends StateStore> Materialized<K, V, S> as(final StateStoreSupplier<S> supplier)

    public Materialized<K, V, S> withValueSerde(final Serde<V> valueSerde)

    public Materialized<K, V, S> withKeySerde(final Serde<K> keySerde)

    public Materialized<K, V, S> withLoggingEnabled(final Map<String, String> topicConfig)

    public Materialized<K, V, S> withLoggingDisabled()

    public Materialized<K, V, S> withCachingEnabled()

    public Materialized<K, V, S> withCachingDisabled()

} 
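To illustrate how the static factory and the `with...` methods above could compose, here is a minimal, self-contained sketch. It is not the Kafka Streams implementation: `MaterializedSketch`, its fields, and its accessor methods are hypothetical names introduced for illustration, the serdes and the `StateStore` type parameter are left out so that it compiles without Kafka on the classpath, and it assumes that caching and logging are both enabled unless overridden.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical stand-in for the proposed Materialized class. It models only the
 * store name and the caching/logging switches.
 */
final class MaterializedSketch<K, V> {
    private final String storeName;
    // Assumption: both features default to enabled.
    private boolean cachingEnabled = true;
    private boolean loggingEnabled = true;
    private Map<String, String> topicConfig = new HashMap<>();

    private MaterializedSketch(final String storeName) {
        this.storeName = storeName;
    }

    public static <K, V> MaterializedSketch<K, V> as(final String storeName) {
        return new MaterializedSketch<>(storeName);
    }

    public MaterializedSketch<K, V> withLoggingEnabled(final Map<String, String> topicConfig) {
        this.loggingEnabled = true;
        this.topicConfig = new HashMap<>(topicConfig); // defensive copy
        return this;
    }

    public MaterializedSketch<K, V> withLoggingDisabled() {
        this.loggingEnabled = false;
        this.topicConfig = new HashMap<>();
        return this;
    }

    public MaterializedSketch<K, V> withCachingEnabled() {
        this.cachingEnabled = true;
        return this;
    }

    public MaterializedSketch<K, V> withCachingDisabled() {
        this.cachingEnabled = false;
        return this;
    }

    public String storeName() { return storeName; }
    public boolean cachingEnabled() { return cachingEnabled; }
    public boolean loggingEnabled() { return loggingEnabled; }
    public Map<String, String> topicConfig() { return Collections.unmodifiableMap(topicConfig); }

    public static void main(final String[] args) {
        // Per-operator override: this store is logged but not cached.
        final MaterializedSketch<String, Long> counts =
            MaterializedSketch.<String, Long>as("counts")
                .withCachingDisabled()
                .withLoggingEnabled(Collections.singletonMap("retention.ms", "60000"));
        System.out.println(counts.storeName()
            + " caching=" + counts.cachingEnabled()
            + " logging=" + counts.loggingEnabled());
        // prints "counts caching=false logging=true"
    }
}
```

Because every `with...` call returns the same object, a caller names only the settings that differ from the defaults, which is what lets a single `Materialized` parameter stand in for several overloads.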
GroupByOptions
/**
 * Optional params that can be passed to groupBy and groupByKey operations
 */
public class GroupByOptions<K, V> {

    public static <K, V> GroupByOptions<K, V> serdes(final Serde<K> keySerde, final Serde<V> valueSerde) {
        return new GroupByOptions<K, V>().withKeySerde(keySerde).withValueSerde(valueSerde);
    }

    public GroupByOptions<K, V> withKeySerde(final Serde<K> keySerde)

    public GroupByOptions<K, V> withValueSerde(final Serde<V> valueSerde)
}
JoinOptions
/**
 * Optional params that can be passed to join, leftJoin, outerJoin operations
 */
public class JoinOptions<K, V, VO> {

    public static <K, V, VO> JoinOptions<K, V, VO> serdes(final Serde<K> keySerde, final Serde<V> valueSerde, final Serde<VO> otherValueSerde)

    public JoinOptions<K, V, VO> withKeySerde(final Serde<K> keySerde)

    public JoinOptions<K, V, VO> withValueSerde(final Serde<V> valueSerde) 

    public JoinOptions<K, V, VO> withOtherValueSerde(final Serde<VO> otherValueSerde)
}
TopicOptions
/**
 * Optional arguments that can be specified when doing to and through operations
 */
public class TopicOptions<K, V> {
    public static <K, V> TopicOptions<K, V> serdes(final Serde<K> keySerde, final Serde<V> valueSerde) {
        return new TopicOptions<K, V>().withKeySerde(keySerde).withValueSerde(valueSerde);
    }

    public static <K, V> TopicOptions<K, V> options(final StreamPartitioner<K, V> partitioner, final Serde<K> keySerde, final Serde<V> valueSerde) {
        return new TopicOptions<K, V>().withKeySerde(keySerde).withValueSerde(valueSerde).withStreamPartitioner(partitioner);
    }

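    // Each setter stores its argument and returns this, so the chained calls
    // in the static factories above do not dereference null.
    private StreamPartitioner<K, V> partitioner;
    private Serde<K> keySerde;
    private Serde<V> valueSerde;

    public TopicOptions<K, V> withStreamPartitioner(final StreamPartitioner<K, V> partitioner) {
        this.partitioner = partitioner;
        return this;
    }

    public TopicOptions<K, V> withValueSerde(final Serde<V> valueSerde) {
        this.valueSerde = valueSerde;
        return this;
    }

    public TopicOptions<K, V> withKeySerde(final Serde<K> keySerde) {
        this.keySerde = keySerde;
        return this;
    }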
}
 
PrintOptions
/**
 * Options that can be used when printing to stdout or writing to a file
 */
public class PrintOptions<K, V> {
    public static <K, V> PrintOptions<K, V> labeled(final String label) 
    
    public static <K, V> PrintOptions<K, V> toFile(final String filepath)

    public static <K, V> PrintOptions<K, V> sysOut(final KeyValueMapper<? super K, ? super V, String> mapper, final Serde<K> keySerde, final Serde<V> valSerde, final String label) 

    public static <K, V> PrintOptions<K, V> toFile(final String filePath, final KeyValueMapper<? super K, ? super V, String> mapper, final Serde<K> keySerde, final Serde<V> valSerde, final String label)

    public PrintOptions<K, V> withLabel(final String label) 

    public PrintOptions<K, V> withFile(final String filepath)

    public PrintOptions<K, V> withKeySerde(final Serde<K> keySerde)

    public PrintOptions<K, V> withValueSerde(final Serde<V> valueSerde)

    public PrintOptions<K, V> withKeyValueMapper(final KeyValueMapper<? super K, ? super V, String> mapper)
}
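
As a rough illustration of how a label and a key-value mapper from the options above might be combined when a record is printed, here is a small self-contained sketch. Everything in it is an assumption made for illustration: `PrintOptionsSketch` and its `format` method are hypothetical, `java.util.function.BiFunction` stands in for `KeyValueMapper`, and the `[label]: key, value` output shape is a choice made for this example rather than something this proposal specifies.

```java
import java.util.function.BiFunction;

/**
 * Hypothetical stand-in for the proposed PrintOptions class, reduced to the
 * label and mapper options so it runs without Kafka on the classpath.
 */
final class PrintOptionsSketch<K, V> {
    // Assumption: no label and a "key, value" rendering unless overridden.
    private String label = "";
    private BiFunction<? super K, ? super V, String> mapper = (k, v) -> k + ", " + v;

    public static <K, V> PrintOptionsSketch<K, V> labeled(final String label) {
        return new PrintOptionsSketch<K, V>().withLabel(label);
    }

    public PrintOptionsSketch<K, V> withLabel(final String label) {
        this.label = label;
        return this;
    }

    public PrintOptionsSketch<K, V> withKeyValueMapper(
            final BiFunction<? super K, ? super V, String> mapper) {
        this.mapper = mapper;
        return this;
    }

    /** Renders one record according to the configured options. */
    public String format(final K key, final V value) {
        final String body = mapper.apply(key, value);
        return label.isEmpty() ? body : "[" + label + "]: " + body;
    }

    public static void main(final String[] args) {
        final PrintOptionsSketch<String, Integer> options =
            PrintOptionsSketch.<String, Integer>labeled("orders")
                .withKeyValueMapper((k, v) -> k + "=" + v);
        System.out.println(options.format("a", 1)); // prints "[orders]: a=1"
    }
}
```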

 

Proposed Changes

Describe the new thing you want to do in appropriate detail. This may be fairly extensive and have large subsections of its own. Or it may be a few sentences. Use judgement based on the scope of the change.

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.
