THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!

Apache Kylin : Analytical Data Warehouse for Big Data

Page tree

Welcome to Kylin Wiki.

1. Background

    Kylin 4 is a major architecture upgrade version, as the picture shown below, both cube building engine and query engine use spark as calculation engine, and cube data is stored in parquet files instead of Hbase.

So the build/query performance tuning is very different from Kylin 3 tuning. This article will introduce how to improve cube build and query performance in Kylin 4, including some tuning ways which have been done by Kylin 4 automatically.

2. Cube building performance tuning

    In Kylin 4, there are two steps in the cube building job, the first step detects how many source files which will be built as cube data, and the second one is to build the snapshot tables (if need), generate the global dictionary (if need) and build cube data as parquet files. In the second step, all calculations are operations with a relatively heavy load, so except using Joint and Hierarchy on Dimensions to reduce the number of cuboids ( refers to the section 'Reduce combinations' in http://kylin.apache.org/docs/tutorial/cube_build_performance.html ), it’s also very important to use the proper spark resources and configurations to build cube data. There are 3 key points in this section to improve cube build performance.


2.1 Use the proper Spark resources and configurations to build cube data

    Now the spark application runs on yarn mode, the relevant configurations are shown below:

KeyDescription

spark.executor.instances

The number of executors for spark application.

spark.executor.cores

The number of cores to use on each executor. The value of 'spark.executor.instances' * 'spark.executor.cores' means the maximum parallelism when running cube building job.

spark.executor.memory

Amount of memory to use per executor process. Generally speaking, the ratio of core to memory is 1:4, for example, if you set 'spark.executor.cores' to 4, and then set 'spark.executor.memory' to 16G.

spark.executor.memoryOverhead

The amount of off-heap memory to be allocated per executor. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the executor size (typically 6-10%).

spark.sql.shuffle.partitions

Configures the number of partitions to use when shuffling data for joins or aggregations, default value is 200. Larger value requires more CPU resources, while smaller value requires more memory resources.

spark.sql.files.maxPartitionBytes

The maximum number of bytes to pack into a single partition when reading files, default value is 128M. If there are many small files in source tables (Hive source), spark will automatically pack a number of small files into a single partition to avoid too many small tasks.

    You can set these configurations with a 'kylin.engine.spark-conf.' prefix in 'kylin.properties' file, and Kylin 4 will use them to allocate spark resources for cube building job. Similar to the tuning in spark + parquet, you can find out some problems through the Spark UI and change some configurations to improve performance, there are many articles describing how to improve the performance in spark + parquet, such as http://spark.apache.org/docs/2.4.6/sql-performance-tuning.html and http://spark.apache.org/docs/2.4.6/tuning.html . 

    If you don't know how to set these configurations properly, Kylin 4 will use below allocation rules to automatically set spark resources and configurations, all spark resources and configurations are set according to the maximum file size of source files and whether cube has accurate count distinct measure, this is the reason why we need to detect how many source files which will be built in the first step. You can see these allocation rules in the class 'SparkConfHelper':

  • ExecutorMemoryRule

        If ${the maximum file size} >= 100G and ${exist accurate count distinct}, then set 'spark.executor.memory' to 20G;

        If ${the maximum file size} >= 100G or (${the maximum file size} >= 10G and ${exist accurate count distinct}), then set 'spark.executor.memory' to 16G;

        If ${the maximum file size} >= 10G or (${the maximum file size} >= 1G and ${exist accurate count distinct}), then set 'spark.executor.memory' to 10G;

        If ${the maximum file size} >= 1G or ${exist accurate count distinct}, then set 'spark.executor.memory' to 4G;

        Otherwise set 'spark.executor.memory' to 1G.

  • ExecutorCoreRule

        If ${the maximum file size} >= 1G or ${exist accurate count distinct}, then set 'spark.executor.cores' to 5;

        Otherwise set 'spark.executor.cores' to 1.

  • ExecutorOverheadRule

        If ${the maximum file size} >= 100G and ${exist accurate count distinct}, then set 'spark.executor.memoryOverhead' to 6G, so in this case, the memory of per executor is 20G + 6G = 26G;

        If ${the maximum file size} >= 100G or (${the maximum file size} >= 10G and ${exist accurate count distinct}), then set 'spark.executor.memoryOverhead' to 4G;

        If ${the maximum file size} >= 10G or (${the maximum file size} >= 1G and ${exist accurate count distinct}), then set 'spark.executor.memoryOverhead' to 2G;

        If ${the maximum file size} >= 1G or ${exist accurate count distinct}, then set 'spark.executor.memoryOverhead' to 1G;

        Otherwise set 'spark.executor.memoryOverhead' to 512M.

  • ExecutorInstancesRule

        The steps to set 'spark.executor.instances' are as following:

  1. Get the value of required cores, default value is 1;
  2. Get the value of configuration 'kylin.engine.base-executor-instance' as basic executor instances, default value is 5;
  3. According to the number of the cuboids, calculate the required number of executor instances: ${calculateExecutorInsByCuboidSize}. The configuration of the calculation strategy is 'kylin.engine.executor-instance-strategy', default value is '100,2,500,3,1000,4', which means if the number of the cuboids is greater and equal than 100, the factor is 2, and then the number of executor instances is ${basic executor instances} * ${factor} = 10, if greater and equal than 500, the factor is 3, and so on.
  4. Get the available memory and cores count of the default pool from yarn: ${availableMem} and ${availableCore};
  5. Get the sum memory value after applying 'ExecutorOverheadRule' and 'ExecutorMemoryRule' :  ${executorMem} = ${spark.executor.memory} + ${spark.executor.memoryOverhead};
  6. Get the cores count after applying 'ExecutorCoreRule': ${executorCore}
  7. According to ${availableMem}, ${availableCore}, ${executorCore} and ${executorMem}, calculate the maximum executor instances count which can require from yarn: ${queueAvailableInstance} = Math.min(${availableMem} / ${executorMem}, ${availableCore} / ${executorCore}); The purpose of this step is to avoid applying for more than the available resources on yarn.
  8. Get the final executor instances count: ${executorInstance} = Math.max(Math.min(${calculateExecutorInsByCuboidSize}, ${queueAvailableInstance}), ${kylin.engine.base-executor-instance});
  9. Set 'spark.executor.instances' to ${executorInstance};
  • ShufflePartitionsRule

        Set 'spark.sql.shuffle.partitions' to the value of 'max(2, ${the maximum file size in MB} / 32)';


    After applying all allocation rules mentioned above, you can find some log messages in 'kylin.log file' shown below:

    Based on the value of automatically adjusted configurations by Kylin, if there are still some cube building performance issues, you can appropriately change the value of these configurations to have a try, for example:











  • No labels