Status

Current state: "Under Discussion"

Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)

JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

  • Primary key and unique key constraints are important hints for query optimization. For example, if the grouping columns contain the whole primary or unique key, the number of columns to group on can be reduced.
  • Additionally, primary keys are necessary for upsert streams: the primary key should be used as the upsert key.
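
The grouping optimization in the first point can be sketched roughly as follows. This is only an illustration under stated assumptions, not the planner rule Flink uses: the class `GroupKeyReduction` and the method `reduceGroupKeys` are hypothetical names, columns are modelled as plain strings, and the key is assumed to be a primary key (all key columns NOT NULL) whose declaration is correct.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper for illustration; not part of the proposed interfaces. */
final class GroupKeyReduction {

    /**
     * Returns a set of columns that is sufficient to group on.
     * If the grouping columns contain every primary key column, each group
     * holds exactly one key value, so grouping on the key alone produces the
     * same groups and the remaining grouping columns can be dropped.
     */
    static List<String> reduceGroupKeys(List<String> groupColumns, List<String> primaryKeyColumns) {
        Set<String> group = new LinkedHashSet<>(groupColumns);
        // An empty key carries no information, so it must not trigger the reduction.
        if (!primaryKeyColumns.isEmpty() && group.containsAll(primaryKeyColumns)) {
            return new ArrayList<>(new LinkedHashSet<>(primaryKeyColumns));
        }
        // The key is not fully covered: keep the (de-duplicated) grouping columns.
        return new ArrayList<>(group);
    }

    public static void main(String[] args) {
        // GROUP BY id, name, city on a table with PRIMARY KEY (id)
        System.out.println(reduceGroupKeys(
                Arrays.asList("id", "name", "city"), Arrays.asList("id"))); // prints [id]
        // GROUP BY name, city does not cover the key, so nothing is dropped
        System.out.println(reduceGroupKeys(
                Arrays.asList("name", "city"), Arrays.asList("id"))); // prints [name, city]
    }
}
```

The sketch is restricted to primary keys on purpose: a unique key may contain nullable columns, in which case rows with NULL key values need not agree on the other grouping columns.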

Public Interfaces

Constraint class hierarchy:

Constraint
@PublicEvolving
public interface Constraint {
    String getName();

    boolean isEnforced();

    ConstraintType getType();

    enum ConstraintType {
        PRIMARY_KEY,
        UNIQUE_KEY
    }
}


AbstractConstraint
abstract class AbstractConstraint implements Constraint {

	private final String name;
	private final boolean enforced;

	AbstractConstraint(String name, boolean enforced) {
		this.name = checkNotNull(name);
		this.enforced = enforced;
	}

	@Override
	public String getName() {
		return name;
	}

	@Override
	public boolean isEnforced() {
		return enforced;
	}

}
KeyConstraint
@PublicEvolving
public final class KeyConstraint extends AbstractConstraint {

	private final List<FieldReferenceExpression> columns;
	private final ConstraintType type;

	public static KeyConstraint primaryKey(String name, boolean enforced, FieldReferenceExpression... columns) {
		return new KeyConstraint(name, enforced, ConstraintType.PRIMARY_KEY, Arrays.asList(columns));
	}

	public static KeyConstraint uniqueKey(String name, boolean enforced, FieldReferenceExpression... columns) {
		return new KeyConstraint(name, enforced, ConstraintType.UNIQUE_KEY, Arrays.asList(columns));
	}

	private KeyConstraint(
			String name,
			boolean enforced,
			ConstraintType type,
			List<FieldReferenceExpression> columns) {
		super(name, enforced);

		this.columns = columns;
		this.type = type;
	}

	@Override
	public ConstraintType getType() {
		return type;
	}

	public List<FieldReferenceExpression> getColumns() {
		return columns;
	}
}


Method for retrieving primary key constraint in CatalogBaseTable

Method in CatalogBaseTable
public interface CatalogBaseTable {
	Optional<KeyConstraint> getPrimaryKey();
}


Constraint DDL:

CREATE TABLE [catalog_name.][db_name.]table_name
  [(col_name1 col_type1 [IN_LINE_CONSTRAINT] [COMMENT col_comment1], ...)]
  [COMMENT table_comment]
  [PARTITIONED BY (col_name1, col_name2, ...)]
  [OUT_OF_LINE_CONSTRAINT]

[OUT_OF_LINE_CONSTRAINT]:=
  [CONSTRAINT constraint_name] ((PRIMARY KEY | UNIQUE) (column, ...)) [[NOT] ENFORCED]

[IN_LINE_CONSTRAINT]:=
  [CONSTRAINT constraint_name] (PRIMARY KEY | UNIQUE) [[NOT] ENFORCED]

ALTER TABLE [catalog_name.][db_name.]table_name
  ADD (OUT_OF_LINE_CONSTRAINT) |
  DROP constraint_name


Proposed Changes

We propose introducing the concept of a primary key constraint as a hint that Flink can leverage for optimizations.

A primary key constraint declares that a column or a set of columns of a table or a view is unique and does not contain nulls. None of the columns in a primary key can be nullable. A primary key therefore uniquely identifies a row in a table.

A unique key constraint declares that a column or a set of columns of a table or a view is unique. A unique key constraint does not impose a NOT NULL constraint on its columns.

The SQL standard specifies that a constraint can be either ENFORCED or NOT ENFORCED. This controls whether the constraint checks are performed on the incoming/outgoing data. Flink does not own the data, therefore the only mode we want to support is NOT ENFORCED.

We will assume that the primary key is correct, and therefore that the nullability of the columns is aligned with the primary key, i.e. the columns in the primary key are not nullable. We might validate this at certain locations when reading a table from a Catalog, so connectors should ensure the two are aligned. When a table is created, declaring a primary key constraint will alter the nullability of the key columns.
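
A validation of this kind could look roughly like the sketch below. It is a minimal illustration only: `PrimaryKeyValidation` and `validate` are hypothetical names, and the schema is modelled as a plain map from column name to a nullable flag rather than as Flink's TableSchema. Where such a check would actually run is left open by this proposal.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper for illustration; not part of the proposed interfaces. */
final class PrimaryKeyValidation {

    /**
     * Checks that every primary key column exists in the schema and is
     * declared NOT NULL.
     *
     * @param nullableByColumn column name mapped to true if the column is nullable
     * @param primaryKeyColumns names of the columns that form the primary key
     * @throws IllegalArgumentException if a key column is missing or nullable
     */
    static void validate(Map<String, Boolean> nullableByColumn, List<String> primaryKeyColumns) {
        for (String column : primaryKeyColumns) {
            Boolean nullable = nullableByColumn.get(column);
            if (nullable == null) {
                throw new IllegalArgumentException(
                        "Primary key column '" + column + "' does not exist in the schema.");
            }
            if (nullable) {
                throw new IllegalArgumentException(
                        "Primary key column '" + column + "' must not be nullable.");
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Boolean> schema = new LinkedHashMap<>();
        schema.put("id", false);  // id BIGINT NOT NULL
        schema.put("name", true); // name STRING (nullable)

        // Passes silently: the only key column is NOT NULL.
        validate(schema, List.of("id"));
    }
}
```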


Support for unique keys is not part of this FLIP. They are mentioned only to show how the primary key concept can be extended with more constraints in the future.

Compatibility, Deprecation, and Migration Plan

This change introduces a new feature and does not imply any compatibility concerns.

Implementation Plan

  1. Add the primary key information to CatalogBaseTable
  2. Modify HiveCatalog connector to provide the primary key information
  3. Introduce DDL

Test Plan

We want to make sure that we can leverage the primary key information stored in Hive. This might require changes to the Hive catalog connector with regard to the produced TableSchema.

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.
