前面我们已经讨论了CQRS-Reader-Actor的基本工作原理,现在是时候在之前那个POS例子里进行实际的应用示范了。
假如我们有个业务系统也是在cassandra上的,那么reader就需要把从日志读出来的事件恢复成cassandra表里的数据行row。首先,我们需要在cassandra上创建相关的keyspace和table。下面是在scala中使用cassandra-java-driver的例子:
- import com.datastax.driver.core._
- import akka.actor.ActorSystem
- import akka.stream.ActorMaterializer
- import sdp.cql.engine._
- import CQLEngine._
- import CQLHelpers._
- import monix.execution.Scheduler.Implicits.global
- import scala.util._
- object CQLCreatTables extends App {
- //#init-mat
- implicit val cqlsys = ActorSystem("cqlSystem")
- implicit val mat = ActorMaterializer()
- // implicit val ec = cqlsys.dispatcher
- val cluster = new Cluster
- .Builder()
- .addContactPoints("192.168.11.189")
- .withPort(9042)
- .build()
- useJava8DateTime(cluster)
- implicit val session = cluster.connect()
- val createKeyspace = """
- CREATE KEYSPACE pos_on_cloud WITH replication = { //pos业务数据库
- 'class': 'SimpleStrategy',
- 'replication_factor': '3'
- }"""
- val createVchLog ="""
- CREATE TABLE pos_on_cloud.vch_log ( //销售单号日志 (可以从某日开始重新运算交易)
- terminal text,
- txndate text,
- vchnum int,
- begin_seq bigint,
- end_seq bigint,
- PRIMARY KEY (terminal,txndate,vchnum)
- )"""
- val createTxnItems ="""
- CREATE TABLE pos_on_cloud.txn_log ( //pos交易记录表
- terminal text,
- txndate text,
- txntime text,
- opr text,
- num int,
- seq int,
- txntype int,
- salestype int,
- qty int,
- price int,
- amount int,
- disc int,
- dscamt int,
- member text,
- code text,
- acct text,
- dpt text,
- PRIMARY KEY (terminal,txndate,num,seq)
- )"""
- // CQL that creates the suspended-voucher table pos_on_cloud.txn_hold.
- // Fix: the `disc int` column was missing its trailing comma, so Cassandra rejected
- // the whole CREATE TABLE statement as a syntax error.
- val createTxnSuspend ="""
- CREATE TABLE pos_on_cloud.txn_hold ( //临时挂单表
- terminal text,
- txndate text,
- txntime text,
- opr text,
- num int,
- seq int,
- txntype int,
- salestype int,
- qty int,
- price int,
- amount int,
- disc int,
- dscamt int,
- member text,
- code text,
- acct text,
- dpt text,
- PRIMARY KEY (terminal,txndate,num,seq)
- )"""
- val ctxKeyspace = CQLContext().setCommand(createKeyspace)
- val ctxVchlog = CQLContext().setCommand(createVchLog)
- val ctxTxnlog = CQLContext().setCommand(createTxnItems)
- val ctxTxnhold = CQLContext().setCommand(createTxnSuspend)
- val results = for {
- stsKeyspace <- cqlExecute(ctxKeyspace)
- stsVchlog <- cqlExecute(ctxVchlog)
- stsTxnlog <- cqlExecute(ctxTxnlog)
- stsTxnhold <- cqlExecute(ctxTxnhold)
- } yield (stsKeyspace,stsVchlog,stsTxnlog,stsTxnhold)
- val task = results.value.value
- val cancellableFut = task.runToFuture
- cancellableFut.onComplete {
- case Success(value) =>
- println(s"returned status: $value")
- case Failure(ex) =>
- System.err.println(s"ERROR: ${ex.getMessage}")
- }
- // cancellableFut.cancel()
- /*
- val cancelable = task.runAsync { result =>
- result match {
- case Right(value) =>
- println(value)
- case Left(ex) =>
- System.err.println(s"ERROR: ${ex.getMessage}")
- }
- } */
- scala.io.StdIn.readLine()
- session.close()
- cluster.close()
- cqlsys.terminate()
- }
这里面调用了之前PICE系列博文中设计的CassandraEngine里的工具源代码。下面是用CassandraEngine工具向cassandra表里插入数据的示范代码:
- object DBWriter {
- /**
-   * Inserts one vch_log row and one row per TxnItem (into txn_hold when `susp` is true,
-   * otherwise txn_log), then closes the Cassandra session and cluster.
-   *
-   * @param vchnum voucher number written to vch_log
-   * @param susp   true to write the items to the hold table
-   * @param txns   items to insert; each row is prefixed with `pid` as the terminal column
-   * @param pid    persistence id, used as the terminal key
-   * @param bseq   first journal sequence number of the voucher
-   * @param eseq   last journal sequence number of the voucher
-   */
- def writeTxnsToDB(vchnum: Int, susp: Boolean, txns: List[TxnItem])(pid: String, bseq: Long, eseq: Long) = {
-   import monix.execution.Scheduler.Implicits.global
-   val cluster = new Cluster
-     .Builder()
-     .addContactPoints("192.168.11.189")
-     .withPort(9042)
-     .build()
-   useJava8DateTime(cluster)
-   // If connecting fails the cluster object would otherwise never be released.
-   implicit val session: Session =
-     try cluster.connect()
-     catch { case e: Throwable => cluster.close(); throw e }
-   val insertVchLog = """
- |insert into pos_on_cloud.vch_log(terminal,txndate,vchnum,begin_seq,end_seq)
- |values(?,?,?,?,?)
- |""".stripMargin
-   val insertTxns = """
- |insert into pos_on_cloud.txn_log(terminal,txndate,txntime,opr,num,seq,txntype,salestype,
- |qty,price,amount,disc,dscamt,member,code,acct,dpt)
- |values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
- """.stripMargin
-   val insertSusp = """
- |insert into pos_on_cloud.txn_hold(terminal,txndate,txntime,opr,num,seq,txntype,salestype,
- |qty,price,amount,disc,dscamt,member,code,acct,dpt)
- |values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
- """.stripMargin
-   // txndate is the date of the write, not of the original events.
-   val vchParams: Seq[Object] = Seq(
-     pid.asInstanceOf[Object],
-     LocalDate.now.format(DateTimeFormatter.ofPattern("yyyyMMdd")).asInstanceOf[Object],
-     vchnum.asInstanceOf[Object],
-     bseq.asInstanceOf[Object],
-     eseq.asInstanceOf[Object]
-   )
-   // One parameter row per item, in the same order as `txns`.
-   val txnParams: Seq[Seq[Object]] =
-     txns.map(txnitem => Seq(pid.asInstanceOf[Object]) ++ ccToList(txnitem))
-   val ctxVchlog = CQLContext().setCommand(insertVchLog, 1,vchParams)
-   val ctxTxnlog = CQLContext().setCommand((if(susp) insertSusp else insertTxns),txnParams.size,txnParams)
-   val results = for {
-     _ <- cqlExecute(ctxVchlog)
-     stsTxnlog <- cqlExecute(ctxTxnlog)
-   } yield (stsTxnlog)
-   val task = results.value.value
-   val cancellableFut = task.runToFuture
-   cancellableFut.onComplete { outcome =>
-     // Report the outcome, then release the connection exactly once on either path.
-     try outcome match {
-       case Success(value) =>
-         println(s"returned status: $value")
-       case Failure(ex) =>
-         System.err.println(s"ERROR: ${ex.getMessage}")
-     } finally {
-       session.close()
-       cluster.close()
-     }
-   }
- }
- /**
-   * Maps each declared field name of `cc` to the product element at the same position.
-   * Relies on getDeclaredFields returning the fields in constructor order.
-   * The bare `productIterator.to` depended on collection-type inference; `toList`
-   * states the intended materialisation explicitly.
-   */
- def getMapFromCC(cc: Product) =
-   cc.getClass.getDeclaredFields.map( _.getName ) // all field names
-     .zip( cc.productIterator.toList ).toMap // zipped with all values
- /** Pairs every declared field name of `cc` with the next product element, boxed as Object. */
- def ccFieldsToMap(cc: Product) = {
-   val elems = cc.productIterator
-   val pairs =
-     for (field <- cc.getClass.getDeclaredFields)
-       yield (field.getName, elems.next().asInstanceOf[Object])
-   pairs.toMap
- }
- /**
-   * Returns the elements of `cc` in declaration order, boxed as Object, ready to be
-   * bound as statement parameters.
-   * Walks productIterator directly instead of counting reflected fields, so a class
-   * with more declared fields than product elements no longer throws.
-   */
- def ccToList(cc: Product) =
-   cc.productIterator.map(_.asInstanceOf[Object]).toList
- /**
-   * Converts `cc` to a field-name -> value map. An element that is itself a Product
-   * with at least one element is converted recursively; anything else is boxed as is.
-   */
- def ccToMap(cc: Product): Map[String, Object] = {
-   val elems = cc.productIterator
-   val entries = for (field <- cc.getClass.getDeclaredFields) yield {
-     val converted: Object = elems.next() match {
-       case nested: Product if nested.productArity > 0 => ccToMap(nested)
-       case plain => plain.asInstanceOf[Object]
-     }
-     field.getName -> converted
-   }
-   entries.toMap
- }
- }
用cqlsh: select * from txn_hold 检查了一下,插入数据正确。对于这种批量数据同类处理,可能用akka-stream会更方便高效:
- // Excerpt: stream-based replacement for the two cqlExecute calls. insertVchLog, vsToParams,
- // vchParams, insertSusp, insertTxns, susp, pid, txns and log are defined outside this excerpt.
- // NOTE(review): setParallelism/setProcessOrder presumably allow 2 concurrent, unordered
- // statements - confirm against CassandraActionStream.
- val actionStreamVs = CassandraActionStream(insertVchLog,vsToParams)
- .setParallelism(2)
- .setProcessOrder(false)
- val actionFlowVs: Flow[Seq[Object],Seq[Object],NotUsed] = actionStreamVs.performOnRow
- // Logs every parameter row that has passed through the insert flow.
- val sinkVs = Sink.foreach[Seq[Object]]{ r =>
- log.step(s"insert: $insertVchLog, values: ${r}")
- }
- // insert to vch_log (a single-element source holding vchParams)
- val stsVs = Source.fromIterator(() => Seq(vchParams).iterator).via(actionFlowVs).to(sinkVs).run()
-
- // Suspended vouchers use the txn_hold statement, all others the txn_log statement.
- val insertTxn = if (susp) insertSusp else insertTxns
- // Statement parameters for one item: pid first, then the item's fields via ccToList.
- val txnitemToParams: TxnItem => Seq[Object] = txn =>
- (Seq(pid.asInstanceOf[Object]) ++ ccToList(txn))
- val actionStreamTxn = CassandraActionStream(insertTxn,txnitemToParams)
- .setParallelism(2)
- .setProcessOrder(false)
- val actionFlowTxn: Flow[TxnItem,TxnItem,NotUsed] = actionStreamTxn.performOnRow
- val sinkTxn = Sink.foreach[TxnItem]{ r =>
- log.step(s"insert: $insertTxn, values: ${r}")
- }
- // insert to txn_??? (one stream element per TxnItem)
- val stsTxn = Source.fromIterator(() => txns.iterator).via(actionFlowTxn).to(sinkTxn).run()
检查cassandra数据库表,结果正确。用stream方式来做重复类型的处理会比较方便,在当前这个例子的场合下建议使用。
好了,完成了事件日志读取和转换成数据行格式并写入数据库表后,下一步就是建一个reader-actor负责完成这一轮工作。这个reader-actor只根据下面这个消息进行相关的工作:
- // Read-side job: the persistenceId (pid), the voucher number (vchnum) and the journal
- // sequence range bseq..eseq that the reader should process.
- case class PerformRead(pid: String, vchnum: Int, bseq: Long, eseq: Long)
这个消息描述了读端需要读取的日志记录范围和persistenceId。然后再加一个远程路由remote-router,负责按照某种算法来向各个集群节点上的reader-actor分发读端任务。下面是reader-actor:
- package datatech.cloud.pos
- import akka.actor._
- import akka.cluster._
- import akka.pattern._
- import scala.concurrent.duration._
- import com.typesafe.config.ConfigFactory
- import sdp.logging.LogSupport
- import Messages._
- import Reader._
- /** Factory for the reader actor, its backoff supervisor and a hosting actor system. */
- object ActionReader {
-   /** Props of the bare reader actor; `trace` is handed to its constructor. */
-   def actionReaderProps(trace: Boolean): Props = Props(new ActionReader(trace))
-   // The supervisor is built with BackoffOpts.onFailure: it reacts only to a failure
-   // of the child (the earlier comment named onStop, which is not what the code uses).
-   def readerProps(trace:Boolean): Props = {
-     val options = BackoffOpts.onFailure(
-       childProps = actionReaderProps(trace),
-       childName = "cqrs-reader",
-       // dot-call syntax instead of postfix `1 second`, which needs scala.language.postfixOps
-       minBackoff = 1.second,
-       maxBackoff = 10.seconds,
-       randomFactor = 0.20
-     ).withMaxNrOfRetries(3)
-     BackoffSupervisor.props(options)
-   }
-   /** Starts an actor system "cloud-pos-server" on `port` and deploys the supervised reader as /user/reader. */
-   def create(port: Int): Unit = {
-     val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
-       .withFallback(ConfigFactory.load())
-     val system = ActorSystem("cloud-pos-server", config)
-     system.actorOf(readerProps(true),"reader")
-   }
- }
- /**
-   * Read-side worker actor. On PerformRead it calls readActions for the given
-   * persistenceId and sequence range; any other message is only logged.
-   *
-   * @param trace value assigned to log.stepOn
-   */
- class ActionReader(trace: Boolean) extends Actor with LogSupport {
-   val cluster = Cluster(context.system)
-   // Host of this node's cluster address, passed to readActions as its first argument.
-   // NOTE(review): `.get` throws if the address has no host - confirm remoting is always configured.
-   val host = cluster.selfAddress.host.get
-   implicit val nodeAddress: NodeAddress = NodeAddress(cluster.selfAddress.toString)
-   val readerId = "ActionReader"
-   log.stepOn = trace
-   log.step(s"${nodeAddress.address}-${readerId}")
-   override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
-     super.preRestart(reason, message)
-     log.step(s"${nodeAddress.address}-${readerId} Restarting for $message ...")
-   }
-   override def postRestart(reason: Throwable): Unit = {
-     super.postRestart(reason)
-     log.step(s"${nodeAddress.address}-${readerId} restarted for ${reason.getMessage}.")
-   }
-   override def postStop(): Unit = {
-     // log text corrected from "stooped"
-     log.step(s"${nodeAddress.address}-${readerId} stopped.")
-   }
-   override def preStart(): Unit = {
-     log.step(s"${nodeAddress.address}-${readerId} Starting ...")
-   }
-   var debugConfig: com.typesafe.config.Config = _
-   var debug: Boolean = _
-   try {
-     debugConfig = ConfigFactory.load("pos.conf").getConfig("pos.server")
-     debug = debugConfig.getBoolean("debug")
-   }
-   catch {
-     // Only non-fatal problems (e.g. missing file or key) fall back to debug = false;
-     // fatal errors are no longer swallowed.
-     case scala.util.control.NonFatal(_) => debug = false
-   }
-   log.step(s"${nodeAddress.address}-${readerId} debug mode = $debug")
-   implicit val debugMode = DebugMode(debug)
-   override def receive: Receive = {
-     case PerformRead(pid, vchnum, bseq, eseq) =>
-       log.step(s"${nodeAddress.address}-${readerId} PerformRead($pid, $vchnum, $bseq, $eseq)")
-       readActions(host,bseq,eseq,pid,vchnum)(context.system,context.dispatcher,nodeAddress)
-     case msg =>
-       log.step(s"${nodeAddress.address}-${readerId} receive unsupported command:[$msg]")
-   }
- }
这是一个在backoffSupervisor后面的actor。remote-router是从配置文件定义创建的:
- akka.actor.deployment {
- /readerRouter/readerRouter = {
- # Router type provided by metrics extension.
- router = cluster-metrics-adaptive-group
- # Router parameter specific for metrics extension.
- # metrics-selector = heap
- # metrics-selector = load
- # metrics-selector = cpu
- metrics-selector = mix
- #
- routees.paths = ["/user/reader"]
- cluster {
- max-nr-of-instances-per-node = 10
- max-total-nr-of-instances = 1000
- enabled = on
- allow-local-routees = on
- }
- }
- }
ReaderRouter的代码如下:
- package datatech.cloud.pos
- import akka.actor._
- import akka.routing._
- import akka.cluster._
- import com.typesafe.config.ConfigFactory
- /** Front actor that hands every message to the config-defined child router "readerRouter". */
- class ReaderRouter extends Actor {
-   val router = context.actorOf(FromConfig.props(), name = "readerRouter")
-   def receive: Receive = {
-     // forward keeps the original sender, so a routee can reply to the real requester
-     case msg => router forward msg
-   }
- }
- /** Starts the router node and exposes the router ActorRef once it exists. */
- object ReaderRouter {
-   // Assigned from the registerOnMemberUp callback, i.e. from another thread than the
-   // caller of getRouter; @volatile makes the assignment visible to readers.
-   @volatile var router: ActorRef = _
-   def props = Props(new ReaderRouter)
-   /** Starts actor system "cloud-pos-server" on `port`; the router actor is created only after this node is Up. */
-   def create(port: Int) = {
-     val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
-       .withFallback(ConfigFactory.load())
-     val system = ActorSystem("cloud-pos-server",config)
-     Cluster(system).registerOnMemberUp{
-       router = system.actorOf(props,"readerRouter")
-     }
-   }
-   /** Returns the router, or null while the registerOnMemberUp callback has not run yet. */
-   def getRouter = router
- }
好了,可以写个例子来测试运行这个router/routee。reader-actor所做的工作在前面的讨论里已经测试过了。
- package datatech.cloud.pos
- import akka.actor._
- import datatech.cloud.pos.Messages.PerformRead
- /** Demo: three reader nodes (2551-2553), one router node (2558), then two PerformRead jobs. */
- object ReaderDemo extends App {
-   ActionReader.create(2551)
-   ActionReader.create(2552)
-   ActionReader.create(2553)
-   ReaderRouter.create(2558)
-   // Press Enter once the cluster has formed.
-   scala.io.StdIn.readLine()
-   val router = ReaderRouter.getRouter
-   // getRouter is still null if the router node has not become Up; sending would throw a NullPointerException.
-   if (router == null)
-     System.err.println("reader router is not up yet; no PerformRead was sent")
-   else {
-     router ! PerformRead("1022",111,0,Long.MaxValue)
-     scala.io.StdIn.readLine()
-     router ! PerformRead("1022",222,0,Long.MaxValue)
-   }
-   scala.io.StdIn.readLine()
- }
在这个例子里我们先在本机的2551,2552,2553端口上部署了routees, 即reader-actor。然后在2558端口部署router,再向router发送任务PerformRead。这里有些东西值得留意:akka-cluster使用了netty,而netty也需要占用一个端口。在配置文件里:
- remote {
- log-remote-lifecycle-events = on
- netty.tcp {
- hostname = "192.168.11.189"
- # port set to 0 for netty to randomly choose from
- port = 0
- }
- }
下面是本次示范的源代码:
project/plugin.sbt
- addSbtPlugin("com.lightbend.akka.grpc" % "sbt-akka-grpc" % "0.6.1")
- addSbtPlugin("com.lightbend.sbt" % "sbt-javaagent" % "0.1.4") // ALPN agent
build.sbt
- name := "akka-cluster-reader"
-
- version := "0.1"
-
- scalaVersion := "2.12.8"
-
- scalacOptions += "-Ypartial-unification"
-
- // in build.sbt:
- //enablePlugins(AkkaGrpcPlugin)
- // ALPN agent
- //enablePlugins(JavaAgent)
- //javaAgents += "org.mortbay.jetty.alpn" % "jetty-alpn-agent" % "2.0.9" % "runtime;test"
-
- libraryDependencies := Seq(
- "com.typesafe.akka" %% "akka-cluster-metrics" % "2.5.19",
- "com.typesafe.akka" %% "akka-cluster-sharding" % "2.5.19",
- "com.typesafe.akka" %% "akka-persistence" % "2.5.19",
- "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "1.0.1",
- "org.mongodb.scala" %% "mongo-scala-driver" % "2.6.0",
- "com.lightbend.akka" %% "akka-stream-alpakka-mongodb" % "1.0.1",
- "com.typesafe.akka" %% "akka-persistence-query" % "2.5.19",
- "com.typesafe.akka" %% "akka-persistence-cassandra" % "0.93",
- "com.typesafe.akka" %% "akka-persistence-cassandra-launcher" % "0.93" % Test,
- "com.datastax.cassandra" % "cassandra-driver-core" % "3.6.0",
- "com.datastax.cassandra" % "cassandra-driver-extras" % "3.6.0",
- "ch.qos.logback" % "logback-classic" % "1.2.3",
- "io.monix" %% "monix" % "3.0.0-RC2",
- "org.typelevel" %% "cats-core" % "2.0.0-M1",
- "com.thesamet.scalapb" %% "scalapb-runtime" % scalapb.compiler.Version.scalapbVersion % "protobuf",
- "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapb.compiler.Version.scalapbVersion
-
- )
-
- // (optional) If you need scalapb/scalapb.proto or anything from
- // google/protobuf/*.proto
- //libraryDependencies += "com.thesamet.scalapb" %% "scalapb-runtime" % scalapb.compiler.Version.scalapbVersion % "protobuf"
-
-
- PB.targets in Compile := Seq(
- scalapb.gen() -> (sourceManaged in Compile).value
- )
resources/application.conf
- akka.actor.warn-about-java-serializer-usage = off
- akka.log-dead-letters-during-shutdown = off
- akka.log-dead-letters = off
- akka.remote.use-passive-connections=off
-
- akka {
- loglevel = INFO
- actor {
- provider = "cluster"
- }
-
- remote {
- log-remote-lifecycle-events = on
- netty.tcp {
- hostname = "192.168.11.189"
- # port set to 0 for netty to randomly choose from
- port = 0
- }
- }
-
- cluster {
- seed-nodes = [
- "akka.tcp://cloud-pos-server@192.168.11.189:2551",
- "akka.tcp://cloud-pos-server@192.168.11.189:2552"
- ]
-
- log-info = off
- sharding {
- role = "shard"
- passivate-idle-entity-after = 10 m
- }
- }
-
- persistence {
- journal.plugin = "cassandra-journal"
- snapshot-store.plugin = "cassandra-snapshot-store"
- }
-
- }
-
- cassandra-journal {
- contact-points = ["192.168.11.189"]
- }
-
- cassandra-snapshot-store {
- contact-points = ["192.168.11.189"]
- }
-
- # Enable metrics extension in akka-cluster-metrics.
- akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension"]
-
- akka.actor.deployment {
- /readerRouter/readerRouter = {
- # Router type provided by metrics extension.
- router = cluster-metrics-adaptive-group
- # Router parameter specific for metrics extension.
- # metrics-selector = heap
- # metrics-selector = load
- # metrics-selector = cpu
- metrics-selector = mix
- #
- routees.paths = ["/user/reader"]
- cluster {
- max-nr-of-instances-per-node = 10
- max-total-nr-of-instances = 1000
- enabled = on
- allow-local-routees = on
- }
- }
- }
-
- dbwork-dispatcher {
- # Dispatcher is the name of the event-based dispatcher
- type = Dispatcher
- # What kind of ExecutionService to use
- executor = "fork-join-executor"
- # Configuration for the fork join pool
- fork-join-executor {
- # Min number of threads to cap factor-based parallelism number to
- parallelism-min = 2
- # Parallelism (threads) ... ceil(available processors * factor)
- parallelism-factor = 2.0
- # Max number of threads to cap factor-based parallelism number to
- parallelism-max = 10
- }
- # Throughput defines the maximum number of messages to be
- # processed per actor before the thread jumps to the next actor.
- # Set to 1 for as fair as possible.
- throughput = 100
- }
ActionReader.scala
- package datatech.cloud.pos
- import akka.actor._
- import akka.cluster._
- import akka.pattern._
- import scala.concurrent.duration._
- import com.typesafe.config.ConfigFactory
- import sdp.logging.LogSupport
- import Messages._
- import Reader._
-
-
-
- /** Factory for the reader actor, its backoff supervisor and a hosting actor system. */
- object ActionReader {
-   /** Props of the bare reader actor; `trace` is handed to its constructor. */
-   def actionReaderProps(trace: Boolean): Props = Props(new ActionReader(trace))
-   // The supervisor is built with BackoffOpts.onFailure: it reacts only to a failure
-   // of the child (the earlier comment named onStop, which is not what the code uses).
-   def readerProps(trace:Boolean): Props = {
-     val options = BackoffOpts.onFailure(
-       childProps = actionReaderProps(trace),
-       childName = "cqrs-reader",
-       // dot-call syntax instead of postfix `1 second`, which needs scala.language.postfixOps
-       minBackoff = 1.second,
-       maxBackoff = 10.seconds,
-       randomFactor = 0.20
-     ).withMaxNrOfRetries(3)
-     BackoffSupervisor.props(options)
-   }
-   /** Starts an actor system "cloud-pos-server" on `port` and deploys the supervised reader as /user/reader. */
-   def create(port: Int): Unit = {
-     val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
-       .withFallback(ConfigFactory.load())
-     val system = ActorSystem("cloud-pos-server", config)
-     system.actorOf(readerProps(true),"reader")
-   }
- }
-
- /**
-   * Read-side worker actor. On PerformRead it calls readActions for the given
-   * persistenceId and sequence range; any other message is only logged.
-   *
-   * @param trace value assigned to log.stepOn
-   */
- class ActionReader(trace: Boolean) extends Actor with LogSupport {
-   val cluster = Cluster(context.system)
-   // Host of this node's cluster address, passed to readActions as its first argument.
-   // NOTE(review): `.get` throws if the address has no host - confirm remoting is always configured.
-   val host = cluster.selfAddress.host.get
-   implicit val nodeAddress: NodeAddress = NodeAddress(cluster.selfAddress.toString)
-   val readerId = "ActionReader"
-   log.stepOn = trace
-   log.step(s"${nodeAddress.address}-${readerId}")
-   override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
-     super.preRestart(reason, message)
-     log.step(s"${nodeAddress.address}-${readerId} Restarting for $message ...")
-   }
-   override def postRestart(reason: Throwable): Unit = {
-     super.postRestart(reason)
-     log.step(s"${nodeAddress.address}-${readerId} restarted for ${reason.getMessage}.")
-   }
-   override def postStop(): Unit = {
-     // log text corrected from "stooped"
-     log.step(s"${nodeAddress.address}-${readerId} stopped.")
-   }
-   override def preStart(): Unit = {
-     log.step(s"${nodeAddress.address}-${readerId} Starting ...")
-   }
-   var debugConfig: com.typesafe.config.Config = _
-   var debug: Boolean = _
-   try {
-     debugConfig = ConfigFactory.load("pos.conf").getConfig("pos.server")
-     debug = debugConfig.getBoolean("debug")
-   }
-   catch {
-     // Only non-fatal problems (e.g. missing file or key) fall back to debug = false;
-     // fatal errors are no longer swallowed.
-     case scala.util.control.NonFatal(_) => debug = false
-   }
-   log.step(s"${nodeAddress.address}-${readerId} debug mode = $debug")
-   implicit val debugMode = DebugMode(debug)
-   override def receive: Receive = {
-     case PerformRead(pid, vchnum, bseq, eseq) =>
-       log.step(s"${nodeAddress.address}-${readerId} PerformRead($pid, $vchnum, $bseq, $eseq)")
-       readActions(host,bseq,eseq,pid,vchnum)(context.system,context.dispatcher,nodeAddress)
-     case msg =>
-       log.step(s"${nodeAddress.address}-${readerId} receive unsupported command:[$msg]")
-   }
- }
ReaderRouter.scala
- package datatech.cloud.pos
- import akka.actor._
- import akka.routing._
- import akka.cluster._
- import com.typesafe.config.ConfigFactory
-
-
- /** Front actor that hands every message to the config-defined child router "readerRouter". */
- class ReaderRouter extends Actor {
-   val router = context.actorOf(FromConfig.props(), name = "readerRouter")
-   def receive: Receive = {
-     // forward keeps the original sender, so a routee can reply to the real requester
-     case msg => router forward msg
-   }
- }
-
- /** Starts the router node and exposes the router ActorRef once it exists. */
- object ReaderRouter {
-   // Assigned from the registerOnMemberUp callback, i.e. from another thread than the
-   // caller of getRouter; @volatile makes the assignment visible to readers.
-   @volatile var router: ActorRef = _
-   def props = Props(new ReaderRouter)
-   /** Starts actor system "cloud-pos-server" on `port`; the router actor is created only after this node is Up. */
-   def create(port: Int) = {
-     val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
-       .withFallback(ConfigFactory.load())
-     val system = ActorSystem("cloud-pos-server",config)
-     Cluster(system).registerOnMemberUp{
-       router = system.actorOf(props,"readerRouter")
-     }
-   }
-   /** Returns the router, or null while the registerOnMemberUp callback has not run yet. */
-   def getRouter = router
- }
Reader.scala
- package datatech.cloud.pos
- import akka.actor._
- import akka.stream.scaladsl._
-
- import scala.util._
- import akka._
- import akka.persistence.query._
- import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
-
- import scala.concurrent._
- import akka.stream._
- import sdp.logging._
- import Actions._
- import States._
- import Messages._
- import akka.cluster._
- import DBWriter._
-
- /** Replays journal events of one persistenceId and writes finished vouchers to Cassandra. */
- object Reader extends LogSupport {
-   /**
-     * Reads the events of `persistenceId` in the range startSeq..endSeq, folds them into
-     * voucher state and calls writeTxnsToDB for every EndVoucher event. Runs
-     * asynchronously; the method itself returns as soon as the query has been started.
-     *
-     * @param cqlhost       Cassandra host handed to writeTxnsToDB
-     * @param startSeq      first sequence number to read
-     * @param endSeq        last sequence number to read
-     * @param persistenceId journal persistence id
-     * @param vchnum        voucher number the initial VchStates is seeded with
-     */
-   def readActions(cqlhost: String, startSeq: Long, endSeq: Long, persistenceId: String, vchnum: Int)(implicit sys: ActorSystem, ec: ExecutionContextExecutor, nodeAddress: NodeAddress) = {
-     // Kept as implicit vars: buildTxnItem/updateState pick the current state up implicitly.
-     implicit var vchState = VchStates().copy(num = vchnum)
-     implicit var vchItems = VchItems()
-     implicit var curTxnItem = TxnItem()
-     implicit val pid = PID(persistenceId)
-     implicit val mat = ActorMaterializer()
-     val readerId = "ActionReader"
-     // obtain read journal by plugin id
-     val readJournal =
-       PersistenceQuery(sys).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
-     // issue query to journal
-     val source: Source[EventEnvelope, NotUsed] =
-       readJournal.currentEventsByPersistenceId(persistenceId, startSeq, endSeq)
-     // materialize stream, consuming events (prepending, so the list is newest-first)
-     val futureActions: Future[List[Any]] = source.runFold(List[Any]()) { (lstAny, evl) => evl.event :: lstAny }
-     futureActions.onComplete {
-       case Success(txns) =>
-         log.step(s"${nodeAddress.address}-${readerId} recovered actions: $txns")
-         // Shut the per-call materializer down on every path: previously it leaked when
-         // the range held no EndVoucher, and a second EndVoucher hit a dead materializer.
-         // NOTE(review): writeTxnsToDB does not report completion to this caller, so the
-         // shutdown can still race with inserts that are in flight, as it did before.
-         try buildVoucher(txns)
-         finally mat.shutdown()
-       case Failure(excpt) =>
-         log.error(s"${nodeAddress.address}-${readerId} read actions error: ${excpt.getMessage}")
-         mat.shutdown()
-     }
-     // Applies the events oldest-first; pattern matching replaces the unchecked casts.
-     def buildVoucher(actions: List[Any])= {
-       actions.reverse.foreach {
-         case EndVoucher(_) =>
-           writeTxnsToDB(cqlhost,vchState.num,vchState.susp,vchItems.txnitems)(persistenceId,startSeq,endSeq)
-         case action: Action =>
-           curTxnItem = buildTxnItem(action)
-           val sts = updateState(action,0)
-           vchState = sts._1
-           vchItems = sts._2
-         case other =>
-           // An event that is not an Action used to raise ClassCastException; skip it instead.
-           log.error(s"${nodeAddress.address}-${readerId} skipped unsupported event: $other")
-       }
-     }
-   }
- }
DBWriter.scala
- package datatech.cloud.pos
- import java.time.LocalDate
- import java.time.format.DateTimeFormatter
- import sdp.logging._
- import Messages._
- import com.datastax.driver.core._
- import akka.actor.ActorSystem
- import akka.stream.ActorMaterializer
- import sdp.cql.engine._
- import CQLEngine._
- import CQLHelpers._
- import com.typesafe.config._
- import akka.stream.scaladsl._
- import akka._
- import scala.concurrent._
-
-
- object DBWriter extends LogSupport {
- var posConfig: com.typesafe.config.Config = _
- /**
-   * Streams one vch_log row and one row per TxnItem (txn_hold when `susp` is true,
-   * otherwise txn_log) into Cassandra. Returns immediately; the inserts run on `mat`.
-   * The cluster and session opened here are closed once both streams have finished,
-   * successfully or not (they were previously never closed).
-   *
-   * @param cqlhost Cassandra contact point
-   * @param vchnum  voucher number written to vch_log
-   * @param susp    true to write the items to the hold table
-   * @param txns    items to insert; each row is prefixed with `pid` as the terminal column
-   * @param pid     persistence id, used as the terminal key
-   * @param bseq    first journal sequence number of the voucher
-   * @param eseq    last journal sequence number of the voucher
-   */
- def writeTxnsToDB(cqlhost: String, vchnum: Int, susp: Boolean, txns: List[TxnItem])(pid: String, bseq: Long, eseq: Long)(
-   implicit sys: ActorSystem, ec: ExecutionContextExecutor, mat: ActorMaterializer, nodeAddress: NodeAddress): Unit = {
-   val readerId = "DBWriter"
-   // Port from pos.conf, defaulting to 9042 on any non-fatal configuration problem.
-   // NOTE(review): this resolves the key pos.cqlport.cqlport - confirm that matches pos.conf.
-   val cqlport: Int = scala.util.Try {
-     posConfig = ConfigFactory.load("pos.conf").getConfig("pos.cqlport")
-     posConfig.getInt("cqlport")
-   }.getOrElse(9042)
-   val cluster = new Cluster
-     .Builder()
-     .addContactPoints(cqlhost)
-     .withPort(cqlport)
-     .build()
-   useJava8DateTime(cluster)
-   // Release the cluster if the connection attempt itself fails.
-   implicit val session: Session =
-     try cluster.connect()
-     catch { case e: Throwable => cluster.close(); throw e }
-   val insertVchLog = """
- |insert into pos_on_cloud.vch_log(
- |terminal,
- |txndate,
- |vchnum,
- |begin_seq,
- |end_seq)
- |values(?,?,?,?,?)
- |""".stripMargin
-   val insertTxns = """
- |insert into pos_on_cloud.txn_log(
- |terminal,
- |txndate,
- |txntime,
- |opr,
- |num,
- |seq,
- |txntype,
- |salestype,
- |qty,
- |price,
- |amount,
- |disc,
- |dscamt,
- |member,
- |code,
- |acct,
- |dpt)
- |values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
- """.stripMargin
-   val insertSusp = """
- |insert into pos_on_cloud.txn_hold(
- |terminal,
- |txndate,
- |txntime,
- |opr,
- |num,
- |seq,
- |txntype,
- |salestype,
- |qty,
- |price,
- |amount,
- |disc,
- |dscamt,
- |member,
- |code,
- |acct,
- |dpt)
- |values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
- """.stripMargin
-   // txndate is the date of the write, not of the original events.
-   val vchParams: Seq[Object] = Seq(
-     pid.asInstanceOf[Object],
-     LocalDate.now.format(DateTimeFormatter.ofPattern("yyyyMMdd")).asInstanceOf[Object],
-     vchnum.asInstanceOf[Object],
-     bseq.asInstanceOf[Object],
-     eseq.asInstanceOf[Object]
-   )
-   // The vch_log element already is its own parameter row.
-   val vsToParams: Seq[Object] => Seq[Object] = vchParams => vchParams
-   val actionStreamVs = CassandraActionStream(insertVchLog,vsToParams)
-     .setParallelism(2)
-     .setProcessOrder(false)
-   val actionFlowVs: Flow[Seq[Object],Seq[Object],NotUsed] = actionStreamVs.performOnRow
-   val sinkVs = Sink.foreach[Seq[Object]]{ r =>
-     log.step(s"${nodeAddress.address}-${readerId} insert: $insertVchLog, values: ${r}")
-   }
-   // insert to vch_log; runWith yields the sink's completion future
-   val doneVs: Future[Done] = Source.fromIterator(() => Seq(vchParams).iterator).via(actionFlowVs).runWith(sinkVs)
-   val insertTxn = if (susp) insertSusp else insertTxns
-   // Statement parameters for one item: pid first, then the item's fields.
-   val txnitemToParams: TxnItem => Seq[Object] = txn =>
-     (Seq(pid.asInstanceOf[Object]) ++ ccToList(txn))
-   val actionStreamTxn = CassandraActionStream(insertTxn,txnitemToParams)
-     .setParallelism(2)
-     .setProcessOrder(false)
-   val actionFlowTxn: Flow[TxnItem,TxnItem,NotUsed] = actionStreamTxn.performOnRow
-   val sinkTxn = Sink.foreach[TxnItem]{ r =>
-     log.step(s"${nodeAddress.address}-${readerId} insert: $insertTxn, values: ${r}")
-   }
-   // insert to txn_log or txn_hold
-   val doneTxn: Future[Done] = Source.fromIterator(() => txns.iterator).via(actionFlowTxn).runWith(sinkTxn)
-   // Wait for both streams (transformWith also continues after a failure), report
-   // failures that used to be dropped silently, then release the connection.
-   doneVs.transformWith { vsResult =>
-     vsResult.failed.foreach(e =>
-       log.error(s"${nodeAddress.address}-${readerId} insert vch_log failed: ${e.getMessage}"))
-     doneTxn
-   }.onComplete { txnResult =>
-     txnResult.failed.foreach(e =>
-       log.error(s"${nodeAddress.address}-${readerId} insert txn items failed: ${e.getMessage}"))
-     session.close()
-     cluster.close()
-   }
- }
-
- /**
-   * Returns the elements of `cc` in declaration order, boxed as Object, ready to be
-   * bound as statement parameters.
-   * Walks productIterator directly instead of counting reflected fields, so a class
-   * with more declared fields than product elements no longer throws.
-   */
- def ccToList(cc: Product) =
-   cc.productIterator.map(_.asInstanceOf[Object]).toList
-
- }
ReaderDemo.scala
- package datatech.cloud.pos
- import akka.actor._
-
- import datatech.cloud.pos.Messages.PerformRead
- /** Demo: three reader nodes (2551-2553), one router node (2558), then two PerformRead jobs. */
- object ReaderDemo extends App {
-   ActionReader.create(2551)
-   ActionReader.create(2552)
-   ActionReader.create(2553)
-   ReaderRouter.create(2558)
-   // Press Enter once the cluster has formed.
-   scala.io.StdIn.readLine()
-   val router = ReaderRouter.getRouter
-   // getRouter is still null if the router node has not become Up; sending would throw a NullPointerException.
-   if (router == null)
-     System.err.println("reader router is not up yet; no PerformRead was sent")
-   else {
-     router ! PerformRead("1022",111,0,Long.MaxValue)
-     scala.io.StdIn.readLine()
-     router ! PerformRead("1022",222,0,Long.MaxValue)
-   }
-   scala.io.StdIn.readLine()
- }
States.scala
- package datatech.cloud.pos
- import java.time.LocalDate
- import java.time.LocalDateTime
- import java.time.format.DateTimeFormatter
-
-
- import Messages._
- import sdp.logging._
-
- /** Journal event types (Action) plus a fold helper that can stop early. */
- object Actions {
-   /** Adds foldLeftWhile to any Seq. */
-   implicit class FoldLeftWhile[A](trav: Seq[A]) {
-     /**
-       * Left fold that stops as soon as `op` returns true in the second tuple slot.
-       * `op` receives the accumulator paired with the previous flag and the next
-       * element; the accumulator of the last call (or `z` for an empty Seq) is returned.
-       */
-     def foldLeftWhile[B](z: B)(op: ((B,Boolean), A) => (B, Boolean)): B = {
-       @scala.annotation.tailrec
-       def loop(state: (B, Boolean), rest: Seq[A]): (B, Boolean) =
-         if (rest.isEmpty) state
-         else {
-           val next = op(state, rest.head)
-           if (next._2) next else loop(next, rest.tail)
-         }
-       loop((z, false), trav)._1
-     }
-   }
-   case class ReadActions(startSeq: Int, endSeq: Int, persistenceId: String)
-   sealed trait Action
-   case class LogOned(opr: String) extends Action
-   case object LogOffed extends Action
-   case class SuperOned(su: String) extends Action
-   case object SuperOffed extends Action
-   case class MemberOned(cardnum: String) extends Action
-   case object MemberOffed extends Action // remove member status for the voucher
-   case object RefundOned extends Action
-   case object RefundOffed extends Action
-   case object VoidOned extends Action
-   case object VoidOffed extends Action
-   case class SalesLogged(acct: String, dpt: String, code: String, qty: Int, price: Int) extends Action
-   case class Subtotaled(level: Int) extends Action
-   case class Discounted(disctype: Int, grouped: Boolean, code: String, percent: Int) extends Action
-   case class NewVoucher(vnum: Int) extends Action // new voucher; reminder for read-side to set new vnum
-   case class EndVoucher(vnum: Int) extends Action // end-of-voucher marker
-   case object VoidVoucher extends Action
-   case object SuspVoucher extends Action
-   case class VoucherNumed(fnum: Int, tnum: Int) extends Action
-   case class PaymentMade(acct: String, num: String, amount: Int) extends Action // settlement payment
- }
-
-
- object States extends LogSupport {
- import Actions._
-
- def setShowSteps(b: Boolean) = log.stepOn = b
-
- def buildTxnItem(evt: Action)(implicit vs: VchStates, vi: VchItems): TxnItem = evt match {
- case LogOned(op) => TxnItem(vs).copy(
- txntype = TXNTYPE.logon,
- salestype = SALESTYPE.crd,
- opr = op,
- code = op
- )
- case LogOffed => TxnItem(vs).copy(
- txntype = TXNTYPE.logon,
- salestype = SALESTYPE.crd
- )
- case SuperOned(su) => TxnItem(vs).copy(
- txntype = TXNTYPE.supon,
- salestype = SALESTYPE.crd,
- code = su
- )
- case SuperOffed => TxnItem(vs).copy(
- txntype = TXNTYPE.supon,
- salestype = SALESTYPE.crd
- )
- case MemberOned(cardnum) => TxnItem(vs).copy(
- txntype = TXNTYPE.sales,
- salestype = SALESTYPE.crd,
- member = cardnum
- )
- case MemberOffed => TxnItem(vs).copy(
- txntype = TXNTYPE.sales,
- salestype = SALESTYPE.crd
- )
- case RefundOned => TxnItem(vs).copy(
- txntype = TXNTYPE.refund
- )
- case RefundOffed => TxnItem(vs).copy(
- txntype = TXNTYPE.refund
- )
- case VoidOned => TxnItem(vs).copy(
- txntype = TXNTYPE.void
- )
- case VoidOffed => TxnItem(vs).copy(
- txntype = TXNTYPE.void
- )
- case VoidVoucher => TxnItem(vs).copy(
- txntype = TXNTYPE.voidall,
- code = vs.num.toString,
- acct = vs.num.toString
- )
- case SuspVoucher => TxnItem(vs).copy(
- txntype = TXNTYPE.suspend,
- code = vs.num.toString,
- acct = vs.num.toString
- )
- case Subtotaled(level) =>
- TxnItem(vs).copy(
- txntype = TXNTYPE.sales,
- salestype = SALESTYPE.sub
- )
- case Discounted(dt,gp,code,pct) => TxnItem(vs).copy(
- txntype = TXNTYPE.sales,
- salestype = SALESTYPE.dsc,
- acct = code,
- disc = pct
- )
- case PaymentMade(act,num,amt) => TxnItem(vs).copy(
- txntype = TXNTYPE.sales,
- salestype = SALESTYPE.ttl,
- acct = act,
- code = num,
- amount = amt
- )
-
- case SalesLogged(sacct,sdpt,scode,sqty,sprice) => TxnItem(vs).copy(
- txntype = TXNTYPE.sales,
- salestype = SALESTYPE.itm,
- acct = sacct,
- dpt = sdpt,
- code = scode,
- qty = sqty,
- price = sprice,
- amount = sprice * sqty,
- dscamt = 0
- )
- case _ => TxnItem(vs)
- }
-
- case class VchItems(txnitems: List[TxnItem] = Nil) {
-
- def noSales: Boolean = (txnitems.find(txn => txn.salestype == SALESTYPE.itm)).isEmpty
-
- def subTotal: (Int, Int, Int, Int) = txnitems.foldRight((0, 0, 0, 0)) { case (txn, b) =>
- if (txn.salestype == SALESTYPE.itm && txn.txntype == TXNTYPE.sales)
- b.copy(_1 = b._1 + 1, _2 = b._2 + txn.qty, _3 = b._3 + txn.amount, _4 = b._4 + txn.dscamt)
- else b
- }
-
- def groupTotal(level:Int): (Int, Int, Int, Int) = {
- val gts = txnitems.foldLeftWhile((0, 0, 0, 0, 0)) { case (b,txn) =>
- if (txn.salestype == SALESTYPE.itm && txn.txntype == TXNTYPE.sales)
- ((b._1._1 +1,b._1._2 + txn.qty, b._1._3 + txn.amount, b._1._4 + txn.dscamt, b._1._5),false)
- else {
- if (txn.salestype == SALESTYPE.sub) {
- if (((b._1._5) + 1) >= level)
- ((b._1._1, b._1._2, b._1._3, b._1._4, b._1._5 + 1), true)
- else
- ((b._1._1, b._1._2, b._1._3, b._1._4, b._1._5 + 1), false)
- } else b
- }
- }
- (gts._1,gts._2,gts._3,gts._4)
- }
-
- def updateDisc(dt: Int, grouped: Boolean, disc: Int): (List[TxnItem],(Int,Int,Int,Int)) = { // applies a disc% discount to item sales lines; returns (rewritten lines, (cnt, qty, amt, dsc) totals of the lines that were visited as discountable)
- // accumulator: (salestype, (cnt,qty,amt,dsc), hassub, list) - salestype stays -1 until the first item sales line has been handled; list is rebuilt in reverse order
- val accu = txnitems.foldLeft((-1, (0,0,0,0), false, List[TxnItem]())) { case (b, txn) => // addItem prepends, so the fold starts at the most recently added line
- var discAmt = 0 // scratch value, only used by the DISCTYPE.best branches
- if ((b._1) < 0) { // no item sales line handled so far
- if (txn.salestype == SALESTYPE.itm && txn.txntype == TXNTYPE.sales) {
- if (txn.dscamt == 0) // line carries no earlier discount: disc% is applied whatever dt says
- ((txn.salestype, (
- (b._2._1) + 1,
- (b._2._2) + txn.qty,
- (b._2._3) + txn.amount,
- (b._2._4) - (txn.amount * disc / 100)
- ), false, txn.copy(
- dscamt = - (txn.amount * disc / 100)) :: (b._4)))
- else {
- dt match { // NOTE(review): there is no case for DISCTYPE.least and no default, so that value raises MatchError here
- case DISCTYPE.duplicated => // new dscamt is disc% of the current net (amount + dscamt) and replaces the previous dscamt
- if (txn.dscamt != 0) { // NOTE(review): always true at this point, we are inside the else of dscamt == 0
- ((txn.salestype, (
- (b._2._1) + 1,
- (b._2._2) + txn.qty,
- (b._2._3) + txn.amount,
- (b._2._4) - (txn.amount + txn.dscamt) * disc / 100
- ), false, txn.copy(
- dscamt = -(txn.amount + txn.dscamt) * disc / 100) :: (b._4)
- ))
- } else { // NOTE(review): unreachable for the reason given above
- ((txn.salestype, (
- (b._2._1) + 1,
- (b._2._2) + txn.qty,
- (b._2._3) + txn.amount,
- (b._2._4) - txn.amount * disc / 100
- ), false, txn.copy(
- dscamt = -txn.amount * disc / 100) :: (b._4)
- ))
- }
- case DISCTYPE.keep => ((txn.salestype, ( // the line keeps its existing dscamt, which is added to the totals
- (b._2._1) + 1,
- (b._2._2) + txn.qty,
- (b._2._3) + txn.amount,
- (b._2._4) + txn.dscamt), false, txn :: (b._4)))
- case DISCTYPE.best => // keeps whichever of the new and the existing dscamt is numerically smaller (presumably dscamt is negative, so smaller means the bigger discount - confirm)
- discAmt = -(txn.amount * disc / 100)
- if (discAmt < txn.dscamt)
- ((txn.salestype, (
- (b._2._1) + 1,
- (b._2._2) + txn.qty,
- (b._2._3) + txn.amount,
- (b._2._4) + discAmt), false, txn.copy(
- dscamt = discAmt
- ) :: (b._4)))
- else
- ((txn.salestype, (
- (b._2._1) + 1,
- (b._2._2) + txn.qty,
- (b._2._3) + txn.amount,
- (b._2._4) + txn.dscamt), false, txn :: (b._4)))
- }
- }
-
- } else ((b._1,b._2,b._3,txn :: (b._4))) // not an item sales line: copied through unchanged
- } else { // at least one item sales line has already been handled
- if ((b._3)) // a subtotal line was met in grouped mode: every remaining line is copied through untouched
- (((b._1), (b._2), true, txn :: (b._4)))
- else {
- if (txn.salestype == SALESTYPE.sub) {
- if (grouped) // grouped mode: this subtotal line ends the discounted group
- (((b._1), (b._2), true, txn :: (b._4)))
- else
- (((b._1), (b._2), false, txn :: (b._4)))
- } else {
- if (txn.salestype == SALESTYPE.itm && txn.txntype == TXNTYPE.sales) {
- dt match { // NOTE(review): unlike the first-line branch, lines without an earlier discount also go through dt here (keep leaves them undiscounted); DISCTYPE.least again has no case
- case DISCTYPE.duplicated =>
- if (txn.dscamt != 0) {
- ((txn.salestype, (
- (b._2._1) + 1,
- (b._2._2) + txn.qty,
- (b._2._3) + txn.amount,
- (b._2._4) - (txn.amount + txn.dscamt) * disc / 100), false, txn.copy(
- dscamt = -(txn.amount + txn.dscamt) * disc / 100) :: (b._4)
- ))
- } else {
- ((txn.salestype, (
- (b._2._1) + 1,
- (b._2._2) + txn.qty,
- (b._2._3) + txn.amount,
- (b._2._4) - txn.amount * disc / 100), false, txn.copy(
- dscamt = -(txn.amount * disc / 100)) :: (b._4)
- ))
- }
- case DISCTYPE.keep => ((txn.salestype, (
- (b._2._1) + 1,
- (b._2._2) + txn.qty,
- (b._2._3) + txn.amount,
- (b._2._4) + txn.dscamt), false, txn :: (b._4)))
- case DISCTYPE.best =>
- discAmt = -(txn.amount * disc / 100)
- if (discAmt < txn.dscamt)
- ((txn.salestype, (
- (b._2._1) + 1,
- (b._2._2) + txn.qty,
- (b._2._3) + txn.amount,
- (b._2._4) + discAmt), false, txn.copy(
- dscamt = discAmt
- ) :: (b._4)))
- else
- ((txn.salestype, (
- (b._2._1) + 1,
- (b._2._2) + txn.qty,
- (b._2._3) + txn.amount,
- (b._2._4) + txn.dscamt), false, txn :: (b._4)))
- }
- }
- else ((b._1, b._2, b._3, txn :: (b._4)))
- }
- }
- }
-
- }
- (accu._4.reverse,accu._2) // reverse restores the original line order; second element is the (cnt, qty, amt, dsc) totals
- }
-
- // Net sales total: (amount + dscamt) summed over every line whose salestype is SALESTYPE.itm.
- def totalSales: Int =
-   txnitems.iterator
-     .filter(_.salestype == SALESTYPE.itm)
-     .map(line => line.amount + line.dscamt)
-     .sum
-
- // Amount tendered so far: sum of `amount` over sales-type lines recorded as SALESTYPE.ttl.
- def totalPaid: Int =
-   txnitems.iterator
-     .collect { case line if line.txntype == TXNTYPE.sales && line.salestype == SALESTYPE.ttl => line.amount }
-     .sum
-
- // Returns a new VchItems with `item` prepended, so the newest line is always the list head.
- def addItem(item: TxnItem): VchItems = {
-   val withNewLine = item :: txnitems
-   VchItems(withNewLine)
- }
-
- }
-
- /** Returns the last whole second (23:59:59) of the given calendar day.
-   *
-   * Built directly with `atTime` instead of formatting the date to text and
-   * parsing it back, so the result no longer depends on a pattern round-trip.
-   *
-   * @param ldate the calendar day
-   * @return `ldate` at 23:59:59
-   */
- def LastSecOfDate(ldate: LocalDate): LocalDateTime = ldate.atTime(23, 59, 59)
-
- // Renders a date as "yyyy-MM-dd".
- def dateStr(dt: LocalDate): String = {
-   val isoDay = DateTimeFormatter.ofPattern("yyyy-MM-dd")
-   isoDay.format(dt)
- }
-
- // Logging wrapper around updateStateImpl: computes the new (state, items) pair,
- // writes one step-level log line describing the transition, and returns the pair.
- def updateState(evt: Action, lastSeqNr: BigInt = 0)(implicit nodeAddress: NodeAddress, persistenceId: PID, state: VchStates, items: VchItems, curItem: TxnItem): (VchStates, VchItems) = {
-   val outcome = updateStateImpl(evt, lastSeqNr)
-   log.step(s"${nodeAddress.address}-${persistenceId.id} run updateState($evt, $lastSeqNr) with results state[${outcome._1}], txns[${outcome._2}].")
-   outcome
- }
-
- // Pure transition function: maps one event onto the next (voucher state, voucher lines) pair.
- // Events that are not listed leave both values untouched.
- def updateStateImpl(evt: Action, lastSeqNr: BigInt = 0)(implicit state: VchStates, items: VchItems, curItem: TxnItem): (VchStates, VchItems) = {
-   // every handled event except EndVoucher advances the in-voucher sequence by one
-   def stepped: VchStates = state.copy(seq = state.seq + 1)
-
-   evt match {
-     case LogOned(csr)     => (stepped.copy(opr = csr, jseq = lastSeqNr), items)
-     case LogOffed         => (stepped.copy(opr = ""), items)
-     case RefundOned       => (stepped.copy(refd = true), items)
-     case RefundOffed      => (stepped.copy(refd = false), items)
-     case VoidOned         => (stepped.copy(void = true), items)
-     case VoidOffed        => (stepped.copy(void = false), items)
-     case SuperOned(suser) => (stepped.copy(su = suser), items)
-     case SuperOffed       => (stepped.copy(su = ""), items)
-     case MemberOned(num)  => (stepped.copy(mbr = num), items)
-     case MemberOffed      => (stepped.copy(mbr = ""), items)
-
-     case SalesLogged(_, _, _, _, _) =>
-       (stepped, items.addItem(curItem))
-
-     case Subtotaled(level) =>
-       // level 0 totals the whole voucher, any other level totals a group
-       val (cnt, tqty, tamt, tdsc) =
-         if (level == 0) items.subTotal else items.groupTotal(level)
-       val subttlItem = TxnItem(state).copy(
-         txntype = TXNTYPE.sales,
-         salestype = SALESTYPE.sub,
-         qty = tqty,
-         amount = tamt,
-         dscamt = tdsc,
-         price = cnt
-       )
-       (stepped, items.addItem(subttlItem))
-
-     case Discounted(dt, gp, code, pct) =>
-       val (lstItems, accum) = items.updateDisc(dt, gp, pct)
-       val discItem = TxnItem(state).copy(
-         txntype = TXNTYPE.sales,
-         salestype = SALESTYPE.dsc,
-         acct = code,
-         disc = pct,
-         price = accum._1,
-         qty = accum._2,
-         amount = accum._3,
-         dscamt = accum._4
-       )
-       (stepped, items.copy(txnitems = lstItems).addItem(discItem))
-
-     case PaymentMade(_, _, _) =>
-       val sales = items.totalSales
-       val paid = items.totalPaid
-       val positiveSale = sales > 0
-       val due = if (positiveSale) sales - paid else sales + paid
-       val bal = if (positiveSale) due - curItem.amount else due + curItem.amount
-       // still due while tendered + already paid stays below the sales total (absolute values)
-       val stillDue = (curItem.amount.abs + paid.abs) < sales.abs
-       val paymentLine = curItem.copy(
-         salestype = SALESTYPE.ttl,
-         price = due,
-         amount = curItem.amount,
-         dscamt = bal
-       )
-       (stepped.copy(due = stillDue), items.addItem(paymentLine))
-
-     case VoucherNumed(_, tnum) =>
-       val renumbered = items.txnitems.map(_.copy(num = tnum))
-       (stepped.copy(num = tnum), items.copy(txnitems = renumbered))
-
-     case SuspVoucher => (stepped.copy(susp = true), items)
-
-     case VoidVoucher => (stepped.copy(canc = true), items)
-
-     case EndVoucher(_) => (state.nextVoucher.copy(jseq = lastSeqNr + 1), VchItems())
-
-     case _ => (state, items)
-   }
- }
-
-
- }
Messages.scala
- package datatech.cloud.pos
-
- import java.time.LocalDate
- import java.time.LocalDateTime
- import java.time.format.DateTimeFormatter
- import java.util.Locale
- import akka.cluster.sharding._
-
- object Messages {
-
- val dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss", Locale.CHINA)
-
- sealed trait Command {}
-
- case class LogOn(opr: String, passwd: String) extends Command
- case object LogOff extends Command
- case class SuperOn(su: String, passwd: String) extends Command
- case object SuperOff extends Command
- case class MemberOn(cardnum: String, passwd: String) extends Command
- case object MemberOff extends Command //remove member status for the voucher
- case object RefundOn extends Command
- case object RefundOff extends Command
- case object VoidOn extends Command
- case object VoidOff extends Command
- case object VoidAll extends Command
- case object Suspend extends Command
-
- case class VoucherNum(vnum: Int) extends Command
-
-
-
- case class LogSales(acct: String, dpt: String, code: String, qty: Int, price: Int) extends Command
- case class Subtotal(level: Int) extends Command
- case class Discount(disctype: Int, grouped: Boolean, code: String, percent: Int) extends Command
-
- case class Payment(acct: String, num: String, amount: Int) extends Command //settlement payment
-
- // read only command, no update event
- case class Plu(itemCode: String) extends Command //read only
- case object GetTxnItems extends Command
-
-
- /* discount type */
- object DISCTYPE {
- val duplicated: Int = 0
- val best: Int = 1
- val least: Int = 2
- val keep: Int = 3
- }
-
- /* transaction type codes stored in TxnItem.txntype */
- object TXNTYPE {
- val sales: Int = 0
- val refund: Int = 1
- val void: Int = 2
- val voided: Int = 3
- val voidall: Int = 4
- val subtotal: Int = 5
- val logon: Int = 6
- val supon: Int = 7 // super user on/off
- val suspend: Int = 8
-
- }
-
- object SALESTYPE {
- val itm: Int = 2
- val sub: Int = 10
- val ttl: Int = 11
- val dsc: Int = 12
- val crd: Int = 13
- }
-
- case class TxnItem(
- txndate: String = LocalDate.now.format(DateTimeFormatter.ofPattern("yyyyMMdd"))
- ,txntime: String = LocalDateTime.now.format(dateTimeFormatter).substring(11)
- ,opr: String = ""//operator (staff) id
- ,num: Int = 0 //voucher (sales slip) number
- ,seq: Int = 1 //transaction sequence number
- ,txntype: Int = TXNTYPE.sales//transaction type
- ,salestype: Int = SALESTYPE.itm //sales type
- ,qty: Int = 1 //transaction quantity
- ,price: Int = 0 //unit price (in fen, i.e. cents)
- ,amount: Int = 0 //list-price amount (in fen, i.e. cents)
- ,disc: Int = 0 //discount rate (%)
- ,dscamt: Int = 0 //discount amount: a negative value; net amount = amount + dscamt
- ,member: String = "" //member card number
- ,code: String = "" //code (item, card number...)
- ,acct: String = "" //account
- ,dpt: String = "" //department / category
- )
- object TxnItem {
- def apply(vs: VchStates): TxnItem = TxnItem().copy(
- opr = vs.opr,
- num = vs.num,
- seq = vs.seq,
- member = vs.mbr
- )
- }
-
- case class VchStatus( //operation-status locks are left for the front end to maintain
- qty: Int = 1,
- refund: Boolean = false,
- void: Boolean = false)
-
- case class VchStates(
- opr: String = "", //cashier
- jseq: BigInt = 0, //begin journal sequence for read-side replay
- num: Int = 0, //current voucher number
- seq: Int = 0, //current sequence number
- void: Boolean = false, //void mode
- refd: Boolean = false, //refund mode
- susp: Boolean = false, //voucher suspended (on hold)
- canc: Boolean = false, //voucher cancelled
- due: Boolean = true, //original comment reads "current balance"; the field is a Boolean flag, presumably "balance still outstanding" - confirm
- su: String = "",
- mbr: String = ""
- ) {
-
- def nextVoucher : VchStates = VchStates().copy(
- opr = this.opr,
- jseq = this.jseq + 1,
- num = this.num + 1
- )
- }
-
-
- object STATUS {
- val OK: Int = 0
- val FAIL: Int = -1
- }
-
- case class POSResponse (sts: Int, msg: String, voucher: VchStates, txnItems: List[TxnItem])
-
- /* message on the wire (exchanged message) */
- val shardName = "POSShard"
-
- case class POSMessage(id: Long, cmd: Command) {
- def shopId = id.toString.head.toString // first character of the decimal id (would be "-" for a negative id)
- def posId = id.toString
- }
-
- val getPOSId: ShardRegion.ExtractEntityId = {
- case posCommand: POSMessage => (posCommand.posId,posCommand.cmd)
- }
- val getShopId: ShardRegion.ExtractShardId = {
- case posCommand: POSMessage => posCommand.shopId
- }
-
-
- case object PassivatePOS //passivate message
- case object FilteredOut
- case class DebugMode(debug: Boolean)
- case class NodeAddress(address: String)
- case class PID(id: String)
- case class PerformRead(pid: String, vchnum: Int, bseq: Long, eseq: Long)
- }
cql/CassandraEngine.scala
- package sdp.cql.engine
-
- import akka.NotUsed
- import akka.stream.alpakka.cassandra.scaladsl._
- import akka.stream.scaladsl._
- import com.datastax.driver.core._
- import com.google.common.util.concurrent.{FutureCallback, Futures, ListenableFuture}
- import protobuf.bytes.Converter._
- import sdp.logging.LogSupport
- import sdp.result.DBOResult._
-
- import scala.collection.JavaConverters._
- import scala.collection.generic.CanBuildFrom
- import scala.concurrent._
-
- /** Integer consistency-level codes plus their mapping onto the driver's ConsistencyLevel enum. */
- object CQLContext {
-   // Consistency Levels
-   type CONSISTENCY_LEVEL = Int
-   val ANY: CONSISTENCY_LEVEL = 0x0000
-   val ONE: CONSISTENCY_LEVEL = 0x0001
-   val TWO: CONSISTENCY_LEVEL = 0x0002
-   val THREE: CONSISTENCY_LEVEL = 0x0003
-   val QUORUM : CONSISTENCY_LEVEL = 0x0004
-   val ALL: CONSISTENCY_LEVEL = 0x0005
-   val LOCAL_QUORUM: CONSISTENCY_LEVEL = 0x0006
-   val EACH_QUORUM: CONSISTENCY_LEVEL = 0x0007
-   val LOCAL_ONE: CONSISTENCY_LEVEL = 0x000A
-   val LOCAL_SERIAL: CONSISTENCY_LEVEL = 0x000B
-   val SERIAL: CONSISTENCY_LEVEL = 0x000C
-
-   /** Starts an empty update context. */
-   def apply(): CQLUpdateContext = CQLUpdateContext(statements = Nil)
-
-   /** Translates one of the integer codes above into the driver enum.
-     *
-     * LOCAL_QUORUM used to be declared but absent from this mapping, so passing it
-     * raised MatchError. Codes that are not declared above still raise MatchError.
-     */
-   def consistencyLevel: CONSISTENCY_LEVEL => ConsistencyLevel = {
-     case ANY => ConsistencyLevel.ANY
-     case ONE => ConsistencyLevel.ONE
-     case TWO => ConsistencyLevel.TWO
-     case THREE => ConsistencyLevel.THREE
-     case QUORUM => ConsistencyLevel.QUORUM
-     case ALL => ConsistencyLevel.ALL
-     case LOCAL_QUORUM => ConsistencyLevel.LOCAL_QUORUM
-     case EACH_QUORUM => ConsistencyLevel.EACH_QUORUM
-     case LOCAL_ONE => ConsistencyLevel.LOCAL_ONE
-     case LOCAL_SERIAL => ConsistencyLevel.LOCAL_SERIAL
-     case SERIAL => ConsistencyLevel.SERIAL
-   }
-
- }
- // Immutable description of one CQL query: statement text, positional parameters,
- // optional consistency code and fetch size. The setters return modified copies.
- case class CQLQueryContext(
-     statement: String,
-     parameters: Seq[Object] = Nil,
-     consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None,
-     fetchSize: Int = 100
- ) {
-   def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLQueryContext =
-     copy(consistency = Some(_consistency))
-
-   def setFetchSize(pageSize: Int): CQLQueryContext =
-     copy(fetchSize = pageSize)
-
-   def setParameters(param: Seq[Object]): CQLQueryContext =
-     copy(parameters = param)
-
-   // protobuf form; an empty parameter list is sent as None
-   def toProto = {
-     val packedParams = this.parameters match {
-       case Nil => None
-       case ps  => Some(sdp.grpc.services.ProtoAny(marshal(ps)))
-     }
-     new sdp.grpc.services.ProtoCQLQuery(
-       statement = this.statement,
-       parameters = packedParams,
-       consistency = this.consistency,
-       fetchSize = this.fetchSize
-     )
-   }
- }
- object CQLQueryContext {
-   // shorthand constructor taking just the statement and its parameters
-   def apply[M](stmt: String, param: Seq[Object]): CQLQueryContext =
-     new CQLQueryContext(statement = stmt, parameters = param)
-
-   // rebuilds a context from its protobuf form; missing or empty parameter bytes become Nil
-   def fromProto(proto: sdp.grpc.services.ProtoCQLQuery) = {
-     val restored: Seq[Object] = proto.parameters match {
-       case Some(so) if !(so.value == _root_.com.google.protobuf.ByteString.EMPTY) =>
-         unmarshal[Seq[Object]](so.value)
-       case _ => Nil
-     }
-     new CQLQueryContext(
-       statement = proto.statement,
-       parameters = restored,
-       consistency = proto.consistency,
-       fetchSize = proto.fetchSize
-     )
-   }
- }
-
- // Immutable description of one or more CQL update statements with their parameter sets.
- case class CQLUpdateContext(
-     statements: Seq[String],
-     parameters: Seq[Seq[Object]] = Nil,
-     psize: Int = 0,
-     consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None,
-     batch: Boolean = false
- ) extends LogSupport { ctx =>
-   def setBatch(bat: Boolean) = ctx.copy(batch = bat)
-
-   def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLUpdateContext =
-     ctx.copy(consistency = Some(_consistency))
-
-   // Replaces the statement list with a single statement.
-   // _psize == 0: no parameters; 1: the varargs are one parameter set;
-   // > 1: the varargs are taken to be several parameter sets.
-   def setCommand(_statement: String, _psize: Int, _parameters: Object*): CQLUpdateContext = {
-     log.info(s"setCommand> setting: statement: ${_statement}, parameters: ${_parameters}")
-     val paramSets: Seq[Seq[Object]] =
-       if (_psize <= 0) Seq[Seq[Object]]()
-       else if (_psize == 1) Seq(_parameters.asInstanceOf[Seq[Object]])
-       else _parameters.asInstanceOf[Seq[Seq[Object]]]
-     val updated = ctx.copy(statements = Seq(_statement), psize = _psize, parameters = paramSets)
-     log.info(s"setCommand> set: statements: ${updated.statements}, parameters: ${updated.parameters}")
-     updated
-   }
-
-   // Appends one more statement together with its parameter set.
-   def appendCommand(_statement: String, _parameters: Object*): CQLUpdateContext = {
-     log.info(s"appendCommand> appending: statement: ${_statement}, parameters: ${_parameters}")
-     val updated = ctx.copy(
-       statements = ctx.statements :+ _statement,
-       parameters = ctx.parameters :+ _parameters)
-     log.info(s"appendCommand> appended: statements: ${updated.statements}, parameters: ${updated.parameters}")
-     updated
-   }
-
-   // protobuf form; an empty parameter list is sent as None
-   def toProto = {
-     val packedParams = this.parameters match {
-       case Nil => None
-       case ps  => Some(sdp.grpc.services.ProtoAny(marshal(ps)))
-     }
-     new sdp.grpc.services.ProtoCQLUpdate(
-       statements = this.statements,
-       parameters = packedParams,
-       consistency = this.consistency,
-       batch = Some(this.batch)
-     )
-   }
- }
-
- object CQLUpdateContext {
-   // rebuilds a context from its protobuf form; missing or empty parameter bytes become Nil,
-   // a missing batch flag becomes false
-   def fromProto(proto: sdp.grpc.services.ProtoCQLUpdate) = {
-     val restored: Seq[Seq[Object]] = proto.parameters match {
-       case Some(so) if !(so.value == _root_.com.google.protobuf.ByteString.EMPTY) =>
-         unmarshal[Seq[Seq[Object]]](so.value)
-       case _ => Nil
-     }
-     new CQLUpdateContext(
-       statements = proto.statements,
-       parameters = restored,
-       consistency = proto.consistency,
-       batch = proto.batch.getOrElse(false)
-     )
-   }
- }
-
- object CQLEngine extends LogSupport {
- import CQLContext._
- import CQLHelpers._
-
- import scala.concurrent.Await
- import scala.concurrent.duration._
-
- /** Runs a query synchronously and collects every extracted row into a `C[A]`.
-   *
-   * A failure while converting or binding the parameters is now returned as the
-   * method's error result. Previously that `Left` was computed and thrown away,
-   * and the statement was then executed without its parameters.
-   *
-   * @param ctx       statement, parameters and optional consistency level
-   * @param pageSize  driver fetch size used for this call
-   * @param extractor converts one driver Row into an A
-   * @return the collected rows, or an error result wrapping a RuntimeException
-   */
- def fetchResult[C[_] <: TraversableOnce[_],A](ctx: CQLQueryContext, pageSize: Int = 100
-     ,extractor: Row => A)(
-     implicit session: Session, cbf: CanBuildFrom[Nothing, A, C[A]]): DBOResult[C[A]]= {
-
-   val prepStmt = session.prepare(ctx.statement)
-
-   // Left carries the bind error message, Right the bound statement with its cooked parameters
-   val bound: Either[String, (BoundStatement, Seq[Object])] =
-     try {
-       if (ctx.parameters != Nil) {
-         val params = processParameters(ctx.parameters)
-         Right((prepStmt.bind(params: _*), params))
-       } else
-         Right((prepStmt.bind(), Nil))
-     }
-     catch {
-       case e: Exception =>
-         log.error(s"fetchResult> prepStmt.bind error: ${e.getMessage}")
-         Left(s"${e.getMessage}")
-     }
-
-   bound match {
-     case Left(msg) =>
-       Left(new RuntimeException(s"fetchResult> prepStmt.bind Error: ${msg}"))
-
-     case Right((boundStmt, params)) =>
-       log.info(s"fetchResult> statement: ${prepStmt.getQueryString}, parameters: ${params}")
-       try {
-         ctx.consistency.foreach { consistency =>
-           boundStmt.setConsistencyLevel(consistencyLevel(consistency))
-         }
-
-         val resultSet = session.execute(boundStmt.setFetchSize(pageSize))
-         val rows = resultSet.asScala.view.map(extractor).to[C]
-         valueToDBOResult(rows)
-       }
-       catch {
-         case e: Exception =>
-           log.error(s"fetchResult> runtime error: ${e.getMessage}")
-           Left(new RuntimeException(s"fetchResult> Error: ${e.getMessage}"))
-       }
-   }
- }
-
- // Executes the query and returns the live ResultSet together with the extracted rows.
- // The ResultSet can be handed to fetchMorePages. Driver exceptions propagate to the caller.
- def fetchResultPage[C[_] <: TraversableOnce[_],A](ctx: CQLQueryContext, pageSize: Int = 100
-     ,extractor: Row => A)(
-     implicit session: Session, cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet, C[A])= {
-
-   val prepStmt = session.prepare(ctx.statement)
-   val hasParams = ctx.parameters != Nil
-   val params: Seq[Object] = if (hasParams) processParameters(ctx.parameters) else Nil
-   val boundStmt = if (hasParams) prepStmt.bind(params: _*) else prepStmt.bind()
-   log.info(s"fetchResultPage> statement: ${prepStmt.getQueryString}, parameters: ${params}")
-
-   ctx.consistency.foreach { level =>
-     boundStmt.setConsistencyLevel(consistencyLevel(level))
-   }
-
-   val resultSet = session.execute(boundStmt.setFetchSize(pageSize))
-   val extracted = resultSet.asScala.view.map(extractor).to[C]
-   (resultSet, extracted)
- }
-
- /** Fetches further results for a ResultSet that is not fully fetched yet.
-   *
-   * Returns `(resultSet, None)` when nothing is left or when the fetch fails or times out.
-   * The failure path stays best-effort because the return type cannot carry an error,
-   * but it is now logged, and only non-fatal errors are absorbed: fatal JVM errors and
-   * thread interruption propagate instead of being swallowed by `catch Throwable`.
-   *
-   * @param resultSet result set returned by fetchResultPage or an earlier call
-   * @param timeOut   maximum time to wait for the driver's asynchronous fetch
-   * @param extractor converts one driver Row into an A
-   */
- def fetchMorePages[C[_] <: TraversableOnce[_],A](resultSet: ResultSet, timeOut: Duration)(
-     extractor: Row => A)(implicit ec: ExecutionContext, cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet,Option[C[A]]) =
-   if (resultSet.isFullyFetched) {
-     (resultSet, None)
-   } else {
-     try {
-       val result = Await.result((resultSet.fetchMoreResults()).asScala, timeOut)
-       (result, Some((result.asScala.view.map(extractor)).to[C]))
-     } catch {
-       case scala.util.control.NonFatal(e) =>
-         log.error(s"fetchMorePages> runtime error: ${e.getMessage}")
-         (resultSet, None)
-     }
-   }
-
- def cqlExecute(ctx: CQLUpdateContext)( // entry point for updates: dispatches to a single command, a sequential multi-command run, or cqlBatchUpdate
- implicit session: Session, ec: ExecutionContext): DBOResult[Boolean] = {
- var ctxparameters = Seq[Seq[Object]]()
- if (ctx.parameters != Nil)
- if (ctx.parameters.head != Nil) {
- ctxparameters = ctx.parameters.asInstanceOf[Seq[Seq[Seq[Object]]]].head // NOTE(review): only the first element of ctx.parameters is used, cast as a list of parameter sets - confirm this matches what setCommand/appendCommand store
- }
-
- var invalidBat = false
- if ( ctx.batch ) { // a batch request needs at least two parameter sets, otherwise it is downgraded below
- if (ctxparameters == Nil)
- invalidBat = true
- else if (ctxparameters.size < 2)
- invalidBat = true;
- }
- if (!ctx.batch || invalidBat) {
- if(invalidBat)
- log.warn(s"cqlExecute> batch update must have at least 2 sets of parameters! change to single-command.")
-
- if (ctx.statements.size == 1 && ctx.psize <= 1) { // exactly one statement with at most one parameter set
- var param: Seq[Seq[Object]] = Nil
- if (ctxparameters != Nil)
- param = ctxparameters
- log.info(s"cqlExecute> single-command: statement: ${ctx.statements.head} parameters: ${param}")
- cqlSingleUpdate(ctx.consistency, ctx.statements.head, param)
- }
- else { // several statements and/or several parameter sets: pair them up and run them one after another
- var params: Seq[Seq[Object]] = ctxparameters
- var ctxstatements = ctx.statements
- if (ctxparameters.size < ctx.statements.size) {
- log.warn(s"cqlExecute> fewer parameters than statements! pad with 'Nil'.")
- val pnils = Seq.fill(ctx.statements.length - ctxparameters.size)(Nil)
- params = ctxparameters ++ pnils
- }
- else {
- if (ctx.statements.size < ctxparameters.size) {
- log.warn(s"cqlExecute> fewer statements than parameters! pad with 'head'.")
- val heads = Seq.fill(ctxparameters.size - ctx.statements.size)(ctx.statements.head) // the first statement is repeated for the surplus parameter sets
- ctxstatements = ctx.statements ++ heads
- }
- }
-
- val commands: Seq[(String, Seq[Object])] = ctxstatements zip params
- log.info(s"cqlExecute> multi-commands: ${commands}")
- /*
- //using sequence to flip List[Future[Boolean]] => Future[List[Boolean]]
- //therefore, make sure no command replies on prev command effect
- val lstCmds: List[Future[Boolean]] = commands.map { case (stmt,param) =>
- cqlSingleUpdate(ctx.consistency, stmt, param)
- }.toList
-
- val futList = Future.sequence(lstCmds).map(_ => true) //must map to execute
- */
- /*
- //using traverse to have some degree of parallelism = max(runtimes)
- //therefore, make sure no command replies on prev command effect
- val futList = Future.traverse(commands) { case (stmt,param) =>
- cqlSingleUpdate(ctx.consistency, stmt, param)
- }.map(_ => true)
-
- Await.result(futList, 3 seconds)
- Future.successful(true)
- */
- // run sync directly: commands execute in order, the first exception aborts the rest
- try {
- commands.foreach { case (stm, pars) =>
- cqlExecuteSync(ctx.consistency, stm, pars) // the per-command wasApplied result is ignored
- }
- Right(true)
- }
- catch {
- case e: Exception =>
- log.error(s"cqlExecute> runtime error: ${e.getMessage}")
- Left(new RuntimeException(s"cqlExecute> Error: ${e.getMessage}"))
- }
- }
- }
- else
- cqlBatchUpdate(ctx) // valid batch request
- }
- /** Executes one update statement synchronously, bound to the first parameter set (if any).
-   *
-   * A failure while converting or binding the parameters is now returned as the
-   * method's error result. Previously that `Left` was computed and thrown away,
-   * and the statement was then executed without its parameters.
-   *
-   * @param cons   optional consistency-level code
-   * @param stmt   CQL statement text
-   * @param params parameter sets; only the first one is bound
-   * @return the driver's wasApplied flag, or an error result wrapping a RuntimeException
-   */
- def cqlSingleUpdate(cons: Option[CQLContext.CONSISTENCY_LEVEL],stmt: String, params: Seq[Seq[Object]])(
-     implicit session: Session, ec: ExecutionContext): DBOResult[Boolean] = {
-
-   val prepStmt = session.prepare(stmt)
-
-   // Left carries the bind error message, Right the bound statement with the cooked parameter sets
-   val bound: Either[String, (BoundStatement, Seq[Seq[Object]])] =
-     try {
-       if (params != Nil) {
-         val pars = params.map(processParameters(_))
-         Right((prepStmt.bind(pars.head: _*), pars))
-       } else
-         Right((prepStmt.bind(), Nil))
-     }
-     catch {
-       case e: Exception =>
-         log.error(s"cqlSingleUpdate> prepStmt.bind error: ${e.getMessage}")
-         Left(s"${e.getMessage}")
-     }
-
-   bound match {
-     case Left(msg) =>
-       Left(new RuntimeException(s"cqlSingleUpdate> prepStmt.bind Error: ${msg}"))
-
-     case Right((boundStmt, pars)) =>
-       log.info(s"cqlSingleUpdate> statement: ${prepStmt.getQueryString}, parameters: ${pars}")
-       try {
-         cons.foreach { consistency =>
-           boundStmt.setConsistencyLevel(consistencyLevel(consistency))
-         }
-         val res = session.execute(boundStmt) //executeAsync(boundStmt).map(_.wasApplied())
-         Right(res.wasApplied())
-       }
-       catch {
-         // NOTE(review): these two messages carry the "cqlExecute>" tag although they come from cqlSingleUpdate
-         case e: Exception =>
-           log.error(s"cqlExecute> runtime error: ${e.getMessage}")
-           Left(new RuntimeException(s"cqlExecute> Error: ${e.getMessage}"))
-       }
-   }
- }
-
- // Prepares, binds and executes one statement synchronously and returns the driver's
- // wasApplied flag. Exceptions are not caught here; they propagate to the caller.
- def cqlExecuteSync(cons: Option[CQLContext.CONSISTENCY_LEVEL],stmt: String, params: Seq[Object])(
-     implicit session: Session, ec: ExecutionContext): Boolean = {
-
-   val prepStmt = session.prepare(stmt)
-   val hasParams = params != Nil
-   val pars: Seq[Object] = if (hasParams) processParameters(params) else Nil
-   val boundStmt = if (hasParams) prepStmt.bind(pars: _*) else prepStmt.bind()
-   log.info(s"cqlExecuteSync> statement: ${prepStmt.getQueryString}, parameters: ${pars}")
-
-   cons.foreach { level =>
-     boundStmt.setConsistencyLevel(consistencyLevel(level))
-   }
-   val outcome = session.execute(boundStmt)
-   outcome.wasApplied()
- }
-
- /** Executes all statements of the context as one driver BatchStatement.
-   *
-   * Statements and parameter sets are paired positionally. Missing parameter sets are
-   * padded with Nil, and surplus parameter sets reuse the first statement.
-   *
-   * Fix: the "has parameters" test was inverted (`cmd._2 == Nil`), so statements that
-   * had parameters were added to the batch unbound, and the parameter path only ran
-   * for empty parameter lists. Parameters are now bound whenever they are present.
-   *
-   * @return the driver's wasApplied flag, or an error result wrapping a RuntimeException
-   */
- def cqlBatchUpdate(ctx: CQLUpdateContext)(
-     implicit session: Session, ec: ExecutionContext): DBOResult[Boolean] = {
-   // only the first element of ctx.parameters is used, read as a list of parameter sets
-   val ctxparameters: Seq[Seq[Object]] =
-     if (ctx.parameters != Nil && ctx.parameters.head != Nil)
-       ctx.parameters.asInstanceOf[Seq[Seq[Seq[Object]]]].head
-     else
-       Seq[Seq[Object]]()
-
-   val (ctxstatements, params): (Seq[String], Seq[Seq[Object]]) =
-     if (ctxparameters.size < ctx.statements.size) {
-       log.warn(s"cqlBatchUpdate> fewer parameters than statements! pad with 'Nil'.")
-       val pnils = Seq.fill(ctx.statements.length - ctxparameters.size)(Nil)
-       (ctx.statements, ctxparameters ++ pnils)
-     }
-     else if (ctx.statements.size < ctxparameters.size) {
-       log.warn(s"cqlBatchUpdate> fewer statements than parameters! pad with 'head'.")
-       val heads = Seq.fill(ctxparameters.size - ctx.statements.size)(ctx.statements.head)
-       (ctx.statements ++ heads, ctxparameters)
-     }
-     else
-       (ctx.statements, ctxparameters)
-
-   val commands: Seq[(String, Seq[Object])] = ctxstatements zip params
-   log.info(s"cqlBatchUpdate> batch-commands: ${commands}")
-
-   val batch = new BatchStatement()
-   try {
-     commands.foreach { cmd =>
-       val prepStmt = session.prepare(cmd._1)
-       log.info(s"cqlBatchUpdate> batch with statement: ${cmd._1}, raw parameter: ${cmd._2}")
-       if (cmd._2 != Nil) {
-         val pars = processParameters(cmd._2)
-         log.info(s"cqlBatchUpdate> batch with cooked parameters: ${pars}")
-         batch.add(prepStmt.bind(pars: _*))
-       } else {
-         log.info(s"cqlBatchUpdate> batch with no parameter")
-         batch.add(prepStmt.bind())
-       }
-     }
-     ctx.consistency.foreach { consistency =>
-       batch.setConsistencyLevel(consistencyLevel(consistency))
-     }
-     val res = session.execute(batch) //session.executeAsync(batch).map(_.wasApplied())
-     Right(res.wasApplied())
-   }
-   catch {
-     case e: Exception =>
-       log.error(s"cqlBatchUpdate> runtime error: ${e.getMessage}")
-       Left(new RuntimeException(s"cqlBatchUpdate> Error: ${e.getMessage}"))
-   }
-
- }
-
- // Builds an akka-stream Source that emits every row of the query, mapped through `extractor`.
- // The fetch size is taken from the context.
- def cassandraStream[A](ctx: CQLQueryContext,extractor: Row => A)
-     (implicit session: Session, ec: ExecutionContextExecutor): Source[A,NotUsed] = {
-
-   val prepStmt = session.prepare(ctx.statement)
-   val params = processParameters(ctx.parameters)
-   val boundStmt = prepStmt.bind(params: _*)
-   ctx.consistency.foreach { level =>
-     boundStmt.setConsistencyLevel(consistencyLevel(level))
-   }
-
-   log.info(s"cassandraStream> statement: ${prepStmt.getQueryString}, parameters: ${params}")
-   val pagedStmt = boundStmt.setFetchSize(ctx.fetchSize)
-   CassandraSource(pagedStmt).map(extractor)
- }
-
- // Describes a per-element Cassandra write used as an akka-stream Flow stage:
- // each element R is turned into statement parameters and written, then passed downstream.
- case class CassandraActionStream[R](parallelism: Int = 1, processInOrder: Boolean = true,
-     statement: String, prepareParams: R => Seq[Object],
-     consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None) { cas =>
-
-   def setParallelism(parLevel: Int): CassandraActionStream[R] = cas.copy(parallelism = parLevel)
-
-   def setProcessOrder(ordered: Boolean): CassandraActionStream[R] = cas.copy(processInOrder = ordered)
-
-   def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CassandraActionStream[R] =
-     cas.copy(consistency = Some(_consistency))
-
-   // Prepares and executes the statement for one element with a blocking session.execute,
-   // then returns the element itself wrapped in an already-completed future.
-   def perform(r: R)(implicit session: Session, ec: ExecutionContext) = {
-     import monix.eval.Task
-     import monix.execution.Scheduler.Implicits.global
-     val prepStmt = session.prepare(statement)
-     val params = processParameters(prepareParams(r))
-     val boundStmt = prepStmt.bind(params: _*)
-     consistency.foreach { level =>
-       boundStmt.setConsistencyLevel(CQLContext.consistencyLevel(level))
-     }
-     log.info(s"CassandraActionStream.perform> statement: ${prepStmt.getQueryString}, parameters: ${params}")
-     session.execute(boundStmt)
-     Task.now {r}.runToFuture
-     //session.executeAsync(boundStmt).map(_ => r)
-   }
-
-   // Flow stage running `perform` per element, ordered or unordered depending on processInOrder.
-   def performOnRow(implicit session: Session, ec: ExecutionContext): Flow[R, R, NotUsed] = {
-     val stage = Flow[R]
-     if (processInOrder) stage.mapAsync(parallelism)(perform)
-     else stage.mapAsyncUnordered(parallelism)(perform)
-   }
-
-   // Flow stage that delegates to alpakka's unlogged-batch-with-pass-through flow.
-   def unloggedBatch[K](statementBinder: (
-       R, PreparedStatement) => BoundStatement,partitionKey: R => K)(
-       implicit session: Session, ec: ExecutionContext): Flow[R, R, NotUsed] = {
-     val prepared = session.prepare(statement)
-     log.info(s"CassandraActionStream.unloggedBatch> statement: ${prepared.getQueryString}")
-     CassandraFlow.createUnloggedBatchWithPassThrough[R, K](
-       parallelism,
-       prepared,
-       statementBinder,
-       partitionKey)
-   }
-
- }
- object CassandraActionStream {
-   // shorthand constructor: statement plus parameter extractor, all other settings at their defaults
-   def apply[R](_statement: String, params: R => Seq[Object]): CassandraActionStream[R] =
-     new CassandraActionStream[R](
-       statement = _statement,
-       prepareParams = params
-     )
- }
-
-
- }
-
- object CQLHelpers extends LogSupport {
- import java.io._
- import java.nio.ByteBuffer
- import java.nio.file._
- import java.time.Instant
- import scala.util.Try
-
-
- import akka.stream._
- import akka.stream.scaladsl._
- import com.datastax.driver.core.LocalDate
- import com.datastax.driver.extras.codecs.jdk8.InstantCodec
- import java.util.concurrent.Executor
- /*
- implicit def listenableFutureToFuture[T](
- listenableFuture: ListenableFuture[T]): Future[T] = {
- val promise = Promise[T]()
- Futures.addCallback(listenableFuture, new FutureCallback[T] {
- def onFailure(error: Throwable): Unit = {
- promise.failure(error)
- ()
- }
- def onSuccess(result: T): Unit = {
- promise.success(result)
- ()
- }
- })
- promise.future
- } */
-
- /** Implicitly adapts a Guava ListenableFuture into a Scala Future.
-   *
-   * The completion listener runs on the supplied ExecutionContext. The context used to be
-   * force-cast to java.util.concurrent.Executor, which throws ClassCastException for any
-   * ExecutionContext that is not also an Executor. It is now wrapped when needed.
-   * A failure of the source future surfaces as whatever `lf.get()` throws.
-   */
- implicit def listenableFutureToFuture[A](lf: ListenableFuture[A])(implicit executionContext: ExecutionContext): Future[A] = {
-   val executor: Executor = executionContext match {
-     case alreadyExecutor: Executor => alreadyExecutor
-     case _ => new Executor {
-       def execute(command: Runnable): Unit = executionContext.execute(command)
-     }
-   }
-   val promise = Promise[A]()
-   lf.addListener(new Runnable {
-     def run() = promise.complete(Try(lf.get()))
-   }, executor)
-   promise.future
- }
-
- /** Adds `.asScala` to a Guava ListenableFuture.
-   *
-   * The completion listener runs on the supplied ExecutionContext. The context used to be
-   * force-cast to java.util.concurrent.Executor, which throws ClassCastException for any
-   * ExecutionContext that is not also an Executor. It is now wrapped when needed.
-   */
- implicit class ListenableFutureConverter[A](val lf: ListenableFuture[A]) extends AnyVal {
-   def asScala(implicit ec: ExecutionContext): Future[A] = {
-     val executor: Executor = ec match {
-       case alreadyExecutor: Executor => alreadyExecutor
-       case _ => new Executor {
-         def execute(command: Runnable): Unit = ec.execute(command)
-       }
-     }
-     val promise = Promise[A]()
-     lf.addListener(new Runnable {
-       def run() = promise.complete(Try(lf.get()))
-     }, executor)
-     promise.future
-   }
- }
- /*
- implicit def toScalaFuture[A](a: ListenableFuture[A])(implicit ec: ExecutionContext): Future[A] = {
- val promise = Promise[A]()
- a.addListener(new Runnable {
- def run() = {
- try {
- promise.success(a.get)
- } catch {
- case ex: ExecutionException => promise.failure(ex.getCause)
- case ex => promise.failure(ex)
- }
- }
- }, ec.asInstanceOf[Executor])
- promise.future
- } */
-
- case class CQLDate(year: Int, month: Int, day: Int) // parameter marker: processParameters replaces it with a driver LocalDate for this year/month/day
- case object CQLTodayDate // parameter marker: processParameters replaces it with a driver LocalDate for today's date
- case class CQLDateTime(year: Int, Month: Int,
- day: Int, hour: Int, minute: Int, second: Int, millisec: Int = 0) // parameter marker: processParameters turns it into a java.time.Instant
- case object CQLDateTimeNow // parameter marker: processParameters replaces it with Instant.now
-
- // Calendar date of a java.util.Date as seen in the JVM's default time zone.
- def cqlGetDate(dateToConvert: java.util.Date): java.time.LocalDate = {
-   val zoned = dateToConvert.toInstant().atZone(java.time.ZoneId.systemDefault())
-   zoned.toLocalDate()
- }
-
- // Wall-clock time of a java.util.Date as seen in the JVM's default time zone.
- def cqlGetTime(dateToConvert: java.util.Date): java.time.LocalTime = {
-   val zoned = dateToConvert.toInstant().atZone(java.time.ZoneId.systemDefault())
-   zoned.toLocalTime()
- }
-
- // Converts a java.util.Date to LocalDateTime by way of java.sql.Timestamp.
- def cqlGetTimestamp(dateToConvert: java.util.Date): java.time.LocalDateTime= {
-   val epochMillis = dateToConvert.getTime()
-   val stamp = new java.sql.Timestamp(epochMillis)
-   stamp.toLocalDateTime()
- }
-
- /** Replaces the CQL date/time marker objects in a parameter list with driver-ready values.
-   * All other parameters pass through unchanged.
-   *
-   * Fixes:
-   *  - CQLDateTime was rendered with space-padded fields, no separator before the
-   *    milliseconds and no zone designator, so Instant.parse always threw. It is now
-   *    rendered as a zero-padded ISO-8601 instant.
-   *    NOTE(review): the fields are read as UTC because Instant.parse needs a zone
-   *    designator - confirm that UTC is the intended zone.
-   *  - the "output" log line printed the input list; it now prints the converted list.
-   *
-   * @param params raw parameters, possibly containing CQLDate, CQLTodayDate,
-   *               CQLDateTime or CQLDateTimeNow markers
-   * @return the parameters with every marker converted
-   */
- def processParameters(params: Seq[Object]): Seq[Object] = {
-   import java.time.{Clock, ZoneId}
-   log.info(s"[processParameters] input: ${params}")
-   val outParams: Seq[Object] = params.map {
-     case CQLDate(yy, mm, dd) => LocalDate.fromYearMonthDay(yy, mm, dd)
-     case CQLTodayDate =>
-       val today = java.time.LocalDate.now()
-       LocalDate.fromYearMonthDay(today.getYear, today.getMonth.getValue, today.getDayOfMonth)
-     case CQLDateTimeNow => Instant.now(Clock.system(ZoneId.of("EST", ZoneId.SHORT_IDS)))
-     // field order: year, month, day, hour, minute (ms), second (sc), millisecond (mi)
-     case CQLDateTime(yy, mm, dd, hr, ms, sc, mi) =>
-       Instant.parse(f"$yy%04d-$mm%02d-$dd%02dT$hr%02d:$ms%02d:$sc%02d.$mi%03dZ")
-     case p => p
-   }
-   log.info(s"[processParameters] output: ${outParams}")
-   outParams
- }
- /** InputStream view over the remaining bytes of a ByteBuffer. Reading advances the buffer.
-   *
-   * Fixes against the java.io.InputStream contract:
-   *  - the single-byte read returned the signed byte, so 0xFF came back as -1 and looked
-   *    like end of stream. It now returns 0..255.
-   *  - the bulk read returned 0 forever once the buffer was drained. It now returns -1
-   *    at end of data, and 0 only when `len` is 0.
-   */
- class ByteBufferInputStream(buf: ByteBuffer) extends InputStream {
-   override def read: Int =
-     if (buf.hasRemaining) buf.get() & 0xFF
-     else -1
-
-   override def read(bytes: Array[Byte], off: Int, len: Int): Int =
-     if (len == 0) 0
-     else if (!buf.hasRemaining) -1
-     else {
-       val length: Int = Math.min(len, buf.remaining)
-       buf.get(bytes, off, length)
-       length
-     }
- }
- object ByteBufferInputStream {
-   def apply(buf: ByteBuffer): ByteBufferInputStream = {
-     new ByteBufferInputStream(buf)
-   }
- }
- // OutputStream that writes straight into a caller-supplied ByteBuffer. The buffer never
- // grows, so writing past its limit fails with the buffer's own overflow exception.
- class FixsizedByteBufferOutputStream(buf: ByteBuffer) extends OutputStream {
-
-   override def write(b: Int): Unit = {
-     val lowByte = b.toByte
-     buf.put(lowByte)
-   }
-
-   override def write(bytes: Array[Byte], off: Int, len: Int): Unit = {
-     buf.put(bytes, off, len)
-   }
- }
- object FixsizedByteBufferOutputStream {
-   def apply(buf: ByteBuffer) = {
-     new FixsizedByteBufferOutputStream(buf)
-   }
- }
- /** OutputStream backed by a ByteBuffer that is re-allocated with a larger capacity
-   * whenever a write would not fit.
-   *
-   * Fixes:
-   *  - the bulk write ignored `off` and always copied from index 0 of the source array.
-   *  - the growth factor passed to the companion's `apply` was validated but never used.
-   *    It is now an optional third constructor parameter.
-   *  - growth could stall on tiny buffers (capacity 0 looped forever, capacity 1 grew
-   *    to 1). Every growth step now adds at least one byte.
-   *
-   * @param buf        initial buffer, replaced on growth
-   * @param onHeap     allocate replacements on the heap (true) or as direct buffers (false)
-   * @param increasing capacity multiplier applied on each growth step
-   */
- class ExpandingByteBufferOutputStream(var buf: ByteBuffer, onHeap: Boolean,
-     increasing: Float = ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR) extends OutputStream {
-
-   override def write(b: Array[Byte], off: Int, len: Int): Unit = {
-     ensureRoom(len)
-     buf.put(b, off, len)
-   }
-
-   override def write(b: Int): Unit = {
-     ensureRoom(1)
-     buf.put(b.toByte)
-   }
-
-   // Grows the buffer when `extra` more bytes would exceed its limit.
-   private def ensureRoom(extra: Int): Unit = {
-     val needed: Long = buf.position().toLong + extra
-     if (needed > buf.limit()) {
-       var capacity: Long = grown(buf.capacity().toLong)
-       while (capacity < needed) capacity = grown(capacity)
-       // a ByteBuffer cannot exceed Int.MaxValue bytes; beyond that the put overflows
-       increase(math.min(capacity, Int.MaxValue.toLong).toInt)
-     }
-   }
-
-   // One growth step: multiply by the factor, but always gain at least one byte.
-   private def grown(capacity: Long): Long =
-     math.max(capacity + 1, (capacity * increasing.toDouble).toLong)
-
-   // Copies the bytes written so far into a fresh buffer of `newCapacity`.
-   protected def increase(newCapacity: Int): Unit = {
-     buf.limit(buf.position())
-     buf.rewind()
-     val newBuffer =
-       if (onHeap) ByteBuffer.allocate(newCapacity)
-       else ByteBuffer.allocateDirect(newCapacity)
-     newBuffer.put(buf)
-     buf.clear()
-     buf = newBuffer
-   }
-   def size: Long = buf.position()
-   def capacity: Long = buf.capacity()
-   def byteBuffer: ByteBuffer = buf
- }
- object ExpandingByteBufferOutputStream {
-   val DEFAULT_INCREASING_FACTOR = 1.5f
-   def apply(size: Int, increasingBy: Float, onHeap: Boolean) = {
-     if (increasingBy <= 1) throw new IllegalArgumentException("Increasing Factor must be greater than 1.0")
-     val buffer: ByteBuffer =
-       if (onHeap) ByteBuffer.allocate(size)
-       else ByteBuffer.allocateDirect(size)
-     new ExpandingByteBufferOutputStream(buffer, onHeap, increasingBy)
-   }
-   def apply(size: Int): ExpandingByteBufferOutputStream = {
-     apply(size, ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR, false)
-   }
-
-   def apply(size: Int, onHeap: Boolean): ExpandingByteBufferOutputStream = {
-     apply(size, ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR, onHeap)
-   }
-
-   def apply(size: Int, increasingBy: Float): ExpandingByteBufferOutputStream = {
-     apply(size, increasingBy, false)
-   }
-
- }
- /**
-  * Reads the whole file into a ByteBuffer.
-  *
-  * The buffer holds exactly the file's bytes (no padding byte) and the stream is
-  * always closed, also when reading fails.
-  *
-  * @param fileName path of the file to read
-  * @return a heap buffer wrapping the file content, positioned at 0
-  */
- def cqlFileToBytes(fileName: String): ByteBuffer = {
-   val fis = new FileInputStream(fileName)
-   try {
-     // available() is only a sizing hint; keep reading until read() reports end of stream.
-     val collected = new java.io.ByteArrayOutputStream(math.max(fis.available, 32))
-     val chunk = new Array[Byte](8192)
-     var count = fis.read(chunk)
-     while (count != -1) {
-       collected.write(chunk, 0, count)
-       count = fis.read(chunk)
-     }
-     ByteBuffer.wrap(collected.toByteArray)
-   } finally fis.close()
- }
- /**
-  * Streams the bytes of `bytes` into the file `fileName`.
-  * The buffer is read through a [[ByteBufferInputStream]]; the returned future carries
-  * the IOResult of the file sink.
-  */
- def cqlBytesToFile(bytes: ByteBuffer, fileName: String)(
-   implicit mat: Materializer): Future[IOResult] = {
-   val sink = FileIO.toPath(Paths.get(fileName))
-   StreamConverters
-     .fromInputStream(() => ByteBufferInputStream(bytes))
-     .runWith(sink)
- }
- /** Formats `date` with the SimpleDateFormat pattern `fmt`. */
- def cqlDateTimeString(date: java.util.Date, fmt: String): String =
-   new java.text.SimpleDateFormat(fmt).format(date)
- /** Registers `InstantCodec` with the cluster's codec registry (JDK 8 date-time support). */
- def useJava8DateTime(cluster: Cluster) = {
-   val codecs = cluster.getConfiguration().getCodecRegistry()
-   codecs.register(InstantCodec.instance)
- }
- }
mgo/MGOProtoConversion.scala
- package sdp.mongo.engine
- import org.mongodb.scala.bson.collection.immutable.Document
- import org.bson.conversions.Bson
- import sdp.grpc.services._
- import protobuf.bytes.Converter._
- import MGOClasses._
- import MGOAdmins._
- import MGOCommands._
- import org.bson.BsonDocument
- import org.bson.codecs.configuration.CodecRegistry
- import org.mongodb.scala.bson.codecs.DEFAULT_CODEC_REGISTRY
- import org.mongodb.scala.FindObservable
-
- object MGOProtoConversion {
-
- type MGO_COMMAND_TYPE = Int // numeric code stored in ProtoMGOContext.commandType
- val MGO_COMMAND_FIND = 0 // Find
- val MGO_COMMAND_COUNT = 20 // Count
- val MGO_COMMAND_DISTICT = 21 // Distict
- val MGO_COMMAND_DOCUMENTSTREAM = 1 // NOTE(review): no branch in ctxFromProto/ctxToProto uses this code - confirm where it is consumed
- val MGO_COMMAND_AGGREGATE = 2 // Aggregate
- val MGO_COMMAND_INSERT = 3 // Insert
- val MGO_COMMAND_DELETE = 4 // Delete
- val MGO_COMMAND_REPLACE = 5 // Replace
- val MGO_COMMAND_UPDATE = 6 // Update
-
- // Admin command codes; they share the commandType field with the codes above.
- val MGO_ADMIN_DROPCOLLECTION = 8 // DropCollection
- val MGO_ADMIN_CREATECOLLECTION = 9 // CreateCollection
- val MGO_ADMIN_LISTCOLLECTION = 10 // ListCollection
- val MGO_ADMIN_CREATEVIEW = 11 // CreateView
- val MGO_ADMIN_CREATEINDEX = 12 // CreateIndex
- val MGO_ADMIN_DROPINDEXBYNAME = 13 // DropIndexByName
- val MGO_ADMIN_DROPINDEXBYKEY = 14 // DropIndexByKey
- val MGO_ADMIN_DROPALLINDEXES = 15 // DropAllIndexes
-
-
- /** Parameters of an admin command, convertible to and from `ProtoMGOAdmin`. */
- case class AdminContext(
-   tarName: String = "",
-   bsonParam: Seq[Bson] = Nil,
-   options: Option[Any] = None,
-   objName: String = ""
- ) {
-   /** Marshals every Bson parameter and the optional options value into the protobuf message. */
-   def toProto = {
-     val protoBsons = bsonParam.map(b => sdp.grpc.services.ProtoMGOBson(marshal(b)))
-     val protoOptions = options.map(o => ProtoAny(marshal(o)))
-     sdp.grpc.services.ProtoMGOAdmin(
-       tarName = tarName,
-       bsonParam = protoBsons,
-       objName = objName,
-       options = protoOptions
-     )
-   }
- }
-
- object AdminContext {
-   /** Inverse of `toProto`: unmarshals the Bson parameters and the options value. */
-   def fromProto(msg: sdp.grpc.services.ProtoMGOAdmin) = {
-     val bsons = msg.bsonParam.map(p => unmarshal[Bson](p.bson))
-     val opts = msg.options.map(a => unmarshal[Any](a.value))
-     new AdminContext(
-       tarName = msg.tarName,
-       bsonParam = bsons,
-       objName = msg.objName,
-       options = opts
-     )
-   }
- }
-
- /** Scala-side mirror of `ProtoMGOContext`; `toProto` produces the wire message. */
- case class Context(
-   dbName: String = "",
-   collName: String = "",
-   commandType: MGO_COMMAND_TYPE,
-   bsonParam: Seq[Bson] = Nil,
-   resultOptions: Seq[ResultOptions] = Nil,
-   options: Option[Any] = None,
-   documents: Seq[Document] = Nil,
-   targets: Seq[String] = Nil,
-   only: Boolean = false,
-   adminOptions: Option[AdminContext] = None
- ) {
-
-   /** Converts every field to its protobuf form; `only` is always sent as `Some(...)`. */
-   def toProto = new sdp.grpc.services.ProtoMGOContext(
-     dbName = dbName,
-     collName = collName,
-     commandType = commandType,
-     bsonParam = bsonParam.map(bsonToProto),
-     resultOptions = resultOptions.map(_.toProto),
-     // absent options stay absent (no empty ProtoAny placeholder)
-     options = this.options.map(o => ProtoAny(marshal(o))),
-     documents = documents.map(d => sdp.grpc.services.ProtoMGODocument(marshal(d))),
-     targets = targets,
-     only = Some(only),
-     adminOptions = adminOptions.map(_.toProto)
-   )
-
- }
-
- /** Conversions between a Mongo `Document` and its marshalled protobuf wrapper. */
- object MGODocument {
-   /** Marshals `doc` into a `ProtoMGODocument`. */
-   def toProto(doc: Document): sdp.grpc.services.ProtoMGODocument =
-     ProtoMGODocument(marshal(doc))
-
-   /** Unmarshals the document carried by `msg`. */
-   def fromProto(msg: sdp.grpc.services.ProtoMGODocument): Document =
-     unmarshal[Document](msg.document)
- }
-
- object MGOProtoMsg {
-   /**
-    * Rebuilds a [[Context]] from its protobuf message (inverse of `Context.toProto`).
-    *
-    * The `only` flag is restored as well; a message without it yields `false`,
-    * which is the `Context` default.
-    */
-   def fromProto(msg: sdp.grpc.services.ProtoMGOContext) = new Context(
-     dbName = msg.dbName,
-     collName = msg.collName,
-     commandType = msg.commandType,
-     bsonParam = msg.bsonParam.map(protoToBson),
-     resultOptions = msg.resultOptions.map(r => ResultOptions.fromProto(r)),
-     options = msg.options.map(a => unmarshal[Any](a.value)),
-     documents = msg.documents.map(doc => unmarshal[Document](doc.document)),
-     targets = msg.targets,
-     only = msg.only.getOrElse(false), // was dropped, so toProto/fromProto lost the flag
-     adminOptions = msg.adminOptions.map(ado => AdminContext.fromProto(ado))
-   )
- }
-
- /** Renders `bson` as a BsonDocument (default codec registry) and marshals it into a `ProtoMGOBson`. */
- def bsonToProto(bson: Bson) = {
-   val rendered = bson.toBsonDocument(
-     classOf[org.mongodb.scala.bson.collection.immutable.Document], DEFAULT_CODEC_REGISTRY)
-   ProtoMGOBson(marshal(rendered))
- }
-
- /**
-  * Unmarshals the BsonDocument carried by `proto` right away and exposes it as a `Bson`
-  * whose `toBsonDocument` returns that document whatever class or registry is passed.
-  */
- def protoToBson(proto: ProtoMGOBson): Bson = {
-   val decoded = unmarshal[BsonDocument](proto.bson)
-   new Bson {
-     override def toBsonDocument[TDocument](
-       documentClass: Class[TDocument], codecRegistry: CodecRegistry): BsonDocument = decoded
-   }
- }
-
- /**
-  * Rebuilds an [[MGOContext]] from its protobuf form (inverse of `ctxToProto`).
-  *
-  * Optional parts of the message (first filter, result options, options payload,
-  * `only` flag) are applied independently, so every combination is honoured:
-  *  - FIND with result options but no filter no longer fails on an empty `bsonParam`;
-  *  - COUNT carrying both a filter and options keeps both.
-  *
-  * @throws NoSuchElementException   when a parameter the command requires is missing
-  * @throws IllegalArgumentException when `commandType` is not a supported code
-  */
- def ctxFromProto(proto: ProtoMGOContext): MGOContext = {
-
-   // Options payload, unmarshalled only when the message carries one.
-   def decodedOptions: Option[Any] = proto.options.map(o => unmarshal[Any](o.value))
-
-   // First Bson parameter, used as the optional filter of query commands.
-   def firstFilter: Option[Bson] = proto.bsonParam.headOption.map(protoToBson)
-
-   // `only` flag; absent means false, the default of the command case classes.
-   def onlyFlag: Boolean = proto.only.getOrElse(false)
-
-   def missing(what: String): Nothing =
-     throw new NoSuchElementException(
-       s"ctxFromProto> missing $what for command type ${proto.commandType}")
-
-   // Mandatory positional parameters.
-   def bsonAt(idx: Int, what: String): Bson =
-     proto.bsonParam.lift(idx).map(protoToBson).getOrElse(missing(s"$what (bsonParam[$idx])"))
-
-   def targetAt(idx: Int, what: String): String =
-     proto.targets.lift(idx).getOrElse(missing(s"$what (targets[$idx])"))
-
-   def allDocuments: Seq[Document] =
-     proto.documents.map(doc => unmarshal[Document](doc.document))
-
-   def firstDocument: Document =
-     proto.documents.headOption
-       .map(doc => unmarshal[Document](doc.document))
-       .getOrElse(missing("replacement document (documents[0])"))
-
-   def pipeline: Seq[Bson] = proto.bsonParam.map(p => protoToBson(p))
-
-   def context(actionType: MGO_ACTION_TYPE, command: MGOCommands): MGOContext =
-     new MGOContext(
-       dbName = proto.dbName,
-       collName = proto.collName,
-       actionType = actionType,
-       action = Some(command)
-     )
-
-   proto.commandType match {
-     case MGO_COMMAND_FIND =>
-       context(MGO_QUERY, Find(
-         filter = firstFilter,
-         andThen = proto.resultOptions.map(ResultOptions.fromProto),
-         firstOnly = onlyFlag))
-
-     case MGO_COMMAND_COUNT =>
-       context(MGO_QUERY, Count(filter = firstFilter, options = decodedOptions))
-
-     case MGO_COMMAND_DISTICT =>
-       context(MGO_QUERY, Distict(fieldName = targetAt(0, "field name"), filter = firstFilter))
-
-     case MGO_COMMAND_AGGREGATE =>
-       context(MGO_QUERY, Aggregate(pipeline))
-
-     case MGO_ADMIN_LISTCOLLECTION =>
-       context(MGO_QUERY, ListCollection(proto.dbName))
-
-     case MGO_COMMAND_INSERT =>
-       context(MGO_UPDATE, Insert(newdocs = allDocuments, options = decodedOptions))
-
-     case MGO_COMMAND_DELETE =>
-       context(MGO_UPDATE, Delete(
-         filter = bsonAt(0, "filter"),
-         options = decodedOptions,
-         onlyOne = onlyFlag))
-
-     case MGO_COMMAND_REPLACE =>
-       context(MGO_UPDATE, Replace(
-         filter = bsonAt(0, "filter"),
-         replacement = firstDocument,
-         options = decodedOptions))
-
-     case MGO_COMMAND_UPDATE =>
-       context(MGO_UPDATE, Update(
-         filter = bsonAt(0, "filter"),
-         update = bsonAt(1, "update"),
-         options = decodedOptions,
-         onlyOne = onlyFlag))
-
-     case MGO_ADMIN_DROPCOLLECTION =>
-       context(MGO_ADMIN, DropCollection(proto.collName))
-
-     case MGO_ADMIN_CREATECOLLECTION =>
-       context(MGO_ADMIN, CreateCollection(proto.collName, options = decodedOptions))
-
-     case MGO_ADMIN_CREATEVIEW =>
-       context(MGO_ADMIN, CreateView(
-         viewName = targetAt(0, "view name"),
-         viewOn = targetAt(1, "view source"),
-         pipeline = pipeline,
-         options = decodedOptions))
-
-     case MGO_ADMIN_CREATEINDEX =>
-       context(MGO_ADMIN, CreateIndex(key = bsonAt(0, "index key"), options = decodedOptions))
-
-     case MGO_ADMIN_DROPINDEXBYNAME =>
-       context(MGO_ADMIN, DropIndexByName(
-         indexName = targetAt(0, "index name"),
-         options = decodedOptions))
-
-     case MGO_ADMIN_DROPINDEXBYKEY =>
-       context(MGO_ADMIN, DropIndexByKey(key = bsonAt(0, "index key"), options = decodedOptions))
-
-     case MGO_ADMIN_DROPALLINDEXES =>
-       context(MGO_ADMIN, DropAllIndexes(options = decodedOptions))
-
-     case unsupported =>
-       // Previously a bare MatchError; name the offending code instead.
-       throw new IllegalArgumentException(s"ctxFromProto> unsupported command type [$unsupported]")
-   }
- }
-
- /**
-  * Converts an [[MGOContext]] into its protobuf form.
-  *
-  * @return `None` when the context has no action or the action has no protobuf
-  *         representation here (e.g. `MapReduce`, `BulkWrite`); otherwise the message.
-  *         `Find.firstOnly` is transmitted in the `only` field so that `ctxFromProto`
-  *         can restore it.
-  */
- def ctxToProto(ctx: MGOContext): Option[sdp.grpc.services.ProtoMGOContext] = {
-
-   // Absent options stay absent (no empty ProtoAny placeholder).
-   def optionsToProto(options: Option[Any]): Option[ProtoAny] =
-     options.map(o => ProtoAny(marshal(o)))
-
-   // An optional filter becomes a zero- or one-element bsonParam list.
-   def filterToProto(filter: Option[Bson]): Seq[ProtoMGOBson] =
-     filter.map(bsonToProto).toList
-
-   ctx.action.flatMap {
-     case Count(filter, options) =>
-       Some(new sdp.grpc.services.ProtoMGOContext(
-         dbName = ctx.dbName,
-         collName = ctx.collName,
-         commandType = MGO_COMMAND_COUNT,
-         bsonParam = filterToProto(filter),
-         options = optionsToProto(options)
-       ))
-
-     case Distict(fieldName, filter) =>
-       Some(new sdp.grpc.services.ProtoMGOContext(
-         dbName = ctx.dbName,
-         collName = ctx.collName,
-         commandType = MGO_COMMAND_DISTICT,
-         bsonParam = filterToProto(filter),
-         targets = Seq(fieldName)
-       ))
-
-     case Find(filter, andThen, firstOnly) =>
-       Some(new sdp.grpc.services.ProtoMGOContext(
-         dbName = ctx.dbName,
-         collName = ctx.collName,
-         commandType = MGO_COMMAND_FIND,
-         bsonParam = filterToProto(filter),
-         resultOptions = andThen.map(_.toProto),
-         only = Some(firstOnly) // was never sent, so firstOnly was lost on the wire
-       ))
-
-     case Aggregate(pipeLine) =>
-       Some(new sdp.grpc.services.ProtoMGOContext(
-         dbName = ctx.dbName,
-         collName = ctx.collName,
-         commandType = MGO_COMMAND_AGGREGATE,
-         bsonParam = pipeLine.map(bsonToProto)
-       ))
-
-     case Insert(newdocs, options) =>
-       Some(new sdp.grpc.services.ProtoMGOContext(
-         dbName = ctx.dbName,
-         collName = ctx.collName,
-         commandType = MGO_COMMAND_INSERT,
-         documents = newdocs.map(d => ProtoMGODocument(marshal(d))),
-         options = optionsToProto(options)
-       ))
-
-     case Delete(filter, options, onlyOne) =>
-       Some(new sdp.grpc.services.ProtoMGOContext(
-         dbName = ctx.dbName,
-         collName = ctx.collName,
-         commandType = MGO_COMMAND_DELETE,
-         bsonParam = Seq(bsonToProto(filter)),
-         options = optionsToProto(options),
-         only = Some(onlyOne)
-       ))
-
-     case Replace(filter, replacement, options) =>
-       Some(new sdp.grpc.services.ProtoMGOContext(
-         dbName = ctx.dbName,
-         collName = ctx.collName,
-         commandType = MGO_COMMAND_REPLACE,
-         bsonParam = Seq(bsonToProto(filter)),
-         options = optionsToProto(options),
-         documents = Seq(ProtoMGODocument(marshal(replacement)))
-       ))
-
-     case Update(filter, update, options, onlyOne) =>
-       Some(new sdp.grpc.services.ProtoMGOContext(
-         dbName = ctx.dbName,
-         collName = ctx.collName,
-         commandType = MGO_COMMAND_UPDATE,
-         bsonParam = Seq(bsonToProto(filter), bsonToProto(update)),
-         options = optionsToProto(options),
-         only = Some(onlyOne)
-       ))
-
-     case DropCollection(coll) =>
-       Some(new sdp.grpc.services.ProtoMGOContext(
-         dbName = ctx.dbName,
-         collName = coll,
-         commandType = MGO_ADMIN_DROPCOLLECTION
-       ))
-
-     case CreateCollection(coll, options) =>
-       Some(new sdp.grpc.services.ProtoMGOContext(
-         dbName = ctx.dbName,
-         collName = coll,
-         commandType = MGO_ADMIN_CREATECOLLECTION,
-         options = optionsToProto(options)
-       ))
-
-     case ListCollection(_) =>
-       // The database name is taken from the context, not from the command.
-       Some(new sdp.grpc.services.ProtoMGOContext(
-         dbName = ctx.dbName,
-         commandType = MGO_ADMIN_LISTCOLLECTION
-       ))
-
-     case CreateView(viewName, viewOn, pipeline, options) =>
-       Some(new sdp.grpc.services.ProtoMGOContext(
-         dbName = ctx.dbName,
-         collName = ctx.collName,
-         commandType = MGO_ADMIN_CREATEVIEW,
-         bsonParam = pipeline.map(bsonToProto),
-         options = optionsToProto(options),
-         targets = Seq(viewName, viewOn)
-       ))
-
-     case CreateIndex(key, options) =>
-       Some(new sdp.grpc.services.ProtoMGOContext(
-         dbName = ctx.dbName,
-         collName = ctx.collName,
-         commandType = MGO_ADMIN_CREATEINDEX,
-         bsonParam = Seq(bsonToProto(key)),
-         options = optionsToProto(options)
-       ))
-
-     case DropIndexByName(indexName, options) =>
-       Some(new sdp.grpc.services.ProtoMGOContext(
-         dbName = ctx.dbName,
-         collName = ctx.collName,
-         commandType = MGO_ADMIN_DROPINDEXBYNAME,
-         targets = Seq(indexName),
-         options = optionsToProto(options)
-       ))
-
-     case DropIndexByKey(key, options) =>
-       Some(new sdp.grpc.services.ProtoMGOContext(
-         dbName = ctx.dbName,
-         collName = ctx.collName,
-         commandType = MGO_ADMIN_DROPINDEXBYKEY,
-         bsonParam = Seq(bsonToProto(key)),
-         options = optionsToProto(options)
-       ))
-
-     case DropAllIndexes(options) =>
-       Some(new sdp.grpc.services.ProtoMGOContext(
-         dbName = ctx.dbName,
-         collName = ctx.collName,
-         commandType = MGO_ADMIN_DROPALLINDEXES,
-         options = optionsToProto(options)
-       ))
-
-     case _ =>
-       // Commands without a wire representation used to end in a MatchError.
-       None
-   }
- }
-
- }
mgo/ObservableToPublisher.scala
- package sdp.mongo.engine
-
- import java.util.concurrent.atomic.AtomicBoolean
-
- import org.mongodb.{scala => mongoDB}
- import org.{reactivestreams => rxStreams}
-
- /**
-  * Adapts a MongoDB Scala driver `Observable` to a Reactive Streams `Publisher`.
-  * Each `subscribe` call subscribes to the observable and forwards its signals
-  * to the given subscriber.
-  */
- final case class ObservableToPublisher[T](observable: mongoDB.Observable[T])
-   extends rxStreams.Publisher[T] {
-
-   def subscribe(subscriber: rxStreams.Subscriber[_ >: T]): Unit = {
-     val bridge: mongoDB.Observer[T] = new mongoDB.Observer[T]() {
-
-       override def onSubscribe(subscription: mongoDB.Subscription): Unit = {
-         val rxSubscription = new rxStreams.Subscription() {
-           private final val cancelled: AtomicBoolean = new AtomicBoolean
-
-           // Non-positive demand on a live subscription is reported through onError;
-           // every other request is passed to the driver subscription.
-           override def request(n: Long): Unit = {
-             val live = !subscription.isUnsubscribed && !cancelled.get()
-             if (live && n < 1)
-               subscriber.onError(
-                 new IllegalArgumentException(
-                   s"Demand from publisher should be a positive amount. Current amount is:$n"
-                 )
-               )
-             else
-               subscription.request(n)
-           }
-
-           // Unsubscribes at most once, however often cancel() is called.
-           override def cancel(): Unit = {
-             val alreadyCancelled = cancelled.getAndSet(true)
-             if (!alreadyCancelled) subscription.unsubscribe()
-           }
-         }
-         subscriber.onSubscribe(rxSubscription)
-       }
-
-       def onNext(result: T): Unit = subscriber.onNext(result)
-
-       def onError(e: Throwable): Unit = subscriber.onError(e)
-
-       def onComplete(): Unit = subscriber.onComplete()
-     }
-     observable.subscribe(bridge)
-   }
- }
mgo/MongoEngine.scala
- package sdp.mongo.engine
-
- import java.text.SimpleDateFormat
- import java.util.Calendar
-
- import akka.NotUsed
- import akka.stream.Materializer
- import akka.stream.alpakka.mongodb.scaladsl._
- import akka.stream.scaladsl.{Flow, Source}
- import org.bson.conversions.Bson
- import org.mongodb.scala.bson.collection.immutable.Document
- import org.mongodb.scala.bson.{BsonArray, BsonBinary}
- import org.mongodb.scala.model._
- import org.mongodb.scala.{MongoClient, _}
- import protobuf.bytes.Converter._
- import sdp.file.Streaming._
- import sdp.logging.LogSupport
-
- import scala.collection.JavaConverters._
- import scala.concurrent._
- import scala.concurrent.duration._
-
- object MGOClasses {
- type MGO_ACTION_TYPE = Int // category code stored in MGOContext.actionType
- val MGO_QUERY = 0 // default actionType of MGOContext
- val MGO_UPDATE = 1
- val MGO_ADMIN = 2
-
- /* org.mongodb.scala.FindObservable
- import com.mongodb.async.client.FindIterable
- val resultDocType = FindIterable[Document]
- val resultOption = FindObservable(resultDocType)
- .maxScan(...)
- .limit(...)
- .sort(...)
- .project(...) */
-
- type FOD_TYPE = Int // code selecting which FindObservable refinement a ResultOptions applies
- val FOD_FIRST = 0 //def first(): SingleObservable[TResult], return the first item (left unchanged by ResultOptions.toFindObservable)
- val FOD_FILTER = 1 //def filter(filter: Bson): FindObservable[TResult]
- val FOD_LIMIT = 2 //def limit(limit: Int): FindObservable[TResult]
- val FOD_SKIP = 3 //def skip(skip: Int): FindObservable[TResult]
- val FOD_PROJECTION = 4 //def projection(projection: Bson): FindObservable[TResult]
- //Sets a document describing the fields to return for all matching documents
- val FOD_SORT = 5 //def sort(sort: Bson): FindObservable[TResult]
- val FOD_PARTIAL = 6 //def partial(partial: Boolean): FindObservable[TResult]
- //Get partial results from a sharded cluster if one or more shards are unreachable (instead of throwing an error)
- val FOD_CURSORTYPE = 7 //def cursorType(cursorType: CursorType): FindObservable[TResult] (left unchanged by ResultOptions.toFindObservable)
- //Sets the cursor type
- val FOD_HINT = 8 //def hint(hint: Bson): FindObservable[TResult]
- //Sets the hint for which index to use. A null value means no hint is set
- val FOD_MAX = 9 //def max(max: Bson): FindObservable[TResult]
- //Sets the exclusive upper bound for a specific index. A null value means no max is set
- val FOD_MIN = 10 //def min(min: Bson): FindObservable[TResult]
- //Sets the minimum inclusive lower bound for a specific index. A null value means no min is set
- val FOD_RETURNKEY = 11 //def returnKey(returnKey: Boolean): FindObservable[TResult]
- //Sets the returnKey. If true the find operation will return only the index keys in the resulting documents
- val FOD_SHOWRECORDID=12 //def showRecordId(showRecordId: Boolean): FindObservable[TResult]
- //Sets the showRecordId. Set to true to add a field `\$recordId` to the returned documents
-
- /**
-  * One refinement step for a find query.
-  * `optType` selects the FindObservable method; `bson` or `value` supplies its argument
-  * (boolean switches are encoded as `value != 0`).
-  */
- case class ResultOptions(
-   optType: FOD_TYPE,
-   bson: Option[Bson] = None,
-   value: Int = 0) {
-
-   /** Protobuf form of this option; the Bson argument, when present, is marshalled. */
-   def toProto = {
-     val protoBson = bson.map(b => sdp.grpc.services.ProtoMGOBson(marshal(b)))
-     new sdp.grpc.services.ProtoMGOResultOption(
-       optType = optType,
-       bsonParam = protoBson,
-       valueParam = value
-     )
-   }
-
-   /** Function applying this option to a FindObservable; FOD_FIRST and FOD_CURSORTYPE return it unchanged. */
-   def toFindObservable: FindObservable[Document] => FindObservable[Document] = { obs =>
-     def flag: Boolean = value != 0
-     def spec: Bson = bson.get
-     optType match {
-       case FOD_FIRST | FOD_CURSORTYPE => obs
-       case FOD_FILTER => obs.filter(spec)
-       case FOD_LIMIT => obs.limit(value)
-       case FOD_SKIP => obs.skip(value)
-       case FOD_PROJECTION => obs.projection(spec)
-       case FOD_SORT => obs.sort(spec)
-       case FOD_PARTIAL => obs.partial(flag)
-       case FOD_HINT => obs.hint(spec)
-       case FOD_MAX => obs.max(spec)
-       case FOD_MIN => obs.min(spec)
-       case FOD_RETURNKEY => obs.returnKey(flag)
-       case FOD_SHOWRECORDID => obs.showRecordId(flag)
-     }
-   }
- }
-
- object ResultOptions {
-   /** Inverse of `toProto`: unmarshals the optional Bson argument. */
-   def fromProto(msg: sdp.grpc.services.ProtoMGOResultOption) = {
-     val decoded = msg.bsonParam.map(p => unmarshal[Bson](p.bson))
-     new ResultOptions(
-       optType = msg.optType,
-       bson = decoded,
-       value = msg.valueParam
-     )
-   }
- }
-
- trait MGOCommands // common supertype of every command carried in MGOContext.action
-
- object MGOCommands { // query and update commands
- // Count command: optional filter plus untyped options.
- case class Count(filter: Option[Bson] = None, options: Option[Any] = None) extends MGOCommands
- // Distinct-values command for one field, with an optional filter.
- case class Distict(fieldName: String, filter: Option[Bson] = None) extends MGOCommands
-
- /* org.mongodb.scala.FindObservable
- import com.mongodb.async.client.FindIterable
- val resultDocType = FindIterable[Document]
- val resultOption = FindObservable(resultDocType)
- .maxScan(...)
- .limit(...)
- .sort(...)
- .project(...) */
- case class Find(filter: Option[Bson] = None, // optional query filter
- andThen: Seq[ResultOptions] = Seq.empty[ResultOptions], // refinements applied to the FindObservable
- firstOnly: Boolean = false) extends MGOCommands // true: first() is taken in mongoStream
- // Aggregation pipeline stages.
- case class Aggregate(pipeLine: Seq[Bson]) extends MGOCommands
- // NOTE(review): MGOProtoConversion has no protobuf mapping for MapReduce - confirm this is intended.
- case class MapReduce(mapFunction: String, reduceFunction: String) extends MGOCommands
- // Documents to insert plus untyped options.
- case class Insert(newdocs: Seq[Document], options: Option[Any] = None) extends MGOCommands
- // Delete by filter; onlyOne defaults to false.
- case class Delete(filter: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands
- // Replace the document selected by filter with replacement.
- case class Replace(filter: Bson, replacement: Document, options: Option[Any] = None) extends MGOCommands
- // Apply update to documents selected by filter; onlyOne defaults to false.
- case class Update(filter: Bson, update: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands
-
- // NOTE(review): MGOProtoConversion has no protobuf mapping for BulkWrite - confirm this is intended.
- case class BulkWrite(commands: List[WriteModel[Document]], options: Option[Any] = None) extends MGOCommands
-
- }
-
- object MGOAdmins { // administrative commands; all extend MGOCommands
- // Drop the named collection.
- case class DropCollection(collName: String) extends MGOCommands
- // Create the named collection with untyped options.
- case class CreateCollection(collName: String, options: Option[Any] = None) extends MGOCommands
- // List collections; ctxToProto sends the context's dbName rather than this field.
- case class ListCollection(dbName: String) extends MGOCommands
- // Create view viewName over viewOn using the given pipeline.
- case class CreateView(viewName: String, viewOn: String, pipeline: Seq[Bson], options: Option[Any] = None) extends MGOCommands
- // Create an index described by key.
- case class CreateIndex(key: Bson, options: Option[Any] = None) extends MGOCommands
- // Drop an index identified by name.
- case class DropIndexByName(indexName: String, options: Option[Any] = None) extends MGOCommands
- // Drop an index identified by its key document.
- case class DropIndexByKey(key: Bson, options: Option[Any] = None) extends MGOCommands
- // Drop all indexes.
- case class DropAllIndexes(options: Option[Any] = None) extends MGOCommands
-
- }
-
- /**
-  * Immutable description of one MongoDB operation: target database and collection,
-  * action category and the command itself. The `set*` methods return modified copies.
-  */
- case class MGOContext(
-   dbName: String,
-   collName: String,
-   actionType: MGO_ACTION_TYPE = MGO_QUERY,
-   action: Option[MGOCommands] = None,
-   actionOptions: Option[Any] = None,
-   actionTargets: Seq[String] = Nil
- ) {
-   /** Copy with a different database name. */
-   def setDbName(name: String): MGOContext = copy(dbName = name)
-
-   /** Copy with a different collection name. */
-   def setCollName(name: String): MGOContext = copy(collName = name)
-
-   /** Copy with a different action category. */
-   def setActionType(at: MGO_ACTION_TYPE): MGOContext = copy(actionType = at)
-
-   /** Copy carrying `cmd` as its action. */
-   def setCommand(cmd: MGOCommands): MGOContext = copy(action = Some(cmd))
-
-   /** Protobuf form of this context, as produced by `MGOProtoConversion.ctxToProto`. */
-   def toSomeProto = MGOProtoConversion.ctxToProto(this)
- }
-
- object MGOContext {
-   /** Context for `db`/`coll` with every other field at its default. */
-   def apply(db: String, coll: String) = new MGOContext(db, coll)
-
-   /** Context rebuilt from a protobuf message via `MGOProtoConversion.ctxFromProto`. */
-   def fromProto(proto: sdp.grpc.services.ProtoMGOContext): MGOContext =
-     MGOProtoConversion.ctxFromProto(proto)
- }
-
- /** A batch of contexts plus a flag selecting transactional execution. */
- case class MGOBatContext(contexts: Seq[MGOContext], tx: Boolean = false) {
-   /** Copy with the transaction flag set to `txopt`. */
-   def setTx(txopt: Boolean): MGOBatContext = copy(tx = txopt)
-
-   /** Copy with `ctx` appended to the end of the batch. */
-   def appendContext(ctx: MGOContext): MGOBatContext = {
-     val extended = contexts :+ ctx
-     copy(contexts = extended)
-   }
- }
-
- object MGOBatContext {
-   def apply(ctxs: Seq[MGOContext], tx: Boolean = false) = new MGOBatContext(ctxs, tx)
- }
-
- type MGODate = java.util.Date
-
- /**
-  * Date for the given year, month and day.
-  * The values go straight to `Calendar.set`, so `mm` is zero-based (0 = January);
-  * the time-of-day fields keep the values of the moment of the call.
-  */
- def mgoDate(yyyy: Int, mm: Int, dd: Int): MGODate = {
-   val calendar = Calendar.getInstance()
-   calendar.set(yyyy, mm, dd)
-   calendar.getTime()
- }
-
- /**
-  * Date-time for the given fields.
-  * `mm` is zero-based as in `Calendar.set`; milliseconds keep the value of the moment of the call.
-  */
- def mgoDateTime(yyyy: Int, mm: Int, dd: Int, hr: Int, min: Int, sec: Int): MGODate = {
-   val calendar = Calendar.getInstance()
-   calendar.set(yyyy, mm, dd, hr, min, sec)
-   calendar.getTime()
- }
-
- /** The current instant. */
- def mgoDateTimeNow: MGODate =
-   Calendar.getInstance().getTime
-
-
- /** Formats `dt` with the SimpleDateFormat pattern `formatString`. */
- def mgoDateToString(dt: MGODate, formatString: String): String =
-   new SimpleDateFormat(formatString).format(dt)
-
- type MGOBlob = BsonBinary // binary field value
- type MGOArray = BsonArray // array field value
- // Delegates to FileToByteArray (imported from sdp.file.Streaming); the result type is whatever that helper returns.
- def fileToMGOBlob(fileName: String, timeOut: FiniteDuration = 60 seconds)(
- implicit mat: Materializer) = FileToByteArray(fileName,timeOut)
- // Passes the blob's raw bytes (blob.getData) to ByteArrayToFile to write them to fileName.
- def mgoBlobToFile(blob: MGOBlob, fileName: String)(
- implicit mat: Materializer) = ByteArrayToFile(blob.getData,fileName)
-
- // Field accessors: each returns None when the document has no key `fieldName`,
- // otherwise the value read through the matching typed getter.
-
- /** String field, or None when the key is absent. */
- def mgoGetStringOrNone(doc: Document, fieldName: String) =
-   if (!doc.keySet.contains(fieldName)) None
-   else Some(doc.getString(fieldName))
-
- /** Integer field, or None when the key is absent. */
- def mgoGetIntOrNone(doc: Document, fieldName: String) =
-   if (!doc.keySet.contains(fieldName)) None
-   else Some(doc.getInteger(fieldName))
-
- /** Long field, or None when the key is absent. */
- def mgoGetLonggOrNone(doc: Document, fieldName: String) =
-   if (!doc.keySet.contains(fieldName)) None
-   else Some(doc.getLong(fieldName))
-
- /** Double field, or None when the key is absent. */
- def mgoGetDoubleOrNone(doc: Document, fieldName: String) =
-   if (!doc.keySet.contains(fieldName)) None
-   else Some(doc.getDouble(fieldName))
-
- /** Boolean field, or None when the key is absent. */
- def mgoGetBoolOrNone(doc: Document, fieldName: String) =
-   if (!doc.keySet.contains(fieldName)) None
-   else Some(doc.getBoolean(fieldName))
-
- /** Date field, or None when the key is absent. */
- def mgoGetDateOrNone(doc: Document, fieldName: String) =
-   if (!doc.keySet.contains(fieldName)) None
-   else Some(doc.getDate(fieldName))
-
- /** Binary field; the Option returned by `doc.get` is cast to `Option[MGOBlob]` unchecked. */
- def mgoGetBlobOrNone(doc: Document, fieldName: String) =
-   if (!doc.keySet.contains(fieldName)) None
-   else doc.get(fieldName).asInstanceOf[Option[MGOBlob]]
-
- /** Array field; the Option returned by `doc.get` is cast to `Option[MGOArray]` unchecked. */
- def mgoGetArrayOrNone(doc: Document, fieldName: String) =
-   if (!doc.keySet.contains(fieldName)) None
-   else doc.get(fieldName).asInstanceOf[Option[MGOArray]]
-
- /** The array's values as an immutable list, cast (unchecked) to BsonDocument elements. */
- def mgoArrayToDocumentList(arr: MGOArray): scala.collection.immutable.List[org.bson.BsonDocument] = {
-   val values = arr.getValues.asScala.toList
-   values.asInstanceOf[scala.collection.immutable.List[org.bson.BsonDocument]]
- }
-
- type MGOFilterResult = FindObservable[Document] => FindObservable[Document]
- }
-
-
- object MGOEngine extends LogSupport {
-
- import MGOClasses._
- import MGOAdmins._
- import MGOCommands._
- import sdp.result.DBOResult._
- import com.mongodb.reactivestreams.client.MongoClients
-
- object TxUpdateMode { // helpers that run a batch of updates around a MongoDB client-session transaction
- private def mgoTxUpdate(ctxs: MGOBatContext, observable: SingleObservable[ClientSession])(
- implicit client: MongoClient, ec: ExecutionContext): SingleObservable[ClientSession] = {
- log.info(s"mgoTxUpdate> calling ...")
- observable.map(clientSession => {
- // Start a transaction on the session with SNAPSHOT read concern and MAJORITY write concern.
- val transactionOptions =
- TransactionOptions.builder()
- .readConcern(ReadConcern.SNAPSHOT)
- .writeConcern(WriteConcern.MAJORITY).build()
-
- clientSession.startTransaction(transactionOptions)
- /*
- val fut = Future.traverse(ctxs.contexts) { ctx =>
- mgoUpdateObservable[Completed](ctx).map(identity).toFuture()
- }
- Await.ready(fut, 3 seconds) */
- // NOTE(review): each update is started with toFuture() and its result is neither awaited nor checked here, and clientSession is not passed to mgoUpdateObservable - confirm the updates really run inside the transaction.
- ctxs.contexts.foreach { ctx =>
- mgoUpdateObservable[Completed](ctx).map(identity).toFuture()
- }
- clientSession
- })
- }
- // Retries the commit while the failure carries the UNKNOWN_TRANSACTION_COMMIT_RESULT label; any other Exception is logged and rethrown.
- private def commitAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
- log.info(s"commitAndRetry> calling ...")
- observable.recoverWith({
- case e: MongoException if e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL) => {
- log.warn("commitAndRetry> UnknownTransactionCommitResult, retrying commit operation ...")
- commitAndRetry(observable)
- }
- case e: Exception => {
- log.error(s"commitAndRetry> Exception during commit ...: $e")
- throw e
- }
- })
- }
- // Retries the given observable while the failure carries the TRANSIENT_TRANSACTION_ERROR label; no other failure is handled here.
- private def runTransactionAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
- log.info(s"runTransactionAndRetry> calling ...")
- observable.recoverWith({
- case e: MongoException if e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL) => {
- log.warn("runTransactionAndRetry> TransientTransactionError, aborting transaction and retrying ...")
- runTransactionAndRetry(observable)
- }
- })
- }
- // Builds the start-session -> update -> commit pipeline, wraps it in both retry helpers and returns a Completed result.
- def mgoTxBatch(ctxs: MGOBatContext)(
- implicit client: MongoClient, ec: ExecutionContext): DBOResult[Completed] = {
-
- log.info(s"mgoTxBatch> MGOBatContext: ${ctxs}")
-
- val updateObservable: Observable[ClientSession] = mgoTxUpdate(ctxs, client.startSession())
- val commitTransactionObservable: SingleObservable[Completed] =
- updateObservable.flatMap(clientSession => clientSession.commitTransaction())
- val commitAndRetryObservable: SingleObservable[Completed] = commitAndRetry(commitTransactionObservable)
- // NOTE(review): the observable returned below is discarded and nothing in this method subscribes to it - confirm the transaction is actually executed.
- runTransactionAndRetry(commitAndRetryObservable)
- // NOTE(review): a ready-made Completed() is returned regardless of the transaction outcome.
- valueToDBOResult(Completed())
-
- }
- }
-
-
- /**
-  * Runs a batch of update contexts.
-  * With `ctxs.tx` set the batch is handed to `TxUpdateMode.mgoTxBatch`; otherwise
-  * `mgoUpdate` is invoked once per context and a `Completed` result is returned.
-  */
- def mgoUpdateBatch(ctxs: MGOBatContext)(implicit client: MongoClient, ec: ExecutionContext): DBOResult[Completed] = {
-   log.info(s"mgoUpdateBatch> MGOBatContext: ${ctxs}")
-   if (!ctxs.tx) {
-     /* earlier variant, kept for reference:
-     val fut = Future.traverse(ctxs.contexts) { ctx =>
-     mgoUpdate[Completed](ctx).map(identity) }
-
-     Await.ready(fut, 3 seconds)
-     Future.successful(new Completed) */
-     // NOTE(review): the value produced for each context is discarded - confirm the updates are executed and their failures surfaced.
-     for (ctx <- ctxs.contexts)
-       mgoUpdate[Completed](ctx).map(identity)
-
-     valueToDBOResult(Completed())
-   } else {
-     TxUpdateMode.mgoTxBatch(ctxs)
-   }
- }
-
- // Builds an Akka Streams Source of Documents for the Find action carried by `ctx`.
- //   ctx - supplies database name, collection name and the optional action
- // Throws IllegalArgumentException when ctx.action is empty, and RuntimeException when the
- // action is not one of the Find shapes below or when building the source throws.
- def mongoStream(ctx: MGOContext)(
-     implicit client: MongoClient, ec: ExecutionContextExecutor): Source[Document, NotUsed] = {
-
-   log.info(s"mongoStream> MGOContext: ${ctx}")
-
-   // Folds the result options (right to left) over a FindObservable.
-   def toResultOption(rts: Seq[ResultOptions]): FindObservable[Document] => FindObservable[Document] = findObj =>
-     rts.foldRight(findObj)((a,b) => a.toFindObservable(b))
-
-   val db = client.getDatabase(ctx.dbName)
-   val coll = db.getCollection(ctx.collName)
-
-   // Match on the Option instead of comparing with None and calling .get afterwards.
-   ctx.action match {
-     case None =>
-       log.error(s"mongoStream> query action cannot be null!")
-       throw new IllegalArgumentException("query action cannot be null!")
-     case Some(action) =>
-       try {
-         action match {
-           case Find(None, Nil, false) => //FindObservable
-             MongoSource(ObservableToPublisher(coll.find()))
-           case Find(None, Nil, true) => //FindObservable
-             MongoSource(ObservableToPublisher(coll.find().first()))
-           case Find(Some(filter), Nil, false) => //FindObservable
-             MongoSource(ObservableToPublisher(coll.find(filter)))
-           case Find(Some(filter), Nil, true) => //FindObservable
-             MongoSource(ObservableToPublisher(coll.find(filter).first()))
-           case Find(None, sro, _) => //FindObservable
-             val next = toResultOption(sro)
-             MongoSource(ObservableToPublisher(next(coll.find[Document]())))
-           case Find(Some(filter), sro, _) => //FindObservable
-             val next = toResultOption(sro)
-             MongoSource(ObservableToPublisher(next(coll.find[Document](filter))))
-           case _ =>
-             log.error(s"mongoStream> unsupported streaming query [${action}]")
-             throw new RuntimeException(s"mongoStream> unsupported streaming query [${action}]")
-         }
-       }
-       catch { case e: Exception =>
-         log.error(s"mongoStream> runtime error: ${e.getMessage}")
-         throw new RuntimeException(s"mongoStream> Error: ${e.getMessage}")
-       }
-   }
-
- }
-
-
- // T => FindIterable e.g List[Document]
- // Executes the read-style action stored in `ctx` (count, distinct, find, aggregate, mapReduce,
- // list collections) against ctx.dbName / ctx.collName.
- //   ctx       - supplies database name, collection name and the optional action to run
- //   Converter - optional mapping applied to every Document produced by a Find action
- // Returns the driver's Future, cast to Future[T], as a DBOResult[T].
- // A missing action yields Left(IllegalArgumentException). Any Exception thrown while the query
- // is being built (including the MatchError for an action shape with no case below) yields
- // Left(RuntimeException).
- def mgoQuery[T](ctx: MGOContext, Converter: Option[Document => Any] = None)(implicit client: MongoClient): DBOResult[T] = {
-   log.info(s"mgoQuery> MGOContext: ${ctx}")
-
-   val db = client.getDatabase(ctx.dbName)
-   val coll = db.getCollection(ctx.collName)
-
-   // Folds the result options (right to left) over a FindObservable.
-   def toResultOption(rts: Seq[ResultOptions]): FindObservable[Document] => FindObservable[Document] = findObj =>
-     rts.foldRight(findObj)((a,b) => a.toFindObservable(b))
-
-   // The guard used to build a Left and discard it, then fall through to ctx.action.get, so the
-   // caller received a "None.get" RuntimeException. Returning from the match makes the intended
-   // error the actual result.
-   ctx.action match {
-     case None =>
-       log.error(s"mgoQuery> query action cannot be null!")
-       Left(new IllegalArgumentException("query action cannot be null!"))
-     case Some(action) =>
-       try {
-         action match {
-           /* count */
-           case Count(Some(filter), Some(opt)) => //SingleObservable
-             coll.countDocuments(filter, opt.asInstanceOf[CountOptions])
-               .toFuture().asInstanceOf[Future[T]]
-           case Count(Some(filter), None) => //SingleObservable
-             coll.countDocuments(filter).toFuture()
-               .asInstanceOf[Future[T]]
-           case Count(None, None) => //SingleObservable
-             coll.countDocuments().toFuture()
-               .asInstanceOf[Future[T]]
-           /* distinct */
-           case Distict(field, Some(filter)) => //DistinctObservable
-             coll.distinct(field, filter).toFuture()
-               .asInstanceOf[Future[T]]
-           case Distict(field, None) => //DistinctObservable
-             coll.distinct((field)).toFuture()
-               .asInstanceOf[Future[T]]
-           /* find */
-           case Find(None, Nil, false) => //FindObservable
-             if (Converter == None) coll.find().toFuture().asInstanceOf[Future[T]]
-             else coll.find().map(Converter.get).toFuture().asInstanceOf[Future[T]]
-           case Find(None, Nil, true) => //FindObservable
-             if (Converter == None) coll.find().first().head().asInstanceOf[Future[T]]
-             else coll.find().first().map(Converter.get).head().asInstanceOf[Future[T]]
-           case Find(Some(filter), Nil, false) => //FindObservable
-             if (Converter == None) coll.find(filter).toFuture().asInstanceOf[Future[T]]
-             else coll.find(filter).map(Converter.get).toFuture().asInstanceOf[Future[T]]
-           case Find(Some(filter), Nil, true) => //FindObservable
-             if (Converter == None) coll.find(filter).first().head().asInstanceOf[Future[T]]
-             else coll.find(filter).first().map(Converter.get).head().asInstanceOf[Future[T]]
-           case Find(None, sro, _) => //FindObservable
-             val next = toResultOption(sro)
-             if (Converter == None) next(coll.find[Document]()).toFuture().asInstanceOf[Future[T]]
-             else next(coll.find[Document]()).map(Converter.get).toFuture().asInstanceOf[Future[T]]
-           case Find(Some(filter), sro, _) => //FindObservable
-             val next = toResultOption(sro)
-             if (Converter == None) next(coll.find[Document](filter)).toFuture().asInstanceOf[Future[T]]
-             else next(coll.find[Document](filter)).map(Converter.get).toFuture().asInstanceOf[Future[T]]
-           /* aggregate AggregateObservable*/
-           case Aggregate(pline) => coll.aggregate(pline).toFuture().asInstanceOf[Future[T]]
-           /* mapReduce MapReduceObservable*/
-           case MapReduce(mf, rf) => coll.mapReduce(mf, rf).toFuture().asInstanceOf[Future[T]]
-           /* list collection */
-           case ListCollection(dbName) => //ListConllectionObservable
-             client.getDatabase(dbName).listCollections().toFuture().asInstanceOf[Future[T]]
-
-         }
-       }
-       catch { case e: Exception =>
-         log.error(s"mgoQuery> runtime error: ${e.getMessage}")
-         Left(new RuntimeException(s"mgoQuery> Error: ${e.getMessage}"))
-       }
-   }
- }
- //T => Completed, result.UpdateResult, result.DeleteResult
- // Runs the write action described by `ctx` and returns its outcome as a DBOResult.
- // The observable from mgoUpdateObservable is turned into a Future; an Exception thrown while
- // that observable is being built is logged and returned as Left(RuntimeException).
- def mgoUpdate[T](ctx: MGOContext)(implicit client: MongoClient): DBOResult[T] =
-   try {
-     mgoUpdateObservable[T](ctx).toFuture()
-   } catch {
-     case failure: Exception =>
-       log.error(s"mgoUpdate> runtime error: ${failure.getMessage}")
-       Left(new RuntimeException(s"mgoUpdate> Error: ${failure.getMessage}"))
-   }
-
- // Translates the write action stored in `ctx` (insert, delete, replace, update, bulkWrite) into
- // the matching driver call on ctx.dbName / ctx.collName and returns the unsubscribed observable,
- // cast to SingleObservable[T].
- //   ctx - supplies database name, collection name and the optional action
- // Throws IllegalArgumentException when ctx.action is empty, and RuntimeException when building
- // the driver call throws (including the MatchError for an action with no case below).
- def mgoUpdateObservable[T](ctx: MGOContext)(implicit client: MongoClient): SingleObservable[T] = {
-   log.info(s"mgoUpdateObservable> MGOContext: ${ctx}")
-
-   val db = client.getDatabase(ctx.dbName)
-   val coll = db.getCollection(ctx.collName)
-
-   // Match on the Option instead of comparing with None and calling .get afterwards.
-   ctx.action match {
-     case None =>
-       log.error(s"mgoUpdateObservable> query action cannot be null!")
-       throw new IllegalArgumentException("mgoUpdateObservable> query action cannot be null!")
-     case Some(action) =>
-       try {
-         action match {
-           /* insert */
-           case Insert(docs, Some(opt)) => //SingleObservable[Completed]
-             // more than one document -> insertMany, otherwise insertOne on the first document
-             if (docs.size > 1)
-               coll.insertMany(docs, opt.asInstanceOf[InsertManyOptions]).asInstanceOf[SingleObservable[T]]
-             else coll.insertOne(docs.head, opt.asInstanceOf[InsertOneOptions]).asInstanceOf[SingleObservable[T]]
-           case Insert(docs, None) => //SingleObservable
-             if (docs.size > 1) coll.insertMany(docs).asInstanceOf[SingleObservable[T]]
-             else coll.insertOne(docs.head).asInstanceOf[SingleObservable[T]]
-           /* delete */
-           case Delete(filter, None, onlyOne) => //SingleObservable
-             if (onlyOne) coll.deleteOne(filter).asInstanceOf[SingleObservable[T]]
-             else coll.deleteMany(filter).asInstanceOf[SingleObservable[T]]
-           case Delete(filter, Some(opt), onlyOne) => //SingleObservable
-             if (onlyOne) coll.deleteOne(filter, opt.asInstanceOf[DeleteOptions]).asInstanceOf[SingleObservable[T]]
-             else coll.deleteMany(filter, opt.asInstanceOf[DeleteOptions]).asInstanceOf[SingleObservable[T]]
-           /* replace */
-           case Replace(filter, replacement, None) => //SingleObservable
-             coll.replaceOne(filter, replacement).asInstanceOf[SingleObservable[T]]
-           case Replace(filter, replacement, Some(opt)) => //SingleObservable
-             coll.replaceOne(filter, replacement, opt.asInstanceOf[ReplaceOptions]).asInstanceOf[SingleObservable[T]]
-           /* update */
-           case Update(filter, update, None, onlyOne) => //SingleObservable
-             if (onlyOne) coll.updateOne(filter, update).asInstanceOf[SingleObservable[T]]
-             else coll.updateMany(filter, update).asInstanceOf[SingleObservable[T]]
-           case Update(filter, update, Some(opt), onlyOne) => //SingleObservable
-             if (onlyOne) coll.updateOne(filter, update, opt.asInstanceOf[UpdateOptions]).asInstanceOf[SingleObservable[T]]
-             else coll.updateMany(filter, update, opt.asInstanceOf[UpdateOptions]).asInstanceOf[SingleObservable[T]]
-           /* bulkWrite */
-           case BulkWrite(commands, None) => //SingleObservable
-             coll.bulkWrite(commands).asInstanceOf[SingleObservable[T]]
-           case BulkWrite(commands, Some(opt)) => //SingleObservable
-             coll.bulkWrite(commands, opt.asInstanceOf[BulkWriteOptions]).asInstanceOf[SingleObservable[T]]
-         }
-       }
-       catch { case e: Exception =>
-         log.error(s"mgoUpdateObservable> runtime error: ${e.getMessage}")
-         throw new RuntimeException(s"mgoUpdateObservable> Error: ${e.getMessage}")
-       }
-   }
- }
-
- // Executes the administrative action stored in `ctx` (drop/create collection, create view,
- // create/drop index) and returns the driver's Future as a DBOResult[Completed].
- //   ctx - supplies database name, collection name and the optional action
- // A missing action yields Left(IllegalArgumentException). An Exception thrown while the driver
- // call is being built is logged and rethrown as RuntimeException.
- def mgoAdmin(ctx: MGOContext)(implicit client: MongoClient): DBOResult[Completed] = {
-   log.info(s"mgoAdmin> MGOContext: ${ctx}")
-
-   val db = client.getDatabase(ctx.dbName)
-   val coll = db.getCollection(ctx.collName)
-
-   // The guard used to build a Left and discard it, then fall through to ctx.action.get, which
-   // surfaced as a thrown "None.get" RuntimeException. Returning from the match delivers the
-   // error the guard was written to produce.
-   ctx.action match {
-     case None =>
-       log.error(s"mgoAdmin> query action cannot be null!")
-       Left(new IllegalArgumentException("mgoAdmin> query action cannot be null!"))
-     case Some(action) =>
-       try {
-         action match {
-           /* drop collection */
-           case DropCollection(collName) => //SingleObservable
-             val coll = db.getCollection(collName)
-             coll.drop().toFuture()
-           /* create collection */
-           case CreateCollection(collName, None) => //SingleObservable
-             db.createCollection(collName).toFuture()
-           case CreateCollection(collName, Some(opt)) => //SingleObservable
-             db.createCollection(collName, opt.asInstanceOf[CreateCollectionOptions]).toFuture()
-           /* list collection
-           case ListCollection(dbName) => //ListConllectionObservable
-           client.getDatabase(dbName).listCollections().toFuture().asInstanceOf[Future[T]]
-           */
-           /* create view */
-           case CreateView(viewName, viewOn, pline, None) => //SingleObservable
-             db.createView(viewName, viewOn, pline).toFuture()
-           case CreateView(viewName, viewOn, pline, Some(opt)) => //SingleObservable
-             db.createView(viewName, viewOn, pline, opt.asInstanceOf[CreateViewOptions]).toFuture()
-           /* create index */
-           case CreateIndex(key, None) => //SingleObservable
-             coll.createIndex(key).toFuture().asInstanceOf[Future[Completed]] // asInstanceOf[SingleObservable[Completed]]
-           case CreateIndex(key, Some(opt)) => //SingleObservable
-             // toFuture() was missing here: the observable itself was cast to Future, which fails
-             // with a ClassCastException. Now mirrors the option-less case above.
-             coll.createIndex(key, opt.asInstanceOf[IndexOptions]).toFuture().asInstanceOf[Future[Completed]]
-           /* drop index */
-           case DropIndexByName(indexName, None) => //SingleObservable
-             coll.dropIndex(indexName).toFuture()
-           case DropIndexByName(indexName, Some(opt)) => //SingleObservable
-             coll.dropIndex(indexName, opt.asInstanceOf[DropIndexOptions]).toFuture()
-           case DropIndexByKey(key, None) => //SingleObservable
-             coll.dropIndex(key).toFuture()
-           case DropIndexByKey(key, Some(opt)) => //SingleObservable
-             coll.dropIndex(key, opt.asInstanceOf[DropIndexOptions]).toFuture()
-           case DropAllIndexes(None) => //SingleObservable
-             coll.dropIndexes().toFuture()
-           case DropAllIndexes(Some(opt)) => //SingleObservable
-             coll.dropIndexes(opt.asInstanceOf[DropIndexOptions]).toFuture()
-         }
-       }
-       catch { case e: Exception =>
-         log.error(s"mgoAdmin> runtime error: ${e.getMessage}")
-         throw new RuntimeException(s"mgoAdmin> Error: ${e.getMessage}")
-       }
-   }
-
- }
-
- /*
- def mgoExecute[T](ctx: MGOContext)(implicit client: MongoClient): Future[T] = {
- val db = client.getDatabase(ctx.dbName)
- val coll = db.getCollection(ctx.collName)
- ctx.action match {
- /* count */
- case Count(Some(filter), Some(opt)) => //SingleObservable
- coll.countDocuments(filter, opt.asInstanceOf[CountOptions])
- .toFuture().asInstanceOf[Future[T]]
- case Count(Some(filter), None) => //SingleObservable
- coll.countDocuments(filter).toFuture()
- .asInstanceOf[Future[T]]
- case Count(None, None) => //SingleObservable
- coll.countDocuments().toFuture()
- .asInstanceOf[Future[T]]
- /* distinct */
- case Distict(field, Some(filter)) => //DistinctObservable
- coll.distinct(field, filter).toFuture()
- .asInstanceOf[Future[T]]
- case Distict(field, None) => //DistinctObservable
- coll.distinct((field)).toFuture()
- .asInstanceOf[Future[T]]
- /* find */
- case Find(None, None, optConv, false) => //FindObservable
- if (optConv == None) coll.find().toFuture().asInstanceOf[Future[T]]
- else coll.find().map(optConv.get).toFuture().asInstanceOf[Future[T]]
- case Find(None, None, optConv, true) => //FindObservable
- if (optConv == None) coll.find().first().head().asInstanceOf[Future[T]]
- else coll.find().first().map(optConv.get).head().asInstanceOf[Future[T]]
- case Find(Some(filter), None, optConv, false) => //FindObservable
- if (optConv == None) coll.find(filter).toFuture().asInstanceOf[Future[T]]
- else coll.find(filter).map(optConv.get).toFuture().asInstanceOf[Future[T]]
- case Find(Some(filter), None, optConv, true) => //FindObservable
- if (optConv == None) coll.find(filter).first().head().asInstanceOf[Future[T]]
- else coll.find(filter).first().map(optConv.get).head().asInstanceOf[Future[T]]
- case Find(None, Some(next), optConv, _) => //FindObservable
- if (optConv == None) next(coll.find[Document]()).toFuture().asInstanceOf[Future[T]]
- else next(coll.find[Document]()).map(optConv.get).toFuture().asInstanceOf[Future[T]]
- case Find(Some(filter), Some(next), optConv, _) => //FindObservable
- if (optConv == None) next(coll.find[Document](filter)).toFuture().asInstanceOf[Future[T]]
- else next(coll.find[Document](filter)).map(optConv.get).toFuture().asInstanceOf[Future[T]]
- /* aggregate AggregateObservable*/
- case Aggregate(pline) => coll.aggregate(pline).toFuture().asInstanceOf[Future[T]]
- /* mapReduce MapReduceObservable*/
- case MapReduce(mf, rf) => coll.mapReduce(mf, rf).toFuture().asInstanceOf[Future[T]]
- /* insert */
- case Insert(docs, Some(opt)) => //SingleObservable[Completed]
- if (docs.size > 1) coll.insertMany(docs, opt.asInstanceOf[InsertManyOptions]).toFuture()
- .asInstanceOf[Future[T]]
- else coll.insertOne(docs.head, opt.asInstanceOf[InsertOneOptions]).toFuture()
- .asInstanceOf[Future[T]]
- case Insert(docs, None) => //SingleObservable
- if (docs.size > 1) coll.insertMany(docs).toFuture().asInstanceOf[Future[T]]
- else coll.insertOne(docs.head).toFuture().asInstanceOf[Future[T]]
- /* delete */
- case Delete(filter, None, onlyOne) => //SingleObservable
- if (onlyOne) coll.deleteOne(filter).toFuture().asInstanceOf[Future[T]]
- else coll.deleteMany(filter).toFuture().asInstanceOf[Future[T]]
- case Delete(filter, Some(opt), onlyOne) => //SingleObservable
- if (onlyOne) coll.deleteOne(filter, opt.asInstanceOf[DeleteOptions]).toFuture().asInstanceOf[Future[T]]
- else coll.deleteMany(filter, opt.asInstanceOf[DeleteOptions]).toFuture().asInstanceOf[Future[T]]
- /* replace */
- case Replace(filter, replacement, None) => //SingleObservable
- coll.replaceOne(filter, replacement).toFuture().asInstanceOf[Future[T]]
- case Replace(filter, replacement, Some(opt)) => //SingleObservable
- coll.replaceOne(filter, replacement, opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]
- /* update */
- case Update(filter, update, None, onlyOne) => //SingleObservable
- if (onlyOne) coll.updateOne(filter, update).toFuture().asInstanceOf[Future[T]]
- else coll.updateMany(filter, update).toFuture().asInstanceOf[Future[T]]
- case Update(filter, update, Some(opt), onlyOne) => //SingleObservable
- if (onlyOne) coll.updateOne(filter, update, opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]
- else coll.updateMany(filter, update, opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]
- /* bulkWrite */
- case BulkWrite(commands, None) => //SingleObservable
- coll.bulkWrite(commands).toFuture().asInstanceOf[Future[T]]
- case BulkWrite(commands, Some(opt)) => //SingleObservable
- coll.bulkWrite(commands, opt.asInstanceOf[BulkWriteOptions]).toFuture().asInstanceOf[Future[T]]
-
- /* drop collection */
- case DropCollection(collName) => //SingleObservable
- val coll = db.getCollection(collName)
- coll.drop().toFuture().asInstanceOf[Future[T]]
- /* create collection */
- case CreateCollection(collName, None) => //SingleObservable
- db.createCollection(collName).toFuture().asInstanceOf[Future[T]]
- case CreateCollection(collName, Some(opt)) => //SingleObservable
- db.createCollection(collName, opt.asInstanceOf[CreateCollectionOptions]).toFuture().asInstanceOf[Future[T]]
- /* list collection */
- case ListCollection(dbName) => //ListConllectionObservable
- client.getDatabase(dbName).listCollections().toFuture().asInstanceOf[Future[T]]
- /* create view */
- case CreateView(viewName, viewOn, pline, None) => //SingleObservable
- db.createView(viewName, viewOn, pline).toFuture().asInstanceOf[Future[T]]
- case CreateView(viewName, viewOn, pline, Some(opt)) => //SingleObservable
- db.createView(viewName, viewOn, pline, opt.asInstanceOf[CreateViewOptions]).toFuture().asInstanceOf[Future[T]]
- /* create index */
- case CreateIndex(key, None) => //SingleObservable
- coll.createIndex(key).toFuture().asInstanceOf[Future[T]]
- case CreateIndex(key, Some(opt)) => //SingleObservable
- coll.createIndex(key, opt.asInstanceOf[IndexOptions]).toFuture().asInstanceOf[Future[T]]
- /* drop index */
- case DropIndexByName(indexName, None) => //SingleObservable
- coll.dropIndex(indexName).toFuture().asInstanceOf[Future[T]]
- case DropIndexByName(indexName, Some(opt)) => //SingleObservable
- coll.dropIndex(indexName, opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]
- case DropIndexByKey(key, None) => //SingleObservable
- coll.dropIndex(key).toFuture().asInstanceOf[Future[T]]
- case DropIndexByKey(key, Some(opt)) => //SingleObservable
- coll.dropIndex(key, opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]
- case DropAllIndexes(None) => //SingleObservable
- coll.dropIndexes().toFuture().asInstanceOf[Future[T]]
- case DropAllIndexes(Some(opt)) => //SingleObservable
- coll.dropIndexes(opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]
- }
- }
- */
-
-
- }
-
-
- // Akka Streams flows that perform one MongoDB write per stream element.
- object MongoActionStream {
-
-   import MGOClasses._
-
-   // Settings for an insert flow: target database/collection, the element-to-Document mapping,
-   // and the parallelism handed to mapAsync.
-   case class StreamingInsert[A](dbName: String,
-                                 collName: String,
-                                 converter: A => Document,
-                                 parallelism: Int = 1
-                                ) extends MGOCommands
-
-   // Settings for a delete flow: target database/collection, the element-to-filter mapping,
-   // the parallelism handed to mapAsync, and whether deleteOne or deleteMany is issued.
-   case class StreamingDelete[A](dbName: String,
-                                 collName: String,
-                                 toFilter: A => Bson,
-                                 parallelism: Int = 1,
-                                 justOne: Boolean = false
-                                ) extends MGOCommands
-
-   // Settings for an update flow: target database/collection, the element-to-filter and
-   // element-to-update mappings, the parallelism handed to mapAsync, and whether updateOne or
-   // updateMany is issued.
-   case class StreamingUpdate[A](dbName: String,
-                                 collName: String,
-                                 toFilter: A => Bson,
-                                 toUpdate: A => Bson,
-                                 parallelism: Int = 1,
-                                 justOne: Boolean = false
-                                ) extends MGOCommands
-
-
-   // Inserts each element, after conversion to a Document, into the configured collection.
-   case class InsertAction[A](ctx: StreamingInsert[A])(
-     implicit mongoClient: MongoClient) {
-
-     val database = mongoClient.getDatabase(ctx.dbName)
-     val collection = database.getCollection(ctx.collName)
-
-     // Emits the inserted Document once its insertOne has completed.
-     def performOnRow(implicit ec: ExecutionContext): Flow[A, Document, NotUsed] =
-       Flow[A]
-         .map(ctx.converter)
-         .mapAsync(ctx.parallelism) { doc =>
-           val inserted = collection.insertOne(doc).toFuture()
-           inserted.map(_ => doc)
-         }
-   }
-
-   // Updates the documents selected by each element's filter.
-   case class UpdateAction[A](ctx: StreamingUpdate[A])(
-     implicit mongoClient: MongoClient) {
-
-     val database = mongoClient.getDatabase(ctx.dbName)
-     val collection = database.getCollection(ctx.collName)
-
-     // Emits the element unchanged once its update has completed.
-     def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] =
-       Flow[A].mapAsync(ctx.parallelism) { row =>
-         val pending =
-           if (ctx.justOne) collection.updateOne(ctx.toFilter(row), ctx.toUpdate(row))
-           else collection.updateMany(ctx.toFilter(row), ctx.toUpdate(row))
-         pending.toFuture().map(_ => row)
-       }
-   }
-
-
-   // Deletes the documents selected by each element's filter.
-   case class DeleteAction[A](ctx: StreamingDelete[A])(
-     implicit mongoClient: MongoClient) {
-
-     val database = mongoClient.getDatabase(ctx.dbName)
-     val collection = database.getCollection(ctx.collName)
-
-     // Emits the element unchanged once its delete has completed.
-     def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] =
-       Flow[A].mapAsync(ctx.parallelism) { row =>
-         val pending =
-           if (ctx.justOne) collection.deleteOne(ctx.toFilter(row))
-           else collection.deleteMany(ctx.toFilter(row))
-         pending.toFuture().map(_ => row)
-       }
-   }
-
- }
-
- // Blocking convenience helpers for driver observables and futures, plus Future/Task adapters.
- object MGOHelpers {
-
-   // Observable of Documents whose elements are rendered as JSON.
-   implicit class DocumentObservable[C](val observable: Observable[Document]) extends ImplicitObservable[Document] {
-     override val converter: (Document) => String = document => document.toJson
-   }
-
-   // Observable of any element type, rendered with toString.
-   implicit class GenericObservable[C](val observable: Observable[C]) extends ImplicitObservable[C] {
-     override val converter: (C) => String = element => element.toString
-   }
-
-   // Blocking accessors and console printing shared by the two wrappers above.
-   trait ImplicitObservable[C] {
-     val observable: Observable[C]
-     val converter: (C) => String
-
-     // Blocks for up to 10 seconds for all elements.
-     def results(): Seq[C] = Await.result(observable.toFuture(), 10.seconds)
-
-     // Blocks for up to 10 seconds for the head element.
-     def headResult() = Await.result(observable.head(), 10.seconds)
-
-     // Prints `initial` when it is non-empty, then one converted element per line.
-     def printResults(initial: String = ""): Unit = {
-       if (initial.nonEmpty) print(initial)
-       for (item <- results()) println(converter(item))
-     }
-
-     // Prints `initial` immediately followed by the converted head element.
-     def printHeadResult(initial: String = ""): Unit =
-       println(s"${initial}${converter(headResult())}")
-   }
-
-   // Blocks on `fut` for at most `timeOut` (1 second unless overridden) and returns its value.
-   def getResult[T](fut: Future[T], timeOut: Duration = 1.second): T =
-     Await.result(fut, timeOut)
-
-   // Same as getResult, for a future holding a collection.
-   def getResults[T](fut: Future[Iterable[T]], timeOut: Duration = 1.second): Iterable[T] =
-     Await.result(fut, timeOut)
-
-   import monix.eval.Task
-   import monix.execution.Scheduler.Implicits.global
-
-   // Adapter from a by-name Future to a deferred monix Task. Not declared implicit, so it has to
-   // be instantiated explicitly.
-   final class FutureToTask[A](x: => Future[A]) {
-     def asTask: Task[A] = Task.deferFuture[A](x)
-   }
-
-   // Adapter that runs a by-name monix Task to a Future on the global scheduler imported above.
-   // Not declared implicit, so it has to be instantiated explicitly.
-   final class TaskToFuture[A](x: => Task[A]) {
-     def asFuture: Future[A] = x.runToFuture
-   }
-
- }