THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!

Apache Kylin : Analytical Data Warehouse for Big Data

Page tree

Welcome to Kylin Wiki.


What is lambda mode in Kylin?

Steps of Streaming Build Job

How to enable Lambda Mode in Kylin 3.1

Overall steps

  1. Create hive table(lambda table)
  2. Prepare sample event script
  3. Start kafka producer
  4. Deploy Kylin 
  5. Load kafka topic into Kylin
  6. Create model/cube into kylin
  7. Enable streaming cube
  8. Load data into lambda table
  9. Refresh streaming table

Create lambda table

We know that segments built from Kafka may be incorrect for some reasons, such as very late messages. Users may want a chance to update them.

Here we create a hive table to store "correct" data, so user will have a chance to clean and transform data from Kafka, and refresh segment from the hive table.

Please make sure that lambda_flat_table contains all the columns that you want to be included in your streaming cube.

LambdaTable DDL
-- Lambda table: an external Hive table holding the cleaned/corrected copy of
-- the streaming events. It must contain every column used by the streaming
-- cube so that segments can be refreshed from it.
CREATE EXTERNAL TABLE IF NOT EXISTS lambda_flat_table
(
-- event timestamp and debug purpose column
EVENT_TIME timestamp,
str_minute_second string COMMENT "For debug purpose, maybe check timezone etc",

-- dimension columns
act_type string COMMENT "What did user interact with our mobile app in this event",
-- NOTE(review): "devide" looks like a typo for "device", but renaming the
-- column would break any cube/model already referencing it, so it is kept.
user_devide_type string COMMENT "Which kind of device did user use in this event",
location_city string COMMENT "Which city did user locate in this event",
video_id bigint COMMENT "Which video did user watch in this event",
device_brand string,
page_id string,

-- measure columns
play_times bigint,
play_duration decimal(23, 10),
pageview_id string COMMENT "Identifier of a pageview",


-- for kylin used (dimension)
MINUTE_START timestamp,
HOUR_START timestamp,
MONTH_START date
)
COMMENT 'Fact table. Store raw user action log.'
PARTITIONED BY (DAY_START date)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 'hdfs:///LACUS/lambda_data/lambda_flat_table';

Prepare sample event script

Suppose we have a mobile application that reports user-action events. The following script mocks such events as JSON lines so they can be piped into a Kafka producer.

MockMessage
# -*- coding: utf-8 -*-
# Mock-message generator (Python 2): emits one JSON user-action event per
# line on stdout so the output can be piped into kafka-console-producer.
import sys
import json
import datetime
from datetime import timedelta
import random
import uuid
import time
import argparse

########################################################
### User Config

## Epoch origin (UTC) used when converting event times to milliseconds.
BEGIN_DT = datetime.datetime(1970, 1, 1)

## Seconds slept before emitting each event (throttles the batches).
SLEEP_PER_TEN = 0.03

## When True, drop the act_type dimension for ~80% of rows to simulate
## events with missing dimension values. (The variable keeps the original
## "DIMSENSION" spelling because it is referenced elsewhere in the script.)
LOST_SOME_DIMSENSION = False

## When True, scale generated measures by the hour-of-day weights below.
ENABLE_HOUR_POWER = True

## Upper bounds for generated user ids / video ids.
MAX_UID = 20000
MAX_VID = 11000
## Distinct pageview ids for the current hour / current minute, plus the
## per-minute history — used to verify COUNT DISTINCT results in the cube.
pv_id_set_hour = set()
pv_id_stat = dict()
pv_id_set = set()
########################################################
# For verify the correctness of COUNT DISTINCT
# For verify the correctness of COUNT DISTINCT
########################################################


########################################################
### Program
# Seed the RNG with the current second so each run differs slightly.
random.seed(datetime.datetime.now().second)

