Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

The new methods on `KTableImpl` will be implemented:

Code Block
languagejava
titleKTableImpl additions
linenumberstrue
public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, V> {
    ...

    // Prefix handed to builder.newProcessorName(...) when naming a transformValues processor node.
    private static final String TRANSFORMVALUES_NAME = "KTABLE-TRANSFORMVALUES-";

    /**
     * Adapts the key-less {@code valueTransformerSupplier} with
     * {@code toInternalValueTransformerSupplier(...)} and delegates to the
     * {@code transformValues} overload that accepts the adapted supplier.
     *
     * @param valueTransformerSupplier supplier of the value transformer to apply
     * @param stateStoreNames          state store names, passed through unchanged
     * @return the {@code KTable} returned by the delegated call
     */
    @Override
    public <VR> KTable<K, VR> transformValues(final ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier,
                                              final String... stateStoreNames) {
        return transformValues(toInternalValueTransformerSupplier(valueTransformerSupplier), stateStoreNames);
    }

    /**
     * Adapts the key-aware {@code valueTransformerSupplier} with
     * {@code toInternalValueTransformerSupplier(...)} and delegates to the
     * {@code transformValues} overload that accepts the adapted supplier.
     *
     * @param valueTransformerSupplier supplier of the key-aware value transformer to apply
     * @param stateStoreNames          state store names, passed through unchanged
     * @return the {@code KTable} returned by the delegated call
     */
    @Override
    public <VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier,
                                              final String... stateStoreNames) {
        return transformValues(toInternalValueTransformerSupplier(valueTransformerSupplier), stateStoreNames);
    }

    /**
     * Registers a new {@code KTableTransformValues} processor node downstream of this
     * table's node, connects it to the named state stores when any are given, and
     * wraps the new node in a {@code KTableImpl}.
     *
     * @param transformerWithKeySupplier supplier handed to the new {@code KTableTransformValues}
     * @param stateStoreNames            stores to connect to the new node; may be {@code null} or empty
     * @return a {@code KTableImpl} built around the newly added processor node
     */
    private <VR> KTable<K, VR> transformValues(final InternalValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerWithKeySupplier,
                                               final String... stateStoreNames) {
        final String processorName = builder.newProcessorName(TRANSFORMVALUES_NAME);
        final KTableProcessorSupplier<K, V, VR> transformSupplier =
            new KTableTransformValues<>(this, transformerWithKeySupplier);

        // The new node is attached with this table's own node name as its parent.
        builder.internalTopologyBuilder.addProcessor(processorName, transformSupplier, this.name);

        final boolean hasStateStores = stateStoreNames != null && stateStoreNames.length != 0;
        if (hasStateStores) {
            builder.internalTopologyBuilder.connectProcessorAndStateStores(processorName, stateStoreNames);
        }

        return new KTableImpl<>(builder, processorName, transformSupplier, sourceNodes, queryableStoreName, isQueryable);
    }
}

The new KTableTransformValues processor supplier will be implemented as:

Code Block
languagejava
titleKTableTransformValues
linenumberstrue
collapsetrue
/**
 * Processor supplier that applies a user-supplied value transformer to the
 * values of a parent {@code KTableImpl}.
 *
 * <p>Each call to {@link #get()} and each getter obtained through {@link #view()}
 * asks the supplier for its own transformer instance. {@code null} values are
 * never handed to the transformer; they map straight to {@code null}.
 *
 * @param <K>  key type
 * @param <V>  value type of the parent table
 * @param <V1> value type produced by the transformer
 */
class KTableTransformValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {

    private final KTableImpl<K, ?, V> parent;
    private final InternalValueTransformerWithKeySupplier<? super K, ? super V, ? extends V1> valueTransformerSupplier;
    // Flipped by enableSendingOldValues(); read by every processor created from this supplier.
    private boolean sendOldValues = false;

    /**
     * @param parent                   table whose values are transformed; must not be {@code null}
     * @param valueTransformerSupplier source of transformer instances; must not be {@code null}
     * @throws NullPointerException if either argument is {@code null}
     */
    KTableTransformValues(final KTableImpl<K, ?, V> parent,
                          final InternalValueTransformerWithKeySupplier<? super K, ? super V, ? extends V1> valueTransformerSupplier) {
        this.parent = Objects.requireNonNull(parent, "parent");
        this.valueTransformerSupplier = Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier");
    }

