THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block |
---|
// <NodeId, ParentId> KStream<String, String> nodes = builder.stream("nodes"); // <NodeId, ParentId> KTable<String, String> parents = nodes.toTable(); // count descendants by recursively producing parent records // 1L is used as a dummy value below, since we will be discarding values when we count the records by key KStream<StringKTable<String, Long> descendants = nodes .map((child, parent) -> { KeyValue(parent, 1L) } // emit "update" for parent of new node .recursively((updates) -> { // recursively emit "updates" for each ancestor of the parent // emit a new update for the parent of this node // the root node has no parent, so recursion terminates at the root node updates .join(parents, (count, parent) -> { parent }) .map((child, parent) -> { KeyValue(parent, 1L) }) }) .groupByKey() .count() |
...