Status

Current state: Under Discussion

Discussion thread:

JIRA:

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Some use cases require streaming recursion, in which the output of a pipeline is piped back into the input of the same pipeline. One example is graph or tree traversal, where it can be useful to recursively traverse up a tree as new leaf nodes arrive.

Currently, streaming recursion is possible via an explicit topic:

// <NodeId, ParentId>
KStream<String, String> nodes = builder.stream("nodes");

// <NodeId, ParentId>
KTable<String, String> parents = nodes.toTable();

// <NodeId, Descendants>
KStream<String, Long> updates = builder.stream("node-updates"); // EXPLICIT TOPIC

// update parent count
nodes
    .map((node, parent) -> KeyValue.pair(parent, 1L))
    .to("node-updates");

// increment parents recursively
updates
    .join(parents, (count, parent) -> KeyValue.pair(parent, count)) // the root node has no parent, so recursion halts at the root
    .map((node, parentCount) -> parentCount) // re-key each update to the parent
    .to("node-updates"); // recursively send updates back to the explicit topic

// sum updates to produce total descendants for graph nodes
// <NodeId, Descendants>
KTable<String, Long> nodeDescendants = updates
    .groupByKey()
    .reduce((acc, next) -> acc + next);

This approach works (and can be extended to handle out-of-order graph updates), but has the following drawbacks:

  1. The node-updates topic is entirely internal to the Topology, yet it has to be managed explicitly by the user.
    1. This is functionally a repartition topic. However, because it is explicitly managed, Streams can't automatically delete consumed records.
    2. Consequently, to prevent the topic from growing without bound, users need to set retention criteria, which risks data loss.
  2. In scenarios where repartitioning is not required, the explicit recursive topic adds overhead.
  3. It also adds complexity to the user's program, making it harder to understand than it needs to be.

A simple solution would be to extend the DSL API so that the to method accepts a KStream. This addresses the above problems:

  1. A repartitioning topic, used for recursion, would be automatically created and managed by Kafka Streams.
    1. As this is a repartitioning topic, records would be automatically deleted once consumed, without retention criteria, preventing data loss and reducing storage overhead.
  2. In cases where repartitioning is not required, no topic would be created, and the stream would recurse directly from processor to processor, reducing overhead.
  3. Less code would be required to implement streaming recursive algorithms, reducing cognitive burden.

Public Interfaces

The following new methods would be introduced:

interface KStream<K, V> {
    void to(KStream<K, V> other);
    void to(KStream<K, V> other, Produced<K, V> produced);
}

Proposed Changes

The above algorithm would instead look like this:

// <NodeId, ParentId>
KStream<String, String> nodes = builder.stream("nodes");

// <NodeId, ParentId>
KTable<String, String> parents = nodes.toTable();

// update parent count
KStream<String, Long> updates = nodes
    .map((node, parent) -> KeyValue.pair(parent, 1L));

// increment parents recursively
updates
    .join(parents, (count, parent) -> KeyValue.pair(parent, count)) // the root node has no parent, so recursion halts at the root
    .map((node, parentCount) -> parentCount) // re-key each update to the parent
    .to(updates); // proposed method: recursively send updates back into the stream

// sum updates to produce total descendants for graph nodes
// <NodeId, Descendants>
KTable<String, Long> nodeDescendants = updates
    .groupByKey()
    .reduce((acc, next) -> acc + next);

This method can be used for recursion, as above, but can also be used for wiring up the process graph "in reverse", if desired.

The optional Produced argument controls how data is written to and read from the underlying repartition topic, if one is automatically created.

  • We use Produced instead of Repartitioned because, when operating recursively, it would be an error to modify the number of partitions: the topic MUST have the same number of partitions as the current Task.
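
To make the recursion in the example topology concrete, the following is a minimal stand-alone simulation in plain Java, with no Kafka dependency. The class and method names (DescendantCountSimulation, countDescendants) are hypothetical and exist only for illustration. It assumes that each node's own parent edge arrives before any of its children, and it processes one record at a time, which is a simplification of how Streams would schedule the work.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical simulation of the example topology; not part of Kafka Streams.
class DescendantCountSimulation {

    // One <NodeId, Count> record flowing through the "updates" stream.
    static final class Update {
        final String nodeId;
        final long count;

        Update(String nodeId, long count) {
            this.nodeId = nodeId;
            this.count = count;
        }
    }

    // edges: {nodeId, parentId} pairs in arrival order.
    // Returns nodeId -> total number of descendants seen so far.
    static Map<String, Long> countDescendants(String[][] edges) {
        Map<String, String> parents = new HashMap<>();   // stands in for the "parents" KTable
        Map<String, Long> descendants = new HashMap<>(); // stands in for "nodeDescendants"
        Deque<Update> updates = new ArrayDeque<>();      // stands in for the recursive stream

        for (String[] edge : edges) {
            String node = edge[0];
            String parent = edge[1];
            parents.put(node, parent);
            updates.add(new Update(parent, 1L)); // nodes.map(...): credit the direct parent

            while (!updates.isEmpty()) {
                Update update = updates.poll();
                // groupByKey().reduce(...): accumulate the count for this node
                descendants.merge(update.nodeId, update.count, Long::sum);
                // join(parents, ...): look up the parent of the node just updated
                String next = parents.get(update.nodeId);
                if (next != null) {
                    // to(updates): feed the re-keyed record back into the same stream
                    updates.add(new Update(next, update.count));
                }
                // No parent entry means the root was reached, so the recursion halts.
            }
        }
        return descendants;
    }
}
```

For the edges b→a, c→b, d→b (in that order), the simulation yields a=3 and b=2; the leaves c and d never receive an update and so have no entry.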

Implementation

In KStreamImpl, implementation is fairly simple:

  1. We first determine if repartitioning is required, and if it is, we include a repartition node.
  2. Unlike other methods, we then wire up the current graphNode as a parent of the current graphNode in the other KStream.
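
The two steps above can be sketched with a toy graph model. ToyGraphNode and ToyStream are hypothetical stand-ins, not the real Kafka Streams internals, and the repartitioning decision is passed in as a flag so that the sketch stays focused on the wiring.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for a node in the topology graph.
class ToyGraphNode {
    final String name;
    final List<ToyGraphNode> children = new ArrayList<>();

    ToyGraphNode(String name) {
        this.name = name;
    }
}

// Hypothetical, simplified stand-in for KStreamImpl.
class ToyStream {
    final ToyGraphNode graphNode;

    ToyStream(String name) {
        this.graphNode = new ToyGraphNode(name);
    }

    // Derive a downstream stream, as map() or join() would.
    ToyStream then(String name) {
        ToyStream next = new ToyStream(name);
        graphNode.children.add(next.graphNode);
        return next;
    }

    // Sketch of to(other); repartitionRequired is assumed to be decided by the caller.
    void to(ToyStream other, boolean repartitionRequired) {
        ToyGraphNode tail = graphNode;
        if (repartitionRequired) {
            // Step 1: splice a repartition node in after the current node.
            ToyGraphNode repartition = new ToyGraphNode(graphNode.name + "-repartition");
            tail.children.add(repartition);
            tail = repartition;
        }
        // Step 2: make this side a parent of the other stream's node.
        // When other is upstream of this stream, this closes a cycle in the graph.
        tail.children.add(other.graphNode);
    }
}
```

For example, after `ToyStream updates = new ToyStream("updates"); ToyStream joined = updates.then("join"); joined.to(updates, false);` the node for updates is both a parent and a child of the node for join, which is the cycle that recursion relies on.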

Automatic Repartitioning

Repartitioning is required only when the current KStream requires repartitioning and the other KStream does not. If both streams require repartitioning, the other stream is guaranteed to automatically repartition records before any operation that needs it, so adding a second repartition step would be redundant. In the above example, to would not include a repartition topic, because join already includes one.

Restrictions:

  • other cannot be this: this would produce an infinite recursive loop, since there's no opportunity to refine the output to break out of the loop.
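
The repartitioning rule and the restriction above can be written down as two small checks. RecursiveToRules and its method names are hypothetical, chosen only for this sketch; how the real implementation tracks whether a stream requires repartitioning is not specified here.

```java
// Hypothetical helper that encodes the rule and restriction described above.
final class RecursiveToRules {

    private RecursiveToRules() {
    }

    // A repartition topic is added by to() only when the current stream needs
    // repartitioning and the other stream will not already repartition on its own.
    static boolean needsRepartitionTopic(boolean currentRequiresRepartition,
                                         boolean otherRequiresRepartition) {
        return currentRequiresRepartition && !otherRequiresRepartition;
    }

    // Sending a stream directly to itself can never terminate, so reject it up front.
    static void requireDistinct(Object current, Object other) {
        if (current == other) {
            throw new IllegalArgumentException("other cannot be this");
        }
    }
}
```

Of the four combinations of the two flags, only (current requires, other does not) results in a repartition topic being created by to.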

Compatibility, Deprecation, and Migration Plan

  • No backwards-incompatible changes are introduced.

Test Plan

The following tests will be added:

  • Wire up processors "in reverse"
  • Streaming recursion:
    • No repartitioning required
    • Repartitioning handled by other stream
    • Repartitioning handled by to call

Rejected Alternatives

It's currently possible to implement streaming recursion via explicit topics, albeit with a number of disadvantages. See "Motivation" section for details.

