【Kylin教程】(六)Kylin 流式构建
即使我们有增量构建,但仍然无法满足分钟级的实时数据更新需求,因为实时数据落地到Hive,再由Kylin触发构建任务,并从Hive中拉取数据。这个过程就需要花费大量的时间。
流式构建正是应对实时数据更新需求的解决方案。
Kylin 流式构建
Kylin假设在流式构建中,数据是以消息流的形式传递给流式构建引擎的。消息流中的每条消息需要包含如下信息。
·所有的维度信息。
·所有的度量信息。
·业务时间戳。
在消息流中,每条消息中的数据结构应该相同,并且可以用同一个分析器实例将每条消息中的维度、度量及时间戳信息提取出来。目前默认的分析器为org.apache.kylin.source.kafka.TimedJsonStreamParser,该分析器假设每条消息为一个单独的JSON,所有的信息都以键值对的形式保存在该JSON之中。
- 数据源中选择流式构建
在Kylin的Web GUI上,选择Model页面,单击“Data Source”,可以找到“Add Streaming Table”的按钮,这就是用来为流式构建创造虚拟表的入口
-
数据样本–>虚拟表
需要输入消息队列中的一段数据样本,数据样本可以是用户消息队列中的任意一条,但是需要保证想要被收录进虚拟表的键值对都应该出现在该数据样本之中。
{"amount":63.50375137330458,"category":"TOY","order_time":1477415932581,"device":"Other","qty":4,"user":{"id":"bf249f36-f593-4307-b156-240b3094a1c3","age":21,"gender":"Male"},"currency":"USD","country":"CHINA"}
单击中间的“>>”按钮,系统会自动地分析JSON中的键值对,将它们转化成虚拟表中的列。
虚拟表必须有至少一个Timestamp类型的列,而且该列必须是一个长整数类型。如果虚拟表中有多个Timestamp类型列,那也不会有问题,后续的向导会要求从中选择一个作为真正的业务时间戳。
注意:Kylin允许用户挑选一些从业务时间戳上衍生出来的时间维度(Derived Time Dimension),具体来说有如下几种。
·minute_start:业务时间戳所在的分钟起始时间,类型为Timestamp(yyyy-MM-dd HH:mm:ss)。
·hour_start:业务时间戳所在的小时起始时间,类型为Timestamp(yyyy-MM-dd HH:mm:ss)。
·day_start:业务时间戳所在的天起始时间,类型为Date(yyyy-MMdd)。
·week_start:业务时间戳所在的周起始时间,类型为Date(yyyy-MMdd)。
·month_start:业务时间戳所在的月起始时间,类型为Date(yyyy-MMdd)。
·quarter_start:业务时间戳所在的季度起始时间,类型为Date(yyyyMM-dd)。
·year_start:业务时间戳所在的年起始时间,类型为Date(yyyy-MMdd)。
这些衍生时间维度都是可选的,如果用户选择了这些衍生维度,那么在对应的时间粒度上进行聚合时就能够获得更好的查询性能,一般来说不推荐把原始的业务时间戳选择成一个单独的维度,因为该列的基数一般都是很大的。
- 配置Kafka
流式构建的用户需要使用Kafka的Producer将数据源源不断地加入某个Topic中,并且将Kafka的一些基本信息(例如Broker节点信息和Topic名称)告知流式构建任务。流式构建任务在启动的时候会启动Kafka客户端,然后根据配置向Kafka集群读取相应的Topic中的消息,并进行预处理计算。
配置:在对话框中输入Kafka的Topic名称,接着在Cluster选项卡中添加Kafka Broker的主机名和端口。
同一对话框下方的高级设置中,有如下三个参数可供配置:
·Timeout:可配置的,Kafka客户端读取超时时间。
·Buffer Size:可配置的,Kafka客户端读取缓冲区大小。
·Margin:可配置的,代表消息可能延迟的程度 -
配置完成
流式Cube的设计
创建Model
和增量构建的流程一样,我们也要为流式构建的Cube创建数据模型(Model)。这里只介绍与流式构建相关的配置项。
在创建Model对话框的第三步维度选择时,我们既可以选择普通的维度,又可以选择衍生的时间维度。注意,一般不推荐直接选择业务时间戳作为维度,因为业务时间戳的精度往往是精确到秒级甚至是毫秒级的,使用它作为一个维度失
去了聚合的意义,也会让整个Cube的体积变得非常庞大
在创建Model对话框的第五步设置中,一般选择最小粒度的衍生时间维度作为分割时间列,在这里我们选MINUTE_START,它的数据格式为yyyy-MM-dd HH:mm:ss。有了分割时间列,就可以对Cube进行分钟级的流式构建了
创建Cube
大体也和之前创建Cube的方式相同,主要介绍与流式构建相关的配置项。
创建Cube的第四步是设置Cube的自动合并时间。因为流式构建需要频繁地构建较小的Segment,为了不对存储器造成过大的压力,同时也为了获取较好的查询性能,因此需要通过自动合并将已有的多个小Segment合并成一个较大的Segment。所以,这里将设置一个层级的自动合并时间:0.5小时、4小时、1天、7天、28天。此外,设置保留时间为30天。
在第五步的Aggregation Groups设置中,可以把衍生时间维度设置为Hierarchy关系,设置的方法和普通维度一样。在RowKeys部分,也可以像调整普通维度的顺序一样合理地调整衍生时间维度。
流式构建原理
流式构建需要达到分钟级的数据更新频率,Kylin的做法是每隔一段时间(INTERVAL)就启动一次微构建,用于处理最新的一批数据。这种做法的理念有一些类似于Spark Streaming,它们也是将流数据视作一种特殊的微批次来处理的。由于分布式网络存在延迟等因素,消息可能存在延迟,因此不能为某一时刻刚刚过去的那几分钟立刻构建微批次。
举例来说,如果在每个微构建中要处理5分钟的增量数据,假设消息队列中的消息最多可能有10分钟的延迟(对应于“Margin”),那么就不能在1:00的时候立刻尝试去构建0:55到1:00这5分钟的数据,因为这部分数据的消息最迟可能在1:10分才会到齐,否则构建出来的Segment就存在很大的遗漏数据的风险。此时,需要像增量构建中提到的“数据持续更新”的情形一样,对过往的Segment进行刷新操作。但是目前流式构建并不支持Segment刷新操作,所以,最早只能在1:10开始构建0:55到1:00这部分的数据。这中间的延迟我们称之为DELAY,它等于每个微构建批次的时间(INTERVAL)加上消息最长可能延迟的时间(MARGIN)。
触发流式构建
数据源的准备
- KAFKA创建Topic
[root@cm-master bin]# ./kafka-topics.sh --create --zookeeper cm-master:2181,cm-slave1:2181,cm-slave2:2181 --replication-factor 3 --partitions 3 --topic kylin_demo
- 调用Kylin自带的数据生成器向KAFKA推送数据
这个工具每2秒钟将会生成一条数据,并发送到Kafka队列中。
[root@cm-master apache-kylin-2.3.1-bin]# ./bin/kylin.sh org.apache.kylin.source.kafka.util.KafkaSampleProducer --topic kylin_demo --broker cm-master:9092 —delay 0
可能会出现Exception in thread “main” java.lang.NoClassDefFoundError: org/apache/kafka/clients/producer/Producer 将KAFKA添加到环境变量中即可
- KAFKA消费端接收数据
使用Kafka的控制台消费者(Console Consumer)查看队列中的数据,以确保数据按照需要生成
[root@cm-master bin]# ./kafka-console-consumer.sh --zookeeper cm-master:2181 --topic kylin_demo --from-beginning
构建cube
可以像之前的教程那样,直接通过Web ui来构建,或者使用API:
curl -X PUT --user ADMIN:KYLIN -H "Content-Type: application/json;charset=utf-8" -d '{ "sourceOffsetStart": 0, "sourceOffsetEnd": 9223372036854775807, "buildType": "BUILD"}' http://localhost:7070/kylin/api/cubes/{your_cube_name}/build2
1:如果出现找不到KAFKA依赖的报错,那么把KAFKA依赖包添加到Kyli:目录下。
2:如果步骤Build Cube In-Mem被admin kill了。那么应是资源的问题,vim kylin_job_conf_inmem.xml 根据自己机器内存来调节参数。
<property>
<name>mapreduce.map.memory.mb</name>
<value>1072</value>
<description></description>
</property>
<property>
<name>mapreduce.map.java.opts</name>
<value>-Xmx800m</value>
<description></description>
</property>
SQL查询
待Cube构建并启用成功之后,我们就可以进行SQL查询了。
select minute_start, count(*) from kylin_streaming_table group by minute_start order by minute_start
自动 build
一旦第一个 build 和查询成功了,您可以按照一定的频率调度增量 build。Kylin 将会记录每一个 build 的 offsets;当收到一个 build 请求,它将会从上一个结束的位置开始,然后从 Kafka 获取最新的 offsets。有了 REST API 您可以使用任何像 Linux cron 调度工具触发它:
crontab -e
*/5 * * * * curl -X PUT --user ADMIN:KYLIN -H "Content-Type: application/json;charset=utf-8" -d '{ "sourceOffsetStart": 0,"sourceOffsetEnd": 9223372036854775807,"buildType": "BUILD"}' http://192.168.109.134:7070/kylin/api/cubes/streaming_model_demo/build2
BDStar原创文章。发布者:Liuyanling,转载请注明出处:http://bigdata-star.com/archives/2053