# [action name, weight] pairs; a heavier weight makes the action more likely.
act_type_power = [
    ['start', 32],
    ['exit', 43],
    ['play', 224],
    ['stop', 156],
    ['pause', 23],
    ['click', 367],
    ['comment', 14],
    ['share', 22],
    ['like', 55],
    ['dislike', 70]
]

# Convert the weights in-place into cumulative start offsets so that
# fake_act_type() can do a linear scan: after this loop, act_type_power[i][1]
# holds the sum of all weights before item i, and TOTAL_POWER is the total.
# NOTE(review): `tmp` remains in scope as a module-level global afterwards.
tmp = 0
for act_type_item in act_type_power:
    p = act_type_item[1]
    act_type_item[1] = tmp
    tmp += p
TOTAL_POWER = tmp

# Hour-of-day traffic weights: measures are multiplied by these so the fake
# stream follows a daily activity curve (quiet early morning, busier evening).
hour_power = {
    0: 1.75,
    1: 1.25,
    2: 0.71,
    3: 0.43,
    4: 0.31,
    5: 0.55,
    6: 0.61,
    7: 0.72,
    8: 0.96,
    9: 1.01,
    10: 1.12,
    11: 1.11,
    12: 1.31,
    13: 1.21,
    14: 1.51,
    15: 1.23,
    16: 1.02,
    17: 1.31,
    18: 1.41,
    19: 1.22,
    20: 1.42,
    21: 1.55,
    22: 1.34,
    23: 1.67
}

# Eight hours in seconds (GMT+8 offset); not referenced in the visible code.
gmt_8 = 8 * 3600


def fake_curr_date_long(now):
    """Return *now* as whole milliseconds elapsed since BEGIN_DT (Unix epoch)."""
    elapsed_seconds = int((now - BEGIN_DT).total_seconds())
    return 1000 * elapsed_seconds


def fake_curr_date_str(now):
    """Format *now* as 'YYYY-mm-dd HH:MM:SS.000+08:00' (fixed GMT+8 suffix)."""
    date_part = now.strftime('%Y-%m-%d %H:%M:%S')
    return date_part + '.000+08:00'


def fake_act_type():
    """Pick a random action name, weighted via the cumulative act_type_power table.

    act_type_power[i][1] holds the cumulative weight offset at which item i
    starts, so the first entry whose offset exceeds the draw identifies the
    previous entry as the winner.
    """
    draw = random.randint(0, TOTAL_POWER * 3) % TOTAL_POWER
    winner = act_type_power[0][0]
    for name, cumulative_start in act_type_power:
        if draw < cumulative_start:
            return winner
        winner = name
    return winner


def fake_active_minutes(row_no, now):
    """Fake minutes of activity: even rows skew small (0-3.99), odd rows large (4-50.99).

    When ENABLE_HOUR_POWER is set, the value is scaled by the hour-of-day
    weight in hour_power so traffic follows a daily curve.
    """
    if row_no % 2 == 0:
        base = random.randint(0, 3) + random.randint(0, 999) / 100.0
    else:
        base = random.randint(4, 50) + random.randint(0, 999) / 100.0
    scale = hour_power[now.hour] if ENABLE_HOUR_POWER else 1
    return round(base * scale, 4)


def fake_uid(row_no, now):
    """Fake a user id: every third row is a small "hot" uid (1-10), the rest spread wide."""
    if row_no % 3:
        return random.randint(10, MAX_UID) + now.hour * 100
    return random.randint(1, 10)


def fake_video_id(now):
    """Fake a video id.

    On odd minutes return a small "hot" id (< 100 + hour) so a few videos
    dominate; on even minutes spread ids across [0, MAX_VID] shifted by hour.
    """
    tmp1 = random.randint(0, MAX_VID)
    if now.minute % 2 == 1:
        # Bug fix: the original read the stale module-level `tmp` (the
        # leftover accumulator from building act_type_power), which made this
        # branch a constant per hour. The freshly drawn `tmp1` was clearly
        # intended.
        return tmp1 % 100 + now.hour
    else:
        return tmp1 + now.hour * 100


