Status
Current state: Under Discussion
Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Discussion-flink-elasticsearch-connector-supports-td42082.html#a42106
JIRA:
Released: <Flink Version>
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
- The Flink Elasticsearch connector currently only supports sink; it doesn't support source. Some companies genuinely have the scenario of reading from Elasticsearch.
- This will make the Elasticsearch connector's support more complete.
Scope
- We’ll design and develop an Elasticsearch source connector with the following features:
- support both the DataStream and SQL APIs
- Elasticsearch as a bounded source, with support for lookup joins
- support multiple Elasticsearch versions (5/6/7)
- support FilterPushDown, ProjectPushDown, and LimitPushDown to optimize query performance
Overall Design
- As in the Hadoop ecosystem, we split the logical data into InputSplits and use an InputFormat to read the data.
- Implement DynamicTableSource to support SQL.
ElasticsearchInputSplit
- We split an Elasticsearch type (in recent versions this is effectively the index, since every index has only one default type, `_doc`) into multiple ElasticsearchInputSplits. One split corresponds to one shard.
- The split doesn't contain slice info, for the reasons below.
- You can read https://www.jianshu.com/p/d32e17dab90c (in Chinese), which shows that the slice API has poor performance in the es-hadoop project.
- I also found that es-hadoop (https://github.com/elastic/elasticsearch-hadoop) has removed this and disables sliced scrolls by default. From the latest es-hadoop release notes, under "Configuration Changes":
  `es.input.use.sliced.partitions` is deprecated in 6.5.0, and will be removed in 7.0.0. The default value for `es.input.max.docs.per.partition` (100000) will also be removed in 7.0.0, thus disabling sliced scrolls by default, and switching them to be an explicitly opt-in feature.
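The split described above can be sketched as a simple serializable value class. The field and accessor names here are assumptions for illustration; the actual class would also implement Flink's InputSplit interface.

```java
import java.io.Serializable;

// Hypothetical sketch of the proposed split: one split per shard,
// carrying only the index/type and the target shard id -- no slice info.
public class ElasticsearchInputSplit implements Serializable {

    private static final long serialVersionUID = 1L;

    private final int splitNumber; // position of this split among all splits
    private final String index;    // the Elasticsearch index to read
    private final String type;     // the document type (single default type in ES 6/7)
    private final int shard;       // the shard this split reads, targeted via "_shards:<n>"

    public ElasticsearchInputSplit(int splitNumber, String index, String type, int shard) {
        this.splitNumber = splitNumber;
        this.index = index;
        this.type = type;
        this.shard = shard;
    }

    public int getSplitNumber() { return splitNumber; }
    public String getIndex() { return index; }
    public String getType() { return type; }
    public int getShard() { return shard; }
}
```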
ElasticSearchInputFormatBase
- ElasticSearchInputFormatBase implements the actual reading of data.
Adapt to different es versions
- In order to adapt to different Elasticsearch versions (TransportClient for 5, RestHighLevelClient for 6 and 7), we should add these methods to ElasticsearchApiCallBridge:
```java
ElasticsearchInputSplit[] createInputSplitsInternal(String index, String type, C client, int minNumSplits);

SearchResponse search(C client, SearchRequest searchRequest) throws IOException;

SearchResponse scroll(C client, SearchScrollRequest searchScrollRequest) throws IOException;

void close(C client) throws IOException;
```
- Take the search method as an example:
```java
// for Elasticsearch5ApiCallBridge
@Override
public SearchResponse search(TransportClient client, SearchRequest searchRequest) throws IOException {
    // how to construct the SearchRequest: convert the search builder to a SearchRequestBuilder
    return client.search(searchRequest).actionGet();
}

// for Elasticsearch6ApiCallBridge and Elasticsearch7ApiCallBridge
@Override
public SearchResponse search(RestHighLevelClient client, SearchRequest searchRequest) throws IOException {
    // convert the search builder to a SearchRequest
    return client.search(searchRequest);
}
```
SQL/DataStream and configuration for Bounded Source
- For the Elasticsearch DynamicTableSource
- Following FLIP-122 (New Connector Property Keys for New Factory):
```sql
CREATE TABLE es_table (
  ...
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://localhost:9200',
  'index' = 'MyIndex',
  'document-type' = 'MyType',
  'scan.scroll.max-size' = '1000',
  'scan.scroll.timeout' = '1000'
);
```

```java
public static final ConfigOption<Integer> SCROLL_MAX_SIZE_OPTION =
    ConfigOptions.key("scan.scroll.max-size")
        .intType()
        .noDefaultValue()
        .withDescription("Maximum number of hits to be returned with each Elasticsearch scroll request");

public static final ConfigOption<Duration> SCROLL_TIMEOUT_OPTION =
    ConfigOptions.key("scan.scroll.timeout")
        .durationType()
        .noDefaultValue()
        .withDescription("Amount of time Elasticsearch will keep the search context alive for scroll requests");
```
- For the DataStream API:
```java
Map<String, String> userConfig = new HashMap<>();
userConfig.put("cluster.name", CLUSTER_NAME);

DataType dataType = ROW(FIELD("data", STRING()));
RowType schema = (RowType) dataType.getLogicalType();

// pass on missing field
DeserializationSchema<RowData> deserializationSchema =
    new JsonRowDataDeserializationSchema(schema, new RowDataTypeInfo(schema), false, false);

ElasticSearchInputFormatBase inputFormat = createElasticsearchInputFormat(
    userConfig,
    (DeserializationSchema<T>) deserializationSchema,
    null,                            // the fields the user wants (null = all)
    "elasticsearch-sink-test-index", // index
    "flink-es-test-type",            // type
    1000,                            // scrollTimeout
    10                               // scrollMaxSize
);

DataStream<RowData> dataStream = env.createInput(inputFormat);
dataStream.print();
env.execute("Elasticsearch Source Test");
```
Lookup Source
- How to join with the Elasticsearch lookup source:
- Use TermQueryBuilder and fetchSource, because a lookup join is an equi-join.
```java
// inside the eval method of the lookup function
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.fetchSource(fieldNames, null);

BoolQueryBuilder lookupCondition = QueryBuilders.boolQuery();
for (int i = 0; i < keyNames.length; i++) {
    lookupCondition.must(new TermQueryBuilder(keyNames[i], keys[i]));
}
searchSourceBuilder.query(lookupCondition);
searchRequest.source(searchSourceBuilder);
```
- Dimension table cache
- Use a Guava cache and add these arguments:
```java
// cache settings
final long cacheMaxSize;
final long cacheExpireMs;
```
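To make the semantics of these two settings concrete, here is a plain-Java stand-in with the same behavior the Guava cache would provide: bounded size with least-recently-used eviction plus expire-after-write. The class and method names are illustrative; the real connector would simply use Guava's `CacheBuilder.newBuilder().maximumSize(cacheMaxSize).expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative stand-in for the Guava lookup cache: evicts the
// least-recently-used entry beyond cacheMaxSize, and treats entries
// older than cacheExpireMs as misses.
public class LookupCache<K, V> {

    private static final class Entry<V> {
        final V value;
        final long writeTime;
        Entry(V value, long writeTime) { this.value = value; this.writeTime = writeTime; }
    }

    private final long cacheMaxSize;
    private final long cacheExpireMs;
    private final LinkedHashMap<K, Entry<V>> cache;

    public LookupCache(long cacheMaxSize, long cacheExpireMs) {
        this.cacheMaxSize = cacheMaxSize;
        this.cacheExpireMs = cacheExpireMs;
        // accessOrder=true makes iteration order LRU, so the eldest
        // entry removed below is the least recently used one
        this.cache = new LinkedHashMap<K, Entry<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Entry<V>> eldest) {
                return size() > LookupCache.this.cacheMaxSize;
            }
        };
    }

    public void put(K key, V value) {
        cache.put(key, new Entry<>(value, System.currentTimeMillis()));
    }

    /** Returns null on a miss or when the entry has expired. */
    public V get(K key) {
        Entry<V> e = cache.get(key);
        if (e == null) {
            return null;
        }
        if (System.currentTimeMillis() - e.writeTime > cacheExpireMs) {
            cache.remove(key);
            return null;
        }
        return e.value;
    }
}
```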
PushDown Optimizations
- How to set fieldNames (for ProjectPushDown), size (for LimitPushDown), and the condition (a BoolQueryBuilder, for FilterPushDown):
```java
SearchRequest searchRequest = new SearchRequest(index);
if (type == null) {
    searchRequest.types(Strings.EMPTY_ARRAY);
} else {
    searchRequest.types(type);
}
this.scroll = new Scroll(TimeValue.timeValueMinutes(scrollTimeout));
searchRequest.scroll(scroll);

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
int size;
if (limit > 0) {
    size = (int) Math.min(limit, scrollSize);
} else {
    size = scrollSize;
}
// the es scroll size default value is 10
searchSourceBuilder.size(size);
searchSourceBuilder.fetchSource(fieldNames, null);
if (predicate != null) {
    searchSourceBuilder.query(predicate);
} else {
    searchSourceBuilder.query(QueryBuilders.matchAllQuery());
}
searchRequest.source(searchSourceBuilder);
searchRequest.preference("_shards:" + split.getShard());
```
- FilterPushDown
- use the Elasticsearch TermQueryBuilder for "=" and RangeQueryBuilder for "> < >= <=" comparison expressions
- use should/must/mustNot in a BoolQueryBuilder for "or/and/not" logical expressions
- use the Elasticsearch ExistsQueryBuilder for "isNull/isNotNull" expressions
- ProjectPushDown
- use es SearchRequest fetchSource
- LimitPushDown
- use es SearchRequest size
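To make the filter mapping concrete, here is a hedged sketch that emits the raw query DSL JSON the corresponding builders would produce; note that equality maps to a term query while ordered comparisons map to a range query. The class and method names are illustrative, and the real implementation would build QueryBuilder objects (TermQueryBuilder, RangeQueryBuilder, ExistsQueryBuilder, combined in a BoolQueryBuilder) rather than strings.

```java
// Illustrative mapping from SQL-style predicates to Elasticsearch query DSL JSON.
public class FilterToQueryDsl {

    /** "=" maps to a term query. */
    public static String term(String field, Object value) {
        return "{\"term\":{\"" + field + "\":" + json(value) + "}}";
    }

    /** ">", ">=", "<", "<=" map to a range query with gt/gte/lt/lte. */
    public static String range(String field, String op, Object value) {
        String bound;
        switch (op) {
            case ">":  bound = "gt";  break;
            case ">=": bound = "gte"; break;
            case "<":  bound = "lt";  break;
            case "<=": bound = "lte"; break;
            default: throw new IllegalArgumentException("unsupported op: " + op);
        }
        return "{\"range\":{\"" + field + "\":{\"" + bound + "\":" + json(value) + "}}}";
    }

    /** isNotNull maps to an exists query; isNull wraps it in bool/must_not. */
    public static String exists(String field, boolean negate) {
        String q = "{\"exists\":{\"field\":\"" + field + "\"}}";
        return negate ? "{\"bool\":{\"must_not\":[" + q + "]}}" : q;
    }

    private static String json(Object v) {
        return v instanceof String ? "\"" + v + "\"" : String.valueOf(v);
    }
}
```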
Test Plan
- After discussion, we will implement it and add unit, integration, and end-to-end tests.