Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
enum TtlUpdateEvent {
  // required, default option
  OnCreate,
  // optional
  OnRead,
  OnWrite,
  OnReadWrite
}

enum TtlTimeCharacteristic {
  EVENT_TIME,
  PROCESSING_TIME
}

public class TtlPolicy {
  TtlUpdateEvent ttlEvent;
  int ttl;
  TtlTimeCharacteristics ttlTimeCharacteristics;

  // ...
}

// ttl in sec
int ttl = 5;

// default TTL, using TtlPolicy.OnCreate
TtlPolicy tp1 = new TtlPolicy(ttl, TimeCharacteristic.EVENT_TIME)
// extended TTL, using TtlPolicy.OnCreate and TtlPolicy.onReadWrite
TtlPolicy tp2 = new TtlPolicy(ttl, TimeCharacteristic.PROCESSING_TIME, TtlEvent.OnReadWrite)

ValueState s1 = getRuntimeContext().getState(new ValueStateDescriptor<T>("x", ser, tp1);
ValueState s2 = getRuntimeContext().getState(new ValueStateDescriptor<T>("y", ser, tp2));


Proposed Changes

General Architecture

The SQL Client can be executed in two modes: a gateway and embedded mode. In this FLIP we mostly focus on the embedded mode but also consider a later gateway conceptually.

  • In gateway mode, a dedicated SQL gateway process offers APIs to which other applications can connect to. A REST API and JDBC API allow to submit queries, cancel jobs, and retrieve results. Both REST and JDBC connect to a common executor that is responsible for communicating with Flink and external catalogs. The executor also keeps state about currently running sessions. The optional SQL CLI client connects to the REST API of the gateway and allows for managing queries via console.

  • In embedded mode, the SQL CLI is tightly coupled with the executor in a common process. This allows for playing around with Flink quickly and submit jobs without having to start an additional components.

The communication to Flink happens through the ClusterClient. By using the ClusterClient we can ensure that the SQL client will run Flink on YARN, Mesos, and other supported environments without additional effort.

For the embedded mode, a general communication flow would like:

  • The user adds catalog information to the configuration files.

  • The user starts CLI client with custom JAR files and configuration (--jar--context).

  • The user enters SQL query and submits it.

  • The executor submits Flink job with all involved JAR files.

  • If a query is a SELECT query, the executor materializes the result such that it can be pulled by the CLI client.

  • If a query is a INSERT INTO query, the executor submits the Flink job.

  • The user can stop the running query and submit a new one.

  • Exiting the CLI would also stop the running SELECT query but not INSERT INTO queries.

Gateway Mode

Image Removed

 

Embedded Mode

Image Removed

Configuration

 

Independent of the execution mode, the SQL client can be configured globally (sql-defaults.conf) and/or for every CLI session (sql-context.conf). The configuration specifies settings that would programmatically be applied to a ExecutionEnvironment/StreamExecutionEnvironment and TableEnvironment. It contains catalog information as well as job specific parameters.

The global configuration is located in a dedicated file and is read during startup. Global configuration applies to all CLI sessions. A context configuration file can be specified when starting the CLI client and is attached to any query executed in the current CLI session. Thus, context properties might overwrite global default properties. In future versions, the configuration might also be refined by using DDL statements such as:

DDL statements will have highest priority and overwrite the properties specified in configuration files.

Here are some properties that might need to be defined in the long run:

  • Pre-registered table sources

    • Name

    • Source

      • Type (e.g. Kafka, Custom)

      • Properties (e.g. topic, connection info, custom TableSource class)

    • Encoding

      • Type (e.g. Avro, JSON)

      • Schema (e.g. Avro class, JSON field names/types)

    • Rowtime descriptor/Proctime

      • Watermark strategy and Watermark properties

      • Time attribute info

    • Bucketization

    • Statistics

  • Pre-registered table sinks

    • Name

    • Sink

      • Type (e.g. Kafka, Custom)

      • Properties (e.g. destination path, output types)

  • External catalogs

    • Name

    • Properties (e.g. connection info, credentials, ExternalCatalog class)

  • User-defined functions

    • Name

    • Parameters (e.g. constructor parameters for a TableFunction)

    • Class

  • [Optional] User-defined types

    • Name

    • Field names and types

  • [Optional] User-defined local variables (@local_variable)

    • Name

    • Value

  • Job parameters

    • Batch or streaming execution

    • Parallelism

    • Maximum parallelism

    • State Backend + parameters

    • Auto watermark interval

    • Restart strategy

    • Query Config (e.g. min/max retention time)

  • [Separate configuration?] SQL client parameters

    • Gateway properties
      (e.g. database properties, server credentials)

    • CLI Client properties
      (e.g. timeouts, credentials for authenticating to gateway, result serving speed)

Result Retrieval

 

In the future, we can use different options for retrieving materialized results both for debugging purposes and long-running maintained views. The retrieval is managed by the executor.

There are three materialization modes:

 

...

Goal

We’ll deliver a solution of Flink state TTL of the following characteristics:

  • TTL policy in Flink State is exact TTL. User state will be cleaned exactly when its TTL expires
  • TTL will be supported for both event time and processing time
  • TTL starts to count down when the entry is created by default. Users can specifying TTL trigger policy (see example below) to decide if a state’s TTL will be refreshed upon read or/and update or/and read. More on this later.

TTL Policy

How to count the start time of TTL of a user state? Or, in another way to rephrase it, does Flink support extending/refreshing TTL for a user state?

There are mainly two situations here:

  1. TTL is always started when a user state is created. This is a fundamental behavior
  2. TTL for a user state can be extended/refreshed upon read or/and updated.

Design:

On TTL Policy Situation 1 (TtlUpdateEvent.onCreate, see above ‘TTL Policy’ section),

  • What we need is only one timer (a long) for each keyed state
  • Steps
    1. When a keyed user state is created, operator/state backend registers a timer for it in TimerService
    2. When the registered timer is invoked, it deletes the user state


On TTL Policy Situation 2 (set TtlUpdateEvent.onRead, onWrite, or onReadWrite, , see above ‘TTL Policy’ section)

  • What we need is only one timer (a long) and a timestamp (also a long) for each keyed state
  • Steps
    1. Use a new field lastModifiedTs to remember when a user state is last modified
    2. When a keyed user state is created, lastModifiedTs is set to creation time, and operator/state backend registers a timer for each keyed user state in TimerService
    3. Whenever actions of corresponding policies are taken, update lastModifiedTs of the user state
    4. When the registered timer is invoked, it checks lastModifiedTs of the user state. If lastModifiedTs <= current time (both event and processing time), delete the state; otherwise, register a new timer at (lastModifiedTimestamp+TTL) time, and go to (2)

Example for case 2:

For key K, we have ValueState A with TTL 16, with TTL policy onReadWrite.

  • At time 0, a keyed user state A is created with A.lastModifiedTs=0, a timer is registered for A at time 16
  • At time 2, A’s value is updated, and thus update A.lastModifiedTs=2
  • At time 15, A’s value is read, and thus updateA.lastModifiedTs=15
  • At time 16, the registered timer is invoked, it sees A.lastModifiedTs=15, so it does nothing but registers a new timer at (15+16) = 31

Pros and Cons:

Pros:

 

  1. TTL implementation is independent of state backends, it only relies on the abstraction of TimerService
  2. Because of (1), migration and compatibility can reuse existing solution
  3. This design will support both event time and processing time natively
  4. It has at most one timer for each keyed user state with TTL all the time

 

Cons:

  1. In TTL policy situation 2, we need to store a timer (basically a long) for each TTL keyed user state. There’ll be a little bit memory overhead.

Migration Plan and Compatibility of Checkpoints/Savepoints

According to our knowledge right now, we don’t need to extra work on migration plan and compatibility of checkpoints/savepoints because we are reusing existing wheels, like TimerService state backends, etc, and their behaviors will remain the same as before.

The situation might change as the FLIP evolves.

Implementation Plan

The implementation details are still not very clear at this moment, and we’d like to get a clearer picture as we move on.

However, the basic idea is very simple - TimerService and user states need to have access to each other.

  • TimerService needs to access user states so triggered timer can clean up expired user states.
  • User states need to access TimerService to register timers when certain event happened to user states

Thus the initial step should be moving TimerService to state backends to facilitate the above proposal.

We can add callback functions to all state implementations to ensure that we get notified on all state accesses.

We may need something in operators or state backends (e.g. operators/StreamingRuntimeContext/KeyedStateStore) to have access to TimerService

Moreover, we need a good strategy to ensure that the TTL timers do not interfere with user-configured timers, i.e., what happens if a user configures a timer for the same time as a Ttl timer, and which one should be invoked first? The main problem right now is that we have to distinguish between user and TTL timers. Currently, timer service does not support timer tags (or another method) to distinguish timers.

Test Plan

Normal unit, integration, and end-to-end tests

Rejected Alternatives

We have another proposal of using rocksdb’s built-in TTL. But it has a few major drawbacks:

  • Rocksdb’s built-in TTL only support TTL on record creation, not able to extend/refresh TTL upon read and/or write
  • It only supports processing time, and don’t support event time.
  • The TTL logic heavily rely on state backend implementation. If we add more state backends and rely on their internal feature for TTL, it’s hard to unify their behaviors and will introduce unnecessary system complexity
  • Using external feature to build Flink’s TTL makes migration and compatibility hard

...

Custom mode (for any result):
custom TableSink into arbitrary systems
Flink writes any result into a destination that is determined by the user using INSERT INTO. The results are not accessible by the executor and can thus not be served via REST or CLI console.

The supported materialization mode also depends on the query type:

 

Query Type

Internal Mode

External Mode*

Batch

collect() -> Heap/Database

File table sink

Append Stream

collect() -> Heap/Database

Kafka/file table sink

Retract/Upsert Stream

collect() -> Heap/Database

(Compacted Kafka)/Cassandra table sink

 

We might use usual heap space at the beginning. The internal database can be any JDBC database. External materialization modes (*) are not included in the first version. In the future, Kafka would be read by general Kafka utility functions. Files as well with support for different file systems.

Result Maintenance

While batch queries have bounded results, streaming queries are potentially never-ending and, therefore, require special treatment for keeping the results up to date and consistent. The streaming query can be considered as a view and the running streaming application as the view maintenance. Results might need to be supplied to systems that were not made for streaming queries, e.g., Java applications that read from a JDBC API. In those cases, every requested result set must be a snapshot (materialized view) of the running query at a point in time. The snapshot must be immutable until all result rows have been consumed by the application or a new result set is requested. We distinguish between two types of results that will require different materialization semantics: a materialized view and a materialized result stream.

Materialized View

A consistent materialized view of results for production use cases. Materialized views are not part of this FLIP but might be added in future versions. It requires another design document for the DDL statement and execution but here are some properties we aim for:

SQL: CREATE MATERIALIZED VIEW ...

  • intended for long running materialization queries that are updated periodically (e.g. every hour or on successful checkpoints)

  • retractions are not visible directly, only the materialized result

  • a result can be accessed by JDBC connections or the REST API (e.g. for powering dashboards)

  • materialization operators can cooperate with Flink's checkpointing (e.g. only checkpointed results are exposed through the APIs)

  • a user can specify different parameters for how and how often the view is maintained
    (see create_mv_refresh)

  • runs detached from the CLI client

Materialized Result Stream

A materialized stream of results for getting immediate insights into the running SQL query.

SQL: SELECT ...

  • indended for debugging during query creation and initial show cases

  • retractions are shown as streams of deletion and insertion

  • no guarantees about checkpointed results

  • the executor abstracts the underlying representation and supplies the interfaces for accessing the materialized stream in a FIFO fashion

  • only one running query per CLI session

  • cancelled if cancelled in CLI or CLI is closed

We focus on simple SELECT queries first that are materialized on the heap of the executor (internal materialization mode).

Compatibility, Deprecation, and Migration Plan

 No compatibility changes or other deprecation necessary.

Implementation Plan

1. Basic Embedded SQL Client 

Add the basic features to play around with Flink's streaming SQL.

  • Add CLI component that reads the configuration files
    • "Pre-registered table sources"
    • "Job parameters"
  • Add executor for retrieving pre-flight information and corresponding CLI SQL parser
    • SHOW TABLES
    • DESCRIBE TABLE
    • EXPLAIN
  • Add streaming append query submission to executor

    • Submit jars and run SELECT query using the ClusterClient

    • Collect results on heap and serve them on the CLI side (Internal Mode with SELECT)

    • EXECUTE (for executing a SQL statement stored in a local file)

2. Full Embedded SQL Client

Add important features to fully use the SQL client for a variety of use cases.

  • Add support for streaming retract/upsert queries
  • Add support for batch queries
  • Add user-defined function support

    • Support scalar, aggregate, and table functions
    • Submit corresponding jars to the cluster
    • DESCRIBE FUNCTION
  • Add CLI query history
  • Add CLI query code completion/highlighting
  • Add support for INSERT INTO
    • Read configuration about "Pre-registered table sinks"
    • Allow submission without collect()

3. Discuss/design further features

Discuss and prioritize other features that are not part of this FLIP. 

  • Introduce gateway mode with REST API

  • Add support for catalogs

  • Allow creating Materialized Views

  • Support other materialization backends

  • Create a JDBC API

  • Further SQL DDL statements:

    • CREATE TABLE

    • CREATE TYPE

Rejected Alternatives

...