Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

This page is meant as a template for writing a KIP. To create a KIP choose Tools->Copy on this page and modify with your content and replace the heading with the next KIP number and a description of your issue. Replace anything in italics with your own description.

Status

Current state: Under DiscussionAccepted

Discussion thread: here 

JIRA: KAFKA-15347 

...

  • we propose a public class MultiVersionedKeyQuery., MultiVersionedKeyQuery
  • and a public enum ResultOrder
  • Moreover, the public interface VersionedRecordIterator is added to iterate over different values that are returned from a single-key query (each value corresponds to a timestamp). 
  • In addition, a new method is added to the VersionedKeyValueStore interface to support single-key_multi-timestamp queries.
  • Finally, a field called validTo is added to the VersionedRecord class to enable us representing tombstones as well. 

...

To be able to list the tombstones, the validTo filed Optional field is added to the VersionedRecord class. The default value of validTo is positive infinity is Optional.empty() which means the record is still valid.

Code Block
languagejava
firstline1
titleVersionedRecord.java
linenumberstrue
package org.apache.kafka.streams.state;

public final class VersionedRecord<V> {

 	/**
     * Create a new {@link VersionedRecord} instance. {@code value} cannot be {@code null}.
     *
     * @param value      the value
     * @param timestamp  the timestamp
     */
    public VersionedRecord(final V value, final long timestamp) {
        this.value = Objects.requireNonNull(value);
        this.timestamp = timestamp;
        this.validTo = Long.MAX_VALUEOptional.empty();
    }      
	
	/**
     * Create a new {@link VersionedRecord} instance. {@code value} cannot be {@code null}.
     *
     * @param value      The value
     * @param timestamp  The timestamp
     * @param validTo    The exclusive upper bound of the validity interval
     */
     public VersionedRecord(final V value, final long timestamp, final longOptional<Long> validTo);


    /**
     * Returns the {@code validTo} 
     */
     public longOptional<Long> validTo();
}


For single-key queries, MultiVersionedKeyQuery and VersionedRecordIterator classes will be used.

Code Block
languagejava
firstline1
titleVersionedRecordIterator.java
linenumberstrue
package org.apache.kafka.streams.state;

/**
 * Iterator interface of {@link V}.
 * <p>
 * Users must call its {@code close} method explicitly upon completeness to release resources,
 * or use try-with-resources statement (available since JDK7) for this {@link Closeable} class.
 * Note that {@code remove()} is not supported.
 *
 * @param <V> Type of values
 */
public interface VersionedRecordIterator<V> extends Iterator<VersionedRecord<V>>, Closeable {

    @Override
    void close();
}

ResultOrder enum
It helps with specifying the order of the returned results by the query.


Code Block
languagejava
firstline1
titleResultOrder
linenumberstrue
package org.apache.kafka.streams.query;

public enum ResultOrder {
    ANY,
    ASCENDING,
    DESCENDING
}

MultiVersionedKeyQuery class

...