经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » Java相关 » Scala » 查看文章
Akka-CQRS(4)- CQRS Writer Actor 示范
来源:cnblogs  作者:雪川大虫  时间:2019/4/1 9:09:20  对本文有异议

   我觉着,CQRS的写部分最核心、最复杂的部分应该是Writer-Actor了。其它的监管(supervising)、分片部署等都比较规范,没太多的变动。几乎Writer-Actor所有的业务逻辑都在Receive函数里,这个地方应该算是最复杂的地方。我的想法是搭建一个标准框架,保证可以运行Writer-Actor。然后因应业务需求变化在这个Receive函数里用一种标准写法加功能。这就要求Receive函数能够做到代码简洁、逻辑清晰。Receive函数是一个执行Command的地方。在执行之前需要对指令的可执行性进行判断,这样就可能在函数内部造成臃肿的重复代码。比如,大部分指令需要用户登陆后才容许执行、又或者很多指令都需要超级用户权限、又或者开始支付后不容许除支付之外的任何操作,等等。。。

首先,我们可以有一个前置操作,里面包括了大部分对指令可执行性判断逻辑,这样我们可以把许多重复代码从Receive函数中移走,如下:

  1. //helper functions
  2. object RunPOSCommand {
  3. def unapply(arg: POSCommand) = if (cmdFilter(persistenceId,arg,vchState,vchItems,sender())) Some(arg) else None
  4. }
  5. def cmdFilter(terminalid: String, cmd: POSCommand, state: VchStates, txns: VchItems, router: ActorRef): Boolean = cmd match {
  6. case LogOn(opr, passwd) => //only allowed in logOffState
  7. if (!txns.txnitems.isEmpty) { //in the middle of process
  8. router ! POSResponse(STATUS.FAIL, s"禁止用户登陆!终端 ${terminalid} 有未完成单据。", state, List())
  9. false
  10. } else{
  11. if (validateUser(opr, passwd).isDefined) true
  12. else {
  13. router ! POSResponse(STATUS.FAIL, s"终端-$terminalid: 用户 ${opr} 登陆失败!", state, List())
  14. false
  15. }
  16. }
  17. case LogOff => //only allowed in logOnState
  18. if (!txns.txnitems.isEmpty) { //in the middle of process
  19. router ! POSResponse(STATUS.FAIL, s"禁止用户退出!终端 ${terminalid} 有未完成单据。", state, List())
  20. false
  21. } else true
  22. case VoidAll => //allowed in logOnState and paymentState
  23. if (txns.txnitems.isEmpty) { //no valid sales
  24. router ! POSResponse(STATUS.FAIL, s"全单取消失败!终端 ${terminalid} 本单无任何有效销售记录。", state, txns.txnitems)
  25. false
  26. } else true
  27. case OfflinePay(acct,num,amt) =>
  28. if (txns.totalSales.abs == 0) { // no valid sales. void,refund neg values could produce zero
  29. router ! POSResponse(STATUS.FAIL, s"支付失败!终端 ${terminalid} 应付金额为零。", state, List())
  30. false
  31. } else {
  32. if(validateAcct(acct).isDefined) true
  33. else {
  34. router ! POSResponse(STATUS.FAIL, s"支付失败!终端 ${terminalid} 账号{$acct}不存在。", state, List())
  35. false
  36. }
  37. }
  38. case Subtotal =>
  39. if (txns.txnitems.isEmpty) { //in the middle of process
  40. router ! POSResponse(STATUS.FAIL, s"小计操作失败!终端 ${terminalid} 无任何销售记录。", state, List())
  41. false
  42. } else true
  43. case VCBalance(_,_,_) => true
  44. case MemberOn(_,_) => true
  45. case MemberOff => true
  46. case VoucherNum(_) => true
  47. ...
  48. }

如上,前期的判断逻辑都移到cmdFilter(...)里了,然后可以用RunPOSCommand(command)方式来实现对command的执行性判断。现在我们可以在Receive函数里通过调用RunPOSCommand来节省一大截重复代码了,如下:

  1. private def logOffState: Receive = {
  2. case RunPOSCommand(LogOn(opr, _)) =>
  3. persistEvent(LogOned(opr,vchState)) { evt =>
  4. val sts = updateState(evt, vchState, vchItems)
  5. //starting seqenceNr for any voucher. no action logged before login
  6. vchState = sts._1.copy(jseq = lastSequenceNr + 1);
  7. vchItems = sts._2
  8. sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 用户{$opr}成功登陆。", vchState, List(vchItems.txnitems.head))
  9. context.become(logOnState)
  10. }
  11. case PassivatePOS =>
  12. log.info(s"**********${persistenceId} got passivate message and stopping ... ***********")
  13. context.parent ! PoisonPill
  14. case _ =>
  15. sender() ! POSResponse(STATUS.FAIL, s"操作失败!终端 ${persistenceId} 用户未登陆。", vchState, List())
  16. }

