流计算框架对比

流计算框架对比

几个月之前我们在这里讨论过目前对于这种日渐增加的分布式流处理的需求的原因。当然,目前也有很多的各式各样的框架被用于处理这一些问题。现在我们会在这篇文章中进行回顾,来讨论下各种框架之间的相似点以及区别在哪里,还有就是从我的角度分析的,推荐的适用的用户场景。

DAG 主要功能即是用图来表示链式的任务组合,而在流处理系统中,我们便常常用 DAG 来描述一个流工作的拓扑。笔者自己是从 Akka 的 Stream 中的术语得到了启发。如下图所示,数据流经过一系列的处理器从源点流动到了终点,也就是用来描述这流工作。谈到 Akka 的 Streams,我觉得要着重强调下分布式这个概念,因为即使也有一些单机的解决方案可以创建并且运行 DAG,但是我们仍然着眼于那些可以运行在多机上的解决方案。

DAG 流图

核心关注点

在不同的系统之间进行选择的时候,我们主要关注到以下几点。

  • Runtime and Programming model(运行与编程模型):一个平台提供的编程模型往往会决定很多它的特性,并且这个编程模型应该足够处理所有可能的用户案例。这是一个决定性的因素,我也会在下文中多次讨论。

  • Functional Primitives(函数式单元): 一个合格的处理平台应该能够提供丰富的能够在独立信息级别进行处理的函数,像 map、filter 这样易于实现与扩展的一些函数。同样也应提供像 aggregation 这样的跨信息处理函数以及像 join 这样的跨流进行操作的函数,虽然这样的操作会难以扩展。

  • State Management(状态管理): 大部分这些应用都有状态性的逻辑处理过程,因此,框架本身应该允许开发者去维护、访问以及更新这些状态信息。

  • Message Delivery Guarantees(消息投递的可达性保证): 一般来说,对于消息投递而言,我们有至多一次(at most once)、至少一次(at least once)以及恰好一次(exactly once)这三种方案。

    • At most once 投递保证每个消息会被投递 0 次或者 1 次,在这种机制下消息很有可能会丢失。
    • At least once 投递保证了每个消息会被默认投递多次,至少保证有一次被成功接收,信息可能有重复,但是不会丢失。- Exactly once 意味着每个消息对于接收者而言正好被接收一次,保证即不会丢失也不会重复。
  • Failures Handling 在一个流处理系统中,错误可能经常在不同的层级发生,譬如网络分割、磁盘错误或者某个节点莫名其妙挂掉了。平台要能够从这些故障中顺利恢复,并且能够从最后一个正常的状态继续处理而不会损害结果。

除此之外,我们也应该考虑到平台的生态系统、社区的完备程度,以及是否易于开发或者是否易于运维等等。

RunTime and Programming Model

运行环境与编程模型可能是某个系统的最重要的特性,因为它定义了整个系统的呈现特性、可能支持的操作以及未来的一些限制等等。因此,运行环境与编程模型就确定了系统的能力与适用的用户案例。目前,主要有两种不同的方法来构建流处理系统,其中一个叫 Native Streaming,意味着所有输入的记录或者事件都会根据它们进入的顺序一个接着一个的处理。

另一种方法叫做 Micro-Batching。大量短的 Batches 会从输入的记录中创建出然后经过整个系统的处理,这些 Batches 会根据预设好的时间常量进行创建,通常是每隔几秒创建一批。

两种方法都有一些内在的优势与不足,首先来谈谈 Native Streaming。好的一方面呢是 Native Streaming 的表现性会更好一点,因为它是直接处理输入的流本身的,并没有被一些不自然的抽象方法所限制住。同时,因为所有的记录都是在输入之后立马被处理,这样对于请求方而言响应的延迟就会优于那种 Micro Batching 系统。处理这些,有状态的操作符也会更容易被实现,我们在下文中也会描述这个特点。不过 Native Streaming 系统往往吞吐量会比较低,并且因为它需要去持久化或者重放几乎每一条请求,它的容错的代价也会更高一些。并且负载均衡也是一个不可忽视的问题,举例而言,我们根据键对数据进行了分割并且想做进一步地处理。如果某些键对应的分区因为某些原因需要更多地资源去处理,那么这个分区往往就会变成整个系统的瓶颈。

