Status

Current state: Under Discussion

Discussion thread: link 

JIRA: KAFKA-13261 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Today, foreign key (FK) joins in Kafka Streams only work if both tables being joined (the primary table and the foreign-key table) use the default partitioner.

This limitation exists because the subscription and response topics used to implement FK joins always use the default partitioner. If the foreign-key table is not copartitioned with the subscription topic, then FK lookups may be routed to a Streams instance that does not hold the state for the foreign-key table, resulting in missing join records. Similarly, if the primary table is not copartitioned with the response topic, then subscription responses may be routed to an instance that does not contain the original (triggering) record, resulting in a failed hash comparison and a dropped join result.
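
The routing mismatch described above can be illustrated with a minimal, Kafka-free sketch. Everything in it is an assumption made for illustration: the `Partitioner` interface is a hypothetical key-only simplification (not Kafka's `StreamPartitioner`), `DEFAULT` is a plain hash-modulo stand-in for the default partitioner, and `BY_FIRST_CHAR` is an invented custom strategy. The sketch only models the fact that two strategies can send the same key to different partitions.

```java
/** Kafka-free model of an FK lookup that misses the foreign-key table's state. */
final class FkRoutingMismatch {

    /** Hypothetical, simplified partitioner: maps a key to a partition index. */
    @FunctionalInterface
    interface Partitioner {
        int partition(String key, int numPartitions);
    }

    // Stand-in for the default partitioner (hash of the key modulo partition count).
    static final Partitioner DEFAULT =
        (key, numPartitions) -> Math.floorMod(key.hashCode(), numPartitions);

    // Hypothetical custom partitioner: routes by the key's first character.
    static final Partitioner BY_FIRST_CHAR =
        (key, numPartitions) -> Math.floorMod(key.charAt(0) - 'a', numPartitions);

    /**
     * Returns true if a lookup sent through the subscription topic arrives at the
     * partition (and hence the instance) that stores the foreign-key table's record.
     */
    static boolean lookupFindsState(final String foreignKey,
                                    final int numPartitions,
                                    final Partitioner tablePartitioner,
                                    final Partitioner subscriptionPartitioner) {
        final int statePartition = tablePartitioner.partition(foreignKey, numPartitions);
        final int lookupPartition = subscriptionPartitioner.partition(foreignKey, numPartitions);
        return statePartition == lookupPartition;
    }
}
```

With four partitions, `BY_FIRST_CHAR` stores the key `"a"` on partition 0, while a lookup routed with `DEFAULT` goes to partition 1, so `lookupFindsState("a", 4, BY_FIRST_CHAR, DEFAULT)` returns `false`. Routing the lookup with the same partitioner as the table returns `true`.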

This KIP proposes to add support for FK joins on tables with custom partitioners, by extending the FK join interface to allow custom partitioners to be passed in.

Public Interfaces

The following KTable methods will be added. Each one mirrors an existing FK join method, with the addition of the thisPartitioner and otherPartitioner parameters:

    /**
     * Join records of this {@code KTable} with another {@code KTable} using non-windowed inner join.
     * <p>
     * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
     *
     * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
     *                            result is null, the update is ignored as invalid.
     * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
     * @param thisPartitioner     the {@link StreamPartitioner} that specifies this {@code KTable}'s partitioning
     *                            strategy. Can be left {@code null} if the default partitioner is used.
     * @param otherPartitioner    the {@link StreamPartitioner} that specifies the other {@code KTable}'s
     *                            partitioning strategy. Can be left {@code null} if the default partitioner is used.
     * @param <VR>                the value type of the result {@code KTable}
     * @param <KO>                the key type of the other {@code KTable}
     * @param <VO>                the value type of the other {@code KTable}
     * @return a {@code KTable} that contains the result of joining this table with {@code other}
     */
    <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
                                    final Function<V, KO> foreignKeyExtractor,
                                    final ValueJoiner<V, VO, VR> joiner,
                                    final StreamPartitioner<K, V> thisPartitioner,
                                    final StreamPartitioner<KO, VO> otherPartitioner);

    /**
     * Join records of this {@code KTable} with another {@code KTable} using non-windowed inner join.
     * <p>
     * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
     *
     * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
     *                            result is null, the update is ignored as invalid.
     * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
     * @param named               a {@link Named} config used to name the processor in the topology
     * @param thisPartitioner     the {@link StreamPartitioner} that specifies this {@code KTable}'s partitioning
     *                            strategy. Can be left {@code null} if the default partitioner is used.
     * @param otherPartitioner    the {@link StreamPartitioner} that specifies the other {@code KTable}'s
     *                            partitioning strategy. Can be left {@code null} if the default partitioner is used.
     * @param <VR>                the value type of the result {@code KTable}
     * @param <KO>                the key type of the other {@code KTable}
     * @param <VO>                the value type of the other {@code KTable}
     * @return a {@code KTable} that contains the result of joining this table with {@code other}
     */
    <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
                                    final Function<V, KO> foreignKeyExtractor,
                                    final ValueJoiner<V, VO, VR> joiner,
                                    final Named named,
                                    final StreamPartitioner<K, V> thisPartitioner,
                                    final StreamPartitioner<KO, VO> otherPartitioner);

    /**
     * Join records of this {@code KTable} with another {@code KTable} using non-windowed inner join.
     * <p>
     * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
     *
     * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
     *                            result is null, the update is ignored as invalid.
     * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
     * @param materialized        a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
     *                            should be materialized. Cannot be {@code null}
     * @param thisPartitioner     the {@link StreamPartitioner} that specifies this {@code KTable}'s partitioning
     *                            strategy. Can be left {@code null} if the default partitioner is used.
     * @param otherPartitioner    the {@link StreamPartitioner} that specifies the other {@code KTable}'s
     *                            partitioning strategy. Can be left {@code null} if the default partitioner is used.
     * @param <VR>                the value type of the result {@code KTable}
     * @param <KO>                the key type of the other {@code KTable}
     * @param <VO>                the value type of the other {@code KTable}
     * @return a {@code KTable} that contains the result of joining this table with {@code other}
     */
    <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
                                    final Function<V, KO> foreignKeyExtractor,
                                    final ValueJoiner<V, VO, VR> joiner,
                                    final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
                                    final StreamPartitioner<K, V> thisPartitioner,
                                    final StreamPartitioner<KO, VO> otherPartitioner);

    /**
     * Join records of this {@code KTable} with another {@code KTable} using non-windowed inner join.
     * <p>
     * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
     *
     * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
     *                            result is null, the update is ignored as invalid.
     * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
     * @param named               a {@link Named} config used to name the processor in the topology
     * @param materialized        a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
     *                            should be materialized. Cannot be {@code null}
     * @param thisPartitioner     the {@link StreamPartitioner} that specifies this {@code KTable}'s partitioning
     *                            strategy. Can be left {@code null} if the default partitioner is used.
     * @param otherPartitioner    the {@link StreamPartitioner} that specifies the other {@code KTable}'s
     *                            partitioning strategy. Can be left {@code null} if the default partitioner is used.
     * @param <VR>                the value type of the result {@code KTable}
     * @param <KO>                the key type of the other {@code KTable}
     * @param <VO>                the value type of the other {@code KTable}
     * @return a {@code KTable} that contains the result of joining this table with {@code other}
     */
    <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
                                    final Function<V, KO> foreignKeyExtractor,
                                    final ValueJoiner<V, VO, VR> joiner,
                                    final Named named,
                                    final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
                                    final StreamPartitioner<K, V> thisPartitioner,
                                    final StreamPartitioner<KO, VO> otherPartitioner);

    /**
     * Join records of this {@code KTable} with another {@code KTable} using non-windowed left join.
     * <p>
     * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
     *
     * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
     *                            result is null, the update is ignored as invalid.
     * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
     * @param thisPartitioner     the {@link StreamPartitioner} that specifies this {@code KTable}'s partitioning
     *                            strategy. Can be left {@code null} if the default partitioner is used.
     * @param otherPartitioner    the {@link StreamPartitioner} that specifies the other {@code KTable}'s
     *                            partitioning strategy. Can be left {@code null} if the default partitioner is used.
     * @param <VR>                the value type of the result {@code KTable}
     * @param <KO>                the key type of the other {@code KTable}
     * @param <VO>                the value type of the other {@code KTable}
     * @return a {@code KTable} that contains the result of joining this table with {@code other}
     */
    <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
                                        final Function<V, KO> foreignKeyExtractor,
                                        final ValueJoiner<V, VO, VR> joiner,
                                        final StreamPartitioner<K, V> thisPartitioner,
                                        final StreamPartitioner<KO, VO> otherPartitioner);

    /**
     * Join records of this {@code KTable} with another {@code KTable} using non-windowed left join.
     * <p>
     * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
     *
     * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
     *                            result is null, the update is ignored as invalid.
     * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
     * @param named               a {@link Named} config used to name the processor in the topology
     * @param thisPartitioner     the {@link StreamPartitioner} that specifies this {@code KTable}'s partitioning
     *                            strategy. Can be left {@code null} if the default partitioner is used.
     * @param otherPartitioner    the {@link StreamPartitioner} that specifies the other {@code KTable}'s
     *                            partitioning strategy. Can be left {@code null} if the default partitioner is used.
     * @param <VR>                the value type of the result {@code KTable}
     * @param <KO>                the key type of the other {@code KTable}
     * @param <VO>                the value type of the other {@code KTable}
     * @return a {@code KTable} that contains the result of joining this table with {@code other}
     */
    <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
                                        final Function<V, KO> foreignKeyExtractor,
                                        final ValueJoiner<V, VO, VR> joiner,
                                        final Named named,
                                        final StreamPartitioner<K, V> thisPartitioner,
                                        final StreamPartitioner<KO, VO> otherPartitioner);

    /**
     * Join records of this {@code KTable} with another {@code KTable} using non-windowed left join.
     * <p>
     * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
     *
     * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
     *                            result is null, the update is ignored as invalid.
     * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
     * @param materialized        a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
     *                            should be materialized. Cannot be {@code null}
     * @param thisPartitioner     the {@link StreamPartitioner} that specifies this {@code KTable}'s partitioning
     *                            strategy. Can be left {@code null} if the default partitioner is used.
     * @param otherPartitioner    the {@link StreamPartitioner} that specifies the other {@code KTable}'s
     *                            partitioning strategy. Can be left {@code null} if the default partitioner is used.
     * @param <VR>                the value type of the result {@code KTable}
     * @param <KO>                the key type of the other {@code KTable}
     * @param <VO>                the value type of the other {@code KTable}
     * @return a {@code KTable} that contains the result of joining this table with {@code other}
     */
    <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
                                        final Function<V, KO> foreignKeyExtractor,
                                        final ValueJoiner<V, VO, VR> joiner,
                                        final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
                                        final StreamPartitioner<K, V> thisPartitioner,
                                        final StreamPartitioner<KO, VO> otherPartitioner);

    /**
     * Join records of this {@code KTable} with another {@code KTable} using non-windowed left join.
     * <p>
     * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
     *
     * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
     *                            result is null, the update is ignored as invalid.
     * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
     * @param named               a {@link Named} config used to name the processor in the topology
     * @param materialized        a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
     *                            should be materialized. Cannot be {@code null}
     * @param thisPartitioner     the {@link StreamPartitioner} that specifies this {@code KTable}'s partitioning
     *                            strategy. Can be left {@code null} if the default partitioner is used.
     * @param otherPartitioner    the {@link StreamPartitioner} that specifies the other {@code KTable}'s
     *                            partitioning strategy. Can be left {@code null} if the default partitioner is used.
     * @param <VR>                the value type of the result {@code KTable}
     * @param <KO>                the key type of the other {@code KTable}
     * @param <VO>                the value type of the other {@code KTable}
     * @return a {@code KTable} that contains the result of joining this table with {@code other}
     */
    <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
                                        final Function<V, KO> foreignKeyExtractor,
                                        final ValueJoiner<V, VO, VR> joiner,
                                        final Named named,
                                        final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
                                        final StreamPartitioner<K, V> thisPartitioner,
                                        final StreamPartitioner<KO, VO> otherPartitioner);

Proposed Changes

The new FK join interfaces shown above will be added. Existing interfaces will be unchanged and will continue to use the default partitioner for subscription and response topics.

The new partitioners will be used by the FK join implementation as follows:

  • otherPartitioner will be used when creating the subscription topic, to ensure copartitioning of the subscription topic and the foreign-key table.
  • thisPartitioner will be used when creating the response topic, to ensure copartitioning of the response topic and the primary table.

With these changes, FK joins will be supported for source tables that do not use the default partitioner. It is up to the user to ensure that the subscription and response topics are copartitioned with their respective tables by passing the correct partitioners.
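
The intended use of the two partitioners can be sketched with a small, Kafka-free model. The names below are assumptions made for illustration, not part of the proposal: `Partitioner` is a hypothetical key-only simplification of a partitioner, the hash-modulo fallback stands in for the default partitioner, and `subscriptionPartition` / `responsePartition` are invented helpers that only show which partitioner applies to which topic and how a `null` argument falls back to the default.

```java
/** Kafka-free model of how the proposed partitioners would route FK join traffic. */
final class FkJoinTopicRouting {

    /** Hypothetical, key-only simplification of a partitioner. */
    @FunctionalInterface
    interface Partitioner<K> {
        int partition(K key, int numPartitions);
    }

    /** Falls back to a hash-based stand-in for the default partitioner when null is passed. */
    static <K> Partitioner<K> orDefault(final Partitioner<K> partitioner) {
        if (partitioner != null) {
            return partitioner;
        }
        return (key, numPartitions) -> Math.floorMod(key.hashCode(), numPartitions);
    }

    /** Subscription topic: routed with otherPartitioner, keyed by the foreign key. */
    static <KO> int subscriptionPartition(final KO foreignKey,
                                          final int numPartitions,
                                          final Partitioner<KO> otherPartitioner) {
        return orDefault(otherPartitioner).partition(foreignKey, numPartitions);
    }

    /** Response topic: routed with thisPartitioner, keyed by the primary key. */
    static <K> int responsePartition(final K primaryKey,
                                     final int numPartitions,
                                     final Partitioner<K> thisPartitioner) {
        return orDefault(thisPartitioner).partition(primaryKey, numPartitions);
    }
}
```

In this model, passing the same partitioner that the foreign-key table uses as `otherPartitioner` sends each lookup to the partition that holds the matching foreign-key record, and passing the primary table's partitioner as `thisPartitioner` sends each response back to the partition that holds the triggering record. Passing `null` for either reproduces the behavior of the existing methods.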

Compatibility, Deprecation, and Migration Plan

Existing FK join interfaces (without custom partitioners) are unchanged and will continue to use the default partitioner for subscription and response topics. There are no plans to deprecate these existing interfaces.

Rejected Alternatives

N/A
