Problem Description

In SAMZA-974, we built a mechanism to support batch jobs with bounded data sources. The feature provides the following functionality:

...

This works for applications that have no message shuffling phase. With the introduction of partitionBy operators, processors can send output to any partition of an intermediate stream, and the intermediate streams are consumed again for further processing. Since end-of-stream tokens are not carried over from the original input streams to the intermediate streams, the job cannot shut down even after all the input streams reach their end. To address this problem, we need to extend the existing end-of-stream feature to support applications with intermediate streams.

Goals

  • Build general support for watermark control messages through intermediate streams, and reconcile them on the consumers.

  • Use the watermark control messages to propagate end-of-stream from the source inputs to the downstream intermediate streams created by the partitionBy operators.

  • Samza will shut down the application once all the input streams reach end-of-stream.

  • The solution should still work if we split the application into multiple jobs based on the partitionBy operators.

Proposed Design

Below we discuss and compare two approaches to supporting this feature. We propose to use the second approach (in-band watermark).

Approach 1: Out-of-band control stream

...

In this approach the ApplicationRunner creates a separate control stream for propagating watermarks. The control stream is a one-partition broadcast stream that is consumed by every container in the application. The ApplicationRunner manages the lifecycle of the control stream: it creates the stream on the first run and purges it at the start of each later run (the same way output streams are handled when consuming from Hadoop).

...



How it works for end-of-stream:

  1. When an input stream is consumed to the end, Samza sends an EOS watermark message, which includes the input topic and partition, to the control stream.

  2. Once all the EOS messages have been received (based on the input partition count), we know that the input has reached end-of-stream. Any downstream intermediate stream whose input streams have all reached end-of-stream is marked as pending EOS. After that, whenever a partition of a marked intermediate stream reaches its highest offset (the high watermark in Kafka), we can emit an end-of-stream message for that partition, because the partition is then guaranteed to be at end-of-stream.


Approach 2: In-band watermark control messages

In this approach we do not use a separate stream for the watermark control messages. The intermediate streams themselves carry both the data and the watermark control messages.



How it works for end-of-stream:

  1. When an input stream is consumed to the end, Samza uses the topology of the operator graph to find the downstream intermediate streams whose inputs have all reached end-of-stream.

  2. The task sends an EOS watermark message to every partition of the intermediate streams found in step 1.

  3. Each consumer of the intermediate streams counts the watermark messages received for each partition and declares end-of-stream once all the EOS messages have been received.
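
Step 1 of the in-band flow can be pictured as a lookup over the operator graph. The following is a minimal sketch, not Samza's implementation: the class `EosPropagation`, its method, and the map from each intermediate stream to its input streams are hypothetical names assumed for this illustration.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper (not a Samza class): decides which intermediate
// streams can be sent an EOS message, given the inputs that have ended.
final class EosPropagation {
  /**
   * @param inputsByIntermediateStream assumed view of the operator graph:
   *        intermediate stream name -> names of the streams feeding it
   * @param endedInputs input streams that have been consumed to the end
   * @return intermediate streams whose inputs have all reached end-of-stream
   */
  static Set<String> streamsReadyForEos(
      Map<String, Set<String>> inputsByIntermediateStream,
      Set<String> endedInputs) {
    Set<String> ready = new TreeSet<>();
    for (Map.Entry<String, Set<String>> entry : inputsByIntermediateStream.entrySet()) {
      // An intermediate stream qualifies only when every one of its inputs has ended.
      if (endedInputs.containsAll(entry.getValue())) {
        ready.add(entry.getKey());
      }
    }
    return ready;
  }
}
```

A task would then send one EOS message to each partition of every stream returned by this lookup.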


Comparisons of the two approaches:

 

 

Approach 1 (out-of-band control stream)

Pros:

  • Intermediate streams stay clean, with only user data. This is convenient if the user wants to consume them elsewhere.

  • Simple recovery from failure: just read the control stream from the beginning.

  • Fewer messages.

Cons:

  • Need to correlate the out-of-band control messages with the source stream, which is complex to track and requires synchronization between the input streams and the control stream.

  • Need to maintain a separate stream for control messages.

Approach 2 (in-band watermark control messages)

Pros:

  • No synchronization is needed between watermark messages and input messages. A watermark implies that all input messages before it are complete. This is critical for supporting general event-time watermarks.

Cons:

  • Failure handling in the second job is more complicated. It needs to checkpoint all the watermark messages received, so that it can continue counting after it recovers from a failure.

  • More messages, since watermark messages must be written to each partition of the downstream processor.

 

Based on the pros and cons above, we propose to use the in-band approach to support watermarks.

Design Details

Intermediate Stream Message Format:

The format of the intermediate stream message:

Code Block
IntermediateMessage =>  [MessageType MessageData]
  MessageType => byte
  MessageData => byte[]

  MessageData => [UserMessage/ControlMessage]
  ControlMessage =>
     Type => int
     Version => int
     Other Message Data (based on different types of control message)

For user messages, we will use the user-provided serde (the default is the system serde). For watermark control messages, we will use the JSON serde, since it is built into Samza and easy to parse.
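
The framing above (one type byte followed by the serialized payload) can be sketched as follows. This is an illustration under stated assumptions, not Samza's codec: the class `IntermediateMessageCodec` and the byte values chosen for the two message types are hypothetical, and the payload is assumed to be already serialized by the relevant serde.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical codec for [MessageType MessageData]; not a Samza class.
final class IntermediateMessageCodec {
  // Assumed type tags; the proposal does not specify the byte values.
  static final byte USER_MESSAGE = 0;
  static final byte CONTROL_MESSAGE = 1;

  /** Prepends the one-byte message type to the serialized payload. */
  static byte[] encode(byte messageType, byte[] messageData) {
    return ByteBuffer.allocate(1 + messageData.length)
        .put(messageType)
        .put(messageData)
        .array();
  }

  /** Reads the message type from the first byte. */
  static byte messageType(byte[] intermediateMessage) {
    return intermediateMessage[0];
  }

  /** Returns the payload that follows the type byte. */
  static byte[] messageData(byte[] intermediateMessage) {
    return Arrays.copyOfRange(intermediateMessage, 1, intermediateMessage.length);
  }
}
```

A consumer would read the type byte first and then hand the payload either to the user serde or to the JSON serde.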

ControlMessage

We will support two types of ControlMessage: EndOfStreamMessage and WatermarkMessage.

Code Block
public class EndOfStreamMessage extends ControlMessage {
 private final long timestamp;
 private final String taskName;
 private final int taskCount;

 private EndOfStreamMessage(long timestamp, String taskName, int taskCount) {
   // The message type is fixed by the subclass.
   super(ControlMessageType.EndOfStream.ordinal());
   this.timestamp = timestamp;
   this.taskName = taskName;
   this.taskCount = taskCount;
 }

 public long getTimestamp() {   return timestamp;  }

 public String getTaskName() {   return taskName;  }

 public int getTaskCount() {   return taskCount;  }
}

public class WatermarkMessage extends ControlMessage {
 private final long timestamp;
 private final String taskName;
 private final int taskCount;

 private WatermarkMessage(long timestamp, String taskName, int taskCount) {
   // The message type is fixed by the subclass.
   super(ControlMessageType.Watermark.ordinal());
   this.timestamp = timestamp;
   this.taskName = taskName;
   this.taskCount = taskCount;
 }

 public long getTimestamp() {   return timestamp;  }

 public String getTaskName() {   return taskName;  }

 public int getTaskCount() {   return taskCount;  }
}
Event-time Watermark Message

Reconciliation

When SystemConsumers gets watermark messages, Samza needs to reconcile based on the task counts. The reconciliation works as follows:

  1. For each intermediate stream partition, Samza keeps track of the end-of-stream/watermark messages received from the producing tasks, and counts the number of tasks from which it has received such messages.

  2. When the count matches the total task count, Samza emits an end-of-stream/watermark message to the task that is assigned to this stream partition.

  3. When Samza receives further watermark messages, it emits a watermark with the earliest event time across all the stream partitions. Nothing is emitted if the earliest event time does not change.
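
The reconciliation steps above can be sketched for a single intermediate stream partition as follows. This is a simplified illustration, not Samza's implementation: the class `WatermarkReconciler` and its method are hypothetical, and the sketch takes the minimum across producing tasks for one partition only. Combining several partitions would take the minimum once more across their results.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical reconciler for one intermediate stream partition; not a Samza class.
final class WatermarkReconciler {
  // Latest event time reported by each producing task.
  private final Map<String, Long> latestByTask = new HashMap<>();
  private long lastEmitted = Long.MIN_VALUE;

  /**
   * Records a watermark from one producing task.
   *
   * @return the watermark to emit downstream, or empty if not every task
   *         has reported yet or the earliest event time did not advance
   */
  OptionalLong onWatermark(String taskName, int taskCount, long timestamp) {
    // Keep the largest timestamp seen per task, so replays cannot move time backwards.
    latestByTask.merge(taskName, timestamp, Long::max);
    if (latestByTask.size() < taskCount) {
      return OptionalLong.empty(); // still waiting for some producing tasks
    }
    long earliest = Long.MAX_VALUE;
    for (long t : latestByTask.values()) {
      earliest = Math.min(earliest, t);
    }
    if (earliest <= lastEmitted) {
      return OptionalLong.empty(); // earliest event time unchanged: no emission
    }
    lastEmitted = earliest;
    return OptionalLong.of(earliest);
  }
}
```

With two producing tasks, a watermark of 100 from the first task emits nothing, a watermark of 50 from the second emits 50, and later messages emit only when the minimum across both tasks moves forward.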

Checkpoint watermarks

In a failure scenario, the latest watermark control message received from each intermediate stream partition would be lost without checkpointing. The checkpoint of watermark control messages needs to preserve both the intermediate stream partition and the producing task information. A checkpoint will be:

 

Code Block
Key => IntermediateStreamPartition.ControlMessageType
Value => ControlMessageCheckpoint

public class ControlMessageCheckpoint {
 int taskCount;
 Map<String, Long> tasksToEventTime;
}
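
To illustrate why the checkpoint keeps per-task information, the sketch below restores a checkpoint and resumes counting. It is a hedged illustration only: the class `CheckpointSketch`, its methods, and the exact key layout (`<stream>.<partition>.<controlMessageType>`) are assumptions made for this example, and only the two fields come from the checkpoint definition above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for ControlMessageCheckpoint; not a Samza class.
final class CheckpointSketch {
  final int taskCount;
  final Map<String, Long> tasksToEventTime;

  CheckpointSketch(int taskCount, Map<String, Long> tasksToEventTime) {
    this.taskCount = taskCount;
    // Copy so that a restored checkpoint does not share state with the old one.
    this.tasksToEventTime = new HashMap<>(tasksToEventTime);
  }

  /** Assumed key layout: "<stream>.<partition>.<controlMessageType>". */
  static String key(String stream, int partition, String controlMessageType) {
    return stream + "." + partition + "." + controlMessageType;
  }

  /** Records the latest event time received from a producing task. */
  void record(String taskName, long eventTime) {
    tasksToEventTime.put(taskName, eventTime);
  }

  /** True once every producing task has been seen at least once. */
  boolean allTasksReported() {
    return tasksToEventTime.size() == taskCount;
  }
}
```

After a restart, a consumer that restores this state only has to wait for the tasks still missing from `tasksToEventTime`, instead of counting from zero.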

 

 


 

 

...