Apache Kylin : Analytical Data Warehouse for Big Data



Part I   What is Hive Global Dictionary

Background

The count distinct (bitmap) measure is important in many scenarios, such as page view statistics, and Kylin has supported count distinct since 1.5.3.
Apache Kylin implements a precise count distinct measure based on bitmaps, and uses a Global Dictionary to encode string values into integers.

Currently the Global Dictionary has to be built in a single process/JVM, which may take a lot of time and memory for ultra-high-cardinality (UHC) columns. With this feature, Kylin uses MapReduce (MR) to build the Global Dictionary and Hive to store it.
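
To see why a shared dictionary matters for a bitmap-based count distinct, here is a minimal sketch. It is not Kylin code: encode, to_bitmap and count_distinct are hypothetical helpers, ids are assumed to start at 1, and a plain Python integer stands in for the bitmap.

```python
def encode(values, dictionary):
    """Give each unseen string the next integer id, then return the ids."""
    for value in values:
        if value not in dictionary:
            dictionary[value] = len(dictionary) + 1
    return [dictionary[value] for value in values]


def to_bitmap(ids):
    """Set one bit per id; a Python int is used as a stand-in bitmap."""
    bitmap = 0
    for i in ids:
        bitmap |= 1 << i
    return bitmap


def count_distinct(bitmap):
    """The number of set bits is the number of distinct ids."""
    return bin(bitmap).count("1")


global_dict = {}
day1 = to_bitmap(encode(["u1", "u2", "u2"], global_dict))
day2 = to_bitmap(encode(["u2", "u3"], global_dict))
total = count_distinct(day1 | day2)
```

Because both segments are encoded with the same dictionary, u2 maps to the same bit in both bitmaps and is counted only once after the OR.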

Benefit

  1. The Global Dictionary is built in a distributed way, so the build job takes less time.
  2. The Job Server does less work and is therefore more stable.
  3. OneID: the dictionary is stored in Hive, so it can be reused in other scenarios across the company.

Configuration

Conf key | Explanation | Example
kylin.dictionary.mr-hive.database | The database in which the Hive Global Dictionary is stored. | default
kylin.dictionary.mr-hive.columns | A list of all columns which need a Hive Global Dictionary, in {CUBE_NAME}_{COLUMN_NAME} format. | KYLIN_SALES_SALES_ID,KYLIN_SALES_BUYER_ID
kylin.dictionary.mr-hive.table.suffix | Suffix for the Segment Dictionary Table and the Global Dictionary Table. | _dict_table
kylin.dictionary.mr-hive.intermediate.table.suffix | Suffix for the Distinct Value Table. | _distinct_value
kylin.dictionary.mr-hive.columns.reduce.num | A key/value structure in which the key is {CUBE_NAME}_{COLUMN_NAME} and the value is the expected number of reducers in Build Segment Level Dictionary (MR job-1). | KYLIN_SALES_SALES_ID:3,KYLIN_SALES_BUYER_ID:2
kylin.source.hive.databasedir | The location of the Hive tables in HDFS. | /user/hive/warehouse/lacus.db
kylin.dictionary.mr-hive.ref.columns | A list of columns that reuse an existing global dictionary built by another cube. | KYLIN_SALES_SALES_ID,KYLIN_SALES_BUYER_ID
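
The list and key/value formats above can be illustrated with a small parsing sketch. The helper names parse_column_list and parse_reduce_num are hypothetical and only show how the example values are structured. They are not how Kylin itself reads its configuration.

```python
def parse_column_list(value):
    """Split a comma-separated property value into column identifiers."""
    return [item.strip() for item in value.split(",") if item.strip()]


def parse_reduce_num(value):
    """Turn 'COL_A:3,COL_B:2' into {'COL_A': 3, 'COL_B': 2}."""
    reducers = {}
    for item in parse_column_list(value):
        column, _, count = item.partition(":")
        reducers[column.strip()] = int(count)
    return reducers


# Example values taken from the configuration table.
columns = parse_column_list("KYLIN_SALES_SALES_ID,KYLIN_SALES_BUYER_ID")
reducers = parse_reduce_num("KYLIN_SALES_SALES_ID:3,KYLIN_SALES_BUYER_ID:2")
```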

Hive Table created for 

Table | Name Pattern | Explanation
Distinct Value Table | ${FLAT_TABLE}_${kylin.dictionary.mr-hive.intermediate.table.suffix} | A temporary Hive table that stores the literal values which need to be encoded. It has one normal column, dict_key, holding the distinct literal values of each column listed in kylin.dictionary.mr-hive.columns (each duplicated value is kept only once), and one partition column, dict_column, so there is one partition per column.
Segment Dictionary Table | ${FLAT_TABLE}_${kylin.dictionary.mr-hive.table.suffix} |
Global Dictionary Table | ${CUBE_NAME}_${kylin.dictionary.mr-hive.table.suffix} |

Newly added steps

Serial No | Step Name | Input | Output
1 | Create hive dictionary table | N/A | Three Hive tables
2 | Extract distinct value into Distinct Value Table | Flat table | Distinct Value Table
3 | Build Segment Level Dictionary (Parallel Part) | Distinct Value Table | Intermediate dict file (literal values are encoded at partition level, so each reducer encodes its literals starting from zero)
4 | Build Segment Level Dictionary (Parallel Total) | Intermediate dict file | Segment Level Dictionary
5 | Merge Segment Level Dictionary into Global Dictionary Table | Segment Level Dictionary and old Global Dictionary Table | New Global Dictionary Table
6 | Replace/encode Flat Table | Flat table | New flat table (literal values are replaced with encoded integers)
7 | Cleanup temp table & data | All temporary Hive tables | Nothing, they will be removed.
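
Steps 3 to 5 can be sketched as follows. This is one possible reading of the step table, not Kylin's implementation. The function names are hypothetical, and the rule that new ids continue from the current maximum plus one is an assumption.

```python
def encode_partition(values):
    """Step 3 (Parallel Part): each reducer numbers its own values from zero."""
    return {value: i for i, value in enumerate(sorted(values))}


def build_segment_dict(partitions, max_dict_val):
    """Step 4 (Parallel Total): shift each partition's local ids into one range."""
    segment_dict = {}
    offset = max_dict_val + 1  # assumption: continue right after the current maximum
    for part in partitions:
        for value, local_id in part.items():
            segment_dict[value] = offset + local_id
        offset += len(part)
    return segment_dict


def merge(global_dict, segment_dict):
    """Step 5: new global dictionary = old entries plus the segment's new entries."""
    merged = dict(global_dict)
    merged.update(segment_dict)
    return merged


old_global = {"a": 1, "b": 2}
reducer_inputs = [{"c", "d"}, {"e"}]  # new values only, split across two reducers
parts = [encode_partition(p) for p in reducer_inputs]
segment = build_segment_dict(parts, max(old_global.values(), default=0))
new_global = merge(old_global, segment)
```

Numbering from zero inside each reducer lets the reducers run without coordination. Only the final pass needs the partition sizes and the current maximum id.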




Part II  How to use

How to configure

Step 1. Create a cube which contains a COUNT_DISTINCT (bitmap) measure.

Step 2. Add the properties above in the configuration overwrite step.

Step 3. Build a new segment.

HQL Analysis

Create hive table & Insert into distinct value table
set mapreduce.job.name=Build Hive Global Dict - extract distinct value;
USE NightlyBuild;
set hive.exec.compress.output=false;
set hive.mapred.mode=unstrict;

-- create hive global dictionary table
CREATE TABLE IF NOT EXISTS NightlyBuild.ValidationCube_global_dict
 ( dict_key STRING COMMENT '', 
   dict_val INT COMMENT '' 
) 
COMMENT 'Hive Global Dictionary' 
PARTITIONED BY (dict_column string) 
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' 
STORED AS TEXTFILE; 

-- create distinct value table
DROP TABLE IF EXISTS kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f__group_by; 
CREATE TABLE IF NOT EXISTS kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f__group_by 
( 
   dict_key STRING COMMENT '' 
) 
COMMENT '' 
PARTITIONED BY (dict_column string) 
STORED AS TEXTFILE 
;

-- create segment level dictionary
DROP TABLE IF EXISTS kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f_global_dict; 
CREATE TABLE IF NOT EXISTS kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f_global_dict 
( 
   dict_key STRING COMMENT '' , 
  dict_val STRING COMMENT '' 
) 
COMMENT '' 
PARTITIONED BY (dict_column string) 
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' 
STORED AS TEXTFILE 
;

-- insert data into distinct value table
INSERT OVERWRITE TABLE kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f__group_by 
PARTITION (dict_column = 'USERACTIONLOGSAMPLE_PV_ID') 
SELECT a.DICT_KEY FROM (
  SELECT 
USERACTIONLOGSAMPLE_PV_ID as DICT_KEY 
  FROM kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f
  GROUP BY USERACTIONLOGSAMPLE_PV_ID) a 
    LEFT JOIN 
  (SELECT DICT_KEY FROM NightlyBuild.ValidationCube_global_dict    WHERE DICT_COLUMN = 'USERACTIONLOGSAMPLE_PV_ID' ) b 
ON a.DICT_KEY = b.DICT_KEY 
WHERE b.DICT_KEY IS NULL 
;
INSERT OVERWRITE TABLE kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f__group_by 
PARTITION (dict_column = 'USERACTIONLOGSAMPLE_PLAY_ID') 
SELECT a.DICT_KEY FROM (
  SELECT 
USERACTIONLOGSAMPLE_PLAY_ID as DICT_KEY 
  FROM kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f
  GROUP BY USERACTIONLOGSAMPLE_PLAY_ID) a 
    LEFT JOIN 
  (SELECT DICT_KEY FROM NightlyBuild.ValidationCube_global_dict    WHERE DICT_COLUMN = 'USERACTIONLOGSAMPLE_PLAY_ID' ) b 
ON a.DICT_KEY = b.DICT_KEY 
WHERE b.DICT_KEY IS NULL 
;

-- calculate max dict id 
INSERT OVERWRITE TABLE  kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f__group_by PARTITION (DICT_COLUMN = 'KYLIN_MAX_DISTINCT_COUNT') 
SELECT CONCAT_WS(',', tc.dict_column, cast(tc.total_distinct_val AS String), if(tm.max_dict_val is null, '0', cast(max_dict_val as string))) 
FROM (
    SELECT dict_column, count(1) total_distinct_val
    FROM NightlyBuild.kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f__group_by
    WHERE DICT_COLUMN != 'KYLIN_MAX_DISTINCT_COUNT'
    GROUP BY dict_column) tc 
LEFT JOIN (
    SELECT dict_column, if(max(dict_val) is null, 0, max(dict_val)) as max_dict_val 
    FROM NightlyBuild.ValidationCube_global_dict
    GROUP BY dict_column) tm 
ON tc.dict_column = tm.dict_column;
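
The logic of the statements above can be mirrored in a few lines of Python. This is only an illustrative sketch with hypothetical names and in-memory data. NULL handling is not modelled, and the result is sorted only to make it deterministic.

```python
def new_distinct_values(flat_rows, column, global_dict):
    """Distinct values of `column` that have no global dictionary entry yet."""
    distinct = {row[column] for row in flat_rows}  # GROUP BY column
    # LEFT JOIN ... WHERE b.DICT_KEY IS NULL keeps only unmatched keys.
    return sorted(v for v in distinct if v not in global_dict)


def max_count_row(column, new_values, global_dict):
    """Build the 'column,total_distinct_val,max_dict_val' summary string."""
    max_dict_val = max(global_dict.values(), default=0)
    return ",".join([column, str(len(new_values)), str(max_dict_val)])


flat_rows = [{"PV_ID": "p1"}, {"PV_ID": "p2"}, {"PV_ID": "p1"}, {"PV_ID": "p3"}]
global_dict = {"p1": 1}  # existing entries of this column's partition
new_values = new_distinct_values(flat_rows, "PV_ID", global_dict)
summary = max_count_row("PV_ID", new_values, global_dict)
```

Only values missing from the global dictionary reach the distinct value table, so the later build steps encode new values only.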


Merge into global dictionary
set mapreduce.job.name=Build Hive Global Dict - merge to dict table;
USE NightlyBuild;
set hive.mapred.mode=unstrict;

-- data is prepared in previous two MR jobs, create partition base on prepared files
ALTER TABLE kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f_global_dict ADD IF NOT EXISTS PARTITION (dict_column='USERACTIONLOGSAMPLE_PV_ID'); 
ALTER TABLE kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f_global_dict ADD IF NOT EXISTS PARTITION (dict_column='USERACTIONLOGSAMPLE_PLAY_ID'); 


-- merge segment level dictionary into global dictionary
INSERT OVERWRITE TABLE NightlyBuild.ValidationCube_global_dict 
PARTITION (dict_column = 'USERACTIONLOGSAMPLE_PV_ID') 
SELECT dict_key, dict_val FROM NightlyBuild.ValidationCube_global_dict 
WHERE dict_column = 'USERACTIONLOGSAMPLE_PV_ID' 
UNION ALL 
SELECT dict_key, dict_val FROM kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f_global_dict 
 WHERE dict_column = 'USERACTIONLOGSAMPLE_PV_ID' ;

INSERT OVERWRITE TABLE NightlyBuild.ValidationCube_global_dict 
PARTITION (dict_column = 'USERACTIONLOGSAMPLE_PLAY_ID') 
SELECT dict_key, dict_val FROM NightlyBuild.ValidationCube_global_dict 
WHERE dict_column = 'USERACTIONLOGSAMPLE_PLAY_ID' 
UNION ALL 
SELECT dict_key, dict_val FROM kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f_global_dict 
 WHERE dict_column = 'USERACTIONLOGSAMPLE_PLAY_ID' ;


Replace flat table
set mapreduce.job.name=Build Hive Global Dict - replace intermediate table;
USE NightlyBuild;
set hive.exec.compress.output=false; set hive.mapred.mode=unstrict;

INSERT OVERWRITE TABLE NightlyBuild.kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f 
SELECT 
a.USERACTIONLOGSAMPLE_UID 
,a.USERACTIONLOGSAMPLE_ACT_TYPE 
,a.USERACTIONLOGSAMPLE_PAGE_ID 
,a.USERACTIONLOGSAMPLE_DEVICE_BRAND 
,a.USERACTIONLOGSAMPLE_ITEM_TYPE_ID 
,a.USERACTIONLOGSAMPLE_CITY 
,a.USERACTIONLOGSAMPLE_PART_DT 
,b.dict_val 
,a.USERACTIONLOGSAMPLE_PLAY_ID 
FROM NightlyBuild.kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f a 
LEFT OUTER JOIN 
 (SELECT dict_key, dict_val FROM NightlyBuild.ValidationCube_global_dict WHERE dict_column = 'USERACTIONLOGSAMPLE_PV_ID') b 
ON a.USERACTIONLOGSAMPLE_PV_ID = b.dict_key;


INSERT OVERWRITE TABLE NightlyBuild.kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f 
SELECT 
a.USERACTIONLOGSAMPLE_UID 
,a.USERACTIONLOGSAMPLE_ACT_TYPE 
,a.USERACTIONLOGSAMPLE_PAGE_ID 
,a.USERACTIONLOGSAMPLE_DEVICE_BRAND 
,a.USERACTIONLOGSAMPLE_ITEM_TYPE_ID 
,a.USERACTIONLOGSAMPLE_CITY 
,a.USERACTIONLOGSAMPLE_PART_DT 
,a.USERACTIONLOGSAMPLE_PV_ID 
,b.dict_val 
FROM NightlyBuild.kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f a 
LEFT OUTER JOIN 
 (SELECT dict_key, dict_val FROM NightlyBuild.ValidationCube_global_dict WHERE dict_column = 'USERACTIONLOGSAMPLE_PLAY_ID') b 
ON a.USERACTIONLOGSAMPLE_PLAY_ID = b.dict_key;
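
The effect of the two statements above can be sketched in Python. The names and sample rows are hypothetical. Each pass replaces one column's literal value with its dictionary id and leaves the other columns and their order unchanged. As with a LEFT OUTER JOIN, a value without a dictionary entry becomes None (NULL).

```python
def encode_column(flat_rows, column, column_dict):
    """Replace `column` with its dictionary id; unmatched values become None."""
    return [{**row, column: column_dict.get(row[column])} for row in flat_rows]


flat_rows = [
    {"UID": "u1", "PV_ID": "p1", "PLAY_ID": "x"},
    {"UID": "u2", "PV_ID": "p9", "PLAY_ID": "y"},
]
pv_dict = {"p1": 1}            # partition dict_column = PV_ID
play_dict = {"x": 1, "y": 2}   # partition dict_column = PLAY_ID

# One pass per dictionary column, like the two INSERT OVERWRITE statements.
encoded = encode_column(flat_rows, "PV_ID", pv_dict)
encoded = encode_column(encoded, "PLAY_ID", play_dict)
```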




Part III  Performance Comparison


Hadoop Env

Hadoop : CDH 5.7

Yarn memory : 102GB

Yarn cores : 18

Step Cost 


Data Size

Step Name | Duration | Data Size




Part IV Reference 

https://issues.apache.org/jira/browse/KYLIN-4342

