在上一篇博文里我们介绍了通过gRPC实现JDBC数据库的streaming,这篇我们介绍关于cassandra的streaming实现方式。如果我们需要从一个未部署cassandra的节点或终端上读取cassandra数据,可以用gRPC来搭建一个数据桥梁来连接这两端。这时cassandra这端就是gRPC-Server端,由它提供cassandra的数据服务。
在前面sdp系列讨论里我们已经实现了Cassandra-Engine。它的运作原理还是通过某种Context把指令提交给cassandra去执行。我们先设计一个创建库表的例子。CQL语句和Cassandra-Engine程序代码如下,这是客户端部分:
- val dropCQL = "DROP TABLE IF EXISTS testdb.AQMRPT"
- val createCQL ="""
- CREATE TABLE testdb.AQMRPT (
- rowid bigint primary key,
- measureid bigint,
- statename text,
- countyname text,
- reportyear int,
- value int,
- created timestamp
- )"""
- val cqlddl = CQLUpdate(statements = Seq(dropCQL,createCQL))
- /** Sends the DDL request held in `cqlddl` to the remote `runDDL` service as a one-element stream. */
- def createTbl: Source[CQLResult,NotUsed] = {
-   log.info("running createTbl ...")
-   val ddlRequest = Source.single(cqlddl)
-   ddlRequest.via(stub.runDDL)
- }
首先,我们在CQLUpdate这个protobuf对应Context里传入两条指令dropCQL和createCQL,可以预计这会是一种批次型batch方式。然后一如既往,我们使用了streaming编程模式。在.proto文件里用IDL来定义对应的Context和Service:
- message CQLUpdate {
- repeated string statements = 1;
- bytes parameters = 2;
- google.protobuf.Int32Value consistency = 3;
- google.protobuf.BoolValue batch = 4;
- }
- service CQLServices {
- rpc runDDL(CQLUpdate) returns (CQLResult) {}
- }
服务函数runDDL程序实现如下:
- /**
-  * Service side of `runDDL`: for every incoming CQLUpdate a CQLContext is built from its
-  * statements, run through `cqlExecute`, and the marshalled outcome is emitted as a CQLResult.
-  */
- override def runDDL: Flow[CQLUpdate, CQLResult, NotUsed] = {
-   // one sub-stream per request; each emits the single result of its Future
-   def execute(update: CQLUpdate): Source[CQLResult, NotUsed] = {
-     //unpack CQLUpdate and construct the context
-     val ctx = CQLContext(update.statements)
-     log.info(s"**** CQLContext => ${ctx} ***")
-     Source.fromFuture(cqlExecute(ctx)).map(outcome => CQLResult(marshal(outcome)))
-   }
-   Flow[CQLUpdate].flatMapConcat(execute)
- }
这里我们调用了Cassandra-Engine的cqlExecute(ctx)函数:
- /**
-  * Executes the update command(s) carried by `ctx` and reports the outcome as a Future.
-  *
-  * Dispatch rules:
-  *  - batch requested with at least 2 parameter sets: handed to `cqlBatchUpdate`;
-  *  - batch requested with fewer than 2 parameter sets: a warning is logged and the
-  *    context is processed as non-batch;
-  *  - non-batch, exactly one statement: run through `cqlSingleUpdate` with the first
-  *    parameter set (no parameters when none were supplied);
-  *  - non-batch, any other number of statements: each statement is paired with its
-  *    parameter set (missing sets are padded with `Nil`) and the pairs are run in order
-  *    through `cqlExecuteSync` inside one Future that yields `true`.
-  */
- def cqlExecute(ctx: CQLContext)(
-   implicit session: Session, ec: ExecutionContext): Future[Boolean] = {
-   // an empty parameter list is also caught by the size check
-   val invalidBat = ctx.batch && ctx.parameters.size < 2
-   if (ctx.batch && !invalidBat)
-     cqlBatchUpdate(ctx)
-   else {
-     if (invalidBat)
-       log.warn(s"cqlExecute> batch update must have at least 2 sets of parameters! change to single-command.")
-     if (ctx.statements.size == 1) {
-       val param: Seq[Object] = ctx.parameters.headOption.getOrElse(Nil)
-       log.info(s"cqlExecute> single-command: statement: ${ctx.statements.head} parameters: ${param}")
-       cqlSingleUpdate(ctx.consistency, ctx.statements.head, param)
-     }
-     else {
-       if (ctx.parameters.nonEmpty && ctx.statements.size > ctx.parameters.size)
-         log.warn(s"cqlExecute> fewer parameters than statements! pad with 'Nil'.")
-       // padTo leaves the list untouched when it already has enough entries
-       val params: Seq[Seq[Object]] = ctx.parameters.padTo(ctx.statements.size, Nil)
-       val commands: Seq[(String,Seq[Object])] = ctx.statements zip params
-       log.info(s"cqlExecute> multi-commands: ${commands}")
-       /*
-       //using sequence to flip List[Future[Boolean]] => Future[List[Boolean]]
-       //therefore, make sure no command relies on prev command effect
-       val lstCmds: List[Future[Boolean]] = commands.map { case (stmt,param) =>
-         cqlSingleUpdate(ctx.consistency, stmt, param)
-       }.toList
-       val futList = lstCmds.sequence.map(_ => true) //must map to execute
-       */
-       /*
-       //using traverse to have some degree of parallelism = max(runtimes)
-       //therefore, make sure no command relies on prev command effect
-       val futList = Future.traverse(commands) { case (stmt,param) =>
-         cqlSingleUpdate(ctx.consistency, stmt, param)
-       }.map(_ => true)
-       Await.result(futList, 3 seconds)
-       Future.successful(true)
-       */
-       // run sync directly: commands execute one after another, in list order
-       Future {
-         commands.foreach { case (stm, pars) =>
-           cqlExecuteSync(ctx.consistency, stm, pars)
-         }
-         true
-       }
-     }
-   }
- }
之所以特别展示这个函数的代码,是因为一批次多条指令的执行可能会涉及non-blocking和并行运算。可参考上面代码里被注释掉的两个段落,了解如何用sequence(cats)和traverse实现对一串Future的运算。
下一个例子是用流方式把JDBC数据库数据并入cassandra数据库里。.proto IDL内容如下:
- message ProtoDate {
- int32 yyyy = 1;
- int32 mm = 2;
- int32 dd = 3;
- }
- message ProtoTime {
- int32 hh = 1;
- int32 mm = 2;
- int32 ss = 3;
- int32 nnn = 4;
- }
- message ProtoDateTime {
- ProtoDate date = 1;
- ProtoTime time = 2;
- }
- message AQMRPTRow {
- int64 rowid = 1;
- string countyname = 2;
- string statename = 3;
- int64 measureid = 4;
- int32 reportyear = 5;
- int32 value = 6;
- ProtoDateTime created = 7;
- }
- message CQLResult {
- bytes result = 1;
- }
- message CQLUpdate {
- repeated string statements = 1;
- bytes parameters = 2;
- google.protobuf.Int32Value consistency = 3;
- google.protobuf.BoolValue batch = 4;
- }
- service CQLServices {
- rpc transferRows(stream AQMRPTRow) returns (stream CQLResult) {}
- rpc runDDL(CQLUpdate) returns (CQLResult) {}
- }
下面是服务函数的实现:
- val toParams: AQMRPTRow => Seq[Object] = row => Seq[Object](
- row.rowid.asInstanceOf[Object],
- row.measureid.asInstanceOf[Object],
- row.statename,
- row.countyname,
- row.reportyear.asInstanceOf[Object],
- row.value.asInstanceOf[Object],
- CQLDateTimeNow
- )
- val cqlInsert ="""
- |insert into testdb.AQMRPT(
- | rowid,
- | measureid,
- | statename,
- | countyname,
- | reportyear,
- | value,
- | created)
- | values(?,?,?,?,?,?,?)
- """.stripMargin
-
- val cqlActionStream = CassandraActionStream(cqlInsert,toParams).setParallelism(2)
- .setProcessOrder(false)
- /*
- val cqlActionFlow: Flow[AQMRPTRow,AQMRPTRow,NotUsed] =
- Flow[AQMRPTRow]
- .via(cqlActionStream.performOnRow)
- */
- /**
-  * Per-row insert stage: a row whose rowid is already stored is skipped (result payload 0),
-  * any other row is written through `cqlActionStream` (result payload 1).
-  */
- val cqlActionFlow: Flow[AQMRPTRow,CQLResult,NotUsed] = {
-   Flow[AQMRPTRow]
-     .mapAsync(cqlActionStream.parallelism){ row =>
-       // the existence check is composed as a Future, so the stage never blocks a thread
-       // waiting for it; up to `parallelism` checks may therefore be in flight at once
-       ifExistsAsync(row.rowid).flatMap { exists =>
-         if (exists)
-           Future.successful(CQLResult(marshal(0)))
-         else
-           cqlActionStream.perform(row).map {_ => CQLResult(marshal(1))}
-       }
-     }
- }
- /** Service side of `transferRows`: every incoming row goes through `cqlActionFlow`. */
- override def transferRows: Flow[AQMRPTRow, CQLResult, NotUsed] = {
-   Flow[AQMRPTRow]
-     .via(cqlActionFlow)
- }
- // Runs the lookup query as a stream and completes with true when a row with this rowid comes back.
- private def ifExistsAsync(rowid: Long): Future[Boolean] = {
-   val cql = "SELECT * FROM testdb.AQMRPT WHERE ROWID = ? ALLOW FILTERING"
-   val param = Seq(rowid.asInstanceOf[Object])
-   val toRowId: Row => Long = r => r.getLong("rowid")
-   val ctx = CQLQueryContext(cql,param)
-   val src: Source[Long,NotUsed] = cassandraStream(ctx,toRowId)
-   // only the first element matters: Some(_) means the row exists
-   src.toMat(Sink.headOption)(Keep.right).run().map { result =>
-     log.info(s"checking existence: ${result}")
-     result.isDefined
-   }
- }
- // Blocking variant of the existence check; waits at most 3 seconds for the lookup.
- private def IfExists(rowid: Long): Boolean =
-   Await.result(ifExistsAsync(rowid), 3 seconds)
在上面的代码里我们调用了Cassandra-Engine的CassandraActionStream类型的流处理方法。值得注意的是这里我们尝试在stream Flow里运算另一个流,如:IfExists函数里运算一个Source来确定rowid是否存在。不要在意这个函数的实际应用,它只是一个人为的例子。另外,rowid:Long这样的定义是硬性规定的。cassandra对数据类型的匹配要求非常严格,不提供任何隐式类型转换。所以,Int <> Long被视为类型错误,而且捕获不到任何明确的错误信息。
这项服务的客户端调用如下:
- val stub = CqlGrpcAkkaStream.stub(channel)
- val jdbcRows2transfer = JDBCQueryContext[AQMRPTRow](
- dbName = 'h2,
- statement = "select * from AQMRPT where statename='Arkansas'"
- )
- /**
-  * Maps one JDBC result row to an AQMRPTRow. `created` is not read from the row:
-  * it is filled with a fixed sample timestamp.
-  */
- def toAQMRPTRow: WrappedResultSet => AQMRPTRow = { rs =>
-   // hard-coded sample value: date (1990, 8, 12), time (23, 56, 23, 0)
-   val sampleCreated = ProtoDateTime(Some(ProtoDate(1990, 8, 12)), Some(ProtoTime(23, 56, 23, 0)))
-   AQMRPTRow(
-     rowid = rs.long("ROWID"),
-     measureid = rs.long("MEASUREID"),
-     statename = rs.string("STATENAME"),
-     countyname = rs.string("COUNTYNAME"),
-     reportyear = rs.int("REPORTYEAR"),
-     value = rs.int("VALUE"),
-     created = Some(sampleCreated)
-   )
- }
- import scala.concurrent.duration._
- /** Streams the rows selected by `jdbcRows2transfer` from the local JDBC source into the remote `transferRows` service. */
- def transferRows: Source[CQLResult, NotUsed] = {
-   log.info("**** calling transferRows ****")
-   val localRows = jdbcAkkaStream(jdbcRows2transfer, toAQMRPTRow)
-   // .throttle(1, 500.millis, 1, ThrottleMode.shaping)
-   localRows.via(stub.transferRows)
- }
注意:JDBC在客户端本地,cassandra是远程服务。
最后我们示范一下cassandra Query。.proto IDL 定义:
- message CQLQuery {
- string statement = 1;
- bytes parameters = 2;
- google.protobuf.Int32Value consistency = 3;
- google.protobuf.Int32Value fetchSize = 4;
- }
- service CQLServices {
- rpc transferRows(stream AQMRPTRow) returns (stream CQLResult) {}
- rpc runQuery(CQLQuery) returns (stream AQMRPTRow) {}
- rpc runDDL(CQLUpdate) returns (CQLResult) {}
- }
服务函数代码如下:
- /**
-  * Reads the "CREATED" timestamp column of a row and converts it into a ProtoDateTime.
-  *
-  * @param rs the Cassandra row to read from
-  * @return None when the column is null, or when reading/converting it throws
-  *         (the failure is logged as a warning instead of being dropped silently)
-  */
- def toCQLTimestamp(rs: Row): Option[ProtoDateTime] = {
-   try {
-     // Option(...) turns a null column value into None
-     Option(rs.getTimestamp("CREATED")).map { tm =>
-       val localdt = cqlGetTimestamp(tm)
-       ProtoDateTime(Some(ProtoDate(localdt.getYear, localdt.getMonthValue, localdt.getDayOfMonth)),
-         Some(ProtoTime(localdt.getHour, localdt.getMinute, localdt.getSecond, localdt.getNano)))
-     }
-   }
-   catch {
-     case e: Exception =>
-       log.warn(s"toCQLTimestamp> cannot read CREATED: ${e.getMessage}")
-       None
-   }
- }
- /** Converts one Cassandra row of testdb.AQMRPT into its protobuf representation. */
- val toAQMRow: Row => AQMRPTRow = { rs =>
-   // columns are read in the same order as the message is populated
-   val rowId = rs.getLong("ROWID")
-   val measureId = rs.getLong("MEASUREID")
-   val state = rs.getString("STATENAME")
-   val county = rs.getString("COUNTYNAME")
-   val year = rs.getInt("REPORTYEAR")
-   val measured = rs.getInt("VALUE")
-   AQMRPTRow(
-     rowid = rowId,
-     measureid = measureId,
-     statename = state,
-     countyname = county,
-     reportyear = year,
-     value = measured,
-     created = toCQLTimestamp(rs)
-   )
- }
- /**
-  * Service side of `runQuery`: each incoming CQLQuery is turned into a CQLQueryContext
-  * and answered with the stream of matching rows converted by `toAQMRow`.
-  * Only `statement` and `parameters` of the request are used here.
-  */
- override def runQuery: Flow[CQLQuery, AQMRPTRow, NotUsed] = {
-   log.info("**** runQuery called on service side ***")
-   Flow[CQLQuery]
-     .flatMapConcat { q =>
-       //unpack CQLQuery and construct the context
-       // an empty payload means the statement takes no parameters
-       val params: Seq[Object] =
-         if (q.parameters.isEmpty) Nil
-         else unmarshal[Seq[Object]](q.parameters)
-       log.info(s"**** query parameters: ${params} ****")
-       val ctx = CQLQueryContext(q.statement,params)
-       CQLEngine.cassandraStream(ctx,toAQMRow)
-     }
- }
这里值得看看的一是日期转换,二是对于cassandra parameter Seq[Object]的marshal和unmarshal。客户端代码:
- val query = CQLQuery(
- statement = "select * from testdb.AQMRPT where STATENAME = ? and VALUE > ? ALLOW FILTERING;",
- parameters = marshal(Seq("Arkansas", 0.toInt))
- )
- val query2 = CQLQuery (
- statement = "select * from testdb.AQMRPT where STATENAME = ? and VALUE = ?",
- parameters = marshal(Seq("Colorado", 3.toInt))
- )
- val query3= CQLQuery (
- statement = "select * from testdb.AQMRPT where STATENAME = ? and VALUE = ?",
- parameters = marshal(Seq("Arkansas", 8.toInt))
- )
- /** Sends `query` to the remote `runQuery` service and returns the stream of rows it answers with. */
- def queryRows: Source[AQMRPTRow,NotUsed] = {
-   log.info("running queryRows ...")
-   val request = Source.single(query)
-   request.via(stub.runQuery)
- }
这段相对直白。
下面就是本次讨论涉及的完整源代码:
project/scalapb.sbt
- addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.18")
- resolvers += Resolver.bintrayRepo("beyondthelines", "maven")
- libraryDependencies ++= Seq(
- "com.thesamet.scalapb" %% "compilerplugin" % "0.7.4",
- "beyondthelines" %% "grpcakkastreamgenerator" % "0.0.5"
- )
build.sbt
- import scalapb.compiler.Version.scalapbVersion
- import scalapb.compiler.Version.grpcJavaVersion
- name := "gRPCCassandra"
- version := "0.1"
- scalaVersion := "2.12.6"
- resolvers += Resolver.bintrayRepo("beyondthelines", "maven")
- scalacOptions += "-Ypartial-unification"
- // Append with `++=` rather than assign with `:=`, so that dependencies already
- // contributed by plugins or earlier settings are kept instead of being overwritten.
- libraryDependencies ++= Seq(
-   "com.thesamet.scalapb" %% "scalapb-runtime" % scalapbVersion % "protobuf",
-   "io.grpc" % "grpc-netty" % grpcJavaVersion,
-   "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapbVersion,
-   "io.monix" %% "monix" % "2.3.0",
-   // for gRPC akka-stream
-   "beyondthelines" %% "grpcakkastreamruntime" % "0.0.5",
-   // for scalikejdbc
-   "org.scalikejdbc" %% "scalikejdbc" % "3.2.1",
-   "org.scalikejdbc" %% "scalikejdbc-test" % "3.2.1" % "test",
-   "org.scalikejdbc" %% "scalikejdbc-config" % "3.2.1",
-   "org.scalikejdbc" %% "scalikejdbc-streams" % "3.2.1",
-   "org.scalikejdbc" %% "scalikejdbc-joda-time" % "3.2.1",
-   "com.h2database" % "h2" % "1.4.196",
-   "mysql" % "mysql-connector-java" % "6.0.6",
-   "org.postgresql" % "postgresql" % "42.2.0",
-   "commons-dbcp" % "commons-dbcp" % "1.4",
-   "org.apache.tomcat" % "tomcat-jdbc" % "9.0.2",
-   "com.zaxxer" % "HikariCP" % "2.7.4",
-   "com.jolbox" % "bonecp" % "0.8.0.RELEASE",
-   "com.typesafe.slick" %% "slick" % "3.2.1",
-   // for cassandra driver 3.4.0
-   "com.datastax.cassandra" % "cassandra-driver-core" % "3.4.0",
-   "com.datastax.cassandra" % "cassandra-driver-extras" % "3.4.0",
-   "com.typesafe.akka" %% "akka-stream" % "2.5.13",
-   "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "0.19",
-   "ch.qos.logback" % "logback-classic" % "1.2.3",
-   "org.typelevel" %% "cats-core" % "1.1.0"
- )
- PB.targets in Compile := Seq(
- scalapb.gen() -> (sourceManaged in Compile).value,
- // generate the akka stream files
- grpc.akkastreams.generators.GrpcAkkaStreamGenerator() -> (sourceManaged in Compile).value
- )
main/resources/application.conf
- # JDBC settings
- test {
- db {
- h2 {
- driver = "org.h2.Driver"
- url = "jdbc:h2:tcp://localhost/~/slickdemo"
- user = ""
- password = ""
- poolInitialSize = 5
- poolMaxSize = 7
- poolConnectionTimeoutMillis = 1000
- poolValidationQuery = "select 1 as one"
- poolFactoryName = "commons-dbcp2"
- }
- }
- db.mysql.driver = "com.mysql.cj.jdbc.Driver"
- db.mysql.url = "jdbc:mysql://localhost:3306/testdb"
- db.mysql.user = "root"
- db.mysql.password = "123"
- db.mysql.poolInitialSize = 5
- db.mysql.poolMaxSize = 7
- db.mysql.poolConnectionTimeoutMillis = 1000
- db.mysql.poolValidationQuery = "select 1 as one"
- db.mysql.poolFactoryName = "bonecp"
- # scalikejdbc Global settings
- scalikejdbc.global.loggingSQLAndTime.enabled = true
- scalikejdbc.global.loggingSQLAndTime.logLevel = info
- scalikejdbc.global.loggingSQLAndTime.warningEnabled = true
- scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000
- scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn
- scalikejdbc.global.loggingSQLAndTime.singleLineMode = false
- scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false
- scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10
- }
- dev {
- db {
- h2 {
- driver = "org.h2.Driver"
- url = "jdbc:h2:tcp://localhost/~/slickdemo"
- user = ""
- password = ""
- poolFactoryName = "hikaricp"
- numThreads = 10
- maxConnections = 12
- minConnections = 4
- keepAliveConnection = true
- }
- mysql {
- driver = "com.mysql.cj.jdbc.Driver"
- url = "jdbc:mysql://localhost:3306/testdb"
- user = "root"
- password = "123"
- poolInitialSize = 5
- poolMaxSize = 7
- poolConnectionTimeoutMillis = 1000
- poolValidationQuery = "select 1 as one"
- poolFactoryName = "bonecp"
- }
- postgres {
- driver = "org.postgresql.Driver"
- url = "jdbc:postgresql://localhost:5432/testdb"
- user = "root"
- password = "123"
- poolFactoryName = "hikaricp"
- numThreads = 10
- maxConnections = 12
- minConnections = 4
- keepAliveConnection = true
- }
- }
- # scalikejdbc Global settings
- scalikejdbc.global.loggingSQLAndTime.enabled = true
- scalikejdbc.global.loggingSQLAndTime.logLevel = info
- scalikejdbc.global.loggingSQLAndTime.warningEnabled = true
- scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000
- scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn
- scalikejdbc.global.loggingSQLAndTime.singleLineMode = false
- scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false
- scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10
- }
main/resources/logback.xml
- <?xml version="1.0" encoding="UTF-8"?>
- <configuration>
-
- <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
- <layout class="ch.qos.logback.classic.PatternLayout">
- <Pattern>
- %d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n
- </Pattern>
- </layout>
- </appender>
-
- <logger name="sdp.cql" level="info"
- additivity="false">
- <appender-ref ref="STDOUT" />
- </logger>
-
- <logger name="demo.sdp.grpc.cql" level="info"
- additivity="false">
- <appender-ref ref="STDOUT" />
- </logger>
-
- <root level="error">
- <appender-ref ref="STDOUT" />
- </root>
-
- </configuration>
main/protobuf/cql.proto
- syntax = "proto3";
- import "google/protobuf/wrappers.proto";
- import "google/protobuf/any.proto";
- import "scalapb/scalapb.proto";
- option (scalapb.options) = {
- // use a custom Scala package name
- // package_name: "io.ontherocks.introgrpc.demo"
- // don't append file name to package
- flat_package: true
-
- // generate one Scala file for all messages (services still get their own file)
- single_file: true
-
- // add imports to generated file
- // useful when extending traits or using custom types
- // import: "io.ontherocks.hellogrpc.RockingMessage"
- // code to put at the top of generated file
- // works only with `single_file: true`
- //preamble: "sealed trait SomeSealedTrait"
- };
- /*
- * Demos various customization options provided by ScalaPB.
- */
- package sdp.grpc.services;
- message ProtoDate {
- int32 yyyy = 1;
- int32 mm = 2;
- int32 dd = 3;
- }
- message ProtoTime {
- int32 hh = 1;
- int32 mm = 2;
- int32 ss = 3;
- int32 nnn = 4;
- }
- message ProtoDateTime {
- ProtoDate date = 1;
- ProtoTime time = 2;
- }
- message AQMRPTRow {
- int64 rowid = 1;
- string countyname = 2;
- string statename = 3;
- int64 measureid = 4;
- int32 reportyear = 5;
- int32 value = 6;
- ProtoDateTime created = 7;
- }
- message CQLResult {
- bytes result = 1;
- }
- message CQLQuery {
- string statement = 1;
- bytes parameters = 2;
- google.protobuf.Int32Value consistency = 3;
- google.protobuf.Int32Value fetchSize = 4;
- }
- message CQLUpdate {
- repeated string statements = 1;
- bytes parameters = 2;
- google.protobuf.Int32Value consistency = 3;
- google.protobuf.BoolValue batch = 4;
- }
- message HelloMsg {
- string hello = 1;
- }
- service CQLServices {
- rpc clientStreaming(stream HelloMsg) returns (stream HelloMsg) {}
- rpc transferRows(stream AQMRPTRow) returns (stream CQLResult) {}
- rpc runQuery(CQLQuery) returns (stream AQMRPTRow) {}
- rpc runDDL(CQLUpdate) returns (CQLResult) {}
- }
logging/log.scala
- package sdp.logging
- import org.slf4j.Logger
- /**
-  * Logger which just wraps org.slf4j.Logger internally.
-  *
-  * Every call is guarded twice: by the mutable `is*Enabled` switch of this wrapper and by
-  * the wrapped logger's own level check. Messages are by-name parameters, so a message is
-  * only built when the call is actually going to be logged.
-  *
-  * @param logger logger
-  */
- class Log(logger: Logger) {
-   // use var consciously to enable squeezing later
-   var isDebugEnabled: Boolean = logger.isDebugEnabled
-   var isInfoEnabled: Boolean = logger.isInfoEnabled
-   var isWarnEnabled: Boolean = logger.isWarnEnabled
-   var isErrorEnabled: Boolean = logger.isErrorEnabled
-   /**
-    * Logs `msg` at the level named by `level` ('debug, 'info, 'warn, 'error or the
-    * upper-case variants); any other symbol is ignored.
-    *
-    * @param e optional cause; when non-null it is handed to the logger together with `msg`
-    */
-   def withLevel(level: Symbol)(msg: => String, e: Throwable = null): Unit = {
-     val withCause = e != null
-     level match {
-       case 'debug | 'DEBUG => if (withCause) debug(msg, e) else debug(msg)
-       case 'info | 'INFO => if (withCause) info(msg, e) else info(msg)
-       case 'warn | 'WARN => if (withCause) warn(msg, e) else warn(msg)
-       case 'error | 'ERROR => if (withCause) error(msg, e) else error(msg)
-       case _ => // nothing to do
-     }
-   }
-   /** Logs `msg` at debug level when debug logging is enabled. */
-   def debug(msg: => String): Unit = {
-     if (isDebugEnabled && logger.isDebugEnabled) {
-       logger.debug(msg)
-     }
-   }
-   /** Logs `msg` and the throwable `e` at debug level when debug logging is enabled. */
-   def debug(msg: => String, e: Throwable): Unit = {
-     if (isDebugEnabled && logger.isDebugEnabled) {
-       logger.debug(msg, e)
-     }
-   }
-   /** Logs `msg` at info level when info logging is enabled. */
-   def info(msg: => String): Unit = {
-     if (isInfoEnabled && logger.isInfoEnabled) {
-       logger.info(msg)
-     }
-   }
-   /** Logs `msg` and the throwable `e` at info level when info logging is enabled. */
-   def info(msg: => String, e: Throwable): Unit = {
-     if (isInfoEnabled && logger.isInfoEnabled) {
-       logger.info(msg, e)
-     }
-   }
-   /** Logs `msg` at warn level when warn logging is enabled. */
-   def warn(msg: => String): Unit = {
-     if (isWarnEnabled && logger.isWarnEnabled) {
-       logger.warn(msg)
-     }
-   }
-   /** Logs `msg` and the throwable `e` at warn level when warn logging is enabled. */
-   def warn(msg: => String, e: Throwable): Unit = {
-     if (isWarnEnabled && logger.isWarnEnabled) {
-       logger.warn(msg, e)
-     }
-   }
-   /** Logs `msg` at error level when error logging is enabled. */
-   def error(msg: => String): Unit = {
-     if (isErrorEnabled && logger.isErrorEnabled) {
-       logger.error(msg)
-     }
-   }
-   /** Logs `msg` and the throwable `e` at error level when error logging is enabled. */
-   def error(msg: => String, e: Throwable): Unit = {
-     if (isErrorEnabled && logger.isErrorEnabled) {
-       logger.error(msg, e)
-     }
-   }
- }
logging/LogSupport.scala
- package sdp.logging
- import org.slf4j.LoggerFactory
- /** Mix-in that gives a class a `log` member wrapping an slf4j logger obtained for the concrete class. */
- trait LogSupport {
-   // the logger is looked up by the runtime class of the instance mixing this trait in
-   protected val log = new Log(LoggerFactory.getLogger(getClass))
- }
filestreaming/FileStreaming.scala
- package sdp.file
- import java.io.{ByteArrayInputStream, InputStream}
- import java.nio.ByteBuffer
- import java.nio.file.Paths
- import akka.stream.Materializer
- import akka.stream.scaladsl.{FileIO, StreamConverters}
- import akka.util._
- import scala.concurrent.Await
- import scala.concurrent.duration._
- /**
-  * Helpers that move whole files in and out of memory through akka-stream `FileIO`.
-  * The `FileTo*` functions block the caller until the file is read or `timeOut` expires;
-  * the `*ToFile` functions return the materialized value of the write without waiting for it.
-  */
- object Streaming {
-   // Reads the complete file into one ByteString, waiting at most `timeOut`.
-   private def readAll(fileName: String, timeOut: FiniteDuration)(
-     implicit mat: Materializer): ByteString = {
-     val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>
-       hd ++ bs
-     }
-     Await.result(fut, timeOut)
-   }
-   // Streams everything readable from `in` into the named file.
-   private def writeAll(in: InputStream, fileName: String)(implicit mat: Materializer) =
-     StreamConverters.fromInputStream(() => in).runWith(FileIO.toPath(Paths.get(fileName)))
-   /** Reads the file into a ByteBuffer. */
-   def FileToByteBuffer(fileName: String, timeOut: FiniteDuration = 60.seconds)(
-     implicit mat: Materializer):ByteBuffer =
-     readAll(fileName, timeOut).toByteBuffer
-   /** Reads the file into a byte array. */
-   def FileToByteArray(fileName: String, timeOut: FiniteDuration = 60.seconds)(
-     implicit mat: Materializer): Array[Byte] =
-     readAll(fileName, timeOut).toArray
-   /** Reads the file fully into memory and exposes the content as an InputStream. */
-   def FileToInputStream(fileName: String, timeOut: FiniteDuration = 60.seconds)(
-     implicit mat: Materializer): InputStream =
-     new ByteArrayInputStream(readAll(fileName, timeOut).toArray)
-   /** Writes the remaining bytes of `byteBuf` to the file; reading them advances the buffer's position. */
-   def ByteBufferToFile(byteBuf: ByteBuffer, fileName: String)(
-     implicit mat: Materializer) = {
-     val ba = new Array[Byte](byteBuf.remaining())
-     byteBuf.get(ba,0,ba.length)
-     writeAll(new ByteArrayInputStream(ba), fileName)
-   }
-   /** Writes the byte array to the file. */
-   def ByteArrayToFile(bytes: Array[Byte], fileName: String)(
-     implicit mat: Materializer) =
-     writeAll(new ByteArrayInputStream(bytes), fileName)
-   /** Writes everything readable from `is` to the file. */
-   def InputStreamToFile(is: InputStream, fileName: String)(
-     implicit mat: Materializer) =
-     writeAll(is, fileName)
- }
jdbc/JDBCConfig.scala
- package sdp.jdbc.config
- import scala.collection.mutable
- import scala.concurrent.duration.Duration
- import scala.language.implicitConversions
- import com.typesafe.config._
- import java.util.concurrent.TimeUnit
- import java.util.Properties
- import scalikejdbc.config._
- import com.typesafe.config.Config
- import com.zaxxer.hikari._
- import scalikejdbc.ConnectionPoolFactoryRepository
- /** Extension methods to make Typesafe Config easier to use */
- class ConfigExtensionMethods(val c: Config) extends AnyVal {
- import scala.collection.JavaConverters._
- def getBooleanOr(path: String, default: => Boolean = false) = if(c.hasPath(path)) c.getBoolean(path) else default
- def getIntOr(path: String, default: => Int = 0) = if(c.hasPath(path)) c.getInt(path) else default
- def getStringOr(path: String, default: => String = null) = if(c.hasPath(path)) c.getString(path) else default
- def getConfigOr(path: String, default: => Config = ConfigFactory.empty()) = if(c.hasPath(path)) c.getConfig(path) else default
- def getMillisecondsOr(path: String, default: => Long = 0L) = if(c.hasPath(path)) c.getDuration(path, TimeUnit.MILLISECONDS) else default
- def getDurationOr(path: String, default: => Duration = Duration.Zero) =
- if(c.hasPath(path)) Duration(c.getDuration(path, TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS) else default
- def getPropertiesOr(path: String, default: => Properties = null): Properties =
- if(c.hasPath(path)) new ConfigExtensionMethods(c.getConfig(path)).toProperties else default
- def toProperties: Properties = {
- def toProps(m: mutable.Map[String, ConfigValue]): Properties = {
- val props = new Properties(null)
- m.foreach { case (k, cv) =>
- val v =
- if(cv.valueType() == ConfigValueType.OBJECT) toProps(cv.asInstanceOf[ConfigObject].asScala)
- else if(cv.unwrapped eq null) null
- else cv.unwrapped.toString
- if(v ne null) props.put(k, v)
- }
- props
- }
- toProps(c.root.asScala)
- }
- def getBooleanOpt(path: String): Option[Boolean] = if(c.hasPath(path)) Some(c.getBoolean(path)) else None
- def getIntOpt(path: String): Option[Int] = if(c.hasPath(path)) Some(c.getInt(path)) else None
- def getStringOpt(path: String) = Option(getStringOr(path))
- def getPropertiesOpt(path: String) = Option(getPropertiesOr(path))
- }
- object ConfigExtensionMethods {
- @inline implicit def configExtensionMethods(c: Config): ConfigExtensionMethods = new ConfigExtensionMethods(c)
- }
- /**
-  * Reads HikariCP pool settings for a database from the Typesafe configuration subtree
-  * `<envPrefix>db.<dbName>`.
-  */
- trait HikariConfigReader extends TypesafeConfigReader {
-   self: TypesafeConfig => // with TypesafeConfigReader => //NoEnvPrefix =>
-   import ConfigExtensionMethods.configExtensionMethods
-   /** Name of the pool factory configured for `dbName`; falls back to the commons-dbcp factory name. */
-   def getFactoryName(dbName: Symbol): String = {
-     val c: Config = config.getConfig(envPrefix + "db." + dbName.name)
-     c.getStringOr("poolFactoryName", ConnectionPoolFactoryRepository.COMMONS_DBCP)
-   }
-   /** Builds a HikariConfig from the settings of `dbName`, applying the defaults written below for missing keys. */
-   def hikariCPConfig(dbName: Symbol): HikariConfig = {
-     val hconf = new HikariConfig()
-     val c: Config = config.getConfig(envPrefix + "db." + dbName.name)
-     // Connection settings
-     if (c.hasPath("dataSourceClass")) {
-       hconf.setDataSourceClassName(c.getString("dataSourceClass"))
-     } else {
-       // "driverClassName" wins over "driver"; nothing is set when neither key is present
-       Option(c.getStringOr("driverClassName", c.getStringOr("driver"))).foreach(hconf.setDriverClassName)
-     }
-     hconf.setJdbcUrl(c.getStringOr("url", null))
-     c.getStringOpt("user").foreach(hconf.setUsername)
-     c.getStringOpt("password").foreach(hconf.setPassword)
-     c.getPropertiesOpt("properties").foreach(hconf.setDataSourceProperties)
-     // Pool configuration
-     hconf.setConnectionTimeout(c.getMillisecondsOr("connectionTimeout", 1000))
-     hconf.setValidationTimeout(c.getMillisecondsOr("validationTimeout", 1000))
-     hconf.setIdleTimeout(c.getMillisecondsOr("idleTimeout", 600000))
-     hconf.setMaxLifetime(c.getMillisecondsOr("maxLifetime", 1800000))
-     hconf.setLeakDetectionThreshold(c.getMillisecondsOr("leakDetectionThreshold", 0))
-     hconf.setInitializationFailFast(c.getBooleanOr("initializationFailFast", false))
-     c.getStringOpt("connectionTestQuery").foreach(hconf.setConnectionTestQuery)
-     c.getStringOpt("connectionInitSql").foreach(hconf.setConnectionInitSql)
-     // pool size defaults are derived from numThreads
-     val numThreads = c.getIntOr("numThreads", 20)
-     hconf.setMaximumPoolSize(c.getIntOr("maxConnections", numThreads * 5))
-     hconf.setMinimumIdle(c.getIntOr("minConnections", numThreads))
-     hconf.setPoolName(c.getStringOr("poolName", dbName.name))
-     hconf.setRegisterMbeans(c.getBooleanOr("registerMbeans", false))
-     // Equivalent of ConnectionPreparer
-     hconf.setReadOnly(c.getBooleanOr("readOnly", false))
-     c.getStringOpt("isolation").map("TRANSACTION_" + _).foreach(hconf.setTransactionIsolation)
-     hconf.setCatalog(c.getStringOr("catalog", null))
-     hconf
-   }
- }
- import scalikejdbc._
- /**
-  * Registers scalikejdbc connection pools from the Typesafe configuration.
-  * A database whose pool factory name is "hikaricp" gets a HikariCP-backed pool; every
-  * other database is registered from the JDBC and pool settings read by scalikejdbc-config.
-  */
- trait ConfigDBs {
-   self: TypesafeConfigReader with TypesafeConfig with HikariConfigReader =>
-   /** Creates and registers the connection pool for `dbName`. */
-   def setup(dbName: Symbol = ConnectionPool.DEFAULT_NAME): Unit = {
-     getFactoryName(dbName) match {
-       case "hikaricp" => {
-         val hconf = hikariCPConfig(dbName)
-         val hikariCPSource = new HikariDataSource(hconf)
-         // closing the registered pool closes the Hikari data source behind it
-         case class HikariDataSourceCloser(src: HikariDataSource) extends DataSourceCloser {
-           override def close(): Unit = src.close()
-         }
-         Option(hconf.getDriverClassName).filter(_.trim.nonEmpty).foreach(name => Class.forName(name))
-         ConnectionPool.add(dbName, new DataSourceConnectionPool(dataSource = hikariCPSource,settings = DataSourceConnectionPoolSettings(),
-           closer = HikariDataSourceCloser(hikariCPSource)))
-       }
-       case _ => {
-         val JDBCSettings(url, user, password, driver) = readJDBCSettings(dbName)
-         val cpSettings = readConnectionPoolSettings(dbName)
-         Option(driver).filter(_.trim.nonEmpty).foreach(name => Class.forName(name))
-         ConnectionPool.add(dbName, url, user, password, cpSettings)
-       }
-     }
-   }
-   /** Loads the global settings, then sets up a pool for every configured database name. */
-   def setupAll(): Unit = {
-     loadGlobalSettings()
-     dbNames.foreach { dbName => setup(Symbol(dbName)) }
-   }
-   /** Closes the pool registered under `dbName`. */
-   def close(dbName: Symbol = ConnectionPool.DEFAULT_NAME): Unit = {
-     ConnectionPool.close(dbName)
-   }
-   /** Closes every registered pool. */
-   def closeAll(): Unit = {
-     ConnectionPool.closeAll
-   }
- }
- object ConfigDBs extends ConfigDBs
- with TypesafeConfigReader
- with StandardTypesafeConfig
- with HikariConfigReader
- case class ConfigDBsWithEnv(envValue: String) extends ConfigDBs
- with TypesafeConfigReader
- with StandardTypesafeConfig
- with HikariConfigReader
- with EnvPrefix {
- override val env = Option(envValue)
- }
jdbc/JDBCEngine.scala
- package sdp.jdbc.engine
- import java.sql.PreparedStatement
- import scala.collection.generic.CanBuildFrom
- import akka.stream.scaladsl._
- import scalikejdbc._
- import scalikejdbc.streams._
- import akka.NotUsed
- import akka.stream._
- import java.time._
- import scala.concurrent.duration._
- import scala.concurrent._
- import sdp.file.Streaming._
- import scalikejdbc.TxBoundary.Try._
- import scala.concurrent.ExecutionContextExecutor
- import java.io.InputStream
- import sdp.logging.LogSupport
- object JDBCContext {
- type SQLTYPE = Int
- val SQL_EXEDDL= 1
- val SQL_UPDATE = 2
- val RETURN_GENERATED_KEYVALUE = true
- val RETURN_UPDATED_COUNT = false
- }
- case class JDBCQueryContext[M](
- dbName: Symbol,
- statement: String,
- parameters: Seq[Any] = Nil,
- fetchSize: Int = 100,
- autoCommit: Boolean = false,
- queryTimeout: Option[Int] = None)
- case class JDBCContext (
- dbName: Symbol,
- statements: Seq[String] = Nil,
- parameters: Seq[Seq[Any]] = Nil,
- fetchSize: Int = 100,
- queryTimeout: Option[Int] = None,
- queryTags: Seq[String] = Nil,
- sqlType: JDBCContext.SQLTYPE = JDBCContext.SQL_UPDATE,
- batch: Boolean = false,
- returnGeneratedKey: Seq[Option[Any]] = Nil,
- // no return: None, return by index: Some(1), by name: Some("id")
- preAction: Option[PreparedStatement => Unit] = None,
- postAction: Option[PreparedStatement => Unit] = None)
- extends LogSupport {
- ctx =>
-
- //helper functions
- def appendTag(tag: String): JDBCContext = ctx.copy(queryTags = ctx.queryTags :+ tag)
- def appendTags(tags: Seq[String]): JDBCContext = ctx.copy(queryTags = ctx.queryTags ++ tags)
- def setFetchSize(size: Int): JDBCContext = ctx.copy(fetchSize = size)
- def setQueryTimeout(time: Option[Int]): JDBCContext = ctx.copy(queryTimeout = time)
- def setPreAction(action: Option[PreparedStatement => Unit]): JDBCContext = {
- if (ctx.sqlType == JDBCContext.SQL_UPDATE &&
- !ctx.batch && ctx.statements.size == 1) {
- val nc = ctx.copy(preAction = action)
- log.info("setPreAction> set")
- nc
- }
- else {
- log.info("setPreAction> JDBCContex setting error: preAction not supported!")
- throw new IllegalStateException("JDBCContex setting error: preAction not supported!")
- }
- }
- /**
-  * Returns a copy of this context with `postAction` replaced by `action`.
-  * Only allowed on a non-batch SQL_UPDATE context holding exactly one statement.
-  *
-  * @throws IllegalStateException when the context does not meet that condition
-  */
- def setPostAction(action: Option[PreparedStatement => Unit]): JDBCContext = {
-   if (ctx.sqlType == JDBCContext.SQL_UPDATE &&
-     !ctx.batch && ctx.statements.size == 1) {
-     val nc = ctx.copy(postAction = action)
-     log.info("setPostAction> set")
-     nc
-   }
-   else {
-     // the log prefix names this method (it used to say "setPreAction>")
-     log.info("setPostAction> JDBCContex setting error: postAction not supported!")
-     throw new IllegalStateException("JDBCContex setting error: postAction not supported!")
-   }
- }
- /**
-  * Returns a copy of this context with one more DDL statement and its parameter list appended.
-  * Only allowed when the context's sqlType is SQL_EXEDDL.
-  *
-  * @param _statement  the DDL statement to append
-  * @param _parameters the parameters of that statement
-  * @throws IllegalStateException when the context is not a DDL context
-  */
- def appendDDLCommand(_statement: String, _parameters: Any*): JDBCContext = {
-   if (ctx.sqlType == JDBCContext.SQL_EXEDDL) {
-     log.info(s"appendDDLCommand> appending: statement: ${_statement}, parameters: ${_parameters}")
-     val nc = ctx.copy(
-       statements = ctx.statements ++ Seq(_statement),
-       // one parameter list per statement: append the varargs as a single entry
-       // (wrapping it in a second Seq stored the whole list as one parameter)
-       parameters = ctx.parameters ++ Seq(_parameters)
-     )
-     log.info(s"appendDDLCommand> appended: statement: ${nc.statements}, parameters: ${nc.parameters}")
-     nc
-   } else {
-     log.info(s"appendDDLCommand> JDBCContex setting error: option not supported!")
-     throw new IllegalStateException("JDBCContex setting error: option not supported!")
-   }
- }
- def appendUpdateCommand(_returnGeneratedKey: Boolean, _statement: String,_parameters: Any*): JDBCContext = {
- if (ctx.sqlType == JDBCContext.SQL_UPDATE && !ctx.batch) {
- log.info(s"appendUpdateCommand> appending: returnGeneratedKey: ${_returnGeneratedKey}, statement: ${_statement}, parameters: ${_parameters}")
- val nc = ctx.copy(
- statements = ctx.statements ++ Seq(_statement),
- parameters = ctx.parameters ++ Seq(_parameters),
- returnGeneratedKey = ctx.returnGeneratedKey ++ (if (_returnGeneratedKey) Seq(Some(1)) else Seq(None))
- )
- log.info(s"appendUpdateCommand> appended: statement: ${nc.statements}, parameters: ${nc.parameters}")
- nc
- } else {
- log.info(s"appendUpdateCommand> JDBCContex setting error: option not supported!")
- throw new IllegalStateException("JDBCContex setting error: option not supported!")
- }
- }
- /**
-  * Returns a copy of this context with one more parameter set appended for the batch statement.
-  * Only allowed on a SQL_UPDATE context with `batch = true`, and the new set must have the
-  * same number of parameters as the first set already stored (if any).
-  *
-  * @throws IllegalStateException when the context is not a batch update or the sizes differ
-  */
- def appendBatchParameters(_parameters: Any*): JDBCContext = {
-   log.info(s"appendBatchParameters> appending: parameters: ${_parameters}")
-   if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch) {
-     log.info(s"appendBatchParameters> JDBCContex setting error: batch parameters only supported for SQL_UPDATE and batch = true!")
-     throw new IllegalStateException("JDBCContex setting error: batch parameters only supported for SQL_UPDATE and batch = true!")
-   }
-   // with no parameter set stored yet, any size is accepted
-   val matchParams = ctx.parameters.headOption.forall(_.size == _parameters.size)
-   if (matchParams) {
-     val nc = ctx.copy(
-       parameters = ctx.parameters ++ Seq(_parameters)
-     )
-     log.info(s"appendBatchParameters> appended: statement: ${nc.statements}, parameters: ${nc.parameters}")
-     nc
-   } else {
-     log.info(s"appendBatchParameters> JDBCContex setting error: batch command parameters not match!")
-     throw new IllegalStateException("JDBCContex setting error: batch command parameters not match!")
-   }
- }
- // Switches generated-key return on or off for a batch SQL_UPDATE context.
- // Throws IllegalStateException for any other kind of context.
- def setBatchReturnGeneratedKeyOption(returnKey: Boolean): JDBCContext = {
-   val batchUpdate = ctx.sqlType == JDBCContext.SQL_UPDATE && ctx.batch
-   if (!batchUpdate)
-     throw new IllegalStateException("JDBCContex setting error: only supported in batch update commands!")
-   val keyOption = if (returnKey) Seq(Some(1)) else Nil
-   ctx.copy(returnGeneratedKey = keyOption)
- }
- // Turns the context into a single-statement, non-batch SQL_EXEDDL context.
- // Statements and parameters already held by the context are replaced.
- def setDDLCommand(_statement: String, _parameters: Any*): JDBCContext = {
-   log.info(s"setDDLCommand> setting: statement: ${_statement}, parameters: ${_parameters}")
-   val ddlContext = ctx.copy(
-     sqlType = JDBCContext.SQL_EXEDDL,
-     batch = false,
-     statements = Seq(_statement),
-     parameters = Seq(_parameters)
-   )
-   log.info(s"setDDLCommand> set: statement: ${ddlContext.statements}, parameters: ${ddlContext.parameters}")
-   ddlContext
- }
- // Turns the context into a single-statement, non-batch SQL_UPDATE context.
- // _returnGeneratedKey selects whether a generated key is requested for the statement.
- def setUpdateCommand(_returnGeneratedKey: Boolean, _statement: String,_parameters: Any*): JDBCContext = {
-   log.info(s"setUpdateCommand> setting: returnGeneratedKey: ${_returnGeneratedKey}, statement: ${_statement}, parameters: ${_parameters}")
-   val keyOption = if (_returnGeneratedKey) Seq(Some(1)) else Seq(None)
-   val updateContext = ctx.copy(
-     sqlType = JDBCContext.SQL_UPDATE,
-     batch = false,
-     statements = Seq(_statement),
-     parameters = Seq(_parameters),
-     returnGeneratedKey = keyOption
-   )
-   log.info(s"setUpdateCommand> set: statement: ${updateContext.statements}, parameters: ${updateContext.parameters}")
-   updateContext
- }
- // Turns the context into a batch SQL_UPDATE context built around one statement.
- // Parameter sets are not touched here; they are added with appendBatchParameters.
- def setBatchCommand(_statement: String): JDBCContext = {
-   log.info(s"setBatchCommand> appending: statement: ${_statement}")
-   val batchContext = ctx.copy(
-     batch = true,
-     sqlType = JDBCContext.SQL_UPDATE,
-     statements = Seq(_statement)
-   )
-   log.info(s"setBatchCommand> set: statement: ${batchContext.statements}, parameters: ${batchContext.parameters}")
-   batchContext
- }
- }
- object JDBCEngine extends LogSupport {
- import JDBCContext._
- // Aliases for the java.time types used to carry JDBC temporal values.
- type JDBCDate = LocalDate
- type JDBCDateTime = LocalDateTime
- type JDBCTime = LocalTime
- // Builds a date from year, month and day-of-month.
- def jdbcSetDate(yyyy: Int, mm: Int, dd: Int): JDBCDate =
-   LocalDate.of(yyyy, mm, dd)
- // Builds a time of day from hour, minute, second and nano-of-second.
- def jdbcSetTime(hh: Int, mm: Int, ss: Int, nn: Int): JDBCTime =
-   LocalTime.of(hh, mm, ss, nn)
- // Combines a date and a time of day.
- def jdbcSetDateTime(date: JDBCDate, time: JDBCTime): JDBCDateTime =
-   LocalDateTime.of(date, time)
- // Current date-time taken from the system clock.
- def jdbcSetNow: JDBCDateTime =
-   LocalDateTime.now()
- // Conversions from the java.sql temporal types to their java.time counterparts.
- def jdbcGetDate(sqlDate: java.sql.Date): java.time.LocalDate =
-   sqlDate.toLocalDate
- def jdbcGetTime(sqlTime: java.sql.Time): java.time.LocalTime =
-   sqlTime.toLocalTime
- def jdbcGetTimestamp(sqlTimestamp: java.sql.Timestamp): java.time.LocalDateTime =
-   sqlTimestamp.toLocalDateTime
- // Blob values travel as plain input streams.
- type JDBCBlob = InputStream
- // Delegates to FileToInputStream to expose a file as a blob stream; timeOut is passed through.
- def fileToJDBCBlob(fileName: String, timeOut: FiniteDuration = 60.seconds)(
-   implicit mat: Materializer) =
-   FileToInputStream(fileName, timeOut)
- // Delegates to InputStreamToFile to write a blob stream to the named file.
- def jdbcBlobToFile(blob: JDBCBlob, fileName: String)(
-   implicit mat: Materializer) =
-   InputStreamToFile(blob, fileName)
- // Placeholder extractor handed to SQLToCollectionImpl before the real extractor is mapped in;
- // invoking it always throws IllegalStateException carrying `message`.
- private def noExtractor(message: String): WrappedResultSet => Nothing =
-   (_: WrappedResultSet) => throw new IllegalStateException(message)
- // Exposes the rows of a query as an Akka Streams Source.
- // The statement, parameters, timeout, auto-commit flag and fetch size all come from `ctx`;
- // `extractor` turns each result-set row into an A.
- def jdbcAkkaStream[A](ctx: JDBCQueryContext[A],extractor: WrappedResultSet => A)
-   (implicit ec: ExecutionContextExecutor): Source[A,NotUsed] = {
-   val rowPublisher: DatabasePublisher[A] = NamedDB(ctx.dbName).readOnlyStream {
-     val unmapped = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor(""))
-     ctx.queryTimeout.foreach(limit => unmapped.queryTimeout(limit))
-     val mapped: SQL[A, HasExtractor] = unmapped.map(extractor)
-     mapped.iterator.withDBSessionForceAdjuster { session =>
-       session.connection.setAutoCommit(ctx.autoCommit)
-       session.fetchSize(ctx.fetchSize)
-     }
-   }
-   log.info(s"jdbcAkkaStream> Source: db: ${ctx.dbName}, statement: ${ctx.statement}, parameters: ${ctx.parameters}")
-   Source.fromPublisher[A](rowPublisher)
- }
- // Runs the query described by `ctx` and collects all rows into a C[A].
- // `extractor` converts each result-set row into an A.
- // Any Exception raised while running is logged and rethrown as a RuntimeException.
- def jdbcQueryResult[C[_] <: TraversableOnce[_], A](ctx: JDBCQueryContext[A],
-   extractor: WrappedResultSet => A)(
-   implicit cbf: CanBuildFrom[Nothing, A, C[A]]): C[A] = {
-   val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor(""))
-   ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
-   rawSql.fetchSize(ctx.fetchSize)
-   try {
-     implicit val session = NamedAutoSession(ctx.dbName)
-     log.info(s"jdbcQueryResult> Source: db: ${ctx.dbName}, statement: ${ctx.statement}, parameters: ${ctx.parameters}")
-     val sql: SQL[A, HasExtractor] = rawSql.map(extractor)
-     sql.collection.apply[C]()
-   } catch {
-     case e: Exception =>
-       log.error(s"jdbcQueryResult> runtime error: ${e.getMessage}")
-       // Keep the original exception as the cause so its type and stack trace are not lost.
-       throw new RuntimeException(s"jdbcQueryResult> Error: ${e.getMessage}", e)
-   }
- }
- // Executes every statement of a SQL_EXEDDL context inside one local transaction.
- // The statements run without bound parameters (parameters = Nil).
- // Yields a confirmation text, or a failed Future when sqlType is not SQL_EXEDDL.
- def jdbcExecuteDDL(ctx: JDBCContext)(implicit ec: ExecutionContextExecutor): Future[String] = {
-   if (ctx.sqlType == SQL_EXEDDL) {
-     log.info(s"jdbcExecuteDDL> Source: db: ${ctx.dbName}, statement: ${ctx.statements}, parameters: ${ctx.parameters}")
-     Future {
-       NamedDB(ctx.dbName) localTx { implicit session =>
-         for (stm <- ctx.statements) {
-           // no-op hooks before and after execution
-           val ddl = new SQLExecution(statement = stm, parameters = Nil)(
-             before = _ => ())(
-             after = _ => ())
-           ddl.apply()
-         }
-         "SQL_EXEDDL executed succesfully."
-       }
-     }
-   } else {
-     log.info(s"jdbcExecuteDDL> JDBCContex setting error: sqlType must be 'SQL_EXEDDL'!")
-     Future.failed(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_EXEDDL'!"))
-   }
- }
- // Runs the first statement of a batch SQL_UPDATE context once per stored parameter set,
- // inside one local transaction.
- // Without a generated-key request the result is an empty C[Long]; with one, the result is
- // whatever SQLBatchWithGeneratedKey returns.
- // Yields a failed Future (IllegalStateException) when the context has no statements,
- // is not SQL_UPDATE, or is not flagged as batch.
- def jdbcBatchUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
-   implicit ec: ExecutionContextExecutor,
-   cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
-   if (ctx.statements == Nil) {
-     log.info(s"jdbcBatchUpdate> JDBCContex setting error: statements empty!")
-     Future.failed(new IllegalStateException("JDBCContex setting error: statements empty!"))
-   }
-   // `else if` is essential: this check used to start a fresh `if`, so the failed Future
-   // above was discarded and an empty context went on to call ctx.statements.head.
-   else if (ctx.sqlType != SQL_UPDATE) {
-     log.info(s"jdbcBatchUpdate> JDBCContex setting error: sqlType must be 'SQL_UPDATE'!")
-     Future.failed(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_UPDATE'!"))
-   }
-   else if (!ctx.batch) {
-     log.info(s"jdbcBatchUpdate> JDBCContex setting error: must set batch = true !")
-     Future.failed(new IllegalStateException("JDBCContex setting error: must set batch = true !"))
-   }
-   else if (noReturnKey(ctx)) {
-     log.info(s"jdbcBatchUpdate> batch updating no return: db: ${ctx.dbName}, statement: ${ctx.statements}, parameters: ${ctx.parameters}")
-     val usql = SQL(ctx.statements.head)
-       .tags(ctx.queryTags: _*)
-       .batch(ctx.parameters: _*)
-     Future {
-       NamedDB(ctx.dbName) localTx { implicit session =>
-         ctx.queryTimeout.foreach(session.queryTimeout(_))
-         usql.apply[Seq]()
-         // the batch result is dropped on purpose: no keys were requested
-         Seq.empty[Long].to[C]
-       }
-     }
-   }
-   else {
-     log.info(s"jdbcBatchUpdate> batch updating return genkey: db: ${ctx.dbName}, statement: ${ctx.statements}, parameters: ${ctx.parameters}")
-     val usql = new SQLBatchWithGeneratedKey(ctx.statements.head, ctx.parameters, ctx.queryTags)(None)
-     Future {
-       NamedDB(ctx.dbName) localTx { implicit session =>
-         ctx.queryTimeout.foreach(session.queryTimeout(_))
-         usql.apply[C]()
-       }
-     }
-   }
- }
- // Runs the single statement of `ctx` in a local transaction and returns its generated key
- // wrapped in a one-element C[Long].
- // Expects ctx.returnGeneratedKey to start with a Some(key).
- private def singleTxUpdateWithReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
-   implicit ec: ExecutionContextExecutor,
-   cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
-   val Some(key) :: _ = ctx.returnGeneratedKey
-   // first parameter set, or no parameters at all
-   val params: Seq[Any] = ctx.parameters.headOption.getOrElse(Nil)
-   log.info(s"singleTxUpdateWithReturnKey> updating: db: ${ctx.dbName}, statement: ${ctx.statements}, parameters: ${ctx.parameters}")
-   val usql = new SQLUpdateWithGeneratedKey(ctx.statements.head, params, ctx.queryTags)(key)
-   Future {
-     NamedDB(ctx.dbName) localTx { implicit session =>
-       session.fetchSize(ctx.fetchSize)
-       ctx.queryTimeout.foreach(limit => session.queryTimeout(limit))
-       val generated = usql.apply()
-       Seq(generated).to[C]
-     }
-   }
- }
- // Runs the single statement of `ctx` in a local transaction and returns the value of
- // SQLUpdate.apply() as a one-element C[Long].
- // ctx.preAction / ctx.postAction are passed to SQLUpdate as its before / after hooks;
- // a missing hook is replaced by a no-op.
- private def singleTxUpdateNoReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
-   implicit ec: ExecutionContextExecutor,
-   cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
-   // first parameter set, or no parameters at all
-   val params: Seq[Any] = ctx.parameters.headOption.getOrElse(Nil)
-   val before: PreparedStatement => Unit =
-     ctx.preAction.getOrElse((_: PreparedStatement) => ())
-   val after: PreparedStatement => Unit =
-     ctx.postAction.getOrElse((_: PreparedStatement) => ())
-   log.info(s"singleTxUpdateNoReturnKey> updating: db: ${ctx.dbName}, statement: ${ctx.statements}, parameters: ${ctx.parameters}")
-   val usql = new SQLUpdate(ctx.statements.head,params,ctx.queryTags)(before)(after)
-   Future {
-     NamedDB(ctx.dbName) localTx { implicit session =>
-       session.fetchSize(ctx.fetchSize)
-       ctx.queryTimeout.foreach(limit => session.queryTimeout(limit))
-       val affected = usql.apply()
-       Seq(affected.toLong).to[C]
-     }
-   }
- }
- // Dispatches a single-statement update to the variant with or without generated-key return.
- private def singleTxUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
-   implicit ec: ExecutionContextExecutor,
-   cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
-   val keyRequested = !noReturnKey(ctx)
-   if (keyRequested) singleTxUpdateWithReturnKey[C](ctx)
-   else singleTxUpdateNoReturnKey[C](ctx)
- }
- // True when no generated key is requested: ctx.returnGeneratedKey is empty or its first
- // entry is None. Only the first entry is inspected.
- private def noReturnKey(ctx: JDBCContext): Boolean =
-   // headOption replaces the former `val k :: xs = ...` pattern. `::` matches List only,
-   // so a returnGeneratedKey held in any other Seq implementation raised a MatchError.
-   ctx.returnGeneratedKey.headOption.forall(_.isEmpty)
- // No-op statement hook, used where SQLUpdate needs a before/after action.
- def noActon: PreparedStatement => Unit = (_: PreparedStatement) => ()
- // Runs all statements of `ctx` in order inside one local transaction.
- // Each statement is paired by position with its parameter set and generated-key option.
- // The result holds one Long per statement: the generated key when one was requested,
- // otherwise the value of SQLUpdate.apply().
- def multiTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
-   implicit ec: ExecutionContextExecutor,
-   cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
-   val count = ctx.statements.size
-   // zip stops at its shortest operand. Without padding, a statement lacking a parameter
-   // set or a key option was silently dropped instead of being executed.
-   val params: Seq[Seq[Any]] = ctx.parameters.padTo(count, Nil)
-   val keys: Seq[Option[Any]] = ctx.returnGeneratedKey.padTo(count, None)
-   val sqlcmd = ctx.statements zip params zip keys
-   log.info(s"multiTxUpdates> updating: db: ${ctx.dbName}, SQL Commands: ${sqlcmd}")
-   Future {
-     NamedDB(ctx.dbName) localTx { implicit session =>
-       session.fetchSize(ctx.fetchSize)
-       ctx.queryTimeout.foreach(session.queryTimeout(_))
-       val results = sqlcmd.map { case ((stm, param), key) =>
-         key match {
-           case None =>
-             new SQLUpdate(stm, param, Nil)(noActon)(noActon).apply().toLong
-           case Some(k) =>
-             new SQLUpdateWithGeneratedKey(stm, param, Nil)(k).apply().toLong
-         }
-       }
-       results.to[C]
-     }
-   }
- }
- // Entry point for non-batch SQL_UPDATE contexts: one statement goes to singleTxUpdate,
- // several go to multiTxUpdates.
- // Yields a failed Future (IllegalStateException) when the context has no statements,
- // is not SQL_UPDATE, or is flagged as batch.
- def jdbcTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
-   implicit ec: ExecutionContextExecutor,
-   cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
-   if (ctx.statements == Nil) {
-     log.info(s"jdbcTxUpdates> JDBCContex setting error: statements empty!")
-     Future.failed(new IllegalStateException("JDBCContex setting error: statements empty!"))
-   }
-   // `else if` is essential: this check used to start a fresh `if`, so the failed Future
-   // above was discarded and an empty context was reported as a successful update.
-   else if (ctx.sqlType != SQL_UPDATE) {
-     log.info(s"jdbcTxUpdates> JDBCContex setting error: sqlType must be 'SQL_UPDATE'!")
-     Future.failed(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_UPDATE'!"))
-   }
-   else if (ctx.batch) {
-     log.info(s"jdbcTxUpdates> JDBCContex setting error: must set batch = false !")
-     Future.failed(new IllegalStateException("JDBCContex setting error: must set batch = false !"))
-   }
-   else if (ctx.statements.size == 1)
-     singleTxUpdate(ctx)
-   else
-     multiTxUpdates(ctx)
- }
- // Stream stage description: executes `statement` once per element flowing through,
- // binding the values that `prepareParams` derives from the element, then passes the
- // element on unchanged.
- case class JDBCActionStream[R](dbName: Symbol, parallelism: Int = 1, processInOrder: Boolean = true,
-   statement: String, prepareParams: R => Seq[Any]) extends LogSupport {
-   jas =>
-   // Copy-style setters.
-   def setDBName(db: Symbol): JDBCActionStream[R] =
-     jas.copy(dbName = db)
-   def setParallelism(parLevel: Int): JDBCActionStream[R] =
-     jas.copy(parallelism = parLevel)
-   def setProcessOrder(ordered: Boolean): JDBCActionStream[R] =
-     jas.copy(processInOrder = ordered)
-   // Executes the statement for one element in auto-commit mode and yields the element.
-   private def perform(r: R)(implicit ec: ExecutionContextExecutor) = {
-     import scala.concurrent._
-     val boundValues = prepareParams(r)
-     log.info(s"JDBCActionStream.perform> db: ${dbName}, statement: ${statement}, parameters: ${boundValues}")
-     Future {
-       NamedDB(dbName).autoCommit(session => session.execute(statement, boundValues: _*))
-       r
-     }
-   }
-   // Flow running `perform` with the configured parallelism; output order follows input
-   // order only when processInOrder is set.
-   def performOnRow(implicit ec: ExecutionContextExecutor): Flow[R, R, NotUsed] = {
-     val stage = Flow[R]
-     if (processInOrder) stage.mapAsync(parallelism)(perform)
-     else stage.mapAsyncUnordered(parallelism)(perform)
-   }
- }
- // Short factory: the caller supplies database, statement and parameter builder;
- // parallelism and ordering keep the case-class defaults.
- object JDBCActionStream {
-   def apply[R](_dbName: Symbol, _statement: String, params: R => Seq[Any]): JDBCActionStream[R] =
-     new JDBCActionStream[R](
-       dbName = _dbName,
-       statement = _statement,
-       prepareParams = params
-     )
- }
- }
cql/CassandraEngine.scala
- package sdp.cql.engine
- import akka.NotUsed
- import akka.stream.alpakka.cassandra.scaladsl._
- import akka.stream.scaladsl._
- import com.datastax.driver.core._
- import com.google.common.util.concurrent.{FutureCallback, Futures, ListenableFuture}
- import scala.collection.JavaConverters._
- import scala.collection.generic.CanBuildFrom
- import scala.concurrent._
- import scala.concurrent.duration.Duration
- import sdp.logging.LogSupport
- // Companion of CQLContext: integer codes for the consistency levels, and the mapping
- // from those codes to the driver's ConsistencyLevel enum.
- object CQLContext {
-   // Consistency Levels
-   type CONSISTENCY_LEVEL = Int
-   val ANY: CONSISTENCY_LEVEL = 0x0000
-   val ONE: CONSISTENCY_LEVEL = 0x0001
-   val TWO: CONSISTENCY_LEVEL = 0x0002
-   val THREE: CONSISTENCY_LEVEL = 0x0003
-   val QUORUM : CONSISTENCY_LEVEL = 0x0004
-   val ALL: CONSISTENCY_LEVEL = 0x0005
-   val LOCAL_QUORUM: CONSISTENCY_LEVEL = 0x0006
-   val EACH_QUORUM: CONSISTENCY_LEVEL = 0x0007
-   val LOCAL_ONE: CONSISTENCY_LEVEL = 0x000A
-   val LOCAL_SERIAL: CONSISTENCY_LEVEL = 0x000B
-   val SERIAL: CONSISTENCY_LEVEL = 0x000C
-   // Empty context: no statements, all other fields at their defaults.
-   def apply(): CQLContext = CQLContext(statements = Nil)
-   // Translates a level code into the driver enum.
-   // A code that is not one of the constants above raises a MatchError.
-   def consistencyLevel: CONSISTENCY_LEVEL => ConsistencyLevel = consistency => {
-     consistency match {
-       case ALL => ConsistencyLevel.ALL
-       case ONE => ConsistencyLevel.ONE
-       case TWO => ConsistencyLevel.TWO
-       case THREE => ConsistencyLevel.THREE
-       case ANY => ConsistencyLevel.ANY
-       case EACH_QUORUM => ConsistencyLevel.EACH_QUORUM
-       case LOCAL_ONE => ConsistencyLevel.LOCAL_ONE
-       case QUORUM => ConsistencyLevel.QUORUM
-       // LOCAL_QUORUM is declared above but had no case here, so selecting it failed
-       // with a MatchError at execution time.
-       case LOCAL_QUORUM => ConsistencyLevel.LOCAL_QUORUM
-       case SERIAL => ConsistencyLevel.SERIAL
-       case LOCAL_SERIAL => ConsistencyLevel.LOCAL_SERIAL
-     }
-   }
- }
- // Immutable description of one CQL query: statement text, positional parameters,
- // optional consistency level code and fetch size. Setters return modified copies.
- case class CQLQueryContext(
-   statement: String,
-   parameter: Seq[Object] = Nil,
-   consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None,
-   fetchSize: Int = 100
- ) {
-   def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLQueryContext =
-     copy(consistency = Some(_consistency))
-   def setFetchSize(pageSize: Int): CQLQueryContext =
-     copy(fetchSize = pageSize)
-   def setParameters(param: Seq[Object]): CQLQueryContext =
-     copy(parameter = param)
- }
- // Two-argument factory; consistency and fetch size keep the case-class defaults.
- object CQLQueryContext {
-   def apply[M](stmt: String, param: Seq[Object]): CQLQueryContext =
-     new CQLQueryContext(
-       statement = stmt,
-       parameter = param
-     )
- }
- // Immutable description of one or more CQL update statements: statement texts, one
- // parameter set per entry, optional consistency level code and a batch flag.
- // All modifiers return a new copy.
- case class CQLContext(
-   statements: Seq[String],
-   parameters: Seq[Seq[Object]] = Nil,
-   consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None,
-   batch: Boolean = false
- ) extends LogSupport {
-   def setBatch(bat: Boolean) =
-     copy(batch = bat)
-   def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLContext =
-     copy(consistency = Some(_consistency))
-   // Replaces all statements and parameters with a single command.
-   def setCommand(_statement: String, _parameters: Object*): CQLContext = {
-     log.info(s"setCommand> setting: statement: ${_statement}, parameters: ${_parameters}")
-     val replaced = copy(statements = Seq(_statement), parameters = Seq(_parameters))
-     log.info(s"setCommand> set: statements: ${replaced.statements}, parameters: ${replaced.parameters}")
-     replaced
-   }
-   // Adds one more command after the existing ones.
-   def appendCommand(_statement: String, _parameters: Object*): CQLContext = {
-     log.info(s"appendCommand> appending: statement: ${_statement}, parameters: ${_parameters}")
-     val extended = copy(
-       statements = statements :+ _statement,
-       parameters = parameters ++ Seq(_parameters)
-     )
-     log.info(s"appendCommand> appended: statements: ${extended.statements}, parameters: ${extended.parameters}")
-     extended
-   }
- }
- object CQLEngine extends LogSupport {
- import CQLContext._
- import CQLHelpers._
- import cats._, cats.data._, cats.implicits._
- import scala.concurrent.{Await, Future}
- import scala.concurrent.duration._
- // Prepares and executes the query in `ctx` with `pageSize` as the statement fetch size.
- // Returns the driver ResultSet together with its rows mapped through `extractor` into a C[A].
- def fetchResultPage[C[_] <: TraversableOnce[_],A](ctx: CQLQueryContext, pageSize: Int = 100
-   ,extractor: Row => A)(
-   implicit session: Session, cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet, C[A])= {
-   val prepStmt = session.prepare(ctx.statement)
-   val hasParams = ctx.parameter.nonEmpty
-   // parameters are converted and bound only when some were supplied
-   val params: Seq[Object] = if (hasParams) processParameters(ctx.parameter) else Nil
-   val boundStmt = if (hasParams) prepStmt.bind(params: _*) else prepStmt.bind()
-   log.info(s"fetchResultPage> statement: ${prepStmt.getQueryString}, parameters: ${params}")
-   ctx.consistency.foreach(level => boundStmt.setConsistencyLevel(consistencyLevel(level)))
-   val resultSet = session.execute(boundStmt.setFetchSize(pageSize))
-   val rows = resultSet.asScala.view.map(extractor).to[C]
-   (resultSet, rows)
- }
- // Fetches further results for a result set that is not fully fetched yet, waiting at
- // most `timeOut`.
- // Returns (resultSet, None) when everything was already fetched or when fetching failed;
- // otherwise the ResultSet delivered by fetchMoreResults plus its rows mapped through
- // `extractor`.
- def fetchMorePages[C[_] <: TraversableOnce[_],A](resultSet: ResultSet, timeOut: Duration)(
-   extractor: Row => A)(implicit cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet,Option[C[A]]) =
-   if (resultSet.isFullyFetched) {
-     (resultSet, None)
-   } else {
-     try {
-       val result = Await.result(resultSet.fetchMoreResults(), timeOut)
-       (result, Some((result.asScala.view.map(extractor)).to[C]))
-     } catch {
-       // Only non-fatal failures (timeouts, driver errors) fall back to "no more rows",
-       // and they are logged. The former `case e: Throwable` also swallowed fatal errors
-       // and left no trace of why paging stopped.
-       case scala.util.control.NonFatal(e) =>
-         log.warn(s"fetchMorePages> fetching more results failed: ${e.getMessage}")
-         (resultSet, None)
-     }
-   }
- // Executes the update commands held by `ctx`.
- //  - batch flag set and at least two parameter sets: handled by cqlBatchUpdate
- //  - exactly one statement: executed asynchronously through cqlSingleUpdate
- //  - otherwise: statements are paired with their parameter sets (missing sets become Nil)
- //    and executed one after another with cqlExecuteSync inside a Future yielding true
- // A batch request with fewer than two parameter sets is downgraded with a warning.
- def cqlExecute(ctx: CQLContext)(
-   implicit session: Session, ec: ExecutionContext): Future[Boolean] = {
-   val invalidBat = ctx.batch && ctx.parameters.size < 2
-   if (ctx.batch && !invalidBat)
-     cqlBatchUpdate(ctx)
-   else {
-     if (invalidBat)
-       log.warn(s"cqlExecute> batch update must have at least 2 sets of parameters! change to single-command.")
-     if (ctx.statements.size == 1) {
-       val param: Seq[Object] = ctx.parameters.headOption.getOrElse(Nil)
-       log.info(s"cqlExecute> single-command: statement: ${ctx.statements.head} parameters: ${param}")
-       cqlSingleUpdate(ctx.consistency, ctx.statements.head, param)
-     }
-     else {
-       val missing = ctx.statements.size - ctx.parameters.size
-       val params: Seq[Seq[Object]] =
-         if (ctx.parameters.isEmpty)
-           Seq.fill(ctx.statements.length)(Nil)
-         else if (missing > 0) {
-           log.warn(s"cqlExecute> fewer parameters than statements! pad with 'Nil'.")
-           ctx.parameters ++ Seq.fill(missing)(Nil)
-         }
-         else
-           ctx.parameters
-       val commands: Seq[(String,Seq[Object])] = ctx.statements zip params
-       log.info(s"cqlExecute> multi-commands: ${commands}")
-       // The commands run synchronously, in order, on one Future. Launching one
-       // cqlSingleUpdate per command (Future.sequence / Future.traverse) would overlap
-       // them, which is only safe when no command relies on the effect of a previous one.
-       Future {
-         for ((stm, pars) <- commands)
-           cqlExecuteSync(ctx.consistency, stm, pars)
-         true
-       }
-     }
-   }
- }
- // Prepares, binds and asynchronously executes one statement.
- // The Future yields the result set's wasApplied flag.
- def cqlSingleUpdate(cons: Option[CQLContext.CONSISTENCY_LEVEL],stmt: String, params: Seq[Object])(
-   implicit session: Session, ec: ExecutionContext): Future[Boolean] = {
-   val prepStmt = session.prepare(stmt)
-   val hasParams = params.nonEmpty
-   // parameters are converted and bound only when some were supplied
-   val pars: Seq[Object] = if (hasParams) processParameters(params) else Nil
-   val boundStmt = if (hasParams) prepStmt.bind(pars: _*) else prepStmt.bind()
-   log.info(s"cqlSingleUpdate> statement: ${prepStmt.getQueryString}, parameters: ${pars}")
-   cons.foreach(level => boundStmt.setConsistencyLevel(consistencyLevel(level)))
-   session.executeAsync(boundStmt).map(_.wasApplied())
- }
- // Blocking counterpart of cqlSingleUpdate: prepares, binds and executes one statement
- // on the calling thread and returns the result set's wasApplied flag.
- def cqlExecuteSync(cons: Option[CQLContext.CONSISTENCY_LEVEL],stmt: String, params: Seq[Object])(
-   implicit session: Session, ec: ExecutionContext): Boolean = {
-   val prepStmt = session.prepare(stmt)
-   val hasParams = params.nonEmpty
-   // parameters are converted and bound only when some were supplied
-   val pars: Seq[Object] = if (hasParams) processParameters(params) else Nil
-   val boundStmt = if (hasParams) prepStmt.bind(pars: _*) else prepStmt.bind()
-   log.info(s"cqlExecuteSync> statement: ${prepStmt.getQueryString}, parameters: ${pars}")
-   cons.foreach(level => boundStmt.setConsistencyLevel(consistencyLevel(level)))
-   session.execute(boundStmt).wasApplied()
- }
- // Executes the first statement of `ctx` once per parameter set as one BatchStatement.
- // Only ctx.statements.head is prepared; further statements are not used here.
- // The Future yields the result set's wasApplied flag.
- def cqlBatchUpdate(ctx: CQLContext)(
-   implicit session: Session, ec: ExecutionContext): Future[Boolean] = {
-   val params: Seq[Seq[Object]] =
-     if (ctx.parameters.isEmpty) Seq.fill(ctx.statements.length)(Nil)
-     else ctx.parameters
-   log.info(s"cqlBatchUpdate> statement: ${ctx.statements.head}, parameters: ${params}")
-   val prepStmt = session.prepare(ctx.statements.head)
-   val batch = new BatchStatement()
-   for (raw <- params) {
-     log.info(s"cqlBatchUpdate> batch with raw parameter: ${raw}")
-     val cooked = processParameters(raw)
-     log.info(s"cqlMultiUpdate> batch with cooked parameters: ${cooked}")
-     batch.add(prepStmt.bind(cooked: _*))
-   }
-   ctx.consistency.foreach(level => batch.setConsistencyLevel(consistencyLevel(level)))
-   session.executeAsync(batch).map(_.wasApplied())
- }
- // Exposes the rows of the query in `ctx` as an Akka Streams Source, mapping each Row
- // through `extractor`. ctx.fetchSize is used as the statement fetch size.
- def cassandraStream[A](ctx: CQLQueryContext,extractor: Row => A)
-   (implicit session: Session, ec: ExecutionContextExecutor): Source[A,NotUsed] = {
-   val prepStmt = session.prepare(ctx.statement)
-   val params = processParameters(ctx.parameter)
-   val boundStmt = prepStmt.bind(params: _*)
-   ctx.consistency.foreach(level => boundStmt.setConsistencyLevel(consistencyLevel(level)))
-   log.info(s"cassandraStream> statement: ${prepStmt.getQueryString}, parameters: ${params}")
-   val pagedStmt = boundStmt.setFetchSize(ctx.fetchSize)
-   CassandraSource(pagedStmt).map(extractor)
- }
- // Stream stage description: executes `statement` once per element flowing through,
- // binding the values that `prepareParams` derives from the element, then passes the
- // element on unchanged.
- case class CassandraActionStream[R](parallelism: Int = 1, processInOrder: Boolean = true,
-   statement: String, prepareParams: R => Seq[Object],
-   consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None){ cas =>
-   // Copy-style setters.
-   def setParallelism(parLevel: Int): CassandraActionStream[R] =
-     cas.copy(parallelism = parLevel)
-   def setProcessOrder(ordered: Boolean): CassandraActionStream[R] =
-     cas.copy(processInOrder = ordered)
-   def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CassandraActionStream[R] =
-     cas.copy(consistency = Some(_consistency))
-   // Executes the statement for one element; the Future yields the element itself.
-   def perform(r: R)(implicit session: Session, ec: ExecutionContext) = {
-     val prepStmt = session.prepare(statement)
-     val params = processParameters(prepareParams(r))
-     val boundStmt = prepStmt.bind(params: _*)
-     consistency.foreach(level => boundStmt.setConsistencyLevel(CQLContext.consistencyLevel(level)))
-     log.info(s"CassandraActionStream.perform> statement: ${prepStmt.getQueryString}, parameters: ${params}")
-     session.executeAsync(boundStmt).map(_ => r)
-   }
-   // Flow running `perform` with the configured parallelism; output order follows input
-   // order only when processInOrder is set.
-   def performOnRow(implicit session: Session, ec: ExecutionContext): Flow[R, R, NotUsed] = {
-     val stage = Flow[R]
-     if (processInOrder) stage.mapAsync(parallelism)(perform)
-     else stage.mapAsyncUnordered(parallelism)(perform)
-   }
-   // Hands the prepared statement, binder and partition-key function to
-   // CassandraFlow.createUnloggedBatchWithPassThrough.
-   def unloggedBatch[K](statementBinder: (
-     R, PreparedStatement) => BoundStatement,partitionKey: R => K)(
-     implicit session: Session, ec: ExecutionContext): Flow[R, R, NotUsed] = {
-     val prepared = session.prepare(statement)
-     log.info(s"CassandraActionStream.unloggedBatch> statement: ${prepared.getQueryString}")
-     CassandraFlow.createUnloggedBatchWithPassThrough[R, K](
-       parallelism,
-       prepared,
-       statementBinder,
-       partitionKey)
-   }
- }
- // Short factory: the caller supplies statement and parameter builder; parallelism,
- // ordering and consistency keep the case-class defaults.
- object CassandraActionStream {
-   def apply[R](_statement: String, params: R => Seq[Object]): CassandraActionStream[R] =
-     new CassandraActionStream[R](
-       statement = _statement,
-       prepareParams = params
-     )
- }
- }
- object CQLHelpers extends LogSupport {
- import java.nio.ByteBuffer
- import java.io._
- import java.nio.file._
- import com.datastax.driver.core.LocalDate
- import com.datastax.driver.extras.codecs.jdk8.InstantCodec
- import java.time.Instant
- import akka.stream.scaladsl._
- import akka.stream._
- // Implicit bridge from a Guava ListenableFuture to a Scala Future: a callback completes
- // a Promise with the listenable future's result or failure.
- implicit def listenableFutureToFuture[T](
-   listenableFuture: ListenableFuture[T]): Future[T] = {
-   val completion = Promise[T]()
-   val bridge = new FutureCallback[T] {
-     def onSuccess(result: T): Unit = {
-       completion.success(result)
-       ()
-     }
-     def onFailure(error: Throwable): Unit = {
-       completion.failure(error)
-       ()
-     }
-   }
-   Futures.addCallback(listenableFuture, bridge)
-   completion.future
- }
- // Parameter marker for a calendar date; processParameters replaces it with the
- // driver's LocalDate built from year / month / day.
- case class CQLDate(year: Int, month: Int, day: Int)
- // Parameter marker that processParameters replaces with today's date.
- case object CQLTodayDate
- // Parameter marker for a timestamp given as separate calendar and clock fields;
- // processParameters turns it into a java.time.Instant.
- case class CQLDateTime(year: Int, Month: Int,
- day: Int, hour: Int, minute: Int, second: Int, millisec: Int = 0)
- // Parameter marker that processParameters replaces with the current Instant.
- case object CQLDateTimeNow
- // Calendar date of `dateToConvert` as seen in the JVM's default time zone.
- def cqlGetDate(dateToConvert: java.util.Date): java.time.LocalDate = {
-   val zoned = dateToConvert.toInstant.atZone(java.time.ZoneId.systemDefault())
-   zoned.toLocalDate
- }
- // Time of day of `dateToConvert` as seen in the JVM's default time zone.
- def cqlGetTime(dateToConvert: java.util.Date): java.time.LocalTime = {
-   val zoned = dateToConvert.toInstant.atZone(java.time.ZoneId.systemDefault())
-   zoned.toLocalTime
- }
- // Converts a java.util.Date to a LocalDateTime by way of java.sql.Timestamp.
- def cqlGetTimestamp(dateToConvert: java.util.Date): java.time.LocalDateTime = {
-   val sqlStamp = new java.sql.Timestamp(dateToConvert.getTime)
-   sqlStamp.toLocalDateTime
- }
- // Replaces the CQL date/time marker values in a parameter list with objects that can be
- // bound to a statement; every other value is passed through untouched.
- //   CQLDate        -> driver LocalDate for the given year / month / day
- //   CQLTodayDate   -> driver LocalDate for today (system default zone)
- //   CQLDateTimeNow -> current java.time.Instant
- //   CQLDateTime    -> java.time.Instant, with the fields read as a UTC date-time
- def processParameters(params: Seq[Object]): Seq[Object] = {
-   import java.time.{Clock,ZoneId}
-   log.info(s"[processParameters] input: ${params}")
-   val outParams: Seq[Object] = params.map {
-     case CQLDate(yy, mm, dd) => LocalDate.fromYearMonthDay(yy, mm, dd)
-     case CQLTodayDate =>
-       val today = java.time.LocalDate.now()
-       LocalDate.fromYearMonthDay(today.getYear, today.getMonth.getValue, today.getDayOfMonth)
-     case CQLDateTimeNow => Instant.now(Clock.system(ZoneId.of("EST", ZoneId.SHORT_IDS)))
-     case CQLDateTime(yy, mm, dd, hour, minute, second, millis) =>
-       // Built from the fields directly. The former formatted string was space-padded
-       // ("%2d") and had neither the '.' before the milliseconds nor the trailing 'Z',
-       // so Instant.parse rejected every value. UTC is the reading that 'Z' would have
-       // given the same fields.
-       java.time.LocalDateTime
-         .of(yy, mm, dd, hour, minute, second, millis * 1000000)
-         .toInstant(java.time.ZoneOffset.UTC)
-     case other => other
-   }
-   // log the converted list (the input list used to be logged a second time here)
-   log.info(s"[processParameters] output: ${outParams}")
-   outParams
- }
- // InputStream view over the remaining bytes of a ByteBuffer. Reading advances the
- // buffer's position.
- class ByteBufferInputStream(buf: ByteBuffer) extends InputStream {
-   // Next byte as an unsigned value 0..255, or -1 when the buffer is exhausted.
-   // The mask matters: an unmasked byte of 0xFF is the Int -1, which readers take for
-   // end of stream.
-   override def read: Int =
-     if (buf.hasRemaining) buf.get() & 0xFF else -1
-   // Copies up to `len` bytes into `bytes` starting at `off`.
-   // Returns the number of bytes copied, 0 when `len` is 0, and -1 once the buffer is
-   // exhausted. Returning 0 at the end, as before, left readers waiting for an end of
-   // stream that was never signalled.
-   override def read(bytes: Array[Byte], off: Int, len: Int): Int =
-     if (len == 0) 0
-     else if (!buf.hasRemaining) -1
-     else {
-       val length: Int = Math.min(len, buf.remaining)
-       buf.get(bytes, off, length)
-       length
-     }
- }
- object ByteBufferInputStream {
-   def apply(buf: ByteBuffer): ByteBufferInputStream = {
-     new ByteBufferInputStream(buf)
-   }
- }
- // OutputStream that writes straight into a caller-supplied ByteBuffer.
- // The buffer is never enlarged; writing past its end fails inside ByteBuffer.put.
- class FixsizedByteBufferOutputStream(buf: ByteBuffer) extends OutputStream {
-   override def write(b: Int): Unit = {
-     buf.put(b.toByte)
-     ()
-   }
-   override def write(bytes: Array[Byte], off: Int, len: Int): Unit = {
-     buf.put(bytes, off, len)
-     ()
-   }
- }
- object FixsizedByteBufferOutputStream {
-   def apply(buf: ByteBuffer): FixsizedByteBufferOutputStream =
-     new FixsizedByteBufferOutputStream(buf)
- }
- // OutputStream backed by a ByteBuffer that is replaced by a larger one whenever a write
- // does not fit. `onHeap` selects heap or direct allocation for replacement buffers;
- // `increasingBy` is the growth factor (defaults to DEFAULT_INCREASING_FACTOR).
- class ExpandingByteBufferOutputStream(var buf: ByteBuffer, onHeap: Boolean,
-   increasingBy: Float = ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR) extends OutputStream {
-   override def write(b: Array[Byte], off: Int, len: Int): Unit = {
-     ensureRoom(len)
-     // honour `off`: the previous put(b, 0, len) always copied from the array start
-     buf.put(b, off, len)
-   }
-   override def write(b: Int): Unit= {
-     ensureRoom(1)
-     buf.put(b.toByte)
-   }
-   // Next capacity step. Always grows by at least one byte, so capacities 0 and 1
-   // (where capacity * factor truncates back to the same value) still make progress.
-   private def grown(current: Long): Long =
-     math.max(current + 1L, (current * increasingBy.toDouble).toLong)
-   // Enlarges the buffer, if needed, until `extra` more bytes fit.
-   private def ensureRoom(extra: Int): Unit = {
-     val needed: Long = buf.position().toLong + extra
-     if (needed > buf.limit()) {
-       var target = grown(buf.capacity().toLong)
-       while (target < needed) target = grown(target)
-       increase(math.min(target, Int.MaxValue.toLong).toInt)
-     }
-   }
-   // Moves the bytes written so far into a new buffer of `newCapacity` and switches to it.
-   protected def increase(newCapacity: Int): Unit = {
-     buf.limit(buf.position())
-     buf.rewind()
-     val newBuffer =
-       if (onHeap) ByteBuffer.allocate(newCapacity)
-       else ByteBuffer.allocateDirect(newCapacity)
-     newBuffer.put(buf)
-     buf.clear()
-     buf = newBuffer
-   }
-   // Number of bytes written so far.
-   def size: Long = buf.position()
-   def capacity: Long = buf.capacity()
-   def byteBuffer: ByteBuffer = buf
- }
- object ExpandingByteBufferOutputStream {
-   val DEFAULT_INCREASING_FACTOR = 1.5f
-   // Allocates the initial buffer and builds the stream.
-   // Throws IllegalArgumentException when increasingBy is not greater than 1.
-   def apply(size: Int, increasingBy: Float, onHeap: Boolean): ExpandingByteBufferOutputStream = {
-     if (increasingBy <= 1) throw new IllegalArgumentException("Increasing Factor must be greater than 1.0")
-     val buffer: ByteBuffer =
-       if (onHeap) ByteBuffer.allocate(size)
-       else ByteBuffer.allocateDirect(size)
-     // pass the factor on; it used to be validated and then ignored
-     new ExpandingByteBufferOutputStream(buffer, onHeap, increasingBy)
-   }
-   def apply(size: Int): ExpandingByteBufferOutputStream = {
-     apply(size, ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR, false)
-   }
-   def apply(size: Int, onHeap: Boolean): ExpandingByteBufferOutputStream = {
-     apply(size, ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR, onHeap)
-   }
-   def apply(size: Int, increasingBy: Float): ExpandingByteBufferOutputStream = {
-     apply(size, increasingBy, false)
-   }
- }
- // Reads the whole file into a ByteBuffer holding exactly the file's bytes.
- // Throws an IOException subtype when the file cannot be opened or read completely.
- def cqlFileToBytes(fileName: String): ByteBuffer = {
-   val file = new java.io.File(fileName)
-   val in = new java.io.DataInputStream(new FileInputStream(file))
-   try {
-     // Sized from the file length: the former `available + 1` left a stray zero byte
-     // at the end of the buffer.
-     val bytes = new Array[Byte](file.length.toInt)
-     // readFully keeps reading until the array is filled; one read() call may stop short
-     in.readFully(bytes)
-     ByteBuffer.wrap(bytes)
-   } finally in.close() // the stream used to stay open
- }
- // Streams the remaining bytes of `bytes` into the named file.
- // The Future yields the IOResult of the file sink.
- def cqlBytesToFile(bytes: ByteBuffer, fileName: String)(
-   implicit mat: Materializer): Future[IOResult] = {
-   val target = Paths.get(fileName)
-   StreamConverters
-     .fromInputStream(() => ByteBufferInputStream(bytes))
-     .runWith(FileIO.toPath(target))
- }
- // Formats `date` with the SimpleDateFormat pattern `fmt`.
- def cqlDateTimeString(date: java.util.Date, fmt: String): String =
-   new java.text.SimpleDateFormat(fmt).format(date)
- // Registers the driver's InstantCodec with the cluster's codec registry
- // (for jdk8 datetime format) and returns the registry.
- def useJava8DateTime(cluster: Cluster) = {
-   val registry = cluster.getConfiguration().getCodecRegistry()
-   registry.register(InstantCodec.instance)
- }
- }
BytesConverter.scala
- package protobuf.bytes
- import java.io.{ByteArrayInputStream,ByteArrayOutputStream,ObjectInputStream,ObjectOutputStream}
- import com.google.protobuf.ByteString
- // Conversion between arbitrary values and protobuf ByteString, based on Java
- // serialization.
- object Converter {
-   // Serializes `value` with an ObjectOutputStream and wraps the bytes in a ByteString.
-   def marshal(value: Any): ByteString = {
-     val stream: ByteArrayOutputStream = new ByteArrayOutputStream()
-     val oos = new ObjectOutputStream(stream)
-     // close in `finally` so the stream is released even when writeObject fails
-     try oos.writeObject(value) finally oos.close()
-     ByteString.copyFrom(stream.toByteArray())
-   }
-   // Deserializes the bytes produced by `marshal`. The cast to A is unchecked; a wrong A
-   // shows up as a ClassCastException where the value is used.
-   // NOTE(security): ObjectInputStream.readObject on bytes received from a remote peer is
-   // Java deserialization of untrusted data. Restrict the accepted classes or use a
-   // schema-based encoding before exposing this to untrusted clients.
-   def unmarshal[A](bytes: ByteString): A = {
-     val ois = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
-     try ois.readObject().asInstanceOf[A] finally ois.close()
-   }
- }
CQLServices.scala
- package demo.sdp.grpc.cql.server
- import akka.NotUsed
- import akka.stream.scaladsl._
- import protobuf.bytes.Converter._
- import com.datastax.driver.core._
- import scala.concurrent.ExecutionContextExecutor
- import sdp.grpc.services._
- import sdp.cql.engine._
- import CQLEngine._
- import CQLHelpers._
- import sdp.logging.LogSupport
- import scala.concurrent._
- import scala.concurrent.duration._
- import akka.stream.ActorMaterializer
- class CQLStreamingServices(implicit ec: ExecutionContextExecutor,
- mat: ActorMaterializer, session: Session)
- extends CqlGrpcAkkaStream.CQLServices with LogSupport{
- // Builds the positional bind values for cqlInsert from one AQMRPTRow. The last value is
- // the CQLDateTimeNow marker rather than a field of the row.
- val toParams: AQMRPTRow => Seq[Object] = { row =>
-   val boundValues: Seq[Object] = Seq[Object](
-     row.rowid.asInstanceOf[Object],
-     row.measureid.asInstanceOf[Object],
-     row.statename,
-     row.countyname,
-     row.reportyear.asInstanceOf[Object],
-     row.value.asInstanceOf[Object],
-     CQLDateTimeNow
-   )
-   boundValues
- }
- // CQL insert for one testdb.AQMRPT row. The seven `?` markers are bound by position,
- // in the column order listed here, with the values produced by toParams.
- val cqlInsert ="""
- |insert into testdb.AQMRPT(
- | rowid,
- | measureid,
- | statename,
- | countyname,
- | reportyear,
- | value,
- | created)
- | values(?,?,?,?,?,?,?)
- """.stripMargin
- // Insert stage: cqlInsert bound through toParams, parallelism 2, unordered processing.
- val cqlActionStream =
-   CassandraActionStream(cqlInsert, toParams)
-     .setProcessOrder(false)
-     .setParallelism(2)
- /*
- val cqlActionFlow: Flow[AQMRPTRow,AQMRPTRow,NotUsed] =
- Flow[AQMRPTRow]
- .via(cqlActionStream.performOnRow)
- */
- // Inserts each incoming row unless a row with the same rowid is already stored.
- // Emits CQLResult(marshal(1)) for an insert and CQLResult(marshal(0)) for a skipped row.
- val cqlActionFlow: Flow[AQMRPTRow,CQLResult,NotUsed] =
-   Flow[AQMRPTRow].mapAsync(cqlActionStream.parallelism) { row =>
-     val alreadyStored = IfExists(row.rowid)
-     if (!alreadyStored)
-       cqlActionStream.perform(row).map(_ => CQLResult(marshal(1)))
-     else
-       Future.successful(CQLResult(marshal(0)))
-   }
- // Service endpoint: routes incoming rows through cqlActionFlow.
- override def transferRows: Flow[AQMRPTRow, CQLResult, NotUsed] =
-   Flow[AQMRPTRow].via(cqlActionFlow)
- // True when testdb.AQMRPT already holds a row with the given rowid.
- // Blocks the calling thread for up to 3 seconds waiting for the query.
- private def IfExists(rowid: Long): Boolean = {
-   val cql = "SELECT * FROM testdb.AQMRPT WHERE ROWID = ? ALLOW FILTERING"
-   val param = Seq(rowid.asInstanceOf[Object])
-   val ctx = CQLQueryContext(cql,param)
-   val toRowId: Row => Long = r => r.getLong("rowid")
-   val src: Source[Long,NotUsed] = cassandraStream(ctx,toRowId)
-   // only the first matching row, if any, is needed
-   val firstMatch = src.runWith(Sink.headOption)
-   val result = Await.result(firstMatch,3 seconds)
-   log.info(s"checking existence: ${result}")
-   result.isDefined
- }
- override def clientStreaming: Flow[HelloMsg, HelloMsg, NotUsed] = {
- Flow[HelloMsg]
- .map {r => println(r) ; r}
- }
- override def runDDL: Flow[CQLUpdate, CQLResult, NotUsed] = {
- Flow[CQLUpdate]
- .flatMapConcat { context =>
- //unpack CQLUpdate and construct the context
- val ctx = CQLContext(context.statements)
- log.info(s"**** CQLContext => ${ctx} ***")
- Source
- .fromFuture(cqlExecute(ctx))
- .map { r => CQLResult(marshal(r)) }
- }
- }
- def toCQLTimestamp(rs: Row) = {
- try {
- val tm = rs.getTimestamp("CREATED")
- if (tm == null) None
- else {
- val localdt = cqlGetTimestamp(tm)
- Some(ProtoDateTime(Some(ProtoDate(localdt.getYear, localdt.getMonthValue, localdt.getDayOfMonth)),
- Some(ProtoTime(localdt.getHour, localdt.getMinute, localdt.getSecond, localdt.getNano))))
- }
- }
- catch {
- case e: Exception => None
- }
- }
- val toAQMRow: Row => AQMRPTRow = rs=> AQMRPTRow(
- rowid = rs.getLong("ROWID"),
- measureid = rs.getLong("MEASUREID"),
- statename = rs.getString("STATENAME"),
- countyname = rs.getString("COUNTYNAME"),
- reportyear = rs.getInt("REPORTYEAR"),
- value = rs.getInt("VALUE"),
- created = toCQLTimestamp(rs)
- )
- override def runQuery: Flow[CQLQuery, AQMRPTRow, NotUsed] = {
- log.info("**** runQuery called on service side ***")
- Flow[CQLQuery]
- .flatMapConcat { q =>
- //unpack JDBCQuery and construct the context
- var params: Seq[Object] = Nil
- if (q.parameters != _root_.com.google.protobuf.ByteString.EMPTY)
- params = unmarshal[Seq[Object]](q.parameters)
- log.info(s"**** query parameters: ${params} ****")
- val ctx = CQLQueryContext(q.statement,params)
- CQLEngine.cassandraStream(ctx,toAQMRow)
- }
- }
-
- }
CQLServer.scala
- package demo.sdp.grpc.cql.server
- import java.util.logging.Logger
- import com.datastax.driver.core._
- import akka.actor.ActorSystem
- import akka.stream.ActorMaterializer
- import io.grpc.Server
- import io.grpc.ServerBuilder
- import sdp.grpc.services._
- import sdp.cql.engine._
- import CQLHelpers._
- // Lifecycle wrapper around an already-built io.grpc.Server.
- class gRPCServer(server: Server) {
-   val logger: Logger = Logger.getLogger(classOf[gRPCServer].getName)
-   // Starts the wrapped server, logs its port and registers a JVM shutdown
-   // hook that stops it.
-   def start(): Unit = {
-     server.start()
-     logger.info(s"Server started, listening on ${server.getPort}")
-     sys.addShutdownHook {
-       // Write to stderr: the logger may have been reset by its own JVM shutdown hook.
-       System.err.println("*** shutting down gRPC server since JVM is shutting down")
-       stop()
-       System.err.println("*** server shut down")
-     }
-     ()
-   }
-   // Initiates shutdown of the wrapped server.
-   def stop(): Unit = {
-     server.shutdown()
-   }
-   // Waits on the calling (main) thread until the server has terminated;
-   // needed because the grpc library uses daemon threads.
-   def blockUntilShutdown(): Unit = {
-     server.awaitTermination()
-   }
- }
- // Entry point of the demo server: connects to Cassandra on localhost:9042,
- // serves CQLStreamingServices over gRPC on port 50051 and runs until a line
- // is read from stdin.
- object CQLServer extends App {
-   implicit val cqlsys = ActorSystem("cqlSystem")
-   implicit val mat = ActorMaterializer()
-   implicit val ec = cqlsys.dispatcher
-   val cluster = new Cluster.Builder()
-     .addContactPoints("localhost")
-     .withPort(9042)
-     .build()
-   useJava8DateTime(cluster)
-   implicit val session = cluster.connect()
-   val server = new gRPCServer(
-     ServerBuilder
-       .forPort(50051)
-       .addService(
-         CqlGrpcAkkaStream.bindService(
-           new CQLStreamingServices
-         )
-       ).build()
-   )
-   server.start()
-   // server.blockUntilShutdown()
-   scala.io.StdIn.readLine()
-   // Stop accepting gRPC calls first so no new request reaches the services
-   // while the Cassandra session and the actor system are being torn down.
-   server.stop()
-   session.close()
-   cluster.close()
-   mat.shutdown()
-   cqlsys.terminate()
- }
CQLClient.scala
- package demo.sdp.grpc.cql.client
- import sdp.grpc.services._
- import protobuf.bytes.Converter._
- import akka.stream.scaladsl._
- import akka.NotUsed
- import akka.actor.ActorSystem
- import akka.stream.{ActorMaterializer, ThrottleMode}
- import io.grpc._
- import sdp.logging.LogSupport
- import sdp.jdbc.engine._
- import JDBCEngine._
- import scalikejdbc.WrappedResultSet
- import sdp.cql.engine.CQLHelpers.CQLDateTimeNow
- import scala.util._
- import scala.concurrent.ExecutionContextExecutor
- // Akka-stream client of the CQL gRPC services reachable at host:port.
- // Each public method returns a Source; nothing runs until it is materialized.
- class CQLStreamClient(host: String, port: Int)(
-   implicit ec: ExecutionContextExecutor) extends LogSupport {
-   // Plaintext channel to the server.
-   val channel = ManagedChannelBuilder
-     .forAddress(host, port)
-     .usePlaintext(true)
-     .build()
-   val stub = CqlGrpcAkkaStream.stub(channel)
-   // JDBC query (h2 database) selecting the rows that transferRows uploads.
-   val jdbcRows2transfer = JDBCQueryContext[AQMRPTRow](
-     dbName = 'h2,
-     statement = "select * from AQMRPT where statename='Arkansas'"
-   )
-   // JDBC result row => protobuf row. `created` is a fixed placeholder value.
-   def toAQMRPTRow: WrappedResultSet => AQMRPTRow = rs => AQMRPTRow(
-     rowid = rs.long("ROWID"),
-     measureid = rs.long("MEASUREID"),
-     statename = rs.string("STATENAME"),
-     countyname = rs.string("COUNTYNAME"),
-     reportyear = rs.int("REPORTYEAR"),
-     value = rs.int("VALUE"),
-     created = Some(ProtoDateTime(Some(ProtoDate(1990, 8, 12)), Some(ProtoTime(23, 56, 23, 0))))
-   )
-   import scala.concurrent.duration._
-   // Streams the JDBC rows through the remote transferRows service and
-   // yields one CQLResult per row.
-   def transferRows: Source[CQLResult, NotUsed] = {
-     log.info(s"**** calling transferRows ****")
-     jdbcAkkaStream(jdbcRows2transfer, toAQMRPTRow)
-       // .throttle(1, 500.millis, 1, ThrottleMode.shaping)
-       .via(stub.transferRows)
-   }
-   // Sends the same greeting 100 times through the remote clientStreaming service.
-   def echoHello: Source[HelloMsg,NotUsed] = {
-     val greeting = HelloMsg("hello world!")
-     val greetings = List.fill[HelloMsg](100)(greeting)
-     Source
-       .fromIterator(() => greetings.iterator)
-       .via(stub.clientStreaming)
-   }
-   // Sample queries; only `query` is used by queryRows.
-   val query0 = CQLQuery(
-     statement = "select * from testdb.AQMRPT"
-   )
-   val query = CQLQuery(
-     statement = "select * from testdb.AQMRPT where STATENAME = ? and VALUE > ? ALLOW FILTERING;",
-     parameters = marshal(Seq("Arkansas", 0))
-   )
-   val query2 = CQLQuery (
-     statement = "select * from testdb.AQMRPT where STATENAME = ? and VALUE = ?",
-     parameters = marshal(Seq("Colorado", 3))
-   )
-   val query3= CQLQuery (
-     statement = "select * from testdb.AQMRPT where STATENAME = ? and VALUE = ?",
-     parameters = marshal(Seq("Arkansas", 8))
-   )
-   // Sends `query` to the remote runQuery service and streams back the rows.
-   def queryRows: Source[AQMRPTRow,NotUsed] = {
-     log.info(s"running queryRows ...")
-     Source
-       .single(query)
-       .via(stub.runQuery)
-   }
-   // DDL pair that recreates the demo table.
-   val dropCQL = "DROP TABLE IF EXISTS testdb.AQMRPT"
-   val createCQL ="""
- CREATE TABLE testdb.AQMRPT (
- rowid bigint primary key,
- measureid bigint,
- statename text,
- countyname text,
- reportyear int,
- value int,
- created timestamp
- )"""
-   val cqlddl = CQLUpdate(statements = Seq(dropCQL,createCQL))
-   // Submits the drop/create statements through the remote runDDL service.
-   def createTbl: Source[CQLResult,NotUsed] = {
-     log.info(s"running createTbl ...")
-     Source
-       .single(cqlddl)
-       .via(stub.runDDL)
-   }
- }
- // Demo: runs the echoHello stream against localhost:50051, prints every
- // echoed message, then shuts down after a line is read from stdin.
- object EchoHelloClient extends App {
-   implicit val system = ActorSystem("EchoNumsClient")
-   implicit val mat = ActorMaterializer.create(system)
-   implicit val ec = system.dispatcher
-   val client = new CQLStreamClient("localhost", 50051)
-   // Report a failed stream instead of discarding the Future, as the other
-   // client apps in this file do.
-   client.echoHello.runForeach(println).failed.foreach { e =>
-     println(s"!!!!!echo error: ${e.getMessage}")
-   }
-   scala.io.StdIn.readLine()
-   mat.shutdown()
-   system.terminate()
- }
- // Demo: uploads the rows selected from the h2 database through the remote
- // transferRows service and prints the sum of the per-row results.
- object TransferRows extends App {
-   import sdp.jdbc.config._
-   implicit val system = ActorSystem("JDBCServer")
-   implicit val mat = ActorMaterializer.create(system)
-   implicit val ec = system.dispatcher
-   ConfigDBsWithEnv("dev").setup('h2)
-   ConfigDBsWithEnv("dev").loadGlobalSettings()
-   val client = new CQLStreamClient("localhost", 50051)
-   // Every CQLResult carries a marshalled Int; add them up.
-   val fut = client.transferRows.runFold(0) { (total, res) =>
-     total + unmarshal[Int](res.result)
-   }
-   fut.onComplete {
-     case Failure(e) => println(s"!!!!!streaming error: ${e.getMessage}")
-     case scala.util.Success(cnt) => println(s"done transfer ${cnt} rows.")
-   }
-   scala.io.StdIn.readLine()
-   ConfigDBsWithEnv("dev").close('h2)
-   mat.shutdown()
-   system.terminate()
- }
- // Demo: runs the client's queryRows stream against localhost:50051 and
- // prints every returned row.
- object QueryRows extends App {
-   implicit val system = ActorSystem("QueryRows")
-   implicit val mat = ActorMaterializer.create(system)
-   implicit val ec = system.dispatcher
-   val client = new CQLStreamClient("localhost", 50051)
-   val fut = client.queryRows.runForeach(row => println(row))
-   fut.onComplete {
-     case Failure(e) => println(s"!!!!!query error: ${e.getMessage}")
-     case scala.util.Success(_) => println(s"done querying.")
-   }
-   scala.io.StdIn.readLine()
-   mat.shutdown()
-   system.terminate()
- }
- // Demo: sends the client's drop/create statements to the remote runDDL
- // service and prints the outcome.
- object RunDDL extends App {
-   implicit val system = ActorSystem("RunDDL")
-   implicit val mat = ActorMaterializer.create(system)
-   implicit val ec = system.dispatcher
-   val client = new CQLStreamClient("localhost", 50051)
-   // The server marshals the Boolean returned by cqlExecute, so name the
-   // target type explicitly rather than leaving it to inference. A failed
-   // stream is reported instead of being discarded with the Future.
-   client.createTbl
-     .runForeach { r => println(unmarshal[Boolean](r.result)) }
-     .failed.foreach { e => println(s"!!!!!DDL error: ${e.getMessage}") }
-   scala.io.StdIn.readLine()
-   mat.shutdown()
-   system.terminate()
- }