...
- Performs updates from both sides of the join.
- Uses user-specified serdes to perform the prefixScans.
- Resolves out-of-order processing caused by changes to the foreign key on the event side.
- Fully scalable based on
Current Limitations / Points of note:
- Requires a minimum of three fully materialized state stores:
- One for the left-table events, which must support prefix scanning.
- One for the high-water mark used to resolve out-of-order processing of events.
- One for the final materialized sink. This is required because I am sourcing the events back from a repartitioning topic, and so a state store is required to construct a proper Topic Source.
- Merging the high-water mark store and the final materialized store may be possible, but it is unlikely to be useful if we want users of this API to be able to specify their own Materialized state store.
- Caching is disabled on the prefixScan store, for the same reasons Jan gave above.
- The ReadOnlyKeyValueStore interface was modified to include prefixScan. This requires every implementation to support it, and several implementations currently stub it out by throwing exceptions. I would welcome any advice on whether this interface is the right place for it.
- Currently limited to inner joins. Other join types can be added in the future; they are left out only to keep this KIP focused.
- Uses hardwired stream partitioners. A suitable way to access the configured stream partitioner is still needed, especially since I don't want to maintain the cluster metadata to do so.
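The events store only has to answer one kind of query: "return every left-table event that currently points at this foreign key". A minimal sketch of that prefix scan follows, assuming an in-memory `TreeMap` in place of a RocksDB-backed store and a hypothetical `foreignKey|primaryKey` string encoding for the combined key. The class and method names are illustrative only and are not the Kafka Streams API.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for the prefix-scannable events store.
final class PrefixScanStore {
    // Assumed separator between the foreign key and the primary key;
    // in this sketch, foreign keys must not contain it.
    static final char SEP = '|';

    private final NavigableMap<String, String> store = new TreeMap<>();

    // Combined key foreignKey|primaryKey keeps all events for one foreign key contiguous.
    static String combinedKey(String foreignKey, String primaryKey) {
        return foreignKey + SEP + primaryKey;
    }

    void put(String foreignKey, String primaryKey, String value) {
        store.put(combinedKey(foreignKey, primaryKey), value);
    }

    void delete(String foreignKey, String primaryKey) {
        store.remove(combinedKey(foreignKey, primaryKey));
    }

    // Returns every entry whose key starts with foreignKey + SEP. The exclusive upper
    // bound swaps SEP for the next character, so the range covers exactly that prefix.
    List<Map.Entry<String, String>> prefixScan(String foreignKey) {
        String from = foreignKey + SEP;
        String to = foreignKey + (char) (SEP + 1);
        List<Map.Entry<String, String>> result = new ArrayList<>();
        for (Map.Entry<String, String> e : store.subMap(from, true, to, false).entrySet()) {
            // Copy each entry so callers are unaffected by later store mutations.
            result.add(new AbstractMap.SimpleImmutableEntry<>(e));
        }
        return result;
    }
}
```

Because the combined key sorts all events for one foreign key next to each other, the prefix scan reduces to a bounded range scan. This is also why, in this sketch, the separator must never appear inside a foreign key.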
Motivation
Same as above.
Public Interfaces
```java
<V0, KL, VL, KR, VR> KTable<KL, V0> foreignKeyJoin(KTable<KR, VR> other,
                                                   ValueMapper<VL, KR> foreignKeyExtractor,
                                                   final ValueJoiner<VR, VL, V0> joiner,
                                                   final Materialized<KL, V0, KeyValueStore<Bytes, byte[]>> materialized,
                                                   Serde<KL> thisKeySerde,
                                                   Serde<KR> otherKeySerde,
                                                   Serde<VR> otherValueSerde,
                                                   Serde<V0> joinedValueSerde);
```
...
For the purposes of this KIP, keep these concepts in mind as you read through the document.
Left KTable - This contains the many. Think of this table as a large number of events, each with its own distinct key and each containing a foreign key that we wish to join on. This is the table of events from which we will be joining (i.e., KTableLeft.foreignKeyJoin(...)).
Example: Users clicking on a product, where each event is keyed on UserId+Timestamp and its value contains the foreign key, productId.
```
Left Sample: (key, value) = (UserA-TimestampA, Event(type=Click, productid=900560))
```
Right KTable - This contains the one. This is the table whose data is keyed on the foreign key.
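To make the roles of the two tables concrete, the following batch-style model computes what the inner foreign-key join should contain for a single snapshot of both tables. It is a hypothetical sketch, not the proposed implementation: plain `Map`s stand in for the KTables, `java.util.function.Function` and `BiFunction` stand in for `ValueMapper` and `ValueJoiner`, and the joiner takes (right value, left value) to mirror the `ValueJoiner<VR, VL, V0>` order in the proposed signature. The real operator is incremental and distributed, which this model does not attempt.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical snapshot model of inner foreign-key join semantics.
final class ForeignKeyJoinModel {
    // Left-table value from the click example: an event type plus the foreign key.
    public static final class Event {
        public final String type;
        public final long productId;

        public Event(String type, long productId) {
            this.type = type;
            this.productId = productId;
        }
    }

    // Each left record yields at most one output, keyed on the LEFT key, paired with
    // the right record whose key equals the extracted foreign key.
    static <KL, VL, KR, VR, VO> Map<KL, VO> foreignKeyJoin(
            Map<KL, VL> left,
            Map<KR, VR> right,
            Function<VL, KR> foreignKeyExtractor,
            BiFunction<VR, VL, VO> joiner) {
        Map<KL, VO> result = new LinkedHashMap<>();
        for (Map.Entry<KL, VL> e : left.entrySet()) {
            KR foreignKey = foreignKeyExtractor.apply(e.getValue());
            VR rightValue = right.get(foreignKey);
            if (rightValue != null) { // no right-side match: inner join emits nothing
                result.put(e.getKey(), joiner.apply(rightValue, e.getValue()));
            }
        }
        return result;
    }
}
```

For example, joining the Left Sample against a right table with a hypothetical entry `900560 -> "ProductX"`, using the joiner `(p, ev) -> ev.type + ":" + p`, yields `UserA-TimestampA -> "Click:ProductX"`. A left event whose productId has no right-side match produces no output.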
...
Since a deleted record will no longer be a plain "null" value (but instead a wrapped null), the log cleaner will not automatically remove it. Such records can still be compacted, but this results in higher data usage than pure null values would. One suggestion is to add the concept of a "weak reference" to Kafka, which the log cleaner would still treat as a null for cleaning purposes, but which could also still carry data. This is beyond the scope of this ticket.
...
Serialization Format
```
{8-byte long original offset}{1-byte boolean propagate flag, stored in bit 0}{4-byte value length}{serialized value}
```
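The byte layout can be illustrated with a small codec built on `java.nio.ByteBuffer`. This is a sketch under stated assumptions: the class name `PropagationWrapperCodec` is hypothetical, the fields are written in `ByteBuffer`'s default big-endian order, and because the format does not say how a wrapped null is encoded, a value length of `-1` is assumed to mean null.

```java
import java.nio.ByteBuffer;

// Hypothetical codec for the PropagationWrapper layout:
// {8-byte original offset}{1 byte, propagate flag in bit 0}{4-byte value length}{value bytes}
// Assumption: a value length of -1 encodes a wrapped null (a deletion).
final class PropagationWrapperCodec {
    static final int HEADER_SIZE = Long.BYTES + 1 + Integer.BYTES; // 13 bytes

    static final class Decoded {
        final long offset;
        final boolean propagate;
        final byte[] value; // null for a wrapped null

        Decoded(long offset, boolean propagate, byte[] value) {
            this.offset = offset;
            this.propagate = propagate;
            this.value = value;
        }
    }

    static byte[] serialize(long originalOffset, boolean propagate, byte[] serializedValue) {
        int len = serializedValue == null ? 0 : serializedValue.length;
        ByteBuffer buf = ByteBuffer.allocate(HEADER_SIZE + len);
        buf.putLong(originalOffset);
        buf.put((byte) (propagate ? 1 : 0)); // only bit 0 carries the flag
        buf.putInt(serializedValue == null ? -1 : len);
        if (serializedValue != null) {
            buf.put(serializedValue);
        }
        return buf.array();
    }

    static Decoded deserialize(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        long offset = buf.getLong();
        boolean propagate = (buf.get() & 0x01) != 0; // read bit 0 only
        int len = buf.getInt();
        byte[] value = null;
        if (len >= 0) {
            value = new byte[len];
            buf.get(value);
        }
        return new Decoded(offset, propagate, value);
    }
}
```

Note that even a wrapped null serializes to a 13-byte payload rather than a null value, which is exactly why the log cleaner cannot treat it as a tombstone.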
Usage - Example of PropagationWrapper
[Diagram: PropagationWrapperIntro]
Multiple Rapid Foreign-Key Value Changes - Example of PropagationWrapper
In this case, the foreign key of a single record is changed several times in rapid succession. We need to determine which of the resulting join outputs reflects the correct final value.
[Diagram: HighwaterMarkUsage]
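One way to decide which result is final is to compare each result's original offset against a per-key high-water mark. The sketch below is hypothetical: the `HighwaterMarkResolver` name and its in-memory `HashMap` are illustrative stand-ins for the high-water mark state store, and it assumes every join result carries the offset of the left-table update that produced it, as the PropagationWrapper's original-offset field does.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical high-water-mark filter for out-of-order join results.
final class HighwaterMarkResolver<K> {
    // Highest original offset accepted so far, per left-table key.
    private final Map<K, Long> highwater = new HashMap<>();

    /** Returns true if the result should be forwarded, false if it is stale. */
    boolean accept(K primaryKey, long originalOffset) {
        Long current = highwater.get(primaryKey);
        if (current != null && originalOffset < current) {
            return false; // produced by an older foreign-key value; discard
        }
        highwater.put(primaryKey, originalOffset);
        return true;
    }
}
```

Offsets are only comparable within one partition, but with key-based partitioning all updates for a given left-table key land in the same partition, so comparing them per key is meaningful. Results with an equal offset are accepted so that right-side updates, which re-join an unchanged left record, still propagate.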
...
Current Issues to Work out
1) Support the DefaultPartitioner class and custom partitioners. Partitioning is currently hardwired, using a copy of the DefaultPartitioner code for the key != null case.
2) Testing framework. I have done all of my testing outside of the Kafka Streams unit testing framework, using my own framework that brings up a Kafka cluster with user-specified partition counts. I was able to test the copartitioning and
3) Ensuring coverage for restoring from a failed state. I am unsure how logged state stores back up their data to the cluster with regard to internal topic propagation. This uncertainty influenced the decision to include a "propagate" boolean in the PropagationWrapper, rather than letting the high-water mark resolve it.
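For reference on item 1, the key != null branch of Kafka's `DefaultPartitioner` hashes the serialized key with murmur2 and maps the positive hash onto a partition. The following self-contained sketch reproduces that logic from memory; the class name `HardwiredPartitioner` is hypothetical, and the hash should be checked against `org.apache.kafka.common.utils.Utils.murmur2` before being relied on for copartitioning.

```java
// Hypothetical copy of the key != null partitioning logic:
// partition = toPositive(murmur2(serializedKey)) % numPartitions
final class HardwiredPartitioner {
    static int murmur2(final byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995; // mixing constants
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;

        // Mix the input four bytes at a time (little-endian words).
        for (int i = 0; i < length4; i++) {
            final int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }

        // Mix in the remaining 1-3 bytes; cases intentionally fall through.
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }

        // Final avalanche.
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Clears the sign bit rather than using Math.abs, which fails for Integer.MIN_VALUE.
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    static int partition(byte[] serializedKey, int numPartitions) {
        return toPositive(murmur2(serializedKey)) % numPartitions;
    }
}
```

A custom partitioner configured by the user bypasses this computation entirely, which is why a hardwired copy cannot guarantee copartitioning in general.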