...

Proposed Changes

The proposal is to deprecate the current punctuate() method:

Code Block
titleProcessor<K,V>
@Deprecated
void punctuate(long timestamp); // current

Add a new Punctuator functional interface:

Code Block
titlePunctuator
interface Punctuator {
  void punctuate(long timestamp);
}

On ProcessorContext the current schedule method would be deprecated and a new variant taking the Punctuator added:

Code Block
titleProcessorContext
@Deprecated
void schedule(long interval); //current, stream-time semantics

void schedule(long interval, PunctuationType type, Punctuator callback); //new
// We could allow this to be called once for each value of PunctuationType to mix approaches.

where PunctuationType is

Code Block
titlePunctuationType
enum PunctuationType {
  STREAM_TIME,
  SYSTEM_TIME,
}
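
As an illustration of how the proposed pieces might fit together, here is a minimal sketch that compiles without Kafka on the classpath. PunctuationType and Punctuator are redeclared from the proposal; SketchContext and its advance() hook are hypothetical stand-ins for the runtime, and the firing rule (first punctuation one interval after the first observed time, callback receives the observed time) is an assumption made for the example, not something the proposal specifies.

```java
import java.util.ArrayList;
import java.util.List;

// Redeclared from the proposal so the sketch has no Kafka dependency.
enum PunctuationType { STREAM_TIME, SYSTEM_TIME }

interface Punctuator {
    void punctuate(long timestamp);
}

// Hypothetical stand-in for the scheduling half of ProcessorContext.
class SketchContext {
    private static final class Entry {
        final long interval;
        final PunctuationType type;
        final Punctuator callback;
        long next = Long.MIN_VALUE; // MIN_VALUE marks "not yet initialised"

        Entry(long interval, PunctuationType type, Punctuator callback) {
            this.interval = interval;
            this.type = type;
            this.callback = callback;
        }
    }

    private final List<Entry> entries = new ArrayList<>();

    // Same shape as the proposed schedule(interval, type, callback).
    void schedule(long interval, PunctuationType type, Punctuator callback) {
        entries.add(new Entry(interval, type, callback));
    }

    // Hypothetical hook: the runtime reports that the clock of the given type reads `now`.
    void advance(PunctuationType type, long now) {
        for (Entry e : entries) {
            if (e.type != type) {
                continue; // each callback only reacts to its own notion of time
            }
            if (e.next == Long.MIN_VALUE) {
                e.next = now + e.interval; // assumption: first firing one interval after first observation
            } else if (now >= e.next) {
                e.callback.punctuate(now);
                e.next = now + e.interval;
            }
        }
    }
}

class PunctuatorSketchDemo {
    public static void main(String[] args) {
        SketchContext context = new SketchContext();
        // One callback per PunctuationType, mixing both approaches in a single processor.
        context.schedule(100, PunctuationType.STREAM_TIME,
                ts -> System.out.println("stream-time punctuate at " + ts));
        context.schedule(1000, PunctuationType.SYSTEM_TIME,
                ts -> System.out.println("system-time punctuate at " + ts));

        context.advance(PunctuationType.STREAM_TIME, 0);
        context.advance(PunctuationType.STREAM_TIME, 150);
        context.advance(PunctuationType.SYSTEM_TIME, 5000);
        context.advance(PunctuationType.SYSTEM_TIME, 6000);
    }
}
```

Because Punctuator has a single method, callbacks can be passed as lambdas, and registering one callback per PunctuationType keeps the stream-time and system-time logic separate.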

Other alternatives under discussion:

...

(D) Yet another alternative would be to leave current semantics as-is but allow users to provide a function determining the timestamp of the stream task. In a similar way to how the TimestampExtractor allows users to decide what the current timestamp is for a given message (event-time, system-time or other), this feature would allow users to decide what the current timestamp is for a given stream task irrespective of the arrival of messages to all of the input partitions. This approach brings more flexibility at the expense of added complexity.
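
To make alternative (D) more concrete, a user-supplied function could look roughly like the sketch below. Everything here is hypothetical: the proposal does not name such an interface, so TaskTimestampFunction, its parameters, and the two example strategies are illustrative assumptions only.

```java
import java.util.Collection;

// Hypothetical interface: decides the current timestamp of a stream task.
interface TaskTimestampFunction {
    // partitionTimestamps: latest extracted timestamp of each input partition that has received data
    // systemTime: current wall-clock time in milliseconds
    long apply(Collection<Long> partitionTimestamps, long systemTime);
}

class TaskTimestampFunctions {
    // Follow the fastest partition, so an idle partition does not hold the task back.
    // Assumption: fall back to the wall clock when no partition has data yet.
    static final TaskTimestampFunction MAX_SEEN = (timestamps, now) ->
            timestamps.stream().mapToLong(Long::longValue).max().orElse(now);

    // Ignore message timestamps entirely and follow the wall clock.
    static final TaskTimestampFunction WALL_CLOCK = (timestamps, now) -> now;
}
```

The flexibility and the added complexity both show up here: each strategy is a one-liner, but the user now has to decide how the task clock behaves when partitions are idle or skewed.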

 

(E) Finally, the hybrid approach (this is convenient for the use cases in Punctuate Use Cases but difficult to reason about):

 

schedule(Punctuator callback, long streamTimeInterval, long systemTimeUpperBound); // schedules punctuate at stream-time intervals with a system-time upper bound. For pure system-time punctuation, streamTimeInterval can be set to -1 (treated as infinite)

 

schedule(Punctuator callback, long streamTimeInterval); // schedules punctuate at stream-time intervals without a system-time upper bound - this is equivalent to current stream-time based punctuate

Punctuation is triggered when either:
- the stream time advances past the (stream time of the previous punctuation) + streamTimeInterval;
- or (iff systemTimeUpperBound is set) when the system time advances past the (system time of the previous punctuation) + systemTimeUpperBound

In either case:
- punctuate is triggered with, as its argument, the stream time at which the current punctuation was meant to happen
- the next punctuation is scheduled at (stream time at which the current punctuation was meant to happen) + streamTimeInterval
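
The trigger rules above can be sketched as a small state machine. This is an illustration under stated assumptions, not the proposed implementation: HybridSchedule, maybePunctuate() and the NO_UPPER_BOUND sentinel are hypothetical names, java.util.function.LongConsumer stands in for Punctuator, "advances past" is read as "reaches or exceeds", and the pure system-time case (streamTimeInterval of -1) is not handled.

```java
import java.util.function.LongConsumer;

class HybridSchedule {
    static final long NO_UPPER_BOUND = -1L; // hypothetical sentinel for "no system-time upper bound"

    private final LongConsumer callback;      // stands in for Punctuator
    private final long streamTimeInterval;
    private final long systemTimeUpperBound;
    private long dueStreamTime;               // stream time at which the next punctuation is meant to happen
    private long lastPunctuationSystemTime;   // system time of the previous punctuation

    HybridSchedule(LongConsumer callback, long streamTimeInterval, long systemTimeUpperBound,
                   long startStreamTime, long startSystemTime) {
        this.callback = callback;
        this.streamTimeInterval = streamTimeInterval;
        this.systemTimeUpperBound = systemTimeUpperBound;
        this.dueStreamTime = startStreamTime + streamTimeInterval;
        this.lastPunctuationSystemTime = startSystemTime;
    }

    // Returns true if a punctuation was triggered for the given clock readings.
    boolean maybePunctuate(long streamTime, long systemTime) {
        boolean streamDue = streamTime >= dueStreamTime;
        boolean systemDue = systemTimeUpperBound != NO_UPPER_BOUND
                && systemTime >= lastPunctuationSystemTime + systemTimeUpperBound;
        if (!streamDue && !systemDue) {
            return false;
        }
        // In either case the callback receives the stream time at which
        // this punctuation was meant to happen, not the observed time.
        callback.accept(dueStreamTime);
        dueStreamTime += streamTimeInterval;
        lastPunctuationSystemTime = systemTime;
        return true;
    }
}
```

For example, with streamTimeInterval = 100 and systemTimeUpperBound = 500 starting from time 0, a call with stream time 120 punctuates with argument 100; if stream time then stalls, the next punctuation fires once system time is 500 ms past the previous one and is passed 200. That the callback can receive a stream time the task has not yet reached is part of what makes this alternative difficult to reason about.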

 

Compatibility, Deprecation, and Migration Plan

...

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.

 
schedule(Punctuator callback, long streamTimeInterval, long systemTimeUpperBound); // schedules punctuate at stream-time intervals with a system-time upper bound - systemTimeUpperBound must be no less than streamTimeInterval
schedule(Punctuator callback, long streamTimeInterval); // schedules punctuate at stream-time intervals without a system-time upper bound - this is equivalent to current stream-time based punctuate

Punctuation is triggered when either:
- the stream time advances past the (stream time of the previous punctuation) + streamTimeInterval;
- or (iff systemTimeUpperBound is set) when the system time advances past the (system time of the previous punctuation) + systemTimeUpperBound