WebFlux增删改查样例

WebFlux增删改查样例

跟着鸟哥学Spring Boot 2.0 WebFlux增删改查样例

Spring MVC Web架构是基于阻塞式Servlet API构建的。Servlet 3.1后提供了非阻塞API,Spring 5.0后基于这些API构建了一套全新的非阻塞Web框架 —— WebFlux。Spring Boot 2.0基于Spring 5.0构建,所以要在Spring Boot中使用WebFlux架构,版本必须大于2.0。

通过下面这张图了解下Spring MVC和Spring WebFlux的区别:

可以看到,Spring WebFlux是非阻塞式的,支持 Reactive Streams背压,并在Netty,Undertow和Servlet 3.1+容器等服务器上运行。其目前只支持非关系型数据库,如Mongo,Redis等。非阻塞式的编程模型可以提高程序的并发量,提升性能和吞吐量。

Mono和Flux在发布订阅模式中都属于发布者(不清楚的可以参考Java 9 Flow API学习),查看源码会发现它们都实现了Publisher接口。

Mono表示0 ~ 1个元素的数据发布者,Flux表示 0 ~ N个元素的数据发布者。我们可以通过一个例子来了解Mono和Flux,创建MonoFluxTest类:

Mono,Flux常用方法

源头操作

可以通过Flux类的静态方法来生成:

  1. just():可以指定序列中包含的全部元素。创建出来的 Flux 序列在发布这些元素之后会自动结束。
  2. fromArray()fromIterable()fromStream():可以从一个数组、Iterable 对象或 Stream 对象中创建 Flux 对象。
  3. empty():创建一个不包含任何元素,只发布结束消息的序列。
  4. error(Throwable error):创建一个只包含错误消息的序列。
  5. never():创建一个不包含任何消息通知的序列。
  6. range(int start, int count):创建包含从 start 起始的 count 个数量的 Integer 对象的序列。
  7. interval(Duration period)interval(Duration delay, Duration period):创建一个包含了从 0 开始递增的 Long 对象的序列。其中包含的元素按照指定的间隔来发布。除了间隔时间之外,还可以指定起始元素发布之前的延迟时间。

举些例子:

@Test
void flux() throws InterruptedException {
        Flux.just("Hello", "World").subscribe(System.out::println);
        Flux.fromArray(new Integer[] {1, 2, 3}).subscribe(System.out::println);
        Flux.empty().subscribe(System.out::println);
        Flux.range(1, 4).subscribe(System.out::println);
        Flux.interval(Duration.of(1, ChronoUnit.SECONDS)).subscribe(System.out::println);
        // 线程延迟关闭,不然最后一个例子木有输出
        Thread.currentThread().join(10000);
}

输出如下所示:

上面的这些静态方法适合于简单的Flux序列生成,当序列的生成需要复杂的逻辑时,则应该使用generate()create()方法。

generate()

generate()方法通过同步和逐一的方式来产生 Flux 序列。序列的产生是通过调用所提供的 SynchronousSink 对象的 next(),complete()和 error(Throwable)方法来完成的:

@Test
void flux1() throws InterruptedException {
    Flux.generate(sink -> {
        sink.next("Hello");
        sink.complete();
    }).subscribe(System.out::println);


    final Random random = new Random();
    Flux.generate(ArrayList::new, (list, sink) -> {
        int value = random.nextInt(100);
        list.add(value);
        sink.next(value);
        if (list.size() == 10) {
            sink.complete();
        }
        return list;
    }).subscribe(System.out::println);
}

输出如下所示:

如果不调用 complete()方法,所产生的是一个无限序列。

create()

create()方法与 generate()方法的不同之处在于所使用的是 FluxSink 对象。FluxSink 支持同步和异步的消息产生,并且可以在一次调用中产生多个元素:

@Test
void flux2() {
    Flux.create(sink -> {
        for (int i = 0; i < 10; i++) {
            sink.next(i);
        }
        sink.complete();
    }).subscribe(System.out::println);
}

Mono

Mono 的创建方式与之前介绍的 Flux 比较相似。Mono 类中也包含了一些与 Flux 类中相同的静态方法。这些方法包括 just(),empty(),error()和 never()等。除了这些方法之外,Mono 还有一些独有的静态方法:

  1. fromCallable()fromCompletionStage()fromFuture()fromRunnable()和 fromSupplier():分别从 Callable、CompletionStage、CompletableFuture、Runnable 和 Supplier 中创建 Mono。
  2. delay(Duration duration):创建一个 Mono 序列,在指定的延迟时间之后,产生数字 0 作为唯一值。
  3. ignoreElements(Publisher<T> source):创建一个 Mono 序列,忽略作为源的 Publisher 中的所有元素,只产生结束消息。
  4. justOrEmpty(Optional<? extends T> data)justOrEmpty(T data):从一个 Optional 对象或可能为 null 的对象中创建 Mono。只有 Optional 对象中包含值或对象不为 null 时,Mono 序列才产生对应的元素。
