Apache Kylin : Analytical Data Warehouse for Big Data
Welcome to Kylin Wiki.
Background
What is Realtime OLAP in Kylin
Kylin v3.0.0 released the real-time OLAP feature. By the power of the newly added streaming receiver cluster, Kylin can query streaming data with sub-second latency. You can check this tech blog for the overall design and core concepts.
If you want a step-by-step tutorial, please check this tech blog.
In this article, we will introduce how to update segments and how to set the timezone for derived time columns in a real-time OLAP cube.
Sample Event
This sample event comes from my Python script, with some additional fields such as event_time, which stands for the timestamp of the event. We assume events come from countries in different timezones: "2019-12-09 08:44:50.000-0500" indicates an event in the America/New_York timezone, and you may have other events which come from Asia/Shanghai as well.
Say we have a Kafka message which looks like this:
{
  "content_list": ["22", "22", "22"],
  "act_type": "click",
  "event_ts_2": 1600877255000,
  "event_ts": 1600877255000,
  "user_detail": {
    "devide_type": 7,
    "location": { "city": "shenzhen" },
    "network_type": 3
  },
  "video_id": 22,
  "event_date_2": "2020-09-23 16:07:35.000+08:00",
  "str_minute": "7",
  "video_type": "3c8416",
  "play_times": 22,
  "pageview_id": "3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097",
  "active_minutes": 50.0208,
  "device_brand": "vivo",
  "str_minute_second": "16_7_35",
  "play_duration": 37.6584,
  "event_date": "2020-09-23 16:07:35.000+08:00",
  "page_id": "page_22",
  "str_second": "35",
  "uid": 2
}
Question
When performing real-time OLAP analysis with Kylin, you may have concerns such as:
- Will events in different timezones cause incorrect query results?
- How can I correct the data when a Kafka message contains a value that is not what I want, say a misspelled dimension value?
- How can I retrieve very late messages which have already been dropped?
- My query only hits a small range of time; how should I write the filter condition to make sure unused segments are pruned/skipped from the scan?
Quick Answer
For the first question, you can always get the correct result in the right timezone by setting kylin.stream.event.timezone=GMT+N for all Kylin processes. By default, UTC is used for derived time columns.
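To see what this setting changes, here is a small illustration (plain Python timezone arithmetic, not Kylin code) of how a single epoch timestamp from the sample message maps to different wall-clock values, which is what derived time columns such as MINUTE_START are computed from:

```python
from datetime import datetime, timedelta, timezone

event_ts_ms = 1600877255000  # event_ts taken from the sample message above

# wall-clock time under the default setting (UTC)
utc = datetime.fromtimestamp(event_ts_ms / 1000, tz=timezone.utc)
# wall-clock time under kylin.stream.event.timezone=GMT+8
gmt8 = utc.astimezone(timezone(timedelta(hours=8)))

print(utc.strftime("%Y-%m-%d %H:%M:%S"))   # 2020-09-23 16:07:35
print(gmt8.strftime("%Y-%m-%d %H:%M:%S"))  # 2020-09-24 00:07:35
```

The same event therefore lands in different MINUTE_START/DAY_START buckets depending on the configured timezone, which is why the setting must be consistent across all Kylin processes.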
For the second and third questions, you cannot update/append segments of a normal streaming cube, but you can for a streaming cube in lambda mode. All you need to prepare is a Hive table which is mapped to your Kafka events.
For the fourth question, you can achieve this by adding derived time columns such as MINUTE_START/DAY_START to your filter condition.
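As a sketch (table and column names follow the lambda_flat_table used later in this article; the exact derived columns available depend on your cube design), a range filter on the derived DAY_START column lets Kylin skip segments outside that range:

```sql
-- only segments overlapping 2020-09-22 ~ 2020-09-23 need to be scanned
SELECT page_id, SUM(play_times) AS total_plays
FROM lambda_flat_table
WHERE DAY_START >= '2020-09-22' AND DAY_START < '2020-09-24'
GROUP BY page_id
```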
Lambda Architecture
How to enable Lambda Mode in Kylin 3.1
Overall steps
- Create Hive table (lambda table)
- Prepare sample event script
- Start kafka producer
- Deploy Kylin
- Load kafka topic into Kylin
- Create model/cube in Kylin
- Enable streaming cube
- Load data into lambda table
- Refresh streaming table
Create lambda table
We know that segments built from Kafka may be incorrect for some reason, such as very late messages, and users may want a chance to update them.
Here we create a Hive table to store the "correct" data, so users have a chance to clean and transform the data from Kafka, and then refresh segments from the Hive table.
Please make sure that lambda_flat_table contains all the columns that you want to include in your streaming cube.
CREATE EXTERNAL TABLE IF NOT EXISTS lambda_flat_table (
    -- event timestamp and debug purpose column
    EVENT_TIME timestamp,
    str_minute_second string COMMENT "For debug purpose, maybe check timezone etc",
    -- dimension column
    act_type string COMMENT "What did user interact with our mobile app in this event",
    user_devide_type string COMMENT "Which kind of device did user use in this event",
    location_city string COMMENT "Which city did user locate in this event",
    video_id bigint COMMENT "Which video did user watch in this event",
    device_brand string,
    page_id string,
    -- measure column
    play_times bigint,
    play_duration decimal(23, 10),
    pageview_id string COMMENT "Identifier of a pageview",
    -- for kylin used (dimension)
    MINUTE_START timestamp,
    HOUR_START timestamp,
    MONTH_START date
)
COMMENT 'Fact table. Store raw user action log.'
PARTITIONED BY (DAY_START date)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 'hdfs:///LACUS/lambda_data/lambda_flat_table';
Prepare sample event script
Say that we have a mobile application in which users watch videos. Each user action (click, play, pause, stop) is sent to Kafka as a JSON event like the sample above; a Python script (fake.py) generates these events continuously.
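The original fake.py is not included in this article, so here is a minimal sketch. The field names copy the sample message above (including the devide_type spelling); the field values are made up. It prints one JSON event per line, suitable for piping into kafka-console-producer:

```python
import json
import random
import time
import uuid
from datetime import datetime, timedelta, timezone

# assumption: the producer emits +08:00 timestamps, like the sample message
TZ = timezone(timedelta(hours=8))


def fmt(dt):
    # "2020-09-23 16:07:35.000+08:00" -- %z yields "+0800", so insert the colon
    s = dt.strftime("%Y-%m-%d %H:%M:%S.000%z")
    return s[:-2] + ":" + s[-2:]


def make_event(now=None):
    """Build one event in the same shape as the sample Kafka message."""
    now = now or datetime.now(TZ)
    ts_ms = int(now.timestamp() * 1000)
    video_id = random.randint(1, 100)
    return {
        "content_list": [str(video_id)] * 3,
        "act_type": random.choice(["click", "play", "pause", "stop"]),
        "event_ts": ts_ms,
        "event_ts_2": ts_ms,
        "user_detail": {
            "devide_type": random.randint(1, 9),  # field name kept as in the sample
            "location": {"city": random.choice(["shenzhen", "shanghai", "beijing"])},
            "network_type": random.randint(1, 4),
        },
        "video_id": video_id,
        "event_date": fmt(now),
        "event_date_2": fmt(now),
        "str_minute": str(now.minute),
        "str_second": str(now.second),
        "str_minute_second": "%d_%d_%d" % (now.hour, now.minute, now.second),
        "video_type": uuid.uuid4().hex[:6],
        "play_times": video_id,  # the sample message repeats video_id here
        "play_duration": round(random.uniform(0, 60), 4),
        "active_minutes": round(random.uniform(0, 60), 4),
        "pageview_id": str(uuid.uuid4()),
        "device_brand": random.choice(["vivo", "huawei", "apple"]),
        "page_id": "page_%d" % video_id,
        "uid": random.randint(1, 1000),
    }


if __name__ == "__main__":
    # emit a burst of events; a real producer would loop forever
    for _ in range(100):
        print(json.dumps(make_event()), flush=True)
        time.sleep(0.01)
```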
Start kafka producer
# create the topic
bin/kafka-topics.sh --create --topic useraction --zookeeper cdh-master --partitions 10 --replication-factor 1

# pipe generated events into the topic
rm -rf out.data
python fake.py | kafka-console-producer --topic useraction_xxyu --broker-list cdh-master:9092,cdh-worker-1:9092,cdh-worker-2:9092
nohup sh SendMsg.sh > start.log &
Deploy Kylin
kylin.stream.cube.duration=3600
kylin.stream.build.additional.cuboids=true
kylin.stream.metrics.option=console
# Hive database which holds the lambda table
kylin.stream.hive.database-for-lambda-cube=lambda_311
# timezone used for derived time columns
kylin.stream.event.timezone=GMT+8
kylin.stream.print-realtime-dict-enabled=true
Load kafka topic into Kylin
Create model/cube in Kylin
Enable streaming cube
Load data into lambda table
Prepare a file, say partition.csv, containing rows in the same column order as lambda_flat_table:
2020-09-22 16:07:35,EMPTY,play,3,Shanghai,100010001,vivo,page_001,223,33.2222,3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097,2020-09-22 16:07:00,2020-09-22 16:00:00,2020-09-01
2020-09-22 16:07:35,EMPTY,stop,4,London,100010002,apple,page_002,224,24.2342,9311744c-3746-3502-84c9-d06e8b5ea2d6,2020-09-22 16:07:00,2020-09-22 16:00:00,2020-09-01
2020-09-22 17:07:35,EMPTY,play,3,Shanghai,100010001,vivo,page_001,225,33.2222,3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097,2020-09-22 17:07:00,2020-09-22 17:00:00,2020-09-01
2020-09-22 17:07:35,EMPTY,stop,4,London,100010002,apple,page_002,226,24.2342,9311744c-3746-3502-84c9-d06e8b5ea2d6,2020-09-22 17:07:00,2020-09-22 17:00:00,2020-09-01
2020-09-22 18:07:35,EMPTY,play,3,Shanghai,100010001,vivo,page_001,227,33.2222,3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097,2020-09-22 18:07:00,2020-09-22 18:00:00,2020-09-01
2020-09-22 18:07:35,EMPTY,stop,4,London,100010002,apple,page_002,228,24.2342,9311744c-3746-3502-84c9-d06e8b5ea2d6,2020-09-22 18:07:00,2020-09-22 18:00:00,2020-09-01
2020-09-22 19:07:35,EMPTY,play,3,Shanghai,100010001,vivo,page_001,229,33.2222,3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097,2020-09-22 19:07:00,2020-09-22 19:00:00,2020-09-01
2020-09-22 19:07:35,EMPTY,stop,4,London,100010002,apple,page_002,230,24.2342,9311744c-3746-3502-84c9-d06e8b5ea2d6,2020-09-22 19:07:00,2020-09-22 19:00:00,2020-09-01
2020-09-22 20:07:35,EMPTY,play,3,Shanghai,100010001,vivo,page_001,231,33.2222,3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097,2020-09-22 20:07:00,2020-09-22 20:00:00,2020-09-01
2020-09-22 20:07:35,EMPTY,stop,4,London,100010002,apple,page_002,232,24.2342,9311744c-3746-3502-84c9-d06e8b5ea2d6,2020-09-22 20:07:00,2020-09-22 20:00:00,2020-09-01
2020-09-22 21:07:35,EMPTY,play,3,Shanghai,100010001,vivo,page_001,233,33.2222,3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097,2020-09-22 21:07:00,2020-09-22 21:00:00,2020-09-01
2020-09-22 22:07:35,EMPTY,stop,4,London,100010002,apple,page_002,234,24.2342,9311744c-3746-3502-84c9-d06e8b5ea2d6,2020-09-22 22:07:00,2020-09-22 22:00:00,2020-09-01
2020-09-22 23:07:35,EMPTY,play,3,Shanghai,100010001,huawei,page_001,235,33.2222,3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097,2020-09-22 23:07:00,2020-09-22 23:00:00,2020-09-01
2020-09-23 00:07:35,EMPTY,pause,5,Tianjin,100010003,huawei,page_003,235,33.2222,e80160bc-f25a-3566-bf9b-a16e91ef6ee4,2020-09-23 00:07:00,2020-09-23 00:00:00,2020-09-01
2020-09-23 02:07:35,EMPTY,pause,5,Tokyo,100010004,huawei,page_004,235,33.2222,3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097,2020-09-23 02:07:00,2020-09-23 02:00:00,2020-09-01
2020-09-23 05:07:35,EMPTY,pause,5,Beijing,100010005,huawei,page_005,235,33.2222,e80160bc-f25a-3566-bf9b-a16e91ef6ee4,2020-09-23 05:07:00,2020-09-23 05:00:00,2020-09-01
2020-09-23 07:07:35,EMPTY,pause,5,Wuhan,100010006,huawei,page_006,235,33.2222,3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097,2020-09-23 07:07:00,2020-09-23 07:00:00,2020-09-01
2020-09-23 10:07:35,EMPTY,pause,5,Tianjin,100010007,huawei,page_007,235,33.2222,e80160bc-f25a-3566-bf9b-a16e91ef6ee4,2020-09-23 10:07:00,2020-09-23 10:00:00,2020-09-01
2020-09-23 12:07:35,EMPTY,pause,5,Tokyo,100010008,huawei,page_008,235,33.2222,3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097,2020-09-23 12:07:00,2020-09-23 12:00:00,2020-09-01
2020-09-23 15:07:35,EMPTY,pause,5,Beijing,100010009,huawei,page_009,235,33.2222,e80160bc-f25a-3566-bf9b-a16e91ef6ee4,2020-09-23 15:07:00,2020-09-23 15:00:00,2020-09-01
hadoop fs -put partition.csv hdfs:///LACUS/lambda_data/lambda_flat_table/day_start=2020-09-23
use lacus_0923;
ALTER TABLE lambda_flat_table ADD PARTITION(DAY_START='2020-09-23') location 'hdfs:///LACUS/lambda_data/lambda_flat_table/day_start=2020-09-23';
select * from lambda_flat_table limit 5;
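After the Hive partition is in place, refresh the corresponding segment from the Kylin web UI, or via Kylin's cube REST API. The following is a sketch only; it assumes the generic cube rebuild endpoint applies to your deployment, and the host, credentials, cube name, and epoch-millisecond range are all placeholders:

```shell
curl -X PUT --user ADMIN:KYLIN \
  -H "Content-Type: application/json;charset=utf-8" \
  -d '{"startTime": 1600732800000, "endTime": 1600819200000, "buildType": "REFRESH"}' \
  http://localhost:7070/kylin/api/cubes/your_cube_name/rebuild
```

The refreshed segment will then be rebuilt from the lambda (Hive) table rather than from the original Kafka data.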