Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
// <NodeId, ParentId>
KStream<String, String> nodes = builder.stream("nodes");

// <NodeId, ParentId>
KTable<String, String> parents = nodes.toTable();

// count descendants by recursively producing parent records
// 1L is used as a dummy value below, since we will be discarding values when we count the records by key
KStream<StringKTable<String, Long> descendants = nodes
	.map((child, parent) -> { KeyValue(parent, 1L) }   // emit "update" for parent of new node
	.recursively((updates) -> {                        // recursively emit "updates" for each ancestor of the parent
		// emit a new update for the parent of this node
	 	// the root node has no parent, so recursion terminates at the root node
		updates
			.join(parents, (count, parent) -> { parent })
			.map((child, parent) -> { KeyValue(parent, 1L) })
	})
	.groupByKey()
	.count()

...