10.1理解响应式编程

10.1理解响应式编程

响应式编程是对命令式编程进行替代的一个范例。这种替代的存在是因为响应式编程解决了命令式编程的限制。通过了解这些限制,可以更好地把握响应式模式的好处。

注意:响应式编程不是银弹。不应该从这章或是其他任何对于响应式编程的讨论中推断出命令式编程是魔鬼而响应式编程是天使。与作为一个开发人员学习的任何东西一样,响应式编程在某些地方很合适,在某些地方完全没有,应该对症下药。

如果和许多开发者一样,都是从命令式编程起步的。很自然地,现在你所写的大多数代码都是命令式的。命令式编程是非常直观的,现在的学生在他们学校的STEM课程中很轻松地学习它,它很强大,以至于它构成了大部分的代码,驱动着最大的企业。

这个想法很简单:你写的代码就是一行接一行的指令,按照它们的顺序一次一条地出现。一个任务被执行,程序就需要等到它执行完了,才能执行下一个任务。每一步,数据都需要完全获取到了才能被处理,因此它需要作为一个整体来处理。

这样做还行吧…直到它不得行了。当正在执行的任务被阻塞了,特别是它是一个IO任务,例如将数据写入到数据库或从远程服务器获取数据,那么调用该任务的线程将无法做任何事情,直到任务完成。说穿了,阻塞的线程就是一种浪费。

大多数编程语言,包括Java,支持并发编程。在Java中启动另一个线程并将其发送到执行某项工作的分支上是相当容易的,而调用线程则继续执行其他工作。尽管很容易创建线程,但这些线程可能最终会阻塞自己。在管理在多线程里面的并发是很具有挑战性的。更多的线程意味着更高的复杂度。

相反,响应式编程是函数式和声明式的。响应式编程涉及描述通过该数据流的pipelinestream,而不是描述的一组按顺序执行的步骤。响应式流处理数据时只要数据是可用的就进行处理,而不是需要将数据作为一个整体进行提供。事实上,输入数据可以是无穷的(例如,一个地点的实时温度数据的恒定流

应用于一个真实世界的类比就是,将命令式编程看做一个装水的气球,响应式编程看做花园里面的水管。两者都是在炎热的夏天让你的朋友惊喜并沉浸其中的方式。但是它们的执行风格是不同的:

  • 一个水气球一次能携带的它的有效载荷,在撞击的那一刻浸湿了它预定的目标。然而,水球的容量是有限的,如果你想用水泡更多的人(或把同一个人淋得更湿,你唯一的选择就是增加水球的数量。
  • 一根花园水龙带将其有效载荷作为一股水流从水龙头流向喷嘴。花园水龙头接的水带的容量可能是有限的,但在打水仗的过程中水是源源不断的。只要水从水龙头进入软管,就会一直通过软管然后从喷嘴喷出。同一个花园软管是很容易扩展的,你和朋友们可以玩得更尽兴。

命令式编程就类似打水仗中的水球,本质上没有什么问题,但是拿着类似响应式编程的水管的人在可扩展性和性能方面是有优势的。

10.1.1定义响应式流

Reactive Streams2013年底由NetflixLightbendPivotalSpring背后的公司)的工程师发起的一项计划。响应式流旨在为无阻塞异步流处理提供一个标准。

我们已经谈到了响应式编程的异步特性;它使我们能够并行执行任务以获得更大的可伸缩性。Backpressure(译者注:如何形象的描述反应式编程中的背压(Backpressure)机制? )是一种手段,通过对用户愿意处理的数据量设定限制,数据消费者可以避免被生产速度过快的数据淹没。

Java StreamsReactive Streams对比

Java流和响应式流之间有很大的相似性。首先,它们的名字中都含有Streams。它们也都为处理数据提供函数式接口。事实上,稍后当学到容器的时候,你会看到,其实它们有很多共同操作。

然而,Java流通常是同步的,同时只能处理有限数据集。它们本质上是使用函数式进行集合迭代的一种手段。

响应式流支持任何大小的数据集,包括无限数据集的异步处理。它们使实时处理数据成为了可能。

响应式流的规范可以通过四个接口定义来概括:Publisher,Subscriber,SubscriptionProcessorPublisher为每一个SubscriptionSubscriber生产数据。Publisher接口声明了一个subscribe()方法,通过这个方法Subscriber可以订阅Publisher

public interface Publisher<T> {
    void subscribe(Subscriber<? super T> subscriber);
}

Subscriber一旦进行了订阅,就可以从Publisher中接收消息,这些消息都是通过Subscriber接口中的方法进行发送:

public interface Subscriber<T> {
    void onSubscribe(Subscription sub);
    void onNext(T item);
    void onError(Throwable ex);
    void onComplete();
}

Subscriber通过调用onSubscribe()函数将会收到第一个消息。当Publisher调用onSubscribe(),它通过一个Subscription对象将消息传输给Subscriber。消息是通过Subscription进行传递的,Subscriber可以管理他自己的订阅内容:

public interface Subscription {
    void request(long n);
    void cancel();
}

Subscriber可以调用request()去请求被被发送了的数据,或者调用cancel()来表明他对接收的数据不感兴趣,并取消订阅。当调用request()时,Subscriber通过传递一个long值的参数来表示它将会接收多少数据。这时就会引进backpressure,用以阻止Publisher发送的数据超过Subscriber能够处理的数据。在Publisher发送了足够的被请求的数据后,Subscriber可以再次调用request()来请求更多的数据。

一旦Subcriber已经接收到数据,数据就通过流开始流动了。每一个Publisher发布的项目都会通过调用onNext()方法将数据传输到Subscriber。如果出现错误,onError()方法将被调用。如果Publisher没有更多的数据需要发送了,同时也不会再生产任何数据了,将会调用onComplete()方法来告诉Subscriber,它已经结束了。

对于Processor接口而言,它连接了SubscriberPublisher

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}

作为SubscriberProcessor将会接收数据然后以一定的方式处理这些数据。然后它会像变戏法一样地变为一个Publisher将处理的结果发布给Subscriber

正如你所看到的,响应式流规范相当地简单。关于如何从Publisher开始建立起一个数据处理的通道,这也是一件很容易的事情了,通过将数据不输入或是输入到多个Processor中,然后将最终结果传递到Subscriber中就行了。

Reactor工程实现了响应式流的规范,它提供由响应式流组成的函数式API。正如你将在后面的章节中看到的,ReactorSpring 5响应式编程模型的基础。在本章的剩余部分,我们将探索Reactor工程。

下一页