反应式

接下来会围绕微服务,整理相关的技术,及开源一个实践项目。
微服务是很多新技术,思想,方案,实践的产物,那么,我们就从Spring5的新特性反应式说起

反应式

说反应式先从基础开始

基础

  • 阻塞:等待(网络,磁盘IO, 或锁)
  • 非阻塞:不等待立刻获得结果,当I/O操作无法完成时,不要将线程等待,而是返回一个错误码(EWOULDBLOCK),这样请求就不会阻塞。
    乐观锁CAS 也是非阻塞的方式。
  • 同步:当前线程等待调用结果
  • 异步:当前线程不等待调用结果,一般需要使用新的线程来做IO这样的事情。
  • 阻塞传输

    write阻塞, 当kernel的该socket的发送缓冲区已满时。
    read阻塞,通常是发送端的数据没有到达。
    
    线程池
    目的:更多的线程来解决并发线程因为阻塞不够用的问题,本质上没有解决阻塞
    缺点:增加CPU线程上下文切换,CPU,内存的浪费
    
  • 非阻塞传输
    目的:达到非阻塞,就不需要更多的线程
    好处:CPU,内存利用率,资源的伸缩,以及吞吐量
IO 模式 效果 IO是否阻塞 IO同步or异步 说明
BIO 等待读,等待写 阻塞 同步 阶段1:网络 I/O 阻塞;磁盘I/O 阻塞。 阶段2:数据拷贝:内核态到用户态内存
NIO 可以读,可以写 非阻塞 同步 I/O操作无法完成时,返回一个错误码,这样请求就不会阻塞 ;每次发起I/O请求后立即返回,为了等到数据,需要轮询
AIO 读好了 非阻塞 异步 read/write方法都是异步的,完成后操作系统会主动调用回调函数

以上是一些总结,相关代码可以搜索 :

  • BIO (等待IO)同步阻塞IO
  • NIO (可以IO)同步非阻塞IO
    I/O多路复用(异步阻塞 I/O)
    Reactor单线程模型
    Reactor多线程模型
    主从Reactor多线程模型 (读写IO由subReactor线程池处理)
  • AIO 异步I/O (IO好了)异步非阻塞IO

  • 其它:
    Netty网络通信的基础:异步NIO实现;
    线程模型,包括3个线程池:
    eventLoopGroupBoss Aceptor线程池
    eventLoopGroupSelector IO 线程池
    defaultEventExecutorGroup 业务处理线程池

反应式落地

概念:反应式 就是 异步非阻塞
  • 响应式宣言(Reactive Manifesto) 描述了响应式系统(reactive systems)应该具备的四个关键属性:
    Responsive(灵敏的)、
    Resilient(可故障恢复的)、
    Elastic(可伸缩的)、
    Message Driven(消息驱动的)。

  • Reactive programming 是一种编程范式,类似面向对象编程。

  • Reactive Streams 则是落地的一套规范,Reactor 实现了 该规范。

  • Spring Web Flux 使用的是 Reactor。
    类似的反应式框架还有:Akka ,Ratpack ,Vert.x ,RxJava ,Reactive 等。

  • Reactive Stream 接口及说明:

角色 接口 功能实现 说明
发布者 Publisher Mono,Flux 数据的载体:Mono 0到1个元素,Flux 0到无限个元素
订阅者 Subscriber 一系列回调方法 异步管道的衔接者
订阅产生的令牌 Subcription 与发布者进行交互 订阅请求成功时,发布者将其传递给订阅者。可以控制请求多少的元素或取消订阅。
处理者 Processor<T,R> 处理阶段进行元素的转换等 在发布者—订阅者的管道中作为转换器的作用
  • 接口规范
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    Publisher:是元素(消息)序列的提供者,根据它的订阅者的需求,来发布这些元素(消息)。
    public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
    }
    Subscriber:当通过 Publisher.subscribe(Subscriber) 注册后,它将通过 Subscriber.onSubscribe(Subscription) 来接收消息。
    public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
    }
    Subscription:代表了消息从 Publisher 到 Subscriber 的一个一对一的生命周期。
    public interface Subscription {
    public void request(long n);
    public void cancel();
    }
    Processor:继承了 Publisher 和 Subscriber,用于转换发布者到订阅者之间管道中的元素。Processor<T,R> 订阅类型为 T 的数据元素,接收并转换为类型为 R 的数据,然后发布变换后的数据。
    public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
    }

