Status

Current state: Accepted

Discussion thread: here

JIRA: KAFKA-6849

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

The following methods would be added to the Java KTable interface:

```java
<VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier,
                                   final String... stateStoreNames);

<VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier,
                                   final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
                                   final String... stateStoreNames);
```

Proposed Changes

The new methods on `KTableImpl` will be implemented as follows:

```java
public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, V> {
    ...

    // Prefix used to generate a unique name for each new processor node.
    private static final String TRANSFORMVALUES_NAME = "KTABLE-TRANSFORMVALUES-";

    @Override
    public <VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier,
                                              final String... stateStoreNames) {
        // Adapt the public supplier to the internal supplier type also used by KStream#transformValues.
        return transformValues(toInternalValueTransformerSupplier(valueTransformerSupplier), stateStoreNames);
    }

    private <VR> KTable<K, VR> transformValues(final InternalValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerWithKeySupplier,
                                               final String... stateStoreNames) {
        final String name = builder.newProcessorName(TRANSFORMVALUES_NAME);

        // Add the new processor node downstream of this table's node.
        final KTableProcessorSupplier<K, V, VR> processorSupplier = new KTableTransformValues<>(this, transformerWithKeySupplier);
        builder.internalTopologyBuilder.addProcessor(name, processorSupplier, this.name);

        // Give the transformer access to any state stores it names.
        if (stateStoreNames != null && stateStoreNames.length > 0) {
            builder.internalTopologyBuilder.connectProcessorAndStateStores(name, stateStoreNames);
        }

        return new KTableImpl<>(builder, name, processorSupplier, sourceNodes, queryableStoreName, isQueryable);
    }
}
```
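The wiring done by the private `transformValues` helper can be modelled without Kafka on the classpath. The sketch below is a hypothetical, self-contained stand-in: `MiniTopologyBuilder` is not a Kafka class. It builds node names from a prefix plus a zero-padded counter, in the style of the names shown in Kafka Streams topology descriptions. It also records the parent of each node and the state stores each node is connected to.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the internal topology builder (not a Kafka class).
// It records each node's parent and the state stores each node is connected to.
class MiniTopologyBuilder {
    // In Kafka Streams one counter is shared by every node in the topology;
    // here it only counts the nodes created by this model.
    private int index = 0;
    final Map<String, String> parentOf = new LinkedHashMap<>();
    final Map<String, List<String>> storesOf = new LinkedHashMap<>();

    // Prefix plus a ten-digit, zero-padded counter, e.g. "KTABLE-TRANSFORMVALUES-0000000000".
    String newProcessorName(final String prefix) {
        return prefix + String.format("%010d", index++);
    }

    void addProcessor(final String name, final String parentName) {
        parentOf.put(name, parentName);
    }

    void connectProcessorAndStateStores(final String name, final String... storeNames) {
        storesOf.computeIfAbsent(name, n -> new ArrayList<>()).addAll(Arrays.asList(storeNames));
    }

    // Same steps as the private transformValues(...) helper: name the node,
    // attach it downstream of the parent, then connect any named stores.
    String transformValues(final String parentName, final String... stateStoreNames) {
        final String name = newProcessorName("KTABLE-TRANSFORMVALUES-");
        addProcessor(name, parentName);
        if (stateStoreNames != null && stateStoreNames.length > 0) {
            connectProcessorAndStateStores(name, stateStoreNames);
        }
        return name;
    }
}
```

Calling `transformValues` with no store names adds the node but creates no store connections. This matches the `length > 0` guard in `KTableImpl`.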

The following methods would be added to the Scala KTable class:

```scala
def transformValues[VR](valueTransformerSupplier: ValueTransformerWithKeySupplier[K, V, VR],
                        stateStoreNames: String*): KTable[K, VR]

def transformValues[VR](valueTransformerSupplier: ValueTransformerWithKeySupplier[K, V, VR],
                        materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]],
                        stateStoreNames: String*): KTable[K, VR]
```

The new KTableTransformValues processor supplier will be implemented as:

```java
class KTableTransformValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {

    private final KTableImpl<K, ?, V> parent;
    private final InternalValueTransformerWithKeySupplier<? super K, ? super V, ? extends V1> valueTransformerSupplier;
    private boolean sendOldValues = false;

    KTableTransformValues(final KTableImpl<K, ?, V> parent,
                          final InternalValueTransformerWithKeySupplier<? super K, ? super V, ? extends V1> valueTransformerSupplier) {
        this.parent = Objects.requireNonNull(parent, "parent");
        this.valueTransformerSupplier = Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier");
    }

    // Each processor instance gets its own transformer from the supplier.
    @Override
    public Processor<K, Change<V>> get() {
        return new KTableTransformValuesProcessor(valueTransformerSupplier.get());
    }

    // Lets downstream operators look up transformed values on demand: the parent's
    // value is read and transformed by a separate transformer instance.
    @Override
    public KTableValueGetterSupplier<K, V1> view() {
        return new KTableValueGetterSupplier<K, V1>() {
            final KTableValueGetterSupplier<K, V> parentValueGetterSupplier = parent.valueGetterSupplier();

            @Override
            public KTableValueGetter<K, V1> get() {
                return new KTableTransformValuesGetter(
                        parentValueGetterSupplier.get(),
                        valueTransformerSupplier.get());
            }

            @Override
            public String[] storeNames() {
                return parentValueGetterSupplier.storeNames();
            }
        };
    }

    @Override
    public void enableSendingOldValues() {
        parent.enableSendingOldValues();
        sendOldValues = true;
    }

    // A null value (a deletion) is passed through as null without invoking the transformer.
    private static <K, V, V1> V1 computeValue(final K key,
                                              final V value,
                                              final InternalValueTransformerWithKey<? super K, ? super V, ? extends V1> valueTransformer) {
        if (value == null) {
            return null;
        }

        return valueTransformer.transform(key, value);
    }

    private class KTableTransformValuesProcessor extends AbstractProcessor<K, Change<V>> {
        private final InternalValueTransformerWithKey<? super K, ? super V, ? extends V1> valueTransformer;

        private KTableTransformValuesProcessor(final InternalValueTransformerWithKey<? super K, ? super V, ? extends V1> valueTransformer) {
            this.valueTransformer = Objects.requireNonNull(valueTransformer, "valueTransformer");
        }

        @SuppressWarnings("unchecked")
        @Override
        public void init(final ProcessorContext context) {
            super.init(context);

            // The transformer sees a context on which forward() is disabled.
            valueTransformer.init(new ForwardingDisabledProcessorContext(context));
        }

        @Override
        public void process(final K key, final Change<V> change) {
            final V1 newValue = computeValue(key, change.newValue, valueTransformer);
            final V1 oldValue = sendOldValues ? computeValue(key, change.oldValue, valueTransformer) : null;

            context().forward(key, new Change<>(newValue, oldValue));
        }

        // Release any resources held by the user's transformer.
        @Override
        public void close() {
            valueTransformer.close();
        }
    }

    private class KTableTransformValuesGetter implements KTableValueGetter<K, V1> {

        private final KTableValueGetter<K, V> parentGetter;
        private final InternalValueTransformerWithKey<? super K, ? super V, ? extends V1> valueTransformer;

        KTableTransformValuesGetter(final KTableValueGetter<K, V> parentGetter,
                                    final InternalValueTransformerWithKey<? super K, ? super V, ? extends V1> valueTransformer) {
            this.parentGetter = Objects.requireNonNull(parentGetter, "parentGetter");
            this.valueTransformer = Objects.requireNonNull(valueTransformer, "valueTransformer");
        }

        @Override
        public void init(final ProcessorContext context) {
            parentGetter.init(context);

            valueTransformer.init(new ForwardingDisabledProcessorContext(context));
        }

        // Reads the parent's current value for the key and transforms it on demand.
        @Override
        public V1 get(final K key) {
            return computeValue(key, parentGetter.get(key), valueTransformer);
        }
    }
}
```
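The forwarding rules in `KTableTransformValuesProcessor#process` can be checked in isolation. The following is a minimal model, not Kafka code. The transformer is reduced to a `java.util.function.BiFunction`, and `SimpleChange` and `TransformValuesModel` are hypothetical stand-ins for `Change` and the processor. The model shows that a null value (a deletion) passes through without calling the transformer. It also shows that the old value is only transformed once sending old values has been enabled.

```java
import java.util.function.BiFunction;

// Hypothetical stand-in for the internal Change<V> type: a new value and an optional old value.
final class SimpleChange<V> {
    final V newValue;
    final V oldValue;

    SimpleChange(final V newValue, final V oldValue) {
        this.newValue = newValue;
        this.oldValue = oldValue;
    }
}

// Minimal model of KTableTransformValuesProcessor#process; the transformer is a BiFunction.
final class TransformValuesModel<K, V, V1> {
    private final BiFunction<? super K, ? super V, ? extends V1> transformer;
    private final boolean sendOldValues;

    TransformValuesModel(final BiFunction<? super K, ? super V, ? extends V1> transformer,
                         final boolean sendOldValues) {
        this.transformer = transformer;
        this.sendOldValues = sendOldValues;
    }

    // Same rule as computeValue(): null stays null and the transformer is not called.
    private V1 computeValue(final K key, final V value) {
        return value == null ? null : transformer.apply(key, value);
    }

    // The change that would be forwarded downstream for one upstream update.
    SimpleChange<V1> process(final K key, final SimpleChange<V> change) {
        final V1 newValue = computeValue(key, change.newValue);
        final V1 oldValue = sendOldValues ? computeValue(key, change.oldValue) : null;
        return new SimpleChange<>(newValue, oldValue);
    }
}
```

With a transformer such as `(k, v) -> k + ":" + (v * 2)` and old values enabled, an update from 1 to 3 for key `a` becomes a change from `"a:2"` to `"a:6"`. A later deletion forwards a null new value alongside the transformed old value.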

 


The new methods on `KTableImpl` will add a new `KTableTransformValues` processor node and connect it to any named state stores. `KTableTransformValues` will be implemented in a similar manner to other processors, instantiating the user-supplied transformer once per task.
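Because the transformer comes from a supplier, each task works with its own instance, and state held in a transformer's fields is not shared between tasks. In the `KTableTransformValues` code, `view()` also calls `valueTransformerSupplier.get()` for each value getter, so more instances than tasks may be requested. This hypothetical sketch uses illustrative names (`CountingTransformer` and `PerTaskTransformers`), not Kafka APIs. It models a runtime that calls the supplier once per task id. It also shows how state leaks across tasks if a supplier hands back the same instance every time.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stateful transformer: numbers the values this instance has seen.
final class CountingTransformer {
    private long seen = 0;

    String transform(final String key, final String value) {
        seen++;
        return value + "#" + seen;
    }
}

// Models a runtime that asks the supplier for one transformer per task.
final class PerTaskTransformers {
    private final Supplier<CountingTransformer> supplier;
    private final Map<Integer, CountingTransformer> byTask = new HashMap<>();

    PerTaskTransformers(final Supplier<CountingTransformer> supplier) {
        this.supplier = supplier;
    }

    // Calls the supplier the first time a task needs a transformer, then reuses that instance.
    CountingTransformer forTask(final int taskId) {
        return byTask.computeIfAbsent(taskId, id -> supplier.get());
    }
}
```

A supplier such as `CountingTransformer::new` keeps each task's count independent. A lambda that always returns one shared object makes task 1 continue the count started by task 0.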

The `ProcessorContext` passed to the `init()` method of transformer implementations will be restricted: any call to any variant of the `forward()` method will throw a `StreamsException`. This stops implementations from outputting records with a new key. The restricted context, `ForwardingDisabledProcessorContext`, has been extracted from the `KStreamTransformValuesProcessor` class so that `KTableTransformValues` can reuse it.
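The restricted context is a simple decorator: every call is delegated except `forward()`, which always throws. The sketch below uses a deliberately cut-down, hypothetical interface (`MiniContext`) and exception type (`StreamsExceptionModel`) so it compiles without Kafka. The real `ProcessorContext` has many more methods, including several `forward()` overloads that would all be blocked in the same way.

```java
// Hypothetical, cut-down stand-in for ProcessorContext: only what this sketch needs.
interface MiniContext {
    String applicationId();

    void forward(Object key, Object value);
}

// Hypothetical stand-in for Kafka's StreamsException.
class StreamsExceptionModel extends RuntimeException {
    StreamsExceptionModel(final String message) {
        super(message);
    }
}

// Delegates every call to the wrapped context except forward(), which always throws.
final class ForwardingDisabledContext implements MiniContext {
    private final MiniContext delegate;

    ForwardingDisabledContext(final MiniContext delegate) {
        this.delegate = delegate;
    }

    @Override
    public String applicationId() {
        return delegate.applicationId();
    }

    @Override
    public void forward(final Object key, final Object value) {
        throw new StreamsExceptionModel("forward() is not supported from a value transformer");
    }
}
```

Because the wrapper never reaches the delegate's `forward()`, nothing is emitted downstream even if the transformer catches the exception.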

Compatibility, Deprecation, and Migration Plan

...

Unit tests will cover the new classes and methods. Integration or system tests are not required.

Rejected Alternatives

Include overloads that take a `ValueTransformerSupplier`, matching the overloads available on `KStream`. These were not included because this keeps the interface more succinct and users can ignore the key if they do not need it. The equivalent overloads on the `KStream` interface will also likely be deprecated in time, in favour of the 'WithKey' variants.
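The point that users can simply ignore the key is easy to make concrete. In this hypothetical sketch, transformers are modelled as plain functions. `WithKeyAdapters.ignoringKey` is an illustrative helper, not a Kafka API, that turns a value-only function into a key-aware one. With the real API, the equivalent would be a `ValueTransformerWithKey` whose `transform(readOnlyKey, value)` never reads `readOnlyKey`.

```java
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical helper: adapts value-only logic to the key-aware shape.
final class WithKeyAdapters {
    private WithKeyAdapters() {
    }

    // The returned function accepts a key but never uses it.
    static <K, V, VR> BiFunction<K, V, VR> ignoringKey(final Function<? super V, ? extends VR> valueOnly) {
        return (key, value) -> valueOnly.apply(value);
    }
}
```

For example, `WithKeyAdapters.<String, String, Integer>ignoringKey(String::length)` returns the length of the value whatever key it is given.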