def fake_play_times(row_no, now):
    """Fake a play-times counter: even rows small (0-10), odd rows larger (10-55).

    Scaled (and truncated to int) by the hour_power weight when
    ENABLE_HOUR_POWER is set.
    """
    if row_no % 2 == 0:
        base = random.randint(0, 10)
    else:
        base = random.randint(10, 55)

    if not ENABLE_HOUR_POWER:
        return base
    return int(base * hour_power[now.hour])


def fake_pageview_id(str2, now):
    """Build a pageview id by suffixing the current minute onto *str2*."""
    return '%s%d' % (str2, now.minute)


# Candidate dimension values sampled by fake_string_in_list / fake_video_type.
city_list = ["shanghai", "beijing", "hangzhou", "shenzhen",
             "taibei", "hongkong", "guangzhou",
             "nanjing", "chongqin", "berlin", "tokyo"]
# Video categories; the u"..." entries are deliberately non-ASCII (Chinese)
# so UTF-8 handling is exercised end to end.
video_type = ["Sports", "Computer", "Science", "Game",
              "News", "Taste", "Tour", "Music",
              "Finance", "Arts", u"军事", u"中文", u"音乐", u"古装剧"]

# Device brands sampled for the device_brand dimension.
brand_list = ["huawei", "iPhone", "xiaomi", "vivo", "360", "meizu"]

def fake_video_type(str1, now):
    """Fake a video type.

    Depending on now.second % 3: a "hot" type from the first four entries,
    any entry from video_type, or a synthetic value built from *str1* plus
    the current hour.
    """
    selector = now.second % 3
    if selector == 2:
        return str1 + str(now.hour)
    if selector == 0:
        return video_type[random.randint(0, 3)]
    return video_type[random.randint(0, len(video_type) - 1)]


def fake_string_in_list(row_no, str_list, top_n=3):
    """Deterministically pick an element of *str_list*.

    Even row numbers cycle through only the first *top_n* entries (creating
    "hot" values); odd row numbers cycle through the whole list.
    """
    modulus = top_n if row_no % 2 == 0 else len(str_list)
    return str_list[row_no % modulus]


def fake_event(row_no, long_str, short_str, now):
    """
    Fake a single user-action event and return it as a JSON string.

    row_no    -- sequence number within the current batch; drives the
                 deterministic skew branches in the fake_* helpers.
    long_str  -- unique string used as the pageview-id prefix.
    short_str -- unique string whose first 4 chars seed the video type.
    now       -- event timestamp.
    """
    row = dict()

    ########################################################
    # Dimension columns
    row['event_date'] = fake_curr_date_str(now)
    row['event_date_2'] = fake_curr_date_str(now)
    row['event_ts'] = fake_curr_date_long(now)
    row['event_ts_2'] = fake_curr_date_long(now)
    # When LOST_SOME_DIMSENSION is on, act_type is kept only for rows with
    # row_no % 10 in {8, 9} — i.e. dropped for ~80% of rows.
    if not LOST_SOME_DIMSENSION or row_no % 10 >= 8:
        row['act_type'] = fake_act_type()
    row['video_type'] = fake_video_type(short_str[0:4], now)

    # Dimension or Measure (UserView)
    row['uid'] = fake_uid(row_no, now)
    row['page_id'] = "page_" + str(fake_video_id(now) % 50)
    row['video_id'] = fake_video_id(now)
    row['device_brand'] = fake_string_in_list(row_no, brand_list)

    # Multi-level key-value structure (exercises nested-JSON parsing).
    # NOTE(review): "devide_type" keeps the original spelling to match the
    # downstream table column.
    row['user_detail'] = {"location": {"city": fake_string_in_list(row_no, city_list)},
                          "devide_type": now.minute,
                          "network_type": now.microsecond % 4 + 2}
    row['content_list'] = [str(fake_video_id(now)), str(fake_video_id(now)), str(fake_video_id(now))]

    ########################################################
    # Measure columns
    row['play_duration'] = fake_active_minutes(row_no, now)
    row['active_minutes'] = fake_active_minutes(row_no, now)
    row['play_times'] = fake_play_times(row_no, now)
    pvid = fake_pageview_id(long_str, now)
    pv_id_set.add(pvid)  # per-minute distinct pv_id tracking
    pv_id_set_hour.add(pvid)  # per-hour distinct pv_id tracking
    row['pageview_id'] = pvid

    # Fixed strings for verifying count distinct (bitmap) results
    row['str_minute'] = '%s' % now.minute
    row['str_second'] = '%s' % now.second
    row['str_minute_second'] = '%s_%s_%s' % (now.hour, now.minute, now.second)
    return json.dumps(row)


