Akka-Cluster (3) - ClusterClient, the cluster client
Source: cnblogs  Author: 雪川大虫  Date: 2018/12/11 9:26

  In the previous installment we introduced the distributed pub/sub messaging mechanism. It shares messages within a single cluster: publishers and subscribers all live on nodes of the same cluster, and the DistributedPubSubMediator on each node builds the underlying message channels through the cluster's internal communication, giving location transparency at the actor pub/sub level. In practice, however, a front end often acts as a client of a cluster while sitting outside it, and two independent clusters may need to interact with each other, so the client and the server may not be in the same cluster. ClusterClient is the solution for communication between actors outside a cluster and actors inside it.

In essence, the ClusterClient pattern is a service model built on publish/subscribe messaging: the client requests a service by sending a message, and the server receives the request message and performs the corresponding computation.

The pattern has two sides: ClusterClient on the client side and ClusterClientReceptionist on the cluster side. As the name suggests, the receptionist receives the service requests initiated by clients outside the cluster. A ClusterClientReceptionist is deployed on every node of the cluster (or on nodes with a selected role); each one hooks into the local DistributedPubSubMediator to form the subscribing side, and the link between ClusterClient and ClusterClientReceptionist then forms a unified messaging environment that realises the distributed pub/sub mechanism discussed in the previous installment.

ClusterClient is the publishing side: an actor running on a machine outside the target cluster. When an actor on that machine needs to send messages to actors inside the cluster, it does so through the local ClusterClient actor, over the channel that ClusterClient establishes with the cluster's ClusterClientReceptionist instances, reaching any actor registered with the DistributedPubSubMediator that a receptionist is connected to. A machine acting as a cluster client must therefore run the ClusterClient actor locally; it is one end of the communication bridge.

On startup, ClusterClient connects to ClusterClientReceptionist instances at preconfigured addresses (the contact points), then maintains its internal contact-point list from the list the receptionists publish; this list can be persisted and used to reconnect to the cluster after a system restart. Once connected, ClusterClient monitors its counterpart and automatically switches to another ClusterClientReceptionist when necessary. Messages published by a ClusterClient are wrapped in one of three envelopes:
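The contact-point list and reconnect behaviour can also be tuned through configuration. A hedged sketch of the relevant settings (key names follow Akka 2.5's `akka.cluster.client` reference configuration; verify defaults against your Akka version):

```hocon
akka.cluster.client {
  # initial contact points, e.g.
  # "akka.tcp://ClusterSystem@127.0.0.1:2551/system/receptionist"
  initial-contacts = []
  # retry interval for fetching the full contact list while establishing
  establishing-get-contacts-interval = 3s
  # how often to refresh the contact list once connected
  refresh-contacts-interval = 60s
  # stop the client if a connection cannot be re-established within this
  # time; "off" means retry forever
  reconnect-timeout = off
}
```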

1. ClusterClient.Send

2. ClusterClient.SendToAll

3. ClusterClient.Publish

We discussed these delivery modes in the previous installment, so we skip the details here.

ClusterClientReceptionist is the message-receiving interface inside the cluster. Actors in the cluster that need to receive messages must register their addresses with the local DistributedPubSubMediator; that is how ClusterClientReceptionist obtains the address list of all service actors in the cluster. The recipient information specified in a message published through ClusterClient determines which actor ultimately receives it and provides the service. Service registration looks like this:

// register service A
val serviceA = system.actorOf(Props[Service], "serviceA")
ClusterClientReceptionist(system).registerService(serviceA)
// register service B
val serviceB = system.actorOf(Props[Service], "serviceB")
ClusterClientReceptionist(system).registerService(serviceB)

Invoking the services from a ClusterClient:

val client = system.actorOf(ClusterClient.props(
  ClusterClientSettings(system).withInitialContacts(initialContacts)), "client")
client ! ClusterClient.Send("/user/serviceA", DoThis, localAffinity = true)
client ! ClusterClient.SendToAll("/user/serviceB", DoThat)

Note: the registered service actors must handle the DoThis and DoThat messages and implement the corresponding computation.

In a concrete application, pay attention to what sender() means: from the perspective of the service actor, sender() refers to the ClusterClientReceptionist, while from the perspective of the actor that published the message, the sender() of any reply is DeadLetters. If the service actor needs the requester's actual address, the publishing side can embed its own address in the published message.
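One way to carry the requester's address is a small request envelope. This is a minimal sketch; `ServiceRequest` and the field names are hypothetical, not part of the example project, and the path shown is the client path used later in this article:

```scala
// Hypothetical request envelope: ClusterClient does not preserve the original
// sender() end-to-end, so the requester embeds its own actor path in the message.
final case class ServiceRequest(payload: String, replyToPath: String)

// on the client side:
val req = ServiceRequest(
  payload = "Shout",
  replyToPath = "akka.tcp://ClientSystem@127.0.0.1:2553/user/petClient")

// the service actor can then reply directly with:
//   context.actorSelection(req.replyToPath) ! response
```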

Let's demonstrate with a simple example. First we design two service actors, Cat and Dog; suppose each offers a different call as its service:

import akka.actor._
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator.Subscribe

class Cat extends Actor with ActorLogging {
  // subscribe to the "Shout" topic via distributed pub/sub
  val mediator = DistributedPubSub(context.system).mediator
  override def preStart() = {
    mediator ! Subscribe("Shout", self)
    super.preStart()
  }
  override def receive: Receive = {
    case "Shout" =>
      log.info("*******I am a cat, MIAOM ...******")
  }
}

class Dog extends Actor with ActorLogging {
  // subscribe to the "Shout" topic via distributed pub/sub
  val mediator = DistributedPubSub(context.system).mediator
  override def preStart() = {
    mediator ! Subscribe("Shout", self)
    super.preStart()
  }
  override def receive: Receive = {
    case "Shout" =>
      log.info("*****I am a dog, WANG WANG...*****")
  }
}

As you can see, these are two perfectly ordinary actors. By also subscribing them to the distributed pub/sub topic from the previous installment, we can verify that cluster-client is built on distributed pub/sub. Next we place the two actors (services) on different cluster nodes:

import akka.actor._
import akka.cluster.client.ClusterClientReceptionist
import com.typesafe.config.ConfigFactory

object Cat {
  def props = Props[Cat]
  def create(port: Int): ActorSystem = {
    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
      .withFallback(ConfigFactory.load())
    val system = ActorSystem("ClusterSystem", config)
    val catSound = system.actorOf(props, "CatSound")
    ClusterClientReceptionist(system).registerService(catSound)
    system
  }
}

object Dog {
  def props = Props(new Dog)
  def create(port: Int): ActorSystem = {
    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
      .withFallback(ConfigFactory.load())
    val system = ActorSystem("ClusterSystem", config)
    val dogSound = system.actorOf(props, "DogSound")
    ClusterClientReceptionist(system).registerService(dogSound)
    system
  }
}

Note that the cluster name is ClusterSystem. On each actor's node we registered the service with ClusterClientReceptionist.registerService. The cluster's conf file is as follows:

akka.actor.warn-about-java-serializer-usage = off
akka.log-dead-letters-during-shutdown = off
akka.log-dead-letters = off
akka {
  loglevel = INFO
  extensions = ["akka.cluster.client.ClusterClientReceptionist"]
  actor {
    provider = "cluster"
    serializers {
      java = "akka.serialization.JavaSerializer"
      proto = "akka.remote.serialization.ProtobufSerializer"
    }
    serialization-bindings {
      "java.lang.String" = java
      "scalapb.GeneratedMessage" = proto
    }
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }
  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551"]
    log-info = off
  }
}

This is a fairly complete cluster configuration; only the port remains to be set per node. Now run the two nodes:

object PetHouse extends App {
  val sysCat = Cat.create(2551)
  val sysDog = Dog.create(2552)
  scala.io.StdIn.readLine()
  sysCat.terminate()
  sysDog.terminate()
}

That builds the Cat and Dog actors on nodes 2551 and 2552 and registers their services with ClusterClientReceptionist.registerService. Now the client side:

import akka.actor._
import akka.cluster.client.{ClusterClient, ClusterClientSettings}
import akka.cluster.client.ClusterClient.{Send, Publish}
import com.typesafe.config.ConfigFactory

object PetClient extends App {
  val conf = ConfigFactory.load("client")
  val clientSystem = ActorSystem("ClientSystem", conf)
  /* read the contact-points addresses from the conf file
  val initialContacts = immutableSeq(conf.getStringList("contact-points")).map {
    case AddressFromURIString(addr) => RootActorPath(addr) / "system" / "receptionist"
  }.toSet
  */
  // one contact point is enough; the client discovers the others automatically
  val initialContacts = Set(
    ActorPaths.fromString("akka.tcp://ClusterSystem@127.0.0.1:2551/system/receptionist")
  )
  val clusterClient = clientSystem.actorOf(
    ClusterClient.props(
      ClusterClientSettings(clientSystem)
        .withInitialContacts(initialContacts)),
    "petClient")
  clusterClient ! Send("/user/CatSound", "Shout", localAffinity = true)
  clusterClient ! Send("/user/DogSound", "Shout", localAffinity = true)
  println(s"sent shout messages ...")
  scala.io.StdIn.readLine()
  clusterClient ! Publish("Shout", "Shout")
  println(s"publish shout messages ...")
  scala.io.StdIn.readLine()
  clientSystem.terminate()
}

The client's ActorSystem is named ClientSystem and sits outside the ClusterSystem cluster. Its conf file is:

akka {
  actor.provider = remote
  remote.netty.tcp.port = 2553
  remote.netty.tcp.hostname = 127.0.0.1
}
contact-points = [
  "akka.tcp://ClusterSystem@127.0.0.1:2551",
  "akka.tcp://ClusterSystem@127.0.0.1:2552"]

Setting actor.provider=remote spares us from providing seed nodes. The run produces:
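The commented-out alternative in PetClient derives the full receptionist paths from this contact-points list. The transformation itself is plain string manipulation; a sketch using the two addresses configured above:

```scala
// derive receptionist actor paths from the configured contact-point addresses
val contactPoints = List(
  "akka.tcp://ClusterSystem@127.0.0.1:2551",
  "akka.tcp://ClusterSystem@127.0.0.1:2552")

// every node's receptionist lives at <node address>/system/receptionist
val receptionistPaths: Set[String] =
  contactPoints.map(addr => s"$addr/system/receptionist").toSet

// each element can then be turned into an ActorPath with ActorPaths.fromString
```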

[INFO] [12/08/2018 09:32:51.432] [ClusterSystem-akka.actor.default-dispatcher-17] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/CatSound] *******I am a cat, MIAOM ...******
[INFO] [12/08/2018 09:32:51.435] [ClusterSystem-akka.actor.default-dispatcher-19] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/DogSound] *****I am a dog, WANG WANG...*****
[INFO] [12/08/2018 09:33:44.113] [ClusterSystem-akka.actor.default-dispatcher-4] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/CatSound] *******I am a cat, MIAOM ...******
[INFO] [12/08/2018 09:33:44.113] [ClusterSystem-akka.actor.default-dispatcher-23] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/DogSound] *****I am a dog, WANG WANG...*****

Both ClusterClient and the receptionist publish messages about their own state. We can intercept these messages and react to them accordingly; see the listener example code below:

package petsound

import akka.actor._
import akka.cluster.client._

class ClientListener(clusterClient: ActorRef) extends Actor with ActorLogging {
  override def preStart(): Unit = {
    clusterClient ! SubscribeContactPoints
    super.preStart()
  }
  override def receive: Receive = {
    case ContactPoints(cps) =>
      cps.map {ap => log.info(s"*******ContactPoints:${ap.address.toString}******")}
    case ContactPointAdded(cp) =>
      log.info(s"*******ContactPointAdded: ${cp.address.toString}*******")
    case ContactPointRemoved(cp) =>
      log.info(s"*******ContactPointRemoved: ${cp.address.toString}*******")
  }
}

class ReceptionistListener(receptionist: ActorRef) extends Actor with ActorLogging {
  override def preStart(): Unit = {
    receptionist ! SubscribeClusterClients
    super.preStart()
  }
  override def receive: Receive = {
    case ClusterClients(cs) =>
      cs.map{aref => println(s"*******ClusterClients: ${aref.path.address.toString}*******")}
    case ClusterClientUp(cc) =>
      log.info(s"*******ClusterClientUp: ${cc.path.address.toString}*******")
    case ClusterClientUnreachable(cc) =>
      log.info(s"*******ClusterClientUnreachable: ${cc.path.address.toString}*******")
  }
}

The two event listeners are installed like this:

// on the Cat node:
val receptionist = ClusterClientReceptionist(system).underlying
system.actorOf(Props(classOf[ReceptionistListener], receptionist), "cat-event-listner")

// on the Dog node (its own ActorSystem):
val receptionist = ClusterClientReceptionist(system).underlying
system.actorOf(Props(classOf[ReceptionistListener], receptionist), "dog-event-listner")

// on the client side:
val clusterClient = clientSystem.actorOf(
  ClusterClient.props(
    ClusterClientSettings(clientSystem)
      .withInitialContacts(initialContacts)),
  "petClient")
clientSystem.actorOf(Props(classOf[ClientListener], clusterClient), "client-event-listner")

And the output:

[INFO] [12/09/2018 09:42:40.838] [ClientSystem-akka.actor.default-dispatcher-3] [akka.tcp://ClientSystem@127.0.0.1:2553/user/client-event-listner] *******ContactPoints:akka.tcp://ClusterSystem@127.0.0.1:2551******
[INFO] [12/09/2018 09:42:40.947] [ClientSystem-akka.actor.default-dispatcher-3] [akka.tcp://ClientSystem@127.0.0.1:2553/user/client-event-listner] *******ContactPointAdded: akka.tcp://ClusterSystem@127.0.0.1:2552*******
[INFO] [12/09/2018 09:42:40.967] [ClientSystem-akka.actor.default-dispatcher-15] [akka.tcp://ClientSystem@127.0.0.1:2553/user/petClient] Connected to [akka.tcp://ClusterSystem@127.0.0.1:2551/system/receptionist]
[INFO] [12/09/2018 09:42:40.979] [ClusterSystem-akka.actor.default-dispatcher-4] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/cat-event-listner] *******ClusterClientUp: akka.tcp://ClientSystem@127.0.0.1:2553*******
[INFO] [12/09/2018 09:54:34.363] [ClusterSystem-akka.actor.default-dispatcher-19] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/cat-event-listner] *******ClusterClientUnreachable: akka.tcp://ClientSystem@127.0.0.1:2553*******

Let's run one more demo, along the same lines as the previous installment: the cluster client sends MongoDB commands to a MongoDB service actor registered in the cluster with ClusterClientReceptionist; on receiving a command, the service runs it against MongoDB. Here is the MongoDB service actor:

package petsound

import akka.actor._
import com.typesafe.config._
import akka.actor.ActorSystem
import org.mongodb.scala._
import sdp.grpc.services.ProtoMGOContext
import sdp.mongo.engine.MGOClasses._
import sdp.mongo.engine.MGOEngine._
import sdp.result.DBOResult._
import akka.cluster.client._
import scala.collection.JavaConverters._
import scala.util._

class MongoAdder extends Actor with ActorLogging {
  import monix.execution.Scheduler.Implicits.global
  implicit val mgosys = context.system
  implicit val ec = mgosys.dispatcher
  val clientSettings: MongoClientSettings = MongoClientSettings.builder()
    .applyToClusterSettings {b =>
      b.hosts(List(new ServerAddress("localhost:27017")).asJava)
    }.build()
  implicit val client: MongoClient = MongoClient(clientSettings)
  val ctx = MGOContext("testdb","friends")
  override def receive: Receive = {
    case someProto @ Some(proto:ProtoMGOContext) =>
      val ctx = MGOContext.fromProto(proto)
      log.info(s"****** received MGOContext: $someProto *********")
      val task = mgoUpdate[Completed](ctx).toTask
      task.runOnComplete {
        case Success(s) => println("operations completed successfully.")
        case Failure(exception) => println(s"error: ${exception.getMessage}")
      }
  }
}

object MongoAdder {
  def create(port: Int): ActorSystem = {
    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
      .withFallback(ConfigFactory.load())
    val system = ActorSystem("ClusterSystem", config)
    val mongoAdder = system.actorOf(Props[MongoAdder],"MongoAdder")
    ClusterClientReceptionist(system).registerService(mongoAdder)
    val receptionist = ClusterClientReceptionist(system).underlying
    system.actorOf(Props(classOf[ReceptionistListener],receptionist),"mongo-event-listner")
    system
  }
}

MongoAdder lives in the same ClusterSystem cluster, and the code above already includes the service registration. Sending MongoDB commands from the client looks like this:

// MongoDB operation demo
import org.mongodb.scala._
import sdp.mongo.engine.MGOClasses._

val ctx = MGOContext("testdb","friends")
val chen = Document("" -> "", "" -> "大文","age" -> 28)
val zhang = Document("" -> "", "" -> "小海","age" -> 7)
val lee = Document("" -> "", "" -> "","age" -> 45)
val ouyang = Document("" -> "欧阳", "" -> "","age" -> 120)
val c = ctx.setCommand(MGOCommands.Insert(Seq(chen,zhang,lee,ouyang)))
clusterClient ! Send("/user/MongoAdder", c.toSomeProto, localAffinity = true)

Because the MongoDB commands are serialized with protobuf, client.conf must be updated to tell Akka to use protobuf-formatted messages:

akka {
  actor {
    provider = remote
    serializers {
      java = "akka.serialization.JavaSerializer"
      proto = "akka.remote.serialization.ProtobufSerializer"
    }
    serialization-bindings {
      "java.lang.String" = java
      "scalapb.GeneratedMessage" = proto
    }
  }
  remote.netty.tcp.port = 2553
  remote.netty.tcp.hostname = 127.0.0.1
}
contact-points = [
  "akka.tcp://ClusterSystem@127.0.0.1:2551",
  "akka.tcp://ClusterSystem@127.0.0.1:2552"]

Below is the complete source code for this discussion:

build.sbt

import scalapb.compiler.Version.scalapbVersion
import scalapb.compiler.Version.grpcJavaVersion

name := "akka-cluster-client"
version := "0.1"
scalaVersion := "2.12.7"
scalacOptions += "-Ypartial-unification"

libraryDependencies := Seq(
  "com.typesafe.akka" %% "akka-actor" % "2.5.17",
  "com.typesafe.akka" %% "akka-cluster-tools" % "2.5.17",
  "com.thesamet.scalapb" %% "scalapb-runtime" % scalapbVersion % "protobuf",
//  "io.grpc" % "grpc-netty" % grpcJavaVersion,
  "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapbVersion,
  "io.monix" %% "monix" % "2.3.0",
  //for mongodb 4.0
  "org.mongodb.scala" %% "mongo-scala-driver" % "2.4.0",
  "com.lightbend.akka" %% "akka-stream-alpakka-mongodb" % "0.20",
  //other dependencies
  "co.fs2" %% "fs2-core" % "0.9.7",
  "ch.qos.logback" % "logback-classic" % "1.2.3",
  "org.typelevel" %% "cats-core" % "0.9.0",
  "io.monix" %% "monix-execution" % "3.0.0-RC1",
  "io.monix" %% "monix-eval" % "3.0.0-RC1"
)

PB.targets in Compile := Seq(
  scalapb.gen() -> (sourceManaged in Compile).value
)

project/scalapb.sbt

addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.18")

libraryDependencies ++= Seq(
  "com.thesamet.scalapb" %% "compilerplugin" % "0.7.4"
)

resources/application.conf

akka.actor.warn-about-java-serializer-usage = off
akka.log-dead-letters-during-shutdown = off
akka.log-dead-letters = off
akka {
  loglevel = INFO
  extensions = ["akka.cluster.client.ClusterClientReceptionist"]
  actor {
    provider = "cluster"
    serializers {
      java = "akka.serialization.JavaSerializer"
      proto = "akka.remote.serialization.ProtobufSerializer"
    }
    serialization-bindings {
      "java.lang.String" = java
      "scalapb.GeneratedMessage" = proto
    }
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }
  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551"]
    log-info = off
  }
}

resources/client.conf

akka {
  actor {
    provider = remote
    serializers {
      java = "akka.serialization.JavaSerializer"
      proto = "akka.remote.serialization.ProtobufSerializer"
    }
    serialization-bindings {
      "java.lang.String" = java
      "scalapb.GeneratedMessage" = proto
    }
  }
  remote.netty.tcp.port = 2553
  remote.netty.tcp.hostname = 127.0.0.1
}
contact-points = [
  "akka.tcp://ClusterSystem@127.0.0.1:2551",
  "akka.tcp://ClusterSystem@127.0.0.1:2552"]

protobuf/spd.proto

syntax = "proto3";

import "google/protobuf/wrappers.proto";
import "google/protobuf/any.proto";
import "scalapb/scalapb.proto";

option (scalapb.options) = {
  // use a custom Scala package name
  // package_name: "io.ontherocks.introgrpc.demo"

  // don't append file name to package
  flat_package: true

  // generate one Scala file for all messages (services still get their own file)
  single_file: true

  // add imports to generated file
  // useful when extending traits or using custom types
  // import: "io.ontherocks.hellogrpc.RockingMessage"

  // code to put at the top of generated file
  // works only with `single_file: true`
  //preamble: "sealed trait SomeSealedTrait"
};

package sdp.grpc.services;

message ProtoDate {
  int32 yyyy = 1;
  int32 mm = 2;
  int32 dd = 3;
}

message ProtoTime {
  int32 hh = 1;
  int32 mm = 2;
  int32 ss = 3;
  int32 nnn = 4;
}

message ProtoDateTime {
  ProtoDate date = 1;
  ProtoTime time = 2;
}

message ProtoAny {
  bytes value = 1;
}

protobuf/mgo.proto

syntax = "proto3";

import "google/protobuf/wrappers.proto";
import "google/protobuf/any.proto";
import "scalapb/scalapb.proto";

option (scalapb.options) = {
  // use a custom Scala package name
  // package_name: "io.ontherocks.introgrpc.demo"

  // don't append file name to package
  flat_package: true

  // generate one Scala file for all messages (services still get their own file)
  single_file: true

  // add imports to generated file
  // useful when extending traits or using custom types
  // import: "io.ontherocks.hellogrpc.RockingMessage"

  // code to put at the top of generated file
  // works only with `single_file: true`
  //preamble: "sealed trait SomeSealedTrait"
};

/*
 * Demoes various customization options provided by ScalaPBs.
 */

package sdp.grpc.services;

import "sdp.proto";

message ProtoMGOBson {
  bytes bson = 1;
}

message ProtoMGODocument {
  bytes document = 1;
}

message ProtoMGOResultOption { //FindObservable
  int32 optType = 1;
  ProtoMGOBson bsonParam = 2;
  int32 valueParam = 3;
}

message ProtoMGOAdmin {
  string tarName = 1;
  repeated ProtoMGOBson bsonParam = 2;
  ProtoAny options = 3;
  string objName = 4;
}

message ProtoMGOContext { //MGOContext
  string dbName = 1;
  string collName = 2;
  int32 commandType = 3;
  repeated ProtoMGOBson bsonParam = 4;
  repeated ProtoMGOResultOption resultOptions = 5;
  repeated string targets = 6;
  ProtoAny options = 7;
  repeated ProtoMGODocument documents = 8;
  google.protobuf.BoolValue only = 9;
  ProtoMGOAdmin adminOptions = 10;
}

converters/ByteConverter.scala

package protobuf.bytes

import java.io.{ByteArrayInputStream,ByteArrayOutputStream,ObjectInputStream,ObjectOutputStream}
import com.google.protobuf.ByteString

object Converter {
  def marshal(value: Any): ByteString = {
    val stream: ByteArrayOutputStream = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(stream)
    oos.writeObject(value)
    oos.close()
    ByteString.copyFrom(stream.toByteArray())
  }
  def unmarshal[A](bytes: ByteString): A = {
    val ois = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    val value = ois.readObject()
    ois.close()
    value.asInstanceOf[A]
  }
}

converters/DBOResultType.scala

package sdp.result

import cats._
import cats.data.EitherT
import cats.data.OptionT
import monix.eval.Task
import cats.implicits._
import scala.concurrent._
import scala.collection.TraversableOnce

object DBOResult {
  type DBOError[A] = EitherT[Task,Throwable,A]
  type DBOResult[A] = OptionT[DBOError,A]
  implicit def valueToDBOResult[A](a: A): DBOResult[A] =
    Applicative[DBOResult].pure(a)
  implicit def optionToDBOResult[A](o: Option[A]): DBOResult[A] =
    OptionT((o: Option[A]).pure[DBOError])
  implicit def eitherToDBOResult[A](e: Either[Throwable,A]): DBOResult[A] = {
//  val error: DBOError[A] = EitherT[Task,Throwable, A](Task.eval(e))
    OptionT.liftF(EitherT.fromEither[Task](e))
  }
  implicit def futureToDBOResult[A](fut: Future[A]): DBOResult[A] = {
    val task = Task.fromFuture[A](fut)
    val et = EitherT.liftF[Task,Throwable,A](task)
    OptionT.liftF(et)
  }
  implicit class DBOResultToTask[A](r: DBOResult[A]) {
    def toTask = r.value.value
  }
  implicit class DBOResultToOption[A](r:Either[Throwable,Option[A]]) {
    def someValue: Option[A] = r match {
      case Left(err) => (None: Option[A])
      case Right(oa) => oa
    }
  }
  def wrapCollectionInOption[A, C[_] <: TraversableOnce[_]](coll: C[A]): DBOResult[C[A]] =
    if (coll.isEmpty)
      optionToDBOResult(None: Option[C[A]])
    else
      optionToDBOResult(Some(coll): Option[C[A]])
}

filestream/FileStreaming.scala

package sdp.file

import java.io.{ByteArrayInputStream, InputStream}
import java.nio.ByteBuffer
import java.nio.file.Paths
import akka.stream.Materializer
import akka.stream.scaladsl.{FileIO, StreamConverters}
import akka.util._
import scala.concurrent.Await
import scala.concurrent.duration._

object Streaming {
  def FileToByteBuffer(fileName: String, timeOut: FiniteDuration = 60 seconds)(
    implicit mat: Materializer): ByteBuffer = {
    val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>
      hd ++ bs
    }
    (Await.result(fut, timeOut)).toByteBuffer
  }
  def FileToByteArray(fileName: String, timeOut: FiniteDuration = 60 seconds)(
    implicit mat: Materializer): Array[Byte] = {
    val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>
      hd ++ bs
    }
    (Await.result(fut, timeOut)).toArray
  }
  def FileToInputStream(fileName: String, timeOut: FiniteDuration = 60 seconds)(
    implicit mat: Materializer): InputStream = {
    val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>
      hd ++ bs
    }
    val buf = (Await.result(fut, timeOut)).toArray
    new ByteArrayInputStream(buf)
  }
  def ByteBufferToFile(byteBuf: ByteBuffer, fileName: String)(
    implicit mat: Materializer) = {
    val ba = new Array[Byte](byteBuf.remaining())
    byteBuf.get(ba,0,ba.length)
    val baInput = new ByteArrayInputStream(ba)
    val source = StreamConverters.fromInputStream(() => baInput) //ByteBufferInputStream(bytes))
    source.runWith(FileIO.toPath(Paths.get(fileName)))
  }
  def ByteArrayToFile(bytes: Array[Byte], fileName: String)(
    implicit mat: Materializer) = {
    val bb = ByteBuffer.wrap(bytes)
    val baInput = new ByteArrayInputStream(bytes)
    val source = StreamConverters.fromInputStream(() => baInput) //ByteBufferInputStream(bytes))
    source.runWith(FileIO.toPath(Paths.get(fileName)))
  }
  def InputStreamToFile(is: InputStream, fileName: String)(
    implicit mat: Materializer) = {
    val source = StreamConverters.fromInputStream(() => is)
    source.runWith(FileIO.toPath(Paths.get(fileName)))
  }
}

logging/Log.scala

package sdp.logging

import org.slf4j.Logger

/**
 * Logger which just wraps org.slf4j.Logger internally.
 *
 * @param logger logger
 */
class Log(logger: Logger) {
  // use var consciously to enable squeezing later
  var isDebugEnabled: Boolean = logger.isDebugEnabled
  var isInfoEnabled: Boolean = logger.isInfoEnabled
  var isWarnEnabled: Boolean = logger.isWarnEnabled
  var isErrorEnabled: Boolean = logger.isErrorEnabled
  def withLevel(level: Symbol)(msg: => String, e: Throwable = null): Unit = {
    level match {
      case 'debug | 'DEBUG => debug(msg)
      case 'info | 'INFO => info(msg)
      case 'warn | 'WARN => warn(msg)
      case 'error | 'ERROR => error(msg)
      case _ => // nothing to do
    }
  }
  def debug(msg: => String): Unit = {
    if (isDebugEnabled && logger.isDebugEnabled) {
      logger.debug(msg)
    }
  }
  def debug(msg: => String, e: Throwable): Unit = {
    if (isDebugEnabled && logger.isDebugEnabled) {
      logger.debug(msg, e)
    }
  }
  def info(msg: => String): Unit = {
    if (isInfoEnabled && logger.isInfoEnabled) {
      logger.info(msg)
    }
  }
  def info(msg: => String, e: Throwable): Unit = {
    if (isInfoEnabled && logger.isInfoEnabled) {
      logger.info(msg, e)
    }
  }
  def warn(msg: => String): Unit = {
    if (isWarnEnabled && logger.isWarnEnabled) {
      logger.warn(msg)
    }
  }
  def warn(msg: => String, e: Throwable): Unit = {
    if (isWarnEnabled && logger.isWarnEnabled) {
      logger.warn(msg, e)
    }
  }
  def error(msg: => String): Unit = {
    if (isErrorEnabled && logger.isErrorEnabled) {
      logger.error(msg)
    }
  }
  def error(msg: => String, e: Throwable): Unit = {
    if (isErrorEnabled && logger.isErrorEnabled) {
      logger.error(msg, e)
    }
  }
}

logging/LogSupport.scala

package sdp.logging

import org.slf4j.LoggerFactory

trait LogSupport {
  /**
   * Logger
   */
  protected val log = new Log(LoggerFactory.getLogger(this.getClass))
}

mgo/engine/MGOProtoConversion.scala

package sdp.mongo.engine

import org.mongodb.scala.bson.collection.immutable.Document
import org.bson.conversions.Bson
import sdp.grpc.services._
import protobuf.bytes.Converter._
import MGOClasses._
import MGOAdmins._
import MGOCommands._
import org.bson.BsonDocument
import org.bson.codecs.configuration.CodecRegistry
import org.mongodb.scala.bson.codecs.DEFAULT_CODEC_REGISTRY
import org.mongodb.scala.FindObservable

object MGOProtoConversion {
  type MGO_COMMAND_TYPE = Int
  val MGO_COMMAND_FIND = 0
  val MGO_COMMAND_COUNT = 20
  val MGO_COMMAND_DISTICT = 21
  val MGO_COMMAND_DOCUMENTSTREAM = 1
  val MGO_COMMAND_AGGREGATE = 2
  val MGO_COMMAND_INSERT = 3
  val MGO_COMMAND_DELETE = 4
  val MGO_COMMAND_REPLACE = 5
  val MGO_COMMAND_UPDATE = 6
  val MGO_ADMIN_DROPCOLLECTION = 8
  val MGO_ADMIN_CREATECOLLECTION = 9
  val MGO_ADMIN_LISTCOLLECTION = 10
  val MGO_ADMIN_CREATEVIEW = 11
  val MGO_ADMIN_CREATEINDEX = 12
  val MGO_ADMIN_DROPINDEXBYNAME = 13
  val MGO_ADMIN_DROPINDEXBYKEY = 14
  val MGO_ADMIN_DROPALLINDEXES = 15

  case class AdminContext(
    tarName: String = "",
    bsonParam: Seq[Bson] = Nil,
    options: Option[Any] = None,
    objName: String = ""
  ){
    def toProto = sdp.grpc.services.ProtoMGOAdmin(
      tarName = this.tarName,
      bsonParam = this.bsonParam.map {b => sdp.grpc.services.ProtoMGOBson(marshal(b))},
      objName = this.objName,
      options = this.options.map(b => ProtoAny(marshal(b)))
    )
  }

  object AdminContext {
    def fromProto(msg: sdp.grpc.services.ProtoMGOAdmin) = new AdminContext(
      tarName = msg.tarName,
      bsonParam = msg.bsonParam.map(b => unmarshal[Bson](b.bson)),
      objName = msg.objName,
      options = msg.options.map(b => unmarshal[Any](b.value))
    )
  }

  case class Context(
    dbName: String = "",
    collName: String = "",
    commandType: MGO_COMMAND_TYPE,
    bsonParam: Seq[Bson] = Nil,
    resultOptions: Seq[ResultOptions] = Nil,
    options: Option[Any] = None,
    documents: Seq[Document] = Nil,
    targets: Seq[String] = Nil,
    only: Boolean = false,
    adminOptions: Option[AdminContext] = None
  ){
    def toProto = new sdp.grpc.services.ProtoMGOContext(
      dbName = this.dbName,
      collName = this.collName,
      commandType = this.commandType,
      bsonParam = this.bsonParam.map(bsonToProto),
      resultOptions = this.resultOptions.map(_.toProto),
      options = { if(this.options == None)
          None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
        else
          Some(ProtoAny(marshal(this.options.get))) },
      documents = this.documents.map(d => sdp.grpc.services.ProtoMGODocument(marshal(d))),
      targets = this.targets,
      only = Some(this.only),
      adminOptions = this.adminOptions.map(_.toProto)
    )
  }

  object MGODocument {
    def fromProto(msg: sdp.grpc.services.ProtoMGODocument): Document =
      unmarshal[Document](msg.document)
    def toProto(doc: Document): sdp.grpc.services.ProtoMGODocument =
      new ProtoMGODocument(marshal(doc))
  }

  object MGOProtoMsg {
    def fromProto(msg: sdp.grpc.services.ProtoMGOContext) = new Context(
      dbName = msg.dbName,
      collName = msg.collName,
      commandType = msg.commandType,
      bsonParam = msg.bsonParam.map(protoToBson),
      resultOptions = msg.resultOptions.map(r => ResultOptions.fromProto(r)),
      options = msg.options.map(a => unmarshal[Any](a.value)),
      documents = msg.documents.map(doc => unmarshal[Document](doc.document)),
      targets = msg.targets,
      adminOptions = msg.adminOptions.map(ado => AdminContext.fromProto(ado))
    )
  }

  def bsonToProto(bson: Bson) =
    ProtoMGOBson(marshal(bson.toBsonDocument(
      classOf[org.mongodb.scala.bson.collection.immutable.Document], DEFAULT_CODEC_REGISTRY)))

  def protoToBson(proto: ProtoMGOBson): Bson = new Bson {
    val bsdoc = unmarshal[BsonDocument](proto.bson)
    override def toBsonDocument[TDocument](documentClass: Class[TDocument], codecRegistry: CodecRegistry): BsonDocument = bsdoc
  }

  def ctxFromProto(proto: ProtoMGOContext): MGOContext = proto.commandType match {
    case MGO_COMMAND_FIND => {
      var ctx = new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_QUERY,
        action = Some(Find())
      )
      def toResultOption(rts: Seq[ProtoMGOResultOption]): FindObservable[Document] => FindObservable[Document] = findObj =>
        rts.foldRight(findObj)((a,b) => ResultOptions.fromProto(a).toFindObservable(b))
      (proto.bsonParam, proto.resultOptions, proto.only) match {
        case (Nil, Nil, None) => ctx
        case (Nil, Nil, Some(b)) => ctx.setCommand(Find(firstOnly = b))
        case (bp, Nil, None) => ctx.setCommand(
          Find(filter = Some(protoToBson(bp.head))))
        case (bp, Nil, Some(b)) => ctx.setCommand(
          Find(filter = Some(protoToBson(bp.head)), firstOnly = b))
        case (bp, fo, None) => {
          ctx.setCommand(
            Find(filter = Some(protoToBson(bp.head)),
              andThen = fo.map(ResultOptions.fromProto)
            ))
        }
        case (bp, fo, Some(b)) => {
          ctx.setCommand(
            Find(filter = Some(protoToBson(bp.head)),
              andThen = fo.map(ResultOptions.fromProto),
              firstOnly = b))
        }
        case _ => ctx
      }
    }
    case MGO_COMMAND_COUNT => {
      var ctx = new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_QUERY,
        action = Some(Count())
      )
      (proto.bsonParam, proto.options) match {
        case (Nil, None) => ctx
        case (bp, None) => ctx.setCommand(
          Count(filter = Some(protoToBson(bp.head)))
        )
        case (Nil, Some(o)) => ctx.setCommand(
          Count(options = Some(unmarshal[Any](o.value)))
        )
        case _ => ctx
      }
    }
    case MGO_COMMAND_DISTICT => {
      var ctx = new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_QUERY,
        action = Some(Distict(fieldName = proto.targets.head))
      )
      (proto.bsonParam) match {
        case Nil => ctx
        case bp: Seq[ProtoMGOBson] => ctx.setCommand(
          Distict(fieldName = proto.targets.head, filter = Some(protoToBson(bp.head)))
        )
        case _ => ctx
      }
    }
    case MGO_COMMAND_AGGREGATE => {
      new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_QUERY,
        action = Some(Aggregate(proto.bsonParam.map(p => protoToBson(p))))
      )
    }
    case MGO_ADMIN_LISTCOLLECTION => {
      new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_QUERY,
        action = Some(ListCollection(proto.dbName)))
    }
    case MGO_COMMAND_INSERT => {
      var ctx = new MGOContext(
        dbName = proto.dbName,
        collName = proto.collName,
        actionType = MGO_UPDATE,
        action = Some(Insert(
          newdocs = proto.documents.map(doc => unmarshal[Document](doc.document))))
      )
      proto.options match {
        case None => ctx
        case Some(o) => ctx.setCommand(Insert(
          newdocs = proto.documents.map(doc => unmarshal[Document](doc.document)),
  201. options = Some(unmarshal[Any](o.value)))
  202. )
  203. }
  204. }
  205. case MGO_COMMAND_DELETE => {
  206. var ctx = new MGOContext(
  207. dbName = proto.dbName,
  208. collName = proto.collName,
  209. actionType = MGO_UPDATE,
  210. action = Some(Delete(
  211. filter = protoToBson(proto.bsonParam.head)))
  212. )
  213. (proto.options, proto.only) match {
  214. case (None,None) => ctx
  215. case (None,Some(b)) => ctx.setCommand(Delete(
  216. filter = protoToBson(proto.bsonParam.head),
  217. onlyOne = b))
  218. case (Some(o),None) => ctx.setCommand(Delete(
  219. filter = protoToBson(proto.bsonParam.head),
  220. options = Some(unmarshal[Any](o.value)))
  221. )
  222. case (Some(o),Some(b)) => ctx.setCommand(Delete(
  223. filter = protoToBson(proto.bsonParam.head),
  224. options = Some(unmarshal[Any](o.value)),
  225. onlyOne = b)
  226. )
  227. }
  228. }
  229. case MGO_COMMAND_REPLACE => {
  230. var ctx = new MGOContext(
  231. dbName = proto.dbName,
  232. collName = proto.collName,
  233. actionType = MGO_UPDATE,
  234. action = Some(Replace(
  235. filter = protoToBson(proto.bsonParam.head),
  236. replacement = unmarshal[Document](proto.documents.head.document)))
  237. )
  238. proto.options match {
  239. case None => ctx
  240. case Some(o) => ctx.setCommand(Replace(
  241. filter = protoToBson(proto.bsonParam.head),
  242. replacement = unmarshal[Document](proto.documents.head.document),
  243. options = Some(unmarshal[Any](o.value)))
  244. )
  245. }
  246. }
  247. case MGO_COMMAND_UPDATE => {
  248. var ctx = new MGOContext(
  249. dbName = proto.dbName,
  250. collName = proto.collName,
  251. actionType = MGO_UPDATE,
  252. action = Some(Update(
  253. filter = protoToBson(proto.bsonParam.head),
  254. update = protoToBson(proto.bsonParam.tail.head)))
  255. )
  256. (proto.options, proto.only) match {
  257. case (None,None) => ctx
  258. case (None,Some(b)) => ctx.setCommand(Update(
  259. filter = protoToBson(proto.bsonParam.head),
  260. update = protoToBson(proto.bsonParam.tail.head),
  261. onlyOne = b))
  262. case (Some(o),None) => ctx.setCommand(Update(
  263. filter = protoToBson(proto.bsonParam.head),
  264. update = protoToBson(proto.bsonParam.tail.head),
  265. options = Some(unmarshal[Any](o.value)))
  266. )
  267. case (Some(o),Some(b)) => ctx.setCommand(Update(
  268. filter = protoToBson(proto.bsonParam.head),
  269. update = protoToBson(proto.bsonParam.tail.head),
  270. options = Some(unmarshal[Any](o.value)),
  271. onlyOne = b)
  272. )
  273. }
  274. }
  275. case MGO_ADMIN_DROPCOLLECTION =>
  276. new MGOContext(
  277. dbName = proto.dbName,
  278. collName = proto.collName,
  279. actionType = MGO_ADMIN,
  280. action = Some(DropCollection(proto.collName))
  281. )
  282. case MGO_ADMIN_CREATECOLLECTION => {
  283. var ctx = new MGOContext(
  284. dbName = proto.dbName,
  285. collName = proto.collName,
  286. actionType = MGO_ADMIN,
  287. action = Some(CreateCollection(proto.collName))
  288. )
  289. proto.options match {
  290. case None => ctx
  291. case Some(o) => ctx.setCommand(CreateCollection(proto.collName,
  292. options = Some(unmarshal[Any](o.value)))
  293. )
  294. }
  295. }
  296. case MGO_ADMIN_CREATEVIEW => {
  297. var ctx = new MGOContext(
  298. dbName = proto.dbName,
  299. collName = proto.collName,
  300. actionType = MGO_ADMIN,
  301. action = Some(CreateView(viewName = proto.targets.head,
  302. viewOn = proto.targets.tail.head,
  303. pipeline = proto.bsonParam.map(p => protoToBson(p))))
  304. )
  305. proto.options match {
  306. case None => ctx
  307. case Some(o) => ctx.setCommand(CreateView(viewName = proto.targets.head,
  308. viewOn = proto.targets.tail.head,
  309. pipeline = proto.bsonParam.map(p => protoToBson(p)),
  310. options = Some(unmarshal[Any](o.value)))
  311. )
  312. }
  313. }
  314. case MGO_ADMIN_CREATEINDEX=> {
  315. var ctx = new MGOContext(
  316. dbName = proto.dbName,
  317. collName = proto.collName,
  318. actionType = MGO_ADMIN,
  319. action = Some(CreateIndex(key = protoToBson(proto.bsonParam.head)))
  320. )
  321. proto.options match {
  322. case None => ctx
  323. case Some(o) => ctx.setCommand(CreateIndex(key = protoToBson(proto.bsonParam.head),
  324. options = Some(unmarshal[Any](o.value)))
  325. )
  326. }
  327. }
  328. case MGO_ADMIN_DROPINDEXBYNAME=> {
  329. var ctx = new MGOContext(
  330. dbName = proto.dbName,
  331. collName = proto.collName,
  332. actionType = MGO_ADMIN,
  333. action = Some(DropIndexByName(indexName = proto.targets.head))
  334. )
  335. proto.options match {
  336. case None => ctx
  337. case Some(o) => ctx.setCommand(DropIndexByName(indexName = proto.targets.head,
  338. options = Some(unmarshal[Any](o.value)))
  339. )
  340. }
  341. }
  342. case MGO_ADMIN_DROPINDEXBYKEY=> {
  343. var ctx = new MGOContext(
  344. dbName = proto.dbName,
  345. collName = proto.collName,
  346. actionType = MGO_ADMIN,
  347. action = Some(DropIndexByKey(key = protoToBson(proto.bsonParam.head)))
  348. )
  349. proto.options match {
  350. case None => ctx
  351. case Some(o) => ctx.setCommand(DropIndexByKey(key = protoToBson(proto.bsonParam.head),
  352. options = Some(unmarshal[Any](o.value)))
  353. )
  354. }
  355. }
  356. case MGO_ADMIN_DROPALLINDEXES=> {
  357. var ctx = new MGOContext(
  358. dbName = proto.dbName,
  359. collName = proto.collName,
  360. actionType = MGO_ADMIN,
  361. action = Some(DropAllIndexes())
  362. )
  363. proto.options match {
  364. case None => ctx
  365. case Some(o) => ctx.setCommand(DropAllIndexes(
  366. options = Some(unmarshal[Any](o.value)))
  367. )
  368. }
  369. }
  370. }
  371. def ctxToProto(ctx: MGOContext): Option[sdp.grpc.services.ProtoMGOContext] = ctx.action match {
  372. case None => None
  373. case Some(act) => act match {
  374. case Count(filter, options) =>
  375. Some(new sdp.grpc.services.ProtoMGOContext(
  376. dbName = ctx.dbName,
  377. collName = ctx.collName,
  378. commandType = MGO_COMMAND_COUNT,
  379. bsonParam = { if (filter == None) Seq.empty[ProtoMGOBson]
  380. else Seq(bsonToProto(filter.get))},
  381. options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
  382. else Some(ProtoAny(marshal(options.get))) }
  383. ))
  384. case Distict(fieldName, filter) =>
  385. Some(new sdp.grpc.services.ProtoMGOContext(
  386. dbName = ctx.dbName,
  387. collName = ctx.collName,
  388. commandType = MGO_COMMAND_DISTICT,
  389. bsonParam = { if (filter == None) Seq.empty[ProtoMGOBson]
  390. else Seq(bsonToProto(filter.get))},
  391. targets = Seq(fieldName)
  392. ))
  393. case Find(filter, andThen, firstOnly) =>
  394. Some(new sdp.grpc.services.ProtoMGOContext(
  395. dbName = ctx.dbName,
  396. collName = ctx.collName,
  397. commandType = MGO_COMMAND_FIND,
  398. bsonParam = { if (filter == None) Seq.empty[ProtoMGOBson]
  399. else Seq(bsonToProto(filter.get))},
  400. resultOptions = andThen.map(_.toProto)
  401. ))
  402. case Aggregate(pipeLine) =>
  403. Some(new sdp.grpc.services.ProtoMGOContext(
  404. dbName = ctx.dbName,
  405. collName = ctx.collName,
  406. commandType = MGO_COMMAND_AGGREGATE,
  407. bsonParam = pipeLine.map(bsonToProto)
  408. ))
  409. case Insert(newdocs, options) =>
  410. Some(new sdp.grpc.services.ProtoMGOContext(
  411. dbName = ctx.dbName,
  412. collName = ctx.collName,
  413. commandType = MGO_COMMAND_INSERT,
  414. documents = newdocs.map(d => ProtoMGODocument(marshal(d))),
  415. options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
  416. else Some(ProtoAny(marshal(options.get))) }
  417. ))
  418. case Delete(filter, options, onlyOne) =>
  419. Some(new sdp.grpc.services.ProtoMGOContext(
  420. dbName = ctx.dbName,
  421. collName = ctx.collName,
  422. commandType = MGO_COMMAND_DELETE,
  423. bsonParam = Seq(bsonToProto(filter)),
  424. options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
  425. else Some(ProtoAny(marshal(options.get))) },
  426. only = Some(onlyOne)
  427. ))
  428. case Replace(filter, replacement, options) =>
  429. Some(new sdp.grpc.services.ProtoMGOContext(
  430. dbName = ctx.dbName,
  431. collName = ctx.collName,
  432. commandType = MGO_COMMAND_REPLACE,
  433. bsonParam = Seq(bsonToProto(filter)),
  434. options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
  435. else Some(ProtoAny(marshal(options.get))) },
  436. documents = Seq(ProtoMGODocument(marshal(replacement)))
  437. ))
  438. case Update(filter, update, options, onlyOne) =>
  439. Some(new sdp.grpc.services.ProtoMGOContext(
  440. dbName = ctx.dbName,
  441. collName = ctx.collName,
  442. commandType = MGO_COMMAND_UPDATE,
  443. bsonParam = Seq(bsonToProto(filter),bsonToProto(update)),
  444. options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
  445. else Some(ProtoAny(marshal(options.get))) },
  446. only = Some(onlyOne)
  447. ))
  448. case DropCollection(coll) =>
  449. Some(new sdp.grpc.services.ProtoMGOContext(
  450. dbName = ctx.dbName,
  451. collName = coll,
  452. commandType = MGO_ADMIN_DROPCOLLECTION
  453. ))
  454. case CreateCollection(coll, options) =>
  455. Some(new sdp.grpc.services.ProtoMGOContext(
  456. dbName = ctx.dbName,
  457. collName = coll,
  458. commandType = MGO_ADMIN_CREATECOLLECTION,
  459. options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
  460. else Some(ProtoAny(marshal(options.get))) }
  461. ))
  462. case ListCollection(dbName) =>
  463. Some(new sdp.grpc.services.ProtoMGOContext(
  464. dbName = ctx.dbName,
  465. commandType = MGO_ADMIN_LISTCOLLECTION
  466. ))
  467. case CreateView(viewName, viewOn, pipeline, options) =>
  468. Some(new sdp.grpc.services.ProtoMGOContext(
  469. dbName = ctx.dbName,
  470. collName = ctx.collName,
  471. commandType = MGO_ADMIN_CREATEVIEW,
  472. bsonParam = pipeline.map(bsonToProto),
  473. options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
  474. else Some(ProtoAny(marshal(options.get))) },
  475. targets = Seq(viewName,viewOn)
  476. ))
  477. case CreateIndex(key, options) =>
  478. Some(new sdp.grpc.services.ProtoMGOContext(
  479. dbName = ctx.dbName,
  480. collName = ctx.collName,
  481. commandType = MGO_ADMIN_CREATEINDEX,
  482. bsonParam = Seq(bsonToProto(key)),
  483. options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
  484. else Some(ProtoAny(marshal(options.get))) }
  485. ))
  486. case DropIndexByName(indexName, options) =>
  487. Some(new sdp.grpc.services.ProtoMGOContext(
  488. dbName = ctx.dbName,
  489. collName = ctx.collName,
  490. commandType = MGO_ADMIN_DROPINDEXBYNAME,
  491. targets = Seq(indexName),
  492. options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
  493. else Some(ProtoAny(marshal(options.get))) }
  494. ))
  495. case DropIndexByKey(key, options) =>
  496. Some(new sdp.grpc.services.ProtoMGOContext(
  497. dbName = ctx.dbName,
  498. collName = ctx.collName,
  499. commandType = MGO_ADMIN_DROPINDEXBYKEY,
  500. bsonParam = Seq(bsonToProto(key)),
  501. options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
  502. else Some(ProtoAny(marshal(options.get))) }
  503. ))
  504. case DropAllIndexes(options) =>
  505. Some(new sdp.grpc.services.ProtoMGOContext(
  506. dbName = ctx.dbName,
  507. collName = ctx.collName,
  508. commandType = MGO_ADMIN_DROPALLINDEXES,
  509. options = { if(options == None) None //Some(ProtoAny(com.google.protobuf.ByteString.EMPTY))
  510. else Some(ProtoAny(marshal(options.get))) }
  511. ))
  512. }
  513. }
  514. }
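The ctxToProto/ctxFromProto pair above flattens each command case class into a wire record (a command-type tag plus generic parameter fields) and rebuilds it by matching on the tag. A minimal, self-contained sketch of that round-trip idea, with a toy ADT and `Wire` record standing in for `MGOContext` and `ProtoMGOContext` (all names here are illustrative, not the article's real types):

```scala
// Toy command ADT standing in for MGOCommands (illustrative only).
sealed trait Cmd
case class Count(filter: Option[String]) extends Cmd
case class Drop(coll: String) extends Cmd

// Flattened wire form: a type tag plus generic string parameters,
// mirroring ProtoMGOContext's commandType / bsonParam / targets fields.
case class Wire(tag: Int, params: Seq[String])

// Serialize: pattern-match on the command, emit tag + params.
def toWire(c: Cmd): Wire = c match {
  case Count(f)   => Wire(0, f.toSeq)
  case Drop(coll) => Wire(1, Seq(coll))
}

// Deserialize: dispatch on the tag, rebuild the command.
def fromWire(w: Wire): Cmd = w.tag match {
  case 0 => Count(w.params.headOption)
  case 1 => Drop(w.params.head)
}

println(fromWire(toWire(Count(Some("name = x")))))
```

The round trip is lossless as long as every case carries all of its parameters into `Wire`, which is exactly the invariant the real conversion code maintains per command type.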

mgo/engine/MongoDBEngine.scala

  1. package sdp.mongo.engine
  2. import java.text.SimpleDateFormat
  3. import java.util.Calendar
  4. import akka.NotUsed
  5. import akka.stream.Materializer
  6. import akka.stream.alpakka.mongodb.scaladsl._
  7. import akka.stream.scaladsl.{Flow, Source}
  8. import org.bson.conversions.Bson
  9. import org.mongodb.scala.bson.collection.immutable.Document
  10. import org.mongodb.scala.bson.{BsonArray, BsonBinary}
  11. import org.mongodb.scala.model._
  12. import org.mongodb.scala.{MongoClient, _}
  13. import protobuf.bytes.Converter._
  14. import sdp.file.Streaming._
  15. import sdp.logging.LogSupport
  16. import scala.collection.JavaConverters._
  17. import scala.concurrent._
  18. import scala.concurrent.duration._
  19. object MGOClasses {
  20. type MGO_ACTION_TYPE = Int
  21. val MGO_QUERY = 0
  22. val MGO_UPDATE = 1
  23. val MGO_ADMIN = 2
  24.  
  25. /* org.mongodb.scala.FindObservable
  26. import com.mongodb.async.client.FindIterable
  27. val resultDocType = FindIterable[Document]
  28. val resultOption = FindObservable(resultDocType)
  29. .maxScan(...)
  30. .limit(...)
  31. .sort(...)
  32. .project(...) */
  33. type FOD_TYPE = Int
  34. val FOD_FIRST = 0 //def first(): SingleObservable[TResult], return the first item
  35. val FOD_FILTER = 1 //def filter(filter: Bson): FindObservable[TResult]
  36. val FOD_LIMIT = 2 //def limit(limit: Int): FindObservable[TResult]
  37. val FOD_SKIP = 3 //def skip(skip: Int): FindObservable[TResult]
  38. val FOD_PROJECTION = 4 //def projection(projection: Bson): FindObservable[TResult]
  39. //Sets a document describing the fields to return for all matching documents
  40. val FOD_SORT = 5 //def sort(sort: Bson): FindObservable[TResult]
  41. val FOD_PARTIAL = 6 //def partial(partial: Boolean): FindObservable[TResult]
  42. //Get partial results from a sharded cluster if one or more shards are unreachable (instead of throwing an error)
  43. val FOD_CURSORTYPE = 7 //def cursorType(cursorType: CursorType): FindObservable[TResult]
  44. //Sets the cursor type
  45. val FOD_HINT = 8 //def hint(hint: Bson): FindObservable[TResult]
  46. //Sets the hint for which index to use. A null value means no hint is set
  47. val FOD_MAX = 9 //def max(max: Bson): FindObservable[TResult]
  48. //Sets the exclusive upper bound for a specific index. A null value means no max is set
  49. val FOD_MIN = 10 //def min(min: Bson): FindObservable[TResult]
  50. //Sets the minimum inclusive lower bound for a specific index. A null value means no min is set
  51. val FOD_RETURNKEY = 11 //def returnKey(returnKey: Boolean): FindObservable[TResult]
  52. //Sets the returnKey. If true the find operation will return only the index keys in the resulting documents
  53. val FOD_SHOWRECORDID=12 //def showRecordId(showRecordId: Boolean): FindObservable[TResult]
  54. //Sets the showRecordId. Set to true to add a field `\$recordId` to the returned documents
  55.  
  56. case class ResultOptions(
  57. optType: FOD_TYPE,
  58. bson: Option[Bson] = None,
  59. value: Int = 0 ){
  60. def toProto = new sdp.grpc.services.ProtoMGOResultOption(
  61. optType = this.optType,
  62. bsonParam = this.bson.map {b => sdp.grpc.services.ProtoMGOBson(marshal(b))},
  63. valueParam = this.value
  64. )
  65. def toFindObservable: FindObservable[Document] => FindObservable[Document] = find => {
  66. optType match {
  67. case FOD_FIRST => find
  68. case FOD_FILTER => find.filter(bson.get)
  69. case FOD_LIMIT => find.limit(value)
  70. case FOD_SKIP => find.skip(value)
  71. case FOD_PROJECTION => find.projection(bson.get)
  72. case FOD_SORT => find.sort(bson.get)
  73. case FOD_PARTIAL => find.partial(value != 0)
  74. case FOD_CURSORTYPE => find
  75. case FOD_HINT => find.hint(bson.get)
  76. case FOD_MAX => find.max(bson.get)
  77. case FOD_MIN => find.min(bson.get)
  78. case FOD_RETURNKEY => find.returnKey(value != 0)
  79. case FOD_SHOWRECORDID => find.showRecordId(value != 0)
  80. }
  81. }
  82. }
  83. object ResultOptions {
  84. def fromProto(msg: sdp.grpc.services.ProtoMGOResultOption) = new ResultOptions(
  85. optType = msg.optType,
  86. bson = msg.bsonParam.map(b => unmarshal[Bson](b.bson)),
  87. value = msg.valueParam
  88. )
  89. }
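`toResultOption` in the code above chains a sequence of `FindObservable[Document] => FindObservable[Document]` steps with `foldRight`. A self-contained sketch of that composition pattern, using plain `String => String` functions as stand-ins for the observable transforms (the step bodies are hypothetical):

```scala
// Each element contributes one transform step, like ResultOptions.toFindObservable.
val steps: Seq[String => String] =
  Seq(s => s + ".limit(10)", s => s + ".sort(id)")

// foldRight(z)(op) expands to op(a1, op(a2, z)): the last element is
// applied to the seed first, and the head ends up applied last.
val chained: String => String =
  find => steps.foldRight(find)((step, acc) => step(acc))

println(chained("find()"))
```

So with `foldRight`, options are applied to the seed observable starting from the tail of the sequence, with the head applied last; if application order matters for a given option set, that ordering is the one to keep in mind.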
  90. trait MGOCommands
  91. object MGOCommands {
  92. case class Count(filter: Option[Bson] = None, options: Option[Any] = None) extends MGOCommands
  93. case class Distict(fieldName: String, filter: Option[Bson] = None) extends MGOCommands
  94. /* org.mongodb.scala.FindObservable
  95. import com.mongodb.async.client.FindIterable
  96. val resultDocType = FindIterable[Document]
  97. val resultOption = FindObservable(resultDocType)
  98. .maxScan(...)
  99. .limit(...)
  100. .sort(...)
  101. .project(...) */
  102. case class Find(filter: Option[Bson] = None,
  103. andThen: Seq[ResultOptions] = Seq.empty[ResultOptions],
  104. firstOnly: Boolean = false) extends MGOCommands
  105. case class Aggregate(pipeLine: Seq[Bson]) extends MGOCommands
  106. case class MapReduce(mapFunction: String, reduceFunction: String) extends MGOCommands
  107. case class Insert(newdocs: Seq[Document], options: Option[Any] = None) extends MGOCommands
  108. case class Delete(filter: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands
  109. case class Replace(filter: Bson, replacement: Document, options: Option[Any] = None) extends MGOCommands
  110. case class Update(filter: Bson, update: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands
  111. case class BulkWrite(commands: List[WriteModel[Document]], options: Option[Any] = None) extends MGOCommands
  112. }
  113. object MGOAdmins {
  114. case class DropCollection(collName: String) extends MGOCommands
  115. case class CreateCollection(collName: String, options: Option[Any] = None) extends MGOCommands
  116. case class ListCollection(dbName: String) extends MGOCommands
  117. case class CreateView(viewName: String, viewOn: String, pipeline: Seq[Bson], options: Option[Any] = None) extends MGOCommands
  118. case class CreateIndex(key: Bson, options: Option[Any] = None) extends MGOCommands
  119. case class DropIndexByName(indexName: String, options: Option[Any] = None) extends MGOCommands
  120. case class DropIndexByKey(key: Bson, options: Option[Any] = None) extends MGOCommands
  121. case class DropAllIndexes(options: Option[Any] = None) extends MGOCommands
  122. }
  123. case class MGOContext(
  124. dbName: String,
  125. collName: String,
  126. actionType: MGO_ACTION_TYPE = MGO_QUERY,
  127. action: Option[MGOCommands] = None,
  128. actionOptions: Option[Any] = None,
  129. actionTargets: Seq[String] = Nil
  130. ) {
  131. ctx =>
  132. def setDbName(name: String): MGOContext = ctx.copy(dbName = name)
  133. def setCollName(name: String): MGOContext = ctx.copy(collName = name)
  134. def setActionType(at: MGO_ACTION_TYPE): MGOContext = ctx.copy(actionType = at)
  135. def setCommand(cmd: MGOCommands): MGOContext = ctx.copy(action = Some(cmd))
  136. def toSomeProto = MGOProtoConversion.ctxToProto(this)
  137. }
  138. object MGOContext {
  139. def apply(db: String, coll: String) = new MGOContext(db, coll)
  140. def fromProto(proto: sdp.grpc.services.ProtoMGOContext): MGOContext =
  141. MGOProtoConversion.ctxFromProto(proto)
  142. }
  143. case class MGOBatContext(contexts: Seq[MGOContext], tx: Boolean = false) {
  144. ctxs =>
  145. def setTx(txopt: Boolean): MGOBatContext = ctxs.copy(tx = txopt)
  146. def appendContext(ctx: MGOContext): MGOBatContext =
  147. ctxs.copy(contexts = contexts :+ ctx)
  148. }
  149. object MGOBatContext {
  150. def apply(ctxs: Seq[MGOContext], tx: Boolean = false) = new MGOBatContext(ctxs,tx)
  151. }
  152. type MGODate = java.util.Date
  153. def mgoDate(yyyy: Int, mm: Int, dd: Int): MGODate = {
  154. val ca = Calendar.getInstance()
  155. ca.set(yyyy,mm,dd)
  156. ca.getTime()
  157. }
  158. def mgoDateTime(yyyy: Int, mm: Int, dd: Int, hr: Int, min: Int, sec: Int): MGODate = {
  159. val ca = Calendar.getInstance()
  160. ca.set(yyyy,mm,dd,hr,min,sec)
  161. ca.getTime()
  162. }
  163. def mgoDateTimeNow: MGODate = {
  164. val ca = Calendar.getInstance()
  165. ca.getTime
  166. }
  167. def mgoDateToString(dt: MGODate, formatString: String): String = {
  168. val fmt= new SimpleDateFormat(formatString)
  169. fmt.format(dt)
  170. }
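One pitfall worth noting for the date helpers above: `java.util.Calendar` months are 0-based, so the `mm` passed to `mgoDate`/`mgoDateTime` follows that convention. A runnable sketch of the same `Calendar` + `SimpleDateFormat` usage:

```scala
import java.text.SimpleDateFormat
import java.util.Calendar

// Calendar.set(year, month, day): month is 0-based,
// so (2018, 11, 10) is 10 December 2018, not 10 November.
val ca = Calendar.getInstance()
ca.set(2018, 11, 10)

// Same formatting step as mgoDateToString.
val fmt = new SimpleDateFormat("yyyy-MM-dd")
println(fmt.format(ca.getTime))
```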
  171. type MGOBlob = BsonBinary
  172. type MGOArray = BsonArray
  173. def fileToMGOBlob(fileName: String, timeOut: FiniteDuration = 60 seconds)(
  174. implicit mat: Materializer) = FileToByteArray(fileName,timeOut)
  175. def mgoBlobToFile(blob: MGOBlob, fileName: String)(
  176. implicit mat: Materializer) = ByteArrayToFile(blob.getData,fileName)
  177. def mgoGetStringOrNone(doc: Document, fieldName: String) = {
  178. if (doc.keySet.contains(fieldName))
  179. Some(doc.getString(fieldName))
  180. else None
  181. }
  182. def mgoGetIntOrNone(doc: Document, fieldName: String) = {
  183. if (doc.keySet.contains(fieldName))
  184. Some(doc.getInteger(fieldName))
  185. else None
  186. }
  187. def mgoGetLonggOrNone(doc: Document, fieldName: String) = {
  188. if (doc.keySet.contains(fieldName))
  189. Some(doc.getLong(fieldName))
  190. else None
  191. }
  192. def mgoGetDoubleOrNone(doc: Document, fieldName: String) = {
  193. if (doc.keySet.contains(fieldName))
  194. Some(doc.getDouble(fieldName))
  195. else None
  196. }
  197. def mgoGetBoolOrNone(doc: Document, fieldName: String) = {
  198. if (doc.keySet.contains(fieldName))
  199. Some(doc.getBoolean(fieldName))
  200. else None
  201. }
  202. def mgoGetDateOrNone(doc: Document, fieldName: String) = {
  203. if (doc.keySet.contains(fieldName))
  204. Some(doc.getDate(fieldName))
  205. else None
  206. }
  207. def mgoGetBlobOrNone(doc: Document, fieldName: String) = {
  208. if (doc.keySet.contains(fieldName))
  209. doc.get(fieldName).asInstanceOf[Option[MGOBlob]]
  210. else None
  211. }
  212. def mgoGetArrayOrNone(doc: Document, fieldName: String) = {
  213. if (doc.keySet.contains(fieldName))
  214. doc.get(fieldName).asInstanceOf[Option[MGOArray]]
  215. else None
  216. }
  217. def mgoArrayToDocumentList(arr: MGOArray): scala.collection.immutable.List[org.bson.BsonDocument] = {
  218. (arr.getValues.asScala.toList)
  219. .asInstanceOf[scala.collection.immutable.List[org.bson.BsonDocument]]
  220. }
  221. type MGOFilterResult = FindObservable[Document] => FindObservable[Document]
  222. }
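The `mgoGet*OrNone` helpers above all follow one pattern: test for the field first, then wrap the value in `Option` instead of risking a missing-field error. A self-contained sketch of that pattern, with a plain `Map` standing in for the MongoDB `Document` (illustrative only):

```scala
// A plain Map stands in for org.mongodb.scala Document in this sketch.
val doc = Map[String, Any]("name" -> "akka", "count" -> 3)

// Same shape as mgoGetStringOrNone: membership check, then Option-wrap.
def getStringOrNone(d: Map[String, Any], field: String): Option[String] =
  if (d.contains(field)) Some(d(field).asInstanceOf[String]) else None

println(getStringOrNone(doc, "name"))
println(getStringOrNone(doc, "missing"))
```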
  223. object MGOEngine extends LogSupport {
  224. import MGOClasses._
  225. import MGOAdmins._
  226. import MGOCommands._
  227. import sdp.result.DBOResult._
  228. object TxUpdateMode {
  229. private def mgoTxUpdate(ctxs: MGOBatContext, observable: SingleObservable[ClientSession])(
  230. implicit client: MongoClient, ec: ExecutionContext): SingleObservable[ClientSession] = {
  231. log.info(s"mgoTxUpdate> calling ...")
  232. observable.map(clientSession => {
  233. val transactionOptions =
  234. TransactionOptions.builder()
  235. .readConcern(ReadConcern.SNAPSHOT)
  236. .writeConcern(WriteConcern.MAJORITY).build()
  237. clientSession.startTransaction(transactionOptions)
  238. /*
  239. val fut = Future.traverse(ctxs.contexts) { ctx =>
  240. mgoUpdateObservable[Completed](ctx).map(identity).toFuture()
  241. }
  242. Await.ready(fut, 3 seconds) */
  243. ctxs.contexts.foreach { ctx =>
  244. mgoUpdateObservable[Completed](ctx).map(identity).toFuture()
  245. }
  246. clientSession
  247. })
  248. }
  249. private def commitAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
  250. log.info(s"commitAndRetry> calling ...")
  251. observable.recoverWith({
  252. case e: MongoException if e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL) => {
  253. log.warn("commitAndRetry> UnknownTransactionCommitResult, retrying commit operation ...")
  254. commitAndRetry(observable)
  255. }
  256. case e: Exception => {
  257. log.error(s"commitAndRetry> Exception during commit ...: $e")
  258. throw e
  259. }
  260. })
  261. }
  262. private def runTransactionAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
  263. log.info(s"runTransactionAndRetry> calling ...")
  264. observable.recoverWith({
  265. case e: MongoException if e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL) => {
  266. log.warn("runTransactionAndRetry> TransientTransactionError, aborting transaction and retrying ...")
  267. runTransactionAndRetry(observable)
  268. }
  269. })
  270. }
  271. def mgoTxBatch(ctxs: MGOBatContext)(
  272. implicit client: MongoClient, ec: ExecutionContext): DBOResult[Completed] = {
  273. log.info(s"mgoTxBatch> MGOBatContext: ${ctxs}")
  274. val updateObservable: Observable[ClientSession] = mgoTxUpdate(ctxs, client.startSession())
  275. val commitTransactionObservable: SingleObservable[Completed] =
  276. updateObservable.flatMap(clientSession => clientSession.commitTransaction())
  277. val commitAndRetryObservable: SingleObservable[Completed] = commitAndRetry(commitTransactionObservable)
  278. runTransactionAndRetry(commitAndRetryObservable)
  279. valueToDBOResult(Completed())
  280. }
  281. }
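`commitAndRetry` and `runTransactionAndRetry` above both use the same shape: `recoverWith` inspects the failure, and for a retryable error label recurses into another attempt while other errors propagate. A self-contained sketch of that retry shape transplanted onto plain `Future`s (the error message and attempt counter are stand-ins for the Mongo error labels):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

var attempts = 0

// Fails twice with a "retryable" error, then succeeds (placeholder commit).
def commit(): Future[String] = Future {
  attempts += 1
  if (attempts < 3) throw new RuntimeException("UnknownTransactionCommitResult")
  "committed"
}

// recoverWith: retry on the retryable error, let anything else propagate.
def commitAndRetry(): Future[String] =
  commit().recoverWith {
    case e: RuntimeException if e.getMessage.contains("Unknown") => commitAndRetry()
  }

val result = Await.result(commitAndRetry(), 3.seconds)
println(result)
```

Note the sketch retries by calling `commit()` again for a fresh attempt; the article's version recurses on the same observable, relying on the driver re-running the commit on each new subscription.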
  282. def mgoUpdateBatch(ctxs: MGOBatContext)(implicit client: MongoClient, ec: ExecutionContext): DBOResult[Completed] = {
  283. log.info(s"mgoUpdateBatch> MGOBatContext: ${ctxs}")
  284. if (ctxs.tx) {
  285. TxUpdateMode.mgoTxBatch(ctxs)
  286. } else {
  287. /*
  288. val fut = Future.traverse(ctxs.contexts) { ctx =>
  289. mgoUpdate[Completed](ctx).map(identity) }
  290. Await.ready(fut, 3 seconds)
  291. Future.successful(new Completed) */
  292. ctxs.contexts.foreach { ctx =>
  293. mgoUpdate[Completed](ctx).map(identity) }
  294. valueToDBOResult(Completed())
  295. }
  296. }
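The non-transactional branch of `mgoUpdateBatch` fires each update and discards the resulting `Future`; the commented-out alternative collects them with `Future.traverse` and blocks until all finish. A runnable sketch of that alternative, with `Int`s standing in for `MGOContext`/`Completed` (hypothetical stand-ins):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

val contexts = Seq(1, 2, 3)

// Placeholder for mgoUpdate: one Future per context.
def runUpdate(ctx: Int): Future[Int] = Future(ctx * 10)

// traverse keeps one Future per context and preserves input order;
// Await.ready/result then blocks until the whole batch completes.
val all: Future[Seq[Int]] = Future.traverse(contexts)(runUpdate)
val results = Await.result(all, 3.seconds)
println(results)
```

The fire-and-forget form in the article returns before the updates are known to have completed, so callers that need completion (or failure) information would prefer the traverse-and-wait shape.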
  297. def mongoStream(ctx: MGOContext)(
  298. implicit client: MongoClient, ec: ExecutionContextExecutor): Source[Document, NotUsed] = {
  299. log.info(s"mongoStream> MGOContext: ${ctx}")
  300. def toResultOption(rts: Seq[ResultOptions]): FindObservable[Document] => FindObservable[Document] = findObj =>
  301. rts.foldRight(findObj)((a,b) => a.toFindObservable(b))
  302. val db = client.getDatabase(ctx.dbName)
  303. val coll = db.getCollection(ctx.collName)
  304. if ( ctx.action == None) {
  305. log.error(s"mongoStream> query action cannot be null!")
  306. throw new IllegalArgumentException("query action cannot be null!")
  307. }
  308. try {
  309. ctx.action.get match {
  310. case Find(None, Nil, false) => //FindObservable
  311. MongoSource(coll.find())
  312. case Find(None, Nil, true) => //FindObservable
  313. MongoSource(coll.find().first())
  314. case Find(Some(filter), Nil, false) => //FindObservable
  315. MongoSource(coll.find(filter))
  316. case Find(Some(filter), Nil, true) => //FindObservable
  317. MongoSource(coll.find(filter).first())
  318. case Find(None, sro, _) => //FindObservable
  319. val next = toResultOption(sro)
  320. MongoSource(next(coll.find[Document]()))
  321. case Find(Some(filter), sro, _) => //FindObservable
  322. val next = toResultOption(sro)
  323. MongoSource(next(coll.find[Document](filter)))
  324. case _ =>
  325. log.error(s"mongoStream> unsupported streaming query [${ctx.action.get}]")
  326. throw new RuntimeException(s"mongoStream> unsupported streaming query [${ctx.action.get}]")
  327. }
  328. }
  329. catch { case e: Exception =>
  330. log.error(s"mongoStream> runtime error: ${e.getMessage}")
  331. throw new RuntimeException(s"mongoStream> Error: ${e.getMessage}")
  332. }
  333. }
  334. // T => FindIterable e.g List[Document]
  335. def mgoQuery[T](ctx: MGOContext, Converter: Option[Document => Any] = None)(implicit client: MongoClient): DBOResult[T] = {
  336. log.info(s"mgoQuery> MGOContext: ${ctx}")
  337. val db = client.getDatabase(ctx.dbName)
  338. val coll = db.getCollection(ctx.collName)
  339. def toResultOption(rts: Seq[ResultOptions]): FindObservable[Document] => FindObservable[Document] = findObj =>
  340. rts.foldRight(findObj)((a,b) => a.toFindObservable(b))
  341. if ( ctx.action == None) {
  342. log.error(s"mgoQuery> query action cannot be null!")
  343. Left(new IllegalArgumentException("query action cannot be null!"))
  344. } else
  345. try {
  346. ctx.action.get match {
  347. /* count */
  348. case Count(Some(filter), Some(opt)) => //SingleObservable
  349. coll.countDocuments(filter, opt.asInstanceOf[CountOptions])
  350. .toFuture().asInstanceOf[Future[T]]
  351. case Count(Some(filter), None) => //SingleObservable
  352. coll.countDocuments(filter).toFuture()
  353. .asInstanceOf[Future[T]]
  354. case Count(None, None) => //SingleObservable
  355. coll.countDocuments().toFuture()
  356. .asInstanceOf[Future[T]]
  357. /* distinct */
  358. case Distict(field, Some(filter)) => //DistinctObservable
  359. coll.distinct(field, filter).toFuture()
  360. .asInstanceOf[Future[T]]
  361. case Distict(field, None) => //DistinctObservable
  362. coll.distinct(field).toFuture()
  363. .asInstanceOf[Future[T]]
  364. /* find */
  365. case Find(None, Nil, false) => //FindObservable
  366. if (Converter == None) coll.find().toFuture().asInstanceOf[Future[T]]
  367. else coll.find().map(Converter.get).toFuture().asInstanceOf[Future[T]]
  368. case Find(None, Nil, true) => //FindObservable
  369. if (Converter == None) coll.find().first().head().asInstanceOf[Future[T]]
  370. else coll.find().first().map(Converter.get).head().asInstanceOf[Future[T]]
  371. case Find(Some(filter), Nil, false) => //FindObservable
  372. if (Converter == None) coll.find(filter).toFuture().asInstanceOf[Future[T]]
  373. else coll.find(filter).map(Converter.get).toFuture().asInstanceOf[Future[T]]
  374. case Find(Some(filter), Nil, true) => //FindObservable
  375. if (Converter == None) coll.find(filter).first().head().asInstanceOf[Future[T]]
  376. else coll.find(filter).first().map(Converter.get).head().asInstanceOf[Future[T]]
  377. case Find(None, sro, _) => //FindObservable
  378. val next = toResultOption(sro)
  379. if (Converter == None) next(coll.find[Document]()).toFuture().asInstanceOf[Future[T]]
  380. else next(coll.find[Document]()).map(Converter.get).toFuture().asInstanceOf[Future[T]]
  381. case Find(Some(filter), sro, _) => //FindObservable
  382. val next = toResultOption(sro)
  383. if (Converter == None) next(coll.find[Document](filter)).toFuture().asInstanceOf[Future[T]]
  384. else next(coll.find[Document](filter)).map(Converter.get).toFuture().asInstanceOf[Future[T]]
  385. /* aggregate AggregateObservable*/
  386. case Aggregate(pline) => coll.aggregate(pline).toFuture().asInstanceOf[Future[T]]
  387. /* mapReduce MapReduceObservable*/
  388. case MapReduce(mf, rf) => coll.mapReduce(mf, rf).toFuture().asInstanceOf[Future[T]]
  389. /* list collection */
  390. case ListCollection(dbName) => //ListCollectionObservable
  391. client.getDatabase(dbName).listCollections().toFuture().asInstanceOf[Future[T]]
  392. }
  393. }
  394. catch { case e: Exception =>
  395. log.error(s"mgoQuery> runtime error: ${e.getMessage}")
  396. Left(new RuntimeException(s"mgoQuery> Error: ${e.getMessage}"))
  397. }
  398. }
  399. //T => Completed, result.UpdateResult, result.DeleteResult
  400. def mgoUpdate[T](ctx: MGOContext)(implicit client: MongoClient): DBOResult[T] =
  401. try {
  402. mgoUpdateObservable[T](ctx).toFuture()
  403. }
  404. catch { case e: Exception =>
  405. log.error(s"mgoUpdate> runtime error: ${e.getMessage}")
  406. Left(new RuntimeException(s"mgoUpdate> Error: ${e.getMessage}"))
  407. }
  408. def mgoUpdateObservable[T](ctx: MGOContext)(implicit client: MongoClient): SingleObservable[T] = {
  409. log.info(s"mgoUpdateObservable> MGOContext: ${ctx}")
  410. val db = client.getDatabase(ctx.dbName)
  411. val coll = db.getCollection(ctx.collName)
  412. if ( ctx.action == None) {
  413. log.error(s"mgoUpdateObservable> query action cannot be null!")
  414. throw new IllegalArgumentException("mgoUpdateObservable> query action cannot be null!")
  415. }
  416. try {
  417. ctx.action.get match {
  418. /* insert */
  419. case Insert(docs, Some(opt)) => //SingleObservable[Completed]
  420. if (docs.size > 1)
  421. coll.insertMany(docs, opt.asInstanceOf[InsertManyOptions]).asInstanceOf[SingleObservable[T]]
  422. else coll.insertOne(docs.head, opt.asInstanceOf[InsertOneOptions]).asInstanceOf[SingleObservable[T]]
  423. case Insert(docs, None) => //SingleObservable
  424. if (docs.size > 1) coll.insertMany(docs).asInstanceOf[SingleObservable[T]]
  425. else coll.insertOne(docs.head).asInstanceOf[SingleObservable[T]]
  426. /* delete */
  427. case Delete(filter, None, onlyOne) => //SingleObservable
  428. if (onlyOne) coll.deleteOne(filter).asInstanceOf[SingleObservable[T]]
  429. else coll.deleteMany(filter).asInstanceOf[SingleObservable[T]]
  430. case Delete(filter, Some(opt), onlyOne) => //SingleObservable
  431. if (onlyOne) coll.deleteOne(filter, opt.asInstanceOf[DeleteOptions]).asInstanceOf[SingleObservable[T]]
  432. else coll.deleteMany(filter, opt.asInstanceOf[DeleteOptions]).asInstanceOf[SingleObservable[T]]
  433. /* replace */
  434. case Replace(filter, replacement, None) => //SingleObservable
  435. coll.replaceOne(filter, replacement).asInstanceOf[SingleObservable[T]]
  436. case Replace(filter, replacement, Some(opt)) => //SingleObservable
  437. coll.replaceOne(filter, replacement, opt.asInstanceOf[ReplaceOptions]).asInstanceOf[SingleObservable[T]]
  438. /* update */
  439. case Update(filter, update, None, onlyOne) => //SingleObservable
  440. if (onlyOne) coll.updateOne(filter, update).asInstanceOf[SingleObservable[T]]
  441. else coll.updateMany(filter, update).asInstanceOf[SingleObservable[T]]
  442. case Update(filter, update, Some(opt), onlyOne) => //SingleObservable
  443. if (onlyOne) coll.updateOne(filter, update, opt.asInstanceOf[UpdateOptions]).asInstanceOf[SingleObservable[T]]
  444. else coll.updateMany(filter, update, opt.asInstanceOf[UpdateOptions]).asInstanceOf[SingleObservable[T]]
  445. /* bulkWrite */
  446. case BulkWrite(commands, None) => //SingleObservable
  447. coll.bulkWrite(commands).asInstanceOf[SingleObservable[T]]
  448. case BulkWrite(commands, Some(opt)) => //SingleObservable
  449. coll.bulkWrite(commands, opt.asInstanceOf[BulkWriteOptions]).asInstanceOf[SingleObservable[T]]
  450. }
  451. }
  452. catch { case e: Exception =>
  453. log.error(s"mgoUpdateObservable> runtime error: ${e.getMessage}")
  454. throw new RuntimeException(s"mgoUpdateObservable> Error: ${e.getMessage}")
  455. }
  456. }
  457. def mgoAdmin(ctx: MGOContext)(implicit client: MongoClient): DBOResult[Completed] = {
  458. log.info(s"mgoAdmin> MGOContext: ${ctx}")
  459. val db = client.getDatabase(ctx.dbName)
  460. val coll = db.getCollection(ctx.collName)
  461. if ( ctx.action == None) {
  462. log.error(s"mgoAdmin> query action cannot be null!")
  463. Left(new IllegalArgumentException("mgoAdmin> query action cannot be null!"))
  464. }
  465. else try {
  466. ctx.action.get match {
  467. /* drop collection */
  468. case DropCollection(collName) => //SingleObservable
  469. val coll = db.getCollection(collName)
  470. coll.drop().toFuture()
  471. /* create collection */
  472. case CreateCollection(collName, None) => //SingleObservable
  473. db.createCollection(collName).toFuture()
  474. case CreateCollection(collName, Some(opt)) => //SingleObservable
  475. db.createCollection(collName, opt.asInstanceOf[CreateCollectionOptions]).toFuture()
  476. /* list collection
  477. case ListCollection(dbName) => //ListConllectionObservable
  478. client.getDatabase(dbName).listCollections().toFuture().asInstanceOf[Future[T]]
  479. */
  480. /* create view */
  481. case CreateView(viewName, viewOn, pline, None) => //SingleObservable
  482. db.createView(viewName, viewOn, pline).toFuture()
  483. case CreateView(viewName, viewOn, pline, Some(opt)) => //SingleObservable
  484. db.createView(viewName, viewOn, pline, opt.asInstanceOf[CreateViewOptions]).toFuture()
  485. /* create index */
  486. case CreateIndex(key, None) => //SingleObservable
  487. coll.createIndex(key).toFuture().asInstanceOf[Future[Completed]] // asInstanceOf[SingleObservable[Completed]]
  488. case CreateIndex(key, Some(opt)) => //SingleObservable
  489. coll.createIndex(key, opt.asInstanceOf[IndexOptions]).toFuture().asInstanceOf[Future[Completed]] // asInstanceOf[SingleObservable[Completed]]
  490. /* drop index */
  491. case DropIndexByName(indexName, None) => //SingleObservable
  492. coll.dropIndex(indexName).toFuture()
  493. case DropIndexByName(indexName, Some(opt)) => //SingleObservable
  494. coll.dropIndex(indexName, opt.asInstanceOf[DropIndexOptions]).toFuture()
  495. case DropIndexByKey(key, None) => //SingleObservable
  496. coll.dropIndex(key).toFuture()
  497. case DropIndexByKey(key, Some(opt)) => //SingleObservable
  498. coll.dropIndex(key, opt.asInstanceOf[DropIndexOptions]).toFuture()
  499. case DropAllIndexes(None) => //SingleObservable
  500. coll.dropIndexes().toFuture()
  501. case DropAllIndexes(Some(opt)) => //SingleObservable
  502. coll.dropIndexes(opt.asInstanceOf[DropIndexOptions]).toFuture()
  503. }
  504. }
  505. catch { case e: Exception =>
  506. log.error(s"mgoAdmin> runtime error: ${e.getMessage}")
  507. Left(new RuntimeException(s"mgoAdmin> Error: ${e.getMessage}"))
  508. }
  509. }
  510. /*
  511. def mgoExecute[T](ctx: MGOContext)(implicit client: MongoClient): Future[T] = {
  512. val db = client.getDatabase(ctx.dbName)
  513. val coll = db.getCollection(ctx.collName)
  514. ctx.action match {
  515. /* count */
  516. case Count(Some(filter), Some(opt)) => //SingleObservable
  517. coll.countDocuments(filter, opt.asInstanceOf[CountOptions])
  518. .toFuture().asInstanceOf[Future[T]]
  519. case Count(Some(filter), None) => //SingleObservable
  520. coll.countDocuments(filter).toFuture()
  521. .asInstanceOf[Future[T]]
  522. case Count(None, None) => //SingleObservable
  523. coll.countDocuments().toFuture()
  524. .asInstanceOf[Future[T]]
  525. /* distinct */
  526. case Distict(field, Some(filter)) => //DistinctObservable
  527. coll.distinct(field, filter).toFuture()
  528. .asInstanceOf[Future[T]]
  529. case Distict(field, None) => //DistinctObservable
  530. coll.distinct((field)).toFuture()
  531. .asInstanceOf[Future[T]]
  532. /* find */
  533. case Find(None, None, optConv, false) => //FindObservable
  534. if (optConv == None) coll.find().toFuture().asInstanceOf[Future[T]]
  535. else coll.find().map(optConv.get).toFuture().asInstanceOf[Future[T]]
  536. case Find(None, None, optConv, true) => //FindObservable
  537. if (optConv == None) coll.find().first().head().asInstanceOf[Future[T]]
  538. else coll.find().first().map(optConv.get).head().asInstanceOf[Future[T]]
  539. case Find(Some(filter), None, optConv, false) => //FindObservable
  540. if (optConv == None) coll.find(filter).toFuture().asInstanceOf[Future[T]]
  541. else coll.find(filter).map(optConv.get).toFuture().asInstanceOf[Future[T]]
  542. case Find(Some(filter), None, optConv, true) => //FindObservable
  543. if (optConv == None) coll.find(filter).first().head().asInstanceOf[Future[T]]
  544. else coll.find(filter).first().map(optConv.get).head().asInstanceOf[Future[T]]
  545. case Find(None, Some(next), optConv, _) => //FindObservable
  546. if (optConv == None) next(coll.find[Document]()).toFuture().asInstanceOf[Future[T]]
  547. else next(coll.find[Document]()).map(optConv.get).toFuture().asInstanceOf[Future[T]]
  548. case Find(Some(filter), Some(next), optConv, _) => //FindObservable
  549. if (optConv == None) next(coll.find[Document](filter)).toFuture().asInstanceOf[Future[T]]
  550. else next(coll.find[Document](filter)).map(optConv.get).toFuture().asInstanceOf[Future[T]]
  551. /* aggregate AggregateObservable*/
  552. case Aggregate(pline) => coll.aggregate(pline).toFuture().asInstanceOf[Future[T]]
  553. /* mapReduce MapReduceObservable*/
  554. case MapReduce(mf, rf) => coll.mapReduce(mf, rf).toFuture().asInstanceOf[Future[T]]
  555. /* insert */
  556. case Insert(docs, Some(opt)) => //SingleObservable[Completed]
  557. if (docs.size > 1) coll.insertMany(docs, opt.asInstanceOf[InsertManyOptions]).toFuture()
  558. .asInstanceOf[Future[T]]
  559. else coll.insertOne(docs.head, opt.asInstanceOf[InsertOneOptions]).toFuture()
  560. .asInstanceOf[Future[T]]
  561. case Insert(docs, None) => //SingleObservable
  562. if (docs.size > 1) coll.insertMany(docs).toFuture().asInstanceOf[Future[T]]
  563. else coll.insertOne(docs.head).toFuture().asInstanceOf[Future[T]]
  564. /* delete */
  565. case Delete(filter, None, onlyOne) => //SingleObservable
  566. if (onlyOne) coll.deleteOne(filter).toFuture().asInstanceOf[Future[T]]
  567. else coll.deleteMany(filter).toFuture().asInstanceOf[Future[T]]
  568. case Delete(filter, Some(opt), onlyOne) => //SingleObservable
  569. if (onlyOne) coll.deleteOne(filter, opt.asInstanceOf[DeleteOptions]).toFuture().asInstanceOf[Future[T]]
  570. else coll.deleteMany(filter, opt.asInstanceOf[DeleteOptions]).toFuture().asInstanceOf[Future[T]]
  571. /* replace */
  572. case Replace(filter, replacement, None) => //SingleObservable
  573. coll.replaceOne(filter, replacement).toFuture().asInstanceOf[Future[T]]
  574. case Replace(filter, replacement, Some(opt)) => //SingleObservable
  575. coll.replaceOne(filter, replacement, opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]
  576. /* update */
  577. case Update(filter, update, None, onlyOne) => //SingleObservable
  578. if (onlyOne) coll.updateOne(filter, update).toFuture().asInstanceOf[Future[T]]
  579. else coll.updateMany(filter, update).toFuture().asInstanceOf[Future[T]]
  580. case Update(filter, update, Some(opt), onlyOne) => //SingleObservable
  581. if (onlyOne) coll.updateOne(filter, update, opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]
  582. else coll.updateMany(filter, update, opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]
  583. /* bulkWrite */
  584. case BulkWrite(commands, None) => //SingleObservable
  585. coll.bulkWrite(commands).toFuture().asInstanceOf[Future[T]]
  586. case BulkWrite(commands, Some(opt)) => //SingleObservable
  587. coll.bulkWrite(commands, opt.asInstanceOf[BulkWriteOptions]).toFuture().asInstanceOf[Future[T]]
  588. /* drop collection */
  589. case DropCollection(collName) => //SingleObservable
  590. val coll = db.getCollection(collName)
  591. coll.drop().toFuture().asInstanceOf[Future[T]]
  592. /* create collection */
  593. case CreateCollection(collName, None) => //SingleObservable
  594. db.createCollection(collName).toFuture().asInstanceOf[Future[T]]
  595. case CreateCollection(collName, Some(opt)) => //SingleObservable
  596. db.createCollection(collName, opt.asInstanceOf[CreateCollectionOptions]).toFuture().asInstanceOf[Future[T]]
  597. /* list collection */
  598. case ListCollection(dbName) => //ListConllectionObservable
  599. client.getDatabase(dbName).listCollections().toFuture().asInstanceOf[Future[T]]
  600. /* create view */
  601. case CreateView(viewName, viewOn, pline, None) => //SingleObservable
  602. db.createView(viewName, viewOn, pline).toFuture().asInstanceOf[Future[T]]
  603. case CreateView(viewName, viewOn, pline, Some(opt)) => //SingleObservable
  604. db.createView(viewName, viewOn, pline, opt.asInstanceOf[CreateViewOptions]).toFuture().asInstanceOf[Future[T]]
  605. /* create index */
  606. case CreateIndex(key, None) => //SingleObservable
  607. coll.createIndex(key).toFuture().asInstanceOf[Future[T]]
  608. case CreateIndex(key, Some(opt)) => //SingleObservable
  609. coll.createIndex(key, opt.asInstanceOf[IndexOptions]).toFuture().asInstanceOf[Future[T]]
  610. /* drop index */
  611. case DropIndexByName(indexName, None) => //SingleObservable
  612. coll.dropIndex(indexName).toFuture().asInstanceOf[Future[T]]
  613. case DropIndexByName(indexName, Some(opt)) => //SingleObservable
  614. coll.dropIndex(indexName, opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]
  615. case DropIndexByKey(key, None) => //SingleObservable
  616. coll.dropIndex(key).toFuture().asInstanceOf[Future[T]]
  617. case DropIndexByKey(key, Some(opt)) => //SingleObservable
  618. coll.dropIndex(key, opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]
  619. case DropAllIndexes(None) => //SingleObservable
  620. coll.dropIndexes().toFuture().asInstanceOf[Future[T]]
  621. case DropAllIndexes(Some(opt)) => //SingleObservable
  622. coll.dropIndexes(opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]
  623. }
  624. }
  625. */
  626. }
  627. object MongoActionStream {
  628. import MGOClasses._
  629. case class StreamingInsert[A](dbName: String,
  630. collName: String,
  631. converter: A => Document,
  632. parallelism: Int = 1
  633. ) extends MGOCommands
  634. case class StreamingDelete[A](dbName: String,
  635. collName: String,
  636. toFilter: A => Bson,
  637. parallelism: Int = 1,
  638. justOne: Boolean = false
  639. ) extends MGOCommands
  640. case class StreamingUpdate[A](dbName: String,
  641. collName: String,
  642. toFilter: A => Bson,
  643. toUpdate: A => Bson,
  644. parallelism: Int = 1,
  645. justOne: Boolean = false
  646. ) extends MGOCommands
  647. case class InsertAction[A](ctx: StreamingInsert[A])(
  648. implicit mongoClient: MongoClient) {
  649. val database = mongoClient.getDatabase(ctx.dbName)
  650. val collection = database.getCollection(ctx.collName)
  651. def performOnRow(implicit ec: ExecutionContext): Flow[A, Document, NotUsed] =
  652. Flow[A].map(ctx.converter)
  653. .mapAsync(ctx.parallelism)(doc => collection.insertOne(doc).toFuture().map(_ => doc))
  654. }
  655. case class UpdateAction[A](ctx: StreamingUpdate[A])(
  656. implicit mongoClient: MongoClient) {
  657. val database = mongoClient.getDatabase(ctx.dbName)
  658. val collection = database.getCollection(ctx.collName)
  659. def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] =
  660. if (ctx.justOne) {
  661. Flow[A]
  662. .mapAsync(ctx.parallelism)(a =>
  663. collection.updateOne(ctx.toFilter(a), ctx.toUpdate(a)).toFuture().map(_ => a))
  664. } else
  665. Flow[A]
  666. .mapAsync(ctx.parallelism)(a =>
  667. collection.updateMany(ctx.toFilter(a), ctx.toUpdate(a)).toFuture().map(_ => a))
  668. }
  669. case class DeleteAction[A](ctx: StreamingDelete[A])(
  670. implicit mongoClient: MongoClient) {
  671. val database = mongoClient.getDatabase(ctx.dbName)
  672. val collection = database.getCollection(ctx.collName)
  673. def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] =
  674. if (ctx.justOne) {
  675. Flow[A]
  676. .mapAsync(ctx.parallelism)(a =>
  677. collection.deleteOne(ctx.toFilter(a)).toFuture().map(_ => a))
  678. } else
  679. Flow[A]
  680. .mapAsync(ctx.parallelism)(a =>
  681. collection.deleteMany(ctx.toFilter(a)).toFuture().map(_ => a))
  682. }
  683. }
  684. object MGOHelpers {
  685. implicit class DocumentObservable[C](val observable: Observable[Document]) extends ImplicitObservable[Document] {
  686. override val converter: (Document) => String = (doc) => doc.toJson
  687. }
  688. implicit class GenericObservable[C](val observable: Observable[C]) extends ImplicitObservable[C] {
  689. override val converter: (C) => String = (doc) => doc.toString
  690. }
  691. trait ImplicitObservable[C] {
  692. val observable: Observable[C]
  693. val converter: (C) => String
  694. def results(): Seq[C] = Await.result(observable.toFuture(), 10 seconds)
  695. def headResult() = Await.result(observable.head(), 10 seconds)
  696. def printResults(initial: String = ""): Unit = {
  697. if (initial.length > 0) print(initial)
  698. results().foreach(res => println(converter(res)))
  699. }
  700. def printHeadResult(initial: String = ""): Unit = println(s"${initial}${converter(headResult())}")
  701. }
  702. def getResult[T](fut: Future[T], timeOut: Duration = 1 second): T = {
  703. Await.result(fut, timeOut)
  704. }
  705. def getResults[T](fut: Future[Iterable[T]], timeOut: Duration = 1 second): Iterable[T] = {
  706. Await.result(fut, timeOut)
  707. }
  708. import monix.eval.Task
  709. import monix.execution.Scheduler.Implicits.global
  710. final class FutureToTask[A](x: => Future[A]) {
  711. def asTask: Task[A] = Task.deferFuture[A](x)
  712. }
  713. final class TaskToFuture[A](x: => Task[A]) {
  714. def asFuture: Future[A] = x.runAsync
  715. }
  716. }
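The `mgoQuery`, `mgoUpdate` and `mgoAdmin` methods above all follow one error-handling convention: a missing action immediately yields a `Left`, and any runtime failure raised while executing the action is caught and converted into a `Left` as well. The shape of that convention can be sketched without the MongoDB driver; `Action` and `runAction` below are hypothetical stand-ins for illustration only, not part of the real API:

```scala
// Driver-free sketch of the Either-based error-handling convention used by
// mgoQuery/mgoAdmin. `Action` and `runAction` are illustrative stand-ins.
object DBOResultSketch {
  // DBOResult[T] in the real code plays the same role as Either here
  type DBOResult[T] = Either[Throwable, T]

  // a stand-in for MGOContext.action: some deferred database operation
  final case class Action(run: () => Int)

  def runAction(action: Option[Action]): DBOResult[Int] =
    if (action.isEmpty)
      // missing action: fail fast with Left, mirroring the null-action check
      Left(new IllegalArgumentException("query action cannot be null!"))
    else
      // run the action; convert any runtime failure into Left
      try Right(action.get.run())
      catch { case e: Exception => Left(new RuntimeException(s"Error: ${e.getMessage}")) }
}
```

The important detail is the `else` before `try`: without it, the `Left` produced by the null-action check would be discarded and execution would fall through into the action anyway.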

PetSound.scala

  1. package petsound
  2. import akka.actor._
  3. import akka.cluster.client._
  4. import com.typesafe.config.ConfigFactory
  5. import akka.cluster.pubsub.DistributedPubSubMediator._
  6. import akka.cluster.pubsub._
  7. object Cat {
  8. def props = Props[Cat]
  9. def create(port: Int): ActorSystem = {
  10. val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
  11. .withFallback(ConfigFactory.load())
  12. val system = ActorSystem("ClusterSystem",config)
  13. val catSound = system.actorOf(props,"CatSound")
  14. ClusterClientReceptionist(system).registerService(catSound)
  15. val receptionist = ClusterClientReceptionist(system).underlying
  16. system.actorOf(Props(classOf[ReceptionistListener],receptionist),"cat-event-listener")
  17. system
  18. }
  19. }
  20. object Dog {
  21. def props = Props(new Dog)
  22. def create(port: Int): ActorSystem = {
  23. val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
  24. .withFallback(ConfigFactory.load())
  25. val system = ActorSystem("ClusterSystem",config)
  26. val dogSound = system.actorOf(props,"DogSound")
  27. ClusterClientReceptionist(system).registerService(dogSound)
  28. val receptionist = ClusterClientReceptionist(system).underlying
  29. system.actorOf(Props(classOf[ReceptionistListener],receptionist),"dog-event-listener")
  30. system
  31. }
  32. }
  33. class Cat extends Actor with ActorLogging {
  34. // set up subscription via distributed pub/sub
  35. val mediator = DistributedPubSub(context.system).mediator
  36. override def preStart() = {
  37. mediator ! Subscribe("Shout", self)
  38. super.preStart()
  39. }
  40. override def receive: Receive = {
  41. case "Shout" =>
  42. log.info("*******I am a cat, MIAOM ...******")
  43. }
  44. }
  45. class Dog extends Actor with ActorLogging {
  46. // set up subscription via distributed pub/sub
  47. val mediator = DistributedPubSub(context.system).mediator
  48. override def preStart() = {
  49. mediator ! Subscribe("Shout", self)
  50. super.preStart()
  51. }
  52. override def receive: Receive = {
  53. case "Shout" =>
  54. log.info("*****I am a dog, WANG WANG...*****")
  55. }
  56. }

EventListener.scala

  1. package petsound
  2. import akka.actor._
  3. import akka.cluster.client._
  4. class ClientListener(clusterClient: ActorRef) extends Actor with ActorLogging {
  5. override def preStart(): Unit = {
  6. clusterClient ! SubscribeContactPoints
  7. super.preStart()
  8. }
  9. override def receive: Receive = {
  10. case ContactPoints(cps) =>
  11. cps.map {ap => log.info(s"*******ContactPoints:${ap.address.toString}******")}
  12. case ContactPointAdded(cp) =>
  13. log.info(s"*******ContactPointAdded: ${cp.address.toString}*******")
  14. case ContactPointRemoved(cp) =>
  15. log.info(s"*******ContactPointRemoved: ${cp.address.toString}*******")
  16. }
  17. }
  18. class ReceptionistListener(receptionist: ActorRef) extends Actor with ActorLogging {
  19. override def preStart(): Unit = {
  20. receptionist ! SubscribeClusterClients
  21. super.preStart()
  22. }
  23. override def receive: Receive = {
  24. case ClusterClients(cs) =>
  25. cs.map{aref => println(s"*******ClusterClients: ${aref.path.address.toString}*******")}
  26. case ClusterClientUp(cc) =>
  27. log.info(s"*******ClusterClientUp: ${cc.path.address.toString}*******")
  28. case ClusterClientUnreachable(cc) =>
  29. log.info(s"*******ClusterClientUnreachable: ${cc.path.address.toString}*******")
  30. }
  31. }

MongoAdder.scala

  1. package petsound
  2. import akka.actor._
  3. import com.typesafe.config._
  4. import akka.actor.ActorSystem
  5. import org.mongodb.scala._
  6. import sdp.grpc.services.ProtoMGOContext
  7. import sdp.mongo.engine.MGOClasses._
  8. import sdp.mongo.engine.MGOEngine._
  9. import sdp.result.DBOResult._
  10. import akka.cluster.client._
  11. import scala.collection.JavaConverters._
  12. import scala.util._
  13. class MongoAdder extends Actor with ActorLogging {
  14. import monix.execution.Scheduler.Implicits.global
  15. implicit val mgosys = context.system
  16. implicit val ec = mgosys.dispatcher
  17. val clientSettings: MongoClientSettings = MongoClientSettings.builder()
  18. .applyToClusterSettings {b =>
  19. b.hosts(List(new ServerAddress("localhost:27017")).asJava)
  20. }.build()
  21. implicit val client: MongoClient = MongoClient(clientSettings)
  22. val ctx = MGOContext("testdb","friends")
  23. override def receive: Receive = {
  24. case someProto @ Some(proto:ProtoMGOContext) =>
  25. val ctx = MGOContext.fromProto(proto)
  26. log.info(s"****** received MGOContext: $someProto *********")
  27. val task = mgoUpdate[Completed](ctx).toTask
  28. task.runOnComplete {
  29. case Success(s) => println("operations completed successfully.")
  30. case Failure(exception) => println(s"error: ${exception.getMessage}")
  31. }
  32. }
  33. }
  34. object MongoAdder {
  35. def create(port: Int): ActorSystem = {
  36. val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
  37. .withFallback(ConfigFactory.load())
  38. val system = ActorSystem("ClusterSystem", config)
  39. val mongoAdder = system.actorOf(Props[MongoAdder],"MongoAdder")
  40. ClusterClientReceptionist(system).registerService(mongoAdder)
  41. val receptionist = ClusterClientReceptionist(system).underlying
  42. system.actorOf(Props(classOf[ReceptionistListener],receptionist),"mongo-event-listener")
  43. system
  44. }
  45. }

PetHouse.scala

  1. package petsound
  2. import akka.actor._
  3. import akka.japi.Util.immutableSeq
  4. import akka.actor.AddressFromURIString
  5. import com.typesafe.config.ConfigFactory
  6. import akka.cluster.client._
  7. import akka.cluster.client.ClusterClient._
  8. object PetHouse extends App {
  9. val sysCat = Cat.create(2551)
  10. val sysDog = Dog.create(2552)
  11. val mongo = MongoAdder.create(2555)
  12. scala.io.StdIn.readLine()
  13. sysCat.terminate()
  14. sysDog.terminate()
  15. mongo.terminate()
  16. }
  17. object PetClient extends App {
  18. val conf = ConfigFactory.load("client")
  19. val clientSystem = ActorSystem("ClientSystem",conf)
  20. /* read the contact-points addresses from the conf file
  21. val initialContacts = immutableSeq(conf.getStringList("contact-points")).map {
  22. case AddressFromURIString(addr) => RootActorPath(addr) / "system" / "receptionist"
  23. }.toSet
  24. */
  25.  
  26. // seed a single contact point; the client discovers the others automatically
  27. val initialContacts = Set(
  28. ActorPaths.fromString("akka.tcp://ClusterSystem@127.0.0.1:2551/system/receptionist")
  29. )
  30. val clusterClient = clientSystem.actorOf(
  31. ClusterClient.props(
  32. ClusterClientSettings(clientSystem)
  33. .withInitialContacts(initialContacts)), "petClient")
  34. clientSystem.actorOf(Props(classOf[ClientListener],clusterClient),"client-event-listener")
  35. clusterClient ! Send("/user/CatSound","Shout",localAffinity = true)
  36. clusterClient ! Send("/user/DogSound","Shout",localAffinity = true)
  37. println(s"sent shout messages ...")
  38. scala.io.StdIn.readLine()
  39. clusterClient ! Publish("Shout","Shout")
  40. println(s"publish shout messages ...")
  41. // MongoDB operation demo
  42. import org.mongodb.scala._
  43. import sdp.mongo.engine.MGOClasses._
  44. val ctx = MGOContext("testdb","friends")
  45. val chen = Document("" -> "", "" -> "大文","age" -> 28)
  46. val zhang = Document("" -> "", "" -> "小海","age" -> 7)
  47. val lee = Document("" -> "", "" -> "","age" -> 45)
  48. val ouyang = Document("" -> "欧阳", "" -> "","age" -> 120)
  49. val c = ctx.setCommand(MGOCommands.Insert(Seq(chen,zhang,lee,ouyang)))
  50. clusterClient ! Send("/user/MongoAdder",c.toSomeProto,localAffinity = true)
  51. scala.io.StdIn.readLine()
  52. clientSystem.terminate()
  53. }

 
