前言

       由于工作的需要使用到了响应式编程,所以对响应式编程做了一番探索,今天分享出来大家一起探讨,共同进步,共同提高。

1. 响应式编程及其起源与发展

1.1 什么是响应式编程

1.1.1. 直观感受


       我们可以把A列从上到下想象成一个数据流,每一个数据到达时都会触发一个事件,该事件会被传播到右侧单元格,后者则会处理事件并改变自身的状态。这一系列流程就是响应式的核心思想。

1.1.2. 定义

       响应式编程或反应式编程(英语:Reactive programming)是一种面向数据流(data stream)和变化传播(propagation of change)的声明式编程泛式。
       响应式编程的定义一共包括三部分:
       1). 面向数据流
       2). 变化传播
       3). 声明式编程泛式
       声明式编程区别于命令式编程,更关注我想要什么(What)而不是怎么去做(How)。
       SQl是典型的声明式编程,例如 select * from user where age > 10; 它描述的是我需要什么(What),而不管数据是怎么获取的。
       常用的编程语言都是命令式的,举例如下:

       它告诉计算机怎么做从1加到100这个事情(How)。

1.2 起源与发展

1.2.1 发展历史

       每一种技术都是为了解决特定领域的特定问题而产生的,响应式编程也不例外,我们从响应式编程的起源与发展来窥探一下响应式编程为了解决什么问题。

       如上图所示,响应式编程思想在1985年的时候被首次提出,2005年10月28日,微软首席架构师Ray Ozzie发送全员内部邮件,提醒大家复杂性设计带来的危害并开始探索新的解决方式,2007年夏天,Rx发明者在对Iterable/Iterator的处理上得到了一对儿表示异步事件流的接口,并与2009年发展成了Rx.net框架。之后响应式编程思想大爆发,发展出了大家熟知的Rxjava2,Rxjava3,Reactor,Vert.x等一大批优秀的框架。由此可见,响应式编程是为了解决复杂性而生。

反应式宣言:反应式宣言定义了反应式系统应该具备的一些架构设计原则。符合反应式设计原则的系统称为反应式系统。

Reactive Streams规范:反应式宣言仅阐述了设计原则,并没有给出具体的实现规范,导致每个反应式框架都各自实现了一套自己的API规范,且相互之间无法互通。为了解决这个问题,Reactive Streams规范应运而生。Reactive Streams的目标是定义一组最小化的异步流处理接口,使得在不同框架之间,甚至不同语言之间实现交互性。Reactive Streams的目的在于增强不同框架之间的交互性,提供的是一组最小功能集合。

1.2.2 响应式系统

       具备以下特质:即**时响应性(Responsive)、回弹性(Resilient)、弹性(Elastic)以及消息驱动(Message Driven)**的系统为反应式系统(Reactive System)。

即时响应性:只要有可能, 系统就会及时地做出响应。 即时响应是可用性和实用性的基石, 而更加重要的是,即时响应意味着可以快速地检测到问题并且有效地对其进行处理。 即时响应的系统专注于提供快速而一致的响应时间, 确立可靠的反馈上限, 以提供一致的服务质量。 这种一致的行为转而将简化错误处理、 建立最终用户的信任并促使用户与系统作进一步的互动。

回弹性:系统在出现失败时依然保持即时响应性。 这不仅适用于高可用的、 任务关键型系统——任何不具备回弹性的系统都将会在发生失败之后丢失即时响应性。 回弹性是通过复制、 遏制、 隔离以及委托来实现的。 失败的扩散被遏制在了每个组件内部, 与其他组件相互隔离, 从而确保系统某部分的失败不会危及整个系统,并能独立恢复。 每个组件的恢复都被委托给了另一个(外部的)组件, 此外,在必要时可以通过复制来保证高可
用性。 (因此)组件的客户端不再承担组件失败的处理。

弹性: 系统在不断变化的工作负载之下依然保持即时响应性。 反应式系统可以对输入(负载)的速率变化做出反应,比如通过增加或者减少被分配用于服务这些输入(负载)的资源。 这意味着设计上并没有争用点和中央瓶颈, 得以进行组件的分片或者复制, 并在它们之间分布输入(负载)。 通过提供相关的实时性能指标, 反应式系统能支持预测式以及反应式的伸缩算法。 这些系统可以在常规的硬件以及软件平台上实现成本高效的弹性。

