ElasticSearch之——WebFlux

3.WebFlux

是不是感觉Elasticsearch的异步API感觉很别扭,而且似乎没什么用处?

那是因为我们之前学习的所有Web程序组件,都是同步阻塞的编程方式。包括:Tomcat、Servlet、SpringMVC、Mybatis等等。

因此在同步阻塞运行大环境下,使用异步API,就难以发挥异步调用的长处。

同步和异步执行,到底有怎样的差异呢?

3.1.同步和异步

那么同步和异步之间究竟有怎样的差异呢?

3.1.1.同步请求

一个典型的请求同步处理过程:

  • 1.开启线程,接收用户
  • 2.处理业务
  • 3.调用DB(比如elasticsearch)
  • 4.等待DB返回数据(无效的等待)
  • 5.返回数据给用户

image-20200226230629880

3.1.2.异步请求

一个典型的请求异步处理过程:

  • 1.开启线程,接收用户
  • 2.处理业务
  • 3.调用DB(比如elasticsearch)
  • 4.不等待,处理其他请求(同时DB准备数据)
  • 5.DB返回数据,传递给异步回调函数
  • 6.返回给用户

image-20200226231512179

虽然处理过程麻烦了,但是无疑我们可以同时服务更多请求了,系统的吞吐量得到了提升。

3.1.3.总结

异步执行相对于同步执行,可以大大提高CPU的利用率,减少CPU空置时间,提高服务的吞吐量,但是并不能减少单次请求的执行耗时

那么,有没有一套可以取代tomcat、SpringMVC,实现异步编程的Web应用方案呢?

3.1.4.异步Servlet(了解)

在Servlet3.0已经支持了异步编程,来看一个同步和异步Servlet的实例。

新建一个web项目,并在其中创建两个servlet:

1)同步运行的servlet

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package cn.itcast.demo.servlet;

import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;

@WebServlet(name = "SyncServlet", urlPatterns = "/sync")
public class SyncServlet extends HttpServlet {
protected void doPost(HttpServletRequest request, HttpServletResponse response) throws javax.servlet.ServletException, IOException {
doGet(request, response);
}

protected void doGet(HttpServletRequest request, HttpServletResponse response) throws javax.servlet.ServletException, IOException {
long b = System.currentTimeMillis();
// 执行业务
doSomething(response);
// 打印耗时时间
System.out.println("sync use: " + (System.currentTimeMillis() - b));
}

private void doSomething(HttpServletResponse response) {
try {
// 模拟延迟
Thread.sleep(3000);
// 模拟业务
response.getWriter().write("sync, hello !");
} catch (Exception e) {
e.printStackTrace();
}

}
}

在业务中,休眠3秒来模拟业务的执行。

2)异步运行的Servlet

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package cn.itcast.demo.servlet;

import javax.servlet.AsyncContext;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;

@WebServlet(name = "AsyncServlet", asyncSupported = true, urlPatterns = "/async")
public class AsyncServlet extends HttpServlet {
protected void doPost(HttpServletRequest request, HttpServletResponse response) throws javax.servlet.ServletException, IOException {
doGet(request, response);
}

protected void doGet(HttpServletRequest request, HttpServletResponse response) throws javax.servlet.ServletException, IOException {
long b = System.currentTimeMillis();
// 开启异步任务
AsyncContext asyncContext = request.startAsync();
// 执行业务
asyncContext.start(() -> doSomething(asyncContext));
// 打印耗时时间
System.out.println("async use:" + (System.currentTimeMillis() - b));
}

private void doSomething(AsyncContext context) {
try {
// 模拟延迟
Thread.sleep(5000);
// 模拟业务
context.getResponse().getWriter().write("async, hello !");
} catch (Exception e) {
e.printStackTrace();
}
// 结束业务
context.complete();
}
}

注意关键点:

  • asyncSupported:在@WebServlet注解上,通过asyncSupported=true来开启异步支持
  • AsyncContext:通过request.startAsync()来初始化一个AsyncContext对象,并创建异步运行环境。
  • asyncContext.start():通过AsyncContext的start(Runnable task)方法来开启一个异步任务,
  • context.complete():通过context.complete()来标记业务结束

3)测试

启动项目,并分别访问:

1
2
localhost:8080/sync
localhost:8080/async

虽然浏览器返回结果都是耗时3秒:

image-20200227001639456

不过,在服务端AsyncServlet的执行却并没有阻塞:

image-20200227001721210

由此可见,异步执行的Servlet不需要阻塞等待任务执行结果,而是处理其它请求,应用的并发能力就得到了提升。

