经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » Java相关 » Scala » 查看文章
Akka-Cluster(6)- Cluster-Sharding:集群分片,分布式交互程序核心方式
来源:cnblogs  作者:雪川大虫  时间:2019/1/17 10:47:54  对本文有异议

在前面几篇讨论里我们介绍了在集群环境里的一些编程模式、分布式数据结构及具体实现方式。到目前为止,我们已经实现了把程序任务分配给处于很多服务器上的actor,能够最大程度的利用整体系统的硬件资源。这是因为通过akka-cluster能够把很多服务器组合成一个虚拟的整体系统,编程人员不需要知道负责运算的actor具体在那台服务器上运行。当然,我所指的整体系统是一种分布式的系统,实质底层还是各集群节点作为完整个体独立运行的,所以核心理念还是需要将程序分割成能独立运算的任务,然后分派给可能分布在很多服务器上的actor去运算。在上一篇的cluster-load-balance里我们采用了一种fire-and-forget模式把多项独立任务分配给集群节点上的actor,然后任由它们各自完成运算,中途不做任何交互、控制。这也是一种典型的无内部状态的运算模式。对外界来讲就是开始、完成,中间没有关于运算进展或当前状态的交流需要。但在现实里,很多任务是无法完全进行独立细分的,或者再细分会影响系统效率。比如网上购物网站每个客户的购物车:它记录了客户在网上的所有商品拣选过程,每一个拣选动作都代表更新的购物车状态,直到完成结算。那么在一个可能有几十万用户同时在线购物的网站,保留在内存的购物车状态应该是任何机器都无法容纳的,只有回到传统的数据库模式了,还是要面对无法解决的多并发系统效率问题。这么分析,集群分片技术可能是最好的解决方法了。

简单讲:集群分片技术就是把一堆带唯一标识identifier的actor,即entity分布到集群节点上去。控制程序可以通过唯一ID与entityr进行交互,控制整个运算过程。这样,我们可以把程序分成相对合理的包含多个过程状态的细分任务。这些细分任务是由分布在集群节点上的entity来运算的,产生的状态当然也使用的是各集群节点上的资源,如此解决上面所提到的内存容量问题。akka-cluster提供的actor位置透明化机制能在系统崩溃、增减集群节点时自动重新部署所有的actor以达到负责均衡。而用户通过固定的ID就能联络目标entity,无论它被转移到任何集群节点上。

集群分片由分片管理ShardRegion和分片定位ShardCoordinator共同协作实现,目标是把消息正确传递给指定ID的entity。分片定位负责确定分片所在集群节点,分片管理则对每个集群节点上分片内的entity进行定位。ShardCoordinator是个cluster-singleton,而ShardRegion则必须部署在每个集群节点上。每个分片内的entity必须是一个类型的actor。发给entity的消息内部必须包含分片编号和entity ID。通过从消息中解析位置信息后由ShardCoordinator确定负责传递消息的ShardRegion,相关的ShardRegion按ID把消息发送至目标entity。

每个节点上的ShardRegion是通过下面这个start函数构建的:

  1. /**
  2. * Scala API: Register a named entity type by defining the [[akka.actor.Props]] of the entity actor
  3. * and functions to extract entity and shard identifier from messages. The [[ShardRegion]] actor
  4. * for this type can later be retrieved with the [[#shardRegion]] method.
  5. *
  6. * Some settings can be configured as described in the `akka.cluster.sharding` section
  7. * of the `reference.conf`.
  8. *
  9. * @param typeName the name of the entity type
  10. * @param entityProps the `Props` of the entity actors that will be created by the `ShardRegion`
  11. * @param settings configuration settings, see [[ClusterShardingSettings]]
  12. * @param extractEntityId partial function to extract the entity id and the message to send to the
  13. * entity from the incoming message, if the partial function does not match the message will
  14. * be `unhandled`, i.e. posted as `Unhandled` messages on the event stream
  15. * @param extractShardId function to determine the shard id for an incoming message, only messages
  16. * that passed the `extractEntityId` will be used
  17. * @param allocationStrategy possibility to use a custom shard allocation and
  18. * rebalancing logic
  19. * @param handOffStopMessage the message that will be sent to entities when they are to be stopped
  20. * for a rebalance or graceful shutdown of a `ShardRegion`, e.g. `PoisonPill`.
  21. * @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard
  22. */
  23. def start(
  24. typeName: String,
  25. entityProps: Props,
  26. settings: ClusterShardingSettings,
  27. extractEntityId: ShardRegion.ExtractEntityId,
  28. extractShardId: ShardRegion.ExtractShardId,
  29. allocationStrategy: ShardAllocationStrategy,
  30. handOffStopMessage: Any): ActorRef = {...}

