经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » Java相关 » Scala » 查看文章
PICE(3):CassandraStreaming - gRPC-CQL Service
来源:cnblogs  作者:雪川大虫  时间:2018/9/25 20:28:19  对本文有异议

  在上一篇博文里我们介绍了通过gRPC实现JDBC数据库的streaming,这篇我们介绍关于cassandra的streaming实现方式。如果我们需要从一个未部署cassandra的节点或终端上读取cassandra数据,可以用gRPC来搭建一个数据桥梁来连接这两端。这时cassandra这端就是gRPC-Server端,由它提供cassandra的数据服务。

在前面sdp系列讨论里我们已经实现了Cassandra-Engine。它的运作原理还是通过某种Context把指令提交给cassandra去执行。我们先设计一个创建库表的例子。CQL语句和Cassandra-Engine程序代码如下,这是客户端部分:

  1. val dropCQL = "DROP TABLE IF EXISTS testdb.AQMRPT"
  2. val createCQL ="""
  3. CREATE TABLE testdb.AQMRPT (
  4. rowid bigint primary key,
  5. measureid bigint,
  6. statename text,
  7. countyname text,
  8. reportyear int,
  9. value int,
  10. created timestamp
  11. )"""
  12. val cqlddl = CQLUpdate(statements = Seq(dropCQL,createCQL))
  13. def createTbl: Source[CQLResult,NotUsed] = {
  14. log.info(s"running createTbl ...")
  15. Source
  16. .single(cqlddl)
  17. .via(stub.runDDL)
  18. }

首先,我们在CQLUpdate这个protobuf对应Context里传入两条指令dropCQL和createCQL,可以预计这会是一种批次型batch方式。然后一如既往,我们使用了streaming编程模式。在.proto文件里用DDL来对应Context和Service:

  1. message CQLUpdate {
  2. repeated string statements = 1;
  3. bytes parameters = 2;
  4. google.protobuf.Int32Value consistency = 3;
  5. google.protobuf.BoolValue batch = 4;
  6. }
  7. service CQLServices {
  8. rpc runDDL(CQLUpdate) returns (CQLResult) {}
  9. }

服务函数runDDL程序实现如下:

  1. override def runDDL: Flow[CQLUpdate, CQLResult, NotUsed] = {
  2. Flow[CQLUpdate]
  3. .flatMapConcat { context =>
  4. //unpack CQLUpdate and construct the context
  5. val ctx = CQLContext(context.statements)
  6. log.info(s"**** CQLContext => ${ctx} ***")
  7. Source
  8. .fromFuture(cqlExecute(ctx))
  9. .map { r => CQLResult(marshal(r)) }
  10. }
  11. }

这里我们调用了Cassandra-Engine的cqlExecute(ctx)函数:

  1. def cqlExecute(ctx: CQLContext)(
  2. implicit session: Session, ec: ExecutionContext): Future[Boolean] = {
  3. var invalidBat = false
  4. if ( ctx.batch ) {
  5. if (ctx.parameters == Nil)
  6. invalidBat = true
  7. else if (ctx.parameters.size < 2)
  8. invalidBat = true;
  9. }
  10. if (!ctx.batch || invalidBat) {
  11. if(invalidBat)
  12. log.warn(s"cqlExecute> batch update must have at least 2 sets of parameters! change to single-command.")
  13. if (ctx.statements.size == 1) {
  14. var param: Seq[Object] = Nil
  15. if (ctx.parameters != Nil) param = ctx.parameters.head
  16. log.info(s"cqlExecute> single-command: statement: ${ctx.statements.head} parameters: ${param}")
  17. cqlSingleUpdate(ctx.consistency, ctx.statements.head, param)
  18. }
  19. else {
  20. var params: Seq[Seq[Object]] = Nil
  21. if (ctx.parameters == Nil)
  22. params = Seq.fill(ctx.statements.length)(Nil)
  23. else {
  24. if (ctx.statements.size > ctx.parameters.size) {
  25. log.warn(s"cqlExecute> fewer parameters than statements! pad with 'Nil'.")
  26. val nils = Seq.fill(ctx.statements.size - ctx.parameters.size)(Nil)
  27. params = ctx.parameters ++ nils
  28. }
  29. else
  30. params = ctx.parameters
  31. }
  32. val commands: Seq[(String,Seq[Object])] = ctx.statements zip params
  33. log.info(s"cqlExecute> multi-commands: ${commands}")
  34. /*
  35. //using sequence to flip List[Future[Boolean]] => Future[List[Boolean]]
  36. //therefore, make sure no command replies on prev command effect
  37. val lstCmds: List[Future[Boolean]] = commands.map { case (stmt,param) =>
  38. cqlSingleUpdate(ctx.consistency, stmt, param)
  39. }.toList
  40. val futList = lstCmds.sequence.map(_ => true) //must map to execute
  41. */
  42. /*
  43. //using traverse to have some degree of parallelism = max(runtimes)
  44. //therefore, make sure no command replies on prev command effect
  45. val futList = Future.traverse(commands) { case (stmt,param) =>
  46. cqlSingleUpdate(ctx.consistency, stmt, param)
  47. }.map(_ => true)
  48. Await.result(futList, 3 seconds)
  49. Future.successful(true)
  50. */
  51. // run sync directly
  52. Future {
  53. commands.foreach { case (stm, pars) =>
  54. cqlExecuteSync(ctx.consistency, stm, pars)
  55. }
  56. true
  57. }
  58. }
  59. }
  60. else
  61. cqlBatchUpdate(ctx)
  62. }

特别展示了这个函数的代码是因为对于一批次多条指令可能会涉及到non-blocking和并行计算。可参考上面代码标注段落里函数式方法(cats)sequence,traverse如何实现对一串Future的运算。

下一个例子是用流方式把JDBC数据库数据并入cassandra数据库里。.proto DDL内容如下:

  1. message ProtoDate {
  2. int32 yyyy = 1;
  3. int32 mm = 2;
  4. int32 dd = 3;
  5. }
  6. message ProtoTime {
  7. int32 hh = 1;
  8. int32 mm = 2;
  9. int32 ss = 3;
  10. int32 nnn = 4;
  11. }
  12. message ProtoDateTime {
  13. ProtoDate date = 1;
  14. ProtoTime time = 2;
  15. }
  16. message AQMRPTRow {
  17. int64 rowid = 1;
  18. string countyname = 2;
  19. string statename = 3;
  20. int64 measureid = 4;
  21. int32 reportyear = 5;
  22. int32 value = 6;
  23. ProtoDateTime created = 7;
  24. }
  25. message CQLResult {
  26. bytes result = 1;
  27. }
  28. message CQLUpdate {
  29. repeated string statements = 1;
  30. bytes parameters = 2;
  31. google.protobuf.Int32Value consistency = 3;
  32. google.protobuf.BoolValue batch = 4;
  33. }
  34. service CQLServices {
  35. rpc transferRows(stream AQMRPTRow) returns (stream CQLResult) {}
  36. rpc runDDL(CQLUpdate) returns (CQLResult) {}
  37. }

下面是服务函数的实现:

  1. val toParams: AQMRPTRow => Seq[Object] = row => Seq[Object](
  2. row.rowid.asInstanceOf[Object],
  3. row.measureid.asInstanceOf[Object],
  4. row.statename,
  5. row.countyname,
  6. row.reportyear.asInstanceOf[Object],
  7. row.value.asInstanceOf[Object],
  8. CQLDateTimeNow
  9. )
  10. val cqlInsert ="""
  11. |insert into testdb.AQMRPT(
  12. | rowid,
  13. | measureid,
  14. | statename,
  15. | countyname,
  16. | reportyear,
  17. | value,
  18. | created)
  19. | values(?,?,?,?,?,?,?)
  20. """.stripMargin
  21. val cqlActionStream = CassandraActionStream(cqlInsert,toParams).setParallelism(2)
  22. .setProcessOrder(false)
  23. /*
  24. val cqlActionFlow: Flow[AQMRPTRow,AQMRPTRow,NotUsed] =
  25. Flow[AQMRPTRow]
  26. .via(cqlActionStream.performOnRow)
  27. */
  28. val cqlActionFlow: Flow[AQMRPTRow,CQLResult,NotUsed] = {
  29. Flow[AQMRPTRow]
  30. .mapAsync(cqlActionStream.parallelism){ row =>
  31. if (IfExists(row.rowid))
  32. Future.successful(CQLResult(marshal(0)))
  33. else
  34. cqlActionStream.perform(row).map {_ => CQLResult(marshal(1))}
  35. }
  36. }
  37. override def transferRows: Flow[AQMRPTRow, CQLResult, NotUsed] = {
  38. Flow[AQMRPTRow]
  39. .via(cqlActionFlow)
  40. }
  41. private def IfExists(rowid: Long): Boolean = {
  42. val cql = "SELECT * FROM testdb.AQMRPT WHERE ROWID = ? ALLOW FILTERING"
  43. val param = Seq(rowid.asInstanceOf[Object])
  44. val toRowId: Row => Long = r => r.getLong("rowid")
  45. val ctx = CQLQueryContext(cql,param)
  46. val src: Source[Long,NotUsed] = cassandraStream(ctx,toRowId)
  47. val fut = src.toMat(Sink.headOption)(Keep.right).run()
  48. val result = Await.result(fut,3 seconds)
  49. log.info(s"checking existence: ${result}")
  50. result match {
  51. case Some(x) => true
  52. case None => false
  53. }
  54. }

在上面的代码里我们调用了Cassandra-Engine的CassandraActionStream类型的流处理方法。值得注意的是这里我们尝试在stream Flow里运算另一个Flow,如:IfExists函数里运算一个Source来确定rowid是否存在。不要在意这个函数的实际应用,它只是一个人为的例子。另外,rowid:Long这样的定义是硬性规定的。cassandra对数据类型的匹配要求很弱智,没有提供任何自然转换。所以,Int <> Long被视为类型错误,而且无法catch任何明白的错误信息。

这项服务的客户端调用如下:

  1. val stub = CqlGrpcAkkaStream.stub(channel)
  2. val jdbcRows2transfer = JDBCQueryContext[AQMRPTRow](
  3. dbName = 'h2,
  4. statement = "select * from AQMRPT where statename='Arkansas'"
  5. )
  6. def toAQMRPTRow: WrappedResultSet => AQMRPTRow = rs => AQMRPTRow(
  7. rowid = rs.long("ROWID"),
  8. measureid = rs.long("MEASUREID"),
  9. statename = rs.string("STATENAME"),
  10. countyname = rs.string("COUNTYNAME"),
  11. reportyear = rs.int("REPORTYEAR"),
  12. value = rs.int("VALUE"),
  13. created = Some(ProtoDateTime(Some(ProtoDate(1990, 8, 12)), Some(ProtoTime(23, 56, 23, 0))))
  14. )
  15. import scala.concurrent.duration._
  16. def transferRows: Source[CQLResult, NotUsed] = {
  17. log.info(s"**** calling transferRows ****")
  18. jdbcAkkaStream(jdbcRows2transfer, toAQMRPTRow)
  19. // .throttle(1, 500.millis, 1, ThrottleMode.shaping)
  20. .via(stub.transferRows)
  21. }

注意:JDBC在客户端本地,cassandra是远程服务。

最后我们示范一下cassandra Query。.proto DDL 定义:

  1. message CQLQuery {
  2. string statement = 1;
  3. bytes parameters = 2;
  4. google.protobuf.Int32Value consistency = 3;
  5. google.protobuf.Int32Value fetchSize = 4;
  6. }
  7. service CQLServices {
  8. rpc transferRows(stream AQMRPTRow) returns (stream CQLResult) {}
  9. rpc runQuery(CQLQuery) returns (stream AQMRPTRow) {}
  10. rpc runDDL(CQLUpdate) returns (CQLResult) {}
  11. }

服务函数代码如下:

  1. def toCQLTimestamp(rs: Row) = {
  2. try {
  3. val tm = rs.getTimestamp("CREATED")
  4. if (tm == null) None
  5. else {
  6. val localdt = cqlGetTimestamp(tm)
  7. Some(ProtoDateTime(Some(ProtoDate(localdt.getYear, localdt.getMonthValue, localdt.getDayOfMonth)),
  8. Some(ProtoTime(localdt.getHour, localdt.getMinute, localdt.getSecond, localdt.getNano))))
  9. }
  10. }
  11. catch {
  12. case e: Exception => None
  13. }
  14. }
  15. val toAQMRow: Row => AQMRPTRow = rs=> AQMRPTRow(
  16. rowid = rs.getLong("ROWID"),
  17. measureid = rs.getLong("MEASUREID"),
  18. statename = rs.getString("STATENAME"),
  19. countyname = rs.getString("COUNTYNAME"),
  20. reportyear = rs.getInt("REPORTYEAR"),
  21. value = rs.getInt("VALUE"),
  22. created = toCQLTimestamp(rs)
  23. )
  24. override def runQuery: Flow[CQLQuery, AQMRPTRow, NotUsed] = {
  25. log.info("**** runQuery called on service side ***")
  26. Flow[CQLQuery]
  27. .flatMapConcat { q =>
  28. //unpack JDBCQuery and construct the context
  29. var params: Seq[Object] = Nil
  30. if (q.parameters != _root_.com.google.protobuf.ByteString.EMPTY)
  31. params = unmarshal[Seq[Object]](q.parameters)
  32. log.info(s"**** query parameters: ${params} ****")
  33. val ctx = CQLQueryContext(q.statement,params)
  34. CQLEngine.cassandraStream(ctx,toAQMRow)
  35. }
  36. }

这里值得看看的一是日期转换,二是对于cassandra parameter Seq[Object]的marshal和unmarshal。客户端代码:

  1. val query = CQLQuery(
  2. statement = "select * from testdb.AQMRPT where STATENAME = ? and VALUE > ? ALLOW FILTERING;",
  3. parameters = marshal(Seq("Arkansas", 0.toInt))
  4. )
  5. val query2 = CQLQuery (
  6. statement = "select * from testdb.AQMRPT where STATENAME = ? and VALUE = ?",
  7. parameters = marshal(Seq("Colorado", 3.toInt))
  8. )
  9. val query3= CQLQuery (
  10. statement = "select * from testdb.AQMRPT where STATENAME = ? and VALUE = ?",
  11. parameters = marshal(Seq("Arkansas", 8.toInt))
  12. )
  13. def queryRows: Source[AQMRPTRow,NotUsed] = {
  14. log.info(s"running queryRows ...")
  15. Source
  16. .single(query)
  17. .via(stub.runQuery)
  18. }

这段相对直白。

下面就是本次讨论涉及的完整源代码:

project/scalapb.sbt

 

  1. addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.18")
  2. resolvers += Resolver.bintrayRepo("beyondthelines", "maven")
  3. libraryDependencies ++= Seq(
  4. "com.thesamet.scalapb" %% "compilerplugin" % "0.7.4",
  5. "beyondthelines" %% "grpcakkastreamgenerator" % "0.0.5"
  6. )

 

build.sbt

  1. import scalapb.compiler.Version.scalapbVersion
  2. import scalapb.compiler.Version.grpcJavaVersion
  3. name := "gRPCCassandra"
  4. version := "0.1"
  5. scalaVersion := "2.12.6"
  6. resolvers += Resolver.bintrayRepo("beyondthelines", "maven")
  7. scalacOptions += "-Ypartial-unification"
  8. libraryDependencies := Seq(
  9. "com.thesamet.scalapb" %% "scalapb-runtime" % scalapbVersion % "protobuf",
  10. "io.grpc" % "grpc-netty" % grpcJavaVersion,
  11. "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapbVersion,
  12. "io.monix" %% "monix" % "2.3.0",
  13. // for GRPC Akkastream
  14. "beyondthelines" %% "grpcakkastreamruntime" % "0.0.5",
  15. // for scalikejdbc
  16. "org.scalikejdbc" %% "scalikejdbc" % "3.2.1",
  17. "org.scalikejdbc" %% "scalikejdbc-test" % "3.2.1" % "test",
  18. "org.scalikejdbc" %% "scalikejdbc-config" % "3.2.1",
  19. "org.scalikejdbc" %% "scalikejdbc-streams" % "3.2.1",
  20. "org.scalikejdbc" %% "scalikejdbc-joda-time" % "3.2.1",
  21. "com.h2database" % "h2" % "1.4.196",
  22. "mysql" % "mysql-connector-java" % "6.0.6",
  23. "org.postgresql" % "postgresql" % "42.2.0",
  24. "commons-dbcp" % "commons-dbcp" % "1.4",
  25. "org.apache.tomcat" % "tomcat-jdbc" % "9.0.2",
  26. "com.zaxxer" % "HikariCP" % "2.7.4",
  27. "com.jolbox" % "bonecp" % "0.8.0.RELEASE",
  28. "com.typesafe.slick" %% "slick" % "3.2.1",
  29. //for cassandra 340
  30. "com.datastax.cassandra" % "cassandra-driver-core" % "3.4.0",
  31. "com.datastax.cassandra" % "cassandra-driver-extras" % "3.4.0",
  32. "com.typesafe.akka" %% "akka-stream" % "2.5.13",
  33. "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "0.19",
  34. "ch.qos.logback" % "logback-classic" % "1.2.3",
  35. "org.typelevel" %% "cats-core" % "1.1.0"
  36. )
  37. PB.targets in Compile := Seq(
  38. scalapb.gen() -> (sourceManaged in Compile).value,
  39. // generate the akka stream files
  40. grpc.akkastreams.generators.GrpcAkkaStreamGenerator() -> (sourceManaged in Compile).value
  41. )

main/resources/application.conf

  1. # JDBC settings
  2. test {
  3. db {
  4. h2 {
  5. driver = "org.h2.Driver"
  6. url = "jdbc:h2:tcp://localhost/~/slickdemo"
  7. user = ""
  8. password = ""
  9. poolInitialSize = 5
  10. poolMaxSize = 7
  11. poolConnectionTimeoutMillis = 1000
  12. poolValidationQuery = "select 1 as one"
  13. poolFactoryName = "commons-dbcp2"
  14. }
  15. }
  16. db.mysql.driver = "com.mysql.cj.jdbc.Driver"
  17. db.mysql.url = "jdbc:mysql://localhost:3306/testdb"
  18. db.mysql.user = "root"
  19. db.mysql.password = "123"
  20. db.mysql.poolInitialSize = 5
  21. db.mysql.poolMaxSize = 7
  22. db.mysql.poolConnectionTimeoutMillis = 1000
  23. db.mysql.poolValidationQuery = "select 1 as one"
  24. db.mysql.poolFactoryName = "bonecp"
  25. # scallikejdbc Global settings
  26. scalikejdbc.global.loggingSQLAndTime.enabled = true
  27. scalikejdbc.global.loggingSQLAndTime.logLevel = info
  28. scalikejdbc.global.loggingSQLAndTime.warningEnabled = true
  29. scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000
  30. scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn
  31. scalikejdbc.global.loggingSQLAndTime.singleLineMode = false
  32. scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false
  33. scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10
  34. }
  35. dev {
  36. db {
  37. h2 {
  38. driver = "org.h2.Driver"
  39. url = "jdbc:h2:tcp://localhost/~/slickdemo"
  40. user = ""
  41. password = ""
  42. poolFactoryName = "hikaricp"
  43. numThreads = 10
  44. maxConnections = 12
  45. minConnections = 4
  46. keepAliveConnection = true
  47. }
  48. mysql {
  49. driver = "com.mysql.cj.jdbc.Driver"
  50. url = "jdbc:mysql://localhost:3306/testdb"
  51. user = "root"
  52. password = "123"
  53. poolInitialSize = 5
  54. poolMaxSize = 7
  55. poolConnectionTimeoutMillis = 1000
  56. poolValidationQuery = "select 1 as one"
  57. poolFactoryName = "bonecp"
  58. }
  59. postgres {
  60. driver = "org.postgresql.Driver"
  61. url = "jdbc:postgresql://localhost:5432/testdb"
  62. user = "root"
  63. password = "123"
  64. poolFactoryName = "hikaricp"
  65. numThreads = 10
  66. maxConnections = 12
  67. minConnections = 4
  68. keepAliveConnection = true
  69. }
  70. }
  71. # scallikejdbc Global settings
  72. scalikejdbc.global.loggingSQLAndTime.enabled = true
  73. scalikejdbc.global.loggingSQLAndTime.logLevel = info
  74. scalikejdbc.global.loggingSQLAndTime.warningEnabled = true
  75. scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000
  76. scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn
  77. scalikejdbc.global.loggingSQLAndTime.singleLineMode = false
  78. scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false
  79. scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10
  80. }

main/resources/logback.xml

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <configuration>
  3.  
  4. <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
  5. <layout class="ch.qos.logback.classic.PatternLayout">
  6. <Pattern>
  7. %d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n
  8. </Pattern>
  9. </layout>
  10. </appender>
  11.  
  12. <logger name="sdp.cql" level="info"
  13. additivity="false">
  14. <appender-ref ref="STDOUT" />
  15. </logger>
  16.  
  17. <logger name="demo.sdp.grpc.cql" level="info"
  18. additivity="false">
  19. <appender-ref ref="STDOUT" />
  20. </logger>
  21.  
  22. <root level="error">
  23. <appender-ref ref="STDOUT" />
  24. </root>
  25.  
  26. </configuration>

main/protobuf/cql.proto

  1. syntax = "proto3";
  2. import "google/protobuf/wrappers.proto";
  3. import "google/protobuf/any.proto";
  4. import "scalapb/scalapb.proto";
  5. option (scalapb.options) = {
  6. // use a custom Scala package name
  7. // package_name: "io.ontherocks.introgrpc.demo"
  8. // don't append file name to package
  9. flat_package: true
  10.  
  11. // generate one Scala file for all messages (services still get their own file)
  12. single_file: true
  13.  
  14. // add imports to generated file
  15. // useful when extending traits or using custom types
  16. // import: "io.ontherocks.hellogrpc.RockingMessage"
  17. // code to put at the top of generated file
  18. // works only with `single_file: true`
  19. //preamble: "sealed trait SomeSealedTrait"
  20. };
  21. /*
  22. * Demoes various customization options provided by ScalaPBs.
  23. */
  24. package sdp.grpc.services;
  25. message ProtoDate {
  26. int32 yyyy = 1;
  27. int32 mm = 2;
  28. int32 dd = 3;
  29. }
  30. message ProtoTime {
  31. int32 hh = 1;
  32. int32 mm = 2;
  33. int32 ss = 3;
  34. int32 nnn = 4;
  35. }
  36. message ProtoDateTime {
  37. ProtoDate date = 1;
  38. ProtoTime time = 2;
  39. }
  40. message AQMRPTRow {
  41. int64 rowid = 1;
  42. string countyname = 2;
  43. string statename = 3;
  44. int64 measureid = 4;
  45. int32 reportyear = 5;
  46. int32 value = 6;
  47. ProtoDateTime created = 7;
  48. }
  49. message CQLResult {
  50. bytes result = 1;
  51. }
  52. message CQLQuery {
  53. string statement = 1;
  54. bytes parameters = 2;
  55. google.protobuf.Int32Value consistency = 3;
  56. google.protobuf.Int32Value fetchSize = 4;
  57. }
  58. message CQLUpdate {
  59. repeated string statements = 1;
  60. bytes parameters = 2;
  61. google.protobuf.Int32Value consistency = 3;
  62. google.protobuf.BoolValue batch = 4;
  63. }
  64. message HelloMsg {
  65. string hello = 1;
  66. }
  67. service CQLServices {
  68. rpc clientStreaming(stream HelloMsg) returns (stream HelloMsg) {}
  69. rpc transferRows(stream AQMRPTRow) returns (stream CQLResult) {}
  70. rpc runQuery(CQLQuery) returns (stream AQMRPTRow) {}
  71. rpc runDDL(CQLUpdate) returns (CQLResult) {}
  72. }

logging/log.scala

  1. package sdp.logging
  2. import org.slf4j.Logger
  3. /**
  4. * Logger which just wraps org.slf4j.Logger internally.
  5. *
  6. * @param logger logger
  7. */
  8. class Log(logger: Logger) {
  9. // use var consciously to enable squeezing later
  10. var isDebugEnabled: Boolean = logger.isDebugEnabled
  11. var isInfoEnabled: Boolean = logger.isInfoEnabled
  12. var isWarnEnabled: Boolean = logger.isWarnEnabled
  13. var isErrorEnabled: Boolean = logger.isErrorEnabled
  14. def withLevel(level: Symbol)(msg: => String, e: Throwable = null): Unit = {
  15. level match {
  16. case 'debug | 'DEBUG => debug(msg)
  17. case 'info | 'INFO => info(msg)
  18. case 'warn | 'WARN => warn(msg)
  19. case 'error | 'ERROR => error(msg)
  20. case _ => // nothing to do
  21. }
  22. }
  23. def debug(msg: => String): Unit = {
  24. if (isDebugEnabled && logger.isDebugEnabled) {
  25. logger.debug(msg)
  26. }
  27. }
  28. def debug(msg: => String, e: Throwable): Unit = {
  29. if (isDebugEnabled && logger.isDebugEnabled) {
  30. logger.debug(msg, e)
  31. }
  32. }
  33. def info(msg: => String): Unit = {
  34. if (isInfoEnabled && logger.isInfoEnabled) {
  35. logger.info(msg)
  36. }
  37. }
  38. def info(msg: => String, e: Throwable): Unit = {
  39. if (isInfoEnabled && logger.isInfoEnabled) {
  40. logger.info(msg, e)
  41. }
  42. }
  43. def warn(msg: => String): Unit = {
  44. if (isWarnEnabled && logger.isWarnEnabled) {
  45. logger.warn(msg)
  46. }
  47. }
  48. def warn(msg: => String, e: Throwable): Unit = {
  49. if (isWarnEnabled && logger.isWarnEnabled) {
  50. logger.warn(msg, e)
  51. }
  52. }
  53. def error(msg: => String): Unit = {
  54. if (isErrorEnabled && logger.isErrorEnabled) {
  55. logger.error(msg)
  56. }
  57. }
  58. def error(msg: => String, e: Throwable): Unit = {
  59. if (isErrorEnabled && logger.isErrorEnabled) {
  60. logger.error(msg, e)
  61. }
  62. }
  63. }

logging/LogSupport.scala

  1. package sdp.logging
  2. import org.slf4j.LoggerFactory
  3. trait LogSupport {
  4. /**
  5. * Logger
  6. */
  7. protected val log = new Log(LoggerFactory.getLogger(this.getClass))
  8. }

filestreaming/FileStreaming.scala

  1. package sdp.file
  2. import java.io.{ByteArrayInputStream, InputStream}
  3. import java.nio.ByteBuffer
  4. import java.nio.file.Paths
  5. import akka.stream.Materializer
  6. import akka.stream.scaladsl.{FileIO, StreamConverters}
  7. import akka.util._
  8. import scala.concurrent.Await
  9. import scala.concurrent.duration._
  10. object Streaming {
  11. def FileToByteBuffer(fileName: String, timeOut: FiniteDuration = 60 seconds)(
  12. implicit mat: Materializer):ByteBuffer = {
  13. val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>
  14. hd ++ bs
  15. }
  16. (Await.result(fut, timeOut)).toByteBuffer
  17. }
  18. def FileToByteArray(fileName: String, timeOut: FiniteDuration = 60 seconds)(
  19. implicit mat: Materializer): Array[Byte] = {
  20. val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>
  21. hd ++ bs
  22. }
  23. (Await.result(fut, timeOut)).toArray
  24. }
  25. def FileToInputStream(fileName: String, timeOut: FiniteDuration = 60 seconds)(
  26. implicit mat: Materializer): InputStream = {
  27. val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>
  28. hd ++ bs
  29. }
  30. val buf = (Await.result(fut, timeOut)).toArray
  31. new ByteArrayInputStream(buf)
  32. }
  33. def ByteBufferToFile(byteBuf: ByteBuffer, fileName: String)(
  34. implicit mat: Materializer) = {
  35. val ba = new Array[Byte](byteBuf.remaining())
  36. byteBuf.get(ba,0,ba.length)
  37. val baInput = new ByteArrayInputStream(ba)
  38. val source = StreamConverters.fromInputStream(() => baInput) //ByteBufferInputStream(bytes))
  39. source.runWith(FileIO.toPath(Paths.get(fileName)))
  40. }
  41. def ByteArrayToFile(bytes: Array[Byte], fileName: String)(
  42. implicit mat: Materializer) = {
  43. val bb = ByteBuffer.wrap(bytes)
  44. val baInput = new ByteArrayInputStream(bytes)
  45. val source = StreamConverters.fromInputStream(() => baInput) //ByteBufferInputStream(bytes))
  46. source.runWith(FileIO.toPath(Paths.get(fileName)))
  47. }
  48. def InputStreamToFile(is: InputStream, fileName: String)(
  49. implicit mat: Materializer) = {
  50. val source = StreamConverters.fromInputStream(() => is)
  51. source.runWith(FileIO.toPath(Paths.get(fileName)))
  52. }
  53. }

jdbc/JDBCConfig.scala

  1. package sdp.jdbc.config
  2. import scala.collection.mutable
  3. import scala.concurrent.duration.Duration
  4. import scala.language.implicitConversions
  5. import com.typesafe.config._
  6. import java.util.concurrent.TimeUnit
  7. import java.util.Properties
  8. import scalikejdbc.config._
  9. import com.typesafe.config.Config
  10. import com.zaxxer.hikari._
  11. import scalikejdbc.ConnectionPoolFactoryRepository
  12. /** Extension methods to make Typesafe Config easier to use */
  13. class ConfigExtensionMethods(val c: Config) extends AnyVal {
  14. import scala.collection.JavaConverters._
  15. def getBooleanOr(path: String, default: => Boolean = false) = if(c.hasPath(path)) c.getBoolean(path) else default
  16. def getIntOr(path: String, default: => Int = 0) = if(c.hasPath(path)) c.getInt(path) else default
  17. def getStringOr(path: String, default: => String = null) = if(c.hasPath(path)) c.getString(path) else default
  18. def getConfigOr(path: String, default: => Config = ConfigFactory.empty()) = if(c.hasPath(path)) c.getConfig(path) else default
  19. def getMillisecondsOr(path: String, default: => Long = 0L) = if(c.hasPath(path)) c.getDuration(path, TimeUnit.MILLISECONDS) else default
  20. def getDurationOr(path: String, default: => Duration = Duration.Zero) =
  21. if(c.hasPath(path)) Duration(c.getDuration(path, TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS) else default
  22. def getPropertiesOr(path: String, default: => Properties = null): Properties =
  23. if(c.hasPath(path)) new ConfigExtensionMethods(c.getConfig(path)).toProperties else default
  24. def toProperties: Properties = {
  25. def toProps(m: mutable.Map[String, ConfigValue]): Properties = {
  26. val props = new Properties(null)
  27. m.foreach { case (k, cv) =>
  28. val v =
  29. if(cv.valueType() == ConfigValueType.OBJECT) toProps(cv.asInstanceOf[ConfigObject].asScala)
  30. else if(cv.unwrapped eq null) null
  31. else cv.unwrapped.toString
  32. if(v ne null) props.put(k, v)
  33. }
  34. props
  35. }
  36. toProps(c.root.asScala)
  37. }
  38. def getBooleanOpt(path: String): Option[Boolean] = if(c.hasPath(path)) Some(c.getBoolean(path)) else None
  39. def getIntOpt(path: String): Option[Int] = if(c.hasPath(path)) Some(c.getInt(path)) else None
  40. def getStringOpt(path: String) = Option(getStringOr(path))
  41. def getPropertiesOpt(path: String) = Option(getPropertiesOr(path))
  42. }
  43. object ConfigExtensionMethods {
  44. @inline implicit def configExtensionMethods(c: Config): ConfigExtensionMethods = new ConfigExtensionMethods(c)
  45. }
  46. trait HikariConfigReader extends TypesafeConfigReader {
  47. self: TypesafeConfig => // with TypesafeConfigReader => //NoEnvPrefix =>
  48. import ConfigExtensionMethods.configExtensionMethods
  49. def getFactoryName(dbName: Symbol): String = {
  50. val c: Config = config.getConfig(envPrefix + "db." + dbName.name)
  51. c.getStringOr("poolFactoryName", ConnectionPoolFactoryRepository.COMMONS_DBCP)
  52. }
  53. def hikariCPConfig(dbName: Symbol): HikariConfig = {
  54. val hconf = new HikariConfig()
  55. val c: Config = config.getConfig(envPrefix + "db." + dbName.name)
  56. // Connection settings
  57. if (c.hasPath("dataSourceClass")) {
  58. hconf.setDataSourceClassName(c.getString("dataSourceClass"))
  59. } else {
  60. Option(c.getStringOr("driverClassName", c.getStringOr("driver"))).map(hconf.setDriverClassName _)
  61. }
  62. hconf.setJdbcUrl(c.getStringOr("url", null))
  63. c.getStringOpt("user").foreach(hconf.setUsername)
  64. c.getStringOpt("password").foreach(hconf.setPassword)
  65. c.getPropertiesOpt("properties").foreach(hconf.setDataSourceProperties)
  66. // Pool configuration
  67. hconf.setConnectionTimeout(c.getMillisecondsOr("connectionTimeout", 1000))
  68. hconf.setValidationTimeout(c.getMillisecondsOr("validationTimeout", 1000))
  69. hconf.setIdleTimeout(c.getMillisecondsOr("idleTimeout", 600000))
  70. hconf.setMaxLifetime(c.getMillisecondsOr("maxLifetime", 1800000))
  71. hconf.setLeakDetectionThreshold(c.getMillisecondsOr("leakDetectionThreshold", 0))
  72. hconf.setInitializationFailFast(c.getBooleanOr("initializationFailFast", false))
  73. c.getStringOpt("connectionTestQuery").foreach(hconf.setConnectionTestQuery)
  74. c.getStringOpt("connectionInitSql").foreach(hconf.setConnectionInitSql)
  75. val numThreads = c.getIntOr("numThreads", 20)
  76. hconf.setMaximumPoolSize(c.getIntOr("maxConnections", numThreads * 5))
  77. hconf.setMinimumIdle(c.getIntOr("minConnections", numThreads))
  78. hconf.setPoolName(c.getStringOr("poolName", dbName.name))
  79. hconf.setRegisterMbeans(c.getBooleanOr("registerMbeans", false))
  80. // Equivalent of ConnectionPreparer
  81. hconf.setReadOnly(c.getBooleanOr("readOnly", false))
  82. c.getStringOpt("isolation").map("TRANSACTION_" + _).foreach(hconf.setTransactionIsolation)
  83. hconf.setCatalog(c.getStringOr("catalog", null))
  84. hconf
  85. }
  86. }
  87. import scalikejdbc._
  88. trait ConfigDBs {
  89. self: TypesafeConfigReader with TypesafeConfig with HikariConfigReader =>
  90. def setup(dbName: Symbol = ConnectionPool.DEFAULT_NAME): Unit = {
  91. getFactoryName(dbName) match {
  92. case "hikaricp" => {
  93. val hconf = hikariCPConfig(dbName)
  94. val hikariCPSource = new HikariDataSource(hconf)
  95. case class HikariDataSourceCloser(src: HikariDataSource) extends DataSourceCloser {
  96. var closed = false
  97. override def close(): Unit = src.close()
  98. }
  99. if (hconf.getDriverClassName != null && hconf.getDriverClassName.trim.nonEmpty) {
  100. Class.forName(hconf.getDriverClassName)
  101. }
  102. ConnectionPool.add(dbName, new DataSourceConnectionPool(dataSource = hikariCPSource,settings = DataSourceConnectionPoolSettings(),
  103. closer = HikariDataSourceCloser(hikariCPSource)))
  104. }
  105. case _ => {
  106. val JDBCSettings(url, user, password, driver) = readJDBCSettings(dbName)
  107. val cpSettings = readConnectionPoolSettings(dbName)
  108. if (driver != null && driver.trim.nonEmpty) {
  109. Class.forName(driver)
  110. }
  111. ConnectionPool.add(dbName, url, user, password, cpSettings)
  112. }
  113. }
  114. }
  115. def setupAll(): Unit = {
  116. loadGlobalSettings()
  117. dbNames.foreach { dbName => setup(Symbol(dbName)) }
  118. }
  119. def close(dbName: Symbol = ConnectionPool.DEFAULT_NAME): Unit = {
  120. ConnectionPool.close(dbName)
  121. }
  122. def closeAll(): Unit = {
  123. ConnectionPool.closeAll
  124. }
  125. }
  126. object ConfigDBs extends ConfigDBs
  127. with TypesafeConfigReader
  128. with StandardTypesafeConfig
  129. with HikariConfigReader
  130. case class ConfigDBsWithEnv(envValue: String) extends ConfigDBs
  131. with TypesafeConfigReader
  132. with StandardTypesafeConfig
  133. with HikariConfigReader
  134. with EnvPrefix {
  135. override val env = Option(envValue)
  136. }

jdbc/JDBCEngine.scala

  1. package sdp.jdbc.engine
  2. import java.sql.PreparedStatement
  3. import scala.collection.generic.CanBuildFrom
  4. import akka.stream.scaladsl._
  5. import scalikejdbc._
  6. import scalikejdbc.streams._
  7. import akka.NotUsed
  8. import akka.stream._
  9. import java.time._
  10. import scala.concurrent.duration._
  11. import scala.concurrent._
  12. import sdp.file.Streaming._
  13. import scalikejdbc.TxBoundary.Try._
  14. import scala.concurrent.ExecutionContextExecutor
  15. import java.io.InputStream
  16. import sdp.logging.LogSupport
  17. object JDBCContext {
  18. type SQLTYPE = Int
  19. val SQL_EXEDDL= 1
  20. val SQL_UPDATE = 2
  21. val RETURN_GENERATED_KEYVALUE = true
  22. val RETURN_UPDATED_COUNT = false
  23. }
  24. case class JDBCQueryContext[M](
  25. dbName: Symbol,
  26. statement: String,
  27. parameters: Seq[Any] = Nil,
  28. fetchSize: Int = 100,
  29. autoCommit: Boolean = false,
  30. queryTimeout: Option[Int] = None)
  31. case class JDBCContext (
  32. dbName: Symbol,
  33. statements: Seq[String] = Nil,
  34. parameters: Seq[Seq[Any]] = Nil,
  35. fetchSize: Int = 100,
  36. queryTimeout: Option[Int] = None,
  37. queryTags: Seq[String] = Nil,
  38. sqlType: JDBCContext.SQLTYPE = JDBCContext.SQL_UPDATE,
  39. batch: Boolean = false,
  40. returnGeneratedKey: Seq[Option[Any]] = Nil,
  41. // no return: None, return by index: Some(1), by name: Some("id")
  42. preAction: Option[PreparedStatement => Unit] = None,
  43. postAction: Option[PreparedStatement => Unit] = None)
  44. extends LogSupport {
  45. ctx =>
  46.  
  47. //helper functions
  48. def appendTag(tag: String): JDBCContext = ctx.copy(queryTags = ctx.queryTags :+ tag)
  49. def appendTags(tags: Seq[String]): JDBCContext = ctx.copy(queryTags = ctx.queryTags ++ tags)
  50. def setFetchSize(size: Int): JDBCContext = ctx.copy(fetchSize = size)
  51. def setQueryTimeout(time: Option[Int]): JDBCContext = ctx.copy(queryTimeout = time)
  52. def setPreAction(action: Option[PreparedStatement => Unit]): JDBCContext = {
  53. if (ctx.sqlType == JDBCContext.SQL_UPDATE &&
  54. !ctx.batch && ctx.statements.size == 1) {
  55. val nc = ctx.copy(preAction = action)
  56. log.info("setPreAction> set")
  57. nc
  58. }
  59. else {
  60. log.info("setPreAction> JDBCContex setting error: preAction not supported!")
  61. throw new IllegalStateException("JDBCContex setting error: preAction not supported!")
  62. }
  63. }
  64. def setPostAction(action: Option[PreparedStatement => Unit]): JDBCContext = {
  65. if (ctx.sqlType == JDBCContext.SQL_UPDATE &&
  66. !ctx.batch && ctx.statements.size == 1) {
  67. val nc = ctx.copy(postAction = action)
  68. log.info("setPostAction> set")
  69. nc
  70. }
  71. else {
  72. log.info("setPreAction> JDBCContex setting error: postAction not supported!")
  73. throw new IllegalStateException("JDBCContex setting error: postAction not supported!")
  74. }
  75. }
  76. def appendDDLCommand(_statement: String, _parameters: Any*): JDBCContext = {
  77. if (ctx.sqlType == JDBCContext.SQL_EXEDDL) {
  78. log.info(s"appendDDLCommand> appending: statement: ${_statement}, parameters: ${_parameters}")
  79. val nc = ctx.copy(
  80. statements = ctx.statements ++ Seq(_statement),
  81. parameters = ctx.parameters ++ Seq(Seq(_parameters))
  82. )
  83. log.info(s"appendDDLCommand> appended: statement: ${nc.statements}, parameters: ${nc.parameters}")
  84. nc
  85. } else {
  86. log.info(s"appendDDLCommand> JDBCContex setting error: option not supported!")
  87. throw new IllegalStateException("JDBCContex setting error: option not supported!")
  88. }
  89. }
  90. def appendUpdateCommand(_returnGeneratedKey: Boolean, _statement: String,_parameters: Any*): JDBCContext = {
  91. if (ctx.sqlType == JDBCContext.SQL_UPDATE && !ctx.batch) {
  92. log.info(s"appendUpdateCommand> appending: returnGeneratedKey: ${_returnGeneratedKey}, statement: ${_statement}, parameters: ${_parameters}")
  93. val nc = ctx.copy(
  94. statements = ctx.statements ++ Seq(_statement),
  95. parameters = ctx.parameters ++ Seq(_parameters),
  96. returnGeneratedKey = ctx.returnGeneratedKey ++ (if (_returnGeneratedKey) Seq(Some(1)) else Seq(None))
  97. )
  98. log.info(s"appendUpdateCommand> appended: statement: ${nc.statements}, parameters: ${nc.parameters}")
  99. nc
  100. } else {
  101. log.info(s"appendUpdateCommand> JDBCContex setting error: option not supported!")
  102. throw new IllegalStateException("JDBCContex setting error: option not supported!")
  103. }
  104. }
  105. def appendBatchParameters(_parameters: Any*): JDBCContext = {
  106. log.info(s"appendBatchParameters> appending: parameters: ${_parameters}")
  107. if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch) {
  108. log.info(s"appendBatchParameters> JDBCContex setting error: batch parameters only supported for SQL_UPDATE and batch = true!")
  109. throw new IllegalStateException("JDBCContex setting error: batch parameters only supported for SQL_UPDATE and batch = true!")
  110. }
  111. var matchParams = true
  112. if (ctx.parameters != Nil)
  113. if (ctx.parameters.head.size != _parameters.size)
  114. matchParams = false
  115. if (matchParams) {
  116. val nc = ctx.copy(
  117. parameters = ctx.parameters ++ Seq(_parameters)
  118. )
  119. log.info(s"appendBatchParameters> appended: statement: ${nc.statements}, parameters: ${nc.parameters}")
  120. nc
  121. } else {
  122. log.info(s"appendBatchParameters> JDBCContex setting error: batch command parameters not match!")
  123. throw new IllegalStateException("JDBCContex setting error: batch command parameters not match!")
  124. }
  125. }
  126. def setBatchReturnGeneratedKeyOption(returnKey: Boolean): JDBCContext = {
  127. if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch)
  128. throw new IllegalStateException("JDBCContex setting error: only supported in batch update commands!")
  129. ctx.copy(
  130. returnGeneratedKey = if (returnKey) Seq(Some(1)) else Nil
  131. )
  132. }
  133. def setDDLCommand(_statement: String, _parameters: Any*): JDBCContext = {
  134. log.info(s"setDDLCommand> setting: statement: ${_statement}, parameters: ${_parameters}")
  135. val nc = ctx.copy(
  136. statements = Seq(_statement),
  137. parameters = Seq(_parameters),
  138. sqlType = JDBCContext.SQL_EXEDDL,
  139. batch = false
  140. )
  141. log.info(s"setDDLCommand> set: statement: ${nc.statements}, parameters: ${nc.parameters}")
  142. nc
  143. }
  144. def setUpdateCommand(_returnGeneratedKey: Boolean, _statement: String,_parameters: Any*): JDBCContext = {
  145. log.info(s"setUpdateCommand> setting: returnGeneratedKey: ${_returnGeneratedKey}, statement: ${_statement}, parameters: ${_parameters}")
  146. val nc = ctx.copy(
  147. statements = Seq(_statement),
  148. parameters = Seq(_parameters),
  149. returnGeneratedKey = if (_returnGeneratedKey) Seq(Some(1)) else Seq(None),
  150. sqlType = JDBCContext.SQL_UPDATE,
  151. batch = false
  152. )
  153. log.info(s"setUpdateCommand> set: statement: ${nc.statements}, parameters: ${nc.parameters}")
  154. nc
  155. }
  156. def setBatchCommand(_statement: String): JDBCContext = {
  157. log.info(s"setBatchCommand> appending: statement: ${_statement}")
  158. val nc = ctx.copy (
  159. statements = Seq(_statement),
  160. sqlType = JDBCContext.SQL_UPDATE,
  161. batch = true
  162. )
  163. log.info(s"setBatchCommand> set: statement: ${nc.statements}, parameters: ${nc.parameters}")
  164. nc
  165. }
  166. }
  167. object JDBCEngine extends LogSupport {
  168. import JDBCContext._
  169. type JDBCDate = LocalDate
  170. type JDBCDateTime = LocalDateTime
  171. type JDBCTime = LocalTime
  172. def jdbcSetDate(yyyy: Int, mm: Int, dd: Int) = LocalDate.of(yyyy,mm,dd)
  173. def jdbcSetTime(hh: Int, mm: Int, ss: Int, nn: Int) = LocalTime.of(hh,mm,ss,nn)
  174. def jdbcSetDateTime(date: JDBCDate, time: JDBCTime) = LocalDateTime.of(date,time)
  175. def jdbcSetNow = LocalDateTime.now()
  176. def jdbcGetDate(sqlDate: java.sql.Date): java.time.LocalDate = sqlDate.toLocalDate
  177. def jdbcGetTime(sqlTime: java.sql.Time): java.time.LocalTime = sqlTime.toLocalTime
  178. def jdbcGetTimestamp(sqlTimestamp: java.sql.Timestamp): java.time.LocalDateTime =
  179. sqlTimestamp.toLocalDateTime
  180. type JDBCBlob = InputStream
  181. def fileToJDBCBlob(fileName: String, timeOut: FiniteDuration = 60 seconds)(
  182. implicit mat: Materializer) = FileToInputStream(fileName,timeOut)
  183. def jdbcBlobToFile(blob: JDBCBlob, fileName: String)(
  184. implicit mat: Materializer) = InputStreamToFile(blob,fileName)
  185. private def noExtractor(message: String): WrappedResultSet => Nothing = { (rs: WrappedResultSet) =>
  186. throw new IllegalStateException(message)
  187. }
  188. def jdbcAkkaStream[A](ctx: JDBCQueryContext[A],extractor: WrappedResultSet => A)
  189. (implicit ec: ExecutionContextExecutor): Source[A,NotUsed] = {
  190. val publisher: DatabasePublisher[A] = NamedDB(ctx.dbName) readOnlyStream {
  191. val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor(""))
  192. ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
  193. val sql: SQL[A, HasExtractor] = rawSql.map(extractor)
  194. sql.iterator
  195. .withDBSessionForceAdjuster(session => {
  196. session.connection.setAutoCommit(ctx.autoCommit)
  197. session.fetchSize(ctx.fetchSize)
  198. })
  199. }
  200. log.info(s"jdbcAkkaStream> Source: db: ${ctx.dbName}, statement: ${ctx.statement}, parameters: ${ctx.parameters}")
  201. Source.fromPublisher[A](publisher)
  202. }
  203. def jdbcQueryResult[C[_] <: TraversableOnce[_], A](ctx: JDBCQueryContext[A],
  204. extractor: WrappedResultSet => A)(
  205. implicit cbf: CanBuildFrom[Nothing, A, C[A]]): C[A] = {
  206. val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor(""))
  207. ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
  208. rawSql.fetchSize(ctx.fetchSize)
  209. try {
  210. implicit val session = NamedAutoSession(ctx.dbName)
  211. log.info(s"jdbcQueryResult> Source: db: ${ctx.dbName}, statement: ${ctx.statement}, parameters: ${ctx.parameters}")
  212. val sql: SQL[A, HasExtractor] = rawSql.map(extractor)
  213. sql.collection.apply[C]()
  214. } catch {
  215. case e: Exception =>
  216. log.error(s"jdbcQueryResult> runtime error: ${e.getMessage}")
  217. throw new RuntimeException(s"jdbcQueryResult> Error: ${e.getMessage}")
  218. }
  219. }
  220. def jdbcExecuteDDL(ctx: JDBCContext)(implicit ec: ExecutionContextExecutor): Future[String] = {
  221. if (ctx.sqlType != SQL_EXEDDL) {
  222. log.info(s"jdbcExecuteDDL> JDBCContex setting error: sqlType must be 'SQL_EXEDDL'!")
  223. Future.failed(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_EXEDDL'!"))
  224. }
  225. else {
  226. log.info(s"jdbcExecuteDDL> Source: db: ${ctx.dbName}, statement: ${ctx.statements}, parameters: ${ctx.parameters}")
  227. Future {
  228. NamedDB(ctx.dbName) localTx { implicit session =>
  229. ctx.statements.foreach { stm =>
  230. val ddl = new SQLExecution(statement = stm, parameters = Nil)(
  231. before = WrappedResultSet => {})(
  232. after = WrappedResultSet => {})
  233. ddl.apply()
  234. }
  235. "SQL_EXEDDL executed succesfully."
  236. }
  237. }
  238. }
  239. }
  240. def jdbcBatchUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
  241. implicit ec: ExecutionContextExecutor,
  242. cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
  243. if (ctx.statements == Nil) {
  244. log.info(s"jdbcBatchUpdate> JDBCContex setting error: statements empty!")
  245. Future.failed(new IllegalStateException("JDBCContex setting error: statements empty!"))
  246. }
  247. if (ctx.sqlType != SQL_UPDATE) {
  248. log.info(s"jdbcBatchUpdate> JDBCContex setting error: sqlType must be 'SQL_UPDATE'!")
  249. Future.failed(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_UPDATE'!"))
  250. }
  251. else {
  252. if (ctx.batch) {
  253. if (noReturnKey(ctx)) {
  254. log.info(s"jdbcBatchUpdate> batch updating no return: db: ${ctx.dbName}, statement: ${ctx.statements}, parameters: ${ctx.parameters}")
  255. val usql = SQL(ctx.statements.head)
  256. .tags(ctx.queryTags: _*)
  257. .batch(ctx.parameters: _*)
  258. Future {
  259. NamedDB(ctx.dbName) localTx { implicit session =>
  260. ctx.queryTimeout.foreach(session.queryTimeout(_))
  261. usql.apply[Seq]()
  262. Seq.empty[Long].to[C]
  263. }
  264. }
  265. } else {
  266. log.info(s"jdbcBatchUpdate> batch updating return genkey: db: ${ctx.dbName}, statement: ${ctx.statements}, parameters: ${ctx.parameters}")
  267. val usql = new SQLBatchWithGeneratedKey(ctx.statements.head, ctx.parameters, ctx.queryTags)(None)
  268. Future {
  269. NamedDB(ctx.dbName) localTx { implicit session =>
  270. ctx.queryTimeout.foreach(session.queryTimeout(_))
  271. usql.apply[C]()
  272. }
  273. }
  274. }
  275. } else {
  276. log.info(s"jdbcBatchUpdate> JDBCContex setting error: must set batch = true !")
  277. Future.failed(new IllegalStateException("JDBCContex setting error: must set batch = true !"))
  278. }
  279. }
  280. }
  281. private def singleTxUpdateWithReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
  282. implicit ec: ExecutionContextExecutor,
  283. cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
  284. val Some(key) :: xs = ctx.returnGeneratedKey
  285. val params: Seq[Any] = ctx.parameters match {
  286. case Nil => Nil
  287. case p@_ => p.head
  288. }
  289. log.info(s"singleTxUpdateWithReturnKey> updating: db: ${ctx.dbName}, statement: ${ctx.statements}, parameters: ${ctx.parameters}")
  290. val usql = new SQLUpdateWithGeneratedKey(ctx.statements.head, params, ctx.queryTags)(key)
  291. Future {
  292. NamedDB(ctx.dbName) localTx { implicit session =>
  293. session.fetchSize(ctx.fetchSize)
  294. ctx.queryTimeout.foreach(session.queryTimeout(_))
  295. val result = usql.apply()
  296. Seq(result).to[C]
  297. }
  298. }
  299. }
  300. private def singleTxUpdateNoReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
  301. implicit ec: ExecutionContextExecutor,
  302. cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
  303. val params: Seq[Any] = ctx.parameters match {
  304. case Nil => Nil
  305. case p@_ => p.head
  306. }
  307. val before = ctx.preAction match {
  308. case None => pstm: PreparedStatement => {}
  309. case Some(f) => f
  310. }
  311. val after = ctx.postAction match {
  312. case None => pstm: PreparedStatement => {}
  313. case Some(f) => f
  314. }
  315. log.info(s"singleTxUpdateNoReturnKey> updating: db: ${ctx.dbName}, statement: ${ctx.statements}, parameters: ${ctx.parameters}")
  316. val usql = new SQLUpdate(ctx.statements.head,params,ctx.queryTags)(before)(after)
  317. Future {
  318. NamedDB(ctx.dbName) localTx {implicit session =>
  319. session.fetchSize(ctx.fetchSize)
  320. ctx.queryTimeout.foreach(session.queryTimeout(_))
  321. val result = usql.apply()
  322. Seq(result.toLong).to[C]
  323. }
  324. }
  325. }
  326. private def singleTxUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
  327. implicit ec: ExecutionContextExecutor,
  328. cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
  329. if (noReturnKey(ctx))
  330. singleTxUpdateNoReturnKey(ctx)
  331. else
  332. singleTxUpdateWithReturnKey(ctx)
  333. }
  334. private def noReturnKey(ctx: JDBCContext): Boolean = {
  335. if (ctx.returnGeneratedKey != Nil) {
  336. val k :: xs = ctx.returnGeneratedKey
  337. k match {
  338. case None => true
  339. case Some(k) => false
  340. }
  341. } else true
  342. }
  343. def noActon: PreparedStatement=>Unit = pstm => {}
  344. def multiTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
  345. implicit ec: ExecutionContextExecutor,
  346. cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
  347. val keys: Seq[Option[Any]] = ctx.returnGeneratedKey match {
  348. case Nil => Seq.fill(ctx.statements.size)(None)
  349. case k@_ => k
  350. }
  351. val sqlcmd = ctx.statements zip ctx.parameters zip keys
  352. log.info(s"multiTxUpdates> updating: db: ${ctx.dbName}, SQL Commands: ${sqlcmd}")
  353. Future {
  354. NamedDB(ctx.dbName) localTx { implicit session =>
  355. session.fetchSize(ctx.fetchSize)
  356. ctx.queryTimeout.foreach(session.queryTimeout(_))
  357. val results = sqlcmd.map { case ((stm, param), key) =>
  358. key match {
  359. case None =>
  360. new SQLUpdate(stm, param, Nil)(noActon)(noActon).apply().toLong
  361. case Some(k) =>
  362. new SQLUpdateWithGeneratedKey(stm, param, Nil)(k).apply().toLong
  363. }
  364. }
  365. results.to[C]
  366. }
  367. }
  368. }
  369. def jdbcTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
  370. implicit ec: ExecutionContextExecutor,
  371. cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
  372. if (ctx.statements == Nil) {
  373. log.info(s"jdbcTxUpdates> JDBCContex setting error: statements empty!")
  374. Future.failed(new IllegalStateException("JDBCContex setting error: statements empty!"))
  375. }
  376. if (ctx.sqlType != SQL_UPDATE) {
  377. log.info(s"jdbcTxUpdates> JDBCContex setting error: sqlType must be 'SQL_UPDATE'!")
  378. Future.failed(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_UPDATE'!"))
  379. }
  380. else {
  381. if (!ctx.batch) {
  382. if (ctx.statements.size == 1)
  383. singleTxUpdate(ctx)
  384. else
  385. multiTxUpdates(ctx)
  386. } else {
  387. log.info(s"jdbcTxUpdates> JDBCContex setting error: must set batch = false !")
  388. Future.failed(new IllegalStateException("JDBCContex setting error: must set batch = false !"))
  389. }
  390. }
  391. }
  392. case class JDBCActionStream[R](dbName: Symbol, parallelism: Int = 1, processInOrder: Boolean = true,
  393. statement: String, prepareParams: R => Seq[Any]) extends LogSupport {
  394. jas =>
  395. def setDBName(db: Symbol): JDBCActionStream[R] = jas.copy(dbName = db)
  396. def setParallelism(parLevel: Int): JDBCActionStream[R] = jas.copy(parallelism = parLevel)
  397. def setProcessOrder(ordered: Boolean): JDBCActionStream[R] = jas.copy(processInOrder = ordered)
  398. private def perform(r: R)(implicit ec: ExecutionContextExecutor) = {
  399. import scala.concurrent._
  400. val params = prepareParams(r)
  401. log.info(s"JDBCActionStream.perform> db: ${dbName}, statement: ${statement}, parameters: ${params}")
  402. Future {
  403. NamedDB(dbName) autoCommit { session =>
  404. session.execute(statement, params: _*)
  405. }
  406. r
  407. }
  408. }
  409. def performOnRow(implicit ec: ExecutionContextExecutor): Flow[R, R, NotUsed] =
  410. if (processInOrder)
  411. Flow[R].mapAsync(parallelism)(perform)
  412. else
  413. Flow[R].mapAsyncUnordered(parallelism)(perform)
  414. }
  415. object JDBCActionStream {
  416. def apply[R](_dbName: Symbol, _statement: String, params: R => Seq[Any]): JDBCActionStream[R] =
  417. new JDBCActionStream[R](dbName = _dbName, statement=_statement, prepareParams = params)
  418. }
  419. }

cql/CassandraEngine.scala

  1. package sdp.cql.engine
  2. import akka.NotUsed
  3. import akka.stream.alpakka.cassandra.scaladsl._
  4. import akka.stream.scaladsl._
  5. import com.datastax.driver.core._
  6. import com.google.common.util.concurrent.{FutureCallback, Futures, ListenableFuture}
  7. import scala.collection.JavaConverters._
  8. import scala.collection.generic.CanBuildFrom
  9. import scala.concurrent._
  10. import scala.concurrent.duration.Duration
  11. import sdp.logging.LogSupport
  12. object CQLContext {
  13. // Consistency Levels
  14. type CONSISTENCY_LEVEL = Int
  15. val ANY: CONSISTENCY_LEVEL = 0x0000
  16. val ONE: CONSISTENCY_LEVEL = 0x0001
  17. val TWO: CONSISTENCY_LEVEL = 0x0002
  18. val THREE: CONSISTENCY_LEVEL = 0x0003
  19. val QUORUM : CONSISTENCY_LEVEL = 0x0004
  20. val ALL: CONSISTENCY_LEVEL = 0x0005
  21. val LOCAL_QUORUM: CONSISTENCY_LEVEL = 0x0006
  22. val EACH_QUORUM: CONSISTENCY_LEVEL = 0x0007
  23. val LOCAL_ONE: CONSISTENCY_LEVEL = 0x000A
  24. val LOCAL_SERIAL: CONSISTENCY_LEVEL = 0x000B
  25. val SERIAL: CONSISTENCY_LEVEL = 0x000C
  26. def apply(): CQLContext = CQLContext(statements = Nil)
  27. def consistencyLevel: CONSISTENCY_LEVEL => ConsistencyLevel = consistency => {
  28. consistency match {
  29. case ALL => ConsistencyLevel.ALL
  30. case ONE => ConsistencyLevel.ONE
  31. case TWO => ConsistencyLevel.TWO
  32. case THREE => ConsistencyLevel.THREE
  33. case ANY => ConsistencyLevel.ANY
  34. case EACH_QUORUM => ConsistencyLevel.EACH_QUORUM
  35. case LOCAL_ONE => ConsistencyLevel.LOCAL_ONE
  36. case QUORUM => ConsistencyLevel.QUORUM
  37. case SERIAL => ConsistencyLevel.SERIAL
  38. case LOCAL_SERIAL => ConsistencyLevel.LOCAL_SERIAL
  39. }
  40. }
  41. }
  42. case class CQLQueryContext(
  43. statement: String,
  44. parameter: Seq[Object] = Nil,
  45. consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None,
  46. fetchSize: Int = 100
  47. ) { ctx =>
  48. def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLQueryContext =
  49. ctx.copy(consistency = Some(_consistency))
  50. def setFetchSize(pageSize: Int): CQLQueryContext =
  51. ctx.copy(fetchSize = pageSize)
  52. def setParameters(param: Seq[Object]): CQLQueryContext =
  53. ctx.copy(parameter = param)
  54. }
  55. object CQLQueryContext {
  56. def apply[M](stmt: String, param: Seq[Object]): CQLQueryContext = new CQLQueryContext(statement = stmt, parameter = param)
  57. }
  58. case class CQLContext(
  59. statements: Seq[String],
  60. parameters: Seq[Seq[Object]] = Nil,
  61. consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None,
  62. batch: Boolean = false
  63. ) extends LogSupport { ctx =>
  64. def setBatch(bat: Boolean) = ctx.copy(batch = bat)
  65. def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLContext =
  66. ctx.copy(consistency = Some(_consistency))
  67. def setCommand(_statement: String, _parameters: Object*): CQLContext = {
  68. log.info(s"setCommand> setting: statement: ${_statement}, parameters: ${_parameters}")
  69. val nc = ctx.copy(statements = Seq(_statement), parameters = Seq(_parameters))
  70. log.info(s"setCommand> set: statements: ${nc.statements}, parameters: ${nc.parameters}")
  71. nc
  72. }
  73. def appendCommand(_statement: String, _parameters: Object*): CQLContext = {
  74. log.info(s"appendCommand> appending: statement: ${_statement}, parameters: ${_parameters}")
  75. val nc = ctx.copy(statements = ctx.statements :+ _statement,
  76. parameters = ctx.parameters ++ Seq(_parameters))
  77. log.info(s"appendCommand> appended: statements: ${nc.statements}, parameters: ${nc.parameters}")
  78. nc
  79. }
  80. }
  81. object CQLEngine extends LogSupport {
  82. import CQLContext._
  83. import CQLHelpers._
  84. import cats._, cats.data._, cats.implicits._
  85. import scala.concurrent.{Await, Future}
  86. import scala.concurrent.duration._
  87. def fetchResultPage[C[_] <: TraversableOnce[_],A](ctx: CQLQueryContext, pageSize: Int = 100
  88. ,extractor: Row => A)(
  89. implicit session: Session, cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet, C[A])= {
  90. val prepStmt = session.prepare(ctx.statement)
  91. var boundStmt = prepStmt.bind()
  92. var params: Seq[Object] = Nil
  93. if (ctx.parameter != Nil) {
  94. params = processParameters(ctx.parameter)
  95. boundStmt = prepStmt.bind(params:_*)
  96. }
  97. log.info(s"fetchResultPage> statement: ${prepStmt.getQueryString}, parameters: ${params}")
  98. ctx.consistency.foreach {consistency =>
  99. boundStmt.setConsistencyLevel(consistencyLevel(consistency))}
  100. val resultSet = session.execute(boundStmt.setFetchSize(pageSize))
  101. (resultSet,(resultSet.asScala.view.map(extractor)).to[C])
  102. }
  103. def fetchMorePages[C[_] <: TraversableOnce[_],A](resultSet: ResultSet, timeOut: Duration)(
  104. extractor: Row => A)(implicit cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet,Option[C[A]]) =
  105. if (resultSet.isFullyFetched) {
  106. (resultSet, None)
  107. } else {
  108. try {
  109. val result = Await.result(resultSet.fetchMoreResults(), timeOut)
  110. (result, Some((result.asScala.view.map(extractor)).to[C]))
  111. } catch { case e: Throwable => (resultSet, None) }
  112. }
  113. def cqlExecute(ctx: CQLContext)(
  114. implicit session: Session, ec: ExecutionContext): Future[Boolean] = {
  115. var invalidBat = false
  116. if ( ctx.batch ) {
  117. if (ctx.parameters == Nil)
  118. invalidBat = true
  119. else if (ctx.parameters.size < 2)
  120. invalidBat = true;
  121. }
  122. if (!ctx.batch || invalidBat) {
  123. if(invalidBat)
  124. log.warn(s"cqlExecute> batch update must have at least 2 sets of parameters! change to single-command.")
  125. if (ctx.statements.size == 1) {
  126. var param: Seq[Object] = Nil
  127. if (ctx.parameters != Nil) param = ctx.parameters.head
  128. log.info(s"cqlExecute> single-command: statement: ${ctx.statements.head} parameters: ${param}")
  129. cqlSingleUpdate(ctx.consistency, ctx.statements.head, param)
  130. }
  131. else {
  132. var params: Seq[Seq[Object]] = Nil
  133. if (ctx.parameters == Nil)
  134. params = Seq.fill(ctx.statements.length)(Nil)
  135. else {
  136. if (ctx.statements.size > ctx.parameters.size) {
  137. log.warn(s"cqlExecute> fewer parameters than statements! pad with 'Nil'.")
  138. val nils = Seq.fill(ctx.statements.size - ctx.parameters.size)(Nil)
  139. params = ctx.parameters ++ nils
  140. }
  141. else
  142. params = ctx.parameters
  143. }
  144. val commands: Seq[(String,Seq[Object])] = ctx.statements zip params
  145. log.info(s"cqlExecute> multi-commands: ${commands}")
  146. /*
  147. //using sequence to flip List[Future[Boolean]] => Future[List[Boolean]]
  148. //therefore, make sure no command replies on prev command effect
  149. val lstCmds: List[Future[Boolean]] = commands.map { case (stmt,param) =>
  150. cqlSingleUpdate(ctx.consistency, stmt, param)
  151. }.toList
  152. val futList = lstCmds.sequence.map(_ => true) //must map to execute
  153. */
  154. /*
  155. //using traverse to have some degree of parallelism = max(runtimes)
  156. //therefore, make sure no command replies on prev command effect
  157. val futList = Future.traverse(commands) { case (stmt,param) =>
  158. cqlSingleUpdate(ctx.consistency, stmt, param)
  159. }.map(_ => true)
  160. Await.result(futList, 3 seconds)
  161. Future.successful(true)
  162. */
  163. // run sync directly
  164. Future {
  165. commands.foreach { case (stm, pars) =>
  166. cqlExecuteSync(ctx.consistency, stm, pars)
  167. }
  168. true
  169. }
  170. }
  171. }
  172. else
  173. cqlBatchUpdate(ctx)
  174. }
  175. def cqlSingleUpdate(cons: Option[CQLContext.CONSISTENCY_LEVEL],stmt: String, params: Seq[Object])(
  176. implicit session: Session, ec: ExecutionContext): Future[Boolean] = {
  177. val prepStmt = session.prepare(stmt)
  178. var boundStmt = prepStmt.bind()
  179. var pars: Seq[Object] = Nil
  180. if (params != Nil) {
  181. pars = processParameters(params)
  182. boundStmt = prepStmt.bind(pars: _*)
  183. }
  184. log.info(s"cqlSingleUpdate> statement: ${prepStmt.getQueryString}, parameters: ${pars}")
  185. cons.foreach { consistency =>
  186. boundStmt.setConsistencyLevel(consistencyLevel(consistency))
  187. }
  188. session.executeAsync(boundStmt).map(_.wasApplied())
  189. }
  190. def cqlExecuteSync(cons: Option[CQLContext.CONSISTENCY_LEVEL],stmt: String, params: Seq[Object])(
  191. implicit session: Session, ec: ExecutionContext): Boolean = {
  192. val prepStmt = session.prepare(stmt)
  193. var boundStmt = prepStmt.bind()
  194. var pars: Seq[Object] = Nil
  195. if (params != Nil) {
  196. pars = processParameters(params)
  197. boundStmt = prepStmt.bind(pars: _*)
  198. }
  199. log.info(s"cqlExecuteSync> statement: ${prepStmt.getQueryString}, parameters: ${pars}")
  200. cons.foreach { consistency =>
  201. boundStmt.setConsistencyLevel(consistencyLevel(consistency))
  202. }
  203. session.execute(boundStmt).wasApplied()
  204. }
  205. def cqlBatchUpdate(ctx: CQLContext)(
  206. implicit session: Session, ec: ExecutionContext): Future[Boolean] = {
  207. var params: Seq[Seq[Object]] = Nil
  208. if (ctx.parameters == Nil)
  209. params = Seq.fill(ctx.statements.length)(Nil)
  210. else
  211. params = ctx.parameters
  212. log.info(s"cqlBatchUpdate> statement: ${ctx.statements.head}, parameters: ${params}")
  213. val prepStmt = session.prepare(ctx.statements.head)
  214. var batch = new BatchStatement()
  215. params.foreach { p =>
  216. log.info(s"cqlBatchUpdate> batch with raw parameter: ${p}")
  217. val pars = processParameters(p)
  218. log.info(s"cqlBatchUpdate> batch with cooked parameters: ${pars}")
  219. batch.add(prepStmt.bind(pars: _*))
  220. }
  221. ctx.consistency.foreach { consistency =>
  222. batch.setConsistencyLevel(consistencyLevel(consistency))
  223. }
  224. session.executeAsync(batch).map(_.wasApplied())
  225. }
  226. def cassandraStream[A](ctx: CQLQueryContext,extractor: Row => A)
  227. (implicit session: Session, ec: ExecutionContextExecutor): Source[A,NotUsed] = {
  228. val prepStmt = session.prepare(ctx.statement)
  229. var boundStmt = prepStmt.bind()
  230. val params = processParameters(ctx.parameter)
  231. boundStmt = prepStmt.bind(params:_*)
  232. ctx.consistency.foreach {consistency =>
  233. boundStmt.setConsistencyLevel(consistencyLevel(consistency))}
  234. log.info(s"cassandraStream> statement: ${prepStmt.getQueryString}, parameters: ${params}")
  235. CassandraSource(boundStmt.setFetchSize(ctx.fetchSize)).map(extractor)
  236. }
  237. case class CassandraActionStream[R](parallelism: Int = 1, processInOrder: Boolean = true,
  238. statement: String, prepareParams: R => Seq[Object],
  239. consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None){ cas =>
  240. def setParallelism(parLevel: Int): CassandraActionStream[R] = cas.copy(parallelism=parLevel)
  241. def setProcessOrder(ordered: Boolean): CassandraActionStream[R] = cas.copy(processInOrder = ordered)
  242. def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CassandraActionStream[R] =
  243. cas.copy(consistency = Some(_consistency))
  244. def perform(r: R)(implicit session: Session, ec: ExecutionContext) = {
  245. var prepStmt = session.prepare(statement)
  246. var boundStmt = prepStmt.bind()
  247. val params = processParameters(prepareParams(r))
  248. boundStmt = prepStmt.bind(params: _*)
  249. consistency.foreach { cons =>
  250. boundStmt.setConsistencyLevel(CQLContext.consistencyLevel(cons))
  251. }
  252. log.info(s"CassandraActionStream.perform> statement: ${prepStmt.getQueryString}, parameters: ${params}")
  253. session.executeAsync(boundStmt).map(_ => r)
  254. }
  255. def performOnRow(implicit session: Session, ec: ExecutionContext): Flow[R, R, NotUsed] =
  256. if (processInOrder)
  257. Flow[R].mapAsync(parallelism)(perform)
  258. else
  259. Flow[R].mapAsyncUnordered(parallelism)(perform)
  260. def unloggedBatch[K](statementBinder: (
  261. R, PreparedStatement) => BoundStatement,partitionKey: R => K)(
  262. implicit session: Session, ec: ExecutionContext): Flow[R, R, NotUsed] = {
  263. val preparedStatement = session.prepare(statement)
  264. log.info(s"CassandraActionStream.unloggedBatch> statement: ${preparedStatement.getQueryString}")
  265. CassandraFlow.createUnloggedBatchWithPassThrough[R, K](
  266. parallelism,
  267. preparedStatement,
  268. statementBinder,
  269. partitionKey)
  270. }
  271. }
  272. object CassandraActionStream {
  273. def apply[R](_statement: String, params: R => Seq[Object]): CassandraActionStream[R] =
  274. new CassandraActionStream[R]( statement=_statement, prepareParams = params)
  275. }
  276. }
  277. object CQLHelpers extends LogSupport {
  278. import java.nio.ByteBuffer
  279. import java.io._
  280. import java.nio.file._
  281. import com.datastax.driver.core.LocalDate
  282. import com.datastax.driver.extras.codecs.jdk8.InstantCodec
  283. import java.time.Instant
  284. import akka.stream.scaladsl._
  285. import akka.stream._
  286. implicit def listenableFutureToFuture[T](
  287. listenableFuture: ListenableFuture[T]): Future[T] = {
  288. val promise = Promise[T]()
  289. Futures.addCallback(listenableFuture, new FutureCallback[T] {
  290. def onFailure(error: Throwable): Unit = {
  291. promise.failure(error)
  292. ()
  293. }
  294. def onSuccess(result: T): Unit = {
  295. promise.success(result)
  296. ()
  297. }
  298. })
  299. promise.future
  300. }
  301. case class CQLDate(year: Int, month: Int, day: Int)
  302. case object CQLTodayDate
  303. case class CQLDateTime(year: Int, Month: Int,
  304. day: Int, hour: Int, minute: Int, second: Int, millisec: Int = 0)
  305. case object CQLDateTimeNow
  306. def cqlGetDate(dateToConvert: java.util.Date): java.time.LocalDate =
  307. dateToConvert.toInstant()
  308. .atZone(java.time.ZoneId.systemDefault())
  309. .toLocalDate()
  310. def cqlGetTime(dateToConvert: java.util.Date): java.time.LocalTime =
  311. dateToConvert.toInstant()
  312. .atZone(java.time.ZoneId.systemDefault())
  313. .toLocalTime()
  314. def cqlGetTimestamp(dateToConvert: java.util.Date): java.time.LocalDateTime=
  315. new java.sql.Timestamp(
  316. dateToConvert.getTime()
  317. ).toLocalDateTime()
  318. def processParameters(params: Seq[Object]): Seq[Object] = {
  319. import java.time.{Clock,ZoneId}
  320. log.info(s"[processParameters] input: ${params}")
  321. val outParams = params.map { obj =>
  322. obj match {
  323. case CQLDate(yy, mm, dd) => LocalDate.fromYearMonthDay(yy, mm, dd)
  324. case CQLTodayDate =>
  325. val today = java.time.LocalDate.now()
  326. LocalDate.fromYearMonthDay(today.getYear, today.getMonth.getValue, today.getDayOfMonth)
  327. case CQLDateTimeNow => Instant.now(Clock.system(ZoneId.of("EST", ZoneId.SHORT_IDS)))
  328. case CQLDateTime(yy, mm, dd, hr, ms, sc, mi) =>
  329. Instant.parse(f"$yy%04d-$mm%02d-$dd%02dT$hr%02d:$ms%02d:$sc%02d.$mi%03dZ")
  330. case p@_ => p
  331. }
  332. }
  333. log.info(s"[processParameters] output: ${outParams}")
  334. outParams
  335. }
  336. class ByteBufferInputStream(buf: ByteBuffer) extends InputStream {
  337. override def read: Int = {
  338. if (!buf.hasRemaining) return -1
  339. buf.get
  340. }
  341. override def read(bytes: Array[Byte], off: Int, len: Int): Int = {
  342. val length: Int = Math.min(len, buf.remaining)
  343. buf.get(bytes, off, length)
  344. length
  345. }
  346. }
  347. object ByteBufferInputStream {
  348. def apply(buf: ByteBuffer): ByteBufferInputStream = {
  349. new ByteBufferInputStream(buf)
  350. }
  351. }
  352. class FixsizedByteBufferOutputStream(buf: ByteBuffer) extends OutputStream {
  353. override def write(b: Int): Unit = {
  354. buf.put(b.toByte)
  355. }
  356. override def write(bytes: Array[Byte], off: Int, len: Int): Unit = {
  357. buf.put(bytes, off, len)
  358. }
  359. }
  360. object FixsizedByteBufferOutputStream {
  361. def apply(buf: ByteBuffer) = new FixsizedByteBufferOutputStream(buf)
  362. }
  363. class ExpandingByteBufferOutputStream(var buf: ByteBuffer, onHeap: Boolean) extends OutputStream {
  364. private val increasing = ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR
  365. override def write(b: Array[Byte], off: Int, len: Int): Unit = {
  366. val position = buf.position
  367. val limit = buf.limit
  368. val newTotal: Long = position + len
  369. if(newTotal > limit){
  370. var capacity = (buf.capacity * increasing)
  371. while(capacity <= newTotal){
  372. capacity = (capacity*increasing)
  373. }
  374. increase(capacity.toInt)
  375. }
  376. buf.put(b, off, len)
  377. }
  378. override def write(b: Int): Unit= {
  379. if (!buf.hasRemaining) increase((buf.capacity * increasing).toInt)
  380. buf.put(b.toByte)
  381. }
  382. protected def increase(newCapacity: Int): Unit = {
  383. buf.limit(buf.position)
  384. buf.rewind
  385. val newBuffer =
  386. if (onHeap) ByteBuffer.allocate(newCapacity)
  387. else ByteBuffer.allocateDirect(newCapacity)
  388. newBuffer.put(buf)
  389. buf.clear
  390. buf = newBuffer
  391. }
  392. def size: Long = buf.position
  393. def capacity: Long = buf.capacity
  394. def byteBuffer: ByteBuffer = buf
  395. }
  396. object ExpandingByteBufferOutputStream {
  397. val DEFAULT_INCREASING_FACTOR = 1.5f
  398. def apply(size: Int, increasingBy: Float, onHeap: Boolean) = {
  399. if (increasingBy <= 1) throw new IllegalArgumentException("Increasing Factor must be greater than 1.0")
  400. val buffer: ByteBuffer =
  401. if (onHeap) ByteBuffer.allocate(size)
  402. else ByteBuffer.allocateDirect(size)
  403. new ExpandingByteBufferOutputStream(buffer,onHeap)
  404. }
  405. def apply(size: Int): ExpandingByteBufferOutputStream = {
  406. apply(size, ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR, false)
  407. }
  408. def apply(size: Int, onHeap: Boolean): ExpandingByteBufferOutputStream = {
  409. apply(size, ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR, onHeap)
  410. }
  411. def apply(size: Int, increasingBy: Float): ExpandingByteBufferOutputStream = {
  412. apply(size, increasingBy, false)
  413. }
  414. }
  415. def cqlFileToBytes(fileName: String): ByteBuffer = {
  416. val fis = new FileInputStream(fileName)
  417. val b = new Array[Byte](fis.available)
  418. fis.read(b)
  419. fis.close()
  420. ByteBuffer.wrap(b)
  421. }
  422. def cqlBytesToFile(bytes: ByteBuffer, fileName: String)(
  423. implicit mat: Materializer): Future[IOResult] = {
  424. val source = StreamConverters.fromInputStream(() => ByteBufferInputStream(bytes))
  425. source.runWith(FileIO.toPath(Paths.get(fileName)))
  426. }
  427. def cqlDateTimeString(date: java.util.Date, fmt: String): String = {
  428. val outputFormat = new java.text.SimpleDateFormat(fmt)
  429. outputFormat.format(date)
  430. }
  431. def useJava8DateTime(cluster: Cluster) = {
  432. //for jdk8 datetime format
  433. cluster.getConfiguration().getCodecRegistry()
  434. .register(InstantCodec.instance)
  435. }
  436. }
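To make the role of the CQLDate/CQLDateTimeNow placeholders above concrete, here is a minimal usage sketch (not part of the original sources): it calls cqlSingleUpdate with sample values for testdb.AQMRPT and relies on processParameters to turn CQLDateTimeNow into a driver Instant before binding. The row values (1001L, 86L, "Pulaski", ...) are made up for illustration, and an implicit Session and ExecutionContext are assumed to be in scope.

import com.datastax.driver.core.Session
import scala.concurrent.{ExecutionContext, Future}
import sdp.cql.engine.CQLEngine._
import sdp.cql.engine.CQLHelpers._

def insertOneRow()(implicit session: Session, ec: ExecutionContext): Future[Boolean] = {
  val insertCQL =
    "insert into testdb.AQMRPT(rowid,measureid,statename,countyname,reportyear,value,created) values(?,?,?,?,?,?,?)"
  val params: Seq[Object] = Seq(
    1001L.asInstanceOf[Object],   // rowid (sample value)
    86L.asInstanceOf[Object],     // measureid (sample value)
    "Arkansas",                   // statename
    "Pulaski",                    // countyname (sample value)
    1999.asInstanceOf[Object],    // reportyear
    5.asInstanceOf[Object],       // value
    CQLDateTimeNow                // created: processParameters converts this to Instant.now(...)
  )
  cqlSingleUpdate(None, insertCQL, params)  // prepares, binds and executes asynchronously
}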

BytesConverter.scala

  1. package protobuf.bytes
  2. import java.io.{ByteArrayInputStream,ByteArrayOutputStream,ObjectInputStream,ObjectOutputStream}
  3. import com.google.protobuf.ByteString
  4. object Converter {
  5. def marshal(value: Any): ByteString = {
  6. val stream: ByteArrayOutputStream = new ByteArrayOutputStream()
  7. val oos = new ObjectOutputStream(stream)
  8. oos.writeObject(value)
  9. oos.close()
  10. ByteString.copyFrom(stream.toByteArray())
  11. }
  12. def unmarshal[A](bytes: ByteString): A = {
  13. val ois = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
  14. val value = ois.readObject()
  15. ois.close()
  16. value.asInstanceOf[A]
  17. }
  18. }
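As a quick sanity check of the Converter above (a sketch, not in the original post), the round trip mirrors exactly what the client and server do with CQLQuery.parameters:

import protobuf.bytes.Converter._

// the client packs query parameters like this before sending CQLQuery
val bytes  = marshal(Seq("Arkansas", 0))       // Any => protobuf ByteString via Java serialization
val params = unmarshal[Seq[Object]](bytes)     // server side recovers Seq("Arkansas", 0)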

CQLServices.scala

  1. package demo.sdp.grpc.cql.server
  2. import akka.NotUsed
  3. import akka.stream.scaladsl._
  4. import protobuf.bytes.Converter._
  5. import com.datastax.driver.core._
  6. import scala.concurrent.ExecutionContextExecutor
  7. import sdp.grpc.services._
  8. import sdp.cql.engine._
  9. import CQLEngine._
  10. import CQLHelpers._
  11. import sdp.logging.LogSupport
  12. import scala.concurrent._
  13. import scala.concurrent.duration._
  14. import akka.stream.ActorMaterializer
  15. class CQLStreamingServices(implicit ec: ExecutionContextExecutor,
  16. mat: ActorMaterializer, session: Session)
  17. extends CqlGrpcAkkaStream.CQLServices with LogSupport{
  18. val toParams: AQMRPTRow => Seq[Object] = row => Seq[Object](
  19. row.rowid.asInstanceOf[Object],
  20. row.measureid.asInstanceOf[Object],
  21. row.statename,
  22. row.countyname,
  23. row.reportyear.asInstanceOf[Object],
  24. row.value.asInstanceOf[Object],
  25. CQLDateTimeNow
  26. )
  27. val cqlInsert ="""
  28. |insert into testdb.AQMRPT(
  29. | rowid,
  30. | measureid,
  31. | statename,
  32. | countyname,
  33. | reportyear,
  34. | value,
  35. | created)
  36. | values(?,?,?,?,?,?,?)
  37. """.stripMargin
  38. val cqlActionStream = CassandraActionStream(cqlInsert,toParams).setParallelism(2)
  39. .setProcessOrder(false)
  40. /*
  41. val cqlActionFlow: Flow[AQMRPTRow,AQMRPTRow,NotUsed] =
  42. Flow[AQMRPTRow]
  43. .via(cqlActionStream.performOnRow)
  44. */
  45. val cqlActionFlow: Flow[AQMRPTRow,CQLResult,NotUsed] = {
  46. Flow[AQMRPTRow]
  47. .mapAsync(cqlActionStream.parallelism){ row =>
  48. if (IfExists(row.rowid))
  49. Future.successful(CQLResult(marshal(0)))
  50. else
  51. cqlActionStream.perform(row).map {_ => CQLResult(marshal(1))}
  52. }
  53. }
  54. override def transferRows: Flow[AQMRPTRow, CQLResult, NotUsed] = {
  55. Flow[AQMRPTRow]
  56. .via(cqlActionFlow)
  57. }
  58. private def IfExists(rowid: Long): Boolean = {
  59. val cql = "SELECT * FROM testdb.AQMRPT WHERE ROWID = ? ALLOW FILTERING"
  60. val param = Seq(rowid.asInstanceOf[Object])
  61. val toRowId: Row => Long = r => r.getLong("rowid")
  62. val ctx = CQLQueryContext(cql,param)
  63. val src: Source[Long,NotUsed] = cassandraStream(ctx,toRowId)
  64. val fut = src.toMat(Sink.headOption)(Keep.right).run()
  65. val result = Await.result(fut,3 seconds)
  66. log.info(s"checking existence: ${result}")
  67. result match {
  68. case Some(x) => true
  69. case None => false
  70. }
  71. }
  72. override def clientStreaming: Flow[HelloMsg, HelloMsg, NotUsed] = {
  73. Flow[HelloMsg]
  74. .map {r => println(r) ; r}
  75. }
  76. override def runDDL: Flow[CQLUpdate, CQLResult, NotUsed] = {
  77. Flow[CQLUpdate]
  78. .flatMapConcat { context =>
  79. //unpack CQLUpdate and construct the context
  80. val ctx = CQLContext(context.statements)
  81. log.info(s"**** CQLContext => ${ctx} ***")
  82. Source
  83. .fromFuture(cqlExecute(ctx))
  84. .map { r => CQLResult(marshal(r)) }
  85. }
  86. }
  87. def toCQLTimestamp(rs: Row) = {
  88. try {
  89. val tm = rs.getTimestamp("CREATED")
  90. if (tm == null) None
  91. else {
  92. val localdt = cqlGetTimestamp(tm)
  93. Some(ProtoDateTime(Some(ProtoDate(localdt.getYear, localdt.getMonthValue, localdt.getDayOfMonth)),
  94. Some(ProtoTime(localdt.getHour, localdt.getMinute, localdt.getSecond, localdt.getNano))))
  95. }
  96. }
  97. catch {
  98. case e: Exception => None
  99. }
  100. }
  101. val toAQMRow: Row => AQMRPTRow = rs=> AQMRPTRow(
  102. rowid = rs.getLong("ROWID"),
  103. measureid = rs.getLong("MEASUREID"),
  104. statename = rs.getString("STATENAME"),
  105. countyname = rs.getString("COUNTYNAME"),
  106. reportyear = rs.getInt("REPORTYEAR"),
  107. value = rs.getInt("VALUE"),
  108. created = toCQLTimestamp(rs)
  109. )
  110. override def runQuery: Flow[CQLQuery, AQMRPTRow, NotUsed] = {
  111. log.info("**** runQuery called on service side ***")
  112. Flow[CQLQuery]
  113. .flatMapConcat { q =>
  114. //unpack CQLQuery and construct the context
  115. var params: Seq[Object] = Nil
  116. if (q.parameters != _root_.com.google.protobuf.ByteString.EMPTY)
  117. params = unmarshal[Seq[Object]](q.parameters)
  118. log.info(s"**** query parameters: ${params} ****")
  119. val ctx = CQLQueryContext(q.statement,params)
  120. CQLEngine.cassandraStream(ctx,toAQMRow)
  121. }
  122. }
  123. }
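One detail worth noting in CQLStreamingServices above: IfExists blocks the stream with Await inside cqlActionFlow. A non-blocking variant could look like the sketch below; this is an alternative idea, not the original code, and it assumes the method lives inside CQLStreamingServices so the implicit session, ec and mat plus the existing imports are in scope.

// sketch only: same existence check as IfExists, but returning a Future instead of blocking
private def ifExistsAsync(rowid: Long): Future[Boolean] = {
  val cql = "SELECT rowid FROM testdb.AQMRPT WHERE ROWID = ? ALLOW FILTERING"
  val ctx = CQLQueryContext(cql, Seq(rowid.asInstanceOf[Object]))
  cassandraStream(ctx, (r: Row) => r.getLong("rowid"))  // CQLEngine.cassandraStream
    .runWith(Sink.headOption)                           // complete with the first match, if any
    .map(_.isDefined)                                   // Some(_) => the row already exists
}

cqlActionFlow could then use mapAsync with ifExistsAsync(row.rowid).flatMap { exists => ... } instead of the synchronous check.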

CQLServer.scala

  1. package demo.sdp.grpc.cql.server
  2. import java.util.logging.Logger
  3. import com.datastax.driver.core._
  4. import akka.actor.ActorSystem
  5. import akka.stream.ActorMaterializer
  6. import io.grpc.Server
  7. import io.grpc.ServerBuilder
  8. import sdp.grpc.services._
  9. import sdp.cql.engine._
  10. import CQLHelpers._
  11. class gRPCServer(server: Server) {
  12. val logger: Logger = Logger.getLogger(classOf[gRPCServer].getName)
  13. def start(): Unit = {
  14. server.start()
  15. logger.info(s"Server started, listening on ${server.getPort}")
  16. sys.addShutdownHook {
  17. // Use stderr here since the logger may have been reset by its JVM shutdown hook.
  18. System.err.println("*** shutting down gRPC server since JVM is shutting down")
  19. stop()
  20. System.err.println("*** server shut down")
  21. }
  22. ()
  23. }
  24. def stop(): Unit = {
  25. server.shutdown()
  26. }
  27. /**
  28. * Await termination on the main thread since the grpc library uses daemon threads.
  29. */
  30. def blockUntilShutdown(): Unit = {
  31. server.awaitTermination()
  32. }
  33. }
  34. object CQLServer extends App {
  35. implicit val cqlsys = ActorSystem("cqlSystem")
  36. implicit val mat = ActorMaterializer()
  37. implicit val ec = cqlsys.dispatcher
  38. val cluster = new Cluster
  39. .Builder()
  40. .addContactPoints("localhost")
  41. .withPort(9042)
  42. .build()
  43. useJava8DateTime(cluster)
  44. implicit val session = cluster.connect()
  45. val server = new gRPCServer(
  46. ServerBuilder
  47. .forPort(50051)
  48. .addService(
  49. CqlGrpcAkkaStream.bindService(
  50. new CQLStreamingServices
  51. )
  52. ).build()
  53. )
  54. server.start()
  55. // server.blockUntilShutdown()
  56. scala.io.StdIn.readLine()
  57. session.close()
  58. cluster.close()
  59. mat.shutdown()
  60. cqlsys.terminate()
  61. }

CQLClient.scala

  1. package demo.sdp.grpc.cql.client
  2. import sdp.grpc.services._
  3. import protobuf.bytes.Converter._
  4. import akka.stream.scaladsl._
  5. import akka.NotUsed
  6. import akka.actor.ActorSystem
  7. import akka.stream.{ActorMaterializer, ThrottleMode}
  8. import io.grpc._
  9. import sdp.logging.LogSupport
  10. import sdp.jdbc.engine._
  11. import JDBCEngine._
  12. import scalikejdbc.WrappedResultSet
  13. import sdp.cql.engine.CQLHelpers.CQLDateTimeNow
  14. import scala.util._
  15. import scala.concurrent.ExecutionContextExecutor
  16. class CQLStreamClient(host: String, port: Int)(
  17. implicit ec: ExecutionContextExecutor) extends LogSupport {
  18. val channel = ManagedChannelBuilder
  19. .forAddress(host, port)
  20. .usePlaintext(true)
  21. .build()
  22. val stub = CqlGrpcAkkaStream.stub(channel)
  23. val jdbcRows2transfer = JDBCQueryContext[AQMRPTRow](
  24. dbName = 'h2,
  25. statement = "select * from AQMRPT where statename='Arkansas'"
  26. )
  27. def toAQMRPTRow: WrappedResultSet => AQMRPTRow = rs => AQMRPTRow(
  28. rowid = rs.long("ROWID"),
  29. measureid = rs.long("MEASUREID"),
  30. statename = rs.string("STATENAME"),
  31. countyname = rs.string("COUNTYNAME"),
  32. reportyear = rs.int("REPORTYEAR"),
  33. value = rs.int("VALUE"),
  34. created = Some(ProtoDateTime(Some(ProtoDate(1990, 8, 12)), Some(ProtoTime(23, 56, 23, 0))))
  35. )
  36. import scala.concurrent.duration._
  37. def transferRows: Source[CQLResult, NotUsed] = {
  38. log.info(s"**** calling transferRows ****")
  39. jdbcAkkaStream(jdbcRows2transfer, toAQMRPTRow)
  40. // .throttle(1, 500.millis, 1, ThrottleMode.shaping)
  41. .via(stub.transferRows)
  42. }
  43. def echoHello: Source[HelloMsg,NotUsed] = {
  44. val row = HelloMsg("hello world!")
  45. val rows = List.fill[HelloMsg](100)(row)
  46. Source
  47. .fromIterator(() => rows.iterator)
  48. .via(stub.clientStreaming)
  49. }
  50. val query0 = CQLQuery(
  51. statement = "select * from testdb.AQMRPT"
  52. )
  53. val query = CQLQuery(
  54. statement = "select * from testdb.AQMRPT where STATENAME = ? and VALUE > ? ALLOW FILTERING;",
  55. parameters = marshal(Seq("Arkansas", 0.toInt))
  56. )
  57. val query2 = CQLQuery (
  58. statement = "select * from testdb.AQMRPT where STATENAME = ? and VALUE = ?",
  59. parameters = marshal(Seq("Colorado", 3.toInt))
  60. )
  61. val query3= CQLQuery (
  62. statement = "select * from testdb.AQMRPT where STATENAME = ? and VALUE = ?",
  63. parameters = marshal(Seq("Arkansas", 8.toInt))
  64. )
  65. def queryRows: Source[AQMRPTRow,NotUsed] = {
  66. log.info(s"running queryRows ...")
  67. Source
  68. .single(query)
  69. .via(stub.runQuery)
  70. }
  71. val dropCQL = "DROP TABLE IF EXISTS testdb.AQMRPT"
  72. val createCQL ="""
  73. CREATE TABLE testdb.AQMRPT (
  74. rowid bigint primary key,
  75. measureid bigint,
  76. statename text,
  77. countyname text,
  78. reportyear int,
  79. value int,
  80. created timestamp
  81. )"""
  82. val cqlddl = CQLUpdate(statements = Seq(dropCQL,createCQL))
  83. def createTbl: Source[CQLResult,NotUsed] = {
  84. log.info(s"running createTbl ...")
  85. Source
  86. .single(cqlddl)
  87. .via(stub.runDDL)
  88. }
  89. }
  90. object EchoHelloClient extends App {
  91. implicit val system = ActorSystem("EchoNumsClient")
  92. implicit val mat = ActorMaterializer.create(system)
  93. implicit val ec = system.dispatcher
  94. val client = new CQLStreamClient("localhost", 50051)
  95. client.echoHello.runForeach(println)
  96. scala.io.StdIn.readLine()
  97. mat.shutdown()
  98. system.terminate()
  99. }
  100. object TransferRows extends App {
  101. import sdp.jdbc.config._
  102. implicit val system = ActorSystem("JDBCServer")
  103. implicit val mat = ActorMaterializer.create(system)
  104. implicit val ec = system.dispatcher
  105. ConfigDBsWithEnv("dev").setup('h2)
  106. ConfigDBsWithEnv("dev").loadGlobalSettings()
  107. val client = new CQLStreamClient("localhost", 50051)
  108. val fut = client.transferRows.runFold(0){(a,b) => a + unmarshal[Int](b.result)}
  109. fut.onComplete {
  110. case scala.util.Success(cnt) => println(s"done transfer ${cnt} rows.")
  111. case Failure(e) => println(s"!!!!!streaming error: ${e.getMessage}")
  112. }
  113. scala.io.StdIn.readLine()
  114. ConfigDBsWithEnv("dev").close('h2)
  115. mat.shutdown()
  116. system.terminate()
  117. }
  118. object QueryRows extends App {
  119. implicit val system = ActorSystem("QueryRows")
  120. implicit val mat = ActorMaterializer.create(system)
  121. implicit val ec = system.dispatcher
  122. val client = new CQLStreamClient("localhost", 50051)
  123. val fut = client.queryRows.runForeach { r => println(r) }
  124. fut.onComplete {
  125. case scala.util.Success(d) => println(s"done querying.")
  126. case Failure(e) => println(s"!!!!!query error: ${e.getMessage}")
  127. }
  128. scala.io.StdIn.readLine()
  129. mat.shutdown()
  130. system.terminate()
  131. }
  132. object RunDDL extends App {
  133. implicit val system = ActorSystem("RunDDL")
  134. implicit val mat = ActorMaterializer.create(system)
  135. implicit val ec = system.dispatcher
  136. val client = new CQLStreamClient("localhost", 50051)
  137. client.createTbl.runForeach { r => println(unmarshal(r.result)) }
  138. scala.io.StdIn.readLine()
  139. mat.shutdown()
  140. system.terminate()
  141. }
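For completeness, the demo apps above can also be chained into a single run. The sketch below is not in the original sources; it assumes it sits in the same client source file (same package and imports) and reuses the H2 setup from TransferRows, creating the table first and only then streaming the rows:

object InitAndLoad extends App {
  import sdp.jdbc.config._
  implicit val system = ActorSystem("InitAndLoad")
  implicit val mat = ActorMaterializer.create(system)
  implicit val ec = system.dispatcher
  ConfigDBsWithEnv("dev").setup('h2)
  ConfigDBsWithEnv("dev").loadGlobalSettings()
  val client = new CQLStreamClient("localhost", 50051)
  // run the DDL first, then stream rows once the table exists
  val fut = client.createTbl
    .flatMapConcat(_ => client.transferRows)
    .runFold(0) { (acc, r) => acc + unmarshal[Int](r.result) }
  fut.onComplete {
    case Success(cnt) => println(s"table created, ${cnt} rows transferred.")
    case Failure(e)   => println(s"init/load error: ${e.getMessage}")
  }
  scala.io.StdIn.readLine()
  ConfigDBsWithEnv("dev").close('h2)
  mat.shutdown()
  system.terminate()
}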

 
