经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » Java相关 » Scala » 查看文章
Akka-CQRS(8)- CQRS Reader Actor 应用实例
来源:cnblogs  作者:雪川大虫  时间:2019/5/31 8:55:07  对本文有异议

  前面我们已经讨论了CQRS-Reader-Actor的基本工作原理,现在是时候在之前那个POS例子里进行实际的应用示范了。

假如我们有个业务系统也是在cassandra上的,那么reader就需要把从日志读出来的事件恢复成cassandra表里的数据行row。首先,我们需要在cassandra上创建相关的keyspace和table。下面是在scala中使用cassandra-java-driver的例子:

  1. import com.datastax.driver.core._
  2. import akka.actor.ActorSystem
  3. import akka.stream.ActorMaterializer
  4. import sdp.cql.engine._
  5. import CQLEngine._
  6. import CQLHelpers._
  7. import monix.execution.Scheduler.Implicits.global
  8. import scala.util._
  9. object CQLCreatTables extends App {
  10. //#init-mat
  11. implicit val cqlsys = ActorSystem("cqlSystem")
  12. implicit val mat = ActorMaterializer()
  13. // implicit val ec = cqlsys.dispatcher
  14. val cluster = new Cluster
  15. .Builder()
  16. .addContactPoints("192.168.11.189")
  17. .withPort(9042)
  18. .build()
  19. useJava8DateTime(cluster)
  20. implicit val session = cluster.connect()
  21. val createKeyspace = """
  22. CREATE KEYSPACE pos_on_cloud WITH replication = { //pos业务数据库
  23. 'class': 'SimpleStrategy',
  24. 'replication_factor': '3'
  25. }"""
  26. val createVchLog ="""
  27. CREATE TABLE pos_on_cloud.vch_log ( //销售单号日志 (可以从某日开始重新运算交易)
  28. terminal text,
  29. txndate text,
  30. vchnum int,
  31. begin_seq bigint,
  32. end_seq bigint,
  33. PRIMARY KEY (terminal,txndate,vchnum)
  34. )"""
  35. val createTxnItems ="""
  36. CREATE TABLE pos_on_cloud.txn_log ( //pos交易记录表
  37. terminal text,
  38. txndate text,
  39. txntime text,
  40. opr text,
  41. num int,
  42. seq int,
  43. txntype int,
  44. salestype int,
  45. qty int,
  46. price int,
  47. amount int,
  48. disc int,
  49. dscamt int,
  50. member text,
  51. code text,
  52. acct text,
  53. dpt text,
  54. PRIMARY KEY (terminal,txndate,num,seq)
  55. )"""
  56. val createTxnSuspend ="""
  57. CREATE TABLE pos_on_cloud.txn_hold ( //临时挂单表
  58. terminal text,
  59. txndate text,
  60. txntime text,
  61. opr text,
  62. num int,
  63. seq int,
  64. txntype int,
  65. salestype int,
  66. qty int,
  67. price int,
  68. amount int,
  69. disc int
  70. dscamt int,
  71. member text,
  72. code text,
  73. acct text,
  74. dpt text,
  75. PRIMARY KEY (terminal,txndate,num,seq)
  76. )"""
  77. val ctxKeyspace = CQLContext().setCommand(createKeyspace)
  78. val ctxVchlog = CQLContext().setCommand(createVchLog)
  79. val ctxTxnlog = CQLContext().setCommand(createTxnItems)
  80. val ctxTxnhold = CQLContext().setCommand(createTxnSuspend)
  81. val results = for {
  82. stsKeyspace <- cqlExecute(ctxKeyspace)
  83. stsVchlog <- cqlExecute(ctxVchlog)
  84. stsTxnlog <- cqlExecute(ctxTxnlog)
  85. stsTxnhold <- cqlExecute(ctxTxnhold)
  86. } yield (stsKeyspace,stsVchlog,stsTxnlog,stsTxnhold)
  87. val task = results.value.value
  88. val cancellableFut = task.runToFuture
  89. cancellableFut.onComplete {
  90. case Success(value) =>
  91. println(s"returned status: $value")
  92. case Failure(ex) =>
  93. System.err.println(s"ERROR: ${ex.getMessage}")
  94. }
  95. // cancellableFut.cancel()
  96. /*
  97. val cancelable = task.runAsync { result =>
  98. result match {
  99. case Right(value) =>
  100. println(value)
  101. case Left(ex) =>
  102. System.err.println(s"ERROR: ${ex.getMessage}")
  103. }
  104. } */
  105. scala.io.StdIn.readLine()
  106. session.close()
  107. cluster.close()
  108. cqlsys.terminate()
  109. }

这里面调用了之前PICE系列博文中设计的CassandraEngine里的工具源代码。下面是用CassandraEngine工具向cassandra表里插入数据的示范代码: 

 

  1. object DBWriter {
  2. def writeTxnsToDB(vchnum: Int, susp: Boolean, txns: List[TxnItem])(pid: String, bseq: Long, eseq: Long) = {
  3. import monix.execution.Scheduler.Implicits.global
  4. val cluster = new Cluster
  5. .Builder()
  6. .addContactPoints("192.168.11.189")
  7. .withPort(9042)
  8. .build()
  9. useJava8DateTime(cluster)
  10. implicit val session = cluster.connect()
  11. val insertVchLog = """
  12. |insert into pos_on_cloud.vch_log(terminal,txndate,vchnum,begin_seq,end_seq)
  13. |values(?,?,?,?,?)
  14. |""".stripMargin
  15. val insertTxns = """
  16. |insert into pos_on_cloud.txn_log(terminal,txndate,txntime,opr,num,seq,txntype,salestype,
  17. |qty,price,amount,disc,dscamt,member,code,acct,dpt)
  18. |values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
  19. """.stripMargin
  20. val insertSusp = """
  21. |insert into pos_on_cloud.txn_hold(terminal,txndate,txntime,opr,num,seq,txntype,salestype,
  22. |qty,price,amount,disc,dscamt,member,code,acct,dpt)
  23. |values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
  24. """.stripMargin
  25. val vchParams: Seq[Object] = Seq(
  26. pid.asInstanceOf[Object],
  27. LocalDate.now.format(DateTimeFormatter.ofPattern("yyyyMMdd")).asInstanceOf[Object],
  28. vchnum.asInstanceOf[Object],
  29. bseq.asInstanceOf[Object],
  30. eseq.asInstanceOf[Object]
  31. )
  32. val txnParams: Seq[Seq[Object]] = txns.foldRight(Seq[Seq[Object]]()) { (txnitem,b) =>
  33. (Seq(pid.asInstanceOf[Object]) ++ ccToList(txnitem)) +: b
  34. }
  35. val ctxVchlog = CQLContext().setCommand(insertVchLog, 1,vchParams)
  36. val ctxTxnlog = CQLContext().setCommand((if(susp) insertSusp else insertTxns),txnParams.size,txnParams)
  37. val results = for {
  38. stsVchlog <- cqlExecute(ctxVchlog)
  39. stsTxnlog <- cqlExecute(ctxTxnlog)
  40. } yield (stsTxnlog)
  41. val task = results.value.value
  42. val cancellableFut = task.runToFuture
  43. cancellableFut.onComplete {
  44. case Success(value) =>
  45. println(s"returned status: $value")
  46. session.close()
  47. cluster.close()
  48. case Failure(ex) =>
  49. System.err.println(s"ERROR: ${ex.getMessage}")
  50. session.close()
  51. cluster.close()
  52. }
  53. // cqlsys.terminate()
  54. }
  55. def getMapFromCC(cc: Product) = cc.getClass.getDeclaredFields.map( _.getName ) // all field names
  56. .zip( cc.productIterator.to ).toMap // zipped with all values
  57. def ccFieldsToMap(cc: Product) = {
  58. val values = cc.productIterator
  59. cc.getClass.getDeclaredFields.map( _.getName -> (values.next).asInstanceOf[Object] ).toMap
  60. }
  61. def ccToList(cc: Product) = {
  62. val values = cc.productIterator
  63. cc.getClass.getDeclaredFields.map(_ => (values.next).asInstanceOf[Object] ).toList
  64. }
  65. def ccToMap(cc: Product): Map[String, Object] = {
  66. val values = cc.productIterator
  67. cc.getClass.getDeclaredFields.map {
  68. _.getName -> (values.next() match {
  69. case p: Product if p.productArity > 0 => ccToMap(p)
  70. case x => x.asInstanceOf[Object]
  71. })
  72. }.toMap
  73. }
  74. }

 

用cqlsh: select * from txn_hold 检查了一下,插入数据正确。对于这种批量数据同类处理,可能用akka-stream会更方便高效: 

  1. val actionStreamVs = CassandraActionStream(insertVchLog,vsToParams)
  2. .setParallelism(2)
  3. .setProcessOrder(false)
  4. val actionFlowVs: Flow[Seq[Object],Seq[Object],NotUsed] = actionStreamVs.performOnRow
  5. val sinkVs = Sink.foreach[Seq[Object]]{ r =>
  6. log.step(s"insert: $insertVchLog, values: ${r}")
  7. }
  8. // insert to vch_log
  9. val stsVs = Source.fromIterator(() => Seq(vchParams).iterator).via(actionFlowVs).to(sinkVs).run()
  10. val insertTxn = if (susp) insertSusp else insertTxns
  11. val txnitemToParams: TxnItem => Seq[Object] = txn =>
  12. (Seq(pid.asInstanceOf[Object]) ++ ccToList(txn))
  13. val actionStreamTxn = CassandraActionStream(insertTxn,txnitemToParams)
  14. .setParallelism(2)
  15. .setProcessOrder(false)
  16. val actionFlowTxn: Flow[TxnItem,TxnItem,NotUsed] = actionStreamTxn.performOnRow
  17. val sinkTxn = Sink.foreach[TxnItem]{ r =>
  18. log.step(s"insert: $insertTxn, values: ${r}")
  19. }
  20. // insert to txn_???
  21. val stsTxn = Source.fromIterator(() => txns.iterator).via(actionFlowTxn).to(sinkTxn).run()

检查cassandra数据库表,结果正确。用stream方式来做重复类型的处理会比较方便,在当前这个例子的场合下建议使用。

好了,完成了事件日志读取和转换成数据行格式并写入数据库表后,下一步就是建一个reader-actor负责完成这一轮工作。这个reader-actor只根据下面这个消息进行相关的工作:

  1. case class PerformRead(pid: String, vchnum: Int, bseq: Long, eseq: Long)

这个消息描述了读端需要读取的日志记录范围和persistenceId。然后再加一个远程路由remote-router,负责按照某种算法来向各个集群节点上的reader-actor分发读端任务。下面是reader-actor: 

  1. package datatech.cloud.pos
  2. import akka.actor._
  3. import akka.cluster._
  4. import akka.pattern._
  5. import scala.concurrent.duration._
  6. import com.typesafe.config.ConfigFactory
  7. import sdp.logging.LogSupport
  8. import Messages._
  9. import Reader._
  10. object ActionReader {
  11. def actionReaderProps(trace: Boolean): Props = Props(new ActionReader(trace))
  12. //backoff suppervisor must use onStop mode
  13. //respond only to failure of child
  14. def readerProps(trace:Boolean): Props = {
  15. val options = BackoffOpts.onFailure(
  16. childProps = actionReaderProps(trace),
  17. childName = "cqrs-reader",
  18. minBackoff = 1 second,
  19. maxBackoff = 10 seconds,
  20. randomFactor = 0.20
  21. ).withMaxNrOfRetries(3)
  22. BackoffSupervisor.props(options)
  23. }
  24. def create(port: Int): Unit = {
  25. val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
  26. .withFallback(ConfigFactory.load())
  27. val system = ActorSystem("cloud-pos-server", config)
  28. system.actorOf(readerProps(true),"reader")
  29. }
  30. }
  31. class ActionReader(trace: Boolean) extends Actor with LogSupport {
  32. val cluster = Cluster(context.system)
  33. val host = Cluster(context.system).selfAddress.host.get
  34. implicit val nodeAddress: NodeAddress = NodeAddress(cluster.selfAddress.toString)
  35. val readerId = "ActionReader"
  36. log.stepOn = trace
  37. log.step(s"${nodeAddress.address}-${readerId}")
  38. override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
  39. super.preRestart(reason, message)
  40. log.step(s"${nodeAddress.address}-${readerId} Restarting for $message ...")
  41. }
  42. override def postRestart(reason: Throwable): Unit = {
  43. super.postRestart(reason)
  44. log.step(s"${nodeAddress.address}-${readerId} restarted for ${reason.getMessage}.")
  45. }
  46. override def postStop(): Unit = {
  47. log.step(s"${nodeAddress.address}-${readerId} stooped.")
  48. }
  49. override def preStart(): Unit = {
  50. log.step(s"${nodeAddress.address}-${readerId} Starting ...")
  51. }
  52. var debugConfig: com.typesafe.config.Config = _
  53. var debug: Boolean = _
  54. try {
  55. debugConfig = ConfigFactory.load("pos.conf").getConfig("pos.server")
  56. debug = debugConfig.getBoolean("debug")
  57. }
  58. catch {
  59. case _ : Throwable => debug = false
  60. }
  61. log.step(s"${nodeAddress.address}-${readerId} debug mode = $debug")
  62. implicit val debugMode = DebugMode(debug)
  63. override def receive: Receive = {
  64. case PerformRead(pid, vchnum, bseq, eseq) =>
  65. log.step(s"${nodeAddress.address}-${readerId} PerformRead($pid, $vchnum, $bseq, $eseq)")
  66. readActions(host,bseq,eseq,pid,vchnum)(context.system,context.dispatcher,nodeAddress)
  67. case msg @ _ =>
  68. log.step(s"${nodeAddress.address}-${readerId} receive unsupported command:[$msg]")
  69. }
  70. }

这是一个在backoffSupervisor后面的actor。remote-router是从配置文件定义创建的:

  1. akka.actor.deployment {
  2. /readerRouter/readerRouter = {
  3. # Router type provided by metrics extension.
  4. router = cluster-metrics-adaptive-group
  5. # Router parameter specific for metrics extension.
  6. # metrics-selector = heap
  7. # metrics-selector = load
  8. # metrics-selector = cpu
  9. metrics-selector = mix
  10. #
  11. routees.paths = ["/user/reader"]
  12. cluster {
  13. max-nr-of-instances-per-node = 10
  14. max-total-nr-of-instances = 1000
  15. enabled = on
  16. allow-local-routees = on
  17. }
  18. }
  19. }

ReaderRouter的代码如下: 

  1. package datatech.cloud.pos
  2. import akka.actor._
  3. import akka.routing._
  4. import akka.cluster._
  5. import com.typesafe.config.ConfigFactory
  6. class ReaderRouter extends Actor {
  7. val router = context.actorOf(FromConfig.props(), name = "readerRouter")
  8. def receive: Receive = {
  9. case msg => router ! msg
  10. }
  11. }
  12. object ReaderRouter {
  13. var router: ActorRef = _
  14. def props = Props(new ReaderRouter)
  15. def create(port: Int) = {
  16. val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
  17. .withFallback(ConfigFactory.load())
  18. val system = ActorSystem("cloud-pos-server",config)
  19. Cluster(system).registerOnMemberUp{
  20. router = system.actorOf(props,"readerRouter")
  21. }
  22. }
  23. def getRouter = router
  24. }

好了,可以写个例子来测试运行这个router/routee。reader-actor所做的工作在前面的讨论里已经测试过了。 

  1. ackage datatech.cloud.pos
  2. import akka.actor._
  3. import datatech.cloud.pos.Messages.PerformRead
  4. object ReaderDemo extends App {
  5. ActionReader.create(2551)
  6. ActionReader.create(2552)
  7. ActionReader.create(2553)
  8. ReaderRouter.create(2558)
  9. scala.io.StdIn.readLine()
  10. val router = ReaderRouter.getRouter
  11. router ! PerformRead("1022",111,0,Long.MaxValue)
  12. scala.io.StdIn.readLine()
  13. router ! PerformRead("1022",222,0,Long.MaxValue)
  14. scala.io.StdIn.readLine()
  15. }

在这个例子里我们先在本机的2551,2552,2553端口上部署了routees, 即reader-actor。然后在2558端口部署router,再向router发送任务PerformRead。这里有些东西值得留意:akka-cluster使用了netty,而netty也需要占用一个端口。在配置文件里: 

  1. remote {
  2. log-remote-lifecycle-events = on
  3. netty.tcp {
  4. hostname = "192.168.11.189"
  5. # port set to 0 for netty to randomly choose from
  6. port = 0
  7. }
  8. }

下面是本次示范的源代码:

project/plugin.sbt

  1. addSbtPlugin("com.lightbend.akka.grpc" % "sbt-akka-grpc" % "0.6.1")
  2. addSbtPlugin("com.lightbend.sbt" % "sbt-javaagent" % "0.1.4") // ALPN agent

build.sbt

  1. name := "akka-cluster-reader"
  2.  
  3. version := "0.1"
  4.  
  5. scalaVersion := "2.12.8"
  6.  
  7. scalacOptions += "-Ypartial-unification"
  8.  
  9. // in build.sbt:
  10. //enablePlugins(AkkaGrpcPlugin)
  11. // ALPN agent
  12. //enablePlugins(JavaAgent)
  13. //javaAgents += "org.mortbay.jetty.alpn" % "jetty-alpn-agent" % "2.0.9" % "runtime;test"
  14.  
  15. libraryDependencies := Seq(
  16. "com.typesafe.akka" %% "akka-cluster-metrics" % "2.5.19",
  17. "com.typesafe.akka" %% "akka-cluster-sharding" % "2.5.19",
  18. "com.typesafe.akka" %% "akka-persistence" % "2.5.19",
  19. "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "1.0.1",
  20. "org.mongodb.scala" %% "mongo-scala-driver" % "2.6.0",
  21. "com.lightbend.akka" %% "akka-stream-alpakka-mongodb" % "1.0.1",
  22. "com.typesafe.akka" %% "akka-persistence-query" % "2.5.19",
  23. "com.typesafe.akka" %% "akka-persistence-cassandra" % "0.93",
  24. "com.typesafe.akka" %% "akka-persistence-cassandra-launcher" % "0.93" % Test,
  25. "com.datastax.cassandra" % "cassandra-driver-core" % "3.6.0",
  26. "com.datastax.cassandra" % "cassandra-driver-extras" % "3.6.0",
  27. "ch.qos.logback" % "logback-classic" % "1.2.3",
  28. "io.monix" %% "monix" % "3.0.0-RC2",
  29. "org.typelevel" %% "cats-core" % "2.0.0-M1",
  30. "com.thesamet.scalapb" %% "scalapb-runtime" % scalapb.compiler.Version.scalapbVersion % "protobuf",
  31. "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapb.compiler.Version.scalapbVersion
  32.  
  33. )
  34.  
  35. // (optional) If you need scalapb/scalapb.proto or anything from
  36. // google/protobuf/*.proto
  37. //libraryDependencies += "com.thesamet.scalapb" %% "scalapb-runtime" % scalapb.compiler.Version.scalapbVersion % "protobuf"
  38.  
  39.  
  40. PB.targets in Compile := Seq(
  41. scalapb.gen() -> (sourceManaged in Compile).value
  42. )