这个函数登记了名称为typeName类型entity的分片。函数返回ActorRef,说明ShardRegion是在本节点上的一个actor。下面是调用示范:

  1. ClusterSharding(system).start(
  2. typeName = Counter.shardName,
  3. entityProps = Counter.props(),
  4. settings = ClusterShardingSettings(system),
  5. extractEntityId = Counter.idExtractor,
  6. extractShardId = Counter.shardResolver)
  7. ...
  8. object Counter {
  9. trait Command
  10. case object Increment extends Command
  11. case object Decrement extends Command
  12. case object Get extends Command
  13. case object Stop extends Command
  14. trait Event
  15. case class CounterChanged(delta: Int) extends Event
  16. // Sharding Name
  17. val shardName: String = "Counter"
  18. // outside world if he want to send message to sharding should use this message
  19. case class CounterMessage(id: Long, cmd: Command)
  20. // id extrator
  21. val idExtractor: ShardRegion.ExtractEntityId = {
  22. case CounterMessage(id, msg) => (id.toString, msg)
  23. }
  24. // shard resolver
  25. val shardResolver: ShardRegion.ExtractShardId = {
  26. case CounterMessage(id, msg) => (id % 12).toString
  27. }
  28. def props() = Props[Counter]
  29. }

entityProps是ShardRegion用来重构entity的。typeName是用来查找ShardRegion的,如下:

  1. val counterRegion: ActorRef = ClusterSharding(system).shardRegion("Counter")
  2. counterRegion ! Get(123)

用"Counter"获得ShardRegion的ActorRef后所有本节点的消息都是通过这个ShardRegion actor来定位,转达。所以每个ShardRegion都必须具备消息目的地entity的分片编号及entityID的解析方法:extractShardId和extractEntityId。在有些情况下由于节点角色的关系在某个节点不部署任何entity,但本节点需要向其它节点的entity发送消息,这时需要构建一个中介ProxyOnlyShardRegion:

  1. /**
  2. * Java/Scala API: Register a named entity type `ShardRegion` on this node that will run in proxy only mode,
  3. * i.e. it will delegate messages to other `ShardRegion` actors on other nodes, but not host any
  4. * entity actors itself. The [[ShardRegion]] actor for this type can later be retrieved with the
  5. * [[#shardRegion]] method.
  6. *
  7. * Some settings can be configured as described in the `akka.cluster.sharding` section
  8. * of the `reference.conf`.
  9. *
  10. * @param typeName the name of the entity type
  11. * @param role specifies that this entity type is located on cluster nodes with a specific role.
  12. * If the role is not specified all nodes in the cluster are used.
  13. * @param messageExtractor functions to extract the entity id, shard id, and the message to send to the
  14. * entity from the incoming message
  15. * @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard
  16. */
  17. def startProxy(
  18. typeName: String,
  19. role: Optional[String],
  20. messageExtractor: ShardRegion.MessageExtractor): ActorRef = {...}

