Apache Kylin : Analytical Data Warehouse for Big Data
Part I What is Hive Global Dictionary
Background
The count distinct (bitmap) measure is important in many scenarios, such as page view statistics, and Kylin has supported count distinct since v1.5.3.
Apache Kylin implements the precise count distinct measure with bitmaps, and uses a Global Dictionary to encode string values into integers.
Without this feature, the Global Dictionary has to be built in a single process/JVM, which can take a lot of time and memory for ultra-high-cardinality (UHC) columns. With this feature, Kylin uses MapReduce (MR) to build the Global Dictionary and Hive to store it.
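The sketch below illustrates why the dictionary has to be global: bitmaps from different segments can only be combined if the same string always maps to the same integer. It is a minimal illustration, not Kylin's implementation. The function names are hypothetical, a plain Python integer stands in for a real bitmap, and starting IDs at 1 is an assumption.

```python
def build_dictionary(values, dictionary=None):
    """Assign each previously unseen string the next integer ID."""
    dictionary = dict(dictionary or {})
    # Assumption for this sketch: IDs start at 1 and grow by one.
    next_id = max(dictionary.values(), default=0) + 1
    for value in values:
        if value not in dictionary:
            dictionary[value] = next_id
            next_id += 1
    return dictionary


def to_bitmap(values, dictionary):
    """Set one bit per encoded ID; a Python int stands in for a bitmap."""
    bitmap = 0
    for value in values:
        bitmap |= 1 << dictionary[value]
    return bitmap


def count_distinct(bitmap):
    """The number of set bits is the exact distinct count."""
    return bin(bitmap).count("1")


segment_1 = ["user_a", "user_b", "user_a"]
segment_2 = ["user_b", "user_c"]

# The same dictionary is extended for every segment, so IDs never clash.
global_dict = build_dictionary(segment_1)
global_dict = build_dictionary(segment_2, global_dict)

bitmap_1 = to_bitmap(segment_1, global_dict)
bitmap_2 = to_bitmap(segment_2, global_dict)

# OR-ing the bitmaps counts "user_b" only once across both segments.
distinct_across_segments = count_distinct(bitmap_1 | bitmap_2)
print(distinct_across_segments)
```

If each segment had its own dictionary, "user_b" could receive different IDs in the two segments and the combined count would no longer be exact.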
Benefit
- The Global Dictionary is built in a distributed way, so the build job takes less time.
- The Job Server does less work and is therefore more stable.
- OneID: because the Hive Global Dictionary is readable outside of Kylin, anyone can reuse the dictionary (a Hive table) in other scenarios across the company.
Release
Release Date | Release version | JIRA issue | Comment |
---|---|---|---|
2019-10 | v3.0.0 | | Introduce Hive Global Dictionary (first version). |
N/A | N/A | | Use MapReduce rather than HQL to improve performance. |
Configuration
Conf key | Explanation | Example |
---|---|---|
kylin.dictionary.mr-hive.database | The database in which the Hive Global Dictionary tables are stored. | default |
kylin.dictionary.mr-hive.columns | A comma-separated list of all columns that need a Hive Global Dictionary, each in the {TABLE_NAME}_{COLUMN_NAME} pattern. | KYLIN_SALES_SALES_ID,KYLIN_SALES_BUYER_ID |
kylin.dictionary.mr-hive.table.suffix | Suffix for the Segment Dictionary Table and the Global Dictionary Table. | _global_dict |
kylin.dictionary.mr-hive.intermediate.table.suffix | Suffix for the Distinct Value Table. | _group_by |
kylin.dictionary.mr-hive.columns.reduce.num | A key/value structure (a map) whose key is {CUBE_NAME}_{COLUMN_NAME} and whose value is the expected number of reducers in Build Segment Level Dictionary (MR job Parallel Part Build). | KYLIN_SALES_SALES_ID:3,KYLIN_SALES_BUYER_ID:2 |
kylin.source.hive.databasedir | The location of the Hive tables in HDFS. | /user/hive/warehouse/lacus.db |
kylin.dictionary.mr-hive.ref.columns | To reuse existing global dictionaries, specify a list of columns here that refer to global dictionaries already built by another cube. | KYLIN_SALES_SALES_ID,KYLIN_SALES_BUYER_ID |
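To make the value formats in the table concrete, here is a small sketch that reads the list-style and map-style values from the Example column and checks that every dictionary column has a reducer count. This is not Kylin's own parser. The helper names are hypothetical, and the check itself is only an illustration, not a documented requirement.

```python
def parse_column_list(raw):
    """Split a comma-separated value such as 'A,B' into ['A', 'B']."""
    return [item.strip() for item in raw.split(",") if item.strip()]


def parse_reducer_map(raw):
    """Turn 'A:3,B:2' into {'A': 3, 'B': 2}."""
    result = {}
    for entry in parse_column_list(raw):
        # Split on the last colon so the key is kept intact.
        name, _, count = entry.rpartition(":")
        result[name] = int(count)
    return result


# Values copied from the Example column of the table above.
overrides = {
    "kylin.dictionary.mr-hive.database": "default",
    "kylin.dictionary.mr-hive.columns": "KYLIN_SALES_SALES_ID,KYLIN_SALES_BUYER_ID",
    "kylin.dictionary.mr-hive.table.suffix": "_global_dict",
    "kylin.dictionary.mr-hive.intermediate.table.suffix": "_group_by",
    "kylin.dictionary.mr-hive.columns.reduce.num": "KYLIN_SALES_SALES_ID:3,KYLIN_SALES_BUYER_ID:2",
}

columns = parse_column_list(overrides["kylin.dictionary.mr-hive.columns"])
reducers = parse_reducer_map(overrides["kylin.dictionary.mr-hive.columns.reduce.num"])

# Columns that would fall back to whatever default Kylin applies.
missing = [column for column in columns if column not in reducers]
print(columns, reducers, missing)
```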
Hive Table
Table | Name Pattern | Explanation |
---|---|---|
Distinct Value Table | ${FLAT_TABLE}_${kylin.dictionary.mr-hive.intermediate.table.suffix} | A temporary Hive table that stores the literal values extracted from the flat table. It contains one normal column, dict_key, which holds all distinct literal values for each column in kylin.dictionary.mr-hive.columns (duplicate literal values are kept only once). It also contains a partition column, dict_column, with one partition per column. Note that literal values that have already been encoded are removed. |
Segment Dictionary Table | ${FLAT_TABLE}_${kylin.dictionary.mr-hive.table.suffix} | A temporary Hive table that stores the literal values extracted from the flat table together with their encoded integers. It contains two normal columns: dict_key, which holds all distinct literal values for each column in kylin.dictionary.mr-hive.columns (duplicate literal values are kept only once), and dict_val, which holds the encoded integer for the corresponding literal value. It also contains a partition column, dict_column, with one partition per column. |
Global Dictionary Table | ${CUBE_NAME}_${kylin.dictionary.mr-hive.table.suffix} | This table is the Global Dictionary. It has the same schema as the Segment Dictionary Table. |
Newly added steps
Serial No | Step Name | Input | Output |
---|---|---|---|
1 | Create hive dictionary table | N/A | Three Hive tables |
2 | Extract distinct value into Distinct Value Table | Flat table | Distinct Value Table |
3 | Build Segment Level Dictionary (Parallel Part Build) | Distinct Value Table (file path is determined by kylin.source.hive.databasedir) | Intermediate dict file (literal values are encoded at partition level, so each reducer encodes literals starting from zero). |
4 | Build Segment Level Dictionary (Parallel Total Build) | Intermediate dict file | Segment Level Dictionary |
5 | Merge Segment Level Dictionary into Global Dictionary Table | Segment Level Dictionary and old Global Dictionary Table | New Global Dictionary Table |
6 | Replace/encode Flat Table | Flat table | New flat table (literal values are replaced with encoded integers) |
7 | Cleanup temp table & data | All temporary Hive tables | Nothing; they are removed. |
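Steps 2 to 6 can be sketched in memory as follows. This is an illustration of the data flow only, not Kylin's MapReduce code. All function names are hypothetical, the round-robin partitioning is an assumption, and so is the way the total build shifts the partition-level IDs (existing maximum ID plus the sizes of the preceding partitions).

```python
def extract_new_values(flat_rows, column, global_dict):
    """Step 2: distinct values of the column that have no ID yet."""
    return sorted({row[column] for row in flat_rows if row[column] not in global_dict})


def part_build(new_values, num_reducers):
    """Step 3: each reducer encodes its own partition, starting from zero."""
    partitions = [[] for _ in range(num_reducers)]
    for index, value in enumerate(new_values):
        # Assumption: round-robin stands in for the real partitioner.
        partitions[index % num_reducers].append(value)
    return [{value: local_id for local_id, value in enumerate(part)} for part in partitions]


def total_build(partition_dicts, max_existing_id):
    """Step 4: shift local IDs so they are unique and follow existing IDs."""
    segment_dict = {}
    offset = max_existing_id + 1  # assumption: new IDs continue after the old maximum
    for part in partition_dicts:
        for value, local_id in part.items():
            segment_dict[value] = offset + local_id
        offset += len(part)
    return segment_dict


def merge(global_dict, segment_dict):
    """Step 5: union of the old global dictionary and the segment dictionary."""
    return {**global_dict, **segment_dict}


def encode_flat_table(flat_rows, column, global_dict):
    """Step 6: replace the literal value with its integer ID."""
    return [{**row, column: global_dict.get(row[column])} for row in flat_rows]


old_global = {"pv_1": 1, "pv_2": 2}
flat_table = [
    {"uid": "u1", "pv_id": "pv_2"},
    {"uid": "u2", "pv_id": "pv_3"},
    {"uid": "u3", "pv_id": "pv_4"},
    {"uid": "u4", "pv_id": "pv_5"},
]

new_values = extract_new_values(flat_table, "pv_id", old_global)
partition_dicts = part_build(new_values, num_reducers=2)
segment_dict = total_build(partition_dicts, max(old_global.values(), default=0))
new_global = merge(old_global, segment_dict)
encoded = encode_flat_table(flat_table, "pv_id", new_global)
print(encoded)
```

Because already-encoded values are filtered out in step 2, existing IDs never change between builds; only new values receive new IDs.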
Screen
Mapreduce Job Diagram
HQL Analysis
set mapreduce.job.name=Build Hive Global Dict - extract distinct value;
USE NightlyBuild;
set hive.exec.compress.output=false;
set hive.mapred.mode=unstrict;

-- create hive global dictionary table
CREATE TABLE IF NOT EXISTS NightlyBuild.ValidationCube_global_dict
( dict_key STRING COMMENT '',
  dict_val INT COMMENT '' )
COMMENT 'Hive Global Dictionary'
PARTITIONED BY (dict_column string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE;

-- create distinct value table
DROP TABLE IF EXISTS kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f__group_by;
CREATE TABLE IF NOT EXISTS kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f__group_by
( dict_key STRING COMMENT '' )
COMMENT ''
PARTITIONED BY (dict_column string)
STORED AS TEXTFILE ;

-- create segment level dictionary table
DROP TABLE IF EXISTS kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f_global_dict;
CREATE TABLE IF NOT EXISTS kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f_global_dict
( dict_key STRING COMMENT '' ,
  dict_val STRING COMMENT '' )
COMMENT ''
PARTITIONED BY (dict_column string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE ;

-- insert data into distinct value table
INSERT OVERWRITE TABLE kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f__group_by
PARTITION (dict_column = 'USERACTIONLOGSAMPLE_PV_ID')
SELECT a.DICT_KEY FROM (
  SELECT USERACTIONLOGSAMPLE_PV_ID as DICT_KEY
  FROM kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f
  GROUP BY USERACTIONLOGSAMPLE_PV_ID) a
LEFT JOIN (
  SELECT DICT_KEY FROM NightlyBuild.ValidationCube_global_dict
  WHERE DICT_COLUMN = 'USERACTIONLOGSAMPLE_PV_ID' ) b
ON a.DICT_KEY = b.DICT_KEY
WHERE b.DICT_KEY IS NULL ;

INSERT OVERWRITE TABLE kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f__group_by
PARTITION (dict_column = 'USERACTIONLOGSAMPLE_PLAY_ID')
SELECT a.DICT_KEY FROM (
  SELECT USERACTIONLOGSAMPLE_PLAY_ID as DICT_KEY
  FROM kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f
  GROUP BY USERACTIONLOGSAMPLE_PLAY_ID) a
LEFT JOIN (
  SELECT DICT_KEY FROM NightlyBuild.ValidationCube_global_dict
  WHERE DICT_COLUMN = 'USERACTIONLOGSAMPLE_PLAY_ID' ) b
ON a.DICT_KEY = b.DICT_KEY
WHERE b.DICT_KEY IS NULL ;

-- calculate max dict id
INSERT OVERWRITE TABLE kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f__group_by
PARTITION (DICT_COLUMN = 'KYLIN_MAX_DISTINCT_COUNT')
SELECT CONCAT_WS(',', tc.dict_column, cast(tc.total_distinct_val AS String), if(tm.max_dict_val is null, '0', cast(max_dict_val as string)))
FROM (
  SELECT dict_column, count(1) total_distinct_val
  FROM NightlyBuild.kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f__group_by
  WHERE DICT_COLUMN != 'KYLIN_MAX_DISTINCT_COUNT'
  GROUP BY dict_column) tc
LEFT JOIN (
  SELECT dict_column, if(max(dict_val) is null, 0, max(dict_val)) as max_dict_val
  FROM NightlyBuild.ValidationCube_global_dict
  GROUP BY dict_column) tm
ON tc.dict_column = tm.dict_column;
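The final statement of the script above writes one comma-joined row per dictionary column into the KYLIN_MAX_DISTINCT_COUNT partition: the column name, the number of new distinct values, and the current maximum dictionary ID (0 if the column has no entries yet). The following sketch mimics that computation in memory to show the shape of those rows. The function name and sample data are hypothetical, and how the later MR jobs consume the rows is not shown here.

```python
def max_distinct_count_rows(new_values_by_column, global_dict_by_column):
    """Mimic the 'calculate max dict id' statement on in-memory data."""
    rows = []
    for column, new_values in sorted(new_values_by_column.items()):
        if not new_values:
            # A column with no new values has no rows in the distinct
            # value table, so the GROUP BY produces nothing for it.
            continue
        existing = global_dict_by_column.get(column, {})
        # Same fallback as the HQL: 0 when the column has no dictionary yet.
        max_dict_val = max(existing.values(), default=0)
        rows.append(",".join([column, str(len(new_values)), str(max_dict_val)]))
    return rows


new_values_by_column = {
    "USERACTIONLOGSAMPLE_PV_ID": ["a", "b", "c"],
    "USERACTIONLOGSAMPLE_PLAY_ID": ["x"],
}
global_dict_by_column = {
    "USERACTIONLOGSAMPLE_PV_ID": {"k1": 1, "k2": 2},
}

rows = max_distinct_count_rows(new_values_by_column, global_dict_by_column)
for row in rows:
    print(row)
```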
set mapreduce.job.name=Build Hive Global Dict - merge to dict table;
USE NightlyBuild;
set hive.mapred.mode=unstrict;

-- data is prepared in previous two MR jobs, create partition for segment level dictionary base on prepared files
ALTER TABLE kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f_global_dict ADD IF NOT EXISTS PARTITION (dict_column='USERACTIONLOGSAMPLE_PV_ID');
ALTER TABLE kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f_global_dict ADD IF NOT EXISTS PARTITION (dict_column='USERACTIONLOGSAMPLE_PLAY_ID');

-- merge segment level dictionary into global dictionary
INSERT OVERWRITE TABLE NightlyBuild.ValidationCube_global_dict
PARTITION (dict_column = 'USERACTIONLOGSAMPLE_PV_ID')
SELECT dict_key, dict_val FROM NightlyBuild.ValidationCube_global_dict
WHERE dict_column = 'USERACTIONLOGSAMPLE_PV_ID'
UNION ALL
SELECT dict_key, dict_val FROM kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f_global_dict
WHERE dict_column = 'USERACTIONLOGSAMPLE_PV_ID' ;

INSERT OVERWRITE TABLE NightlyBuild.ValidationCube_global_dict
PARTITION (dict_column = 'USERACTIONLOGSAMPLE_PLAY_ID')
SELECT dict_key, dict_val FROM NightlyBuild.ValidationCube_global_dict
WHERE dict_column = 'USERACTIONLOGSAMPLE_PLAY_ID'
UNION ALL
SELECT dict_key, dict_val FROM kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f_global_dict
WHERE dict_column = 'USERACTIONLOGSAMPLE_PLAY_ID' ;
set mapreduce.job.name=Build Hive Global Dict - replace intermediate table;
USE NightlyBuild;
set hive.exec.compress.output=false;
set hive.mapred.mode=unstrict;

-- encode/replace flat table for specific column
INSERT OVERWRITE TABLE NightlyBuild.kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f
SELECT a.USERACTIONLOGSAMPLE_UID
  ,a.USERACTIONLOGSAMPLE_ACT_TYPE
  ,a.USERACTIONLOGSAMPLE_PAGE_ID
  ,a.USERACTIONLOGSAMPLE_DEVICE_BRAND
  ,a.USERACTIONLOGSAMPLE_ITEM_TYPE_ID
  ,a.USERACTIONLOGSAMPLE_CITY
  ,a.USERACTIONLOGSAMPLE_PART_DT
  ,b.dict_val
  ,a.USERACTIONLOGSAMPLE_PLAY_ID
FROM NightlyBuild.kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f a
LEFT OUTER JOIN (
  SELECT dict_key, dict_val FROM NightlyBuild.ValidationCube_global_dict
  WHERE dict_column = 'USERACTIONLOGSAMPLE_PV_ID') b
ON a.USERACTIONLOGSAMPLE_PV_ID = b.dict_key;

INSERT OVERWRITE TABLE NightlyBuild.kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f
SELECT a.USERACTIONLOGSAMPLE_UID
  ,a.USERACTIONLOGSAMPLE_ACT_TYPE
  ,a.USERACTIONLOGSAMPLE_PAGE_ID
  ,a.USERACTIONLOGSAMPLE_DEVICE_BRAND
  ,a.USERACTIONLOGSAMPLE_ITEM_TYPE_ID
  ,a.USERACTIONLOGSAMPLE_CITY
  ,a.USERACTIONLOGSAMPLE_PART_DT
  ,a.USERACTIONLOGSAMPLE_PV_ID
  ,b.dict_val
FROM NightlyBuild.kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f a
LEFT OUTER JOIN (
  SELECT dict_key, dict_val FROM NightlyBuild.ValidationCube_global_dict
  WHERE dict_column = 'USERACTIONLOGSAMPLE_PLAY_ID') b
ON a.USERACTIONLOGSAMPLE_PLAY_ID = b.dict_key;
Part II How to use
How to configure
Step 1. Create a cube that contains a COUNT_DISTINCT (bitmap) measure.
Step 2. Add the properties in the configuration overwrite step.
Step 3. Build a new segment.
Part III Performance
Hadoop Env
Hadoop : CDH 5.7
Yarn memory : 102GB
Yarn cores : 18
Comparison
Segment ID | Data Size | Duration - Hive Global Dictionary v1 | Duration - Hive Global Dictionary v2 |
---|---|---|---|
1 | 0.1 billion | | |
2 | 0.1 billion | | |
3 | 0.1 billion | | |
Comparison
Step Name | Duration Job-1 | Data size | Duration Job-2 | Data size | Duration Job-3 | Data size |
---|---|---|---|---|---|---|
Create Intermediate Flat Hive Table | ||||||
Build Hive Global Dict - extract distinct value | ||||||
Redistribute Flat Hive Table | ||||||
Build Hive Global Dict - parallel part build | ||||||
Build Hive Global Dict - parallel total build | ||||||
Build Hive Global Dict - merge to dict table | ||||||
Build Hive Global Dict - replace intermediate table | ||||||
Extract Fact Table Distinct Columns | ||||||
Build Dimension Dictionary | ||||||
Extract Dictionary from Global Dictionary | ||||||
Build Base Cuboid | ||||||
- | ||||||
Total | ||||||
Comment |
Part IV Reference
https://issues.apache.org/jira/browse/KYLIN-4342