而对于 Micro Batching 而言,将流切分为小的 Batches 不可避免地会降低整个系统的变现性,也就是可读性。而一些类似于状态管理的或者 joins、splits 这些操作也会更加难以实现,因为系统必须去处理整个 Batch。另外,每个 Batch 本身也将架构属性与逻辑这两个本来不应该被糅合在一起的部分相连接了起来。而 Micro Batching 的优势在于它的容错与负载均衡会更加易于实现,它只要简单地在某个节点上处理失败之后转发给另一个节点即可。最后,值得一提的是,我们可以在 Native Streaming 的基础上快速地构建 Micro Batching 的系统。

而对于编程模型而言,又可以分为 Compositional(组合式)与 Declarative(声明式)。组合式会提供一系列的基础构件,类似于源读取与操作符等等,开发人员需要将这些基础构件组合在一起然后形成一个期望的拓扑结构。新的构件往往可以通过继承与实现某个接口来创建。另一方面,声明式 API 中的操作符往往会被定义为高阶函数。声明式编程模型允许我们利用抽象类型和所有其他的精选的材料来编写函数式的代码以及优化整个拓扑图。同时,声明式 API 也提供了一些开箱即用的高等级的类似于窗口管理、状态管理这样的操作符。下文中我们也会提供一些代码示例。

Fault Tolerance

与批处理系统相比,流处理系统中的容错机制固然的会比批处理中的要难一点。在批处理系统中,如果碰到了什么错误,只要将计算中与该部分错误关联的重新启动就好了。不过在流处理的场景下,容错处理会更加困难,因为会不断地有数据进来,并且有些任务可能需要 7*24 地运行着。另一个我们碰到的挑战就是如何保证状态的一致性,在每天结束的时候我们会开始事件重放,当然不可能所有的状态操作都会保证幂等性。

反压

流处理系统需要能优雅地处理反压(backpressure)问题。反压通常产生于这样的场景:短时负载高峰导致系统接收数据的速率远高于它处理数据的速率。许多日常问题都会导致反压,例如,垃圾回收停顿可能会导致流入的数据快速堆积,或者遇到大促或秒杀活动导致流量陡增。反压如果不能得到正确的处理,可能会导致资源耗尽甚至系统崩溃。目前主流的流处理系统 Storm/JStorm/Spark Streaming/Flink 都已经提供了反压机制,不过其实现各不相同。

  • Storm 是通过监控 Bolt 中的接收队列负载情况,如果超过高水位值就会将反压信息写到 Zookeeper,Zookeeper 上的 watch 会通知该拓扑的所有 Worker 都进入反压状态,最后 Spout 停止发送 tuple。具体实现可以看这个 JIRA STORM-886。

  • JStorm 认为直接停止 Spout 的发送太过暴力,存在大量问题。当下游出现阻塞时,上游停止发送,下游消除阻塞后,上游又开闸放水,过了一会儿,下游又阻塞,上游又限流,如此反复,整个数据流会一直处在一个颠簸状态。所以 JStorm 是通过逐级降速来进行反压的,效果会较 Storm 更为稳定,但算法也更复杂。另外 JStorm 没有引入 Zookeeper 而是通过 TopologyMaster 来协调拓扑进入反压状态,这降低了 Zookeeper 的负载。

  • Flink 没有使用任何复杂的机制来解决反压问题,因为根本不需要那样的方案!它利用自身作为纯数据流引擎的优势来优雅地响应反压问题。

下面我们就看看其他的系统是怎么处理的:

Storm

Storm 使用了所谓的逆流备份与记录确认的机制来保证消息会在某个错误之后被重新处理。记录确认这一个操作工作如下:一个操作器会在处理完成一个记录之后向它的上游发送一个确认消息。而一个拓扑的源会保存有所有其创建好的记录的备份。一旦受到了从 Sinks 发来的包含有所有记录的确认消息,就会把这些确认消息安全地删除掉。当发生错误时,如果还没有接收到全部的确认消息,就会从拓扑的源开始重放这些记录。这就确保了没有数据丢失,不过会导致重复的 Records 处理过程,这就属于 At-Least 投送原则。

