...
```java
KStream<String, String> source_o365_user_activity = builder.stream("source");
KStream<String, String>[] branches = source_o365_user_activity.branch(
    (key, value) -> value.contains("A"),
    (key, value) -> value.contains("B"),
    (key, value) -> true
);
branches[0].to("A");
branches[1].to("B");
branches[2].to("C");
```
we could use
```java
new KafkaStreamsBrancher<String, String>()
    .branch((key, value) -> value.contains("A"), ks -> ks.to("A"))
    .branch((key, value) -> value.contains("B"), ks -> ks.to("B"))
    // the default branch does not have to be defined last
    .defaultBranch(ks -> ks.to("C"))
    .onTopOf(builder.stream("source"));
```
(The onTopOf method returns the provided stream, so we can continue method chaining and do something more with the original stream.)
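To make the intended semantics concrete, here is a minimal sketch of the same builder pattern without any Kafka dependency. BrancherSketch is a hypothetical stand-in, not the class from the pull request: it routes the elements of a java.util.List rather than the records of a KStream, and it assumes that each element goes to the first matching branch, that unmatched elements go to the default branch (or are dropped if none is defined), and that onTopOf returns its argument unchanged.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical stand-in for the proposed brancher: works on a List, not a KStream.
public class BrancherSketch<T> {
    private final List<Predicate<T>> predicates = new ArrayList<>();
    private final List<Consumer<List<T>>> consumers = new ArrayList<>();
    private Consumer<List<T>> defaultConsumer; // may be registered at any point in the chain

    // Registers a branch: elements matching the predicate are handed to the consumer.
    public BrancherSketch<T> branch(Predicate<T> predicate, Consumer<List<T>> consumer) {
        predicates.add(predicate);
        consumers.add(consumer);
        return this;
    }

    // Registers the consumer for elements that match no predicate.
    public BrancherSketch<T> defaultBranch(Consumer<List<T>> consumer) {
        this.defaultConsumer = consumer;
        return this;
    }

    // Splits the source, feeds each part to its consumer, and returns the source itself.
    public List<T> onTopOf(List<T> source) {
        List<List<T>> parts = new ArrayList<>();
        for (int i = 0; i <= predicates.size(); i++) {
            parts.add(new ArrayList<>()); // the last slot collects unmatched elements
        }
        for (T item : source) {
            int target = predicates.size();
            for (int i = 0; i < predicates.size(); i++) {
                if (predicates.get(i).test(item)) {
                    target = i; // first matching predicate wins
                    break;
                }
            }
            parts.get(target).add(item);
        }
        for (int i = 0; i < consumers.size(); i++) {
            consumers.get(i).accept(parts.get(i));
        }
        if (defaultConsumer != null) {
            defaultConsumer.accept(parts.get(predicates.size()));
        }
        return source;
    }
}
```

In this sketch, the position of defaultBranch in the chain does not matter because the default consumer is stored separately from the ordered list of predicates. Calling defaultBranch between two branch calls therefore routes elements exactly as if it had been called last.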
Public Interfaces
Add the new org.apache.kafka.streams.kstream.KafkaStreamsBrancher class (see https://github.com/apache/kafka/pull/6164).
...