THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!

Apache Kylin : Analytical Data Warehouse for Big Data

Page tree

Welcome to Kylin Wiki.


Release History

IDDateAuthorComment
12020-11Zhichao ZhangTuning guide for 4.0.0-alpha, 




Background


    Kylin 4 is a major architecture upgrade version, as the picture shown below, both cube building engine and query engine use spark as calculation engine, and cube data is stored in parquet files instead of HBase.

    So the build/query performance tuning is very different from Kylin 3 tuning(http://kylin.apache.org/docs/howto/howto_optimize_build.html). This article will introduce how to improve cube build and query performance in Kylin 4, including some tuning ways which have been done by Kylin 4 automatically. 


Cube building performance tuning


    In Kylin 4, there are two steps in the cube building job, the first step detects how many source files which will be built as cube data, and the second one is to build the snapshot tables (if need), generate the global dictionary (if need) and build cube data as parquet files. In the second step, all calculations are operations with a relatively heavy load, so except using Joint and Hierarchy on Dimensions to reduce the number of cuboids ( refers to the section 'Reduce combinations' in http://kylin.apache.org/docs/tutorial/cube_build_performance.html ), it’s also very important to use the proper spark resources and configurations to build cube data. There are 3 key points in this section to improve cube building performance.

Use the proper Spark resources and configurations to build cube data

    Now the spark application runs on yarn mode, the relevant configurations are shown below:

KeyDescription

spark.executor.instances

The number of executors for spark application.

spark.executor.cores

The number of cores to use on each executor. The value of 'spark.executor.instances' * 'spark.executor.cores' means the maximum parallelism when running cube building job.

spark.executor.memory

Amount of memory to use per executor process. Generally speaking, the ratio of core to memory is 1:4, for example, if you set 'spark.executor.cores' to 4, and then set 'spark.executor.memory' to 16G.

spark.executor.memoryOverhead

The amount of off-heap memory to be allocated per executor. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the executor size (typically 6-10%).

spark.sql.shuffle.partitions

Configures the number of partitions to use when shuffling data for joins or aggregations, default value is 200. Larger value requires more CPU resources, while smaller value requires more memory resources.

spark.sql.files.maxPartitionBytes

The maximum number of bytes to pack into a single partition when reading files, default value is 128M. If there are many small files in source tables (Hive source), spark will automatically pack a number of small files into a single partition to avoid too many small tasks.

Automically setting spark configurations

    You can set these configurations with a 'kylin.engine.spark-conf.' prefix in 'kylin.properties' file, for example: 'kylin.engine.spark-conf.spark.executor.instances', and Kylin 4 will use them to allocate spark resources for cube building job. Similar to the tuning in spark + parquet, you can find out some problems through the Spark UI and change some configurations to improve performance, there are many articles describing how to improve the performance in spark + parquet, such as http://spark.apache.org/docs/2.4.6/sql-performance-tuning.html and http://spark.apache.org/docs/2.4.6/tuning.html . 

    If you don't know how to set these configurations properly, Kylin 4 will use below allocation rules to automatically set spark resources and configurations, all spark resources and configurations are set according to the maximum file size of source files and whether cube has accurate count distinct measure, this is the reason why we need to detect how many source files which will be built in the first step. You can see these allocation rules in the class 'SparkConfHelper':

  • ExecutorMemoryRule

If ${the maximum file size} >= 100G and ${exist accurate count distinct}, then set 'spark.executor.memory' to 20G;

        If ${the maximum file size} >= 100G or (${the maximum file size} >= 10G and ${exist accurate count distinct}), then set 'spark.executor.memory' to 16G;

        If ${the maximum file size} >= 10G or (${the maximum file size} >= 1G and ${exist accurate count distinct}), then set 'spark.executor.memory' to 10G;

        If ${the maximum file size} >= 1G or ${exist accurate count distinct}, then set 'spark.executor.memory' to 4G;

        Otherwise set 'spark.executor.memory' to 1G.

  • ExecutorCoreRule

        If ${the maximum file size} >= 1G or ${exist accurate count distinct}, then set 'spark.executor.cores' to 5;

        Otherwise set 'spark.executor.cores' to 1.

  • ExecutorOverheadRule

        If ${the maximum file size} >= 100G and ${exist accurate count distinct}, then set 'spark.executor.memoryOverhead' to 6G, so in this case, the memory of per executor is 20G + 6G = 26G;

        If ${the maximum file size} >= 100G or (${the maximum file size} >= 10G and ${exist accurate count distinct}), then set 'spark.executor.memoryOverhead' to 4G;

        If ${the maximum file size} >= 10G or (${the maximum file size} >= 1G and ${exist accurate count distinct}), then set 'spark.executor.memoryOverhead' to 2G;

        If ${the maximum file size} >= 1G or ${exist accurate count distinct}, then set 'spark.executor.memoryOverhead' to 1G;

        Otherwise set 'spark.executor.memoryOverhead' to 512M.

  • ExecutorInstancesRule

        The steps to set 'spark.executor.instances' are as following:

  1. Get the value of required cores, default value is 1;
  2. Get the value of configuration 'kylin.engine.base-executor-instance' as basic executor instances, default value is 5;
  3. According to the number of the cuboids, calculate the required number of executor instances: ${calculateExecutorInsByCuboidSize}. The configuration of the calculation strategy is 'kylin.engine.executor-instance-strategy', default value is '100,2,500,3,1000,4', which means if the number of the cuboids is greater and equal than 100, the factor is 2, and then the number of executor instances is ${basic executor instances} * ${factor} = 10, if greater and equal than 500, the factor is 3, and so on.
  4. Get the available memory and cores count of the default pool from yarn: ${availableMem} and ${availableCore};
  5. Get the sum memory value after applying 'ExecutorOverheadRule' and 'ExecutorMemoryRule' :  ${executorMem} = ${spark.executor.memory} + ${spark.executor.memoryOverhead};
  6. Get the cores count after applying 'ExecutorCoreRule': ${executorCore}
  7. According to ${availableMem}, ${availableCore}, ${executorCore} and ${executorMem}, calculate the maximum executor instances count which can request from yarn: ${queueAvailableInstance} = Math.min(${availableMem} / ${executorMem}, ${availableCore} / ${executorCore}); The purpose of this step is to avoid applying for more than the available resources on yarn.
  8. Get the final executor instances count: ${executorInstance} = Math.max(Math.min(${calculateExecutorInsByCuboidSize}, ${queueAvailableInstance}), ${kylin.engine.base-executor-instance});
  9. Set 'spark.executor.instances' to ${executorInstance};
  • ShufflePartitionsRule

        Set 'spark.sql.shuffle.partitions' to the value of 'max(2, ${the maximum file size in MB} / 32)';

    After applying all allocation rules mentioned above, you can find some log messages in 'kylin.log' file shown below:

Manually setting spark configurations ( if need )

    Based on the values of automatically adjusted configurations by Kylin, if there are still some cube building performance issues, you can appropriately change the values of these configurations to have a try, for example:

  • If you observe from the spark ui that there is serious GC phenomenon in some tasks, or find a large number of executor lost or fetch failure errors, you can change the value of these two configurations to increase the memory of per executor:
    • spark.executor.memory=
    • spark.executor.memoryOverhead=

The general adjustment strategy is to adjust the value up by 2 times. If the problem is solved, you can adjust it down appropriately to avoid wasting resources. After      increasing the memory of per executor, if there is still serious memory problem, you can consider adjusting 'spark.executor.cores' to 1, this adjustment can make a single task exclusive memory of per executor and the execution efficiency is relatively low, but it can be done in this way to avoid build failure.


  • If you observe from the spark ui that there are a large number of tasks that need to be scheduled for multiple rounds (each round eats all cores), you can change the value of these two configurations to increase the cores count of spark application:
    • spark.executor.cores=
    • spark.executor.instances=

         The general adjustment strategy is to adjust the value up by 2 times. If the problem is solved, you can adjust it down appropriately to avoid wasting resources.


  • If there are some executor lost or fetch failure errors, and just because the number of reducers during shuffling is too small, or the data is skewed, you can try to  increase the value of 'spark.sql.shuffle.partitions'.


  • If you observe from the spark ui that the duration time of the job is more than the sum duration time of stages, this means that the core resources is not enough and there are many jobs are waiting for core resources to be scheduled:

  The duration time of this job is 3.0 min, but the sum duration time of stages which are belong to this job is 17s + 2s = 19s, the stage 204 waited more than 2.0 min to   be scheduled.

In this case, you need to increase the cores count of spark application.

Global dictionary building performance tuning

    If cube has accurate count distinct measures, Kylin 4.0 will build the global dictionary for these measure columns in the second step based on Spark for distributed encoding processing, which reduces the pressure on a single machine node, and can break the limit of the maximum integer of the global dictionary, please refer to the detail design article: https://cwiki.apache.org/confluence/display/KYLIN/Global+Dictionary+on+Spark . There are one configuration about tuning on global dictionary building:

    kylin.dictionary.globalV2-threshold-bucket-size  (default value is 500000)

    Reducing the value of this configuration can reduce the amount of data in a single partition to build the global dictionary and speed up the dictionary building.

Snapshot tables building performance tuning

    If there are some snapshot tables to be built, Kylin 4.0 will build them parallelly in the second step, because the default value of the configuration 'kylin.snapshot.parallel-build-enabled' is true, which will speed up the snapshot tables building.

    On the other hand, you can reduce the value of configuration 'kylin.snapshot.shard-size-mb' (default value is 128MB) to increase the parallelism when building snapshot tables. According to the size of the source table, make sure the number of the building tasks is within 3 times the number of cores  of spark cube building application.


Query performance tuning


    In Kylin 4.0, query engine (called SparderContext) uses spark as calculation engine too, it's real distributed query engine, especially for complex query, the performance will be better than calcite. However there are still many key performance points that need to be optimized. In addition to setting proper calculation resources mentioned above, it also includes reducing small or unevenness files, setting proper partitions, and pruning parquet files as many as possible. Kylin 4.0 and Spark provide some optimization strategies to improve query performance.

Reduce small or unevenness parquet files

    Reading too many small files or a few too big files when querying will lead to low performance, in order to avoid this problem, Kylin 4.0 will repartition parquet files according to the following strategy to reduce small or uneven parquet files when building cube data as parquet files.

The strategy to check whether need to repartition

    If the following conditions are met:

  1. If this cuboid has shard by column;
  2. The average size of parquet files which have saved < the value of configuration 'kylin.storage.columnar.repartition-threshold-size-mb' && the number of parquet files is bigger than 1;  This condition is to avoid too many small files;
  3. The number of parquet files < (the total row count of parquet files / 'kylin.storage.columnar.shard-rowcount' * 0.75), if this cuboid has accurate count distinct measure, use 'kylin.storage.columnar.shard-countdistinct-rowcount' instead of 'kylin.storage.columnar.shard-rowcount'; This condition is to avoid uneven files;

    If meet the one of the conditions above, it will do repartition, the number of the partitions is calculated by this way:

        ${fileLengthRepartitionNum} = Math.ceil(${the parquet files size in MB} / ${kylin.storage.columnar.shard-size-mb})

        ${rowCountRepartitionNum} = Math.ceil(${the total row count of parquet files} / ${kylin.storage.columnar.shard-rowcount})

    if this cuboid has accurate count distinct measure, use 'kylin.storage.columnar.shard-countdistinct-rowcount' instead of 'kylin.storage.columnar.shard-rowcount'.

    The number of the partitions is :

        Math.ceil(( ${fileLengthRepartitionNum} + ${ rowCountRepartitionNum } ) / 2)

The relevant configurations

KeyDefault valueDescription

kylin.storage.columnar.shard-size-mb

128MB

The max size of each parquet file for shard by column, in MB.

kylin.storage.columnar.shard-rowcount

2500000

Each parquet files should contain at most 2.5 million rows.

kylin.storage.columnar.shard-countdistinct-rowcount

1000000

Since that fact that Bitmap has bigger size, so we can specific the max row count for cuboid with contain Bitmap. By default it contains at most 1.0 million row.

kylin.storage.columnar.repartition-threshold-size-mb

128MB

The max size of each parquet file, in MB.

How to set the above configurations properly

    You can use this command to find the repartition info messages in the kylin.log file after building cube data:

grep "Before repartition, cuboid" logs/kylin.log

    According to the log messages, you can find that the final number of partitions is too large, this will impact the building performance and query performance, after increasing the value of configuration 'kylin.storage.columnar.shard-rowcount' or 'kylin.storage.columnar.shard-countdistinct-rowcount' and rebuilding again, the log messages are shown below:

    The final number of partitions was reduced a lot: 809 to 3, and the time of cube building was reduced a lot too: 58 mins to 24 mins:

    And the query performance is improved too:

    The query time from cube which has too large number of partitions is 1.7s, and query engine scanned 58 files.

    But the query time from cube which has proper number of partitions is 0.4s, and query engine only scanned 4 files.

Use shard by column to prune parquet files

    In Kylin 4.0, the directory structure of parquet file storage is as following:

    When querying, query engine can filter out the segment-level directories through the date partition column, and filter out the cuboid-level directories through the hit cuboid, but at this time, if there are still many parquet files in the cuboid-level directories, you can use shard by column to further prune parquet files.

How to use shard by column

    From Cube Designer → Advanced Setting → Rowkeys in Kylin UI, you can specify a shard by column when creating cube:

    After specifying a shard by column, it will repartition parquet files by this shard by column when building cube data (If you do not specific a shard by column, repartition is done with all columns).

    When querying with this shard by column as filter condition, query engine will prune parquet files according to the value of shard by column, for example:

    There are two parquet files in each of the cuboid 131084 directories of these two cubes: kylin_sales_cube_non_shardby and kylin_sales_cube_shardby;

    Querying from cube 'kylin_sales_cube_non_shardby' which don't specify shard by column is scanning 2 files.

    Querying from cube 'kylin_sales_cube_shardby' which specifies shard by column is only scanning 1 file.

Recommendations

  • Currently it only supports the following filtering operations with shard by column in SQL query to prune parquet files:
    1. Equality
    2. In
    3. InSet
    4. IsNull
  • Because it only supports to specify one shard by column for one cube currently, it's better to use a column which have high cardinality and often is used as filter condition, such as mandatory dimension column. If specified shard by column is not mandatory dimension, there are some cases where cuboid cannot use this shard by column, for example: specified shard by column is A, but the columns of one cuboid are B, C, D.

Use sort by columns to filter data quickly when reading parquet files

    When you create a cube, you can specify the order of the dimension columns, and when saving cube data, the first of the dimension columns for each cuboid will be used to do the sort operation. The purpose is to filter out unwanted data as much as possible through the min-max index of parquet file when querying with the sort by column.

    From Cube Designer → Advanced Setting → Rowkeys in Kylin UI, you can drag the columns to adjust the order:

    For example: if cuboid includes these three columns: BUYER_ID, TRANS_ID, LEAF_CATEG_ID, and then it will sort data in one partition by BUYER_ID column when saving this cuboid data.

    Notes: Currently Apache Spark 2.4.6 which Kylin 4.0 used only supports filter out unwanted data through the min-max index of RowGroup in parquet files, it means that if there are some RowGroups in one parquet file, Spark will filter out unwanted data by the min-mas index of RowGroup, but if one parquet file only includes one RowGroup, filter doesn't make effect.

Pack a number of small files into a single partition

    When there are many small files in some segments which had been built, you can set the configuration 'spark.sql.files.maxPartitionBytes' (default value is 128MB) to a proper value, which will let spark engine pack some small files into a single partition and avoid to need too many small tasks, for example:

    This query scanned 2 parquet files but it used one task to handle these two files:


    On the other hand, if there are enough resources, you can reduce the value of configuration 'spark.sql.files.maxPartitionBytes' to increase the parallel tasks, but it also needs to reduce the value of configuration 'spark.hadoop.parquet.block.size' (default value is 128MB) when building cube data, because the smallest split unit of parquet files is RowGroup and configuration 'spark.hadoop.parquet.block.size' indicates the maximum size of one RowGroup for parquet.

Enable offheap

    Spark can directly operate the off-heap memory to reduce unnecessary memory overhead, as well as frequent GC, and improve processing performance.

    Relevant configurations:

KeyDescription
spark.memory.offHeap.enabledSet to 'true', use off-heap memory for spark shuffle e.g.
spark.memory.offHeap.sizeindicates the size of off-heap memory.

Set different configurations for each query (coming soon)

    Currently, all queries share one SparkSession, which means that all of them share the same configurations, but each query has different scenarios and could be optimized by different configurations. Therefore, we plan to clone a thread-level SparkSession for each query to set different configurations, and then execute the query, such as configuration 'spark.sql.shuffle.partitions', set this configuration to different values according to the amount of data obtained by each query to achieve the optimal query performance. This feature is planned to be released in the 4.0 Beta version.


Reference


  • No labels