3.2.认识WebFlux

Spring框架中包含的原始Web框架Spring Web MVC是专门为Servlet API和Servlet容器而构建的,是一套同步阻塞的Web应用方案。

响应式Web框架Spring WebFlux在更高版本5.0中添加,它是完全非阻塞的,支持 Reactive Streams、背压,并在Netty,Undertow和Servlet 3.1+容器等服务器上运行。

Spring的Reactive技术栈与传统Servlet技术栈:

image-20200227195254458

3.2.1.为什么需要WebFlux

虽然Servlet3.0也支持异步编程,但是Spring的WebFlux却并不推荐继续使用Servlet容器,而是默认使用Netty,为什么呢?

Spring官网给出的原因有两点:

  • Servlet虽然支持异步API,但是其中的FilterListener等组件依然是同步的。而且如getParameter()这样的方法是阻塞的。而WebFlux中使用Netty作为容器,其中的API都是异步或非阻塞式的。
  • JDK8中引入了stream的API、函数式编程的新特性。而这些恰好为非阻塞应用和链式编程提供了便捷。而Spring的WebFlux完全支持这些新的特性。

3.2.2.什么是响应式

我们知道什么是非阻塞、函数式编程。那么什么是响应式编程呢?

传统的编程范式中,我们一般通过迭代器(Iterator)模式来遍历一个序列。这种遍历方式是由调用者来控制节奏的,采用的是的方式。每次由调用者通过 next()方法来获取序列中的下一个值。

响应式流采用的则是的方式,即常见的发布者-订阅者模式。当发布者有新的数据产生时,这些数据会被主动推送到订阅者来进行处理。在响应式流上可以添加各种不同的操作来对数据进行处理,形成数据处理链。

比如,我们调用elasticsearch的异步API,获取一个Future结果,这个结果就是数据的发布者,将来的某一刻会把从elasticsearch中拿到的数据发布出去。

而我们的web应用就可以作为消息的订阅者,订阅elasticsearch的数据,当有数据到来的时候写入response中,响应给用户。

在elasticsearch查询并处理数据的时候,我们的web应用可以去做其它事情,无需阻塞等待,提高CPU利用率

反应式编程最早由 .NET 平台上的 Reactive Extensions (Rx) 库来实现。后来迁移到 Java 平台之后就产生了著名的 RxJava 库,并产生了很多其他编程语言上的对应实现。在这些实现的基础上产生了后来的反应式流(Reactive Streams)规范。该规范定义了反应式流的相关接口,并将集成到 Java 9 中。

3.2.3.Reactor Project

Project Reactor是Java中非常流行的响应式编程库,官网:https://projectreactor.io/

前面提到的 RxJava 库是 JVM 上反应式编程的先驱,也是反应式流规范的基础。不过 RxJava 库也有其不足的地方。

Reactor 则是完全基于反应式流规范设计和实现的库,没有 RxJava 那样的历史包袱,在使用上更加的直观易懂。Reactor 也是 Spring 5 中反应式编程的基础。学习和掌握 Reactor 可以更好地理解 Spring 5 中的相关概念,建议大家以后又机会多多学习该框架。

3.2.4.Flux 和 Mono

Flux 和 Mono 是 Reactor 中的两个基本概念。

Flux 表示的是包含 0 到 N 个元素的异步序列,是一个数据的发布者(publisher)。可以通过subscribe()函数来订阅该发布者的数据。当数据产生时,订阅者的处理方法会执行。

Mono 表示的是包含 0 或者 1 个元素的异步序列。与Flux类似,也是一个数据的发布者(publisher)。

WebFlux的Controller中,业务处理的返回值必须是Mono或者Flux。而我们可以把service或dao中数据查询的过程封装到Mono或Flux中,数据查询成功后由Mono或Flux将数据发布出去。

然后Spring的WebFlux会订阅Mono或Flux数据,接收数据后写入Response,响应给用户

3.3.WebFlux入门

接下来,我们来看看如何构建一个基于WebFlux的web应用。

3.3.1.搭建工程

使用spring的initialize来搭建:

image-20200227135858380

项目名称:

image-20200227135910241

选择依赖:

image-20200227135919635

注意,这里必须选择Spring Reactive Web而不是Spring Web依赖。

然后,点击Next完成项目创建。

image-20200227140032169

3.3.2.Mono的Demo

为了降低学习难度,WebFlux开发时可以使用SpringMVC中的注解,因此Controller定义与以前差别不大,来看一个示例.

