...

/**
 * A suppression buffer that only supports read operations.
 *
 * @param <K> type of the record keys
 * @param <V> type of the record values
 */
public interface ReadOnlySuppressionBuffer<K, V> extends ReadOnlyKeyValueStore<K, V> {

    /**
     * Returns the suppressed view of the value associated with {@code key}, together with its timestamp.
     * Returns {@code null} if no such value exists.
     *
     * @param key record key
     * @return the suppressed view of the value associated with the given key, with its timestamp, or {@code null} if none exists
     */
    NullableValueAndTimestamp<V> priorValueForBuffered(K key);

    /**
     * Returns the number of key/value pairs suppressed in this buffer.
     *
     * @return the number of key/value pairs suppressed in this buffer
     */
    int numRecords();

    /**
     * Returns the size of this buffer, in bytes.
     *
     * @return the size of the buffer, in bytes
     */
    long bufferSize();

    /**
     * Returns the timestamp of the oldest record in this buffer, or {@link Long#MAX_VALUE} iff the buffer is empty.
     *
     * @return the timestamp of the oldest record in this buffer
     */
    long minTimestamp();
}
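
The sentinel returned by minTimestamp() matters to callers that compute record age. The following is a minimal sketch, not part of the proposal: BufferStats is a hypothetical stand-in that exposes only the three size/age accessors declared above, and describe and fixed are illustrative helpers. It shows one way to guard the empty case so that Long.MAX_VALUE is never used in arithmetic.

```java
class SuppressionBufferStatsDemo {

    // Hypothetical stand-in for the three size/age accessors of ReadOnlySuppressionBuffer.
    interface BufferStats {
        int numRecords();
        long bufferSize();
        long minTimestamp();
    }

    // Builds a one-line summary. The empty case is handled first, because
    // minTimestamp() is Long.MAX_VALUE for an empty buffer and must not be subtracted.
    static String describe(final BufferStats buffer, final long nowMs) {
        if (buffer.numRecords() == 0) {
            return "empty";
        }
        final long oldestAgeMs = nowMs - buffer.minTimestamp();
        return buffer.numRecords() + " records, "
            + buffer.bufferSize() + " bytes, oldest "
            + oldestAgeMs + " ms old";
    }

    // Fixed-value implementation used only for this demonstration.
    static BufferStats fixed(final int numRecords, final long bufferSize, final long minTimestamp) {
        return new BufferStats() {
            @Override
            public int numRecords() {
                return numRecords;
            }

            @Override
            public long bufferSize() {
                return bufferSize;
            }

            @Override
            public long minTimestamp() {
                return minTimestamp;
            }
        };
    }

    public static void main(final String[] args) {
        System.out.println(describe(fixed(0, 0L, Long.MAX_VALUE), 1_000L));
        System.out.println(describe(fixed(3, 120L, 400L), 1_000L));
    }
}
```

Checking numRecords() first is a design choice of this sketch; comparing minTimestamp() against Long.MAX_VALUE would work equally well given the "iff" contract in the Javadoc.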

Unlike ValueAndTimestamp, NullableValueAndTimestamp can hold null as its value; it was introduced because a suppression buffer can return a null value together with a timestamp.

import java.util.Objects;

public final class NullableValueAndTimestamp<V> {
    protected final V value;
    protected final long timestamp;

    protected NullableValueAndTimestamp(final V value, final long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }

    /**
     * Create a new {@link org.apache.kafka.streams.state.NullableValueAndTimestamp} instance.
     *
     * @param value      the value
     * @param timestamp  the timestamp
     * @param <V> the type of the value
     * @return a new {@link org.apache.kafka.streams.state.NullableValueAndTimestamp} instance
     */
    public static <V> NullableValueAndTimestamp<V> make(final V value,
                                                        final long timestamp) {
        return new NullableValueAndTimestamp<>(value, timestamp);
    }

    /**
     * Return the wrapped {@code value} of the given {@code nullableValueAndTimestamp} parameter
     * if the parameter is not {@code null}.
     *
     * @param nullableValueAndTimestamp a {@link org.apache.kafka.streams.state.NullableValueAndTimestamp} instance; can be {@code null}
     * @param <V> the type of the value
     * @return the wrapped {@code value} of {@code nullableValueAndTimestamp} if not {@code null}; otherwise {@code null}
     */
    public static <V> V getValueOrNull(final NullableValueAndTimestamp<V> nullableValueAndTimestamp) {
        return nullableValueAndTimestamp == null ? null : nullableValueAndTimestamp.value();
    }

    public V value() {
        return value;
    }

    public long timestamp() {
        return timestamp;
    }

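    /**
     * Convert this instance to a {@link ValueAndTimestamp}, which cannot hold a {@code null} value.
     *
     * @return the equivalent {@link ValueAndTimestamp}, or {@code null} if the wrapped value is {@code null}
     */
    public ValueAndTimestamp<V> toValueAndTimestamp() {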
        return value == null ? null : ValueAndTimestamp.make(value, timestamp);
    }

    @Override
    public String toString() {
        return "(" + value + "," + timestamp + ")";
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        final NullableValueAndTimestamp<?> that = (NullableValueAndTimestamp<?>) o;
        return timestamp == that.timestamp &&
            Objects.equals(value, that.value);
    }

    @Override
    public int hashCode() {
        return Objects.hash(value, timestamp);
    }
}
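
The difference between the two wrappers can be illustrated with a small, self-contained sketch. PlainVT and NullableVT below are simplified, hypothetical stand-ins for ValueAndTimestamp and the proposed NullableValueAndTimestamp (they are not the Kafka classes), and toPlain plays the role of toValueAndTimestamp(). The sketch assumes the plain factory returns null for a null value, matching the null check in toValueAndTimestamp() above.

```java
class NullableTimestampDemo {

    // Stand-in for ValueAndTimestamp: the factory returns null rather than wrap a null value.
    static final class PlainVT<V> {
        final V value;
        final long timestamp;

        private PlainVT(final V value, final long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }

        static <V> PlainVT<V> make(final V value, final long timestamp) {
            return value == null ? null : new PlainVT<>(value, timestamp);
        }
    }

    // Stand-in for the proposed NullableValueAndTimestamp: a null value is kept with its timestamp.
    static final class NullableVT<V> {
        final V value;
        final long timestamp;

        private NullableVT(final V value, final long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }

        static <V> NullableVT<V> make(final V value, final long timestamp) {
            return new NullableVT<>(value, timestamp);
        }

        // Counterpart of toValueAndTimestamp(): the timestamp is lost when the value is null.
        PlainVT<V> toPlain() {
            return PlainVT.make(value, timestamp);
        }
    }

    public static void main(final String[] args) {
        final NullableVT<String> deleted = NullableVT.make(null, 42L);
        System.out.println(deleted.value);                   // the null value is retained
        System.out.println(deleted.timestamp);               // and so is its timestamp
        System.out.println(deleted.toPlain() == null);       // conversion yields no wrapper at all
        System.out.println(PlainVT.make(null, 42L) == null); // plain factory cannot carry the pair
    }
}
```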

2. Add Stores#[SuppressionBytesStoreSupplier, suppressionBufferBuilder]

...