消息驱动:反应式系统依赖异步的消息传递,从而确保了松耦合、隔离、位置透明的组件之间有着明确边界。 这一边界还提供了将失败作为消息委托出去的手段。 使用显式的消息传递,可以通过在系统中塑造并监视消息流队列, 并在必要时应用回压, 从而实现负载管理、 弹性以及流量控制。 使用位置透明的消息传递作为通信的手段, 使得跨集群或者在单个主机中使用相同的结构成分和语义来管理失败成为了可能。 非阻塞的通信使得接收者可以只在活动时才消耗资源, 从而减少系统开销。

1.2.3 框架技术定位

       Java领域比较流行的相应是编程的库主要有Reactor, RxJava, Vert.x, Akka, Quarkus五中,其中Reactor,RxJava框架实现思想类似,他们都是响应式编程的扩展。 Akka, Vert.x, Quarkus实现思路类似,他们都是基于Actor模型, Quarkus是基于 Vert.x, 我从官网上截取了这些框架的介绍如下:

1.2.4 总结和对比

  • 框架和库的对比
  • Reactor和RxJava的对比

  • Reactive Streams 与 Java8 Stream的区别

2. 响应式编程技术实现

2.1 基于Actor模型的实现

2.1.1 Actor模型

2.1.1.1 什么是Actor模型

       Actor模型是1973年提出的基于消息传递(区别于java的共享内存)的分布式并发编程模式。适用于对一致性需求不是很高且对性能需求较高的场景。

  • 处理并发问题本质上是如何保证共享数据的一致性和正确性。
  • 共享内存:强一致性,弱隔离性。
  • 消息传递:强隔离性,弱一致性。
2.1.1.2 Actor模型组成

       Actor是由状态(state)、行为(behavior)、邮箱(mailbox)三者组成的。

  • 状态(state): 状态是指actor对象的变量信息,状态由actor自身管理,避免并发环境下的锁和内存原子性等问题。
  • 行为(behavior):行为指定的是actor中计算逻辑,通过actor接收到的消息来改变actor的状态。
  • 邮箱(mailbox):邮箱是actor之间的通信桥梁,邮箱内部通过FIFO消息队列来存储发送发消息,而接收方则从邮箱中获取消息。
2.1.1.3 Actor模型调度方式

       Actor模型有两种任务调度方式:基于线程的调度、基于事件的调度。

  • 基于线程的调度
           为每个Actor分配一个线程,在接收一个消息时,如果当前Actor的邮箱为空则会阻塞当前线程。基于线程的调度实现较为简单,但线程数量受到操作的限制,现在的Actor模型一般不采用这种方式。
  • 基于事件的调度
           事件可以理解为任务或消息的到来,而此时才会为Actor的任务分配线程并执行。
2.1.2.4 Actor模型用途

Actor模型描述了一组并发编程的公理:

  • 1)所有的Actor状态是本地的,外部是无法访问的。
  • 2)Actor必须通过消息传递进行通信
  • 3)一个Actor可以响应消息、退出新Actor、改变内部状态、将消息发送到一个或多个Actor。
  • 4)Actor可能会堵塞自己但Actor不应该堵塞自己运行的线程
2.1.2.5 Actor与Java对比(消息传递与共享内存对比)
  • 共享内存方式计算1000000以内的素数

       通过锁/同步的方式实现并发,每次同步获取当前值并让一个线程去判断值是否为素数,若是的话则通过同步方式对计数器加一

  • Actor模型计算1000000以内的素数

           使用Actor模型方式会将此过程拆分成多个模块,即拆分成多个Actor。每个Actor负责不同部分,并通过消息传递让多个Actor协同工作。
2.1.2.6 Actor模型的限制

       Actor模型的限制是各Actor之间的行为相互独立,不适合用来做需要Actor之间配合的事情。

       当用户A Actor扣款期间,用户B Actor是不受限的,此时对用户B Actor进行操作是合法的,针对这种情况,单纯的Actor模型就显得比较乏力,需要加入其他机制来保证一致性。