resources/application.conf

  1. akka.actor.warn-about-java-serializer-usage = off
  2. akka.log-dead-letters-during-shutdown = off
  3. akka.log-dead-letters = off
  4. akka.remote.use-passive-connections=off
  5.  
  6. akka {
  7. loglevel = INFO
  8. actor {
  9. provider = "cluster"
  10. }
  11.  
  12. remote {
  13. log-remote-lifecycle-events = on
  14. netty.tcp {
  15. hostname = "192.168.11.189"
  16. # port set to 0 for netty to randomly choose from
  17. port = 0
  18. }
  19. }
  20.  
  21. cluster {
  22. seed-nodes = [
  23. "akka.tcp://cloud-pos-server@192.168.11.189:2551",
  24. "akka.tcp://cloud-pos-server@192.168.11.189:2552"
  25. ]
  26.  
  27. log-info = off
  28. sharding {
  29. role = "shard"
  30. passivate-idle-entity-after = 10 m
  31. }
  32. }
  33.  
  34. persistence {
  35. journal.plugin = "cassandra-journal"
  36. snapshot-store.plugin = "cassandra-snapshot-store"
  37. }
  38.  
  39. }
  40.  
  41. cassandra-journal {
  42. contact-points = ["192.168.11.189"]
  43. }
  44.  
  45. cassandra-snapshot-store {
  46. contact-points = ["192.168.11.189"]
  47. }
  48.  
  49. # Enable metrics extension in akka-cluster-metrics.
  50. akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension"]
  51.  
  52. akka.actor.deployment {
  53. /readerRouter/readerRouter = {
  54. # Router type provided by metrics extension.
  55. router = cluster-metrics-adaptive-group
  56. # Router parameter specific for metrics extension.
  57. # metrics-selector = heap
  58. # metrics-selector = load
  59. # metrics-selector = cpu
  60. metrics-selector = mix
  61. #
  62. routees.paths = ["/user/reader"]
  63. cluster {
  64. max-nr-of-instances-per-node = 10
  65. max-total-nr-of-instances = 1000
  66. enabled = on
  67. allow-local-routees = on
  68. }
  69. }
  70. }
  71.  
  72. dbwork-dispatcher {
  73. # Dispatcher is the name of the event-based dispatcher
  74. type = Dispatcher
  75. # What kind of ExecutionService to use
  76. executor = "fork-join-executor"
  77. # Configuration for the fork join pool
  78. fork-join-executor {
  79. # Min number of threads to cap factor-based parallelism number to
  80. parallelism-min = 2
  81. # Parallelism (threads) ... ceil(available processors * factor)
  82. parallelism-factor = 2.0
  83. # Max number of threads to cap factor-based parallelism number to
  84. parallelism-max = 10
  85. }
  86. # Throughput defines the maximum number of messages to be
  87. # processed per actor before the thread jumps to the next actor.
  88. # Set to 1 for as fair as possible.
  89. throughput = 100
  90. }

ActionReader.scala

  1. package datatech.cloud.pos
  2. import akka.actor._
  3. import akka.cluster._
  4. import akka.pattern._
  5. import scala.concurrent.duration._
  6. import com.typesafe.config.ConfigFactory
  7. import sdp.logging.LogSupport
  8. import Messages._
  9. import Reader._
  10.  
  11.  
  12.  
  13. object ActionReader {
  14. def actionReaderProps(trace: Boolean): Props = Props(new ActionReader(trace))
  15.  
  16. //backoff suppervisor must use onStop mode
  17. //respond only to failure of child
  18. def readerProps(trace:Boolean): Props = {
  19. val options = BackoffOpts.onFailure(
  20. childProps = actionReaderProps(trace),
  21. childName = "cqrs-reader",
  22. minBackoff = 1 second,
  23. maxBackoff = 10 seconds,
  24. randomFactor = 0.20
  25. ).withMaxNrOfRetries(3)
  26. BackoffSupervisor.props(options)
  27. }
  28.  
  29. def create(port: Int): Unit = {
  30. val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
  31. .withFallback(ConfigFactory.load())
  32.  
  33. val system = ActorSystem("cloud-pos-server", config)
  34. system.actorOf(readerProps(true),"reader")
  35. }
  36.  
  37. }
  38.  
  39. class ActionReader(trace: Boolean) extends Actor with LogSupport {
  40. val cluster = Cluster(context.system)
  41. val host = Cluster(context.system).selfAddress.host.get
  42. implicit val nodeAddress: NodeAddress = NodeAddress(cluster.selfAddress.toString)
  43. val readerId = "ActionReader"
  44. log.stepOn = trace
  45.  
  46. log.step(s"${nodeAddress.address}-${readerId}")
  47.  
  48. override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
  49. super.preRestart(reason, message)
  50. log.step(s"${nodeAddress.address}-${readerId} Restarting for $message ...")
  51. }
  52.  
  53. override def postRestart(reason: Throwable): Unit = {
  54. super.postRestart(reason)
  55. log.step(s"${nodeAddress.address}-${readerId} restarted for ${reason.getMessage}.")
  56. }
  57.  
  58. override def postStop(): Unit = {
  59. log.step(s"${nodeAddress.address}-${readerId} stooped.")
  60. }
  61.  
  62. override def preStart(): Unit = {
  63. log.step(s"${nodeAddress.address}-${readerId} Starting ...")
  64. }
  65.  
  66. var debugConfig: com.typesafe.config.Config = _
  67. var debug: Boolean = _
  68. try {
  69.  
  70. debugConfig = ConfigFactory.load("pos.conf").getConfig("pos.server")
  71. debug = debugConfig.getBoolean("debug")
  72. }
  73. catch {
  74. case _ : Throwable => debug = false
  75. }
  76.  
  77. log.step(s"${nodeAddress.address}-${readerId} debug mode = $debug")
  78.  
  79. implicit val debugMode = DebugMode(debug)
  80.  
  81. override def receive: Receive = {
  82. case PerformRead(pid, vchnum, bseq, eseq) =>
  83. log.step(s"${nodeAddress.address}-${readerId} PerformRead($pid, $vchnum, $bseq, $eseq)")
  84. readActions(host,bseq,eseq,pid,vchnum)(context.system,context.dispatcher,nodeAddress)
  85. case msg @ _ =>
  86. log.step(s"${nodeAddress.address}-${readerId} receive unsupported command:[$msg]")
  87. }
  88.  
  89. }

ReaderRouter.scala

  1. package datatech.cloud.pos
  2. import akka.actor._
  3. import akka.routing._
  4. import akka.cluster._
  5. import com.typesafe.config.ConfigFactory
  6.  
  7.  
  8. class ReaderRouter extends Actor {
  9.  
  10. val router = context.actorOf(FromConfig.props(), name = "readerRouter")
  11.  
  12. def receive: Receive = {
  13. case msg => router ! msg
  14. }
  15.  
  16. }
  17.  
  18. object ReaderRouter {
  19. var router: ActorRef = _
  20.  
  21. def props = Props(new ReaderRouter)
  22.  
  23. def create(port: Int) = {
  24. val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
  25. .withFallback(ConfigFactory.load())
  26.  
  27. val system = ActorSystem("cloud-pos-server",config)
  28.  
  29. Cluster(system).registerOnMemberUp{
  30. router = system.actorOf(props,"readerRouter")
  31. }
  32.  
  33. }
  34. def getRouter = router
  35. }

Reader.scala

  1. package datatech.cloud.pos
  2. import akka.actor._
  3. import akka.stream.scaladsl._
  4.  
  5. import scala.util._
  6. import akka._
  7. import akka.persistence.query._
  8. import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
  9.  
  10. import scala.concurrent._
  11. import akka.stream._
  12. import sdp.logging._
  13. import Actions._
  14. import States._
  15. import Messages._
  16. import akka.cluster._
  17. import DBWriter._
  18.  
  19. object Reader extends LogSupport {
  20.  
  21. def readActions(cqlhost: String, startSeq: Long, endSeq: Long, persistenceId: String, vchnum: Int)(implicit sys: ActorSystem, ec: ExecutionContextExecutor, nodeAddress: NodeAddress) = {
  22. implicit var vchState = VchStates().copy(num = vchnum)
  23. implicit var vchItems = VchItems()
  24. implicit var curTxnItem = TxnItem()
  25. implicit val pid = PID(persistenceId)
  26. implicit val mat = ActorMaterializer()
  27.  
  28.  
  29. val readerId = "ActionReader"
  30.  
  31. // obtain read journal by plugin id
  32. val readJournal =
  33. PersistenceQuery(sys).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
  34.  
  35. // issue query to journal
  36. val source: Source[EventEnvelope, NotUsed] =
  37. readJournal.currentEventsByPersistenceId(persistenceId, startSeq, endSeq)
  38.  
  39. // materialize stream, consuming events
  40. val futureActions: Future[List[Any]] = source.runFold(List[Any]()) { (lstAny, evl) => evl.event :: lstAny }
  41.  
  42. futureActions.onComplete {
  43. case Success(txns) =>
  44. log.step(s"${nodeAddress.address}-${readerId} recovered actions: $txns")
  45. buildVoucher(txns)
  46. case Failure(excpt) =>
  47. log.error(s"${nodeAddress.address}-${readerId} read actions error: ${excpt.getMessage}")
  48.  
  49. }
  50.  
  51. def buildVoucher(actions: List[Any])= {
  52. actions.reverse.foreach { txn =>
  53. txn match {
  54. case EndVoucher(_) =>
  55. writeTxnsToDB(cqlhost,vchState.num,vchState.susp,vchItems.txnitems)(persistenceId,startSeq,endSeq)
  56. mat.shutdown()
  57. case ti@_ =>
  58. curTxnItem = buildTxnItem(ti.asInstanceOf[Action])
  59. val sts = updateState(ti.asInstanceOf[Action],0)
  60. vchState = sts._1
  61. vchItems = sts._2
  62. }
  63. }
  64. }
  65. }
  66.  
  67. }

DBWriter.scala

  1. package datatech.cloud.pos
  2. import java.time.LocalDate
  3. import java.time.format.DateTimeFormatter
  4. import sdp.logging._
  5. import Messages._
  6. import com.datastax.driver.core._
  7. import akka.actor.ActorSystem
  8. import akka.stream.ActorMaterializer
  9. import sdp.cql.engine._
  10. import CQLEngine._
  11. import CQLHelpers._
  12. import com.typesafe.config._
  13. import akka.stream.scaladsl._
  14. import akka._
  15. import scala.concurrent._
  16.  
  17.  
  18. object DBWriter extends LogSupport {
  19. var posConfig: com.typesafe.config.Config = _
  20. def writeTxnsToDB(cqlhost: String, vchnum: Int, susp: Boolean, txns: List[TxnItem])(pid: String, bseq: Long, eseq: Long)(
  21. implicit sys: ActorSystem, ec: ExecutionContextExecutor, mat: ActorMaterializer, nodeAddress: NodeAddress) = {
  22.  
  23. val readerId = "DBWriter"
  24. var cqlport: Int = 9042
  25. try {
  26. posConfig = ConfigFactory.load("pos.conf").getConfig("pos.cqlport")
  27. cqlport = posConfig.getInt("cqlport")
  28. }
  29. catch {
  30. case _ : Throwable => cqlport = 9042
  31. }
  32.  
  33. val cluster = new Cluster
  34. .Builder()
  35. .addContactPoints(cqlhost)
  36. .withPort(cqlport)
  37. .build()
  38.  
  39. useJava8DateTime(cluster)
  40. implicit val session = cluster.connect()
  41. val insertVchLog = """
  42. |insert into pos_on_cloud.vch_log(
  43. |terminal,
  44. |txndate,
  45. |vchnum,
  46. |begin_seq,
  47. |end_seq)
  48. |values(?,?,?,?,?)
  49. |""".stripMargin
  50.  
  51. val insertTxns = """
  52. |insert into pos_on_cloud.txn_log(
  53. |terminal,
  54. |txndate,
  55. |txntime,
  56. |opr,
  57. |num,
  58. |seq,
  59. |txntype,
  60. |salestype,
  61. |qty,
  62. |price,
  63. |amount,
  64. |disc,
  65. |dscamt,
  66. |member,
  67. |code,
  68. |acct,
  69. |dpt)
  70. |values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
  71. """.stripMargin
  72.  
  73. val insertSusp = """
  74. |insert into pos_on_cloud.txn_hold(
  75. |terminal,
  76. |txndate,
  77. |txntime,
  78. |opr,
  79. |num,
  80. |seq,
  81. |txntype,
  82. |salestype,
  83. |qty,
  84. |price,
  85. |amount,
  86. |disc,
  87. |dscamt,
  88. |member,
  89. |code,
  90. |acct,
  91. |dpt)
  92. |values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
  93. """.stripMargin
  94.  
  95. val vchParams: Seq[Object] = Seq(
  96. pid.asInstanceOf[Object],
  97. LocalDate.now.format(DateTimeFormatter.ofPattern("yyyyMMdd")).asInstanceOf[Object],
  98. vchnum.asInstanceOf[Object],
  99. bseq.asInstanceOf[Object],
  100. eseq.asInstanceOf[Object]
  101. )
  102.  
  103. val vsToParams: Seq[Object] => Seq[Object] = vchParams => vchParams
  104.  
  105. val actionStreamVs = CassandraActionStream(insertVchLog,vsToParams)
  106. .setParallelism(2)
  107. .setProcessOrder(false)
  108. val actionFlowVs: Flow[Seq[Object],Seq[Object],NotUsed] = actionStreamVs.performOnRow
  109.  
  110. val sinkVs = Sink.foreach[Seq[Object]]{ r =>
  111. log.step(s"${nodeAddress.address}-${readerId} insert: $insertVchLog, values: ${r}")
  112. }
  113. // insert to vch_log
  114. val stsVs = Source.fromIterator(() => Seq(vchParams).iterator).via(actionFlowVs).to(sinkVs).run()
  115.  
  116. val insertTxn = if (susp) insertSusp else insertTxns
  117.  
  118. val txnitemToParams: TxnItem => Seq[Object] = txn =>
  119. (Seq(pid.asInstanceOf[Object]) ++ ccToList(txn))
  120.  
  121. val actionStreamTxn = CassandraActionStream(insertTxn,txnitemToParams)
  122. .setParallelism(2)
  123. .setProcessOrder(false)
  124. val actionFlowTxn: Flow[TxnItem,TxnItem,NotUsed] = actionStreamTxn.performOnRow
  125.  
  126. val sinkTxn = Sink.foreach[TxnItem]{ r =>
  127. log.step(s"${nodeAddress.address}-${readerId} insert: $insertTxn, values: ${r}")
  128. }
  129. // insert to txn_???
  130. val stsTxn = Source.fromIterator(() => txns.iterator).via(actionFlowTxn).to(sinkTxn).run()
  131. }
  132.  
  133. def ccToList(cc: Product) = {
  134. val values = cc.productIterator
  135. cc.getClass.getDeclaredFields.map(_ => (values.next).asInstanceOf[Object] ).toList
  136. }
  137.  
  138. }

ReaderDemo.scala

  1. package datatech.cloud.pos
  2. import akka.actor._
  3.  
  4. import datatech.cloud.pos.Messages.PerformRead
  5. object ReaderDemo extends App {
  6. ActionReader.create(2551)
  7.  
  8. ActionReader.create(2552)
  9.  
  10. ActionReader.create(2553)
  11.  
  12.  
  13. ReaderRouter.create(2558)
  14.  
  15. scala.io.StdIn.readLine()
  16.  
  17. val router = ReaderRouter.getRouter
  18.  
  19. router ! PerformRead("1022",111,0,Long.MaxValue)
  20.  
  21. scala.io.StdIn.readLine()
  22.  
  23. router ! PerformRead("1022",222,0,Long.MaxValue)
  24.  
  25. scala.io.StdIn.readLine()
  26.  
  27.  
  28. }

