经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 数据库/运维 » MongoDB » 查看文章
Reactive-MongoDB 异步 Java Driver 解读
来源:cnblogs  作者:wefeng  时间:2019/10/22 13:42:54  对本文有异议

一、关于 异步驱动

从3.0 版本开始,MongoDB 开始提供异步方式的驱动(Java Async Driver),这为应用提供了一种更高性能的选择。
但实质上,使用同步驱动(Java Sync Driver)的项目也不在少数,或许是因为先入为主的原因(同步Driver的文档说明更加的完善),又或者是为了兼容旧的 MongoDB 版本。
无论如何,由于 Reactive 的发展,未来使用异步驱动应该是一个趋势。

在使用 Async Driver 之前,需要对 Reactive 的概念有一些熟悉。

二、理解 Reactive (响应式)

响应式(Reactive)是一种异步的、面向数据流的开发方式,最早是来自于.NET 平台上的 Reactive Extensions 库,随后被扩展为各种编程语言的实现。
在著名的 Reactive Manifesto(响应式宣言) 中,对 Reactive 定义了四个特征:

reactive-specs

  • 及时响应(Responsive):系统能及时的响应请求。
  • 有韧性(Resilient):系统在出现异常时仍然可以响应,即支持容错。
  • 有弹性(Elastic):在不同的负载下,系统可弹性伸缩来保证运行。
  • 消息驱动(Message Driven):不同组件之间使用异步消息传递来进行交互,并确保松耦合及相互隔离。

在响应式宣言的所定义的这些系统特征中,无一不与响应式的流有若干的关系,于是乎就有了 2013年发起的 响应式流规范(Reactive Stream Specification)。

https://www.reactive-streams.org/

其中,对于响应式流的处理环节又做了如下定义:

  • 具有处理无限数量的元素的能力,即允许流永不结束
  • 按序处理
  • 异步地传递元素
  • 实现非阻塞的负压(back-pressure)

Java 平台则是在 JDK 9 版本上发布了对 Reactive Streams 的支持。

下面介绍响应式流的几个关键接口:

  • Publisher
    Publisher 是数据的发布者。Publisher 接口只有一个方法 subscribe,用于添加数据的订阅者,也就是 Subscriber。
  • Subscriber
    Subscriber 是数据的订阅者。Subscriber 接口有4个方法,都是作为不同事件的处理器。在订阅者成功订阅到发布者之后,其 onSubscribe(Subscription s) 方法会被调用。
    Subscription 表示的是当前的订阅关系。

当订阅成功后,可以使用 Subscription 的 request(long n) 方法来请求发布者发布 n 条数据。发布者可能产生3种不同的消息通知,分别对应 Subscriber 的另外3个回调方法。

数据通知:对应 onNext 方法,表示发布者产生的数据。
错误通知:对应 onError 方法,表示发布者产生了错误。
结束通知:对应 onComplete 方法,表示发布者已经完成了所有数据的发布。
在上述3种通知中,错误通知和结束通知都是终结通知,也就是在终结通知之后,不会再有其他通知产生。

  • Subscription
    Subscription 表示的是一个订阅关系。除了之前提到的 request 方法之外,还有 cancel 方法用来取消订阅。需要注意的是,在 cancel 方法调用之后,发布者仍然有可能继续发布通知。但订阅最终会被取消。

这几个接口的关系如下图所示:

reactive interfaces

图片出处:http://wiki.jikexueyuan.com/index.php/project/reactor-2.0/05.html

MongoDB 的异步驱动为 mongo-java-driver-reactivestreams 组件,其实现了 Reactive Stream 的上述接口。

> 除了 reactivestream 之外,MongoDB 的异步驱动还包含 RxJava 等风格的版本,有兴趣的读者可以进一步了解

http://mongodb.github.io/mongo-java-driver-reactivestreams/1.11/getting-started/quick-tour-primer/

三、使用示例

接下来,通过一个简单的例子来演示一下 Reactive 方式的代码风格:

A. 引入依赖

  1. org.mongodb
  2. mongodb-driver-reactivestreams
  3. 1.11.0

> 引入mongodb-driver-reactivestreams 将会自动添加 reactive-streams, bson, mongodb-driver-async组件

B. 连接数据库

  1. //服务器实例表
  2. List servers = new ArrayList();
  3. servers.add(new ServerAddress("localhost", 27018));
  4. //配置构建器
  5. MongoClientSettings.Builder settingsBuilder = MongoClientSettings.builder();
  6. //传入服务器实例
  7. settingsBuilder.applyToClusterSettings(
  8. builder -> builder.hosts(servers));
  9. //构建 Client 实例
  10. MongoClient mongoClient = MongoClients.create(settingsBuilder.build());