def fake_all_rows():
    """
    Endlessly generate fake events (Python 2 print syntax).

    Events are printed to stdout (to be piped into a Kafka producer) and
    mirrored to ./out.data. Per-minute and per-hour distinct counts of
    pageview_id are printed to stderr as METRICS / HOUR_METRICS lines so
    COUNT DISTINCT results in the cube can be cross-checked.

    NOTE(review): this loop never terminates, so data_file is never closed.
    """
    total_row = 0
    init = False
    start = datetime.datetime.now()  # NOTE(review): assigned but never read
    data_file = open('out.data', 'w+')
    while True:
        row_no = 0
        # Emit a batch of eleven events (row_no 0..10), sleeping briefly
        # before each one.
        while row_no <= 10:
            time.sleep(SLEEP_PER_TEN)
            now = datetime.datetime.now()

            # Pause production for 4 hours starting at 01:xx and at 07:xx —
            # presumably to create gaps in the stream for testing late-data
            # scenarios; confirm before relying on this schedule.
            if now.hour == 1:
                print >> sys.stderr, "Sleep at " + now.strftime('%Y-%m-%d %H:%M:%S.000+08:00')
                time.sleep(4 * 3600)
                print >> sys.stderr, "End sleep at " + now.strftime('%Y-%m-%d %H:%M:%S.000+08:00')
            elif now.hour == 7:
                print >> sys.stderr, "Sleep at " + now.strftime('%Y-%m-%d %H:%M:%S.000+08:00')
                time.sleep(4 * 3600)
                print >> sys.stderr, "End sleep at " + now.strftime('%Y-%m-%d %H:%M:%S.000+08:00')

            # Per-event unique string; seeds the pageview id and video type.
            unique_str = str(uuid.uuid3(uuid.NAMESPACE_URL, str(row_no + now.microsecond)))

            ####################################################
            # For verify the correctness of COUNT DISTINCT
            if not init:
                current_minute = -1
                current_hour = -1
                init = True
            # Minute rollover: record and print the distinct pv_id count for
            # the minute just finished, then reset the per-minute set.
            if current_minute != now.minute:
                pv_id_stat['%s_%s' % (now.hour, current_minute)] = len(pv_id_set)
                print >> sys.stderr, "\nMETRICS,%s,%s,%s,%s" % (
                    current_hour + 8, current_minute, len(pv_id_set), str(total_row))
                current_minute = now.minute
                pv_id_set.clear()
            # Hour rollover: same bookkeeping; the +8 shifts the reported
            # hour to GMT+8.
            if current_hour != now.hour:
                print >> sys.stderr, "\nHOUR_METRICS,%s,%s" % (current_hour + 8, len(pv_id_set_hour))
                current_hour = now.hour
                pv_id_set_hour.clear()
            # For verify the correctness of COUNT DISTINCT
            ####################################################

            single_row = fake_event(row_no + 0, unique_str, unique_str + str(row_no), now)
            # Send to stdout (the Kafka pipe) and mirror to the local file.
            print single_row
            print >> data_file, single_row

            row_no += 1
            total_row += 1


