Status
Current state: Under Discussion
Discussion thread: here
JIRA: KAFKA-10062
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The motivation is to source the wall-clock time the same way in a normal execution environment and in tests. A generic example is executing an action based on the current time when a message arrives, and potentially again some time later. Currently, if you want to implement a solution that uses wall-clock time and performs actions based on it, you would use System.currentTimeMillis(). The problem is that tests have no control over this value: the test environment advances a mocked wall-clock time ('MockTime'), which System.currentTimeMillis() does not reflect.
Stream time is advanced only when a record with a timestamp higher than the currently known stream time is about to be processed. Therefore, ProcessorContext#timestamp may be earlier than the stream time if records are not processed in timestamp order. Relying solely on the record timestamp and System.currentTimeMillis() for time-based actions is simply not reliable.
Public Interfaces
Add new public API for currentSystemTimeMs:
org.apache.kafka.streams.processor.ProcessorContext#currentSystemTimeMs
/**
 * Returns current cached wall-clock system timestamp in milliseconds.
 *
 * @return the current cached wall-clock system timestamp in milliseconds
 */
long currentSystemTimeMs();
Add new public API for currentStreamTimeMs:
org.apache.kafka.streams.processor.ProcessorContext#currentStreamTimeMs
/**
 * Returns the maximum timestamp of any record yet processed by the task.
 *
 * @return the maximum timestamp of any record yet processed by the task
 */
long currentStreamTimeMs();
Proposed Changes
The goal is to create two separate public APIs: one that returns the current system time in milliseconds and one that returns the current stream time in milliseconds.
currentSystemTimeMs
It is expected that this will return the wall-clock time in a normal execution environment and the mocked wall-clock time in a testing environment. This time already exists internally, so the goal is to expose it to the public.
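To illustrate why a context-supplied clock is easier to test than System.currentTimeMillis(), here is a minimal, self-contained sketch. It does not use the Kafka Streams API: DelayedAction and ManualClock are hypothetical names invented for this example, and the LongSupplier stands in for whatever the context would return from currentSystemTimeMs.

```java
import java.util.function.LongSupplier;

/** Hypothetical component that becomes due a fixed delay after a message arrives. */
class DelayedAction {
    private final LongSupplier clock; // source of wall-clock time in milliseconds
    private final long delayMs;
    private long armedAtMs = -1L;     // -1 means no message has been seen yet

    DelayedAction(LongSupplier clock, long delayMs) {
        this.clock = clock;
        this.delayMs = delayMs;
    }

    /** Called when a message arrives: remember the current wall-clock time. */
    void onMessage() {
        armedAtMs = clock.getAsLong();
    }

    /** Returns true once delayMs has elapsed since the last message. */
    boolean isDue() {
        return armedAtMs >= 0 && clock.getAsLong() - armedAtMs >= delayMs;
    }
}

/** Hypothetical manually advanced clock, playing the role of mocked time in a test. */
class ManualClock implements LongSupplier {
    private long nowMs;

    ManualClock(long startMs) {
        this.nowMs = startMs;
    }

    void advance(long ms) {
        nowMs += ms;
    }

    @Override
    public long getAsLong() {
        return nowMs;
    }
}
```

In production the same component could be built with System::currentTimeMillis as the supplier, while a test passes a ManualClock and advances it deterministically instead of sleeping.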
currentStreamTimeMs
It is expected that this will return the StreamTask's time from the partition group. This stream time is the maximum timestamp of any record yet processed by the task. It provides the actual stream time, whereas the timestamp of the current record can lag behind it when records are not processed in timestamp order.
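The "maximum timestamp of any record yet processed" semantics can be sketched in a few lines. This is an illustration only, not the StreamTask implementation: StreamTimeTracker, observe, and the UNKNOWN sentinel are hypothetical names chosen for this example.

```java
/** Hypothetical tracker that mimics stream time as a running maximum. */
class StreamTimeTracker {
    // Assumed sentinel for "no record processed yet"; chosen for this sketch only.
    static final long UNKNOWN = -1L;

    private long streamTimeMs = UNKNOWN;

    /** Records one processed timestamp and returns the resulting stream time. */
    long observe(long recordTimestampMs) {
        // Stream time never moves backwards: an out-of-order record leaves it unchanged.
        streamTimeMs = Math.max(streamTimeMs, recordTimestampMs);
        return streamTimeMs;
    }

    /** Returns the maximum timestamp observed so far. */
    long currentStreamTimeMs() {
        return streamTimeMs;
    }
}
```

Feeding the timestamps 100, 300, 200 in that order leaves the tracker at 300: while the third record is being processed, its own timestamp (200) is earlier than the stream time, which is exactly the gap the new method is meant to expose.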
Compatibility, Deprecation, and Migration Plan
This change is backwards compatible because it adds two new APIs and no existing method is changed or removed.
Rejected Alternatives
One single API with punctuator type as input
It seems more straightforward to create two separate, well-named APIs instead of a single API that takes the punctuation type as an input.