...
Public Interfaces
The following new methods will be introduced:
Code Block |
---|
interface KStream<K, V> {
    KStream<K, V> recursively(UnaryOperator<KStream<K, V>> op);
    KStream<K, V> recursively(UnaryOperator<KStream<K, V>> op, Produced<K, V> produced);
} |
Note: UnaryOperator is java.util.function.UnaryOperator.
...
Code Block |
---|
// <NodeId, ParentId>
KStream<String, String> nodes = builder.stream("nodes");
// <NodeId, ParentId>
KTable<String, String> parents = nodes.toTable();
// count descendants by recursively producing parent records
// 1L is used as a dummy value below, since we will be discarding values when we count the records by key
KTable<String, Long> descendants = nodes
    .map((child, parent) -> KeyValue.pair(parent, 1L)) // emit "update" for parent of new node
    .recursively((updates) -> {
        // recursively emit "updates" for each ancestor of the parent
        // emit a new update for the parent of this node
        // the root node has no parent, so recursion terminates at the root node
        return updates
            .join(parents, (count, parent) -> parent)
            .map((child, parent) -> KeyValue.pair(parent, 1L));
    })
    .groupByKey()
    .count(); |
...
The recursively method applies input records to its op argument. The results are then both emitted as the output of recursively and fed back into the op KStream.
The op KStream MUST "terminate"; that is, it must have some condition that eventually prevents further recursion of a record. In our example here, the terminating condition is the join: the root node of our graph has no parent, so the join will produce no output for the root node.
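The feedback behaviour described above can be sketched in plain Java, without Kafka Streams. This is only a model of the stated semantics under simplifying assumptions: records are processed from an in-memory queue, and the names RecursivelySketch, recursively and ancestorsOf are hypothetical, not part of the proposed API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of the feedback loop; it does not use Kafka Streams.
class RecursivelySketch {

    // Applies op to every input record. Each result is both emitted and fed
    // back into op. The loop ends only if op eventually returns no results.
    static <R> List<R> recursively(List<R> input, Function<R, List<R>> op) {
        List<R> emitted = new ArrayList<>();
        Deque<R> pending = new ArrayDeque<>(input);
        while (!pending.isEmpty()) {
            R next = pending.poll();
            for (R result : op.apply(next)) {
                emitted.add(result);  // emitted as output of recursively
                pending.add(result);  // fed back into op
            }
        }
        return emitted;
    }

    // Walks from a node up to the root using a child -> parent lookup.
    // The lookup plays the role of the join: the root has no parent, so op
    // returns nothing for it and the recursion terminates.
    static List<String> ancestorsOf(String node, Map<String, String> parents) {
        Function<String, List<String>> op = n -> {
            String parent = parents.get(n);
            return parent == null ? List.of() : List.of(parent);
        };
        return recursively(List.of(node), op);
    }
}
```

With parents c -> b and b -> a, ancestorsOf("c", parents) emits b and then a, and stops because a has no entry in the lookup. An op that always returns its argument would never drain the queue, which is the non-terminating case the text warns about.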
Automatic Repartitioning
If the op argument applies a key-changing operation (as it does in our example above), a repartition topic may be automatically created. The optional Produced argument can be provided to customize repartitioning behaviour. This argument is ignored if a repartition topic is not necessary.
- We use Produced instead of Repartitioned because, when operating recursively, it would be an error to modify the number of partitions: the topic MUST have the same number of partitions as the current Task.
Repartitioning is required only when the op KStream requires repartitioning and the current KStream does not require repartitioning. The reason for this is that if both streams require repartitioning, then the current stream is guaranteed to automatically repartition records before any operation that requires repartitioning. In the above example, recursively would not include a repartition topic, because join already includes one.
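The rule above reduces to a single boolean condition. The following is a minimal sketch of that condition only; the class and method names are hypothetical, and how the two flags would be derived from the topology is not shown.

```java
// Sketch of the repartitioning rule; not part of Kafka Streams.
class RepartitionRuleSketch {

    // recursively adds its own repartition topic only when op changes keys
    // and the current stream is not already going to repartition.
    static boolean needsOwnRepartitionTopic(boolean opRequiresRepartition,
                                            boolean currentRequiresRepartition) {
        return opRequiresRepartition && !currentRequiresRepartition;
    }
}
```

In the descendant-counting example both flags are true, so the condition is false and the repartition topic created for the join is the only one.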
Restrictions:
- op cannot be UnaryOperator.identity, or an equivalent function that simply returns its argument unmodified. This would produce an infinite recursive loop, since there is no opportunity to refine the output to break out of the loop.
- op MUST "terminate"; that is, it must have some condition which eventually prevents further recursion of a record. In our example here, the terminating condition is the join, since the root node of our graph will have no parent, so the join will produce no output for the root node.
  - We can attempt to detect "definitely non-terminating" arguments by checking the process graph produced by the function for operations that can cause the stream to terminate (e.g. filter, join, flatMap, etc.). If none are found, the argument cannot terminate.
  - We cannot guarantee that a function that includes terminating operations (filter, join, flatMap, etc.) actually terminates.
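The "definitely non-terminating" check could look roughly like the sketch below. It assumes the process graph produced by op has already been flattened into a list of operation names; that representation, the class name and the set of names are all hypothetical and stand in for whatever the real graph nodes expose.

```java
import java.util.List;
import java.util.Set;

// Sketch of a conservative termination check; not the Kafka Streams implementation.
class TerminationCheckSketch {

    // Operations that can drop records and so may end the recursion.
    // Hypothetical names taken from the examples in the text.
    static final Set<String> MAY_DROP_RECORDS = Set.of("filter", "join", "flatMap");

    // True when no operation in op's graph can ever drop a record, so every
    // record fed back would produce further output forever.
    static boolean definitelyNonTerminating(List<String> opsInGraph) {
        return opsInGraph.stream().noneMatch(MAY_DROP_RECORDS::contains);
    }
}
```

The check is one-sided: a false result only means that termination is possible, not that it will happen. An empty list, which is what an identity op would yield, is reported as non-terminating.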
...
In KStreamImpl, the implementation is fairly simple:
...
- We call op, passing our current KStream as its argument. This produces our output KStream.
- We wire up the graphNode from the output KStream as a parent of the current KStream. This takes care of the recursion.
- Finally, we return the output KStream. This enables users to operate on the records that are being recursively produced, as above.
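The wiring steps listed above can be illustrated with a toy graph model. Node and Stream below are simplified stand-ins invented for this sketch, not the real graph node or KStreamImpl classes, and the then method is a hypothetical placeholder for any DSL operator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Toy model of the wiring; not the Kafka Streams classes.
class WiringSketch {

    // Stand-in for a topology graph node.
    static final class Node {
        final String name;
        final List<Node> parents = new ArrayList<>();

        Node(String name) {
            this.name = name;
        }
    }

    // Stand-in for a KStream: a handle on its graph node.
    static final class Stream {
        final Node node;

        Stream(Node node) {
            this.node = node;
        }

        // Appends a child node, as an operator such as map or join would.
        Stream then(String opName) {
            Node child = new Node(opName);
            child.parents.add(node);
            return new Stream(child);
        }

        Stream recursively(UnaryOperator<Stream> op) {
            Stream output = op.apply(this);  // call op with the current stream
            node.parents.add(output.node);   // output node becomes a parent of the current node
            return output;                   // callers continue from the output stream
        }
    }
}
```

After current.recursively(s -> s.then("join").then("map")), the map node is reachable from current and is also one of current's parents, which forms the cycle that carries records back into op. With an identity op, the current node would become its own parent, matching the first restriction listed earlier.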
...
The following tests will be added:
- Counting descendants of graph nodes arriving in-order (as above)
- Counting descendants of graph nodes arriving in any order
- Streaming recursion:
  - No repartitioning required
  - Repartitioning handled by main stream
  - Repartitioning handled by op argument
Rejected Alternatives
It's currently possible to implement streaming recursion via explicit topics, albeit with a number of disadvantages:
...