上篇我们做了一个WriterActor的例子,主要目的是示范WriterActor如何作为集群分片用persistentActor特性及event-sourcing模式实现CQRS的写功能。既然是集群分片,那么我们就在这篇讲讲WriterActor的部署和测试,因为这个里面还是有些值得注意的地方。下面是一段WriteActor,即集群分片(cluster-sharding)的部署代码:
- ClusterSharding(system).start(
- typeName = shardName,
- entityProps = writerProps,
- settings = cpsSettings,
- extractEntityId = getPOSId,
- extractShardId = getShopId,
- allocationStrategy = ClusterSharding(system).defaultShardAllocationStrategy(cpsSettings),
- handOffStopMessage = PassivatePOS
- )
注意带handOffStopMessage参数的start函数必须同时提供allocationStrategy。这个参数提供了passivation消息类型。
整个集群分片部署代码如下:
- object POSRouter extends LogSupport {
- def main(args: Array[String]) {
- import WriterActor._
- import Commands._
- val argsPat = "(.*):(.*)".r
- val (host, port) = args(0) match {
- case argsPat(h, p) => (h, p)
- case _ => ("localhost", "2551")
- }
- val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=\"" + port + "\"")
- .withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.hostname=\"" + host + "\""))
- //roles can be deployed on this node
- .withFallback(ConfigFactory.parseString("akka.cluster.roles = [poswriter]"))
- .withFallback(ConfigFactory.load())
- log.info(s"******* hostname = $host, port = $port *******")
- val shardName = "POSShard"
- case class POSMessage(id: Long, cmd: POSCommand) {
- def shopId = id.toString.head.toString
- def posId = id.toString
- }
- val getPOSId: ShardRegion.ExtractEntityId = {
- case posCommand: POSMessage => (posCommand.posId, posCommand.cmd)
- }
- val getShopId: ShardRegion.ExtractShardId = {
- case posCommand: POSMessage => posCommand.shopId
- }
- val system = ActorSystem("cloud-pos-server", config)
- val role = "poswriter" //role of this shard
- val cpsSettings = ClusterShardingSettings(system).withRole(role) //.withPassivateIdleAfter(10 minutes)
- ClusterSharding(system).start(
- typeName = shardName,
- entityProps = writerProps,
- settings = cpsSettings,
- extractEntityId = getPOSId,
- extractShardId = getShopId,
- allocationStrategy = ClusterSharding(system).defaultShardAllocationStrategy(cpsSettings),
- handOffStopMessage = PassivatePOS
- )
- system.actorOf(ClusterMonitor.props, "cps-cluster-monitor")
- }
- }
以上有几个参数需要特别注意:host和port是从main的参数解析出来的如192.168.11.162:2551,代表本节点的host和port。akka.cluster.roles代表本节点支持的角色,这里poswriter是其中之一。而ClusterShardingSettings(system).withRole("poswriter")代表这个分片shard只能在支持poswriter角色的节点上部署。如果搞错了运行时你会发现Sharding无法启动。上面这段程序代表本节点支持poswriter角色。在本节点(输入的IP地址)部署了一个名称为“POSShard”的cluster-sharding,它具备poswriter角色。
如果我在多部机器上运行这段代码,输入当前机器的IP+PORT就代表在这么多台机器上都部署了“POSShard”分片。上面的ClusterMonitor是个集群状态监控actor:
- package sdp.cluster.monitor
- import akka.actor._
- import akka.cluster.ClusterEvent._
- import akka.cluster._
- import sdp.logging.LogSupport
- object ClusterMonitor {
- def props = Props(new ClusterMonitor)
- }
- class ClusterMonitor extends Actor with LogSupport {
- val cluster = Cluster(context.system)
- override def preStart(): Unit = {
- cluster.subscribe(self,initialStateMode = InitialStateAsEvents
- ,classOf[MemberEvent],classOf[UnreachableMember]) //订阅集群状态转换信息
- super.preStart()
- }
- override def postStop(): Unit = {
- cluster.unsubscribe(self) //取消订阅
- super.postStop()
- }
- override def receive: Receive = {
- case MemberJoined(member) =>
- log.info(s"Member is Joining: {${member.address}}")
- case MemberUp(member) =>
- log.info(s"Member is Up: {${member.address}}")
- case MemberLeft(member) =>
- log.info(s"Member is Leaving: {${member.address}}")
- case MemberExited(member) =>
- log.info(s"Member is Exiting: {${member.address}}")
- case MemberRemoved(member, previousStatus) =>
- log.info(
- s"Member is Removed: {${member.address}} after {${previousStatus}")
- case UnreachableMember(member) =>
- log.info(s"Member detected as unreachable: {${member.address}}")
- cluster.down(member.address) //手工驱除,不用auto-down
- case _: MemberEvent => // ignore
- }
- }
有了它我们可以监视集群节点连接状态。
好了,现在假设我们在几台机器组成的集群各节点上都部署了“POSShard”分片,那么就设计个客户端来向这个“POSShard”分片发送POSMessage:
- case class POSMessage(id: Long, cmd: POSCommand) {
- def shopId = id.toString.head.toString
- def posId = id.toString
- }
- val getPOSId: ShardRegion.ExtractEntityId = {
- case posCommand: POSMessage => (posCommand.posId, posCommand.cmd)
- }
- val getShopId: ShardRegion.ExtractShardId = {
- case posCommand: POSMessage => posCommand.shopId
- }
这个客户端必须考虑以下几点:它必须在同一个集群,也就是它也是集群其中一个节点,否则无法和其它部署了“POSShard”分片的节点进行信息交流。但它又不能同处与部署了“POSShard”的节点,因为remote的hostname和port已经被占用。所以只能把客户端放在一个没有部署“POSShard”的节点上,然后用ClusterSharding(system).startProxy来启动一个分片中介:
- //no shard deployed on this node 2558, use proxy
- val posHandler = ClusterSharding(system).startProxy(
- typeName = shardName,
- role = Some("poswriter"),
- extractEntityId = getPOSId,
- extractShardId = getShopId
- )
- //val posHandler = ClusterSharding(system).shardRegion(shardName)
- system.actorOf(POSClient.props(posHandler), "pos-client")
注意这个proxy的role必须是Some("poswriter"),只有这样才能调用其它节点上的”POSShard“,因为它们的角色都是“poswriter”。与WriterActor交互的必须是个actor,因为WriterActor会用sender()返回结果,这个sender()是个ActorRef:
- object POSClient {
- def props(pos: ActorRef) = Props(new POSClient(pos))
- }
- class POSClient(posHandler: ActorRef) extends Actor with LogSupport {
- override def receive: Receive = {
- case msg @ POSMessage(_,_) => posHandler ! msg
- case resp: POSResponse =>
- log.info(s"response from server: $resp")
- }
- }
我们可用下面的方式来指挥WriterActor:
- val posref = system.actorOf(POSClient.props(posHandler), "pos-client")
-
- posref ! POSMessage(1021, LogSales(SALESTYPE.plu, "", apple.code, 1, 0))
- posref ! POSMessage(2021, LogSales(SALESTYPE.plu, "", pineapple.code, 2, 0))
- posref ! POSMessage(3021, LogSales(SALESTYPE.plu, "", banana.code, 1, 0))
- posref ! POSMessage(4021, LogSales(SALESTYPE.plu, "", grape.code, 3, 0))
- posref ! POSMessage(4021,Subtotal)
下面是服务端分片部署源代码:
resources/application.conf
- akka.actor.warn-about-java-serializer-usage = off
- akka.log-dead-letters-during-shutdown = off
- akka.log-dead-letters = off
- akka {
- loglevel = INFO
- actor {
- provider = "cluster"
- }
- remote {
- log-remote-lifecycle-events = off
- netty.tcp {
- hostname = "127.0.0.1"
- port = 0
- }
- }
- cluster {
- seed-nodes = [
- "akka.tcp://cloud-pos-server@192.168.11.162:2551"]
- log-info = off
- sharding {
- role = "poswriter"
- passivate-idle-entity-after = 5 m
- }
- }
- persistence {
- journal.plugin = "cassandra-journal"
- snapshot-store.plugin = "cassandra-snapshot-store"
- }
- }
- cassandra-journal {
- contact-points = ["192.168.11.162"]
- }
- cassandra-snapshot-store {
- contact-points = ["192.168.11.162"]
- }
POSRouter.scala
- package cloud.pos.server
- import akka.actor._
- import akka.cluster.sharding._
- import akka.cluster.sharding.ClusterSharding
- import com.typesafe.config.ConfigFactory
- import sdp.cluster.monitor._
- import sdp.logging._
- object POSRouter extends LogSupport {
- def main(args: Array[String]) {
- import WriterActor._
- import Commands._
- val argsPat = "(.*):(.*)".r
- val (host, port) = args(0) match {
- case argsPat(h, p) => (h, p)
- case _ => ("localhost", "2551")
- }
- val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=\"" + port + "\"")
- .withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.hostname=\"" + host + "\""))
- //roles can be deployed on this node
- .withFallback(ConfigFactory.parseString("akka.cluster.roles = [poswriter]"))
- .withFallback(ConfigFactory.load())
- log.info(s"******* hostname = $host, port = $port *******")
- val shardName = "POSShard"
-
- case class POSMessage(id: Long, cmd: POSCommand) {
- def shopId = id.toString.head.toString
- def posId = id.toString
- }
- val getPOSId: ShardRegion.ExtractEntityId = {
- case posCommand: POSMessage => (posCommand.posId, posCommand.cmd)
- }
- val getShopId: ShardRegion.ExtractShardId = {
- case posCommand: POSMessage => posCommand.shopId
- }
- val system = ActorSystem("cloud-pos-server", config)
- val role = "poswriter" //role of this shard
- val cpsSettings = ClusterShardingSettings(system).withRole(role) //.withPassivateIdleAfter(10 minutes)
- ClusterSharding(system).start(
- typeName = shardName,
- entityProps = writerProps,
- settings = cpsSettings,
- extractEntityId = getPOSId,
- extractShardId = getShopId,
- allocationStrategy = ClusterSharding(system).defaultShardAllocationStrategy(cpsSettings),
- handOffStopMessage = PassivatePOS
- )
- system.actorOf(ClusterMonitor.props, "cps-cluster-monitor")
- }
- }
下面是这个测试项目的源代码:
build.sbt
- name := "cloud-pos-client"
- version := "0.1"
- scalaVersion := "2.12.8"
- libraryDependencies := Seq(
- "com.typesafe.akka" %% "akka-cluster-sharding" % "2.5.19",
- "com.typesafe.akka" %% "akka-persistence" % "2.5.19",
- "com.typesafe.akka" %% "akka-persistence-cassandra" % "0.93",
- "com.typesafe.akka" %% "akka-persistence-cassandra-launcher" % "0.93" % Test,
- "ch.qos.logback" % "logback-classic" % "1.2.3"
- )
resources/application.conf
- akka.actor.warn-about-java-serializer-usage = off
- akka.log-dead-letters-during-shutdown = off
- akka.log-dead-letters = off
- akka {
- loglevel = INFO
- actor {
- provider = "cluster"
- }
- remote {
- log-remote-lifecycle-events = off
- netty.tcp {
- hostname = "192.168.11.162"
- port = 2558
- }
- }
- cluster {
- seed-nodes = [
- "akka.tcp://cloud-pos-server@192.168.11.162:2551"]
- log-info = off
- }
- }
resources/logback.xml
- <?xml version="1.0" encoding="UTF-8"?>
- <configuration>
- <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
- <encoder>
- <Pattern>
- %d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n
- </Pattern>
- </encoder>
- </appender>
-
- <root level="debug">
- <appender-ref ref="STDOUT" />
- </root>
- </configuration>
ClientDemo.scala
- package cloud.pos.client
- import akka.actor._
- import akka.cluster.sharding.ClusterSharding
- import sdp.cluster.monitor._
- import sdp.logging._
- import Commands._
- import States._
- import Items._
- import akka.cluster.sharding._
- object POSClientDemo extends LogSupport {
- def main(args: Array[String]) {
- val system = ActorSystem("cloud-pos-server")
- val shardName = "POSShard"
- val getPOSId: ShardRegion.ExtractEntityId = {
- case posCommand: POSMessage => (posCommand.posId, posCommand.cmd)
- }
- val getShopId: ShardRegion.ExtractShardId = {
- case posCommand: POSMessage => posCommand.shopId
- }
- //no shard deployed on this node 2558, use proxy
- val posHandler = ClusterSharding(system).startProxy(
- typeName = shardName,
- role = Some("poswriter"),
- extractEntityId = getPOSId,
- extractShardId = getShopId
- )
- //val posHandler = ClusterSharding(system).shardRegion(shardName)
- system.actorOf(ClusterMonitor.props, "cps-cluster-monitor")
- val posref = system.actorOf(POSClient.props(posHandler), "pos-client")
- posref ! POSMessage(1021, LogSales(SALESTYPE.plu, "", apple.code, 1, 0))
- posref ! POSMessage(2021, LogSales(SALESTYPE.plu, "", pineapple.code, 2, 0))
- posref ! POSMessage(3021, LogSales(SALESTYPE.plu, "", banana.code, 1, 0))
- posref ! POSMessage(4021, LogSales(SALESTYPE.plu, "", grape.code, 3, 0))
- posref ! POSMessage(4021,Subtotal)
- scala.io.StdIn.readLine()
- system.terminate()
- }
- }
client/Commands.scala
- package cloud.pos.client
- object Commands {
- sealed trait POSCommand {}
- case class LogOn(opr: String, passwd: String) extends POSCommand
- case object LogOff extends POSCommand
- case class SuperOn(su: String, passwd: String) extends POSCommand
- case object SuperOff extends POSCommand
- case class MemberOn(cardnum: String, passwd: String) extends POSCommand
- case object MemberOff extends POSCommand //remove member status for the voucher
- case object RefundOn extends POSCommand
- case object RefundOff extends POSCommand
- case object VoidOn extends POSCommand
- case object VoidOff extends POSCommand
- case object VoidAll extends POSCommand
- case object Suspend extends POSCommand
- case class VoucherNum(vnum: Int) extends POSCommand
- case class LogSales(salesType: Int, dpt: String, code: String, qty: Int, price: Int) extends POSCommand
- case object Subtotal extends POSCommand
- case class Discount(code: String, percent: Int) extends POSCommand
- case class OfflinePay(acct: String, num: String, amount: Int) extends POSCommand //settlement 结算支付
- //read only command, no event process
- case class VCBalance(acct: String, num: String, passwd: String) extends POSCommand
- case class VCPay(acct: String, num: String, passwd: String, amount: Int) extends POSCommand
- case class AliPay(acct: String, num: String, amount: Int) extends POSCommand
- case class WxPay(acct: String, num: String, amount: Int) extends POSCommand
- // read only command, no update event
- case class Plu(itemCode: String) extends POSCommand //read only
-
- case class POSMessage(id: Long, cmd: POSCommand) {
- def shopId = id.toString.head.toString
- def posId = id.toString
- }
- }
client/States.scala
- package cloud.pos.client
- object States {
- object TXNTYPE {
- val sales: Int = 0
- val refund: Int = 1
- val void: Int = 2
- val voided: Int = 3
- val voidall: Int = 4
- val subtotal: Int = 5
- val logon: Int = 6
- val supon: Int = 7 // super user on/off
- val suspend: Int = 8
- }
- object SALESTYPE {
- val plu: Int = 0
- val dpt: Int = 1
- val cat: Int = 2
- val brd: Int = 3
- val ra: Int = 4
- val sub: Int = 5
- val ttl: Int = 6
- val dsc: Int = 7
- val crd: Int = 8
- }
- case class TxnItem(
- txndate: String = ""
- ,txntime: String = ""
- ,opr: String = ""//工号
- ,num: Int = 0 //销售单号
- ,seq: Int = 1 //交易序号
- ,txntype: Int = TXNTYPE.sales//交易类型
- ,salestype: Int = SALESTYPE.plu //销售类型
- ,qty: Int = 1 //交易数量
- ,price: Int = 0 //单价(分)
- ,amount: Int = 0 //码洋(分)
- ,dscamt: Int = 0 //折扣:负值 net实洋 = amount + dscamt
- ,member: String = "" //会员卡号
- ,code: String = "" //编号(商品、账号...)
- ,desc: String = "" //项目名称
- ,dpt: String = ""
- ,department: String = ""
- ,cat: String = ""
- ,category: String = ""
- ,brd: String = ""
- ,brand: String = ""
- )
- case class VchStatus( //操作状态锁留给前端维护
- qty: Int = 1,
- refund: Boolean = false,
- void: Boolean = false)
- case class VchStates(
- opr: String = "", //收款员
- jseq: BigInt = 0, //begin journal sequence for read-side replay
- num: Int = 0, //当前单号
- seq: Int = 1, //当前序号
- void: Boolean = false, //取消模式
- refd: Boolean = false, //退款模式
- due: Boolean = true, //当前余额
- su: String = "",
- mbr: String = ""
- )
- }
client/POSClient.scala
- package cloud.pos.client
- import akka.actor._
- import sdp.logging._
- import Responses._
- import Commands._
- object POSClient {
- def props(pos: ActorRef) = Props(new POSClient(pos))
- }
- class POSClient(posHandler: ActorRef) extends Actor with LogSupport {
- override def receive: Receive = {
- case msg @ POSMessage(_,_) => posHandler ! msg
- case resp: POSResponse =>
- log.info(s"response from server: $resp")
- }
- }
client/Responses.scala
- package cloud.pos.client
- import States._
- object Responses {
- object STATUS {
- val OK: Int = 0
- val FAIL: Int = -1
- }
- case class POSResponse (sts: Int, msg: String, voucher: VchStates, txnItems: List[TxnItem])
- }
client/DataAccess.scala
- package cloud.pos.client
- import java.time.LocalDate
- import java.time.format.DateTimeFormatter
- case class Item(
- brd: String
- ,dpt: String
- ,cat: String
- ,code: String
- ,name: String
- ,price: Int
- )
- object Items {
- val apple = Item("01","02","01","001", "green apple", 820)
- val grape = Item("01","02","01","002", "red grape", 1050)
- val orage = Item("01","02","01","003", "sunkist orage", 350)
- val banana = Item("01","02","01","004", "demon banana", 300)
- val pineapple = Item("01","02","01","005", "hainan pineapple", 1300)
- val peach = Item("01","02","01","006", "xinjiang peach", 2390)
- val tblItems = List(apple, grape, orage, banana, pineapple, peach)
- sealed trait QueryItemsResult {}
- case class QueryItemsOK(items: List[Item]) extends QueryItemsResult
- case class QueryItemsFail(msg: String) extends QueryItemsResult
- }
- object Codes {
- case class User(code: String, name: String, passwd: String)
- case class Department(code: String, name: String)
- case class Category(code: String, name: String)
- case class Brand(code: String, name: String)
- case class Ra(code: String, name: String)
- case class Account(code: String, name: String)
- case class Disc(code: String, best: Boolean, aggr: Boolean, group: Boolean)
- val ras = List(Ra("01","Delivery"),Ra("02","Cooking"))
- val dpts = List(Department("01","Fruit"),Department("02","Grocery"))
- val cats = List(Category("0101","Fresh Fruit"),Category("0201","Dry Grocery"))
- val brds = List(Brand("01","Sunkist"),Brand("02","Demon"))
- val accts = List(Account("001","Cash"),Account("002","Value Card"), Account("003", "Visa")
- ,Account("004","Alipay"),Account("005","WXPay"))
- val users = List(User("1001","Tiger", "123"),User("1002","John", "123"),User("1003","Maria", "123"))
- def getDpt(code: String) = dpts.find(d => d.code == code)
- def getCat(code: String) = cats.find(d => d.code == code)
- def getBrd(code: String) = brds.find(b => b.code == code)
- def getAcct(code: String) = accts.find(a => a.code == code)
- def getRa(code: String) = ras.find(a => a.code == code)
- }
- object DAO {
- import Items._
- import Codes._
- def getItem(code: String): QueryItemsResult = {
- val optItem = tblItems.find(it => it.code == code)
- optItem match {
- case Some(item) => QueryItemsOK(List(item))
- case None => QueryItemsFail("Invalid item code!")
- }
- }
- def validateDpt(code: String) = dpts.find(d => d.code == code)
- def validateCat(code: String) = cats.find(d => d.code == code)
- def validateBrd(code: String) = brds.find(b => b.code == code)
- def validateRa(code: String) = ras.find(ac => ac.code == code)
- def validateAcct(code: String) = accts.find(ac => ac.code == code)
- def validateUser(userid: String, passwd: String) = users.find(u => (u.code == userid && u.passwd == passwd))
- def lastSecOfDateStr(ldate: LocalDate): String = {
- ldate.format(DateTimeFormatter.ofPattern( "yyyy-MM-dd"))+" 23:59:59"
- }
- }
logging/Log.scala
- package sdp.logging
- import org.slf4j.Logger
- /**
- * Logger which just wraps org.slf4j.Logger internally.
- *
- * @param logger logger
- */
- class Log(logger: Logger) {
- // use var consciously to enable squeezing later
- var isDebugEnabled: Boolean = logger.isDebugEnabled
- var isInfoEnabled: Boolean = logger.isInfoEnabled
- var isWarnEnabled: Boolean = logger.isWarnEnabled
- var isErrorEnabled: Boolean = logger.isErrorEnabled
- def withLevel(level: Symbol)(msg: => String, e: Throwable = null): Unit = {
- level match {
- case 'debug | 'DEBUG => debug(msg)
- case 'info | 'INFO => info(msg)
- case 'warn | 'WARN => warn(msg)
- case 'error | 'ERROR => error(msg)
- case _ => // nothing to do
- }
- }
- def debug(msg: => String): Unit = {
- if (isDebugEnabled && logger.isDebugEnabled) {
- logger.debug(msg)
- }
- }
- def debug(msg: => String, e: Throwable): Unit = {
- if (isDebugEnabled && logger.isDebugEnabled) {
- logger.debug(msg, e)
- }
- }
- def info(msg: => String): Unit = {
- if (isInfoEnabled && logger.isInfoEnabled) {
- logger.info(msg)
- }
- }
- def info(msg: => String, e: Throwable): Unit = {
- if (isInfoEnabled && logger.isInfoEnabled) {
- logger.info(msg, e)
- }
- }
- def warn(msg: => String): Unit = {
- if (isWarnEnabled && logger.isWarnEnabled) {
- logger.warn(msg)
- }
- }
- def warn(msg: => String, e: Throwable): Unit = {
- if (isWarnEnabled && logger.isWarnEnabled) {
- logger.warn(msg, e)
- }
- }
- def error(msg: => String): Unit = {
- if (isErrorEnabled && logger.isErrorEnabled) {
- logger.error(msg)
- }
- }
- def error(msg: => String, e: Throwable): Unit = {
- if (isErrorEnabled && logger.isErrorEnabled) {
- logger.error(msg, e)
- }
- }
- }
logging/LogSupport.scala
- package sdp.logging
- import org.slf4j.LoggerFactory
- trait LogSupport {
- /**
- * Logger
- */
- protected val log = new Log(LoggerFactory.getLogger(this.getClass))
- }
logging/ClusterMonitor.scala
- package sdp.cluster.monitor
- import akka.actor._
- import akka.cluster.ClusterEvent._
- import akka.cluster._
- import sdp.logging.LogSupport
- object ClusterMonitor {
- def props = Props(new ClusterMonitor())
- }
- class ClusterMonitor extends Actor with LogSupport {
- val cluster = Cluster(context.system)
- override def preStart(): Unit = {
- cluster.subscribe(self,initialStateMode = InitialStateAsEvents
- ,classOf[MemberEvent],classOf[UnreachableMember]) //订阅集群状态转换信息
- super.preStart()
- }
- override def postStop(): Unit = {
- cluster.unsubscribe(self) //取消订阅
- super.postStop()
- }
- override def receive: Receive = {
- case MemberJoined(member) =>
- log.info(s"Member is Joining: {${member.address}}")
- case MemberUp(member) =>
- log.info(s"Member is Up: {${member.address}}")
- case MemberLeft(member) =>
- log.info(s"Member is Leaving: {${member.address}}")
- case MemberExited(member) =>
- log.info(s"Member is Exiting: {${member.address}}")
- case MemberRemoved(member, previousStatus) =>
- log.info(
- s"Member is Removed: {${member.address}} after {${previousStatus}")
- case UnreachableMember(member) =>
- log.info(s"Member detected as unreachable: {${member.address}}")
- cluster.down(member.address) //手工驱除,不用auto-down
- case _: MemberEvent => // ignore
- }
- }