Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
  Topology topology() {
    final var builder = new StreamsBuilder();
    builder.stream("words", Consumed.with(Serdes.String(), Serdes.String()));
        .processValues(() -> new FixedKeyContextualProcessor<String, String, String>() {
            @Override
            public void process(FixedKeyRecord<String, String> record) {
                for (final var word : record.value().split(",")) {
                    context().forward(record.withValue("Hello " + word));
                }
            }}, Named.as("process-values-without-repartitioning"))
        .process(() -> new ContextualProcessor<String, String, String, String>() {
            @Override
            public void process(Record<String, String> record) {
                for (final var word : record.value().split(",")) {
                    context().forward(record.withKey(word).withValue("Hello " + word));
                }
            }}, Named.as("process-with-partitioning"))

        .to("output", Produced.with(Serdes.String(), Serdes.String()));
    return builder.build();
  }

Infrastructure for Fixed Key Records

FixedKeyRecord

Record with immutable key.

Code Block
public final class FixedKeyRecord<K, V> {

    private final K key;
    private final V value;
    private final long timestamp;
    private final Headers headers;

    FixedKeyRecord(final K key, final V value, final long timestamp, final Headers headers) {
        this.key = key;
        this.value = value;
        if (timestamp < 0) {
            throw new StreamsException(
                "Malformed Record",
                new IllegalArgumentException("Timestamp may not be negative. Got: " + timestamp)
            );
        }
        this.timestamp = timestamp;
        this.headers = new RecordHeaders(headers);
    }

    public K key() { return key; }

    public V value() { return value; }

    public long timestamp() { return timestamp; }

    public Headers headers() { return headers; }

    public <NewV> FixedKeyRecord<K, NewV> withValue(final NewV value) { return new FixedKeyRecord<>(key, value, timestamp, headers); }

    public FixedKeyRecord<K, V> withTimestamp(final long timestamp) { return new FixedKeyRecord<>(key, value, timestamp, headers); }

    public FixedKeyRecord<K, V> withHeaders(final Headers headers) { return new FixedKeyRecord<>(key, value, timestamp, headers); }
}


FixedKeyProcessorSupplier

Code Block
@FunctionalInterface
public interface FixedKeyProcessorSupplier<KIn, VIn, VOut> extends ConnectedStoreProvider, Supplier<FixedKeyProcessor<KIn, VIn, VOut>> {
    FixedKeyProcessor<KIn, VIn, VOut> get();
}



FixedKeyProcessor


Code Block
public interface FixedKeyProcessor<KIn, VIn, VOut> {

    default void init(final FixedKeyProcessorContext<KIn, VOut> context) {}

    void process(FixedKeyRecord<KIn, VIn> record);

    default void close() {}
}


FixedKeyContextualProcessor

Helper, same as ContextualProcessor.

Code Block
public abstract class FixedKeyContextualProcessor<KIn, VIn, VOut> implements FixedKeyProcessor<KIn, VIn, VOut> {

    private FixedKeyProcessorContext<KIn, VOut> context;

    protected FixedKeyContextualProcessor() {}

    @Override
    public void init(final FixedKeyProcessorContext<KIn, VOut> context) {
        this.context = context;
    }

    protected final FixedKeyProcessorContext<KIn, VOut> context() {
        return context;
    }
}


ProcessingContext

To be extended by FixedKeyProcessorContext  and ProcessorContext :

Code Block
interface ProcessingContext {

    String applicationId();

    TaskId taskId();

    Optional<RecordMetadata> recordMetadata();

    Serde<?> keySerde();

    Serde<?> valueSerde();

    File stateDir();

    StreamsMetrics metrics();

    <S extends StateStore> S getStateStore(final String name);

    Cancellable schedule(final Duration interval,
                         final PunctuationType type,
                         final Punctuator callback);

    void commit();

    Map<String, Object> appConfigsWithPrefix(final String prefix);
}


FixedKeyProcessorContext

Code Block
public interface FixedKeyProcessorContext<KForward, VForward> extends ProcessingContext {

    <K extends KForward, V extends VForward> void forward(FixedKeyRecord<K, V> record);

    <K extends KForward, V extends VForward> void forward(FixedKeyRecord<K, V> record, final String childName);
}


Compatibility, Deprecation, and Migration Plan

...