首先准备一个实体类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package cn.itcast.demo.pojo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;


@Data
@AllArgsConstructor(staticName = "of")
@NoArgsConstructor
public class User {
private String name;
private int age;
}

1)一个传统的Handler

我们先写一个同步阻塞的Handler:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package cn.itcast.demo.web;

import cn.itcast.demo.pojo.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;


@Slf4j
@RestController
@RequestMapping("user")
public class UserController {

@GetMapping("/sync")
public User getUserSync() {
log.info("sync 开始执行");
User user = getUser();
log.info("sync 执行完毕");
return user;
}
// 生成user的方法
private User getUser(){
try {
// 模拟业务耗时1秒
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return User.of("Rose", 18);
}
}

2)异步的Handler

我们再来写一个异步的Handler:

1
2
3
4
5
6
7
@GetMapping("/mono")
public Mono<User> getUserMono() {
log.info("mono 开始执行");
Mono<User> mono = Mono.fromSupplier(this::getUser);
log.info("mono 执行完毕");
return mono;
}

代码要点:

  • 返回值:这里的返回值我们写的是Mono<User>,是一个只包含一个User元素的Publisher,它会将结果推送给客户端。
  • Mono.fromSupplier(this::getUser):构建Mono,参数是一个Supplier(无参有返回值的函数式接口),本例中是把getUser()方法作为数据的提供者。

3)测试结果

启动项目,分别访问下面地址:

http://localhost:8080/hello/sync

http://localhost:8080/hello/mono

可以看到控制台的信息:

image-20200227141049622

可以发现,传统的Handler执行getOne()方法,耗时2秒,而Mono方式执行耗时不足1毫秒

3.3.3.Flux的Demo

Mono代表返回值是一个元素,而Flux则代表返回值是一串元素,来看一个示例。

1)普通Flux模式

首先看下基本的Flux模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@GetMapping("/flux")
public Flux<User> getUserFlux() {
log.info("flux 开始执行");
Flux<User> flux = getUserWithFlux();
log.info("flux 执行完毕");
return flux;
}

private Flux<User> getUserWithFlux() {
return Flux.interval(Duration.ofSeconds(1))// 每隔1秒发射一个元素
.take(3) // 取前3个元素
// 将元素转为一个User对象
.map(i -> User.of("user_" + i, 20 + i.intValue()));
}

这里通过getUserWithFlux()方法生成了包含3个元素的Flux,生成每个元素的间隔是1秒,模拟任务耗时。

来看下页面效果:

发现在请求发出3秒后,浏览器拿到完整数据,并组织成了一个json的数组

2)通过EventStream格式化的Flux

上面的请求中,响应结果并没有像流一样,逐个返回到页面,而是在3秒后将结果组织为一个JSON数组返回。

如果我们希望数据像流一样,不断生成并返回到页面,我们设置响应模式为:text/event-stream类型。

代码如下:

1
2
3
4
5
6
7
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<User> getUserStream() {
log.info("stream 开始执行");
Flux<User> flux = getUserWithFlux();
log.info("stream 执行完毕");
return flux;
}

说明:MediaType.TEXT_EVENT_STREAM_VALUE枚举的值就是text/event-stream

效果图:

可以看到,请求发出后,数据会逐条返回到客户端,并渲染在页面,真的像数据流一样。最终实现数据从服务端向客户端的持续推送,类似看视频直播一样,这个就是HTML5中的SSE技术:Server Sent Events

SSE: 服务端到客户端的持续推流,基于http协议

WebSocket:服务端与客户端的双向持续交互,额外的一种协议

3.4.Flux和Mono的API

在WebFlux应用中,我们会的返回值通常都会用Flux或者Mono来封装,那么构建Flux和Mono的方式有哪些呢?

3.4.1.创建Flux的API

1)简单方法

首先是一些简单静态方法,这些方法添加一些包含固定元素的Flux或者简单数字的Flux。包括:

  • just():可以指定序列中包含的全部元素。创建出来的 Flux 序列在发布这些元素之后会自动结束。
  • fromArray(),fromIterable()和 fromStream():可以从一个数组、Iterable 对象或 Stream 对象中创建 Flux 对象。
  • empty():创建一个不包含任何元素,只发布结束消息的序列。
  • error(Throwable error):创建一个只包含错误消息的序列。
  • never():创建一个不包含任何消息通知的序列。
  • range(int start, int count):创建包含从 start 起始的 count 个数量的 Integer 对象的序列。
  • interval(Duration period)和 interval(Duration delay, Duration period):创建一个包含了从 0 开始递增的 Long 对象的序列。其中包含的元素按照指定的间隔来生成

