Problem Description
In SAMZA-974, we built a mechanism to support batch jobs with bounded data sources. The feature provides the following functionality:
...
This works for applications that have no message shuffling phase. With the introduction of partitionBy operators, processors can send output to any partition of an intermediate stream, and the intermediate streams are consumed again for further processing. Since end-of-stream tokens are not carried over from the original input streams to the intermediate streams, the job cannot shut down even after all the input streams have reached their end. To address this problem, we need to extend the existing end-of-stream feature to support applications with intermediate streams.
The same problem exists for propagating the watermarks needed for event-time processing. After the shuffling phase, the downstream stage needs to compute the event time based on all the watermarks received from the upstream stage producers. So any downstream task needs to be able to consume watermark messages from upstream tasks and emit watermarks based on the message timestamps.
Goals
Build general support for passing control messages through intermediate streams, and reconcile them on the consumer side.
Use the control messages to propagate end-of-stream and watermarks from the upstream tasks that consume the source inputs to the downstream tasks connected through the intermediate streams created by partitionBy operators.
For end-of-stream messages, Samza will shut down the application once all the input streams reach end-of-stream.
For watermark messages, Samza will emit to the consumer tasks a watermark carrying the earliest timestamp among those produced by all upstream tasks.
The solution should still work if we split the application into multiple jobs based on the partitionBy operators.
Proposed Design
We considered two approaches to support this feature and compare them below. We propose to use the second approach (in-band control).
Approach 1: Out-of-band control stream
In this approach the ApplicationRunner creates a separate control stream for propagating control messages. The control stream is a one-partition broadcast stream consumed by every container in the application. The ApplicationRunner manages the lifecycle of the control stream: it creates the stream on the first run and purges it at the start of each future run (the same way output streams are handled when consuming from Hadoop).
How it works for end-of-stream:
1. When an input stream is consumed to the end, Samza sends an EOS message to the control stream, which includes the input topic and partition.
2. Once EOS messages have been received from all the partitions of an input, we know that input has reached end-of-stream. The ControlStreamConsumer then inspects the stream graph to find the intermediate streams whose input streams have all reached end-of-stream, and marks each of them as pending end-of-stream. After that, whenever a partition of a marked intermediate stream reaches its highest offset (the high watermark in Kafka), we can emit an end-of-stream message for that partition, because the partition is guaranteed to have reached its end.
Approach 2: In-band control messages
In this approach we don’t use a separate stream to keep the control messages. The intermediate streams themselves carry both data and control: we use the in-band channel of each intermediate stream to propagate control messages. The design diagram is below:
[Figure: design diagram of in-band control messages]
How it works:
1. The upstream tasks send each control message to all the partitions of the downstream intermediate topic. The control message is serialized and sent with user messages in the same stream.
2. The downstream Samza processor consumes the intermediate streams and deserializes both user messages and control messages in SystemConsumers.
3. The control messages are reconciled based on the count of all the upstream producers (tasks). See below for more details on the reconciliation of each kind of control message.
How it works for end-of-stream:
1. When an input stream is consumed to the end, Samza uses the topology of the operator graph to find the downstream intermediate streams whose inputs have all reached end-of-stream.
2. The task sends an EOS message to all the partitions of the intermediate streams found in step 1.
3. Each consumer of the intermediate streams counts the EOS messages received for each partition and declares end-of-stream once the EOS messages from all upstream tasks have been received.
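The producer side of the in-band end-of-stream flow above can be sketched in Java as follows. This is a minimal illustration under stated assumptions, not Samza code: the class `EosPropagation`, its method names, and the `"stream/partition"` target strings are all hypothetical, and the operator-graph topology is reduced to a plain map from each intermediate stream to the input streams that feed it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical helper, not Samza API: decides which intermediate streams may
// receive an EOS message once some input streams have ended.
final class EosPropagation {
    private EosPropagation() {}

    /**
     * @param inputsOf    intermediate stream -> the input streams feeding it
     * @param endedInputs input streams that have been consumed to the end
     * @return intermediate streams whose inputs have all ended, sorted by name
     */
    static List<String> streamsReadyForEos(Map<String, Set<String>> inputsOf,
                                           Set<String> endedInputs) {
        List<String> ready = new ArrayList<>();
        // TreeMap gives a deterministic iteration order for the result.
        for (Map.Entry<String, Set<String>> e : new TreeMap<>(inputsOf).entrySet()) {
            if (endedInputs.containsAll(e.getValue())) {
                ready.add(e.getKey());
            }
        }
        return ready;
    }

    /** One EOS message goes to every partition of the stream. */
    static List<String> eosTargets(String stream, int partitionCount) {
        List<String> targets = new ArrayList<>();
        for (int p = 0; p < partitionCount; p++) {
            targets.add(stream + "/" + p);
        }
        return targets;
    }
}
```

An intermediate stream fed by two inputs only becomes eligible once both inputs have ended, which is why the check uses `containsAll` rather than reacting to a single input.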
Comparisons of the two approaches:
Approach | Pros | Cons |
---|---|---|
Approach 1 (out-of-band) | - Intermediate streams are clean and contain only user data. This is convenient if the user wants to consume them elsewhere. - Simple recovery from failure: just read the control stream from the beginning. - Fewer messages. The number of control messages needed equals the input stream partition count (n partitions), so the total is n messages. | - Need to correlate each out-of-band control message with the source stream, which is complex to track and requires synchronization between the input streams and the control stream. - Need to maintain a separate stream for control messages. |
Approach 2 (in-band) | - No coordination needed between control messages and input messages. When a control message is received, it marks that the messages sent before it have been consumed completely. This is critical to support general event-time watermarks. | - Complicated failure scenario. The consumer of control messages needs to checkpoint the control messages received, so that it can resume after recovering from a failure. - More control messages required. For each intermediate stream (m partitions), every producer task (n tasks) needs to write a control message to every partition, so the total is n*m messages. |
Based on the pros and cons above, we propose to use the in-band approach to support control messages.
Intermediate Stream Message Format
The format of the intermediate stream message:
Code Block |
---|
IntermediateMessage => [MessageType MessageData]
  MessageType => byte
  MessageData => byte[]
  MessageType => [0(UserMessage), 1(Watermark), 2(EndOfStream)]
  MessageData => [UserMessage/ControlMessage]
  ControlMessage => [EndOfStreamMessage/WatermarkMessage]
    Version => int
    TaskName => String
    TaskCount => int
    Other Message Data (based on different types of control message) |
For user messages, we will use the user-provided serde (the default is the system serde). For control messages, we will use the JSON serde since it is built into Samza and easy to parse.
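To make the framing concrete, here is a minimal Java sketch of a message laid out as one type byte followed by the payload bytes. The class `IntermediateMessageCodec` and its methods are hypothetical and are not part of Samza. The JSON string is assembled by hand purely for illustration (it assumes the task name needs no escaping), and its field names are an assumption modeled on the Version, TaskName and TaskCount fields listed above.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical codec, not Samza API: frames a payload as [MessageType MessageData].
final class IntermediateMessageCodec {
    static final byte USER_MESSAGE = 0;
    static final byte WATERMARK = 1;
    static final byte END_OF_STREAM = 2;

    private IntermediateMessageCodec() {}

    /** Prepends the one-byte message type to the serialized payload. */
    static byte[] encode(byte type, byte[] data) {
        byte[] out = new byte[data.length + 1];
        out[0] = type;
        System.arraycopy(data, 0, out, 1, data.length);
        return out;
    }

    /** Reads the message type and rejects unknown values. */
    static byte typeOf(byte[] message) {
        if (message.length == 0) {
            throw new IllegalArgumentException("empty message");
        }
        byte type = message[0];
        if (type < USER_MESSAGE || type > END_OF_STREAM) {
            throw new IllegalArgumentException("unknown message type: " + type);
        }
        return type;
    }

    /** Returns the payload that follows the type byte. */
    static byte[] dataOf(byte[] message) {
        typeOf(message); // validates the header first
        return Arrays.copyOfRange(message, 1, message.length);
    }

    /** Illustrative JSON payload for an end-of-stream control message. */
    static byte[] endOfStreamJson(String taskName, int taskCount) {
        String json = "{\"version\":1,\"taskName\":\"" + taskName
                + "\",\"taskCount\":" + taskCount + "}";
        return json.getBytes(StandardCharsets.UTF_8);
    }
}
```

A consumer would branch on `typeOf` first, handing user payloads to the user-provided serde and control payloads to the JSON serde.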
Reconciliation
The reconciliation of control messages happens inside TaskInstance after the message is delivered to it from the chooser. For the scope of this proposal, we support two kinds of control messages: end-of-stream and watermark.
- End-of-stream Message: This message indicates that the upstream task has finished producing to this stream.
- Watermark Message: This message contains the timestamp up to which the upstream task has processed so far.
The reconciliation process works as follows:
- The downstream TaskInstance receives the control message and updates its internal bookkeeping of the messages. For end-of-stream, it keeps the set of upstream tasks for the intermediate stream. For watermark, it keeps the mapping from each task to its latest timestamp.
- Once the task count in the bookkeeping matches the total count, the TaskInstance emits a single IncomingMessageEnvelope containing the intermediate stream and partition, and the message itself. The timestamp in the watermark message will be:
  InputWatermark = min { OutputWatermark(task) for each task in upstream tasks }
- After reconciliation, the control message envelope is sent to the task for processing.
The TaskInstance uses the following maps for bookkeeping received end-of-stream and watermark messages:
Code Block |
---|
EndOfStream Bookkeeping: Map( streamId -> { Set<TaskName>, totalTasks } )
Watermark Bookkeeping: Map( streamId -> { Map<TaskName, Timestamp>, totalTasks, timestampOfLastEmission } ) |
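The watermark side of this bookkeeping can be sketched as below. This is an illustrative Java sketch for a single intermediate stream, not the TaskInstance implementation: the class `WatermarkBookkeeping` and its `update` method are hypothetical. It also assumes two behaviors the text does not spell out: a task's timestamp never moves backwards, and a watermark is only re-emitted when the minimum advances.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical bookkeeping for one intermediate stream; not Samza API.
final class WatermarkBookkeeping {
    private final int totalTasks;
    private final Map<String, Long> latestByTask = new HashMap<>();
    private Long timestampOfLastEmission = null; // null until the first emission

    WatermarkBookkeeping(int totalTasks) {
        this.totalTasks = totalTasks;
    }

    /**
     * Records a watermark from an upstream task.
     *
     * @return the reconciled watermark to emit, or empty if not every upstream
     *         task has reported yet or the minimum has not advanced
     */
    OptionalLong update(String taskName, long timestamp) {
        // Assumption: keep the largest timestamp seen per task.
        latestByTask.merge(taskName, timestamp, Long::max);
        if (latestByTask.size() < totalTasks) {
            return OptionalLong.empty();
        }
        // InputWatermark = min over the latest watermark of each upstream task.
        long min = Long.MAX_VALUE;
        for (long t : latestByTask.values()) {
            min = Math.min(min, t);
        }
        if (timestampOfLastEmission != null && min <= timestampOfLastEmission) {
            return OptionalLong.empty();
        }
        timestampOfLastEmission = min;
        return OptionalLong.of(min);
    }
}
```

With two upstream tasks, nothing is emitted until both have reported, and a later watermark from the faster task alone does not trigger a new emission because the minimum is unchanged.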
Checkpoint control messages
To handle failures, we need to persist the bookkeeping state so that it can be restored during recovery. This can be done by checkpointing the bookkeeping state along with the input message offsets.
The checkpoint for EndOfStream:
Code Block |
---|
EndOfStreamCheckpoint =>
streamId => String
totalTasks => int
tasks => Set<String> |
The checkpoint for Watermark:
Code Block |
---|
WatermarkCheckpoint =>
streamId => String
totalTasks => int
tasksToEventTime => Map<String, Long> |
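As an illustration of how such a checkpoint could be used, the following Java sketch snapshots and restores end-of-stream bookkeeping. Both classes are hypothetical and only mirror the EndOfStreamCheckpoint fields listed above. How the checkpoint is actually serialized and stored alongside the offsets is outside the scope of the sketch.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical types mirroring the EndOfStreamCheckpoint layout; not Samza API.
final class EndOfStreamCheckpoint {
    final String streamId;
    final int totalTasks;
    final Set<String> tasks;

    EndOfStreamCheckpoint(String streamId, int totalTasks, Set<String> tasks) {
        this.streamId = streamId;
        this.totalTasks = totalTasks;
        this.tasks = new TreeSet<>(tasks); // defensive copy
    }
}

final class EndOfStreamBookkeeping {
    private final String streamId;
    private final int totalTasks;
    private final Set<String> tasksSeen = new HashSet<>();

    EndOfStreamBookkeeping(String streamId, int totalTasks) {
        this.streamId = streamId;
        this.totalTasks = totalTasks;
    }

    /** Records an EOS message; returns true once every upstream task has reported. */
    boolean update(String taskName) {
        tasksSeen.add(taskName);
        return tasksSeen.size() == totalTasks;
    }

    /** Captures the current state so it can be written with the offsets. */
    EndOfStreamCheckpoint snapshot() {
        return new EndOfStreamCheckpoint(streamId, totalTasks, tasksSeen);
    }

    /** Rebuilds the bookkeeping from a checkpoint after a restart. */
    static EndOfStreamBookkeeping restore(EndOfStreamCheckpoint checkpoint) {
        EndOfStreamBookkeeping restored =
                new EndOfStreamBookkeeping(checkpoint.streamId, checkpoint.totalTasks);
        restored.tasksSeen.addAll(checkpoint.tasks);
        return restored;
    }
}
```

Because the tasks are kept in a set, an EOS message that is replayed after recovery is counted only once.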
Details
ControlMessage
We will support two types of ControlMessage: EndOfStreamMessage and WatermarkMessage.
Code Block |
---|
// Each public class below belongs in its own source file.
public abstract class ControlMessage {
  private final String taskName;
  private final int taskCount;
  private int version = 1;
  public ControlMessage(String taskName, int taskCount) {
    this.taskName = taskName;
    this.taskCount = taskCount;
  }
  public String getTaskName() { return taskName; }
  public int getTaskCount() { return taskCount; }
  public void setVersion(int version) { this.version = version; }
  public int getVersion() { return version; }
}
public class EndOfStreamMessage extends ControlMessage {
  private final String streamId;
  public EndOfStreamMessage(String streamId, String taskName, int taskCount) {
    super(taskName, taskCount);
    this.streamId = streamId;
  }
}
public class WatermarkMessage extends ControlMessage {
  private final long timestamp;
  public WatermarkMessage(long timestamp, String taskName, int taskCount) {
    super(taskName, taskCount);
    this.timestamp = timestamp;
  }
  public long getTimestamp() { return timestamp; }
} |
Reconciliation
When SystemConsumers gets EOS/watermark messages, Samza needs to reconcile them based on the task counts. The reconciliation works as follows:
For each intermediate stream partition, Samza keeps track of the end-of-stream/watermark messages received from the producing tasks, and counts the number of distinct tasks from which messages have been received.
When the count matches the total task count, Samza emits an end-of-stream/watermark message to the task assigned to this stream partition.
When Samza receives further watermark messages, it emits a watermark with the earliest event time across all the stream partitions. No watermark is emitted if the earliest event time has not changed.
Checkpoint control messages
In a failure scenario, the latest control message received from each intermediate stream partition would be lost without checkpointing. The checkpoint of control messages needs to preserve both the intermediate stream partition and the producing task information. A checkpoint will be:
Code Block |
---|
Key => IntermediateStreamPartition.ControlMessageType
Value => ControlMessageCheckpoint
public class ControlMessageCheckpoint {
int taskCount;
Map<String, Long> tasksToEventTime;
} |