流计算框架对比
流计算框架对比
几个月之前我们在这里讨论过目前对于这种日渐增加的分布式流处理的需求的原因。当然,目前也有很多的各式各样的框架被用于处理这一些问题。现在我们会在这篇文章中进行回顾,来讨论下各种框架之间的相似点以及区别在哪里,还有就是从我的角度分析的,推荐的适用的用户场景。

核心关注点
在不同的系统之间进行选择的时候,我们主要关注到以下几点。
-
Runtime and Programming model( 运行与编程模型): 一个平台提供的编程模型往往会决定很多它的特性,并且这个编程模型应该足够处理所有可能的用户案例。这是一个决定性的因素,我也会在下文中多次讨论。 -
Functional Primitives( 函数式单元): 一个合格的处理平台应该能够提供丰富的能够在独立信息级别进行处理的函数,像map 、filter 这样易于实现与扩展的一些函数。同样也应提供像aggregation 这样的跨信息处理函数以及像join 这样的跨流进行操作的函数,虽然这样的操作会难以扩展。 -
State Management( 状态管理): 大部分这些应用都有状态性的逻辑处理过程,因此,框架本身应该允许开发者去维护、访问以及更新这些状态信息。 -
Message Delivery Guarantees( 消息投递的可达性保证): 一般来说,对于消息投递而言,我们有至多一次(at most once) 、至少一次(at least once) 以及恰好一次(exactly once) 这三种方案。At most once 投递保证每个消息会被投递0 次或者1 次,在这种机制下消息很有可能会丢失。At least once 投递保证了每个消息会被默认投递多次,至少保证有一次被成功接收,信息可能有重复,但是不会丢失。- Exactly once 意味着每个消息对于接收者而言正好被接收一次,保证即不会丢失也不会重复。
-
Failures Handling 在一个流处理系统中,错误可能经常在不同的层级发生,譬如网络分割、磁盘错误或者某个节点莫名其妙挂掉了。平台要能够从这些故障中顺利恢复,并且能够从最后一个正常的状态继续处理而不会损害结果。
除此之外,我们也应该考虑到平台的生态系统、社区的完备程度,以及是否易于开发或者是否易于运维等等。
RunTime and Programming Model
运行环境与编程模型可能是某个系统的最重要的特性,因为它定义了整个系统的呈现特性、可能支持的操作以及未来的一些限制等等。因此,运行环境与编程模型就确定了系统的能力与适用的用户案例。目前,主要有两种不同的方法来构建流处理系统

另一种方法叫做

两种方法都有一些内在的优势与不足,首先来谈谈
而对于
而对于编程模型而言,又可以分为
Fault Tolerance
与批处理系统相比,流处理系统中的容错机制固然的会比批处理中的要难一点。在批处理系统中,如果碰到了什么错误,只要将计算中与该部分错误关联的重新启动就好了。不过在流处理的场景下,容错处理会更加困难,因为会不断地有数据进来,并且有些任务可能需要 7*24
地运行着。另一个我们碰到的挑战就是如何保证状态的一致性,在每天结束的时候我们会开始事件重放,当然不可能所有的状态操作都会保证幂等性。
反压
流处理系统需要能优雅地处理反压
-
Storm 是通过监控Bolt 中的接收队列负载情况,如果超过高水位值就会将反压信息写到Zookeeper ,Zookeeper 上的watch 会通知该拓扑的所有Worker 都进入反压状态,最后Spout 停止发送tuple 。具体实现可以看这个JIRA STORM-886 。 -
JStorm 认为直接停止Spout 的发送太过暴力,存在大量问题。当下游出现阻塞时,上游停止发送,下游消除阻塞后,上游又开闸放水,过了一会儿,下游又阻塞,上游又限流,如此反复,整个数据流会一直处在一个颠簸状态。所以JStorm 是通过逐级降速来进行反压的,效果会较Storm 更为稳定,但算法也更复杂。另外JStorm 没有引入Zookeeper 而是通过TopologyMaster 来协调拓扑进入反压状态,这降低了Zookeeper 的负载。 -
Flink 没有使用任何复杂的机制来解决反压问题,因为根本不需要那样的方案!它利用自身作为纯数据流引擎的优势来优雅地响应反压问题。
下面我们就看看其他的系统是怎么处理的:
Storm

Spark Streaming

Samza

Flink

Managing State(状态管理)
大部分重要的流处理应用都会保有状态,与无状态的操作符相比,这些应用中需要一个输入和一个状态变量,然后进行处理最终输出一个改变了的状态。我们需要去管理、存储这些状态,要保证在发生故障的时候能够重现这些状态。状态的重造可能会比较困难,毕竟上面提到的不少框架都不能保证
Storm

Spark Streaming
当想要在流处理系统中实现有状态的操作时,我们往往想到的是一个长时间运行的

Samza

Flink