States.scala

  1. package datatech.cloud.pos
  2. import java.time.LocalDate
  3. import java.time.LocalDateTime
  4. import java.time.format.DateTimeFormatter
  5.  
  6.  
  7. import Messages._
  8. import sdp.logging._
  9.  
  10. object Actions {
  11.  
  12.  
  13. implicit class FoldLeftWhile[A](trav: Seq[A]) {
  14. def foldLeftWhile[B](z: B)(op: ((B,Boolean), A) => (B, Boolean)): B = {
  15. def go(acc: (B, Boolean), l: Seq[A]): (B, Boolean) = l match {
  16. case h +: t =>
  17. val nacc = op(acc, h)
  18. if (!nacc._2)
  19. go(nacc, t)
  20. else
  21. nacc
  22. case _ => acc
  23. }
  24. go((z, false), trav)._1
  25. }
  26. }
  27.  
  28.  
  29. case class ReadActions(startSeq: Int, endSeq: Int, persistenceId: String)
  30.  
  31. sealed trait Action {}
  32. case class LogOned(opr: String) extends Action
  33. case object LogOffed extends Action
  34. case class SuperOned(su: String) extends Action
  35. case object SuperOffed extends Action
  36. case class MemberOned(cardnum: String) extends Action
  37. case object MemberOffed extends Action //remove member status for the voucher
  38. case object RefundOned extends Action
  39. case object RefundOffed extends Action
  40. case object VoidOned extends Action
  41. case object VoidOffed extends Action
  42.  
  43.  
  44. case class SalesLogged(acct: String, dpt: String, code: String, qty: Int, price: Int) extends Action
  45. case class Subtotaled(level: Int) extends Action
  46. case class Discounted(disctype: Int, grouped: Boolean, code: String, percent: Int) extends Action
  47.  
  48. case class NewVoucher(vnum: Int) extends Action //新单, reminder for read-side to set new vnum
  49. case class EndVoucher(vnum: Int) extends Action //单据终结标示
  50. case object VoidVoucher extends Action
  51.  
  52.  
  53. case object SuspVoucher extends Action
  54.  
  55. case class VoucherNumed(fnum: Int, tnum: Int) extends Action
  56.  
  57. case class PaymentMade(acct: String, num: String, amount: Int) extends Action //settlement 结算支付
  58.  
  59. }
  60.  
  61.  
  62. object States extends LogSupport {
  63. import Actions._
  64.  
  65. def setShowSteps(b: Boolean) = log.stepOn = b
  66.  
  67. def buildTxnItem(evt: Action)(implicit vs: VchStates, vi: VchItems): TxnItem = evt match {
  68. case LogOned(op) => TxnItem(vs).copy(
  69. txntype = TXNTYPE.logon,
  70. salestype = SALESTYPE.crd,
  71. opr = op,
  72. code = op
  73. )
  74. case LogOffed => TxnItem(vs).copy(
  75. txntype = TXNTYPE.logon,
  76. salestype = SALESTYPE.crd
  77. )
  78. case SuperOned(su) => TxnItem(vs).copy(
  79. txntype = TXNTYPE.supon,
  80. salestype = SALESTYPE.crd,
  81. code = su
  82. )
  83. case SuperOffed => TxnItem(vs).copy(
  84. txntype = TXNTYPE.supon,
  85. salestype = SALESTYPE.crd
  86. )
  87. case MemberOned(cardnum) => TxnItem(vs).copy(
  88. txntype = TXNTYPE.sales,
  89. salestype = SALESTYPE.crd,
  90. member = cardnum
  91. )
  92. case MemberOffed => TxnItem(vs).copy(
  93. txntype = TXNTYPE.sales,
  94. salestype = SALESTYPE.crd
  95. )
  96. case RefundOned => TxnItem(vs).copy(
  97. txntype = TXNTYPE.refund
  98. )
  99. case RefundOffed => TxnItem(vs).copy(
  100. txntype = TXNTYPE.refund
  101. )
  102. case VoidOned => TxnItem(vs).copy(
  103. txntype = TXNTYPE.void
  104. )
  105. case VoidOffed => TxnItem(vs).copy(
  106. txntype = TXNTYPE.void
  107. )
  108. case VoidVoucher => TxnItem(vs).copy(
  109. txntype = TXNTYPE.voidall,
  110. code = vs.num.toString,
  111. acct = vs.num.toString
  112. )
  113. case SuspVoucher => TxnItem(vs).copy(
  114. txntype = TXNTYPE.suspend,
  115. code = vs.num.toString,
  116. acct = vs.num.toString
  117. )
  118. case Subtotaled(level) =>
  119. TxnItem(vs).copy(
  120. txntype = TXNTYPE.sales,
  121. salestype = SALESTYPE.sub
  122. )
  123. case Discounted(dt,gp,code,pct) => TxnItem(vs).copy(
  124. txntype = TXNTYPE.sales,
  125. salestype = SALESTYPE.dsc,
  126. acct = code,
  127. disc = pct
  128. )
  129. case PaymentMade(act,num,amt) => TxnItem(vs).copy(
  130. txntype = TXNTYPE.sales,
  131. salestype = SALESTYPE.ttl,
  132. acct = act,
  133. code = num,
  134. amount = amt
  135. )
  136.  
  137. case SalesLogged(sacct,sdpt,scode,sqty,sprice) => TxnItem(vs).copy(
  138. txntype = TXNTYPE.sales,
  139. salestype = SALESTYPE.itm,
  140. acct = sacct,
  141. dpt = sdpt,
  142. code = scode,
  143. qty = sqty,
  144. price = sprice,
  145. amount = sprice * sqty,
  146. dscamt = 0
  147. )
  148. case _ => TxnItem(vs)
  149. }
  150.  
  151. case class VchItems(txnitems: List[TxnItem] = Nil) {
  152.  
  153. def noSales: Boolean = (txnitems.find(txn => txn.salestype == SALESTYPE.itm)).isEmpty
  154.  
  155. def subTotal: (Int, Int, Int, Int) = txnitems.foldRight((0, 0, 0, 0)) { case (txn, b) =>
  156. if (txn.salestype == SALESTYPE.itm && txn.txntype == TXNTYPE.sales)
  157. b.copy(_1 = b._1 + 1, _2 = b._2 + txn.qty, _3 = b._3 + txn.amount, _4 = b._4 + txn.dscamt)
  158. else b
  159. }
  160.  
  161. def groupTotal(level:Int): (Int, Int, Int, Int) = {
  162. val gts = txnitems.foldLeftWhile((0, 0, 0, 0, 0)) { case (b,txn) =>
  163. if (txn.salestype == SALESTYPE.itm && txn.txntype == TXNTYPE.sales)
  164. ((b._1._1 +1,b._1._2 + txn.qty, b._1._3 + txn.amount, b._1._4 + txn.dscamt, b._1._5),false)
  165. else {
  166. if (txn.salestype == SALESTYPE.sub) {
  167. if (((b._1._5) + 1) >= level)
  168. ((b._1._1, b._1._2, b._1._3, b._1._4, b._1._5 + 1), true)
  169. else
  170. ((b._1._1, b._1._2, b._1._3, b._1._4, b._1._5 + 1), false)
  171. } else b
  172. }
  173. }
  174. (gts._1,gts._2,gts._3,gts._4)
  175. }
  176.  
  177. def updateDisc(dt: Int, grouped: Boolean, disc: Int): (List[TxnItem],(Int,Int,Int,Int)) = {
  178. //(salestype,(cnt,qty,amt,dsc),hassub,list)
  179. val accu = txnitems.foldLeft((-1, (0,0,0,0), false, List[TxnItem]())) { case (b, txn) =>
  180. var discAmt = 0
  181. if ((b._1) < 0) {
  182. if (txn.salestype == SALESTYPE.itm && txn.txntype == TXNTYPE.sales) {
  183. if (txn.dscamt == 0)
  184. ((txn.salestype, (
  185. (b._2._1) + 1,
  186. (b._2._2) + txn.qty,
  187. (b._2._3) + txn.amount,
  188. (b._2._4) - (txn.amount * disc / 100)
  189. ), false, txn.copy(
  190. dscamt = - (txn.amount * disc / 100)) :: (b._4)))
  191. else {
  192. dt match {
  193. case DISCTYPE.duplicated =>
  194. if (txn.dscamt != 0) {
  195. ((txn.salestype, (
  196. (b._2._1) + 1,
  197. (b._2._2) + txn.qty,
  198. (b._2._3) + txn.amount,
  199. (b._2._4) - (txn.amount + txn.dscamt) * disc / 100
  200. ), false, txn.copy(
  201. dscamt = -(txn.amount + txn.dscamt) * disc / 100) :: (b._4)
  202. ))
  203. } else {
  204. ((txn.salestype, (
  205. (b._2._1) + 1,
  206. (b._2._2) + txn.qty,
  207. (b._2._3) + txn.amount,
  208. (b._2._4) - txn.amount * disc / 100
  209. ), false, txn.copy(
  210. dscamt = -txn.amount * disc / 100) :: (b._4)
  211. ))
  212. }
  213. case DISCTYPE.keep => ((txn.salestype, (
  214. (b._2._1) + 1,
  215. (b._2._2) + txn.qty,
  216. (b._2._3) + txn.amount,
  217. (b._2._4) + txn.dscamt), false, txn :: (b._4)))
  218. case DISCTYPE.best =>
  219. discAmt = -(txn.amount * disc / 100)
  220. if (discAmt < txn.dscamt)
  221. ((txn.salestype, (
  222. (b._2._1) + 1,
  223. (b._2._2) + txn.qty,
  224. (b._2._3) + txn.amount,
  225. (b._2._4) + discAmt), false, txn.copy(
  226. dscamt = discAmt
  227. ) :: (b._4)))
  228. else
  229. ((txn.salestype, (
  230. (b._2._1) + 1,
  231. (b._2._2) + txn.qty,
  232. (b._2._3) + txn.amount,
  233. (b._2._4) + txn.dscamt), false, txn :: (b._4)))
  234. }
  235. }
  236.  
  237. } else ((b._1,b._2,b._3,txn :: (b._4)))
  238. } else {
  239. if ((b._3))
  240. (((b._1), (b._2), true, txn :: (b._4)))
  241. else {
  242. if (txn.salestype == SALESTYPE.sub) {
  243. if (grouped)
  244. (((b._1), (b._2), true, txn :: (b._4)))
  245. else
  246. (((b._1), (b._2), false, txn :: (b._4)))
  247. } else {
  248. if (txn.salestype == SALESTYPE.itm && txn.txntype == TXNTYPE.sales) {
  249. dt match {
  250. case DISCTYPE.duplicated =>
  251. if (txn.dscamt != 0) {
  252. ((txn.salestype, (
  253. (b._2._1) + 1,
  254. (b._2._2) + txn.qty,
  255. (b._2._3) + txn.amount,
  256. (b._2._4) - (txn.amount + txn.dscamt) * disc / 100), false, txn.copy(
  257. dscamt = -(txn.amount + txn.dscamt) * disc / 100) :: (b._4)
  258. ))
  259. } else {
  260. ((txn.salestype, (
  261. (b._2._1) + 1,
  262. (b._2._2) + txn.qty,
  263. (b._2._3) + txn.amount,
  264. (b._2._4) - txn.amount * disc / 100), false, txn.copy(
  265. dscamt = -(txn.amount * disc / 100)) :: (b._4)
  266. ))
  267. }
  268. case DISCTYPE.keep => ((txn.salestype, (
  269. (b._2._1) + 1,
  270. (b._2._2) + txn.qty,
  271. (b._2._3) + txn.amount,
  272. (b._2._4) + txn.dscamt), false, txn :: (b._4)))
  273. case DISCTYPE.best =>
  274. discAmt = -(txn.amount * disc / 100)
  275. if (discAmt < txn.dscamt)
  276. ((txn.salestype, (
  277. (b._2._1) + 1,
  278. (b._2._2) + txn.qty,
  279. (b._2._3) + txn.amount,
  280. (b._2._4) + discAmt), false, txn.copy(
  281. dscamt = discAmt
  282. ) :: (b._4)))
  283. else
  284. ((txn.salestype, (
  285. (b._2._1) + 1,
  286. (b._2._2) + txn.qty,
  287. (b._2._3) + txn.amount,
  288. (b._2._4) + txn.dscamt), false, txn :: (b._4)))
  289. }
  290. }
  291. else ((b._1, b._2, b._3, txn :: (b._4)))
  292. }
  293. }
  294. }
  295.  
  296. }
  297. (accu._4.reverse,accu._2)
  298. }
  299.  
  300. def totalSales: Int = txnitems.foldRight(0) { case (txn, b) =>
  301. if (txn.salestype == SALESTYPE.itm)
  302. (txn.amount + txn.dscamt) + b
  303. else b
  304.  
  305. /*
  306. val amt: Int = txn.salestype match {
  307. case (SALESTYPE.plu | SALESTYPE.cat | SALESTYPE.brd | SALESTYPE.ra) => txn.amount + txn.dscamt
  308. case _ => 0
  309. }
  310. amt + b */
  311. }
  312.  
  313. def totalPaid: Int = txnitems.foldRight(0) { case (txn, b) =>
  314. if (txn.txntype == TXNTYPE.sales && txn.salestype == SALESTYPE.ttl)
  315. txn.amount + b
  316. else b
  317. }
  318.  
  319. def addItem(item: TxnItem): VchItems = VchItems((item :: txnitems)) //.reverse)
  320.  
  321. }
  322.  
  323. def LastSecOfDate(ldate: LocalDate): LocalDateTime = {
  324. val dtStr = ldate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")) + " 23:59:59"
  325. LocalDateTime.parse(dtStr, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
  326. }
  327.  
  328. def dateStr(dt: LocalDate): String = dt.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))
  329.  
  330. def updateState(evt: Action, lastSeqNr: BigInt = 0)(implicit nodeAddress: NodeAddress, persistenceId: PID, state: VchStates, items: VchItems, curItem: TxnItem): (VchStates, VchItems) = {
  331. val (vs, vi) = updateStateImpl(evt, lastSeqNr)
  332. log.step(s"${nodeAddress.address}-${persistenceId.id} run updateState($evt, $lastSeqNr) with results state[$vs], txns[$vi].")
  333. (vs, vi)
  334. }
  335.  
  336. def updateStateImpl(evt: Action, lastSeqNr: BigInt = 0)(implicit state: VchStates, items: VchItems, curItem: TxnItem): (VchStates, VchItems) = evt match {
  337. case LogOned(csr) => (state.copy(seq = state.seq + 1, opr = csr, jseq = lastSeqNr), items)
  338. case LogOffed => (state.copy(seq = state.seq + 1, opr = ""), items)
  339. case RefundOned => (state.copy(seq = state.seq + 1, refd = true), items)
  340. case RefundOffed => (state.copy(seq = state.seq + 1, refd = false), items)
  341. case VoidOned => (state.copy(seq = state.seq + 1, void = true), items)
  342. case VoidOffed => (state.copy(seq = state.seq + 1, void = false), items)
  343. case SuperOned(suser) => (state.copy(seq = state.seq + 1, su = suser), items)
  344. case SuperOffed => (state.copy(seq = state.seq + 1, su = ""), items)
  345. case MemberOned(num) => (state.copy(seq = state.seq + 1, mbr = num), items)
  346. case MemberOffed => (state.copy(seq = state.seq + 1, mbr = ""), items)
  347.  
  348.  
  349. case SalesLogged(_,_,_,_,_) => (state.copy(
  350. seq = state.seq + 1)
  351. , items.addItem(curItem))
  352.  
  353. case Subtotaled(level) =>
  354. var subs = (0,0,0,0)
  355. if (level == 0)
  356. subs = items.subTotal
  357. else
  358. subs = items.groupTotal(level)
  359. val (cnt, tqty, tamt, tdsc) = subs
  360.  
  361. val subttlItem =
  362. TxnItem(state).copy(
  363. txntype = TXNTYPE.sales,
  364. salestype = SALESTYPE.sub,
  365. qty = tqty,
  366. amount = tamt,
  367. dscamt = tdsc,
  368. price = cnt
  369. )
  370. (state.copy(
  371. seq = state.seq + 1)
  372. , items.addItem(subttlItem))
  373.  
  374. case Discounted(dt,gp,code,pct) =>
  375. val (lstItems, accum) = items.updateDisc(dt,gp,pct)
  376. val discItem = TxnItem(state).copy(
  377. txntype = TXNTYPE.sales,
  378. salestype = SALESTYPE.dsc,
  379. acct = code,
  380. disc = pct,
  381. price = accum._1,
  382. qty = accum._2,
  383. amount = accum._3,
  384. dscamt = accum._4
  385. )
  386. (state.copy(
  387. seq = state.seq + 1)
  388. , items.copy(txnitems = lstItems).addItem(discItem))
  389.  
  390.  
  391. case PaymentMade(_,_,_) =>
  392. val due = if (items.totalSales > 0) items.totalSales - items.totalPaid else items.totalSales + items.totalPaid
  393. val bal = if (items.totalSales > 0) due - curItem.amount else due + curItem.amount
  394. (state.copy(
  395. seq = state.seq + 1,
  396. due = (if ((curItem.amount.abs + items.totalPaid.abs) >= items.totalSales.abs) false else true)
  397. )
  398. ,items.addItem(curItem.copy(
  399. salestype = SALESTYPE.ttl,
  400. price = due,
  401. amount = curItem.amount,
  402. dscamt = bal
  403. )))
  404.  
  405. case VoucherNumed(_, tnum) =>
  406. val vi = items.copy(txnitems = items.txnitems.map { it => it.copy(num = tnum) })
  407. (state.copy(seq = state.seq + 1, num = tnum), vi)
  408.  
  409. case SuspVoucher => (state.copy(seq = state.seq + 1, susp = true), items)
  410.  
  411. case VoidVoucher => (state.copy(seq = state.seq + 1, canc = true), items)
  412.  
  413. case EndVoucher(vnum) => (state.nextVoucher.copy(jseq = lastSeqNr + 1), VchItems())
  414.  
  415. case _ => (state, items)
  416. }
  417.  
  418.  
  419. }

Messages.scala

  1. package datatech.cloud.pos
  2.  
  3. import java.time.LocalDate
  4. import java.time.LocalDateTime
  5. import java.time.format.DateTimeFormatter
  6. import java.util.Locale
  7. import akka.cluster.sharding._
  8.  
  9. object Messages {
  10.  
  11. val dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss", Locale.CHINA)
  12.  
  13. sealed trait Command {}
  14.  
  15. case class LogOn(opr: String, passwd: String) extends Command
  16. case object LogOff extends Command
  17. case class SuperOn(su: String, passwd: String) extends Command
  18. case object SuperOff extends Command
  19. case class MemberOn(cardnum: String, passwd: String) extends Command
  20. case object MemberOff extends Command //remove member status for the voucher
  21. case object RefundOn extends Command
  22. case object RefundOff extends Command
  23. case object VoidOn extends Command
  24. case object VoidOff extends Command
  25. case object VoidAll extends Command
  26. case object Suspend extends Command
  27.  
  28. case class VoucherNum(vnum: Int) extends Command
  29.  
  30.  
  31.  
  32. case class LogSales(acct: String, dpt: String, code: String, qty: Int, price: Int) extends Command
  33. case class Subtotal(level: Int) extends Command
  34. case class Discount(disctype: Int, grouped: Boolean, code: String, percent: Int) extends Command
  35.  
  36. case class Payment(acct: String, num: String, amount: Int) extends Command //settlement 结算支付
  37.  
  38. // read only command, no update event
  39. case class Plu(itemCode: String) extends Command //read only
  40. case object GetTxnItems extends Command
  41.  
  42.  
  43. /* discount type */
  44. object DISCTYPE {
  45. val duplicated: Int = 0
  46. val best: Int = 1
  47. val least: Int = 2
  48. val keep: Int = 3
  49. }
  50.  
  51. /* result message returned to client on the wire */
  52. object TXNTYPE {
  53. val sales: Int = 0
  54. val refund: Int = 1
  55. val void: Int = 2
  56. val voided: Int = 3
  57. val voidall: Int = 4
  58. val subtotal: Int = 5
  59. val logon: Int = 6
  60. val supon: Int = 7 // super user on/off
  61. val suspend: Int = 8
  62.  
  63. }
  64.  
  65. object SALESTYPE {
  66. val itm: Int = 2
  67. val sub: Int = 10
  68. val ttl: Int = 11
  69. val dsc: Int = 12
  70. val crd: Int = 13
  71. }
  72.  
  73. case class TxnItem(
  74. txndate: String = LocalDate.now.format(DateTimeFormatter.ofPattern("yyyyMMdd"))
  75. ,txntime: String = LocalDateTime.now.format(dateTimeFormatter).substring(11)
  76. ,opr: String = ""//工号
  77. ,num: Int = 0 //销售单号
  78. ,seq: Int = 1 //交易序号
  79. ,txntype: Int = TXNTYPE.sales//交易类型
  80. ,salestype: Int = SALESTYPE.itm //销售类型
  81. ,qty: Int = 1 //交易数量
  82. ,price: Int = 0 //单价(分)
  83. ,amount: Int = 0 //码洋(分)
  84. ,disc: Int = 0 //折扣率 (%)
  85. ,dscamt: Int = 0 //折扣额:负值 net实洋 = amount + dscamt
  86. ,member: String = "" //会员卡号
  87. ,code: String = "" //编号(商品、卡号...)
  88. ,acct: String = "" //账号
  89. ,dpt: String = "" //部类
  90. )
  91. object TxnItem {
  92. def apply(vs: VchStates): TxnItem = TxnItem().copy(
  93. opr = vs.opr,
  94. num = vs.num,
  95. seq = vs.seq,
  96. member = vs.mbr
  97. )
  98. }
  99.  
  100. case class VchStatus( //操作状态锁留给前端维护
  101. qty: Int = 1,
  102. refund: Boolean = false,
  103. void: Boolean = false)
  104.  
  105. case class VchStates(
  106. opr: String = "", //收款员
  107. jseq: BigInt = 0, //begin journal sequence for read-side replay
  108. num: Int = 0, //当前单号
  109. seq: Int = 0, //当前序号
  110. void: Boolean = false, //取消模式
  111. refd: Boolean = false, //退款模式
  112. susp: Boolean = false, //挂单
  113. canc: Boolean = false, //废单
  114. due: Boolean = true, //当前余额
  115. su: String = "",
  116. mbr: String = ""
  117. ) {
  118.  
  119. def nextVoucher : VchStates = VchStates().copy(
  120. opr = this.opr,
  121. jseq = this.jseq + 1,
  122. num = this.num + 1
  123. )
  124. }
  125.  
  126.  
  127. object STATUS {
  128. val OK: Int = 0
  129. val FAIL: Int = -1
  130. }
  131.  
  132. case class POSResponse (sts: Int, msg: String, voucher: VchStates, txnItems: List[TxnItem])
  133.  
  134. /* message on the wire (exchanged message) */
  135. val shardName = "POSShard"
  136.  
  137. case class POSMessage(id: Long, cmd: Command) {
  138. def shopId = id.toString.head.toString
  139. def posId = id.toString
  140. }
  141.  
  142. val getPOSId: ShardRegion.ExtractEntityId = {
  143. case posCommand: POSMessage => (posCommand.posId,posCommand.cmd)
  144. }
  145. val getShopId: ShardRegion.ExtractShardId = {
  146. case posCommand: POSMessage => posCommand.shopId
  147. }
  148.  
  149.  
  150. case object PassivatePOS //passivate message
  151. case object FilteredOut
  152. case class DebugMode(debug: Boolean)
  153. case class NodeAddress(address: String)
  154. case class PID(id: String)
  155. case class PerformRead(pid: String, vchnum: Int, bseq: Long, eseq: Long)
  156. }

