Status

Current state: Under Discussion

Discussion thread: here

JIRA: KAFKA-10062

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The motivation is to source wall-clock time the same way in a normal execution environment and in tests. A generic example is executing an action when a message arrives based on the current time, and then potentially again some time later. Currently, a solution that acts on wall-clock time would call System.currentTimeMillis(). The problem is that tests have no control over this value: the test environment drives time through 'MockTime', which System.currentTimeMillis() bypasses.

Stream time is advanced before processing a new record whose timestamp is higher than the currently known stream time. Therefore, ProcessorContext#timestamp may be earlier than the stream time if the records are not processed in timestamp order. Relying solely on timestamp and System.currentTimeMillis() for time-based actions is simply not reliable.

Public Interfaces

Add new public API for currentSystemTimeMs:

org.apache.kafka.streams.processor.ProcessorContext#currentSystemTimeMs

    /**
     * Returns current wall-clock system timestamp in milliseconds.
     *
     * @return the current wall-clock system timestamp in milliseconds
     */
    long currentSystemTimeMs();


Add new public API for currentStreamTimeMs:

org.apache.kafka.streams.processor.ProcessorContext#currentStreamTimeMs

    /**
     * Returns the stream-time of this partition group defined as the largest timestamp seen across all partitions.
     *
     * @return the stream-time of this partition group defined as the largest timestamp seen across all partitions
     */
    long currentStreamTimeMs();

Proposed Changes

The goal is to create two separate public APIs: one that returns the current system time in milliseconds and one that returns the current stream time in milliseconds.


currentSystemTimeMs

It is expected that this will return the wall-clock time in a normal execution environment and the mocked wall-clock time in a testing environment. This time already exists internally, so the goal is to expose it to the public.
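
To illustrate why an injectable time source helps, here is a minimal, self-contained sketch. It does not use the Kafka Streams API: the `Clock` interface, `SystemClock`, `ManualClock`, and `DeadlineAction` are hypothetical names invented for this example, and only the method name `currentSystemTimeMs` is borrowed from the proposal. The point is that code reading time through such an accessor behaves identically in production and in tests, while a test can advance the clock deterministically.

```java
import java.util.concurrent.atomic.AtomicLong;

public class WallClockSketch {

    /** Hypothetical stand-in for the time source behind currentSystemTimeMs(). */
    interface Clock {
        long currentSystemTimeMs();
    }

    /** Production clock: delegates to the JVM wall clock. */
    static final class SystemClock implements Clock {
        @Override
        public long currentSystemTimeMs() {
            return System.currentTimeMillis();
        }
    }

    /** Test clock: only moves when the test advances it. */
    static final class ManualClock implements Clock {
        private final AtomicLong nowMs;

        ManualClock(long startMs) {
            this.nowMs = new AtomicLong(startMs);
        }

        void advance(long deltaMs) {
            nowMs.addAndGet(deltaMs);
        }

        @Override
        public long currentSystemTimeMs() {
            return nowMs.get();
        }
    }

    /** Hypothetical time-based action: becomes due once a deadline has passed. */
    static final class DeadlineAction {
        private final Clock clock;
        private final long deadlineMs;

        DeadlineAction(Clock clock, long delayMs) {
            this.clock = clock;
            this.deadlineMs = clock.currentSystemTimeMs() + delayMs;
        }

        boolean isDue() {
            return clock.currentSystemTimeMs() >= deadlineMs;
        }
    }

    public static void main(String[] args) {
        ManualClock clock = new ManualClock(1_000L);
        DeadlineAction action = new DeadlineAction(clock, 500L);
        System.out.println(action.isDue()); // false: clock is at 1000, deadline is 1500
        clock.advance(500L);
        System.out.println(action.isDue()); // true: clock has reached 1500
    }
}
```

Had `DeadlineAction` called System.currentTimeMillis() directly, the test would have needed to sleep for real time instead of advancing the clock.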


currentStreamTimeMs

It is expected that this will return the StreamTask's time from the partition group. This provides the actual stream time, which the timestamp of the current record cannot reliably stand in for when records might not be processed in timestamp order.
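
The difference between a record's timestamp and stream time can be sketched as follows. This is an illustration only, not the StreamTask implementation: `StreamTimeTracker` and its `observe` method are hypothetical, and the use of -1 for "no record seen yet" is an assumption. The sketch follows the definition used in this proposal, where stream time is the largest timestamp seen so far.

```java
public class StreamTimeSketch {

    /** Hypothetical tracker; mirrors the "largest timestamp seen" definition. */
    static final class StreamTimeTracker {
        private long streamTimeMs = -1L; // assumption: -1 means no record seen yet

        /** Advances stream time only when the record timestamp is larger. */
        void observe(long recordTimestampMs) {
            streamTimeMs = Math.max(streamTimeMs, recordTimestampMs);
        }

        long currentStreamTimeMs() {
            return streamTimeMs;
        }
    }

    public static void main(String[] args) {
        StreamTimeTracker tracker = new StreamTimeTracker();
        long[] timestamps = {100L, 300L, 200L}; // the third record arrives out of order
        for (long ts : timestamps) {
            tracker.observe(ts);
            System.out.println("record=" + ts + " streamTime=" + tracker.currentStreamTimeMs());
        }
        // prints:
        // record=100 streamTime=100
        // record=300 streamTime=300
        // record=200 streamTime=300
    }
}
```

For the out-of-order record, the record timestamp (200) is earlier than stream time (300), which is the case where relying on the record timestamp alone would give the wrong answer.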

Compatibility, Deprecation, and Migration Plan

This change will be backwards compatible because it adds two new APIs, and no existing method will be changed or removed.

Rejected Alternatives

One single API with punctuator type as input

It seems more straightforward to create two separate, well-named APIs instead of a single API that takes the punctuator type as an input.
