...

JIRA: KAFKA-9526, KAFKA-9913

Released: 2.6 (target)

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

        Topologies:
           Sub-topology: 0
            Source: KSTREAM-SOURCE-0000000000 (topics: [input], keySerde: StringDeserializer, valueSerde: StringDeserializer)
              --> KSTREAM-KEY-SELECT-0000000001
            Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])
              --> counts-repartition-filter
              <-- KSTREAM-SOURCE-0000000000
            Processor: counts-repartition-filter (stores: [])
              --> counts-repartition-sink
              <-- KSTREAM-KEY-SELECT-0000000001
            Sink: counts-repartition-sink (topic: counts-repartition, keySerde: StringSerializer, valueSerde: StringSerializer)
              <-- counts-repartition-filter
        
          Sub-topology: 1
            Source: counts-repartition-source (topics: [counts-repartition], keySerde: StringDeserializer, valueSerde: StringDeserializer)
              --> KSTREAM-AGGREGATE-0000000002
            Processor: KSTREAM-AGGREGATE-0000000002 (stores: [(counts, serdes: [StringSerde, LongSerde])])
              --> myname
              <-- counts-repartition-source
            Processor: myname (stores: [(myname-store, serdes: [SessionWindowedSerde, FullChangeSerde])])
              --> KTABLE-TOSTREAM-0000000006
              <-- KSTREAM-AGGREGATE-0000000002
            Processor: KTABLE-TOSTREAM-0000000006 (stores: [])
              --> KSTREAM-MAP-0000000007
              <-- myname
            Processor: KSTREAM-MAP-0000000007 (stores: [])
              --> KSTREAM-SINK-0000000008
              <-- KTABLE-TOSTREAM-0000000006
            Sink: KSTREAM-SINK-0000000008 (topic: output-suppressed, keySerde: StringSerializer, valueSerde: LongSerializer)
              <-- KSTREAM-MAP-0000000007
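The serde information in the source and sink lines above is what the proposed `keySerdeName()` / `valueSerdeName()` accessors would expose. Below is a minimal sketch of how such lines could be rendered. `NodeLineFormatter`, `SourceInfo`, `SinkInfo` and the helper methods are hypothetical stand-ins, not Kafka Streams classes, and the library's actual `toString` implementation may differ in detail.

```java
import java.util.List;

/** Hypothetical formatter mirroring the augmented source/sink lines of a topology description. */
public final class NodeLineFormatter {

    /** Stand-in for a source node: name, subscribed topics, and serde names (null if unknown). */
    record SourceInfo(String name, List<String> topics, String keySerdeName, String valueSerdeName) { }

    /** Stand-in for a sink node writing to one fixed topic. */
    record SinkInfo(String name, String topic, String keySerdeName, String valueSerdeName) { }

    static String describeSource(SourceInfo s) {
        // List#toString renders as "[input]", matching the "topics: [...]" part of the output.
        return "Source: " + s.name() + " (topics: " + s.topics()
                + serdeSuffix(s.keySerdeName(), s.valueSerdeName()) + ")";
    }

    static String describeSink(SinkInfo s) {
        return "Sink: " + s.name() + " (topic: " + s.topic()
                + serdeSuffix(s.keySerdeName(), s.valueSerdeName()) + ")";
    }

    // When neither serde name is known, print the pre-augmentation format.
    private static String serdeSuffix(String key, String value) {
        if (key == null && value == null) {
            return "";
        }
        return ", keySerde: " + key + ", valueSerde: " + value;
    }

    public static void main(String[] args) {
        System.out.println(describeSource(new SourceInfo(
                "KSTREAM-SOURCE-0000000000", List.of("input"), "StringDeserializer", "StringDeserializer")));
        System.out.println(describeSink(new SinkInfo(
                "counts-repartition-sink", "counts-repartition", "StringSerializer", "StringSerializer")));
    }
}
```

When no serde name is available, the sketch keeps the old line format, in line with the behavior described for topologies built with `StreamsBuilder#build()`.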

Proposed Changes


To support that, I propose the following API additions to TopologyDescription and its nested interfaces:

    interface Processor extends Node {
        /**
         * The names of all connected stores.
         * @return set of store names
         */
        @Deprecated
        Set<String> stores();

        /**
         * The names of all connected stores.
         * @return set of stores
         */
        Set<Store> storeSet();                    // <---- NEW FUNC
    }

    /**
     * A state store of a topology.
     */
    interface Store {                             // <---- NEW INTERFACE
        /**
         * Name of the state store.
         * @return the store name
         */
        String name();

        /**
         * Name of the corresponding changelog topic of this store.
         * @return name of the changelog topic; null if logging is disabled for the store
         */
        String changelogTopic();

        /**
         * Names of the serde classes associated with the store.
         * @return list of serde class names
         */
        List<String> serdeNames();
    }

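    interface Source extends Node {
        // ... existing methods ...

        /**
         * Name of the key serde class used by this source node.
         * @return key serde class name; null if not yet determined
         */
        String keySerdeName();                    // <---- NEW FUNC

        /**
         * Name of the value serde class used by this source node.
         * @return value serde class name; null if not yet determined
         */
        String valueSerdeName();                  // <---- NEW FUNC
    }

    interface Sink extends Node {
        // ... existing methods ...

        /**
         * Name of the key serde class used by this sink node.
         * @return key serde class name; null if not yet determined
         */
        String keySerdeName();                    // <---- NEW FUNC

        /**
         * Name of the value serde class used by this sink node.
         * @return value serde class name; null if not yet determined
         */
        String valueSerdeName();                  // <---- NEW FUNC
    }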

    interface Subtopology {
        /**
         * Internally assigned unique ID.
         * @return the ID of the sub-topology
         */
        int id();

        /**
         * All nodes of this sub-topology.
         * @return set of all nodes within the sub-topology
         */
        Set<Node> nodes();

        /**
         * All source nodes of this sub-topology.
         * @return set of all source nodes within the sub-topology
         */
        Set<Source> sourceNodes();

        /**
         * All sink nodes of this sub-topology.
         * @return set of all sink nodes within the sub-topology
         */
        Set<Sink> sinkNodes();

        /**
         * All state stores of this sub-topology.
         * @return set of all state stores within the sub-topology
         */
        Set<Store> stores();                      // <---- NEW FUNC
    }
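To illustrate how the proposed `Store` interface relates to the `(stores: [(counts, serdes: [StringSerde, LongSerde])])` part of the example description, here is a minimal sketch. `StoreSketch`, `StoreDescription`, `changelogTopicFor` and `describeStores` are hypothetical names. The changelog helper assumes Kafka Streams' `<application.id>-<store name>-changelog` naming convention, and `my-app` is an assumed application id.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical sketch of the proposed Store interface and a rendering of a processor's stores. */
public final class StoreSketch {

    /** Mirrors the proposed TopologyDescription.Store interface. */
    interface Store {
        String name();

        String changelogTopic(); // null if logging is disabled for the store

        List<String> serdeNames();
    }

    /** Hypothetical immutable implementation; not a Kafka Streams class. */
    record StoreDescription(String name, String changelogTopic, List<String> serdeNames) implements Store {
        StoreDescription {
            Objects.requireNonNull(name, "name");
            serdeNames = List.copyOf(serdeNames); // defensive, unmodifiable copy
        }
    }

    /** Assumes the "<application.id>-<store name>-changelog" convention; null when logging is off. */
    static String changelogTopicFor(String applicationId, String storeName, boolean loggingEnabled) {
        return loggingEnabled ? applicationId + "-" + storeName + "-changelog" : null;
    }

    /** Renders stores in the "stores: [(name, serdes: [...])]" style of the example description. */
    static String describeStores(Set<Store> stores) {
        return stores.stream()
                .sorted(Comparator.comparing(Store::name)) // deterministic order for display
                .map(s -> "(" + s.name() + ", serdes: " + s.serdeNames() + ")")
                .collect(Collectors.joining(", ", "stores: [", "]"));
    }

    public static void main(String[] args) {
        Store counts = new StoreDescription(
                "counts", changelogTopicFor("my-app", "counts", true), List.of("StringSerde", "LongSerde"));
        System.out.println(describeStores(Set.of(counts)));
        System.out.println(counts.changelogTopic());
    }
}
```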


With the augmented programming interface, users can also loop over all source/sink nodes and all state stores of each sub-topology to collect every topic involved (source, sink, intermediate, and changelog):


    for (Subtopology subTopology: topology.describe().subtopologies()) {
        for (Source source: subTopology.sourceNodes()) { /* get source and intermediate topics */ }
        for (Sink sink: subTopology.sinkNodes()) { /* get sink and intermediate topics */ }
        for (Store store: subTopology.stores()) { /* get changelog topics */ }
    }
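The loop above leaves the classification inside comments. A self-contained sketch of one way to fill them in follows. It treats a topic as intermediate when one sub-topology writes it and another reads it. The records are simplified, hypothetical stand-ins for the proposed interfaces. Pattern-based sources and extractor-based sinks are not modelled (a `null` sink topic is simply skipped), and the changelog name `app-counts-changelog` assumes an application id of `app`.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical sketch: classify topics of a topology using simplified stand-in types. */
public final class TopicCollector {

    record Source(String name, Set<String> topics) { }
    record Sink(String name, String topic) { }             // topic == null models a topic-name extractor
    record Store(String name, String changelogTopic) { }   // changelogTopic == null when logging is off
    record Subtopology(int id, Set<Source> sourceNodes, Set<Sink> sinkNodes, Set<Store> stores) { }

    record Topics(Set<String> input, Set<String> output, Set<String> intermediate, Set<String> changelog) { }

    static Topics collect(List<Subtopology> subtopologies) {
        Set<String> read = new TreeSet<>();
        Set<String> written = new TreeSet<>();
        Set<String> changelog = new TreeSet<>();
        for (Subtopology sub : subtopologies) {
            for (Source source : sub.sourceNodes()) {
                read.addAll(source.topics());
            }
            for (Sink sink : sub.sinkNodes()) {
                if (sink.topic() != null) {
                    written.add(sink.topic());
                }
            }
            for (Store store : sub.stores()) {
                if (store.changelogTopic() != null) {
                    changelog.add(store.changelogTopic());
                }
            }
        }
        // Topics both written and read inside the topology are intermediate (e.g. repartition topics).
        Set<String> intermediate = new TreeSet<>(read);
        intermediate.retainAll(written);
        Set<String> input = new TreeSet<>(read);
        input.removeAll(intermediate);
        Set<String> output = new TreeSet<>(written);
        output.removeAll(intermediate);
        return new Topics(input, output, intermediate, changelog);
    }

    public static void main(String[] args) {
        // Mirrors the two sub-topologies of the example description on this page.
        Subtopology sub0 = new Subtopology(0,
                Set.of(new Source("KSTREAM-SOURCE-0000000000", Set.of("input"))),
                Set.of(new Sink("counts-repartition-sink", "counts-repartition")),
                Set.of());
        Subtopology sub1 = new Subtopology(1,
                Set.of(new Source("counts-repartition-source", Set.of("counts-repartition"))),
                Set.of(new Sink("KSTREAM-SINK-0000000008", "output-suppressed")),
                Set.of(new Store("counts", "app-counts-changelog")));
        System.out.println(collect(List.of(sub0, sub1)));
    }
}
```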


We do not expose topic names directly because a source node may subscribe to a Pattern instead of a fixed list of topics, and a sink node may use a topic-name extractor instead of a fixed topic. It is therefore better to let users construct the topic names programmatically from the lower-level APIs.
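Why the topic names cannot simply be listed can be seen in a small sketch. A Pattern source can only be resolved against the topics that exist at a given moment. An extractor-based sink only reveals its topics per record. `TopicResolution`, `sourceTopics` and `sinkTopics` are hypothetical helpers, and a plain `java.util.function.Function` stands in for Kafka Streams' `TopicNameExtractor`.

```java
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;
import java.util.regex.Pattern;

/** Hypothetical helpers showing how users might resolve topic names themselves. */
public final class TopicResolution {

    /** A source has either a fixed topic set or a Pattern; a Pattern is matched against existing topics. */
    static Set<String> sourceTopics(Set<String> topicSet, Pattern topicPattern, Collection<String> existingTopics) {
        if (topicPattern == null) {
            return new TreeSet<>(topicSet);
        }
        Set<String> matched = new TreeSet<>();
        for (String topic : existingTopics) {
            if (topicPattern.matcher(topic).matches()) { // full match against the topic name
                matched.add(topic);
            }
        }
        return matched;
    }

    /** A sink has either a fixed topic or an extractor; with an extractor, topics are only known per record. */
    static <R> Set<String> sinkTopics(String fixedTopic, Function<R, String> extractor, Collection<R> records) {
        if (extractor == null) {
            return Set.of(fixedTopic);
        }
        Set<String> topics = new TreeSet<>();
        for (R record : records) {
            topics.add(extractor.apply(record));
        }
        return topics;
    }

    public static void main(String[] args) {
        System.out.println(sourceTopics(null, Pattern.compile("orders-.*"),
                List.of("orders-eu", "orders-us", "payments")));
        Function<String, String> byRegion = value -> "output-" + value.substring(0, 2);
        System.out.println(sinkTopics(null, byRegion, List.of("eu:1", "us:2", "eu:3")));
    }
}
```

Both results depend on runtime state (the cluster's topic list, the records seen so far), which is why a static description cannot return them as plain names.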


Proposed Changes

In order to fall back to case 3) with global config values, we will need to leverage the newly added `StreamsBuilder#build(Properties)`. If the old `StreamsBuilder#build()` is called, serde information will not be exposed via the description (i.e. it will be null), since it is not yet "determined". Also, if the TopologyDescription comes from a topology built with `StreamsBuilder#build()`, its `toString` output will not be augmented either.
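The fallback rule above can be sketched as follows. A serde set explicitly on an operator wins. Otherwise the global default from the configs passed to `build(Properties)` is used. With `build()` there are no configs, so the name stays null. The config keys `default.key.serde` / `default.value.serde` are Kafka Streams' own. `SerdeNameResolution`, `resolveSerdeName` and the simple-name shortening (chosen to match names like `StringSerde` in the example output) are assumptions, and the sketch treats an unset property as unknown rather than applying any library-provided default.

```java
import java.util.Properties;

/** Hypothetical sketch of resolving the serde name shown in a topology description. */
public final class SerdeNameResolution {

    // Kafka Streams config keys for the global default serdes.
    static final String DEFAULT_KEY_SERDE = "default.key.serde";
    static final String DEFAULT_VALUE_SERDE = "default.value.serde";

    /**
     * Explicit operator serde first; else the global default from the build configs;
     * configs == null models StreamsBuilder#build() without properties, so the result is null.
     */
    static String resolveSerdeName(String explicitSerdeClass, Properties configs, String defaultKey) {
        if (explicitSerdeClass != null) {
            return simpleName(explicitSerdeClass);
        }
        if (configs == null) {
            return null;
        }
        String configured = configs.getProperty(defaultKey);
        return configured == null ? null : simpleName(configured);
    }

    // Strips the package and any enclosing class, e.g. "...Serdes$StringSerde" -> "StringSerde".
    private static String simpleName(String className) {
        int cut = Math.max(className.lastIndexOf('.'), className.lastIndexOf('$'));
        return className.substring(cut + 1);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(DEFAULT_VALUE_SERDE, "org.apache.kafka.common.serialization.Serdes$LongSerde");
        System.out.println(resolveSerdeName(null, props, DEFAULT_VALUE_SERDE));
        System.out.println(resolveSerdeName(null, null, DEFAULT_VALUE_SERDE));
    }
}
```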

Note that the augmented topology description only contains the (outer) serde class name; it does not necessarily include the names of any inner serdes that class wraps.
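The reason is that a class name carries no information about the objects it wraps, and Java generics are erased at runtime. The sketch below uses hypothetical `SimpleSerde`, `StringSerde` and `WrappingSerde` types (not Kafka's `Serde`), with `WrappingSerde` playing a role similar to `SessionWindowedSerde` wrapping an inner serde.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch: a wrapping serde's class name does not reveal the serde it wraps. */
public final class InnerSerdeName {

    /** Minimal serde contract, a stand-in for Kafka's Serde<T>. */
    interface SimpleSerde<T> {
        byte[] serialize(T value);
    }

    static final class StringSerde implements SimpleSerde<String> {
        public byte[] serialize(String value) {
            return value.getBytes(StandardCharsets.UTF_8);
        }
    }

    /** Wraps an inner serde; its generic parameter is erased at runtime. */
    static final class WrappingSerde<T> implements SimpleSerde<T> {
        private final SimpleSerde<T> inner;

        WrappingSerde(SimpleSerde<T> inner) {
            this.inner = inner;
        }

        public byte[] serialize(T value) {
            return inner.serialize(value);
        }
    }

    /** The name a description would show: only the runtime class of the outer serde. */
    static String describedName(SimpleSerde<?> serde) {
        return serde.getClass().getSimpleName();
    }

    public static void main(String[] args) {
        SimpleSerde<String> wrapped = new WrappingSerde<>(new StringSerde());
        System.out.println(describedName(wrapped));
    }
}
```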

...