THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||
---|---|---|
| ||
/** * A suppression buffer that only supports read operations. * * @param <K> type of the record keys * @param <V> type of the record values */ public interface ReadOnlySuppressionBuffer<K, V> extends ReadOnlyKeyValueStore<K, V> { /** * Returns suppressed view of the value associated with {@code key} with timestamp, if exists. * If not, returns null. * * @param key record key * @return Suppressed view of the value associated with given key, with timestamp (if exists) */ NullableValueAndTimestamp<V> priorValueForBuffered(K key); /** * Returns the number of key/value pairs suppressed in this buffer. * * @return the number of key/value pairs suppressed in this buffer */ int numRecords(); /** * Returns the size of this buffer, in bytes. * * @return the size of the buffer, in bytes */ long bufferSize(); /** * Returns the timestamp of the oldest record in this buffer. {@link Long#MAX_VALUE} iff the buffer is empty. * * @return the timestamp of the oldest record in this buffer */ long minTimestamp(); } |
Dislike to ValueAndTimestamp, NullableValueAndTimestamp can contain null as its value; it was introduced since suppression buffer can return a null value with a timestamp.
Code Block | ||
---|---|---|
| ||
public final class NullableValueAndTimestamp<V> {
protected final V value;
protected final long timestamp;
protected NullableValueAndTimestamp(final V value, final long timestamp) {
this.value = value;
this.timestamp = timestamp;
}
/**
* Create a new {@link org.apache.kafka.streams.state.NullableValueAndTimestamp} instance.
*
* @param value the value
* @param timestamp the timestamp
* @param <V> the type of the value
* @return a new {@link org.apache.kafka.streams.state.NullableValueAndTimestamp} instance
*/
public static <V> NullableValueAndTimestamp<V> make(final V value,
final long timestamp) {
return new NullableValueAndTimestamp<>(value, timestamp);
}
/**
* Return the wrapped {@code value} of the given {@code valueAndTimestamp} parameter
* if the parameter is not {@code null}.
*
* @param nullableValueAndTimestamp a {@link org.apache.kafka.streams.state.NullableValueAndTimestamp} instance; can be {@code null}
* @param <V> the type of the value
* @return the wrapped {@code value} of {@code valueAndTimestamp} if not {@code null}; otherwise {@code null}
*/
public static <V> V getValueOrNull(final NullableValueAndTimestamp<V> nullableValueAndTimestamp) {
return nullableValueAndTimestamp == null ? null : nullableValueAndTimestamp.value();
}
public V value() {
return value;
}
public long timestamp() {
return timestamp;
}
public ValueAndTimestamp<V> toValueAndTimestamp() {
return value == null ? null : ValueAndTimestamp.make(value, timestamp);
}
@Override
public String toString() {
return "(" + value + "," + timestamp + ")";
}
@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final NullableValueAndTimestamp<?> that = (NullableValueAndTimestamp<?>) o;
return timestamp == that.timestamp &&
Objects.equals(value, that.value);
}
@Override
public int hashCode() {
return Objects.hash(value, timestamp);
}
} |
2. Add Stores#[SuppressionBytesStoreSupplier, suppressionBufferBuilder]
...