还有一个重要问题是如何弃用passivate entity,以释放占用资源。akka-cluster提供的方法是通过定义一个空转时间值idle-timeout,如果空转超出此时间段则可以进行passivate。下面是一段应用示范:两分钟空转就passivate entity

  1. class ABC extends Actor {
  2. ...
  3. // passivate the entity when no activity
  4. context.setReceiveTimeout(2.minutes)
  5. ...
  6. override def receive .....
  7. override def receiveCommand: Receive = {
  8. case Increment ? persist(CounterChanged(+1))(updateState)
  9. case Decrement ? persist(CounterChanged(-1))(updateState)
  10. case Get(_) ? sender() ! count
  11. case ReceiveTimeout ? context.parent ! Passivate(stopMessage = Stop)
  12. case Stop ? context.stop(self)
  13. }
  14. /* 或者
  15. override def unhandled(msg: Any): Unit = msg match {
  16. case ReceiveTimeout => context.parent ! Passivate(stopMessage = PoisonPill)
  17. case _ => super.unhandled(msg)
  18. }
  19. */
  20. }

又或者通过设定配置来实现自动的passivation:

在配置文件中设定:akka.cluster.sharding.passivate-idle-entity-after = 120 s   // off to disable

下面是官网提供的一个说明passivation-stop-message的示范代码:

  1. trait CounterCommand
  2. case object Increment extends CounterCommand
  3. final case class GetValue(replyTo: ActorRef[Int]) extends CounterCommand
  4. case object Idle extends CounterCommand
  5. case object GoodByeCounter extends CounterCommand
  6. def counter2(shard: ActorRef[ClusterSharding.ShardCommand], entityId: String): Behavior[CounterCommand] = {
  7. Behaviors.setup { ctx ?
  8. def become(value: Int): Behavior[CounterCommand] =
  9. Behaviors.receiveMessage[CounterCommand] {
  10. case Increment ?
  11. become(value + 1)
  12. case GetValue(replyTo) ?
  13. replyTo ! value
  14. Behaviors.same
  15. case Idle ?
  16. // after receive timeout
  17. shard ! ClusterSharding.Passivate(ctx.self)
  18. Behaviors.same
  19. case GoodByeCounter ?
  20. // the stopMessage, used for rebalance and passivate
  21. Behaviors.stopped
  22. }
  23. ctx.setReceiveTimeout(30.seconds, Idle)
  24. become(0)
  25. }
  26. }
  27. sharding.init(Entity(
  28. typeKey = TypeKey,
  29. createBehavior = ctx ? counter2(ctx.shard, ctx.entityId))
  30. .withStopMessage(GoodByeCounter))

实际上是向主管ShardRegion发送Passivation消息,并指定停止方式。

还有必须注意的是如果使用BackoffSupervisor监控entity:必须使用Backoff.OnStop,因为persist异常会直接停掉entity。Backoff.OnStop策略会重构entity(BackoffSupervisedEntity),再启动。那么如果实施passivation时真的需要停止entity呢?我们可以如下操作:

  1. case "stop" =>
  2. context.stop(self)
  3. context.parent ! PoisonPill

context.parent是BackoffSupervisor,需要同时停掉。

下面我们就设计一个例子来示范集群分片应用。为了更贴近现实,在例子使用了event-sourcing,persistentActor等尚未完整介绍的技术和工具。我会在接着的讨论里介绍它们的原理和使用方式。这个例子模仿一个水果店收银业务:有三台pos机,顾客到任何pos机前录入商品、数量,然后结账。这个示范的主要目的是任何时间如果后端服务器出现故障,正在录入过程中的销售单状态都能得到完整恢复。

