Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Public Interfaces

Code Block
    /**
     * Joins theJoin records of this {@code KTable} towith another table{@code keyedKTable} onusing anon-windowed differentinner keyjoin.
 Updates from this table will* join<p>
 * 1 to 1 on* theThis otheris table.a Updatesforeign tokey thejoin, otherwhere tablethe willjoining inducekey ais joindetermined onby eachthe record in this table that has
 * that specific foreign key.
 *
 * @param other - the table containing the records{@code foreignKeyExtractor}.
     *
     * @param other               the other {@code KTable} to be joined onwith this {@code KTable}. Keyed by KO.
     * @param foreignKeyExtractor -a {@link Function} that extracts the key (KO) from this table's value (V). If the
 * @param joiner - specifies* how to join the records from both tables
 * @param materialized - the materialized output store (optional)
 * @param <VR> the value type of the result {@code KTable}
result *is @param <KO> the keynull, the update is ignored as invalid.
     * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
     * @param <VR>                the value type of the result {@code KTable}
     * @param <KO>                the key type of the other {@code KTable}
     * @param <VO>                the value type of the other {@code KTable}
     * @return a {@code KTable} that contains the result of joining this table with {@code other}
     */
    <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
                                    final Function<V, KO> foreignKeyExtractor,
                                    final ValueJoiner<V, VO, VR> joiner);

    /**
     * Join records of this {@code KTable} with another {@code KTable} using non-windowed inner join.
     * <p>
     * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
     *
     * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
     *                            result is null, the update is ignored as invalid.
     * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
     * @param named               a {@link Named} config used to name the processor in the topology
     * @param <VR>                the value type of the result {@code KTable}
     * @param <KO>                the key type of the other {@code KTable}
     * @param <VO>                the value type of the other {@code KTable}
     * @return a {@code KTable} that contains the result of joining this table with {@code other}
     */
    <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
                                    final Function<V, KO> foreignKeyExtractor,
                                    final ValueJoiner<V, VO, VR> joiner,
                                    final Named named);

    /**
     * Join records of this {@code KTable} with another {@code KTable} using non-windowed inner join.
     * <p>
     * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
     *
     * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
     *                            result is null, the update is ignored as invalid.
     * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
     * @param materialized        a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
     *                            should be materialized. Cannot be {@code null}
     * @param <VR>                the value type of the result {@code KTable}
     * @param <KO>                the key type of the other {@code KTable}
     * @param <VO>                the value type of the other {@code KTable}
     * @return a {@code KTable} that contains the result of joining this table with {@code other}
     */
    <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
                                    final Function<V, KO> foreignKeyExtractor,
                                    final ValueJoiner<V, VO, VR> joiner,
                                    final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

    /**
     * Join records of this {@code KTable} with another {@code KTable} using non-windowed inner join.
     * <p>
     * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
     *
     * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
     *                            result is null, the update is ignored as invalid.
     * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
     * @param named               a {@link Named} config used to name the processor in the topology
     * @param materialized        a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
     *                            should be materialized. Cannot be {@code null}
     * @param <VR>                the value type of the result {@code KTable}
     * @param <KO>                the key type of the other {@code KTable}
     * @param <VO>                the value type of the other {@code KTable}
     * @return a {@code KTable} that contains the result of joining this table with {@code other}
     */
    <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
                                    final Function<V, KO> foreignKeyExtractor,
                                    final ValueJoiner<V, VO, VR> joiner,
                                    final Named named,
                                    final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

    /**
     * Join records of this {@code KTable} with another {@code KTable} using non-windowed left join.
     * <p>
     * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
     *
     * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
     *                            result is null, the update is ignored as invalid.
     * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
     * @param <VR>                the value type of the result {@code KTable}
     * @param <KO>                the key type of the other {@code KTable}
     * @param <VO>                the value type of the other {@code KTable}
     * @return a {@code KTable} that contains only those records that satisfy the given predicate
     */
    <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
                                        final Function<V, KO> foreignKeyExtractor,
                                        final ValueJoiner<V, VO, VR> joiner);

    /**
     * Join records of this {@code KTable} with another {@code KTable} using non-windowed left join.
     * <p>
     * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
     *
     * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V) If the
     *                            result is null, the update is ignored as invalid.
     * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
     * @param named               a {@link Named} config used to name the processor in the topology
     * @param <VR>                the value type of the result {@code KTable}
     * @param <KO>                the key type of the other {@code KTable}
     * @param <VO>                the value type of the other {@code KTable}
     * @return a {@code KTable} that contains the result of joining this table with {@code other}
     */
    <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
                                        final Function<V, KO> foreignKeyExtractor,
                                        final ValueJoiner<V, VO, VR> joiner,
                                        final Named named);

    /**
     * Join records of this {@code KTable} with another {@code KTable} using non-windowed left join.
     * <p>
     * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
     *
     * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
     *                            result is null, the update is ignored as invalid.
     * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
     * @param materialized        a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
     *                            should be materialized. Cannot be {@code null}
     * @param <VR>                the value type of the result {@code KTable}
     * @param <KO>                the key type of the other {@code KTable}
     * @param <VO>                the value type of the other {@code KTable}
     * @return a {@code KTable} that contains the result of joining this table with {@code other}
     */
    <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
                                        final Function<V, KO> foreignKeyExtractor,
                                        final ValueJoiner<V, VO, VR> joiner,
                                        final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

    /**
     * Join records of this {@code KTable} with another {@code KTable} using non-windowed left join.
     * <p>
     * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
     *
     * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V) If the
     *                            result is null, the update is ignored as invalid.
     * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
     * @param named               a {@link Named} config used to name the processor in the topology
     * @param materialized        a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
     *                            should be materialized. Cannot be {@code null}
     * @param <VR>                the value type of the result {@code KTable}
     * @param <KO>                the key type of the other {@code KTable}
     * @param <VO>                the value type of the other {@code KTable}
     * @param <VO> @return a {@code KTable} that contains the valueresult of typejoining ofthis thetable otherwith {@code KTableother}
   * @return
 */
    <VR, KO, VO> KTable<K, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
 VR> leftJoin(final KTable<KO, VO> other,
                                        final Function<V, KO> foreignKeyExtractor,
                                        final ValueJoiner<V, ValueMapper<VVO, KO>VR> foreignKeyExtractorjoiner,
                                        final ValueJoiner<V, VO, VR> joiner,
Named named,
                                        final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

Workflow

The overall process is outlined below.

Gliffy Diagram
macroId1697ad3c-3394-468e-af23-ed872cdd9138
displayNameUpdatedOverview
nameUpdatedOverview
pagePin3

Gliffy Diagram
nameJohnWorkflow
pagePin11

...


LHS Event
(key, extracted fk)
To which
RHS-partition?
RHS-0 StateRHS-1 StateInner Join OutputLeft Join OutputExecute
Join Logic?
NotesInner-Join SubscriptionWrapper Instruction
Publish new event(k,1)RHS-0(1,foo)
(k,1,foo)(k,1,foo)Inner/LeftNormal fk-join induced by LHS event

to RHS-0:
PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE

Publish update to event
by changing fk
(k,1) → (k,2)RHS-1(1,foo)
(k,null)(k,JoinResult2,null)LEFT

Must indicate a delete because there is currently no (fk,value) in RHS with key=2, and (k,1,foo) is no longer valid output.

to RHS-0: DELETE_KEY_NO_PROPAGATE

to RHS-1:
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE

Publish update to event
by changing fk

(k,2) → (k,3)RHS-0(1,foo)
(k,null)(k,3,JoinResultnull)LEFT
Ideally would not publish a delete with Inner Join, but we do not maintain sufficient state to know that the (k,2) update resulted in a null output and we don't need to do it again.

to RHS-0: DELETE_KEY_NO_PROPAGATE

to RHS-1:
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE

Publish a value to RHS-0--

(1,foo)
(3,bar)


(k,3,bar)(k,3,bar)Inner/LeftPerforms prefix scan join-
Delete k(k,3) → (k,null)RHS-0

(1,foo)
(3,bar)


(k,null)(k,null,JoinResultnull)LEFTPropagate null/delete through the sub-topology

to RHS-0: DELETE_KEY_AND_PROPAGATE

Publish original event again(k,null) → (k,1)RHS-0

(1,foo)
(3,bar)


(k,1,foo)(k,1,foo)Inner/LeftNormal fk-join induced by LHS event

to RHS-0:
PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE

Publish to LHS(q,10)RHS-1

(1,foo)
(3,bar)


Nothing(q,null,JoinResult10)LEFT
Only real Significant difference between Inner and Outer-
Publish a value to RHS-1--

(1,foo)
(3,bar)

(q,baz)(q,10,baz)(q,10,baz)Inner/LeftNormal fk-join induced by LHS event

to RHS-1:
PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE











...