In an akka-cluster environment, a JDBC database is detached from the other nodes of the cluster as far as data access is concerned: a JDBC database is not distributed and offers no node-location transparency. A JDBC database server must therefore expose its data operations as a service. In this scenario the JDBC service is the server side, and all other nodes, including other JDBC database nodes, are clients of that service. Since we have already settled on the gRPC service model for the akka-cluster environment, with akka-stream's flow control governing the database operations, this post demonstrates the concrete implementation and usage of gRPC-JDBC-Streaming.
In the previous post we demonstrated the simplest JDBC-Streaming pattern, the unary request/response mode: the client sends a single JDBCQuery to the JDBC service, and the service runs the JDBCQuery and returns a stream of DataRows. The data and service types are defined in IDL in the jdbc.proto file:
- message JDBCDataRow {
- string year = 1;
- string state = 2;
- string county = 3;
- string value = 4;
- }
- message JDBCQuery {
- string dbName = 1;
- string statement = 2;
- bytes parameters = 3;
- google.protobuf.Int32Value fetchSize = 4;
- google.protobuf.BoolValue autoCommit = 5;
- google.protobuf.Int32Value queryTimeout = 6;
- }
- service JDBCServices {
- rpc runQuery(JDBCQuery) returns (stream JDBCDataRow) {}
- }
The message types JDBCDataRow and JDBCQuery above correspond to the stream element type and the JDBCQueryContext of the JDBC-Streaming tool, respectively. (scalaPB maps the google.protobuf wrapper types such as Int32Value to Scala Option values, which is why q.fetchSize.getOrElse(100) works below.)
- val toRow = (rs: WrappedResultSet) => JDBCDataRow(
- year = rs.string("REPORTYEAR"),
- state = rs.string("STATENAME"),
- county = rs.string("COUNTYNAME"),
- value = rs.string("VALUE")
- )
- val ctx = JDBCQueryContext[JDBCDataRow](
- dbName = Symbol(q.dbName),
- statement = q.statement,
- parameters = params,
- fetchSize = q.fetchSize.getOrElse(100),
- autoCommit = q.autoCommit.getOrElse(false),
- queryTimeout = q.queryTimeout
- )
- jdbcAkkaStream(ctx, toRow)
Compiling with scalaPB automatically generates the server-side and client-side skeleton code (boilerplate). For reference, a minimal sbt setup for this code generation might look like the sketch below; the plugin and library versions are my assumptions rather than the project's actual ones, and the extra generator that emits the akka-stream style JdbcGrpcAkkaStream bindings has to be registered alongside the standard scalapb.gen():
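- // project/scalapb.sbt (a sketch; plugin/library versions are assumptions)
- addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.18")
- libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.7.4"
- // build.sbt: register ScalaPB code generation for src/main/protobuf/*.proto
- PB.targets in Compile := Seq(
- scalapb.gen() -> (sourceManaged in Compile).value
- // the akka-stream gRPC generator that emits JdbcGrpcAkkaStream is registered here as well
- )
- libraryDependencies ++= Seq(
- "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapb.compiler.Version.scalapbVersion,
- "io.grpc" % "grpc-netty" % scalapb.compiler.Version.grpcJavaVersion
- )
With the skeleton generated, we implement the concrete JDBC service: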
- class JDBCStreamingServices(implicit ec: ExecutionContextExecutor) extends JdbcGrpcAkkaStream.JDBCServices {
- val logger = Logger.getLogger(classOf[JDBCStreamingServices].getName)
- val toRow = (rs: WrappedResultSet) => JDBCDataRow(
- year = rs.string("REPORTYEAR"),
- state = rs.string("STATENAME"),
- county = rs.string("COUNTYNAME"),
- value = rs.string("VALUE")
- )
- override def runQuery: Flow[JDBCQuery, JDBCDataRow, NotUsed] = {
- logger.info("**** runQuery called on service side ***")
- Flow[JDBCQuery]
- .flatMapConcat { q =>
- //unpack JDBCQuery and construct the context
- val params: Seq[Any] = unmarshal[Seq[Any]](q.parameters)
- logger.info(s"**** query parameters: ${params} ****")
- val ctx = JDBCQueryContext[JDBCDataRow](
- dbName = Symbol(q.dbName),
- statement = q.statement,
- parameters = params,
- fetchSize = q.fetchSize.getOrElse(100),
- autoCommit = q.autoCommit.getOrElse(false),
- queryTimeout = q.queryTimeout
- )
- jdbcAkkaStream(ctx, toRow)
- }
- }
- }
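For orientation: the generated JdbcGrpcAkkaStream.JDBCServices trait exposes every rpc as an akka-stream Flow, which is why runQuery above is overridden as a Flow. A simplified sketch of the trait's shape (the actual generated file also contains gRPC method descriptors and service-binding code):
- // shape only, a sketch of the generated service trait
- trait JDBCServices {
- def runQuery: Flow[JDBCQuery, JDBCDataRow, NotUsed]
- }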
Below is a demonstration of calling the service from the client:
- val query = JDBCQuery (
- dbName = "h2",
- statement = "select * from AQMRPT where STATENAME = ? and VALUE = ?",
- parameters = marshal(Seq("Arizona", 5))
- )
- def queryRows: Source[JDBCDataRow,NotUsed] = {
- logger.info(s"running queryRows ...")
- Source
- .single(query)
- .via(stub.runQuery)
- }
The program is run like this:
- object QueryRows extends App {
- implicit val system = ActorSystem("QueryRows")
- implicit val mat = ActorMaterializer.create(system)
- val client = new JDBCStreamClient("localhost", 50051)
- client.queryRows.runForeach(println)
- scala.io.StdIn.readLine()
- mat.shutdown()
- system.terminate()
- }
What if the client sends a whole series of JDBCQuery messages? That is the so-called BiDi-Streaming pattern, described in jdbc.proto as follows:
- service JDBCServices {
- rpc runQuery(JDBCQuery) returns (stream JDBCDataRow) {}
- rpc batQuery(stream JDBCQuery) returns (stream JDBCDataRow) {}
- }
Note that batQuery's input is a stream. The generated service function batQuery has the following shape:
- override def runQuery: Flow[JDBCQuery, JDBCDataRow, NotUsed] = { ... }
- override def batQuery: Flow[JDBCQuery, JDBCDataRow, NotUsed] = runQuery
runQuery and batQuery have identical signatures, so the service shape offered by the server is the same: in our example both return the matching data stream for every JDBCQuery they receive. The real difference between the two services lies on the client side. Here is the source code generated by scalaPB:
- override def runQuery: Flow[grpc.jdbc.services.JDBCQuery, grpc.jdbc.services.JDBCDataRow, NotUsed] =
- Flow.fromGraph(
- new GrpcGraphStage[grpc.jdbc.services.JDBCQuery, grpc.jdbc.services.JDBCDataRow]({ outputObserver =>
- new StreamObserver[grpc.jdbc.services.JDBCQuery] {
- override def onError(t: Throwable): Unit = ()
- override def onCompleted(): Unit = ()
- override def onNext(request: grpc.jdbc.services.JDBCQuery): Unit =
- ClientCalls.asyncServerStreamingCall(
- channel.newCall(METHOD_RUN_QUERY, options),
- request,
- outputObserver
- )
- }
- })
- )
- ...
- override def batQuery: Flow[grpc.jdbc.services.JDBCQuery, grpc.jdbc.services.JDBCDataRow, NotUsed] =
- Flow.fromGraph(new GrpcGraphStage[grpc.jdbc.services.JDBCQuery, grpc.jdbc.services.JDBCDataRow](outputObserver =>
- ClientCalls.asyncBidiStreamingCall(
- channel.newCall(METHOD_BAT_QUERY, options),
- outputObserver
- )
- ))
Both stubs are exposed as a Flow, but as the generated code shows, runQuery issues a fresh server-streaming call for every element it receives, while batQuery sends all requests over a single bidirectional call. So on the client side we call batQuery:
- def batQueryRows: Source[JDBCDataRow,NotUsed] = {
- logger.info(s"running batQueryRows ...")
- Source
- .fromIterator(() => List(query,query2,query3).toIterator)
- .via(stub.batQuery)
- }
Besides queries, JDBC operations must also cover data updates, including schema DDL and database updates. JDBC-update requests are conveyed through a JDBCContext:
- case class JDBCContext(
- dbName: Symbol,
- statements: Seq[String] = Nil,
- parameters: Seq[Seq[Any]] = Nil,
- fetchSize: Int = 100,
- queryTimeout: Option[Int] = None,
- queryTags: Seq[String] = Nil,
- sqlType: JDBCContext.SQLTYPE = JDBCContext.SQL_UPDATE,
- batch: Boolean = false,
- returnGeneratedKey: Seq[Option[Any]] = Nil,
- // no return: None, return by index: Some(1), by name: Some("id")
- preAction: Option[PreparedStatement => Unit] = None,
- postAction: Option[PreparedStatement => Unit] = None) {... }
The protobuf messages corresponding to this class are defined as follows:
- message JDBCResult {
- bytes result = 1;
- }
- message JDBCUpdate {
- string dbName = 1;
- repeated string statements = 2;
- bytes parameters = 3;
- google.protobuf.Int32Value fetchSize = 4;
- google.protobuf.Int32Value queryTimeout = 5;
- int32 sqlType = 6;
- google.protobuf.Int32Value batch = 7;
- bytes returnGeneratedKey = 8;
- }
- service JDBCServices {
- rpc runQuery(JDBCQuery) returns (stream JDBCDataRow) {}
- rpc batQuery(stream JDBCQuery) returns (stream JDBCDataRow) {}
- rpc runDDL(JDBCUpdate) returns (JDBCResult) {}
- }
The service function runDDL returns a JDBCResult message, which wraps a marshalled return value. The following demonstrates how the protobuf message is packed and unpacked: on the service side the JDBCUpdate is taken apart to construct a JDBCContext, and jdbcExecuteDDL is then invoked:
- override def runDDL: Flow[JDBCUpdate, JDBCResult, NotUsed] = {
- logger.info("**** runDDL called on service side ***")
- Flow[JDBCUpdate]
- .flatMapConcat { context =>
- //unpack JDBCUpdate and construct the context
- val ctx = JDBCContext(
- dbName = Symbol(context.dbName),
- statements = context.statements,
- sqlType = JDBCContext.SQL_EXEDDL,
- queryTimeout = context.queryTimeout
- )
- logger.info(s"**** JDBCContext => ${ctx} ***")
- Source
- .fromFuture(jdbcExecuteDDL(ctx))
- .map { r => JDBCResult(marshal(r)) }
- }
- }
jdbcExecuteDDL returns a Future[String], as follows:
- def jdbcExecuteDDL(ctx: JDBCContext)(implicit ec: ExecutionContextExecutor): Future[String] = {
- if (ctx.sqlType != SQL_EXEDDL) {
- Future.failed(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_EXEDDL'!"))
- }
- else {
- Future {
- NamedDB(ctx.dbName) localTx { implicit session =>
- ctx.statements.foreach { stm =>
- val ddl = new SQLExecution(statement = stm, parameters = Nil)(
- before = WrappedResultSet => {})(
- after = WrappedResultSet => {})
- ddl.apply()
- }
- "SQL_EXEDDL executed succesfully."
- }
- }
- }
- }
We can build an akka-stream Source with Source.fromFuture(jdbcExecuteDDL(ctx)). On the client side we construct a JDBCUpdate structure and send it to the server for execution:
- val dropSQL: String ="""
- drop table members
- """
- val createSQL: String ="""
- create table members (
- id serial not null primary key,
- name varchar(30) not null,
- description varchar(1000),
- birthday date,
- created_at timestamp not null,
- picture blob
- )"""
- val ctx = JDBCUpdate (
- dbName = "h2",
- sqlType = JDBCContext.SQL_EXEDDL,
- statements = Seq(dropSQL,createSQL)
- )
- def createTbl: Source[JDBCResult,NotUsed] = {
- logger.info(s"running createTbl ...")
- Source
- .single(ctx)
- .via(stub.runDDL)
- }
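The JDBCResult that comes back carries the marshalled return value in its result field; the client recovers it with unmarshal, as the RunDDL runner in the full client source below does:
- client.createTbl.runForeach { r => println(unmarshal[Seq[Any]](r.result)) }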
Note: statements = Seq(dropSQL, createSQL) contains two independent SQL operations.
Next we demonstrate streaming data (stream MemberRow) from the client and letting the server insert the rows into the database. The IDL data types and service function are defined as follows:
- message JDBCDate {
- int32 yyyy = 1;
- int32 mm = 2;
- int32 dd = 3;
- }
- message JDBCTime {
- int32 hh = 1;
- int32 mm = 2;
- int32 ss = 3;
- int32 nnn = 4;
- }
- message JDBCDateTime {
- JDBCDate date = 1;
- JDBCTime time = 2;
- }
- message MemberRow {
- string name = 1;
- JDBCDate birthday = 2;
- string description = 3;
- JDBCDateTime created_at = 4;
- bytes picture = 5;
- }
- service JDBCServices {
- rpc runQuery(JDBCQuery) returns (stream JDBCDataRow) {}
- rpc batQuery(stream JDBCQuery) returns (stream JDBCDataRow) {}
- rpc runDDL(JDBCUpdate) returns (JDBCResult) {}
- rpc insertRows(stream MemberRow) returns(JDBCResult) {}
- }
The insertRows service function is implemented as follows. (Note that although the rpc declares a single JDBCResult response, the Flow-based implementation below emits one JDBCResult per incoming MemberRow.)
- override def insertRows: Flow[MemberRow, JDBCResult, NotUsed] = {
- logger.info("**** insertRows called on service side ***")
- val insertSQL = """
- insert into members(
- name,
- birthday,
- description,
- created_at
- ) values (?, ?, ?, ?)
- """
- Flow[MemberRow]
- .flatMapConcat { row =>
- val ctx = JDBCContext('h2)
- .setUpdateCommand(true,insertSQL,
- row.name,
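- // row.birthday.get assumes the client always supplies a birthday, as the demo client does; a missing value would throw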
- jdbcSetDate(row.birthday.get.yyyy,row.birthday.get.mm,row.birthday.get.dd),
- row.description,
- jdbcSetNow
- )
- logger.info(s"**** JDBCContext => ${ctx} ***")
- Source
- .fromFuture(jdbcTxUpdates[Vector](ctx))
- .map { r => JDBCResult(marshal(r)) }
- }
- }
Again, jdbcTxUpdates returns a Future; the full implementation is in JDBCEngine.scala, listed at the end of this post.
The client builds a stream of MemberRow and sends it to the server through stub.insertRows:
- val p1 = MemberRow( "Peter Chan",Some(JDBCDate(1967,5,17)),"new member1",None,_root_.com.google.protobuf.ByteString.EMPTY)
- val p2 = MemberRow( "Alanda Wong",Some(JDBCDate(1980,11,10)),"new member2",None,_root_.com.google.protobuf.ByteString.EMPTY)
- val p3 = MemberRow( "Kate Zhang",Some(JDBCDate(1969,8,13)),"new member3",None,_root_.com.google.protobuf.ByteString.EMPTY)
- val p4 = MemberRow( "Tiger Chan",Some(JDBCDate(1962,5,1)),"new member4",None,_root_.com.google.protobuf.ByteString.EMPTY)
- def insertRows: Source[JDBCResult,NotUsed] = {
- logger.info(s"running insertRows ...")
- Source
- .fromIterator(() => List(p1,p2,p3,p4).toIterator)
- .via(stub.insertRows)
- }
Finally, we demonstrate the jdbcBatchUpdate function: we read MemberRow records from the server and stream them back to the server for updating. The IDL is as follows:
- message MemberRow {
- string name = 1;
- JDBCDate birthday = 2;
- string description = 3;
- JDBCDateTime created_at = 4;
- bytes picture = 5;
- }
- service JDBCServices {
- rpc runQuery(JDBCQuery) returns (stream JDBCDataRow) {}
- rpc batQuery(stream JDBCQuery) returns (stream JDBCDataRow) {}
- rpc runDDL(JDBCUpdate) returns (JDBCResult) {}
- rpc insertRows(stream MemberRow) returns(JDBCResult) {}
- rpc updateRows(stream MemberRow) returns(JDBCResult) {}
- rpc getMembers(JDBCQuery) returns (stream MemberRow) {}
- }
The service-side functions are defined as follows:
- val toMemberRow = (rs: WrappedResultSet) => MemberRow(
- name = rs.string("name"),
- description = rs.string("description"),
- birthday = None,
- createdAt = None,
- picture = _root_.com.google.protobuf.ByteString.EMPTY
- )
- override def getMembers: Flow[JDBCQuery, MemberRow, NotUsed] = {
- logger.info("**** getMembers called on service side ***")
- Flow[JDBCQuery]
- .flatMapConcat { q =>
- //unpack JDBCQuery and construct the context
- var params: Seq[Any] = Nil
- if (q.parameters != _root_.com.google.protobuf.ByteString.EMPTY)
- params = unmarshal[Seq[Any]](q.parameters)
- logger.info(s"**** query parameters: ${params} ****")
- val ctx = JDBCQueryContext[MemberRow](
- dbName = Symbol(q.dbName),
- statement = q.statement,
- parameters = params,
- fetchSize = q.fetchSize.getOrElse(100),
- autoCommit = q.autoCommit.getOrElse(false),
- queryTimeout = q.queryTimeout
- )
- jdbcAkkaStream(ctx, toMemberRow)
- }
- }
- override def updateRows: Flow[MemberRow, JDBCResult, NotUsed] = {
- logger.info("**** updateRows called on service side ***")
- val updateSQL = "update members set description = ?, created_at = ? where name = ?"
- Flow[MemberRow]
- .flatMapConcat { row =>
- val ctx = JDBCContext('h2)
- .setBatchCommand(updateSQL)
- .appendBatchParameters(
- row.name + " updated.",
- jdbcSetNow,
- row.name
- ).setBatchReturnGeneratedKeyOption(true)
- logger.info(s"**** JDBCContext => ${ctx} ***")
- Source
- .fromFuture(jdbcBatchUpdate[Vector](ctx))
- .map { r => JDBCResult(marshal(r)) }
- }
- }
The source code of jdbcBatchUpdate is in JDBCEngine.scala, listed below. The client code is:
- val queryMember = JDBCQuery (
- dbName = "h2",
- statement = "select * from members"
- )
- def updateRows: Source[JDBCResult,NotUsed] = {
- logger.info(s"running updateRows ...")
- Source
- .single(queryMember)
- .via(stub.getMembers)
- .via(stub.updateRows)
- }
The following example demonstrates how JDBCActionStream can be used to process data in bulk. The service-side source code is:
- val params: JDBCDataRow => Seq[Any] = row => {
- Seq((row.value.toInt * 2), row.state, row.county, row.year) }
- val sql = "update AQMRPT set total = ? where statename = ? and countyname = ? and reportyear = ?"
- val jdbcActionStream = JDBCActionStream('h2,sql ,params)
- .setParallelism(4).setProcessOrder(false)
- val jdbcActionFlow = jdbcActionStream.performOnRow
- override def updateBat: Flow[JDBCDataRow, JDBCDataRow, NotUsed] = {
- logger.info("**** updateBat called on service side ***")
- Flow[JDBCDataRow]
- .via(jdbcActionFlow)
- }
jdbcActionFlow is a Flow[R,R,_], so we can attach it directly to the preceding flow with via. Here is the definition of JDBCActionStream:
- case class JDBCActionStream[R](dbName: Symbol, parallelism: Int = 1, processInOrder: Boolean = true,
- statement: String, prepareParams: R => Seq[Any]) {
- jas =>
- def setDBName(db: Symbol): JDBCActionStream[R] = jas.copy(dbName = db)
- def setParallelism(parLevel: Int): JDBCActionStream[R] = jas.copy(parallelism = parLevel)
- def setProcessOrder(ordered: Boolean): JDBCActionStream[R] = jas.copy(processInOrder = ordered)
- private def perform(r: R)(implicit ec: ExecutionContextExecutor) = {
- import scala.concurrent._
- val params = prepareParams(r)
- Future {
- NamedDB(dbName) autoCommit { session =>
- session.execute(statement, params: _*)
- }
- r
- }
- }
- def performOnRow(implicit ec: ExecutionContextExecutor): Flow[R, R, NotUsed] =
- if (processInOrder)
- Flow[R].mapAsync(parallelism)(perform)
- else
- Flow[R].mapAsyncUnordered(parallelism)(perform)
- }
- object JDBCActionStream {
- def apply[R](_dbName: Symbol, _statement: String, params: R => Seq[Any]): JDBCActionStream[R] =
- new JDBCActionStream[R](dbName = _dbName, statement=_statement, prepareParams = params)
- }
The performOnRow function is a passthrough stage: it uses mapAsync to run the updates concurrently and re-emits each element downstream (see the ordering sketch after the client code below). The client invokes it as follows:
- def updateBatches: Source[JDBCDataRow,NotUsed] = {
- logger.info(s"running updateBatches ...")
- Source
- .fromIterator(() => List(query,query2,query3).toIterator)
- .via(stub.batQuery)
- .via(stub.updateBat)
- }
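A side note on performOnRow: mapAsync keeps the upstream order of elements while running up to parallelism futures concurrently, whereas mapAsyncUnordered emits results in completion order. Below is a minimal, self-contained sketch of the passthrough pattern; the object name and the simulated workload are mine, not part of the original code:
- import akka.actor.ActorSystem
- import akka.stream.ActorMaterializer
- import akka.stream.scaladsl.{Flow, Sink, Source}
- import scala.concurrent.Future
- object PassthroughDemo extends App {
- implicit val system = ActorSystem("PassthroughDemo")
- implicit val mat = ActorMaterializer.create(system)
- implicit val ec = system.dispatcher
- // passthrough: perform a side effect per element, then re-emit the element itself
- def perform(n: Int): Future[Int] = Future {
- Thread.sleep((5 - n) * 10L) // simulate uneven per-element workloads
- n
- }
- // mapAsync(4): up to 4 concurrent futures, output order equals input order
- Source(1 to 5).via(Flow[Int].mapAsync(4)(perform)).runWith(Sink.foreach(println))
- // with mapAsyncUnordered(4) the faster elements would overtake the slower ones
- scala.io.StdIn.readLine()
- mat.shutdown()
- system.terminate()
- }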
Below is the complete source code for this demonstration:
jdbc.proto
- syntax = "proto3";
- import "google/protobuf/wrappers.proto";
- import "google/protobuf/any.proto";
- import "scalapb/scalapb.proto";
- package grpc.jdbc.services;
- option (scalapb.options) = {
- // use a custom Scala package name
- // package_name: "io.ontherocks.introgrpc.demo"
- // don't append file name to package
- flat_package: true
- // generate one Scala file for all messages (services still get their own file)
- single_file: true
- // add imports to generated file
- // useful when extending traits or using custom types
- // import: "io.ontherocks.hellogrpc.RockingMessage"
- // code to put at the top of generated file
- // works only with `single_file: true`
- //preamble: "sealed trait SomeSealedTrait"
- };
- /*
- * Demoes various customization options provided by ScalaPBs.
- */
- message JDBCDataRow {
- string year = 1;
- string state = 2;
- string county = 3;
- string value = 4;
- }
- message JDBCQuery {
- string dbName = 1;
- string statement = 2;
- bytes parameters = 3;
- google.protobuf.Int32Value fetchSize = 4;
- google.protobuf.BoolValue autoCommit = 5;
- google.protobuf.Int32Value queryTimeout = 6;
- }
- message JDBCResult {
- bytes result = 1;
- }
- message JDBCUpdate {
- string dbName = 1;
- repeated string statements = 2;
- bytes parameters = 3;
- google.protobuf.Int32Value fetchSize = 4;
- google.protobuf.Int32Value queryTimeout = 5;
- int32 sqlType = 6;
- google.protobuf.Int32Value batch = 7;
- bytes returnGeneratedKey = 8;
- }
- message JDBCDate {
- int32 yyyy = 1;
- int32 mm = 2;
- int32 dd = 3;
- }
- message JDBCTime {
- int32 hh = 1;
- int32 mm = 2;
- int32 ss = 3;
- int32 nnn = 4;
- }
- message JDBCDateTime {
- JDBCDate date = 1;
- JDBCTime time = 2;
- }
- message MemberRow {
- string name = 1;
- JDBCDate birthday = 2;
- string description = 3;
- JDBCDateTime created_at = 4;
- bytes picture = 5;
- }
- service JDBCServices {
- rpc runQuery(JDBCQuery) returns (stream JDBCDataRow) {}
- rpc batQuery(stream JDBCQuery) returns (stream JDBCDataRow) {}
- rpc runDDL(JDBCUpdate) returns (JDBCResult) {}
- rpc insertRows(stream MemberRow) returns(JDBCResult) {}
- rpc updateRows(stream MemberRow) returns(JDBCResult) {}
- rpc getMembers(JDBCQuery) returns (stream MemberRow) {}
- rpc updateBat(stream JDBCDataRow) returns (stream JDBCDataRow) {}
- }
JDBCEngine.scala
- package sdp.jdbc.engine
- import java.sql.PreparedStatement
- import scala.collection.generic.CanBuildFrom
- import akka.stream.scaladsl._
- import scalikejdbc._
- import scalikejdbc.streams._
- import akka.NotUsed
- import akka.stream._
- import java.time._
- import scala.concurrent.duration._
- import scala.concurrent._
- import sdp.jdbc.FileStreaming._
- import scalikejdbc.TxBoundary.Try._
- import scala.concurrent.ExecutionContextExecutor
- import java.io.InputStream
- object JDBCContext {
- type SQLTYPE = Int
- val SQL_EXEDDL= 1
- val SQL_UPDATE = 2
- val RETURN_GENERATED_KEYVALUE = true
- val RETURN_UPDATED_COUNT = false
- }
- case class JDBCQueryContext[M](
- dbName: Symbol,
- statement: String,
- parameters: Seq[Any] = Nil,
- fetchSize: Int = 100,
- autoCommit: Boolean = false,
- queryTimeout: Option[Int] = None)
- case class JDBCContext(
- dbName: Symbol,
- statements: Seq[String] = Nil,
- parameters: Seq[Seq[Any]] = Nil,
- fetchSize: Int = 100,
- queryTimeout: Option[Int] = None,
- queryTags: Seq[String] = Nil,
- sqlType: JDBCContext.SQLTYPE = JDBCContext.SQL_UPDATE,
- batch: Boolean = false,
- returnGeneratedKey: Seq[Option[Any]] = Nil,
- // no return: None, return by index: Some(1), by name: Some("id")
- preAction: Option[PreparedStatement => Unit] = None,
- postAction: Option[PreparedStatement => Unit] = None) {
- ctx =>
- //helper functions
- def appendTag(tag: String): JDBCContext = ctx.copy(queryTags = ctx.queryTags :+ tag)
- def appendTags(tags: Seq[String]): JDBCContext = ctx.copy(queryTags = ctx.queryTags ++ tags)
- def setFetchSize(size: Int): JDBCContext = ctx.copy(fetchSize = size)
- def setQueryTimeout(time: Option[Int]): JDBCContext = ctx.copy(queryTimeout = time)
- def setPreAction(action: Option[PreparedStatement => Unit]): JDBCContext = {
- if (ctx.sqlType == JDBCContext.SQL_UPDATE &&
- !ctx.batch && ctx.statements.size == 1)
- ctx.copy(preAction = action)
- else
- throw new IllegalStateException("JDBCContex setting error: preAction not supported!")
- }
- def setPostAction(action: Option[PreparedStatement => Unit]): JDBCContext = {
- if (ctx.sqlType == JDBCContext.SQL_UPDATE &&
- !ctx.batch && ctx.statements.size == 1)
- ctx.copy(postAction = action)
- else
- throw new IllegalStateException("JDBCContex setting error: preAction not supported!")
- }
- def appendDDLCommand(_statement: String, _parameters: Any*): JDBCContext = {
- if (ctx.sqlType == JDBCContext.SQL_EXEDDL) {
- ctx.copy(
- statements = ctx.statements ++ Seq(_statement),
- parameters = ctx.parameters ++ Seq(Seq(_parameters))
- )
- } else
- throw new IllegalStateException("JDBCContex setting error: option not supported!")
- }
- def appendUpdateCommand(_returnGeneratedKey: Boolean, _statement: String,_parameters: Any*): JDBCContext = {
- if (ctx.sqlType == JDBCContext.SQL_UPDATE && !ctx.batch) {
- ctx.copy(
- statements = ctx.statements ++ Seq(_statement),
- parameters = ctx.parameters ++ Seq(_parameters),
- returnGeneratedKey = ctx.returnGeneratedKey ++ (if (_returnGeneratedKey) Seq(Some(1)) else Seq(None))
- )
- } else
- throw new IllegalStateException("JDBCContex setting error: option not supported!")
- }
- def appendBatchParameters(_parameters: Any*): JDBCContext = {
- if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch)
- throw new IllegalStateException("JDBCContex setting error: batch parameters only supported for SQL_UPDATE and batch = true!")
- var matchParams = true
- if (ctx.parameters != Nil)
- if (ctx.parameters.head.size != _parameters.size)
- matchParams = false
- if (matchParams) {
- ctx.copy(
- parameters = ctx.parameters ++ Seq(_parameters)
- )
- } else
- throw new IllegalStateException("JDBCContex setting error: batch command parameters not match!")
- }
- def setBatchReturnGeneratedKeyOption(returnKey: Boolean): JDBCContext = {
- if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch)
- throw new IllegalStateException("JDBCContex setting error: only supported in batch update commands!")
- ctx.copy(
- returnGeneratedKey = if (returnKey) Seq(Some(1)) else Nil
- )
- }
- def setDDLCommand(_statement: String, _parameters: Any*): JDBCContext = {
- ctx.copy(
- statements = Seq(_statement),
- parameters = Seq(_parameters),
- sqlType = JDBCContext.SQL_EXEDDL,
- batch = false
- )
- }
- def setUpdateCommand(_returnGeneratedKey: Boolean, _statement: String,_parameters: Any*): JDBCContext = {
- ctx.copy(
- statements = Seq(_statement),
- parameters = Seq(_parameters),
- returnGeneratedKey = if (_returnGeneratedKey) Seq(Some(1)) else Seq(None),
- sqlType = JDBCContext.SQL_UPDATE,
- batch = false
- )
- }
- def setBatchCommand(_statement: String): JDBCContext = {
- ctx.copy (
- statements = Seq(_statement),
- sqlType = JDBCContext.SQL_UPDATE,
- batch = true
- )
- }
- }
- object JDBCEngine {
- import JDBCContext._
- type JDBCDate = LocalDate
- type JDBCDateTime = LocalDateTime
- type JDBCTime = LocalTime
- def jdbcSetDate(yyyy: Int, mm: Int, dd: Int) = LocalDate.of(yyyy,mm,dd)
- def jdbcSetTime(hh: Int, mm: Int, ss: Int, nn: Int) = LocalTime.of(hh,mm,ss,nn)
- def jdbcSetDateTime(date: JDBCDate, time: JDBCTime) = LocalDateTime.of(date,time)
- def jdbcSetNow = LocalDateTime.now()
- type JDBCBlob = InputStream
- def fileToJDBCBlob(fileName: String, timeOut: FiniteDuration = 60 seconds)(
- implicit mat: Materializer) = FileToInputStream(fileName,timeOut)
- def jdbcBlobToFile(blob: JDBCBlob, fileName: String)(
- implicit mat: Materializer) = InputStreamToFile(blob,fileName)
- private def noExtractor(message: String): WrappedResultSet => Nothing = { (rs: WrappedResultSet) =>
- throw new IllegalStateException(message)
- }
- def jdbcAkkaStream[A](ctx: JDBCQueryContext[A],extractor: WrappedResultSet => A)
- (implicit ec: ExecutionContextExecutor): Source[A,NotUsed] = {
- val publisher: DatabasePublisher[A] = NamedDB(ctx.dbName) readOnlyStream {
- val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor(""))
- ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
- val sql: SQL[A, HasExtractor] = rawSql.map(extractor)
- sql.iterator
- .withDBSessionForceAdjuster(session => {
- session.connection.setAutoCommit(ctx.autoCommit)
- session.fetchSize(ctx.fetchSize)
- })
- }
- Source.fromPublisher[A](publisher)
- }
- def jdbcQueryResult[C[_] <: TraversableOnce[_], A](ctx: JDBCQueryContext[A],
- extractor: WrappedResultSet => A)(
- implicit cbf: CanBuildFrom[Nothing, A, C[A]]): C[A] = {
- val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor(""))
- ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
- rawSql.fetchSize(ctx.fetchSize)
- implicit val session = NamedAutoSession(ctx.dbName)
- val sql: SQL[A, HasExtractor] = rawSql.map(extractor)
- sql.collection.apply[C]()
- }
- def jdbcExecuteDDL(ctx: JDBCContext)(implicit ec: ExecutionContextExecutor): Future[String] = {
- if (ctx.sqlType != SQL_EXEDDL) {
- Future.failed(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_EXEDDL'!"))
- }
- else {
- Future {
- NamedDB(ctx.dbName) localTx { implicit session =>
- ctx.statements.foreach { stm =>
- val ddl = new SQLExecution(statement = stm, parameters = Nil)(
- before = WrappedResultSet => {})(
- after = WrappedResultSet => {})
- ddl.apply()
- }
- "SQL_EXEDDL executed succesfully."
- }
- }
- }
- }
- def jdbcBatchUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
- implicit ec: ExecutionContextExecutor,
- cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
- if (ctx.statements == Nil)
- Future.failed(new IllegalStateException("JDBCContext setting error: statements empty!"))
- else if (ctx.sqlType != SQL_UPDATE) {
- Future.failed(new IllegalStateException("JDBCContext setting error: sqlType must be 'SQL_UPDATE'!"))
- }
- else {
- if (ctx.batch) {
- if (noReturnKey(ctx)) {
- val usql = SQL(ctx.statements.head)
- .tags(ctx.queryTags: _*)
- .batch(ctx.parameters: _*)
- Future {
- NamedDB(ctx.dbName) localTx { implicit session =>
- ctx.queryTimeout.foreach(session.queryTimeout(_))
- usql.apply[Seq]()
- Seq.empty[Long].to[C]
- }
- }
- } else {
- val usql = new SQLBatchWithGeneratedKey(ctx.statements.head, ctx.parameters, ctx.queryTags)(None)
- Future {
- NamedDB(ctx.dbName) localTx { implicit session =>
- ctx.queryTimeout.foreach(session.queryTimeout(_))
- usql.apply[C]()
- }
- }
- }
- } else {
- Future.failed(new IllegalStateException("JDBCContex setting error: must set batch = true !"))
- }
- }
- }
- private def singleTxUpdateWithReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
- implicit ec: ExecutionContextExecutor,
- cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
- val Some(key) :: xs = ctx.returnGeneratedKey
- val params: Seq[Any] = ctx.parameters match {
- case Nil => Nil
- case p@_ => p.head
- }
- val usql = new SQLUpdateWithGeneratedKey(ctx.statements.head, params, ctx.queryTags)(key)
- Future {
- NamedDB(ctx.dbName) localTx { implicit session =>
- session.fetchSize(ctx.fetchSize)
- ctx.queryTimeout.foreach(session.queryTimeout(_))
- val result = usql.apply()
- Seq(result).to[C]
- }
- }
- }
- private def singleTxUpdateNoReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
- implicit ec: ExecutionContextExecutor,
- cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
- val params: Seq[Any] = ctx.parameters match {
- case Nil => Nil
- case p@_ => p.head
- }
- val before = ctx.preAction match {
- case None => pstm: PreparedStatement => {}
- case Some(f) => f
- }
- val after = ctx.postAction match {
- case None => pstm: PreparedStatement => {}
- case Some(f) => f
- }
- val usql = new SQLUpdate(ctx.statements.head,params,ctx.queryTags)(before)(after)
- Future {
- NamedDB(ctx.dbName) localTx {implicit session =>
- session.fetchSize(ctx.fetchSize)
- ctx.queryTimeout.foreach(session.queryTimeout(_))
- val result = usql.apply()
- Seq(result.toLong).to[C]
- }
- }
- }
- private def singleTxUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
- implicit ec: ExecutionContextExecutor,
- cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
- if (noReturnKey(ctx))
- singleTxUpdateNoReturnKey(ctx)
- else
- singleTxUpdateWithReturnKey(ctx)
- }
- private def noReturnKey(ctx: JDBCContext): Boolean = {
- if (ctx.returnGeneratedKey != Nil) {
- val k :: xs = ctx.returnGeneratedKey
- k match {
- case None => true
- case Some(k) => false
- }
- } else true
- }
- def noActon: PreparedStatement=>Unit = pstm => {}
- def multiTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
- implicit ec: ExecutionContextExecutor,
- cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
- Future {
- NamedDB(ctx.dbName) localTx { implicit session =>
- session.fetchSize(ctx.fetchSize)
- ctx.queryTimeout.foreach(session.queryTimeout(_))
- val keys: Seq[Option[Any]] = ctx.returnGeneratedKey match {
- case Nil => Seq.fill(ctx.statements.size)(None)
- case k@_ => k
- }
- val sqlcmd = ctx.statements zip ctx.parameters zip keys
- val results = sqlcmd.map { case ((stm, param), key) =>
- key match {
- case None =>
- new SQLUpdate(stm, param, Nil)(noActon)(noActon).apply().toLong
- case Some(k) =>
- new SQLUpdateWithGeneratedKey(stm, param, Nil)(k).apply().toLong
- }
- }
- results.to[C]
- }
- }
- }
- def jdbcTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
- implicit ec: ExecutionContextExecutor,
- cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
- if (ctx.statements == Nil)
- Future.failed(new IllegalStateException("JDBCContext setting error: statements empty!"))
- else if (ctx.sqlType != SQL_UPDATE) {
- Future.failed(new IllegalStateException("JDBCContext setting error: sqlType must be 'SQL_UPDATE'!"))
- }
- else {
- if (!ctx.batch) {
- if (ctx.statements.size == 1)
- singleTxUpdate(ctx)
- else
- multiTxUpdates(ctx)
- } else
- Future.failed(new IllegalStateException("JDBCContex setting error: must set batch = false !"))
- }
- }
- case class JDBCActionStream[R](dbName: Symbol, parallelism: Int = 1, processInOrder: Boolean = true,
- statement: String, prepareParams: R => Seq[Any]) {
- jas =>
- def setDBName(db: Symbol): JDBCActionStream[R] = jas.copy(dbName = db)
- def setParallelism(parLevel: Int): JDBCActionStream[R] = jas.copy(parallelism = parLevel)
- def setProcessOrder(ordered: Boolean): JDBCActionStream[R] = jas.copy(processInOrder = ordered)
- private def perform(r: R)(implicit ec: ExecutionContextExecutor) = {
- import scala.concurrent._
- val params = prepareParams(r)
- Future {
- NamedDB(dbName) autoCommit { session =>
- session.execute(statement, params: _*)
- }
- r
- }
- // Future.successful(r)
- }
- def performOnRow(implicit ec: ExecutionContextExecutor): Flow[R, R, NotUsed] =
- if (processInOrder)
- Flow[R].mapAsync(parallelism)(perform)
- else
- Flow[R].mapAsyncUnordered(parallelism)(perform)
- }
- object JDBCActionStream {
- def apply[R](_dbName: Symbol, _statement: String, params: R => Seq[Any]): JDBCActionStream[R] =
- new JDBCActionStream[R](dbName = _dbName, statement=_statement, prepareParams = params)
- }
- }
JDBCService.scala
- package demo.grpc.jdbc.services
- import akka.NotUsed
- import akka.stream.scaladsl.{Source,Flow}
- import grpc.jdbc.services._
- import java.util.logging.Logger
- import protobuf.bytes.Converter._
- import sdp.jdbc.engine._
- import JDBCEngine._
- import scalikejdbc.WrappedResultSet
- import scala.concurrent.ExecutionContextExecutor
- class JDBCStreamingServices(implicit ec: ExecutionContextExecutor) extends JdbcGrpcAkkaStream.JDBCServices {
- val logger = Logger.getLogger(classOf[JDBCStreamingServices].getName)
- val toRow = (rs: WrappedResultSet) => JDBCDataRow(
- year = rs.string("REPORTYEAR"),
- state = rs.string("STATENAME"),
- county = rs.string("COUNTYNAME"),
- value = rs.string("VALUE")
- )
- override def runQuery: Flow[JDBCQuery, JDBCDataRow, NotUsed] = {
- logger.info("**** runQuery called on service side ***")
- Flow[JDBCQuery]
- .flatMapConcat { q =>
- //unpack JDBCQuery and construct the context
- var params: Seq[Any] = Nil
- if (q.parameters != _root_.com.google.protobuf.ByteString.EMPTY)
- params = unmarshal[Seq[Any]](q.parameters)
- logger.info(s"**** query parameters: ${params} ****")
- val ctx = JDBCQueryContext[JDBCDataRow](
- dbName = Symbol(q.dbName),
- statement = q.statement,
- parameters = params,
- fetchSize = q.fetchSize.getOrElse(100),
- autoCommit = q.autoCommit.getOrElse(false),
- queryTimeout = q.queryTimeout
- )
- jdbcAkkaStream(ctx, toRow)
- }
- }
- override def batQuery: Flow[JDBCQuery, JDBCDataRow, NotUsed] = runQuery
- override def runDDL: Flow[JDBCUpdate, JDBCResult, NotUsed] = {
- logger.info("**** runDDL called on service side ***")
- Flow[JDBCUpdate]
- .flatMapConcat { context =>
- //unpack JDBCUpdate and construct the context
- val ctx = JDBCContext(
- dbName = Symbol(context.dbName),
- statements = context.statements,
- sqlType = JDBCContext.SQL_EXEDDL,
- queryTimeout = context.queryTimeout
- )
- logger.info(s"**** JDBCContext => ${ctx} ***")
- Source
- .fromFuture(jdbcExecuteDDL(ctx))
- .map { r => JDBCResult(marshal(r)) }
- }
- }
- override def insertRows: Flow[MemberRow, JDBCResult, NotUsed] = {
- logger.info("**** insertRows called on service side ***")
- val insertSQL = """
- insert into members(
- name,
- birthday,
- description,
- created_at
- ) values (?, ?, ?, ?)
- """
- Flow[MemberRow]
- .flatMapConcat { row =>
- val ctx = JDBCContext('h2)
- .setUpdateCommand(true,insertSQL,
- row.name,
- jdbcSetDate(row.birthday.get.yyyy,row.birthday.get.mm,row.birthday.get.dd),
- row.description,
- jdbcSetNow
- )
- logger.info(s"**** JDBCContext => ${ctx} ***")
- Source
- .fromFuture(jdbcTxUpdates[Vector](ctx))
- .map { r => JDBCResult(marshal(r)) }
- }
- }
- override def updateRows: Flow[MemberRow, JDBCResult, NotUsed] = {
- logger.info("**** updateRows called on service side ***")
- val updateSQL = "update members set description = ?, created_at = ? where name = ?"
- Flow[MemberRow]
- .flatMapConcat { row =>
- val ctx = JDBCContext('h2)
- .setBatchCommand(updateSQL)
- .appendBatchParameters(
- row.name + " updated.",
- jdbcSetNow,
- row.name
- ).setBatchReturnGeneratedKeyOption(true)
- logger.info(s"**** JDBCContext => ${ctx} ***")
- Source
- .fromFuture(jdbcBatchUpdate[Vector](ctx))
- .map { r => JDBCResult(marshal(r)) }
- }
- }
- val toMemberRow = (rs: WrappedResultSet) => MemberRow(
- name = rs.string("name"),
- description = rs.string("description"),
- birthday = None,
- createdAt = None,
- picture = _root_.com.google.protobuf.ByteString.EMPTY
- )
- override def getMembers: Flow[JDBCQuery, MemberRow, NotUsed] = {
- logger.info("**** getMembers called on service side ***")
- Flow[JDBCQuery]
- .flatMapConcat { q =>
- //unpack JDBCQuery and construct the context
- var params: Seq[Any] = Nil
- if (q.parameters != _root_.com.google.protobuf.ByteString.EMPTY)
- params = unmarshal[Seq[Any]](q.parameters)
- logger.info(s"**** query parameters: ${params} ****")
- val ctx = JDBCQueryContext[MemberRow](
- dbName = Symbol(q.dbName),
- statement = q.statement,
- parameters = params,
- fetchSize = q.fetchSize.getOrElse(100),
- autoCommit = q.autoCommit.getOrElse(false),
- queryTimeout = q.queryTimeout
- )
- jdbcAkkaStream(ctx, toMemberRow)
- }
- }
- val params: JDBCDataRow => Seq[Any] = row => {
- Seq((row.value.toInt * 2), row.state, row.county, row.year) }
- val sql = "update AQMRPT set total = ? where statename = ? and countyname = ? and reportyear = ?"
- val jdbcActionStream = JDBCActionStream('h2, sql, params)
- .setParallelism(4).setProcessOrder(false)
- val jdbcActionFlow = jdbcActionStream.performOnRow
- override def updateBat: Flow[JDBCDataRow, JDBCDataRow, NotUsed] = {
- logger.info("**** updateBat called on service side ***")
- Flow[JDBCDataRow]
- .via(jdbcActionFlow)
- }
- }
JDBCServer.scala
- package demo.grpc.jdbc.server
- import java.util.logging.Logger
- import akka.actor.ActorSystem
- import akka.stream.ActorMaterializer
- import io.grpc.Server
- import demo.grpc.jdbc.services._
- import io.grpc.ServerBuilder
- import grpc.jdbc.services._
- class gRPCServer(server: Server) {
- val logger: Logger = Logger.getLogger(classOf[gRPCServer].getName)
- def start(): Unit = {
- server.start()
- logger.info(s"Server started, listening on ${server.getPort}")
- sys.addShutdownHook {
- // Use stderr here since the logger may have been reset by its JVM shutdown hook.
- System.err.println("*** shutting down gRPC server since JVM is shutting down")
- stop()
- System.err.println("*** server shut down")
- }
- ()
- }
- def stop(): Unit = {
- server.shutdown()
- }
- /**
- * Await termination on the main thread since the grpc library uses daemon threads.
- */
- def blockUntilShutdown(): Unit = {
- server.awaitTermination()
- }
- }
- object JDBCServer extends App {
- import sdp.jdbc.config._
- implicit val system = ActorSystem("JDBCServer")
- implicit val mat = ActorMaterializer.create(system)
- implicit val ec = system.dispatcher
- ConfigDBsWithEnv("dev").setup('h2)
- ConfigDBsWithEnv("dev").loadGlobalSettings()
- val server = new gRPCServer(
- ServerBuilder
- .forPort(50051)
- .addService(
- JdbcGrpcAkkaStream.bindService(
- new JDBCStreamingServices
- )
- ).build()
- )
- server.start()
- // server.blockUntilShutdown()
- scala.io.StdIn.readLine()
- ConfigDBsWithEnv("dev").close('h2)
- mat.shutdown()
- system.terminate()
- }
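JDBCServer configures the 'h2 connection pool through scalikejdbc-config: ConfigDBsWithEnv("dev") reads the keys under the dev.db.h2 prefix of application.conf. Here is a sketch of the matching entries; the url and pool values are placeholders (the actual settings come from the previous post):
- # application.conf (a sketch; adjust driver/url/credentials to your environment)
- dev.db.h2.driver = "org.h2.Driver"
- dev.db.h2.url = "jdbc:h2:tcp://localhost/~/testdb"
- dev.db.h2.user = ""
- dev.db.h2.password = ""
- dev.db.h2.poolInitialSize = 5
- dev.db.h2.poolMaxSize = 7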
JDBCClient.scala
- package demo.grpc.jdbc.client
- import grpc.jdbc.services._
- import java.util.logging.Logger
- import protobuf.bytes.Converter._
- import akka.stream.scaladsl._
- import akka.NotUsed
- import akka.actor.ActorSystem
- import akka.stream.{ActorMaterializer, ThrottleMode}
- import io.grpc._
- import sdp.jdbc.engine._
- class JDBCStreamClient(host: String, port: Int) {
- val logger: Logger = Logger.getLogger(classOf[JDBCStreamClient].getName)
- val channel = ManagedChannelBuilder
- .forAddress(host,port)
- .usePlaintext(true)
- .build()
- val stub = JdbcGrpcAkkaStream.stub(channel)
- val query = JDBCQuery (
- dbName = "h2",
- statement = "select * from AQMRPT where STATENAME = ? and VALUE = ?",
- parameters = marshal(Seq("Arizona", 2))
- )
- val query2 = JDBCQuery (
- dbName = "h2",
- statement = "select * from AQMRPT where STATENAME = ? and VALUE = ?",
- parameters = marshal(Seq("Colorado", 3))
- )
- val query3= JDBCQuery (
- dbName = "h2",
- statement = "select * from AQMRPT where STATENAME = ? and VALUE = ?",
- parameters = marshal(Seq("Arkansas", 8))
- )
- def queryRows: Source[JDBCDataRow,NotUsed] = {
- logger.info(s"running queryRows ...")
- Source
- .single(query)
- .via(stub.runQuery)
- }
- def batQueryRows: Source[JDBCDataRow,NotUsed] = {
- logger.info(s"running batQueryRows ...")
- Source
- .fromIterator(() => List(query,query2,query3).toIterator)
- .via(stub.batQuery)
- }
- val dropSQL: String ="""
- drop table members
- """
- val createSQL: String ="""
- create table members (
- id serial not null primary key,
- name varchar(30) not null,
- description varchar(1000),
- birthday date,
- created_at timestamp not null,
- picture blob
- )"""
- val ctx = JDBCUpdate (
- dbName = "h2",
- sqlType = JDBCContext.SQL_EXEDDL,
- statements = Seq(dropSQL,createSQL)
- )
- def createTbl: Source[JDBCResult,NotUsed] = {
- logger.info(s"running createTbl ...")
- Source
- .single(ctx)
- .via(stub.runDDL)
- }
- val p1 = MemberRow( "Peter Chan",Some(JDBCDate(1967,5,17)),"new member1",None,_root_.com.google.protobuf.ByteString.EMPTY)
- val p2 = MemberRow( "Alanda Wong",Some(JDBCDate(1980,11,10)),"new member2",None,_root_.com.google.protobuf.ByteString.EMPTY)
- val p3 = MemberRow( "Kate Zhang",Some(JDBCDate(1969,8,13)),"new member3",None,_root_.com.google.protobuf.ByteString.EMPTY)
- val p4 = MemberRow( "Tiger Chan",Some(JDBCDate(1962,5,1)),"new member4",None,_root_.com.google.protobuf.ByteString.EMPTY)
- def insertRows: Source[JDBCResult,NotUsed] = {
- logger.info(s"running insertRows ...")
- Source
- .fromIterator(() => List(p1,p2,p3,p4).toIterator)
- .via(stub.insertRows)
- }
- val queryMember = JDBCQuery (
- dbName = "h2",
- statement = "select * from members"
- )
- def updateRows: Source[JDBCResult,NotUsed] = {
- logger.info(s"running updateRows ...")
- Source
- .single(queryMember)
- .via(stub.getMembers)
- .via(stub.updateRows)
- }
- def updateBatches: Source[JDBCDataRow,NotUsed] = {
- logger.info(s"running updateBatches ...")
- Source
- .fromIterator(() => List(query,query2,query3).toIterator)
- .via(stub.batQuery)
- .via(stub.updateBat)
- }
- }
- object TestConversion extends App {
- val orgval: Seq[Option[Any]] = Seq(Some(1),Some("id"),None,Some(2))
- println(s"original value: ${orgval}")
- val marval = marshal(orgval)
- println(s"marshal value: ${marval}")
- val unmval = unmarshal[Seq[Option[Any]]](marval)
- println(s"marshal value: ${unmval}")
- val m1 = MemberRow(name = "Peter")
- val m2 = m1.update(
- _.birthday.yyyy := 1989,
- _.birthday.mm := 10,
- _.birthday.dd := 3,
- _.description := "a new member"
- )
- }
- object QueryRows extends App {
- implicit val system = ActorSystem("QueryRows")
- implicit val mat = ActorMaterializer.create(system)
- val client = new JDBCStreamClient("localhost", 50051)
- client.queryRows.runForeach { r => println(r) }
- scala.io.StdIn.readLine()
- mat.shutdown()
- system.terminate()
- }
- object BatQueryRows extends App {
- implicit val system = ActorSystem("BatQueryRows")
- implicit val mat = ActorMaterializer.create(system)
- val client = new JDBCStreamClient("localhost", 50051)
- client.batQueryRows.runForeach(println)
- scala.io.StdIn.readLine()
- mat.shutdown()
- system.terminate()
- }
- object RunDDL extends App {
- implicit val system = ActorSystem("RunDDL")
- implicit val mat = ActorMaterializer.create(system)
- val client = new JDBCStreamClient("localhost", 50051)
- client.createTbl.runForeach{r => println(unmarshal[Seq[Any]](r.result))}
- scala.io.StdIn.readLine()
- mat.shutdown()
- system.terminate()
- }
- object InsertRows extends App {
- implicit val system = ActorSystem("InsertRows")
- implicit val mat = ActorMaterializer.create(system)
- val client = new JDBCStreamClient("localhost", 50051)
- client.insertRows.runForeach { r => println(unmarshal[Vector[Long]](r.result)) }
- scala.io.StdIn.readLine()
- mat.shutdown()
- system.terminate()
- }
- object UpdateRows extends App {
- implicit val system = ActorSystem("UpdateRows")
- implicit val mat = ActorMaterializer.create(system)
- val client = new JDBCStreamClient("localhost", 50051)
- client.updateRows.runForeach{ r => println(unmarshal[Vector[Long]](r.result)) }
- scala.io.StdIn.readLine()
- mat.shutdown()
- system.terminate()
- }
- object BatUpdates extends App {
- implicit val system = ActorSystem("BatUpdates")
- implicit val mat = ActorMaterializer.create(system)
- val client = new JDBCStreamClient("localhost", 50051)
- client.updateBatches.runForeach(println)
- scala.io.StdIn.readLine()
- mat.shutdown()
- system.terminate()
- }
ByteConverter.scala
- package protobuf.bytes
- import java.io.{ByteArrayInputStream,ByteArrayOutputStream,ObjectInputStream,ObjectOutputStream}
- import com.google.protobuf.ByteString
- object Converter {
- def marshal(value: Any): ByteString = {
- val stream: ByteArrayOutputStream = new ByteArrayOutputStream()
- val oos = new ObjectOutputStream(stream)
- oos.writeObject(value)
- oos.close()
- ByteString.copyFrom(stream.toByteArray())
- }
- def unmarshal[A](bytes: ByteString): A = {
- val ois = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
- val value = ois.readObject()
- ois.close()
- value.asInstanceOf[A]
- }
- }
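Note that marshal/unmarshal are plain Java serialization, so whatever gets packed into the bytes fields (query parameters, result values) must be Serializable. A minimal round-trip check:
- val bytes = marshal(Seq("Arizona", 5))
- val back = unmarshal[Seq[Any]](bytes)
- assert(back == Seq("Arizona", 5))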
The remaining source code and system configuration can be found in the previous post of this series.