Apache Kylin : Analytical Data Warehouse for Big Data
Kylin 的 Metrics 系统的验证, 解释和增强 (KYLIN-4371的开发日志)
一. 背景
在 Kylin 2.3 发布后, Kylin team 发布了 System Cube 和 Dashboard, 使得 Kylin 用户可以更加容易地收集和观测 Kylin 本身应用级别的 Metrics 信息. 在目前版本的 Kylin, 通过配置 System Cube 为 HiveSink 或者 KafkaSink, 可以将 Metrics 信息发布到对应的 Sink, 并且通过触发构建任务可以构建 System Cube 以满足系统管理员通过 SQL 查询 Kylin的系统信息的需求. 此外, 借助 Dashboard 用户可以轻松地做到 Kylin 系统监控信息的可视化分析.
在 Kylin 3.0, Kylin team 发布了另外一个特性, 也就是 Real-time OLAP, 可以使得用户可以实时地获取和查询流式数据源.
这里, 我尝试探索如何将 System Cube 和 Real-time OLAP 相集成, 来降低 System Cube 的数据准备延迟. 经过调研, System Cube 本身已经包含了 KafkaSink, 所以需要的工作是修改 SCCreator, 使之能准备 Real-time OLAP 所需的 Cube 元数据, 和一些其它适配工作.
二. 设计
Metrics System
在 eBay 引入的 System Cube 特性中, 首先需要收集 Metrics 信息, 然后基于 Cubing Engine 来构建 Cube. 通过下图所示的 Metrics System 可以获取和汇报 Job 和 Query 主题的消息到不同的 Sink. Metrics System 代码主要包含在 core-metrics, metrics-reporter-hive, metrics-reporter-kafka 三个 Maven module中, 另外还有 QueryMetricsFacade, JobMetricsFacade.
Metrics System 架构示意图如下:
在 Metrics Event 的输入部分, Kylin team 基于门面模式的设计, Job 和 Query 相关的消息被对应的门面 (QueryMetricsFacade, JobMetricsFacade) 封装成统一的 RecordEvent. Query 和 Job 相关的记录会分别由对应的门面来负责进行加工, 然后通过 MetricsManager 的 update 方法统一发送给 Metrics System.
发送给 Metrics System 的 RecordEvent, 会交由 ActiveReservoir 来暂存, 根据 RecordEvent 的缓存策略和对数据延迟的需求, 可以分别配置使用 BlockingReservoir 和 InstantReservoir. 其中, BlockingReservoir 会以(微)批次的方式向 Sink 发送数据, 而 InstantReservoir 会不做等待地直接发送 RecordEvent; 所以一般 BlockingReservoir 比较适合和 HiveSink 搭配写入DFS/Hive, 而 InstantReservoir 比较适合和 KafkaSink 搭配.
在输出部分, 每个 Reservoir 可以拥有数个 ActiveReservoirReporter, 如 HiveReservoirReporter 和 KafkaReservoirReporter 负责将消息写入不同的 Sink. 目前文档的默认配置是 HiveSink, 我在这里会尝试适配 KafkaSink.
在 Metrics System 之外, 我们还需要工具来创建 Cube 来获取来分析落到不同 Sink 的数据, 这块的工作由 SCCreator 负责. 由创建出来的 Cube 称之为 System Cube, 一共有5个. SCCreator 需要一个 json指定System Cube 所需的一些配置项. 最终通过 SCCreator 我们会获取完成一系列所需要的准备工作, 包括用于创建 Hive Table/Kafka Topic 的脚本和包含一系列模型和 Cube 的新的 Project 元数据.
当相关的准备工作完成后, 我们可以使用 corntab 来定时调度构建作业, 这样使得数据源源不断地被构建进入 Kylin, 以便 Dashboard 和 Cube Planner 两者来对其进行查询(消费).
Real-time OLAP
在 Kylin 3.0 中, 我们通过一个新的架构实现流式数据处理的解决方案.
我们看下面的架构图: 在数据源端, 我们可以通过新增的 Recevier 节点实时地摄入数据, 并且不断地将满足时间条件的数据上传和整合到 HBase; 在查询端, 通过合并来自 HBase 的历史数据和来自 Receiver 的实时数据, 我们可以获取近乎实时的查询结果. 更加详细的分析可以参考技术博客
http://kylin.apache.org/blog/2019/04/12/rt-streaming-design/ .
集成 System Cube 和 Real-time OLAP
在集成两者的过程中, 我们主要修改 SCCreator, 在这里我们主要适配了 Kafka Topic 和 Real-time OLAP 所需的一些额外元数据的创建工作, 这两个任务分别来由 KafkaTopicCreator 和 StreamingMetadataCreator 来负责, 最后由 SCCreator 来统一调度. 对于 Metrics System 则不需要改动.
三. 如何使用
1. 开启 Metrics 信息收集
kylin.metrics.monitor-enabled=true kylin.metrics.reporter-job-enabled=true kylin.metrics.reporter-query-enabled=true kylin.web.dashboard-enabled=true
2. 准备 System Cube 所需的 Cube 参数文件
我们需要提供一个 Json Array, Array包含的对象为 MetricsSinkDesc.
[ { "sink": "hive", "storage_type": 2, "cube_desc_override_properties": { "kylin.cube.algorithm": "INMEM", "kylin.cube.max-building-segments": "1" } }, { "sink": "kafka", "storage_type": 3, "cube_desc_override_properties": { "kylin.cube.algorithm": "INMEM", "kylin.stream.cube.window": 28800, "kylin.stream.cube.duration": 3600, "kylin.stream.segment.retention.policy": "fullBuild", "kylin.cube.max-building-segments": "20" }, "table_properties": { "bootstrap.servers": "{YOUR_SERVERS_LIST}" } } ]
3. 生成 System Cube (Real-time OLAP) 元数据
./bin/kylin.sh org.apache.kylin.tool.metrics.systemcube.SCCreator -inputConfig sink.json -output system-cube
4. 创建 Kafka Topic
在 create_kafka_topic_for_system_cubes.sh 中, 根据需求来修改参数 zookeeper 和 partitions.
sh create_kafka_topic_for_system_cubes.sh
5. 导入 Cube 元数据
sh bin/metastore.sh restore system-cube/
6. 更新 kylinMetrics.xml
将默认的 BlockingReservoir 改为 InstantReservoir, 并且同时修改 ReservoirReporter 和 Sink 的类型.
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://www.springframework.org/schema/beans" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd"> <description>Kylin Metrics Related Configuration (System Cube)</description> <!-- A Reservoir which don't staged metrics message at all, emit it in no time. Maybe good for debug purpose.--> <bean id="instantReservoir" class="org.apache.kylin.metrics.lib.impl.InstantReservoir"/> <bean id="kafkaSink" class="org.apache.kylin.metrics.lib.impl.kafka.KafkaSink"/> <bean id="initMetricsManager" class="org.springframework.beans.factory.config.MethodInvokingFactoryBean"> <property name="targetClass" value="org.apache.kylin.metrics.MetricsManager"/> <property name="targetMethod" value="initMetricsManager"/> <property name="arguments"> <list> <!-- Sink of System Cube. --> <ref bean="kafkaSink"/> <!-- Bind properties for each ActiveReservoirReporter. --> <map key-type="org.apache.kylin.metrics.lib.ActiveReservoir" value-type="java.util.List"> <!-- Each ActiveReservoir can have multi ReservoirReporter --> <entry key-ref="instantReservoir"> <list> <bean class="org.apache.kylin.common.util.Pair"> <!-- Implementation of ReservoirReporter--> <property name="first" value="org.apache.kylin.metrics.lib.impl.kafka.KafkaReservoirReporter"/> <!-- Properties for specific ReservoirReporter--> <property name="second"> <props> <prop key="bootstrap.servers">cdh-master:9092,cdh-worker-1:9092,cdh-worker-2:9092</prop> </props> </property> </bean> </list> </entry> </map> </list> </property> </bean> </beans>
7. 启用 System Cube
四. Metrics信息
五. 参考链接
http://kylin.apache.org/docs/tutorial/setup_systemcube.html
http://kylin.apache.org/docs/tutorial/realtime_olap.html
http://kylin.apache.org/docs/tutorial/use_dashboard.html
http://kylin.apache.org/blog/2016/08/27/query-metrics-in-kylin/
System Cube 的 JIRA issue :
Dashboard 的 JIRA issue :
System Cube 和 Real-time OLAP 的集成的 JIRA issue :