Storm 用一套非常巧妙的机制来保证了只用很少的字节就能保存并且追踪确认消息,但是并没有太多关注于这套机制的性能,从而使得 Storm 有较低地吞吐量,并且在流控制上存在一些问题,譬如这种确认机制往往在存在背压的时候错误地认为发生了故障。

Spark Streaming

Spark Streaming 以及它的 Micro Batching 机制则使用了另一套方案,道理很简单,Spark 将 Micro-Batches 分配到多个节点运行,每个 Micro-Batch 可以成功运行或者发生故障,当发生故障时,那个对应的 Micro-Batch 只要简单地重新计算即可,因为它是持久化并且无状态的,所以要保证 Exactly-Once 这种投递方式也是很简单的。

Samza

Samza 的实现手段又不一样了,它利用了一套可靠地、基于 Offset 的消息系统,在很多情况下指的就是 Kafka。Samza 会监控每个任务的偏移量,然后在接收到消息的时候修正这些偏移量。Offset 可以是存储在持久化介质中的一个检查点,然后在发生故障时可以进行恢复。不过问题在于你并不知道恢复到上一个 CheckPoint 之后到底哪个消息是处理过的,有时候会导致某些消息多次处理,这也是 At-Least 的投递原则。

Flink 主要是基于分布式快照,每个快照会保存流任务的状态。链路中运送着大量的 CheckPoint Barrier(检查点障碍,就是分隔符、标识器之类的),当这些 Barrier 到达某个 Operator 的时候,Operator 将自身的检查点与流相关联。与 Storm 相比,这种方式会更加高效,毕竟不用对每个 Record 进行确认操作。不过要注意的是,Flink 还是 Native Streaming,概念上和 Spark 还是相去甚远的。Flink 也是达成了 Exactly-Once 投递原则。

Managing State(状态管理)

大部分重要的流处理应用都会保有状态,与无状态的操作符相比,这些应用中需要一个输入和一个状态变量,然后进行处理最终输出一个改变了的状态。我们需要去管理、存储这些状态,要保证在发生故障的时候能够重现这些状态。状态的重造可能会比较困难,毕竟上面提到的不少框架都不能保证 Exactly-Once,有些 Record 可能被重放多次。

Storm

Storm 是实践了 At-Least 投递原则,而怎么利用 Trident 来保证 Exactly-Once 呢?概念上还是很简单的,只需要使用事务进行提交 Records,不过很明显这种方式及其低效。所以呢,还是可以构建一些小的 Batches,并且进行一些优化。Trident 是提供了一些抽象的接口来保证实现 Exactly-Once,如下图所示,还有很多东西等着你去挖掘。

Spark Streaming

当想要在流处理系统中实现有状态的操作时,我们往往想到的是一个长时间运行的 Operator,然后输入一个状态以及一系列的 Records。不过 Spark Streaming 是以另外一种方式进行处理的,Spark Streaming 将状态作为一个单独地 Micro Batching 流进行处理,所以在对每个小的 Micro-Spark 任务进行处理时会输入一个当前的状态和一个代表当前操作的函数,最后输出一个经过处理的 Micro-Batch 以及一个更新好的状态。

Samza

Samza 的处理方式更加简单明了,就是把它们放到 Kafka 中,然后问题就解决了。Samza 提供了真正意义上的有状态的 Operators,这样每个任务都能保有状态,然后所有状态的变化都会被提交到 Kafka 中。在有需要的情况下某个状态可以很方便地从 Kafka 的 Topic 中完成重造。为了提高效率,Samza 允许使用插件化的键值本地存储来避免所有的消息全部提交到 Kafka。这种思路如下图所示,不过 Samza 只是提高了 At-Least 这种机制,未来可能会提供 Exactly-Once。

