...

  • Performs updates from both sides of the join.
  • Uses user-specified serdes to perform the prefixScans.
  • Resolves out-of-order processing due to switching the foreignKey on the event side.
  • Fully scalable based on

Current Limitations / Points of note:

  • Requires three full materialized State Stores at a minimum
    • One for prefixScanning on the left table.
    • One for the highwater mark for resolving out-of-order processing of events.

    • One for the final materialized sink. This is required because I am sourcing the events back from a repartitioning topic, and so a state store is required to construct a proper Topic Source.
    • Merging the highwater and final materialized may be possible, but it is unlikely to be useful if we wish for users of this API to be able to specify their own Materialized state store.
  • Caching is disabled on the prefixScan store, same reasons as Jan gave above.
  • ReadOnlyKeyValueStore interface was modified to contain prefixScan. This requires that all implementations support it, and a number of them are currently stubbed out with exceptions. I would welcome any advice on whether prefixScan belongs in this interface.
  • Currently limited to Inner Join (can do more join logic in future - just limiting the focus of this KIP).
  • Using hardwired Stream Partitioners. Need a suitable way to access the configured stream partitioner, especially since I don't wish to maintain the Cluster Metadata to do so.
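
The prefixScan store mentioned in the list above can be pictured as a sorted key-value store whose keys begin with the foreign key, so that every left-table event referencing the same right-table key sits in one contiguous range. The following is only a minimal sketch under stated assumptions: the class PrefixScanSketch, the in-memory TreeMap, and the "foreignKey|leftKey" composite key layout are all hypothetical stand-ins for illustration, not the store or key format used by this KIP.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: an in-memory stand-in for a sorted state store.
final class PrefixScanSketch {
    private final TreeMap<String, String> store = new TreeMap<>();

    // Hypothetical composite key: foreign key first, so entries sharing it sort together.
    static String compositeKey(String foreignKey, String leftKey) {
        return foreignKey + "|" + leftKey;
    }

    void put(String foreignKey, String leftKey, String value) {
        store.put(compositeKey(foreignKey, leftKey), value);
    }

    // Returns the values of every entry whose key starts with foreignKey + "|", in key order.
    List<String> prefixScan(String foreignKey) {
        String prefix = foreignKey + "|";
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, String> e : store.tailMap(prefix, true).entrySet()) {
            if (!e.getKey().startsWith(prefix)) {
                break; // walked past the end of the prefix range
            }
            result.add(e.getValue());
        }
        return result;
    }
}
```

Including the separator in the prefix keeps a scan for one foreign key from also matching a longer foreign key that merely starts with the same characters.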

Motivation

Same as above.

Public Interfaces

Code Block
<V0, KL, VL, KR, VR> KTable<KL, V0> foreignKeyJoin(KTable<KR, VR> other,
                                                    ValueMapper<VL, KR> foreignKeyExtractor,
                                                    final ValueJoiner<VR, VL, V0> joiner,
                                                    final Materialized<KL, V0, KeyValueStore<Bytes, byte[]>> materialized,
                                                    Serde<KL> thisKeySerde,
                                                    Serde<KR> otherKeySerde,
                                                    Serde<VR> otherValueSerde,
                                                    Serde<V0> joinedValueSerde);

...

For the purposes of this KIP, keep these concepts in mind as you read through the document.

Left KTable - This contains the many. Think of this table as a large number of events with their own distinct keys, each of which contains a foreign key that we wish to join on. This is the table of events which we will be joining from (i.e., KTableLeft.foreignKeyJoin ... ).
Example: Users clicking on a product, where an event is keyed on UserId+Timestamp and its value contains the foreign key, productId.

Code Block
Left Sample: 
(key,value)  =  (UserA-TimestampA, Event(type=Click, productid=900560) )


Right KTable - This contains the one. This is the table that contains the data keyed on the foreign key.
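
To make the many-to-one relationship concrete, here is a minimal in-memory sketch of the inner-join semantics using plain Java maps rather than Kafka Streams. The class ForeignKeyJoinSketch and its join method are hypothetical and exist only for illustration. The type parameters mirror those of the foreignKeyJoin signature, with the joiner taking the right value first and the result keyed by the left key.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Illustrative only: models the join result over two static maps, not a streaming join.
final class ForeignKeyJoinSketch {
    // Inner join: each left entry is joined with the single right entry its foreign key points at.
    static <KL, VL, KR, VR, V0> Map<KL, V0> join(Map<KL, VL> left,
                                                 Map<KR, VR> right,
                                                 Function<VL, KR> foreignKeyExtractor,
                                                 BiFunction<VR, VL, V0> joiner) {
        Map<KL, V0> result = new LinkedHashMap<>();
        for (Map.Entry<KL, VL> e : left.entrySet()) {
            KR foreignKey = foreignKeyExtractor.apply(e.getValue());
            VR rightValue = right.get(foreignKey);
            if (rightValue != null) { // inner join: left rows without a match are dropped
                result.put(e.getKey(), joiner.apply(rightValue, e.getValue()));
            }
        }
        return result;
    }
}
```

Many left entries may map to the same right entry, and the output keeps one row per left key.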

...

Since a deleted record will never be just a plain "null" value anymore (but instead a wrapped null), the log cleaner will not automatically clean it up. This can still be compacted, but it will result in higher data usage than if we were able to use pure null values. A suggestion is that we can add the concept of a "weak reference" to Kafka, where the log cleaner will still treat it as a null for log cleaner purposes, but where it could also still contain data. This is beyond the scope of this ticket.

Serialization Format

Code Block
{8-byte long original offset}{byte boolean propagation, stored in bit 0}{4-byte value length}{serialized value}
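
As a rough illustration of the layout above, the following sketch writes and reads the four fields with java.nio.ByteBuffer in its default big-endian order. The class name PropagationWrapperCodec and its method names are hypothetical. The format line does not say how a wrapped null is encoded, so the use of a value length of -1 for null is purely an assumption made for this example.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Illustrative only. Layout: {8-byte offset}{1-byte flags, bit 0 = propagate}{4-byte length}{value}
final class PropagationWrapperCodec {
    private static final int HEADER_SIZE = 8 + 1 + 4;

    static byte[] serialize(long originalOffset, boolean propagate, byte[] value) {
        // Assumption: a wrapped null is written as length -1 with no value bytes.
        int length = (value == null) ? -1 : value.length;
        ByteBuffer buf = ByteBuffer.allocate(HEADER_SIZE + Math.max(length, 0));
        buf.putLong(originalOffset);
        buf.put((byte) (propagate ? 1 : 0)); // propagate flag stored in bit 0
        buf.putInt(length);
        if (value != null) {
            buf.put(value);
        }
        return buf.array();
    }

    static long readOffset(byte[] data) {
        return ByteBuffer.wrap(data).getLong(0);
    }

    static boolean readPropagate(byte[] data) {
        return (data[8] & 0x01) != 0;
    }

    static byte[] readValue(byte[] data) {
        int length = ByteBuffer.wrap(data).getInt(9);
        if (length < 0) {
            return null; // wrapped null, per the assumption above
        }
        return Arrays.copyOfRange(data, HEADER_SIZE, HEADER_SIZE + length);
    }
}
```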



Usage - Example of PropagationWrapper


[Gliffy Diagram: PropagationWrapperIntro]

Multiple Rapid Foreign-Key Value Changes - Example of PropagationWrapper

In this case, there are multiple, rapid changes made to the foreign-key of a single record. We will need to resolve which one is the correct final value.

[Gliffy Diagram: HighwaterMarkUsage]
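
One way to picture how a high-water mark could settle on the correct final value is sketched below. It assumes that each event carries the original offset from the serialization format, and that an event is discarded when its offset is lower than the highest offset already seen for the same key. The class HighwaterMarkSketch, the in-memory HashMap, and the exact comparison rule are assumptions for illustration, not a description of the actual implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: tracks the highest original offset seen per key.
final class HighwaterMarkSketch {
    private final Map<String, Long> highwater = new HashMap<>();

    // Returns true if the event should be applied, false if a newer event was already seen.
    boolean accept(String key, long originalOffset) {
        Long seen = highwater.get(key);
        if (seen != null && originalOffset < seen) {
            return false; // stale: arrived after an event with a higher offset
        }
        highwater.put(key, originalOffset);
        return true;
    }
}
```

With this rule, if the changes for one key arrive with offsets 5, 3, 7, the event with offset 3 is dropped and the value from offset 7 wins.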

Current Issues to Work out

1) Work with the DefaultPartitioner class and with custom partitioners. This is currently hardwired using a copy of the default Partitioner code for the key != null logic.

2) Testing Framework. I have done all of my testing outside of the Kafka Streams unit testing framework, using my own framework that brings up a Kafka cluster with user-specified partition counts. I was able to test the copartitioning and

3) Ensuring coverage for restoring from a failed state. I am unsure how logged state stores back their data up to the cluster with regards to internal topic propagation, which has influenced the decision to include a "propagate" boolean in the PropagationWrapper, versus letting the high-water mark resolve it.