cql/CassandraEngine.scala

  1. package sdp.cql.engine
  2.  
  3. import akka.NotUsed
  4. import akka.stream.alpakka.cassandra.scaladsl._
  5. import akka.stream.scaladsl._
  6. import com.datastax.driver.core._
  7. import com.google.common.util.concurrent.{FutureCallback, Futures, ListenableFuture}
  8. import protobuf.bytes.Converter._
  9. import sdp.logging.LogSupport
  10. import sdp.result.DBOResult._
  11.  
  12. import scala.collection.JavaConverters._
  13. import scala.collection.generic.CanBuildFrom
  14. import scala.concurrent._
  15.  
  16. object CQLContext {
  17. // Consistency Levels
  18. type CONSISTENCY_LEVEL = Int
  19. val ANY: CONSISTENCY_LEVEL = 0x0000
  20. val ONE: CONSISTENCY_LEVEL = 0x0001
  21. val TWO: CONSISTENCY_LEVEL = 0x0002
  22. val THREE: CONSISTENCY_LEVEL = 0x0003
  23. val QUORUM : CONSISTENCY_LEVEL = 0x0004
  24. val ALL: CONSISTENCY_LEVEL = 0x0005
  25. val LOCAL_QUORUM: CONSISTENCY_LEVEL = 0x0006
  26. val EACH_QUORUM: CONSISTENCY_LEVEL = 0x0007
  27. val LOCAL_ONE: CONSISTENCY_LEVEL = 0x000A
  28. val LOCAL_SERIAL: CONSISTENCY_LEVEL = 0x000B
  29. val SERIAL: CONSISTENCY_LEVEL = 0x000C
  30.  
  31. def apply(): CQLUpdateContext = CQLUpdateContext(statements = Nil)
  32.  
  33. def consistencyLevel: CONSISTENCY_LEVEL => ConsistencyLevel = consistency => {
  34. consistency match {
  35. case ALL => ConsistencyLevel.ALL
  36. case ONE => ConsistencyLevel.ONE
  37. case TWO => ConsistencyLevel.TWO
  38. case THREE => ConsistencyLevel.THREE
  39. case ANY => ConsistencyLevel.ANY
  40. case EACH_QUORUM => ConsistencyLevel.EACH_QUORUM
  41. case LOCAL_ONE => ConsistencyLevel.LOCAL_ONE
  42. case QUORUM => ConsistencyLevel.QUORUM
  43. case SERIAL => ConsistencyLevel.SERIAL
  44. case LOCAL_SERIAL => ConsistencyLevel.LOCAL_SERIAL
  45.  
  46. }
  47. }
  48.  
  49. }
  50. case class CQLQueryContext(
  51. statement: String,
  52. parameters: Seq[Object] = Nil,
  53. consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None,
  54. fetchSize: Int = 100
  55. ) { ctx =>
  56. def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLQueryContext =
  57. ctx.copy(consistency = Some(_consistency))
  58. def setFetchSize(pageSize: Int): CQLQueryContext =
  59. ctx.copy(fetchSize = pageSize)
  60. def setParameters(param: Seq[Object]): CQLQueryContext =
  61. ctx.copy(parameters = param)
  62.  
  63. def toProto = new sdp.grpc.services.ProtoCQLQuery(
  64. statement = this.statement,
  65. parameters = { if (this.parameters == Nil) None
  66. else Some(sdp.grpc.services.ProtoAny(marshal(this.parameters))) },
  67. consistency = this.consistency,
  68. fetchSize = this.fetchSize
  69. )
  70. }
  71. object CQLQueryContext {
  72. def apply[M](stmt: String, param: Seq[Object]): CQLQueryContext = new CQLQueryContext(statement = stmt, parameters = param)
  73. def fromProto(proto: sdp.grpc.services.ProtoCQLQuery) =
  74. new CQLQueryContext(
  75. statement = proto.statement,
  76. parameters =
  77. proto.parameters match {
  78. case None => Nil
  79. case Some(so) =>
  80. if (so.value == _root_.com.google.protobuf.ByteString.EMPTY)
  81. Nil
  82. else
  83. unmarshal[Seq[Object]](so.value)
  84. },
  85. consistency = proto.consistency,
  86. fetchSize = proto.fetchSize
  87. )
  88. }
  89.  
  90. case class CQLUpdateContext(
  91. statements: Seq[String],
  92. parameters: Seq[Seq[Object]] = Nil,
  93. psize: Int = 0,
  94. consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None,
  95. batch: Boolean = false
  96. ) extends LogSupport { ctx =>
  97. def setBatch(bat: Boolean) = ctx.copy(batch = bat)
  98. def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLUpdateContext =
  99. ctx.copy(consistency = Some(_consistency))
  100. def setCommand(_statement: String, _psize: Int, _parameters: Object*): CQLUpdateContext = {
  101. log.info(s"setCommand> setting: statement: ${_statement}, parameters: ${_parameters}")
  102. var _params = Seq[Seq[Object]]()
  103. if ( _psize > 0) {
  104. if (_psize == 1)
  105. _params = Seq(_parameters.asInstanceOf[Seq[Object]])
  106. else
  107. _params = _parameters.asInstanceOf[Seq[Seq[Object]]]
  108. }
  109. val nc = ctx.copy(statements = Seq(_statement), psize = _psize, parameters = _params)
  110. log.info(s"setCommand> set: statements: ${nc.statements}, parameters: ${nc.parameters}")
  111. nc
  112. }
  113. def appendCommand(_statement: String, _parameters: Object*): CQLUpdateContext = {
  114. log.info(s"appendCommand> appending: statement: ${_statement}, parameters: ${_parameters}")
  115. val nc = ctx.copy(statements = ctx.statements :+ _statement,
  116. parameters = ctx.parameters ++ Seq(_parameters))
  117. log.info(s"appendCommand> appended: statements: ${nc.statements}, parameters: ${nc.parameters}")
  118. nc
  119. }
  120.  
  121. def toProto = new sdp.grpc.services.ProtoCQLUpdate(
  122. statements = this.statements,
  123. parameters = { if (this.parameters == Nil) None
  124. else Some(sdp.grpc.services.ProtoAny(marshal(this.parameters))) },
  125. consistency = this.consistency,
  126. batch = Some(this.batch)
  127. )
  128. }
  129.  
  130. object CQLUpdateContext {
  131. def fromProto(proto: sdp.grpc.services.ProtoCQLUpdate) =
  132. new CQLUpdateContext(
  133. statements = proto.statements,
  134. parameters =
  135. proto.parameters match {
  136. case None => Nil
  137. case Some(so) =>
  138. if (so.value == _root_.com.google.protobuf.ByteString.EMPTY)
  139. Nil
  140. else
  141. unmarshal[Seq[Seq[Object]]](so.value)
  142. },
  143. consistency = proto.consistency,
  144. batch = if(proto.batch == None) false else proto.batch.get
  145. )
  146. }
  147.  
  148. object CQLEngine extends LogSupport {
  149. import CQLContext._
  150. import CQLHelpers._
  151.  
  152. import scala.concurrent.Await
  153. import scala.concurrent.duration._
  154.  
  155. def fetchResult[C[_] <: TraversableOnce[_],A](ctx: CQLQueryContext, pageSize: Int = 100
  156. ,extractor: Row => A)(
  157. implicit session: Session, cbf: CanBuildFrom[Nothing, A, C[A]]): DBOResult[C[A]]= {
  158.  
  159. val prepStmt = session.prepare(ctx.statement)
  160.  
  161. var boundStmt = prepStmt.bind()
  162. var params: Seq[Object] = Nil
  163. if (ctx.parameters != Nil) {
  164. try {
  165. params = processParameters(ctx.parameters)
  166. boundStmt = prepStmt.bind(params: _*)
  167. }
  168. catch {
  169. case e: Exception =>
  170. log.error(s"fetchResult> prepStmt.bind error: ${e.getMessage}")
  171. Left(new RuntimeException(s"fetchResult> prepStmt.bind Error: ${e.getMessage}"))
  172. }
  173. }
  174. log.info(s"fetchResult> statement: ${prepStmt.getQueryString}, parameters: ${params}")
  175.  
  176. try {
  177. ctx.consistency.foreach { consistency =>
  178. boundStmt.setConsistencyLevel(consistencyLevel(consistency))
  179. }
  180.  
  181. val resultSet = session.execute(boundStmt.setFetchSize(pageSize))
  182. val rows = resultSet.asScala.view.map(extractor).to[C]
  183. valueToDBOResult(rows)
  184. /*
  185. val ores = if(rows.isEmpty) None else Some(rows)
  186. optionToDBOResult(ores: Option[C[A]]) */
  187. }
  188. catch {
  189. case e: Exception =>
  190. log.error(s"fetchResult> runtime error: ${e.getMessage}")
  191. Left(new RuntimeException(s"fetchResult> Error: ${e.getMessage}"))
  192. }
  193. }
  194.  
  195. def fetchResultPage[C[_] <: TraversableOnce[_],A](ctx: CQLQueryContext, pageSize: Int = 100
  196. ,extractor: Row => A)(
  197. implicit session: Session, cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet, C[A])= {
  198.  
  199. val prepStmt = session.prepare(ctx.statement)
  200.  
  201. var boundStmt = prepStmt.bind()
  202. var params: Seq[Object] = Nil
  203. if (ctx.parameters != Nil) {
  204. params = processParameters(ctx.parameters)
  205. boundStmt = prepStmt.bind(params:_*)
  206. }
  207. log.info(s"fetchResultPage> statement: ${prepStmt.getQueryString}, parameters: ${params}")
  208.  
  209. ctx.consistency.foreach {consistency =>
  210. boundStmt.setConsistencyLevel(consistencyLevel(consistency))}
  211.  
  212. val resultSet = session.execute(boundStmt.setFetchSize(pageSize))
  213. (resultSet,(resultSet.asScala.view.map(extractor)).to[C])
  214. }
  215.  
  216. def fetchMorePages[C[_] <: TraversableOnce[_],A](resultSet: ResultSet, timeOut: Duration)(
  217. extractor: Row => A)(implicit ec: ExecutionContext, cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet,Option[C[A]]) =
  218. if (resultSet.isFullyFetched) {
  219. (resultSet, None)
  220. } else {
  221. try {
  222. val result = Await.result((resultSet.fetchMoreResults()).asScala, timeOut)
  223. (result, Some((result.asScala.view.map(extractor)).to[C]))
  224. } catch { case e: Throwable => (resultSet, None) }
  225. }
  226.  
  227. def cqlExecute(ctx: CQLUpdateContext)(
  228. implicit session: Session, ec: ExecutionContext): DBOResult[Boolean] = {
  229. var ctxparameters = Seq[Seq[Object]]()
  230. if (ctx.parameters != Nil)
  231. if (ctx.parameters.head != Nil) {
  232. ctxparameters = ctx.parameters.asInstanceOf[Seq[Seq[Seq[Object]]]].head
  233. }
  234.  
  235. var invalidBat = false
  236. if ( ctx.batch ) {
  237. if (ctxparameters == Nil)
  238. invalidBat = true
  239. else if (ctxparameters.size < 2)
  240. invalidBat = true;
  241. }
  242. if (!ctx.batch || invalidBat) {
  243. if(invalidBat)
  244. log.warn(s"cqlExecute> batch update must have at least 2 sets of parameters! change to single-command.")
  245.  
  246. if (ctx.statements.size == 1 && ctx.psize <= 1) {
  247. var param: Seq[Seq[Object]] = Nil
  248. if (ctxparameters != Nil)
  249. param = ctxparameters
  250. log.info(s"cqlExecute> single-command: statement: ${ctx.statements.head} parameters: ${param}")
  251. cqlSingleUpdate(ctx.consistency, ctx.statements.head, param)
  252. }
  253. else {
  254. var params: Seq[Seq[Object]] = ctxparameters
  255. var ctxstatements = ctx.statements
  256. if (ctxparameters.size < ctx.statements.size) {
  257. log.warn(s"cqlExecute> fewer parameters than statements! pad with 'Nil'.")
  258. val pnils = Seq.fill(ctx.statements.length - ctxparameters.size)(Nil)
  259. params = ctxparameters ++ pnils
  260. }
  261. else {
  262. if (ctx.statements.size < ctxparameters.size) {
  263. log.warn(s"cqlExecute> fewer statements than parameters! pad with 'head'.")
  264. val heads = Seq.fill(ctxparameters.size - ctx.statements.size)(ctx.statements.head)
  265. ctxstatements = ctx.statements ++ heads
  266. }
  267. }
  268.  
  269. val commands: Seq[(String, Seq[Object])] = ctxstatements zip params
  270. log.info(s"cqlExecute> multi-commands: ${commands}")
  271. /*
  272. //using sequence to flip List[Future[Boolean]] => Future[List[Boolean]]
  273. //therefore, make sure no command replies on prev command effect
  274. val lstCmds: List[Future[Boolean]] = commands.map { case (stmt,param) =>
  275. cqlSingleUpdate(ctx.consistency, stmt, param)
  276. }.toList
  277.  
  278. val futList = Future.sequence(lstCmds).map(_ => true) //must map to execute
  279. */
  280. /*
  281. //using traverse to have some degree of parallelism = max(runtimes)
  282. //therefore, make sure no command replies on prev command effect
  283. val futList = Future.traverse(commands) { case (stmt,param) =>
  284. cqlSingleUpdate(ctx.consistency, stmt, param)
  285. }.map(_ => true)
  286.  
  287. Await.result(futList, 3 seconds)
  288. Future.successful(true)
  289. */
  290. // run sync directly
  291. try {
  292. commands.foreach { case (stm, pars) =>
  293. cqlExecuteSync(ctx.consistency, stm, pars)
  294. }
  295. Right(true)
  296. }
  297. catch {
  298. case e: Exception =>
  299. log.error(s"cqlExecute> runtime error: ${e.getMessage}")
  300. Left(new RuntimeException(s"cqlExecute> Error: ${e.getMessage}"))
  301. }
  302. }
  303. }
  304. else
  305. cqlBatchUpdate(ctx)
  306. }
  307. def cqlSingleUpdate(cons: Option[CQLContext.CONSISTENCY_LEVEL],stmt: String, params: Seq[Seq[Object]])(
  308. implicit session: Session, ec: ExecutionContext): DBOResult[Boolean] = {
  309.  
  310. val prepStmt = session.prepare(stmt)
  311.  
  312. var boundStmt = prepStmt.bind()
  313. var pars: Seq[Seq[Object]] = Nil
  314. if (params != Nil) {
  315. try {
  316. pars = params.map(processParameters(_))
  317. boundStmt = prepStmt.bind(pars.head: _*)
  318. }
  319. catch {
  320. case e: Exception =>
  321. log.error(s"cqlSingleUpdate> prepStmt.bind error: ${e.getMessage}")
  322. Left(new RuntimeException(s"cqlSingleUpdate> prepStmt.bind Error: ${e.getMessage}"))
  323. }
  324. }
  325. log.info(s"cqlSingleUpdate> statement: ${prepStmt.getQueryString}, parameters: ${pars}")
  326.  
  327. try {
  328. cons.foreach { consistency =>
  329. boundStmt.setConsistencyLevel(consistencyLevel(consistency))
  330. }
  331. val res = session.execute(boundStmt) //executeAsync(boundStmt).map(_.wasApplied())
  332. Right(res.wasApplied())
  333. }
  334. catch {
  335. case e: Exception =>
  336. log.error(s"cqlExecute> runtime error: ${e.getMessage}")
  337. Left(new RuntimeException(s"cqlExecute> Error: ${e.getMessage}"))
  338. }
  339. }
  340.  
  341. def cqlExecuteSync(cons: Option[CQLContext.CONSISTENCY_LEVEL],stmt: String, params: Seq[Object])(
  342. implicit session: Session, ec: ExecutionContext): Boolean = {
  343.  
  344. val prepStmt = session.prepare(stmt)
  345.  
  346. var boundStmt = prepStmt.bind()
  347. var pars: Seq[Object] = Nil
  348. if (params != Nil) {
  349. pars = processParameters(params)
  350. boundStmt = prepStmt.bind(pars: _*)
  351. }
  352. log.info(s"cqlExecuteSync> statement: ${prepStmt.getQueryString}, parameters: ${pars}")
  353.  
  354. cons.foreach { consistency =>
  355. boundStmt.setConsistencyLevel(consistencyLevel(consistency))
  356. }
  357. session.execute(boundStmt).wasApplied()
  358.  
  359. }
  360.  
  361. def cqlBatchUpdate(ctx: CQLUpdateContext)(
  362. implicit session: Session, ec: ExecutionContext): DBOResult[Boolean] = {
  363. var ctxparameters = Seq[Seq[Object]]()
  364. if(ctx.parameters != Nil)
  365. if (ctx.parameters.head != Nil)
  366. ctxparameters = ctx.parameters.asInstanceOf[Seq[Seq[Seq[Object]]]].head
  367. var params: Seq[Seq[Object]] = ctxparameters
  368. var ctxstatements = ctx.statements
  369. if (ctxparameters.size < ctx.statements.size) {
  370. log.warn(s"cqlBatchUpdate> fewer parameters than statements! pad with 'Nil'.")
  371. val pnils = Seq.fill(ctx.statements.length - ctxparameters.size)(Nil)
  372. params = ctxparameters ++ pnils
  373. }
  374. else {
  375. if (ctx.statements.size < ctxparameters.size) {
  376. log.warn(s"cqlBatchUpdate> fewer statements than parameters! pad with 'head'.")
  377. val heads = Seq.fill(ctxparameters.size - ctx.statements.size)(ctx.statements.head)
  378. ctxstatements = ctx.statements ++ heads
  379. }
  380. }
  381.  
  382. val commands: Seq[(String, Seq[Object])] = ctxstatements zip params
  383. log.info(s"cqlBatchUpdate> batch-commands: ${commands}")
  384.  
  385.  
  386.  
  387. var batch = new BatchStatement()
  388. try {
  389. commands.foreach { cmd =>
  390. val prepStmt = session.prepare(cmd._1)
  391. log.info(s"cqlBatchUpdate> batch with statement: ${cmd._1}, raw parameter: ${cmd._2}")
  392. if (cmd._2 == Nil) {
  393. val pars = processParameters(cmd._2)
  394. log.info(s"cqlBatchUpdate> batch with cooked parameters: ${pars}")
  395. batch.add(prepStmt.bind(pars: _*))
  396. } else {
  397. log.info(s"cqlBatchUpdate> batch with no parameter")
  398. batch.add(prepStmt.bind())
  399. }
  400. }
  401. ctx.consistency.foreach { consistency =>
  402. batch.setConsistencyLevel(consistencyLevel(consistency))
  403. }
  404. val res = session.execute(batch) //session.executeAsync(batch).map(_.wasApplied())
  405. Right(res.wasApplied())
  406. }
  407. catch {
  408. case e: Exception =>
  409. log.error(s"cqlBatchUpdate> runtime error: ${e.getMessage}")
  410. Left(new RuntimeException(s"cqlBatchUpdate> Error: ${e.getMessage}"))
  411. }
  412.  
  413. }
  414.  
  415. def cassandraStream[A](ctx: CQLQueryContext,extractor: Row => A)
  416. (implicit session: Session, ec: ExecutionContextExecutor): Source[A,NotUsed] = {
  417.  
  418. val prepStmt = session.prepare(ctx.statement)
  419. var boundStmt = prepStmt.bind()
  420. val params = processParameters(ctx.parameters)
  421. boundStmt = prepStmt.bind(params:_*)
  422. ctx.consistency.foreach {consistency =>
  423. boundStmt.setConsistencyLevel(consistencyLevel(consistency))}
  424.  
  425. log.info(s"cassandraStream> statement: ${prepStmt.getQueryString}, parameters: ${params}")
  426. CassandraSource(boundStmt.setFetchSize(ctx.fetchSize)).map(extractor)
  427. }
  428.  
  429. case class CassandraActionStream[R](parallelism: Int = 1, processInOrder: Boolean = true,
  430. statement: String, prepareParams: R => Seq[Object],
  431. consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None){ cas =>
  432. def setParallelism(parLevel: Int): CassandraActionStream[R] = cas.copy(parallelism=parLevel)
  433. def setProcessOrder(ordered: Boolean): CassandraActionStream[R] = cas.copy(processInOrder = ordered)
  434. def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CassandraActionStream[R] =
  435. cas.copy(consistency = Some(_consistency))
  436.  
  437. def perform(r: R)(implicit session: Session, ec: ExecutionContext) = {
  438. var prepStmt = session.prepare(statement)
  439. var boundStmt = prepStmt.bind()
  440. val params = processParameters(prepareParams(r))
  441. boundStmt = prepStmt.bind(params: _*)
  442. consistency.foreach { cons =>
  443. boundStmt.setConsistencyLevel(CQLContext.consistencyLevel(cons))
  444. }
  445. log.info(s"CassandraActionStream.perform> statement: ${prepStmt.getQueryString}, parameters: ${params}")
  446. import monix.eval.Task
  447. import monix.execution.Scheduler.Implicits.global
  448. session.execute(boundStmt)
  449. Task.now {r}.runToFuture
  450. //session.executeAsync(boundStmt).map(_ => r)
  451. }
  452.  
  453. def performOnRow(implicit session: Session, ec: ExecutionContext): Flow[R, R, NotUsed] =
  454. if (processInOrder)
  455. Flow[R].mapAsync(parallelism)(perform)
  456. else
  457. Flow[R].mapAsyncUnordered(parallelism)(perform)
  458.  
  459. def unloggedBatch[K](statementBinder: (
  460. R, PreparedStatement) => BoundStatement,partitionKey: R => K)(
  461. implicit session: Session, ec: ExecutionContext): Flow[R, R, NotUsed] = {
  462. val preparedStatement = session.prepare(statement)
  463. log.info(s"CassandraActionStream.unloggedBatch> statement: ${preparedStatement.getQueryString}")
  464. CassandraFlow.createUnloggedBatchWithPassThrough[R, K](
  465. parallelism,
  466. preparedStatement,
  467. statementBinder,
  468. partitionKey)
  469. }
  470.  
  471. }
  472. object CassandraActionStream {
  473. def apply[R](_statement: String, params: R => Seq[Object]): CassandraActionStream[R] =
  474. new CassandraActionStream[R]( statement=_statement, prepareParams = params)
  475. }
  476.  
  477.  
  478. }
  479.  
  480. object CQLHelpers extends LogSupport {
  481. import java.io._
  482. import java.nio.ByteBuffer
  483. import java.nio.file._
  484. import java.time.Instant
  485. import scala.util.Try
  486.  
  487.  
  488. import akka.stream._
  489. import akka.stream.scaladsl._
  490. import com.datastax.driver.core.LocalDate
  491. import com.datastax.driver.extras.codecs.jdk8.InstantCodec
  492. import java.util.concurrent.Executor
  493. /*
  494. implicit def listenableFutureToFuture[T](
  495. listenableFuture: ListenableFuture[T]): Future[T] = {
  496. val promise = Promise[T]()
  497. Futures.addCallback(listenableFuture, new FutureCallback[T] {
  498. def onFailure(error: Throwable): Unit = {
  499. promise.failure(error)
  500. ()
  501. }
  502. def onSuccess(result: T): Unit = {
  503. promise.success(result)
  504. ()
  505. }
  506. })
  507. promise.future
  508. } */
  509.  
  510. implicit def listenableFutureToFuture[A](lf: ListenableFuture[A])(implicit executionContext: ExecutionContext): Future[A] = {
  511. val promise = Promise[A]
  512. lf.addListener(new Runnable {
  513. def run() = promise.complete(Try(lf.get()))
  514. }, executionContext.asInstanceOf[Executor])
  515. promise.future
  516. }
  517.  
  518. implicit class ListenableFutureConverter[A](val lf: ListenableFuture[A]) extends AnyVal {
  519. def asScala(implicit ec: ExecutionContext): Future[A] = {
  520. val promise = Promise[A]
  521. lf.addListener(new Runnable {
  522. def run() = promise.complete(Try(lf.get()))
  523. }, ec.asInstanceOf[Executor])
  524. promise.future
  525. }
  526. }
  527. /*
  528. implicit def toScalaFuture[A](a: ListenableFuture[A])(implicit ec: ExecutionContext): Future[A] = {
  529. val promise = Promise[A]()
  530. a.addListener(new Runnable {
  531. def run() = {
  532. try {
  533. promise.success(a.get)
  534. } catch {
  535. case ex: ExecutionException => promise.failure(ex.getCause)
  536. case ex => promise.failure(ex)
  537. }
  538. }
  539. }, ec.asInstanceOf[Executor])
  540. promise.future
  541. } */
  542.  
  543. case class CQLDate(year: Int, month: Int, day: Int)
  544. case object CQLTodayDate
  545. case class CQLDateTime(year: Int, Month: Int,
  546. day: Int, hour: Int, minute: Int, second: Int, millisec: Int = 0)
  547. case object CQLDateTimeNow
  548.  
  549. def cqlGetDate(dateToConvert: java.util.Date): java.time.LocalDate =
  550. dateToConvert.toInstant()
  551. .atZone(java.time.ZoneId.systemDefault())
  552. .toLocalDate()
  553.  
  554. def cqlGetTime(dateToConvert: java.util.Date): java.time.LocalTime =
  555. dateToConvert.toInstant()
  556. .atZone(java.time.ZoneId.systemDefault())
  557. .toLocalTime()
  558.  
  559. def cqlGetTimestamp(dateToConvert: java.util.Date): java.time.LocalDateTime=
  560. new java.sql.Timestamp(
  561. dateToConvert.getTime()
  562. ).toLocalDateTime()
  563.  
  564. def processParameters(params: Seq[Object]): Seq[Object] = {
  565. import java.time.{Clock, ZoneId}
  566. log.info(s"[processParameters] input: ${params}")
  567. val outParams = params.map { obj =>
  568. obj match {
  569. case CQLDate(yy, mm, dd) => LocalDate.fromYearMonthDay(yy, mm, dd)
  570. case CQLTodayDate =>
  571. val today = java.time.LocalDate.now()
  572. LocalDate.fromYearMonthDay(today.getYear, today.getMonth.getValue, today.getDayOfMonth)
  573. case CQLDateTimeNow => Instant.now(Clock.system(ZoneId.of("EST", ZoneId.SHORT_IDS)))
  574. case CQLDateTime(yy, mm, dd, hr, ms, sc, mi) =>
  575. Instant.parse(f"$yy%4d-$mm%2d-$dd%2dT$hr%2d:$ms%2d:$sc%2d$mi%3d")
  576. case p@_ => p
  577. }
  578. }
  579. log.info(s"[processParameters] output: ${params}")
  580. outParams
  581. }
  582. class ByteBufferInputStream(buf: ByteBuffer) extends InputStream {
  583. override def read: Int = {
  584. if (!buf.hasRemaining) return -1
  585. buf.get
  586. }
  587.  
  588. override def read(bytes: Array[Byte], off: Int, len: Int): Int = {
  589. val length: Int = Math.min(len, buf.remaining)
  590. buf.get(bytes, off, length)
  591. length
  592. }
  593. }
  594. object ByteBufferInputStream {
  595. def apply(buf: ByteBuffer): ByteBufferInputStream = {
  596. new ByteBufferInputStream(buf)
  597. }
  598. }
  599. class FixsizedByteBufferOutputStream(buf: ByteBuffer) extends OutputStream {
  600.  
  601. override def write(b: Int): Unit = {
  602. buf.put(b.toByte)
  603. }
  604.  
  605. override def write(bytes: Array[Byte], off: Int, len: Int): Unit = {
  606. buf.put(bytes, off, len)
  607. }
  608. }
  609. object FixsizedByteBufferOutputStream {
  610. def apply(buf: ByteBuffer) = new FixsizedByteBufferOutputStream(buf)
  611. }
  612. class ExpandingByteBufferOutputStream(var buf: ByteBuffer, onHeap: Boolean) extends OutputStream {
  613.  
  614. private val increasing = ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR
  615.  
  616. override def write(b: Array[Byte], off: Int, len: Int): Unit = {
  617. val position = buf.position
  618. val limit = buf.limit
  619. val newTotal: Long = position + len
  620. if(newTotal > limit){
  621. var capacity = (buf.capacity * increasing)
  622. while(capacity <= newTotal){
  623. capacity = (capacity*increasing)
  624. }
  625. increase(capacity.toInt)
  626. }
  627.  
  628. buf.put(b, 0, len)
  629. }
  630.  
  631. override def write(b: Int): Unit= {
  632. if (!buf.hasRemaining) increase((buf.capacity * increasing).toInt)
  633. buf.put(b.toByte)
  634. }
  635. protected def increase(newCapacity: Int): Unit = {
  636. buf.limit(buf.position)
  637. buf.rewind
  638. val newBuffer =
  639. if (onHeap) ByteBuffer.allocate(newCapacity)
  640. else ByteBuffer.allocateDirect(newCapacity)
  641. newBuffer.put(buf)
  642. buf.clear
  643. buf = newBuffer
  644. }
  645. def size: Long = buf.position
  646. def capacity: Long = buf.capacity
  647. def byteBuffer: ByteBuffer = buf
  648. }
  649. object ExpandingByteBufferOutputStream {
  650. val DEFAULT_INCREASING_FACTOR = 1.5f
  651. def apply(size: Int, increasingBy: Float, onHeap: Boolean) = {
  652. if (increasingBy <= 1) throw new IllegalArgumentException("Increasing Factor must be greater than 1.0")
  653. val buffer: ByteBuffer =
  654. if (onHeap) ByteBuffer.allocate(size)
  655. else ByteBuffer.allocateDirect(size)
  656. new ExpandingByteBufferOutputStream(buffer,onHeap)
  657. }
  658. def apply(size: Int): ExpandingByteBufferOutputStream = {
  659. apply(size, ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR, false)
  660. }
  661.  
  662. def apply(size: Int, onHeap: Boolean): ExpandingByteBufferOutputStream = {
  663. apply(size, ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR, onHeap)
  664. }
  665.  
  666. def apply(size: Int, increasingBy: Float): ExpandingByteBufferOutputStream = {
  667. apply(size, increasingBy, false)
  668. }
  669.  
  670. }
  671. def cqlFileToBytes(fileName: String): ByteBuffer = {
  672. val fis = new FileInputStream(fileName)
  673. val b = new Array[Byte](fis.available + 1)
  674. val length = b.length
  675. fis.read(b)
  676. ByteBuffer.wrap(b)
  677. }
  678. def cqlBytesToFile(bytes: ByteBuffer, fileName: String)(
  679. implicit mat: Materializer): Future[IOResult] = {
  680. val source = StreamConverters.fromInputStream(() => ByteBufferInputStream(bytes))
  681. source.runWith(FileIO.toPath(Paths.get(fileName)))
  682. }
  683. def cqlDateTimeString(date: java.util.Date, fmt: String): String = {
  684. val outputFormat = new java.text.SimpleDateFormat(fmt)
  685. outputFormat.format(date)
  686. }
  687. def useJava8DateTime(cluster: Cluster) = {
  688. //for jdk8 datetime format
  689. cluster.getConfiguration().getCodecRegistry()
  690. .register(InstantCodec.instance)
  691. }
  692. }

 

