Status


State: Accepted
Discussion Thread: [PROPOSAL] Beam Schema Options
JIRA


Created: 2019-01-01

Motivation

Introduce the concept of options in Beam schemas to add extra context to fields and schemas. In contrast to the current Beam metadata that is present in a FieldType, options would be added to fields, logical types and schemas. The schema converters (e.g. Avro, Proto, ...) can use these options to carry over the options/annotations/decorators that were present in the original schema into the Beam schema. These options, which add contextual metadata, can then be used in the pipeline for specific transformations or to augment the resulting schema in the target output.

Current state in Beam

(situation as of 2.20) Without this proposal, if a user wants to act on extra contextual information that is available in the source schema, she needs to handle that information outside of the pipeline. This means such implementations will be very format specific (a separate implementation for Protobuf, Avro, JSON, etc.).

History

Revision

Date        Author          Description
2019-02-28  Alex Van Boxel  Changed to Accepted Proposal
2019-01-01  Alex Van Boxel  Initial Proposal

Vote

This proposal was accepted on February 28, 2020 after a 7-day voting period: [VOTE]

+1: 7 votes (3 binding)
-1: 0 votes

Implementation choice

Typed options

Options are key value pairs on schemas, fields and logical types. 

The key can be any string and Beam should not set any restrictions on it, other than that it must be serializable in the portable schema. Key names will depend heavily on where they come from. Option keys from Protobuf will generally have a hierarchy derived from the package and field name, separated by dots. Option keys generated by the user can look very different.

The values are typed and can contain any type defined in the Beam row schema system. This also includes Arrays, Maps and Rows. By using the same type system for options they are, in contrast to the current metadata, portable.

Having strongly typed options preserves the type information when mapping from a format that has a rich metadata system (example: Protobuf). The only downside is that most source formats don't have type information, so those values will end up as strings.

Phased rollout

Options can be rolled out in different phases.

  1. Options API for Schema and Field: Add options to schema and field so they can be used by the user and inside transforms to test out the concepts.
  2. Conversion from Proto and Avro: Extend the schema converters for Proto and Avro so they translate the metadata present in the source schema into options.
  3. Logical Type as Schema Options: Remove the metadata from FieldType and replace it with schema options.
  4. Option API for Logical Types: Once the API has stabilized, add it to LogicalType as well.
  5. Investigate enriching IOs to provide and use this metadata.

Rejected Alternatives

Untyped options

An alternative implementation is having untyped options, which would mean storing all option values as strings. But when a source format has typed metadata you would need to define a mapping between the types and strings (example: does boolean true become "true" or 1?). This is error prone and could lead to different mappings from different source formats.

Extending Metadata

The current metadata is defined on FieldType, which makes the access pattern awkward. The metadata was also clearly never intended to be portable; it is just a binary container for specific purposes (data for logical types).

String pkSequence = f.getType().getMetadataString("example.contract.v1.primary_key");
if (!pkSequence.isEmpty()) {
  primaryKeyMap.put(Integer.valueOf(pkSequence), dataSchema.indexOf(f.getName()));
}

As metadata is only available on the field type, you can only set values on fields, not explicitly on a schema.

Not portable implementation

It would be possible to implement options in the Java SDK only and not include them in the model. But doing so would be confusing in cross-language pipelines: if options are read by the Java implementation, a row then goes to a Python transform and back to Java, and the options are lost along the way, that would be confusing for the user.

Implementation Detail

Beam API

The API is fairly simple and consists of extensions on schema, field and logical types.

Reading values (schema and fields)

Getting the values is done via the getOptions method on both schema and field.

Schema.Options options = schema.getField("field_one").getOptions();

Integer optionInteger = options.getValue("example.field_option_int32");

The options object has methods to get the value.

@Nullable
public <T> T getValue(String optionName)

@Nullable
public <T> T getValue(String optionName, Class<T> valueClass)

@Nullable
public <T> T getValueOrDefault(String optionName, T defaultValue)

As well as a way to obtain the type of the option.

@Nullable
public FieldType getType(String optionName)

Because options use the same type system, option values can also be arrays, maps and rows.

...
Row optionMessage = options.getValue("example.field_option_message");

The options object also has some extra methods to check whether any option is set and to get all available option names.

public Set<String> getOptionNames()

public boolean hasOptions()
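
For example, a minimal sketch (using only the accessors above, with the field name from the earlier example) that dumps every option set on a field together with its type:

Schema.Options options = schema.getField("field_one").getOptions();
if (options.hasOptions()) {
  for (String name : options.getOptionNames()) {
    // getType returns the FieldType of the option, getValue its (nullable) value.
    FieldType type = options.getType(name);
    Object value = options.getValue(name);
    System.out.println(name + " (" + type + ") = " + value);
  }
}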

Writing values (schema and fields)

As Schema and Field are immutable objects, the only way to set options is to start an option builder and attach the result to the schema or field via the withOptions method. This method returns a new schema or field with the options set. If the schema already had options they will be kept, except that options with the same name will be overwritten.

// Schema

/** Returns a copy of the Schema with the options set. */
public Schema withOptions(Options options)

// Schema.Field

/** Returns a copy of the Field with the options set. */
public Field withOptions(Options options)

The standard way to start setting options is to start the builder by calling the static builder() method on the Options class (part of Schema).

// Schema.Options
public static Options.Builder builder()

The Options class also includes typed convenience methods so you can start building options in a fluent way.

Schema.Options options = Schema.Options
    .setBooleanOption(OPTION_NAME, true)
    .build();

Because the builder also has the same setters you can conveniently build options by chaining them.

Schema.Options options = Schema.Options
    .setBooleanOption(OPTION_NAME_1, true)
    .setStringOption(OPTION_NAME_2, "foo")
    .setIntegerOption(OPTION_NAME_3, 42)
    .build();

It's important to note that options use the same schema type system. This becomes apparent in the setters for arrays, maps and rows.

Schema.Options options = Schema.Options
    .setArrayOption(
        OPTION_NAME, 
        FieldType.array(FieldType.STRING), 
        TEST_LIST)
    .build();

Schema.Options options = Schema.Options
    .setMapOption(
        OPTION_NAME, 
        FieldType.map(FieldType.INT32, FieldType.STRING), 
        TEST_MAP)
    .build();

// Row has a schema type built in
Schema.Options options = Schema.Options
    .setRowOption(OPTION_NAME, TEST_ROW)
    .build();

After the options object is built, it needs to be attached to the schema or fields with the withOptions method, which creates a copy of the schema with the options set.

Note that option values are nullable. That means an option can be removed by setting its value to null; having a value of null is the same as the option not being present.
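
Putting the pieces together, a minimal sketch against the proposed API above (OPTION_NAME and the schema are taken from the earlier examples) of attaching options and later clearing one again:

// Build options and attach a copy to the schema and to a field.
Schema.Options options = Schema.Options
    .setBooleanOption(OPTION_NAME, true)
    .build();

Schema schemaWithOptions = schema.withOptions(options);
Schema.Field fieldWithOptions = schema.getField("field_one").withOptions(options);

// Per the note above, a null value is equivalent to the option not being present
// (assuming the typed setters accept null), so this removes the option again.
Schema cleared = schemaWithOptions.withOptions(
    Schema.Options.setBooleanOption(OPTION_NAME, null).build());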

Logical Type Options

Logical Type options are out of scope for this first draft, as the schema and field options should be frozen first.

Portability

Options will introduce a new message in the portable schema.

message Option {
  string name = 1;
  FieldType type = 2;
  FieldValue value = 3;
}

Options can be attached to the schema and field in a map.

message Schema {
  repeated Field fields = 1;
  string id = 2;
  map<string, Option> options = 3;
}

message Field {
  string name = 1;
  string description = 2;
  FieldType type = 3;
  int32 id = 4;
  int32 encoding_position = 5;
  map<string, Option> options = 6;
}

They can also be attached to Logical Types.

message LogicalType {
  string urn = 1;
  bytes payload = 2;
  FieldType representation = 3;
  FieldType argument_type = 4;
  FieldValue argument = 5;
  map<string, Option> options = 6;
}

Mappings

Protobuf

Protobuf has a rich metadata system, also called options, that can be mapped to Beam options. Proto supports options on the following elements:

  • Field
  • Message
  • EnumValue
  • Enum
  • File
  • Service
  • Method


In the first phase, mapping Field and Message options is straightforward as they map 1:1 to Beam concepts. Mapping Enum/EnumValue options will become possible once options on Logical Types are implemented, as Enums are mapped to a Logical Type in Beam. File options are too broadly scoped and don't add much semantic meaning at the Beam field or row level.

Service and Method options are out of scope for Beam.

Example of Protobuf Field options:

message ToFlatten {
    // re-usable metadata, we want to always collapse in parent message
    Meta meta = 666 [(example.collapse_in_parent) = true];
    string field_one = 1;
    int32 field_two = 2;
    int32 field_three = 3;
    string field_four = 4;
}

message Meta {
    string event_id = 1;
    int32 event_sequence = 2;
    google.protobuf.Timestamp event_timestamp = 3;
}
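
As an illustration (a sketch against the Option API above; the exact key under which the Proto schema provider would expose the option is an assumption), the (example.collapse_in_parent) option on the meta field above could surface on the converted Beam field roughly like this:

// Read the proto field option from the converted Beam schema.
Boolean collapse = schema.getField("meta").getOptions()
    .getValueOrDefault("example.collapse_in_parent", false);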

Example of Protobuf Message options:

message BigQueryOptionMessageTruncateYear {
  option (message_bigquery).time_partitioning_field = "timestamp";
  option (message_bigquery).add_time_partitioning_truncate_column = YEAR;
  google.protobuf.Timestamp timestamp = 3;
}

Avro

Avro doesn't have a clear specification for options or other metadata, but extra properties are used in practice. Here is a redacted example:

{
  "type": "record",
  "name": "Value",
  "connect.name": "test.ns.Value",
  "fields": [
    {
      "name": "ts",
      "type": {
        "type": "string",
        "connect.version": 1,
        "connect.default": "1970-01-01T00:00:00Z",
        "connect.name": "io.debezium.time.ZonedTimestamp"
      },
      "default": "1970-01-01T00:00:00Z"
    }
  ]
}

The above example shows annotations added by a Debezium process. Such options can be added to the schema or to a field.

JSON Schema

...

Zeta SQL

ZetaSQL also has the options concept; these options can be added to the schema and fields as string values.

See the ZetaSQL data definition language for more details.

CREATE
   [ OR REPLACE ]
   [ TEMP | TEMPORARY ]
   TABLE
   [ IF NOT EXISTS ]
   table_name [ ( table_element, ... ) ]
   [ PARTITION [ hints ] BY partition_expression, ... ]
   [ CLUSTER [ hints ] BY cluster_expression, ... ]
   [ OPTIONS (key=value, ...) ]
   [ AS query ];


column_definition:
   column_name
   [ column_type ]
   [ generation_clause ]
   [ column_attribute, ... ]
   [ OPTIONS (...) ]


Calcite SQL

Unfortunately Calcite doesn't have syntax to provide extra metadata for its columns. More investigation is needed to see how, and whether it is worth it, to extend Calcite and/or the Beam extension.

See parserImpls.ftl


CREATE TABLE ( IF NOT EXISTS )?
   ( database_name '.' )? table_name '(' column_def ( ',' column_def )* ')'
   TYPE type_name
   ( COMMENT comment_string )?
   ( LOCATION location_string )?
   ( TBLPROPERTIES tbl_properties )?

SQL/MED

The SQL standard for Management of External Data (SQL/MED), from SQL:2003 and SQL:2008, defines table-level and column-level options. See https://en.wikipedia.org/wiki/SQL/MED for an overview and links. The SQL standard documents themselves require paid access.

Use Cases

This section describes some examples where options could be used. These example transformations are not built into Beam, but as the options are pass-through metadata they are available in the expand of a transform.

Almost all of the use cases describe examples where the data comes from outside the pipeline, for example defined in Protobuf or Avro. This is the most common source of options.

Note about the examples: the examples are simplified. In general, schema inspection should be handled in the expand of a transform for optimal performance, as the schema is already known at pipeline expansion.

Some of these examples could be added to the examples in Beam.
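
As a sketch of that note (the transform and the example.rename option name are hypothetical, and the actual row mapping is elided), a transform can inspect the options once in its expand and only then apply a cheap per-element mapping:

static class RenameByOption extends PTransform<PCollection<Row>, PCollection<Row>> {
  @Override
  public PCollection<Row> expand(PCollection<Row> input) {
    Schema schema = input.getSchema();
    // Inspect the options once, at pipeline construction time.
    List<String> newNames = new ArrayList<>();
    for (Schema.Field field : schema.getFields()) {
      newNames.add(
          field.getOptions().getValueOrDefault("example.rename", field.getName()));
    }
    // ... build the output schema from newNames and apply a per-element
    // mapping that no longer needs to look at the options ...
    return input;
  }
}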

Basic Transformations

Pipeline authors could create code that automatically transforms data or structures based on metadata that originates from the source schema.

Collapse child row into parent row

Imagine a system where a lot of schemas are defined by the business and the contracts have a reusable event row to carry a timestamp and id. Because we always collapse this field into our target data warehouse, we can set a specific option on the event field. In this example we use protobuf to define the schemas, and the convention is to put an option on the field we want to collapse.

message ToFlatten {
    // re-usable metadata, we want to always collapse in parent message
    Event event = 666 [(example.collapse_in_parent) = true];
    string field_one = 1;
    int32 field_two = 2;
    int32 field_three = 3;
    string field_four = 4;
}

message Event {
    string event_id = 1;
    int32 event_sequence = 2;
    google.protobuf.Timestamp event_timestamp = 3;
}

Beam's Proto to Schema provider is responsible for attaching the example.collapse_in_parent option to the event field. As the user has dozens of schemas that reuse this metadata, and we always want to collapse it into the parent row, it is worth creating a transform that collapses the rows. This transform is reusable over all their different schemas.

Row [ 
  event:Row [
    event_id:string ,
    event_sequence:int , 
    event_timestamp:datetime , 
  ] : { options: "example.collapse_in_parent" = true }
  field_one:string ,
  field_two:int ,
  field_three:int ,
  field_four:string
]

=> User-written transform that handles "example.collapse_in_parent" =>

Row [ 
  event_id:string ,
  event_sequence:int , 
  event_timestamp:datetime , 
  field_one:string ,
  field_two:int ,
  field_three:int ,
  field_four:string
]

Handle CDC fields

Transforming a textual string into a datetime. Imagine a CDC system receiving data in a textual format. With this metadata described in options, a transform can convert the value into a proper datetime.

{
  "type": "record",
  "name": "Value",
  "connect.name": "test.ns.Value",
  "fields": [
    {
      "name": "ts",
      "type": {
        "type": "string",
        "connect.version": 1,
        "connect.default": "1970-01-01T00:00:00Z",
        "connect.name": "io.debezium.time.ZonedTimestamp"
      },
      "default": "1970-01-01T00:00:00Z"
    }
  ]
}

In this Avro schema example we could imagine someone writing a transform that handles all the STRING -> TIMESTAMP conversions when the option is set.

...
  switch (field.getType().getTypeName()) {
    case STRING:
      String connectName = field.getOptions().getValue("connect.name");
      if ("io.debezium.time.ZonedTimestamp".equals(connectName)) {
        builder.addValue(
            parseDateTime(row.getValue(field.getName())));
      } else {
        builder.addValue(row.getValue(field.getName()));
      }
      break;
...

Construct a unique key field

Construct a unique key field from a composition of other fields that are annotated.
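
A minimal sketch (the option name example.key_sequence and the helper are assumptions) that concatenates the annotated fields in the order given by an integer option value:

// Build a composite key from fields annotated with an ordering option.
static String compositeKey(Row row) {
  SortedMap<Integer, String> parts = new TreeMap<>();
  for (Schema.Field field : row.getSchema().getFields()) {
    // Hypothetical option carrying the position of the field in the key.
    Integer position = field.getOptions().getValue("example.key_sequence");
    if (position != null) {
      Object value = row.getValue(field.getName());
      parts.put(position, String.valueOf(value));
    }
  }
  return String.join("-", parts.values());
}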

Encryption based on option

Imagine a business that annotates its schemas with options indicating that a field contains personally identifiable information. The option is a string value that names the field holding the userId.

static DoFn<Row, Row> fnEncryptPerUser() {
  return new DoFn<Row, Row>() {
    @ProcessElement
    public void encryptPerUserFn(
        @Element Row row, MultiOutputReceiver receiver) 
    {
      Row.Builder builder =  Row.withSchema(row.getSchema());
      row.getSchema().getFields().forEach(
        field -> {
          switch (field.getType().getTypeName()) {
            case STRING:
              // The option value names the field that holds the userId.
              String userField = field.getOptions().getValue("encrypt.ppi");
              String userId = userField != null ? row.getValue(userField) : null;
              if (userId != null) {
                builder.addValue(
                    encryptForUser(row.getValue(field.getName()), userId));
              } else {
                builder.addValue(row.getValue(field.getName()));
              }
              break;
            default:
              builder.addValue(row.getValue(field.getName()));
         }
...

Create a SQL transformation

Options could be used to generate SQL that could be executed by the Calcite SQL engine.

Row [ 
  field_one:string ,
  field_two:int ,
  field_three:datetime 
    { options: "example.sql" = "EXTRACT(DAY FROM ${field})",
               "example.rename" = "day" } ,
  field_four:string
]

=> Transform that creates a SQL that does the transform =>

SELECT 
  field_one, 
  field_two, 
  EXTRACT(DAY FROM field_three) as day,
  field_four


Note: An option-to-SQL builder or utilities could be a useful addition to the Beam examples or utilities, but this is not part of the proposal.


Data Quality

Pipeline authors could create transforms for validating data quality. This could be done through options that set the limits and requirements for the data. For example, the options defined by validate.proto could be used to express these requirements.

Here is a very naive example that validates that an array has at least one element.

static DoFn<Row, Row> fnValidateArrayMinOne(String contractName) {
  return new DoFn<Row, Row>() {
    @ProcessElement
    public void validateContentFn(
        @Element Row row, MultiOutputReceiver receiver) 
    {
      DeadLetter.Builder dl = DeadLetter.forContract(contractName);
      inspectRow(dl, row.getRow("data"), "");
      if (dl.hasErrors()) {
        receiver.get(DEAD_LETTER_TAG).output(dl.build());
      } else {
        receiver.get(OUTPUT_TAG).output(row);
      }
    }

    void inspectRow(DeadLetter.Builder dl, Row row, String parent) {
      row.getSchema().getFields().forEach(
        field -> {
          switch (field.getType().getTypeName()) {
            case ARRAY:
              if (field.getOptions().getValueOrDefault("example.arrayminone", false)) {
                List<Object> array = row.getArray(field.getName());
                if(array == null || array.size() == 0) {
                  dl.addError("Array should have minimal 1 element");
                }
              }
              break;
            default:
         }
...

IO

IOs can use options to add extra information to the schema, or use the options to change their behaviour on certain fields. For example:

  • BigQuery partition column, clustering, etc…
  • Construct a BigTable row key from specific fields with a certain hashing
  • Cloud PubSub map columns to message attributes (BEAM-9208), or message publish time (BEAM-9209)


Note: Options specific to IOs could benefit from a standardized URN scheme, but this is out of scope for this initial proposal.
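
A sketch of the BigQuery case (the option key bigquery.time_partitioning_field is purely hypothetical, not a standardized URN), where an IO picks up the partition column from a schema option in its expand:

// Inside the IO's expand: read a (hypothetical) schema-level option.
Schema schema = input.getSchema();
String partitionField =
    schema.getOptions().getValue("bigquery.time_partitioning_field");
if (partitionField != null) {
  // ... configure the BigQuery write with time partitioning on partitionField ...
}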

4 Comments

  1. Some other alternatives it may be good to discuss:
    1) Options in the Java SDK only, don't include in the model (allows you to answer the question: why make the options portable),

    2) Use logical types for types with options

    1. For 1) options in the SDK only: this is not an alternative, this is a partial implementation choice. If this needs to be discussed we'd better make an open issues section.

      For 2) I don't understand this, but this also looks like an open issue, I'm going to make a section: open issues, that can be resolved in a separate mail thread.

      1. I just meant that these could be useful things to argue against in the "Rejected Alternatives" section, to clarify the motivation for the selected approach. I don't really want to strongly advocate for either of these alternatives. I suppose I can add some detail in "open issues" for now though.

        1. OK, I get it. For 1) I think I can add this to the rejected alternatives if you don't have strong objections. Using Logical Types for this still seems strange to me (I don't think it needs explicit mention). I'll add (1) as a rejected alternative unless you need to discuss it; you can then add it to open issues. I'm going to propose a discussion on the voting on the mailing list.