    /**
     * Creates a processor backed by a transformer freshly obtained from the supplier.
     */
    @Override
    public Processor<K, Change<V>> get() {
        return new KTableTransformValuesProcessor(valueTransformerSupplier.get());
    }

    /**
     * Returns a getter supplier that reads from the parent's value getter and
     * transforms what it reads; store names are those of the parent's supplier.
     */
    @Override
    public KTableValueGetterSupplier<K, V1> view() {
        return new KTableValueGetterSupplier<K, V1>() {
            final KTableValueGetterSupplier<K, V> parentValueGetterSupplier = parent.valueGetterSupplier();

            @Override
            public KTableValueGetter<K, V1> get() {
                return new KTableTransformValuesGetter(
                        parentValueGetterSupplier.get(),
                        valueTransformerSupplier.get());
            }

            @Override
            public String[] storeNames() {
                return parentValueGetterSupplier.storeNames();
            }
        };
    }

    /**
     * Enables old-value sending on the parent first, then on this supplier.
     */
    @Override
    public void enableSendingOldValues() {
        parent.enableSendingOldValues();
        sendOldValues = true;
    }

    /**
     * Transforms {@code value} for {@code key}, short-circuiting {@code null}
     * values to {@code null} without invoking the transformer.
     */
    private static <K, V, V1> V1 computeValue(final K key,
                                              final V value,
                                              final InternalValueTransformerWithKey<? super K, ? super V, ? extends V1> valueTransformer) {
        if (value == null) {
            return null;
        }

        return valueTransformer.transform(key, value);
    }

    /**
     * Processor that transforms the new value of each incoming change (and the
     * old value too, when old values are enabled) and forwards the result.
     */
    private class KTableTransformValuesProcessor extends AbstractProcessor<K, Change<V>> {
        private final InternalValueTransformerWithKey<? super K, ? super V, ? extends V1> valueTransformer;

        private KTableTransformValuesProcessor(final InternalValueTransformerWithKey<? super K, ? super V, ? extends V1> valueTransformer) {
            this.valueTransformer = Objects.requireNonNull(valueTransformer, "valueTransformer");
        }

        @Override
        public void init(final ProcessorContext context) {
            super.init(context);

            // The transformer only ever sees the wrapped context, never the raw one.
            valueTransformer.init(new ForwardingDisabledProcessorContext(context));
        }

        @Override
        public void process(final K key, final Change<V> change) {
            final V1 newValue = computeValue(key, change.newValue, valueTransformer);
            final V1 oldValue = sendOldValues ? computeValue(key, change.oldValue, valueTransformer) : null;

            context().forward(key, new Change<>(newValue, oldValue));
        }

        /**
         * Closes the transformer that {@link #init(ProcessorContext)} initialised,
         * so whatever it acquired there is released with the processor.
         */
        @Override
        public void close() {
            valueTransformer.close();
        }
    }

    /**
     * Value getter that looks a key up through the parent getter and returns
     * the transformed result.
     */
    // NOTE(review): this getter's transformer is initialised but never closed; no close
    // hook on KTableValueGetter is visible here. Confirm whether one should be added.
    private class KTableTransformValuesGetter implements KTableValueGetter<K, V1> {

        private final KTableValueGetter<K, V> parentGetter;
        private final InternalValueTransformerWithKey<? super K, ? super V, ? extends V1> valueTransformer;

        KTableTransformValuesGetter(final KTableValueGetter<K, V> parentGetter,
                                    final InternalValueTransformerWithKey<? super K, ? super V, ? extends V1> valueTransformer) {
            this.parentGetter = Objects.requireNonNull(parentGetter, "parentGetter");
            this.valueTransformer = Objects.requireNonNull(valueTransformer, "valueTransformer");
        }

        @Override
        public void init(final ProcessorContext context) {
            parentGetter.init(context);

            valueTransformer.init(new ForwardingDisabledProcessorContext(context));
        }

        @Override
        public V1 get(final K key) {
            return computeValue(key, parentGetter.get(key), valueTransformer);
        }
    }
}

The ForwardingDisabledProcessorContext class passed to the init method of the valueTransformer has been extracted from the KStreamTransformValuesProcessor class, and throws a StreamsException should the transformer implementation throw an exception. The new methods add a new KTableTransformValues processor node and attach any state stores. The new KTableTransformValues will be implemented in a similar manner to other processors, instantiating the user supplied transformer once per task.

Compatibility, Deprecation, and Migration Plan

...