mgo/MGOProtoConversion.scala

  1. package sdp.mongo.engine
  2. import org.mongodb.scala.bson.collection.immutable.Document
  3. import org.bson.conversions.Bson
  4. import sdp.grpc.services._
  5. import protobuf.bytes.Converter._
  6. import MGOClasses._
  7. import MGOAdmins._
  8. import MGOCommands._
  9. import org.bson.BsonDocument
  10. import org.bson.codecs.configuration.CodecRegistry
  11. import org.mongodb.scala.bson.codecs.DEFAULT_CODEC_REGISTRY
  12. import org.mongodb.scala.FindObservable
  13.  
  14. object MGOProtoConversion {
  15.  
  16. type MGO_COMMAND_TYPE = Int
  17. val MGO_COMMAND_FIND = 0
  18. val MGO_COMMAND_COUNT = 20
  19. val MGO_COMMAND_DISTICT = 21
  20. val MGO_COMMAND_DOCUMENTSTREAM = 1
  21. val MGO_COMMAND_AGGREGATE = 2
  22. val MGO_COMMAND_INSERT = 3
  23. val MGO_COMMAND_DELETE = 4
  24. val MGO_COMMAND_REPLACE = 5
  25. val MGO_COMMAND_UPDATE = 6
  26.  
  27.  
  28. val MGO_ADMIN_DROPCOLLECTION = 8
  29. val MGO_ADMIN_CREATECOLLECTION = 9
  30. val MGO_ADMIN_LISTCOLLECTION = 10
  31. val MGO_ADMIN_CREATEVIEW = 11
  32. val MGO_ADMIN_CREATEINDEX = 12
  33. val MGO_ADMIN_DROPINDEXBYNAME = 13
  34. val MGO_ADMIN_DROPINDEXBYKEY = 14
  35. val MGO_ADMIN_DROPALLINDEXES = 15
  36.  
  37.  
  38. case class AdminContext(
  39. tarName: String = "",
  40. bsonParam: Seq[Bson] = Nil,
  41. options: Option[Any] = None,
  42. objName: String = ""
  43. ){
  44. def toProto = sdp.grpc.services.ProtoMGOAdmin(
  45. tarName = this.tarName,
  46. bsonParam = this.bsonParam.map {b => sdp.grpc.services.ProtoMGOBson(marshal(b))},
  47. objName = this.objName,
  48. options = this.options.map(b => ProtoAny(marshal(b)))
  49.  
  50. )
  51. }
  52.  
  53. object AdminContext {
  54. def fromProto(msg: sdp.grpc.services.ProtoMGOAdmin) = new AdminContext(
  55. tarName = msg.tarName,
  56. bsonParam = msg.bsonParam.map(b => unmarshal[Bson](b.bson)),
  57. objName = msg.objName,
  58. options = msg.options.map(b => unmarshal[Any](b.value))
  59. )
  60. }
  61.  
  62. case class Context(
  63. dbName: String = "",
  64. collName: String = "",
  65. commandType: MGO_COMMAND_TYPE,
  66. bsonParam: Seq[Bson] = Nil,
  67. resultOptions: Seq[ResultOptions] = Nil,
  68. options: Option[Any] = None,
  69. documents: Seq[Document] = Nil,
  70. targets: Seq[String] = Nil,
  71. only: Boolean = false,
  72. adminOptions: Option[AdminContext] = None
  73. ){
  74.  
  75. def toProto = new sdp.grpc.services.ProtoMGOContext(
  76. dbName = this.dbName,
  77. collName = this.collName,
  78. commandType = this.commandType,
  79. bsonParam = this.bsonParam.map(bsonToProto),
  80. resultOptions = this.resultOptions.map(_.toProto),
  81. options = { if(this.options == None)
  82. None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
  83. else
  84. Some(ProtoAny(marshal(this.options.get))) },
  85. documents = this.documents.map(d => sdp.grpc.services.ProtoMGODocument(marshal(d))),
  86. targets = this.targets,
  87. only = Some(this.only),
  88. adminOptions = this.adminOptions.map(_.toProto)
  89. )
  90.  
  91. }
  92.  
  93. object MGODocument {
  94. def fromProto(msg: sdp.grpc.services.ProtoMGODocument): Document =
  95. unmarshal[Document](msg.document)
  96. def toProto(doc: Document): sdp.grpc.services.ProtoMGODocument =
  97. new ProtoMGODocument(marshal(doc))
  98. }
  99.  
  100. object MGOProtoMsg {
  101. def fromProto(msg: sdp.grpc.services.ProtoMGOContext) = new Context(
  102. dbName = msg.dbName,
  103. collName = msg.collName,
  104. commandType = msg.commandType,
  105. bsonParam = msg.bsonParam.map(protoToBson),
  106. resultOptions = msg.resultOptions.map(r => ResultOptions.fromProto(r)),
  107. options = msg.options.map(a => unmarshal[Any](a.value)),
  108. documents = msg.documents.map(doc => unmarshal[Document](doc.document)),
  109. targets = msg.targets,
  110. adminOptions = msg.adminOptions.map(ado => AdminContext.fromProto(ado))
  111. )
  112. }
  113.  
  114. def bsonToProto(bson: Bson) =
  115. ProtoMGOBson(marshal(bson.toBsonDocument(
  116. classOf[org.mongodb.scala.bson.collection.immutable.Document],DEFAULT_CODEC_REGISTRY)))
  117.  
  118. def protoToBson(proto: ProtoMGOBson): Bson = new Bson {
  119. val bsdoc = unmarshal[BsonDocument](proto.bson)
  120. override def toBsonDocument[TDocument](documentClass: Class[TDocument], codecRegistry: CodecRegistry): BsonDocument = bsdoc
  121. }
  122.  
  123. def ctxFromProto(proto: ProtoMGOContext): MGOContext = proto.commandType match {
  124. case MGO_COMMAND_FIND => {
  125. var ctx = new MGOContext(
  126. dbName = proto.dbName,
  127. collName = proto.collName,
  128. actionType = MGO_QUERY,
  129. action = Some(Find())
  130. )
  131. def toResultOption(rts: Seq[ProtoMGOResultOption]): FindObservable[Document] => FindObservable[Document] = findObj =>
  132. rts.foldRight(findObj)((a,b) => ResultOptions.fromProto(a).toFindObservable(b))
  133.  
  134. (proto.bsonParam, proto.resultOptions, proto.only) match {
  135. case (Nil, Nil, None) => ctx
  136. case (Nil, Nil, Some(b)) => ctx.setCommand(Find(firstOnly = b))
  137. case (bp,Nil,None) => ctx.setCommand(
  138. Find(filter = Some(protoToBson(bp.head))))
  139. case (bp,Nil,Some(b)) => ctx.setCommand(
  140. Find(filter = Some(protoToBson(bp.head)), firstOnly = b))
  141. case (bp,fo,None) => {
  142. ctx.setCommand(
  143. Find(filter = Some(protoToBson(bp.head)),
  144. andThen = fo.map(ResultOptions.fromProto)
  145. ))
  146. }
  147. case (bp,fo,Some(b)) => {
  148. ctx.setCommand(
  149. Find(filter = Some(protoToBson(bp.head)),
  150. andThen = fo.map(ResultOptions.fromProto),
  151. firstOnly = b))
  152. }
  153. case _ => ctx
  154. }
  155. }
  156. case MGO_COMMAND_COUNT => {
  157. var ctx = new MGOContext(
  158. dbName = proto.dbName,
  159. collName = proto.collName,
  160. actionType = MGO_QUERY,
  161. action = Some(Count())
  162. )
  163. (proto.bsonParam, proto.options) match {
  164. case (Nil, None) => ctx
  165. case (bp, None) => ctx.setCommand(
  166. Count(filter = Some(protoToBson(bp.head)))
  167. )
  168. case (Nil,Some(o)) => ctx.setCommand(
  169. Count(options = Some(unmarshal[Any](o.value)))
  170. )
  171. case _ => ctx
  172. }
  173. }
  174. case MGO_COMMAND_DISTICT => {
  175. var ctx = new MGOContext(
  176. dbName = proto.dbName,
  177. collName = proto.collName,
  178. actionType = MGO_QUERY,
  179. action = Some(Distict(fieldName = proto.targets.head))
  180. )
  181. (proto.bsonParam) match {
  182. case Nil => ctx
  183. case bp: Seq[ProtoMGOBson] => ctx.setCommand(
  184. Distict(fieldName = proto.targets.head,filter = Some(protoToBson(bp.head)))
  185. )
  186. case _ => ctx
  187. }
  188. }
  189. case MGO_COMMAND_AGGREGATE => {
  190. new MGOContext(
  191. dbName = proto.dbName,
  192. collName = proto.collName,
  193. actionType = MGO_QUERY,
  194. action = Some(Aggregate(proto.bsonParam.map(p => protoToBson(p))))
  195. )
  196. }
  197. case MGO_ADMIN_LISTCOLLECTION => {
  198. new MGOContext(
  199. dbName = proto.dbName,
  200. collName = proto.collName,
  201. actionType = MGO_QUERY,
  202. action = Some(ListCollection(proto.dbName)))
  203. }
  204. case MGO_COMMAND_INSERT => {
  205. var ctx = new MGOContext(
  206. dbName = proto.dbName,
  207. collName = proto.collName,
  208. actionType = MGO_UPDATE,
  209. action = Some(Insert(
  210. newdocs = proto.documents.map(doc => unmarshal[Document](doc.document))))
  211. )
  212. proto.options match {
  213. case None => ctx
  214. case Some(o) => ctx.setCommand(Insert(
  215. newdocs = proto.documents.map(doc => unmarshal[Document](doc.document)),
  216. options = Some(unmarshal[Any](o.value)))
  217. )
  218. }
  219. }
  220. case MGO_COMMAND_DELETE => {
  221. var ctx = new MGOContext(
  222. dbName = proto.dbName,
  223. collName = proto.collName,
  224. actionType = MGO_UPDATE,
  225. action = Some(Delete(
  226. filter = protoToBson(proto.bsonParam.head)))
  227. )
  228. (proto.options, proto.only) match {
  229. case (None,None) => ctx
  230. case (None,Some(b)) => ctx.setCommand(Delete(
  231. filter = protoToBson(proto.bsonParam.head),
  232. onlyOne = b))
  233. case (Some(o),None) => ctx.setCommand(Delete(
  234. filter = protoToBson(proto.bsonParam.head),
  235. options = Some(unmarshal[Any](o.value)))
  236. )
  237. case (Some(o),Some(b)) => ctx.setCommand(Delete(
  238. filter = protoToBson(proto.bsonParam.head),
  239. options = Some(unmarshal[Any](o.value)),
  240. onlyOne = b)
  241. )
  242. }
  243. }
  244. case MGO_COMMAND_REPLACE => {
  245. var ctx = new MGOContext(
  246. dbName = proto.dbName,
  247. collName = proto.collName,
  248. actionType = MGO_UPDATE,
  249. action = Some(Replace(
  250. filter = protoToBson(proto.bsonParam.head),
  251. replacement = unmarshal[Document](proto.documents.head.document)))
  252. )
  253. proto.options match {
  254. case None => ctx
  255. case Some(o) => ctx.setCommand(Replace(
  256. filter = protoToBson(proto.bsonParam.head),
  257. replacement = unmarshal[Document](proto.documents.head.document),
  258. options = Some(unmarshal[Any](o.value)))
  259. )
  260. }
  261. }
  262. case MGO_COMMAND_UPDATE => {
  263. var ctx = new MGOContext(
  264. dbName = proto.dbName,
  265. collName = proto.collName,
  266. actionType = MGO_UPDATE,
  267. action = Some(Update(
  268. filter = protoToBson(proto.bsonParam.head),
  269. update = protoToBson(proto.bsonParam.tail.head)))
  270. )
  271. (proto.options, proto.only) match {
  272. case (None,None) => ctx
  273. case (None,Some(b)) => ctx.setCommand(Update(
  274. filter = protoToBson(proto.bsonParam.head),
  275. update = protoToBson(proto.bsonParam.tail.head),
  276. onlyOne = b))
  277. case (Some(o),None) => ctx.setCommand(Update(
  278. filter = protoToBson(proto.bsonParam.head),
  279. update = protoToBson(proto.bsonParam.tail.head),
  280. options = Some(unmarshal[Any](o.value)))
  281. )
  282. case (Some(o),Some(b)) => ctx.setCommand(Update(
  283. filter = protoToBson(proto.bsonParam.head),
  284. update = protoToBson(proto.bsonParam.tail.head),
  285. options = Some(unmarshal[Any](o.value)),
  286. onlyOne = b)
  287. )
  288. }
  289. }
  290. case MGO_ADMIN_DROPCOLLECTION =>
  291. new MGOContext(
  292. dbName = proto.dbName,
  293. collName = proto.collName,
  294. actionType = MGO_ADMIN,
  295. action = Some(DropCollection(proto.collName))
  296. )
  297. case MGO_ADMIN_CREATECOLLECTION => {
  298. var ctx = new MGOContext(
  299. dbName = proto.dbName,
  300. collName = proto.collName,
  301. actionType = MGO_ADMIN,
  302. action = Some(CreateCollection(proto.collName))
  303. )
  304. proto.options match {
  305. case None => ctx
  306. case Some(o) => ctx.setCommand(CreateCollection(proto.collName,
  307. options = Some(unmarshal[Any](o.value)))
  308. )
  309. }
  310. }
  311. case MGO_ADMIN_CREATEVIEW => {
  312. var ctx = new MGOContext(
  313. dbName = proto.dbName,
  314. collName = proto.collName,
  315. actionType = MGO_ADMIN,
  316. action = Some(CreateView(viewName = proto.targets.head,
  317. viewOn = proto.targets.tail.head,
  318. pipeline = proto.bsonParam.map(p => protoToBson(p))))
  319. )
  320. proto.options match {
  321. case None => ctx
  322. case Some(o) => ctx.setCommand(CreateView(viewName = proto.targets.head,
  323. viewOn = proto.targets.tail.head,
  324. pipeline = proto.bsonParam.map(p => protoToBson(p)),
  325. options = Some(unmarshal[Any](o.value)))
  326. )
  327. }
  328. }
  329. case MGO_ADMIN_CREATEINDEX=> {
  330. var ctx = new MGOContext(
  331. dbName = proto.dbName,
  332. collName = proto.collName,
  333. actionType = MGO_ADMIN,
  334. action = Some(CreateIndex(key = protoToBson(proto.bsonParam.head)))
  335. )
  336. proto.options match {
  337. case None => ctx
  338. case Some(o) => ctx.setCommand(CreateIndex(key = protoToBson(proto.bsonParam.head),
  339. options = Some(unmarshal[Any](o.value)))
  340. )
  341. }
  342. }
  343. case MGO_ADMIN_DROPINDEXBYNAME=> {
  344. var ctx = new MGOContext(
  345. dbName = proto.dbName,
  346. collName = proto.collName,
  347. actionType = MGO_ADMIN,
  348. action = Some(DropIndexByName(indexName = proto.targets.head))
  349. )
  350. proto.options match {
  351. case None => ctx
  352. case Some(o) => ctx.setCommand(DropIndexByName(indexName = proto.targets.head,
  353. options = Some(unmarshal[Any](o.value)))
  354. )
  355. }
  356. }
  357. case MGO_ADMIN_DROPINDEXBYKEY=> {
  358. var ctx = new MGOContext(
  359. dbName = proto.dbName,
  360. collName = proto.collName,
  361. actionType = MGO_ADMIN,
  362. action = Some(DropIndexByKey(key = protoToBson(proto.bsonParam.head)))
  363. )
  364. proto.options match {
  365. case None => ctx
  366. case Some(o) => ctx.setCommand(DropIndexByKey(key = protoToBson(proto.bsonParam.head),
  367. options = Some(unmarshal[Any](o.value)))
  368. )
  369. }
  370. }
  371. case MGO_ADMIN_DROPALLINDEXES=> {
  372. var ctx = new MGOContext(
  373. dbName = proto.dbName,
  374. collName = proto.collName,
  375. actionType = MGO_ADMIN,
  376. action = Some(DropAllIndexes())
  377. )
  378. proto.options match {
  379. case None => ctx
  380. case Some(o) => ctx.setCommand(DropAllIndexes(
  381. options = Some(unmarshal[Any](o.value)))
  382. )
  383. }
  384. }
  385.  
  386. }
  387.  
  388. def ctxToProto(ctx: MGOContext): Option[sdp.grpc.services.ProtoMGOContext] = ctx.action match {
  389. case None => None
  390. case Some(act) => act match {
  391. case Count(filter, options) =>
  392. Some(new sdp.grpc.services.ProtoMGOContext(
  393. dbName = ctx.dbName,
  394. collName = ctx.collName,
  395. commandType = MGO_COMMAND_COUNT,
  396. bsonParam = { if (filter == None) Seq.empty[ProtoMGOBson]
  397. else Seq(bsonToProto(filter.get))},
  398. options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
  399. else Some(ProtoAny(marshal(options.get))) }
  400. ))
  401. case Distict(fieldName, filter) =>
  402. Some(new sdp.grpc.services.ProtoMGOContext(
  403. dbName = ctx.dbName,
  404. collName = ctx.collName,
  405. commandType = MGO_COMMAND_DISTICT,
  406. bsonParam = { if (filter == None) Seq.empty[ProtoMGOBson]
  407. else Seq(bsonToProto(filter.get))},
  408. targets = Seq(fieldName)
  409.  
  410. ))
  411.  
  412. case Find(filter, andThen, firstOnly) =>
  413. Some(new sdp.grpc.services.ProtoMGOContext(
  414. dbName = ctx.dbName,
  415. collName = ctx.collName,
  416. commandType = MGO_COMMAND_FIND,
  417. bsonParam = { if (filter == None) Seq.empty[ProtoMGOBson]
  418. else Seq(bsonToProto(filter.get))},
  419. resultOptions = andThen.map(_.toProto)
  420. ))
  421.  
  422. case Aggregate(pipeLine) =>
  423. Some(new sdp.grpc.services.ProtoMGOContext(
  424. dbName = ctx.dbName,
  425. collName = ctx.collName,
  426. commandType = MGO_COMMAND_AGGREGATE,
  427. bsonParam = pipeLine.map(bsonToProto)
  428. ))
  429.  
  430. case Insert(newdocs, options) =>
  431. Some(new sdp.grpc.services.ProtoMGOContext(
  432. dbName = ctx.dbName,
  433. collName = ctx.collName,
  434. commandType = MGO_COMMAND_INSERT,
  435. documents = newdocs.map(d => ProtoMGODocument(marshal(d))),
  436. options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
  437. else Some(ProtoAny(marshal(options.get))) }
  438. ))
  439.  
  440. case Delete(filter, options, onlyOne) =>
  441. Some(new sdp.grpc.services.ProtoMGOContext(
  442. dbName = ctx.dbName,
  443. collName = ctx.collName,
  444. commandType = MGO_COMMAND_DELETE,
  445. bsonParam = Seq(bsonToProto(filter)),
  446. options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
  447. else Some(ProtoAny(marshal(options.get))) },
  448. only = Some(onlyOne)
  449. ))
  450.  
  451. case Replace(filter, replacement, options) =>
  452. Some(new sdp.grpc.services.ProtoMGOContext(
  453. dbName = ctx.dbName,
  454. collName = ctx.collName,
  455. commandType = MGO_COMMAND_REPLACE,
  456. bsonParam = Seq(bsonToProto(filter)),
  457. options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
  458. else Some(ProtoAny(marshal(options.get))) },
  459. documents = Seq(ProtoMGODocument(marshal(replacement)))
  460. ))
  461.  
  462. case Update(filter, update, options, onlyOne) =>
  463. Some(new sdp.grpc.services.ProtoMGOContext(
  464. dbName = ctx.dbName,
  465. collName = ctx.collName,
  466. commandType = MGO_COMMAND_UPDATE,
  467. bsonParam = Seq(bsonToProto(filter),bsonToProto(update)),
  468. options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
  469. else Some(ProtoAny(marshal(options.get))) },
  470. only = Some(onlyOne)
  471. ))
  472.  
  473.  
  474. case DropCollection(coll) =>
  475. Some(new sdp.grpc.services.ProtoMGOContext(
  476. dbName = ctx.dbName,
  477. collName = coll,
  478. commandType = MGO_ADMIN_DROPCOLLECTION
  479. ))
  480.  
  481. case CreateCollection(coll, options) =>
  482. Some(new sdp.grpc.services.ProtoMGOContext(
  483. dbName = ctx.dbName,
  484. collName = coll,
  485. commandType = MGO_ADMIN_CREATECOLLECTION,
  486. options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
  487. else Some(ProtoAny(marshal(options.get))) }
  488. ))
  489.  
  490. case ListCollection(dbName) =>
  491. Some(new sdp.grpc.services.ProtoMGOContext(
  492. dbName = ctx.dbName,
  493. commandType = MGO_ADMIN_LISTCOLLECTION
  494. ))
  495.  
  496. case CreateView(viewName, viewOn, pipeline, options) =>
  497. Some(new sdp.grpc.services.ProtoMGOContext(
  498. dbName = ctx.dbName,
  499. collName = ctx.collName,
  500. commandType = MGO_ADMIN_CREATEVIEW,
  501. bsonParam = pipeline.map(bsonToProto),
  502. options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
  503. else Some(ProtoAny(marshal(options.get))) },
  504. targets = Seq(viewName,viewOn)
  505. ))
  506.  
  507. case CreateIndex(key, options) =>
  508. Some(new sdp.grpc.services.ProtoMGOContext(
  509. dbName = ctx.dbName,
  510. collName = ctx.collName,
  511. commandType = MGO_ADMIN_CREATEINDEX,
  512. bsonParam = Seq(bsonToProto(key)),
  513. options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
  514. else Some(ProtoAny(marshal(options.get))) }
  515. ))
  516.  
  517.  
  518. case DropIndexByName(indexName, options) =>
  519. Some(new sdp.grpc.services.ProtoMGOContext(
  520. dbName = ctx.dbName,
  521. collName = ctx.collName,
  522. commandType = MGO_ADMIN_DROPINDEXBYNAME,
  523. targets = Seq(indexName),
  524. options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
  525. else Some(ProtoAny(marshal(options.get))) }
  526. ))
  527.  
  528. case DropIndexByKey(key, options) =>
  529. Some(new sdp.grpc.services.ProtoMGOContext(
  530. dbName = ctx.dbName,
  531. collName = ctx.collName,
  532. commandType = MGO_ADMIN_DROPINDEXBYKEY,
  533. bsonParam = Seq(bsonToProto(key)),
  534. options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
  535. else Some(ProtoAny(marshal(options.get))) }
  536. ))
  537.  
  538.  
  539. case DropAllIndexes(options) =>
  540. Some(new sdp.grpc.services.ProtoMGOContext(
  541. dbName = ctx.dbName,
  542. collName = ctx.collName,
  543. commandType = MGO_ADMIN_DROPALLINDEXES,
  544. options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
  545. else Some(ProtoAny(marshal(options.get))) }
  546. ))
  547.  
  548. }
  549. }
  550.  
  551. }

