THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!

Apache Kylin : Analytical Data Warehouse for Big Data

Page tree

Welcome to Kylin Wiki.



Prepare data and schema

LambdaTable
-- Hive flat table for the lambda cube. Data files are plain text, one event
-- per line, fields comma-separated in the column order declared here;
-- DAY_START is not in the files, it comes from the partition directory.
CREATE EXTERNAL TABLE IF NOT EXISTS lambda_flat_table
(
    -- event timestamp and debug purpose column
    EVENT_TIME timestamp,
    str_minute_second string COMMENT "For debug purpose, maybe check timezone etc",

    -- dimension columns
    act_type string COMMENT "What did user interact with our mobile app in this event",
    user_devide_type string COMMENT "Which kind of device did user use in this event",
    location_city string COMMENT "Which city did user locate in this event",
    video_id bigint COMMENT "Which video did user watch in this event",
    device_brand string,
    page_id string,

    -- measure columns
    play_times bigint,
    play_duration decimal(23, 10),
    pageview_id string COMMENT "Identifier",

    -- time-bucket columns used by Kylin (as dimensions)
    MINUTE_START timestamp,
    HOUR_START timestamp,
    MONTH_START date
)
PARTITIONED BY (DAY_START date)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 'hdfs:///LACUS/lambda_data/lambda_flat_table';
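Sample data (comma-separated, one event per line, fields in lambda_flat_table column order; DAY_START comes from the partition)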
2020-09-23 16:07:35,EMPTY,play,3,Shanghai,100010001,vivo,page_001,223,33.2222,3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097,2020-09-23 16:07:00,2020-09-23 16:00:00,2020-09-01
2020-09-23 16:07:35,EMPTY,play,4,Shanghai,100010002,vivo,page_002,224,34.2222,9311744c-3746-3502-84c9-d06e8b5ea2d6,2020-09-23 16:07:00,2020-09-23 16:00:00,2020-09-01
2020-09-23 16:07:35,EMPTY,play,5,Shanghai,100010003,vivo,page_003,225,35.2222,4bb9c1a0-ded7-3f16-93fb-bd2cbac9a815,2020-09-23 16:07:00,2020-09-23 16:00:00,2020-09-01
2020-09-23 16:07:35,EMPTY,play,6,Shanghai,100010004,vivo,page_004,226,36.2222,78f8cf0b-0c6a-3fb8-9d24-1e4a8e882eaa,2020-09-23 16:07:00,2020-09-23 16:00:00,2020-09-01
2020-09-23 16:07:35,EMPTY,play,7,Shanghai,100010005,vivo,page_005,227,37.2222,c8f0193b-cdc1-3f89-93db-2fc9698580bd,2020-09-23 16:07:00,2020-09-23 16:00:00,2020-09-01
2020-09-23 16:07:35,EMPTY,play,8,Shanghai,100010006,vivo,page_006,228,38.2222,80c57eb0-2acb-36ac-89f9-95f0481c77a6,2020-09-23 16:07:00,2020-09-23 16:00:00,2020-09-01
2020-09-23 16:07:35,EMPTY,play,9,Beijing,100010007,vivo,page_007,229,39.2222,a04c4b4b-7d76-3479-84b0-646b305d32b4,2020-09-23 16:07:00,2020-09-23 16:00:00,2020-09-01
2020-09-23 16:07:35,EMPTY,play,10,Beijing,100010008,vivo,page_008,230,40.2222,f98be397-abfb-3f91-8448-29738a166724,2020-09-23 16:07:00,2020-09-23 16:00:00,2020-09-01
2020-09-23 16:07:35,EMPTY,play,11,Beijing,100010009,vivo,page_009,231,41.2222,ccfcea68-5ad0-3f02-9565-f4a496b398e6,2020-09-23 16:07:00,2020-09-23 16:00:00,2020-09-01
2020-09-23 16:07:35,EMPTY,start,12,Beijing,100010010,vivo,page_010,232,42.2222,e80160bc-f25a-3566-bf9b-a16e91ef6ee4,2020-09-23 16:07:00,2020-09-23 16:00:00,2020-09-01
2020-09-23 16:07:35,EMPTY,start,13,Beijing,100010011,apple,page_011,233,43.2222,3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097,2020-09-23 16:07:00,2020-09-23 16:00:00,2020-09-01
2020-09-23 16:07:35,EMPTY,start,3,Beijing,100010012,apple,page_012,234,44.2222,9311744c-3746-3502-84c9-d06e8b5ea2d6,2020-09-23 16:07:00,2020-09-23 16:00:00,2020-09-01
2020-09-23 16:07:35,EMPTY,start,4,Beijing,100010013,apple,page_013,235,45.2222,4bb9c1a0-ded7-3f16-93fb-bd2cbac9a815,2020-09-23 16:07:00,2020-09-23 16:00:00,2020-09-01
2020-09-23 16:07:35,EMPTY,start,5,Beijing,100010014,apple,page_014,236,46.2222,78f8cf0b-0c6a-3fb8-9d24-1e4a8e882eaa,2020-09-23 16:07:00,2020-09-23 16:00:00,2020-09-01
2020-09-23 16:07:35,EMPTY,start,6,Hangzhou,100010015,apple,page_015,237,47.2222,c8f0193b-cdc1-3f89-93db-2fc9698580bd,2020-09-23 16:07:00,2020-09-23 16:00:00,2020-09-01
2020-09-23 16:07:35,EMPTY,start,7,Hangzhou,100010016,apple,page_016,238,48.2222,80c57eb0-2acb-36ac-89f9-95f0481c77a6,2020-09-23 16:07:00,2020-09-23 16:00:00,2020-09-01
2020-09-23 16:07:35,EMPTY,start,8,Hangzhou,100010017,apple,page_017,239,49.2222,a04c4b4b-7d76-3479-84b0-646b305d32b4,2020-09-23 16:07:00,2020-09-23 16:00:00,2020-09-01
2020-09-23 16:07:35,EMPTY,stop,9,Hangzhou,100010018,apple,page_018,240,50.2222,f98be397-abfb-3f91-8448-29738a166724,2020-09-23 16:07:00,2020-09-23 16:00:00,2020-09-01
2020-09-23 16:07:35,EMPTY,stop,10,Hangzhou,100010019,apple,page_019,241,51.2222,ccfcea68-5ad0-3f02-9565-f4a496b398e6,2020-09-23 16:07:00,2020-09-23 16:00:00,2020-09-01
2020-09-23 16:07:35,EMPTY,stop,11,Hangzhou,100010020,apple,page_020,242,52.2222,e80160bc-f25a-3566-bf9b-a16e91ef6ee4,2020-09-23 16:07:00,2020-09-23 16:00:00,2020-09-01
2020-09-23 17:07:35,EMPTY,play,3,Shanghai,100010001,vivo,page_001,223,33.2222,3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097,2020-09-23 17:07:00,2020-09-23 17:00:00,2020-09-01
2020-09-23 17:07:35,EMPTY,play,4,Shanghai,100010002,vivo,page_002,224,34.2222,9311744c-3746-3502-84c9-d06e8b5ea2d6,2020-09-23 17:07:00,2020-09-23 17:00:00,2020-09-01
2020-09-23 17:07:35,EMPTY,play,5,Shanghai,100010003,vivo,page_003,225,35.2222,4bb9c1a0-ded7-3f16-93fb-bd2cbac9a815,2020-09-23 17:07:00,2020-09-23 17:00:00,2020-09-01
2020-09-23 17:07:35,EMPTY,play,6,Shanghai,100010004,vivo,page_004,226,36.2222,78f8cf0b-0c6a-3fb8-9d24-1e4a8e882eaa,2020-09-23 17:07:00,2020-09-23 17:00:00,2020-09-01
2020-09-23 17:07:35,EMPTY,play,7,Shanghai,100010005,vivo,page_005,227,37.2222,c8f0193b-cdc1-3f89-93db-2fc9698580bd,2020-09-23 17:07:00,2020-09-23 17:00:00,2020-09-01
2020-09-23 17:07:35,EMPTY,play,8,Shanghai,100010006,vivo,page_006,228,38.2222,80c57eb0-2acb-36ac-89f9-95f0481c77a6,2020-09-23 17:07:00,2020-09-23 17:00:00,2020-09-01
2020-09-23 17:07:35,EMPTY,play,9,Beijing,100010007,vivo,page_007,229,39.2222,a04c4b4b-7d76-3479-84b0-646b305d32b4,2020-09-23 17:07:00,2020-09-23 17:00:00,2020-09-01
2020-09-23 17:07:35,EMPTY,play,10,Beijing,100010008,vivo,page_008,230,40.2222,f98be397-abfb-3f91-8448-29738a166724,2020-09-23 17:07:00,2020-09-23 17:00:00,2020-09-01
2020-09-23 17:07:35,EMPTY,play,11,Beijing,100010009,vivo,page_009,231,41.2222,ccfcea68-5ad0-3f02-9565-f4a496b398e6,2020-09-23 17:07:00,2020-09-23 17:00:00,2020-09-01
2020-09-23 17:07:35,EMPTY,start,12,Beijing,100010010,vivo,page_010,232,42.2222,e80160bc-f25a-3566-bf9b-a16e91ef6ee4,2020-09-23 17:07:00,2020-09-23 17:00:00,2020-09-01
2020-09-23 17:07:35,EMPTY,start,13,Beijing,100010011,apple,page_011,233,43.2222,3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097,2020-09-23 17:07:00,2020-09-23 17:00:00,2020-09-01
2020-09-23 17:07:35,EMPTY,start,3,Beijing,100010012,apple,page_012,234,44.2222,9311744c-3746-3502-84c9-d06e8b5ea2d6,2020-09-23 17:07:00,2020-09-23 17:00:00,2020-09-01
2020-09-23 17:07:35,EMPTY,start,4,Beijing,100010013,apple,page_013,235,45.2222,4bb9c1a0-ded7-3f16-93fb-bd2cbac9a815,2020-09-23 17:07:00,2020-09-23 17:00:00,2020-09-01
2020-09-23 17:07:35,EMPTY,start,5,Beijing,100010014,apple,page_014,236,46.2222,78f8cf0b-0c6a-3fb8-9d24-1e4a8e882eaa,2020-09-23 17:07:00,2020-09-23 17:00:00,2020-09-01
2020-09-23 17:07:35,EMPTY,start,6,Hangzhou,100010015,apple,page_015,237,47.2222,c8f0193b-cdc1-3f89-93db-2fc9698580bd,2020-09-23 17:07:00,2020-09-23 17:00:00,2020-09-01
2020-09-23 17:07:35,EMPTY,start,7,Hangzhou,100010016,apple,page_016,238,48.2222,80c57eb0-2acb-36ac-89f9-95f0481c77a6,2020-09-23 17:07:00,2020-09-23 17:00:00,2020-09-01
2020-09-23 17:07:35,EMPTY,start,8,Hangzhou,100010017,apple,page_017,239,49.2222,a04c4b4b-7d76-3479-84b0-646b305d32b4,2020-09-23 17:07:00,2020-09-23 17:00:00,2020-09-01
2020-09-23 17:07:35,EMPTY,stop,9,Hangzhou,100010018,apple,page_018,240,50.2222,f98be397-abfb-3f91-8448-29738a166724,2020-09-23 17:07:00,2020-09-23 17:00:00,2020-09-01
2020-09-23 17:07:35,EMPTY,stop,10,Hangzhou,100010019,apple,page_019,241,51.2222,ccfcea68-5ad0-3f02-9565-f4a496b398e6,2020-09-23 17:07:00,2020-09-23 17:00:00,2020-09-01
2020-09-23 17:07:35,EMPTY,stop,11,Hangzhou,100010020,apple,page_020,242,52.2222,e80160bc-f25a-3566-bf9b-a16e91ef6ee4,2020-09-23 17:07:00,2020-09-23 17:00:00,2020-09-01


Sample Kafka message (one JSON event as produced by fake.py)
{
    "content_list":[
        "22",
        "22",
        "22"
    ],
    "act_type":"click",
    "event_ts_2":1600877255000,
    "event_ts":1600877255000,
    "user_detail":{
        "devide_type":7,
        "location":{
            "city":"shenzhen"
        },
        "network_type":3
    },
    "video_id":22,
    "event_date_2":"2020-09-23 16:07:35.000+08:00",
    "str_minute":"7",

    "video_type":"3c8416",
    "play_times":22,
    "pageview_id":"3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097",
    "active_minutes":50.0208,
    "device_brand":"vivo",
    "str_minute_second":"16_7_35",
    "play_duration":37.6584,
    "event_date":"2020-09-23 16:07:35.000+08:00",
    "page_id":"page_22",
    "str_second":"35",
    "uid":2
}


MockMessage (fake.py — Python 2 mock event generator)
# -*- coding: utf-8 -*-
import sys
import json
import datetime
from datetime import timedelta
import random
import uuid
import time
import argparse

########################################################
### User Config

## Begin of UTC: the Unix epoch as a naive datetime (no tzinfo).
BEGIN_DT = datetime.datetime(1970, 1, 1)

## Seconds to sleep before emitting each event.
## (Despite the name, fake_all_rows sleeps this long once per event.)
SLEEP_PER_TEN = 0.03

## Lost some dimension value? When True, fake_event omits act_type from
## most events to simulate missing dimension values.
LOST_SOME_DIMSENSION = False

## Scale active_minutes / play_times by the hour-of-day factor in hour_power.
ENABLE_HOUR_POWER = True

## Upper bounds of the uid / video_id random draws (before the hour offset).
MAX_UID = 20000
MAX_VID = 11000

########################################################
# For verify the correctness of COUNT DISTINCT:
# distinct pageview_ids seen in the current hour / current minute, and
# per-minute distinct counts keyed by "<hour>_<minute>".
pv_id_set_hour = set()
pv_id_stat = dict()
pv_id_set = set()
########################################################


########################################################
### Program
# Seed from OS entropy (or the full current time as a fallback). Seeding with
# datetime.now().second allowed only 60 distinct seeds, so two runs started at
# the same second-of-minute produced identical random streams.
random.seed()

# Relative weight of each act_type value. The loop below rewrites every
# weight, in place, into the running total of the weights listed before it,
# so each entry ends up holding the lower bound of its bucket inside
# [0, TOTAL_POWER). fake_act_type() depends on that layout.
act_type_power = [
    ['start', 32],
    ['exit', 43],
    ['play', 224],
    ['stop', 156],
    ['pause', 23],
    ['click', 367],
    ['comment', 14],
    ['share', 22],
    ['like', 55],
    ['dislike', 70]
]

tmp = 0  # running total of the weights seen so far
for weighted_act in act_type_power:
    # Store the previous running total, then advance it by this entry's weight.
    weighted_act[1], tmp = tmp, tmp + weighted_act[1]
TOTAL_POWER = tmp  # sum of all original weights

# Multiplier applied to generated measures for each local hour of the day
# (keys 0-23), when ENABLE_HOUR_POWER is set.
hour_power = dict(enumerate([
    1.75, 1.25, 0.71, 0.43, 0.31, 0.55,  # 00-05
    0.61, 0.72, 0.96, 1.01, 1.12, 1.11,  # 06-11
    1.31, 1.21, 1.51, 1.23, 1.02, 1.31,  # 12-17
    1.41, 1.22, 1.42, 1.55, 1.34, 1.67,  # 18-23
]))

# Eight hours in seconds (GMT+8 offset); not referenced by the code shown here.
gmt_8 = 8 * 3600


def fake_curr_date_long(now):
    """
    Milliseconds from BEGIN_DT to ``now``, truncated to whole seconds.

    NOTE(review): ``now`` comes from datetime.now() (naive local time) and
    BEGIN_DT is a naive epoch, so this is local wall-clock time counted as if
    it were UTC. The sample message has event_ts 1600877255000
    (16:07:35 UTC) next to event_date "16:07:35.000+08:00". Confirm this
    matches the Kylin timezone setting.
    """
    elapsed = now - BEGIN_DT
    whole_seconds = int(elapsed.total_seconds())
    return whole_seconds * 1000


def fake_curr_date_str(now):
    """Render ``now`` as 'YYYY-MM-DD HH:MM:SS.000+08:00' (fixed +08:00 suffix, zero millis)."""
    pattern = '%Y-%m-%d %H:%M:%S.000+08:00'
    return now.strftime(pattern)


def fake_act_type():
    """
    Pick a random act_type with probability proportional to its weight.

    Expects act_type_power entries to hold the cumulative lower bound of each
    act's bucket (as produced by the loop that follows its definition), with
    TOTAL_POWER equal to the sum of the original weights.
    """
    # Uniform draw from [0, TOTAL_POWER). The former
    # randint(0, TOTAL_POWER * 3) % TOTAL_POWER is inclusive at both ends, so
    # it hit 0 four times out of 3 * TOTAL_POWER + 1 instead of three.
    ran = random.randrange(TOTAL_POWER)
    preact = act_type_power[0][0]
    for act, lower_bound in act_type_power:
        # `ran` is below this bucket's start, so it fell in the previous bucket.
        if ran < lower_bound:
            return preact
        preact = act
    return preact


def fake_active_minutes(row_no, now):
    """
    Random float measure, rounded to 4 decimals.

    Even rows draw an integer part of 0-3, odd rows 4-50. Both add
    randint(0, 999) / 100.0. When ENABLE_HOUR_POWER is set, the result is
    scaled by hour_power for the hour of ``now``.
    """
    if row_no % 2 == 0:
        whole = random.randint(0, 3)
    else:
        whole = random.randint(4, 50)
    value = whole + random.randint(0, 999) / 100.0
    if not ENABLE_HOUR_POWER:
        return round(value, 4)
    return round(value * hour_power[now.hour], 4)


def fake_uid(row_no, now):
    """
    Random user id.

    Every third row (row_no divisible by 3) draws from the small range 1-10.
    Other rows draw from 10..MAX_UID, shifted by 100 * hour of ``now``.
    """
    if row_no % 3 != 0:
        return random.randint(10, MAX_UID) + now.hour * 100
    return random.randint(1, 10)


def fake_video_id(now):
    """
    Random video id whose cardinality depends on the minute of ``now``.

    Odd minutes: a low-cardinality id in [hour, hour + 99].
    Even minutes: a high-cardinality id in [hour * 100, MAX_VID + hour * 100].
    """
    raw_id = random.randint(0, MAX_VID)
    if now.minute % 2 == 1:
        # Reduce this call's own draw. Reading the module-level `tmp` here
        # produced the same id for every event in an odd minute.
        return raw_id % 100 + now.hour
    return raw_id + now.hour * 100


def fake_play_times(row_no, now):
    """
    Random integer play count.

    Even rows draw 0-10, odd rows 10-55. When ENABLE_HOUR_POWER is set, the
    draw is scaled by hour_power for the hour of ``now`` and truncated to int.
    """
    low, high = (0, 10) if row_no % 2 == 0 else (10, 55)
    pt = random.randint(low, high)
    if not ENABLE_HOUR_POWER:
        return pt
    return int(pt * hour_power[now.hour])


def fake_pageview_id(str2, now):
    """Build a pageview id: ``str2`` followed by the minute of ``now`` (no padding)."""
    minute_suffix = str(now.minute)
    return str2 + minute_suffix


# Value pools sampled by the fake_* helpers.
city_list = [
    "shanghai",
    "beijing",
    "hangzhou",
    "shenzhen",
    "taibei",
    "hongkong",
    "guangzhou",
    "nanjing",
    "chongqin",
    "berlin",
    "tokyo",
]

video_type = [
    "Sports",
    "Computer",
    "Science",
    "Game",
    "News",
    "Taste",
    "Tour",
    "Music",
    "Finance",
    "Arts",
    u"军事",    # military
    u"中文",    # Chinese
    u"音乐",    # music
    u"古装剧",  # costume drama
]

brand_list = ["huawei", "iPhone", "xiaomi", "vivo", "360", "meizu"]


def fake_video_type(str1, now):
    """
    Choose a video_type, selected by ``now.second % 3``.

    - 0: one of the first four entries of video_type;
    - 1: any entry of video_type;
    - 2: ``str1`` followed by the hour of ``now`` (a high-cardinality value).
    """
    bucket = now.second % 3
    if bucket == 2:
        return str1 + str(now.hour)
    if bucket == 1:
        return video_type[random.randint(0, len(video_type) - 1)]
    return video_type[random.randint(0, 3)]


def fake_string_in_list(row_no, str_list, top_n=3):
    """
    Pick an element of ``str_list`` deterministically from ``row_no``.

    Even rows index with ``row_no % top_n``, which skews towards the first
    ``top_n`` entries. Odd rows cycle over the whole list with
    ``row_no % len(str_list)``.
    """
    modulus = top_n if row_no % 2 == 0 else len(str_list)
    return str_list[row_no % modulus]


def fake_event(row_no, long_str, short_str, now):
    """
    Fake a single user-action event and return it as a JSON string.

    :param row_no: position of the event in the current batch; several
        helpers use it (odd/even, modulo) to vary the generated values.
    :param long_str: prefix of pageview_id (the minute of ``now`` is appended).
    :param short_str: its first 4 characters may be used as a video_type prefix.
    :param now: datetime stamped on the event; also drives time-based fields.
    :return: ``json.dumps`` of the event dict.

    Side effect: the generated pageview_id is added to the module-level
    pv_id_set and pv_id_set_hour, which fake_all_rows uses to log the
    expected COUNT DISTINCT per minute / hour.
    """
    row = dict()

    ########################################################
    # Dimension
    row['event_date'] = fake_curr_date_str(now)
    row['event_date_2'] = fake_curr_date_str(now)
    row['event_ts'] = fake_curr_date_long(now)
    row['event_ts_2'] = fake_curr_date_long(now)
    # With LOST_SOME_DIMSENSION on, act_type is kept only when row_no % 10 is
    # 8 or 9, simulating events with a missing dimension value.
    if not LOST_SOME_DIMSENSION or row_no % 10 >= 8:
        row['act_type'] = fake_act_type()
    row['video_type'] = fake_video_type(short_str[0:4], now)

    # Dimension or Measure (UserView)
    row['uid'] = fake_uid(row_no, now)
    # page_id uses its own fake_video_id draw, independent of video_id below.
    row['page_id'] = "page_" + str(fake_video_id(now) % 50)
    row['video_id'] = fake_video_id(now)
    row['device_brand'] = fake_string_in_list(row_no, brand_list)

    # Nested key-value fields.
    # NOTE(review): the "devide_type" spelling matches the sample message;
    # presumably downstream parsing expects it — confirm before renaming.
    row['user_detail'] = {"location": {"city": fake_string_in_list(row_no, city_list)},
                          "devide_type": now.minute,
                          "network_type": now.microsecond % 4 + 2}
    row['content_list'] = [str(fake_video_id(now)), str(fake_video_id(now)), str(fake_video_id(now))]

    ########################################################
    # Measure
    # play_duration and active_minutes share a generator but are separate draws.
    row['play_duration'] = fake_active_minutes(row_no, now)
    row['active_minutes'] = fake_active_minutes(row_no, now)
    row['play_times'] = fake_play_times(row_no, now)
    pvid = fake_pageview_id(long_str, now)
    pv_id_set.add(pvid)  # record for the per-minute distinct-count check
    pv_id_set_hour.add(pvid)  # record for the per-hour distinct-count check
    row['pageview_id'] = pvid

    # Deterministic strings derived only from the timestamp (no randomness),
    # used to verify COUNT DISTINCT (bitmap) results.
    row['str_minute'] = '%s' % now.minute
    row['str_second'] = '%s' % now.second
    row['str_minute_second'] = '%s_%s_%s' % (now.hour, now.minute, now.second)
    return json.dumps(row)


def _log(message):
    """Write ``message`` plus a newline to stderr (works on Python 2 and 3)."""
    sys.stderr.write(message + "\n")


def _wait_out_quiet_hours(now):
    """
    Pause generation for four hours when the local hour is 1 or 7.

    :param now: the time just read by the caller.
    :return: the time to stamp the next event with. It is re-read after any
        sleep, so an event is never stamped with a pre-sleep timestamp.
    """
    if now.hour not in (1, 7):
        return now
    _log("Sleep at " + now.strftime('%Y-%m-%d %H:%M:%S.000+08:00'))
    time.sleep(4 * 3600)
    now = datetime.datetime.now()
    _log("End sleep at " + now.strftime('%Y-%m-%d %H:%M:%S.000+08:00'))
    return now


def fake_all_rows():
    """
    Generate mock events forever; never returns normally.

    Each event (a JSON string from fake_event) is written as one line to
    stdout, which SendMsg pipes into kafka-console-producer, and to the local
    file ``out.data``. To check Kylin's COUNT DISTINCT results, the number of
    distinct pageview_ids per minute and per hour is logged to stderr as
    ``METRICS`` / ``HOUR_METRICS`` lines whenever the minute / hour rolls over.
    """
    total_row = 0
    # -1 never equals a real minute/hour, so the first event triggers both
    # rollovers and resets the tracking sets.
    current_minute = -1
    current_hour = -1
    data_file = open('out.data', 'w+')
    try:
        while True:
            # row_no cycles 0..10; several fake_* helpers derive values from it.
            for row_no in range(11):
                time.sleep(SLEEP_PER_TEN)
                now = _wait_out_quiet_hours(datetime.datetime.now())

                unique_str = str(uuid.uuid3(uuid.NAMESPACE_URL, str(row_no + now.microsecond)))

                ####################################################
                # For verify the correctness of COUNT DISTINCT
                # Compare (hour, minute): after a 4-hour pause the minute
                # number alone can be unchanged.
                # NOTE(review): "+ 8" presumably shifts to GMT+8 for display;
                # it is not wrapped modulo 24 — confirm.
                if (current_hour, current_minute) != (now.hour, now.minute):
                    # Key by the hour the finished minute belonged to.
                    pv_id_stat['%s_%s' % (current_hour, current_minute)] = len(pv_id_set)
                    _log("\nMETRICS,%s,%s,%s,%s" % (
                        current_hour + 8, current_minute, len(pv_id_set), str(total_row)))
                    current_minute = now.minute
                    pv_id_set.clear()
                if current_hour != now.hour:
                    _log("\nHOUR_METRICS,%s,%s" % (current_hour + 8, len(pv_id_set_hour)))
                    current_hour = now.hour
                    pv_id_set_hour.clear()
                # For verify the correctness of COUNT DISTINCT
                ####################################################

                single_row = fake_event(row_no, unique_str, unique_str + str(row_no), now)
                sys.stdout.write(single_row + "\n")
                # stdout is block-buffered when piped; flush so each event
                # reaches Kafka when generated rather than in delayed bursts.
                sys.stdout.flush()
                data_file.write(single_row + "\n")

                total_row += 1
    finally:
        data_file.close()


def _parse_bool(text):
    """
    argparse ``type`` converter for boolean options.

    ``type=bool`` does not work: bool('False') is True because any non-empty
    string is truthy. Accepts true/false, yes/no, 1/0, on/off, t/f, y/n
    (any case).
    """
    value = text.strip().lower()
    if value in ('true', 'yes', '1', 'on', 't', 'y'):
        return True
    if value in ('false', 'no', '0', 'off', 'f', 'n'):
        return False
    raise argparse.ArgumentTypeError('expected a boolean, got %r' % text)


def init_argument(argv=None):
    """
    Parse the generator's command-line options.

    :param argv: argument list to parse; ``None`` (the default) makes argparse
        read ``sys.argv[1:]``, as before.
    :return: namespace with ``max_uid`` (int), ``max_vid`` (int) and
        ``enable_hour_power`` (bool).

    NOTE(review): in the code shown, nothing calls init_argument(), so these
    options do not yet affect MAX_UID / MAX_VID / ENABLE_HOUR_POWER.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--max-uid', required=True, type=int)
    parser.add_argument('--max-vid', required=True, type=int)
    parser.add_argument('--enable-hour-power', required=True, type=_parse_bool)
    args = parser.parse_args(argv)
    return args


if __name__ == "__main__":
    # Generate events until the process is stopped. fake_all_rows keeps its
    # own minute/hour trackers, so no module-level state is set up here.
    fake_all_rows()


SendMsg (SendMsg.sh)
# Create the topic first if needed; its name must match --topic below:
# bin/kafka-topics.sh --create --topic useraction_xxyu --zookeeper cdh-master --partitions 10 --replication-factor 1
rm -f out.data
# -u: unbuffered stdout, so each event reaches Kafka as soon as it is generated.
python -u fake.py | kafka-console-producer --topic useraction_xxyu --broker-list cdh-master:9092,cdh-worker-1:9092,cdh-worker-2:9092


Start the producer in the background
# fake.py writes METRICS / HOUR_METRICS lines to stderr; capture them in
# start.log even when not launched from a terminal.
nohup sh SendMsg.sh > start.log 2>&1 &

Prepare Kylin configuration

# NOTE(review): presumably the streaming segment length in seconds
# (3600 = one hour, matching HOUR_START) — confirm against the Kylin docs.
kylin.stream.cube.duration=3600
kylin.stream.build.additional.cuboids=true
kylin.stream.metrics.option=console
# NOTE(review): presumably the Hive database that holds lambda_flat_table
# for the lambda cube — confirm it is where the table was created.
kylin.stream.hive.database-for-lambda-cube=lambda_311
# The mock producer stamps event_date with a fixed "+08:00" offset;
# presumably that is why the event timezone is GMT+8 — confirm.
kylin.stream.event.timezone=GMT+8
kylin.stream.print-realtime-dict-enabled=true




  • No labels