Status

Current state: Adopted

Vote thread: here

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

This proposal aims to solve this issue, adding Instant-based alternative methods to the interactive query API for Session Stores.

KIP-617 is already moving the read methods from SessionStore into ReadOnlySessionStore.

...

  • org.apache.kafka.streams.state.ReadOnlySessionStore
    • add method: KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, final Instant earliestSessionEndTime, final Instant latestSessionStartTime)
    • add method: KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, final Instant earliestSessionEndTime, final Instant latestSessionStartTime)
    • add method: AGG fetchSession(final K key, final Instant sessionStartTime, final Instant sessionEndTime)
    • add method: KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K key, final Instant earliestSessionEndTime, final Instant latestSessionStartTime)
    • add method: KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K keyFrom, final K keyTo, final Instant earliestSessionEndTime, final Instant latestSessionStartTime)
  • org.apache.kafka.streams.state.SessionStore
    • add method with default implementation calling long-based method:
      • KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, final Instant earliestSessionEndTime, final Instant latestSessionStartTime)
      • KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, final Instant earliestSessionEndTime, final Instant latestSessionStartTime)
      • AGG fetchSession(final K key, final Instant sessionStartTime, final Instant sessionEndTime)
      • KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K key, final Instant earliestSessionEndTime, final Instant latestSessionStartTime)
      • KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K keyFrom, final K keyTo, final Instant earliestSessionEndTime, final Instant latestSessionStartTime)
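To make the meaning of the two time bounds concrete, here is a minimal, self-contained sketch. It assumes the Instant-based methods keep the contract of the existing long-based findSessions as I understand it: a session matches when its end is at or after earliestSessionEndTime and its start is at or before latestSessionStartTime. SessionSketch and FindSessionsSketch are hypothetical stand-ins for illustration, not Kafka Streams types.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a stored session; not a Kafka Streams type.
final class SessionSketch {
    final String key;
    final Instant start;
    final Instant end;

    SessionSketch(final String key, final Instant start, final Instant end) {
        this.key = key;
        this.start = start;
        this.end = end;
    }
}

final class FindSessionsSketch {
    // Keeps the sessions for `key` whose end >= earliestSessionEndTime
    // and whose start <= latestSessionStartTime (assumed contract).
    static List<SessionSketch> findSessions(final List<SessionSketch> sessions,
                                            final String key,
                                            final Instant earliestSessionEndTime,
                                            final Instant latestSessionStartTime) {
        final List<SessionSketch> matches = new ArrayList<>();
        for (final SessionSketch s : sessions) {
            if (s.key.equals(key)
                    && !s.end.isBefore(earliestSessionEndTime)
                    && !s.start.isAfter(latestSessionStartTime)) {
                matches.add(s);
            }
        }
        return matches;
    }

    public static void main(final String[] args) {
        final List<SessionSketch> sessions = new ArrayList<>();
        sessions.add(new SessionSketch("a", Instant.ofEpochMilli(0), Instant.ofEpochMilli(10)));
        sessions.add(new SessionSketch("a", Instant.ofEpochMilli(20), Instant.ofEpochMilli(30)));
        final List<SessionSketch> hits = findSessions(
            sessions, "a", Instant.ofEpochMilli(5), Instant.ofEpochMilli(15));
        System.out.println("matching sessions: " + hits.size());
    }
}
```

With the bounds in main, only the first session qualifies: the second one starts after latestSessionStartTime.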

Proposed Changes

Changes to ReadOnlySessionStore, taking into account the new methods added by KIP-617:

Code Block
public interface ReadOnlySessionStore<K, AGG> {
    // Moving read functions from SessionStore to ReadOnlySessionStore
    KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, final Instant earliestSessionEndTime, final Instant latestSessionStartTime);

    KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, final Instant earliestSessionEndTime, final Instant latestSessionStartTime);

    AGG fetchSession(final K key, final Instant sessionStartTime, final Instant sessionEndTime);

    // To be handled as part of KIP-617
    KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K key, final Instant earliestSessionEndTime, final Instant latestSessionStartTime);

    KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K keyFrom, final K keyTo, final Instant earliestSessionEndTime, final Instant latestSessionStartTime);
}

public interface SessionStore<K, AGG> extends ReadOnlySessionStore<K, AGG> {
    // Existing methods
    KeyValueIterator<Windowed<K>, AGG> findSessions(final K key,
                                                    final long earliestSessionEndTime,
                                                    final long latestSessionStartTime);

    KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom,
                                                    final K keyTo,
                                                    final long earliestSessionEndTime,
                                                    final long latestSessionStartTime);


    AGG fetchSession(final K key, final long sessionStartTime, final long sessionEndTime);

    //Default implementations
    @Override
    default KeyValueIterator<Windowed<K>, AGG> findSessions(final K key,
                                                            final Instant earliestSessionEndTime,
                                                            final Instant latestSessionStartTime) {
        return findSessions(
            key,
            ApiUtils.validateMillisecondInstant(earliestSessionEndTime, prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, "earliestSessionEndTime")),
            ApiUtils.validateMillisecondInstant(latestSessionStartTime, prepareMillisCheckFailMsgPrefix(latestSessionStartTime, "latestSessionStartTime")));
    }


    @Override
    default KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom,
                                                            final K keyTo,
                                                            final Instant earliestSessionEndTime,
                                                            final Instant latestSessionStartTime) {
        return findSessions(
            keyFrom,
            keyTo,
            ApiUtils.validateMillisecondInstant(earliestSessionEndTime, prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, "earliestSessionEndTime")),
            ApiUtils.validateMillisecondInstant(latestSessionStartTime, prepareMillisCheckFailMsgPrefix(latestSessionStartTime, "latestSessionStartTime")));
    }

    @Override
    default AGG fetchSession(final K key, final Instant sessionStartTime, final Instant sessionEndTime) {
        return fetchSession(
            key,
            ApiUtils.validateMillisecondInstant(sessionStartTime, prepareMillisCheckFailMsgPrefix(sessionStartTime, "sessionStartTime")),
            ApiUtils.validateMillisecondInstant(sessionEndTime, prepareMillisCheckFailMsgPrefix(sessionEndTime, "sessionEndTime")));
    }

	
    // To be handled as part of KIP-617
    KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K key,
                                                            final long earliestSessionEndTime,
                                                            final long latestSessionStartTime);

    KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K keyFrom,
                                                            final K keyTo,
                                                            final long earliestSessionEndTime,
                                                            final long latestSessionStartTime);

    @Override
    default KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K key,
                                                                    final Instant earliestSessionEndTime,
                                                                    final Instant latestSessionStartTime) {
        return backwardFindSessions(
            key,
            ApiUtils.validateMillisecondInstant(earliestSessionEndTime, prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, "earliestSessionEndTime")),
            ApiUtils.validateMillisecondInstant(latestSessionStartTime, prepareMillisCheckFailMsgPrefix(latestSessionStartTime, "latestSessionStartTime")));
    }

    @Override
    default KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K keyFrom,
                                                                    final K keyTo,
                                                                    final Instant earliestSessionEndTime,
                                                                    final Instant latestSessionStartTime) {
        return backwardFindSessions(
            keyFrom,
            keyTo,
            ApiUtils.validateMillisecondInstant(earliestSessionEndTime, prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, "earliestSessionEndTime")),
            ApiUtils.validateMillisecondInstant(latestSessionStartTime, prepareMillisCheckFailMsgPrefix(latestSessionStartTime, "latestSessionStartTime")));
    }
}
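
The delegation pattern used by the default methods can be tried in isolation. The sketch below is an illustration under stated assumptions, not the Kafka implementation: SessionStoreSketch is a hypothetical interface carrying only the fetchSession pair, and InstantSketch.toMillis is a hypothetical stand-in for ApiUtils.validateMillisecondInstant. It assumes validation means rejecting null and rejecting Instants that overflow when converted to epoch milliseconds.

```java
import java.time.Instant;

// Hypothetical stand-in for the validation helper; assumes null and
// millisecond overflow are the two failure cases.
final class InstantSketch {
    static long toMillis(final Instant instant, final String name) {
        if (instant == null) {
            throw new IllegalArgumentException(name + " must not be null");
        }
        try {
            // Instant.toEpochMilli() throws ArithmeticException on overflow.
            return instant.toEpochMilli();
        } catch (final ArithmeticException e) {
            throw new IllegalArgumentException(name + " cannot be converted to milliseconds", e);
        }
    }
}

// Hypothetical store interface with only the fetchSession pair.
interface SessionStoreSketch<K, AGG> {
    // Existing long-based method that implementations already provide.
    AGG fetchSession(K key, long sessionStartTime, long sessionEndTime);

    // Instant-based overload: validate, convert, then delegate to the long-based method.
    default AGG fetchSession(final K key, final Instant sessionStartTime, final Instant sessionEndTime) {
        return fetchSession(
            key,
            InstantSketch.toMillis(sessionStartTime, "sessionStartTime"),
            InstantSketch.toMillis(sessionEndTime, "sessionEndTime"));
    }
}
```

Because only the long-based method is abstract, an existing implementation compiles unchanged and callers can switch to Instant arguments without any store-side work.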


Compatibility, Deprecation, and Migration Plan

  • Custom IQ Session Stores will have to implement the new methods if they do not already implement the read-write APIs. Stores that implement SessionStore inherit its default implementations.
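
The compatibility note can be illustrated with two toy stores. This is a sketch under assumptions: ReadOnlySketch and ReadWriteSketch are hypothetical, reduced versions of the two interfaces, carrying only fetchSession and returning the session length in milliseconds so the result is easy to check.

```java
import java.time.Instant;

// Hypothetical reduced read-only interface: the Instant-based method is abstract.
interface ReadOnlySketch<K, AGG> {
    AGG fetchSession(K key, Instant sessionStartTime, Instant sessionEndTime);
}

// Hypothetical reduced read-write interface: supplies a default for the Instant-based method.
interface ReadWriteSketch<K, AGG> extends ReadOnlySketch<K, AGG> {
    AGG fetchSession(K key, long sessionStartTime, long sessionEndTime);

    @Override
    default AGG fetchSession(final K key, final Instant sessionStartTime, final Instant sessionEndTime) {
        return fetchSession(key, sessionStartTime.toEpochMilli(), sessionEndTime.toEpochMilli());
    }
}

// A custom read-only store has no default to fall back on and must implement the new method.
final class CustomReadOnlyStore implements ReadOnlySketch<String, Long> {
    @Override
    public Long fetchSession(final String key, final Instant sessionStartTime, final Instant sessionEndTime) {
        return sessionEndTime.toEpochMilli() - sessionStartTime.toEpochMilli();
    }
}

// A custom read-write store keeps its long-based method and inherits the Instant-based one.
final class CustomReadWriteStore implements ReadWriteSketch<String, Long> {
    @Override
    public Long fetchSession(final String key, final long sessionStartTime, final long sessionEndTime) {
        return sessionEndTime - sessionStartTime;
    }
}
```

Both stores answer the same Instant-based query, but only the read-only one needed new code.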

Rejected Alternatives

None yet.