Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Current stateUnder Discussion

Discussion thread: thread

JIRA: KAFKA-8403

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Code Block
languagejava
/**
 * A suppression buffer that only supports read operations.
 *
 * @param <K> type of the record keys
 * @param <V> type of the record values
 */
public interface ReadOnlySuppressionBuffer<K, V> extends ReadOnlyKeyValueStore<K, V> {

    /**
     * Returns suppressed view of the value associated with {@code key} with timestamp, if exists.
     * If not, returns null.
     *
     * @param key record key
     * @return Suppressed view of the value associated with given key, with timestamp (if exists)
     */
    NullableValueAndTimestamp<V> priorValueForBuffered(K key);

    /**
     * Returns the number of key/value pairs suppressed in this buffer.
     *
     * @return the number of key/value pairs suppressed in this buffer
     */
    int numRecords();

    /**
     * Returns the size of this buffer, in bytes.
     *
     * @return the size of the buffer, in bytes
     */
    long bufferSize();

    /**
     * Returns the timestamp of the oldest record in this buffer. {@link Long#MAX_VALUE} iff the buffer is empty.
     *
     * @return the timestamp of the oldest record in this buffer
     */
    long minTimestamp();
}

Dislike to ValueAndTimestamp, NullableValueAndTimestamp can contain null as its value; it was introduced since suppression buffer can return a null value with a timestamp.

...

Add a new QueryableStoreType, SuppressionBufferType

This change makes SuppressionBuffer queriable.

Code Block
languagejava
public final class QueryableStoreTypes {

    ...

    /**
     * A {@link QueryableStoreType} that accepts {@link ReadOnlySuppressionBuffer}.
     *
     * @param <K> key type of the store
     * @param <V> value type of the store
     * @return {@link QueryableStoreTypes.SuppressionBufferType}
     */
    public static <K, V> QueryableStoreType<ReadOnlySuppressionBuffer<K, V>> suppressionBuffer() {
        return new SuppressionBufferType<>();
    }

    ...

    
public static class SuppressionBufferType<K, V> extends QueryableStoreTypeMatcher<ReadOnlySuppressionBuffer<K, V>> {

        SuppressionBufferType() {
            super(Collections.singleton(ReadOnlySuppressionBuffer.class));
        }

        @Override
        public ReadOnlySuppressionBuffer<K, V> create(final StateStoreProvider storeProvider,
                                                      final String storeName) {
            return new CompositeReadOnlySuppressionBuffer<>(storeProvider, this, storeName);
        }
    }
}

...