我们先看看这个pos前端的源代码:

  1. import akka.actor._
  2. import akka.cluster._
  3. import akka.persistence._
  4. import akka.pattern._
  5. import scala.concurrent.duration._
  6. object POSTerminal {
  7. case class Fruit(code: String, name: String, price: Double)
  8. case class Item(fruit: Fruit, qty: Int)
  9. sealed trait Command {
  10. }
  11. case class Checkout(fruit: Fruit, qty: Int) extends Command
  12. case object ShowTotol extends Command
  13. case class PayCash(amount: Double) extends Command
  14. case object Shutdown extends Command
  15. sealed trait Event {}
  16. case class ItemScanned(fruit: Fruit, qty: Int) extends Event
  17. case object Paid extends Event
  18. case class Items(items: List[Item] = Nil) {
  19. def itemAdded(evt: Event): Items = evt match {
  20. case ItemScanned(fruit,qty) =>
  21. copy( Item(fruit,qty) :: items ) //append item
  22. case _ => this //nothing happens
  23. }
  24. def billPaid = copy(Nil) //clear all items
  25. override def toString = items.reverse.toString()
  26. }
  27. def termProps = Props(new POSTerminal())
  28. //backoff suppervisor must use onStop mode
  29. def POSProps: Props = {
  30. val options = Backoff.onStop(
  31. childProps = termProps,
  32. childName = "posterm",
  33. minBackoff = 1 second,
  34. maxBackoff = 5 seconds,
  35. randomFactor = 0.20
  36. )
  37. BackoffSupervisor.props(options)
  38. }
  39. }
  40. class POSTerminal extends PersistentActor with ActorLogging {
  41. import POSTerminal._
  42. val cluster = Cluster(context.system)
  43. // self.path.parent.name is the type name (utf-8 URL-encoded)
  44. // self.path.name is the entry identifier (utf-8 URL-encoded) but entity has a supervisor
  45. override def persistenceId: String = self.path.parent.parent.name + "-" + self.path.parent.name
  46. var currentItems = Items()
  47. override def receiveRecover: Receive = {
  48. case evt: Event => currentItems = currentItems.itemAdded(evt)
  49. log.info(s"***** ${persistenceId} recovering events ... ********")
  50. case SnapshotOffer(_,loggedItems: Items) =>
  51. log.info(s"***** ${persistenceId} recovering snapshot ... ********")
  52. currentItems = loggedItems
  53. }
  54. override def receiveCommand: Receive = {
  55. case Checkout(fruit,qty) =>
  56. log.info(s"*********${persistenceId} is scanning item: $fruit, qty: $qty *********")
  57. persist(ItemScanned(fruit,qty))(evt => currentItems = currentItems.itemAdded(evt))
  58. case ShowTotol =>
  59. log.info(s"*********${persistenceId} on ${cluster.selfAddress} has current scanned items: *********")
  60. if (currentItems.items == Nil)
  61. log.info(s"**********${persistenceId} None transaction found! *********")
  62. else
  63. currentItems.items.reverse.foreach (item =>
  64. log.info(s"*********${persistenceId}: ${item.fruit.name} ${item.fruit.price} X ${item.qty} = ${item.fruit.price * item.qty} *********"))
  65. case PayCash(amt) =>
  66. log.info(s"**********${persistenceId} paying $amt to settle ***********")
  67. persist(Paid) { _ =>
  68. currentItems = currentItems.billPaid
  69. saveSnapshot(currentItems) //no recovery
  70. }
  71. //shutdown this node to validate entity relocation and proper state recovery
  72. case Shutdown =>
  73. log.info(s"******** node ${cluster.selfAddress} is leaving cluster ... *******")
  74. cluster.leave(cluster.selfAddress)
  75. }
  76. }

我用下面几项来总结一下:

1、POSTerminal是具体的业务运算前端,包裹在BackoffSupervisor里。能保证这个entity在因异常如持久化失败造成停顿时能进行重试。所以,使用了Backoff.onStop方式。

2、persistenceId=self.path.parent.parent.name+"-"+self.path.parent.name 代表: 店号-机号 如: 1-1021。actor.path.name的产生是由ShardRegion具体操作的,其实就是ExtactShardId-ExtractEntityId。

3、注意这个状态类型Item,它的方法itemAdded(evt): Item 即返回新状态。所以必须谨记用currentItems=itemAdded(evt)这样的语法。

