
Apache Kylin : Analytical Data Warehouse for Big Data


Welcome to Kylin Wiki.

Before you start

What is lambda mode in Kylin

Steps of Streaming Build Job


http://kylin.apache.org/docs/tutorial/lambda_mode_and_timezone_realtime_olap.html

How to enable Lambda Mode in Kylin 3.1

Overall steps

  1. Create Hive table (historical table)
  2. Prepare a script to mock sample events
  3. Start Kafka producer
  4. Deploy Kylin Coordinator and Streaming Receiver
  5. Load streaming table
  6. Create model/cube in Kylin
  7. Enable streaming cube
  8. Load data into historical table
  9. Refresh streaming table

Create lambda table

We know that segments built from Kafka may be incorrect for various reasons, such as very late messages. Users may want a chance to update them.

Here we create a Hive table (the historical table) to store the "correct" data. This gives users a chance to clean and transform the data from Kafka, and then overwrite a segment with data from the Hive table (the corrected data).

Please make sure the historical table contains all the columns you want included in your streaming cube, and that the data types match.

Please choose "DAY_START" as the partition column of the historical table; it is recommended to refresh segments once a day.

Please keep in mind that segment ranges use the GMT+0/UTC timezone. If you see a segment named "20200923160000_20200924160000", it means the segment starts at "2020-09-23 16:00:00 GMT+00:00", which is "2020-09-24 00:00:00 GMT+08:00" for people living in China.
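As a quick illustration, the segment naming convention can be decoded with a few lines of Python (the helper below is ours, for illustration only, not part of Kylin):

```python
from datetime import datetime, timedelta

def decode_segment_name(name, utc_offset_hours=0):
    """Parse a Kylin segment name like '20200923160000_20200924160000'
    (always UTC) into (start, end) datetimes, optionally shifted into a
    fixed local UTC offset."""
    start_str, end_str = name.split("_")
    fmt = "%Y%m%d%H%M%S"
    shift = timedelta(hours=utc_offset_hours)
    return (datetime.strptime(start_str, fmt) + shift,
            datetime.strptime(end_str, fmt) + shift)

# UTC boundaries, and the same segment seen from GMT+8
start_utc, end_utc = decode_segment_name("20200923160000_20200924160000")
start_cst, end_cst = decode_segment_name("20200923160000_20200924160000",
                                         utc_offset_hours=8)
```

For GMT+8 this confirms the example above: the segment covers 2020-09-24 00:00:00 through 2020-09-25 00:00:00 local time.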


LambdaTable DDL
CREATE EXTERNAL TABLE IF NOT EXISTS lambda_flat_table
(
-- event timestamp and debug purpose column
EVENT_TIME timestamp,
str_minute_second string COMMENT "For debug purpose, maybe check timezone etc",

-- dimension column
act_type string COMMENT "What did user interact with our mobile app in this event",
user_devide_type string COMMENT "Which kind of device did user use in this event",
location_city string COMMENT "Which city did user locate in this event",
video_id bigint COMMENT "Which video did user watch in this event",
device_brand string,
page_id string,

-- measure/metrics column
play_times bigint,
play_duration decimal(23, 10),
pageview_id string COMMENT "Identifier of a pageview",


-- for kylin used (dimension)
MINUTE_START timestamp,
HOUR_START timestamp,
MONTH_START date
)
COMMENT 'Fact table. Store raw user action log.'
PARTITIONED BY (DAY_START date)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 'hdfs:///LACUS/lambda_data/lambda_flat_table';

Prepare sample event script

Say we want to monitor user actions against our mobile video application. The following script (Python 2) sends events in JSON format to STDOUT.

MockMessage
# -*- coding: utf-8 -*-
import sys
import json
import datetime
from datetime import timedelta
import random
import uuid
import time
import argparse

########################################################
### User Config

## Begin of UTC
BEGIN_DT = datetime.datetime(1970, 1, 1)

## Sleep Per ten events
SLEEP_PER_TEN = 0.03

## Randomly drop some dimension values?
LOST_SOME_DIMSENSION = False

ENABLE_HOUR_POWER = True

MAX_UID = 20000
MAX_VID = 11000
pv_id_set_hour = set()
pv_id_stat = dict()
pv_id_set = set()
########################################################
# For verifying the correctness of COUNT DISTINCT
########################################################


########################################################
### Program
random.seed(datetime.datetime.now().second)

act_type_power = [
    ['start', 32],
    ['exit', 43],
    ['play', 224],
    ['stop', 156],
    ['pause', 23],
    ['click', 367],
    ['comment', 14],
    ['share', 22],
    ['like', 55],
    ['dislike', 70]
]

tmp = 0
for act_type_item in act_type_power:
    p = act_type_item[1]
    act_type_item[1] = tmp
    tmp += p
TOTAL_POWER = tmp

hour_power = {
    0: 1.75,
    1: 1.25,
    2: 0.71,
    3: 0.43,
    4: 0.31,
    5: 0.55,
    6: 0.61,
    7: 0.72,
    8: 0.96,
    9: 1.01,
    10: 1.12,
    11: 1.11,
    12: 1.31,
    13: 1.21,
    14: 1.51,
    15: 1.23,
    16: 1.02,
    17: 1.31,
    18: 1.41,
    19: 1.22,
    20: 1.42,
    21: 1.55,
    22: 1.34,
    23: 1.67
}

gmt_8 = 8 * 3600


def fake_curr_date_long(now):
    return int((now - BEGIN_DT).total_seconds()) * 1000


def fake_curr_date_str(now):
    return now.strftime('%Y-%m-%d %H:%M:%S.000+08:00')


def fake_act_type():
    ran = random.randint(0, TOTAL_POWER * 3) % TOTAL_POWER
    preact = act_type_power[0][0]
    for act in act_type_power:
        if ran < act[1]:
            return preact
        else:
            preact = act[0]
    return preact


def fake_active_minutes(row_no, now):
    now_hour = now.hour
    if row_no % 2 == 0:
        ran_num = random.randint(0, 3) + random.randint(0, 999) / 100.0
    else:
        ran_num = random.randint(4, 50) + random.randint(0, 999) / 100.0
    if ENABLE_HOUR_POWER:
        return round(ran_num * hour_power[now_hour], 4)
    else:
        return round(ran_num, 4)


def fake_uid(row_no, now):
    if row_no % 3 == 0:
        return random.randint(1, 10)
    else:
        return random.randint(10, MAX_UID) + now.hour * 100


def fake_video_id(now):
    tmp1 = random.randint(0, MAX_VID)
    if now.minute % 2 == 1:
        return tmp1 % 100 + now.hour
    else:
        return tmp1 + now.hour * 100


def fake_play_times(row_no, now):
    now_hour = now.hour
    if row_no % 2 == 0:
        pt = random.randint(0, 10)
    else:
        pt = random.randint(10, 55)

    if ENABLE_HOUR_POWER:
        return int(pt * hour_power[now_hour])
    else:
        return pt


def fake_pageview_id(str2, now):
    return str2 + str(now.minute)


city_list = ["shanghai", "beijing", "hangzhou", "shenzhen",
             "taibei", "hongkong", "guangzhou",
             "nanjing", "chongqin", "berlin", "tokyo"]
video_type = ["Sports", "Computer", "Science", "Game",
              "News", "Taste", "Tour", "Music",
              "Finance", "Arts", u"军事", u"中文", u"音乐", u"古装剧"]

brand_list = ["huawei", "iPhone", "xiaomi", "vivo", "360", "meizu"]

def fake_video_type(str1, now):
    tt = now.second % 3
    if tt == 0:
        return video_type[random.randint(0, 3)]
    elif tt == 1:
        return video_type[random.randint(0, len(video_type) - 1)]
    else:
        return str1 + str(now.hour)


def fake_string_in_list(row_no, str_list, top_n=3):
    if row_no % 2 == 0:
        _id = row_no % top_n
    else:
        _id = row_no % len(str_list)
    return str_list[_id]


def fake_event(row_no, long_str, short_str, now):
    """
    Fake single event
    """
    row = dict()

    ########################################################
    # Dimension
    row['event_date'] = fake_curr_date_str(now)
    row['event_date_2'] = fake_curr_date_str(now)
    row['event_ts'] = fake_curr_date_long(now)
    row['event_ts_2'] = fake_curr_date_long(now)
    if not LOST_SOME_DIMSENSION or row_no % 10 >= 8:
        row['act_type'] = fake_act_type()
    row['video_type'] = fake_video_type(short_str[0:4], now)

    # Dimension or Measure (UserView)
    row['uid'] = fake_uid(row_no, now)
    row['page_id'] = "page_" + str(fake_video_id(now) % 50)
    row['video_id'] = fake_video_id(now)
    row['device_brand'] = fake_string_in_list(row_no, brand_list)

    # multi level key-value
    row['user_detail'] = {"location": {"city": fake_string_in_list(row_no, city_list)},
                          "devide_type": now.minute,
                          "network_type": now.microsecond % 4 + 2}
    row['content_list'] = [str(fake_video_id(now)), str(fake_video_id(now)), str(fake_video_id(now))]

    ########################################################
    # Measure
    row['play_duration'] = fake_active_minutes(row_no, now)
    row['active_minutes'] = fake_active_minutes(row_no, now)
    row['play_times'] = fake_play_times(row_no, now)
    pvid = fake_pageview_id(long_str, now)
    pv_id_set.add(pvid)  # add stat of pv_id
    pv_id_set_hour.add(pvid)
    row['pageview_id'] = pvid

    # Fix string for verify count distinct(bitmap)
    row['str_minute'] = '%s' % now.minute
    row['str_second'] = '%s' % now.second
    row['str_minute_second'] = '%s_%s_%s' % (now.hour, now.minute, now.second)
    return json.dumps(row)


def fake_all_rows():
    total_row = 0
    init = False
    start = datetime.datetime.now()
    data_file = open('out.data', 'w+')
    while True:
        row_no = 0
        while row_no <= 10:
            time.sleep(SLEEP_PER_TEN)
            now = datetime.datetime.now()

            if now.hour == 1:
                print >> sys.stderr, "Sleep at " + now.strftime('%Y-%m-%d %H:%M:%S.000+08:00')
                time.sleep(4 * 3600)
                print >> sys.stderr, "End sleep at " + now.strftime('%Y-%m-%d %H:%M:%S.000+08:00')
            elif now.hour == 7:
                print >> sys.stderr, "Sleep at " + now.strftime('%Y-%m-%d %H:%M:%S.000+08:00')
                time.sleep(4 * 3600)
                print >> sys.stderr, "End sleep at " + now.strftime('%Y-%m-%d %H:%M:%S.000+08:00')

            unique_str = str(uuid.uuid3(uuid.NAMESPACE_URL, str(row_no + now.microsecond)))

            ####################################################
            # For verify the correctness of COUNT DISTINCT
            if not init:
                current_minute = -1
                current_hour = -1
                init = True
            if current_minute != now.minute:
                pv_id_stat['%s_%s' % (now.hour, current_minute)] = len(pv_id_set)
                print >> sys.stderr, "\nMETRICS,%s,%s,%s,%s" % (
                    current_hour + 8, current_minute, len(pv_id_set), str(total_row))
                current_minute = now.minute
                pv_id_set.clear()
            if current_hour != now.hour:
                print >> sys.stderr, "\nHOUR_METRICS,%s,%s" % (current_hour + 8, len(pv_id_set_hour))
                current_hour = now.hour
                pv_id_set_hour.clear()
            # For verify the correctness of COUNT DISTINCT
            ####################################################

            single_row = fake_event(row_no + 0, unique_str, unique_str + str(row_no), now)
            print single_row
            print >> data_file, single_row

            row_no += 1
            total_row += 1


def init_argument():
    parser = argparse.ArgumentParser()
    parser.add_argument('--max-uid', required=True, type=int)
    parser.add_argument('--max-vid', required=True, type=int)
    parser.add_argument('--enable-hour-power', required=True, type=bool)
    args = parser.parse_args()
    return args


if __name__ == "__main__":
    current_minute = -1
    fake_all_rows()
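The act_type_power table in the script is converted into cumulative thresholds so that event types are drawn with weighted probability. The idea can be sketched in isolation (function names here are illustrative, not part of MockMessage.py):

```python
import random

def build_thresholds(weighted_items):
    """Turn [(item, weight), ...] into [(item, cumulative_start), ...] plus
    the total weight, mirroring the preprocessing loop in the mock script."""
    thresholds, total = [], 0
    for item, weight in weighted_items:
        thresholds.append((item, total))
        total += weight
    return thresholds, total

def pick(thresholds, total):
    """Draw one item; each item wins with probability weight/total."""
    ran = random.randint(0, total - 1)
    prev = thresholds[0][0]
    for item, start in thresholds:
        if ran < start:
            return prev
        prev = item
    return prev

thresholds, total = build_thresholds([("play", 3), ("stop", 1)])
```

With weights 3:1, "play" is returned roughly three times as often as "stop", which is how the script biases event types like 'click' and 'play' over 'comment'.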

Use the following script to send events to Kafka.

SendMsg.sh
# bin/kafka-topics.sh --create --topic useraction_xxyu --zookeeper cdh-master --partitions 10 --replication-factor 1
rm -rf out.data
python MockMessage.py | kafka-console-producer --topic useraction_xxyu --broker-list cdh-master:9092,cdh-worker-1:9092,cdh-worker-2:9092

Run it in the background:
nohup sh SendMsg.sh > start.log &

Deploy Kylin Cluster

  • Deploy a Streaming Coordinator (kylin.sh start)
  • Deploy a Streaming Receiver (kylin.sh streaming start)
  • Create a ReplicaSet (in the front-end page)

Please configure "kylin.stream.event.timezone" with your local timezone. Here is what I use: kylin.stream.event.timezone=GMT+8.

kylin.metadata.url=REALTIME@jdbc,url=jdbc:mysql://localhost:3306/NightlyBuild,username=root,password=R00t@kylin,maxActive=10,maxIdle=10
kylin.source.hive.database-for-flat-table=APACHE
kylin.env.zookeeper-base-path=/APACHE/REALTIME_OLAP
kylin.storage.hbase.table-name-prefix=REALTIME_OLAP_
kylin.storage.hbase.namespace=APACHE
kylin.env.hdfs-working-dir=/APACHE/REALTIME_OLAP
kylin.stream.event.timezone=GMT+8
kylin.web.timezone=GMT+8
kylin.stream.hive.database-for-lambda-cube=APACHE

Load kafka topic into Kylin

Please choose the correct Timestamp Column, Timestamp Parser, and Timestamp Pattern.

The following is the content of streaming_v2/APACHE.LAMBDA_FLAT_TABLE.json.

TableMeta
{
  "uuid" : "e7c87706-f28d-4001-9688-e15702924f0d",
  "last_modified" : 1600869884656,
  "version" : "3.0.0.20500",
  "name" : "APACHE.LAMBDA_FLAT_TABLE",
  "parser_info" : {
    "ts_col_name" : "event_date",
    "ts_parser" : "org.apache.kylin.stream.source.kafka.DateTimeParser",
    "ts_pattern" : "yyyy-MM-dd HH:mm:ss.SSSZZ",
    "format_ts" : false,
    "field_mapping" : {
      "user_devide_type" : "user_detail.devide_type",
      "location_city" : "user_detail.location.city",
      "play_times" : "play_times",
      "pageview_id" : "pageview_id",
      "device_brand" : "device_brand",
      "str_minute_second" : "str_minute_second",
      "event_time" : "event_date",
      "page_id" : "page_id"
    }
  },
  "properties" : {
    "topic" : "useraction_xxyu",
    "bootstrap.servers" : "cdh-master:9092,cdh-worker-1:9092,cdh-worker-2:9092"
  }
}
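The field_mapping above lets Kylin flatten nested JSON events into the table's flat columns: a dotted path such as user_detail.location.city is walked key by key through the event. A minimal sketch of that resolution (the resolve function is our illustration, not Kylin's actual parser code):

```python
import json

def resolve(event, path):
    """Walk a dotted path like 'user_detail.location.city' through a parsed
    JSON event, returning the nested value."""
    value = event
    for key in path.split("."):
        value = value[key]
    return value

# A trimmed-down event in the shape produced by MockMessage.py
event = json.loads(
    '{"user_detail": {"location": {"city": "shanghai"}, "devide_type": 7},'
    ' "page_id": "page_004"}'
)
city = resolve(event, "user_detail.location.city")
```

Flat column names (e.g. page_id) simply map to top-level keys, so only nested fields need the dotted syntax.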

Create model/cube in Kylin

The following is the content of the Model and Cube definitions.

ModelDesc
{
  "uuid": "0352eb6f-1249-952d-aabc-78399493f1e4",
  "last_modified": 1600869976000,
  "version": "3.0.0.20500",
  "name": "LambdaVerifyModel",
  "owner": "ADMIN",
  "is_draft": false,
  "description": "Let us verify if lambda mode works.",
  "fact_table": "APACHE.LAMBDA_FLAT_TABLE",
  "fact_table_alias": "LAMBDA_FLAT_TABLE",
  "lookups": [],
  "dimensions": [
    {
      "table": "LAMBDA_FLAT_TABLE",
      "columns": [
        "USER_DEVIDE_TYPE",
        "LOCATION_CITY",
        "DEVICE_BRAND",
        "STR_MINUTE_SECOND",
        "EVENT_TIME",
        "MONTH_START",
        "DAY_START",
        "HOUR_START",
        "MINUTE_START",
        "PAGE_ID"
      ]
    }
  ],
  "metrics": [
    "LAMBDA_FLAT_TABLE.PLAY_TIMES",
    "LAMBDA_FLAT_TABLE.PAGEVIEW_ID"
  ],
  "filter_condition": "",
  "partition_desc": {
    "partition_date_column": "LAMBDA_FLAT_TABLE.DAY_START",
    "partition_time_column": null,
    "partition_date_start": 0,
    "partition_date_format": "yyyy-MM-dd",
    "partition_time_format": "HH:mm:ss",
    "partition_type": "APPEND",
    "partition_condition_builder": "org.apache.kylin.metadata.model.PartitionDesc$DefaultPartitionConditionBuilder"
  },
  "capacity": "MEDIUM",
  "projectName": "LambdaPrj"
}


CubeDesc
{
  "uuid": "2e9eaf25-31e0-16e1-42fc-32d83c05255d",
  "last_modified": 1600870179000,
  "version": "3.0.0.20500",
  "name": "UserActionBasicAnslysisCube",
  "is_draft": false,
  "model_name": "LambdaVerifyModel",
  "description": "",
  "null_string": null,
  "dimensions": [
    {
      "name": "USER_DEVIDE_TYPE",
      "table": "LAMBDA_FLAT_TABLE",
      "column": "USER_DEVIDE_TYPE",
      "derived": null
    },
    {
      "name": "LOCATION_CITY",
      "table": "LAMBDA_FLAT_TABLE",
      "column": "LOCATION_CITY",
      "derived": null
    },
    {
      "name": "DEVICE_BRAND",
      "table": "LAMBDA_FLAT_TABLE",
      "column": "DEVICE_BRAND",
      "derived": null
    },
    {
      "name": "STR_MINUTE_SECOND",
      "table": "LAMBDA_FLAT_TABLE",
      "column": "STR_MINUTE_SECOND",
      "derived": null
    },
    {
      "name": "PAGE_ID",
      "table": "LAMBDA_FLAT_TABLE",
      "column": "PAGE_ID",
      "derived": null
    },
    {
      "name": "MONTH_START",
      "table": "LAMBDA_FLAT_TABLE",
      "column": "MONTH_START",
      "derived": null
    },
    {
      "name": "DAY_START",
      "table": "LAMBDA_FLAT_TABLE",
      "column": "DAY_START",
      "derived": null
    },
    {
      "name": "HOUR_START",
      "table": "LAMBDA_FLAT_TABLE",
      "column": "HOUR_START",
      "derived": null
    },
    {
      "name": "MINUTE_START",
      "table": "LAMBDA_FLAT_TABLE",
      "column": "MINUTE_START",
      "derived": null
    }
  ],
  "measures": [
    {
      "name": "_COUNT_",
      "function": {
        "expression": "COUNT",
        "parameter": {
          "type": "constant",
          "value": "1"
        },
        "returntype": "bigint"
      }
    },
    {
      "name": "PV_STAT",
      "function": {
        "expression": "COUNT_DISTINCT",
        "parameter": {
          "type": "column",
          "value": "LAMBDA_FLAT_TABLE.PAGEVIEW_ID"
        },
        "returntype": "hllc(16)"
      }
    },
    {
      "name": "PLAY_STAT",
      "function": {
        "expression": "SUM",
        "parameter": {
          "type": "column",
          "value": "LAMBDA_FLAT_TABLE.PLAY_TIMES"
        },
        "returntype": "bigint"
      }
    }
  ],
  "dictionaries": [],
  "rowkey": {
    "rowkey_columns": [
      {
        "column": "LAMBDA_FLAT_TABLE.USER_DEVIDE_TYPE",
        "encoding": "dict",
        "encoding_version": 1,
        "isShardBy": false
      },
      {
        "column": "LAMBDA_FLAT_TABLE.LOCATION_CITY",
        "encoding": "dict",
        "encoding_version": 1,
        "isShardBy": false
      },
      {
        "column": "LAMBDA_FLAT_TABLE.DEVICE_BRAND",
        "encoding": "dict",
        "encoding_version": 1,
        "isShardBy": false
      },
      {
        "column": "LAMBDA_FLAT_TABLE.STR_MINUTE_SECOND",
        "encoding": "dict",
        "encoding_version": 1,
        "isShardBy": false
      },
      {
        "column": "LAMBDA_FLAT_TABLE.PAGE_ID",
        "encoding": "dict",
        "encoding_version": 1,
        "isShardBy": false
      },
      {
        "column": "LAMBDA_FLAT_TABLE.MONTH_START",
        "encoding": "date",
        "encoding_version": 1,
        "isShardBy": false
      },
      {
        "column": "LAMBDA_FLAT_TABLE.DAY_START",
        "encoding": "date",
        "encoding_version": 1,
        "isShardBy": false
      },
      {
        "column": "LAMBDA_FLAT_TABLE.HOUR_START",
        "encoding": "time",
        "encoding_version": 1,
        "isShardBy": false
      },
      {
        "column": "LAMBDA_FLAT_TABLE.MINUTE_START",
        "encoding": "time",
        "encoding_version": 1,
        "isShardBy": false
      }
    ]
  },
  "hbase_mapping": {
    "column_family": [
      {
        "name": "F1",
        "columns": [
          {
            "qualifier": "M",
            "measure_refs": [
              "_COUNT_",
              "PLAY_STAT"
            ]
          }
        ]
      },
      {
        "name": "F2",
        "columns": [
          {
            "qualifier": "M",
            "measure_refs": [
              "PV_STAT"
            ]
          }
        ]
      }
    ]
  },
  "aggregation_groups": [
    {
      "includes": [
        "LAMBDA_FLAT_TABLE.USER_DEVIDE_TYPE",
        "LAMBDA_FLAT_TABLE.LOCATION_CITY",
        "LAMBDA_FLAT_TABLE.DEVICE_BRAND",
        "LAMBDA_FLAT_TABLE.STR_MINUTE_SECOND",
        "LAMBDA_FLAT_TABLE.PAGE_ID",
        "LAMBDA_FLAT_TABLE.MONTH_START",
        "LAMBDA_FLAT_TABLE.DAY_START",
        "LAMBDA_FLAT_TABLE.HOUR_START",
        "LAMBDA_FLAT_TABLE.MINUTE_START"
      ],
      "select_rule": {
        "hierarchy_dims": [],
        "mandatory_dims": [
          "LAMBDA_FLAT_TABLE.USER_DEVIDE_TYPE",
          "LAMBDA_FLAT_TABLE.STR_MINUTE_SECOND",
          "LAMBDA_FLAT_TABLE.DEVICE_BRAND"
        ],
        "joint_dims": []
      }
    }
  ],
  "signature": "z2i5rv8LtjgoSByK5a/Y9w==",
  "notify_list": [],
  "status_need_notify": [
    "ERROR",
    "DISCARDED",
    "SUCCEED"
  ],
  "partition_date_start": 0,
  "partition_date_end": 3153600000000,
  "auto_merge_time_ranges": [
    604800000,
    2419200000
  ],
  "volatile_range": 0,
  "retention_range": 0,
  "engine_type": 2,
  "storage_type": 3,
  "override_kylin_properties": {
    "kylin.stream.cube.window": "3600",
    "kylin.stream.cube.duration": "3600",
    "kylin.stream.index.checkpoint.intervals": "300",
    "kylin.cube.algorithm": "INMEM",
    "kylin.stream.segment.retention.policy": "fullBuild",
    "kylin.stream.build.additional.cuboids": "true",
    "kylin.stream.event.timezone": "GMT+8"
  },
  "cuboid_black_list": [],
  "parent_forward": 3,
  "mandatory_dimension_set_list": [],
  "snapshot_table_desc_list": []
}

Enable streaming cube

Load data into lambda table

For each derived time column (event_time, minute_start, hour_start in our case), please make sure you SUBTRACT your local timezone offset. For example, for a local timestamp "2020-09-23 00:07:35 GMT+08:00", subtract the timezone offset (8 hours) and remove the timezone suffix; the result is "2020-09-22 16:07:35".

data
2020-09-22 16:07:35,EMPTY,play,3,Shanghai,100010001,vivo,page_004,223,33.2222,3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097,2020-09-22 16:07:00,2020-09-22 16:00:00,2020-09-01
2020-09-22 16:07:35,EMPTY,stop,4,London,100010002,apple,page_003,224,24.2342,9311744c-3746-3502-84c9-d06e8b5ea2d6,2020-09-22 16:07:00,2020-09-22 16:00:00,2020-09-01
2020-09-22 17:07:35,EMPTY,play,5,Nanjing,100010003,vivo,page_002,225,33.2222,3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097,2020-09-22 17:07:00,2020-09-22 17:00:00,2020-09-01
2020-09-22 17:07:35,EMPTY,stop,6,London,100010004,apple,page_001,226,24.2342,9311744c-3746-3502-84c9-d06e8b5ea2d6,2020-09-22 17:07:00,2020-09-22 17:00:00,2020-09-01
2020-09-22 18:07:35,EMPTY,play,7,Nanjing,100010001,vivo,page_001,227,33.2222,3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097,2020-09-22 18:07:00,2020-09-22 18:00:00,2020-09-01
2020-09-22 18:07:35,EMPTY,stop,8,London,100010002,apple,page_002,228,24.2342,9311744c-3746-3502-84c9-d06e8b5ea2d6,2020-09-22 18:07:00,2020-09-22 18:00:00,2020-09-01
2020-09-22 19:07:35,EMPTY,play,9,Nanjing,100010003,vivo,page_003,229,33.2222,3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097,2020-09-22 19:07:00,2020-09-22 19:00:00,2020-09-01
2020-09-22 19:07:35,EMPTY,stop,3,London,100010004,apple,page_004,230,24.2342,9311744c-3746-3502-84c9-d06e8b5ea2d6,2020-09-22 19:07:00,2020-09-22 19:00:00,2020-09-01
2020-09-22 20:07:35,EMPTY,play,4,Nanjing,100010001,vivo,page_001,231,33.2222,3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097,2020-09-22 20:07:00,2020-09-22 20:00:00,2020-09-01
2020-09-22 20:07:35,EMPTY,stop,5,London,100010002,apple,page_002,232,24.2342,9311744c-3746-3502-84c9-d06e8b5ea2d6,2020-09-22 20:07:00,2020-09-22 20:00:00,2020-09-01
2020-09-22 21:07:35,EMPTY,play,6,Shanghai,100010003,vivo,page_003,233,33.2222,3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097,2020-09-22 21:07:00,2020-09-22 21:00:00,2020-09-01
2020-09-22 22:07:35,EMPTY,stop,7,London,100010004,apple,page_004,234,24.2342,9311744c-3746-3502-84c9-d06e8b5ea2d6,2020-09-22 22:07:00,2020-09-22 22:00:00,2020-09-01
2020-09-22 23:07:35,EMPTY,play,8,Shanghai,100010001,huawei,page_001,235,33.2222,3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097,2020-09-22 23:07:00,2020-09-22 23:00:00,2020-09-01
2020-09-23 00:07:35,EMPTY,pause,9,Tianjin,100010002,huawei,page_003,235,33.2222,e80160bc-f25a-3566-bf9b-a16e91ef6ee4,2020-09-23 00:07:00,2020-09-23 00:00:00,2020-09-01
2020-09-23 02:07:35,EMPTY,pause,3,Tokyo,100010003,huawei,page_004,235,33.2222,3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097,2020-09-23 02:07:00,2020-09-23 02:00:00,2020-09-01
2020-09-23 05:07:35,EMPTY,pause,4,Beijing,100010004,huawei,page_005,235,33.2222,e80160bc-f25a-3566-bf9b-a16e91ef6ee4,2020-09-23 05:07:00,2020-09-23 05:00:00,2020-09-01
2020-09-23 07:07:35,EMPTY,pause,5,Wuhan,100010005,huawei,page_006,235,33.2222,3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097,2020-09-23 07:07:00,2020-09-23 07:00:00,2020-09-01
2020-09-23 10:07:35,EMPTY,pause,6,Tianjin,100010006,huawei,page_007,235,33.2222,e80160bc-f25a-3566-bf9b-a16e91ef6ee4,2020-09-23 10:07:00,2020-09-23 10:00:00,2020-09-01
2020-09-23 12:07:35,EMPTY,pause,7,Tokyo,10001007,huawei,page_008,235,33.2222,3c84cf9d-b8fb-3dec-8b8c-f510c4b6fd097,2020-09-23 12:07:00,2020-09-23 12:00:00,2020-09-01
2020-09-23 15:07:35,EMPTY,pause,8,Beijing,100010008,huawei,page_009,235,33.2222,e80160bc-f25a-3566-bf9b-a16e91ef6ee4,2020-09-23 15:07:00,2020-09-23 15:00:00,2020-09-01
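The timestamp shift above can be scripted when preparing the CSV. A small sketch using only the standard library (the 8-hour offset is this tutorial's GMT+8 assumption):

```python
from datetime import datetime, timedelta

def local_to_utc(ts_str, offset_hours=8, fmt="%Y-%m-%d %H:%M:%S"):
    """Convert a naive local timestamp string into the UTC string that
    should be stored in the Hive historical table."""
    local = datetime.strptime(ts_str, fmt)
    return (local - timedelta(hours=offset_hours)).strftime(fmt)

utc_ts = local_to_utc("2020-09-23 00:07:35")
```

Apply the same conversion to event_time, minute_start, and hour_start before writing each row.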

Upload data into HDFS

hadoop fs -mkdir -p hdfs:///LACUS/lambda_data/lambda_flat_table/day_start=2020-09-23
hadoop fs -put partition.csv hdfs:///LACUS/lambda_data/lambda_flat_table/day_start=2020-09-23

Add partition to Hive table

Insert Partition
use apache;
ALTER TABLE lambda_flat_table ADD PARTITION(DAY_START='2020-09-23') location 'hdfs:///LACUS/lambda_data/lambda_flat_table/day_start=2020-09-23';

Refresh streaming table

Send a request to refresh the segment. For startTime and endTime, make sure you use the epoch timestamps of the (local) day boundaries, e.g. "2020-09-23 00:00:00 GMT+08:00".

Here is the URL of refresh API :  http://cdh-master:7200/kylin/api/cubes/UserActionBasicAnslysisCube/rebuild 

Build API
{  
   "startTime":1600790400000, 
   "endTime":  1600876800000, 
   "buildType":"BUILD"
}
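The two epoch values in the payload can be double-checked with a quick calculation (this only verifies the numbers used above; the GMT+8 day boundary is the tutorial's assumption):

```python
from datetime import datetime

EPOCH = datetime(1970, 1, 1)

def local_day_to_epoch_millis(day_str, offset_hours=8):
    """Epoch milliseconds of local midnight on the given day, assuming a
    fixed UTC offset (no DST handling)."""
    local_midnight = datetime.strptime(day_str, "%Y-%m-%d")
    utc_seconds = (local_midnight - EPOCH).total_seconds() - offset_hours * 3600
    return int(utc_seconds * 1000)

start_time = local_day_to_epoch_millis("2020-09-23")  # 2020-09-23 00:00 GMT+8
end_time = local_day_to_epoch_millis("2020-09-24")    # 2020-09-24 00:00 GMT+8
```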

Here is the building job.

Here is the HQL used in the first step.

Create flat table
USE APACHE;

DROP TABLE IF EXISTS kylin_intermediate_useractionbasicanslysiscube_542a8a51_5a5c_d24a_19a2_a37a59fac050;
CREATE EXTERNAL TABLE IF NOT EXISTS kylin_intermediate_useractionbasicanslysiscube_542a8a51_5a5c_d24a_19a2_a37a59fac050
(
`LAMBDA_FLAT_TABLE_USER_DEVIDE_TYPE` string
,`LAMBDA_FLAT_TABLE_LOCATION_CITY` string
,`LAMBDA_FLAT_TABLE_DEVICE_BRAND` string
,`LAMBDA_FLAT_TABLE_STR_MINUTE_SECOND` string
,`LAMBDA_FLAT_TABLE_PAGE_ID` string
,`LAMBDA_FLAT_TABLE_MONTH_START` date
,`LAMBDA_FLAT_TABLE_DAY_START` date
,`LAMBDA_FLAT_TABLE_HOUR_START` timestamp
,`LAMBDA_FLAT_TABLE_MINUTE_START` timestamp
,`LAMBDA_FLAT_TABLE_PAGEVIEW_ID` string
,`LAMBDA_FLAT_TABLE_PLAY_TIMES` int
)
STORED AS SEQUENCEFILE
LOCATION 'hdfs://cdh-master:8020/APACHE/REALTIME_OLAP/REALTIME/kylin-f1b9f3ab-45f6-8ec8-f701-2306d24bc825/kylin_intermediate_useractionbasicanslysiscube_542a8a51_5a5c_d24a_19a2_a37a59fac050';
ALTER TABLE kylin_intermediate_useractionbasicanslysiscube_542a8a51_5a5c_d24a_19a2_a37a59fac050 SET TBLPROPERTIES('auto.purge'='true');
INSERT OVERWRITE TABLE `kylin_intermediate_useractionbasicanslysiscube_542a8a51_5a5c_d24a_19a2_a37a59fac050` SELECT
`LAMBDA_FLAT_TABLE`.`USER_DEVIDE_TYPE` as `LAMBDA_FLAT_TABLE_USER_DEVIDE_TYPE`
,`LAMBDA_FLAT_TABLE`.`LOCATION_CITY` as `LAMBDA_FLAT_TABLE_LOCATION_CITY`
,`LAMBDA_FLAT_TABLE`.`DEVICE_BRAND` as `LAMBDA_FLAT_TABLE_DEVICE_BRAND`
,`LAMBDA_FLAT_TABLE`.`STR_MINUTE_SECOND` as `LAMBDA_FLAT_TABLE_STR_MINUTE_SECOND`
,`LAMBDA_FLAT_TABLE`.`PAGE_ID` as `LAMBDA_FLAT_TABLE_PAGE_ID`
,`LAMBDA_FLAT_TABLE`.`MONTH_START` as `LAMBDA_FLAT_TABLE_MONTH_START`
,`LAMBDA_FLAT_TABLE`.`DAY_START` as `LAMBDA_FLAT_TABLE_DAY_START`
,`LAMBDA_FLAT_TABLE`.`HOUR_START` as `LAMBDA_FLAT_TABLE_HOUR_START`
,`LAMBDA_FLAT_TABLE`.`MINUTE_START` as `LAMBDA_FLAT_TABLE_MINUTE_START`
,`LAMBDA_FLAT_TABLE`.`PAGEVIEW_ID` as `LAMBDA_FLAT_TABLE_PAGEVIEW_ID`
,`LAMBDA_FLAT_TABLE`.`PLAY_TIMES` as `LAMBDA_FLAT_TABLE_PLAY_TIMES`
 FROM `APACHE`.`LAMBDA_FLAT_TABLE` as `LAMBDA_FLAT_TABLE`
WHERE 1=1 AND (`LAMBDA_FLAT_TABLE`.`DAY_START` >= '2020-09-23' AND `LAMBDA_FLAT_TABLE`.`DAY_START` < '2020-09-24')
;

Verify query result

Query
select hour_start, count(*) as event_num, count(distinct PAGEVIEW_ID) as pv_stat
from LAMBDA_FLAT_TABLE 
where day_start >= '2020-09-23' and day_start < '2020-09-24'
group by hour_start
order by hour_start;


select page_id, user_devide_type, count(*) as event_num, count(distinct PAGEVIEW_ID) as pv_stat
from LAMBDA_FLAT_TABLE
where day_start >= '2020-09-23' and day_start < '2020-09-24'
group by page_id, user_devide_type
order by page_id, user_devide_type;