经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 数据库/运维 » MySQL » 查看文章
深入理解r2dbc在mysql中的使用
来源:jb51  时间:2021/2/18 16:18:28  对本文有异议

简介

mysql应该是我们在日常工作中使用到的一个非常普遍的数据库,虽然mysql现在是oracle公司的,但是它是开源的,市场占有率还是非常高的。

今天我们将会介绍r2dbc在mysql中的使用。

r2dbc-mysql的maven依赖

要想使用r2dbc-mysql,我们需要添加如下的maven依赖:

  1. <dependency>
  2. <groupId>dev.miku</groupId>
  3. <artifactId>r2dbc-mysql</artifactId>
  4. <version>0.8.2.RELEASE</version>
  5. </dependency>

当然,如果你想使用snapshot版本的话,可以这样:

  1. <dependency>
  2. <groupId>dev.miku</groupId>
  3. <artifactId>r2dbc-mysql</artifactId>
  4. <version>${r2dbc-mysql.version}.BUILD-SNAPSHOT</version>
  5. </dependency>
  6.  
  7. <repository>
  8. <id>sonatype-snapshots</id>
  9. <name>SonaType Snapshots</name>
  10. <url>https://oss.sonatype.org/content/repositories/snapshots</url>
  11. <snapshots>
  12. <enabled>true</enabled>
  13. </snapshots>
  14. </repository>
  15.  

创建connectionFactory

创建connectionFactory的代码实际上使用的r2dbc的标准接口,所以和之前讲到的h2的创建代码基本上是一样的:

  1. // Notice: the query string must be URL encoded
  2. ConnectionFactory connectionFactory = ConnectionFactories.get(
  3. "r2dbcs:mysql://root:database-password-in-here@127.0.0.1:3306/r2dbc?" +
  4. "zeroDate=use_round&" +
  5. "sslMode=verify_identity&" +
  6. "useServerPrepareStatement=true&" +
  7. "tlsVersion=TLSv1.3%2CTLSv1.2%2CTLSv1.1&" +
  8. "sslCa=%2Fpath%2Fto%2Fmysql%2Fca.pem&" +
  9. "sslKey=%2Fpath%2Fto%2Fmysql%2Fclient-key.pem&" +
  10. "sslCert=%2Fpath%2Fto%2Fmysql%2Fclient-cert.pem&" +
  11. "sslKeyPassword=key-pem-password-in-here"
  12. )
  13.  
  14. // Creating a Mono using Project Reactor
  15. Mono<Connection> connectionMono = Mono.from(connectionFactory.create());

不同的是ConnectionFactories传入的参数不同。

我们也支持unix domain socket的格式:

  1. // Minimum configuration for unix domain socket
  2. ConnectionFactory connectionFactory = ConnectionFactories.get("r2dbc:mysql://root@unix?unixSocket=%2Fpath%2Fto%2Fmysql.sock")
  3.  
  4. Mono<Connection> connectionMono = Mono.from(connectionFactory.create());

同样的,我们也支持从ConnectionFactoryOptions中创建ConnectionFactory:

  1. ConnectionFactoryOptions options = ConnectionFactoryOptions.builder()
  2. .option(DRIVER, "mysql")
  3. .option(HOST, "127.0.0.1")
  4. .option(USER, "root")
  5. .option(PORT, 3306) // optional, default 3306
  6. .option(PASSWORD, "database-password-in-here") // optional, default null, null means has no password
  7. .option(DATABASE, "r2dbc") // optional, default null, null means not specifying the database
  8. .option(CONNECT_TIMEOUT, Duration.ofSeconds(3)) // optional, default null, null means no timeout
  9. .option(SSL, true) // optional, default sslMode is "preferred", it will be ignore if sslMode is set
  10. .option(Option.valueOf("sslMode"), "verify_identity") // optional, default "preferred"
  11. .option(Option.valueOf("sslCa"), "/path/to/mysql/ca.pem") // required when sslMode is verify_ca or verify_identity, default null, null means has no server CA cert
  12. .option(Option.valueOf("sslCert"), "/path/to/mysql/client-cert.pem") // optional, default null, null means has no client cert
  13. .option(Option.valueOf("sslKey"), "/path/to/mysql/client-key.pem") // optional, default null, null means has no client key
  14. .option(Option.valueOf("sslKeyPassword"), "key-pem-password-in-here") // optional, default null, null means has no password for client key (i.e. "sslKey")
  15. .option(Option.valueOf("tlsVersion"), "TLSv1.3,TLSv1.2,TLSv1.1") // optional, default is auto-selected by the server
  16. .option(Option.valueOf("sslHostnameVerifier"), "com.example.demo.MyVerifier") // optional, default is null, null means use standard verifier
  17. .option(Option.valueOf("sslContextBuilderCustomizer"), "com.example.demo.MyCustomizer") // optional, default is no-op customizer
  18. .option(Option.valueOf("zeroDate"), "use_null") // optional, default "use_null"
  19. .option(Option.valueOf("useServerPrepareStatement"), true) // optional, default false
  20. .option(Option.valueOf("tcpKeepAlive"), true) // optional, default false
  21. .option(Option.valueOf("tcpNoDelay"), true) // optional, default false
  22. .option(Option.valueOf("autodetectExtensions"), false) // optional, default false
  23. .build();
  24. ConnectionFactory connectionFactory = ConnectionFactories.get(options);
  25.  
  26. // Creating a Mono using Project Reactor
  27. Mono<Connection> connectionMono = Mono.from(connectionFactory.create());