@Test
void mono() {
    Mono.just("are").subscribe(System.out::println);
    Mono.empty().subscribe(System.out::println);
    Mono.fromSupplier(() -> "you").subscribe(System.out::println);
    Mono.justOrEmpty(Optional.of("ok")).subscribe(System.out::println);
}

输出:

还可以通过 create()方法来使用 MonoSink 来创建 Mono:

Mono.create(sink -> sink.success("Hello")).subscribe(System.out::println);
中间操作

filter

对流中包含的元素进行过滤,只留下满足 Predicate 指定条件的元素:

Flux.range(1, 10).filter(i -> i % 2 == 0).subscribe(System.out::println);

输出前10偶数。

take

take 系列操作符用来从当前流中提取元素。提取的方式可以有很多种。

  1. take(long n):按照指定的数量来提取。
  2. takeLast(long n):提取流中的最后 N 个元素。
  3. takeUntil(Predicate<? super T> predicate):提取元素直到 Predicate 返回 true。

4 takeWhile(Predicate<? super T> continuePredicate): 当 Predicate 返回 true 时才进行提取。

举些例子:

Flux.range(1, 20).take(10).subscribe(System.out::println);
Flux.range(1, 20).takeLast(10).subscribe(System.out::println);
Flux.range(1, 20).takeWhile(i -> i < 10).subscribe(System.out::println);
Flux.range(1, 20).takeUntil(i -> i == 10).subscribe(System.out::println);

reduce 和 reduceWith

reduce 和 reduceWith 操作符对流中包含的所有元素进行累积操作,得到一个包含计算结果的 Mono 序列。累积操作是通过一个 BiFunction 来表示的。在操作时可以指定一个初始值。如果没有初始值,则序列的第一个元素作为初始值。

比如:

Flux.range(1, 10).reduce((x, y) -> x + y).subscribe(System.out::println);
Flux.range(1, 10).reduceWith(() -> 10, (x, y) -> x + y).subscribe(System.out::println);

第一行语句对流中的元素进行相加操作,结果为 55;第二行语句同样也是进行相加操作,不过通过一个 Supplier 给出了初始值为 10,所以结果为 65。

merge

merge操作符用来把多个流合并成一个 Flux 序列:

Flux.merge(
        Flux.interval(Duration.of(500, ChronoUnit.MILLIS)).take(2),
        Flux.interval(Duration.of(500, ChronoUnit.MILLIS)).take(2)
).toStream().forEach(System.out::println);

输出 0 0 1 1。

buffer

buffer 操作符的作用是把当前流中的元素收集到集合中,并把集合对象作为流中的新元素:

    @Test
    void buff() {
        //输出 5个包含 20 个元素的数组
        Flux.range(1, 100).buffer(20).subscribe(System.out::println);
        //bufferUntil 会一直收集直到 Predicate 返回为 true。使得 Predicate 返回 true 的那个元素可以选择添加到当前集合或下一个集合中;
        // 输出的是 5 个包含 2 个元素的数组
        // 每当遇到一个偶数就会结束当前的收集
        Flux.range(1, 10).bufferUntil(i -> i % 2 == 0).subscribe(System.out::println);
        // 第四行语句输出的是 5 个包含 1 个元素的数组
        // 数组里面包含的只有偶数。
        Flux.range(1, 10).bufferWhile(i -> i % 2 == 0).subscribe(System.out::println);
    }

输出如下所示:

终端处理

通过subscribe()方法处理正常和错误消息:

Flux.just(1, 2)
    .concatWith(Mono.error(new IllegalStateException()))
    .subscribe(System.out::println, System.err::println);

输出:

1
2
java.lang.IllegalStateException

出现错误时返回默认值:

Flux.just(1, 2)
    .concatWith(Mono.error(new IllegalStateException()))
    .onErrorReturn(0)
    .subscribe(System.out::println);

输出:

1
2
0

出现错误时使用另外的流:

