PICE (2): JDBC-Streaming - gRPC-JDBC Service
Source: cnblogs | Author: 雪川大虫 | Date: 2018/9/25 20:28:33

In an akka-cluster environment, a JDBC database is, from the data-access point of view, detached from the other nodes of the cluster: JDBC databases are not distributed and offer no location transparency. A JDBC database server must therefore expose its data operations as a service. In this scenario the JDBC service is the server, and every other node, including other JDBC database nodes, acts as a client of that service. Since we have already chosen the gRPC service model for our akka-cluster environment, with akka-stream providing flow control over database operations, this post demonstrates the concrete implementation and usage of gRPC-JDBC-Streaming.

In the previous post we demonstrated the simplest JDBC-Streaming pattern, a unary request with a streaming response: the client sends a single JDBCQuery to the JDBC service, which runs the query and streams the resulting DataRows back. The data and service types are defined in IDL in the jdbc.proto file:

    message JDBCDataRow {
      string year = 1;
      string state = 2;
      string county = 3;
      string value = 4;
    }

    message JDBCQuery {
      string dbName = 1;
      string statement = 2;
      bytes parameters = 3;
      google.protobuf.Int32Value fetchSize = 4;
      google.protobuf.BoolValue autoCommit = 5;
      google.protobuf.Int32Value queryTimeout = 6;
    }

    service JDBCServices {
      rpc runQuery(JDBCQuery) returns (stream JDBCDataRow) {}
    }

The types JDBCDataRow and JDBCQuery above map respectively to the stream element type and the JDBCQueryContext of the JDBC-Streaming tool:

    val toRow = (rs: WrappedResultSet) => JDBCDataRow(
      year = rs.string("REPORTYEAR"),
      state = rs.string("STATENAME"),
      county = rs.string("COUNTYNAME"),
      value = rs.string("VALUE")
    )

    val ctx = JDBCQueryContext[JDBCDataRow](
      dbName = Symbol(q.dbName),
      statement = q.statement,
      parameters = params,
      fetchSize = q.fetchSize.getOrElse(100),
      autoCommit = q.autoCommit.getOrElse(false),
      queryTimeout = q.queryTimeout
    )

    jdbcAkkaStream(ctx, toRow)
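The parameters field travels as protobuf bytes: the client packs a Seq[Any] with marshal and the service restores it with unmarshal, both from the ByteConverter.scala helper listed at the end of this post. A minimal sketch of that round trip:

    import protobuf.bytes.Converter._

    // client side: serialize the query parameters into the bytes field
    val packed = marshal(Seq("Arizona", 5))

    // service side: an empty ByteString means no parameters were sent
    val params: Seq[Any] =
      if (packed.isEmpty) Nil else unmarshal[Seq[Any]](packed)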

Compiling the .proto with scalaPB generates the server- and client-side boilerplate code (see the build sketch after the listing). We then implement the concrete JDBC service:

    class JDBCStreamingServices(implicit ec: ExecutionContextExecutor) extends JdbcGrpcAkkaStream.JDBCServices {
      val logger = Logger.getLogger(classOf[JDBCStreamingServices].getName)

      val toRow = (rs: WrappedResultSet) => JDBCDataRow(
        year = rs.string("REPORTYEAR"),
        state = rs.string("STATENAME"),
        county = rs.string("COUNTYNAME"),
        value = rs.string("VALUE")
      )

      override def runQuery: Flow[JDBCQuery, JDBCDataRow, NotUsed] = {
        logger.info("**** runQuery called on service side ***")
        Flow[JDBCQuery]
          .flatMapConcat { q =>
            // unpack JDBCQuery and construct the context
            val params: Seq[Any] = unmarshal[Seq[Any]](q.parameters)
            logger.info(s"**** query parameters: ${params} ****")
            val ctx = JDBCQueryContext[JDBCDataRow](
              dbName = Symbol(q.dbName),
              statement = q.statement,
              parameters = params,
              fetchSize = q.fetchSize.getOrElse(100),
              autoCommit = q.autoCommit.getOrElse(false),
              queryTimeout = q.queryTimeout
            )
            jdbcAkkaStream(ctx, toRow)
          }
      }
    }
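For reference, the generated wrappers come from running scalaPB over jdbc.proto at build time. A minimal sbt wiring might look like the sketch below; treat it as an assumption for illustration (the version number, and the extra code generator that produces the JdbcGrpcAkkaStream wrapper, depend on your build):

    // project/plugins.sbt (assumed)
    addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.18")

    // build.sbt (assumed): compile .proto files under src/main/protobuf;
    // the grpc-akkastream generator used in this series must be added alongside scalapb.gen()
    PB.targets in Compile := Seq(
      scalapb.gen() -> (sourceManaged in Compile).value
    )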

Here is an example of calling the service from the client:

    val query = JDBCQuery(
      dbName = "h2",
      statement = "select * from AQMRPT where STATENAME = ? and VALUE = ?",
      parameters = marshal(Seq("Arizona", 5))
    )

    def queryRows: Source[JDBCDataRow, NotUsed] = {
      logger.info(s"running queryRows ...")
      Source
        .single(query)
        .via(stub.runQuery)
    }

The program is run like this:

    object QueryRows extends App {
      implicit val system = ActorSystem("QueryRows")
      implicit val mat = ActorMaterializer.create(system)

      val client = new JDBCStreamClient("localhost", 50051)
      client.queryRows.runForeach(println)

      scala.io.StdIn.readLine()
      mat.shutdown()
      system.terminate()
    }

What if the client sends a whole series of JDBCQuery messages? That is the so-called BiDi-Streaming pattern, described in jdbc.proto as follows:

    service JDBCServices {
      rpc runQuery(JDBCQuery) returns (stream JDBCDataRow) {}
      rpc batQuery(stream JDBCQuery) returns (stream JDBCDataRow) {}
    }

Note that the input of batQuery is a stream. The generated batQuery service method has this shape:

    override def runQuery: Flow[JDBCQuery, JDBCDataRow, NotUsed] = { ... }
    override def batQuery: Flow[JDBCQuery, JDBCDataRow, NotUsed] = runQuery

runQuery and batQuery have identical signatures, which means the server provides the same service for both: in our example each incoming JDBCQuery is answered with its corresponding stream of rows. The real difference between the two lies on the client side, as the scalaPB-generated source shows:

    override def runQuery: Flow[grpc.jdbc.services.JDBCQuery, grpc.jdbc.services.JDBCDataRow, NotUsed] =
      Flow.fromGraph(
        new GrpcGraphStage[grpc.jdbc.services.JDBCQuery, grpc.jdbc.services.JDBCDataRow]({ outputObserver =>
          new StreamObserver[grpc.jdbc.services.JDBCQuery] {
            override def onError(t: Throwable): Unit = ()
            override def onCompleted(): Unit = ()
            override def onNext(request: grpc.jdbc.services.JDBCQuery): Unit =
              ClientCalls.asyncServerStreamingCall(
                channel.newCall(METHOD_RUN_QUERY, options),
                request,
                outputObserver
              )
          }
        })
      )
    ...
    override def batQuery: Flow[grpc.jdbc.services.JDBCQuery, grpc.jdbc.services.JDBCDataRow, NotUsed] =
      Flow.fromGraph(new GrpcGraphStage[grpc.jdbc.services.JDBCQuery, grpc.jdbc.services.JDBCDataRow](outputObserver =>
        ClientCalls.asyncBidiStreamingCall(
          channel.newCall(METHOD_BAT_QUERY, options),
          outputObserver
        )
      ))

As the generated code shows, runQuery opens a new server-streaming call for every request it receives, while batQuery multiplexes all requests over a single bidirectional call. So on the client side we go through batQuery:

    def batQueryRows: Source[JDBCDataRow, NotUsed] = {
      logger.info(s"running batQueryRows ...")
      Source
        .fromIterator(() => List(query, query2, query3).toIterator)
        .via(stub.batQuery)
    }
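query2 and query3 have the same shape as query, just with different marshalled parameters (they appear in the full JDBCClient.scala listing below), e.g.:

    val query2 = JDBCQuery(
      dbName = "h2",
      statement = "select * from AQMRPT where STATENAME = ? and VALUE = ?",
      parameters = marshal(Seq("Colorado", 3))
    )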

Besides queries, JDBC operations also need an update side, covering schema DDL and database updates. A JDBC update request is carried by a JDBCContext:

    case class JDBCContext(
      dbName: Symbol,
      statements: Seq[String] = Nil,
      parameters: Seq[Seq[Any]] = Nil,
      fetchSize: Int = 100,
      queryTimeout: Option[Int] = None,
      queryTags: Seq[String] = Nil,
      sqlType: JDBCContext.SQLTYPE = JDBCContext.SQL_UPDATE,
      batch: Boolean = false,
      returnGeneratedKey: Seq[Option[Any]] = Nil,
      // no return: None, return by index: Some(1), by name: Some("id")
      preAction: Option[PreparedStatement => Unit] = None,
      postAction: Option[PreparedStatement => Unit] = None) { ... }
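JDBCContext also carries a set of builder-style helpers (their full definitions are in the JDBCEngine.scala listing below). As a sketch, a single-statement update that asks for the generated key is assembled like this:

    // build an update command against database 'h2; `true` requests the generated key
    val ctx = JDBCContext('h2)
      .setUpdateCommand(true,
        "insert into members(name, description) values (?, ?)",
        "Peter Chan", "new member")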

The protobuf messages corresponding to this class are defined as follows:

    message JDBCResult {
      bytes result = 1;
    }

    message JDBCUpdate {
      string dbName = 1;
      repeated string statements = 2;
      bytes parameters = 3;
      google.protobuf.Int32Value fetchSize = 4;
      google.protobuf.Int32Value queryTimeout = 5;
      int32 sqlType = 6;
      google.protobuf.Int32Value batch = 7;
      bytes returnGeneratedKey = 8;
    }

    service JDBCServices {
      rpc runQuery(JDBCQuery) returns (stream JDBCDataRow) {}
      rpc batQuery(stream JDBCQuery) returns (stream JDBCDataRow) {}
      rpc runDDL(JDBCUpdate) returns (JDBCResult) {}
    }

The runDDL service method returns a JDBCResult message, which wraps a marshalled return value (a Seq[Any]). The following demonstrates packing and unpacking the protobuf messages: on the server side the JDBCUpdate is unpacked into a JDBCContext, which is then passed to jdbcExecuteDDL:

    override def runDDL: Flow[JDBCUpdate, JDBCResult, NotUsed] = {
      logger.info("**** runDDL called on service side ***")
      Flow[JDBCUpdate]
        .flatMapConcat { context =>
          // unpack JDBCUpdate and construct the context
          val ctx = JDBCContext(
            dbName = Symbol(context.dbName),
            statements = context.statements,
            sqlType = JDBCContext.SQL_EXEDDL,
            queryTimeout = context.queryTimeout
          )
          logger.info(s"**** JDBCContext => ${ctx} ***")
          Source
            .fromFuture(jdbcExecuteDDL(ctx))
            .map { r => JDBCResult(marshal(r)) }
        }
    }

jdbcExecuteDDL returns a Future[String]:

    def jdbcExecuteDDL(ctx: JDBCContext)(implicit ec: ExecutionContextExecutor): Future[String] = {
      if (ctx.sqlType != SQL_EXEDDL) {
        Future.failed(new IllegalStateException("JDBCContext setting error: sqlType must be 'SQL_EXEDDL'!"))
      } else {
        Future {
          NamedDB(ctx.dbName) localTx { implicit session =>
            ctx.statements.foreach { stm =>
              val ddl = new SQLExecution(statement = stm, parameters = Nil)(
                before = (_: PreparedStatement) => {})(
                after = (_: PreparedStatement) => {})
              ddl.apply()
            }
            "SQL_EXEDDL executed successfully."
          }
        }
      }
    }

We can turn that future into an akka-stream Source with Source.fromFuture(jdbcExecuteDDL(ctx)). On the client side we build a JDBCUpdate and send it to the server for execution:

    val dropSQL: String = """
      drop table members
      """

    val createSQL: String = """
      create table members (
        id serial not null primary key,
        name varchar(30) not null,
        description varchar(1000),
        birthday date,
        created_at timestamp not null,
        picture blob
      )"""

    val ctx = JDBCUpdate(
      dbName = "h2",
      sqlType = JDBCContext.SQL_EXEDDL,
      statements = Seq(dropSQL, createSQL)
    )

    def createTbl: Source[JDBCResult, NotUsed] = {
      logger.info(s"running createTbl ...")
      Source
        .single(ctx)
        .via(stub.runDDL)
    }

Note: statements = Seq(dropSQL, createSQL) contains two independent SQL statements.
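On the client the marshalled result can be unpacked with unmarshal; the RunDDL runner in the full listing below prints it like this:

    client.createTbl.runForeach { r => println(unmarshal[Seq[Any]](r.result)) }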

Next we demonstrate streaming rows (stream MemberRow) from the client for the server to insert into the database. The IDL data types and service methods are defined as follows:

    message JDBCDate {
      int32 yyyy = 1;
      int32 mm = 2;
      int32 dd = 3;
    }

    message JDBCTime {
      int32 hh = 1;
      int32 mm = 2;
      int32 ss = 3;
      int32 nnn = 4;
    }

    message JDBCDateTime {
      JDBCDate date = 1;
      JDBCTime time = 2;
    }

    message MemberRow {
      string name = 1;
      JDBCDate birthday = 2;
      string description = 3;
      JDBCDateTime created_at = 4;
      bytes picture = 5;
    }

    service JDBCServices {
      rpc runQuery(JDBCQuery) returns (stream JDBCDataRow) {}
      rpc batQuery(stream JDBCQuery) returns (stream JDBCDataRow) {}
      rpc runDDL(JDBCUpdate) returns (JDBCResult) {}
      rpc insertRows(stream MemberRow) returns (JDBCResult) {}
    }

The insertRows service method is implemented as follows:

    override def insertRows: Flow[MemberRow, JDBCResult, NotUsed] = {
      logger.info("**** insertRows called on service side ***")
      val insertSQL = """
        insert into members(
          name,
          birthday,
          description,
          created_at
        ) values (?, ?, ?, ?)
        """
      Flow[MemberRow]
        .flatMapConcat { row =>
          val ctx = JDBCContext('h2)
            .setUpdateCommand(true, insertSQL,
              row.name,
              jdbcSetDate(row.birthday.get.yyyy, row.birthday.get.mm, row.birthday.get.dd),
              row.description,
              jdbcSetNow
            )
          logger.info(s"**** JDBCContext => ${ctx} ***")
          Source
            .fromFuture(jdbcTxUpdates[Vector](ctx))
            .map { r => JDBCResult(marshal(r)) }
        }
    }

Likewise, jdbcTxUpdates returns a Future; its full implementation is in the attached JDBCEngine.scala.
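For reference, its signature (from the JDBCEngine.scala listing below):

    def jdbcTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
        implicit ec: ExecutionContextExecutor,
        cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]]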

The client builds a stream of MemberRow values and pushes it to the server through stub.insertRows:

    val p1 = MemberRow("Peter Chan", Some(JDBCDate(1967, 5, 17)), "new member1", None, _root_.com.google.protobuf.ByteString.EMPTY)
    val p2 = MemberRow("Alanda Wong", Some(JDBCDate(1980, 11, 10)), "new member2", None, _root_.com.google.protobuf.ByteString.EMPTY)
    val p3 = MemberRow("Kate Zhang", Some(JDBCDate(1969, 8, 13)), "new member3", None, _root_.com.google.protobuf.ByteString.EMPTY)
    val p4 = MemberRow("Tiger Chan", Some(JDBCDate(1962, 5, 1)), "new member4", None, _root_.com.google.protobuf.ByteString.EMPTY)

    def insertRows: Source[JDBCResult, NotUsed] = {
      logger.info(s"running insertRows ...")
      Source
        .fromIterator(() => List(p1, p2, p3, p4).toIterator)
        .via(stub.insertRows)
    }
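The matching runner (InsertRows in the full listing below) decodes the returned values as a Vector[Long]:

    client.insertRows.runForeach { r => println(unmarshal[Vector[Long]](r.result)) }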

Finally, we demonstrate the jdbcBatchUpdate function: we read MemberRow values from the server and send them back to the server for an update operation. The IDL:

    message MemberRow {
      string name = 1;
      JDBCDate birthday = 2;
      string description = 3;
      JDBCDateTime created_at = 4;
      bytes picture = 5;
    }

    service JDBCServices {
      rpc runQuery(JDBCQuery) returns (stream JDBCDataRow) {}
      rpc batQuery(stream JDBCQuery) returns (stream JDBCDataRow) {}
      rpc runDDL(JDBCUpdate) returns (JDBCResult) {}
      rpc insertRows(stream MemberRow) returns (JDBCResult) {}
      rpc updateRows(stream MemberRow) returns (JDBCResult) {}
      rpc getMembers(JDBCQuery) returns (stream MemberRow) {}
    }

The server-side methods are defined as follows:

    val toMemberRow = (rs: WrappedResultSet) => MemberRow(
      name = rs.string("name"),
      description = rs.string("description"),
      birthday = None,
      createdAt = None,
      picture = _root_.com.google.protobuf.ByteString.EMPTY
    )

    override def getMembers: Flow[JDBCQuery, MemberRow, NotUsed] = {
      logger.info("**** getMembers called on service side ***")
      Flow[JDBCQuery]
        .flatMapConcat { q =>
          // unpack JDBCQuery and construct the context
          var params: Seq[Any] = Nil
          if (q.parameters != _root_.com.google.protobuf.ByteString.EMPTY)
            params = unmarshal[Seq[Any]](q.parameters)
          logger.info(s"**** query parameters: ${params} ****")
          val ctx = JDBCQueryContext[MemberRow](
            dbName = Symbol(q.dbName),
            statement = q.statement,
            parameters = params,
            fetchSize = q.fetchSize.getOrElse(100),
            autoCommit = q.autoCommit.getOrElse(false),
            queryTimeout = q.queryTimeout
          )
          jdbcAkkaStream(ctx, toMemberRow)
        }
    }

    override def updateRows: Flow[MemberRow, JDBCResult, NotUsed] = {
      logger.info("**** updateRows called on service side ***")
      val updateSQL = "update members set description = ?, created_at = ? where name = ?"
      Flow[MemberRow]
        .flatMapConcat { row =>
          val ctx = JDBCContext('h2)
            .setBatchCommand(updateSQL)
            .appendBatchParameters(
              row.name + " updated.",
              jdbcSetNow,
              row.name
            ).setBatchReturnGeneratedKeyOption(true)
          logger.info(s"**** JDBCContext => ${ctx} ***")
          Source
            .fromFuture(jdbcBatchUpdate[Vector](ctx))
            .map { r => JDBCResult(marshal(r)) }
        }
    }
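jdbcBatchUpdate also returns a Future; for reference, its signature (from the JDBCEngine.scala listing below):

    def jdbcBatchUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
        implicit ec: ExecutionContextExecutor,
        cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]]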

Its full source is in the attached JDBCEngine.scala. On the client side, the rows from getMembers are piped straight back into updateRows:

    val queryMember = JDBCQuery(
      dbName = "h2",
      statement = "select * from members"
    )

    def updateRows: Source[JDBCResult, NotUsed] = {
      logger.info(s"running updateRows ...")
      Source
        .single(queryMember)
        .via(stub.getMembers)
        .via(stub.updateRows)
    }

The following example shows how to use JDBCActionStream to process a stream of rows in bulk. The server-side source:

    val params: JDBCDataRow => Seq[Any] = row =>
      Seq(row.value.toInt * 2, row.state, row.county, row.year)

    val sql = "update AQMRPT set total = ? where statename = ? and countyname = ? and reportyear = ?"

    val jdbcActionStream = JDBCActionStream('h2, sql, params)
      .setParallelism(4).setProcessOrder(false)

    val jdbcActionFlow = jdbcActionStream.performOnRow

    override def updateBat: Flow[JDBCDataRow, JDBCDataRow, NotUsed] = {
      logger.info("**** updateBat called on service side ***")
      Flow[JDBCDataRow]
        .via(jdbcActionFlow)
    }

jdbcActionFlow is a Flow[R, R, _], so we can attach it directly to the upstream Flow with via. JDBCActionStream is defined as follows:

    case class JDBCActionStream[R](dbName: Symbol, parallelism: Int = 1, processInOrder: Boolean = true,
                                   statement: String, prepareParams: R => Seq[Any]) {
      jas =>

      def setDBName(db: Symbol): JDBCActionStream[R] = jas.copy(dbName = db)
      def setParallelism(parLevel: Int): JDBCActionStream[R] = jas.copy(parallelism = parLevel)
      def setProcessOrder(ordered: Boolean): JDBCActionStream[R] = jas.copy(processInOrder = ordered)

      private def perform(r: R)(implicit ec: ExecutionContextExecutor) = {
        import scala.concurrent._
        val params = prepareParams(r)
        Future {
          NamedDB(dbName) autoCommit { session =>
            session.execute(statement, params: _*)
          }
          r
        }
      }

      def performOnRow(implicit ec: ExecutionContextExecutor): Flow[R, R, NotUsed] =
        if (processInOrder)
          Flow[R].mapAsync(parallelism)(perform)
        else
          Flow[R].mapAsyncUnordered(parallelism)(perform)
    }

    object JDBCActionStream {
      def apply[R](_dbName: Symbol, _statement: String, params: R => Seq[Any]): JDBCActionStream[R] =
        new JDBCActionStream[R](dbName = _dbName, statement = _statement, prepareParams = params)
    }

performOnRow is a pass-through stage; it uses mapAsync (or mapAsyncUnordered) to run the updates concurrently. The client calls it as follows:

    def updateBatches: Source[JDBCDataRow, NotUsed] = {
      logger.info(s"running updateBatches ...")
      Source
        .fromIterator(() => List(query, query2, query3).toIterator)
        .via(stub.batQuery)
        .via(stub.updateBat)
    }
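Driving the whole pipeline end to end is again just a matter of materializing the source (this is the BatUpdates runner from the full listing below):

    object BatUpdates extends App {
      implicit val system = ActorSystem("BatUpdates")
      implicit val mat = ActorMaterializer.create(system)

      val client = new JDBCStreamClient("localhost", 50051)
      client.updateBatches.runForeach(println)

      scala.io.StdIn.readLine()
      mat.shutdown()
      system.terminate()
    }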

The complete source code for this demonstration follows:

jdbc.proto

    syntax = "proto3";

    import "google/protobuf/wrappers.proto";
    import "google/protobuf/any.proto";
    import "scalapb/scalapb.proto";

    package grpc.jdbc.services;

    option (scalapb.options) = {
      // use a custom Scala package name
      // package_name: "io.ontherocks.introgrpc.demo"

      // don't append file name to package
      flat_package: true

      // generate one Scala file for all messages (services still get their own file)
      single_file: true

      // add imports to generated file
      // useful when extending traits or using custom types
      // import: "io.ontherocks.hellogrpc.RockingMessage"

      // code to put at the top of generated file
      // works only with `single_file: true`
      //preamble: "sealed trait SomeSealedTrait"
    };

    /*
     * Demoes various customization options provided by ScalaPBs.
     */

    message JDBCDataRow {
      string year = 1;
      string state = 2;
      string county = 3;
      string value = 4;
    }

    message JDBCQuery {
      string dbName = 1;
      string statement = 2;
      bytes parameters = 3;
      google.protobuf.Int32Value fetchSize = 4;
      google.protobuf.BoolValue autoCommit = 5;
      google.protobuf.Int32Value queryTimeout = 6;
    }

    message JDBCResult {
      bytes result = 1;
    }

    message JDBCUpdate {
      string dbName = 1;
      repeated string statements = 2;
      bytes parameters = 3;
      google.protobuf.Int32Value fetchSize = 4;
      google.protobuf.Int32Value queryTimeout = 5;
      int32 sqlType = 6;
      google.protobuf.Int32Value batch = 7;
      bytes returnGeneratedKey = 8;
    }

    message JDBCDate {
      int32 yyyy = 1;
      int32 mm = 2;
      int32 dd = 3;
    }

    message JDBCTime {
      int32 hh = 1;
      int32 mm = 2;
      int32 ss = 3;
      int32 nnn = 4;
    }

    message JDBCDateTime {
      JDBCDate date = 1;
      JDBCTime time = 2;
    }

    message MemberRow {
      string name = 1;
      JDBCDate birthday = 2;
      string description = 3;
      JDBCDateTime created_at = 4;
      bytes picture = 5;
    }

    service JDBCServices {
      rpc runQuery(JDBCQuery) returns (stream JDBCDataRow) {}
      rpc batQuery(stream JDBCQuery) returns (stream JDBCDataRow) {}
      rpc runDDL(JDBCUpdate) returns (JDBCResult) {}
      rpc insertRows(stream MemberRow) returns (JDBCResult) {}
      rpc updateRows(stream MemberRow) returns (JDBCResult) {}
      rpc getMembers(JDBCQuery) returns (stream MemberRow) {}
      // bidi pass-through used by the updateBatches pipeline (stub.updateBat)
      rpc updateBat(stream JDBCDataRow) returns (stream JDBCDataRow) {}
    }

JDBCEngine.scala

    package sdp.jdbc.engine

    import java.sql.PreparedStatement
    import java.io.InputStream
    import java.time._

    import scala.collection.generic.CanBuildFrom
    import scala.concurrent._
    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContextExecutor

    import akka.NotUsed
    import akka.stream._
    import akka.stream.scaladsl._

    import scalikejdbc._
    import scalikejdbc.streams._
    import scalikejdbc.TxBoundary.Try._

    import sdp.jdbc.FileStreaming._

    object JDBCContext {
      type SQLTYPE = Int
      val SQL_EXEDDL = 1
      val SQL_UPDATE = 2
      val RETURN_GENERATED_KEYVALUE = true
      val RETURN_UPDATED_COUNT = false
    }

    case class JDBCQueryContext[M](
      dbName: Symbol,
      statement: String,
      parameters: Seq[Any] = Nil,
      fetchSize: Int = 100,
      autoCommit: Boolean = false,
      queryTimeout: Option[Int] = None)

    case class JDBCContext(
      dbName: Symbol,
      statements: Seq[String] = Nil,
      parameters: Seq[Seq[Any]] = Nil,
      fetchSize: Int = 100,
      queryTimeout: Option[Int] = None,
      queryTags: Seq[String] = Nil,
      sqlType: JDBCContext.SQLTYPE = JDBCContext.SQL_UPDATE,
      batch: Boolean = false,
      returnGeneratedKey: Seq[Option[Any]] = Nil,
      // no return: None, return by index: Some(1), by name: Some("id")
      preAction: Option[PreparedStatement => Unit] = None,
      postAction: Option[PreparedStatement => Unit] = None) {
      ctx =>

      // helper functions

      def appendTag(tag: String): JDBCContext = ctx.copy(queryTags = ctx.queryTags :+ tag)

      def appendTags(tags: Seq[String]): JDBCContext = ctx.copy(queryTags = ctx.queryTags ++ tags)

      def setFetchSize(size: Int): JDBCContext = ctx.copy(fetchSize = size)

      def setQueryTimeout(time: Option[Int]): JDBCContext = ctx.copy(queryTimeout = time)

      def setPreAction(action: Option[PreparedStatement => Unit]): JDBCContext = {
        if (ctx.sqlType == JDBCContext.SQL_UPDATE &&
          !ctx.batch && ctx.statements.size == 1)
          ctx.copy(preAction = action)
        else
          throw new IllegalStateException("JDBCContext setting error: preAction not supported!")
      }

      def setPostAction(action: Option[PreparedStatement => Unit]): JDBCContext = {
        if (ctx.sqlType == JDBCContext.SQL_UPDATE &&
          !ctx.batch && ctx.statements.size == 1)
          ctx.copy(postAction = action)
        else
          throw new IllegalStateException("JDBCContext setting error: postAction not supported!")
      }

      def appendDDLCommand(_statement: String, _parameters: Any*): JDBCContext = {
        if (ctx.sqlType == JDBCContext.SQL_EXEDDL) {
          ctx.copy(
            statements = ctx.statements ++ Seq(_statement),
            parameters = ctx.parameters ++ Seq(Seq(_parameters))
          )
        } else
          throw new IllegalStateException("JDBCContext setting error: option not supported!")
      }

      def appendUpdateCommand(_returnGeneratedKey: Boolean, _statement: String, _parameters: Any*): JDBCContext = {
        if (ctx.sqlType == JDBCContext.SQL_UPDATE && !ctx.batch) {
          ctx.copy(
            statements = ctx.statements ++ Seq(_statement),
            parameters = ctx.parameters ++ Seq(_parameters),
            returnGeneratedKey = ctx.returnGeneratedKey ++ (if (_returnGeneratedKey) Seq(Some(1)) else Seq(None))
          )
        } else
          throw new IllegalStateException("JDBCContext setting error: option not supported!")
      }

      def appendBatchParameters(_parameters: Any*): JDBCContext = {
        if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch)
          throw new IllegalStateException("JDBCContext setting error: batch parameters only supported for SQL_UPDATE and batch = true!")
        var matchParams = true
        if (ctx.parameters != Nil)
          if (ctx.parameters.head.size != _parameters.size)
            matchParams = false
        if (matchParams) {
          ctx.copy(
            parameters = ctx.parameters ++ Seq(_parameters)
          )
        } else
          throw new IllegalStateException("JDBCContext setting error: batch command parameters not match!")
      }

      def setBatchReturnGeneratedKeyOption(returnKey: Boolean): JDBCContext = {
        if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch)
          throw new IllegalStateException("JDBCContext setting error: only supported in batch update commands!")
        ctx.copy(
          returnGeneratedKey = if (returnKey) Seq(Some(1)) else Nil
        )
      }

      def setDDLCommand(_statement: String, _parameters: Any*): JDBCContext = {
        ctx.copy(
          statements = Seq(_statement),
          parameters = Seq(_parameters),
          sqlType = JDBCContext.SQL_EXEDDL,
          batch = false
        )
      }

      def setUpdateCommand(_returnGeneratedKey: Boolean, _statement: String, _parameters: Any*): JDBCContext = {
        ctx.copy(
          statements = Seq(_statement),
          parameters = Seq(_parameters),
          returnGeneratedKey = if (_returnGeneratedKey) Seq(Some(1)) else Seq(None),
          sqlType = JDBCContext.SQL_UPDATE,
          batch = false
        )
      }

      def setBatchCommand(_statement: String): JDBCContext = {
        ctx.copy(
          statements = Seq(_statement),
          sqlType = JDBCContext.SQL_UPDATE,
          batch = true
        )
      }
    }

    object JDBCEngine {
      import JDBCContext._

      type JDBCDate = LocalDate
      type JDBCDateTime = LocalDateTime
      type JDBCTime = LocalTime

      def jdbcSetDate(yyyy: Int, mm: Int, dd: Int) = LocalDate.of(yyyy, mm, dd)
      def jdbcSetTime(hh: Int, mm: Int, ss: Int, nn: Int) = LocalTime.of(hh, mm, ss, nn)
      def jdbcSetDateTime(date: JDBCDate, time: JDBCTime) = LocalDateTime.of(date, time)
      def jdbcSetNow = LocalDateTime.now()

      type JDBCBlob = InputStream

      def fileToJDBCBlob(fileName: String, timeOut: FiniteDuration = 60.seconds)(
        implicit mat: Materializer) = FileToInputStream(fileName, timeOut)

      def jdbcBlobToFile(blob: JDBCBlob, fileName: String)(
        implicit mat: Materializer) = InputStreamToFile(blob, fileName)

      private def noExtractor(message: String): WrappedResultSet => Nothing = { (rs: WrappedResultSet) =>
        throw new IllegalStateException(message)
      }

      def jdbcAkkaStream[A](ctx: JDBCQueryContext[A], extractor: WrappedResultSet => A)(
        implicit ec: ExecutionContextExecutor): Source[A, NotUsed] = {
        val publisher: DatabasePublisher[A] = NamedDB(ctx.dbName) readOnlyStream {
          val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor(""))
          ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
          val sql: SQL[A, HasExtractor] = rawSql.map(extractor)
          sql.iterator
            .withDBSessionForceAdjuster(session => {
              session.connection.setAutoCommit(ctx.autoCommit)
              session.fetchSize(ctx.fetchSize)
            })
        }
        Source.fromPublisher[A](publisher)
      }

      def jdbcQueryResult[C[_] <: TraversableOnce[_], A](ctx: JDBCQueryContext[A], extractor: WrappedResultSet => A)(
        implicit cbf: CanBuildFrom[Nothing, A, C[A]]): C[A] = {
        val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor(""))
        ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
        rawSql.fetchSize(ctx.fetchSize)
        implicit val session = NamedAutoSession(ctx.dbName)
        val sql: SQL[A, HasExtractor] = rawSql.map(extractor)
        sql.collection.apply[C]()
      }

      def jdbcExecuteDDL(ctx: JDBCContext)(implicit ec: ExecutionContextExecutor): Future[String] = {
        if (ctx.sqlType != SQL_EXEDDL) {
          Future.failed(new IllegalStateException("JDBCContext setting error: sqlType must be 'SQL_EXEDDL'!"))
        } else {
          Future {
            NamedDB(ctx.dbName) localTx { implicit session =>
              ctx.statements.foreach { stm =>
                val ddl = new SQLExecution(statement = stm, parameters = Nil)(
                  before = (_: PreparedStatement) => {})(
                  after = (_: PreparedStatement) => {})
                ddl.apply()
              }
              "SQL_EXEDDL executed successfully."
            }
          }
        }
      }

      def jdbcBatchUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
        implicit ec: ExecutionContextExecutor,
        cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
        if (ctx.statements == Nil)
          Future.failed(new IllegalStateException("JDBCContext setting error: statements empty!"))
        else if (ctx.sqlType != SQL_UPDATE)
          Future.failed(new IllegalStateException("JDBCContext setting error: sqlType must be 'SQL_UPDATE'!"))
        else {
          if (ctx.batch) {
            if (noReturnKey(ctx)) {
              val usql = SQL(ctx.statements.head)
                .tags(ctx.queryTags: _*)
                .batch(ctx.parameters: _*)
              Future {
                NamedDB(ctx.dbName) localTx { implicit session =>
                  ctx.queryTimeout.foreach(session.queryTimeout(_))
                  usql.apply[Seq]()
                  Seq.empty[Long].to[C]
                }
              }
            } else {
              val usql = new SQLBatchWithGeneratedKey(ctx.statements.head, ctx.parameters, ctx.queryTags)(None)
              Future {
                NamedDB(ctx.dbName) localTx { implicit session =>
                  ctx.queryTimeout.foreach(session.queryTimeout(_))
                  usql.apply[C]()
                }
              }
            }
          } else {
            Future.failed(new IllegalStateException("JDBCContext setting error: must set batch = true !"))
          }
        }
      }

      private def singleTxUpdateWithReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
        implicit ec: ExecutionContextExecutor,
        cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
        val Some(key) :: xs = ctx.returnGeneratedKey
        val params: Seq[Any] = ctx.parameters match {
          case Nil => Nil
          case p => p.head
        }
        val usql = new SQLUpdateWithGeneratedKey(ctx.statements.head, params, ctx.queryTags)(key)
        Future {
          NamedDB(ctx.dbName) localTx { implicit session =>
            session.fetchSize(ctx.fetchSize)
            ctx.queryTimeout.foreach(session.queryTimeout(_))
            val result = usql.apply()
            Seq(result).to[C]
          }
        }
      }

      private def singleTxUpdateNoReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
        implicit ec: ExecutionContextExecutor,
        cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
        val params: Seq[Any] = ctx.parameters match {
          case Nil => Nil
          case p => p.head
        }
        val before = ctx.preAction match {
          case None => (pstm: PreparedStatement) => {}
          case Some(f) => f
        }
        val after = ctx.postAction match {
          case None => (pstm: PreparedStatement) => {}
          case Some(f) => f
        }
        val usql = new SQLUpdate(ctx.statements.head, params, ctx.queryTags)(before)(after)
        Future {
          NamedDB(ctx.dbName) localTx { implicit session =>
            session.fetchSize(ctx.fetchSize)
            ctx.queryTimeout.foreach(session.queryTimeout(_))
            val result = usql.apply()
            Seq(result.toLong).to[C]
          }
        }
      }

      private def singleTxUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
        implicit ec: ExecutionContextExecutor,
        cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
        if (noReturnKey(ctx))
          singleTxUpdateNoReturnKey(ctx)
        else
          singleTxUpdateWithReturnKey(ctx)
      }

      private def noReturnKey(ctx: JDBCContext): Boolean = {
        if (ctx.returnGeneratedKey != Nil) {
          val k :: xs = ctx.returnGeneratedKey
          k match {
            case None => true
            case Some(k) => false
          }
        } else true
      }

      def noAction: PreparedStatement => Unit = pstm => {}

      def multiTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
        implicit ec: ExecutionContextExecutor,
        cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
        Future {
          NamedDB(ctx.dbName) localTx { implicit session =>
            session.fetchSize(ctx.fetchSize)
            ctx.queryTimeout.foreach(session.queryTimeout(_))
            val keys: Seq[Option[Any]] = ctx.returnGeneratedKey match {
              case Nil => Seq.fill(ctx.statements.size)(None)
              case k => k
            }
            val sqlcmd = ctx.statements zip ctx.parameters zip keys
            val results = sqlcmd.map { case ((stm, param), key) =>
              key match {
                case None =>
                  new SQLUpdate(stm, param, Nil)(noAction)(noAction).apply().toLong
                case Some(k) =>
                  new SQLUpdateWithGeneratedKey(stm, param, Nil)(k).apply().toLong
              }
            }
            results.to[C]
          }
        }
      }

      def jdbcTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
        implicit ec: ExecutionContextExecutor,
        cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
        if (ctx.statements == Nil)
          Future.failed(new IllegalStateException("JDBCContext setting error: statements empty!"))
        else if (ctx.sqlType != SQL_UPDATE)
          Future.failed(new IllegalStateException("JDBCContext setting error: sqlType must be 'SQL_UPDATE'!"))
        else {
          if (!ctx.batch) {
            if (ctx.statements.size == 1)
              singleTxUpdate(ctx)
            else
              multiTxUpdates(ctx)
          } else
            Future.failed(new IllegalStateException("JDBCContext setting error: must set batch = false !"))
        }
      }

      case class JDBCActionStream[R](dbName: Symbol, parallelism: Int = 1, processInOrder: Boolean = true,
                                     statement: String, prepareParams: R => Seq[Any]) {
        jas =>

        def setDBName(db: Symbol): JDBCActionStream[R] = jas.copy(dbName = db)
        def setParallelism(parLevel: Int): JDBCActionStream[R] = jas.copy(parallelism = parLevel)
        def setProcessOrder(ordered: Boolean): JDBCActionStream[R] = jas.copy(processInOrder = ordered)

        private def perform(r: R)(implicit ec: ExecutionContextExecutor) = {
          import scala.concurrent._
          val params = prepareParams(r)
          Future {
            NamedDB(dbName) autoCommit { session =>
              session.execute(statement, params: _*)
            }
            r
          }
        }

        def performOnRow(implicit ec: ExecutionContextExecutor): Flow[R, R, NotUsed] =
          if (processInOrder)
            Flow[R].mapAsync(parallelism)(perform)
          else
            Flow[R].mapAsyncUnordered(parallelism)(perform)
      }

      object JDBCActionStream {
        def apply[R](_dbName: Symbol, _statement: String, params: R => Seq[Any]): JDBCActionStream[R] =
          new JDBCActionStream[R](dbName = _dbName, statement = _statement, prepareParams = params)
      }
    }

JDBCService.scala

    package demo.grpc.jdbc.services

    import akka.NotUsed
    import akka.stream.scaladsl.{Source, Flow}
    import grpc.jdbc.services._
    import java.util.logging.Logger
    import protobuf.bytes.Converter._
    import sdp.jdbc.engine._
    import JDBCEngine._
    import scalikejdbc.WrappedResultSet
    import scala.concurrent.ExecutionContextExecutor

    class JDBCStreamingServices(implicit ec: ExecutionContextExecutor) extends JdbcGrpcAkkaStream.JDBCServices {
      val logger = Logger.getLogger(classOf[JDBCStreamingServices].getName)

      val toRow = (rs: WrappedResultSet) => JDBCDataRow(
        year = rs.string("REPORTYEAR"),
        state = rs.string("STATENAME"),
        county = rs.string("COUNTYNAME"),
        value = rs.string("VALUE")
      )

      override def runQuery: Flow[JDBCQuery, JDBCDataRow, NotUsed] = {
        logger.info("**** runQuery called on service side ***")
        Flow[JDBCQuery]
          .flatMapConcat { q =>
            // unpack JDBCQuery and construct the context
            var params: Seq[Any] = Nil
            if (q.parameters != _root_.com.google.protobuf.ByteString.EMPTY)
              params = unmarshal[Seq[Any]](q.parameters)
            logger.info(s"**** query parameters: ${params} ****")
            val ctx = JDBCQueryContext[JDBCDataRow](
              dbName = Symbol(q.dbName),
              statement = q.statement,
              parameters = params,
              fetchSize = q.fetchSize.getOrElse(100),
              autoCommit = q.autoCommit.getOrElse(false),
              queryTimeout = q.queryTimeout
            )
            jdbcAkkaStream(ctx, toRow)
          }
      }

      override def batQuery: Flow[JDBCQuery, JDBCDataRow, NotUsed] = runQuery

      override def runDDL: Flow[JDBCUpdate, JDBCResult, NotUsed] = {
        logger.info("**** runDDL called on service side ***")
        Flow[JDBCUpdate]
          .flatMapConcat { context =>
            // unpack JDBCUpdate and construct the context
            val ctx = JDBCContext(
              dbName = Symbol(context.dbName),
              statements = context.statements,
              sqlType = JDBCContext.SQL_EXEDDL,
              queryTimeout = context.queryTimeout
            )
            logger.info(s"**** JDBCContext => ${ctx} ***")
            Source
              .fromFuture(jdbcExecuteDDL(ctx))
              .map { r => JDBCResult(marshal(r)) }
          }
      }

      override def insertRows: Flow[MemberRow, JDBCResult, NotUsed] = {
        logger.info("**** insertRows called on service side ***")
        val insertSQL = """
          insert into members(
            name,
            birthday,
            description,
            created_at
          ) values (?, ?, ?, ?)
          """
        Flow[MemberRow]
          .flatMapConcat { row =>
            val ctx = JDBCContext('h2)
              .setUpdateCommand(true, insertSQL,
                row.name,
                jdbcSetDate(row.birthday.get.yyyy, row.birthday.get.mm, row.birthday.get.dd),
                row.description,
                jdbcSetNow
              )
            logger.info(s"**** JDBCContext => ${ctx} ***")
            Source
              .fromFuture(jdbcTxUpdates[Vector](ctx))
              .map { r => JDBCResult(marshal(r)) }
          }
      }

      override def updateRows: Flow[MemberRow, JDBCResult, NotUsed] = {
        logger.info("**** updateRows called on service side ***")
        val updateSQL = "update members set description = ?, created_at = ? where name = ?"
        Flow[MemberRow]
          .flatMapConcat { row =>
            val ctx = JDBCContext('h2)
              .setBatchCommand(updateSQL)
              .appendBatchParameters(
                row.name + " updated.",
                jdbcSetNow,
                row.name
              ).setBatchReturnGeneratedKeyOption(true)
            logger.info(s"**** JDBCContext => ${ctx} ***")
            Source
              .fromFuture(jdbcBatchUpdate[Vector](ctx))
              .map { r => JDBCResult(marshal(r)) }
          }
      }

      val toMemberRow = (rs: WrappedResultSet) => MemberRow(
        name = rs.string("name"),
        description = rs.string("description"),
        birthday = None,
        createdAt = None,
        picture = _root_.com.google.protobuf.ByteString.EMPTY
      )

      override def getMembers: Flow[JDBCQuery, MemberRow, NotUsed] = {
        logger.info("**** getMembers called on service side ***")
        Flow[JDBCQuery]
          .flatMapConcat { q =>
            // unpack JDBCQuery and construct the context
            var params: Seq[Any] = Nil
            if (q.parameters != _root_.com.google.protobuf.ByteString.EMPTY)
              params = unmarshal[Seq[Any]](q.parameters)
            logger.info(s"**** query parameters: ${params} ****")
            val ctx = JDBCQueryContext[MemberRow](
              dbName = Symbol(q.dbName),
              statement = q.statement,
              parameters = params,
              fetchSize = q.fetchSize.getOrElse(100),
              autoCommit = q.autoCommit.getOrElse(false),
              queryTimeout = q.queryTimeout
            )
            jdbcAkkaStream(ctx, toMemberRow)
          }
      }

      // pass-through batch update backing stub.updateBat (see the updateBat rpc in jdbc.proto)
      val params: JDBCDataRow => Seq[Any] = row =>
        Seq(row.value.toInt * 2, row.state, row.county, row.year)
      val sql = "update AQMRPT set total = ? where statename = ? and countyname = ? and reportyear = ?"
      val jdbcActionStream = JDBCActionStream('h2, sql, params)
        .setParallelism(4).setProcessOrder(false)
      val jdbcActionFlow = jdbcActionStream.performOnRow

      override def updateBat: Flow[JDBCDataRow, JDBCDataRow, NotUsed] = {
        logger.info("**** updateBat called on service side ***")
        Flow[JDBCDataRow]
          .via(jdbcActionFlow)
      }
    }

JDBCServer.scala

    package demo.grpc.jdbc.server

    import java.util.logging.Logger
    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import io.grpc.Server
    import demo.grpc.jdbc.services._
    import io.grpc.ServerBuilder
    import grpc.jdbc.services._

    class gRPCServer(server: Server) {
      val logger: Logger = Logger.getLogger(classOf[gRPCServer].getName)

      def start(): Unit = {
        server.start()
        logger.info(s"Server started, listening on ${server.getPort}")
        sys.addShutdownHook {
          // Use stderr here since the logger may have been reset by its JVM shutdown hook.
          System.err.println("*** shutting down gRPC server since JVM is shutting down")
          stop()
          System.err.println("*** server shut down")
        }
        ()
      }

      def stop(): Unit = {
        server.shutdown()
      }

      /**
       * Await termination on the main thread since the grpc library uses daemon threads.
       */
      def blockUntilShutdown(): Unit = {
        server.awaitTermination()
      }
    }

    object JDBCServer extends App {
      import sdp.jdbc.config._

      implicit val system = ActorSystem("JDBCServer")
      implicit val mat = ActorMaterializer.create(system)
      implicit val ec = system.dispatcher

      ConfigDBsWithEnv("dev").setup('h2)
      ConfigDBsWithEnv("dev").loadGlobalSettings()

      val server = new gRPCServer(
        ServerBuilder
          .forPort(50051)
          .addService(
            JdbcGrpcAkkaStream.bindService(
              new JDBCStreamingServices
            )
          ).build()
      )
      server.start()
      // server.blockUntilShutdown()
      scala.io.StdIn.readLine()
      ConfigDBsWithEnv("dev").close('h2)
      mat.shutdown()
      system.terminate()
    }

JDBCClient.scala

    package demo.grpc.jdbc.client

    import grpc.jdbc.services._
    import java.util.logging.Logger
    import protobuf.bytes.Converter._
    import akka.stream.scaladsl._
    import akka.NotUsed
    import akka.actor.ActorSystem
    import akka.stream.{ActorMaterializer, ThrottleMode}
    import io.grpc._
    import sdp.jdbc.engine._

    class JDBCStreamClient(host: String, port: Int) {
      val logger: Logger = Logger.getLogger(classOf[JDBCStreamClient].getName)

      val channel = ManagedChannelBuilder
        .forAddress(host, port)
        .usePlaintext(true)
        .build()

      val stub = JdbcGrpcAkkaStream.stub(channel)

      val query = JDBCQuery(
        dbName = "h2",
        statement = "select * from AQMRPT where STATENAME = ? and VALUE = ?",
        parameters = marshal(Seq("Arizona", 2))
      )
      val query2 = JDBCQuery(
        dbName = "h2",
        statement = "select * from AQMRPT where STATENAME = ? and VALUE = ?",
        parameters = marshal(Seq("Colorado", 3))
      )
      val query3 = JDBCQuery(
        dbName = "h2",
        statement = "select * from AQMRPT where STATENAME = ? and VALUE = ?",
        parameters = marshal(Seq("Arkansas", 8))
      )

      def queryRows: Source[JDBCDataRow, NotUsed] = {
        logger.info(s"running queryRows ...")
        Source
          .single(query)
          .via(stub.runQuery)
      }

      def batQueryRows: Source[JDBCDataRow, NotUsed] = {
        logger.info(s"running batQueryRows ...")
        Source
          .fromIterator(() => List(query, query2, query3).toIterator)
          .via(stub.batQuery)
      }

      val dropSQL: String = """
        drop table members
        """

      val createSQL: String = """
        create table members (
          id serial not null primary key,
          name varchar(30) not null,
          description varchar(1000),
          birthday date,
          created_at timestamp not null,
          picture blob
        )"""

      val ctx = JDBCUpdate(
        dbName = "h2",
        sqlType = JDBCContext.SQL_EXEDDL,
        statements = Seq(dropSQL, createSQL)
      )

      def createTbl: Source[JDBCResult, NotUsed] = {
        logger.info(s"running createTbl ...")
        Source
          .single(ctx)
          .via(stub.runDDL)
      }

      val p1 = MemberRow("Peter Chan", Some(JDBCDate(1967, 5, 17)), "new member1", None, _root_.com.google.protobuf.ByteString.EMPTY)
      val p2 = MemberRow("Alanda Wong", Some(JDBCDate(1980, 11, 10)), "new member2", None, _root_.com.google.protobuf.ByteString.EMPTY)
      val p3 = MemberRow("Kate Zhang", Some(JDBCDate(1969, 8, 13)), "new member3", None, _root_.com.google.protobuf.ByteString.EMPTY)
      val p4 = MemberRow("Tiger Chan", Some(JDBCDate(1962, 5, 1)), "new member4", None, _root_.com.google.protobuf.ByteString.EMPTY)

      def insertRows: Source[JDBCResult, NotUsed] = {
        logger.info(s"running insertRows ...")
        Source
          .fromIterator(() => List(p1, p2, p3, p4).toIterator)
          .via(stub.insertRows)
      }

      val queryMember = JDBCQuery(
        dbName = "h2",
        statement = "select * from members"
      )

      def updateRows: Source[JDBCResult, NotUsed] = {
        logger.info(s"running updateRows ...")
        Source
          .single(queryMember)
          .via(stub.getMembers)
          .via(stub.updateRows)
      }

      def updateBatches: Source[JDBCDataRow, NotUsed] = {
        logger.info(s"running updateBatches ...")
        Source
          .fromIterator(() => List(query, query2, query3).toIterator)
          .via(stub.batQuery)
          .via(stub.updateBat)
      }
    }

    object TestConversion extends App {
      val orgval: Seq[Option[Any]] = Seq(Some(1), Some("id"), None, Some(2))
      println(s"original value: ${orgval}")
      val marval = marshal(orgval)
      println(s"marshal value: ${marval}")
      val unmval = unmarshal[Seq[Option[Any]]](marval)
      println(s"unmarshal value: ${unmval}")

      val m1 = MemberRow(name = "Peter")
      val m2 = m1.update(
        _.birthday.yyyy := 1989,
        _.birthday.mm := 10,
        _.birthday.dd := 3,
        _.description := "a new member"
      )
      println(s"updated row: ${m2}")
    }

    object QueryRows extends App {
      implicit val system = ActorSystem("QueryRows")
      implicit val mat = ActorMaterializer.create(system)

      val client = new JDBCStreamClient("localhost", 50051)
      client.queryRows.runForeach { r => println(r) }

      scala.io.StdIn.readLine()
      mat.shutdown()
      system.terminate()
    }

    object BatQueryRows extends App {
      implicit val system = ActorSystem("BatQueryRows")
      implicit val mat = ActorMaterializer.create(system)

      val client = new JDBCStreamClient("localhost", 50051)
      client.batQueryRows.runForeach(println)

      scala.io.StdIn.readLine()
      mat.shutdown()
      system.terminate()
    }

    object RunDDL extends App {
      implicit val system = ActorSystem("RunDDL")
      implicit val mat = ActorMaterializer.create(system)

      val client = new JDBCStreamClient("localhost", 50051)
      client.createTbl.runForeach { r => println(unmarshal[Seq[Any]](r.result)) }

      scala.io.StdIn.readLine()
      mat.shutdown()
      system.terminate()
    }

    object InsertRows extends App {
      implicit val system = ActorSystem("InsertRows")
      implicit val mat = ActorMaterializer.create(system)

      val client = new JDBCStreamClient("localhost", 50051)
      client.insertRows.runForeach { r => println(unmarshal[Vector[Long]](r.result)) }

      scala.io.StdIn.readLine()
      mat.shutdown()
      system.terminate()
    }

    object UpdateRows extends App {
      implicit val system = ActorSystem("UpdateRows")
      implicit val mat = ActorMaterializer.create(system)

      val client = new JDBCStreamClient("localhost", 50051)
      client.updateRows.runForeach { r => println(unmarshal[Vector[Long]](r.result)) }

      scala.io.StdIn.readLine()
      mat.shutdown()
      system.terminate()
    }

    object BatUpdates extends App {
      implicit val system = ActorSystem("BatUpdates")
      implicit val mat = ActorMaterializer.create(system)

      val client = new JDBCStreamClient("localhost", 50051)
      client.updateBatches.runForeach(println)

      scala.io.StdIn.readLine()
      mat.shutdown()
      system.terminate()
    }

ByteConverter.scala

    package protobuf.bytes

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
    import com.google.protobuf.ByteString

    object Converter {

      def marshal(value: Any): ByteString = {
        val stream: ByteArrayOutputStream = new ByteArrayOutputStream()
        val oos = new ObjectOutputStream(stream)
        oos.writeObject(value)
        oos.close()
        ByteString.copyFrom(stream.toByteArray())
      }

      def unmarshal[A](bytes: ByteString): A = {
        val ois = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
        val value = ois.readObject()
        ois.close()
        value.asInstanceOf[A]
      }
    }

The remaining source files and system configuration can be found in the previous post in this series.

 
