...
This page is meant as a template for writing a KIP. To create a KIP choose Tools->Copy on this page and modify with your content and replace the heading with the next KIP number and a description of your issue. Replace anything in italics with your own description.
Status
Current state: Under DiscussionAccepted
Discussion thread: here
JIRA: KAFKA-15347
...
- we propose a public class MultiVersionedKeyQuery., MultiVersionedKeyQuery
- and a public enum ResultOrder
- Moreover, the public interface VersionedRecordIterator is added to iterate over different values that are returned from a single-key query (each value corresponds to a timestamp).
- In addition, a new method is added to the VersionedKeyValueStore interface to support single-key_multi-timestamp queries.
- Finally, a field called validTo is added to the VersionedRecord class to enable us representing tombstones as well.
...
To be able to list the tombstones, the validTo filed Optional field is added to the VersionedRecord class. The default value of validTo is positive infinity is Optional.empty() which means the record is still valid.
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
package org.apache.kafka.streams.state; public final class VersionedRecord<V> { /** * Create a new {@link VersionedRecord} instance. {@code value} cannot be {@code null}. * * @param value the value * @param timestamp the timestamp */ public VersionedRecord(final V value, final long timestamp) { this.value = Objects.requireNonNull(value); this.timestamp = timestamp; this.validTo = Long.MAX_VALUEOptional.empty(); } /** * Create a new {@link VersionedRecord} instance. {@code value} cannot be {@code null}. * * @param value The value * @param timestamp The timestamp * @param validTo The exclusive upper bound of the validity interval */ public VersionedRecord(final V value, final long timestamp, final longOptional<Long> validTo); /** * Returns the {@code validTo} */ public longOptional<Long> validTo(); } |
For single-key queries, MultiVersionedKeyQuery and VersionedRecordIterator classes will be used.
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
package org.apache.kafka.streams.state;
/**
* Iterator interface of {@link V}.
* <p>
* Users must call its {@code close} method explicitly upon completeness to release resources,
* or use try-with-resources statement (available since JDK7) for this {@link Closeable} class.
* Note that {@code remove()} is not supported.
*
* @param <V> Type of values
*/
public interface VersionedRecordIterator<V> extends Iterator<VersionedRecord<V>>, Closeable {
@Override
void close();
} |
ResultOrder enum
It helps with specifying the order of the returned results by the query.
Code Block | ||||||||
---|---|---|---|---|---|---|---|---|
| ||||||||
package org.apache.kafka.streams.query; public enum ResultOrder { ANY, ASCENDING, DESCENDING } |
MultiVersionedKeyQuery class
...