Status

Current state: Under Discussion

Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Discussion-flink-elasticsearch-connector-supports-td42082.html#a42106

JIRA: (link not rendered)

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

  • The Flink es connector currently supports only a sink, not a source. Some companies do need to read data from Elasticsearch.
  • Adding a source will make the es connector more complete.

Scope

  • We’ll design and develop an es source connector with the following features:
    • support for both the DataStream API and SQL
    • es as a bounded source, with support for lookup joins
    • support for multiple es versions (5/6/7)
    • support for FilterPushDown, ProjectPushDown and LimitPushDown to optimize query performance

Overall Design

  • As in the Hadoop ecosystem, we split the logical data into xxInputSplits and use an xxInputFormat to read each split.
  • We implement DynamicTableSource to support SQL.

ElasticsearchInputSplit

  • We split an es type (in newer versions this is effectively the index, because every index has a single default type, `_doc`) into multiple ElasticsearchInputSplits. One split corresponds to one shard.
  • The split does not contain slice info, for the following reasons:
    • See https://www.jianshu.com/p/d32e17dab90c (in Chinese), which reports that the slice API performs poorly in the es-hadoop project.
    • es-hadoop (https://github.com/elastic/elasticsearch-hadoop) has deprecated this setting and disables sliced scrolls by default, as the following note from the latest es-hadoop release shows:
    • Configuration Changes
      `es.input.use.sliced.partitions` is deprecated in 6.5.0, and will be removed
      in 7.0.0. The default value for `es.input.max.docs.per.partition` (100000)
      will also be removed in 7.0.0, thus disabling sliced scrolls by default, and
      switching them to be an explicitly opt-in feature.
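The one-split-per-shard rule can be sketched as follows. This is a minimal illustration, not the proposed implementation: `ShardSplit` and `createSplits` are hypothetical names, the shard count is passed in rather than fetched from the cluster, and the `_shards:` preference string is the one used later in this page to pin a search to a shard.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-in for ElasticsearchInputSplit: one split per shard. */
final class ShardSplit {
    final int splitNumber;
    final String index;
    final int shard;

    ShardSplit(int splitNumber, String index, int shard) {
        this.splitNumber = splitNumber;
        this.index = index;
        this.shard = shard;
    }

    /** Search preference that restricts a request to this split's shard. */
    String preference() {
        return "_shards:" + shard;
    }

    /** Creates one split per shard; in practice numShards would be read from the cluster. */
    static List<ShardSplit> createSplits(String index, int numShards) {
        if (numShards <= 0) {
            throw new IllegalArgumentException("numShards must be positive");
        }
        List<ShardSplit> splits = new ArrayList<>(numShards);
        for (int shard = 0; shard < numShards; shard++) {
            // No slice info is attached: the shard is the only unit of parallelism.
            splits.add(new ShardSplit(shard, index, shard));
        }
        return splits;
    }
}
```

With this shape, the source parallelism is bounded by the number of shards of the index.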

      

ElasticSearchInputFormatBase

  • ElasticSearchInputFormatBase implements the actual reading of data from a split.
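The read loop of such an input format could look roughly like the sketch below: issue one initial search, then keep scrolling until a page comes back empty. `ScrollClient`, `ScrollReader` and `InMemoryScrollClient` are hypothetical names introduced only for this illustration; they are not Elasticsearch or Flink classes, and hits are plain strings to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical minimal view of a scroll-capable client; not a real Elasticsearch API. */
interface ScrollClient {
    /** Starts a scroll and returns the first page of hits. */
    List<String> search();

    /** Returns the next page of hits, or an empty list when the scroll is exhausted. */
    List<String> scroll();
}

/** Iterates over all hits page by page, similar to reachedEnd()/nextRecord() in an InputFormat. */
final class ScrollReader implements Iterator<String> {
    private final ScrollClient client;
    private Iterator<String> page;
    private boolean exhausted;

    ScrollReader(ScrollClient client) {
        this.client = client;
        List<String> first = client.search();
        this.page = first.iterator();
        this.exhausted = first.isEmpty();
    }

    @Override
    public boolean hasNext() {
        // Fetch the next page only when the current one is drained; an empty page ends the scroll.
        while (!exhausted && !page.hasNext()) {
            List<String> next = client.scroll();
            if (next.isEmpty()) {
                exhausted = true;
            } else {
                page = next.iterator();
            }
        }
        return page.hasNext();
    }

    @Override
    public String next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return page.next();
    }
}

/** In-memory fake used only to exercise the reader. */
final class InMemoryScrollClient implements ScrollClient {
    private final List<String> docs;
    private final int pageSize;
    private int position;

    InMemoryScrollClient(List<String> docs, int pageSize) {
        this.docs = docs;
        this.pageSize = pageSize;
    }

    @Override
    public List<String> search() {
        position = 0;
        return nextPage();
    }

    @Override
    public List<String> scroll() {
        return nextPage();
    }

    private List<String> nextPage() {
        int end = Math.min(position + pageSize, docs.size());
        List<String> hits = new ArrayList<>(docs.subList(position, end));
        position = end;
        return hits;
    }
}
```

A real implementation would also deserialize each hit and clear the scroll context when the split is closed.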

        


Adapt to different es versions

  • To adapt to different es versions (TransportClient for 5, RestHighLevelClient for 6 and 7), we should add these methods to ElasticsearchApiCallBridge:
	ElasticsearchInputSplit[] createInputSplitsInternal(String index, String type, C client, int minNumSplits);

	SearchResponse search(C client, SearchRequest searchRequest) throws IOException;

	SearchResponse scroll(C client, SearchScrollRequest searchScrollRequest) throws IOException;


	void close(C client) throws IOException;
  • Take es search as an example:
//for Elasticsearch5ApiCallBridge
@Override
public SearchResponse search(TransportClient client, SearchRequest searchRequest) throws IOException {
  // how to construct the SearchRequest: convert elasticsearchBuilder to SearchRequestBuilder
  return client.search(searchRequest).actionGet();
}

//for Elasticsearch6ApiCallBridge and Elasticsearch7ApiCallBridge
@Override
public SearchResponse search(RestHighLevelClient client, SearchRequest searchRequest) throws IOException {
  // convert elasticsearchBuilder to SearchRequest
  // note: the Elasticsearch 7 client requires client.search(searchRequest, RequestOptions.DEFAULT)
  return client.search(searchRequest);
}

SQL/DataStream and configuration for Bounded Source

  • Following FLIP-122 (New Connector Property Keys for New Factory)
    • for the es DynamicTableSource
CREATE TABLE es_table (
   ...
  ) WITH (
   'connector' = 'elasticsearch-7',
   'hosts' = 'http://localhost:9200',
   'index' = 'MyIndex',
   'document-type' = 'MyType',
   'scan.scroll.max-size'= '1000',
   'scan.scroll.timeout'= '1000'
  );


	public static final ConfigOption<Integer> SCROLL_MAX_SIZE_OPTION =
		ConfigOptions.key("scan.scroll.max-size")
			.intType()
			.noDefaultValue()
			.withDescription("Maximum number of hits to be returned with each Elasticsearch scroll request");

	public static final ConfigOption<Duration> SCROLL_TIMEOUT_OPTION =
		ConfigOptions.key("scan.scroll.timeout")
			.durationType()
			.noDefaultValue()
			.withDescription("Amount of time Elasticsearch will keep the search context alive for scroll requests");
  • for the DataStream API
		Map<String, String> userConfig = new HashMap<>();
		userConfig.put("cluster.name", CLUSTER_NAME);
		DataType dataType = ROW(FIELD("data", STRING()));
		RowType schema = (RowType) dataType.getLogicalType();

		// pass on missing field
		DeserializationSchema<RowData> deserializationSchema = new JsonRowDataDeserializationSchema(
			schema, new RowDataTypeInfo(schema), false, false);

		ElasticSearchInputFormatBase inputFormat = createElasticsearchInputFormat(
			userConfig,
			(DeserializationSchema<T>) deserializationSchema,
			null,	//the fields the user wants
			"elasticsearch-sink-test-index",//index
			"flink-es-test-type", //type
			1000,  //scrollTimeout
			10  //scrollMaxSize
		);
		DataStream<RowData> dataStream = env.createInput(inputFormat);
		dataStream.print();
		env.execute("Elasticsearch Source Test");

Lookup Source

  • how to join with the es lookup source
    • use TermQueryBuilder and fetchSource, because a lookup join is an equi-join.
		// inside the eval method
		SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
		searchSourceBuilder.fetchSource(fieldNames, null);

		// one term query per join key; iterate over the keys, not the projected fields
		BoolQueryBuilder lookupCondition = new BoolQueryBuilder();
		for (int i = 0; i < keyNames.length; i++) {
			lookupCondition.must(new TermQueryBuilder(keyNames[i], keys[i]));
		}
		searchSourceBuilder.query(lookupCondition);
		searchRequest.source(searchSourceBuilder);
  • Dimension table cache
    • use a Guava cache, configured with these arguments:
 // cache settings
  final long cacheMaxSize;
  final long cacheExpireMs;
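To illustrate how `cacheMaxSize` and `cacheExpireMs` interact, here is a small size-bounded, expire-after-write cache built only on the JDK. It is a stand-in for the Guava cache mentioned above, not the proposed implementation: `LookupCache` is a hypothetical name, the clock is injected so the expiry can be tested, and the class is not thread-safe.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongSupplier;

/** Hypothetical JDK-only sketch of a lookup cache with a size bound and expire-after-write. */
final class LookupCache<K, V> {

    /** A cached value together with the time at which it stops being valid. */
    private static final class CachedValue<T> {
        final T value;
        final long expiresAtMs;

        CachedValue(T value, long expiresAtMs) {
            this.value = value;
            this.expiresAtMs = expiresAtMs;
        }
    }

    private final long cacheExpireMs;
    private final LongSupplier clockMs;
    private final Map<K, CachedValue<V>> entries;

    LookupCache(final long cacheMaxSize, long cacheExpireMs, LongSupplier clockMs) {
        this.cacheExpireMs = cacheExpireMs;
        this.clockMs = clockMs;
        // accessOrder = true keeps the least recently used entry first, so it is evicted first.
        this.entries = new LinkedHashMap<K, CachedValue<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, CachedValue<V>> eldest) {
                return size() > cacheMaxSize;
            }
        };
    }

    /** Returns the cached value, or calls the loader (e.g. an es query) on a miss or after expiry. */
    V get(K key, Function<K, V> loader) {
        long now = clockMs.getAsLong();
        CachedValue<V> cached = entries.get(key);
        if (cached != null && now < cached.expiresAtMs) {
            return cached.value;
        }
        V loaded = loader.apply(key);
        entries.put(key, new CachedValue<>(loaded, now + cacheExpireMs));
        return loaded;
    }

    int size() {
        return entries.size();
    }
}
```

In the lookup function, the loader would be the term query against es, so repeated join keys within the expiry window do not hit the cluster again.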


xxPushDown optimizer

How to set fieldNames (for ProjectPushDown), size (for LimitPushDown) and the condition (a BoolQueryBuilder, for FilterPushDown):

		SearchRequest searchRequest = new SearchRequest(index);
		if (type == null) {
			searchRequest.types(Strings.EMPTY_ARRAY);
		} else {
			searchRequest.types(type);
		}
		this.scroll = new Scroll(TimeValue.timeValueMinutes(scrollTimeout));
		searchRequest.scroll(scroll);
		SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
		int size;
		if (limit > 0) {
			size = (int) Math.min(limit, scrollSize);
		} else {
			size = scrollSize;
		}
		//es scroll size default value is 10
		searchSourceBuilder.size(size);
		searchSourceBuilder.fetchSource(fieldNames, null);
		if (predicate != null) {
			searchSourceBuilder.query(predicate);
		} else {
			searchSourceBuilder.query(QueryBuilders.matchAllQuery());
		}
		searchRequest.source(searchSourceBuilder);
		searchRequest.preference("_shards:" + split.getShard());
  • FilterPushDown
    • use es TermQueryBuilder for "=" and RangeQueryBuilder for "> < >= <=" comparison expressions
    • use es bool should/must/mustNot for "or/and/not" logical expressions
    • use es ExistsQueryBuilder for "isNotNull", and its mustNot negation for "isNull"
  • ProjectPushDown
    • use es SearchSourceBuilder fetchSource
  • LimitPushDown
    • use es SearchSourceBuilder size
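The filter mapping can be sketched by emitting the Elasticsearch query DSL as JSON strings. This is only an illustration under some assumptions: `FilterToQueryDsl` is a hypothetical helper, "=" is mapped to a term query and the ordering comparisons to range queries, only numeric literals are handled, and field names are not escaped. A real implementation would build the equivalent query builders from Flink's resolved expressions.

```java
/** Hypothetical helper that renders pushed-down filters as Elasticsearch query DSL JSON. */
final class FilterToQueryDsl {

    private FilterToQueryDsl() {
    }

    /** "=" becomes a term query; ">", ">=", "<", "<=" become range queries. */
    static String comparison(String field, String op, long value) {
        switch (op) {
            case "=":
                return "{\"term\":{\"" + field + "\":" + value + "}}";
            case ">":
                return range(field, "gt", value);
            case ">=":
                return range(field, "gte", value);
            case "<":
                return range(field, "lt", value);
            case "<=":
                return range(field, "lte", value);
            default:
                throw new IllegalArgumentException("Unsupported operator: " + op);
        }
    }

    /** AND maps to bool.must. */
    static String and(String... clauses) {
        return bool("must", clauses);
    }

    /** OR maps to bool.should. */
    static String or(String... clauses) {
        return bool("should", clauses);
    }

    /** NOT maps to bool.must_not. */
    static String not(String clause) {
        return bool("must_not", clause);
    }

    /** IS NOT NULL maps to an exists query. */
    static String isNotNull(String field) {
        return "{\"exists\":{\"field\":\"" + field + "\"}}";
    }

    /** IS NULL is the negation of the exists query. */
    static String isNull(String field) {
        return not(isNotNull(field));
    }

    private static String range(String field, String bound, long value) {
        return "{\"range\":{\"" + field + "\":{\"" + bound + "\":" + value + "}}}";
    }

    private static String bool(String occur, String... clauses) {
        return "{\"bool\":{\"" + occur + "\":[" + String.join(",", clauses) + "]}}";
    }
}
```

For example, `WHERE age >= 18 AND name IS NOT NULL` would be rendered by `and(comparison("age", ">=", 18), isNotNull("name"))`. Expressions that cannot be translated should stay in the Flink plan rather than being pushed down.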

Test Plan


  • After the discussion, we will implement the source and add unit, integration and end-to-end tests.


Rejected Alternatives