2.1.2.7 Actor模型的应用
  • Akka, Vert.x, Quarkus 都是基于Actor模型实现的。
  • golang和Erlang也是基于Actor模型实现的。
  • Erlang是1986年由爱立信的工程师开发的语言,主要用在电信领域,Erlang具有高并发,消息传递,高容错,代码热加载,垃圾回收等特性,Erlang的Actor基于进程实现,Actor之间的通信实际上是进程之间的通信,基于Actor模型,Erlang引入了”随它崩溃“的哲学理念,Actor被supervisor监控着,监控者supervisor唯一的职责是知道代码崩溃后干什么,往往是使Actor从错误状态进入到初始化状态(进程重启)。

2.1.2 Vert.x

2.1.2.1 Vert.x 特点
  • 支持多种编程语言
           目前支持Java、JavaScript、Ruby、Python、Groovy、Clojure、Ceylon等,并提供友好的API接口。以上技术栈的工程师可以非常容易的学习和使用Vert.x 架构。

  • 异步无锁编程
           经典的多线程编程模型能满足很多Web开发场景,但随着移动互联网并发连接数的猛增,多线程并发控制模型性能难以扩展,同时要想控制好并发锁需要较高的技巧,目前Reactor异步编程模型开始跑马圈地,而Vert.x就是这种异步无锁编程的一个首选。

  • 对各种IO的丰富支持
           目前Vert.x的异步模型已支持TCP、UDP、FileSystem、DNS、EventBus、Sockjs等,基本满足绝大多数系统架构需求。

  • 分布式消息传输
           Vert.x基于分布式Bus消息机制实现其Actor模型,我们的业务逻辑如果依赖其他Actor则通过Bus简单的将消息发送出去就可以了。EventBus事件总线,可以轻松编写分布式解耦的程序,具有很好的扩展性。
           EventBus也是Vert.x架构的灵魂所在。

  • 生态体系日趋成熟
           Vertx归入Eclipse基金会门下,异步驱动已经支持了Postgres、MySQL、MongoDB、Redis等常用组件,并且有若干Vertx在生产环境中的应用案例。

  • Vertx是轻量级的
           vertx的核心代码包只有650kB左右,依赖非常少,主要依赖Netty4和Jackson。

  • Vertx并不是一个Web容器
           Vertx并不是一个Web Server,它是一种异步编程框架,你可以将自己基于vert.x的应用程序放置到任何你想放置的Web容器中部署运行,可以非常方便的和Spring,Spring Boot,Spring Cloud,Nodejs等语言混编。

  • 模块化
           Vertx本身内置强大的模块管理机制,当你写完一个Vert.x业务逻辑的时候,你可以将其打包成module,然后部署到基于Maven的仓库里,与现有主流的开发过程无缝结合。

  • 支持WebSocket
           支持WebSocket协议兼容SockJS , 可以非常方便的实现web前端和服务后端长连接通信,是轻量级web聊天室应用首选解决方案。

  • 使用简单
           这里的简单意味着你编写的代码是完全基于异步事件的,类似Node.JS,与此同时.你不需要关注线程上的同步,与锁之类的概念,所有的程序都是异步执行并且通信是无阻塞的。

  • 良好的扩展性
           因为基于Actor模型,所以你的程序都是一个点一个点的单独在跑,一群点可以组成一个服务,某个点都是可以水平扩展,动态替换,这样你的程序,基本就可以达到无限制的水平扩展。

  • 高并发性
           vert.x是一个事件驱动非阻塞的异步编程框架,你可以在极少的核心线程里占用最小限度的硬件资源处理大量的高并发请求。
           此并发并非JDK库的并发,当你Coding的时候,不再关注其锁,同步块,死锁之类的概念,你就可以随性所欲的写自己的业务逻辑,Vert.x本身内置三种线程池帮你处理。

2.1.2.2 Vert.x基础概念
  • Event Loop:即事件循环,是由Vert.x启动的事件处理线程,也是Vert.x项目对外开放的入口,Vert.x由此接收请求事件。一个Vert.x有一个或多个事件循环线程组成,线程最大数量为主机有效的CPU核数。
  • Event Loop Vertical:事件的业务处理线程,存在于Event Loop中,用于处理非阻塞短任务。
  • Worker Vertical : 事件的业务处理线程,用于处理长任务阻塞任务。
  • Event Bus:即事件总线,是Vert.x事件模型中最核心的部分,所有的事件都经由事件总线进行分发,包括Vertical之间的通信事件。
  • Vert.x Module : Vert.x项目模块,一个应用通常由多个模块组成,每个模块一般包含多个Vertical。
  • Vert.x的实现基于Netty。
