...
The new methods on `KTableImp` will be implemented:
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, V> {
...
private static final String TRANSFORMVALUES_NAME = "KTABLE-TRANSFORMVALUES-";
@Override
public <VR> KTable<K, VR> transformValues(final ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier,
final String... stateStoreNames) {
return transformValues(toInternalValueTransformerSupplier(valueTransformerSupplier), stateStoreNames);
}
@Override
public <VR> KTable<K, VR> transformValues(ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier, String... stateStoreNames) {
return transformValues(toInternalValueTransformerSupplier(valueTransformerSupplier), stateStoreNames);
}
private <VR> KTable<K, VR> transformValues(final InternalValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerWithKeySupplier,
final String... stateStoreNames) {
final String name = builder.newProcessorName(TRANSFORMVALUES_NAME);
final KTableProcessorSupplier<K, V, VR> processorSupplier = new KTableTransformValues<>(this, transformerWithKeySupplier);
builder.internalTopologyBuilder.addProcessor(name, processorSupplier, this.name);
if (stateStoreNames != null && stateStoreNames.length > 0) {
builder.internalTopologyBuilder.connectProcessorAndStateStores(name, stateStoreNames);
}
return new KTableImpl<>(builder, name, processorSupplier, sourceNodes, queryableStoreName, isQueryable);
}
} |
The new KTableTransformValues
processor supplier will be implemented as:
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
class KTableTransformValues<K, V, V1> implements KTableProcessorSupplier<K, V, V1> {
private final KTableImpl<K, ?, V> parent;
private final InternalValueTransformerWithKeySupplier<? super K, ? super V, ? extends V1> valueTransformerSupplier;
private boolean sendOldValues = false;
KTableTransformValues(final KTableImpl<K, ?, V> parent,
final InternalValueTransformerWithKeySupplier<? super K, ? super V, ? extends V1> valueTransformerSupplier) {
this.parent = Objects.requireNonNull(parent, "parent");
this.valueTransformerSupplier = Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier");
}
@Override
public Processor<K, Change<V>> get() {
return new KTableTransformValuesProcessor(valueTransformerSupplier.get());
}
@Override
public KTableValueGetterSupplier<K, V1> view() {
return new KTableValueGetterSupplier<K, V1>() {
final KTableValueGetterSupplier<K, V> parentValueGetterSupplier = parent.valueGetterSupplier();
public KTableValueGetter<K, V1> get() {
return new KTableTransformValuesGetter(
parentValueGetterSupplier.get(),
valueTransformerSupplier.get());
}
@Override
public String[] storeNames() {
return parentValueGetterSupplier.storeNames();
}
};
}
@Override
public void enableSendingOldValues() {
parent.enableSendingOldValues();
sendOldValues = true;
}
private static <K, V, V1> V1 computeValue(final K key,
final V value,
final InternalValueTransformerWithKey<? super K, ? super V, ? extends V1> valueTransformer) {
if (value == null) {
return null;
}
return valueTransformer.transform(key, value);
}
private class KTableTransformValuesProcessor extends AbstractProcessor<K, Change<V>> {
private final InternalValueTransformerWithKey<? super K, ? super V, ? extends V1> valueTransformer;
private KTableTransformValuesProcessor(final InternalValueTransformerWithKey<? super K, ? super V, ? extends V1> valueTransformer) {
this.valueTransformer = Objects.requireNonNull(valueTransformer, "valueTransformer");
}
@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {
super.init(context);
valueTransformer.init(new ForwardingDisabledProcessorContext(context));
}
@Override
public void process(final K key, final Change<V> change) {
final V1 newValue = computeValue(key, change.newValue, valueTransformer);
final V1 oldValue = sendOldValues ? computeValue(key, change.oldValue, valueTransformer) : null;
context().forward(key, new Change<>(newValue, oldValue));
}
}
private class KTableTransformValuesGetter implements KTableValueGetter<K, V1> {
private final KTableValueGetter<K, V> parentGetter;
private final InternalValueTransformerWithKey<? super K, ? super V, ? extends V1> valueTransformer;
KTableTransformValuesGetter(final KTableValueGetter<K, V> parentGetter,
final InternalValueTransformerWithKey<? super K, ? super V, ? extends V1> valueTransformer) {
this.parentGetter = Objects.requireNonNull(parentGetter, "parentGetter");
this.valueTransformer = Objects.requireNonNull(valueTransformer, "valueTransformer");
}
@Override
public void init(final ProcessorContext context) {
parentGetter.init(context);
valueTransformer.init(new ForwardingDisabledProcessorContext(context));
}
@Override
public V1 get(final K key) {
return computeValue(key, parentGetter.get(key), valueTransformer);
}
}
} |
The ForwardingDisabledProcessorContext
class passed to the init
method of the valueTransformer
has been extracted from the KStreamTransformValuesProcessor
class, and throws a StreamException
should the transformer implementation throw an exceptionadd a new KTableTransformValues
processor node and attach any state stores. The new KTableTransformValues
will be implemented in a similar manner to other processors, instantiating the user supplied transformer once per task.
Compatibility, Deprecation, and Migration Plan
...