Flink 提供了类似于 Samza 的有状态的 Operator 的概念,在 Flink 中,我们可以使用两种不同的状态。第一种是本地的或者叫做任务状态,它是某个特定的 Operator 实例的当前状态,并且这种状态不会与其他进行交互。另一种呢就是维护了整个分区的状态。

Counting Words with State

Trident

public static StormTopology buildTopology(LocalDRPC drpc) {
   FixedBatchSpout spout = ...

   TridentTopology topology = new TridentTopology();

   TridentState wordCounts = topology.newStream("spout1", spout)
     .each(new Fields("sentence"),new Split(), new Fields("word"))
     .groupBy(new Fields("word"))
     .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));
 ...
 }

在第 9 行中,我们可以通过调用一个持久化的聚合函数来创建一个状态。

Spark Streaming

// Initial RDD input to updateStateByKey
val initialRDD = ssc.sparkContext.parallelize(List.empty[(String, Int)])

val lines = ...
val words = lines.flatMap(_.split(" "))
val wordDstream = words.map(x => (x, 1))

val trackStateFunc = (batchTime: Time, word: String, one: Option[Int],
  state: State[Int]) => {
    val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
    val output = (word, sum)
    state.update(sum)
    Some(output)
  }

val stateDstream = wordDstream.trackStateByKey(
  StateSpec.function(trackStateFunc).initialState(initialRDD))

在第 2 行中,我们创建了一个 RDD 用来保存初始状态。然后在 5,6 行中进行一些转换,接下来可以看出,在 8-14 行中,我们定义了具体的转换方程,即输入时一个单词、它的统计数量和它的当前状态。函数用来计算、更新状态以及返回结果,最后我们将所有的 Bits 一起聚合。

Samza

class WordCountTask extends StreamTask with InitableTask {

  private var store: CountStore = _

  def init(config: Config, context: TaskContext) {
    this.store = context.getStore("wordcount-store")
      .asInstanceOf[KeyValueStore[String, Integer]]
  }

 override def process(envelope: IncomingMessageEnvelope,
   collector: MessageCollector, coordinator: TaskCoordinator) {

   val words = envelope.getMessage.asInstanceOf[String].split(" ")

   words.foreach { key =>
     val count: Integer = Option(store.get(key)).getOrElse(0)
     store.put(key, count + 1)
     collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "wordcount"),
       (key, count)))
   }
 }

在上述代码中第 3 行定义了全局的状态,这里是使用了键值存储方式,并且在 5~6 行中定义了如何初始化。然后,在整个计算过程中我们都使用了该状态。

val env = ExecutionEnvironment.getExecutionEnvironment

val text = env.fromElements(...)
val words = text.flatMap ( _.split(" ") )

words.keyBy(x => x).mapWithState {
  (word, count: Option[Int]) =>
    {
      val newCount = count.getOrElse(0) + 1
      val output = (word, newCount)
      (output, Some(newCount))
    }
}

在第 6 行中使用了 mapWithState 函数,第一个参数是即将需要处理的单次,第二个参数是一个全局的状态。

Performance

合理的性能比较也是本文的一个重要主题之一。不同的系统的解决方案差异很大,因此也是很难设置一个无偏的测试。通常而言,在一个流处理系统中,我们常说的性能就是指延迟与吞吐量。这取决于很多的变量,但是总体而言标准为如果单节点每秒能处理 500K 的 Records 就是个合格的,如果能达到 100 万次以上就已经不错了。每个节点一般就是指 24 核附带上 24 或者 48GB 的内存。对于延迟而言,如果是 Micro-Batch 的话往往希望能在秒级别处理。如果是 Native Streaming 的话,希望能有百倍的减少,调优之后的 Storm 可以很轻易达到几十毫秒。

另一方面,消息的可达性保证、容错以及状态管理都是需要考虑进去的。譬如如果你开启了容错机制,那么会增加 10%到 15%的额外消耗。除此之外,以文章中两个 WordCount 为例,第一个是无状态的 WordCount,第二个是有状态的 WordCount,后者在 Flink 中可能会有 25%额外的消耗,而在 Spark 中可能有 50% 的额外消耗。当然,我们肯定可以通过调优来减少这种损耗,并且不同的系统都提供了很多的可调优的选项。

