THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!

Apache Kylin : Analytical Data Warehouse for Big Data

Page tree

Welcome to Kylin Wiki.

Compare to kylin architechture, the main changes include the following:

  • Query engine
    Can submit query task to yarn cluster with spark
  • Cube build engine

           Spark as the only build engine

  • Metadata
    Metadata still can be saved into HBase, JDBC. There's a little difference with kylin metadata, see more from MetadataConverter.scala
  • Storage
    • Cuboids are saved into HDFS as parquet format(or other file system, no longer need HBase)


Why Kylin on Parquet

     Storage

Currently, Kylin uses Apache HBase as the storage for OLAP cubes. HBase is very fast, while it also has some drawbacks:

  • HBase is not real columnar storage;
  • HBase has no secondary index; Rowkey is the only index;
  • HBase has no encoding, Kylin has to do the encoding by itself;
  • HBase does not fit for cloud deployment and auto-scaling;
  • HBase has different API versions and has compatible issues (e.g, 0.98, 1.0, 1.1, 2.0);
  • HBase has different vendor releases and has compatible issues (e.g, Cloudera's is not compatible with others);

This proposal is to use Apache Parquet + Spark to replace HBase:

  • Parquet is an open-source columnar file format;
  • Parquet is more cloud-friendly, can work with most FS including HDFS, S3, Azure Blob store, Ali OSS, etc;
  • Parquet can integrate very well with Hadoop, Hive, Spark, Impala, and others;
  • Support custom index;
  • It is mature and stable;

The new build engine is faster and cost less storage space in file system. And the query engine also has a very good performance. See more with the performance from the following.

Benchmark Report for Parquet Storage

Query

      Kylin on HBase

  • Query node calculate pressure, single bottleneck
  • Hard to debug the code generated by Calcite

Kylin on Parquet

  • Fully Distributed
  • Easy to debug and add breakpoint in each DataFrame
  • Parquet cloud-friendly


Spark building engine

     Interfaces

     Cube building


  • SparkCubingJob
    Extends CubingJob to create batch job steps for spark cubing, including the two steps
  • Resource detect and Cubing.

           It must extends class CubingJob, so that JobMonitor can collect job information and showing on front end.

  • NSparkExecutable

           To submit spark job to local or cluster.

  • SparkApplication

           The execatly executed instance on Spark

  • ResourceDetectStep
    • Dump kylin metadata to working fs
    • Specify the class name of the spark task execution
  • SparkCubingStep
    • Dump kylin metadata to working fs
    • Specify the class name of the spark task execution
    • Update metadata after the building job done
  • ResourceDetectBeforeCubingJob
    • Collect and dump source tables info
    • Adaptively adjust spark parameters
    • Create flat table and build Global dictionary(if needed)
  • CubeBuildJob
    • Build cuboids by layer
    • Save cuboids to FS as parquet format

       Cube merge

       

  • SparkMergingJob

            Extends CubingJob to create batch job steps for spark cubing, including the three step: Resource detect, Merging and Cleanup temp files.

Cubing steps

      Resources

      Collect and dump the following three source info
  • If contains COUNT_DISTINCT measure(Boolean)
  • Resource paths(Array) we can using ResourceDetectUtils to Get source table infor(like source size, etc).
  • Table RDD leaf task numbers(Map). It's used for the next step -- Adaptively adjust spark parameters
       Adaptively adjust spark parameters
  • Turned on by default
  • Cluster mode only
  • Affect spark configuration property
kylin.engine.spark-conf.spark.executor.instances
kylin.engine.spark-conf.spark.executor.cores
kylin.engine.spark-conf.spark.executor.memory
kylin.engine.spark-conf.spark.executor.memoryOverhead
kylin.engine.spark-conf.spark.sql.shuffle.partitions
kylin.engine.spark-conf.spark.driver.memory
kylin.engine.spark-conf.spark.driver.memoryOverhead
kylin.engine.spark-conf.spark.driver.cores


  • Driver memory base is 1024M, it will adujst by the number of cuboids. The adjust strategy is define in KylinConfigBase.java
public int[] getSparkEngineDriverMemoryStrategy() {
    String[] dft = { "2", "20", "100" };
    return getOptionalIntArray("kylin.engine.driver-memory-strategy", dft);
}


  

Flat table and Gloable Dictionary
      Improve
  • Distributed encoding
  • Using Roaring64NavigableMap, support canditary higher than Integer.MAX_VALUE
     Build process
  • Group by FlatTable RDD then distinct
  • Repartion RDD, Using DictionaryBuilderHelper.calculateBucketSize()
  • MapPartiton RDD, using DictHelper.genDict()
  • Save encoded dict file to FS, using NGlobalDictHDFSStore.writeBucketDict()
    Bucket concept
  • The bucket is used to store dictionaries. The number of bucket is just the RDD partitions(task parallelism). It has two import member variables -- relativeDictMap and absoluteDictMap.
  • At one segment building job, dictionaries are encoded parallelized and stored into RelativeDictionary and after segment building job done, dictionaries will be reencoded with buckets offsets. And this global dictionry will save to FS and tags as one version(If there's no global dictionary built before, version is 0).
  • When the next segment job starts, it will get the lastest vertion of dictionary and loaded to buckets and add new distinct values to buckts.

Cube build

  • Reduced build steps
    • From ten-twenty steps to only two steps
  • Build Engine
    • Simple and clear architecture
    • Spark as the only build engine
    • All builds are done via spark
    • Adaptively adjust spark parameters
    • Dictionary of dimensions no longer needed
    • Supported measures
      • Sum
      • Count
      • Min
      • Max
      • TopN
      • CountDictinct(Bitmap, HyperLogLog)

     Storage

The flowing is the tree of parquet storage dictory in FS. As we can see, cuboids are saved into path spliced by Cube Name, Segment Name and Cuboid Id, which is processed by PathManager.java .


Parquet file schema

If there is a dimension combination of columns[id, name, price] and measures[COUNT, SUM],  then a parquet file will be generated:

Columns[id, name, age] correspond to Dimension[2, 1, 0], measures[COUNT, SUM] correspond to [3, 4]


Query optimize

      FilePruner

      Effection

  • Prune segment with partition column(Date type)
  • Prune cuboid parquet files with shard by columns

      How to use

  • Prune with paritition column will auto analyse date range to prune segments
  • Prune shard columns
    1. Identify the columns that need shard by. It's usually the column that used after where. For example: "select count from kylin_sales left join kylin_order where seller_id = '100041'", the "shard by" column is seller_id.

    2. Edit cube. The shard by column should set as normal column not derived column.

    3. Set "Shard by" to true in "Cube edit" -> "Advanced Setting" -> "Rowkey"

    4. Set "kylin.storage.columnar.shard-rowcount" in kylin.properties, the default value is 2500000. The property is used to cut the cuboid file into multiple files and then filter out unwanted files when query.

  Limit:

  As for now, the shard by is set by cube leve, so there should only be one shard by column. In the future, we may support multi shard by columns with cuboid level. And community users can also give more suggestions.


  • No labels