
Apache Kylin : Analytical Data Warehouse for Big Data




Background

What is Realtime OLAP in Kylin

Kylin v3.0.0 introduced the real-time OLAP feature. With the power of the newly added streaming receiver cluster, Kylin can query streaming data with sub-second latency. You can check this tech blog for the overall design and core concepts.

If you want a step-by-step tutorial, please check this tech blog.
In this article, we will introduce how to update segments and how to set the timezone for the derived time columns in a real-time OLAP cube.

Sample Event

This sample event comes from a Python mock script (listed below) with some additional debug fields. The event_ts field stands for the timestamp of the event.
We assume that events come from countries in different timezones; for example, "2019-12-09 08:44:50.000-0500" indicates an event in the America/New_York timezone, and you may have events from Asia/Shanghai as well.

Say we have a Kafka message which looks like this:

SampleEvent
{
    "content_list":[
        "22",
        "22",
        "22"
    ],
    "act_type":"click",
    "event_ts_2":1600877255000,
    "event_ts":1600877255000,
    "user_detail":{
        "devide_type":7,
        "location":{
            "city":"shenzhen"
        },
        "network_type":3
    },
    "video_id":22,
    "event_date_2":"2020-09-23 16:07:35.000+08:00",
    "str_minute":"7",

    "video_type":"3c8416",
    "play_times":22,
    "pageview_id":"3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097",
    "active_minutes":50.0208,
    "device_brand":"vivo",
    "str_minute_second":"16_7_35",
    "play_duration":37.6584,
    "event_date":"2020-09-23 16:07:35.000+08:00",
    "page_id":"page_22",
    "str_second":"35",
    "uid":2
}
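
Here event_ts and event_ts_2 carry the timestamp as epoch milliseconds, while event_date and event_date_2 carry the same moment with an explicit +08:00 offset. The str_minute, str_second and str_minute_second fields exist only to verify the derived time columns and the count-distinct measures later on.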

Question

When performing real-time OLAP analysis with Kylin, you may have some concerns, including:

  1. Will events in different timezones cause incorrect query results?
  2. How can I correct data when Kafka messages contain values that are not what I want, say some dimension value is misspelled?
  3. How can I recover very late messages which have been dropped?
  4. My query only hits a small time range; how should I write the filter condition to make sure unused segments are pruned/skipped from the scan?

Quick Answer

For the first question, you can always get the correct result in the desired timezone by setting kylin.stream.event.timezone=GMT+N for all Kylin processes. By default, UTC is used for the derived time columns.
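
As a rough illustration of the arithmetic (a Python 3 sketch, not Kylin code): the event timestamp is kept as epoch milliseconds, and the derived time columns are rendered in the timezone configured above.

TimezoneSketch
from datetime import datetime, timedelta, timezone

# An event that occurred at 2019-12-09 08:44:50 in America/New_York (UTC-5)
event = datetime(2019, 12, 9, 8, 44, 50, tzinfo=timezone(timedelta(hours=-5)))
epoch_ms = int(event.timestamp() * 1000)   # the value Kylin keeps internally

# With kylin.stream.event.timezone=GMT+8, derived columns are rendered in UTC+8
local = datetime.fromtimestamp(epoch_ms / 1000, timezone(timedelta(hours=8)))
minute_start = local.replace(second=0, microsecond=0)
print(minute_start)   # 2019-12-09 21:44:00+08:00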

For the second and third questions, you cannot update or append segments of a normal streaming cube, but you can do so for a streaming cube in lambda mode. All you need to prepare is a Hive table which is mapped to your Kafka events.

For the fourth question, you can achieve this by adding a derived time column such as MINUTE_START or DAY_START to your filter condition.
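
For example, assuming the cube's fact table is the lambda_flat_table defined later in this article, a query like the one below lets Kylin prune segments whose time range does not overlap 2020-09-23 (the query itself is only illustrative):

SampleQuery
SELECT act_type, SUM(play_times) AS total_plays
FROM lambda_flat_table
WHERE DAY_START >= DATE '2020-09-23' AND DAY_START < DATE '2020-09-24'
GROUP BY act_type;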

Lambda Architecture

How to enable Lambda Mode in Kylin 3.1

Overall steps

  1. Create Hive table (lambda table)
  2. Prepare sample event script
  3. Start Kafka producer
  4. Deploy Kylin
  5. Load Kafka topic into Kylin
  6. Create model/cube in Kylin
  7. Enable streaming cube
  8. Load data into lambda table
  9. Refresh streaming table

Create lambda table

We know that segments built from Kafka may be incorrect for some reason, such as very late messages. Users may want a chance to update them.

Here we create a Hive table to store the "correct" data, so users have a chance to clean and transform the data from Kafka and to refresh segments from the Hive table.

Please make sure that lambda_flat_table contains all the columns that you want to be included in your streaming cube.

LambdaTable DDL
CREATE EXTERNAL TABLE IF NOT EXISTS lambda_flat_table
(
-- event timestamp and debug purpose column
EVENT_TIME timestamp,
str_minute_second string COMMENT "For debug purposes, e.g. to check the timezone",

-- dimension column
act_type string COMMENT "How the user interacted with our mobile app in this event",
user_devide_type string COMMENT "Which kind of device the user used in this event",
location_city string COMMENT "Which city the user was located in for this event",
video_id bigint COMMENT "Which video the user watched in this event",
device_brand string,
page_id string,

-- measure column
play_times bigint,
play_duration decimal(23, 10),
pageview_id string COMMENT "Identifier of a pageview",


-- for kylin used (dimension)
MINUTE_START timestamp,
HOUR_START timestamp,
MONTH_START date
)
COMMENT 'Fact table. Store raw user action log.'
PARTITIONED BY (DAY_START date)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 'hdfs:///LACUS/lambda_data/lambda_flat_table';
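
Note that nested fields in the Kafka message, such as user_detail.devide_type and user_detail.location.city, are flattened into plain columns (user_devide_type, location_city) in the lambda table, and the derived time columns (MINUTE_START, HOUR_START, MONTH_START and the DAY_START partition column) are included as well, since the lambda table has to contain every column used by the streaming cube.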

Prepare sample event script

Say we have a mobile application that generates user action events like the sample shown earlier. The following Python script mocks such events: it prints each event to stdout (so it can be piped to a Kafka producer) and also writes a copy to out.data.

MockMessage
# -*- coding: utf-8 -*-
import sys
import json
import datetime
from datetime import timedelta
import random
import uuid
import time
import argparse

########################################################
### User Config

## Begin of UTC
BEGIN_DT = datetime.datetime(1970, 1, 1)

## Sleep Per ten events
SLEEP_PER_TEN = 0.03

## Drop some dimension values?
LOST_SOME_DIMENSION = False

ENABLE_HOUR_POWER = True

MAX_UID = 20000
MAX_VID = 11000
pv_id_set_hour = set()
pv_id_stat = dict()
pv_id_set = set()
########################################################
# To verify the correctness of COUNT DISTINCT
########################################################


########################################################
### Program
random.seed(datetime.datetime.now().second)

act_type_power = [
    ['start', 32],
    ['exit', 43],
    ['play', 224],
    ['stop', 156],
    ['pause', 23],
    ['click', 367],
    ['comment', 14],
    ['share', 22],
    ['like', 55],
    ['dislike', 70]
]

tmp = 0
for act_type_item in act_type_power:
    p = act_type_item[1]
    act_type_item[1] = tmp
    tmp += p
TOTAL_POWER = tmp

hour_power = {
    0: 1.75,
    1: 1.25,
    2: 0.71,
    3: 0.43,
    4: 0.31,
    5: 0.55,
    6: 0.61,
    7: 0.72,
    8: 0.96,
    9: 1.01,
    10: 1.12,
    11: 1.11,
    12: 1.31,
    13: 1.21,
    14: 1.51,
    15: 1.23,
    16: 1.02,
    17: 1.31,
    18: 1.41,
    19: 1.22,
    20: 1.42,
    21: 1.55,
    22: 1.34,
    23: 1.67
}

gmt_8 = 8 * 3600


def fake_curr_date_long(now):
    return int((now - BEGIN_DT).total_seconds()) * 1000


def fake_curr_date_str(now):
    return now.strftime('%Y-%m-%d %H:%M:%S.000+08:00')


def fake_act_type():
    ran = random.randint(0, TOTAL_POWER * 3) % TOTAL_POWER
    preact = act_type_power[0][0]
    for act in act_type_power:
        if ran < act[1]:
            return preact
        else:
            preact = act[0]
    return preact


def fake_active_minutes(row_no, now):
    now_hour = now.hour
    if row_no % 2 == 0:
        ran_num = random.randint(0, 3) + random.randint(0, 999) / 100.0
    else:
        ran_num = random.randint(4, 50) + random.randint(0, 999) / 100.0
    if ENABLE_HOUR_POWER:
        return round(ran_num * hour_power[now_hour], 4)
    else:
        return round(ran_num, 4)


def fake_uid(row_no, now):
    if row_no % 3 == 0:
        return random.randint(1, 10)
    else:
        return random.randint(10, MAX_UID) + now.hour * 100


def fake_video_id(now):
    tmp1 = random.randint(0, MAX_VID)
    if now.minute % 2 == 1:
        return tmp1 % 100 + now.hour
    else:
        return tmp1 + now.hour * 100


def fake_play_times(row_no, now):
    now_hour = now.hour
    if row_no % 2 == 0:
        pt = random.randint(0, 10)
    else:
        pt = random.randint(10, 55)

    if ENABLE_HOUR_POWER:
        return int(pt * hour_power[now_hour])
    else:
        return pt


def fake_pageview_id(str2, now):
    return str2 + str(now.minute)


city_list = ["shanghai", "beijing", "hangzhou", "shenzhen",
             "taibei", "hongkong", "guangzhou",
             "nanjing", "chongqin", "berlin", "tokyo"]
video_type = ["Sports", "Computer", "Science", "Game",
              "News", "Taste", "Tour", "Music",
              "Finance", "Arts", u"军事", u"中文", u"音乐", u"古装剧"]

brand_list = ["huawei", "iPhone", "xiaomi", "vivo", "360", "meizu"]

def fake_video_type(str1, now):
    tt = now.second % 3
    if tt == 0:
        return video_type[random.randint(0, 3)]
    elif tt == 1:
        return video_type[random.randint(0, len(video_type) - 1)]
    else:
        return str1 + str(now.hour)


def fake_string_in_list(row_no, str_list, top_n=3):
    if row_no % 2 == 0:
        _id = row_no % top_n
    else:
        _id = row_no % len(str_list)
    return str_list[_id]


def fake_event(row_no, long_str, short_str, now):
    """
    Fake single event
    """
    row = dict()

    ########################################################
    # Dimension
    row['event_date'] = fake_curr_date_str(now)
    row['event_date_2'] = fake_curr_date_str(now)
    row['event_ts'] = fake_curr_date_long(now)
    row['event_ts_2'] = fake_curr_date_long(now)
    if not LOST_SOME_DIMENSION or row_no % 10 >= 8:
        row['act_type'] = fake_act_type()
    row['video_type'] = fake_video_type(short_str[0:4], now)

    # Dimension or Measure (UserView)
    row['uid'] = fake_uid(row_no, now)
    row['page_id'] = "page_" + str(fake_video_id(now) % 50)
    row['video_id'] = fake_video_id(now)
    row['device_brand'] = fake_string_in_list(row_no, brand_list)

    # multi level key-value
    row['user_detail'] = {"location": {"city": fake_string_in_list(row_no, city_list)},
                          "devide_type": now.minute,
                          "network_type": now.microsecond % 4 + 2}
    row['content_list'] = [str(fake_video_id(now)), str(fake_video_id(now)), str(fake_video_id(now))]

    ########################################################
    # Measure
    row['play_duration'] = fake_active_minutes(row_no, now)
    row['active_minutes'] = fake_active_minutes(row_no, now)
    row['play_times'] = fake_play_times(row_no, now)
    pvid = fake_pageview_id(long_str, now)
    pv_id_set.add(pvid)  # add stat of pv_id
    pv_id_set_hour.add(pvid)
    row['pageview_id'] = pvid

    # Fixed strings to verify count distinct (bitmap)
    row['str_minute'] = '%s' % now.minute
    row['str_second'] = '%s' % now.second
    row['str_minute_second'] = '%s_%s_%s' % (now.hour, now.minute, now.second)
    return json.dumps(row)


def fake_all_rows():
    total_row = 0
    init = False
    start = datetime.datetime.now()
    data_file = open('out.data', 'w+')
    while True:
        row_no = 0
        while row_no <= 10:
            time.sleep(SLEEP_PER_TEN)
            now = datetime.datetime.now()

            if now.hour == 1:
                print >> sys.stderr, "Sleep at " + now.strftime('%Y-%m-%d %H:%M:%S.000+08:00')
                time.sleep(4 * 3600)
                print >> sys.stderr, "End sleep at " + now.strftime('%Y-%m-%d %H:%M:%S.000+08:00')
            elif now.hour == 7:
                print >> sys.stderr, "Sleep at " + now.strftime('%Y-%m-%d %H:%M:%S.000+08:00')
                time.sleep(4 * 3600)
                print >> sys.stderr, "End sleep at " + now.strftime('%Y-%m-%d %H:%M:%S.000+08:00')

            unique_str = str(uuid.uuid3(uuid.NAMESPACE_URL, str(row_no + now.microsecond)))

            ####################################################
            # For verify the correctness of COUNT DISTINCT
            if not init:
                current_minute = -1
                current_hour = -1
                init = True
            if current_minute != now.minute:
                pv_id_stat['%s_%s' % (now.hour, current_minute)] = len(pv_id_set)
                print >> sys.stderr, "\nMETRICS,%s,%s,%s,%s" % (
                    current_hour + 8, current_minute, len(pv_id_set), str(total_row))
                current_minute = now.minute
                pv_id_set.clear()
            if current_hour != now.hour:
                print >> sys.stderr, "\nHOUR_METRICS,%s,%s" % (current_hour + 8, len(pv_id_set_hour))
                current_hour = now.hour
                pv_id_set_hour.clear()
            # For verify the correctness of COUNT DISTINCT
            ####################################################

            single_row = fake_event(row_no + 0, unique_str, unique_str + str(row_no), now)
            print single_row
            print >> data_file, single_row

            row_no += 1
            total_row += 1


def init_argument():
    parser = argparse.ArgumentParser()
    parser.add_argument('--max-uid', required=True, type=int)
    parser.add_argument('--max-vid', required=True, type=int)
    parser.add_argument('--enable-hour-power', required=True, type=bool)
    args = parser.parse_args()
    return args


if __name__ == "__main__":
    current_minute = -1
    fake_all_rows()

Start Kafka producer

SendMsg
# bin/kafka-topics.sh --create --topic useraction --zookeeper cdh-master --partitions 10 --replication-factor 1
rm -rf out.data
python fake.py | kafka-console-producer --topic useraction_xxyu --broker-list cdh-master:9092,cdh-worker-1:9092,cdh-worker-2:9092


script
nohup sh SendMsg.sh > start.log &
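
To confirm that events are actually flowing into the topic, you can peek at a few messages with the console consumer (broker address and topic name as above; adjust the flags to your Kafka version):

CheckTopic
kafka-console-consumer --bootstrap-server cdh-master:9092 --topic useraction_xxyu --from-beginning --max-messages 5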

Deploy Kylin 
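
The following properties go into kylin.properties on every Kylin node. kylin.stream.hive.database-for-lambda-cube must point to the Hive database that holds the lambda table, and kylin.stream.event.timezone controls the timezone of the derived time columns, as discussed in the Quick Answer section.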

kylin.stream.cube.duration=3600
kylin.stream.build.additional.cuboids=true
kylin.stream.metrics.option=console
kylin.stream.hive.database-for-lambda-cube=lambda_311
kylin.stream.event.timezone=GMT+8
kylin.stream.print-realtime-dict-enabled=true

Load Kafka topic into Kylin


Create model/cube in Kylin


Enable streaming cube


Load data into lambda table

data
2020-09-22 16:07:35,EMPTY,play,3,Shanghai,100010001,vivo,page_001,223,33.2222,3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097,2020-09-22 16:07:00,2020-09-22 16:00:00,2020-09-01
2020-09-22 16:07:35,EMPTY,stop,4,London,100010002,apple,page_002,224,24.2342,9311744c-3746-3502-84c9-d06e8b5ea2d6,2020-09-22 16:07:00,2020-09-22 16:00:00,2020-09-01
2020-09-22 17:07:35,EMPTY,play,3,Shanghai,100010001,vivo,page_001,225,33.2222,3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097,2020-09-22 17:07:00,2020-09-22 17:00:00,2020-09-01
2020-09-22 17:07:35,EMPTY,stop,4,London,100010002,apple,page_002,226,24.2342,9311744c-3746-3502-84c9-d06e8b5ea2d6,2020-09-22 17:07:00,2020-09-22 17:00:00,2020-09-01
2020-09-22 18:07:35,EMPTY,play,3,Shanghai,100010001,vivo,page_001,227,33.2222,3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097,2020-09-22 18:07:00,2020-09-22 18:00:00,2020-09-01
2020-09-22 18:07:35,EMPTY,stop,4,London,100010002,apple,page_002,228,24.2342,9311744c-3746-3502-84c9-d06e8b5ea2d6,2020-09-22 18:07:00,2020-09-22 18:00:00,2020-09-01
2020-09-22 19:07:35,EMPTY,play,3,Shanghai,100010001,vivo,page_001,229,33.2222,3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097,2020-09-22 19:07:00,2020-09-22 19:00:00,2020-09-01
2020-09-22 19:07:35,EMPTY,stop,4,London,100010002,apple,page_002,230,24.2342,9311744c-3746-3502-84c9-d06e8b5ea2d6,2020-09-22 19:07:00,2020-09-22 19:00:00,2020-09-01
2020-09-22 20:07:35,EMPTY,play,3,Shanghai,100010001,vivo,page_001,231,33.2222,3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097,2020-09-22 20:07:00,2020-09-22 20:00:00,2020-09-01
2020-09-22 20:07:35,EMPTY,stop,4,London,100010002,apple,page_002,232,24.2342,9311744c-3746-3502-84c9-d06e8b5ea2d6,2020-09-22 20:07:00,2020-09-22 20:00:00,2020-09-01
2020-09-22 21:07:35,EMPTY,play,3,Shanghai,100010001,vivo,page_001,233,33.2222,3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097,2020-09-22 21:07:00,2020-09-22 21:00:00,2020-09-01
2020-09-22 22:07:35,EMPTY,stop,4,London,100010002,apple,page_002,234,24.2342,9311744c-3746-3502-84c9-d06e8b5ea2d6,2020-09-22 22:07:00,2020-09-22 22:00:00,2020-09-01
2020-09-22 23:07:35,EMPTY,play,3,Shanghai,100010001,huawei,page_001,235,33.2222,3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097,2020-09-22 23:07:00,2020-09-22 23:00:00,2020-09-01
2020-09-23 00:07:35,EMPTY,pause,5,Tianjin,100010003,huawei,page_003,235,33.2222,e80160bc-f25a-3566-bf9b-a16e91ef6ee4,2020-09-23 00:07:00,2020-09-23 00:00:00,2020-09-01
2020-09-23 02:07:35,EMPTY,pause,5,Tokyo,100010004,huawei,page_004,235,33.2222,3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097,2020-09-23 02:07:00,2020-09-23 02:00:00,2020-09-01
2020-09-23 05:07:35,EMPTY,pause,5,Beijing,100010005,huawei,page_005,235,33.2222,e80160bc-f25a-3566-bf9b-a16e91ef6ee4,2020-09-23 05:07:00,2020-09-23 05:00:00,2020-09-01
2020-09-23 07:07:35,EMPTY,pause,5,Wuhan,100010006,huawei,page_006,235,33.2222,3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097,2020-09-23 07:07:00,2020-09-23 07:00:00,2020-09-01
2020-09-23 10:07:35,EMPTY,pause,5,Tianjin,100010007,huawei,page_007,235,33.2222,e80160bc-f25a-3566-bf9b-a16e91ef6ee4,2020-09-23 10:07:00,2020-09-23 10:00:00,2020-09-01
2020-09-23 12:07:35,EMPTY,pause,5,Tokyo,100010008,huawei,page_008,235,33.2222,3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097,2020-09-23 12:07:00,2020-09-23 12:00:00,2020-09-01
2020-09-23 15:07:35,EMPTY,pause,5,Beijing,100010009,huawei,page_009,235,33.2222,e80160bc-f25a-3566-bf9b-a16e91ef6ee4,2020-09-23 15:07:00,2020-09-23 15:00:00,2020-09-01



hadoop fs -put partition.csv hdfs:///LACUS/lambda_data/lambda_flat_table/day_start=2020-09-23
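
Here partition.csv contains the rows shown above; it is uploaded into the lambda table's partition directory on HDFS before the partition is registered in Hive.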


Insert Partition
use lacus_0923;
ALTER TABLE lambda_flat_table ADD PARTITION(DAY_START='2020-09-23') location 'hdfs:///LACUS/lambda_data/lambda_flat_table/day_start=2020-09-23';
select * from lambda_flat_table limit 5;

Refresh streaming table
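
After the partition is loaded, refresh the corresponding streaming segments so that Kylin rebuilds them from the lambda table instead of the data received from Kafka. This can typically be triggered from the cube's Action menu in the web UI, or, as a sketch, through the standard cube build REST API (the cube name, credentials and epoch-millisecond time range below are placeholders):

RefreshSegment
curl -X PUT -H "Authorization: Basic QURNSU46S1lMSU4=" -H "Content-Type: application/json" \
  -d '{"startTime": 1600819200000, "endTime": 1600905600000, "buildType": "REFRESH"}' \
  http://localhost:7070/kylin/api/cubes/your_streaming_cube/rebuild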