还有就是一定要记住,在分布式环境下进行大数据传输也是一件非常昂贵的消耗,因此我们要利用好数据本地化以及整个应用的序列化的调优。

Project Maturity(项目成熟度)

在为你的应用选择一个合适的框架的时候,框架本身的成熟度与社区的完备度也是一个不可忽略的部分。Storm 是第一个正式提出的流处理框架,它已经成为了业界的标准并且被应用到了像 Twitter、Yahoo、Spotify 等等很多公司的生产环境下。Spark 则是目前最流行的 Scala 的库之一,并且 Spark 正逐步被更多的人采纳,它已经成功应用在了像 Netflix、Cisco、DataStax、Indel、IBM 等等很多公司内。而 Samza 最早由 LinkedIn 提出,并且正在运行在几十个公司内。Flink 则是一个正在开发中的项目,不过我相信它发展的会非常迅速。

Summary

在我们进最后的框架推荐之前,我们再看一下上面那张图:

Framework Recommendations

这个问题的回答呢,也很俗套,具体情况具体分析。总的来说,你首先呢要仔细评估下你应用的需求并且完全理解各个框架之间的优劣比较。同时我建议是使用一个提供了上层接口的框架,这样会更加的开发友好,并且能够更快地投入生产环境。不过别忘了,绝大部分流应用都是有状态的,因此状态管理也是不可忽略地一个部分。同时,我也是推荐那些遵循 Exactly-Once 原则的框架,这样也会让开发和维护更加简单。不过不能教条主义,毕竟还是有很多应用会需要 At-Least-Once 与 At-Most-Once 这些投递模式的。最后,一定要保证你的系统可以在故障情况下很快恢复,可以使用 Chaos Monkey 或者其他类似的工具进行测试。在我们之前的讨论中也发现这个快速恢复的能力至关重要。

  • 对于小型与需要快速响应地项目,Storm 依旧是一个非常好的选择,特别是在你非常关注延迟度的情况下。不过还是要谨记容错机制和 Trident 的状态管理会严重影响性能。Twitter 目前正在设计新的流处理系统 Heron 用来替代 Storm,它可以在单个项目中有很好地表现。不过 Twitter 可不一定会开源它。

  • 对于 Spark Streaming 而言,如果你的系统的基础架构中已经使用了 Spark,那还是很推荐你试试的。另一方面,如果你想使用 Lambda 架构,那 Spark 也是个不错的选择。不过你一定要记住,Micro-Batching 本身的限制和延迟对于你而言不是一个关键因素。

  • 如果你想用 Samza 的话,那最好 Kafka 已经是你的基础设施的一员了。虽然在 Samza 中 Kafka 只是个可插拔的组件,不过基本上所有人都会使用 Kafka。正如上文所说,Samza 提供了强大的本地存储功能,能够轻松管理数十 G 的状态数据。不过它的 At-Least-Once 的投递限制也是很大一个瓶颈。

  • Flink 目前在概念上是一个非常优秀的流处理系统,它能够满足大部分的用户场景并且提供了很多先进的功能,譬如窗口管理或者时间控制。所以当你发现你需要的功能在 Spark 当中无法很好地实现的时候,你可以考虑下 Flink。另外,Flink 也提供了很好地通用的批处理的接口,只不过你需要很大的勇气来将你的项目结合到 Flink 中,并且别忘了多关注关注它的路线图。

Dataflow 与开源

我最后一个要提到的就是 Dataflow 和它的开源计划。Dataflow 是 Google 云平台的一个组成部分,是目前在 Google 内部提供了统一的用于批处理与流处理的服务接口。譬如用于批处理的 MapReduce,用于编程模型定义的 FlumeJava 以及用于流处理的 MillWheel。Google 最近打算开源这货的 SDK 了,Spark 与 Flink 都可以成为它的一个运行驱动。

Google Dataflow