...

We propose using the in-band channel of the intermediate streams to propagate control messages. The design diagram is below:


 [Image: design diagram]

How it works:

  • When an input stream is consumed to the end, Samza uses the topology of the operator graph to find the downstream intermediate streams whose inputs have all reached end-of-stream.

  • The task will then send an EOS message to all the partitions of those intermediate streams.

    1. The upstream tasks will send control messages to all partitions of the downstream intermediate topics. The control messages will be serialized and sent with user messages in the same stream.

    2. The downstream Samza processor will consume the intermediate streams and deserialize both user messages and control messages in SystemConsumers.

    3. The control messages will be reconciled based on the count of all the upstream producers (tasks). See below for details on how each type of control message is reconciled.
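The topology check described above, finding the intermediate streams whose inputs have all reached end-of-stream, can be sketched as follows. This is a minimal illustration rather than Samza code: the class name, the method name, and the flattened map view of the operator graph are all assumptions made for the example.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Illustrative only: the class name and the flattened graph view are assumptions.
final class EndOfStreamPropagation {
  /**
   * @param inputsOf    intermediate stream id -> ids of the input streams that feed it
   * @param endedInputs ids of the input streams already consumed to the end
   * @return intermediate streams whose inputs have all reached end-of-stream
   */
  static Set<String> readyForEndOfStream(Map<String, Set<String>> inputsOf,
                                         Set<String> endedInputs) {
    Set<String> ready = new TreeSet<>();
    for (Map.Entry<String, Set<String>> entry : inputsOf.entrySet()) {
      Set<String> inputs = entry.getValue();
      // An intermediate stream with no known inputs is never considered ended.
      if (!inputs.isEmpty() && endedInputs.containsAll(inputs)) {
        ready.add(entry.getKey());
      }
    }
    return ready;
  }
}
```

With a hypothetical graph where "joined" is fed by "orders" and "payments", and "repartitioned" is fed only by "orders", the end of "orders" alone makes only "repartitioned" eligible for an EOS message.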

    Reconciliation

    For the scope of this proposal, we support two kinds of control messages: end-of-stream and watermark.

    • End-of-stream Message: This message contains the streamId of the stream that the upstream task has consumed to the end. The downstream Samza processor counts the number of upstream tasks from which it has received this message. Once that count matches the total task count, the stream with that streamId has reached its end. Each downstream task will then send a new end-of-stream message to all partitions of its downstream intermediate streams, and the current Samza processor will shut down once all of its input streams (including intermediate streams) have reached end-of-stream.
    • Watermark Message: This message contains the timestamp up to which the upstream task has processed so far. The downstream Samza processor counts the number of upstream tasks from which it has received this message. Once that count matches the total task count, the downstream Samza processor will emit a watermark with the earliest timestamp across all the upstream tasks.
    Each consumer of the intermediate streams will count the control messages received for each partition and declare end of stream once all the EOS messages have been received.

    Design details

    Intermediate Stream Message Format:

    The format of the intermediate stream message:

    Code Block
    IntermediateMessage =>  [MessageType MessageData]
      MessageType => byte
      MessageData => byte[]
    
      MessageType => [0(UserMessage), 1(Watermark), 2(EndOfStream)]
      MessageData => [UserMessage/ControlMessage]
      ControlMessage =>
         Version => int
         TaskName => String
         TaskCount => int
         Other Message Data (based on different types of control message)
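The framing above, a one-byte MessageType followed by the serialized MessageData, can be sketched as below. This is an illustration under stated assumptions, not the Samza implementation: the class and method names are hypothetical, and the payload is treated as opaque bytes that were already produced by the user serde or the JSON serde.

```java
import java.util.Arrays;

// Hypothetical helper, not part of Samza: frames a payload as [MessageType MessageData].
final class IntermediateMessageCodec {
  static final byte USER_MESSAGE = 0;
  static final byte WATERMARK = 1;
  static final byte END_OF_STREAM = 2;

  // Prepends the one-byte type tag to the already-serialized payload.
  static byte[] encode(byte type, byte[] data) {
    byte[] out = new byte[data.length + 1];
    out[0] = type;
    System.arraycopy(data, 0, out, 1, data.length);
    return out;
  }

  // Reads the type tag so the consumer can pick the right deserializer.
  static byte typeOf(byte[] message) {
    requireNonEmpty(message);
    return message[0];
  }

  // Returns the payload without the type tag.
  static byte[] dataOf(byte[] message) {
    requireNonEmpty(message);
    return Arrays.copyOfRange(message, 1, message.length);
  }

  private static void requireNonEmpty(byte[] message) {
    if (message.length == 0) {
      throw new IllegalArgumentException("empty intermediate message");
    }
  }
}
```

A consumer would call typeOf first and route the remaining bytes either to the user serde (type 0) or to the control message serde (types 1 and 2).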

    For user messages, we will use the user-provided serde (the default is the system serde). For control messages, we will use the JSON serde, since it is built into Samza and easy to parse.

    ControlMessage

    We will support two types of ControlMessage: EndOfStreamMessage and WatermarkMessage.

    Code Block
    // Base class for control messages; identifies the producing task and the total producer count.
    public abstract class ControlMessage {
      private final String taskName;  // name of the upstream task that sent this message
      private final int taskCount;    // total number of upstream producing tasks
      private int version = 1;        // message format version
    
      public ControlMessage(String taskName, int taskCount) {
        this.taskName = taskName;
        this.taskCount = taskCount;
      }
    
      public String getTaskName() {
        return taskName;
      }
    
      public int getTaskCount() {
        return taskCount;
      }
    
      public void setVersion(int version) {
        this.version = version;
      }
    
      public int getVersion() {
        return version;
      }
    }
     
    public class EndOfStreamMessage extends ControlMessage {
      private final String streamId;  // stream that the sending task has consumed to the end
    
      // Public so that producing tasks and the serde can create instances.
      public EndOfStreamMessage(String streamId, String taskName, int taskCount) {
        super(taskName, taskCount);
        this.streamId = streamId;
      }
    
      public String getStreamId() {
        return streamId;
      }
    }
    
    public class WatermarkMessage extends ControlMessage {
      private final long timestamp;  // timestamp the sending task has processed up to
    
      // Public so that producing tasks and the serde can create instances.
      public WatermarkMessage(long timestamp, String taskName, int taskCount) {
        super(taskName, taskCount);
        this.timestamp = timestamp;
      }
    
      public long getTimestamp() {
        return timestamp;
      }
    }

    Reconciliation

    For EOS/watermark messages, Samza will trigger the event on the consumer tasks only after the message has been received from all the producers (the tasks of the previous stage). So when SystemConsumers receives EOS/watermark messages, Samza needs to count the sending tasks against the total number of producing tasks. The counting works as follows:

  • For each intermediate stream partition, Samza keeps track of the end-of-stream/watermark messages received from the producing tasks, and counts the number of tasks from which it has received them.

  • When the count matches the total task count, Samza will emit an end-of-stream/watermark message to the task that is assigned to this stream partition.

  • When Samza receives watermark messages, it will emit a watermark with the earliest event time across all the stream partitions. Nothing is emitted if the earliest event time has not changed.
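The counting rules above can be sketched for a single intermediate stream partition as follows. This is a minimal sketch, not the Samza implementation: the class and method names are hypothetical, duplicate messages from the same task are assumed to count once, and the watermark is computed across the producing tasks of one partition only (combining across partitions is left out).

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;

// Hypothetical tracker for one intermediate stream partition; names are illustrative.
final class ControlMessageTracker {
  private final Set<String> eosTasks = new HashSet<>();
  private final Map<String, Long> watermarks = new HashMap<>();
  private long lastEmitted = Long.MIN_VALUE;

  // Returns true exactly once: when the last producing task reports end-of-stream.
  boolean onEndOfStream(String taskName, int taskCount) {
    boolean firstFromTask = eosTasks.add(taskName);
    return firstFromTask && eosTasks.size() == taskCount;
  }

  // Records the task's latest timestamp and returns the new watermark if it advanced.
  OptionalLong onWatermark(String taskName, long timestamp, int taskCount) {
    watermarks.merge(taskName, timestamp, Long::max);
    if (watermarks.size() < taskCount) {
      return OptionalLong.empty(); // still waiting for some producing tasks
    }
    long earliest = Long.MAX_VALUE;
    for (long t : watermarks.values()) {
      earliest = Math.min(earliest, t);
    }
    if (earliest <= lastEmitted) {
      return OptionalLong.empty(); // earliest event time did not change
    }
    lastEmitted = earliest;
    return OptionalLong.of(earliest);
  }
}
```

Keying the state by task name rather than keeping a bare counter means a replayed control message from the same task does not inflate the count.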

    Checkpoint control messages

    In a failure scenario, the latest control message received from each intermediate stream partition could be lost without checkpointing. Since these messages are needed for counting and triggering, we need to checkpoint the control messages and preserve both the intermediate stream partition and the producing task information. A checkpoint will be:

    Code Block
    Key => IntermediateStreamPartition.ControlMessageType
    Value => ControlMessageCheckpoint
     
    public class ControlMessageCheckpoint {
     int taskCount;
     Map<String, Long> tasksToEventTime;
    }
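To illustrate how such a checkpoint could be used after a restart, the sketch below mirrors the two fields above and adds helper methods. The class name, the helper methods, and the exact key layout ("stream-partition.type") are assumptions made for the example; the proposal only specifies the key components and the two fields.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Mirrors the ControlMessageCheckpoint fields above; the methods are hypothetical additions.
final class ControlMessageCheckpointSketch {
  final int taskCount;
  final Map<String, Long> tasksToEventTime;

  ControlMessageCheckpointSketch(int taskCount, Map<String, Long> tasksToEventTime) {
    this.taskCount = taskCount;
    this.tasksToEventTime = new HashMap<>(tasksToEventTime); // defensive copy
  }

  // Assumed key layout: "<stream>-<partition>.<ControlMessageType>".
  static String key(String streamId, int partition, String controlMessageType) {
    return streamId + "-" + partition + "." + controlMessageType;
  }

  // True once every producing task has been seen for this partition and message type.
  boolean allTasksReported() {
    return tasksToEventTime.size() == taskCount;
  }

  // Earliest event time among the tasks seen so far, or empty if none were recorded.
  OptionalLong earliestEventTime() {
    return tasksToEventTime.values().stream().mapToLong(Long::longValue).min();
  }
}
```

After a failure, a consumer restoring this state can tell which producing tasks it had already heard from, so counting resumes instead of starting from zero.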

     

    Rejected Alternative:

    Out-of-band control stream

    In this approach, the ApplicationRunner will create a separate control stream for propagating control messages. The control stream is a one-partition broadcast stream that will be consumed by every container in the application. The ApplicationRunner will manage the lifecycle of the control stream: it creates the stream on the first run and purges it at the start of future runs (the same as output streams when consuming from Hadoop).




     

    How it works for end-of-stream:

    1. When an input stream is consumed to the end, Samza sends an EOS message, which includes the input topic and partition, to the control channel.

    2. Once the EOS messages have been received from all the partitions of this input, we know the input has reached end-of-stream. The ControlStreamConsumer will then inspect the stream graph to find each intermediate stream whose input streams have all reached end-of-stream, and mark that intermediate stream as pending end-of-stream. After that, whenever a marked intermediate stream partition reaches its highest offset (the high watermark in Kafka), we can emit an end-of-stream message for this partition. At that point the partition is guaranteed to have reached end of stream.

    Comparisons of the two approaches:

    Out-of-band

    Pros:

    - Intermediate streams are clean, containing only user data. This is convenient if the user wants to consume them elsewhere.

    - Simple recovery from failure: just read the control stream from the beginning.

    - Fewer messages. The number of control messages needed is the same as the input stream partition count (n partitions), so the total will be n messages.

    Cons:

    - Need to correlate the out-of-band control messages with the source stream, which is complex to track and requires synchronization between the input streams and the control stream.

    - Need to maintain a separate stream for control messages.

    In-band

    Pros:

    - No coordination needed between control messages and input messages. When a control message is received, it is a marker that the messages sent before it have been consumed completely. This is critical for supporting general event-time watermarks.

    Cons:

    - Complicated failure scenario. The consumer of control messages needs to checkpoint the control messages it has received, so that it can resume after recovering from a failure.

    - More control messages required. For each intermediate stream (m partitions), each producer task (n tasks) needs to write a control message to every partition, so the total will be n*m messages.

     

    Based on the pros and cons above, we propose to use the in-band approach to support control messages.

     

     

    ...