2.1.2.3 Vert.x模型及架构图
  • Vert.x系统的概念模型
  • Vert.x系统的架构模型
  • 客户端请求处理流程
  • 请求在vert.x框架中被插入到一个事件队列中,耗时的任务将被委托给长任务处理线程进行处理,保证线程不被耗时的IO操作进行阻塞等待。
  • 事件循环线程和长任务处理线程通过EventBus进行通信。

2.1.3 Quarkus

2.1.3.1 什么是quarkus(官网)
  • Graalvm:Oracle在2018年开源的一个通用型虚拟机,可以用来构建多语言应用,也可以简单地用来编译和构建Java应用,甚至可以将Java应用构建成Native Image,以提高应用的启动速度以及降低应用的内存占用,目前还是一个实验性产品。
  • Quarkus的特点:
    • auto reload dev mode - 修改完的程序,保存后可以直接运行,无需手动编译后再运行,提高了开发效率。
    • jvm vs native - 除了jvm的运行模式外,还支持native模式。native模式类似于C/C++,需要预先编译生成可执行文件。由于不需要jvm,所以可以做到毫秒级别的启动,而且启动所需的内存也非常少。因此native模式非常适合serverless架构的实现。
    • imperatvice vs reactive - 同时支持imperative(代码的顺序就是执行的顺序)和reactive(比如javascript, Node.js中的callback)两种编程模式。
2.1.3.2 Quarkus 架构
  • Quarkus is based on Vert.x
  • Quarkus IO模型
  • Quarkus IO模型

2.2 观察者模式的扩展

2.2.1 Project Reactor(PR)整体设计

       Project Reactor是观察者模式的扩展,有三个最主要的接口:Publisher 即被观察者, Subscriber 即观察者, Subscription 订阅关系

2.2.2 Project Reactor 核心元素

2.2.2.1 Flux

       Flux, an Asynchronous Sequence of 0-N Items

2.2.2.2 Mono

       Mono, an Asynchronous 0-1 Result

2.2.3 常用操作

2.2.3.1 map

       map操作可以将数据元素进行转换/映射,得到一个新元素。

2.2.3.2 flatMap

       flatMap操作可以将每个数据元素转换/映射为一个流,然后将这些流合并为一个大的数据流。

       flatMap通常用于每个元素会引入新的流的情况。

2.2.3.3 filter

       filter操作可以对数据元素进行筛选。

2.2.3.4 zip

       将多个流合并成一个流

       zip可以非常方便实现pipeline调用
       pipeline: 将一个操作切割成可以独立运行的部分,每个部分由不同的单元负责,依次进入作业队列执行

2.2.4 流操作实现

举例

Flux.just("aaa","bbb","ccc").map(s->s.toUpperCase()).filter(item->item.length() >= 3).subscribe(System.out::println);

说明
       一个Reactor的调用链从create开始到subscribe结束,可以把整个调用链分为上游、中游、下游三个部分,上游是数据源,中游负责数据变换,下游负责数据接收与展示。

实现原理

2.2.5 调度器与线程模型

  • 调度器(Scheduler)用于多线程并发调度的处理。
  • Project Reactor默认调度器实现
    • 1.Schedulers.immediate() 当前线程
    • 2.Schedulers.single() 可重用的单线程,如果想使用独占线程则使用: Schedulers.newSingle(), 对标Executors.newSingleThreadExecutor()
    • 3.Schedulers.elastic() 弹性线程池,对标Executors.newCachedThreadPool()
    • 4.Schedulers.parallel() 固定大小线程池,所创建线程池的大小与CPU个数等同,对标Executors.newFixedThreadPool();
    • 5.Schedulers.fromExecutorService(ExecutorService) 自定义线程池

2.2.6 publishOn&subscribeOn

  • 如果不做线程切换,Project Reactor默认是在一个线程中执行的,publishOn&subscribeOn用于切换线程。
  • publishOn:会影响链中其后的操作符
  • subscribeOn:无论出现在什么位置,都只影响源头的执行环境

3. 命令式同步 VS 响应式异步

       我们用Reactor3对服务进行了异步化改造,压测效果如下所示:
       机器配置: 4C8G

  • 命令式同步
  • 响应式异步

