Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Public Interfaces

Code Block
    /**
     * Joins theJoin records of this {@code KTable} towith another table{@code keyedKTable} onusing anon-windowed differentinner keyjoin.
 Updates from this table will* join<p>
 * 1 to 1 on* theThis otheris table.a Updatesforeign tokey thejoin, otherwhere tablethe willjoining inducekey ais joindetermined onby eachthe record in this table that has
 * that specific foreign key.
 *
 * @param other - the table containing the records to be joined on. Keyed by KO.
 * @param foreignKeyExtractor - extracts the key (KO) from this table's value (V).
 * @param joiner - specifies how to join the records from both tables
 * @param materialized - the materialized output store (optional)
 * @param <VR>{@code foreignKeyExtractor}.
     *
     * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
     *                            result is null, the update is ignored as invalid.
     * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
     * @param <VR>                the value type of the result {@code KTable}
     * @param <KO>                the key type of the other {@code KTable}
     * @param <VO>                the value type of the other {@code KTable}
     * @return a {@code KTable} that contains the result of joining this table with {@code other}
     */
    <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
                                    final Function<V, KO> foreignKeyExtractor,
                                    final ValueJoiner<V, VO, VR> joiner);

    /**
     * Join records of this {@code KTable} with another {@code KTable} using non-windowed inner join.
     * <p>
     * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
     *
     * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
     *                            result is null, the update is ignored as invalid.
     * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
     * @param named               a {@link Named} config used to name the processor in the topology
     * @param <VR>                the value type of the result {@code KTable}
     * @param <KO>                the key type of the other {@code KTable}
     * @param <VO>                the value type of the other {@code KTable}
     * @return a {@code KTable} that contains the result of joining this table with {@code other}
     */
    <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
                                    final Function<V, KO> foreignKeyExtractor,
                                    final ValueJoiner<V, VO, VR> joiner,
                                    final Named named);

    /**
     * Join records of this {@code KTable} with another {@code KTable} using non-windowed inner join.
     * <p>
     * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
     *
     * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
     *                            result is null, the update is ignored as invalid.
     * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
     * @param materialized        a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
     *                            should be materialized. Cannot be {@code null}
     * @param <VR>                the value type of the result {@code KTable}
     * @param <KO>                the key type of the other {@code KTable}
     * @param <VO>                the value type of the other {@code KTable}
     * @return a {@code KTable} that contains the result of joining this table with {@code other}
     */
    <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
                                    final Function<V, KO> foreignKeyExtractor,
                                    final ValueJoiner<V, VO, VR> joiner,
                                    final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

    /**
     * Join records of this {@code KTable} with another {@code KTable} using non-windowed inner join.
     * <p>
     * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
     *
     * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
     *                            result is null, the update is ignored as invalid.
     * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
     * @param named               a {@link Named} config used to name the processor in the topology
     * @param materialized        a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
     *                            should be materialized. Cannot be {@code null}
     * @param <VR>                the value type of the result {@code KTable}
     * @param <KO>                the key type of the other {@code KTable}
     * @param <VO>                the value type of the other {@code KTable}
     * @return a {@code KTable} that contains the result of joining this table with {@code other}
     */
    <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
                                    final Function<V, KO> foreignKeyExtractor,
                                    final ValueJoiner<V, VO, VR> joiner,
                                    final Named named,
                                    final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

    /**
     * Join records of this {@code KTable} with another {@code KTable} using non-windowed left join.
     * <p>
     * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
     *
     * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
     *                            result is null, the update is ignored as invalid.
     * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
     * @param <VR>                the value type of the result {@code KTable}
     * @param <KO>                the key type of the other {@code KTable}
     * @param <VO>                the value type of the other {@code KTable}
     * @return a {@code KTable} that contains only those records that satisfy the given predicate
     */
    <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
                                        final Function<V, KO> foreignKeyExtractor,
                                        final ValueJoiner<V, VO, VR> joiner);

    /**
     * Join records of this {@code KTable} with another {@code KTable} using non-windowed left join.
     * <p>
     * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
     *
     * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V) If the
     *                            result is null, the update is ignored as invalid.
     * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
     * @param named               a {@link Named} config used to name the processor in the topology
     * @param <VR>                the value type of the result {@code KTable}
     * @param <KO>                the key type of the other {@code KTable}
     * @param <VO>                the value type of the other {@code KTable}
     * @return a {@code KTable} that contains the result of joining this table with {@code other}
     */
    <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
                                        final Function<V, KO> foreignKeyExtractor,
                                        final ValueJoiner<V, VO, VR> joiner,
                                        final Named named);

    /**
     * Join records of this {@code KTable} with another {@code KTable} using non-windowed left join.
     * <p>
     * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
     *
     * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
     *                            result is null, the update is ignored as invalid.
     * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
     * @param materialized        a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
     *                            should be materialized. Cannot be {@code null}
     * @param <VR>                the value type of the result {@code KTable}
     * @param <KO>                the key type of the other {@code KTable}
     * @param <VO>                the value type of the other {@code KTable}
     * @return a {@code KTable} that contains the result of joining this table with {@code other}
     */
    <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
                                        final Function<V, KO> foreignKeyExtractor,
                                        final ValueJoiner<V, VO, VR> joiner,
                                        final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

    /**
     * Join records of this {@code KTable} with another {@code KTable} using non-windowed left join.
     * <p>
     * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
     *
     * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V) If the
     *                            result is null, the update is ignored as invalid.
     * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
     * @param named               a {@link Named} config used to name the processor in the topology
     * @param materialized        a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
     *                            should be materialized. Cannot be {@code null}
     * @param <VR>                the value type of the result {@code KTable}
     * @param <KO>                the key type of the other {@code KTable}
     * @param <VO>                the value type of the resultother {@code KTable}
 * @param <KO> the key* type of the other@return a {@code KTable}
 * @paramthat <VO>contains the result valueof typejoining ofthis thetable otherwith {@code KTableother}
 *   @return
 */
    <VR, KO, VO> KTable<K, VR> joinleftJoin(final KTable<KO, VO> other,
                                        final ValueMapper<VFunction<V, KO> foreignKeyExtractor,
                                        final ValueJoiner<V, VO, VR> joiner,

                                        final Named named,
                                        final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

Workflow

The overall process is outlined below.

Gliffy Diagram
macroId1697ad3c-3394-468e-af23-ed872cdd9138
displayNameUpdatedOverview
nameUpdatedOverview
pagePin3

Gliffy Diagram
nameJohnWorkflow
pagePin11

  • One optimization possibility is to materialize the data provided to the original node at Step 6. This would allow for changes to an event in This KTable which does not change the foreign key to shortcut sending an event to the Other node. The tradeoff is the need to maintain a materialized state store.

...

The workflow of LHS-generated changes to outputs is shown below. Each step is cumulative with the previous step. Note that the only real difference between LEFT and INNER is that the LEFT will output when the transition is (k,null) → (k, not-null). Any other transitions from LHS events are identical between Inner and Left joinjoin outputs are shown below.


LHS Event
(key, extracted fk)
To which
RHS-partition?
RHS-0 StateRHS-1 StateInner Join OutputLeft Join OutputExecute
Join Logic?
NotesInner-Join SubscriptionWrapper Instruction
Publish new event(k,1)RHS-0(1,foo)
(k,1,foo)(k,1,foo)Yes, Inner/LeftNormal fk-join induced by LHS event

to RHS-0:
PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE

Publish update to event
by changing fk
(k,1) → (k,2)RHS-1(1,foo)
(k,null)(k,2,null)NoLEFT

Must indicate a delete because there is currently no (fk,value) in RHS with key=2, and (k,1,foo) is no longer valid output.

to RHS-0: DELETE_KEY_NO_PROPAGATE

to RHS-1:
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE

Publish update to event
by changing fk

(k,2) → (k,3)RHS-0(1,foo)
(k,null)(k,3,null)NoLEFT
Ideally would not publish a delete with Inner Join, but we do not maintain sufficient state to know that the (k,2) update resulted in a null output and we don't need to do it again.

to RHS-0: DELETE_KEY_NO_PROPAGATE

to RHS-1:
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE

Publish a value to RHS-0--

(1,foo)
(3,bar)


(k,3,bar)(k,3,bar)Yes, Inner/LeftPerforms prefix scan join-
Delete k(k,3) → (k,null)RHS-0

(1,foo)
(3,bar)


(k,null)(k,null,null)NoLEFTPropagate null/delete through the sub-topology

to RHS-0: DELETE_KEY_AND_PROPAGATE

Publish original event again(k,null) → (k,1)RHS-0

(1,foo)
(3,bar)


(k,1,foo)(k,1,foo)Yes, Inner/LeftNormal fk-join induced by LHS event

to RHS-0:
PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE

Publish to LHS(q,10)RHS-1

(1,foo)
(3,bar)


Nothing(q,null,10)LEFT ONLY
Only real Significant difference between Inner and Outer-
Publish a value to RHS-1--

(1,foo)
(3,bar)

(q,baz)(q,10,baz)(q,10,baz)Yes, Inner/LeftNormal fk-join induced by LHS event

to RHS-1:
PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE











...