...
Code Block |
---|
|
Topology topology() {
final var builder = new StreamsBuilder();
builder.stream("words", Consumed.with(Serdes.String(), Serdes.String()));
.processValues(() -> new FixedKeyContextualProcessor<String, String, String>() {
@Override
public void process(FixedKeyRecord<String, String> record) {
for (final var word : record.value().split(",")) {
context().forward(record.withValue("Hello " + word));
}
}}, Named.as("process-values-without-repartitioning"))
.process(() -> new ContextualProcessor<String, String, String, String>() {
@Override
public void process(Record<String, String> record) {
for (final var word : record.value().split(",")) {
context().forward(record.withKey(word).withValue("Hello " + word));
}
}}, Named.as("process-with-partitioning"))
.to("output", Produced.with(Serdes.String(), Serdes.String()));
return builder.build();
} |
Infrastructure for Fixed Key Records
FixedKeyRecord
Record with immutable key.
Code Block |
---|
public final class FixedKeyRecord<K, V> {
private final K key;
private final V value;
private final long timestamp;
private final Headers headers;
FixedKeyRecord(final K key, final V value, final long timestamp, final Headers headers) {
this.key = key;
this.value = value;
if (timestamp < 0) {
throw new StreamsException(
"Malformed Record",
new IllegalArgumentException("Timestamp may not be negative. Got: " + timestamp)
);
}
this.timestamp = timestamp;
this.headers = new RecordHeaders(headers);
}
public K key() { return key; }
public V value() { return value; }
public long timestamp() { return timestamp; }
public Headers headers() { return headers; }
public <NewV> FixedKeyRecord<K, NewV> withValue(final NewV value) { return new FixedKeyRecord<>(key, value, timestamp, headers); }
public FixedKeyRecord<K, V> withTimestamp(final long timestamp) { return new FixedKeyRecord<>(key, value, timestamp, headers); }
public FixedKeyRecord<K, V> withHeaders(final Headers headers) { return new FixedKeyRecord<>(key, value, timestamp, headers); }
} |
FixedKeyProcessorSupplier
Code Block |
---|
@FunctionalInterface
public interface FixedKeyProcessorSupplier<KIn, VIn, VOut> extends ConnectedStoreProvider, Supplier<FixedKeyProcessor<KIn, VIn, VOut>> {
FixedKeyProcessor<KIn, VIn, VOut> get();
} |
FixedKeyProcessor
Code Block |
---|
public interface FixedKeyProcessor<KIn, VIn, VOut> {
default void init(final FixedKeyProcessorContext<KIn, VOut> context) {}
void process(FixedKeyRecord<KIn, VIn> record);
default void close() {}
} |
FixedKeyContextualProcessor
Helper, same as ContextualProcessor
.
Code Block |
---|
public abstract class FixedKeyContextualProcessor<KIn, VIn, VOut> implements FixedKeyProcessor<KIn, VIn, VOut> {
private FixedKeyProcessorContext<KIn, VOut> context;
protected FixedKeyContextualProcessor() {}
@Override
public void init(final FixedKeyProcessorContext<KIn, VOut> context) {
this.context = context;
}
protected final FixedKeyProcessorContext<KIn, VOut> context() {
return context;
}
}
|
ProcessingContext
To be extended by FixedKeyProcessorContext
and ProcessorContext
:
Code Block |
---|
interface ProcessingContext {
String applicationId();
TaskId taskId();
Optional<RecordMetadata> recordMetadata();
Serde<?> keySerde();
Serde<?> valueSerde();
File stateDir();
StreamsMetrics metrics();
<S extends StateStore> S getStateStore(final String name);
Cancellable schedule(final Duration interval,
final PunctuationType type,
final Punctuator callback);
void commit();
Map<String, Object> appConfigsWithPrefix(final String prefix);
} |
FixedKeyProcessorContext
Code Block |
---|
public interface FixedKeyProcessorContext<KForward, VForward> extends ProcessingContext {
<K extends KForward, V extends VForward> void forward(FixedKeyRecord<K, V> record);
<K extends KForward, V extends VForward> void forward(FixedKeyRecord<K, V> record, final String childName);
} |
Compatibility, Deprecation, and Migration Plan
...