Apache Kylin : Analytical Data Warehouse for Big Data


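01 Background

1. Overview

When only a small part of the data is updated, refreshing a segment usually takes several minutes, which is longer than users are willing to wait. The goal is for Kylin to reflect data changes in query results within a few seconds of the update, without refreshing the segment.

2. Data update behavior

  • Kylin can receive data updates as CSV files; for the delta data file format, see "Update behavior logic";
  • Only SUM measures and COUNT(*) measures are handled; other measures are not
  • Updates are small in scope: they are made by hand, so the number of changed records stays modest (on the order of tens of thousands of rows)
  • Updates may be frequent, and many people may be making them

3. Update behavior logic

  • Change the value of a measure column (the price column changes from 200 to 150)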

order_id  | buyer_id | part_dt    | price | count
order_001 | zhang    | 2021-01-01 | -50   | 0

  • Insert a new record (insert order_002)

order_id  | buyer_id | part_dt    | price | count
order_002 | zhao     | 2021-01-02 | 100   | +1

  • Delete an old record (delete order_001)

order_id  | buyer_id | part_dt    | price | count
order_001 | zhang    | 2021-01-01 | -200  | -1

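  • Change the value of a dimension column (the part_dt column changes from 2021-01-01 to 2021-01-02), which is converted into a delete plus an insert

order_id  | buyer_id | part_dt    | price | count
order_001 | zhang    | 2021-01-01 | -200  | -1
order_001 | zhang    | 2021-01-02 | +200  | +1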
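
The four update types above all reduce to signed (price, count) rows. The following is a minimal Python sketch of that encoding, not code from Kylin: the function names are hypothetical and it assumes the client knows the old price of a record when it changes or deletes it.

```python
from decimal import Decimal

def measure_change(key, old_price, new_price):
    """A measure value changed: emit the difference; the row count is unchanged."""
    return [dict(key, price=Decimal(new_price) - Decimal(old_price), count=0)]

def insert(key, price):
    """A new record: add its price and one row."""
    return [dict(key, price=Decimal(price), count=1)]

def delete(key, price):
    """A removed record: subtract its price and one row."""
    return [dict(key, price=-Decimal(price), count=-1)]

def dimension_change(old_key, new_key, price):
    """A dimension value changed: delete under the old key, insert under the new one."""
    return delete(old_key, price) + insert(new_key, price)

# The sample records used in the tables above.
order_001 = {"order_id": "order_001", "buyer_id": "zhang", "part_dt": "2021-01-01"}
order_002 = {"order_id": "order_002", "buyer_id": "zhao", "part_dt": "2021-01-02"}

price_edit = measure_change(order_001, "200", "150")
new_order = insert(order_002, "100")
removed = delete(order_001, "200")
moved = dimension_change(order_001, dict(order_001, part_dt="2021-01-02"), "200")
```

Because every row is a signed difference, summing delta rows together with the existing aggregates gives the updated SUM and COUNT without touching the stored data.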

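02 Development approach: Merge on Read

1. Idea

On every query, while the TableScan operator executes, the delta data file is aggregated according to the Cuboid that the query hits, and the result is merged with the Cuboid DataFrame.
Merging the Delta Cuboid with the Original Cuboid can get somewhat involved, depending on the type of record update. For example, deleting records and querying records have to be handled separately.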
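
The merge step can be illustrated with a small Python sketch. It is not the Spark code from the branch: `aggregate` and `merge_on_read` are hypothetical names, plain dictionaries stand in for DataFrames, and dropping groups whose count falls to zero is an assumption about how fully deleted groups should be treated.

```python
from collections import defaultdict
from decimal import Decimal

def aggregate(rows, dims):
    """Group rows by the cuboid's dimensions, summing price and count."""
    groups = defaultdict(lambda: [Decimal(0), 0])
    for row in rows:
        key = tuple(row[d] for d in dims)
        groups[key][0] += row["price"]
        groups[key][1] += row["count"]
    return groups

def merge_on_read(cuboid_rows, delta_rows, dims):
    """Union the cuboid with the delta rows and re-aggregate on the cuboid's dimensions."""
    merged = aggregate(list(cuboid_rows) + list(delta_rows), dims)
    # Assumption: a group whose count reaches zero has been fully deleted.
    return {key: tuple(vals) for key, vals in merged.items() if vals[1] != 0}

# A cuboid on part_dt only, plus a delta that deletes order_001 and inserts order_002.
cuboid = [{"part_dt": "2021-01-01", "price": Decimal("200"), "count": 1}]
delta = [
    {"order_id": "order_001", "buyer_id": "zhang", "part_dt": "2021-01-01",
     "price": Decimal("-200"), "count": -1},
    {"order_id": "order_002", "buyer_id": "zhao", "part_dt": "2021-01-02",
     "price": Decimal("100"), "count": 1},
]
result = merge_on_read(cuboid, delta, ["part_dt"])
```

The delta rows carry more columns than the cuboid; only the dimensions of the hit cuboid take part in the grouping, which is why the delta file has to be re-aggregated per query.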

2. Flowchart

03 Implementation and test results

1. Implementation

Code branch: https://github.com/zhangayqian/kylin/tree/delta-csv-datasource

Commit link: https://github.com/apache/kylin/commit/7003d410eb2b9694198b2a03762aadb98a867dd1

2. Verifying query availability and result correctness

  • Delta cube metadata

Delta cube metadata
{
  "uuid": "95e1b58c-0cb7-e508-a75a-358fe4a94e26",
  "last_modified": 1630479636000,
  "version": "3.9.9.1",
  "name": "kylin_sales_cube_test_zyq_delta",
  "is_draft": false,
  "model_name": "kylin_sales_model",
  "description": "",
  "null_string": null,
  "dimensions": [
    {
      "name": "TRANS_ID",
      "table": "KYLIN_SALES",
      "column": "TRANS_ID",
      "derived": null
    },
    {
      "name": "PART_DT",
      "table": "KYLIN_SALES",
      "column": "PART_DT",
      "derived": null
    },
    {
      "name": "SELLER_ID",
      "table": "KYLIN_SALES",
      "column": "SELLER_ID",
      "derived": null
    },
    {
      "name": "BUYER_ID",
      "table": "KYLIN_SALES",
      "column": "BUYER_ID",
      "derived": null
    }
  ],
  "measures": [
    {
      "name": "GMV_SUM",
      "function": {
        "expression": "SUM",
        "parameter": {
          "type": "column",
          "value": "KYLIN_SALES.PRICE"
        },
        "returntype": "decimal(19,4)"
      }
    },
    {
      "name": "TRANS_CNT",
      "function": {
        "expression": "COUNT",
        "parameter": {
          "type": "constant",
          "value": "1"
        },
        "returntype": "bigint"
      }
    },
    {
      "name": "SELLER_CNT_HLL",
      "function": {
        "expression": "COUNT_DISTINCT",
        "parameter": {
          "type": "column",
          "value": "KYLIN_SALES.SELLER_ID"
        },
        "returntype": "hllc(10)"
      }
    },
    {
      "name": "TOP_SELLER",
      "function": {
        "expression": "TOP_N",
        "parameter": {
          "type": "column",
          "value": "KYLIN_SALES.PRICE",
          "next_parameter": {
            "type": "column",
            "value": "KYLIN_SALES.SELLER_ID"
          }
        },
        "returntype": "topn(100,4)",
        "configuration": {
          "topn.encoding.KYLIN_SALES.SELLER_ID": "dict",
          "topn.encoding_version.KYLIN_SALES.SELLER_ID": "1"
        }
      }
    },
    {
      "name": "ITEM_BITMAP",
      "function": {
        "expression": "COUNT_DISTINCT",
        "parameter": {
          "type": "column",
          "value": "KYLIN_SALES.ITEM_ID"
        },
        "returntype": "bitmap"
      }
    },
    {
      "name": "PRICE_PERCENTILE",
      "function": {
        "expression": "PERCENTILE_APPROX",
        "parameter": {
          "type": "column",
          "value": "KYLIN_SALES.PRICE"
        },
        "returntype": "percentile(100)"
      }
    }
  ],
  "dictionaries": [
    {
      "column": "KYLIN_SALES.ITEM_ID",
      "builder": "org.apache.kylin.dict.GlobalDictionaryBuilder",
      "cube": null,
      "model": null
    }
  ],
  "rowkey": {
    "rowkey_columns": [
      {
        "column": "KYLIN_SALES.BUYER_ID",
        "encoding": "integer:4",
        "isShardBy": false
      },
      {
        "column": "KYLIN_SALES.SELLER_ID",
        "encoding": "integer:4",
        "isShardBy": true
      },
      {
        "column": "KYLIN_SALES.TRANS_ID",
        "encoding": "integer:4",
        "isShardBy": false
      },
      {
        "column": "KYLIN_SALES.PART_DT",
        "encoding": "date",
        "isShardBy": false
      }
    ]
  },
  "hbase_mapping": {
    "column_family": [
      {
        "name": "F1",
        "columns": [
          {
            "qualifier": "M",
            "measure_refs": [
              "GMV_SUM",
              "TRANS_CNT"
            ]
          }
        ]
      },
      {
        "name": "F2",
        "columns": [
          {
            "qualifier": "M",
            "measure_refs": [
              "SELLER_CNT_HLL",
              "TOP_SELLER",
              "ITEM_BITMAP",
              "PRICE_PERCENTILE"
            ]
          }
        ]
      }
    ]
  },
  "aggregation_groups": [
    {
      "includes": [
        "KYLIN_SALES.PART_DT",
        "KYLIN_SALES.BUYER_ID",
        "KYLIN_SALES.SELLER_ID",
        "KYLIN_SALES.TRANS_ID"
      ],
      "select_rule": {
        "hierarchy_dims": [],
        "mandatory_dims": [
          "KYLIN_SALES.PART_DT"
        ],
        "joint_dims": []
      }
    }
  ],
  "signature": "iQy/3K2X9Ur4nOI5udjIbg==",
  "notify_list": [],
  "status_need_notify": [],
  "partition_date_start": 1325376000000,
  "partition_date_end": 3153600000000,
  "auto_merge_time_ranges": [],
  "volatile_range": 0,
  "retention_range": 0,
  "engine_type": 6,
  "storage_type": 4,
  "override_kylin_properties": {
    "kylin.cube.aggrgroup.is-mandatory-only-valid": "true",
    "kylin.engine.spark.rdd-partition-cut-mb": "500",
    "kylin.storage.columnar.shard-countdistinct-rowcount": "1000",
    "kylin.cube.source.delta.csv.path": "file:/Users/yaqian.zhang/Desktop/test_csv.csv"
  },
  "cuboid_black_list": [],
  "parent_forward": 3,
  "mandatory_dimension_set_list": [],
  "snapshot_table_desc_list": []
}
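
As a reading aid for the metadata above, the sketch below splits the cube's measures into those the delta merge handles (SUM and COUNT, per the background section) and the rest, and reads the delta CSV path from `override_kylin_properties`. The helper `split_measures` is hypothetical and not part of Kylin; the JSON is a trimmed copy of the descriptor above.

```python
import json

# Trimmed copy of the cube descriptor: only the fields this sketch needs.
CUBE_DESC = json.loads("""
{
  "measures": [
    {"name": "GMV_SUM", "function": {"expression": "SUM"}},
    {"name": "TRANS_CNT", "function": {"expression": "COUNT"}},
    {"name": "SELLER_CNT_HLL", "function": {"expression": "COUNT_DISTINCT"}},
    {"name": "TOP_SELLER", "function": {"expression": "TOP_N"}},
    {"name": "ITEM_BITMAP", "function": {"expression": "COUNT_DISTINCT"}},
    {"name": "PRICE_PERCENTILE", "function": {"expression": "PERCENTILE_APPROX"}}
  ],
  "override_kylin_properties": {
    "kylin.cube.source.delta.csv.path": "file:/Users/yaqian.zhang/Desktop/test_csv.csv"
  }
}
""")

# Measure expressions whose results the delta merge updates correctly.
MERGEABLE = {"SUM", "COUNT"}

def split_measures(desc):
    """Return (measures the delta merge handles, all other measures)."""
    handled, other = [], []
    for measure in desc["measures"]:
        target = handled if measure["function"]["expression"] in MERGEABLE else other
        target.append(measure["name"])
    return handled, other

handled, other = split_measures(CUBE_DESC)
delta_path = CUBE_DESC["override_kylin_properties"]["kylin.cube.source.delta.csv.path"]
```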
  • CSV file used for the test

test_csv.csv

  • Example data changes

      1. Change the value of the PRICE measure

TRANS_ID | PART_DT    | SELLER_ID | BUYER_ID | PRICE | COUNT | ITEM_ID
9999     | 2012-06-17 | 10000762  | 10000809 | -30   | 0     | ITEM-QB84

      2. Insert a new record

TRANS_ID | PART_DT    | SELLER_ID | BUYER_ID | PRICE  | COUNT | ITEM_ID
10001    | 2012-07-05 | 10000004  | 10006206 | 14.189 | +1    | ITEM-QB84

      3. Delete an old record

TRANS_ID | PART_DT    | SELLER_ID | BUYER_ID | PRICE    | COUNT | ITEM_ID
0        | 2012-12-14 | 10000349  | 10002313 | -36.2828 | -1    | ITEM-JS83

      4. Change dimension column values, converted into a delete plus an insert

TRANS_ID | PART_DT    | SELLER_ID | BUYER_ID | PRICE    | COUNT | ITEM_ID
9998     | 2012-07-05 | 10000004  | 10006206 | -14.1753 | -1    | ITEM-YU39
9998     | 2012-07-05 | 10000005  | 10006207 | 12.534   | +1    | ITEM-YU39
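
To know what to expect from the correctness queries, the example rows can be aggregated by (TRANS_ID, PART_DT). The Python sketch below does this under one loud assumption: the actual layout of test_csv.csv is not shown on this page, so the header row and column order used here are illustrative only.

```python
import csv
import io
from collections import defaultdict
from decimal import Decimal

# Assumed layout: a header row followed by the five example rows above.
DELTA_CSV = """TRANS_ID,PART_DT,SELLER_ID,BUYER_ID,PRICE,COUNT,ITEM_ID
9999,2012-06-17,10000762,10000809,-30,0,ITEM-QB84
10001,2012-07-05,10000004,10006206,14.189,+1,ITEM-QB84
0,2012-12-14,10000349,10002313,-36.2828,-1,ITEM-JS83
9998,2012-07-05,10000004,10006206,-14.1753,-1,ITEM-YU39
9998,2012-07-05,10000005,10006207,12.534,+1,ITEM-YU39
"""

def expected_change(csv_text, dims=("TRANS_ID", "PART_DT")):
    """Sum the PRICE and COUNT deltas per group of the given dimensions."""
    change = defaultdict(lambda: {"count": 0, "sum_price": Decimal(0)})
    for row in csv.DictReader(io.StringIO(csv_text)):
        group = change[tuple(row[d] for d in dims)]
        group["count"] += int(row["COUNT"])
        group["sum_price"] += Decimal(row["PRICE"])
    return dict(change)

changes = expected_change(DELTA_CSV)
```

For TRANS_ID 9998 the delete and the insert fall into the same (TRANS_ID, PART_DT) group, so count(*) should stay the same and only sum(price) should move.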

  • Correctness tests

SQL1: Basic functionality check

SQL1
-- Q1
select TRANS_ID, PART_DT, count(*), sum(price)
from kylin_sales
where TRANS_ID in (9999, 10001, 0, 9998)
group by TRANS_ID, PART_DT

Original result from querying the cube:

Result after adding the CSV file:

SQL2: Subquery

SQL2
-- Q2
SELECT sum(sum_price) AS "COL" 
 FROM ( 
 select kylin_sales.part_dt, sum(kylin_sales.price) as sum_price, count(distinct ITEM_ID) as cnt_1
 from kylin_sales 
 group by kylin_sales.part_dt 
 ) "TableauSQL" 
 HAVING COUNT(1)>0

Original result from querying the cube:

Result after adding the CSV file:

SQL3: Query involving lookup tables

SQL3
-- Q3
SELECT TRANS_ID, PART_DT, SUM(PRICE) AS GMV,
FIRST_VALUE(SUM(PRICE)) OVER(PARTITION BY PART_DT ORDER BY PART_DT) AS "FIRST",
LAST_VALUE(SUM(PRICE)) OVER(PARTITION BY PART_DT ORDER BY PART_DT) AS "CURRENT",
LAG(SUM(PRICE), 1, 0.0) OVER(PARTITION BY PART_DT ORDER BY PART_DT) AS "PREV",
LEAD(SUM(PRICE), 1, 0.0) OVER(PARTITION BY PART_DT ORDER BY PART_DT) AS "NEXT",
NTILE(4) OVER (PARTITION BY PART_DT ORDER BY PART_DT) AS "QUARTER"
FROM KYLIN_SALES
INNER JOIN KYLIN_ACCOUNT as SELLER_ACCOUNT
ON KYLIN_SALES.SELLER_ID = SELLER_ACCOUNT.ACCOUNT_ID
INNER JOIN KYLIN_COUNTRY as SELLER_COUNTRY
ON SELLER_ACCOUNT.ACCOUNT_COUNTRY = SELLER_COUNTRY.COUNTRY
where TRANS_ID in (9999, 10001, 0, 9998)
GROUP BY TRANS_ID, PART_DT
ORDER BY PART_DT;

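Original result from querying the cube:

Result after adding the CSV file:

SQL4: Query with several measure types. Only SUM and COUNT measures are updated correctly here. COUNT_DISTINCT measures are not updated. Other measures are updated from the delta data source but may produce incorrect results. To leave every measure other than SUM and COUNT unchanged, each such measure needs separate handling, for example by filling its column with a null value of the matching type when building the delta cuboid, so that the later aggregation with the original cuboid does not affect the final result.

SQL4
-- Q4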
select TRANS_ID, PART_DT, count(*), sum(price), count(distinct ITEM_ID), percentile(price, 0.5)
from kylin_sales
where TRANS_ID in (9999, 10001, 0, 9998)
group by TRANS_ID, PART_DT
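
The null-filling idea described for SQL4 can be sketched in Python as follows. This is an illustration of the principle only: all names are hypothetical, and a Python set stands in for Kylin's bitmap or HLL state.

```python
def null_safe(agg):
    """Wrap an aggregate so that null (None) inputs are skipped, as SQL aggregates do."""
    def wrapped(values):
        present = [v for v in values if v is not None]
        return agg(present) if present else None
    return wrapped

merge_sum = null_safe(sum)
merge_distinct = null_safe(lambda sets: set().union(*sets))

def build_delta_row(price_delta, count_delta):
    """Only SUM and COUNT carry a delta; every other measure is null-filled."""
    return {"gmv_sum": price_delta, "trans_cnt": count_delta, "item_distinct": None}

def merge(original, delta):
    """Aggregate one original cuboid row with one delta row, measure by measure."""
    return {
        "gmv_sum": merge_sum([original["gmv_sum"], delta["gmv_sum"]]),
        "trans_cnt": merge_sum([original["trans_cnt"], delta["trans_cnt"]]),
        "item_distinct": merge_distinct([original["item_distinct"], delta["item_distinct"]]),
    }

original = {"gmv_sum": 100, "trans_cnt": 2, "item_distinct": {"A", "B"}}
merged = merge(original, build_delta_row(-30, 0))
```

Because the delta side contributes nothing to the null-filled measure, its merged value equals the original one, while SUM and COUNT still pick up the change.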

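Original result from querying the cube:

Result after adding the CSV file:

3. Concurrency test

Using the SSB10 dataset as the base dataset, concurrent query tests were run with 0, 1,000, 10,000, and 100,000 modified records at concurrency levels of 1, 10, and 20, and the response time (RT) was collected.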

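  • Conclusion

As concurrency grows, the size of the Delta data has no obvious effect on how query RT scales. At a concurrency of 20 with 1,000 Delta rows, RT for most queries increases by about 1.5x compared with no Delta data. For queries that scan many rows, such as Q4.3, RT goes from 2648 ms to 3235 ms, far short of a 1.5x increase. With 10,000 Delta rows, RT is about the same as with 1,000 rows, with no obvious change. With 100,000 Delta rows, RT for Q4.3 increases by about 1x compared with no Delta data, while RT for the other queries increases by about 4x. The likely reason is that at 100,000 rows the Delta data scanned per query far exceeds the Cuboid data scanned, so the dominant factor in query time shifts from the Cuboid data to the Delta data. Query RTs therefore converge and no longer keep proportions similar to the original RTs.

  • Detailed test results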
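
The ratios quoted above can be rechecked with a few lines of Python. The values are copied from the detailed test results (concurrency 20, executor.cores=3, executor.instances=2); Q1.1 is picked here as one representative of "most queries", which is a choice made for this sketch.

```python
# RT in ms at concurrency 20, keyed by number of Delta rows, then by query.
RT_20 = {
    0: {"Q1.1": 834.46, "Q4.3": 2648.57},
    1000: {"Q1.1": 2173.84, "Q4.3": 3235.35},
    100000: {"Q1.1": 4180.84, "Q4.3": 5360.42},
}

def relative_increase(delta_rows, query):
    """Increase in RT relative to the run without Delta data (1.0 means doubled)."""
    base = RT_20[0][query]
    return (RT_20[delta_rows][query] - base) / base

for rows in (1000, 100000):
    for query in ("Q1.1", "Q4.3"):
        print(rows, query, round(relative_increase(rows, query), 2))
```

Read this way, "increases by about 1.5x" means the RT ends up at roughly 2.5 times the baseline, and "increases by about 1x" means it roughly doubles.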


Response times are in ms. Two query configurations were tested:

Config A:
kylin.query.spark-conf.spark.executor.cores=3
kylin.query.spark-conf.spark.executor.instances=2

Config B:
kylin.query.spark-conf.spark.executor.cores=1
kylin.query.spark-conf.spark.executor.instances=1

The last column shows RT/error rate where an error rate was recorded.

Delta rows | Query | A: 1 concurrent | A: 10 concurrent | A: 20 concurrent | B: 1 concurrent | B: 10 concurrent | B: 20 concurrent/err
0          | Q1.1  | 178.69  | 475.86  | 834.46  | 631.82  | 2826.32 | 12850.18/3.95%
0          | Q1.2  | 142.40  | 299.24  | 592.78  | 790.59  | 2229.30 | 4343.96/2.05%
0          | Q1.3  | 147.73  | 305.32  | 593.30  | 437.71  | 2115.67 | 2794.34/0.70%
0          | Q2.1  | 176.78  | 376.71  | 727.96  | 441.27  | 2220.17 | 2304.61/1.41%
0          | Q2.2  | 172.78  | 364.76  | 716.69  | 441.03  | 1893.13 | 2119.11
0          | Q2.3  | 171.09  | 358.08  | 719.93  | 476.31  | 1827.40 | 2105.86
0          | Q3.1  | 170.63  | 351.84  | 720.70  | 465.03  | 1980.37 | 2363.31
0          | Q3.2  | 230.57  | 443.34  | 787.17  | 552.73  | 2239.47 | 2997.98
0          | Q3.3  | 226.74  | 428.90  | 770.51  | 506.64  | 2360.32 | 4290.75
0          | Q3.4  | 742.20  | 1168.66 | 1849.99 | 971.21  | 3059.67 | 6440.36
0          | Q4.1  | 203.03  | 386.46  | 769.75  | 457.79  | 2724.21 | 6273.74/1.43%
0          | Q4.2  | 196.96  | 416.01  | 774.25  | 488.17  | 3035.03 | 6595.58/1.45%
0          | Q4.3  | 1049.04 | 1636.10 | 2648.57 | 1747.15 | 4212.71 | 12277.43/2.94%
0          | Total | 292.97  | 539.07  | 961.17  | 631.82  | 2513.60 | 5235.15/1.09%
1000       | Q1.1  | 476.85  | 936.78  | 2173.84 | 589.04  | 3576.84 | 10766.92/2.17%
1000       | Q1.2  | 354.13  | 703.09  | 1399.27 | 404.63  | 2431.06 | 4189.42
1000       | Q1.3  | 386.30  | 741.74  | 1430.06 | 425.39  | 1971.07 | 2970.18
1000       | Q2.1  | 396.13  | 789.61  | 1576.43 | 415.97  | 1747.05 | 2487.82
1000       | Q2.2  | 398.13  | 779.49  | 1548.78 | 426.37  | 1489.50 | 2283.08
1000       | Q2.3  | 396.22  | 773.03  | 1542.58 | 404.89  | 1337.23 | 2261.86
1000       | Q3.1  | 401.29  | 777.99  | 1579.48 | 412.47  | 1467.30 | 2387.84
1000       | Q3.2  | 466.83  | 856.32  | 1651.12 | 484.24  | 1779.71 | 3242.41
1000       | Q3.3  | 444.43  | 849.28  | 1636.10 | 463.30  | 2240.67 | 4146.23
1000       | Q3.4  | 937.92  | 1546.29 | 2612.82 | 932.41  | 3326.44 | 6891.25
1000       | Q4.1  | 409.63  | 824.52  | 1608.71 | 425.12  | 3036.08 | 6497.19/2.22%
1000       | Q4.2  | 417.70  | 844.03  | 1663.59 | 451.22  | 3413.08 | 5774.76/4.55%
1000       | Q4.3  | 1259.57 | 1994.06 | 3235.35 | 1689.60 | 5140.59 | 11645.83/2.38%
1000       | Total | 518.86  | 954.59  | 1818.23 | 578.31  | 2528.45 | 5016.44/0.86%
10000      | Q1.1  | 502.79  | 976.64  | 1876.83 | 772.26  | 4232.76 | 14508.03
10000      | Q1.2  | 385.83  | 728.87  | 1408.21 | 474.30  | 2437.27 | 4044.32
10000      | Q1.3  | 411.04  | 766.88  | 1420.83 | 496.38  | 2232.82 | 2756.73
10000      | Q2.1  | 426.85  | 808.79  | 1552.06 | 482.23  | 2435.83 | 2812.12
10000      | Q2.2  | 428.44  | 808.44  | 1547.05 | 491.30  | 2148.69 | 2778.46
10000      | Q2.3  | 424.88  | 790.97  | 1518.39 | 481.49  | 2035.67 | 2824.06
10000      | Q3.1  | 430.00  | 812.11  | 1534.64 | 479.70  | 2358.08 | 2827.38
10000      | Q3.2  | 491.93  | 879.05  | 1632.51 | 574.54  | 2547.74 | 3261.59/2.50%
10000      | Q3.3  | 483.87  | 876.05  | 1646.57 | 523.46  | 2501.18 | 5753.11/11.11%
10000      | Q3.4  | 966.57  | 1579.42 | 2639.72 | 1024.07 | 3624.23 | 7868.68/3.85%
10000      | Q4.1  | 433.51  | 843.04  | 1611.06 | 488.41  | 3110.84 | 6889.55
10000      | Q4.2  | 447.69  | 882.36  | 1685.38 | 517.94  | 3099.13 | 4955.08
10000      | Q4.3  | 1304.88 | 2131.19 | 3300.50 | 1777.68 | 5013.85 | 13861.27
10000      | Total | 549.08  | 990.14  | 1796.84 | 659.67  | 2904.05 | 5644.54/1.35%
100000     | Q1.1  | 675.06  | 2042.42 | 4180.84 | 1082.81 | 8344.58 | 22257.57
100000     | Q1.2  | 523.85  | 1625.18 | 3348.57 | 815.97  | 5708.44 | 9131.57
100000     | Q1.3  | 576.77  | 1689.07 | 3388.77 | 858.46  | 4994.33 | 8851.27
100000     | Q2.1  | 593.84  | 1740.86 | 3410.08 | 853.22  | 5117.77 | 9297.82
100000     | Q2.2  | 595.50  | 1772.65 | 3436.87 | 852.19  | 4861.39 | 9166.90
100000     | Q2.3  | 586.94  | 1765.92 | 3466.89 | 807.70  | 4731.81 | 9088.02
100000     | Q3.1  | 596.31  | 1769.98 | 3499.83 | 843.34  | 4979.61 | 9521.87
100000     | Q3.2  | 684.93  | 1914.43 | 3652.75 | 958.34  | 5439.97 | 10442.05
100000     | Q3.3  | 654.18  | 1871.97 | 3654.16 | 927.20  | 5939.36 | 11308.07
100000     | Q3.4  | 1155.46 | 2517.51 | 4699.65 | 1413.27 | 7388.26 | 15554.08/1.67%
100000     | Q4.1  | 604.28  | 1743.92 | 3574.57 | 889.16  | 7002.80 | 13778.51/18.64%
100000     | Q4.2  | 625.01  | 1886.34 | 3752.79 | 909.96  | 7051.97 | 10190.56/16.67%
100000     | Q4.3  | 1561.09 | 3115.93 | 5360.42 | 2151.64 | 9693.87 | 22114.15
100000     | Total | 725.39  | 1957.08 | 3799.19 | 1026.98 | 6228.20 | 12133.60/2.68%


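04 Summary of conclusions

  • Adding an in-memory segment imposes a fixed (and modest) overhead on concurrent query performance, e.g. about 20%
  • When the number of modified rows is very large, e.g. more than 100,000, query performance starts to degrade noticeably, because the bulk of the computation shifts to the in-memory data rather than the precomputed historical data.
  • The feature works correctly, assuming all measures are SUM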


















