Table of Contents |
---|
Status
Current state: "Under Discussion"Adopted
Vote thread: here
Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
This proposal aims to solve this issue, adding Instant-based alternative methods to the interactive query API for Session Stores.
KIP-617 is already moving read methods from SessionStore into ReadOnlySessionStore
...
- org.apache.kafka.streams.state.ReadOnlySessionStore
- add method: KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, final Instant earliestSessionEndTime, final Instant latestSessionStartTime)
- add method: KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, final Instant earliestSessionEndTime, final Instant latestSessionStartTime)
- add method: AGG fetchSession(final K key, final long startTimeInstant sessionStartTime, final Instant sessionEndTime)
- add method: KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K key, final Instant earliestSessionEndTime, final Instant latestSessionStartTime)
- add method: KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K keyFrom, final K keyTo, final Instant earliestSessionEndTime, final Instant latestSessionStartTime)
- org.apache.kafka.streams.state.SessionStore
- add method with default implementation calling long-based method:
- KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, final Instant earliestSessionEndTime, final Instant latestSessionStartTime)
- KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, final Instant earliestSessionEndTime, final Instant latestSessionStartTime)
- AGG fetchSession(final K key, final Instant sessionStartTime, final Instant sessionEndTime)
- KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K key, final Instant earliestSessionEndTime, final Instant latestSessionStartTime)
- KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K keyFrom, final K keyTo, final Instant earliestSessionEndTime, final Instant latestSessionStartTime)
- add method with default implementation calling long-based method:
Proposed Changes
Changes to ReadOnlySessionStore, considering new methods added on KIP-617:
Code Block |
---|
public interface ReadOnlySessionStore<K, AGG> { // Moving read functions from SessionStore to ReadOnlySessionStore defaultKeyValueIterator<Windowed<K>, AGG> findSessions(final K key, final Instant earliestSessionEndTime, final Instant latestSessionStartTime); KeyValueIterator<Windowed<K>, AGG> findSessions(final K keykeyFrom, final K keyTo, final Instant earliestSessionEndTime, final Instant latestSessionStartTime); AGG fetchSession(final K key, final Instant sessionStartTime, final Instant sessionEndTime); { // To be throw new UnsupportedOperationException("Moved from SessionStore"); } defaulthandled as part of KIP-617 KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K key, final Instant earliestSessionEndTime, final Instant latestSessionStartTime); KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K keyFrom, final K keyTo, final Instant earliestSessionEndTime, final Instant latestSessionStartTime); } public interface SessionStore<K, AGG> extends ReadOnlySessionStore<K, AGG> { //Existing messages KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime); KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime); AGG fetchSession(final K key, final long sessionStartTime, final long sessionEndTime); //Default implementations @Override default KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, final Instant earliestSessionEndTime, final Instant latestSessionStartTime) { return findSessions( throw new UnsupportedOperationException("Moved from SessionStore"key, ApiUtils.validateMillisecondInstant(earliestSessionEndTime, prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, "earliestSessionEndTime")), ApiUtils.validateMillisecondInstant(latestSessionStartTime, prepareMillisCheckFailMsgPrefix(latestSessionStartTime, "latestSessionStartTime"))); } @Override default AGG fetchSession(final K key, final longInstant startTimesessionStartTime, final Instant sessionEndTime) { return throw new UnsupportedOperationException("Moved from SessionStore")fetchSession( key, ApiUtils.validateMillisecondInstant(sessionStartTime, prepareMillisCheckFailMsgPrefix(sessionStartTime, "sessionStartTime")), ApiUtils.validateMillisecondInstant(sessionEndTime, prepareMillisCheckFailMsgPrefix(sessionEndTime, "sessionEndTime"))); } @Override default AGG fetchSession(final K key, final Instant sessionStartTime, final Instant sessionEndTime) { return fetchSession( key, ApiUtils.validateMillisecondInstant(sessionStartTime, prepareMillisCheckFailMsgPrefix(sessionStartTime, "sessionStartTime")), ApiUtils.validateMillisecondInstant(sessionEndTime, prepareMillisCheckFailMsgPrefix(sessionEndTime, "sessionEndTime"))); } // NewTo APIsbe addedhandled as part of KIP-617 KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime); KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime); @Override default KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K key, final Instant earliestSessionEndTime, final Instant latestSessionStartTime) { throw new UnsupportedOperationException()return backwardFindSessions( key, ApiUtils.validateMillisecondInstant(earliestSessionEndTime, prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, "earliestSessionEndTime")), ApiUtils.validateMillisecondInstant(latestSessionStartTime, prepareMillisCheckFailMsgPrefix(latestSessionStartTime, "latestSessionStartTime"))); } @Override default KeyValueIterator<Windowed<K>, AGG> backwardFindSessionsAGG backwardFetchSession(final K keyFromkey, final KInstant keyTosessionStartTime, final Instant earliestSessionEndTime, final Instant latestSessionStartTime) { sessionEndTime) { return backwardFetchSession( key, ApiUtils.validateMillisecondInstant(sessionStartTime, prepareMillisCheckFailMsgPrefix(sessionStartTime, "sessionStartTime")), throw new UnsupportedOperationException( ApiUtils.validateMillisecondInstant(sessionEndTime, prepareMillisCheckFailMsgPrefix(sessionEndTime, "sessionEndTime"))); } } |
Compatibility, Deprecation, and Migration Plan
- Default implementations will avoid changes on custom Custom IQ Session Stores will have to implement new methods if haven't implement read-write APIs yet.
Rejected Alternatives
None yet.