Status

Discussion thread
Vote thread
JIRA


Release

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

  • The Flink Elasticsearch connector only supports sink, not source, yet some companies really do have scenarios that require reading from Elasticsearch.
  • A source will make the Elasticsearch connector support more complete.

Scope

  • We’ll design and develop an Elasticsearch source connector with the following features:
    • support Elasticsearch in both DataStream and SQL jobs
    • Elasticsearch as a bounded source, with support for lookup join
    • support multiple Elasticsearch versions (5/6/7)
    • support FilterPushDown, ProjectPushDown, and LimitPushDown to optimize query performance

Overall Design

  • Following the Hadoop ecosystem pattern, we split the logical data into InputSplits and use an InputFormat to read the data.
  • Implement DynamicTableSource to support SQL.

ElasticsearchInputSplit

  • We split an Elasticsearch type into different ElasticsearchInputSplits, one split per shard. (In recent versions the type is effectively the index, because every index has just one default type, docs.)
  • The split doesn't contain slice info, for the reasons below:
    • See https://www.jianshu.com/p/d32e17dab90c (in Chinese), which shows that the slice API performs poorly in the es-hadoop project.
    • es-hadoop (https://github.com/elastic/elasticsearch-hadoop) has removed this and disables sliced scrolls by default. The latest es-hadoop release notes say:
    • Configuration Changes
      `es.input.use.sliced.partitions` is deprecated in 6.5.0, and will be removed
      in 7.0.0. The default value for `es.input.max.docs.per.partition` (100000)
      will also be removed in 7.0.0, thus disabling sliced scrolls by default, and
      switching them to be an explicitly opt-in feature.
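For reference, a minimal sketch of what ElasticsearchInputSplit could look like. The class shape, field names, and accessors here are assumptions for illustration, not a final API:

```java
import java.io.Serializable;

// Hypothetical sketch: one input split per Elasticsearch shard.
// Field and accessor names are assumptions, not the final API.
class ElasticsearchInputSplit implements Serializable {
    private final int splitNumber;  // position of this split among all splits
    private final String index;     // Elasticsearch index to read from
    private final String type;      // document type (the single default type in ES 6+)
    private final int shard;        // the shard this split is pinned to

    ElasticsearchInputSplit(int splitNumber, String index, String type, int shard) {
        this.splitNumber = splitNumber;
        this.index = index;
        this.type = type;
        this.shard = shard;
    }

    public int getSplitNumber() { return splitNumber; }
    public String getIndex() { return index; }
    public String getType() { return type; }
    public int getShard() { return shard; }
}
```

The shard number is what lets the reader pin its search to one shard via a `_shards:` preference, as shown in the push-down section below.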

      

ElasticSearchInputFormatBase

  • ElasticSearchInputFormatBase implements the actual data reading.
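The read path follows the usual InputFormat contract (open, reachedEnd, nextRecord), draining one scroll page at a time. A client-independent sketch of that loop, where ScrollReader and fetchNextPage are illustrative names standing in for the bridge's version-specific scroll call:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;

// Illustrative sketch of the scroll-driven read loop, independent of any ES client.
// fetchNextPage stands in for the version-specific bridge's scroll(...) call; a real
// implementation would also update the scroll id from each SearchResponse.
class ScrollReader<T> {
    private final Function<String, List<T>> fetchNextPage; // scrollId -> hits (empty list = drained)
    private final Deque<T> buffer = new ArrayDeque<>();
    private final String scrollId;
    private boolean exhausted;

    ScrollReader(String initialScrollId, Function<String, List<T>> fetchNextPage) {
        this.scrollId = initialScrollId;
        this.fetchNextPage = fetchNextPage;
    }

    // Mirrors InputFormat#reachedEnd: refill the buffer from the next scroll page.
    public boolean reachedEnd() {
        while (buffer.isEmpty() && !exhausted) {
            List<T> hits = fetchNextPage.apply(scrollId);
            if (hits.isEmpty()) {
                exhausted = true; // an empty page means the scroll is drained
            } else {
                buffer.addAll(hits);
            }
        }
        return buffer.isEmpty();
    }

    // Mirrors InputFormat#nextRecord: hand out one buffered hit.
    public T nextRecord() {
        return buffer.poll();
    }
}
```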

        


Adapt to different es versions

  • In order to adapt to different Elasticsearch versions (TransportClient for 5, RestHighLevelClient for 6/7), we should add these methods to ElasticsearchApiCallBridge:
ElasticsearchInputSplit[] createInputSplitsInternal(String index, String type, C client, int minNumSplits);

SearchResponse search(C client, SearchRequest searchRequest) throws IOException;

SearchResponse scroll(C client, SearchScrollRequest searchScrollRequest) throws IOException;

void close(C client) throws IOException;


  • Take Elasticsearch search as an example:
//for Elasticsearch5ApiCallBridge
@Override
public SearchResponse search(TransportClient client, SearchRequest searchRequest) throws IOException {
	// convert the SearchSourceBuilder into a SearchRequestBuilder here
	return client.search(searchRequest).actionGet();
}

//for Elasticsearch6ApiCallBridge and Elasticsearch7ApiCallBridge
@Override
public SearchResponse search(RestHighLevelClient client, SearchRequest searchRequest) throws IOException {
	// convert the SearchSourceBuilder into a SearchRequest here
	return client.search(searchRequest);
}

SQL/DataStream and configuration for the Bounded Source

  • for the Elasticsearch DynamicTableSource
    • following FLIP-122 (New Connector Property Keys for New Factory):
CREATE TABLE es_table (
   ...
  ) WITH (
   'connector' = 'elasticsearch-7',
   'hosts' = 'http://localhost:9200',
   'index' = 'MyIndex',
   'document-type' = 'MyType',
   'scan.scroll.max-size'= '1000',
   'scan.scroll.timeout'= '1000'
  );


	public static final ConfigOption<Integer> SCROLL_MAX_SIZE_OPTION =
		ConfigOptions.key("scan.scroll.max-size")
			.intType()
			.noDefaultValue()
			.withDescription("Maximum number of hits to be returned with each Elasticsearch scroll request");

	public static final ConfigOption<Duration> SCROLL_TIMEOUT_OPTION =
		ConfigOptions.key("scan.scroll.timeout")
			.durationType()
			.noDefaultValue()
			.withDescription("Amount of time Elasticsearch will keep the search context alive for scroll requests");


  • for the DataStream API
		Map<String, String> userConfig = new HashMap<>();
		userConfig.put("cluster.name", CLUSTER_NAME);
		DataType dataType = ROW(FIELD("data", STRING()));
		RowType schema = (RowType) dataType.getLogicalType();

		// pass on missing field
		DeserializationSchema<RowData> deserializationSchema = new JsonRowDataDeserializationSchema(
			schema, new RowDataTypeInfo(schema), false, false);

		ElasticSearchInputFormatBase inputFormat = createElasticsearchInputFormat(
			userConfig,
			(DeserializationSchema<T>) deserializationSchema,
			null,	// the projected fields; null means all fields
			"elasticsearch-sink-test-index",//index
			"flink-es-test-type", //type
			1000,  //scrollTimeout
			10  //scrollMaxSize
		);
		DataStream<RowData> dataStream = env.createInput(inputFormat);
		dataStream.print();
		env.execute("Elasticsearch Source Test");

Lookup Source

  • how to join with the Elasticsearch lookup source
    • use TermQueryBuilder and fetchSource, because a lookup join is an equi-join.
		// inside the eval method
		SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
		searchSourceBuilder.fetchSource(fieldNames, null);

		BoolQueryBuilder lookupCondition = QueryBuilders.boolQuery();
		for (int i = 0; i < keyNames.length; i++) {
			lookupCondition.must(new TermQueryBuilder(keyNames[i], keys[i]));
		}
		searchSourceBuilder.query(lookupCondition);
		searchRequest.source(searchSourceBuilder);
  • Dimension table cache
    • use a Guava cache, and add these settings:
 // cache settings
  final long cacheMaxSize;
  final long cacheExpireMs;
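Assuming the cache maps the lookup keys to the matched rows, the semantics of the two settings can be sketched with the standard library (the actual implementation would use Guava's CacheBuilder with maximumSize and expireAfterWrite; this sketch only demonstrates what the settings mean):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch of the lookup-cache semantics. The real connector would
// use Guava's CacheBuilder; class and method names here are assumptions.
class LookupCache<K, V> {
    private static final class CacheEntry<V> {
        final V value;
        final long writeTimeMs;
        CacheEntry(V value, long writeTimeMs) { this.value = value; this.writeTimeMs = writeTimeMs; }
    }

    private final long cacheMaxSize;   // maximum number of cached keys
    private final long cacheExpireMs;  // time-to-live after write
    private final Map<K, CacheEntry<V>> entries;

    LookupCache(long cacheMaxSize, long cacheExpireMs) {
        this.cacheMaxSize = cacheMaxSize;
        this.cacheExpireMs = cacheExpireMs;
        // an access-ordered LinkedHashMap evicts the least recently used entry when full
        this.entries = new LinkedHashMap<K, CacheEntry<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, CacheEntry<V>> eldest) {
                return size() > LookupCache.this.cacheMaxSize;
            }
        };
    }

    public void put(K key, V value) {
        entries.put(key, new CacheEntry<>(value, System.currentTimeMillis()));
    }

    public V getIfPresent(K key) {
        CacheEntry<V> e = entries.get(key);
        if (e == null) {
            return null;
        }
        if (System.currentTimeMillis() - e.writeTimeMs > cacheExpireMs) {
            entries.remove(key); // expired entry: drop it and report a miss
            return null;
        }
        return e.value;
    }
}
```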


xxPushDown optimizer

  • how to set fieldNames (for ProjectPushDown), size (for LimitPushDown), and the condition (a BoolQueryBuilder, for FilterPushDown):
		SearchRequest searchRequest = new SearchRequest(index);
		if (type == null) {
			searchRequest.types(Strings.EMPTY_ARRAY);
		} else {
			searchRequest.types(type);
		}
		this.scroll = new Scroll(TimeValue.timeValueMinutes(scrollTimeout));
		searchRequest.scroll(scroll);
		SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
		int size;
		if (limit > 0) {
			size = (int) Math.min(limit, scrollSize);
		} else {
			size = scrollSize;
		}
		//es scroll size default value is 10
		searchSourceBuilder.size(size);
		searchSourceBuilder.fetchSource(fieldNames, null);
		if (predicate != null) {
			searchSourceBuilder.query(predicate);
		} else {
			searchSourceBuilder.query(QueryBuilders.matchAllQuery());
		}
		searchRequest.source(searchSourceBuilder);
		searchRequest.preference("_shards:" + split.getShard());



  • FilterPushDown
    • use TermQueryBuilder for "=" and RangeQueryBuilder for "> < >= <=" comparison expressions
    • use should/must/mustNot on a BoolQueryBuilder for "or/and/not" logical expressions
    • use ExistsQueryBuilder (negated via mustNot for isNull) for "isNull/isNotNull" expressions
  • ProjectPushDown
    • use es SearchRequest fetchSource 
  • LimitPushDown
    • use es SearchRequest size
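The filter mapping above can be illustrated without the client library by emitting the query-DSL JSON the builders would produce. This is only a sketch; the real implementation would construct QueryBuilders objects (termQuery, rangeQuery, boolQuery, existsQuery) rather than raw strings:

```java
// Illustrative sketch: translate simple pushed-down predicates into
// Elasticsearch query-DSL JSON. Class and method names are assumptions.
class FilterToQuery {
    // Comparison expressions: "=" maps to a term query, the rest to range queries.
    public static String comparison(String field, String op, String value) {
        switch (op) {
            case "=":  return "{\"term\":{\"" + field + "\":\"" + value + "\"}}";
            case ">":  return "{\"range\":{\"" + field + "\":{\"gt\":\"" + value + "\"}}}";
            case ">=": return "{\"range\":{\"" + field + "\":{\"gte\":\"" + value + "\"}}}";
            case "<":  return "{\"range\":{\"" + field + "\":{\"lt\":\"" + value + "\"}}}";
            case "<=": return "{\"range\":{\"" + field + "\":{\"lte\":\"" + value + "\"}}}";
            default: throw new IllegalArgumentException("unsupported op: " + op);
        }
    }

    // isNotNull maps to an exists query directly; isNull negates it with must_not.
    public static String isNull(String field) {
        return "{\"bool\":{\"must_not\":[{\"exists\":{\"field\":\"" + field + "\"}}]}}";
    }
}
```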

Test Plan


  • After the discussion concludes, we will implement it and add unit, integration, and end-to-end tests.


Rejected Alternatives