Apache Kylin : Analytical Data Warehouse for Big Data

Background

Why need

...

Global Dictionary

...

      Improve
  • Distributed encoding
  • Uses Roaring64NavigableMap to support cardinality higher than Integer.MAX_VALUE
     Build process
  • Group by on the FlatTable RDD, then distinct
  • Repartition the RDD, using DictionaryBuilderHelper.calculateBucketSize()
  • MapPartition the RDD, using DictHelper.genDict()
  • Save the encoded dict file to FS, using NGlobalDictHDFSStore.writeBucketDict()
    Bucket concept
  • Buckets are used to store dictionaries. The number of buckets equals the number of RDD partitions (task parallelism). A bucket has two important member variables: relativeDictMap and absoluteDictMap.
  • Within one segment building job, dictionaries are encoded in parallel and stored in the RelativeDictionary; after the segment building job is done, the dictionaries are re-encoded with the bucket offsets. The global dictionary is then saved to FS and tagged as one version (if no global dictionary has been built before, the version is 0).
  • When the next segment job starts, it gets the latest version of the dictionary, loads it into the buckets, and adds the new distinct values to the buckets.

In OLAP data analysis, count distinct is a very common requirement. Depending on how exact the result must be, it is implemented as either approximate deduplication or precise deduplication.

On large-scale data sets, it is still very challenging to deduplicate precisely while keeping query response fast. The most frequently used method for precise deduplication is the bitmap. Integer data can be saved into a bitmap directly, but other types, such as String, cannot. To deduplicate such data precisely, we first need to build a dictionary that maps every value to a unique integer, and then apply the bitmap method to the mapped integers.
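The idea can be sketched in a few lines of Python. This is an illustration only, not Kylin code: the function names are hypothetical, and a plain Python integer stands in for the bitmap.

```python
def build_dictionary(values):
    """Map each distinct value to a unique integer code, starting from 1."""
    dictionary = {}
    for value in values:
        if value not in dictionary:
            dictionary[value] = len(dictionary) + 1
    return dictionary


def count_distinct(values, dictionary):
    """Set one bit per encoded value, then count the set bits."""
    bitmap = 0  # a Python int used as an arbitrarily long bit set
    for value in values:
        bitmap |= 1 << dictionary[value]
    return bin(bitmap).count("1")


users = ["alice", "bob", "alice", "carol", "bob"]
user_dict = build_dictionary(users)
distinct_users = count_distinct(users, user_dict)  # 3
```

Because every string is replaced by a small integer, the bitmap only needs one bit per distinct value, no matter how long the original strings are.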

Kylin uses pre-computation to accelerate big data analysis. When a Cube is built incrementally, building a separate dictionary for each segment would lead to errors in the final deduplication result. To avoid this, all segments in a Cube use the same dictionary, which is the global dictionary.
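A small sketch shows why per-segment dictionaries give wrong results once segment bitmaps are merged. The helper names are hypothetical and a Python integer again stands in for the bitmap.

```python
def encode_segment(values, dictionary):
    """Encode values into a bitmap (a Python int used as a bit set),
    extending the dictionary with any value it has not seen yet."""
    bitmap = 0
    for value in values:
        if value not in dictionary:
            dictionary[value] = len(dictionary) + 1
        bitmap |= 1 << dictionary[value]
    return bitmap


def cardinality(bitmap):
    """Number of set bits, i.e. the distinct count."""
    return bin(bitmap).count("1")


segment_1 = ["alice", "bob"]
segment_2 = ["carol", "alice"]

# Separate dictionary per segment: "alice" and "carol" both receive code 1.
local_merged = encode_segment(segment_1, {}) | encode_segment(segment_2, {})

# One shared dictionary: every value keeps a single code across segments.
global_dict = {}
global_merged = (encode_segment(segment_1, global_dict)
                 | encode_segment(segment_2, global_dict))

wrong_count = cardinality(local_merged)   # 2, although there are 3 users
right_count = cardinality(global_merged)  # 3
```

With separate dictionaries the same code means different values in different segments, so OR-ing the bitmaps silently drops users.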


Changes

Kylin has supported the global dictionary function since version 1.5.3, but the construction method at that time has obvious defects:

  • The global dictionary is built at a single point on the Job Server, so the build time becomes uncontrollable as the data grows
  • As data accumulates, building the global dictionary requires more and more memory on Kylin's build node
  • The number of dictionary values is limited by the maximum integer value

In fact, Hive-based distributed global dictionary construction was added in Kylin 3.1 and solves the above problems. For details, please refer to Kylin Distributed Global Dictionary (Chinese). However, in order to adapt to the new build and query engine, Kylin 4.0 implements another distributed method of building the global dictionary, based on Spark. Today we will describe in detail how Kylin 4.0's global dictionary is implemented.

Global Dictionary based on Spark

Kylin 4.0 builds the global dictionary with Spark and encodes it in a distributed way. This reduces the pressure on any single machine node, and the number of dictionary values can exceed the maximum integer value.


Design

Structure

  • Every build task will generate a new global dictionary
  • The dictionary of each new build task is saved under its version number, and old global dictionaries are gradually deleted
  • A dictionary consists of one metadata file and multiple dictionary files; each dictionary file is called a bucket (Bucket)
  • Each bucket holds two mappings (Map<Object, Long>), which together form the complete mapping

[Figure: GlobalDictionary-In-Kylin4 (draw.io diagram)]

BucketDictionary

Kylin introduces the concept of buckets, which can be understood as dividing the data into several buckets (i.e., multiple partitions) so that it can be processed in parallel. When the dictionary is built for the first time, the values in each bucket are encoded starting from 1. After all buckets have been encoded, the overall dictionary values are assigned according to the offset of each bucket. In the code, the two encodings are stored in two HashMaps: one stores the relative dictionary values within the bucket, and the other stores the absolute dictionary values across all buckets.
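The two-phase encoding can be sketched as follows. This is a simplified model under our own assumptions, not the Kylin implementation: the function names are hypothetical, each bucket is a plain list of already distinct values, and dicts play the role of the two HashMaps.

```python
def encode_relative(bucket_values):
    """Within one bucket, assign codes 1, 2, 3, ... in arrival order."""
    return {value: code for code, value in enumerate(bucket_values, start=1)}


def to_absolute(relative_maps):
    """Shift each bucket's relative codes by the total size of the
    buckets that come before it (buckets are ordered)."""
    absolute_maps, offset = [], 0
    for relative in relative_maps:
        absolute_maps.append({v: c + offset for v, c in relative.items()})
        offset += len(relative)
    return absolute_maps


buckets = [["a", "b", "c"], ["d", "e"], ["f"]]

# Phase 1: every bucket can be encoded independently, hence in parallel.
relative_maps = [encode_relative(b) for b in buckets]

# Phase 2: once all bucket sizes are known, compute the global codes.
absolute_maps = to_absolute(relative_maps)
# absolute_maps == [{"a": 1, "b": 2, "c": 3}, {"d": 4, "e": 5}, {"f": 6}]
```

Phase 1 needs no coordination between buckets; only the cheap offset computation in phase 2 has to see all bucket sizes.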

The following figure shows how the dictionary in the bucket numbered 1 is carried over across multiple build tasks. Each build creates a new version of the bucket (i.e., v1, v2, v3, etc.); the reason for adding version control is explained later. Curr (current) and Prev (previous) are the two HashMaps in a bucket. Curr stores the relative (Relative) code values of the dictionary in the current bucket, and Prev stores the absolute (Absolute) code values of all dictionary values that have been built before.

Image Added

Steps

  • Create a flat table through Spark and obtain the distinct values of the columns that need precise deduplication
  • Determine the number of buckets from the number of distinct literal values, and decide whether the buckets need to be expanded
  • Repartition the data into multiple partitions, encode each partition separately, and store each in its own dictionary file
  • Assign a version number to the current build task
  • Save the dictionary files and the metadata (number of buckets and offset of each bucket)
  • Delete old versions when the deletion conditions are met
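The distinct and repartition steps above can be sketched in plain Python, without Spark. The hash function is an assumption made for this sketch (CRC32 is chosen only because it is stable across runs); the real build relies on Spark's partitioning, and the function names are hypothetical.

```python
import zlib


def bucket_of(value, bucket_count):
    """Stable hash partitioning; CRC32 is an assumption of this sketch."""
    return zlib.crc32(value.encode("utf-8")) % bucket_count


def partition(values, bucket_count):
    """Deduplicate the column values, then route each one to its bucket."""
    buckets = [[] for _ in range(bucket_count)]
    for value in sorted(set(values)):  # distinct values only
        buckets[bucket_of(value, bucket_count)].append(value)
    return buckets


rows = ["bj", "sh", "bj", "gz", "sz", "sh"]
buckets = partition(rows, bucket_count=3)
# Each distinct value lands in exactly one bucket, and the same value
# always lands in the same bucket as long as bucket_count is unchanged.
```

Because the routing depends on the bucket count, changing the number of buckets means the already encoded values have to be redistributed, which is what the non-first build handles.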

First Build

  • Calculate the bucket size (the number of buckets)
  • Take the maximum of two numbers: the number of dictionary values to be built divided by the single-bucket threshold, and the default number of buckets
  • Create the buckets and distribute the data to them for encoding
  • Generate the meta file that records the bucket offsets

The following are related configuration items and their default values.

kylin.dictionary.globalV2-min-hash-partitions=10
kylin.dictionary.globalV2-threshold-bucket-size=500000
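As a rough sketch of the rule described above, the number of buckets for a first build could be computed like this. The function name is hypothetical and the rounding is our assumption; the actual logic lives in DictionaryBuilderHelper.calculateBucketSize() and may differ in detail.

```python
MIN_HASH_PARTITIONS = 10         # kylin.dictionary.globalV2-min-hash-partitions
THRESHOLD_BUCKET_SIZE = 500_000  # kylin.dictionary.globalV2-threshold-bucket-size


def bucket_count(distinct_values,
                 threshold=THRESHOLD_BUCKET_SIZE,
                 minimum=MIN_HASH_PARTITIONS):
    """Buckets needed so that no bucket exceeds the threshold,
    but never fewer than the configured minimum."""
    needed = (distinct_values + threshold - 1) // threshold  # ceiling division
    return max(needed, minimum)


small = bucket_count(1_000_000)   # 2 buckets would suffice, minimum gives 10
large = bucket_count(12_000_000)  # 24
```

With the default values, the minimum of 10 buckets applies until a column has more than 5,000,000 distinct values.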

Image Added

Non-First Build

  • Determine whether the buckets need to be expanded according to the number of dictionary values
  • If so, redistribute the already encoded dictionary values across the expanded buckets
  • Read the latest previous version of the dictionary data and distribute it to each bucket
  • Assign the new values to the buckets
  • The dictionary values built previously do not change
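The key property of a non-first build, that existing codes never change, can be sketched with a single map. This deliberately ignores buckets and offsets to stay short; the function name is hypothetical.

```python
def incremental_build(prev_absolute, new_values):
    """Keep every previously assigned code; give unseen values the next codes."""
    updated = dict(prev_absolute)  # previous version is left untouched
    next_code = max(updated.values(), default=0) + 1
    for value in new_values:
        if value not in updated:
            updated[value] = next_code
            next_code += 1
    return updated


v1 = {"a": 1, "b": 2, "c": 3}
v2 = incremental_build(v1, ["c", "d", "a", "e"])
# v2 == {"a": 1, "b": 2, "c": 3, "d": 4, "e": 5}
```

Since old codes are stable, bitmaps stored in earlier segments remain valid against every later version of the dictionary.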

Image Added

Version Management

Global dictionaries are isolated from each other by assigning a timestamp-based version number to each build. Version control is needed because build tasks may be executed concurrently, while the encoding step of the global dictionary build does not support concurrency. With version control, every build can read the complete global dictionary built before it, which ensures that the latest version holds the most complete set of global dictionary codes. Every time the global dictionary of a Cube is read, the latest version is selected. The dictionary is finally stored by version on the file storage system (HDFS here), as shown in the figure below.

Image Added
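Selecting the latest version and finding old versions to delete can be sketched as follows. The `version_<timestamp>` directory naming, the `keep` parameter, and both function names are assumptions made for this illustration, not confirmed Kylin behavior.

```python
def latest_version(version_dirs, prefix="version_"):
    """Return the newest timestamp among names like 'version_1600000000000'."""
    timestamps = [int(name[len(prefix):])
                  for name in version_dirs if name.startswith(prefix)]
    return max(timestamps) if timestamps else None


def expired_versions(version_dirs, keep=3, prefix="version_"):
    """Return the directories to delete, keeping only the `keep` newest ones."""
    ordered = sorted((name for name in version_dirs if name.startswith(prefix)),
                     key=lambda name: int(name[len(prefix):]))
    return ordered[:-keep] if keep > 0 else ordered


dirs = ["version_1600000300000", "version_1600000000000",
        "version_1600000900000", "version_1600000600000"]
newest = latest_version(dirs)      # 1600000900000
to_delete = expired_versions(dirs)  # ["version_1600000000000"]
```

Comparing the timestamps as integers rather than as strings keeps the ordering correct even if the numbers ever differ in length.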

FAQ

  1. Why does a BucketDictionary need two Maps?
    1. At the beginning of the build, the dictionary values assigned to each bucket are given relative codes starting from 1, and these relative codes are stored in one HashMap. After relative encoding is completed, the offset of each bucket is obtained from the number of dictionary values in the buckets (buckets are ordered). The absolute (Absolute) code of each dictionary value is then calculated from its relative code and the bucket offset, and the absolute codes are stored in another HashMap.
  2. Will there be data skew issues?
    1. In the tests done so far, the probability that a build fails because of hotspots is very small. Generally, a build only fails when the skew reaches the one-billion level. Building many columns may indeed cause this problem, but the number of encoding buckets can be enlarged without limit. Unless there is a single hot key, the build is easy to complete after adjusting the parameters.
  3. Why can the number of global dictionary values exceed the maximum integer limit (about 2.1 billion)?
    1. Because a new BitMap data structure, Roaring64BitMap, was introduced. After the global dictionary encoding is completed, the codes are compressed into binary and stored in the Roaring64BitMap object. This BitMap stores values as Long instead of Integer.
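To illustrate the last answer, here is a toy 64-bit bitmap that splits each code into a high and a low 32-bit half, which is roughly how a 64-bit Roaring bitmap is layered on top of 32-bit containers. It is a teaching sketch with a hypothetical class name: plain Python sets replace the compressed containers, so it shows the addressing scheme only, not the compression.

```python
INT_MAX = 2**31 - 1  # Java's Integer.MAX_VALUE, about 2.1 billion


class Bitmap64:
    """Toy 64-bit bitmap: high 32 bits select a chunk, low 32 bits a member."""

    def __init__(self):
        self._chunks = {}  # high 32 bits -> set of low 32 bits

    def add(self, code):
        if not 0 <= code < 2**64:
            raise ValueError("code must fit in 64 bits")
        self._chunks.setdefault(code >> 32, set()).add(code & 0xFFFFFFFF)

    def __contains__(self, code):
        return (code & 0xFFFFFFFF) in self._chunks.get(code >> 32, ())

    def cardinality(self):
        return sum(len(chunk) for chunk in self._chunks.values())


bitmap = Bitmap64()
for code in (1, INT_MAX, INT_MAX + 1, 5_000_000_000, 1):
    bitmap.add(code)

distinct_codes = bitmap.cardinality()  # 4, the duplicate 1 is counted once
```

Codes such as 5,000,000,000 do not fit in a 32-bit integer, yet they are handled the same way as small codes because only the chunk key changes.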

...