Flux.just(1, 2)
    .concatWith(Mono.error(new IllegalArgumentException()))
    .onErrorResume(e -> {
        if (e instanceof IllegalStateException) {
            return Mono.just(0);
        } else if (e instanceof IllegalArgumentException) {
            return Mono.just(-1);
        }
        return Mono.empty();
    }).subscribe(System.out::println);

输出如下:

1
2
-1

学习连接

WebFlux增删改查样例

项目准备

新建一个Spring Boot项目,版本为2.1.3.RELEASE,并引入webfluxreactive mongodb依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>

要开启Reactive Mongo DB的相关配置,需要在Spring Boot启动类上添加@EnableReactiveMongoRepositories注解:

@SpringBootApplication
@EnableReactiveMongoRepositories
public class SpringBootWebfluxCrudApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBootWebfluxCrudApplication.class, args);
    }

}

接着在配置文件application.yml里配置Mongo DB连接:

spring:
  data:
    mongodb:
      host: localhost
      port: 27017
      database: user

创建User实体类:

@Document(collection = "user")
@Data
public class User {

    @Id
    private String id;
    private String name;
    private Integer age;
    private String description;

}
简单增删改查

创建UserDao接口,继承自ReactiveMongoRepository

@Repository
public interface UserDao extends ReactiveMongoRepository<User, String> {
}

和 Spring Boot整合Mongo DB 不同的是,我们继承的是ReactiveMongoRepository而非MongoRepository,它所提供的方法都是响应式的:

UserService里通过UserDao定义简单增删改查方法:

@Service
@RequiredArgsConstructor(onConstructor = @_(@Autowired))
public class UserService {

    private final ReactiveMongoTemplate reactiveMongoTemplate;
    private final UserDao userDao;

    public Flux<User> getUsers() {
        return userDao.findAll();
    }

    public Mono<User> getUser(String id) {
        return this.userDao.findById(id);
    }

    public Mono<User> createUser(User user) {
        return userDao.save(user);
    }

    public Mono<Void> deleteUser(String id) {
        return this.userDao.findById(id)
                .flatMap(user -> this.userDao.delete(user));
    }

    public Mono<User> updateUser(String id, User user) {
        return this.userDao.findById(id)
            .flatMap(u -> {
                u.setName(user.getName());
                u.setAge(user.getAge());
                u.setDescription(user.getDescription());
                return this.userDao.save(u);
            });
    }

    public Mono<Page<User>> getUserByCondition(User user, Integer page, Integer size) {

        Criteria criteria = new Criteria();
        //根据名字查找
        if (!StringUtils.isEmpty(user.getName())) {
            criteria.and("name").is(user.getName());
        }
        //根据描述查找
        if (!StringUtils.isEmpty(user.getDescription())) {
            criteria.and("description").regex(user.getDescription());
        }
        //根据年龄降序
        Sort sort =Sort.by(Sort.Direction.DESC,"age");

        Pageable pageable = PageRequest.of(page, size,sort);
        Query query = new Query().with(pageable);
        query.addCriteria(criteria);
        Flux<User> chatUserFlux = reactiveMongoTemplate.find(query, User.class);
        Mono<Long> countMono = reactiveMongoTemplate.count(Query.of(query).limit(-1).skip(-1), User.class);
        return Mono.zip(chatUserFlux.collectList(),countMono).map(tuple2 -> PageableExecutionUtils.getPage(
                tuple2.getT1(),
                pageable,
                () -> tuple2.getT2()));

    }
}

编写RESTfulUserController

@RestController
@RequestMapping("user")
@RequiredArgsConstructor(onConstructor = @_(@Autowired))
public class UserController {

    private final UserService userService;

    /**
     * 以数组的形式一次性返回所有数据
     */
    @GetMapping
    public Flux<User> getUsers() {
        return userService.getUsers();
    }

    /**
     * 以 Server sent events形式多次返回数据
     */
    @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<User> getUsersStream() {
        return userService.getUsers();
    }

    @PostMapping
    public Mono<User> createUser(User user) {
        return userService.createUser(user);
    }

    /**
     * 存在返回 200,不存在返回 404
     */
    @DeleteMapping("/{id}")
    public Mono<ResponseEntity<Void>> deleteUser(@PathVariable String id) {
        return userService.deleteUser(id)
                .then(Mono.just(new ResponseEntity<Void>(HttpStatus.OK)))
                .defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
    }

    /**
     * 存在返回修改后的 User
     * 不存在返回 404
     */
    @PutMapping("/{id}")
    public Mono<ResponseEntity<User>> updateUser(@PathVariable String id, User user) {
        return userService.updateUser(id, user)
                .map(u -> new ResponseEntity<>(u, HttpStatus.OK))
                .defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
    }