另外,我把所有指令分成三个可执行类别,分别是:登陆前、已登陆、支付中。这三种状态的行为分别用logOffState,logOnState,paymentState三个Receive函数代表: 

  1. private def logOnState: Receive = {
  2. case RunPOSCommand(LogOff) =>
  3. persistEvent(LogOffed(vchState)) { evt =>
  4. val user = vchState.opr
  5. val sts = updateState(evt,vchState,vchItems)
  6. vchState = sts._1; vchItems = sts._2
  7. saveSnapshot(vchState) //state of last voucher
  8. //手工passivate shard ! ShardRegion.Passivate(PassivatePOS)
  9. sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 用户 $user 成功退出。", vchState, List(vchItems.txnitems.head))
  10. context.unbecome() //switch to logOffState
  11. }
  12. case RunPOSCommand(SuperOn(su,_)) =>
  13. persistEvent(SuperOned(su,vchState)) { evt =>
  14. val sts = updateState(evt, vchState, vchItems)
  15. vchState = sts._1
  16. vchItems = sts._2
  17. sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 转到管理模式{$su}。", vchState, List(vchItems.txnitems.head))
  18. }
  19. private def logOnState: Receive = {
  20. case RunPOSCommand(LogOff) =>
  21. persistEvent(LogOffed(vchState)) { evt =>
  22. val user = vchState.opr
  23. val sts = updateState(evt,vchState,vchItems)
  24. vchState = sts._1; vchItems = sts._2
  25. saveSnapshot(vchState) //state of last voucher
  26. //手工passivate shard ! ShardRegion.Passivate(PassivatePOS)
  27. sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 用户 $user 成功退出。", vchState, List(vchItems.txnitems.head))
  28. context.unbecome() //switch to logOffState
  29. }
  30. case RunPOSCommand(SuperOn(su,_)) =>
  31. persistEvent(SuperOned(su,vchState)) { evt =>
  32. val sts = updateState(evt, vchState, vchItems)
  33. vchState = sts._1
  34. vchItems = sts._2
  35. sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 转到管理模式{$su}。", vchState, List(vchItems.txnitems.head))
  36. }
  37. private def logOnState: Receive = {
  38. case RunPOSCommand(LogOff) =>
  39. persistEvent(LogOffed(vchState)) { evt =>
  40. val user = vchState.opr
  41. val sts = updateState(evt,vchState,vchItems)
  42. vchState = sts._1; vchItems = sts._2
  43. saveSnapshot(vchState) //state of last voucher
  44. //手工passivate shard ! ShardRegion.Passivate(PassivatePOS)
  45. sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 用户 $user 成功退出。", vchState, List(vchItems.txnitems.head))
  46. context.unbecome() //switch to logOffState
  47. }
  48. case RunPOSCommand(SuperOn(su,_)) =>
  49. persistEvent(SuperOned(su,vchState)) { evt =>
  50. val sts = updateState(evt, vchState, vchItems)
  51. vchState = sts._1
  52. vchItems = sts._2
  53. sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 转到管理模式{$su}。", vchState, List(vchItems.txnitems.head))
  54. }
  55. //first payment in a voucher
  56. case RunPOSCommand(OfflinePay(acct,num, amount)) =>
  57. persistEvent(Payment(acct,num,vchState)) { evt =>
  58. val sts = updateState(evt, vchState, vchItems)
  59. vchState = sts._1
  60. vchItems = sts._2
  61. if (vchItems.totalSales > 0)
  62. sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 成功完成{${vchItems.txnitems.head.category} ${amount/100.0}支付。", vchState, List(vchItems.txnitems.head))
  63. else
  64. sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 成功完成{${vchItems.txnitems.head.category} ${amount/100.0}退款。", vchState, List(vchItems.txnitems.head))
  65. if (!vchState.due) { //completed voucher. mark end of voucher and move next. stay in logOnState
  66. persistEvent(EndVoucher(vchState.num)) { evt =>
  67. val sts = updateState(evt, vchState, vchItems)
  68. vchState = sts._1
  69. vchItems = sts._2
  70. saveSnapshot(vchState) //recovery to next voucher
  71. }
  72. }
  73. else context.become(paymentState) //switch into paymentState
  74. }
  75. ...
  76. }
  77. private def paymentState: Receive = {
  78. case RunPOSCommand(OfflinePay(acct,num, amount)) =>
  79. persistEvent(Payment(acct,num,vchState)) { evt =>
  80. val sts = updateState(evt, vchState, vchItems)
  81. vchState = sts._1
  82. vchItems = sts._2
  83. if (vchItems.totalSales > 0)
  84. sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 成功完成{${vchItems.txnitems.head.category} ${amount/100.0}支付。", vchState, List(vchItems.txnitems.head))
  85. else
  86. sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 成功完成{${vchItems.txnitems.head.category} ${amount/100.0}退款。", vchState, List(vchItems.txnitems.head))
  87. if (!vchState.due) { //completed voucher. mark end of voucher and move next. return to logOnState
  88. persistEvent(EndVoucher(vchState.num)) { evt =>
  89. val sts = updateState(evt, vchState, vchItems)
  90. vchState = sts._1
  91. vchItems = sts._2
  92. saveSnapshot(vchState) //recovery to next voucher
  93. context.unbecome() //logOnState
  94. }
  95. }
  96. // else wait for other payments and stay in logOnState
  97. }
  98. ...
  99. }

注意上面代码里的context.become,saveSnapshot, 它们分别代表状态转换及一单的终结。

在实现写端时不得不考虑读端Reader-Actor的一些实现方式:这是个收银机软件,以一张销售单为单元,单中有多条交易记录(在例子里是多条收银动作记录),后面的操作记录可能会影响前面记录的状态,如:后面的冲销前面的、后面输入一条折扣指令等,不过单与单之间没有任何瓜葛。前面提到过:我们不想在写端来处理任何业务逻辑,所以对每单中项目状态处理就移到了读端。具体做法是这样的:写端完成一单操作后通知Reader-Actor,并把本单的开始sequenceNr和结束sequenceNr传给Reader-Actor, Reader用静态流方式读取事件,维护本单状态并对单内所有项目状态进行更新并恢复到规定的交易记录格式,最终把所有交易项目写到目标数据库表。假设有一个POSRouter负责派送指令给Writer-Actor,我们同样可以把完成写单据的信息传送给这个POSRouter, 然后由它分配调度Reader-Actor。POSRouter+Reader-Actor可以是cluster-loadbalance模式的。写端存入日志的数据包括每一个动作的类型和详细的数据,如SalesLogged(txnitem), SalesLogged是事件类型,txnItem是事件数据:

  1. case class TxnItem(
  2. txndate: String = LocalDate.now.format(DateTimeFormatter.ofPattern("yyyyMMdd"))
  3. ,txntime: String = LocalDateTime.now.format(dateTimeFormatter).substring(11)
  4. ,opr: String = ""//工号
  5. ,num: Int = 0 //销售单号
  6. ,seq: Int = 1 //交易序号
  7. ,txntype: Int = TXNTYPE.sales//交易类型
  8. ,salestype: Int = SALESTYPE.plu //销售类型
  9. ,qty: Int = 1 //交易数量
  10. ,price: Int = 0 //单价(分)
  11. ,amount: Int = 0 //码洋(分)
  12. ,dscamt: Int = 0 //折扣:负值 net实洋 = amount + dscamt
  13. ,member: String = "" //会员卡号
  14. ,code: String = "" //编号(商品、账号...)
  15. ,desc: String = "" //项目名称
  16. ,dpt: String = ""
  17. ,department: String = ""
  18. ,cat: String = ""
  19. ,category: String = ""
  20. ,brd: String = ""
  21. ,brand: String = ""
  22. )

首先,我们需要这个TxnItem使Read-Actor能把事件恢复成规定格式的交易记录。也可以用这个结构返回给前端用来显示或者打印交易明细。下面是这些Command和Event的定义代码: 

  1. object Commands {
  2. case object PassivatePOS //passivate message
  3. case class DebugMode(debug: Boolean)
  4. sealed trait POSCommand {}
  5. case class LogOn(opr: String, passwd: String) extends POSCommand
  6. case object LogOff extends POSCommand
  7. case class SuperOn(su: String, passwd: String) extends POSCommand
  8. case object SuperOff extends POSCommand
  9. case class MemberOn(cardnum: String, passwd: String) extends POSCommand
  10. case object MemberOff extends POSCommand //remove member status for the voucher
  11. case object RefundOn extends POSCommand
  12. case object RefundOff extends POSCommand
  13. case object VoidOn extends POSCommand
  14. case object VoidOff extends POSCommand
  15. case object VoidAll extends POSCommand
  16. case object Suspend extends POSCommand
  17. case class VoucherNum(vnum: Int) extends POSCommand
  18. case class LogSales(salesType: Int, dpt: String, code: String, qty: Int, price: Int) extends POSCommand
  19. case object Subtotal extends POSCommand
  20. case class Discount(code: String, percent: Int) extends POSCommand
  21. case class OfflinePay(acct: String, num: String, amount: Int) extends POSCommand //settlement 结算支付
  22. //read only command, no event process
  23. case class VCBalance(acct: String, num: String, passwd: String) extends POSCommand
  24. case class VCPay(acct: String, num: String, passwd: String, amount: Int) extends POSCommand
  25. case class AliPay(acct: String, num: String, amount: Int) extends POSCommand
  26. case class WxPay(acct: String, num: String, amount: Int) extends POSCommand
  27. // read only command, no update event
  28. case class Plu(itemCode: String) extends POSCommand //read only
  29. }
  30. object Events {
  31. sealed trait POSEvent {}
  32. case class LogOned(txnItem: TxnItem) extends POSEvent
  33. object LogOned {
  34. def apply(op: String, vs: VchStates): LogOned = LogOned(TxnItem(vs).copy(
  35. txntype = TXNTYPE.logon,
  36. salestype = SALESTYPE.crd,
  37. opr = op,
  38. code = op
  39. ))
  40. }
  41. case class LogOffed(txnItem: TxnItem) extends POSEvent
  42. object LogOffed {
  43. def apply(vs: VchStates): LogOffed = LogOffed(TxnItem(vs).copy(
  44. txntype = TXNTYPE.logon,
  45. salestype = SALESTYPE.crd,
  46. ))
  47. }
  48. case class SuperOned(txnItem: TxnItem) extends POSEvent
  49. object SuperOned {
  50. def apply(su: String, vs: VchStates): SuperOned = SuperOned(TxnItem(vs).copy(
  51. txntype = TXNTYPE.supon,
  52. salestype = SALESTYPE.crd,
  53. code = su
  54. ))
  55. }
  56. case class SuperOffed(txnItem: TxnItem) extends POSEvent
  57. object SuperOffed {
  58. def apply(vs: VchStates): SuperOffed = SuperOffed(TxnItem(vs).copy(
  59. txntype = TXNTYPE.supon,
  60. salestype = SALESTYPE.crd
  61. ))
  62. }
  63. case class MemberOned(txnItem: TxnItem) extends POSEvent
  64. object MemberOned {
  65. def apply(cardnum: String,vs: VchStates): MemberOned = MemberOned(TxnItem(vs).copy(
  66. txntype = TXNTYPE.sales,
  67. salestype = SALESTYPE.crd,
  68. member = cardnum
  69. ))
  70. }
  71. case class MemberOffed(txnItem: TxnItem) extends POSEvent //remove member status for the voucher
  72. object MemberOffed {
  73. def apply(vs: VchStates): MemberOffed = MemberOffed(TxnItem(vs).copy(
  74. txntype = TXNTYPE.sales,
  75. salestype = SALESTYPE.crd,
  76. member = vs.mbr
  77. ))
  78. }
  79. case class RefundOned(txnItem: TxnItem) extends POSEvent
  80. object RefundOned {
  81. def apply(vs: VchStates): RefundOned = RefundOned(TxnItem(vs).copy(
  82. txntype = TXNTYPE.refund
  83. ))
  84. }
  85. case class RefundOffed(txnItem: TxnItem) extends POSEvent
  86. object RefundOffed {
  87. def apply(vs: VchStates): RefundOffed = RefundOffed(TxnItem(vs).copy(
  88. txntype = TXNTYPE.refund
  89. ))
  90. }
  91. case class VoidOned(txnItem: TxnItem) extends POSEvent
  92. object VoidOned {
  93. def apply(vs: VchStates): VoidOned = VoidOned(TxnItem(vs).copy(
  94. txntype = TXNTYPE.void
  95. ))
  96. }
  97. case class VoidOffed(txnItem: TxnItem) extends POSEvent
  98. object VoidOffed {
  99. def apply(vs: VchStates): VoidOffed = VoidOffed(TxnItem(vs).copy(
  100. txntype = TXNTYPE.void
  101. ))
  102. }
  103. case class NewVoucher(vnum: Int) extends POSEvent //新单, reminder for read-side to set new vnum
  104. case class EndVoucher(vnum: Int) extends POSEvent //单据终结标示
  105. case class VoidVoucher(txnItem: TxnItem) extends POSEvent
  106. object VoidVoucher {
  107. def apply(vs: VchStates): VoidVoucher = VoidVoucher(TxnItem(vs).copy(
  108. txntype = TXNTYPE.voidall
  109. ))
  110. }
  111. case class SuspVoucher(txnItem: TxnItem) extends POSEvent
  112. object SuspVoucher {
  113. def apply(vs: VchStates): SuspVoucher = SuspVoucher(TxnItem(vs).copy(
  114. txntype = TXNTYPE.suspend
  115. ))
  116. }
  117. case class VoucherNumed(fnum: Int, tnum: Int) extends POSEvent
  118. case class SalesLogged(txnItem: TxnItem) extends POSEvent
  119. case class Subtotaled(txnItem: TxnItem) extends POSEvent
  120. object Subtotaled {
  121. def apply(vs: VchStates, vi: VchItems): Subtotaled = {
  122. val (cnt,tqty,tamt,tdsc) = vi.subTotal
  123. Subtotaled(TxnItem(vs).copy(
  124. txntype = TXNTYPE.sales,
  125. salestype = SALESTYPE.sub,
  126. qty = tqty,
  127. amount = tamt,
  128. dscamt = tdsc,
  129. price = cnt
  130. ))
  131. }
  132. }
  133. case class Discounted(txnItem: TxnItem) extends POSEvent
  134. case class Payment(txnItem: TxnItem) extends POSEvent //settlement 结算支付
  135. object Payment {
  136. def apply(acct: String, num: String, vs: VchStates): Payment = Payment(TxnItem(vs).copy(
  137. txntype = TXNTYPE.sales,
  138. salestype = SALESTYPE.ttl,
  139. dpt = acct,
  140. code = num
  141. ))
  142. }
  143. }

可以看到,在构建Event的同时也构建了TxnItem结构。

返回前端数据格式如下:

  1. object Responses {
  2. object STATUS {
  3. val OK: Int = 0
  4. val FAIL: Int = -1
  5. }
  6. case class POSResponse (sts: Int, msg: String, voucher: VchStates, txnItems: List[TxnItem])
  7. }

为了便于debug,我们构建了一个虚拟的数据库环节: 

  1. package pos.dao
  2. import java.time.LocalDate
  3. import java.time.format.DateTimeFormatter
  4. case class Item(
  5. brd: String
  6. ,dpt: String
  7. ,cat: String
  8. ,code: String
  9. ,name: String
  10. ,price: Int
  11. )
  12. object Items {
  13. val apple = Item("01","02","01","001", "green apple", 820)
  14. val grape = Item("01","02","01","002", "red grape", 1050)
  15. val orage = Item("01","02","01","003", "sunkist orage", 350)
  16. val banana = Item("01","02","01","004", "demon banana", 300)
  17. val pineapple = Item("01","02","01","005", "hainan pineapple", 1300)
  18. val peach = Item("01","02","01","006", "xinjiang peach", 2390)
  19. val tblItems = List(apple, grape, orage, banana, pineapple, peach)
  20. sealed trait QueryItemsResult {}
  21. case class QueryItemsOK(items: List[Item]) extends QueryItemsResult
  22. case class QueryItemsFail(msg: String) extends QueryItemsResult
  23. }
  24. object Codes {
  25. case class User(code: String, name: String, passwd: String)
  26. case class Department(code: String, name: String)
  27. case class Category(code: String, name: String)
  28. case class Brand(code: String, name: String)
  29. case class Ra(code: String, name: String)
  30. case class Account(code: String, name: String)
  31. case class Disc(code: String, best: Boolean, aggr: Boolean, group: Boolean)
  32. val ras = List(Ra("01","Delivery"),Ra("02","Cooking"))
  33. val dpts = List(Department("01","Fruit"),Department("02","Grocery"))
  34. val cats = List(Category("0101","Fresh Fruit"),Category("0201","Dry Grocery"))
  35. val brds = List(Brand("01","Sunkist"),Brand("02","Demon"))
  36. val accts = List(Account("001","Cash"),Account("002","Value Card"), Account("003", "Visa")
  37. ,Account("004","Alipay"),Account("005","WXPay"))
  38. val users = List(User("1001","Tiger", "123"),User("1002","John", "123"),User("1003","Maria", "123"))
  39. def getDpt(code: String) = dpts.find(d => d.code == code)
  40. def getCat(code: String) = cats.find(d => d.code == code)
  41. def getBrd(code: String) = brds.find(b => b.code == code)
  42. def getAcct(code: String) = accts.find(a => a.code == code)
  43. def getRa(code: String) = ras.find(a => a.code == code)
  44. }
  45. object DAO {
  46. import Items._
  47. import Codes._
  48. def getItem(code: String): QueryItemsResult = {
  49. val optItem = tblItems.find(it => it.code == code)
  50. optItem match {
  51. case Some(item) => QueryItemsOK(List(item))
  52. case None => QueryItemsFail("Invalid item code!")
  53. }
  54. }
  55. def validateDpt(code: String) = dpts.find(d => d.code == code)
  56. def validateCat(code: String) = cats.find(d => d.code == code)
  57. def validateBrd(code: String) = brds.find(b => b.code == code)
  58. def validateRa(code: String) = ras.find(ac => ac.code == code)
  59. def validateAcct(code: String) = accts.find(ac => ac.code == code)
  60. def validateUser(userid: String, passwd: String) = users.find(u => (u.code == userid && u.passwd == passwd))
  61. def lastSecOfDateStr(ldate: LocalDate): String = {
  62. ldate.format(DateTimeFormatter.ofPattern( "yyyy-MM-dd"))+" 23:59:59"
  63. }
  64. }

这个库主要是用来验证和提供TxnItem里的字段值。

好了,回到WriterActor:同样为了方便集群分片监控跟踪和debug,增加了debugMode和重写了persist:

  1. class WriterActor extends PersistentActor with LogSupport {
  2. val cluster = Cluster(context.system)
  3. // shopdptId-posId
  4. // self.path.parent.name is the type name (utf-8 URL-encoded)
  5. // self.path.name is the entry identifier (utf-8 URL-encoded) but entity has a supervisor
  6. // override def persistenceId: String = self.path.parent.parent.name + "-" + self.path.parent.name
  7. override def persistenceId: String = self.path.parent.name
  8. override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
  9. super.preRestart(reason, message)
  10. log.info(s"Restarting terminal $persistenceId on ${cluster.selfAddress} for $message")
  11. }
  12. override def postRestart(reason: Throwable): Unit = {
  13. super.postRestart(reason)
  14. log.info(s"terminal $persistenceId on ${cluster.selfAddress} restarted for ${reason.getMessage}")
  15. }
  16. override def postStop(): Unit = {
  17. log.info(s"terminal $persistenceId on ${cluster.selfAddress} stopped!")
  18. }
  19. override def preStart(): Unit = {
  20. log.info(s"terminal $persistenceId on ${cluster.selfAddress} starting...")
  21. }
  22. //helper functions
  23. object RunPOSCommand {
  24. def unapply(arg: POSCommand) = if (cmdFilter(persistenceId,arg,vchState,vchItems,sender())) Some(arg) else None
  25. }
  26. def persistEvent[E](evt: E)(f: E => Unit)(implicit dm: DebugMode) = {
  27. if (dm.debug)
  28. log.info(s"********** $persistenceId: persisted event: {$evt} **********")
  29. else {
  30. try {
  31. persist(evt)(f)
  32. log.debug(s"终端-$persistenceId:event: [$evt] state: [$vchState] : [${vchItems.txnitems.reverse}]")
  33. }
  34. catch {
  35. case err: Throwable =>
  36. sender() ! POSResponse(STATUS.FAIL, s"终端-$persistenceId: 操作失败![${err.getMessage}]。", vchState, vchItems.txnitems.reverse)
  37. log.error(s"终端-$persistenceId: 操作失败![${err.getMessage}] current state: [$vchState],[${vchItems.txnitems.reverse}]")
  38. }
  39. }
  40. }
  41. var debugConfig: com.typesafe.config.Config = _
  42. var debug: Boolean = _
  43. try {
  44. debugConfig = ConfigFactory.load("pos.conf").getConfig("pos.server")
  45. debug = debugConfig.getBoolean("debug")
  46. }
  47. catch {
  48. case _ : Throwable => debug = false
  49. }
  50. log.info(s"********** $persistenceId: debug mode = $debug **********")
  51. implicit val debugMode = DebugMode(debug)
  52. //actor state
  53. var vchState = VchStates()
  54. var vchItems = VchItems()
  55. override def receiveRecover: Receive = {
  56. case evt: POSEvent => //incompleted voucher play back events
  57. val (vs,vi) = updateState(evt,vchState,vchItems)
  58. vchState = vs; vchItems = vi
  59. case SnapshotOffer(_,vs: VchStates) => vchState = vs //restore num,seq ...
  60. }
  61. override def receiveCommand: Receive = logOffState
  62. ...
  63. }

Receive函数在WriteActor启动时默认为logOffState  

  1. private def logOffState: Receive = {
  2. case RunPOSCommand(LogOn(opr, _)) =>
  3. persistEvent(LogOned(opr,vchState)) { evt =>
  4. val sts = updateState(evt, vchState, vchItems)
  5. //starting seqenceNr for any voucher. no action logged before login
  6. vchState = sts._1.copy(jseq = lastSequenceNr + 1);
  7. vchItems = sts._2
  8. sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 用户{$opr}成功登陆。", vchState, List(vchItems.txnitems.head))
  9. context.become(logOnState)
  10. }
  11. case PassivatePOS =>
  12. log.info(s"**********${persistenceId} got passivate message and stopping ... ***********")
  13. context.parent ! PoisonPill
  14. case _ =>
  15. sender() ! POSResponse(STATUS.FAIL, s"操作失败!终端 ${persistenceId} 用户未登陆。", vchState, List())
  16. }

除LogOn指令外不接受任何其它指令,一旦接到指令立即通过context.become(logOnState)转状态。passivation只会在这个状态下发挥作用。这也可以理解:即先是没有处理中的单据,又有一段时间的空转,可以passivate了。

在logOnState里可以进行支付操作,如果支付金额小于应付金额则代表部分付款操作,需要转入paymentState:

  1. private def logOnState: Receive = {
  2. ...
  3. //first payment in a voucher
  4. case RunPOSCommand(OfflinePay(acct,num, amount)) =>
  5. persistEvent(Payment(acct,num,vchState)) { evt =>
  6. val sts = updateState(evt, vchState, vchItems)
  7. vchState = sts._1
  8. vchItems = sts._2
  9. if (vchItems.totalSales > 0)
  10. sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 成功完成{${vchItems.txnitems.head.category} ${amount/100.0}支付。", vchState, List(vchItems.txnitems.head))
  11. else
  12. sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 成功完成{${vchItems.txnitems.head.category} ${amount/100.0}退款。", vchState, List(vchItems.txnitems.head))
  13. if (!vchState.due) { //completed voucher. mark end of voucher and move next. stay in logOnState
  14. persistEvent(EndVoucher(vchState.num)) { evt =>
  15. val sts = updateState(evt, vchState, vchItems)
  16. vchState = sts._1
  17. vchItems = sts._2
  18. saveSnapshot(vchState) //recovery to next voucher
  19. }
  20. }
  21. else context.become(paymentState) //switch into paymentState
  22. }
  23. ...
  24. }

支付操作可能出现两种情况:整单完成,存入EndVoucher标示,saveSnapshot,或者转入paymentState。同样,在paymentState状态,只接受支付指令,直到应付金额耗尽,转回logOnState: 

  1. private def paymentState: Receive = {
  2. case RunPOSCommand(OfflinePay(acct,num, amount)) =>
  3. persistEvent(Payment(acct,num,vchState)) { evt =>
  4. val sts = updateState(evt, vchState, vchItems)
  5. vchState = sts._1
  6. vchItems = sts._2
  7. if (vchItems.totalSales > 0)
  8. sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 成功完成{${vchItems.txnitems.head.category} ${amount/100.0}支付。", vchState, List(vchItems.txnitems.head))
  9. else
  10. sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 成功完成{${vchItems.txnitems.head.category} ${amount/100.0}退款。", vchState, List(vchItems.txnitems.head))
  11. if (!vchState.due) { //completed voucher. mark end of voucher and move next. return to logOnState
  12. persistEvent(EndVoucher(vchState.num)) { evt =>
  13. val sts = updateState(evt, vchState, vchItems, lastSeqenceNr)
  14. vchState = sts._1
  15. vchItems = sts._2
  16. saveSnapshot(vchState) //recovery to next voucher
  17. context.unbecome() //logOnState
  18. }
  19. }
  20. // else wait for other payments and stay in logOnState
  21. }
  22. /* strictly disallow any action other than payment till completion
  23. case RunPOSCommand(VoidAll) =>
  24. persistEvent(VoidVoucher(vchState)) { _ =>
  25. persistEvent(EndVoucher(vchState.num)) { evt =>
  26. updateState(evt,vchState,vchItems)
  27. context.unbecome() //in paymentState, switch to logOnState
  28. }
  29. }
  30. case RunPOSCommand(Suspend) =>
  31. persistEvent(SuspVoucher(vchState)) { _ =>
  32. persistEvent(EndVoucher(vchState.num)) { evt =>
  33. updateState(evt,vchState,vchItems)
  34. context.unbecome() //in paymentState, switch to logOnState
  35. }
  36. }
  37. */
  38. case RunPOSCommand(SuperOn(su,_)) =>
  39. persistEvent(SuperOned(su,vchState)) { evt =>
  40. val sts = updateState(evt, vchState, vchItems)
  41. vchState = sts._1
  42. vchItems = sts._2
  43. }
  44. case RunPOSCommand(SuperOff) =>
  45. persistEvent(SuperOffed(vchState)) { evt =>
  46. val sts = updateState(evt, vchState, vchItems)
  47. vchState = sts._1
  48. vchItems = sts._2
  49. }
  50. case RunPOSCommand(VCBalance(acct,num,passwd)) =>
  51. if (POSInterfaces.validateVC(acct,num,passwd)) {
  52. val res = POSInterfaces.getVCBalance(acct,num)
  53. if (res.sts == POSInterfaces.VCRESULT.OK)
  54. sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 储值卡余额{${res.amt}}。", vchState, List(TxnItem(vchState).copy(
  55. amount = (res.amt * 100).toInt,
  56. dpt = acct,
  57. code = num
  58. )))
  59. else
  60. sender() ! POSResponse(STATUS.FAIL, s"终端-$persistenceId: 储值卡[$num]余额读取错误![${res.msg}]", vchState, List())
  61. } else {
  62. sender() ! POSResponse(STATUS.FAIL, s"终端-$persistenceId: 储值卡[$num]验证信息错误!", vchState, List())
  63. }
  64. case RunPOSCommand(VCPay(acct,num, passwd,amount)) =>
  65. if (POSInterfaces.validateVC(acct,num,passwd)) {
  66. val res = POSInterfaces.getVCBalance(acct,num)
  67. if (res.sts == POSInterfaces.VCRESULT.OK) {
  68. if ((res.amt * 100).toInt < amount)
  69. sender() ! POSResponse(STATUS.FAIL, s"终端-$persistenceId: 储值卡[$num]余额不足!", vchState,List(TxnItem(vchState).copy(
  70. amount = (res.amt * 100).toInt,
  71. dpt = acct,
  72. code = num
  73. )))
  74. else {
  75. val res = POSInterfaces.payByVC(acct,num,amount/100.00)
  76. if (res.sts == POSInterfaces.VCRESULT.OK) {
  77. persistEvent(Payment(acct,num,vchState)) { evt =>
  78. val sts = updateState(evt, vchState, vchItems)
  79. vchState = sts._1
  80. vchItems = sts._2
  81. if (vchItems.totalSales > 0)
  82. sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 成功完成{${vchItems.txnitems.head.category} ${amount/100.0}支付。", vchState, List(vchItems.txnitems.head))
  83. else
  84. sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 成功完成{${vchItems.txnitems.head.category} ${amount/100.0}退款。", vchState, List(vchItems.txnitems.head))
  85. if (!vchState.due) { //completed voucher. mark end of voucher and move next. stay in logOnState
  86. persistEvent(EndVoucher(vchState.num)) { evt =>
  87. val sts = updateState(evt, vchState, vchItems, lastSeqenceNr)
  88. vchState = sts._1
  89. vchItems = sts._2
  90. saveSnapshot(vchState) //recovery to next voucher
  91. context.unbecome() //switch to logOnState
  92. }
  93. }
  94. }
  95. }
  96. }
  97. }
  98. else
  99. sender() ! POSResponse(STATUS.FAIL, s"终端-$persistenceId: 储值卡[$num]余额读取错误![${res.msg}]", vchState, List())
  100. } else {
  101. sender() ! POSResponse(STATUS.FAIL, s"终端-$persistenceId: 储值卡[$num]验证信息错误!", vchState, List())
  102. }
  103. case _ =>
  104. sender() ! POSResponse(STATUS.FAIL, s"操作失败!终端 ${persistenceId} 结算中不容许其它非支付操作!", vchState, List(vchItems.txnitems.head))
  105. }

目前支付部分只处理了线下支付和储值卡支付两个流程。

每单结束时会进行单据处理状态快照存储 saveSnapShot(vchState),真正意义在新一单从零的重新开始。 VchState是在updateState函数里维护的:

  1. case class VchStates(
  2. opr: String = "", //收款员
  3. jseq: BigInt = 0, //begin journal sequence for read-side replay
  4. num: Int = 0, //当前单号
  5. seq: Int = 1, //当前序号
  6. void: Boolean = false, //取消模式
  7. refd: Boolean = false, //退款模式
  8. due: Boolean = true, //当前余额
  9. su: String = "",
  10. mbr: String = ""
  11. ) {
  12. def nextVoucher : VchStates = VchStates().copy(
  13. opr = this.opr,
  14. jseq = this.jseq + 1,
  15. num = this.num + 1
  16. )
  17. }

也就是说每次saveSnapShot会把当前单号、操作员存入snapshot方便恢复当前单状态。另外jseq是本单事件存入时的写序号lastSeqenceNr,我们用来通知Reader-Actor从那里开始读journal。这个jseq是如下维护的: 

  1. def updateState(evt: POSEvent, state: VchStates, items: VchItems, lastSeqNr: BigInt = 0): (VchStates, VchItems) = evt match {
  2. case LogOned(txn) => (state.copy(seq = txn.seq + 1,opr = txn.opr,jseq = lastSeqNr), items)
  3. ...
  4. case EndVoucher(vnum) => (state.nextVoucher.copy(jseq = lastSeqNr), VchItems())
  5. ...
  6. }

如上所述,只在登陆时或者结单时需要标示jseq。注意:登陆指令只能在完成单据的情况下处理,而lastSeqenceNr是一单的结束序号,+1就是新单开始序号了。

下面把这次示范的完整源代码提供在下面。这是一个只通过了编译的版本。完整可运行的cluster-sharding版本放在下篇中提供:

build.sbt

  1. name := "akka-cluster-pos"
  2.  
  3. version := "0.2"
  4.  
  5. scalaVersion := "2.12.8"
  6.  
  7. libraryDependencies := Seq(
  8. "com.typesafe.akka" %% "akka-cluster-sharding" % "2.5.21",
  9. "com.typesafe.akka" %% "akka-persistence" % "2.5.21",
  10. "com.typesafe.akka" %% "akka-persistence-cassandra" % "0.92",
  11. "com.typesafe.akka" %% "akka-persistence-cassandra-launcher" % "0.92" % Test,
  12. "ch.qos.logback" % "logback-classic" % "1.2.3"
  13. )

resources/application.conf

  1. akka.actor.warn-about-java-serializer-usage = off
  2. akka.log-dead-letters-during-shutdown = off
  3. akka.log-dead-letters = off
  4.  
  5. akka {
  6. loglevel = INFO
  7. actor {
  8. provider = "cluster"
  9. }
  10.  
  11. remote {
  12. log-remote-lifecycle-events = off
  13. netty.tcp {
  14. hostname = "127.0.0.1"
  15. port = 0
  16. }
  17. }
  18.  
  19. cluster {
  20. seed-nodes = [
  21. "akka.tcp://posSystem@127.0.0.1:2551"]
  22. log-info = off
  23. sharding {
  24. role = "shard"
  25. passivate-idle-entity-after = 10 s
  26. }
  27. }
  28.  
  29. persistence {
  30. journal.plugin = "cassandra-journal"
  31. snapshot-store.plugin = "cassandra-snapshot-store"
  32. }
  33.  
  34. }
  35.  
  36. cassandra-journal {
  37. contact-points = ["192.168.1.18"]
  38. }
  39.  
  40. cassandra-snapshot-store {
  41. contact-points = ["192.168.1.18"]
  42. }

pos.conf

  1. pos {
  2. server {
  3. debug = false
  4. }
  5. }

 logback.xml

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <configuration>
  3.  
  4. <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
  5. <layout class="ch.qos.logback.classic.PatternLayout">
  6. <Pattern>
  7. %d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n
  8. </Pattern>
  9. </layout>
  10. </appender>
  11.  
  12. <logger name="sdp.cql" level="info"
  13. additivity="false">
  14. <appender-ref ref="STDOUT" />
  15. </logger>
  16.  
  17. <logger name="demo.sdp.grpc.cql" level="info"
  18. additivity="false">
  19. <appender-ref ref="STDOUT" />
  20. </logger>
  21.  
  22. <root level="error">
  23. <appender-ref ref="STDOUT" />
  24. </root>

logging/Log.scala

  1. package sdp.logging
  2.  
  3. import org.slf4j.Logger
  4.  
  5. /**
  6. * Logger which just wraps org.slf4j.Logger internally.
  7. *
  8. * @param logger logger
  9. */
  10. class Log(logger: Logger) {
  11.  
  12. // use var consciously to enable squeezing later
  13. var isDebugEnabled: Boolean = logger.isDebugEnabled
  14. var isInfoEnabled: Boolean = logger.isInfoEnabled
  15. var isWarnEnabled: Boolean = logger.isWarnEnabled
  16. var isErrorEnabled: Boolean = logger.isErrorEnabled
  17.  
  18. def withLevel(level: Symbol)(msg: => String, e: Throwable = null): Unit = {
  19. level match {
  20. case 'debug | 'DEBUG => debug(msg)
  21. case 'info | 'INFO => info(msg)
  22. case 'warn | 'WARN => warn(msg)
  23. case 'error | 'ERROR => error(msg)
  24. case _ => // nothing to do
  25. }
  26. }
  27.  
  28. def debug(msg: => String): Unit = {
  29. if (isDebugEnabled && logger.isDebugEnabled) {
  30. logger.debug(msg)
  31. }
  32. }
  33.  
  34. def debug(msg: => String, e: Throwable): Unit = {
  35. if (isDebugEnabled && logger.isDebugEnabled) {
  36. logger.debug(msg, e)
  37. }
  38. }
  39.  
  40. def info(msg: => String): Unit = {
  41. if (isInfoEnabled && logger.isInfoEnabled) {
  42. logger.info(msg)
  43. }
  44. }
  45.  
  46. def info(msg: => String, e: Throwable): Unit = {
  47. if (isInfoEnabled && logger.isInfoEnabled) {
  48. logger.info(msg, e)
  49. }
  50. }
  51.  
  52. def warn(msg: => String): Unit = {
  53. if (isWarnEnabled && logger.isWarnEnabled) {
  54. logger.warn(msg)
  55. }
  56. }
  57.  
  58. def warn(msg: => String, e: Throwable): Unit = {
  59. if (isWarnEnabled && logger.isWarnEnabled) {
  60. logger.warn(msg, e)
  61. }
  62. }
  63.  
  64. def error(msg: => String): Unit = {
  65. if (isErrorEnabled && logger.isErrorEnabled) {
  66. logger.error(msg)
  67. }
  68. }
  69.  
  70. def error(msg: => String, e: Throwable): Unit = {
  71. if (isErrorEnabled && logger.isErrorEnabled) {
  72. logger.error(msg, e)
  73. }
  74. }
  75.  
  76. }

logging/LogSupport.scala

  1. package sdp.logging
  2.  
  3. import org.slf4j.LoggerFactory
  4.  
  5. trait LogSupport {
  6.  
  7. /**
  8. * Logger
  9. */
  10. protected val log = new Log(LoggerFactory.getLogger(this.getClass))
  11.  
  12. }

Commands.scala

  1. package pos.commands
  2. import pos.states.States._
  3.  
  4. object Commands {
  5.  
  6. case object PassivatePOS //passivate message
  7. case class DebugMode(debug: Boolean)
  8.  
  9. sealed trait POSCommand {}
  10.  
  11. case class LogOn(opr: String, passwd: String) extends POSCommand
  12. case object LogOff extends POSCommand
  13. case class SuperOn(su: String, passwd: String) extends POSCommand
  14. case object SuperOff extends POSCommand
  15. case class MemberOn(cardnum: String, passwd: String) extends POSCommand
  16. case object MemberOff extends POSCommand //remove member status for the voucher
  17. case object RefundOn extends POSCommand
  18. case object RefundOff extends POSCommand
  19. case object VoidOn extends POSCommand
  20. case object VoidOff extends POSCommand
  21. case object VoidAll extends POSCommand
  22. case object Suspend extends POSCommand
  23.  
  24. case class VoucherNum(vnum: Int) extends POSCommand
  25.  
  26.  
  27. case class LogSales(salesType: Int, dpt: String, code: String, qty: Int, price: Int) extends POSCommand
  28. case object Subtotal extends POSCommand
  29. case class Discount(code: String, percent: Int) extends POSCommand
  30.  
  31. case class OfflinePay(acct: String, num: String, amount: Int) extends POSCommand //settlement 结算支付
  32. //read only command, no event process
  33. case class VCBalance(acct: String, num: String, passwd: String) extends POSCommand
  34. case class VCPay(acct: String, num: String, passwd: String, amount: Int) extends POSCommand
  35. case class AliPay(acct: String, num: String, amount: Int) extends POSCommand
  36. case class WxPay(acct: String, num: String, amount: Int) extends POSCommand
  37.  
  38.  
  39. // read only command, no update event
  40. case class Plu(itemCode: String) extends POSCommand //read only
  41.  
  42.  
  43. }
  44.  
  45.  
  46. object Events {
  47.  
  48. sealed trait POSEvent {}
  49.  
  50. case class LogOned(txnItem: TxnItem) extends POSEvent
  51. object LogOned {
  52. def apply(op: String, vs: VchStates): LogOned = LogOned(TxnItem(vs).copy(
  53. txntype = TXNTYPE.logon,
  54. salestype = SALESTYPE.crd,
  55. opr = op,
  56. code = op
  57. ))
  58. }
  59. case class LogOffed(txnItem: TxnItem) extends POSEvent
  60. object LogOffed {
  61. def apply(vs: VchStates): LogOffed = LogOffed(TxnItem(vs).copy(
  62. txntype = TXNTYPE.logon,
  63. salestype = SALESTYPE.crd,
  64. ))
  65. }
  66. case class SuperOned(txnItem: TxnItem) extends POSEvent
  67. object SuperOned {
  68. def apply(su: String, vs: VchStates): SuperOned = SuperOned(TxnItem(vs).copy(
  69. txntype = TXNTYPE.supon,
  70. salestype = SALESTYPE.crd,
  71. code = su
  72. ))
  73. }
  74. case class SuperOffed(txnItem: TxnItem) extends POSEvent
  75. object SuperOffed {
  76. def apply(vs: VchStates): SuperOffed = SuperOffed(TxnItem(vs).copy(
  77. txntype = TXNTYPE.supon,
  78. salestype = SALESTYPE.crd
  79. ))
  80. }
  81. case class MemberOned(txnItem: TxnItem) extends POSEvent
  82. object MemberOned {
  83. def apply(cardnum: String,vs: VchStates): MemberOned = MemberOned(TxnItem(vs).copy(
  84. txntype = TXNTYPE.sales,
  85. salestype = SALESTYPE.crd,
  86. member = cardnum
  87. ))
  88. }
  89. case class MemberOffed(txnItem: TxnItem) extends POSEvent //remove member status for the voucher
  90. object MemberOffed {
  91. def apply(vs: VchStates): MemberOffed = MemberOffed(TxnItem(vs).copy(
  92. txntype = TXNTYPE.sales,
  93. salestype = SALESTYPE.crd,
  94. member = vs.mbr
  95. ))
  96. }
  97. case class RefundOned(txnItem: TxnItem) extends POSEvent
  98. object RefundOned {
  99. def apply(vs: VchStates): RefundOned = RefundOned(TxnItem(vs).copy(
  100. txntype = TXNTYPE.refund
  101. ))
  102. }
  103. case class RefundOffed(txnItem: TxnItem) extends POSEvent
  104. object RefundOffed {
  105. def apply(vs: VchStates): RefundOffed = RefundOffed(TxnItem(vs).copy(
  106. txntype = TXNTYPE.refund
  107. ))
  108. }
  109. case class VoidOned(txnItem: TxnItem) extends POSEvent
  110. object VoidOned {
  111. def apply(vs: VchStates): VoidOned = VoidOned(TxnItem(vs).copy(
  112. txntype = TXNTYPE.void
  113. ))
  114. }
  115. case class VoidOffed(txnItem: TxnItem) extends POSEvent
  116. object VoidOffed {
  117. def apply(vs: VchStates): VoidOffed = VoidOffed(TxnItem(vs).copy(
  118. txntype = TXNTYPE.void
  119. ))
  120. }
  121.  
  122. case class NewVoucher(vnum: Int) extends POSEvent //新单, reminder for read-side to set new vnum
  123. case class EndVoucher(vnum: Int) extends POSEvent //单据终结标示
  124. case class VoidVoucher(txnItem: TxnItem) extends POSEvent
  125. object VoidVoucher {
  126. def apply(vs: VchStates): VoidVoucher = VoidVoucher(TxnItem(vs).copy(
  127. txntype = TXNTYPE.voidall
  128. ))
  129. }
  130. case class SuspVoucher(txnItem: TxnItem) extends POSEvent
  131. object SuspVoucher {
  132. def apply(vs: VchStates): SuspVoucher = SuspVoucher(TxnItem(vs).copy(
  133. txntype = TXNTYPE.suspend
  134. ))
  135. }
  136.  
  137. case class VoucherNumed(fnum: Int, tnum: Int) extends POSEvent
  138.  
  139. case class SalesLogged(txnItem: TxnItem) extends POSEvent
  140. case class Subtotaled(txnItem: TxnItem) extends POSEvent
  141. object Subtotaled {
  142. def apply(vs: VchStates, vi: VchItems): Subtotaled = {
  143. val (cnt,tqty,tamt,tdsc) = vi.subTotal
  144.  
  145. Subtotaled(TxnItem(vs).copy(
  146. txntype = TXNTYPE.sales,
  147. salestype = SALESTYPE.sub,
  148. qty = tqty,
  149. amount = tamt,
  150. dscamt = tdsc,
  151. price = cnt
  152. ))
  153. }
  154. }
  155. case class Discounted(txnItem: TxnItem) extends POSEvent
  156. case class Payment(txnItem: TxnItem) extends POSEvent //settlement 结算支付
  157. object Payment {
  158. def apply(acct: String, num: String, vs: VchStates): Payment = Payment(TxnItem(vs).copy(
  159. txntype = TXNTYPE.sales,
  160. salestype = SALESTYPE.ttl,
  161. dpt = acct,
  162. code = num
  163. ))
  164. }
  165.  
  166.  
  167. }
  168.  
  169. object Responses {
  170.  
  171. object STATUS {
  172. val OK: Int = 0
  173. val FAIL: Int = -1
  174. }
  175.  
  176. case class POSResponse (sts: Int, msg: String, voucher: VchStates, txnItems: List[TxnItem])
  177. }

states.scala

  1. package pos.states
  2. import java.time.LocalDate
  3. import java.time.LocalDateTime
  4. import java.time.format.DateTimeFormatter
  5. import java.util.Locale
  6.  
  7. import pos.commands.Events._
  8. import pos.commands.Commands._
  9. import akka.actor._
  10. import pos.dao.DAO._
  11. import pos.dao.Codes._
  12. import pos.commands.Responses._
  13.  
  14. object States {
  15.  
  16. val dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss", Locale.CHINA)
  17.  
  18.  
  19. object TXNTYPE {
  20. val sales: Int = 0
  21. val refund: Int = 1
  22. val void: Int = 2
  23. val voided: Int = 3
  24. val voidall: Int = 4
  25. val subtotal: Int = 5
  26. val logon: Int = 6
  27. val supon: Int = 7 // super user on/off
  28. val suspend: Int = 8
  29.  
  30. }
  31.  
  32. object SALESTYPE {
  33. val plu: Int = 0
  34. val dpt: Int = 1
  35. val cat: Int = 2
  36. val brd: Int = 3
  37. val ra: Int = 4
  38. val sub: Int = 5
  39. val ttl: Int = 6
  40. val dsc: Int = 7
  41. val crd: Int = 8
  42. }
  43.  
  44.  
  45. case class TxnItem(
  46. txndate: String = LocalDate.now.format(DateTimeFormatter.ofPattern("yyyyMMdd"))
  47. ,txntime: String = LocalDateTime.now.format(dateTimeFormatter).substring(11)
  48. ,opr: String = ""//工号
  49. ,num: Int = 0 //销售单号
  50. ,seq: Int = 1 //交易序号
  51. ,txntype: Int = TXNTYPE.sales//交易类型
  52. ,salestype: Int = SALESTYPE.plu //销售类型
  53. ,qty: Int = 1 //交易数量
  54. ,price: Int = 0 //单价(分)
  55. ,amount: Int = 0 //码洋(分)
  56. ,dscamt: Int = 0 //折扣:负值 net实洋 = amount + dscamt
  57. ,member: String = "" //会员卡号
  58. ,code: String = "" //编号(商品、账号...)
  59. ,desc: String = "" //项目名称
  60. ,dpt: String = ""
  61. ,department: String = ""
  62. ,cat: String = ""
  63. ,category: String = ""
  64. ,brd: String = ""
  65. ,brand: String = ""
  66. )
  67. object TxnItem {
  68. def apply(vs: VchStates): TxnItem = TxnItem().copy(
  69. opr = vs.opr,
  70. num = vs.num,
  71. seq = vs.seq,
  72. member = vs.mbr
  73. )
  74. }
  75.  
  76. case class VchStatus( //操作状态锁留给前端维护
  77. qty: Int = 1,
  78. refund: Boolean = false,
  79. void: Boolean = false)
  80.  
  81. case class VchStates(
  82. opr: String = "", //收款员
  83. jseq: BigInt = 0, //begin journal sequence for read-side replay
  84. num: Int = 0, //当前单号
  85. seq: Int = 1, //当前序号
  86. void: Boolean = false, //取消模式
  87. refd: Boolean = false, //退款模式
  88. due: Boolean = true, //当前余额
  89. su: String = "",
  90. mbr: String = ""
  91. ) {
  92.  
  93. def nextVoucher : VchStates = VchStates().copy(
  94. opr = this.opr,
  95. jseq = this.jseq + 1,
  96. num = this.num + 1
  97. )
  98. }
  99.  
  100. case class VchItems(txnitems: List[TxnItem] = Nil) {
  101. def subTotal: (Int,Int,Int,Int) = txnitems.foldRight((0,0,0,0)) { case (txn,b) =>
  102. if (txn.salestype < SALESTYPE.sub && txn.txntype == TXNTYPE.sales)
  103. b.copy(_1 = b._1 + 1, _2 = b._2 + txn.qty, _3 = b._3 + txn.amount, _4 = b._4 + txn.dscamt)
  104. else b
  105. }
  106. def totalSales: Int = txnitems.foldRight(0) { case (txn, b) =>
  107. if ( txn.salestype <= SALESTYPE.ra)
  108. (txn.amount + txn.dscamt) + b
  109. else b
  110.  
  111. /*
  112. val amt: Int = txn.salestype match {
  113. case (SALESTYPE.plu | SALESTYPE.cat | SALESTYPE.brd | SALESTYPE.ra) => txn.amount + txn.dscamt
  114. case _ => 0
  115. }
  116. amt + b */
  117. }
  118. def totalPaid: Int = txnitems.foldRight(0) { case (txn, b) =>
  119. if (txn.txntype == TXNTYPE.sales && txn.salestype == SALESTYPE.ttl)
  120. txn.amount + b
  121. else b
  122. /*
  123. val amt: Int = txn.salestype match {
  124. case SALESTYPE.ttl => txn.amount
  125. case _ => 0
  126. }
  127. amt + b */
  128. }
  129. def addItem(item: TxnItem): VchItems = VchItems((item :: txnitems)) //.reverse)
  130.  
  131. }
  132.  
  133. def LastSecOfDate(ldate: LocalDate): LocalDateTime = {
  134. val dtStr = ldate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")) + " 23:59:59"
  135. LocalDateTime.parse(dtStr, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
  136. }
  137.  
  138. def dateStr(dt: LocalDate): String = dt.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))
  139.  
  140. def updateState(evt: POSEvent, state: VchStates, items: VchItems, lastSeqNr: BigInt = 0): (VchStates, VchItems) = evt match {
  141. case LogOned(txn) => (state.copy(seq = txn.seq + 1,opr = txn.opr,jseq = lastSeqNr), items)
  142. case LogOffed(txn) => (state.copy(seq = state.seq + 1,opr = ""), items)
  143. case RefundOned(txn) => (state.copy(seq = txn.seq + 1,refd = true), items)
  144. case RefundOffed(txn) => (state.copy(seq = txn.seq + 1,refd = false), items)
  145. case VoidOned(txn) => (state.copy(seq = txn.seq + 1,void = true), items)
  146. case VoidOffed(txn) => (state.copy(seq = txn.seq + 1,void = false), items)
  147. case SuperOned(txn) => (state.copy(seq = txn.seq + 1, su = txn.code), items)
  148. case SuperOffed(txn) => (state.copy(seq = txn.seq + 1, su = ""), items)
  149. case MemberOned(txn) => (state.copy(seq = txn.seq + 1, mbr = txn.member), items)
  150. case MemberOffed(txn) => (state.copy(seq = txn.seq + 1, mbr=""), items)
  151.  
  152.  
  153. case SalesLogged(txnitem) => (state.copy(
  154. seq = state.seq + 1)
  155. , items.addItem(txnitem))
  156.  
  157. case Subtotaled(txnitem) => (state.copy(
  158. seq = state.seq + 1)
  159. , items.addItem(txnitem))
  160.  
  161. case Payment(txnItem) =>
  162. val due = if(items.totalSales > 0) items.totalSales - items.totalPaid else items.totalSales + items.totalPaid
  163. val bal = if(items.totalSales > 0) due - txnItem.amount else due + txnItem.amount
  164. (state.copy(
  165. seq = state.seq + 1,
  166. due = (if( (txnItem.amount.abs + items.totalPaid.abs) >= items.totalSales.abs) false else true)
  167. )
  168. ,items.addItem(txnItem.copy(
  169. salestype = SALESTYPE.ttl,
  170. price = due,
  171. amount = txnItem.amount,
  172. dscamt = bal,
  173. department = getAcct(txnItem.dpt).getOrElse(Account("","")).name
  174. )))
  175.  
  176. case VoucherNumed(_,tnum) =>
  177. val vi = items.copy(txnitems = items.txnitems.map {it => it.copy(num = tnum)})
  178. (state.copy(seq = state.seq + 1, num = tnum), vi)
  179. /*
  180. case VoidVoucher(vnum) => (state.nextVoucher, VchItems())
  181. case SuspVoucher(vnum) => (state.nextVoucher, VchItems()) //represented by EndVoucher
  182. */
  183. case EndVoucher(vnum) => (state.nextVoucher.copy(jseq = lastSeqNr), VchItems())
  184.  
  185. case _ => (state, items)
  186. }
  187.  
  188. def cmdFilter(terminalid: String, cmd: POSCommand, state: VchStates, txns: VchItems, router: ActorRef): Boolean = cmd match {
  189. case LogOn(opr, passwd) => //only allowed in logOffState
  190. if (!txns.txnitems.isEmpty) { //in the middle of process
  191. router ! POSResponse(STATUS.FAIL, s"禁止用户登陆!终端 ${terminalid} 有未完成单据。", state, List())
  192. false
  193. } else{
  194. if (validateUser(opr, passwd).isDefined) true
  195. else {
  196. router ! POSResponse(STATUS.FAIL, s"终端-$terminalid: 用户 ${opr} 登陆失败!", state, List())
  197. false
  198. }
  199. }
  200. case LogOff => //only allowed in logOnState
  201. if (!txns.txnitems.isEmpty) { //in the middle of process
  202. router ! POSResponse(STATUS.FAIL, s"禁止用户退出!终端 ${terminalid} 有未完成单据。", state, List())
  203. false
  204. } else true
  205.  
  206. case VoidAll => //allowed in logOnState and paymentState
  207. if (txns.txnitems.isEmpty) { //no valid sales
  208. router ! POSResponse(STATUS.FAIL, s"全单取消失败!终端 ${terminalid} 本单无任何有效销售记录。", state, txns.txnitems)
  209. false
  210. } else true
  211.  
  212. case OfflinePay(acct,num,amt) =>
  213. if (txns.totalSales.abs == 0) { // no valid sales. void,refund neg values could produce zero
  214. router ! POSResponse(STATUS.FAIL, s"支付失败!终端 ${terminalid} 应付金额为零。", state, List())
  215. false
  216. } else {
  217. if(validateAcct(acct).isDefined) true
  218. else {
  219. router ! POSResponse(STATUS.FAIL, s"支付失败!终端 ${terminalid} 账号{$acct}不存在。", state, List())
  220. false
  221. }
  222. }
  223. case Subtotal =>
  224. if (txns.txnitems.isEmpty) { //in the middle of process
  225. router ! POSResponse(STATUS.FAIL, s"小计操作失败!终端 ${terminalid} 无任何销售记录。", state, List())
  226. false
  227. } else true
  228. case VCBalance(_,_,_) => true
  229. case MemberOn(_,_) => true
  230. case MemberOff => true
  231. case VoucherNum(_) => true
  232.  
  233. case LogSales(salesType,sdpt,scode,sqty,sprice) =>
  234. if (state.void) {
  235. txns.txnitems.find(ti => (ti.txntype == TXNTYPE.sales && ti.salestype == salesType &&
  236. ti.dpt == sdpt && ti.code == scode && ti.qty == sqty && ti.price == sprice)) match {
  237. case Some(_) => true
  238. case None =>
  239. router ! POSResponse(STATUS.FAIL, s"取消交易失败!终端 ${terminalid} 销售记录不存在。", state, List(TxnItem(state).copy(
  240. salestype = salesType,
  241. dpt = sdpt,
  242. code = scode,
  243. price = sprice
  244. )))
  245. false
  246. }
  247.  
  248. } else true
  249.  
  250. case c @ _ =>
  251. router ! POSResponse(STATUS.FAIL, s"终端 ${terminalid} 不支持操作 {$c}", state, List())
  252. false
  253. }
  254.  
  255.  
  256. }

POSHandler.scala

  1. package pos.handler
  2. import akka.actor._
  3. import akka.persistence._
  4. import akka.cluster._
  5. import pos.commands.Events._
  6. import pos.commands.Commands._
  7. import pos.states._
  8. import States._
  9. import pos.dao.Items._
  10. import pos.dao.DAO._
  11. import pos.dao.Codes._
  12. import com.typesafe.config.ConfigFactory
  13. import sdp.logging.LogSupport
  14. import pos.commands.Responses._
  15. import pos.interface.POSInterfaces
  16.  
  17.  
  18. class WriterActor extends PersistentActor with LogSupport {
  19. val cluster = Cluster(context.system)
  20. // shopdptId-posId
  21. // self.path.parent.name is the type name (utf-8 URL-encoded)
  22. // self.path.name is the entry identifier (utf-8 URL-encoded) but entity has a supervisor
  23. // override def persistenceId: String = self.path.parent.parent.name + "-" + self.path.parent.name
  24. override def persistenceId: String = self.path.parent.name
  25.  
  26. override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
  27. super.preRestart(reason, message)
  28. log.info(s"Restarting terminal $persistenceId on ${cluster.selfAddress} for $message")
  29. }
  30.  
  31. override def postRestart(reason: Throwable): Unit = {
  32. super.postRestart(reason)
  33. log.info(s"terminal $persistenceId on ${cluster.selfAddress} restarted for ${reason.getMessage}")
  34. }
  35.  
  36. override def postStop(): Unit = {
  37. log.info(s"terminal $persistenceId on ${cluster.selfAddress} stopped!")
  38. }
  39.  
  40. override def preStart(): Unit = {
  41. log.info(s"terminal $persistenceId on ${cluster.selfAddress} starting...")
  42. }
  43.  
  44. //helper functions
  45. object RunPOSCommand {
  46. def unapply(arg: POSCommand) = if (cmdFilter(persistenceId,arg,vchState,vchItems,sender())) Some(arg) else None
  47. }
  48.  
  49. def persistEvent[E](evt: E)(f: E => Unit)(implicit dm: DebugMode) = {
  50. if (dm.debug)
  51. log.info(s"********** $persistenceId: persisted event: {$evt} **********")
  52. else {
  53. try {
  54. persist(evt)(f)
  55. log.debug(s"终端-$persistenceId:event: [$evt] state: [$vchState] : [${vchItems.txnitems.reverse}]")
  56. }
  57. catch {
  58. case err: Throwable =>
  59. sender() ! POSResponse(STATUS.FAIL, s"终端-$persistenceId: 操作失败![${err.getMessage}]。", vchState, vchItems.txnitems.reverse)
  60. log.error(s"终端-$persistenceId: 操作失败![${err.getMessage}] current state: [$vchState],[${vchItems.txnitems.reverse}]")
  61. }
  62. }
  63. }
  64.  
  65. var debugConfig: com.typesafe.config.Config = _
  66. var debug: Boolean = _
  67. try {
  68.  
  69. debugConfig = ConfigFactory.load("pos.conf").getConfig("pos.server")
  70. debug = debugConfig.getBoolean("debug")
  71. }
  72. catch {
  73. case _ : Throwable => debug = false
  74. }
  75.  
  76. log.info(s"********** $persistenceId: debug mode = $debug **********")
  77.  
  78. implicit val debugMode = DebugMode(debug)
  79.  
  80. //actor state
  81. var vchState = VchStates()
  82. var vchItems = VchItems()
  83.  
  84.  
  85. override def receiveRecover: Receive = {
  86. case evt: POSEvent => //incompleted voucher play back events
  87. val (vs,vi) = updateState(evt,vchState,vchItems)
  88. vchState = vs; vchItems = vi
  89. case SnapshotOffer(_,vs: VchStates) => vchState = vs //restore num,seq ...
  90. }
  91.  
  92. override def receiveCommand: Receive = logOffState
  93.  
  94. private def logOffState: Receive = {
  95. case RunPOSCommand(LogOn(opr, _)) =>
  96. persistEvent(LogOned(opr,vchState)) { evt =>
  97. val sts = updateState(evt, vchState, vchItems,lastSequenceNr)
  98. //starting seqenceNr for any voucher. no action logged before login
  99. vchState = sts._1
  100. vchItems = sts._2
  101. sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 用户{$opr}成功登陆。", vchState, List(vchItems.txnitems.head))
  102. context.become(logOnState)
  103. }
  104.  
  105. case PassivatePOS =>
  106. log.info(s"**********${persistenceId} got passivate message and stopping ... ***********")
  107. context.parent ! PoisonPill
  108.  
  109. case _ =>
  110. sender() ! POSResponse(STATUS.FAIL, s"操作失败!终端 ${persistenceId} 用户未登陆。", vchState, List())
  111. }
  112.  
  113.  
  114. private def paymentState: Receive = {
  115. case RunPOSCommand(OfflinePay(acct,num, amount)) =>
  116. persistEvent(Payment(acct,num,vchState)) { evt =>
  117. val sts = updateState(evt, vchState, vchItems)
  118. vchState = sts._1
  119. vchItems = sts._2
  120. if (vchItems.totalSales > 0)
  121. sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 成功完成{${vchItems.txnitems.head.category} ${amount/100.0}支付。", vchState, List(vchItems.txnitems.head))
  122. else
  123. sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 成功完成{${vchItems.txnitems.head.category} ${amount/100.0}退款。", vchState, List(vchItems.txnitems.head))
  124.  
  125. if (!vchState.due) { //completed voucher. mark end of voucher and move next. return to logOnState
  126. persistEvent(EndVoucher(vchState.num)) { evt =>
  127. val sts = updateState(evt, vchState, vchItems, lastSequenceNr)
  128. vchState = sts._1.copy(jseq = lastSequenceNr + 1)
  129. vchItems = sts._2
  130. saveSnapshot(vchState) //recovery to next voucher
  131. context.unbecome() //logOnState
  132. }
  133. }
  134. // else wait for other payments and stay in logOnState
  135. }
  136. /* strictly disallow any action other than payment till completion
  137. case RunPOSCommand(VoidAll) =>
  138. persistEvent(VoidVoucher(vchState)) { _ =>
  139. persistEvent(EndVoucher(vchState.num)) { evt =>
  140. updateState(evt,vchState,vchItems)
  141. context.unbecome() //in paymentState, switch to logOnState
  142. }
  143. }
  144. case RunPOSCommand(Suspend) =>
  145. persistEvent(SuspVoucher(vchState)) { _ =>
  146. persistEvent(EndVoucher(vchState.num)) { evt =>
  147. updateState(evt,vchState,vchItems)
  148. context.unbecome() //in paymentState, switch to logOnState
  149. }
  150. }
  151. */
  152. case RunPOSCommand(SuperOn(su,_)) =>
  153. persistEvent(SuperOned(su,vchState)) { evt =>
  154. val sts = updateState(evt, vchState, vchItems)
  155. vchState = sts._1
  156. vchItems = sts._2
  157. }
  158. case RunPOSCommand(SuperOff) =>
  159. persistEvent(SuperOffed(vchState)) { evt =>
  160. val sts = updateState(evt, vchState, vchItems)
  161. vchState = sts._1
  162. vchItems = sts._2
  163. }
  164.  
  165. case RunPOSCommand(VCBalance(acct,num,passwd)) =>
  166. if (POSInterfaces.validateVC(acct,num,passwd)) {
  167. val res = POSInterfaces.getVCBalance(acct,num)
  168. if (res.sts == POSInterfaces.VCRESULT.OK)
  169. sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 储值卡余额{${res.amt}}。", vchState, List(TxnItem(vchState).copy(
  170. amount = (res.amt * 100).toInt,
  171. dpt = acct,
  172. code = num
  173. )))
  174. else
  175. sender() ! POSResponse(STATUS.FAIL, s"终端-$persistenceId: 储值卡[$num]余额读取错误![${res.msg}]", vchState, List())
  176. } else {
  177. sender() ! POSResponse(STATUS.FAIL, s"终端-$persistenceId: 储值卡[$num]验证信息错误!", vchState, List())
  178. }
  179.  
  180. case RunPOSCommand(VCPay(acct,num, passwd,amount)) =>
  181. if (POSInterfaces.validateVC(acct,num,passwd)) {
  182. val res = POSInterfaces.getVCBalance(acct,num)
  183. if (res.sts == POSInterfaces.VCRESULT.OK) {
  184. if ((res.amt * 100).toInt < amount)
  185. sender() ! POSResponse(STATUS.FAIL, s"终端-$persistenceId: 储值卡[$num]余额不足!", vchState,List(TxnItem(vchState).copy(
  186. amount = (res.amt * 100).toInt,
  187. dpt = acct,
  188. code = num
  189. )))
  190. else {
  191. val res = POSInterfaces.payByVC(acct,num,amount/100.00)
  192. if (res.sts == POSInterfaces.VCRESULT.OK) {
  193. persistEvent(Payment(acct,num,vchState)) { evt =>
  194. val sts = updateState(evt, vchState, vchItems)
  195. vchState = sts._1
  196. vchItems = sts._2
  197. if (vchItems.totalSales > 0)
  198. sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 成功完成{${vchItems.txnitems.head.category} ${amount/100.0}支付。", vchState, List(vchItems.txnitems.head))
  199. else
  200. sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 成功完成{${vchItems.txnitems.head.category} ${amount/100.0}退款。", vchState, List(vchItems.txnitems.head))
  201. if (!vchState.due) { //completed voucher. mark end of voucher and move next. stay in logOnState
  202. persistEvent(EndVoucher(vchState.num)) { evt =>
  203. val sts = updateState(evt, vchState, vchItems,lastSequenceNr)
  204. vchState = sts._1
  205. vchItems = sts._2
  206. saveSnapshot(vchState) //recovery to next voucher
  207. context.unbecome() //switch to logOnState
  208. }
  209. }
  210. }
  211.  
  212. }
  213. }
  214.  
  215. }
  216. else
  217. sender() ! POSResponse(STATUS.FAIL, s"终端-$persistenceId: 储值卡[$num]余额读取错误![${res.msg}]", vchState, List())
  218. } else {
  219. sender() ! POSResponse(STATUS.FAIL, s"终端-$persistenceId: 储值卡[$num]验证信息错误!", vchState, List())
  220. }
  221. case _ =>
  222. sender() ! POSResponse(STATUS.FAIL, s"操作失败!终端 ${persistenceId} 结算中不容许其它非支付操作!", vchState, List(vchItems.txnitems.head))
  223.  
  224. }
  225.  
  226. private def logOnState: Receive = {
  227. case RunPOSCommand(LogOff) =>
  228. persistEvent(LogOffed(vchState)) { evt =>
  229. val user = vchState.opr
  230. val sts = updateState(evt,vchState,vchItems)
  231. vchState = sts._1; vchItems = sts._2
  232. saveSnapshot(vchState) //state of last voucher
  233. //手工passivate shard ! ShardRegion.Passivate(PassivatePOS)
  234. sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 用户 $user 成功退出。", vchState, List(vchItems.txnitems.head))
  235. context.unbecome() //switch to logOffState
  236. }
  237.  
  238. case RunPOSCommand(SuperOn(su,_)) =>
  239. persistEvent(SuperOned(su,vchState)) { evt =>
  240. val sts = updateState(evt, vchState, vchItems)
  241. vchState = sts._1
  242. vchItems = sts._2
  243. sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 转到管理模式{$su}。", vchState, List(vchItems.txnitems.head))
  244. }
  245. case RunPOSCommand(SuperOff) =>
  246. persistEvent(SuperOffed(vchState)) { evt =>
  247. val sts = updateState(evt, vchState, vchItems)
  248. vchState = sts._1
  249. vchItems = sts._2
  250. sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 退出管理模式。", vchState, List(vchItems.txnitems.head))
  251. }
  252. case RunPOSCommand(RefundOn) =>
  253. persistEvent(RefundOned(vchState)) { evt =>
  254. val sts = updateState(evt, vchState, vchItems)
  255. vchState = sts._1
  256. vchItems = sts._2
  257. sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 进入退款模式。", vchState, List(vchItems.txnitems.head))
  258. }
  259. case RunPOSCommand(RefundOff) =>
  260. persistEvent(RefundOffed(vchState)) { evt =>
  261. val sts = updateState(evt, vchState, vchItems)
  262. vchState = sts._1
  263. vchItems = sts._2
  264. sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 退出退款模式。", vchState, List(vchItems.txnitems.head))
  265. }
  266. case RunPOSCommand(VoidOn) =>
  267. persistEvent(VoidOned(vchState)) { evt =>
  268. val sts = updateState(evt, vchState, vchItems)
  269. vchState = sts._1
  270. vchItems = sts._2
  271. sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 进入取消模式。", vchState, List(vchItems.txnitems.head))
  272. }
  273. case RunPOSCommand(VoidOff) =>
  274. persistEvent(VoidOffed(vchState)) { evt =>
  275. val sts = updateState(evt, vchState, vchItems)
  276. vchState = sts._1
  277. vchItems = sts._2
  278. sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 退出取消模式。", vchState, List(vchItems.txnitems.head))
  279. }
  280. case RunPOSCommand(MemberOn(cardnum, _)) =>
  281. persistEvent(MemberOned(cardnum,vchState)) { evt =>
  282. val sts = updateState(evt, vchState, vchItems)
  283. vchState = sts._1
  284. vchItems = sts._2
  285. sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 进入会员模式,卡号{$cardnum}。", vchState, List(vchItems.txnitems.head))
  286. }
  287. case RunPOSCommand(MemberOff) =>
  288. val cardnum = vchState.mbr
  289. persistEvent(MemberOffed(vchState)) { evt =>
  290. val sts = updateState(evt, vchState, vchItems)
  291. vchState = sts._1
  292. vchItems = sts._2
  293. sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 退出会员模式,卡号{$cardnum}。", vchState, List(vchItems.txnitems.head))
  294. }
  295.  
  296. case RunPOSCommand(VoucherNum(tnum)) =>
  297. val fnum = vchState.num
  298. persistEvent(VoucherNumed(fnum,tnum)) { evt =>
  299. val sts = updateState(evt, vchState, vchItems)
  300. vchState = sts._1
  301. vchItems = sts._2
  302. sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 成功设定单号{$fnum -> $tnum}。", vchState, List())
  303. }
  304.  
  305. case RunPOSCommand(Subtotal) =>
  306. persistEvent(Subtotaled(vchState,vchItems)) { evt =>
  307. val sts = updateState(evt, vchState, vchItems)
  308. vchState = sts._1
  309. vchItems = sts._2
  310. sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 小计:${vchItems.txnitems.head.price} 条项目。", vchState, List(vchItems.txnitems.head))
  311. }
  312. //first payment in a voucher
  313. case RunPOSCommand(OfflinePay(acct,num, amount)) =>
  314. persistEvent(Payment(acct,num,vchState)) { evt =>
  315. val sts = updateState(evt, vchState, vchItems)
  316. vchState = sts._1
  317. vchItems = sts._2
  318. if (vchItems.totalSales > 0)
  319. sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 成功完成{${vchItems.txnitems.head.category} ${amount/100.0}支付。", vchState, List(vchItems.txnitems.head))
  320. else
  321. sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 成功完成{${vchItems.txnitems.head.category} ${amount/100.0}退款。", vchState, List(vchItems.txnitems.head))
  322. if (!vchState.due) { //completed voucher. mark end of voucher and move next. stay in logOnState
  323. persistEvent(EndVoucher(vchState.num)) { evt =>
  324. val sts = updateState(evt, vchState, vchItems,lastSequenceNr)
  325. vchState = sts._1
  326. vchItems = sts._2
  327. saveSnapshot(vchState) //recovery to next voucher
  328. }
  329. }
  330. else context.become(paymentState) //switch into paymentState
  331. }
  332. case RunPOSCommand(VCBalance(acct,num,passwd)) =>
  333. if (POSInterfaces.validateVC(acct,num,passwd)) {
  334. val res = POSInterfaces.getVCBalance(acct,num)
  335. if (res.sts == POSInterfaces.VCRESULT.OK)
  336. sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 储值卡余额{${res.amt}}。", vchState, List(TxnItem(vchState).copy(
  337. amount = (res.amt * 100).toInt,
  338. dpt = acct,
  339. code = num
  340. )))
  341. else
  342. sender() ! POSResponse(STATUS.FAIL, s"终端-$persistenceId: 储值卡[$num]余额读取错误![${res.msg}]", vchState, List())
  343. } else {
  344. sender() ! POSResponse(STATUS.FAIL, s"终端-$persistenceId: 储值卡[$num]验证信息错误!", vchState, List())
  345. }
  346.  
  347. case RunPOSCommand(VCPay(acct,num, passwd,amount)) =>
  348. if (POSInterfaces.validateVC(acct,num,passwd)) {
  349. val res = POSInterfaces.getVCBalance(acct,num)
  350. if (res.sts == POSInterfaces.VCRESULT.OK) {
  351. if ((res.amt * 100).toInt < amount)
  352. sender() ! POSResponse(STATUS.FAIL, s"终端-$persistenceId: 储值卡[$num]余额不足!", vchState,List(TxnItem(vchState).copy(
  353. amount = (res.amt * 100).toInt,
  354. dpt = acct,
  355. code = num
  356. )))
  357. else {
  358. val res = POSInterfaces.payByVC(acct,num,amount/100.00)
  359. if (res.sts == POSInterfaces.VCRESULT.OK) {
  360. persistEvent(Payment(acct,num,vchState)) { evt =>
  361. val sts = updateState(evt, vchState, vchItems)
  362. vchState = sts._1
  363. vchItems = sts._2
  364. if (vchItems.totalSales > 0)
  365. sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 成功完成{${vchItems.txnitems.head.category} ${amount/100.0}支付。", vchState, List(vchItems.txnitems.head))
  366. else
  367. sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 成功完成{${vchItems.txnitems.head.category} ${amount/100.0}退款。", vchState, List(vchItems.txnitems.head))
  368. if (!vchState.due) { //completed voucher. mark end of voucher and move next. stay in logOnState
  369. persistEvent(EndVoucher(vchState.num)) { evt =>
  370. val sts = updateState(evt, vchState, vchItems,lastSequenceNr)
  371. vchState = sts._1
  372. vchItems = sts._2
  373. saveSnapshot(vchState) //recovery to next voucher
  374. }
  375. }
  376. else context.become(paymentState) //switch into paymentState
  377. }
  378.  
  379. }
  380. }
  381.  
  382. }
  383. else
  384. sender() ! POSResponse(STATUS.FAIL, s"终端-$persistenceId: 储值卡[$num]余额读取错误![${res.msg}]", vchState, List())
  385. } else {
  386. sender() ! POSResponse(STATUS.FAIL, s"终端-$persistenceId: 储值卡[$num]验证信息错误!", vchState, List())
  387. }
  388.  
  389. case RunPOSCommand(VoidAll) =>
  390. val vnum = vchState.num
  391. persistEvent(VoidVoucher(vchState)) { _ =>
  392. persistEvent(EndVoucher(vchState.num)) { evt =>
  393. val sts = updateState(evt, vchState, vchItems,lastSequenceNr)
  394. vchState = sts._1
  395. vchItems = sts._2
  396. saveSnapshot(vchState) //recovery to next voucher
  397. sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 完成全单取消{${vnum}}。", vchState, List(vchItems.txnitems.head))
  398. }
  399. }
  400. case RunPOSCommand(Suspend) =>
  401. val vnum = vchState.num
  402. persistEvent(SuspVoucher(vchState)) { _ =>
  403. persistEvent(EndVoucher(vchState.num)) { evt =>
  404. val sts = updateState(evt, vchState, vchItems,lastSequenceNr)
  405. vchState = sts._1
  406. vchItems = sts._2
  407. saveSnapshot(vchState) //recovery to next voucher
  408. sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 完成挂单{${vnum}}。", vchState, List(vchItems.txnitems.head))
  409. }
  410. }
  411.  
  412.  
  413.  
  414. case RunPOSCommand(LogSales(salesType,sdpt,scode,sqty,sprice)) => {
  415. var pqty = 0
  416. if (vchState.void) {
  417. vchItems.txnitems.find(ti => (ti.txntype == TXNTYPE.sales && ti.salestype == salesType &&
  418. ti.dpt == sdpt && ti.code == scode && ti.qty == sqty && ti.price == sprice)) match {
  419. case Some(ti) => pqty = -ti.qty
  420. case None => pqty = sqty
  421. }
  422. }
  423. salesType match {
  424. case SALESTYPE.plu =>
  425. getItem(scode) match {
  426. case QueryItemsOK(items) =>
  427. val pr = if (sprice > 0) sprice else items.head.price
  428. val evt = SalesLogged(TxnItem(vchState).copy(
  429. txntype = if (vchState.void) TXNTYPE.void else TXNTYPE.sales,
  430. salestype = salesType,
  431. price = pr,
  432. qty = pqty,
  433. amount = pr * pqty,
  434. code = scode,
  435. desc = items.head.name,
  436. dpt = items.head.dpt,
  437. cat = items.head.cat,
  438. brd = items.head.brd,
  439. department = (getDpt(items.head.dpt).getOrElse(Department("", ""))).name,
  440. category = (getCat(items.head.brd).getOrElse(Category("", ""))).name,
  441. brand = (getBrd(items.head.brd).getOrElse(Brand("", ""))).name
  442. ))
  443. persistEvent(evt) { evt =>
  444. val sts = updateState(evt, vchState, vchItems)
  445. vchState = sts._1
  446. vchItems = sts._2
  447. if (vchState.void) {
  448. vchItems = vchItems.copy(
  449. txnitems = vchItems.txnitems.map { ti =>
  450. if (ti.txntype == TXNTYPE.sales && ti.salestype == salesType &&
  451. ti.dpt == sdpt && ti.code == scode && ti.qty == sqty && ti.price == sprice)
  452. ti.copy(txntype = TXNTYPE.voided)
  453. else
  454. ti
  455. })
  456. sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 成功冲销商品销售。", vchState, List(vchItems.txnitems.head))
  457. } else {
  458. sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 商品销售操作成功。", vchState, List(vchItems.txnitems.head))
  459. }
  460.  
  461. }
  462. case QueryItemsFail(msg) =>
  463. sender() ! POSResponse(STATUS.FAIL, s"终端-$persistenceId: 商品编号错误![$msg]", vchState, List())
  464. }
  465. case SALESTYPE.dpt =>
  466. validateDpt(sdpt) match {
  467. case Some(dpt) =>
  468. val evt = SalesLogged(TxnItem(vchState).copy(
  469. txntype = if (vchState.void) TXNTYPE.void else TXNTYPE.sales,
  470. salestype = salesType,
  471. price = sprice,
  472. qty = pqty,
  473. amount = sprice * pqty,
  474. dpt = sdpt,
  475. department = dpt.name
  476. ))
  477. persistEvent(evt) { evt =>
  478. val sts = updateState(evt, vchState, vchItems)
  479. vchState = sts._1
  480. vchItems = sts._2
  481. if (vchState.void) {
  482. vchItems = vchItems.copy(
  483. txnitems = vchItems.txnitems.map { ti =>
  484. if (ti.txntype == TXNTYPE.sales && ti.salestype == salesType &&
  485. ti.dpt == sdpt && ti.code == scode && ti.qty == sqty && ti.price == sprice)
  486. ti.copy(txntype = TXNTYPE.voided)
  487. else
  488. ti
  489. })
  490. sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 成功冲销部门销售。", vchState, List(vchItems.txnitems.head))
  491. } else {
  492. sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 部门销售操作成功。", vchState, List(vchItems.txnitems.head))
  493. }
  494. }
  495. case None =>
  496. sender() ! POSResponse(STATUS.FAIL, s"终端-$persistenceId: 部门编号[$sdpt]错误!", vchState, List())
  497. }
  498. case SALESTYPE.brd =>
  499. validateBrd(sdpt) match {
  500. case Some(brd) =>
  501. val evt = SalesLogged(TxnItem(vchState).copy(
  502. txntype = if (vchState.void) TXNTYPE.void else TXNTYPE.sales,
  503. salestype = salesType,
  504. price = sprice,
  505. qty = pqty,
  506. amount = sprice * pqty,
  507. brd = sdpt,
  508. brand = brd.name
  509. ))
  510. persistEvent(evt) { evt =>
  511. val sts = updateState(evt, vchState, vchItems)
  512. vchState = sts._1
  513. vchItems = sts._2
  514. if (vchState.void) {
  515. vchItems = vchItems.copy(
  516. txnitems = vchItems.txnitems.map { ti =>
  517. if (ti.txntype == TXNTYPE.sales && ti.salestype == salesType &&
  518. ti.dpt == sdpt && ti.code == scode && ti.qty == sqty && ti.price == sprice)
  519. ti.copy(txntype = TXNTYPE.voided)
  520. else
  521. ti
  522. })
  523. sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 成功冲销品牌销售。", vchState, List(vchItems.txnitems.head))
  524. } else {
  525. sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 品牌销售操作成功。", vchState, List(vchItems.txnitems.head))
  526. }
  527. }
  528. case None =>
  529. sender() ! POSResponse(STATUS.FAIL, s"终端-$persistenceId: 品牌编号[$sdpt]错误!", vchState, List())
  530. }
  531. case SALESTYPE.cat =>
  532. validateCat(sdpt) match {
  533. case Some(cat) =>
  534. val evt = SalesLogged(TxnItem(vchState).copy(
  535. txntype = if (vchState.void) TXNTYPE.void else TXNTYPE.sales,
  536. salestype = salesType,
  537. price = sprice,
  538. qty = pqty,
  539. amount = sprice * pqty,
  540. cat = sdpt,
  541. category = cat.name
  542. ))
  543. persistEvent(evt) { evt =>
  544. val sts = updateState(evt, vchState, vchItems)
  545. vchState = sts._1
  546. vchItems = sts._2
  547. if (vchState.void) {
  548. vchItems = vchItems.copy(
  549. txnitems = vchItems.txnitems.map { ti =>
  550. if (ti.txntype == TXNTYPE.sales && ti.salestype == salesType &&
  551. ti.dpt == sdpt && ti.code == scode && ti.qty == sqty && ti.price == sprice)
  552. ti.copy(txntype = TXNTYPE.voided)
  553. else
  554. ti
  555. })
  556. sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 成功冲销分类销售。", vchState, List(vchItems.txnitems.head))
  557. } else {
  558. sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 分类销售操作成功。", vchState, List(vchItems.txnitems.head))
  559. }
  560. }
  561. case None =>
  562. sender() ! POSResponse(STATUS.FAIL, s"终端-$persistenceId: 分类编号[$sdpt]错误!", vchState, List())
  563. }
  564. case SALESTYPE.ra =>
  565. validateCat(sdpt) match {
  566. case Some(ra) =>
  567. val evt = SalesLogged(TxnItem(vchState).copy(
  568. txntype = if (vchState.void) TXNTYPE.void else TXNTYPE.sales,
  569. salestype = salesType,
  570. price = sprice,
  571. qty = pqty,
  572. amount = sprice * pqty,
  573. dpt = sdpt,
  574. code = scode,
  575. department = ra.name
  576. ))
  577. persistEvent(evt) { evt =>
  578. val sts = updateState(evt, vchState, vchItems)
  579. vchState = sts._1
  580. vchItems = sts._2
  581. if (vchState.void) {
  582. vchItems = vchItems.copy(
  583. txnitems = vchItems.txnitems.map { ti =>
  584. if (ti.txntype == TXNTYPE.sales && ti.salestype == salesType &&
  585. ti.dpt == sdpt && ti.code == scode && ti.qty == sqty && ti.price == sprice)
  586. ti.copy(txntype = TXNTYPE.voided)
  587. else
  588. ti
  589. })
  590. sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 成功冲销代收。", vchState, List(vchItems.txnitems.head))
  591. } else {
  592. sender() ! POSResponse(STATUS.OK, s"终端-$persistenceId: 代收操作成功。", vchState, List(vchItems.txnitems.head))
  593. }
  594. }
  595. case None =>
  596. sender() ! POSResponse(STATUS.FAIL, s"终端-$persistenceId: 代收编号[$sdpt]错误!", vchState, List())
  597. }
  598.  
  599. }
  600. }
  601.  
  602. }
  603.  
  604.  
  605.  
  606. }

POSDataAccess.scala

  1. package pos.dao
  2.  
  3. import java.time.LocalDate
  4. import java.time.format.DateTimeFormatter
  5.  
  6.  
  7. case class Item(
  8. brd: String
  9. ,dpt: String
  10. ,cat: String
  11. ,code: String
  12. ,name: String
  13. ,price: Int
  14.  
  15. )
  16. object Items {
  17. val apple = Item("01","02","01","001", "green apple", 820)
  18. val grape = Item("01","02","01","002", "red grape", 1050)
  19. val orage = Item("01","02","01","003", "sunkist orage", 350)
  20. val banana = Item("01","02","01","004", "demon banana", 300)
  21. val pineapple = Item("01","02","01","005", "hainan pineapple", 1300)
  22. val peach = Item("01","02","01","006", "xinjiang peach", 2390)
  23.  
  24. val tblItems = List(apple, grape, orage, banana, pineapple, peach)
  25.  
  26. sealed trait QueryItemsResult {}
  27.  
  28. case class QueryItemsOK(items: List[Item]) extends QueryItemsResult
  29.  
  30. case class QueryItemsFail(msg: String) extends QueryItemsResult
  31.  
  32. }
  33.  
  34.  
  35. object Codes {
  36. case class User(code: String, name: String, passwd: String)
  37. case class Department(code: String, name: String)
  38. case class Category(code: String, name: String)
  39. case class Brand(code: String, name: String)
  40. case class Ra(code: String, name: String)
  41. case class Account(code: String, name: String)
  42. case class Disc(code: String, best: Boolean, aggr: Boolean, group: Boolean)
  43.  
  44. val ras = List(Ra("01","Delivery"),Ra("02","Cooking"))
  45. val dpts = List(Department("01","Fruit"),Department("02","Grocery"))
  46. val cats = List(Category("0101","Fresh Fruit"),Category("0201","Dry Grocery"))
  47. val brds = List(Brand("01","Sunkist"),Brand("02","Demon"))
  48. val accts = List(Account("001","Cash"),Account("002","Value Card"), Account("003", "Visa")
  49. ,Account("004","Alipay"),Account("005","WXPay"))
  50.  
  51. val users = List(User("1001","Tiger", "123"),User("1002","John", "123"),User("1003","Maria", "123"))
  52.  
  53. def getDpt(code: String) = dpts.find(d => d.code == code)
  54. def getCat(code: String) = cats.find(d => d.code == code)
  55. def getBrd(code: String) = brds.find(b => b.code == code)
  56. def getAcct(code: String) = accts.find(a => a.code == code)
  57. def getRa(code: String) = ras.find(a => a.code == code)
  58. }
  59.  
  60. object DAO {
  61. import Items._
  62. import Codes._
  63.  
  64. def getItem(code: String): QueryItemsResult = {
  65. val optItem = tblItems.find(it => it.code == code)
  66. optItem match {
  67. case Some(item) => QueryItemsOK(List(item))
  68. case None => QueryItemsFail("Invalid item code!")
  69. }
  70. }
  71.  
  72. def validateDpt(code: String) = dpts.find(d => d.code == code)
  73. def validateCat(code: String) = cats.find(d => d.code == code)
  74. def validateBrd(code: String) = brds.find(b => b.code == code)
  75. def validateRa(code: String) = ras.find(ac => ac.code == code)
  76. def validateAcct(code: String) = accts.find(ac => ac.code == code)
  77.  
  78. def validateUser(userid: String, passwd: String) = users.find(u => (u.code == userid && u.passwd == passwd))
  79.  
  80. def lastSecOfDateStr(ldate: LocalDate): String = {
  81. ldate.format(DateTimeFormatter.ofPattern( "yyyy-MM-dd"))+" 23:59:59"
  82. }
  83.  
  84.  
  85. }

ClusterMonitor.scala

  1. package pos.cluster
  2.  
  3. import akka.actor._
  4. import akka.cluster.ClusterEvent._
  5. import akka.cluster._
  6. import sdp.logging.LogSupport
  7.  
  8. class ClusterMonitor extends Actor with LogSupport {
  9. val cluster = Cluster(context.system)
  10. override def preStart(): Unit = {
  11. cluster.subscribe(self,initialStateMode = InitialStateAsEvents
  12. ,classOf[MemberEvent],classOf[UnreachableMember]) //订阅集群状态转换信息
  13. super.preStart()
  14. }
  15.  
  16. override def postStop(): Unit = {
  17. cluster.unsubscribe(self) //取消订阅
  18. super.postStop()
  19. }
  20.  
  21. override def receive: Receive = {
  22. case MemberJoined(member) =>
  23. log.info(s"Member is Joining: {${member.address}}")
  24. case MemberUp(member) =>
  25. log.info(s"Member is Up: {${member.address}}")
  26. case MemberLeft(member) =>
  27. log.info(s"Member is Leaving: {${member.address}}")
  28. case MemberExited(member) =>
  29. log.info(s"Member is Exiting: {${member.address}}")
  30. case MemberRemoved(member, previousStatus) =>
  31. log.info(
  32. s"Member is Removed: {${member.address}} after {${previousStatus}")
  33. case UnreachableMember(member) =>
  34. log.info(s"Member detected as unreachable: {${member.address}}")
  35. cluster.down(member.address) //手工驱除,不用auto-down
  36. case _: MemberEvent => // ignore
  37. }
  38.  
  39. }

 

原文链接:http://www.cnblogs.com/tiger-xc/p/10622046.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号