C. 实现文档查询

  1. //获得数据库对象
  2. MongoDatabase database = client.getDatabase(databaseName);
  3. //获得集合
  4. MongoCollection collection = database.getCollection(collectionName);
  5. //异步返回Publisher
  6. FindPublisher publisher = collection.find();
  7. //订阅实现
  8. publisher.subscribe(new Subscriber() {
  9. @Override
  10. public void onSubscribe(Subscription s) {
  11. System.out.println("start...");
  12. //执行请求
  13. s.request(Integer.MAX_VALUE);
  14. }
  15. @Override
  16. public void onNext(Document document) {
  17. //获得文档
  18. System.out.println("Document:" + document.toJson());
  19. }
  20. @Override
  21. public void onError(Throwable t) {
  22. System.out.println("error occurs.");
  23. }
  24. @Override
  25. public void onComplete() {
  26. System.out.println("finished.");
  27. }
  28. });

注意到,与使用同步驱动不同的是,collection.find()方法返回的不是 Cursor,而是一个 FindPublisher对象,这是Publisher接口的一层扩展。
而且,在返回 Publisher 对象时,此时并没有产生真正的数据库IO请求。 真正发起请求需要通过调用 Subscription.request()方法。
在上面的代码中,为了读取由 Publisher 产生的结果,通过自定义一个Subscriber,在onSubscribe 事件触发时就执行 数据库的请求,之后分别对 onNext、onError、onComplete进行处理。

尽管这种实现方式是纯异步的,但在使用上比较繁琐。试想如果对于每个数据库操作都要完成一个Subscriber 逻辑,那么开发的工作量是巨大的。

为了尽可能复用重复的逻辑,可以对Subscriber的逻辑做一层封装,包含如下功能:

  • 使用 List 容器对请求结果进行缓存
  • 实现阻塞等待结果的方法,可指定超时时间
  • 捕获异常,在等待结果时抛出

代码如下:

  1. public class ObservableSubscriber implements Subscriber {
  2. //响应数据
  3. private final List received;
  4. //错误信息
  5. private final List errors;
  6. //等待对象
  7. private final CountDownLatch latch;
  8. //订阅器
  9. private volatile Subscription subscription;
  10. //是否完成
  11. private volatile boolean completed;
  12. public ObservableSubscriber() {
  13. this.received = new ArrayList();
  14. this.errors = new ArrayList();
  15. this.latch = new CountDownLatch(1);
  16. }
  17. @Override
  18. public void onSubscribe(final Subscription s) {
  19. subscription = s;
  20. }
  21. @Override
  22. public void onNext(final T t) {
  23. received.add(t);
  24. }
  25. @Override
  26. public void onError(final Throwable t) {
  27. errors.add(t);
  28. onComplete();
  29. }
  30. @Override
  31. public void onComplete() {
  32. completed = true;
  33. latch.countDown();
  34. }
  35. public Subscription getSubscription() {
  36. return subscription;
  37. }
  38. public List getReceived() {
  39. return received;
  40. }
  41. public Throwable getError() {
  42. if (errors.size() > 0) {
  43. return errors.get(0);
  44. }
  45. return null;
  46. }
  47. public boolean isCompleted() {
  48. return completed;
  49. }
  50. /**
  51. * 阻塞一定时间等待结果
  52. *
  53. * @param timeout
  54. * @param unit
  55. * @return
  56. * @throws Throwable
  57. */
  58. public List get(final long timeout, final TimeUnit unit) throws Throwable {
  59. return await(timeout, unit).getReceived();
  60. }
  61. /**
  62. * 一直阻塞等待请求完成
  63. *
  64. * @return
  65. * @throws Throwable
  66. */
  67. public ObservableSubscriber await() throws Throwable {
  68. return await(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
  69. }
  70. /**
  71. * 阻塞一定时间等待完成
  72. *
  73. * @param timeout
  74. * @param unit
  75. * @return
  76. * @throws Throwable
  77. */
  78. public ObservableSubscriber await(final long timeout, final TimeUnit unit) throws Throwable {
  79. subscription.request(Integer.MAX_VALUE);
  80. if (!latch.await(timeout, unit)) {
  81. throw new MongoTimeoutException("Publisher onComplete timed out");
  82. }
  83. if (!errors.isEmpty()) {
  84. throw errors.get(0);
  85. }
  86. return this;
  87. }
  88. }

借助这个基础的工具类,我们对于文档的异步操作就变得简单多了。
比如对于文档查询的操作可以改造如下:

  1. ObservableSubscriber subscriber = new ObservableSubscriber();
  2. collection.find().subscribe(subscriber);
  3. //结果处理
  4. subscriber.get(15, TimeUnit.SECONDS).forEach( d -> {
  5. System.out.println("Document:" + d.toJson());
  6. });

当然,这个例子还有可以继续完善,比如使用 List 作为缓存,则要考虑数据量的问题,避免将全部(或超量) 的文档一次性转入内存。

原文地址:https://www.mongochina.com/article/655.html

原文链接:http://www.cnblogs.com/wefeng/p/11718970.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号