代码实例:

1
2
3
4
5
6
7
8
9
10
// 生成包含两个字符串的Flux:"Hello", "World"
Flux.just("Hello", "World").subscribe(System.out::println);
// 从一个Integer数组中创建Flux
Flux.fromArray(new Integer[] {1, 2, 3}).subscribe(System.out::println);
// 空的Flux
Flux.empty().subscribe(System.out::println);
// 创建一个包含1到10的整数Integer类型的Flux
Flux.range(1, 10).subscribe(System.out::println);
// 每隔1秒生成一个元素
Flux.interval(Duration.ofSeconds(1)).subscribe(System.out::println)

如果生成元素的逻辑比较复杂,建议使用generate方法或者create方法

2)generate方法

generate()方法通过同步和逐一的方式来产生 Flux 序列。序列的产生是通过调用所提供的 SynchronousSink 对象的 next()complete()error(Throwable)方法来完成的。

代码:

1
2
3
4
5
6
Flux<String> flux = Flux.generate(sink -> {
// 添加一个元素
sink.next("Hello");
// 结束
sink.complete();
});

这个方法的含义是生成一个Flux,但是只包含一个元素:"hello";

generate()方法的另外一种形式 generate(Callable<S> stateSupplier, BiFunction<S,SynchronousSink<T>,S> generator),其中 stateSupplier 用来提供初始的状态对象。在进行序列生成时,状态对象会作为 generator 使用的第一个参数传入,可以在对应的逻辑中对该状态对象进行修改以供下一次生成时使用。

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 定义一个随机生成器
final Random random = new Random();
// 把集合的构造函数作为初始化方法,形成的初始化变量,会传递给第二个函数作为参数
Flux<Integer> flux = Flux.generate(ArrayList::new, (list, sink) -> {
// 生成随机数
int value = random.nextInt(100);
// 加入集合
list.add(value);
// 加入Flux
sink.next(value);
if (list.size() == 10) {
// 当集合元素数量等于10,则停止
sink.complete();
}
return list;
});

这个方法的最终结果,是生成一个包含10个随机数的Flux

Generate方法比较抽象,有点类似于reduce方法。如果不好理解,建议使用create方法。

3)create方法

create()方法与 generate()方法的不同之处在于所使用的是 FluxSink 对象。FluxSink 支持同步和异步的消息产生,并且可以在一次调用中产生多个元素。在代码清单 3 中,在一次调用中就产生了全部的 10 个元素。

代码:

1
2
3
4
5
6
7
8
9
10
Flux<Integer> flux = Flux.create(sink -> {
for (int i = 0; i < 10; i++) {
// 将循环遍历得到的数字,添加到flux
sink.next(i);
}
// 循环结束,任务完成
sink.complete();
});
// 会生成一个包含从0到9的int元素的Flux
flux.subscribe(System.out::println);

3.4.2.创建Mono的API

Mono中只包含一个元素,因此创建起来更加简单,其中包含一些与Flux类似的API:

  • just():指定Mono中的固定元素
  • empty():创建一个不包含任何元素,但会发送消息的Mono。
  • error(Throwable error):创建一个只包含错误消息的Mono。
  • never():创建一个不包含任何元素的Mono。
  • create():通过MonoSink来构建包含一个元素的Mono,可以使用MonoSink的success()、error()等方法。

代码:

1
2
Mono<String> mono = Mono.just("hello");
Mono<Object> mono = Mono.create(monoSink -> monoSink.success("hello"));

但Mono也有一些独特的方法:

  • fromCallable()、fromCompletionStage()、fromFuture()、fromRunnable()和 fromSupplier():分别从 Callable、CompletionStage、CompletableFuture、Runnable 和 Supplier 中创建 Mono。
  • delay(Duration duration)和 delayMillis(long duration):创建一个 Mono 序列,在指定的延迟时间之后,产生数字 0 作为唯一值。
  • ignoreElements(Publisher source):创建一个 Mono 序列,忽略作为源的 Publisher 中的所有元素,只产生结束消息。
  • justOrEmpty(Optional<? extends T> data)和 justOrEmpty(T data):从一个 Optional 对象或可能为 null 的对象中创建 Mono。只有 Optional 对象中包含值或对象不为 null 时,Mono 序列才产生对应的元素。

代码:

1
2
Mono<String> mono = Mono.fromSupplier(() -> "hello");
Mono<String> mono = Mono.fromFuture(CompletableFuture.supplyAsync(() -> "hello"));