    /**
     * 根据用户 id查找
     * 存在返回,不存在返回 404
     */
    @GetMapping("/{id}")
    public Mono<ResponseEntity<User>> getUser(@PathVariable String id) {
        return userService.getUser(id)
                .map(user -> new ResponseEntity<>(user, HttpStatus.OK))
                .defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
    }

    /**
     * 分页
     * @param size
     * @param page
     * @param user
     * @return
     */
    @GetMapping("/condition")
    public Mono<Page<User>>  getUserByCondition(int size, int page, User user) {
        return userService.getUserByCondition(user, page, size);
    }
}

对于返回值为Flux<T>类型的方法,推荐定义两个一样的方法,一个以普通形式返回,一个以Server Sent Event的形式返回。对于修改和删除,如果需要修改和删除的用户不存在,我们返回404。

postman测试
{
	"info": {
		"_postman_id": "57b5f9bd-0dc5-4c53-9478-e5e8e1158657",
		"name": "spring",
		"schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json"
	},
	"item": [
		{
			"name": "webflux-mongodb",
			"item": [
				{
					"name": "getUser",
					"request": {
						"method": "GET",
						"header": [],
						"url": {
							"raw": "127.0.0.1:8080/user",
							"host": [
								"127",
								"0",
								"0",
								"1"
							],
							"port": "8080",
							"path": [
								"user"
							]
						}
					},
					"response": []
				},
				{
					"name": "getUserStream",
					"request": {
						"method": "GET",
						"header": [],
						"url": {
							"raw": "127.0.0.1:8080/user/stream",
							"host": [
								"127",
								"0",
								"0",
								"1"
							],
							"port": "8080",
							"path": [
								"user",
								"stream"
							]
						}
					},
					"response": []
				},
				{
					"name": "updateUser",
					"request": {
						"method": "GET",
						"header": [],
						"url": {
							"raw": "127.0.0.1:8080/user?name=yz&description=mongodb&age=20",
							"host": [
								"127",
								"0",
								"0",
								"1"
							],
							"port": "8080",
							"path": [
								"user"
							],
							"query": [
								{
									"key": "name",
									"value": "yz"
								},
								{
									"key": "description",
									"value": "mongodb"
								},
								{
									"key": "age",
									"value": "20"
								}
							]
						}
					},
					"response": []
				},
				{
					"name": "getUserByID",
					"request": {
						"method": "GET",
						"header": [],
						"url": {
							"raw": "127.0.0.1:8080/user/612122fb75da162aac4dee19",
							"host": [
								"127",
								"0",
								"0",
								"1"
							],
							"port": "8080",
							"path": [
								"user",
								"612122fb75da162aac4dee19"
							]
						}
					},
					"response": []
				},
				{
					"name": "deleteUserById",
					"request": {
						"method": "DELETE",
						"header": [],
						"url": {
							"raw": "127.0.0.1:8080/user/612122fb75da162aac4dee19",
							"host": [
								"127",
								"0",
								"0",
								"1"
							],
							"port": "8080",
							"path": [
								"user",
								"612122fb75da162aac4dee19"
							]
						}
					},
					"response": []
				},
				{
					"name": "updateUserById",
					"request": {
						"method": "GET",
						"header": [],
						"url": {
							"raw": "127.0.0.1:8080/user/612122fb75da162aac4dee19?name=yz&description=mongodb&age=20",
							"host": [
								"127",
								"0",
								"0",
								"1"
							],
							"port": "8080",
							"path": [
								"user",
								"612122fb75da162aac4dee19"
							],
							"query": [
								{
									"key": "name",
									"value": "yz"
								},
								{
									"key": "description",
									"value": "mongodb"
								},
								{
									"key": "age",
									"value": "20"
								}
							]
						}
					},
					"response": []
				},
				{
					"name": "分页",
					"request": {
						"method": "GET",
						"header": [],
						"url": {
							"raw": "127.0.0.1:8080/user/condition?size=10&page=0&name=yz",
							"host": [
								"127",
								"0",
								"0",
								"1"
							],
							"port": "8080",
							"path": [
								"user",
								"condition"
							],
							"query": [
								{
									"key": "size",
									"value": "10"
								},
								{
									"key": "page",
									"value": "0"
								},
								{
									"key": "name",
									"value": "yz"
								}
							]
						}
					},
					"response": []
				}
			]
		}
	]
}