...
Public Interfaces
Code Block |
---|
/** * Join records of this {@code KTable} with another {@code KTable} using non-windowed inner join. * <p> * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}. * * @param other the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO. * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the * result is null, the update is ignored as invalid. * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records * @param <VR> the value type of the result {@code KTable} * @param <KO> the key type of the other {@code KTable} * @param <VO> the value type of the other {@code KTable} * @return a {@code KTable} that contains the result of joining this table with {@code other} */ <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other, final Function<V, KO> foreignKeyExtractor, final ValueJoiner<V, VO, VR> joiner); |
/** * Join records of this {@code KTable} with another {@code KTable} using non-windowed inner join. * <p> * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}. * * @param other the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO. * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the * result is null, the update is ignored as invalid. * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records * @param named a {@link Named} config used to name the processor in the topology * @param <VR> the value type of the result {@code KTable} * @param <KO> the key type of the other {@code KTable} * @param <VO> the value type of the other {@code KTable} * @return a {@code KTable} that contains the result of joining this table with {@code other} */ <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other, final Function<V, KO> foreignKeyExtractor, final ValueJoiner<V, VO, VR> joiner, final Named named); |
/** * Join records of this {@code KTable} with another {@code KTable} using non-windowed inner join. * <p> * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}. * * @param other the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO. * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the * result is null, the update is ignored as invalid. * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records * @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable} * should be materialized. Cannot be {@code null} * @param <VR> the value type of the result {@code KTable} * @param <KO> the key type of the other {@code KTable} * @param <VO> the value type of the other {@code KTable} * @return a {@code KTable} that contains the result of joining this table with {@code other} */ <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other, final Function<V, KO> foreignKeyExtractor, final ValueJoiner<V, VO, VR> joiner, final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized); |
/** * Join records of this {@code KTable} with another {@code KTable} using non-windowed inner join. * <p> * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}. * * @param other the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO. * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the * result is null, the update is ignored as invalid. * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records * @param named a {@link Named} config used to name the processor in the topology * @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable} * should be materialized. Cannot be {@code null} * @param <VR> the value type of the result {@code KTable} * @param <KO> the key type of the other {@code KTable} * @param <VO> the value type of the other {@code KTable} * @return a {@code KTable} that contains the result of joining this table with {@code other} */ <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other, final Function<V, KO> foreignKeyExtractor, final ValueJoiner<V, VO, VR> joiner, final Named named, final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized); |
/** * Join records of this {@code KTable} with another {@code KTable} using non-windowed left join. * <p> * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}. * * @param other the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO. * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the * result is null, the update is ignored as invalid. * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records * @param <VR> the value type of the result {@code KTable} * @param <KO> the key type of the other {@code KTable} * @param <VO> the value type of the other {@code KTable} * @return a {@code KTable} that contains the result of joining this table with {@code other} */ <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other, final Function<V, KO> foreignKeyExtractor, final ValueJoiner<V, VO, VR> joiner); |
/** * Join records of this {@code KTable} with another {@code KTable} using non-windowed left join. * <p> * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}. * * @param other the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO. * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the * result is null, the update is ignored as invalid. * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records * @param named a {@link Named} config used to name the processor in the topology * @param <VR> the value type of the result {@code KTable} * @param <KO> the key type of the other {@code KTable} * @param <VO> the value type of the other {@code KTable} * @return a {@code KTable} that contains the result of joining this table with {@code other} */ <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other, final Function<V, KO> foreignKeyExtractor, final ValueJoiner<V, VO, VR> joiner, final Named named); |
/** * Join records of this {@code KTable} with another {@code KTable} using non-windowed left join. * <p> * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}. * * @param other the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO. * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the * result is null, the update is ignored as invalid. * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records * @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable} * should be materialized. Cannot be {@code null} * @param <VR> the value type of the result {@code KTable} * @param <KO> the key type of the other {@code KTable} * @param <VO> the value type of the other {@code KTable} * @return a {@code KTable} that contains the result of joining this table with {@code other} */ <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other, final Function<V, KO> foreignKeyExtractor, final ValueJoiner<V, VO, VR> joiner, final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized); |
/** * Join records of this {@code KTable} with another {@code KTable} using non-windowed left join. * <p> * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}. * * @param other the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO. * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the * result is null, the update is ignored as invalid. * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records * @param named a {@link Named} config used to name the processor in the topology * @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable} * should be materialized. Cannot be {@code null} * @param <VR> the value type of the result {@code KTable} * @param <KO> the key type of the other {@code KTable} * @param <VO> the value type of the other {@code KTable} * @return a {@code KTable} that contains the result of joining this table with {@code other} */ <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other, final Function<V, KO> foreignKeyExtractor, final ValueJoiner<V, VO, VR> joiner, final Named named, final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized); |
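
The semantics described in the Javadoc can be illustrated with a small in-memory model. This is only a sketch under stated assumptions: `FkJoinSketch` and its `join` helper are hypothetical names, plain `Map`s stand in for the two tables, and a `BiFunction` stands in for the `ValueJoiner`. It is not the Kafka Streams implementation and ignores partitioning, state stores, and incremental updates.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

/** In-memory model of foreign-key join semantics (hypothetical, not Kafka Streams code). */
final class FkJoinSketch {

    /**
     * Joins each entry of {@code left} with the entry of {@code right} whose key equals the
     * extracted foreign key. When {@code leftJoin} is true, unmatched left entries are
     * still emitted and joined with a null right-hand value.
     */
    static <K, V, KO, VO, VR> Map<K, VR> join(
            Map<K, V> left,
            Map<KO, VO> right,
            Function<V, KO> foreignKeyExtractor,
            BiFunction<V, VO, VR> joiner,
            boolean leftJoin) {
        Map<K, VR> result = new LinkedHashMap<>();
        for (Map.Entry<K, V> entry : left.entrySet()) {
            KO foreignKey = foreignKeyExtractor.apply(entry.getValue());
            if (foreignKey == null) {
                continue; // a null foreign key is treated as invalid and skipped
            }
            VO other = right.get(foreignKey);
            if (other != null || leftJoin) {
                result.put(entry.getKey(), joiner.apply(entry.getValue(), other));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Left table: the value is the foreign key itself, mirroring events such as (k,1).
        Map<String, Integer> lhs = new LinkedHashMap<>();
        lhs.put("k", 1);
        lhs.put("q", 10);

        // Right table, keyed by the foreign key.
        Map<Integer, String> rhs = new LinkedHashMap<>();
        rhs.put(1, "foo");

        Function<Integer, Integer> fkExtractor = value -> value;
        BiFunction<Integer, String, String> joiner = (fk, other) -> fk + "," + other;

        System.out.println(join(lhs, rhs, fkExtractor, joiner, false)); // {k=1,foo}
        System.out.println(join(lhs, rhs, fkExtractor, joiner, true));  // {k=1,foo, q=10,null}
    }
}
```

The only difference between the two calls is the handling of `q`, whose foreign key 10 has no match on the right: the inner join drops it, while the left join passes null to the joiner.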
Workflow
The overall process is outlined below.
[Diagram: UpdatedOverview]
[Diagram: JohnWorkflow]
...
Action | LHS Event (key, extracted fk) | To which RHS-partition? | RHS-0 State | RHS-1 State | Inner Join Output | Left Join Output | Execute Join Logic? | Notes | Inner-Join SubscriptionWrapper Instruction |
---|---|---|---|---|---|---|---|---|---|
Publish new event | (k,1) | RHS-0 | (1,foo) | | (k,1,foo) | (k,1,foo) | Inner/Left | Normal fk-join induced by LHS event | to RHS-0: |
Publish update to event by changing fk | (k,1) → (k,2) | RHS-1 | (1,foo) | | (k,null) | (k,2,null) | LEFT | Must indicate a delete because there is currently no (fk,value) in RHS with key=2, and (k,1,foo) is no longer valid output. | to RHS-0: DELETE_KEY_NO_PROPAGATE; to RHS-1: |
Publish update to event | (k,2) → (k,3) | RHS-0 | (1,foo) | | (k,null) | (k,3,null) | LEFT | Ideally the inner join would not publish a delete here, but we do not maintain enough state to know that the (k,2) update already produced a null output and that a second delete is unnecessary. | to RHS-0: DELETE_KEY_NO_PROPAGATE; to RHS-1: |
Publish a value to RHS-0 | - | - | (1,foo) | | (k,3,bar) | (k,3,bar) | Inner/Left | Performs prefix scan join | - |
Delete k | (k,3) → (k,null) | RHS-0 | (1,foo) | | (k,null) | (k,null,null) | LEFT | Propagate null/delete through the sub-topology | to RHS-0: DELETE_KEY_AND_PROPAGATE |
Publish original event again | (k,null) → (k,1) | RHS-0 | (1,foo) | | (k,1,foo) | (k,1,foo) | Inner/Left | Normal fk-join induced by LHS event | to RHS-0: |
Publish to LHS | (q,10) | RHS-1 | (1,foo) | | Nothing | (q,10,null) | LEFT | Only significant difference between inner and left join | - |
Publish a value to RHS-1 | - | - | (1,foo) | (q,baz) | (q,10,baz) | (q,10,baz) | Inner/Left | Normal fk-join induced by LHS event | to RHS-1: |
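
The output columns of the table can be replayed with a toy single-process model. This is a hedged sketch only: `FkJoinReplay`, `updateLhs`, and `updateRhs` are hypothetical names, the LHS value is taken to be the foreign key itself, and partitions and SubscriptionWrapper messages are not modelled. The linear scan over the LHS map merely stands in for the prefix scan mentioned in the Notes column.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy replay of the join outputs in the table (hypothetical, not Kafka Streams code). */
final class FkJoinReplay {
    private final Map<String, Integer> lhs = new HashMap<>(); // LHS key -> foreign key
    private final Map<Integer, String> rhs = new HashMap<>(); // RHS key -> value
    private final boolean leftJoin;

    FkJoinReplay(boolean leftJoin) {
        this.leftJoin = leftJoin;
    }

    /** Applies an LHS update (null fk means delete); returns the output, or null for none. */
    String updateLhs(String key, Integer fk) {
        boolean hadOldValue = lhs.containsKey(key);
        if (fk == null) {
            lhs.remove(key);
            return leftJoin ? "(" + key + ",null,null)" : "(" + key + ",null)";
        }
        lhs.put(key, fk);
        String other = rhs.get(fk);
        if (other != null) {
            return "(" + key + "," + fk + "," + other + ")";
        }
        if (leftJoin) {
            return "(" + key + "," + fk + ",null)";
        }
        // Inner join, no match: emit a delete only when an earlier result may exist downstream.
        return hadOldValue ? "(" + key + ",null)" : null;
    }

    /** Applies an RHS update; returns one output per LHS record referencing that key. */
    List<String> updateRhs(int fk, String value) {
        rhs.put(fk, value);
        List<String> outputs = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : lhs.entrySet()) {
            if (entry.getValue().intValue() == fk) {
                outputs.add("(" + entry.getKey() + "," + fk + "," + value + ")");
            }
        }
        return outputs;
    }

    public static void main(String[] args) {
        for (boolean left : new boolean[] {false, true}) {
            FkJoinReplay join = new FkJoinReplay(left);
            join.updateRhs(1, "foo"); // initial RHS-0 state from the table
            List<Object> outputs = new ArrayList<>();
            outputs.add(join.updateLhs("k", 1));
            outputs.add(join.updateLhs("k", 2));
            outputs.add(join.updateLhs("k", 3));
            outputs.add(join.updateRhs(3, "bar"));
            outputs.add(join.updateLhs("k", null));
            outputs.add(join.updateLhs("k", 1));
            outputs.add(join.updateLhs("q", 10));
            outputs.add(join.updateRhs(10, "baz"));
            System.out.println((left ? "left:  " : "inner: ") + outputs);
        }
    }
}
```

In this model the inner join returns null for the new key `q` (the "Nothing" cell) but still emits a delete when `k` changes its foreign key, because, as the Notes column explains, no state records whether a previous result was already null.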
...