...

  • org.apache.kafka.streams.state.ReadOnlySessionStore
    • add method: KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, final Instant earliestSessionEndTime, final Instant latestSessionStartTime)
    • add method: KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, final Instant earliestSessionEndTime, final Instant latestSessionStartTime)
    • add method: AGG fetchSession(final K key, final Instant sessionStartTime, final Instant sessionEndTime)
    • add method: KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K key, final Instant earliestSessionEndTime, final Instant latestSessionStartTime)
    • add method: KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K keyFrom, final K keyTo, final Instant earliestSessionEndTime, final Instant latestSessionStartTime)
  • org.apache.kafka.streams.state.SessionStore
    • add method with default implementation calling long-based method:
      • KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, final Instant earliestSessionEndTime, final Instant latestSessionStartTime)
      • KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, final Instant earliestSessionEndTime, final Instant latestSessionStartTime)
      • AGG fetchSession(final K key, final Instant sessionStartTime, final Instant sessionEndTime)
      • KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K key, final Instant earliestSessionEndTime, final Instant latestSessionStartTime)
      • KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K keyFrom, final K keyTo, final Instant earliestSessionEndTime, final Instant latestSessionStartTime)

Proposed Changes

Changes to ReadOnlySessionStore and SessionStore, taking into account the new methods added by KIP-617:

Code Block
public interface ReadOnlySessionStore<K, AGG> {
    // Moving read functions from SessionStore to ReadOnlySessionStore
    KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, final Instant earliestSessionEndTime, final Instant latestSessionStartTime);
 
    KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, final Instant earliestSessionEndTime, final Instant latestSessionStartTime);

    AGG fetchSession(final K key, final Instant sessionStartTime, final Instant sessionEndTime);
 
    // To be handled as part of KIP-617
    KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K key, final Instant earliestSessionEndTime, final Instant latestSessionStartTime);

    KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K keyFrom, final K keyTo, final Instant earliestSessionEndTime, final Instant latestSessionStartTime);
}

public interface SessionStore<K, AGG> extends ReadOnlySessionStore<K, AGG> {
    // Existing methods
    KeyValueIterator<Windowed<K>, AGG> findSessions(final K key,
                                                    final long earliestSessionEndTime,
                                                    final long latestSessionStartTime);

    KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom,
                                                    final K keyTo,
                                                    final long earliestSessionEndTime,
                                                    final long latestSessionStartTime);


    AGG fetchSession(final K key, final long sessionStartTime, final long sessionEndTime);

    // Default implementations
    @Override
    default KeyValueIterator<Windowed<K>, AGG> findSessions(final K key,
                                                            final Instant earliestSessionEndTime,
                                                            final Instant latestSessionStartTime) {
        return findSessions(
            key,
            ApiUtils.validateMillisecondInstant(earliestSessionEndTime, prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, "earliestSessionEndTime")),
            ApiUtils.validateMillisecondInstant(latestSessionStartTime, prepareMillisCheckFailMsgPrefix(latestSessionStartTime, "latestSessionStartTime")));
    }


    @Override
    default KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom,
                                                            final K keyTo,
                                                            final Instant earliestSessionEndTime,
                                                            final Instant latestSessionStartTime) {
        return findSessions(
            keyFrom,
            keyTo,
            ApiUtils.validateMillisecondInstant(earliestSessionEndTime, prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, "earliestSessionEndTime")),
            ApiUtils.validateMillisecondInstant(latestSessionStartTime, prepareMillisCheckFailMsgPrefix(latestSessionStartTime, "latestSessionStartTime")));
    }

    @Override
    default AGG fetchSession(final K key, final Instant sessionStartTime, final Instant sessionEndTime) {
        return fetchSession(
            key,
            ApiUtils.validateMillisecondInstant(sessionStartTime, prepareMillisCheckFailMsgPrefix(sessionStartTime, "sessionStartTime")),
            ApiUtils.validateMillisecondInstant(sessionEndTime, prepareMillisCheckFailMsgPrefix(sessionEndTime, "sessionEndTime")));
    }

	
    // To be handled as part of KIP-617
    KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K key,
                                                            final long earliestSessionEndTime,
                                                            final long latestSessionStartTime);

    KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K keyFrom,
                                                            final K keyTo,
                                                            final long earliestSessionEndTime,
                                                            final long latestSessionStartTime);

    @Override
    default KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K key,
                                                                    final Instant earliestSessionEndTime,
                                                                    final Instant latestSessionStartTime) {
        return backwardFindSessions(
            key,
            ApiUtils.validateMillisecondInstant(earliestSessionEndTime, prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, "earliestSessionEndTime")),
            ApiUtils.validateMillisecondInstant(latestSessionStartTime, prepareMillisCheckFailMsgPrefix(latestSessionStartTime, "latestSessionStartTime")));
    }

    @Override
    default KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K keyFrom,
                                                                    final K keyTo,
                                                                    final Instant earliestSessionEndTime,
                                                                    final Instant latestSessionStartTime) {
        return backwardFindSessions(
            keyFrom,
            keyTo,
            ApiUtils.validateMillisecondInstant(earliestSessionEndTime, prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, "earliestSessionEndTime")),
            ApiUtils.validateMillisecondInstant(latestSessionStartTime, prepareMillisCheckFailMsgPrefix(latestSessionStartTime, "latestSessionStartTime")));
    }
}
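
The delegation pattern used by the default methods above can be illustrated with a small, self-contained sketch. It does not use the Kafka Streams classes: MiniSessionStore, InMemorySessionStore, and toMillis are hypothetical names introduced only for this example, and toMillis is an assumed approximation of the null and overflow checks that ApiUtils.validateMillisecondInstant is used for in the proposal. The point is only that the Instant-based overload converts its arguments to epoch milliseconds and then calls the existing long-based method, so store implementations need to implement the long-based variant only.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Small demo: query the store through the Instant-based overload.
class InstantDelegationSketch {
    public static void main(String[] args) {
        InMemorySessionStore store = new InMemorySessionStore();
        store.put("user-1", 1_000L, 2_000L);
        store.put("user-1", 5_000L, 6_000L);

        // Only the first session ends at or after 1500 ms and starts at or before 3000 ms.
        List<long[]> sessions = store.findSessions(
            "user-1", Instant.ofEpochMilli(1_500L), Instant.ofEpochMilli(3_000L));
        System.out.println(sessions.size()); // prints 1
    }
}

// Hypothetical, simplified stand-in for SessionStore (not the Kafka Streams interface).
interface MiniSessionStore {
    // Existing long-based query: sessions for the key whose end is >= earliestSessionEndTime
    // and whose start is <= latestSessionStartTime. Each result is {startMs, endMs}.
    List<long[]> findSessions(String key, long earliestSessionEndTime, long latestSessionStartTime);

    // New Instant-based overload: validate, convert to milliseconds, delegate.
    default List<long[]> findSessions(String key,
                                      Instant earliestSessionEndTime,
                                      Instant latestSessionStartTime) {
        return findSessions(
            key,
            toMillis(earliestSessionEndTime, "earliestSessionEndTime"),
            toMillis(latestSessionStartTime, "latestSessionStartTime"));
    }

    // Assumed validation helper: rejects null and Instants that do not fit in long milliseconds.
    static long toMillis(Instant instant, String name) {
        if (instant == null) {
            throw new IllegalArgumentException(name + " must not be null");
        }
        try {
            return instant.toEpochMilli();
        } catch (ArithmeticException e) {
            throw new IllegalArgumentException(name + " does not fit in long milliseconds", e);
        }
    }
}

// Implements only the long-based method; the Instant overload is inherited.
final class InMemorySessionStore implements MiniSessionStore {
    private final Map<String, List<long[]>> sessionsByKey = new HashMap<>();

    void put(String key, long startMs, long endMs) {
        sessionsByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(new long[] {startMs, endMs});
    }

    @Override
    public List<long[]> findSessions(String key, long earliestSessionEndTime, long latestSessionStartTime) {
        List<long[]> result = new ArrayList<>();
        for (long[] s : sessionsByKey.getOrDefault(key, Collections.emptyList())) {
            if (s[1] >= earliestSessionEndTime && s[0] <= latestSessionStartTime) {
                result.add(s);
            }
        }
        return result;
    }
}
```

In this sketch an out-of-range argument such as Instant.MAX is rejected with an IllegalArgumentException before the long-based method is ever called, which mirrors the intent of validating each Instant in the default implementations.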


Compatibility, Deprecation, and Migration Plan

...