mgo/ObservableToPublisher.scala

  1. package sdp.mongo.engine
  2.  
  3. import java.util.concurrent.atomic.AtomicBoolean
  4.  
  5. import org.mongodb.{scala => mongoDB}
  6. import org.{reactivestreams => rxStreams}
  7.  
  8. final case class ObservableToPublisher[T](observable: mongoDB.Observable[T])
  9. extends rxStreams.Publisher[T] {
  10. def subscribe(subscriber: rxStreams.Subscriber[_ >: T]): Unit =
  11. observable.subscribe(new mongoDB.Observer[T]() {
  12. override def onSubscribe(subscription: mongoDB.Subscription): Unit =
  13. subscriber.onSubscribe(new rxStreams.Subscription() {
  14. private final val cancelled: AtomicBoolean = new AtomicBoolean
  15.  
  16. override def request(n: Long): Unit =
  17. if (!subscription.isUnsubscribed && !cancelled.get() && n < 1) {
  18. subscriber.onError(
  19. new IllegalArgumentException(
  20. s"Demand from publisher should be a positive amount. Current amount is:$n"
  21. )
  22. )
  23. } else {
  24. subscription.request(n)
  25. }
  26.  
  27. override def cancel(): Unit =
  28. if (!cancelled.getAndSet(true)) subscription.unsubscribe()
  29. })
  30.  
  31. def onNext(result: T): Unit = subscriber.onNext(result)
  32.  
  33. def onError(e: Throwable): Unit = subscriber.onError(e)
  34.  
  35. def onComplete(): Unit = subscriber.onComplete()
  36. })
  37. }