下面是构建和启动ClusterSharding的源代码:

  1. object POSShard {
  2. import POSTerminal._
  3. val shardName = "POSManager"
  4. case class POSCommand(id: Long, cmd: Command) {
  5. def shopId = id.toString.head.toString
  6. def posId = id.toString
  7. }
  8. val getPOSId: ShardRegion.ExtractEntityId = {
  9. case posCommand: POSCommand => (posCommand.posId,posCommand.cmd)
  10. }
  11. val getShopId: ShardRegion.ExtractShardId = {
  12. case posCommand: POSCommand => posCommand.shopId
  13. }
  14. def create(port: Int) = {
  15. val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
  16. .withFallback(ConfigFactory.load())
  17. val system = ActorSystem("posSystem",config)
  18. ClusterSharding(system).start(
  19. typeName = shardName,
  20. entityProps = POSProps,
  21. settings = ClusterShardingSettings(system),
  22. extractEntityId = getPOSId,
  23. extractShardId = getShopId
  24. )
  25. }
  26. }

用下面的代码来测试:

  1. object POSDemo extends App {
  2. POSShard.create(2551)
  3. Thread.sleep(1000)
  4. POSShard.create(2552)
  5. POSShard.create(2553)
  6. val posref = POSShard.create(2554)
  7. scala.io.StdIn.readLine()
  8. val apple = Fruit("0001","high grade apple",10.5)
  9. val orange = Fruit("0002","sunkist orage",12.0)
  10. val grape = Fruit("0003","xinjiang red grape",15.8)
  11. posref ! POSCommand(1021, Checkout(apple,2))
  12. posref ! POSCommand(1021,Checkout(grape,1))
  13. posref ! POSCommand(1021,ShowTotol)
  14. scala.io.StdIn.readLine()
  15. posref ! POSCommand(1021,Shutdown)
  16. scala.io.StdIn.readLine()
  17. posref ! POSCommand(1021,Checkout(orange,10))
  18. posref ! POSCommand(1021,ShowTotol)
  19. scala.io.StdIn.readLine()
  20. posref ! POSCommand(1028,Checkout(orange,10))
  21. posref ! POSCommand(1028,ShowTotol)
  22. scala.io.StdIn.readLine()
  23. }