回压式流Flux

概念:数据传输的异步非阻塞管道,具有流控能力的数据流

接收端进程从socket读数据的速度跟不上发送端进程向socket写数据的速度,最终导致发送端write调用阻塞

“避免压垮数据收件者,这样就不会阻塞任何线程”,不需要阻塞,就不需要更多的线程

回压式流,消费者决定流速,当消费速度跟不上生产的速度时,生产者按策略进行流控。
起到了保护接收者,生产者也不需要阻塞的效果。那么,从本质上更好的解决了,服务调用之间的可靠性。

图片

流量控制

  • 当订阅者处理慢于发布者时,发布者的按照策略进行流量控制
    订阅者订阅时可以指定流控策略,并覆盖发布者设置的流控策略

  • 发布者与订阅者不在同一个线程中,使用Subscription 的 request方法可以控制速率

速度 是否流控 模式 说明
P快S慢 需要流控 pull Subscriber 使用Subscription 控制 request(n)
P慢S快 不需要流控 push onSubscribe 默认 Long.MAX_VALUE

流控策略

  • ERROR: 当下游跟不上节奏的时候发出一个错误信号。
  • DROP:当下游没有准备好接收新的元素的时候抛弃这个元素。
  • LATEST:让下游只得到上游最新的元素。
  • BUFFER:缓存下游没有来得及处理的元素(如果缓存不限大小的可能导致OutOfMemoryError)。

反应式编程接口及操作

Publisher创建:

  • 常用
    just
    never
    empty
    error
    range
    interval
    concact
    zip
    fromArray,fromIterable fromFuture,fromCallable 等

  • generate,同步和逐一的方式来产生 Flux 序列。

  • create,使用 FluxSink 支持同步,异步,批量的消息产生。 可以将一系列事件转换成异步的事件流。

  • WebClient Spring5 WebFlux 提供的响应式调用工具

转化

  • 中间阶段
    filter
    map
    flatMap
    then
    zip
    reduce 等
  • 结束阶段 - 订阅
    subscribe

异步调度和并发

  • publishOn , subscribeOn 可以使用 Schedulers线程调度器

  • runOn(Scheduler) 开启并行执行

更多的操作

无副作用

doOnNext、doOnError、doOncomplete、doOnSubscribe、doOnCancel等

转换

when、and/or、merge、concat、collect、count、repeat

过滤/拣选

take、first、last、sample、skip、limitRequest

timeout、onErrorReturn、onErrorResume、doFinally、retryWhen
分批

window、buffer、group

其它

  • Restfull Controller 开启流式数据传递的管道,常用MediaType有:
    TEXT_EVENT_STREAM
    APPLICATION_STREAM_JSON_VALUE

  • 如json流式产生者
    @GetMapping(value = “”, produces = MediaType.APPLICATION_STREAM_JSON_VALUE)

  • 如json流式消费者
    @PostMapping(path=”/persons”,consumes=MediaType.APPLICATION_STREAM_JSON_VALUE)

  • 反应式数据源支持
    目前,各个数据库都开始陆续推出异步驱动,Spring Data已经支持的响应式数据库有MongoDB、Redis、Apache Cassandra和CouchDB。

微服务的更多整理,可以参考下一篇博客。

反应式编程参考文献

https://htmlpreview.github.io/?https://github.com/get-set/reactor-core/blob/master-zh/src/docs/index.html