1 概述
1.1 离线计算、流式计算、实时计算 概述
什么是离线计算?
- 离线计算:批量获取数据、批量传输数据、周期性批量计算数据、展示数据
- 代表技术:Sqoop批量导入数据、HDFS批量存储数据、MapReduce批量计算数据、Hive批量计算数据、Azkaban任务调度
什么是流式计算?
- 流式计算:数据实时产生、实时传输、实时计算、实时展示
- 代表数据:Flume实时获取数据、Kafka实时数据存储、Storm/JStrom实时数据计算、Redis实时结果缓存、持久化存储(Mysql)
- 总结:将源源不断产生的数据实时收集并实时计算
流式计算一般架构图
/
离线计算与实时计算最大的区别?
- 收集
- 计算
- 展示
1.2 Storm概述
- Storm用来实时处理数据
- 使用场景
- 日志分析
- 管道系统
- 消息转化器
- 特点
- 低延迟
- 高可用
- 分布式
- 可扩展
- 数据不丢失
- 简单易用的接口
Storm与Hadoop对比
- Storm用于实时计算,Hadoop用于离线计算。
- Storm处理的数据保存在内存中,源源不断;Hadoop处理的数据保存在文件系统中,一批一批。
- Storm的数据通过网络传输进来;Hadoop的数据保存在磁盘中。
- Storm与Hadoop的编程模型相似
2 Storm核心组件
2.1 组件简介
- Nimbus:负责资源分配和任务调度
- Supervisor:负责接受Nimbus分配的任务,启动和停止属于自己管理的worker进程。
- Worker:运行具体处理逻辑的进程。只有两种类型,Spout和Bolt。
- Task:Worker中每一个Spout/Bolt的线程成为一个Task。不同的S/B的Task可能会共享一个物理线程,该线程成为Executor。
2.2 编程模型
- Topology:Storm中运行的一个实时应用程序的名称。
- Spout:在一个Topology中获取源数据流的组件。
- Bolt:接受数据然后执行处理逻辑的组件。
- Tuple:一次消息传递的基本单元,理解为一组消息就是是一个Tuple。
- Stream:表示数据流向。
2.3 Storm并发机制
2.3.1 概念
- Workers (JVMs): 在一个物理节点上可以运行一个或多个独立的JVM进程。一个Topology可以包含一个或多个worker(并行的跑在不同的物理机上), 所以worker process就是执行一个topology的子集, 并且worker只能对应于一个topology
- Executors (threads): 在一个worker JVM进程中运行着多个Java线程。一个executor线程可以执行一个或多个tasks。但一般默认每个executor只执行一个task。一个worker可以包含一个或多个executor, 每个component (spout或bolt)至少对应于一个executor, 所以可以说executor执行一个compenent的子集, 同时一个executor只能对应于一个component。
- Tasks(bolt/spout instances):Task就是具体的处理逻辑对象,每一个Spout和Bolt会被当作很多task在整个集群里面执行。每一个task对应到一个线程,而stream grouping则是定义怎么从一堆task发射tuple到另外一堆task。你可以调用TopologyBuilder.setSpout和TopBuilder.setBolt来设置并行度 — 也就是有多少个task。
2.3.2 配置并行度
- 对于并发度的配置, 在storm里面可以在多个地方进行配置, 优先级为:
- defaults.yaml < storm.yaml < topology-specific configuration < internal component-specific configuration < external component-specific configuration
- worker processes的数目, 可以通过配置文件和代码中配置, worker就是执行进程, 所以考虑并发的效果, 数目至少应该大亍machines的数目
- executor的数目, component的并发线程数,只能在代码中配置(通过setBolt和setSpout的参数), 例如, setBolt(“green-bolt”, new GreenBolt(), 2)
- tasks的数目, 可以不配置, 默认和executor1:1, 也可以通过setNumTasks()配置
- Topology的worker数通过config设置,即执行该topology的worker(java)进程数。它可以通过 storm rebalance 命令任意调整。
代码配置
|
|
图解
代码配置对应的策略
- 3个组件的并发度加起来是10,就是说拓扑一共有10个executor
- 一共有2个worker,每个worker产生10 / 2 = 5条线程
- 绿色的bolt配置成2个executor和4个task。为此每个executor为这个bolt运行2个task。
动态的改变并行度
- Storm支持在不 restart topology 的情况下, 动态的改变(增减) worker processes 的数目和 executors 的数目, 称为rebalancing. 通过Storm web UI,或者通过storm rebalance命令实现:
- storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10
3 部署和测试
3.1 安装、部署
3.1.1 单机伪分布式部署
- 下载storm-1.1.1压缩包
- 解压、配置环境变量
- 修改配置文件
storm.yaml
|
|
3.1.2 启动集群
zhServer.sh start
先启动ZooKeepernohup storm nimbus &
在nimbus.host机器上启动nimbus服务nohup storm ui &
在nimbus.host机器上启动UI服务nohup strom supervisor
在其他节点上启动supervisor服务(单机就直接在host上启动就ok)
3.1.3 查看集群
查看Storm日志
- nimbus: $STORM_HOME/logs/nimbus.log
- ui: $STORM_HOME/logs/ui.log
- supervisor: $STORM_HOME/logs/supervisor.log
- worker(在某台supervisor上): $STORM_HOME/logs/workers-artifacts/wordcount-1-1511791820(Task名称)/6700(Worker所在端口号)/worker.log
3.2 测试集群
3.2.1 Storm集群常用命令
有许多简单且有用的命令可以用来管理拓扑,它们可以提交、杀死、禁用、再平衡拓扑(Topology)。
- 提交任务命令格式:storm jar 【jar路径】 【拓扑包名.拓扑类名】 【拓扑名称】
- bin/storm jar examples/storm-starter/storm-starter-topologies-0.9.6.jar storm.starter.WordCountTopology wordcount
- 杀死任务命令格式:storm kill 【拓扑名称】 -w 10(执行kill命令时可以通过-w [等待秒数]指定拓扑停用以后的等待时间)
- storm kill topology-name -w 10
- 停用任务命令格式:storm deactivte 【拓扑名称】
- storm deactivte topology-name
- 我们能够挂起或停用运行中的拓扑。当停用拓扑时,所有已分发的元组都会得到处理,但是spouts的nextTuple方法不会被调用。销毁一个拓扑,可以使用kill命令。它会以一种安全的方式销毁一个拓扑,首先停用拓扑,在等待拓扑消息的时间段内允许拓扑完成当前的数据流。
- 启用任务命令格式:storm activate【拓扑名称】
- storm activate topology-name
- 重新部署任务命令格式:storm rebalance 【拓扑名称】
- storm rebalance topology-name
- 再平衡使你重分配集群任务。这是个很强大的命令。比如,你向一个运行中的集群增加了节点。再平衡命令将会停用拓扑,然后在相应超时时间之后重分配工人,并重启拓扑。
3.2.2 运行WordCount-Topology
$ storm jar storm-starter.jar org.apache.storm.starter.WordCountTopology theirWordCount
运行后集群状态
4 第一个Storm程序
4.1 WordCount
|
|
注意:我们必须使用FieldsGrouping才能正确的统计出每个单词的总数,将hash(word)值相同的单词分配到同一个CountBolt。
4.2 StreamGrouping详解
Storm里面有7种类型的stream grouping
- Shuffle Grouping: 随机分组, 随机派发stream里面的tuple,保证每个bolt接收到的tuple数目大致相同。
- Fields Grouping:按字段分组,比如按userid来分组,具有同样userid的tuple会被分到相同的Bolts里的一个task,而不同的userid则会被分配到不同的bolts里的task。
- All Grouping:广播发送,对于每一个tuple,所有的bolts都会收到。
- Global Grouping:全局分组, 这个tuple被分配到storm中的一个bolt的其中一个task。再具体一点就是分配给id值最低的那个task。
- Non Grouping:不分组,这stream grouping个分组的意思是说stream不关心到底谁会收到它的tuple。目前这种分组和Shuffle grouping是一样的效果, 有一点不同的是storm会把这个bolt放到这个bolt的订阅者同一个线程里面去执行。
- Direct Grouping: 直接分组, 这是一种比较特别的分组方法,用这种分组意味着消息的发送者指定由消息接收者的哪个task处理这个消息。只有被声明为Direct Stream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。消息处理者可以通过TopologyContext来获取处理它的消息的task的id (OutputCollector.emit方法也会返回task的id)。
- Local or shuffle grouping:如果目标bolt有一个或者多个task在同一个工作进程中,tuple将会被随机发生给这些tasks。否则,和普通的Shuffle Grouping行为一致。