...


This code gets translated by Samza’s core libraries into an internal representation where every operation/transformation is represented by an OperatorSpec object, a logical abstraction that describes the operation specified by the app author.

Fig. 2 — An illustration of the graph of OperatorSpec objects generated by Samza for the application in listing 1. OperatorSpecs associated with input/output streams are highlighted in yellow.


Some of these OperatorSpecs represent message streams, which can be thought of as channels propagating data into and out of the application. For example:

  • InputOperatorSpecs represent input data streams, e.g. S1 and S2, from which input messages are read.

  • OutputOperatorSpecs represent output data streams, e.g. S3, to which processed data is produced.
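To make this internal representation concrete, below is a minimal, hypothetical sketch of the kind of graph Fig. 2 depicts: two InputOperatorSpecs feeding a Join whose output is produced to an output stream. The OperatorNode class and buildJoinGraph() helper are illustrative stand-ins, not Samza’s actual OperatorSpec classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative stand-in for Samza's OperatorSpec; not the real class.
class OperatorNode {
  final String opCode;    // e.g. "INPUT", "JOIN", "OUTPUT"
  final String streamId;  // set for input/output operators, null otherwise
  final List<OperatorNode> next = new ArrayList<>();

  OperatorNode(String opCode, String streamId) {
    this.opCode = opCode;
    this.streamId = streamId;
  }

  OperatorNode registerNext(OperatorNode downstream) {
    next.add(downstream);
    return downstream;
  }
}

class JoinGraphExample {
  // Builds a graph shaped like Fig. 2: input operators for S1 and S2 feed a
  // Join operator, whose result is produced to output stream S3.
  static List<OperatorNode> buildJoinGraph() {
    OperatorNode inS1 = new OperatorNode("INPUT", "S1");
    OperatorNode inS2 = new OperatorNode("INPUT", "S2");
    OperatorNode join = new OperatorNode("JOIN", null);
    OperatorNode outS3 = new OperatorNode("OUTPUT", "S3");

    inS1.registerNext(join);
    inS2.registerNext(join);
    join.registerNext(outS3);
    return Arrays.asList(inS1, inS2);  // roots of the graph
  }
}
```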

...

The Execution Planner is the core Samza module responsible for verifying that all streams participating in any given Join operation agree in partition count. To achieve this, it traverses the graph of OperatorSpecs produced by Samza’s High-Level API to verify compliance with this requirement among all such sets of streams.
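A minimal sketch of this verification step, assuming the planner has already grouped the streams participating in each Join (the joinGroups argument) and knows every stream’s partition count (the partitionCounts argument); both inputs and the class/method names below are hypothetical, not Samza’s actual API:

```java
import java.util.List;
import java.util.Map;

class JoinPartitionVerifier {
  // Throws if any group of joined streams disagrees on partition count.
  static void verifyJoinGroups(List<List<String>> joinGroups,
                               Map<String, Integer> partitionCounts) {
    for (List<String> group : joinGroups) {
      int expected = partitionCounts.get(group.get(0));
      for (String streamId : group) {
        int actual = partitionCounts.get(streamId);
        if (actual != expected) {
          throw new IllegalStateException(
              "Joined streams must agree in partition count: " + group.get(0)
                  + " has " + expected + " partitions but " + streamId
                  + " has " + actual);
        }
      }
    }
  }
}
```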

...

Another closely related responsibility of Samza’s Execution Planner is inferring the partition counts of all intermediate streams present in the OperatorSpec graph. Such streams are introduced into the OperatorSpec graph whenever the Partition-By operation is used, and are represented by the same type of OperatorSpec used to represent input streams, i.e. InputOperatorSpec. Unlike input streams, however, intermediate streams have no defined partition count by default. As mentioned above, it is the Execution Planner that decides the partition count of every intermediate stream after traversing the OperatorSpec graph, according to the following rules (a code sketch follows the list):

  1. Any intermediate stream joined with an input stream gets assigned the same partition count as that input stream.

  2. Any intermediate stream not covered by the first rule is assigned the partition count value specified by the Samza config property job.intermediate.stream.partitions.

  3. If no value is specified for job.intermediate.stream.partitions, the Execution Planner falls back to using the maximum partition count among all input and output streams, capped at a maximum hard-coded value of 256.
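As a rough illustration of these rules only (the class, method, and parameter names below are hypothetical and not Samza’s actual implementation), the inference can be summarized as follows:

```java
import java.util.List;
import java.util.OptionalInt;

class IntermediatePartitionInference {
  // Hard-coded cap mentioned in rule 3.
  static final int MAX_INFERRED_PARTITIONS = 256;

  // joinedInputPartitions: partition count of an input stream this intermediate
  //   stream is joined with, if any (rule 1).
  // configuredDefault: value of job.intermediate.stream.partitions, if set (rule 2).
  // allInputOutputPartitions: partition counts of all input and output streams,
  //   used as a fallback (rule 3).
  static int inferIntermediatePartitions(OptionalInt joinedInputPartitions,
                                         OptionalInt configuredDefault,
                                         List<Integer> allInputOutputPartitions) {
    if (joinedInputPartitions.isPresent()) {       // rule 1
      return joinedInputPartitions.getAsInt();
    }
    if (configuredDefault.isPresent()) {           // rule 2
      return configuredDefault.getAsInt();
    }
    int max = allInputOutputPartitions.stream()    // rule 3
        .mapToInt(Integer::intValue)
        .max()
        .orElse(1);                                // assumption: default to 1 if empty
    return Math.min(max, MAX_INFERRED_PARTITIONS);
  }
}
```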

...


It is important to realize that there are situations where it is not possible to enforce agreement between an intermediate stream and the input streams it is joined with, a scenario that causes the Execution Planner to signal an error and reject the whole application. Fig. 5 illustrates one such case.

Fig. 5 — The OperatorSpec graph of an example high-level Samza application rejected by the Execution Planner due to the conflict encountered as it attempts to infer the partition count of S2′, since it is joined with 2 input streams, S1 and S4, that have different partition counts.

...

As far as Samza’s Execution Planner is concerned, Tables with side-input streams are not much different from other tables: they can be treated like tables populated with data from a non-side-input stream. There is, however, one key difference: unlike all other streams, side-input streams are not represented in the OperatorSpec graph. At the time of writing, no InputOperatorSpecs (or any OperatorSpecs, for that matter) are created to represent side-input streams in the OperatorSpec graph. Only the names of side-input streams are specified in the configuration of the tables they populate. And since Samza’s Execution Planner currently relies on traversing the OperatorSpec graph to identify groups of input streams related through Joins, accounting for tables with side-inputs will require some special handling.

...

At the time of writing, Samza’s Execution Planner does not account for Tables, which means it is simply incapable of fulfilling its responsibilities for Samza High-Level applications that perform Stream-Table Joins.

...

The table below enumerates cases in which Samza’s current Execution Planner does not enforce the necessary constraints on P1 and P2 (the partition counts of S1 and S2, respectively) to ensure the validity of the Stream-Table Join between table T and stream S2.

#  | S1                  | S2                  | Required Constraint
1  | Input stream        | Input stream        | P1 must be equal to P2
2  | Input stream        | Intermediate stream | P2 must be set to P1
3  | Intermediate stream | Input stream        | P1 must be set to P2
4  | Intermediate stream | Intermediate stream | If the result of joining S1 and S2 is subsequently joined with an input stream S3, P1 and P2 must be set to P3.


Cases #1 and #2 apply equally well if S1 is a side-input stream.

...

As explained in the Responsibilities of Samza’s Execution Planner section, Samza’s Execution Planner traverses the OperatorSpec graph to identify groups of input and/or intermediate streams involved in Join operations and to verify agreement among their partition counts. To understand why this does not work in the case of Stream-Table Joins, we examine the OperatorSpec graph generated for the Samza application in the code sample below.

...

It is important to observe the following key differences between this graph and the graph representing the Stream-Stream Join application in Fig. 2:

  1. A different OperatorSpec, StreamTableJoinOperatorSpec, is used to represent Stream-Table Join operations.

  2. A new terminal SendToTableOperatorSpec is used to represent the operation of producing data to a table.

  3. The StreamTableJoinOperatorSpec is not connected to the SendToTableOperatorSpec. It only has a reference to the table (TableSpec) participating in the Stream-Table Join operation.

...

The disconnect between the SendToTable and StreamTableJoin OperatorSpecs stems from the way the OperatorSpec graph is constructed: it is modeled exactly after the StreamApplication defined by the user, i.e.

  • Chaining StreamGraph’s getInputStream(), filter(), and sendTo(Table) results in an InputOperatorSpec connected to a FilterOperatorSpec, which is in turn connected to a SendToTableOperatorSpec.

  • Chaining join(Table) and sendTo(Stream) results in a StreamTableJoinOperatorSpec connected to an OutputOperatorSpec.

And since no further operations can be chained to sendTo(Table) according to the current Samza High-Level APIs, a SendToTableOperatorSpec is always a terminal vertex in the OperatorSpec graph, and all subsequent operations on a table will have their corresponding OperatorSpecs in disconnected components of the graph.
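The shape of the resulting graph can be sketched with a hypothetical node model similar to the earlier one (the Op class, opCode strings, and stream/table names here are illustrative only, not Samza’s actual OperatorSpec classes):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative stand-in for an OperatorSpec vertex; not Samza's actual class.
class Op {
  final String opCode;           // e.g. "INPUT", "FILTER", "SEND_TO_TABLE", ...
  final String streamOrTableId;  // stream or table referenced, if any
  final List<Op> next = new ArrayList<>();

  Op(String opCode, String streamOrTableId) {
    this.opCode = opCode;
    this.streamOrTableId = streamOrTableId;
  }

  Op then(Op downstream) {
    next.add(downstream);
    return downstream;
  }
}

class StreamTableJoinGraphShape {
  static void build() {
    // Component 1: getInputStream(...).filter(...).sendTo(table T)
    Op in1 = new Op("INPUT", "S1");
    in1.then(new Op("FILTER", null))
       .then(new Op("SEND_TO_TABLE", "T"));      // terminal vertex

    // Component 2: getInputStream(...).join(table T, ...).sendTo(stream S3)
    Op in2 = new Op("INPUT", "S2");
    in2.then(new Op("STREAM_TABLE_JOIN", "T"))   // holds only a reference to T
       .then(new Op("OUTPUT", "S3"));

    // No edge connects the SEND_TO_TABLE vertex to the STREAM_TABLE_JOIN vertex,
    // so the two chains form disconnected components of the graph.
  }
}
```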

Contrasting the OperatorSpec graph generated in this scenario with that of the Stream-Stream Join application in listing 1 confirms this explanation.

...

To extend Samza’s ExecutionPlanner to support Tables, we need to address the disconnect between a SendToTableOperatorSpec and all relevant StreamTableJoinOperatorSpecs. One possibility that does not require changing Samza’s High-Level APIs is to modify the OperatorSpec graph traversal such that virtual connections are assumed between every SendToTableOperatorSpec and all the StreamTableJoinOperatorSpecs that reference the same table (TableSpec) in the entire OperatorSpec graph.
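A minimal sketch of what such a modified traversal could look like, assuming a hypothetical OpVertex node type and a pre-computed joinsByTableId index mapping each table id to the StreamTableJoin vertices that reference it (none of these names are Samza’s actual classes or fields):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative stand-in for an OperatorSpec vertex; not Samza's actual class.
class OpVertex {
  final String opCode;   // e.g. "SEND_TO_TABLE", "STREAM_TABLE_JOIN"
  final String tableId;  // table referenced, if any
  final List<OpVertex> next = new ArrayList<>();

  OpVertex(String opCode, String tableId) {
    this.opCode = opCode;
    this.tableId = tableId;
  }
}

class VirtualEdgeTraversal {
  // Depth-first traversal that, on reaching a SEND_TO_TABLE vertex, continues
  // into every STREAM_TABLE_JOIN vertex referencing the same table, as if a
  // real edge existed between them.
  static void traverse(OpVertex vertex,
                       Map<String, List<OpVertex>> joinsByTableId,
                       Set<OpVertex> visited) {
    if (!visited.add(vertex)) {
      return;
    }
    for (OpVertex downstream : vertex.next) {
      traverse(downstream, joinsByTableId, visited);
    }
    if ("SEND_TO_TABLE".equals(vertex.opCode)) {
      List<OpVertex> joins =
          joinsByTableId.getOrDefault(vertex.tableId, Collections.emptyList());
      for (OpVertex join : joins) {
        traverse(join, joinsByTableId, visited);  // virtual connection
      }
    }
  }
}
```

With such virtual connections in place, the existing grouping and partition-count checks could treat streams feeding a table and streams joined with that table as members of the same Join group.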

Fig. 8 — The OperatorSpec graph generated for a hypothetical Samza High-Level application where stream S1 is filtered and sent to table T, which is subsequently joined with streams S2 and S3. The proposed change to Samza’s Execution Planner revolves around assuming virtual connections between the SendToTableOperatorSpec and all relevant StreamTableJoinOperatorSpecs, as denoted by the dotted arrows.

...