...

We propose to add a new type of state that can be used to create simpler and more efficient implementations of operators that maintain a MapState<?, List<?>>, require range operations, or both. This includes temporal joins, operators that sort their input streams by timestamp, and a few other cases:

...

Several of these operators use MapState<Long, List<T>> to store lists of records associated with timestamps. Updating those lists is expensive, especially when they are stored in RocksDB, because appending a new entry requires deserializing and reserializing the entire list.

With this proposal, the common case of MapState<Long, List<T>> becomes BinarySortedState<Long, T>. The implementation can exploit namespaces in the state backends to represent the keys (timestamps, in this case), and the MapState’s values can be stored as lists using each state backend’s native ListState implementation.
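The cost difference can be illustrated with a small, self-contained simulation in plain Java, with no Flink or RocksDB involved. It counts the bytes written when appending n longs under each layout. For MapState<Long, List<T>> it assumes a hypothetical length-prefixed list encoding. For the native ListState it assumes that appends are applied as RocksDB merge operations that write only the new element. The class and method names are illustrative only.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical simulation (no Flink or RocksDB): bytes written when appending n longs under
 * two layouts. Only the write side is modeled; the MapState layout also pays to read and
 * deserialize the existing list on every append.
 */
final class AppendCostSketch {

    /** MapState<Long, List<Long>> layout: every append rewrites the whole serialized list. */
    static long mapStateBytesWritten(int appends) {
        List<Long> list = new ArrayList<>();
        long written = 0;
        for (int i = 0; i < appends; i++) {
            list.add((long) i);                // append to the deserialized list
            written += serialize(list).length; // write the entire list back
        }
        return written;
    }

    /** Native ListState layout with merge-style appends (assumed): only the new element is written. */
    static long listStateBytesWritten(int appends) {
        return (long) appends * Long.BYTES;
    }

    /** Assumed list encoding: 4-byte element count followed by 8 bytes per element. */
    static byte[] serialize(List<Long> list) {
        ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES + list.size() * Long.BYTES);
        buffer.putInt(list.size());
        for (long value : list) {
            buffer.putLong(value);
        }
        return buffer.array();
    }

    public static void main(String[] args) {
        for (int n : new int[] {10, 100, 1000}) {
            System.out.printf("n=%d: MapState layout %d bytes, ListState layout %d bytes%n",
                    n, mapStateBytesWritten(n), listStateBytesWritten(n));
        }
    }
}
```

For n = 10 the first layout writes 480 bytes and the second writes 80. The former grows quadratically with the number of appends, and the latter grows linearly. Read and deserialization costs are not modeled, and only the MapState layout pays them on every append.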

Another opportunity for optimization is to take advantage of the fact that RocksDB stores its key/value pairs in order, sorted by key. We can use this to implement efficient range scans.

But rather than limiting the scope to using Longs as the keys in BinarySortedState<UK, UV>, we propose instead to support any type for which a lexicographically ordered type serializer can be implemented. This means that for any two keys, a and b, their serialized binary representations are ordered in exactly the same way as the original objects a and b. RocksDB sorts its key/value pairs by these serialized representations, so efficient range scans then work for any such key type, not only for Longs.
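The core of such a serializer can be sketched using only the JDK. A signed long becomes lexicographically ordered if you flip its sign bit and write it big-endian. Unsigned byte-wise comparison of the encodings, which is the memcmp-style order used by RocksDB's default bytewise comparator, then agrees with numeric order. The class name is hypothetical, and this is not claimed to be how LexicographicLongSerializer is implemented.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical sketch of an order-preserving ("lexicographic") encoding for long keys. */
final class LexicographicLongEncoding {

    /**
     * Flipping the sign bit maps Long.MIN_VALUE..Long.MAX_VALUE onto 0x00..00..0xFF..FF, and
     * big-endian order puts the most significant byte first, so unsigned byte-wise comparison
     * of the encodings matches signed comparison of the values.
     */
    static byte[] encode(long value) {
        // ByteBuffer defaults to big-endian byte order.
        return ByteBuffer.allocate(Long.BYTES).putLong(value ^ Long.MIN_VALUE).array();
    }

    static long decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong() ^ Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        long[] samples = {Long.MIN_VALUE, -42L, -1L, 0L, 1L, 42L, Long.MAX_VALUE};
        for (long a : samples) {
            for (long b : samples) {
                int byValue = Integer.signum(Long.compare(a, b));
                int byBytes = Integer.signum(Arrays.compareUnsigned(encode(a), encode(b)));
                if (byValue != byBytes) {
                    throw new AssertionError("order mismatch for " + a + " and " + b);
                }
            }
        }
        System.out.println("binary order matches numeric order for all sample pairs");
    }
}
```

Without the sign-bit flip, -1 would encode as eight 0xFF bytes and sort after every non-negative key.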

This FLIP also makes it possible for users to create custom time windows, implemented as keyed process functions, that achieve performance similar to that of the built-in window operators. This isn’t currently possible because the window operators take advantage of the state backends’ namespaces to optimize state access in the way outlined above, something that can’t be done with Flink’s public APIs. By exposing these namespaces in this limited, controlled way, users will be able to achieve comparable performance.

An implementation is underway. Proof-of-concept benchmarking has shown a ~130% increase in throughput for sorting and ~60% for temporal joins.

...

Code Block
languagejava
@PublicEvolving
public class BinarySortedStateDescriptor<UK, UV> extends ListStateDescriptor<UV> {

    /**
     * Create a new {@code BinarySortedStateDescriptor} with the given name and the given type
     * serializers.
     *
     * @param name The name of the {@code BinarySortedStateDescriptor}.
     * @param keySerializer The type serializer for the keys in the state.
     * @param valueSerializer The type serializer for the values in the state.
     */
    public BinarySortedStateDescriptor(
            String name,
            LexicographicalTypeSerializer<UK> keySerializer,
            TypeSerializer<UV> valueSerializer);

    /**
     * Create a new {@code BinarySortedStateDescriptor} with the given name and the given type
     * information.
     *
     * @param name The name of the {@code BinarySortedStateDescriptor}.
     * @param keySerializer The type serializer for the keys in the state.
     * @param valueTypeInfo The type information for the values in the state.
     */
    public BinarySortedStateDescriptor(
            String name,
            LexicographicalTypeSerializer<UK> keySerializer,
            TypeInformation<UV> valueTypeInfo);

    /**
     * Create a new {@code BinarySortedStateDescriptor} with the given name and the given type
     * information.
     *
     * <p>If this constructor fails (because it is not possible to describe the type via a class),
     * consider using the {@link #BinarySortedStateDescriptor(String, LexicographicalTypeSerializer,
     * TypeInformation)} constructor.
     *
     * @param name The name of the {@code BinarySortedStateDescriptor}.
     * @param keySerializer The type serializer for the keys in the state.
     * @param valueType The class of the type of values in the state.
     */
    public BinarySortedStateDescriptor(
            String name, LexicographicalTypeSerializer<UK> keySerializer, Class<UV> valueType);

    /**
     * Gets the serializer for the keys in the state.
     *
     * @return The serializer for the keys in the state.
     */
    public LexicographicalTypeSerializer<UK> getKeySerializer();
}


Here, LexicographicalTypeSerializer<T> is a special TypeSerializer whose serialized form defines a binary sort order. It can also provide an efficient Java object comparator that is consistent with that order. The proposal is to initially provide only an internal implementation of a LexicographicLongSerializer to support BinarySortedState<Long, T> out of the box. Users will be able to extend LexicographicalTypeSerializer if they want to use other types as the keys for ordering the state.
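For other key types, the same idea extends to composite keys, provided every field has a fixed width and an order-preserving encoding. The sketch below uses a hypothetical (eventTime, sequence) key. It shows only the encoding logic a custom LexicographicalTypeSerializer might use, plus an object comparator that agrees with the binary order, which is the kind of efficient comparator described above. Wiring it into an actual TypeSerializer subclass is omitted.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Comparator;

/**
 * Hypothetical encoding for a composite key (eventTime, sequence) whose byte order matches the
 * tuple order. Only the encoding logic is shown; a real key type would wrap this in a
 * LexicographicalTypeSerializer subclass.
 */
final class CompositeKeyEncoding {

    static final class EventKey {
        final long eventTime;
        final int sequence;

        EventKey(long eventTime, int sequence) {
            this.eventTime = eventTime;
            this.sequence = sequence;
        }
    }

    /** Object comparator that agrees with the binary order, so comparisons need no serialization. */
    static final Comparator<EventKey> OBJECT_ORDER =
            Comparator.<EventKey>comparingLong(k -> k.eventTime).thenComparingInt(k -> k.sequence);

    /** Fixed-width, big-endian, sign-flipped fields: concatenation preserves tuple order. */
    static byte[] encode(EventKey key) {
        return ByteBuffer.allocate(Long.BYTES + Integer.BYTES)
                .putLong(key.eventTime ^ Long.MIN_VALUE)
                .putInt(key.sequence ^ Integer.MIN_VALUE)
                .array();
    }

    public static void main(String[] args) {
        EventKey[] keys = {
            new EventKey(-5L, 3), new EventKey(-5L, -1), new EventKey(0L, Integer.MAX_VALUE),
            new EventKey(0L, Integer.MIN_VALUE), new EventKey(7L, 0)
        };
        for (EventKey a : keys) {
            for (EventKey b : keys) {
                int byObject = Integer.signum(OBJECT_ORDER.compare(a, b));
                int byBytes = Integer.signum(Arrays.compareUnsigned(encode(a), encode(b)));
                if (byObject != byBytes) {
                    throw new AssertionError("comparator and binary order disagree");
                }
            }
        }
        System.out.println("object comparator agrees with binary order");
    }
}
```

Variable-length fields such as strings would also need a terminator or escaping scheme. Without one, plain concatenation would let the bytes of a shorter field be compared against the bytes of the next field.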

Code Block
languagejava
@PublicEvolving
public abstract class LexicographicalTypeSerializer<T> extends TypeSerializer<T> {

    /**
     * Provides an efficient Java object comparator based on this data type's binary sort order.
     *
     * @return If empty, implementations can compare based on serialized form and using {@link
     *     BinarySortOrderComparator#INSTANCE}, otherwise use the provided (efficient) comparator.
     */
    public Optional<Comparator<T>> findComparator() {}

    public static final class BinarySortOrderComparator implements Comparator<byte[]> {

        public static final BinarySortOrderComparator INSTANCE = new BinarySortOrderComparator();

        @Override
        public int compare(byte[] first, byte[] second) {}

        /**
         * Variant of {@link #compare(byte[], byte[])} that starts the comparison from a given
         * position. This can be used if you know that the first <code>from</code> bytes are equal.
         *
         * @param first the first object to be compared.
         * @param second the second object to be compared.
         * @param from array position to start the comparison at
         * @return a negative integer, zero, or a positive integer as the first argument is less
         *     than, equal to, or greater than the second.
         * @see #compare(byte[], byte[])
         */
        public int compare(byte[] first, byte[] second, int from) {}
    }
}
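The comparator's contract can be illustrated with a minimal sketch. It assumes that "binary sort order" means unsigned, byte-wise lexicographic order in which a proper prefix sorts first. The class name BinarySortOrderComparatorSketch is hypothetical. It mirrors the two compare signatures above but is not the proposed implementation.

```java
import java.util.Comparator;

/**
 * Hypothetical sketch of unsigned, byte-wise lexicographic comparison, mirroring the two
 * compare signatures of BinarySortOrderComparator. Not the proposed implementation.
 */
final class BinarySortOrderComparatorSketch implements Comparator<byte[]> {

    static final BinarySortOrderComparatorSketch INSTANCE = new BinarySortOrderComparatorSketch();

    @Override
    public int compare(byte[] first, byte[] second) {
        return compare(first, second, 0);
    }

    /** Skips the first {@code from} bytes, which the caller guarantees are equal. */
    public int compare(byte[] first, byte[] second, int from) {
        int limit = Math.min(first.length, second.length);
        for (int i = from; i < limit; i++) {
            // Mask to 0..255 so that, for example, 0x80 sorts after 0x7F.
            int diff = (first[i] & 0xFF) - (second[i] & 0xFF);
            if (diff != 0) {
                return diff;
            }
        }
        // All compared bytes are equal: the shorter array sorts first.
        return first.length - second.length;
    }
}
```

The `from` variant skips bytes that the caller already knows are equal, for example a common prefix shared by all keys being compared, which saves work during long range scans.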


Here's an example of how these pieces fit together:

Code Block
languagejava
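private transient BinarySortedState<Long, Long> events;

// Keyed state is registered when the rich function is opened.
@Override
public void open(Configuration parameters) throws Exception {
    BinarySortedStateDescriptor<Long, Long> desc =
        new BinarySortedStateDescriptor<>(
            "Events",
            LexicographicLongSerializer.INSTANCE,
            Types.LONG);

    events = getRuntimeContext().getBinarySortedState(desc);
}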


As illustrated above, RuntimeContext (and KeyedStateStore) need to be extended with a method for registering BinarySortedState:

...

RocksDBBinarySortedNavigator is added to support the various lookup and range operations. It takes advantage of RocksDB's native iterators.
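The seek-then-iterate pattern that such a navigator relies on can be shown without a RocksDB dependency. The sketch keeps serialized keys in a TreeMap ordered by unsigned byte comparison. This hypothetical RangeScanSketch is not the proposed RocksDBBinarySortedNavigator or HeapBinarySortedNavigator. Against RocksDB, the same loop would use RocksIterator's seek, isValid, key and next methods.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/**
 * Hypothetical in-memory stand-in for a RocksDB column family: entries are kept sorted by the
 * unsigned byte order of their serialized keys, as with RocksDB's default bytewise comparator.
 */
final class RangeScanSketch {

    private final NavigableMap<byte[], String> store =
            new TreeMap<byte[], String>(Arrays::compareUnsigned);

    /** Order-preserving encoding: flip the sign bit, write big-endian. */
    static byte[] encode(long timestamp) {
        return ByteBuffer.allocate(Long.BYTES).putLong(timestamp ^ Long.MIN_VALUE).array();
    }

    void put(long timestamp, String value) {
        store.put(encode(timestamp), value);
    }

    /** Values with fromInclusive <= timestamp < toExclusive, in timestamp order. */
    List<String> range(long fromInclusive, long toExclusive) {
        byte[] upper = encode(toExclusive);
        List<String> result = new ArrayList<>();
        // "Seek" to the first key >= fromInclusive, then step forward until the upper bound.
        for (Map.Entry<byte[], String> e : store.tailMap(encode(fromInclusive), true).entrySet()) {
            if (Arrays.compareUnsigned(e.getKey(), upper) >= 0) {
                break;
            }
            result.add(e.getValue());
        }
        return result;
    }
}
```

Because the encoding preserves order, a range query touches only the entries inside the range. It does not need to deserialize a whole list or scan every key.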

HashMapStateBackend

HeapBinarySortedNavigator is needed to provide the same lookup and range operations for the heap-based state backend. The proposed implementation uses a

...

In addition to the javadocs, the state#using-keyed-state documentation needs to be updated to include BinarySortedState.

Compatibility, Deprecation, and Migration Plan

...