上篇我介绍了CQRS模式存写部分的具体实现和akka-persistence一些函数和消息的用法。在这篇本来是准备直接用一个具体的例子来示范CQRS模式编程,主要是写端,或者是数据采集端。想着模拟收银机的后端操作,可以使用集群分片(cluster-sharding),每个分片shard代表一部POS机控制系统。在写这段程序之前首先把示例功能实现、cluster-sharding, persistence-actor,actor-passivation, backoff-supervisor, ClusterSharding.start和ClusterSharding.startProxy等技术细节搞清楚:
1、构建几个测试销售的产品信息
2、设计一套简单但功能完整的操作指令command
3、设计运算状态,即一单未结算销售单据的状态。相关的指令-事件command-event转换和状态更新机制
4、单据状态初始化
5、业务逻辑部分,从接到各项指令、指令-事件转换、处理副作用、存写事件、更新单据状态
6、结束单据处理
以一单支付金额大于等于应付金额作为整单结束状态。此时应进行下面的处理:
1)增加单号 2)清除所有交易项目 3)saveSnapshot (重启就不用恢复前面的事件persistent-events)
7、资源释放策略及处理 passivation
如果一个shard-entity暂不使用时最好先停掉stop以释放它占用的资源。但用常规的方式停止entity会造成mailbox里未处理的消息丢失,所以cluster-sharding有一套特别的机制ClusterSharding.Passivate(actorRef)来实现shard-entity的安全停用,即:目标entity向ShardRegion发送Passivate(stopMessage)消息、ShardRegion向目标entity发送包嵌在消息里的stopMessage。目标entity在收到消息后可以自行停止。ShardRegion会保留收到Passivate消息到目标entity停止之间收到的消息,还给再启动的entity。在本例子里passivation的应用场景如下:每单支付后如果一段时间没有收到新的开单指令,这个shard-entity可以通过向ShardRegion发送Passivate消息或按空转时间段设定自动passivate自己,这时ShardRegion在entity空转超出时间后自动发送ClusterSharding.start(...)里定义的handOffStopMessage(PoisonPill),如下:
- def passivate(entity: ActorRef, stopMessage: Any): Unit = {
- idByRef.get(entity) match {
- case Some(id) ⇒ if (!messageBuffers.contains(id)) {
- passivating = passivating + entity
- messageBuffers.add(id)
- entity ! stopMessage
- } else {
- log.debug("Passivation already in progress for {}. Not sending stopMessage back to entity.", entity)
- }
- case None ⇒ log.debug("Unknown entity {}. Not sending stopMessage back to entity.", entity)
- }
- }
- def passivateIdleEntities(): Unit = {
- val deadline = System.nanoTime() - settings.passivateIdleEntityAfter.toNanos
- val refsToPassivate = lastMessageTimestamp.collect {
- case (entityId, lastMessageTimestamp) if lastMessageTimestamp < deadline ⇒ refById(entityId)
- }
- if (refsToPassivate.nonEmpty) {
- log.debug("Passivating [{}] idle entities", refsToPassivate.size)
- refsToPassivate.foreach(passivate(_, handOffStopMessage))
- }
- }
启动passivation的时间长度可以通过配置文件或者直接在代码里设置:在配置文件中设置 akka.cluster.sharding.passivate-idle-entity-after = 2m,代表两分钟内没有接收从ShardRegion发来的POS指令即启动passivation(经entity自身actor或actorRef收发的消息不算)。可以设置off关闭自动passivation。其它设置值参考如下:
- ns, nano, nanos, nanosecond, nanoseconds
- us, micro, micros, microsecond, microseconds
- ms, milli, millis, millisecond, milliseconds
- s, second, seconds
- m, minute, minutes
- h, hour, hours
- d, day, days
也可以直接在代码里设定ClusterShardingSettings.passivateIdleEntityAfter=2 minutes。不过我们还是选择配置文件方式,比较灵活。下面是一个包括了passivation, backoffSupervisor的示范代码:
- import akka.cluster.sharding.ShardRegion.Passivate
- import scala.concurrent.duration._
- object SupervisionSpec {
- val config =
- ConfigFactory.parseString(
- """
- akka.actor.provider = "cluster"
- akka.loglevel = INFO
- """)
- case class Msg(id: Long, msg: Any)
- case class Response(self: ActorRef)
- case object StopMessage
- val idExtractor: ShardRegion.ExtractEntityId = {
- case Msg(id, msg) ⇒ (id.toString, msg)
- }
- val shardResolver: ShardRegion.ExtractShardId = {
- case Msg(id, msg) ⇒ (id % 2).toString
- }
- class PassivatingActor extends Actor with ActorLogging {
- override def preStart(): Unit = {
- log.info("Starting")
- }
- override def postStop(): Unit = {
- log.info("Stopping")
- }
- override def receive: Receive = {
- case "passivate" ⇒
- log.info("Passivating")
- context.parent ! Passivate(StopMessage)
- // simulate another message causing a stop before the region sends the stop message
- // e.g. a persistent actor having a persist failure while processing the next message
- context.stop(self)
- case "hello" ⇒
- sender() ! Response(self)
- case StopMessage ⇒
- log.info("Received stop from region")
- context.parent ! PoisonPill
- }
- }
- }
- class SupervisionSpec extends AkkaSpec(SupervisionSpec.config) with ImplicitSender {
- import SupervisionSpec._
- "Supervision for a sharded actor" must {
- "allow passivation" in {
- val supervisedProps = BackoffSupervisor.props(Backoff.onStop(
- Props(new PassivatingActor()),
- childName = "child",
- minBackoff = 1.seconds,
- maxBackoff = 30.seconds,
- randomFactor = 0.2,
- maxNrOfRetries = -1
- ).withFinalStopMessage(_ == StopMessage))
- Cluster(system).join(Cluster(system).selfAddress)
- val region = ClusterSharding(system).start(
- "passy",
- supervisedProps,
- ClusterShardingSettings(system),
- idExtractor,
- shardResolver
- )
- region ! Msg(10, "hello")
- val response = expectMsgType[Response](5.seconds)
- watch(response.self)
- region ! Msg(10, "passivate")
- expectTerminated(response.self)
- // This would fail before as sharded actor would be stuck passivating
- region ! Msg(10, "hello")
- expectMsgType[Response](20.seconds)
- }
- }
- }
8、异常处理、重试策略 backoffsupervisor 实现,如下:
- val supervisedProps = BackoffSupervisor.props(Backoff.onStop(
- Props(new EventWriter()),
- childName = "child",
- minBackoff = 1.seconds,
- maxBackoff = 30.seconds,
- randomFactor = 0.2,
- maxNrOfRetries = -1
- ))
- //自动passivate时设定 .withFinalStopMessage(_ == StopMessage))
9、分片sharding部署
一般来说可以通过ClusterSharding(system).start(...)在每个节点上部署分片,如:
- ClusterSharding(system).start(
- typeName = shardName,
- entityProps = POSProps,
- settings = mySettings,
- extractEntityId = getPOSId,
- extractShardId = getShopId,
- allocationStrategy = ClusterSharding(system).defaultShardAllocationStrategy(mySettings),
- handOffStopMessage = PassivatePOS
- )
但如果分片的调用客户端所在节点因某种原因不能部署分片时可以用ClusterSharding(system).startProxy(...)部署一个分片代理:
- ClusterSharding(system).startProxy(
- typeName = shardName,
- role = Some(role),
- extractEntityId = getPOSId,
- extractShardId = getShopId
- )
实际上当所在节点的role不等于startProxy参数role时才能启动这个分片代理。下面是一个成功部署分片代理的例子:
- def create(port: Int): ActorSystem = {
- var config: Config = ConfigFactory.load()
- if (port != 2554)
- config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
- .withFallback(ConfigFactory.parseString("akka.cluster.roles = [shard]"))
- .withFallback(ConfigFactory.load())
- else
- config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
- .withFallback(ConfigFactory.load())
- val system = ActorSystem("posSystem",config)
- val role = "shard"
- val mySettings = ClusterShardingSettings(system) //.withPassivateIdleAfter(10 seconds)
- .withRole(role)
- /*
- val allocationStrategy = new ShardCoordinator.LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 1)
- val region = ClusterSharding(system).start(
- "myType",
- InactiveEntityPassivationSpec.Entity.props(probe.ref),
- settings,
- extractEntityId,
- extractShardId,
- ClusterSharding(system).defaultShardAllocationStrategy(settings),
- Passivate
- ) */
- if (port != 2554) {
- ClusterSharding(system).start(
- typeName = shardName,
- entityProps = POSProps,
- settings = mySettings,
- extractEntityId = getPOSId,
- extractShardId = getShopId,
- allocationStrategy = ClusterSharding(system).defaultShardAllocationStrategy(mySettings),
- handOffStopMessage = PassivatePOS
- )
- println(s"************** cluster-shard created at port $port **************")
- }
- else {
- ClusterSharding(system).startProxy(
- typeName = shardName,
- role = Some(role),
- extractEntityId = getPOSId,
- extractShardId = getShopId
- )
- println(s"************** cluster-shard-proxy created at port $port **************")
- }
- val eventListener = system.actorOf(Props[EventLisener],"eventListener")
- system
- }
配置文件指定分片部署role例子:
- cluster {
- seed-nodes = [
- "akka.tcp://posSystem@127.0.0.1:2551"]
- log-info = off
- sharding {
- role = "shard"
- passivate-idle-entity-after = 10 s
- }
- }
10、设计后端执行命令后返回的结果类型
11、设计一套POS前端的命名规则:因为有关POS过程的事件持久化是以persistenceId辨别的,所以一个POS编号应该有一个对应的persistenceId,所有这个POS编号的事件都以对应的persistenceId来存储。我们先跳到ClusterSharding是如何动态地构建和部署ShardRegion和entity的:ClusterSharding是通过两个函数extractShardId,extractEntityId来对应ShardRegion和Entity实例的。用一个shardId去调用ShardRegion,如果不存在就用这个Id构建一个。ShardRegion是个actor,那么这个Id应该就是它的ActorPath.name。同样ShardRegion也会用一个entityId去构建Entity。这个entityId也就是Entity的ActorPath.name了。而从ActorPath结构来看:ShardRegion是Entity的父辈。最终,我们可以用父子关系的ActorPath.name来代表persistenceId,如:
- // self.path.parent.name is the type name (utf-8 URL-encoded)
- // self.path.name is the entry identifier (utf-8 URL-encoded) but entity has a supervisor
- override def persistenceId: String = self.path.parent.parent.name + "-" + self.path.parent.name
如果考虑的全面些,我们可以把区域zone,门店shop,部门dpt,POS机合并成一个唯一的Id:
1位zoneId+3位shopId+2位dptId+4位POSId => 10位POSUID 如1001019365
12、用actor来模拟POS前端。店号与shardId, 机号与entityId对应。暂时用以显示后端执行指令结果。
以上这12个关注点算是我编程前的一些思路和备注。然后就开始写示范代码了。经历了好几遍周折,这段CQRS的C部分是越写越细、越复杂。主要是想把这个例子做成一个将来可以落地的项目(刚好有朋友公司他们提供零售IT解决方案,需要一个平台化android前端POS解决方案),自不然又不断考虑前端移动客户和后端CQRS的Q部分如何实现的问题,这时在一个局部功能的实现里需要照顾到全局的功能需求,往往把应该在其它部分实现的功能都放到这个C部分代码中来了。所以还是应该先从整体系统考虑的更具体、全面些才行。
一开始,我的主要注意力是放在persistenceActor的状态变化,也就是收款机开单操作过程的维护方面。我犯的第一个错误就是老是担心在后面Q端(读端)能不能实现客单项目内容管理,所以复杂化了event数据结构,总是希望为Q端提供完整的信息来支持对客单项目内容的管理。实际上C端和Q端各自的功能应该是:C端主要负责把所有的操作动作都记录下来,Q端把这些动作恢复成交易项目,形成客单内容,然后管理整个客单状态。C端只维护客单的开始、结束状态。至于这张单项目内容的修改、调整则应该是放在Q端的。这样一来,正如本篇标题所述:还是需要多想想,有全局思路。下面是我重新整理的一些想法:
1、整体考虑前端POS机客户端、C端、Q端:前端接收收款员操作动作及应对动作所产生的结果如显示、打印等。C端负责动作的数据采集。Q端负责客单交易内容的构建和管理
2、从C端角度考虑:需要向前端返回每个动作产生的结果,使前端有足够的信息进行显示、打印小票等。如实向Q端反应具体操作动作,提供客单状态如新单、结束、单号等Q端管理客单状态必要的信息。
3、C端POSHandler是个cluster-sharding-entity persistenceActor with backoffSupervisor。对应的前端POSRouter是客户端请求入口,是个cluster-singleton,能实现热插拔、热转换。POSRouter可以通过cluster-load-balancing在routees上运行Q端。
4、C端有以下几种状态:登陆前、开单中、单结束。C端程序主要是处理这几种状态里的操作
5、整体POS系统是一个云平台应用。客户端通过POSRouter向POS系统请求POS服务。POSRouter是部署在集群所有节点上的cluster-singleton, 系统通过一个公网IP连接任何一个在线节点的POSRouter,任何一个节点出现异常不会影响系统运行,这是一种高可用的设计。
6、POSHandler是集群分片,每个分片代表一部物理POS机。POS机号编码规则为:客户号+店号+序号,客户代表云POS用户
7、每客单结束时POSHandler向POSRouter发送消息请求启动执行一次Q端读取动作,这样可以避免持久数据流占用资源
8、系统应该作为一种云服务提供给各种的客户端设备。客户端用gRPC连接云服务端。调用那项服务,用户有否使用权限由客户端决定。