运算结果如下:

  1. [akka.tcp://posSystem@127.0.0.1:2551*********1-1021 is scanning item: Fruit(0001,high grade apple,10.5), qty: 2 *********
  2. [akka.tcp://posSystem@127.0.0.1:2551*********1-1021 is scanning item: Fruit(0003,xinjiang red grape,15.8), qty: 1 *********
  3. [akka.tcp://posSystem@127.0.0.1:2551*********1-1021 on akka.tcp://posSystem@127.0.0.1:2551 has current scanned items: *********
  4. [akka.tcp://posSystem@127.0.0.1:2551*********1-1021: high grade apple 10.5 X 2 = 21.0 *********
  5. [akka.tcp://posSystem@127.0.0.1:2551*********1-1021: xinjiang red grape 15.8 X 1 = 15.8 *********
  6. [akka.tcp://posSystem@127.0.0.1:2551******** node akka.tcp://posSystem@127.0.0.1:2551 is leaving cluster ... *******
  7. [akka.tcp://posSystem@127.0.0.1:2551/system/remoting-terminator] Remoting shut down.
  8. [akka.tcp://posSystem@127.0.0.1:2552***** 1-1021 recovering events ... ********
  9. [akka.tcp://posSystem@127.0.0.1:2552***** 1-1021 recovering events ... ********
  10. [akka.tcp://posSystem@127.0.0.1:2552********1-1021 is scanning item: Fruit(0002,sunkist orage,12.0), qty: 10 *********
  11. [akka.tcp://posSystem@127.0.0.1:2552*********1-1021 on akka.tcp://posSystem@127.0.0.1:2552 has current scanned items: *********
  12. [akka.tcp://posSystem@127.0.0.1:2552*********1-1021: high grade apple 10.5 X 2 = 21.0 *********
  13. [akka.tcp://posSystem@127.0.0.1:2552*********1-1021: xinjiang red grape 15.8 X 1 = 15.8 *********
  14. [akka.tcp://posSystem@127.0.0.1:2552*********1-1021: sunkist orage 12.0 X 10 = 120.0 *********

从结果显示看到:一开始1-1021是在2551节点上运行的。我们用Shutdown关停2551后ClusterSharding立即在2552上重构了1-1021并且恢复了之前的状态。能够在系统出现故障无法使用的情况下自动对运行中的actor进行迁移、状态恢复,正是我们这次讨论的核心内容。

下面是本次示范的源代码:

build.sbt

  1. name := "akka-cluster-sharding"
  2. version := "0.2"
  3. scalaVersion := "2.12.8"
  4. libraryDependencies := Seq(
  5. "com.typesafe.akka" %% "akka-cluster-sharding" % "2.5.19",
  6. "com.typesafe.akka" %% "akka-persistence" % "2.5.19",
  7. "com.typesafe.akka" %% "akka-persistence-cassandra" % "0.92",
  8. "com.typesafe.akka" %% "akka-persistence-cassandra-launcher" % "0.92" % Test
  9. )

resources/application.conf

  1. akka.actor.warn-about-java-serializer-usage = off
  2. akka.log-dead-letters-during-shutdown = off
  3. akka.log-dead-letters = off
  4. akka {
  5. loglevel = INFO
  6. actor {
  7. provider = "cluster"
  8. }
  9. remote {
  10. log-remote-lifecycle-events = off
  11. netty.tcp {
  12. hostname = "127.0.0.1"
  13. port = 0
  14. }
  15. }
  16. cluster {
  17. seed-nodes = [
  18. "akka.tcp://posSystem@127.0.0.1:2551"]
  19. log-info = off
  20. }
  21. persistence {
  22. journal.plugin = "cassandra-journal"
  23. snapshot-store.plugin = "cassandra-snapshot-store"
  24. }
  25. }

Entities.scala

  1. import akka.actor._
  2. import akka.cluster._
  3. import akka.persistence._
  4. import akka.pattern._
  5. import scala.concurrent.duration._
  6. object POSTerminal {
  7. case class Fruit(code: String, name: String, price: Double)
  8. case class Item(fruit: Fruit, qty: Int)
  9. sealed trait Command {
  10. }
  11. case class Checkout(fruit: Fruit, qty: Int) extends Command
  12. case object ShowTotol extends Command
  13. case class PayCash(amount: Double) extends Command
  14. case object Shutdown extends Command
  15. sealed trait Event {}
  16. case class ItemScanned(fruit: Fruit, qty: Int) extends Event
  17. case object Paid extends Event
  18. case class Items(items: List[Item] = Nil) {
  19. def itemAdded(evt: Event): Items = evt match {
  20. case ItemScanned(fruit,qty) =>
  21. copy( Item(fruit,qty) :: items ) //append item
  22.  
  23. case _ => this //nothing happens
  24. }
  25. def billPaid = copy(Nil) //clear all items
  26. override def toString = items.reverse.toString()
  27. }
  28. def termProps = Props(new POSTerminal())
  29. //backoff suppervisor must use onStop mode
  30. def POSProps: Props = {
  31. val options = Backoff.onStop(
  32. childProps = termProps,
  33. childName = "posterm",
  34. minBackoff = 1 second,
  35. maxBackoff = 5 seconds,
  36. randomFactor = 0.20
  37. )
  38. BackoffSupervisor.props(options)
  39. }
  40. }
  41. class POSTerminal extends PersistentActor with ActorLogging {
  42. import POSTerminal._
  43. val cluster = Cluster(context.system)
  44. // self.path.parent.name is the type name (utf-8 URL-encoded)
  45. // self.path.name is the entry identifier (utf-8 URL-encoded) but entity has a supervisor
  46. override def persistenceId: String = self.path.parent.parent.name + "-" + self.path.parent.name
  47. var currentItems = Items()
  48. override def receiveRecover: Receive = {
  49. case evt: Event => currentItems = currentItems.itemAdded(evt)
  50. log.info(s"***** ${persistenceId} recovering events ... ********")
  51. case SnapshotOffer(_,loggedItems: Items) =>
  52. log.info(s"***** ${persistenceId} recovering snapshot ... ********")
  53. currentItems = loggedItems
  54. }
  55. override def receiveCommand: Receive = {
  56. case Checkout(fruit,qty) =>
  57. log.info(s"*********${persistenceId} is scanning item: $fruit, qty: $qty *********")
  58. persist(ItemScanned(fruit,qty))(evt => currentItems = currentItems.itemAdded(evt))
  59. case ShowTotol =>
  60. log.info(s"*********${persistenceId} on ${cluster.selfAddress} has current scanned items: *********")
  61. if (currentItems.items == Nil)
  62. log.info(s"**********${persistenceId} None transaction found! *********")
  63. else
  64. currentItems.items.reverse.foreach (item =>
  65. log.info(s"*********${persistenceId}: ${item.fruit.name} ${item.fruit.price} X ${item.qty} = ${item.fruit.price * item.qty} *********"))
  66. case PayCash(amt) =>
  67. log.info(s"**********${persistenceId} paying $amt to settle ***********")
  68. persist(Paid) { _ =>
  69. currentItems = currentItems.billPaid
  70. saveSnapshot(currentItems) //no recovery
  71. }
  72. //shutdown this node to validate entity relocation and proper state recovery
  73. case Shutdown =>
  74. log.info(s"******** node ${cluster.selfAddress} is leaving cluster ... *******")
  75. cluster.leave(cluster.selfAddress)
  76. }
  77. }

Shards.scala

  1. import akka.actor._
  2. import akka.cluster.sharding._
  3. import com.typesafe.config.ConfigFactory
  4. object POSShard {
  5. import POSTerminal._
  6. val shardName = "POSManager"
  7. case class POSCommand(id: Long, cmd: Command) {
  8. def shopId = id.toString.head.toString
  9. def posId = id.toString
  10. }
  11. val getPOSId: ShardRegion.ExtractEntityId = {
  12. case posCommand: POSCommand => (posCommand.posId,posCommand.cmd)
  13. }
  14. val getShopId: ShardRegion.ExtractShardId = {
  15. case posCommand: POSCommand => posCommand.shopId
  16. }
  17. def create(port: Int) = {
  18. val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
  19. .withFallback(ConfigFactory.load())
  20. val system = ActorSystem("posSystem",config)
  21. ClusterSharding(system).start(
  22. typeName = shardName,
  23. entityProps = POSProps,
  24. settings = ClusterShardingSettings(system),
  25. extractEntityId = getPOSId,
  26. extractShardId = getShopId
  27. )
  28. }
  29. }

POSDemo.scala

  1. import POSTerminal._
  2. import POSShard._
  3. object POSDemo extends App {
  4. POSShard.create(2551)
  5. Thread.sleep(1000)
  6. POSShard.create(2552)
  7. POSShard.create(2553)
  8. val posref = POSShard.create(2554)
  9. scala.io.StdIn.readLine()
  10. val apple = Fruit("0001","high grade apple",10.5)
  11. val orange = Fruit("0002","sunkist orage",12.0)
  12. val grape = Fruit("0003","xinjiang red grape",15.8)
  13. posref ! POSCommand(1021, Checkout(apple,2))
  14. posref ! POSCommand(1021,Checkout(grape,1))
  15. posref ! POSCommand(1021,ShowTotol)
  16. scala.io.StdIn.readLine()
  17. posref ! POSCommand(1021,Shutdown)
  18. scala.io.StdIn.readLine()
  19. posref ! POSCommand(1021,Checkout(orange,10))
  20. posref ! POSCommand(1021,ShowTotol)
  21. scala.io.StdIn.readLine()
  22. posref ! POSCommand(1028,Checkout(orange,10))
  23. posref ! POSCommand(1028,ShowTotol)
  24. scala.io.StdIn.readLine()
  25. }

 

原文链接:http://www.cnblogs.com/tiger-xc/p/10280399.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号