You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 5 Next »

 

Status

Current state: "Under Discussion"

Discussion threadhere

JIRAhere

Released: ...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently punctuate is triggered by the advance of the task's timestamp, which is the minimum of the timestamps of all input partitions. By default this means the event-time from the messages but a custom TimestampExtractor can be provided to use system-time instead of event-time. However, in that case the triggering of puncuate is still driven by the arrival of messages to all partitions and not by the advance of the system-time itself. The effect is that if any one of the input partitions has messages arriving irregularly, punctuate will be also be called at irregular intervals and in the extreme case not called at all if any one of the input partitions doesn't receive any messages.

Public Interfaces

See below

Proposed Changes

The proposal is to leave the current punctuate() method with the semantics as-is for backward compatibility and add a new overloaded variant that takes an enum value as parameter:

Processor<K,V>
void punctuate(long timestamp); // current

void punctuate(long timestamp, PunctuationType type); // new

where PunctuationType is

PunctuationType
enum PunctuationType {
  EVENT_TIME,
  SYSTEM_TIME,
}

then on ProcessorContext the schedule method would also be overloaded:

ProcessorContext
void schedule(long interval); //current

void schedule(long interval, PunctuationType type); //new
// We could allow this to be called once for each value of PunctuationType to mix approaches.

Other alternative under discussion:

(A) Change the semantics of punctuate() to be purely "time driven", instead of "part time driven, and part data-driven". That is, the punctuate function triggering will no longer be dependent whether there are new data arriving, and hence not on the timestamps of the arriving data either. Instead it will be triggered only by system wall-clock time.

As for users, the programming pattern would be:

  1. If you need to add a pure time-driven computation logic, use punctuate().
  2. If you need to add a data-driven computation logic, you should always use process(), and in process() users can choose to trigger some specific logic only every some time units, but still when a new data has arrived and hence being processed. 

The above approach will have consequences for implementations built against the current semantics.

 

(B) An alternative could be to leave current semantics as the defaults for the punctuate method but allow a configuration switch between event and system time.

 

(C) Another alternative would be to leave current semantics as-is and add another callback method to the Processor interface that can be scheduled similarly to punctuate() but would always be called at fixed, wall clock based intervals

 

(D) Yet another alternative would be to leave current semantics as-is but allow users to provide a function determining the timestamp of the stream task. In a similar way to how the TimestampExtractor allows users to decide what the current timestamp is for a given message (event-time, system-time or other), this feature would allow users to decide what the current timestamp is for a given stream task irrespective of the arrival of messages to all of the input partitions. This approach brings more flexibility at the expense of added complexity.

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Test Plan

Describe in few sentences how the KIP will be tested. We are mostly interested in system tests (since unit-tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.

 
  • No labels