4. 响应式编程缺点

响应式编程缺点

    1. 接受新思想的挑战。
    1. 调试相对困难。
    1. 代码结构、流程需要更细致的设计。

5. 参考

  • 响应式

    • 2 http://www.reactive-streams.org/ (Reactive Streams规范)
    • 3 [https://www.jianshu.com/p/a6a8de73f4bf](Reactive Streams 与 Java 8 Stream的区别)
  • Actotr模型

    • 1.https://www.jianshu.com/p/d803e2a7de8eS
  • Spring

    • 1.https://www.cnblogs.com/doit8791/p/10507820.html
  • Node.js

    • 1.https://baike.baidu.com/item/node.js/7567977?fr=aladdin node.js简介
  • Reactor相关

      1. https://projectreactor.io/ project reactor
      1. https://projectreactor.io/docs/core/release/reference/ 英文版使用文档
      1. https://www.jianshu.com/p/7ee89f70dfe5?from=singlemessage Spring Reactor 入门与实践
      1. https://www.cnblogs.com/felordcn/p/12142581.html Reactor介绍
      1. https://tech.io/playgrounds/929/reactive-programming-with-reactor-3/Intro 学习文档
      1. https://zhuanlan.zhihu.com/p/96603046 反应式架构(1):基本概念介绍
      1. http://blog.yannxia.top/2018/06/26/java/spring/projectreactor/ 由表及里学 ProjectReactor
      1. https://www.jianshu.com/p/df395eb28f69 Project Reactor 核心原理解析
      1. https://blog.csdn.net/cunfu5234/article/details/112546222 reactor模式
      1. https://www.javacodegeeks.com/2018/08/frameworks-toolkits-make-java-reactive-rxjava-spring-reactor-akka-vert-x-overview.html
      1. http://blog.yannxia.top/2018/06/26/java/spring/projectreactor/ 由表及里学 ProjectReactor
      1. https://blog.csdn.net/get_set/article/details/79466657 响应式Spring的道法术器
      1. https://blog.csdn.net/weixin_43113679/article/details/99616683
      1. http://www.blogjava.net/DLevin/archive/2015/09/02/427045.html
  • RxJava相关

      1. https://reactivex.io/ 官网
      1. https://www.jianshu.com/p/7b4e5e496120 RxJava 起源
      1. https://github.com/ReactiveX/RxJava RxJava github 地址
      1. https://segmentfault.com/a/1190000021578752 RxJava介绍
      1. https://github.com/ReactiveX/RxJava/wiki/What’s-different-in-3.0#interoperation
  • Vert.x相关

      1. https://vertx.io/get-started/ 官网
      1. https://github.com/vert-x3 github地址
      1. https://www.jianshu.com/p/056f333d97f5 深入浅出Vert.x架构
      1. https://www.cnblogs.com/jpfss/p/9395063.html vert.x简介
      1. https://www.jdon.com/concurrent/vertx.html Vert.x入门教程
      1. https://maimai.cn/article/detail?fid=1396337560&efid=BbeVJslTNoA7SotvUW9HzQ vert.x原理分析
      1. http://www.360doc.com/content/18/0203/14/39530679_727432611.shtml vert.x介绍
      1. https://blog.csdn.net/m0_37055174/article/details/99957489 全面异步化:淘宝反应式架构升级探索
      1. http://www.360doc.com/content/18/0203/14/39530679_727432611.shtml
      1. https://www.cnblogs.com/jpfss/p/9395063.html
      1. https://www.bilibili.com/video/av42433614/
  • Quarkus

      1. https://quarkus.io/guides/quarkus-reactive-architecture
      1. https://quarkus.io/blog/mutiny-vertx/
      1. https://quarkus.io/guides/quarkus-reactive-architecture
      1. https://www.jianshu.com/p/e490043cc3eb
  • AKKA

      1. https://www.lightbend.com/
      1. https://www.cnblogs.com/crazymakercircle/p/13910553.html AkkA
      1. https://www.zhihu.com/question/279512440 为什么Akka(Actor模型)在中国不温不火?
  • Erlang相关

      1. https://www.researchgate.net/publication/222607035_Plex_Languages PLEX
      1. https://blog.csdn.net/weixin_52395571/article/details/112211713 Erlang