Page properties

Status
Current state: "Under Discussion"
Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Discuss-Proposing-FLIP-25-Support-User-State-TTL-Natively-in-Flink-td20912.html#a22097
...
Draft: https://docs.google.com/document/d/1SI_WoXAfOd4_NKpGyk4yh3mf59g12pSGNXRtNFi-tgM
Released: -
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Reasons to introduce Flink native state expiration:
- The size of user state in Flink can grow infinitely for certain scenarios and some use cases need to guarantee automatic cleanup of too old state
- Developers currently have to implement TTL themselves in an ad hoc way, e.g. using the timer service, which might not be space efficient
- Some legal regulations require data accessibility for a limited amount of time. In particular, recent changes in EU data privacy law force digital companies to treat personal data very carefully. This drives the priority of the first iteration, which focuses on making expired state inaccessible.
This effort is to work out a unified approach for TTL semantics and make it reusable.
Proposed Change
API/Semantics
Users can associate a time-to-live (TTL) with a value or entry of keyed state and update it during certain operations, depending on the configuration. The data automatically becomes unavailable after the TTL expires and is garbage collected sooner or later. The general idea is to provide:
- relaxed guarantee for state cleanup: state is persisted at least for TTL and then cleaned up based on the best effort: on access and in background
- exact guarantee for state visibility: expired state is hidden/blocked by API methods even if it is still persisted
- or relaxed guarantee for state visibility: expired state is returned by API methods if it is still available
Setup TTL
- Create TTL configuration
- Supply it in state descriptor
Configuration of TTL
TTL state cleanup:
- relaxed (cleanup on access and in the background as next step)
- (exact with timers as next step)
TTL state visibility for relaxed cleanup:
- exact (expired is never returned)
- relaxed (returned if still available)
Update type:
- only on creation and write
- on read and creation/write
- (possibly only on creation; in the current design this might degrade write performance because, to preserve the original timestamp, this option would require first reading the timestamp out and then writing it back together with the updated user value as one db value)
Time characteristic:
- processing time
- event time
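For illustration, a minimal sketch of how these options could be wired together, assuming the StateTtlConfig builder and enum names introduced in the First Iteration section below (processing time taken as the default time characteristic; TTL duration and state name are made up):

    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.time.Time;

    // Relaxed cleanup, exact visibility, update on creation and write.
    StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.days(7))                                              // TTL duration
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)             // update type
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // visibility
        .build();

    ValueStateDescriptor<String> desc = new ValueStateDescriptor<>("last-event", String.class);
    desc.enableTimeToLive(ttlConfig);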
TTL behaviour
- Read:
  - check TTL upon read
  - discard if expired and issue cleanup delete/rewrite
  - depending on configuration, update if not expired
- Write (and creation):
  - set/update TTL upon (re)writing new value
  - append: set TTL per added element
Entries in map and list state have separate independent TTLs and get filtered out on expiration while being read out.
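To make these semantics concrete, here is a minimal sketch of the expiration check and the read-time visibility decision applied per value/entry (helper names are illustrative, timestamps are in milliseconds):

    // Hypothetical per-value/per-entry TTL semantics (processing time, millis).
    final class TtlSemanticsSketch {

        /** A value/entry expires once its last access timestamp is at least "ttl" old. */
        static boolean isExpired(long lastAccessTimestamp, long ttlMillis, long now) {
            return lastAccessTimestamp + ttlMillis <= now;
        }

        /** Read-path visibility for a single value or list/map entry. */
        static <V> V visibleValueOrNull(V value, long lastAccessTimestamp, long ttlMillis, long now,
                                        boolean returnExpiredIfNotCleanedUp) {
            if (isExpired(lastAccessTimestamp, ttlMillis, now) && !returnExpiredIfNotCleanedUp) {
                return null; // hidden from the user; a cleanup delete/rewrite is issued separately
            }
            return value;
        }
    }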
First Iteration (main TTL API) Jira FLINK-9510
Configuration
TTL can be configured and enabled in the abstract StateDescriptor and becomes available in all its subclasses:
enum UpdateType { Disabled, OnCreateAndWrite, OnReadAndWrite }

enum StateVisibility { ReturnExpiredIfNotCleanedUp, NeverReturnExpired }

enum TtlTimeCharacteristic { ProcessingTime, EventTime }

abstract class StateDescriptor {
    void enableTimeToLive(StateTtlConfig ttlConfig) { … }
}

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .setUpdateType(..)
    …
    .build();

XxxStateDescriptor stateDesc = new XxxStateDescriptor(...);
stateDesc.enableTimeToLive(ttlConfig);
State value with timestamp
The main idea is to wrap user state value with a class holding the value and the last access timestamp (maybe meta data in future) and use the new object as a value in the existing implementations:
class TtlValue<V> {
    V value;
    long lastAccessTimestamp;
}
Wrapping state factory
The original state factory provided in backends is wrapped with TtlStateFactory if TTL is enabled:
state = stateDesc.getTtlConfig().isEnabled()
    ? new TtlStateFactory(originalStateFactory, ..).createState(...)
    : originalStateFactory.createState(...);
TtlStateFactory decorates the states produced by the original factory with TTL logic wrappers and adds TtlValue serialisation logic:
TtlStateFactory {
    TtlTimeProvider timeProvider; // e.g. System.currentTimeMillis()

    <V> TtlValueState<V> createValueState(valueDesc) {
        serializer = new TtlValueSerializer(valueDesc.getSerializer());
        return new TtlValueState(originalStateWithTtl, timeProvider);
    }
}
The TTL serializer should add the expiration timestamp.
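A possible shape of that serializer (a sketch only, not the actual TtlValueSerializer): it stores a fixed-width timestamp next to the bytes produced by the user's value serializer. The field order shown here is illustrative; ValueSerializer is a stand-in for Flink's TypeSerializer, and TtlValue<V> is the wrapper class above (same package assumed).

    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    // Hypothetical composite serializer: timestamp + user value bytes.
    final class TtlValueSerializerSketch<V> {

        interface ValueSerializer<T> { // stand-in for the user's TypeSerializer
            void serialize(T value, DataOutputStream out) throws IOException;
            T deserialize(DataInputStream in) throws IOException;
        }

        private final ValueSerializer<V> userSerializer;

        TtlValueSerializerSketch(ValueSerializer<V> userSerializer) {
            this.userSerializer = userSerializer;
        }

        void serialize(TtlValue<V> ttlValue, DataOutputStream out) throws IOException {
            out.writeLong(ttlValue.lastAccessTimestamp); // fixed-width timestamp (illustrative order)
            userSerializer.serialize(ttlValue.value, out);
        }

        TtlValue<V> deserialize(DataInputStream in) throws IOException {
            TtlValue<V> result = new TtlValue<>();
            result.lastAccessTimestamp = in.readLong();
            result.value = userSerializer.deserialize(in);
            return result;
        }
    }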
Wrapping state objects
TTL state decorators use original state with packed TTL and add TTL logic using time provider:
TtlValueState<V> implements ValueState<V> { // List, Map, ...
    TtlTimeProvider timeProvider;

    void update() { ... underlyingState.update(valueWithTtl); ... }
}
Cleanup: issue delete/rewrite upon realising expiration during access/modification.
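A more complete sketch of such a wrapper, including the cleanup-on-access path (StateAccess, the class name and the LongSupplier stand-in for TtlTimeProvider are illustrative, not the actual internal API; TtlValue<V> is the wrapper class from above):

    import java.util.function.LongSupplier;

    // Hypothetical TTL decorator around the original value state.
    final class TtlValueStateSketch<V> {

        interface StateAccess<T> {      // stand-in for the wrapped original state
            T get();
            void update(T value);
            void clear();
        }

        private final StateAccess<TtlValue<V>> underlyingState;
        private final LongSupplier timeProvider;   // stand-in for TtlTimeProvider
        private final long ttlMillis;
        private final boolean returnExpiredIfNotCleanedUp;
        private final boolean updateTtlOnRead;     // OnReadAndWrite update type

        TtlValueStateSketch(StateAccess<TtlValue<V>> underlyingState, LongSupplier timeProvider,
                            long ttlMillis, boolean returnExpiredIfNotCleanedUp, boolean updateTtlOnRead) {
            this.underlyingState = underlyingState;
            this.timeProvider = timeProvider;
            this.ttlMillis = ttlMillis;
            this.returnExpiredIfNotCleanedUp = returnExpiredIfNotCleanedUp;
            this.updateTtlOnRead = updateTtlOnRead;
        }

        V value() {
            TtlValue<V> wrapped = underlyingState.get();
            if (wrapped == null) {
                return null;
            }
            long now = timeProvider.getAsLong();
            if (wrapped.lastAccessTimestamp + ttlMillis <= now) {
                underlyingState.clear();                             // cleanup on access
                return returnExpiredIfNotCleanedUp ? wrapped.value : null;
            }
            if (updateTtlOnRead) {
                underlyingState.update(withTimestamp(wrapped.value, now));
            }
            return wrapped.value;
        }

        void update(V value) {
            underlyingState.update(withTimestamp(value, timeProvider.getAsLong()));
        }

        private TtlValue<V> withTimestamp(V value, long timestamp) {
            TtlValue<V> wrapped = new TtlValue<>();
            wrapped.value = value;
            wrapped.lastAccessTimestamp = timestamp;
            return wrapped;
        }
    }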
Savepoint/checkpoint save and restore
Values wrapped with the timestamp are serialised and snapshotted the same way as before, just using the enhanced TTL serializer.
TTL config is just a way of interpreting the state value and does not add any stateful meta info. The TtlValueSerializer, saved as the state value serializer, already enforces a compatibility check, e.g. if TTL'ed state is restored with a disabled TTL config.
Additional Cleanup Strategies
Cleanup of full state snapshot upon checkpointing Jira FLINK-9938
Filter out expired entries in case of a full state scan to take a full checkpoint. This approach does not reduce the size of the local state used in the running job but reduces the size of the taken snapshot. In case of a restore from such a checkpoint, the local state will not contain expired entries either. The implementation is based on extending backends to support custom state transformers. The backends call the transformer for each state entry during the full snapshot scan, and the transformer decides whether to keep, modify or drop the state entry. TTL has its own implementation of state transformers which checks the timestamp and filters out expired entries.
public interface StateSnapshotTransformer<T> {
    @Nullable
    T filterOrTransform(@Nullable T value);
}
To avoid concurrent usage of transformer objects (can be relevant for performance and reuse of serialization buffers), each snapshotting thread uses a factory to produce a thread-confined transformer.
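A sketch of what the TTL variant of such a transformer and a thread-confined creation helper could look like (class and factory names are illustrative; it uses TtlValue<V> from the "State value with timestamp" section and the StateSnapshotTransformer interface above):

    import javax.annotation.Nullable;
    import java.util.function.LongSupplier;

    // Hypothetical TTL snapshot transformer: drops entries already expired at snapshot time.
    final class TtlSnapshotTransformerSketch<V> implements StateSnapshotTransformer<TtlValue<V>> {

        private final long ttlMillis;
        private final long snapshotTimestamp;

        TtlSnapshotTransformerSketch(long ttlMillis, long snapshotTimestamp) {
            this.ttlMillis = ttlMillis;
            this.snapshotTimestamp = snapshotTimestamp;
        }

        @Nullable
        @Override
        public TtlValue<V> filterOrTransform(@Nullable TtlValue<V> value) {
            if (value == null || value.lastAccessTimestamp + ttlMillis <= snapshotTimestamp) {
                return null;  // drop expired entries from the snapshot
            }
            return value;     // keep unexpired entries unchanged
        }

        // Thread confinement: every snapshotting thread gets its own transformer instance.
        static <V> TtlSnapshotTransformerSketch<V> createForSnapshotThread(long ttlMillis, LongSupplier clock) {
            return new TtlSnapshotTransformerSketch<>(ttlMillis, clock.getAsLong());
        }
    }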
Incremental cleanup with global iterator (heap backend) Jira FLINK-10473
This approach enables lazy background cleanup of state with TTL in JVM heap backend. The idea is to keep a global state lazy iterator with loose consistency. Every time a state value for some key is accessed or a record is processed, the iterator is advanced, TTL of iterated state entries is checked and the expired entries are cleaned up. When the iterator reaches the end of state storage it just starts over. This way the state with TTL is regularly cleaned up to prevent ever growing memory consumption.
The caveat of this cleanup strategy is that if state is not accessed and no records are processed, the accumulated expired state still occupies the storage, which should be a rather impractical case.
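A rough sketch of the idea, using a plain java.util.Map of last access timestamps as a stand-in for the heap backend's state map (the real backend iterates its own copy-on-write structures and tolerates concurrent modifications; this sketch does not):

    import java.util.Iterator;
    import java.util.Map;

    // Hypothetical incremental cleanup: a global iterator over the state storage
    // is advanced by a few entries on every state access / processed record.
    final class IncrementalCleanupSketch<K> {

        private final Map<K, Long> lastAccessTimestamps; // stand-in for the heap state storage
        private final long ttlMillis;
        private final int entriesPerAdvance;
        private Iterator<Map.Entry<K, Long>> iterator;

        IncrementalCleanupSketch(Map<K, Long> lastAccessTimestamps, long ttlMillis, int entriesPerAdvance) {
            this.lastAccessTimestamps = lastAccessTimestamps;
            this.ttlMillis = ttlMillis;
            this.entriesPerAdvance = entriesPerAdvance;
            this.iterator = lastAccessTimestamps.entrySet().iterator();
        }

        /** Called on every state access or processed record. */
        void advanceAndCleanup(long now) {
            for (int i = 0; i < entriesPerAdvance; i++) {
                if (!iterator.hasNext()) {
                    // reached the end of the storage: start over (loose consistency)
                    iterator = lastAccessTimestamps.entrySet().iterator();
                    if (!iterator.hasNext()) {
                        return;
                    }
                }
                Map.Entry<K, Long> entry = iterator.next();
                if (entry.getValue() + ttlMillis <= now) {
                    iterator.remove(); // expired: drop the entry
                }
            }
        }
    }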
Cleanup using RocksDB compaction filter Jira FLINK-10471
In case of the RocksDB backend, we can piggyback on compaction by using a custom compaction filter which checks the last access timestamp and discards expired values. It requires contributing a Flink TTL specific C++ filter to RocksDB, like the existing one for Cassandra. At the moment RocksDB does not support compaction filter plugins (see the PR discussion); it is under development. Meanwhile, we can apply two strategies to enable this feature in Flink:
- Release and maintain a temporary fork of RocksDB for Flink: FRocksDB and merge TTL filter into this fork (used in Flink 1.8)
- Build the C++ TTL filter separately, pack this C++ lib into its own JNI java client jar and load it in Flink in addition to vanilla RocksDB (Flink RocksDB extensions, under development)
The second strategy is more flexible in the long run.
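Conceptually, the filter inspects the last access timestamp stored with each serialized value and tells RocksDB to drop the entry during compaction once it has expired. A Java rendering of that decision logic (the real filter is implemented in C++; the timestamp offset and value layout here are illustrative):

    import java.nio.ByteBuffer;

    // Hypothetical rendering of the compaction filter decision.
    final class CompactionFilterDecisionSketch {

        enum Decision { KEEP, REMOVE }

        /**
         * @param serializedTtlValue value bytes as written by the TTL serializer
         * @param timestampOffset    where the 8-byte timestamp sits in the value (layout-dependent)
         */
        static Decision filter(byte[] serializedTtlValue, int timestampOffset, long ttlMillis, long now) {
            long lastAccessTimestamp = ByteBuffer.wrap(serializedTtlValue).getLong(timestampOffset);
            return lastAccessTimestamp + ttlMillis <= now ? Decision.REMOVE : Decision.KEEP;
        }
    }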
Event time support Jira FLINK-12005
Event time is opted for in StateTtlConfig by setting TtlTimeCharacteristic.EventTime. To define how to expire elements, we need to define which timestamp to save when the state entry is accessed/updated and which timestamp is used to check expiration. In case of processing time, the time semantics is straightforward: we always use the current processing time. The definition of event time semantics is a bit trickier. The current proposal, based on the ML discussion thread, is the following:
Last access timestamp: the event timestamp of the record currently being processed.
The current timestamp to check expiration against has two options:
- Last emitted watermark
- Current processing time
Therefore, TtlTimeProvider will need two methods: getAccessTimestamp and getCurrentTimestamp.
Moreover, to enable event time support, event timestamp of the record and the updated watermark needs to be passed to the state backend, shared with TTL state wrappers and additional cleanup strategies (snapshot transformers and compaction filter).
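A possible shape of such a time provider, with the two methods named above and an event-time variant fed by the operator's current record timestamp and watermark (the holder class and the wiring are illustrative; checking against the watermark is one of the two options listed above):

    // Hypothetical shape of the time provider with the two methods mentioned above.
    interface TtlTimeProviderSketch {

        /** Timestamp stored as "last access" when state is created/updated/read. */
        long getAccessTimestamp();

        /** Timestamp the stored value is compared against to decide expiration. */
        long getCurrentTimestamp();

        /** Processing time: both timestamps are the wall clock. */
        static TtlTimeProviderSketch processingTime() {
            return new TtlTimeProviderSketch() {
                @Override public long getAccessTimestamp() { return System.currentTimeMillis(); }
                @Override public long getCurrentTimestamp() { return System.currentTimeMillis(); }
            };
        }

        /** Event time: access timestamp of the current record, expiration checked against the watermark. */
        static TtlTimeProviderSketch eventTime(CurrentRecordAndWatermark holder) {
            return new TtlTimeProviderSketch() {
                @Override public long getAccessTimestamp() { return holder.currentRecordTimestamp; }
                @Override public long getCurrentTimestamp() { return holder.currentWatermark; }
            };
        }

        // Mutable holder updated by the operator/time service as records and watermarks arrive.
        final class CurrentRecordAndWatermark {
            volatile long currentRecordTimestamp;
            volatile long currentWatermark;
        }
    }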
Event time provider
Different implementations of TtlTimeProvider, e.g. one which holds the current watermark, need to be passed to the state backend at the moment of its creation in StreamTaskStateInitializerImpl. There are several ways to update the watermark in this implementation of TtlTimeProvider:
- in InternalTimeServiceManager.advanceWatermark explicitly
- InternalTimeServiceManager/InternalTimerServiceImpl could be refactored to use a shared EventTimeService which holds the current updatable watermark and is wrapped by TtlTimeProvider
The TTL state wrapping factory should create TTL state wrappers and snapshot transformers with TtlTimeProvider selected by TtlTimeCharacteristic.
RocksDB TTL compaction filter
The RocksDB TTL compaction filter factory needs to get selected TtlTimeProvider when it gets configured. There are two ways:
- make it volatile and settable in RocksDbTtlCompactFiltersManager.TimeProviderWrapper, track it in RocksDbTtlCompactFiltersManager along with FlinkCompactionFilterFactory, and set it later, before FlinkCompactionFilterFactory is configured.
- Move FlinkCompactionFilter.TimeProvider from FlinkCompactionFilterFactory to ConfigHolder and set selected TtlTimeProvider with the Config.
The second option does not use a volatile variable and should be more performant, but it requires changes in the RocksDB java client and either releasing a new version of FRocksDB or of the Flink RocksDB extensions.
Migration Plan and Compatibility
This feature introduces a new type of state which did not exist before. All current state types stay the same, so no specific migration is needed. Adding TTL to or removing it from existing state requires an explicit custom migration, basically transforming the stored state by adding or removing the bytes of the last access timestamp.
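For illustration, such a byte-level transformation could look as follows, assuming the illustrative "timestamp in front of the user value" layout from the serializer sketch above (the real layout and migration tooling may differ):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.Arrays;

    // Hypothetical byte-level transformation for an explicit state migration.
    final class TtlMigrationSketch {

        /** Non-TTL value bytes -> TTL value bytes: prepend a last access timestamp. */
        static byte[] addTtl(byte[] userValueBytes, long initialLastAccessTimestamp) throws IOException {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream(userValueBytes.length + Long.BYTES);
            DataOutputStream out = new DataOutputStream(buffer);
            out.writeLong(initialLastAccessTimestamp);
            out.write(userValueBytes);
            return buffer.toByteArray();
        }

        /** TTL value bytes -> non-TTL value bytes: strip the leading timestamp. */
        static byte[] removeTtl(byte[] ttlValueBytes) {
            return Arrays.copyOfRange(ttlValueBytes, Long.BYTES, ttlValueBytes.length);
        }
    }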
Future work
- register a processing/event time timer per state entry for exact cleanup upon the expiration callback, and inject it into the TTL state decorators (conflicts and precedence with user timers need to be addressed)
- support queryable state with TTL
- set TTL in state get/update methods and/or set current TTL in state object
- state TTL migration: upon restoration, add or drop TTL for existing state which has or has not had it before
- support optional prolonging of state TTL in case of e.g. disaster recovery to prevent real time state expiration during downtime (Jira FLINK-9661)
- probably out of scope: potentially introduce generalised meta info (including timestamp) associated with each state value
Rejected Alternatives
Previous version of FLIP-25
TtlDb
Embedded TTL per state name/column family
- Only processing time
- Get API can return expired entries w/o explicitly informing about it
Timer service or dedicated column family
The first iteration can be extended with this approach as well, see Future work. It can be used where deterministic cleanup with exact guarantees is required.
The tradeoff here is that, even after becoming part of the state backends, the timer service still requires storing keys twice in RocksDB and inside the checkpoint: associated with the state and with its expiration timestamp. This results in bigger space consumption and extra overhead. The reason is that timers require another data layout sorted by timestamp.
Some lighter cleanup strategies can also be given a try based on the suggested first iteration, see Future work.
To solve all the above problems, we want to support user state TTL natively in Flink with low memory overhead.
*Note that this doc is only for the MVP (or V1) of this feature. More valid features and use cases can be added after MVP.
Public Interfaces
...
Example:
enum TtlUpdateEvent {
// required, default option
OnCreate,
// optional
OnRead,
OnWrite,
OnReadWrite
}
enum TtlTimeCharacteristic {
EVENT_TIME,
PROCESSING_TIME
}
public class TtlPolicy {
TtlUpdateEvent ttlEvent;
int ttl;
TtlTimeCharacteristic ttlTimeCharacteristic;
// ...
}
// ttl in sec
int ttl = 5;
// default TTL, using TtlUpdateEvent.OnCreate
TtlPolicy tp1 = new TtlPolicy(ttl, TtlTimeCharacteristic.EVENT_TIME);
// extended TTL, using TtlUpdateEvent.OnCreate and TtlUpdateEvent.OnReadWrite
TtlPolicy tp2 = new TtlPolicy(ttl, TtlTimeCharacteristic.PROCESSING_TIME, TtlUpdateEvent.OnReadWrite);
ValueState s1 = getRuntimeContext().getState(new ValueStateDescriptor<T>("x", ser, tp1));
ValueState s2 = getRuntimeContext().getState(new ValueStateDescriptor<T>("y", ser, tp2));
Proposed Changes
General Architecture
The SQL Client can be executed in two modes: a gateway mode and an embedded mode. In this FLIP we mostly focus on the embedded mode but also conceptually consider a later gateway mode.
In gateway mode, a dedicated SQL gateway process offers APIs to which other applications can connect. A REST API and a JDBC API allow for submitting queries, cancelling jobs, and retrieving results. Both REST and JDBC connect to a common executor that is responsible for communicating with Flink and external catalogs. The executor also keeps state about currently running sessions. The optional SQL CLI client connects to the REST API of the gateway and allows for managing queries via the console.
In embedded mode, the SQL CLI is tightly coupled with the executor in a common process. This allows for playing around with Flink quickly and submitting jobs without having to start an additional component.
The communication to Flink happens through the ClusterClient. By using the ClusterClient we can ensure that the SQL client will run Flink on YARN, Mesos, and other supported environments without additional effort.
For the embedded mode, a general communication flow would look like this:
1. The user adds catalog information to the configuration files.
2. The user starts the CLI client with custom JAR files and configuration (--jar, --context).
3. The user enters a SQL query and submits it.
4. The executor submits the Flink job with all involved JAR files.
5. If the query is a SELECT query, the executor materializes the result such that it can be pulled by the CLI client.
6. If the query is an INSERT INTO query, the executor submits the Flink job.
7. The user can stop the running query and submit a new one.
8. Exiting the CLI would also stop the running SELECT query but not INSERT INTO queries.
Gateway Mode
Embedded Mode
Configuration
Independent of the execution mode, the SQL client can be configured globally (sql-defaults.conf) and/or for every CLI session (sql-context.conf). The configuration specifies settings that would programmatically be applied to an ExecutionEnvironment/StreamExecutionEnvironment and a TableEnvironment. It contains catalog information as well as job specific parameters.
The global configuration is located in a dedicated file and is read during startup. Global configuration applies to all CLI sessions. A context configuration file can be specified when starting the CLI client and is attached to any query executed in the current CLI session. Thus, context properties might overwrite global default properties. In future versions, the configuration might also be refined by using DDL statements such as:
CREATE TYPE...
CREATE TABLE…
DECLARE @CustomerID = 'XYZ'
DDL statements will have highest priority and overwrite the properties specified in configuration files.
Here are some properties that might need to be defined in the long run:
- Pre-registered table sources
  - Name
  - Source
    - Type (e.g. Kafka, Custom)
    - Properties (e.g. topic, connection info, custom TableSource class)
  - Encoding
    - Type (e.g. Avro, JSON)
    - Schema (e.g. Avro class, JSON field names/types)
  - Rowtime descriptor/Proctime
    - Watermark strategy and Watermark properties
    - Time attribute info
  - Bucketization
  - Statistics
- Pre-registered table sinks
  - Name
  - Sink
    - Type (e.g. Kafka, Custom)
    - Properties (e.g. destination path, output types)
- External catalogs
  - Name
  - Properties (e.g. connection info, credentials, ExternalCatalog class)
- User-defined functions
  - Name
  - Parameters (e.g. constructor parameters for a TableFunction)
  - Class
- [Optional] User-defined types
  - Name
  - Field names and types
- [Optional] User-defined local variables (@local_variable)
  - Name
  - Value
- Job parameters
  - Batch or streaming execution
  - Parallelism
  - Maximum parallelism
  - State Backend + parameters
  - Auto watermark interval
  - Restart strategy
  - Query Config (e.g. min/max retention time)
- [Separate configuration?] SQL client parameters
  - Gateway properties (e.g. database properties, server credentials)
  - CLI Client properties (e.g. timeouts, credentials for authenticating to gateway, result serving speed)
Result Retrieval
In the future, we can use different options for retrieving materialized results both for debugging purposes and long-running maintained views. The retrieval is managed by the executor.
There are three materialization modes:
...
Custom mode (for any result): custom TableSink into arbitrary systems. Flink writes any result into a destination that is determined by the user using INSERT INTO. The results are not accessible by the executor and can thus not be served via REST or the CLI console.
The supported materialization mode also depends on the query type:
Query Type | Internal Mode | External Mode*
---|---|---
Batch | | File table sink
Append Stream | | Kafka/file table sink
Retract/Upsert Stream | | (Compacted Kafka)/Cassandra table sink
We might use the usual heap space at the beginning. The internal database can be any JDBC database. External materialization modes (*) are not included in the first version. In the future, Kafka would be read by general Kafka utility functions, and files likewise, with support for different file systems.
Result Maintenance
While batch queries have bounded results, streaming queries are potentially never-ending and, therefore, require special treatment for keeping the results up to date and consistent. The streaming query can be considered as a view and the running streaming application as the view maintenance. Results might need to be supplied to systems that were not made for streaming queries, e.g., Java applications that read from a JDBC API. In those cases, every requested result set must be a snapshot (materialized view) of the running query at a point in time. The snapshot must be immutable until all result rows have been consumed by the application or a new result set is requested. We distinguish between two types of results that will require different materialization semantics: a materialized view and a materialized result stream.
Materialized View
A consistent materialized view of results for production use cases. Materialized views are not part of this FLIP but might be added in future versions. It requires another design document for the DDL statement and execution but here are some properties we aim for:
- SQL: CREATE MATERIALIZED VIEW ...
- intended for long running materialization queries that are updated periodically (e.g. every hour or on successful checkpoints)
- retractions are not visible directly, only the materialized result
- a result can be accessed by JDBC connections or the REST API (e.g. for powering dashboards)
- materialization operators can cooperate with Flink's checkpointing (e.g. only checkpointed results are exposed through the APIs)
- a user can specify different parameters for how and how often the view is maintained (see create_mv_refresh)
- runs detached from the CLI client
Materialized Result Stream
A materialized stream of results for getting immediate insights into the running SQL query.
- SQL: SELECT ...
- intended for debugging during query creation and initial show cases
- retractions are shown as streams of deletions and insertions
- no guarantees about checkpointed results
- the executor abstracts the underlying representation and supplies the interfaces for accessing the materialized stream in a FIFO fashion
- only one running query per CLI session
- cancelled if cancelled in the CLI or if the CLI is closed
We focus on simple SELECT queries first that are materialized on the heap of the executor (internal materialization mode).
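For illustration only, serving such a stream from the executor heap could look like the following sketch (class and method names are made up, not the actual SQL client classes): the running query appends change rows into a bounded FIFO buffer and the CLI client polls batches from it.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    // Hypothetical heap-backed FIFO for a materialized result stream.
    final class MaterializedResultStreamSketch<ROW> {

        /** A change row: an insertion or a deletion/retraction of a previously emitted row. */
        static final class Change<ROW> {
            final boolean insertion;
            final ROW row;
            Change(boolean insertion, ROW row) { this.insertion = insertion; this.row = row; }
        }

        private final BlockingQueue<Change<ROW>> buffer;

        MaterializedResultStreamSketch(int capacity) {
            this.buffer = new ArrayBlockingQueue<>(capacity);
        }

        /** Called by the running query; blocks (back-pressures) when the CLI does not keep up. */
        void add(boolean insertion, ROW row) throws InterruptedException {
            buffer.put(new Change<>(insertion, row));
        }

        /** Called by the CLI client to fetch the next available changes in FIFO order. */
        List<Change<ROW>> poll(int maxChanges) {
            List<Change<ROW>> batch = new ArrayList<>(maxChanges);
            buffer.drainTo(batch, maxChanges);
            return batch;
        }
    }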
Compatibility, Deprecation, and Migration Plan
No compatibility changes or other deprecation necessary.
Implementation Plan
1. Basic Embedded SQL Client
Add the basic features to play around with Flink's streaming SQL.
- Add CLI component that reads the configuration files
  - "Pre-registered table sources"
  - "Job parameters"
- Add executor for retrieving pre-flight information and corresponding CLI SQL parser
  - SHOW TABLES
  - DESCRIBE TABLE
  - EXPLAIN
- Add streaming append query submission to executor
  - Submit jars and run SELECT query using the ClusterClient
  - Collect results on heap and serve them on the CLI side (Internal Mode with SELECT)
  - EXECUTE (for executing a SQL statement stored in a local file)
2. Full Embedded SQL Client
Add important features to fully use the SQL client for a variety of use cases.
- Add support for streaming retract/upsert queries
- Add support for batch queries
- Add user-defined function support
  - Support scalar, aggregate, and table functions
  - Submit corresponding jars to the cluster
  - DESCRIBE FUNCTION
- Add CLI query history
- Add CLI query code completion/highlighting
- Add support for INSERT INTO
  - Read configuration about "Pre-registered table sinks"
  - Allow submission without collect()
3. Discuss/design further features
Discuss and prioritize other features that are not part of this FLIP.
- Introduce gateway mode with REST API
- Add support for catalogs
- Allow creating Materialized Views
- Support other materialization backends
- Create a JDBC API
- Further SQL DDL statements:
  - CREATE TABLE
  - CREATE TYPE
Rejected Alternatives
If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.