Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
// <NodeId, ParentId>
KStream<String, String> nodes = builder.stream("nodes");

// <NodeId, ParentId>
KTable<String, String> parents = nodes.toTable();

// <NodeId, Descendants>
KStream<String, Long> updates = builder.stream("node-updates"); // EXPLICIT TOPIC

// update parent count
nodes
    .map((node, parent) -> { KeyValue(parent, 1L) })
    .to("node-updates")

// increment parents recursively
updates
    .join(parents, (count, parent) -> { KeyValue(parent, count) }) // the root node has no parent, so recursion halts at the root
    .to("node-updates") // recursively send updates back to explicit topic

// sum updates to produce total descendants for graph nodes
// <NodeId, Descendants>
KTable<String, Long> nodeDescendants = updates
    .groupByKey()
    .reduce((acc, next) -> acc + next)

...