Status
Current state: Under Discussion
Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Discussion-flink-elasticsearch-connector-supports-td42082.html#a42106
Vote thread:
JIRA: FLINK-16713
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
- The Flink Elasticsearch connector currently only supports a sink, not a source. Some companies do have real scenarios that require reading from Elasticsearch.
- A source would make the Elasticsearch connector support more complete.
Scope
- We’ll design and develop an Elasticsearch source connector with the following features:
- support both the DataStream API and SQL
- Elasticsearch as a bounded source, plus lookup join support
- support multiple Elasticsearch versions (5/6/7)
- support FilterPushDown, ProjectPushDown and LimitPushDown to optimize query performance
Overall Design
- Following the Hadoop-ecosystem pattern, we split the logical data into InputSplits and use an InputFormat to read the data.
- Implement DynamicTableSource to support SQL.
- We split an Elasticsearch type (effectively the index in newer versions, since every index has only one default type, docs) into several ElasticsearchInputSplits. One split corresponds to one shard; see the sketch after this list.
- A split does not carry slice information, for the reasons below:
- See https://www.jianshu.com/p/d32e17dab90c (in Chinese); it shows that the slice API performs poorly in the es-hadoop project.
- Also, es-hadoop (https://github.com/elastic/elasticsearch-hadoop) has removed this and disables sliced scrolls by default, as noted in the latest es-hadoop release notes:
- Configuration Changes
`es.input.use.sliced.partitions` is deprecated in 6.5.0, and will be removed
in 7.0.0. The default value for `es.input.max.docs.per.partition` (100000)
will also be removed in 7.0.0, thus disabling sliced scrolls by default, and
switching them to be an explicitly opt-in feature.
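A minimal sketch of what such a split could look like (class and field names here are illustrative assumptions, not a final API); it only records the index, the type and the shard, with no slice information:
Code Block (java)
import org.apache.flink.core.io.InputSplit;

// Sketch only: one split per shard, no slice information.
public class ElasticsearchInputSplit implements InputSplit {

    private static final long serialVersionUID = 1L;

    private final int splitNumber;
    // the index to read from
    private final String index;
    // the type (the single default type in newer versions)
    private final String type;
    // the shard this split is bound to
    private final int shard;

    public ElasticsearchInputSplit(int splitNumber, String index, String type, int shard) {
        this.splitNumber = splitNumber;
        this.index = index;
        this.type = type;
        this.shard = shard;
    }

    @Override
    public int getSplitNumber() {
        return splitNumber;
    }

    public String getIndex() {
        return index;
    }

    public String getType() {
        return type;
    }

    public int getShard() {
        return shard;
    }
}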
ElasticSearchInputFormatBase
- ElasticSearchInputFormatBase implements the actual reading of data. A rough sketch of its read loop is shown below.
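The following is a simplified sketch of that read loop, not final code: field names are assumed, and the Elasticsearch calls go through the version-specific ElasticsearchApiCallBridge described in the next section. open() issues the first search for the split's shard, nextRecord() walks the returned hits, and reachedEnd() issues follow-up scroll requests until the scroll is exhausted.
Code Block (java)
// Assumed fields (illustrative names):
// ElasticsearchApiCallBridge<C> callBridge; C client; String index;
// Scroll scroll; SearchSourceBuilder searchSourceBuilder;
// DeserializationSchema<RowData> deserializationSchema;
// String scrollId; SearchHit[] hits; int nextHit;

@Override
public void open(ElasticsearchInputSplit split) throws IOException {
    SearchRequest searchRequest = new SearchRequest(index);
    // pin the request to the shard of this split
    searchRequest.preference("_shards:" + split.getShard());
    searchRequest.scroll(scroll);
    searchRequest.source(searchSourceBuilder);
    // the first request goes through the version-specific call bridge
    SearchResponse response = callBridge.search(client, searchRequest);
    scrollId = response.getScrollId();
    hits = response.getHits().getHits();
    nextHit = 0;
}

@Override
public boolean reachedEnd() throws IOException {
    if (nextHit < hits.length) {
        return false;
    }
    // current batch is consumed: fetch the next one via the scroll API
    SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
    scrollRequest.scroll(scroll);
    SearchResponse response = callBridge.scroll(client, scrollRequest);
    scrollId = response.getScrollId();
    hits = response.getHits().getHits();
    nextHit = 0;
    return hits.length == 0;
}

@Override
public RowData nextRecord(RowData reuse) throws IOException {
    // deserialize the raw _source of the current hit
    byte[] source = hits[nextHit++].getSourceAsString().getBytes(StandardCharsets.UTF_8);
    return deserializationSchema.deserialize(source);
}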
Adapt to different Elasticsearch versions
- In order to adapt to different Elasticsearch versions (TransportClient for 5.x, RestHighLevelClient for 6.x/7.x), we should add these methods to ElasticsearchApiCallBridge:
Code Block (java)
ElasticsearchInputSplit[] createInputSplitsInternal(String index, String type, C client, int minNumSplits);
SearchResponse search(C client, SearchRequest searchRequest) throws IOException;
SearchResponse scroll(C client, SearchScrollRequest searchScrollRequest) throws IOException;
void close(C client) throws IOException;
- Take the search call as an example:
Code Block (java)
//for Elasticsearch5ApiCallBridge
@Override
public SearchResponse search(TransportClient client, SearchRequest searchRequest) throws IOException {
// how to construct the SearchRequest / convert the SearchSourceBuilder to a SearchRequestBuilder
return client.search(searchRequest).actionGet();
}
//for Elasticsearch6ApiCallBridge and Elasticsearch7ApiCallBridge
@Override
public SearchResponse search(RestHighLevelClient client, SearchRequest searchRequest) throws IOException {
//convert elasticsearchBuilder to SearchRequest
return client.search(searchRequest);
}
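The scroll call can follow the same pattern. This is only a sketch: the exact RestHighLevelClient method differs across client versions (searchScroll(...) in older 6.x clients, scroll(request, RequestOptions.DEFAULT) in newer ones).
Code Block (java)
// for Elasticsearch5ApiCallBridge (TransportClient)
@Override
public SearchResponse scroll(TransportClient client, SearchScrollRequest searchScrollRequest) throws IOException {
return client.searchScroll(searchScrollRequest).actionGet();
}
// for Elasticsearch6ApiCallBridge and Elasticsearch7ApiCallBridge (RestHighLevelClient)
@Override
public SearchResponse scroll(RestHighLevelClient client, SearchScrollRequest searchScrollRequest) throws IOException {
// the method name differs slightly between client versions
return client.searchScroll(searchScrollRequest);
}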
SQL/DataStream API and configuration for the Bounded Source
- Implement an Elasticsearch DynamicTableSource for SQL.
- Use the new connector property keys from FLIP-122 (New Connector Property Keys for New Factory):
Code Block
CREATE TABLE es_table (
...
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://localhost:9200',
'index' = 'MyIndex',
'document-type' = 'MyType',
'scan.scroll.max-size'= '1000',
'scan.scroll.timeout'= '1000'
);
public static final ConfigOption<Integer> SCROLL_MAX_SIZE_OPTION =
ConfigOptions.key("scan.scroll.max-size")
.intType()
.noDefaultValue()
.withDescription("Maximum number of hits to be returned with each Elasticsearch scroll request");
public static final ConfigOption<Duration> SCROLL_TIMEOUT_OPTION =
ConfigOptions.key("scan.scroll.timeout")
.durationType()
.noDefaultValue()
.withDescription("Amount of time Elasticsearch will keep the search context alive for scroll requests");
Code Block (java): DataStream API usage example
Map<String, String> userConfig = new HashMap<>();
userConfig.put("cluster.name", CLUSTER_NAME);
DataType dataType = ROW(FIELD("data", STRING()));
RowType schema = (RowType) dataType.getLogicalType();
// pass on missing field
DeserializationSchema<RowData> deserializationSchema = new JsonRowDataDeserializationSchema(
schema, new RowDataTypeInfo(schema), false, false);
ElasticSearchInputFormatBase inputFormat = createElasticsearchInputFormat(
userConfig,
(DeserializationSchema<T>) deserializationSchema,
null, //the fields user want
"elasticsearch-sink-test-index",//index
"flink-es-test-type", //type
1000, //scrollTimeout
10 //scrollMaxSize
);
DataStream<RowData> dataStream = env.createInput(inputFormat);
dataStream.print();
env.execute("Elasticsearch Source Test");
Lookup Source
- How to join against the Elasticsearch lookup source:
- Use TermQueryBuilder together with fetchSource, because a lookup join is an equi-join.
Code Block (java)
// inside the eval() method of the lookup function
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.fetchSource(fieldNames, null);
BoolQueryBuilder lookupCondition = QueryBuilders.boolQuery();
for (int i = 0; i < keyNames.length; i++) {
lookupCondition.must(new TermQueryBuilder(keyNames[i], keys[i]));
}
searchSourceBuilder.query(lookupCondition);
searchRequest.source(searchSourceBuilder);
- Dimension table cache
- Use a Guava cache, configured by these two arguments (a usage sketch follows the code block):
Code Block (java)
// cache settings
final long cacheMaxSize;
final long cacheExpireMs;
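A minimal sketch of how the cache could be wired into the lookup function, assuming a Guava Cache keyed by the lookup keys; the field names, the key-row construction and the queryElasticsearch(...) helper are illustrative assumptions, not final API:
Code Block (java)
// assumed field of the lookup function
private transient Cache<RowData, List<RowData>> cache;

@Override
public void open(FunctionContext context) throws Exception {
    // build the cache from the two settings above
    cache = CacheBuilder.newBuilder()
            .maximumSize(cacheMaxSize)
            .expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)
            .build();
}

public void eval(Object... keys) throws IOException {
    RowData keyRow = GenericRowData.of(keys);
    // check the cache first, only query Elasticsearch on a miss
    List<RowData> cachedRows = cache.getIfPresent(keyRow);
    if (cachedRows != null) {
        cachedRows.forEach(this::collect);
        return;
    }
    // queryElasticsearch is a placeholder for the term-query lookup shown above
    List<RowData> rows = queryElasticsearch(keys);
    cache.put(keyRow, rows);
    rows.forEach(this::collect);
}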
PushDown optimizations
- How to set fieldNames (for ProjectPushDown), size (for LimitPushDown) and the query condition (a BoolQueryBuilder, for FilterPushDown) on the search request:
Code Block (java)
SearchRequest searchRequest = new SearchRequest(index);
if (type == null) {
searchRequest.types(Strings.EMPTY_ARRAY);
} else {
searchRequest.types(type);
}
this.scroll = new Scroll(TimeValue.timeValueMinutes(scrollTimeout));
searchRequest.scroll(scroll);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
int size;
if (limit > 0) {
size = (int) Math.min(limit, scrollSize);
} else {
size = scrollSize;
}
//es scroll size default value is 10
searchSourceBuilder.size(size);
searchSourceBuilder.fetchSource(fieldNames, null);
if (predicate != null) {
searchSourceBuilder.query(predicate);
} else {
searchSourceBuilder.query(QueryBuilders.matchAllQuery());
}
searchRequest.source(searchSourceBuilder);
searchRequest.preference("_shards:" + split.getShard());
- FilterPushDown (see the translation sketch after this list)
- use TermQueryBuilder for "=" and RangeQueryBuilder for "> < >= <=" comparison expressions
- use should/must/mustNot on a BoolQueryBuilder to map "or/and/not" logical expressions
- use ExistsQueryBuilder to map "isNull/isNotNull" expressions
- ProjectPushDown
- use fetchSource on the SearchSourceBuilder
- LimitPushDown
- use size on the SearchSourceBuilder
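For illustration only, a sketch of how a pushed-down filter such as age > 10 AND name IS NOT NULL, a projection of (name, age) and a limit of 100 could be translated; the field names and constants are made up, and the expression-visitor code that produces the builders is not shown:
Code Block (java)
// age > 10 AND name IS NOT NULL  ->  bool query with two "must" clauses
BoolQueryBuilder predicate = QueryBuilders.boolQuery()
        .must(QueryBuilders.rangeQuery("age").gt(10))   // FilterPushDown: comparison
        .must(QueryBuilders.existsQuery("name"));       // FilterPushDown: isNotNull

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(predicate);                                 // pushed-down filter
searchSourceBuilder.fetchSource(new String[]{"name", "age"}, null);   // ProjectPushDown
searchSourceBuilder.size(100);                                        // LimitPushDown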
...