Counting Words with State
Trident
public static StormTopology buildTopology(LocalDRPC drpc) {
FixedBatchSpout spout = ...
TridentTopology topology = new TridentTopology();
TridentState wordCounts = topology.newStream("spout1", spout)
.each(new Fields("sentence"),new Split(), new Fields("word"))
.groupBy(new Fields("word"))
.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));
...
}
在第
Spark Streaming
// Initial RDD input to updateStateByKey
val initialRDD = ssc.sparkContext.parallelize(List.empty[(String, Int)])
val lines = ...
val words = lines.flatMap(_.split(" "))
val wordDstream = words.map(x => (x, 1))
val trackStateFunc = (batchTime: Time, word: String, one: Option[Int],
state: State[Int]) => {
val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
val output = (word, sum)
state.update(sum)
Some(output)
}
val stateDstream = wordDstream.trackStateByKey(
StateSpec.function(trackStateFunc).initialState(initialRDD))
在第
Samza
class WordCountTask extends StreamTask with InitableTask {
private var store: CountStore = _
def init(config: Config, context: TaskContext) {
this.store = context.getStore("wordcount-store")
.asInstanceOf[KeyValueStore[String, Integer]]
}
override def process(envelope: IncomingMessageEnvelope,
collector: MessageCollector, coordinator: TaskCoordinator) {
val words = envelope.getMessage.asInstanceOf[String].split(" ")
words.foreach { key =>
val count: Integer = Option(store.get(key)).getOrElse(0)
store.put(key, count + 1)
collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "wordcount"),
(key, count)))
}
}
在上述代码中第
Flink
val env = ExecutionEnvironment.getExecutionEnvironment
val text = env.fromElements(...)
val words = text.flatMap ( _.split(" ") )
words.keyBy(x => x).mapWithState {
(word, count: Option[Int]) =>
{
val newCount = count.getOrElse(0) + 1
val output = (word, newCount)
(output, Some(newCount))
}
}
在第mapWithState
函数,第一个参数是即将需要处理的单次,第二个参数是一个全局的状态。
Performance
合理的性能比较也是本文的一个重要主题之一。不同的系统的解决方案差异很大,因此也是很难设置一个无偏的测试。通常而言,在一个流处理系统中,我们常说的性能就是指延迟与吞吐量。这取决于很多的变量,但是总体而言标准为如果单节点每秒能处理
另一方面,消息的可达性保证、容错以及状态管理都是需要考虑进去的。譬如如果你开启了容错机制,那么会增加
还有就是一定要记住,在分布式环境下进行大数据传输也是一件非常昂贵的消耗,因此我们要利用好数据本地化以及整个应用的序列化的调优。
Project Maturity( 项目成熟度)
在为你的应用选择一个合适的框架的时候,框架本身的成熟度与社区的完备度也是一个不可忽略的部分。
Summary
在我们进最后的框架推荐之前,我们再看一下上面那张图:

Framework Recommendations
这个问题的回答呢,也很俗套,具体情况具体分析。总的来说,你首先呢要仔细评估下你应用的需求并且完全理解各个框架之间的优劣比较。同时我建议是使用一个提供了上层接口的框架,这样会更加的开发友好,并且能够更快地投入生产环境。不过别忘了,绝大部分流应用都是有状态的,因此状态管理也是不可忽略地一个部分。同时,我也是推荐那些遵循
-
对于小型与需要快速响应地项目,
Storm 依旧是一个非常好的选择,特别是在你非常关注延迟度的情况下。不过还是要谨记容错机制和Trident 的状态管理会严重影响性能。Twitter 目前正在设计新的流处理系统Heron 用来替代Storm ,它可以在单个项目中有很好地表现。不过Twitter 可不一定会开源它。 -
对于
Spark Streaming 而言,如果你的系统的基础架构中已经使用了Spark ,那还是很推荐你试试的。另一方面,如果你想使用Lambda 架构,那Spark 也是个不错的选择。不过你一定要记住,Micro-Batching 本身的限制和延迟对于你而言不是一个关键因素。 -
如果你想用
Samza 的话,那最好Kafka 已经是你的基础设施的一员了。虽然在Samza 中Kafka 只是个可插拔的组件,不过基本上所有人都会使用Kafka 。正如上文所说,Samza 提供了强大的本地存储功能,能够轻松管理数十G 的状态数据。不过它的At-Least-Once 的投递限制也是很大一个瓶颈。 -
Flink 目前在概念上是一个非常优秀的流处理系统,它能够满足大部分的用户场景并且提供了很多先进的功能,譬如窗口管理或者时间控制。所以当你发现你需要的功能在Spark 当中无法很好地实现的时候,你可以考虑下Flink 。另外,Flink 也提供了很好地通用的批处理的接口,只不过你需要很大的勇气来将你的项目结合到Flink 中,并且别忘了多关注关注它的路线图。
Dataflow 与开源
我最后一个要提到的就是