def _parse_bool(text):
    """argparse type converter for boolean flags given as strings."""
    value = text.strip().lower()
    if value in ('true', '1', 'yes', 'y', 't'):
        return True
    if value in ('false', '0', 'no', 'n', 'f'):
        return False
    raise argparse.ArgumentTypeError('expected a boolean value, got %r' % text)


def init_argument():
    """Parse command-line arguments: --max-uid, --max-vid, --enable-hour-power.

    Returns the argparse Namespace.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--max-uid', required=True, type=int)
    parser.add_argument('--max-vid', required=True, type=int)
    # Bug fix: the original used type=bool, which treats ANY non-empty string
    # (including "False") as True, since bool('False') is True in Python.
    parser.add_argument('--enable-hour-power', required=True, type=_parse_bool)
    args = parser.parse_args()
    return args


if __name__ == "__main__":
    # NOTE(review): this sets a module-level current_minute, but
    # fake_all_rows() assigns (and therefore localises) its own
    # current_minute via its `init` flag, so this global is never read.
    current_minute = -1
    fake_all_rows()

Start kafka producer

SendMsg
# Create the topic first (run once):
# bin/kafka-topics.sh --create --topic useraction --zookeeper cdh-master --partitions 10 --replication-factor 1
rm -rf out.data
# Pipe the mocked events into the Kafka topic. (Fix: the original wiki text
# wrapped this command mid-word, splitting "--topic" across two lines, so it
# could not run as written.)
python fake.py | kafka-console-producer --topic useraction_xxyu --broker-list cdh-master:9092,cdh-worker-1:9092,cdh-worker-2:9092


script
nohup sh SendMsg.sh > start.log &

Deploy Kylin 

# Streaming segment window in seconds (3600 = one segment per hour).
kylin.stream.cube.duration=3600
# Build the additional cuboids for streaming cubes.
kylin.stream.build.additional.cuboids=true
# Where to emit streaming metrics (console output here).
kylin.stream.metrics.option=console
# Hive database holding the lambda tables used to refresh streaming segments
# — must match the database of lambda_flat_table created above.
kylin.stream.hive.database-for-lambda-cube=lambda_311
# Timezone used to interpret event timestamps (events above use +08:00).
kylin.stream.event.timezone=GMT+8
# Print the real-time dictionary, for debugging.
kylin.stream.print-realtime-dict-enabled=true

Load kafka topic into Kylin


Create model/cube into kylin


Enable streaming cube


Load data into lambda table

data
2020-09-22 16:07:35,EMPTY,play,3,Shanghai,100010001,vivo,page_001,223,33.2222,3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097,2020-09-22 16:07:00,2020-09-22 16:00:00,2020-09-01
2020-09-22 16:07:35,EMPTY,stop,4,London,100010002,apple,page_002,224,24.2342,9311744c-3746-3502-84c9-d06e8b5ea2d6,2020-09-22 16:07:00,2020-09-22 16:00:00,2020-09-01
2020-09-22 17:07:35,EMPTY,play,3,Shanghai,100010001,vivo,page_001,225,33.2222,3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097,2020-09-22 17:07:00,2020-09-22 17:00:00,2020-09-01
2020-09-22 17:07:35,EMPTY,stop,4,London,100010002,apple,page_002,226,24.2342,9311744c-3746-3502-84c9-d06e8b5ea2d6,2020-09-22 17:07:00,2020-09-22 17:00:00,2020-09-01
2020-09-22 18:07:35,EMPTY,play,3,Shanghai,100010001,vivo,page_001,227,33.2222,3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097,2020-09-22 18:07:00,2020-09-22 18:00:00,2020-09-01
2020-09-22 18:07:35,EMPTY,stop,4,London,100010002,apple,page_002,228,24.2342,9311744c-3746-3502-84c9-d06e8b5ea2d6,2020-09-22 18:07:00,2020-09-22 18:00:00,2020-09-01
2020-09-22 19:07:35,EMPTY,play,3,Shanghai,100010001,vivo,page_001,229,33.2222,3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097,2020-09-22 19:07:00,2020-09-22 19:00:00,2020-09-01
2020-09-22 19:07:35,EMPTY,stop,4,London,100010002,apple,page_002,230,24.2342,9311744c-3746-3502-84c9-d06e8b5ea2d6,2020-09-22 19:07:00,2020-09-22 19:00:00,2020-09-01
2020-09-22 20:07:35,EMPTY,play,3,Shanghai,100010001,vivo,page_001,231,33.2222,3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097,2020-09-22 20:07:00,2020-09-22 20:00:00,2020-09-01
2020-09-22 20:07:35,EMPTY,stop,4,London,100010002,apple,page_002,232,24.2342,9311744c-3746-3502-84c9-d06e8b5ea2d6,2020-09-22 20:07:00,2020-09-22 20:00:00,2020-09-01
2020-09-22 21:07:35,EMPTY,play,3,Shanghai,100010001,vivo,page_001,233,33.2222,3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097,2020-09-22 21:07:00,2020-09-22 21:00:00,2020-09-01
2020-09-22 22:07:35,EMPTY,stop,4,London,100010002,apple,page_002,234,24.2342,9311744c-3746-3502-84c9-d06e8b5ea2d6,2020-09-22 22:07:00,2020-09-22 22:00:00,2020-09-01
2020-09-22 23:07:35,EMPTY,play,3,Shanghai,100010001,huawei,page_001,235,33.2222,3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097,2020-09-22 23:07:00,2020-09-22 23:00:00,2020-09-01
2020-09-23 00:07:35,EMPTY,pause,5,Tianjin,100010003,huawei,page_003,235,33.2222,e80160bc-f25a-3566-bf9b-a16e91ef6ee4,2020-09-23 00:07:00,2020-09-23 00:00:00,2020-09-01
2020-09-23 02:07:35,EMPTY,pause,5,Tokyo,100010004,huawei,page_004,235,33.2222,3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097,2020-09-23 02:07:00,2020-09-23 02:00:00,2020-09-01
2020-09-23 05:07:35,EMPTY,pause,5,Beijing,100010005,huawei,page_005,235,33.2222,e80160bc-f25a-3566-bf9b-a16e91ef6ee4,2020-09-23 05:07:00,2020-09-23 05:00:00,2020-09-01
2020-09-23 07:07:35,EMPTY,pause,5,Wuhan,100010006,huawei,page_006,235,33.2222,3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097,2020-09-23 07:07:00,2020-09-23 07:00:00,2020-09-01
2020-09-23 10:07:35,EMPTY,pause,5,Tianjin,100010007,huawei,page_007,235,33.2222,e80160bc-f25a-3566-bf9b-a16e91ef6ee4,2020-09-23 10:07:00,2020-09-23 10:00:00,2020-09-01
2020-09-23 12:07:35,EMPTY,pause,5,Tokyo,100010008,huawei,page_008,235,33.2222,3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097,2020-09-23 12:07:00,2020-09-23 12:00:00,2020-09-01
2020-09-23 15:07:35,EMPTY,pause,5,Beijing,100010009,huawei,page_009,235,33.2222,e80160bc-f25a-3566-bf9b-a16e91ef6ee4,2020-09-23 15:07:00,2020-09-23 15:00:00,2020-09-01



hadoop fs -put partition.csv hdfs:///LACUS/lambda_data/lambda_flat_table/day_start=2020-09-23


Insert Partition
use lacus_0923;
-- Register the uploaded CSV directory as the 2020-09-23 partition of the
-- lambda table (the files were put there with `hadoop fs -put` above).
ALTER TABLE lambda_flat_table ADD PARTITION(DAY_START='2020-09-23') location 'hdfs:///LACUS/lambda_data/lambda_flat_table/day_start=2020-09-23';
-- Sanity check: rows from the new partition should now be visible.
select * from lambda_flat_table limit 5;

Refresh streaming table


  • No labels