Apache Kylin : Analytical Data Warehouse for Big Data
Welcome to Kylin Wiki.
Before you start
How to enable Lambda Mode in Kylin 3.1
Overall steps
- Create a Hive table (the historical table)
- Prepare a script to mock sample events
- Start the Kafka producer
- Deploy the Kylin Coordinator and Streaming Receiver
- Load the streaming table
- Create a model/cube in Kylin
- Enable the streaming cube
- Load data into the historical table
- Refresh the streaming table
Create lambda table
We know that segments built from Kafka may be incorrect for some reason, such as very late messages, and users may want a chance to update them.
Here we create a Hive table (the historical table) to store the "correct" data. This gives users a chance to clean and transform the data from Kafka, and then overwrite a segment with the data from the Hive table (the correct data).
Please make sure that the historical table contains all the columns you want included in your streaming cube, and that the data types match.
Please choose "DAY_START" or "HOUR_START" as the partition column of the historical table, depending on how frequently you want to refresh segments.
CREATE EXTERNAL TABLE IF NOT EXISTS lambda_flat_table (
    -- event timestamp and debug-purpose column
    EVENT_TIME timestamp,
    str_minute_second string COMMENT "For debug purposes, e.g. checking timezone",
    -- dimension columns
    act_type string COMMENT "How the user interacted with our mobile app in this event",
    user_devide_type string COMMENT "Which kind of device the user used in this event",
    location_city string COMMENT "Which city the user was located in for this event",
    video_id bigint COMMENT "Which video the user watched in this event",
    device_brand string,
    page_id string,
    -- measure columns
    play_times bigint,
    play_duration decimal(23, 10),
    pageview_id string COMMENT "Identifier of a pageview",
    -- used by Kylin (dimensions)
    MINUTE_START timestamp,
    HOUR_START timestamp,
    MONTH_START date
)
COMMENT 'Fact table. Stores the raw user action log.'
PARTITIONED BY (DAY_START date)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 'hdfs:///LACUS/lambda_data/lambda_flat_table';
Prepare sample event script
Say we want to monitor users' actions in our mobile video application. The following script (MockMessage.py, originally Python 2) sends events in JSON format to STDOUT.
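The original MockMessage.py is not reproduced here; the following is a minimal sketch of what such a generator might look like (the field names follow the lambda_flat_table schema; the values are randomly generated for illustration):

import json
import random
import sys
import time
import uuid

ACT_TYPES = ["play", "stop", "pause"]
CITIES = ["Shanghai", "Nanjing", "Beijing", "Tianjin", "Wuhan", "London", "Tokyo"]
BRANDS = ["vivo", "apple", "huawei"]

while True:
    now = time.localtime()
    event = {
        "event_time": time.strftime("%Y-%m-%d %H:%M:%S", now),
        "str_minute_second": time.strftime("%M:%S", now),
        "act_type": random.choice(ACT_TYPES),
        "user_devide_type": random.randint(3, 9),
        "location_city": random.choice(CITIES),
        "video_id": random.randint(100010001, 100010008),
        "device_brand": random.choice(BRANDS),
        "page_id": "page_%03d" % random.randint(1, 9),
        "play_times": random.randint(200, 240),
        "play_duration": round(random.uniform(1, 60), 4),
        "pageview_id": str(uuid.uuid4()),
    }
    sys.stdout.write(json.dumps(event) + "\n")
    sys.stdout.flush()
    time.sleep(0.5)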
Use the following script (SendMsg.sh) to send events to Kafka:

# bin/kafka-topics.sh --create --topic useraction_xxyu --zookeeper cdh-master --partitions 10 --replication-factor 1
rm -rf out.data
python MockMessage.py | kafka-console-producer --topic useraction_xxyu --broker-list cdh-master:9092,cdh-worker-1:9092,cdh-worker-2:9092
nohup sh SendMsg.sh > start.log &
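Before wiring up Kylin, you can optionally verify that events are arriving in the topic. A minimal sketch, assuming the kafka-python package is installed (the topic and broker names come from the producer script above):

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "useraction_xxyu",
    bootstrap_servers="cdh-master:9092",
    auto_offset_reset="earliest",
    consumer_timeout_ms=10000,  # give up after 10s of silence
)
for i, msg in enumerate(consumer):
    print(msg.value.decode("utf-8"))
    if i >= 4:  # five sample events are enough
        break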
Deploy Kylin Cluster
- Deploy a Streaming Coordinator (kylin.sh start)
- Deploy a Streaming Receiver (kylin.sh streaming start)
- Create a ReplicaSet (in the front-end page)
Please make sure "kylin.stream.event.timezone" is configured correctly. Here is the kylin.properties I use:
kylin.metadata.url=REALTIME@jdbc,url=jdbc:mysql://localhost:3306/NightlyBuild,username=root,password=R00t@kylin,maxActive=10,maxIdle=10
kylin.source.hive.database-for-flat-table=APACHE
kylin.env.zookeeper-base-path=/APACHE/REALTIME_OLAP
kylin.storage.hbase.table-name-prefix=REALTIME_OLAP_
kylin.storage.hbase.namespace=APACHE
kylin.env.hdfs-working-dir=/APACHE/REALTIME_OLAP
kylin.stream.event.timezone=GMT+8
kylin.web.timezone=GMT+8
kylin.stream.hive.database-for-lambda-cube=APACHE
Load Kafka topic into Kylin
Please choose the correct Timestamp Column, Timestamp Parser, and Timestamp Pattern.
The following is the content of streaming_v2/APACHE.LAMBDA_FLAT_TABLE.json.
Create model/cube in Kylin
The following is the content of the Model and Cube.
Enable streaming cube
Load data into lambda table
For each derived time column (event_time, minute_start, hour_start in our case), please make sure you remove your local timezone offset. For example, for the local timestamp "2020-09-23 00:07:35 GMT+08:00", subtract the timezone offset (8 hours) and remove the timezone suffix; the result is "2020-09-22 16:07:35".
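Here is a worked example of that offset rule in Python; it assumes the local timezone is GMT+8, matching kylin.stream.event.timezone above:

from datetime import datetime, timedelta

# local timestamp "2020-09-23 00:07:35 GMT+08:00"
local_ts = datetime.strptime("2020-09-23 00:07:35", "%Y-%m-%d %H:%M:%S")
# subtract the 8-hour offset and drop the timezone suffix
stored_ts = local_ts - timedelta(hours=8)
print(stored_ts.strftime("%Y-%m-%d %H:%M:%S"))  # -> 2020-09-22 16:07:35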
The following is the content of partition.csv, the data we will load into the historical table:

2020-09-22 16:07:35,EMPTY,play,3,Shanghai,100010001,vivo,page_004,223,33.2222,3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097,2020-09-22 16:07:00,2020-09-22 16:00:00,2020-09-01
2020-09-22 16:07:35,EMPTY,stop,4,London,100010002,apple,page_003,224,24.2342,9311744c-3746-3502-84c9-d06e8b5ea2d6,2020-09-22 16:07:00,2020-09-22 16:00:00,2020-09-01
2020-09-22 17:07:35,EMPTY,play,5,Nanjing,100010003,vivo,page_002,225,33.2222,3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097,2020-09-22 17:07:00,2020-09-22 17:00:00,2020-09-01
2020-09-22 17:07:35,EMPTY,stop,6,London,100010004,apple,page_001,226,24.2342,9311744c-3746-3502-84c9-d06e8b5ea2d6,2020-09-22 17:07:00,2020-09-22 17:00:00,2020-09-01
2020-09-22 18:07:35,EMPTY,play,7,Nanjing,100010001,vivo,page_001,227,33.2222,3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097,2020-09-22 18:07:00,2020-09-22 18:00:00,2020-09-01
2020-09-22 18:07:35,EMPTY,stop,8,London,100010002,apple,page_002,228,24.2342,9311744c-3746-3502-84c9-d06e8b5ea2d6,2020-09-22 18:07:00,2020-09-22 18:00:00,2020-09-01
2020-09-22 19:07:35,EMPTY,play,9,Nanjing,100010003,vivo,page_003,229,33.2222,3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097,2020-09-22 19:07:00,2020-09-22 19:00:00,2020-09-01
2020-09-22 19:07:35,EMPTY,stop,3,London,100010004,apple,page_004,230,24.2342,9311744c-3746-3502-84c9-d06e8b5ea2d6,2020-09-22 19:07:00,2020-09-22 19:00:00,2020-09-01
2020-09-22 20:07:35,EMPTY,play,4,Nanjing,100010001,vivo,page_001,231,33.2222,3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097,2020-09-22 20:07:00,2020-09-22 20:00:00,2020-09-01
2020-09-22 20:07:35,EMPTY,stop,5,London,100010002,apple,page_002,232,24.2342,9311744c-3746-3502-84c9-d06e8b5ea2d6,2020-09-22 20:07:00,2020-09-22 20:00:00,2020-09-01
2020-09-22 21:07:35,EMPTY,play,6,Shanghai,100010003,vivo,page_003,233,33.2222,3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097,2020-09-22 21:07:00,2020-09-22 21:00:00,2020-09-01
2020-09-22 22:07:35,EMPTY,stop,7,London,100010004,apple,page_004,234,24.2342,9311744c-3746-3502-84c9-d06e8b5ea2d6,2020-09-22 22:07:00,2020-09-22 22:00:00,2020-09-01
2020-09-22 23:07:35,EMPTY,play,8,Shanghai,100010001,huawei,page_001,235,33.2222,3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097,2020-09-22 23:07:00,2020-09-22 23:00:00,2020-09-01
2020-09-23 00:07:35,EMPTY,pause,9,Tianjin,100010002,huawei,page_003,235,33.2222,e80160bc-f25a-3566-bf9b-a16e91ef6ee4,2020-09-23 00:07:00,2020-09-23 00:00:00,2020-09-01
2020-09-23 02:07:35,EMPTY,pause,3,Tokyo,100010003,huawei,page_004,235,33.2222,3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097,2020-09-23 02:07:00,2020-09-23 02:00:00,2020-09-01
2020-09-23 05:07:35,EMPTY,pause,4,Beijing,100010004,huawei,page_005,235,33.2222,e80160bc-f25a-3566-bf9b-a16e91ef6ee4,2020-09-23 05:07:00,2020-09-23 05:00:00,2020-09-01
2020-09-23 07:07:35,EMPTY,pause,5,Wuhan,100010005,huawei,page_006,235,33.2222,3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097,2020-09-23 07:07:00,2020-09-23 07:00:00,2020-09-01
2020-09-23 10:07:35,EMPTY,pause,6,Tianjin,100010006,huawei,page_007,235,33.2222,e80160bc-f25a-3566-bf9b-a16e91ef6ee4,2020-09-23 10:07:00,2020-09-23 10:00:00,2020-09-01
2020-09-23 12:07:35,EMPTY,pause,7,Tokyo,10001007,huawei,page_008,235,33.2222,3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097,2020-09-23 12:07:00,2020-09-23 12:00:00,2020-09-01
2020-09-23 15:07:35,EMPTY,pause,8,Beijing,100010008,huawei,page_009,235,33.2222,e80160bc-f25a-3566-bf9b-a16e91ef6ee4,2020-09-23 15:07:00,2020-09-23 15:00:00,2020-09-01
Upload data into HDFS
hadoop fs -mkdir -p hdfs:///LACUS/lambda_data/lambda_flat_table/day_start=2020-09-23
hadoop fs -put partition.csv hdfs:///LACUS/lambda_data/lambda_flat_table/day_start=2020-09-23
Add partition to Hive table
use apache;
ALTER TABLE lambda_flat_table ADD PARTITION (DAY_START='2020-09-23') LOCATION 'hdfs:///LACUS/lambda_data/lambda_flat_table/day_start=2020-09-23';
Refresh streaming table
Send a request to refresh the segment. For startTime and endTime, make sure you use the (local) timestamp of "2020-09-23 00:00:00 GMT+08:00".
Here is the URL of the refresh API: http://cdh-master:7200/kylin/api/cubes/UserActionBasicAnslysisCube/rebuild
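Here is a sketch of the refresh call in Python (the credentials and the exact request body are assumptions based on Kylin's standard cube rebuild API; adjust them for your environment). startTime and endTime are the epoch milliseconds of the local timestamps "2020-09-23 00:00:00 GMT+08:00" and "2020-09-24 00:00:00 GMT+08:00":

from datetime import datetime, timedelta, timezone

import requests

TZ = timezone(timedelta(hours=8))  # GMT+8, matching kylin.stream.event.timezone

def epoch_ms(ts):
    dt = datetime.strptime(ts, "%Y-%m-%d %H:%M:%S").replace(tzinfo=TZ)
    return int(dt.timestamp() * 1000)

resp = requests.put(
    "http://cdh-master:7200/kylin/api/cubes/UserActionBasicAnslysisCube/rebuild",
    json={
        "startTime": epoch_ms("2020-09-23 00:00:00"),
        "endTime": epoch_ms("2020-09-24 00:00:00"),
        "buildType": "REFRESH",
    },
    auth=("ADMIN", "KYLIN"),  # default Kylin credentials; change as needed
)
print(resp.status_code, resp.text)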
Here is the build job.
Here is the HQL used in the first step.
Verify query result
select hour_start, count(*) as event_num, count(distinct PAGEVIEW_ID) as pv_stat
from LAMBDA_FLAT_TABLE
where day_start >= '2020-09-23' and day_start < '2020-09-24'
group by hour_start
order by hour_start;

select page_id, user_devide_type, count(*) as event_num, count(distinct PAGEVIEW_ID) as pv_stat
from LAMBDA_FLAT_TABLE
where day_start >= '2020-09-23' and day_start <= '2020-09-24'
group by page_id, user_devide_type
order by page_id, user_devide_type;