You are viewing an old version of this page. View the current version.

Compare with Current View Page History

Version 1 Next »

Status

Current state: Under Discussion

Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]

JIRA: KAFKA-10062

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Testing with the TopologyTestDriver will lead to a time mismatch between wall-clock time and system time. This is because you can programmatically advance the wall-clock using advanceWallClockTime. The goal is to give the consumer the internal system time instead of them having to rely on System.currentTimeMillis(). The same is also true for stream time.

Public Interfaces

Add new public API for currentSystemTimeMs:

org.apache.kafka.streams.processor.ProcessorContext#currentSystemTimeMs

    /**	
     * Returns current wall-clock system timestamp in milliseconds.
	 *
     * @return the current wall-clock system timestamp in milliseconds
     */
	long currentSystemTimeMs();


Add new public API for currentStreamTimeMs:

org.apache.kafka.streams.processor.ProcessorContext#currentStreamTimeMs

	/**	
     * Return the stream-time of this partition group defined as the largest timestamp seen across all partitions.
	 *
     * @return the stream-time of this partition group defined as the largest timestamp seen across all partitions
     */
	long currentStreamTimeMs();

Proposed Changes

The goal is to create two separate public API's that return the current system time in milliseconds and the current stream time in milliseconds.


currentSystemTimeMs

Move existing internal API (org.apache.kafka.streams.processor.internals.InternalProcessorContext#currentSystemTimeMs)

to become a new public API (org.apache.kafka.streams.processor.ProcessorContext#currentSystemTimeMs).


currentStreamTimeMs

Create new public API (org.apache.kafka.streams.processor.ProcessorContext#currentStreamTimeMs).

Implement this API (org.apache.kafka.streams.processor.ProcessorContextImpl#currentStreamTimeMs).

    @Override
    public long currentStreamTimeMs() {
        throwUnsupportedOperationExceptionIfStandby("currentStreamTimeMs");
        return streamTask.streamTime();
    }


Compatibility, Deprecation, and Migration Plan

N/A

Rejected Alternatives

One single API with punctuator type as input

It seems more straightforward to create two separate, well-named API's instead of a single API with punctuator type as an input.

  • No labels