THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block |
---|
// <NodeId, ParentId> KStream<String, String> nodes = builder.stream("nodes"); // <NodeId, ParentId> KTable<String, String> parents = nodes.toTable(); // <NodeId, Descendants> KStream<String, Long> updates = builder.stream("node-updates"); // EXPLICIT TOPIC // update parent count nodes .map((node, parent) -> { KeyValue(parent, 1L) }) .to("node-updates") // increment parents recursively updates .join(parents, (count, parent) -> { KeyValue(parent, count) }) // the root node has no parent, so recursion halts at the root .to("node-updates") // recursively send updates back to explicit topic // sum updates to produce total descendants for graph nodes // <NodeId, Descendants> KTable<String, Long> nodeDescendants = updates .groupByKey() .reduce((acc, next) -> acc + next) |
...