或者下面的unix domain socket格式:

  1. // Minimum configuration for unix domain socket
  2. ConnectionFactoryOptions options = ConnectionFactoryOptions.builder()
  3. .option(DRIVER, "mysql")
  4. .option(Option.valueOf("unixSocket"), "/path/to/mysql.sock")
  5. .option(USER, "root")
  6. .build();
  7. ConnectionFactory connectionFactory = ConnectionFactories.get(options);
  8.  
  9. Mono<Connection> connectionMono = Mono.from(connectionFactory.create());

使用MySqlConnectionFactory创建connection

上面的例子中,我们使用的是通用的r2dbc api来创建connection,同样的,我们也可以使用特有的MySqlConnectionFactory来创建connection:

  1. MySqlConnectionConfiguration configuration = MySqlConnectionConfiguration.builder()
  2. .host("127.0.0.1")
  3. .user("root")
  4. .port(3306) // optional, default 3306
  5. .password("database-password-in-here") // optional, default null, null means has no password
  6. .database("r2dbc") // optional, default null, null means not specifying the database
  7. .serverZoneId(ZoneId.of("Continent/City")) // optional, default null, null means query server time zone when connection init
  8. .connectTimeout(Duration.ofSeconds(3)) // optional, default null, null means no timeout
  9. .sslMode(SslMode.VERIFY_IDENTITY) // optional, default SslMode.PREFERRED
  10. .sslCa("/path/to/mysql/ca.pem") // required when sslMode is VERIFY_CA or VERIFY_IDENTITY, default null, null means has no server CA cert
  11. .sslCert("/path/to/mysql/client-cert.pem") // optional, default has no client SSL certificate
  12. .sslKey("/path/to/mysql/client-key.pem") // optional, default has no client SSL key
  13. .sslKeyPassword("key-pem-password-in-here") // optional, default has no client SSL key password
  14. .tlsVersion(TlsVersions.TLS1_3, TlsVersions.TLS1_2, TlsVersions.TLS1_1) // optional, default is auto-selected by the server
  15. .sslHostnameVerifier(MyVerifier.INSTANCE) // optional, default is null, null means use standard verifier
  16. .sslContextBuilderCustomizer(MyCustomizer.INSTANCE) // optional, default is no-op customizer
  17. .zeroDateOption(ZeroDateOption.USE_NULL) // optional, default ZeroDateOption.USE_NULL
  18. .useServerPrepareStatement() // Use server-preparing statements, default use client-preparing statements
  19. .tcpKeepAlive(true) // optional, controls TCP Keep Alive, default is false
  20. .tcpNoDelay(true) // optional, controls TCP No Delay, default is false
  21. .autodetectExtensions(false) // optional, controls extension auto-detect, default is true
  22. .extendWith(MyExtension.INSTANCE) // optional, manual extend an extension into extensions, default using auto-detect
  23. .build();
  24. ConnectionFactory connectionFactory = MySqlConnectionFactory.from(configuration);
  25.  
  26. // Creating a Mono using Project Reactor
  27. Mono<Connection> connectionMono = Mono.from(connectionFactory.create());
  28.  

或者下面的unix domain socket方式:

  1. // Minimum configuration for unix domain socket
  2. MySqlConnectionConfiguration configuration = MySqlConnectionConfiguration.builder()
  3. .unixSocket("/path/to/mysql.sock")
  4. .user("root")
  5. .build();
  6. ConnectionFactory connectionFactory = MySqlConnectionFactory.from(configuration);
  7.  
  8. Mono<Connection> connectionMono = Mono.from(connectionFactory.create());
  9.  

执行statement

首先看一个简单的不带参数的statement:

  1. connection.createStatement("INSERT INTO `person` (`first_name`, `last_name`) VALUES ('who', 'how')")
  2. .execute(); // return a Publisher include one Result