mgo/MongoEngine.scala

  1. package sdp.mongo.engine
  2.  
  3. import java.text.SimpleDateFormat
  4. import java.util.Calendar
  5.  
  6. import akka.NotUsed
  7. import akka.stream.Materializer
  8. import akka.stream.alpakka.mongodb.scaladsl._
  9. import akka.stream.scaladsl.{Flow, Source}
  10. import org.bson.conversions.Bson
  11. import org.mongodb.scala.bson.collection.immutable.Document
  12. import org.mongodb.scala.bson.{BsonArray, BsonBinary}
  13. import org.mongodb.scala.model._
  14. import org.mongodb.scala.{MongoClient, _}
  15. import protobuf.bytes.Converter._
  16. import sdp.file.Streaming._
  17. import sdp.logging.LogSupport
  18.  
  19. import scala.collection.JavaConverters._
  20. import scala.concurrent._
  21. import scala.concurrent.duration._
  22.  
  23. object MGOClasses {
  24. type MGO_ACTION_TYPE = Int
  25. val MGO_QUERY = 0
  26. val MGO_UPDATE = 1
  27. val MGO_ADMIN = 2
  28.  
  29. /* org.mongodb.scala.FindObservable
  30. import com.mongodb.async.client.FindIterable
  31. val resultDocType = FindIterable[Document]
  32. val resultOption = FindObservable(resultDocType)
  33. .maxScan(...)
  34. .limit(...)
  35. .sort(...)
  36. .project(...) */
  37.  
  38. type FOD_TYPE = Int
  39. val FOD_FIRST = 0 //def first(): SingleObservable[TResult], return the first item
  40. val FOD_FILTER = 1 //def filter(filter: Bson): FindObservable[TResult]
  41. val FOD_LIMIT = 2 //def limit(limit: Int): FindObservable[TResult]
  42. val FOD_SKIP = 3 //def skip(skip: Int): FindObservable[TResult]
  43. val FOD_PROJECTION = 4 //def projection(projection: Bson): FindObservable[TResult]
  44. //Sets a document describing the fields to return for all matching documents
  45. val FOD_SORT = 5 //def sort(sort: Bson): FindObservable[TResult]
  46. val FOD_PARTIAL = 6 //def partial(partial: Boolean): FindObservable[TResult]
  47. //Get partial results from a sharded cluster if one or more shards are unreachable (instead of throwing an error)
  48. val FOD_CURSORTYPE = 7 //def cursorType(cursorType: CursorType): FindObservable[TResult]
  49. //Sets the cursor type
  50. val FOD_HINT = 8 //def hint(hint: Bson): FindObservable[TResult]
  51. //Sets the hint for which index to use. A null value means no hint is set
  52. val FOD_MAX = 9 //def max(max: Bson): FindObservable[TResult]
  53. //Sets the exclusive upper bound for a specific index. A null value means no max is set
  54. val FOD_MIN = 10 //def min(min: Bson): FindObservable[TResult]
  55. //Sets the minimum inclusive lower bound for a specific index. A null value means no max is set
  56. val FOD_RETURNKEY = 11 //def returnKey(returnKey: Boolean): FindObservable[TResult]
  57. //Sets the returnKey. If true the find operation will return only the index keys in the resulting documents
  58. val FOD_SHOWRECORDID=12 //def showRecordId(showRecordId: Boolean): FindObservable[TResult]
  59. //Sets the showRecordId. Set to true to add a field `\$recordId` to the returned documents
  60.  
  61. case class ResultOptions(
  62. optType: FOD_TYPE,
  63. bson: Option[Bson] = None,
  64. value: Int = 0 ){
  65. def toProto = new sdp.grpc.services.ProtoMGOResultOption(
  66. optType = this.optType,
  67. bsonParam = this.bson.map {b => sdp.grpc.services.ProtoMGOBson(marshal(b))},
  68. valueParam = this.value
  69. )
  70. def toFindObservable: FindObservable[Document] => FindObservable[Document] = find => {
  71. optType match {
  72. case FOD_FIRST => find
  73. case FOD_FILTER => find.filter(bson.get)
  74. case FOD_LIMIT => find.limit(value)
  75. case FOD_SKIP => find.skip(value)
  76. case FOD_PROJECTION => find.projection(bson.get)
  77. case FOD_SORT => find.sort(bson.get)
  78. case FOD_PARTIAL => find.partial(value != 0)
  79. case FOD_CURSORTYPE => find
  80. case FOD_HINT => find.hint(bson.get)
  81. case FOD_MAX => find.max(bson.get)
  82. case FOD_MIN => find.min(bson.get)
  83. case FOD_RETURNKEY => find.returnKey(value != 0)
  84. case FOD_SHOWRECORDID => find.showRecordId(value != 0)
  85.  
  86. }
  87. }
  88. }
  89. object ResultOptions {
  90. def fromProto(msg: sdp.grpc.services.ProtoMGOResultOption) = new ResultOptions(
  91. optType = msg.optType,
  92. bson = msg.bsonParam.map(b => unmarshal[Bson](b.bson)),
  93. value = msg.valueParam
  94. )
  95.  
  96. }
  97.  
  98. trait MGOCommands
  99.  
  100. object MGOCommands {
  101.  
  102. case class Count(filter: Option[Bson] = None, options: Option[Any] = None) extends MGOCommands
  103.  
  104. case class Distict(fieldName: String, filter: Option[Bson] = None) extends MGOCommands
  105.  
  106. /* org.mongodb.scala.FindObservable
  107. import com.mongodb.async.client.FindIterable
  108. val resultDocType = FindIterable[Document]
  109. val resultOption = FindObservable(resultDocType)
  110. .maxScan(...)
  111. .limit(...)
  112. .sort(...)
  113. .project(...) */
  114. case class Find(filter: Option[Bson] = None,
  115. andThen: Seq[ResultOptions] = Seq.empty[ResultOptions],
  116. firstOnly: Boolean = false) extends MGOCommands
  117.  
  118. case class Aggregate(pipeLine: Seq[Bson]) extends MGOCommands
  119.  
  120. case class MapReduce(mapFunction: String, reduceFunction: String) extends MGOCommands
  121.  
  122. case class Insert(newdocs: Seq[Document], options: Option[Any] = None) extends MGOCommands
  123.  
  124. case class Delete(filter: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands
  125.  
  126. case class Replace(filter: Bson, replacement: Document, options: Option[Any] = None) extends MGOCommands
  127.  
  128. case class Update(filter: Bson, update: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands
  129.  
  130.  
  131. case class BulkWrite(commands: List[WriteModel[Document]], options: Option[Any] = None) extends MGOCommands
  132.  
  133. }
  134.  
  135. object MGOAdmins {
  136.  
  137. case class DropCollection(collName: String) extends MGOCommands
  138.  
  139. case class CreateCollection(collName: String, options: Option[Any] = None) extends MGOCommands
  140.  
  141. case class ListCollection(dbName: String) extends MGOCommands
  142.  
  143. case class CreateView(viewName: String, viewOn: String, pipeline: Seq[Bson], options: Option[Any] = None) extends MGOCommands
  144.  
  145. case class CreateIndex(key: Bson, options: Option[Any] = None) extends MGOCommands
  146.  
  147. case class DropIndexByName(indexName: String, options: Option[Any] = None) extends MGOCommands
  148.  
  149. case class DropIndexByKey(key: Bson, options: Option[Any] = None) extends MGOCommands
  150.  
  151. case class DropAllIndexes(options: Option[Any] = None) extends MGOCommands
  152.  
  153. }
  154.  
  155. case class MGOContext(
  156. dbName: String,
  157. collName: String,
  158. actionType: MGO_ACTION_TYPE = MGO_QUERY,
  159. action: Option[MGOCommands] = None,
  160. actionOptions: Option[Any] = None,
  161. actionTargets: Seq[String] = Nil
  162. ) {
  163. ctx =>
  164. def setDbName(name: String): MGOContext = ctx.copy(dbName = name)
  165.  
  166. def setCollName(name: String): MGOContext = ctx.copy(collName = name)
  167.  
  168. def setActionType(at: MGO_ACTION_TYPE): MGOContext = ctx.copy(actionType = at)
  169.  
  170. def setCommand(cmd: MGOCommands): MGOContext = ctx.copy(action = Some(cmd))
  171.  
  172. def toSomeProto = MGOProtoConversion.ctxToProto(this)
  173.  
  174. }
  175.  
  176. object MGOContext {
  177. def apply(db: String, coll: String) = new MGOContext(db, coll)
  178. def fromProto(proto: sdp.grpc.services.ProtoMGOContext): MGOContext =
  179. MGOProtoConversion.ctxFromProto(proto)
  180. }
  181.  
  182. case class MGOBatContext(contexts: Seq[MGOContext], tx: Boolean = false) {
  183. ctxs =>
  184. def setTx(txopt: Boolean): MGOBatContext = ctxs.copy(tx = txopt)
  185. def appendContext(ctx: MGOContext): MGOBatContext =
  186. ctxs.copy(contexts = contexts :+ ctx)
  187. }
  188.  
  189. object MGOBatContext {
  190. def apply(ctxs: Seq[MGOContext], tx: Boolean = false) = new MGOBatContext(ctxs,tx)
  191. }
  192.  
  193. type MGODate = java.util.Date
  194. def mgoDate(yyyy: Int, mm: Int, dd: Int): MGODate = {
  195. val ca = Calendar.getInstance()
  196. ca.set(yyyy,mm,dd)
  197. ca.getTime()
  198. }
  199. def mgoDateTime(yyyy: Int, mm: Int, dd: Int, hr: Int, min: Int, sec: Int): MGODate = {
  200. val ca = Calendar.getInstance()
  201. ca.set(yyyy,mm,dd,hr,min,sec)
  202. ca.getTime()
  203. }
  204. def mgoDateTimeNow: MGODate = {
  205. val ca = Calendar.getInstance()
  206. ca.getTime
  207. }
  208.  
  209.  
  210. def mgoDateToString(dt: MGODate, formatString: String): String = {
  211. val fmt= new SimpleDateFormat(formatString)
  212. fmt.format(dt)
  213. }
  214.  
  215. type MGOBlob = BsonBinary
  216. type MGOArray = BsonArray
  217.  
  218. def fileToMGOBlob(fileName: String, timeOut: FiniteDuration = 60 seconds)(
  219. implicit mat: Materializer) = FileToByteArray(fileName,timeOut)
  220.  
  221. def mgoBlobToFile(blob: MGOBlob, fileName: String)(
  222. implicit mat: Materializer) = ByteArrayToFile(blob.getData,fileName)
  223.  
  224. def mgoGetStringOrNone(doc: Document, fieldName: String) = {
  225. if (doc.keySet.contains(fieldName))
  226. Some(doc.getString(fieldName))
  227. else None
  228. }
  229. def mgoGetIntOrNone(doc: Document, fieldName: String) = {
  230. if (doc.keySet.contains(fieldName))
  231. Some(doc.getInteger(fieldName))
  232. else None
  233. }
  234. def mgoGetLonggOrNone(doc: Document, fieldName: String) = {
  235. if (doc.keySet.contains(fieldName))
  236. Some(doc.getLong(fieldName))
  237. else None
  238. }
  239. def mgoGetDoubleOrNone(doc: Document, fieldName: String) = {
  240. if (doc.keySet.contains(fieldName))
  241. Some(doc.getDouble(fieldName))
  242. else None
  243. }
  244. def mgoGetBoolOrNone(doc: Document, fieldName: String) = {
  245. if (doc.keySet.contains(fieldName))
  246. Some(doc.getBoolean(fieldName))
  247. else None
  248. }
  249. def mgoGetDateOrNone(doc: Document, fieldName: String) = {
  250. if (doc.keySet.contains(fieldName))
  251. Some(doc.getDate(fieldName))
  252. else None
  253. }
  254. def mgoGetBlobOrNone(doc: Document, fieldName: String) = {
  255. if (doc.keySet.contains(fieldName))
  256. doc.get(fieldName).asInstanceOf[Option[MGOBlob]]
  257. else None
  258. }
  259. def mgoGetArrayOrNone(doc: Document, fieldName: String) = {
  260. if (doc.keySet.contains(fieldName))
  261. doc.get(fieldName).asInstanceOf[Option[MGOArray]]
  262. else None
  263. }
  264.  
  265. def mgoArrayToDocumentList(arr: MGOArray): scala.collection.immutable.List[org.bson.BsonDocument] = {
  266. (arr.getValues.asScala.toList)
  267. .asInstanceOf[scala.collection.immutable.List[org.bson.BsonDocument]]
  268. }
  269.  
  270. type MGOFilterResult = FindObservable[Document] => FindObservable[Document]
  271. }
  272.  
  273.  
  274. object MGOEngine extends LogSupport {
  275.  
  276. import MGOClasses._
  277. import MGOAdmins._
  278. import MGOCommands._
  279. import sdp.result.DBOResult._
  280. import com.mongodb.reactivestreams.client.MongoClients
  281.  
  282. object TxUpdateMode {
  283. private def mgoTxUpdate(ctxs: MGOBatContext, observable: SingleObservable[ClientSession])(
  284. implicit client: MongoClient, ec: ExecutionContext): SingleObservable[ClientSession] = {
  285. log.info(s"mgoTxUpdate> calling ...")
  286. observable.map(clientSession => {
  287.  
  288. val transactionOptions =
  289. TransactionOptions.builder()
  290. .readConcern(ReadConcern.SNAPSHOT)
  291. .writeConcern(WriteConcern.MAJORITY).build()
  292.  
  293. clientSession.startTransaction(transactionOptions)
  294. /*
  295. val fut = Future.traverse(ctxs.contexts) { ctx =>
  296. mgoUpdateObservable[Completed](ctx).map(identity).toFuture()
  297. }
  298. Await.ready(fut, 3 seconds) */
  299.  
  300. ctxs.contexts.foreach { ctx =>
  301. mgoUpdateObservable[Completed](ctx).map(identity).toFuture()
  302. }
  303. clientSession
  304. })
  305. }
  306.  
  307. private def commitAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
  308. log.info(s"commitAndRetry> calling ...")
  309. observable.recoverWith({
  310. case e: MongoException if e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL) => {
  311. log.warn("commitAndRetry> UnknownTransactionCommitResult, retrying commit operation ...")
  312. commitAndRetry(observable)
  313. }
  314. case e: Exception => {
  315. log.error(s"commitAndRetry> Exception during commit ...: $e")
  316. throw e
  317. }
  318. })
  319. }
  320.  
  321. private def runTransactionAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
  322. log.info(s"runTransactionAndRetry> calling ...")
  323. observable.recoverWith({
  324. case e: MongoException if e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL) => {
  325. log.warn("runTransactionAndRetry> TransientTransactionError, aborting transaction and retrying ...")
  326. runTransactionAndRetry(observable)
  327. }
  328. })
  329. }
  330.  
  331. def mgoTxBatch(ctxs: MGOBatContext)(
  332. implicit client: MongoClient, ec: ExecutionContext): DBOResult[Completed] = {
  333.  
  334. log.info(s"mgoTxBatch> MGOBatContext: ${ctxs}")
  335.  
  336. val updateObservable: Observable[ClientSession] = mgoTxUpdate(ctxs, client.startSession())
  337. val commitTransactionObservable: SingleObservable[Completed] =
  338. updateObservable.flatMap(clientSession => clientSession.commitTransaction())
  339. val commitAndRetryObservable: SingleObservable[Completed] = commitAndRetry(commitTransactionObservable)
  340.  
  341. runTransactionAndRetry(commitAndRetryObservable)
  342.  
  343. valueToDBOResult(Completed())
  344.  
  345. }
  346. }
  347.  
  348.  
  349. def mgoUpdateBatch(ctxs: MGOBatContext)(implicit client: MongoClient, ec: ExecutionContext): DBOResult[Completed] = {
  350. log.info(s"mgoUpdateBatch> MGOBatContext: ${ctxs}")
  351. if (ctxs.tx) {
  352. TxUpdateMode.mgoTxBatch(ctxs)
  353. } else {
  354. /*
  355. val fut = Future.traverse(ctxs.contexts) { ctx =>
  356. mgoUpdate[Completed](ctx).map(identity) }
  357.  
  358. Await.ready(fut, 3 seconds)
  359. Future.successful(new Completed) */
  360. ctxs.contexts.foreach { ctx =>
  361. mgoUpdate[Completed](ctx).map(identity) }
  362.  
  363. valueToDBOResult(Completed())
  364. }
  365.  
  366. }
  367.  
  368. def mongoStream(ctx: MGOContext)(
  369. implicit client: MongoClient, ec: ExecutionContextExecutor): Source[Document, NotUsed] = {
  370.  
  371. log.info(s"mongoStream> MGOContext: ${ctx}")
  372.  
  373. ObservableToPublisher
  374.  
  375. def toResultOption(rts: Seq[ResultOptions]): FindObservable[Document] => FindObservable[Document] = findObj =>
  376. rts.foldRight(findObj)((a,b) => a.toFindObservable(b))
  377.  
  378. val db = client.getDatabase(ctx.dbName)
  379. val coll = db.getCollection(ctx.collName)
  380. if ( ctx.action == None) {
  381. log.error(s"mongoStream> uery action cannot be null!")
  382. throw new IllegalArgumentException("query action cannot be null!")
  383. }
  384. try {
  385. ctx.action.get match {
  386. case Find(None, Nil, false) => //FindObservable
  387. MongoSource(ObservableToPublisher(coll.find()))
  388. case Find(None, Nil, true) => //FindObservable
  389. MongoSource(ObservableToPublisher(coll.find().first()))
  390. case Find(Some(filter), Nil, false) => //FindObservable
  391. MongoSource(ObservableToPublisher(coll.find(filter)))
  392. case Find(Some(filter), Nil, true) => //FindObservable
  393. MongoSource(ObservableToPublisher(coll.find(filter).first()))
  394. case Find(None, sro, _) => //FindObservable
  395. val next = toResultOption(sro)
  396. MongoSource(ObservableToPublisher(next(coll.find[Document]())))
  397. case Find(Some(filter), sro, _) => //FindObservable
  398. val next = toResultOption(sro)
  399. MongoSource(ObservableToPublisher(next(coll.find[Document](filter))))
  400. case _ =>
  401. log.error(s"mongoStream> unsupported streaming query [${ctx.action.get}]")
  402. throw new RuntimeException(s"mongoStream> unsupported streaming query [${ctx.action.get}]")
  403.  
  404. }
  405. }
  406. catch { case e: Exception =>
  407. log.error(s"mongoStream> runtime error: ${e.getMessage}")
  408. throw new RuntimeException(s"mongoStream> Error: ${e.getMessage}")
  409. }
  410.  
  411. }
  412.  
  413.  
  414. // T => FindIterable e.g List[Document]
  415. def mgoQuery[T](ctx: MGOContext, Converter: Option[Document => Any] = None)(implicit client: MongoClient): DBOResult[T] = {
  416. log.info(s"mgoQuery> MGOContext: ${ctx}")
  417.  
  418. val db = client.getDatabase(ctx.dbName)
  419. val coll = db.getCollection(ctx.collName)
  420.  
  421. def toResultOption(rts: Seq[ResultOptions]): FindObservable[Document] => FindObservable[Document] = findObj =>
  422. rts.foldRight(findObj)((a,b) => a.toFindObservable(b))
  423.  
  424.  
  425. if ( ctx.action == None) {
  426. log.error(s"mgoQuery> uery action cannot be null!")
  427. Left(new IllegalArgumentException("query action cannot be null!"))
  428. }
  429. try {
  430. ctx.action.get match {
  431. /* count */
  432. case Count(Some(filter), Some(opt)) => //SingleObservable
  433. coll.countDocuments(filter, opt.asInstanceOf[CountOptions])
  434. .toFuture().asInstanceOf[Future[T]]
  435. case Count(Some(filter), None) => //SingleObservable
  436. coll.countDocuments(filter).toFuture()
  437. .asInstanceOf[Future[T]]
  438. case Count(None, None) => //SingleObservable
  439. coll.countDocuments().toFuture()
  440. .asInstanceOf[Future[T]]
  441. /* distinct */
  442. case Distict(field, Some(filter)) => //DistinctObservable
  443. coll.distinct(field, filter).toFuture()
  444. .asInstanceOf[Future[T]]
  445. case Distict(field, None) => //DistinctObservable
  446. coll.distinct((field)).toFuture()
  447. .asInstanceOf[Future[T]]
  448. /* find */
  449. case Find(None, Nil, false) => //FindObservable
  450. if (Converter == None) coll.find().toFuture().asInstanceOf[Future[T]]
  451. else coll.find().map(Converter.get).toFuture().asInstanceOf[Future[T]]
  452. case Find(None, Nil, true) => //FindObservable
  453. if (Converter == None) coll.find().first().head().asInstanceOf[Future[T]]
  454. else coll.find().first().map(Converter.get).head().asInstanceOf[Future[T]]
  455. case Find(Some(filter), Nil, false) => //FindObservable
  456. if (Converter == None) coll.find(filter).toFuture().asInstanceOf[Future[T]]
  457. else coll.find(filter).map(Converter.get).toFuture().asInstanceOf[Future[T]]
  458. case Find(Some(filter), Nil, true) => //FindObservable
  459. if (Converter == None) coll.find(filter).first().head().asInstanceOf[Future[T]]
  460. else coll.find(filter).first().map(Converter.get).head().asInstanceOf[Future[T]]
  461. case Find(None, sro, _) => //FindObservable
  462. val next = toResultOption(sro)
  463. if (Converter == None) next(coll.find[Document]()).toFuture().asInstanceOf[Future[T]]
  464. else next(coll.find[Document]()).map(Converter.get).toFuture().asInstanceOf[Future[T]]
  465. case Find(Some(filter), sro, _) => //FindObservable
  466. val next = toResultOption(sro)
  467. if (Converter == None) next(coll.find[Document](filter)).toFuture().asInstanceOf[Future[T]]
  468. else next(coll.find[Document](filter)).map(Converter.get).toFuture().asInstanceOf[Future[T]]
  469. /* aggregate AggregateObservable*/
  470. case Aggregate(pline) => coll.aggregate(pline).toFuture().asInstanceOf[Future[T]]
  471. /* mapReduce MapReduceObservable*/
  472. case MapReduce(mf, rf) => coll.mapReduce(mf, rf).toFuture().asInstanceOf[Future[T]]
  473. /* list collection */
  474. case ListCollection(dbName) => //ListConllectionObservable
  475. client.getDatabase(dbName).listCollections().toFuture().asInstanceOf[Future[T]]
  476.  
  477. }
  478. }
  479. catch { case e: Exception =>
  480. log.error(s"mgoQuery> runtime error: ${e.getMessage}")
  481. Left(new RuntimeException(s"mgoQuery> Error: ${e.getMessage}"))
  482. }
  483. }
  484. //T => Completed, result.UpdateResult, result.DeleteResult
  485. def mgoUpdate[T](ctx: MGOContext)(implicit client: MongoClient): DBOResult[T] =
  486. try {
  487. mgoUpdateObservable[T](ctx).toFuture()
  488. }
  489. catch { case e: Exception =>
  490. log.error(s"mgoUpdate> runtime error: ${e.getMessage}")
  491. Left(new RuntimeException(s"mgoUpdate> Error: ${e.getMessage}"))
  492. }
  493.  
  494. def mgoUpdateObservable[T](ctx: MGOContext)(implicit client: MongoClient): SingleObservable[T] = {
  495. log.info(s"mgoUpdateObservable> MGOContext: ${ctx}")
  496.  
  497. val db = client.getDatabase(ctx.dbName)
  498. val coll = db.getCollection(ctx.collName)
  499. if ( ctx.action == None) {
  500. log.error(s"mgoUpdateObservable> uery action cannot be null!")
  501. throw new IllegalArgumentException("mgoUpdateObservable> query action cannot be null!")
  502. }
  503. try {
  504. ctx.action.get match {
  505. /* insert */
  506. case Insert(docs, Some(opt)) => //SingleObservable[Completed]
  507. if (docs.size > 1)
  508. coll.insertMany(docs, opt.asInstanceOf[InsertManyOptions]).asInstanceOf[SingleObservable[T]]
  509. else coll.insertOne(docs.head, opt.asInstanceOf[InsertOneOptions]).asInstanceOf[SingleObservable[T]]
  510. case Insert(docs, None) => //SingleObservable
  511. if (docs.size > 1) coll.insertMany(docs).asInstanceOf[SingleObservable[T]]
  512. else coll.insertOne(docs.head).asInstanceOf[SingleObservable[T]]
  513. /* delete */
  514. case Delete(filter, None, onlyOne) => //SingleObservable
  515. if (onlyOne) coll.deleteOne(filter).asInstanceOf[SingleObservable[T]]
  516. else coll.deleteMany(filter).asInstanceOf[SingleObservable[T]]
  517. case Delete(filter, Some(opt), onlyOne) => //SingleObservable
  518. if (onlyOne) coll.deleteOne(filter, opt.asInstanceOf[DeleteOptions]).asInstanceOf[SingleObservable[T]]
  519. else coll.deleteMany(filter, opt.asInstanceOf[DeleteOptions]).asInstanceOf[SingleObservable[T]]
  520. /* replace */
  521. case Replace(filter, replacement, None) => //SingleObservable
  522. coll.replaceOne(filter, replacement).asInstanceOf[SingleObservable[T]]
  523. case Replace(filter, replacement, Some(opt)) => //SingleObservable
  524. coll.replaceOne(filter, replacement, opt.asInstanceOf[ReplaceOptions]).asInstanceOf[SingleObservable[T]]
  525. /* update */
  526. case Update(filter, update, None, onlyOne) => //SingleObservable
  527. if (onlyOne) coll.updateOne(filter, update).asInstanceOf[SingleObservable[T]]
  528. else coll.updateMany(filter, update).asInstanceOf[SingleObservable[T]]
  529. case Update(filter, update, Some(opt), onlyOne) => //SingleObservable
  530. if (onlyOne) coll.updateOne(filter, update, opt.asInstanceOf[UpdateOptions]).asInstanceOf[SingleObservable[T]]
  531. else coll.updateMany(filter, update, opt.asInstanceOf[UpdateOptions]).asInstanceOf[SingleObservable[T]]
  532. /* bulkWrite */
  533. case BulkWrite(commands, None) => //SingleObservable
  534. coll.bulkWrite(commands).asInstanceOf[SingleObservable[T]]
  535. case BulkWrite(commands, Some(opt)) => //SingleObservable
  536. coll.bulkWrite(commands, opt.asInstanceOf[BulkWriteOptions]).asInstanceOf[SingleObservable[T]]
  537. }
  538. }
  539. catch { case e: Exception =>
  540. log.error(s"mgoUpdateObservable> runtime error: ${e.getMessage}")
  541. throw new RuntimeException(s"mgoUpdateObservable> Error: ${e.getMessage}")
  542. }
  543. }
  544.  
  545. def mgoAdmin(ctx: MGOContext)(implicit client: MongoClient): DBOResult[Completed] = {
  546. log.info(s"mgoAdmin> MGOContext: ${ctx}")
  547.  
  548. val db = client.getDatabase(ctx.dbName)
  549. val coll = db.getCollection(ctx.collName)
  550. if ( ctx.action == None) {
  551. log.error(s"mgoAdmin> uery action cannot be null!")
  552. Left(new IllegalArgumentException("mgoAdmin> query action cannot be null!"))
  553. }
  554. try {
  555. ctx.action.get match {
  556. /* drop collection */
  557. case DropCollection(collName) => //SingleObservable
  558. val coll = db.getCollection(collName)
  559. coll.drop().toFuture()
  560. /* create collection */
  561. case CreateCollection(collName, None) => //SingleObservable
  562. db.createCollection(collName).toFuture()
  563. case CreateCollection(collName, Some(opt)) => //SingleObservable
  564. db.createCollection(collName, opt.asInstanceOf[CreateCollectionOptions]).toFuture()
  565. /* list collection
  566. case ListCollection(dbName) => //ListConllectionObservable
  567. client.getDatabase(dbName).listCollections().toFuture().asInstanceOf[Future[T]]
  568. */
  569. /* create view */
  570. case CreateView(viewName, viewOn, pline, None) => //SingleObservable
  571. db.createView(viewName, viewOn, pline).toFuture()
  572. case CreateView(viewName, viewOn, pline, Some(opt)) => //SingleObservable
  573. db.createView(viewName, viewOn, pline, opt.asInstanceOf[CreateViewOptions]).toFuture()
  574. /* create index */
  575. case CreateIndex(key, None) => //SingleObservable
  576. coll.createIndex(key).toFuture().asInstanceOf[Future[Completed]] // asInstanceOf[SingleObservable[Completed]]
  577. case CreateIndex(key, Some(opt)) => //SingleObservable
  578. coll.createIndex(key, opt.asInstanceOf[IndexOptions]).asInstanceOf[Future[Completed]] // asInstanceOf[SingleObservable[Completed]]
  579. /* drop index */
  580. case DropIndexByName(indexName, None) => //SingleObservable
  581. coll.dropIndex(indexName).toFuture()
  582. case DropIndexByName(indexName, Some(opt)) => //SingleObservable
  583. coll.dropIndex(indexName, opt.asInstanceOf[DropIndexOptions]).toFuture()
  584. case DropIndexByKey(key, None) => //SingleObservable
  585. coll.dropIndex(key).toFuture()
  586. case DropIndexByKey(key, Some(opt)) => //SingleObservable
  587. coll.dropIndex(key, opt.asInstanceOf[DropIndexOptions]).toFuture()
  588. case DropAllIndexes(None) => //SingleObservable
  589. coll.dropIndexes().toFuture()
  590. case DropAllIndexes(Some(opt)) => //SingleObservable
  591. coll.dropIndexes(opt.asInstanceOf[DropIndexOptions]).toFuture()
  592. }
  593. }
  594. catch { case e: Exception =>
  595. log.error(s"mgoAdmin> runtime error: ${e.getMessage}")
  596. throw new RuntimeException(s"mgoAdmin> Error: ${e.getMessage}")
  597. }
  598.  
  599. }
  600.  
  601. /*
  602. def mgoExecute[T](ctx: MGOContext)(implicit client: MongoClient): Future[T] = {
  603. val db = client.getDatabase(ctx.dbName)
  604. val coll = db.getCollection(ctx.collName)
  605. ctx.action match {
  606. /* count */
  607. case Count(Some(filter), Some(opt)) => //SingleObservable
  608. coll.countDocuments(filter, opt.asInstanceOf[CountOptions])
  609. .toFuture().asInstanceOf[Future[T]]
  610. case Count(Some(filter), None) => //SingleObservable
  611. coll.countDocuments(filter).toFuture()
  612. .asInstanceOf[Future[T]]
  613. case Count(None, None) => //SingleObservable
  614. coll.countDocuments().toFuture()
  615. .asInstanceOf[Future[T]]
  616. /* distinct */
  617. case Distict(field, Some(filter)) => //DistinctObservable
  618. coll.distinct(field, filter).toFuture()
  619. .asInstanceOf[Future[T]]
  620. case Distict(field, None) => //DistinctObservable
  621. coll.distinct((field)).toFuture()
  622. .asInstanceOf[Future[T]]
  623. /* find */
  624. case Find(None, None, optConv, false) => //FindObservable
  625. if (optConv == None) coll.find().toFuture().asInstanceOf[Future[T]]
  626. else coll.find().map(optConv.get).toFuture().asInstanceOf[Future[T]]
  627. case Find(None, None, optConv, true) => //FindObservable
  628. if (optConv == None) coll.find().first().head().asInstanceOf[Future[T]]
  629. else coll.find().first().map(optConv.get).head().asInstanceOf[Future[T]]
  630. case Find(Some(filter), None, optConv, false) => //FindObservable
  631. if (optConv == None) coll.find(filter).toFuture().asInstanceOf[Future[T]]
  632. else coll.find(filter).map(optConv.get).toFuture().asInstanceOf[Future[T]]
  633. case Find(Some(filter), None, optConv, true) => //FindObservable
  634. if (optConv == None) coll.find(filter).first().head().asInstanceOf[Future[T]]
  635. else coll.find(filter).first().map(optConv.get).head().asInstanceOf[Future[T]]
  636. case Find(None, Some(next), optConv, _) => //FindObservable
  637. if (optConv == None) next(coll.find[Document]()).toFuture().asInstanceOf[Future[T]]
  638. else next(coll.find[Document]()).map(optConv.get).toFuture().asInstanceOf[Future[T]]
  639. case Find(Some(filter), Some(next), optConv, _) => //FindObservable
  640. if (optConv == None) next(coll.find[Document](filter)).toFuture().asInstanceOf[Future[T]]
  641. else next(coll.find[Document](filter)).map(optConv.get).toFuture().asInstanceOf[Future[T]]
  642. /* aggregate AggregateObservable*/
  643. case Aggregate(pline) => coll.aggregate(pline).toFuture().asInstanceOf[Future[T]]
  644. /* mapReduce MapReduceObservable*/
  645. case MapReduce(mf, rf) => coll.mapReduce(mf, rf).toFuture().asInstanceOf[Future[T]]
  646. /* insert */
  647. case Insert(docs, Some(opt)) => //SingleObservable[Completed]
  648. if (docs.size > 1) coll.insertMany(docs, opt.asInstanceOf[InsertManyOptions]).toFuture()
  649. .asInstanceOf[Future[T]]
  650. else coll.insertOne(docs.head, opt.asInstanceOf[InsertOneOptions]).toFuture()
  651. .asInstanceOf[Future[T]]
  652. case Insert(docs, None) => //SingleObservable
  653. if (docs.size > 1) coll.insertMany(docs).toFuture().asInstanceOf[Future[T]]
  654. else coll.insertOne(docs.head).toFuture().asInstanceOf[Future[T]]
  655. /* delete */
  656. case Delete(filter, None, onlyOne) => //SingleObservable
  657. if (onlyOne) coll.deleteOne(filter).toFuture().asInstanceOf[Future[T]]
  658. else coll.deleteMany(filter).toFuture().asInstanceOf[Future[T]]
  659. case Delete(filter, Some(opt), onlyOne) => //SingleObservable
  660. if (onlyOne) coll.deleteOne(filter, opt.asInstanceOf[DeleteOptions]).toFuture().asInstanceOf[Future[T]]
  661. else coll.deleteMany(filter, opt.asInstanceOf[DeleteOptions]).toFuture().asInstanceOf[Future[T]]
  662. /* replace */
  663. case Replace(filter, replacement, None) => //SingleObservable
  664. coll.replaceOne(filter, replacement).toFuture().asInstanceOf[Future[T]]
  665. case Replace(filter, replacement, Some(opt)) => //SingleObservable
  666. coll.replaceOne(filter, replacement, opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]
  667. /* update */
  668. case Update(filter, update, None, onlyOne) => //SingleObservable
  669. if (onlyOne) coll.updateOne(filter, update).toFuture().asInstanceOf[Future[T]]
  670. else coll.updateMany(filter, update).toFuture().asInstanceOf[Future[T]]
  671. case Update(filter, update, Some(opt), onlyOne) => //SingleObservable
  672. if (onlyOne) coll.updateOne(filter, update, opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]
  673. else coll.updateMany(filter, update, opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]
  674. /* bulkWrite */
  675. case BulkWrite(commands, None) => //SingleObservable
  676. coll.bulkWrite(commands).toFuture().asInstanceOf[Future[T]]
  677. case BulkWrite(commands, Some(opt)) => //SingleObservable
  678. coll.bulkWrite(commands, opt.asInstanceOf[BulkWriteOptions]).toFuture().asInstanceOf[Future[T]]
  679.  
  680. /* drop collection */
  681. case DropCollection(collName) => //SingleObservable
  682. val coll = db.getCollection(collName)
  683. coll.drop().toFuture().asInstanceOf[Future[T]]
  684. /* create collection */
  685. case CreateCollection(collName, None) => //SingleObservable
  686. db.createCollection(collName).toFuture().asInstanceOf[Future[T]]
  687. case CreateCollection(collName, Some(opt)) => //SingleObservable
  688. db.createCollection(collName, opt.asInstanceOf[CreateCollectionOptions]).toFuture().asInstanceOf[Future[T]]
  689. /* list collection */
  690. case ListCollection(dbName) => //ListConllectionObservable
  691. client.getDatabase(dbName).listCollections().toFuture().asInstanceOf[Future[T]]
  692. /* create view */
  693. case CreateView(viewName, viewOn, pline, None) => //SingleObservable
  694. db.createView(viewName, viewOn, pline).toFuture().asInstanceOf[Future[T]]
  695. case CreateView(viewName, viewOn, pline, Some(opt)) => //SingleObservable
  696. db.createView(viewName, viewOn, pline, opt.asInstanceOf[CreateViewOptions]).toFuture().asInstanceOf[Future[T]]
  697. /* create index */
  698. case CreateIndex(key, None) => //SingleObservable
  699. coll.createIndex(key).toFuture().asInstanceOf[Future[T]]
  700. case CreateIndex(key, Some(opt)) => //SingleObservable
  701. coll.createIndex(key, opt.asInstanceOf[IndexOptions]).toFuture().asInstanceOf[Future[T]]
  702. /* drop index */
  703. case DropIndexByName(indexName, None) => //SingleObservable
  704. coll.dropIndex(indexName).toFuture().asInstanceOf[Future[T]]
  705. case DropIndexByName(indexName, Some(opt)) => //SingleObservable
  706. coll.dropIndex(indexName, opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]
  707. case DropIndexByKey(key, None) => //SingleObservable
  708. coll.dropIndex(key).toFuture().asInstanceOf[Future[T]]
  709. case DropIndexByKey(key, Some(opt)) => //SingleObservable
  710. coll.dropIndex(key, opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]
  711. case DropAllIndexes(None) => //SingleObservable
  712. coll.dropIndexes().toFuture().asInstanceOf[Future[T]]
  713. case DropAllIndexes(Some(opt)) => //SingleObservable
  714. coll.dropIndexes(opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]
  715. }
  716. }
  717. */
  718.  
  719.  
  720. }
  721.  
  722.  
  723. object MongoActionStream {
  724.  
  725. import MGOClasses._
  726.  
  727. case class StreamingInsert[A](dbName: String,
  728. collName: String,
  729. converter: A => Document,
  730. parallelism: Int = 1
  731. ) extends MGOCommands
  732.  
  733. case class StreamingDelete[A](dbName: String,
  734. collName: String,
  735. toFilter: A => Bson,
  736. parallelism: Int = 1,
  737. justOne: Boolean = false
  738. ) extends MGOCommands
  739.  
  740. case class StreamingUpdate[A](dbName: String,
  741. collName: String,
  742. toFilter: A => Bson,
  743. toUpdate: A => Bson,
  744. parallelism: Int = 1,
  745. justOne: Boolean = false
  746. ) extends MGOCommands
  747.  
  748.  
  749. case class InsertAction[A](ctx: StreamingInsert[A])(
  750. implicit mongoClient: MongoClient) {
  751.  
  752. val database = mongoClient.getDatabase(ctx.dbName)
  753. val collection = database.getCollection(ctx.collName)
  754.  
  755. def performOnRow(implicit ec: ExecutionContext): Flow[A, Document, NotUsed] =
  756. Flow[A].map(ctx.converter)
  757. .mapAsync(ctx.parallelism)(doc => collection.insertOne(doc).toFuture().map(_ => doc))
  758. }
  759.  
  760. case class UpdateAction[A](ctx: StreamingUpdate[A])(
  761. implicit mongoClient: MongoClient) {
  762.  
  763. val database = mongoClient.getDatabase(ctx.dbName)
  764. val collection = database.getCollection(ctx.collName)
  765.  
  766. def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] =
  767. if (ctx.justOne) {
  768. Flow[A]
  769. .mapAsync(ctx.parallelism)(a =>
  770. collection.updateOne(ctx.toFilter(a), ctx.toUpdate(a)).toFuture().map(_ => a))
  771. } else
  772. Flow[A]
  773. .mapAsync(ctx.parallelism)(a =>
  774. collection.updateMany(ctx.toFilter(a), ctx.toUpdate(a)).toFuture().map(_ => a))
  775. }
  776.  
  777.  
  778. case class DeleteAction[A](ctx: StreamingDelete[A])(
  779. implicit mongoClient: MongoClient) {
  780.  
  781. val database = mongoClient.getDatabase(ctx.dbName)
  782. val collection = database.getCollection(ctx.collName)
  783.  
  784. def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] =
  785. if (ctx.justOne) {
  786. Flow[A]
  787. .mapAsync(ctx.parallelism)(a =>
  788. collection.deleteOne(ctx.toFilter(a)).toFuture().map(_ => a))
  789. } else
  790. Flow[A]
  791. .mapAsync(ctx.parallelism)(a =>
  792. collection.deleteMany(ctx.toFilter(a)).toFuture().map(_ => a))
  793. }
  794.  
  795. }
  796.  
  797. object MGOHelpers {
  798.  
  799. implicit class DocumentObservable[C](val observable: Observable[Document]) extends ImplicitObservable[Document] {
  800. override val converter: (Document) => String = (doc) => doc.toJson
  801. }
  802.  
  803. implicit class GenericObservable[C](val observable: Observable[C]) extends ImplicitObservable[C] {
  804. override val converter: (C) => String = (doc) => doc.toString
  805. }
  806.  
  807. trait ImplicitObservable[C] {
  808. val observable: Observable[C]
  809. val converter: (C) => String
  810.  
  811. def results(): Seq[C] = Await.result(observable.toFuture(), 10 seconds)
  812.  
  813. def headResult() = Await.result(observable.head(), 10 seconds)
  814.  
  815. def printResults(initial: String = ""): Unit = {
  816. if (initial.length > 0) print(initial)
  817. results().foreach(res => println(converter(res)))
  818. }
  819.  
  820. def printHeadResult(initial: String = ""): Unit = println(s"${initial}${converter(headResult())}")
  821. }
  822.  
  823. def getResult[T](fut: Future[T], timeOut: Duration = 1 second): T = {
  824. Await.result(fut, timeOut)
  825. }
  826.  
  827. def getResults[T](fut: Future[Iterable[T]], timeOut: Duration = 1 second): Iterable[T] = {
  828. Await.result(fut, timeOut)
  829. }
  830.  
  831. import monix.eval.Task
  832. import monix.execution.Scheduler.Implicits.global
  833.  
  834. final class FutureToTask[A](x: => Future[A]) {
  835. def asTask: Task[A] = Task.deferFuture[A](x)
  836. }
  837.  
  838. final class TaskToFuture[A](x: => Task[A]) {
  839. def asFuture: Future[A] = x.runToFuture
  840. }
  841.  
  842. }

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

原文链接:http://www.cnblogs.com/tiger-xc/p/10948321.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号