Status

Current state: Under Discussion

Discussion thread: here todo

JIRA: here todo

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In Kafka Streams, the KStream interface has several transformValues methods, whereas the KTable interface has none. The related mapValues method is available on both KTable and KStream. The transformValues method is more flexible than mapValues:

  • Stateful implementations: transformValues accepts a supplier of transformers, rather than the mapper instance that mapValues accepts. The supplier is used to instantiate one transformer per stream task, so the implementation does not need to be thread-safe. Conversely, mapValues shares the passed-in mapper instance across stream tasks, so mapper implementations must be thread-safe.
  • State-store access: transformValues allows access to existing state stores, whereas mapValues does not.
  • Richer API: the ValueTransformer interface is richer than ValueMapper, adding init() and close() lifecycle calls.
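
The difference between a shared mapper and a per-task supplier can be illustrated without Kafka at all. The following is a minimal sketch using plain java.util.function types as stand-ins: the class PerTaskSupplierSketch, the CountingTransformer, and the two helper methods are hypothetical names invented for this illustration, and a "task" is reduced to the value function it applies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

// Simplified stand-ins, not the Kafka Streams API: a "task" is modelled as
// nothing more than the value function it applies to its records.
final class PerTaskSupplierSketch {

    /** Hypothetical stateful transformer: numbers the values it sees. Not thread-safe. */
    static final class CountingTransformer implements Function<String, String> {
        private int seen = 0; // state held per instance

        @Override
        public String apply(final String value) {
            seen++;
            return value + "#" + seen;
        }
    }

    /** mapValues style: every task is handed the same mapper instance. */
    static List<Function<String, String>> tasksSharingMapper(final Function<String, String> mapper,
                                                             final int taskCount) {
        final List<Function<String, String>> tasks = new ArrayList<>();
        for (int i = 0; i < taskCount; i++) {
            tasks.add(mapper);
        }
        return tasks;
    }

    /** transformValues style: the supplier is asked for a fresh instance per task. */
    static List<Function<String, String>> tasksFromSupplier(final Supplier<Function<String, String>> supplier,
                                                            final int taskCount) {
        final List<Function<String, String>> tasks = new ArrayList<>();
        for (int i = 0; i < taskCount; i++) {
            tasks.add(supplier.get());
        }
        return tasks;
    }

    public static void main(final String[] args) {
        final List<Function<String, String>> shared = tasksSharingMapper(new CountingTransformer(), 2);
        System.out.println(shared.get(0).apply("a")); // a#1
        System.out.println(shared.get(1).apply("b")); // b#2: the counter is shared between tasks

        final List<Function<String, String>> perTask = tasksFromSupplier(CountingTransformer::new, 2);
        System.out.println(perTask.get(0).apply("a")); // a#1
        System.out.println(perTask.get(1).apply("b")); // b#1: each task has its own counter
    }
}
```

With the shared instance, state written by one task is visible to the other, which is why a shared mapper has to be thread-safe; with the supplier, each task owns its state.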

There is no conceptual reason not to support the richer transformValues on the KTable interface.

Public Interfaces

The following methods would be added to the KTable interface: 

New KTable methods
<VR> KTable<K, VR> transformValues(final ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier,
                                   final String... stateStoreNames); 
 
<VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier,
                                   final String... stateStoreNames);

Proposed Changes

The new methods will be implemented in `KTableImpl` as follows:

KTableImpl additions
public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, V> {
    ...

    private static final String TRANSFORMVALUES_NAME = "KTABLE-TRANSFORMVALUES-";

    @Override
    public <VR> KTable<K, VR> transformValues(final ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier,
                                              final String... stateStoreNames) {
        return transformValues(toInternalValueTransformerSupplier(valueTransformerSupplier), stateStoreNames);
    }

    @Override
    public <VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier,
                                              final String... stateStoreNames) {
        return transformValues(toInternalValueTransformerSupplier(valueTransformerSupplier), stateStoreNames);
    }

    private <VR> KTable<K, VR> transformValues(final InternalValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerWithKeySupplier,
                                               final String... stateStoreNames) {
        final String name = builder.newProcessorName(TRANSFORMVALUES_NAME);

        final KTableProcessorSupplier<K, V, VR> processorSupplier = new KTableTransformValues<>(this, transformerWithKeySupplier);
        builder.internalTopologyBuilder.addProcessor(name, processorSupplier, this.name);

        if (stateStoreNames != null && stateStoreNames.length > 0) {
            builder.internalTopologyBuilder.connectProcessorAndStateStores(name, stateStoreNames);
        }

        return new KTableImpl<>(builder, name, processorSupplier, sourceNodes, queryableStoreName, isQueryable);
    }
}

The new KTableTransformValues processor supplier will be implemented as:

KTableTransformValues
class KTableTransformValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {

    private final KTableImpl<K, ?, V> parent;
    private final InternalValueTransformerWithKeySupplier<? super K, ? super V, ? extends V1> valueTransformerSupplier;
    private boolean sendOldValues = false;

    KTableTransformValues(final KTableImpl<K, ?, V> parent,
                          final InternalValueTransformerWithKeySupplier<? super K, ? super V, ? extends V1> valueTransformerSupplier) {
        this.parent = Objects.requireNonNull(parent, "parent");
        this.valueTransformerSupplier = Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier");
    }

    @Override
    public Processor<K, Change<V>> get() {
        return new KTableTransformValuesProcessor(valueTransformerSupplier.get());
    }

    @Override
    public KTableValueGetterSupplier<K, V1> view() {
        return new KTableValueGetterSupplier<K, V1>() {
            final KTableValueGetterSupplier<K, V> parentValueGetterSupplier = parent.valueGetterSupplier();

            @Override
            public KTableValueGetter<K, V1> get() {
                return new KTableTransformValuesGetter(
                        parentValueGetterSupplier.get(),
                        valueTransformerSupplier.get());
            }

            @Override
            public String[] storeNames() {
                return parentValueGetterSupplier.storeNames();
            }
        };
    }

    @Override
    public void enableSendingOldValues() {
        parent.enableSendingOldValues();
        sendOldValues = true;
    }

    private static <K, V, V1> V1 computeValue(final K key,
                                              final V value,
                                              final InternalValueTransformerWithKey<? super K, ? super V, ? extends V1> valueTransformer) {
        if (value == null) {
            return null;
        }

        return valueTransformer.transform(key, value);
    }

    private class KTableTransformValuesProcessor extends AbstractProcessor<K, Change<V>> {
        private final InternalValueTransformerWithKey<? super K, ? super V, ? extends V1> valueTransformer;

        private KTableTransformValuesProcessor(final InternalValueTransformerWithKey<? super K, ? super V, ? extends V1> valueTransformer) {
            this.valueTransformer = Objects.requireNonNull(valueTransformer, "valueTransformer");
        }

        @SuppressWarnings("unchecked")
        @Override
        public void init(final ProcessorContext context) {
            super.init(context);

            valueTransformer.init(new ForwardingDisabledProcessorContext(context));
        }

        @Override
        public void process(final K key, final Change<V> change) {
            final V1 newValue = computeValue(key, change.newValue, valueTransformer);
            final V1 oldValue = sendOldValues ? computeValue(key, change.oldValue, valueTransformer) : null;

            context().forward(key, new Change<>(newValue, oldValue));
        }
    }

    private class KTableTransformValuesGetter implements KTableValueGetter<K, V1> {

        private final KTableValueGetter<K, V> parentGetter;
        private final InternalValueTransformerWithKey<? super K, ? super V, ? extends V1> valueTransformer;

        KTableTransformValuesGetter(final KTableValueGetter<K, V> parentGetter,
                                    final InternalValueTransformerWithKey<? super K, ? super V, ? extends V1> valueTransformer) {
            this.parentGetter = Objects.requireNonNull(parentGetter, "parentGetter");
            this.valueTransformer = Objects.requireNonNull(valueTransformer, "valueTransformer");
        }

        @Override
        public void init(final ProcessorContext context) {
            parentGetter.init(context);

            valueTransformer.init(new ForwardingDisabledProcessorContext(context));
        }

        @Override
        public V1 get(final K key) {
            return computeValue(key, parentGetter.get(key), valueTransformer);
        }
    }
}
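
The value handling in process() and computeValue() above can be reduced to a small standalone sketch. The types below are simplified stand-ins (ChangeForwardingSketch and transformChange are hypothetical names, and a BiFunction takes the place of the internal transformer interface); only the null and old-value logic mirrors the listing.

```java
import java.util.function.BiFunction;

// Simplified stand-ins for the internal types; only the value handling is modelled.
final class ChangeForwardingSketch {

    /** Stand-in for the internal Change<V> pair of new and old value. */
    static final class Change<T> {
        final T newValue;
        final T oldValue;

        Change(final T newValue, final T oldValue) {
            this.newValue = newValue;
            this.oldValue = oldValue;
        }
    }

    /** Null values (deletes) bypass the transformer and stay null. */
    static <K, V, VR> VR computeValue(final K key,
                                      final V value,
                                      final BiFunction<? super K, ? super V, ? extends VR> transformer) {
        return value == null ? null : transformer.apply(key, value);
    }

    /** The old value is transformed only when downstream has asked for old values. */
    static <K, V, VR> Change<VR> transformChange(final K key,
                                                 final Change<V> change,
                                                 final BiFunction<? super K, ? super V, ? extends VR> transformer,
                                                 final boolean sendOldValues) {
        final VR newValue = computeValue(key, change.newValue, transformer);
        final VR oldValue = sendOldValues ? computeValue(key, change.oldValue, transformer) : null;
        return new Change<>(newValue, oldValue);
    }

    public static void main(final String[] args) {
        final BiFunction<String, String, Integer> length = (key, value) -> value.length();

        final Change<Integer> update = transformChange("k", new Change<>("abc", "a"), length, true);
        System.out.println(update.newValue + " " + update.oldValue); // 3 1

        final Change<Integer> delete = transformChange("k", new Change<String>(null, "abc"), length, false);
        System.out.println(delete.newValue + " " + delete.oldValue); // null null
    }
}
```

One consequence visible in this sketch is that, when old values are enabled, the transformer is invoked twice per update: once for the new value and once for the old one.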

The ForwardingDisabledProcessorContext class passed to the init method of the valueTransformer has been extracted from the KStreamTransformValuesProcessor class. It throws a StreamsException should the transformer implementation call any of its forward() methods.
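
The wrapping idea behind ForwardingDisabledProcessorContext can be sketched as a plain delegating decorator. This is an illustration only, assuming (as the class name suggests) that forward calls are what the wrapper rejects: the Context interface is a hypothetical, heavily reduced stand-in for the real processor context, and ForwardNotSupportedException stands in for the exception type the proposal uses.

```java
// A delegating wrapper that passes reads through and rejects forwarding.
final class ForwardingDisabledSketch {

    /** Hypothetical, heavily reduced stand-in for a processor context. */
    interface Context {
        String applicationId();

        void forward(Object key, Object value);
    }

    /** Stand-in exception type used only by this sketch. */
    static final class ForwardNotSupportedException extends RuntimeException {
        ForwardNotSupportedException(final String message) {
            super(message);
        }
    }

    /** Delegates everything except forward(), which always throws. */
    static final class ForwardingDisabledContext implements Context {
        private final Context delegate;

        ForwardingDisabledContext(final Context delegate) {
            this.delegate = java.util.Objects.requireNonNull(delegate, "delegate");
        }

        @Override
        public String applicationId() {
            return delegate.applicationId();
        }

        @Override
        public void forward(final Object key, final Object value) {
            throw new ForwardNotSupportedException("forward() is not supported in this context");
        }
    }

    /** Simple context that counts forwarded records, used to show the delegate is untouched. */
    static final class RecordingContext implements Context {
        int forwarded = 0;

        @Override
        public String applicationId() {
            return "demo-app";
        }

        @Override
        public void forward(final Object key, final Object value) {
            forwarded++;
        }
    }

    /** Returns true if the given context refuses to forward a record. */
    static boolean forwardIsRejected(final Context context) {
        try {
            context.forward("k", "v");
            return false;
        } catch (final ForwardNotSupportedException e) {
            return true;
        }
    }

    public static void main(final String[] args) {
        final RecordingContext inner = new RecordingContext();
        final Context wrapped = new ForwardingDisabledContext(inner);

        System.out.println(wrapped.applicationId());    // demo-app
        System.out.println(forwardIsRejected(wrapped)); // true
        System.out.println(inner.forwarded);            // 0
    }
}
```

Wrapping the context, rather than trusting transformers not to forward, keeps a value transformer from emitting records of its own while still giving it read access to everything else the context exposes.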

Compatibility, Deprecation, and Migration Plan

Users must upgrade to the new version if they want to use this functionality.

Test Plan

Unit tests will cover the new classes and methods. Integration or system tests are not required.

Rejected Alternatives

None.