然后看一个带参数的statement:

  1. connection.createStatement("INSERT INTO `person` (`birth`, `nickname`, `show_name`) VALUES (?, ?name, ?name)")
  2. .bind(0, LocalDateTime.of(2019, 6, 25, 12, 12, 12))
  3. .bind("name", "Some one") // Not one-to-one binding, call twice of native index-bindings, or call once of name-bindings.
  4. .add()
  5. .bind(0, LocalDateTime.of(2009, 6, 25, 12, 12, 12))
  6. .bind(1, "My Nickname")
  7. .bind(2, "Naming show")
  8. .returnGeneratedValues("generated_id")
  9. .execute(); // return a Publisher include two Results.

注意,如果参数是null的话,可以使用bindNull来进行null值的绑定。

接下来我们看一个批量执行的操作:

  1. connection.createBatch()
  2. .add("INSERT INTO `person` (`first_name`, `last_name`) VALUES ('who', 'how')")
  3. .add("UPDATE `earth` SET `count` = `count` + 1 WHERE `id` = 'human'")
  4. .execute(); // return a Publisher include two Results.

执行事务

我们看一个执行事务的例子:

  1. connection.beginTransaction()
  2. .then(Mono.from(connection.createStatement("INSERT INTO `person` (`first_name`, `last_name`) VALUES ('who', 'how')").execute()))
  3. .flatMap(Result::getRowsUpdated)
  4. .thenMany(connection.createStatement("INSERT INTO `person` (`birth`, `nickname`, `show_name`) VALUES (?, ?name, ?name)")
  5. .bind(0, LocalDateTime.of(2019, 6, 25, 12, 12, 12))
  6. .bind("name", "Some one")
  7. .add()
  8. .bind(0, LocalDateTime.of(2009, 6, 25, 12, 12, 12))
  9. .bind(1, "My Nickname")
  10. .bind(2, "Naming show")
  11. .returnGeneratedValues("generated_id")
  12. .execute())
  13. .flatMap(Result::getRowsUpdated)
  14. .then(connection.commitTransaction());

使用线程池

为了提升数据库的执行效率,减少建立连接的开销,一般数据库连接都会有连接池的概念,同样的r2dbc也有一个叫做r2dbc-pool的连接池。

r2dbc-pool的依赖:

  1. <dependency>
  2. <groupId>io.r2dbc</groupId>
  3. <artifactId>r2dbc-pool</artifactId>
  4. <version>${version}</version>
  5. </dependency>

如果你想使用snapshot版本,也可以这样指定:

  1. <dependency>
  2. <groupId>io.r2dbc</groupId>
  3. <artifactId>r2dbc-pool</artifactId>
  4. <version>${version}.BUILD-SNAPSHOT</version>
  5. </dependency>
  6.  
  7. <repository>
  8. <id>spring-libs-snapshot</id>
  9. <name>Spring Snapshot Repository</name>
  10. <url>https://repo.spring.io/libs-snapshot</url>
  11. </repository>

看一下怎么指定数据库连接池:

  1. ConnectionFactory connectionFactory = ConnectionFactories.get("r2dbc:pool:<my-driver>://<host>:<port>/<database>[?maxIdleTime=PT60S[&…]");
  2.  
  3. Publisher<? extends Connection> connectionPublisher = connectionFactory.create();
  4.  

可以看到,我们只需要在连接URL上面添加pool这个driver即可。

同样的,我们也可以通过ConnectionFactoryOptions来创建:

  1. ConnectionFactory connectionFactory = ConnectionFactories.get(ConnectionFactoryOptions.builder()
  2. .option(DRIVER, "pool")
  3. .option(PROTOCOL, "postgresql") // driver identifier, PROTOCOL is delegated as DRIVER by the pool.
  4. .option(HOST, "…")
  5. .option(PORT, "…")
  6. .option(USER, "…")
  7. .option(PASSWORD, "…")
  8. .option(DATABASE, "…")
  9. .build());
  10.  
  11. Publisher<? extends Connection> connectionPublisher = connectionFactory.create();
  12.  
  13. // Alternative: Creating a Mono using Project Reactor
  14. Mono<Connection> connectionMono = Mono.from(connectionFactory.create());

最后, 你也可以直接通过创建ConnectionPoolConfiguration来使用线程池:

  1. ConnectionFactory connectionFactory = …;
  2.  
  3. ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactory)
  4. .maxIdleTime(Duration.ofMillis(1000))
  5. .maxSize(20)
  6. .build();
  7.  
  8. ConnectionPool pool = new ConnectionPool(configuration);
  9.  
  10. Mono<Connection> connectionMono = pool.create();
  11.  
  12. // later
  13.  
  14. Connection connection = …;
  15. Mono<Void> release = connection.close(); // released the connection back to the pool
  16.  
  17. // application shutdown
  18. pool.dispose();

到此这篇关于深入理解r2dbc在mysql中的使用的文章就介绍到这了,更多相关mysql r2dbc 内容请搜索w3xue以前的文章或继续浏览下面的相关文章希望大家以后多多支持w3xue!

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号