...
Example for the interface Predicate:
Code Block | ||
---|---|---|
| ||
public interface Predicate<K, V> extends Named {

    /**
     * Test if the record with the given key and value satisfies the predicate.
     *
     * @param key   the key of the record
     * @param value the value of the record
     * @return {@code true} if the {@link KeyValue} pair satisfies the predicate, {@code false} otherwise
     */
    boolean test(final K key, final V value);

    /**
     * Creates a new operation with the specified name.
     *
     * @param name      the name to be used for generated processors applied to this stream.
     * @param predicate the predicate.
     * @param <K>       the type of keys
     * @param <V>       the type of values
     * @return a new {@link Predicate} instance.
     */
    static <K, V> Predicate<K, V> named(final String name, final Predicate<K, V> predicate) {
        return new Predicate<K, V>() {
            @Override
            public boolean test(K key, V value) {
                return predicate.test(key, value);
            }

            @Override
            public String name() {
                return name;
            }
        };
    }
} |
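The wrapper pattern above can be tried out without any Kafka dependency. The following is a minimal sketch, not the proposed implementation: the `Named` interface is not shown in this section, so the version here (a single default method `name()` returning `null`) is an assumption, as is the class name `NamedPredicateSketch`. Making `name()` a default method is what would let a plain lambda still be used as a `Predicate`.

```java
public class NamedPredicateSketch {

    // Assumption: stand-in for the Named interface, which this section does not show.
    // name() is a default method so that Predicate keeps exactly one abstract method.
    public interface Named {
        default String name() {
            return null; // null means "no explicit name was given"
        }
    }

    // Same shape as the Predicate interface in the example above.
    public interface Predicate<K, V> extends Named {
        boolean test(K key, V value);

        // Wraps an existing predicate and attaches a name to it.
        static <K, V> Predicate<K, V> named(final String name, final Predicate<K, V> predicate) {
            return new Predicate<K, V>() {
                @Override
                public boolean test(K key, V value) {
                    return predicate.test(key, value); // delegate the actual check
                }

                @Override
                public String name() {
                    return name;
                }
            };
        }
    }

    public static void main(String[] args) {
        // A plain lambda carries no name.
        Predicate<String, String> unnamed = (k, v) -> v != null;
        // The same logic, wrapped with an explicit name.
        Predicate<String, String> named = Predicate.named("FILTER-NULL-VALUE", unnamed);

        System.out.println(unnamed.name());
        System.out.println(named.name());
        System.out.println(named.test("k", null));
        System.out.println(named.test("k", "v"));
    }
}
```

The wrapper only delegates `test()` and overrides `name()`, so existing lambdas would keep working unchanged and naming stays opt-in.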
Stateless operations
Stateless operations such as foreach(), map(), flatMap(), mapValues(), filter(), and filterNot() are each translated into a single processor. The action interfaces shown above (for example ForEachAction, ValueMapper, ValueMapperWithKey, and KeyValueMapper) are used to set the processor name directly.
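One way to picture how a single processor could pick up its name from the action is sketched below. This is an illustration under stated assumptions, not the actual builder code: the `Named` stand-in, the `ProcessorNameSketch` class, the `processorName` helper, and the fallback format (a prefix followed by a zero-padded counter) are all hypothetical.

```java
import java.util.Locale;
import java.util.concurrent.atomic.AtomicInteger;

public class ProcessorNameSketch {

    // Assumption: stand-in for the Named interface; null means "no explicit name".
    public interface Named {
        default String name() {
            return null;
        }
    }

    // Hypothetical counter used only when a name has to be generated.
    private final AtomicInteger index = new AtomicInteger(0);

    // Returns the action's own name if it has one, otherwise a generated name.
    public String processorName(final Named action, final String prefix) {
        final String custom = action.name();
        if (custom != null) {
            return custom;
        }
        // Assumed fallback format: prefix plus a 10-digit zero-padded index.
        return String.format(Locale.ROOT, "%s%010d", prefix, index.getAndIncrement());
    }

    public static void main(String[] args) {
        ProcessorNameSketch builder = new ProcessorNameSketch();

        Named unnamed = new Named() { };
        Named named = new Named() {
            @Override
            public String name() {
                return "FILTER-NULL-VALUE";
            }
        };

        System.out.println(builder.processorName(unnamed, "KSTREAM-FILTER-")); // KSTREAM-FILTER-0000000000
        System.out.println(builder.processorName(named, "KSTREAM-FILTER-"));   // FILTER-NULL-VALUE
    }
}
```

Because each stateless operation maps to exactly one processor, a single name taken from the action is enough to identify it in the topology description.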
...
Code Block | ||
---|---|---|
| ||
final StreamsBuilder builder = new StreamsBuilder();

// Each operation is given an explicit processor name.
builder.stream("topic-input", Consumed.with("STREAM-FROM-TOPIC-INPUT"))
       .filter(Predicate.named("FILTER-NULL-VALUE", (k, v) -> true))
       .map(KeyValueMapper.named("MAP-TO-UPPERCASE", (k, v) -> KeyValue.pair(k, v.toUpperCase())))
       .to("topic-output", Produced.with("TO-OUTPUT-TOPIC"));

// Print the resulting topology, which now shows the custom names.
System.out.println(builder.build().describe());

---- (output) ----
Topologies:
   Sub-topology: 0
    Source: STREAM-FROM-TOPIC-INPUT (topics: [topic-input])
      --> FILTER-NULL-VALUE
    Processor: FILTER-NULL-VALUE (stores: [])
      --> MAP-TO-UPPERCASE
      <-- STREAM-FROM-TOPIC-INPUT
    Processor: MAP-TO-UPPERCASE (stores: [])
      --> TO-OUTPUT-TOPIC
      <-- FILTER-NULL-VALUE
    Sink: TO-OUTPUT-TOPIC (topic: topic-output)
      <-- MAP-TO-UPPERCASE |
...