Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: add VersionedBytesStore interface for completeness

...

  • a new interface for versioned stores: VersionedKeyValueStore<K, V> extends StateStore 
  • a new interface for versioned store suppliers, and a helper interface: VersionedBytesStoreSupplier extends KeyValueBytesStoreSupplier , with helper VersionedBytesStore 
  • three new methods in Stores.java:
    • two for creating a persistent, versioned store supplier: Stores#persistentVersionedKeyValueStore(...) 
    • another for creating a StoreBuilder from a versioned supplier: Stores#versionedKeyValueStoreBuilder(...) 
  • a new static method in ValueAndTimestamp.java for creating ValueAndTimestamp instances where the value may be null: ValueAndTimestamp#makeAllowNullable(...) 

...

Code Block
package org.apache.kafka.streams.state;

/**
 * A store supplier that can be used to create one or more versioned key-value stores,
 * specifically, {@link VersionedBytesStore} instances.
 * <p>
 * Rather than representing the returned store as a {@link VersionedKeyValueStore<Bytes VersionedKeyValueStore} of
 * type <Bytes, byte[]>},
 * this supplier interface represents the returned store as a
 * {@link KeyValueStore} KeyValueStore<Bytesof type <Bytes, byte[]> (via {@link VersionedBytesStore}) in order to be compatible with
 * existing DSL methods for passing key-value stores such as {@link StreamsBuilder#table(String, Materialized)}
 * and {@link KTable#filter(Predicate, Materialized)}. A {@link@code VersionedKeyValueStore<Bytes, byte[]>}
 * is represented as a {@link@code KeyValueStore KeyValueStore<Bytes, byte[]>} by interpreting the
 * value bytes as containing record timestamp information in addition to raw record values.
 */
public interface VersionedBytesStoreSupplier extends KeyValueBytesStoreSupplier {

    /**
     * Returns the history retention (in milliseconds) that stores created from this supplier will have.
     * This value is used to set compaction configs on store changelog topics (if relevant).
	 *
     * @return history retention, i.e., length of time that old record versions are available for
     *         query from a versioned store
     */
    long historyRetentionMs();
}

...

To alleviate this pain, we could expose an additional helper method for the conversion and/or add an additional method to VersionedBytesStoreSupplier which directly returns a VersionedKeyValueStore<Bytes, byte[]>  if implemented. The latter allows us to save on the two additional layers of translation, at the expense of complicating one of the interfaces. Unless reviewers feel strongly about this (avoiding the extra translation and/or making it easier for users to create their own VersionedKeyValueStore implementations), I propose to leave these options out for now and we can always revisit them later.

For completeness, here's the new VersionedBytesStore interface which VersionedBytesStoreSupplier instances will return. Unless a user chooses to implement their own VersionedBytesStoreSupplier (i.e., in order to implement a custom versioned store to pass to the DSL or to the new Stores#versionedKeyValueStoreBuilder() method), then users will not need to interact with this interface.

Code Block
package org.apache.kafka.streams.state;

/**
 * A representation of a versioned key-value store as a {@link KeyValueStore} of type <Bytes, byte[]>.
 * See {@link VersionedBytesStoreSupplier} for more.
 */
public interface VersionedBytesStore extends KeyValueStore<Bytes, byte[]>, TimestampedBytesStore {

    /**
     * The analog of {@link VersionedKeyValueStore#get(Object, long)}.
     */
    byte[] get(Bytes key, long timestampTo);
}

Internally, this interface will be used to assist in the representation of VersionedKeyValueStore<Bytes, byte[]>  as KeyValueStore<Bytes, byte[]> .

Additional Interface Changes

...