Akka-Cluster (4) - DistributedData: distributed data types
Source: cnblogs  Author: 雪川大虫  Date: 2018/12/24 10:35:17

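In real applications, sharing some data across a cluster is unavoidable. By this I mean data that any node can read and write concurrently; the hard part is resolving conflicting updates. A distributed database could provide this, but the cost of running and maintaining one is often too high to justify. Distributed Data (akka-distributed-data, or ddata) is designed for exactly this situation. Akka provides a set of CRDTs (Conflict-Free Replicated Data Types) and the machinery to manage them, so that data can be shared across the cluster without conflicts.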

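Akka's data sharing is implemented by the replicator. The replicator is an actor. Once a replicator is running on every node of the cluster, replicators with the same actor path (ignoring the address part) communicate with each other through a gossip protocol, as if joined into a single replicator network. The replicator provides an API for resolving update conflicts and synchronizing data. The shared data structures are created inside each node's replicator. To update data, the program on a node sends its local replicator a message that carries the key identifying the shared data and a function that modifies it. The replicator applies the update locally and propagates it to the replicators on the other nodes via gossip, resolving any conflicts that arise during synchronization. Because the data lives inside the replicator, reading a value likewise means sending a read message to the local replicator.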

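As an actor, the replicator can be started by declaring the akka.cluster.ddata.DistributedData extension in the .conf file, or it can be built directly from Replicator.props. I personally find building the actor directly more flexible. It also makes it possible to run several replicators on one node, because replicators on different nodes are grouped by actor path. The following builds a replicator with Replicator.props: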

    val replicator = system.actorOf(Replicator.props(
      ReplicatorSettings(system).withGossipInterval(1.second)), "replicator")

Alternatively, register the extension under akka.extensions in the configuration file and obtain the replicator from it:

    akka {
      extensions = ["akka.cluster.ddata.DistributedData"]
      ...
    }

    val replicator = DistributedData(context.system).replicator

A CRDT is stored as a key/value pair: a typed key identifies the replicated value. CRDT values come mainly in Counter, Flag, Set and Map varieties, including:

    /**
     * Implements a boolean flag CRDT that is initialized to `false` and
     * can be switched to `true`. `true` wins over `false` in merge.
     *
     * This class is immutable, i.e. "modifying" methods return a new instance.
     */
    final case class Flag(enabled: Boolean)
    final case class FlagKey(_id: String)

    /**
     * Implements a 'Growing Counter' CRDT, also called a 'G-Counter'.
     * A G-Counter is a increment-only counter (inspired by vector clocks) in
     * which only increment and merge are possible. Incrementing the counter
     * adds 1 to the count for the current node. Divergent histories are
     * resolved by taking the maximum count for each node (like a vector
     * clock merge). The value of the counter is the sum of all node counts.
     *
     * This class is immutable, i.e. "modifying" methods return a new instance.
     */
    final class GCounter
    final case class GCounterKey(_id: String)

    /**
     * Implements a 'Increment/Decrement Counter' CRDT, also called a 'PN-Counter'.
     * PN-Counters allow the counter to be incremented by tracking the
     * increments (P) separate from the decrements (N). Both P and N are represented
     * as two internal [[GCounter]]s. Merge is handled by merging the internal P and N
     * counters. The value of the counter is the value of the P counter minus
     * the value of the N counter.
     *
     * This class is immutable, i.e. "modifying" methods return a new instance.
     */
    final class PNCounter
    final case class PNCounterKey(_id: String)

    /**
     * Implements a 'Add Set' CRDT, also called a 'G-Set'. You can't
     * remove elements of a G-Set.
     * A G-Set doesn't accumulate any garbage apart from the elements themselves.
     * This class is immutable, i.e. "modifying" methods return a new instance.
     */
    final case class GSet[A]
    final case class GSetKey[A](_id: String)

    /**
     * Implements a 'Observed Remove Set' CRDT, also called a 'OR-Set'.
     * Elements can be added and removed any number of times. Concurrent add wins
     * over remove.
     *
     * The ORSet has a version vector that is incremented when an element is added to
     * the set. The `node -> count` pair for that increment is stored against the
     * element as its "birth dot". Every time the element is re-added to the set,
     * its "birth dot" is updated to that of the `node -> count` version vector entry
     * resulting from the add. When an element is removed, we simply drop it, no tombstones.
     *
     * When an element exists in replica A and not replica B, is it because A added
     * it and B has not yet seen that, or that B removed it and A has not yet seen that?
     * In this implementation we compare the `dot` of the present element to the version vector
     * in the Set it is absent from. If the element dot is not "seen" by the Set version vector,
     * that means the other set has yet to see this add, and the item is in the merged
     * Set. If the Set version vector dominates the dot, that means the other Set has removed this
     * element already, and the item is not in the merged Set.
     *
     * This class is immutable, i.e. "modifying" methods return a new instance.
     */
    final class ORSet[A]
    final case class ORSetKey[A](_id: String)

    /**
     * Implements a 'Observed Remove Map' CRDT, also called a 'OR-Map'.
     *
     * It has similar semantics as an [[ORSet]], but in case of concurrent updates
     * the values are merged, and must therefore be [[ReplicatedData]] types themselves.
     *
     * This class is immutable, i.e. "modifying" methods return a new instance.
     */
    final class ORMap[A, B <: ReplicatedData]
    final case class ORMapKey[A, B <: ReplicatedData](_id: String)

    /**
     * An immutable multi-map implementation. This class wraps an
     * [[ORMap]] with an [[ORSet]] for the map's value.
     *
     * This class is immutable, i.e. "modifying" methods return a new instance.
     */
    final class ORMultiMap[A, B]
    final case class ORMultiMapKey[A, B](_id: String)

    /**
     * Map of named counters. Specialized [[ORMap]] with [[PNCounter]] values.
     *
     * This class is immutable, i.e. "modifying" methods return a new instance.
     */
    final class PNCounterMap[A]
    final case class PNCounterMapKey[A](_id: String)
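
The merge rules in the GCounter and PNCounter comments can be made concrete with a small sketch. This is plain Scala and does not use Akka; the names CounterSketch, GCounterSketch and PNCounterSketch are made up for illustration, and nodes are identified by plain strings instead of cluster addresses.

```scala
object CounterSketch {
  // G-Counter: one count per node; the value is the sum of all node counts.
  final case class GCounterSketch(counts: Map[String, Long] = Map.empty[String, Long]) {
    def increment(node: String, n: Long = 1L): GCounterSketch = {
      require(n >= 0, "a G-Counter can only grow")
      GCounterSketch(counts.updated(node, counts.getOrElse(node, 0L) + n))
    }

    def value: Long = counts.values.sum

    // Merge keeps the maximum count seen for each node.
    def merge(that: GCounterSketch): GCounterSketch =
      GCounterSketch((counts.keySet ++ that.counts.keySet).toSeq.map { node =>
        node -> math.max(counts.getOrElse(node, 0L), that.counts.getOrElse(node, 0L))
      }.toMap)
  }

  // PN-Counter: increments (P) and decrements (N) are two separate G-Counters.
  final case class PNCounterSketch(
      p: GCounterSketch = GCounterSketch(),
      n: GCounterSketch = GCounterSketch()) {
    def increment(node: String, by: Long = 1L): PNCounterSketch = copy(p = p.increment(node, by))
    def decrement(node: String, by: Long = 1L): PNCounterSketch = copy(n = n.increment(node, by))
    def value: Long = p.value - n.value
    def merge(that: PNCounterSketch): PNCounterSketch =
      PNCounterSketch(p.merge(that.p), n.merge(that.n))
  }

  def main(args: Array[String]): Unit = {
    val onA = GCounterSketch().increment("nodeA").increment("nodeA") // nodeA -> 2
    val onB = GCounterSketch().increment("nodeB", 3)                 // nodeB -> 3
    val merged = onA.merge(onB)
    println(merged.value)            // 5
    println(merged.merge(onA).value) // 5: merging the same state again changes nothing
  }
}
```

Because the merge takes a per-node maximum, it gives the same result regardless of the order or number of times replicas exchange state, which is what lets gossip deliver updates without coordination.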

In summary, the ready-made CRDT types that Akka provides are:

Counters: GCounter, PNCounter
Sets: GSet, ORSet
Maps: ORMap, ORMultiMap, LWWMap, PNCounterMap
Registers: LWWRegister, Flag
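
The "birth dot" rule from the ORSet comment is easier to follow in code. The following is a simplified sketch in plain Scala, not Akka's implementation: SimpleORSet is a hypothetical name, nodes are plain strings, and each element simply keeps a set of (node, count) dots.

```scala
object ORSetSketch {
  type Node = String
  type Dot = (Node, Long) // (node, count): the "birth dot" of an add

  final case class SimpleORSet[A](
      dots: Map[A, Set[Dot]] = Map.empty[A, Set[Dot]],
      versions: Map[Node, Long] = Map.empty[Node, Long]) {

    def elements: Set[A] = dots.keySet

    // Adding bumps this node's version and records it as the element's dot.
    def add(node: Node, elem: A): SimpleORSet[A] = {
      val count = versions.getOrElse(node, 0L) + 1
      SimpleORSet(dots.updated(elem, Set((node, count))), versions.updated(node, count))
    }

    // Removing just drops the element; the version vector still remembers the add.
    def remove(elem: A): SimpleORSet[A] = copy(dots = dots - elem)

    private def hasSeen(dot: Dot): Boolean = versions.getOrElse(dot._1, 0L) >= dot._2

    def merge(that: SimpleORSet[A]): SimpleORSet[A] = {
      val mergedDots = (dots.keySet ++ that.dots.keySet).toSeq.map { elem =>
        val mine = dots.getOrElse(elem, Set.empty[Dot])
        val theirs = that.dots.getOrElse(elem, Set.empty[Dot])
        // Keep dots both sides hold, plus dots the other side has not seen yet.
        // A dot the other side has seen but no longer holds was removed there.
        val kept = (mine & theirs) ++
          (mine -- theirs).filterNot(d => that.hasSeen(d)) ++
          (theirs -- mine).filterNot(d => this.hasSeen(d))
        elem -> kept
      }.filter { case (_, kept) => kept.nonEmpty }.toMap

      val mergedVersions = (versions.keySet ++ that.versions.keySet).toSeq.map { node =>
        node -> math.max(versions.getOrElse(node, 0L), that.versions.getOrElse(node, 0L))
      }.toMap

      SimpleORSet(mergedDots, mergedVersions)
    }
  }
}
```

For example, if node A adds "x", node B learns of it and removes it, and A concurrently re-adds "x", then merging the two states keeps "x": the re-add carries a dot that B's version vector has not seen. Without the re-add, the merge drops "x" because B has already seen the only dot.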

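The results of CRDT operations can also be received by subscription. A user sends a Subscribe message to the replicator to subscribe to changes of the data stored under a Key[A]: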

    /**
     * Register a subscriber that will be notified with a [[Changed]] message
     * when the value of the given `key` is changed. Current value is also
     * sent as a [[Changed]] message to a new subscriber.
     *
     * Subscribers will be notified periodically with the configured `notify-subscribers-interval`,
     * and it is also possible to send an explicit `FlushChanges` message to
     * the `Replicator` to notify the subscribers immediately.
     *
     * The subscriber will automatically be unregistered if it is terminated.
     *
     * If the key is deleted the subscriber is notified with a [[Deleted]]
     * message.
     */
    final case class Subscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef) extends ReplicatorMessage

    /**
     * Unregister a subscriber.
     *
     * @see [[Replicator.Subscribe]]
     */
    final case class Unsubscribe[A <: ReplicatedData](key: Key[A], subscriber: ActorRef) extends ReplicatorMessage

    /**
     * The data value is retrieved with [[#get]] using the typed key.
     *
     * @see [[Replicator.Subscribe]]
     */
    final case class Changed[A <: ReplicatedData](key: Key[A])(data: A) extends ReplicatorMessage {
      /**
       * The data value, with correct type.
       * Scala pattern matching cannot infer the type from the `key` parameter.
       */
      def get[T <: ReplicatedData](key: Key[T]): T = {
        require(key == this.key, "wrong key used, must use contained key")
        data.asInstanceOf[T]
      }

      /**
       * The data value. Use [[#get]] to get the fully typed value.
       */
      def dataValue: A = data
    }

    final case class Deleted[A <: ReplicatedData](key: Key[A]) extends NoSerializationVerificationNeeded {
      override def toString: String = s"Deleted [$key]"
    }

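After completing an operation, the replicator notifies the subscribers of Key[A] with Changed or Deleted messages.

Distributed data is read and written by sending messages to the local replicator. The read/write messages are Update, Get and Delete. Use Get to read data; alternatively, subscribe to the CRDT's Changed and Deleted notifications.

What gives a CRDT its replicated, conflict-free behaviour is the way the replicator handles the Update message. Update is defined as follows: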

    final case class Update[A <: ReplicatedData](key: Key[A], writeConsistency: WriteConsistency,
                                                  request: Option[Any])(val modify: Option[A] => A)
      extends Command[A] with NoSerializationVerificationNeeded {...}

    // From the Update companion object:
    def apply[A <: ReplicatedData](
      key: Key[A], initial: A, writeConsistency: WriteConsistency,
      request: Option[Any] = None)(modify: A => A): Update[A] =
      Update(key, writeConsistency, request)(modifyWithInitial(initial, modify))

    private def modifyWithInitial[A <: ReplicatedData](initial: A, modify: A => A): Option[A] => A = {
      case Some(data) => modify(data)
      case None       => modify(initial)
    }

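We can see that Update carries the data key Key[A] and a function modify: Option[A] => A. The replicator uses this modify function to transform the CRDT value A. The apply constructor additionally takes an initial value of type A, which is used the first time the data is referenced, as modifyWithInitial and its use inside apply show. Here is how the Update message is used: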

    val timeout = 3.seconds.dilated

    val KeyA = GCounterKey("A")
    val KeyB = ORSetKey[String]("B")
    val KeyC = PNCounterMapKey[String]("C")
    val KeyD = ORMultiMapKey[String, String]("D")
    val KeyE = ORMapKey[String, GSet[String]]("E")

    replicator ! Update(KeyA, GCounter(), WriteAll(timeout))(_ + 3)
    replicator ! Update(KeyB, ORSet(), WriteAll(timeout))(_ + "a" + "b" + "c")
    replicator ! Update(KeyC, PNCounterMap.empty[String], WriteAll(timeout)) { _ increment "x" increment "y" }
    replicator ! Update(KeyD, ORMultiMap.empty[String, String], WriteAll(timeout)) { _ + ("a" -> Set("A")) }
    replicator ! Update(KeyE, ORMap.empty[String, GSet[String]], WriteAll(timeout)) { _ + ("a" -> GSet.empty[String].add("A")) }
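
How the initial value and the modify function combine can be tried out without a cluster. The sketch below copies the shape of modifyWithInitial and applies it to an ordinary Map that stands in for the replicator's local store; applyUpdate and the Map-based store are assumptions made for illustration, not part of Akka.

```scala
object UpdateSketch {
  // Same shape as the helper in the Update companion object.
  def modifyWithInitial[A](initial: A, modify: A => A): Option[A] => A = {
    case Some(data) => modify(data)
    case None       => modify(initial)
  }

  // Hypothetical stand-in for the replicator's local store: key id -> value.
  def applyUpdate[A](store: Map[String, A], id: String)(modify: Option[A] => A): Map[String, A] =
    store.updated(id, modify(store.get(id)))

  def main(args: Array[String]): Unit = {
    val addOne = modifyWithInitial[Long](0L, _ + 1)
    val first = applyUpdate(Map.empty[String, Long], "counter")(addOne)
    val second = applyUpdate(first, "counter")(addOne)
    println(first)  // Map(counter -> 1): no value yet, so modify ran on the initial 0
    println(second) // Map(counter -> 2): modify ran on the stored value
  }
}
```

Note that modify is applied to the initial value as well, so the first Update already produces the modified result rather than the bare initial value.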

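Because CRDT reads and writes are performed by sending messages, their results also come back as messages. The reply to a read carries the resulting data. These are the reply message types for reads and writes: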

    /*------------------UPDATE STATE MESSAGES -----------*/
    final case class UpdateSuccess[A <: ReplicatedData](key: Key[A], request: Option[Any])
      extends UpdateResponse[A] with DeadLetterSuppression

    sealed abstract class UpdateFailure[A <: ReplicatedData] extends UpdateResponse[A]

    /**
     * The direct replication of the [[Update]] could not be fulfill according to
     * the given [[WriteConsistency consistency level]] and
     * [[WriteConsistency#timeout timeout]].
     *
     * The `Update` was still performed locally and possibly replicated to some nodes.
     * It will eventually be disseminated to other replicas, unless the local replica
     * crashes before it has been able to communicate with other replicas.
     */
    final case class UpdateTimeout[A <: ReplicatedData](key: Key[A], request: Option[Any]) extends UpdateFailure[A]

    /**
     * If the `modify` function of the [[Update]] throws an exception the reply message
     * will be this `ModifyFailure` message. The original exception is included as `cause`.
     */
    final case class ModifyFailure[A <: ReplicatedData](key: Key[A], errorMessage: String, cause: Throwable, request: Option[Any])
      extends UpdateFailure[A] {
      override def toString: String = s"ModifyFailure [$key]: $errorMessage"
    }

    /**
     * The local store or direct replication of the [[Update]] could not be fulfill according to
     * the given [[WriteConsistency consistency level]] due to durable store errors. This is
     * only used for entries that have been configured to be durable.
     *
     * The `Update` was still performed in memory locally and possibly replicated to some nodes,
     * but it might not have been written to durable storage.
     * It will eventually be disseminated to other replicas, unless the local replica
     * crashes before it has been able to communicate with other replicas.
     */
    final case class StoreFailure[A <: ReplicatedData](key: Key[A], request: Option[Any])
      extends UpdateFailure[A] with DeleteResponse[A]

    /* ---------------- GET MESSAGES --------*/
    /**
     * Reply from `Get`. The data value is retrieved with [[#get]] using the typed key.
     */
    final case class GetSuccess[A <: ReplicatedData](key: Key[A], request: Option[Any])(data: A)
      extends GetResponse[A] with ReplicatorMessage {
      /**
       * The data value, with correct type.
       * Scala pattern matching cannot infer the type from the `key` parameter.
       */
      def get[T <: ReplicatedData](key: Key[T]): T = {
        require(key == this.key, "wrong key used, must use contained key")
        data.asInstanceOf[T]
      }

      /**
       * The data value. Use [[#get]] to get the fully typed value.
       */
      def dataValue: A = data
    }

    final case class NotFound[A <: ReplicatedData](key: Key[A], request: Option[Any])
      extends GetResponse[A] with ReplicatorMessage

    /*----------------DELETE MESSAGES ---------*/
    final case class DeleteSuccess[A <: ReplicatedData](key: Key[A], request: Option[Any]) extends DeleteResponse[A]

    final case class ReplicationDeleteFailure[A <: ReplicatedData](key: Key[A], request: Option[Any]) extends DeleteResponse[A]

    final case class DataDeleted[A <: ReplicatedData](key: Key[A], request: Option[Any])
      extends RuntimeException with NoStackTrace with DeleteResponse[A] {
      override def toString: String = s"DataDeleted [$key]"
    }

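The read reply defines def dataValue: A for obtaining the data, and the typed method get(Key[A]) for retrieving it with the correct type for a given key. Here are some read examples: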

    val replicator = DistributedData(system).replicator
    val Counter1Key = PNCounterKey("counter1")
    val Set1Key = GSetKey[String]("set1")
    val Set2Key = ORSetKey[String]("set2")
    val ActiveFlagKey = FlagKey("active")

    replicator ! Get(Counter1Key, ReadLocal)

    val readFrom3 = ReadFrom(n = 3, timeout = 1.second)
    replicator ! Get(Set1Key, readFrom3)

    val readMajority = ReadMajority(timeout = 5.seconds)
    replicator ! Get(Set2Key, readMajority)

    val readAll = ReadAll(timeout = 5.seconds)
    replicator ! Get(ActiveFlagKey, readAll)

    // The replies are handled in the actor's receive block:
    case g @ GetSuccess(Counter1Key, req) =>
      val value = g.get(Counter1Key).value
    case NotFound(Counter1Key, req) => // key counter1 does not exist
    ...
    case g @ GetSuccess(Set1Key, req) =>
      val elements = g.get(Set1Key).elements
    case GetFailure(Set1Key, req) =>
      // read from 3 nodes failed within 1.second
    case NotFound(Set1Key, req) => // key set1 does not exist

    /*---- return get result to user (sender()) ----*/
    case "get-count" =>
      // incoming request to retrieve current value of the counter
      // (readTwo is not defined in this excerpt; it is assumed to be a read consistency such as ReadFrom(n = 2, ...))
      replicator ! Get(Counter1Key, readTwo, request = Some(sender()))
    case g @ GetSuccess(Counter1Key, Some(replyTo: ActorRef)) =>
      val value = g.get(Counter1Key).value.longValue
      replyTo ! value
    case GetFailure(Counter1Key, Some(replyTo: ActorRef)) =>
      replyTo ! -1L
    case NotFound(Counter1Key, Some(replyTo: ActorRef)) =>
      replyTo ! 0L
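
The read consistency levels used above differ in how many replicas must answer before the replicator replies. As a rough sketch of that difference, the following plain-Scala function maps each level to a replica count, using the N/2 + 1 rule that the Akka documentation gives for a majority. The types Local, From, Majority and All are hypothetical stand-ins for ReadLocal, ReadFrom, ReadMajority and ReadAll, and the sketch ignores timeouts and any extra tuning options of the real classes.

```scala
object ConsistencySketch {
  // Hypothetical stand-ins for ReadLocal, ReadFrom(n), ReadMajority and ReadAll.
  sealed trait Level
  case object Local extends Level
  final case class From(n: Int) extends Level
  case object Majority extends Level
  case object All extends Level

  // Number of replicas (including the local one) that must contribute to the read.
  def replicasNeeded(level: Level, clusterSize: Int): Int = level match {
    case Local    => 1
    case From(n)  => n
    case Majority => clusterSize / 2 + 1
    case All      => clusterSize
  }

  def main(args: Array[String]): Unit = {
    // The demo cluster in this article has three nodes.
    println(replicasNeeded(Local, 3))    // 1
    println(replicasNeeded(Majority, 3)) // 2
    println(replicasNeeded(All, 3))      // 3
  }
}
```

The stricter the level, the more likely a read is to time out and produce GetFailure when a node is slow or unreachable, which is why the examples above handle that reply for every level except ReadLocal.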

The following shows how to receive change notifications through subscription:

    replicator ! Subscribe(DataKey, self)
    ...
    case c @ Changed(DataKey) =>
      val data = c.get(DataKey)
      log.info("Current elements: {}", data.elements)

Below is an example that demonstrates reading, writing and monitoring several kinds of CRDT data:

    object DDataUpdator {
      case object IncCounter
      case class AddToSet(item: String)
      case class AddToMap(item: String)
      case object ReadSet
      case object ReadMap
      case object ShutDownDData

      val KeyCounter = GCounterKey("counter")
      val KeySet = ORSetKey[String]("gset")
      val KeyMap = ORMultiMapKey[Long, String]("ormap")

      val timeout = 300.millis
      val writeAll = WriteAll(timeout)
      val readAll = ReadAll(timeout)

      // Starts an ActorSystem on the given port and creates one updator actor in it.
      def create(port: Int): ActorRef = {
        val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port = $port")
          .withFallback(ConfigFactory.load())
        val system = ActorSystem("DDataSystem", config)
        system.actorOf(Props[DDataUpdator], s"updator-$port")
      }
    }

    class DDataUpdator extends Actor with ActorLogging {
      import DDataUpdator._

      // The CRDT update operators (+, increment, ...) need an implicit Cluster.
      implicit val cluster = Cluster(context.system)
      val replicator = DistributedData(context.system).replicator

      replicator ! Subscribe(KeyCounter, self)
      replicator ! Subscribe(KeySet, self)
      replicator ! Subscribe(KeyMap, self)

      override def receive: Receive = {
        // ---- counter ----
        case IncCounter =>
          log.info(s"******* Incrementing counter... *****")
          replicator ! Update(KeyCounter, GCounter(), writeAll)(_ + 1)
        case UpdateSuccess(KeyCounter, _) =>
          log.info(s"********** Counter updated successfully ********")
        case UpdateTimeout(KeyCounter, _) =>
          log.info(s"******* Counter update timed out! *****")
        case ModifyFailure(KeyCounter, msg, err, _) =>
          log.info(s"******* Counter update failed with error: ${msg} *****")
        case StoreFailure(KeyCounter, _) =>
          log.info(s"******* Counter value store failed! *****")
        case c @ Changed(KeyCounter) =>
          val data = c.get(KeyCounter)
          log.info("********Current count: {}*******", data.getValue)

        // ---- set ----
        case AddToSet(item) =>
          replicator ! Update(KeySet, ORSet.empty[String], writeAll)(_ + item)
        case UpdateSuccess(KeySet, _) =>
          log.info(s"**********Add to ORSet successfully ********")
        case UpdateTimeout(KeySet, _) =>
          log.info(s"******* Add to ORSet timed out! *****")
        case ModifyFailure(KeySet, msg, err, _) =>
          log.info(s"******* Add to ORSet failed with error: ${msg} *****")
        case StoreFailure(KeySet, _) =>
          log.info(s"******* ORSet items store failed! *****")
        case c @ Changed(KeySet) =>
          val data = c.get(KeySet)
          log.info("********Items in ORSet: {}*******", data.elements)
        case ReadSet =>
          replicator ! Get(KeySet, readAll)
        case g @ GetSuccess(KeySet, req) =>
          val value = g.get(KeySet)
          log.info("********Current items read in ORSet: {}*******", value.elements)
        case NotFound(KeySet, req) =>
          log.info("******No item found in ORSet!!!*******")

        // ---- map: read the counter first, then use its value as the map key ----
        case AddToMap(item) =>
          replicator ! Get(KeyCounter, readAll, Some(AddToMap(item)))
        case g @ GetSuccess(KeyCounter, Some(AddToMap(item))) =>
          val idx: Long = g.get(KeyCounter).getValue.longValue()
          log.info(s"*********** got counter=${idx} with item: $item ************")
          replicator ! Update(KeyMap, ORMultiMap.empty[Long, String], writeAll)(_ + (idx -> Set(item)))
          replicator ! Update(KeyCounter, GCounter(), writeAll)(_ + 1)
        case c @ Changed(KeyMap) =>
          val data = c.get(KeyMap).entries
          log.info("******** Items in ORMultiMap: {}*******", data)
        case ReadMap =>
          replicator ! Get(KeyMap, readAll)
        case g @ GetSuccess(KeyMap, req) =>
          val value = g.get(KeyMap)
          log.info("********Current items read in ORMultiMap: {}*******", value.entries)
        case NotFound(KeyMap, req) =>
          log.info("****** No item found in ORMultiMap!!! *******")

        case ShutDownDData => context.system.terminate()
      }
    }

This example shows the common operations on each kind of CRDT data. Now let's test it:

    object DDataDemo extends App {
      import DDataUpdator._

      val ud1 = create(2551)
      val ud2 = create(2552)
      val ud3 = create(2553)

      scala.io.StdIn.readLine()
      ud1 ! IncCounter
      ud2 ! AddToSet("Apple")
      ud1 ! AddToSet("Orange")

      scala.io.StdIn.readLine()
      ud2 ! IncCounter
      ud2 ! AddToSet("Pineapple")
      ud1 ! IncCounter
      ud1 ! AddToMap("Cat")

      scala.io.StdIn.readLine()
      ud1 ! AddToMap("Dog")
      ud2 ! AddToMap("Tiger")

      scala.io.StdIn.readLine()
      ud3 ! ReadSet
      ud3 ! ReadMap

      scala.io.StdIn.readLine()
      ud1 ! ShutDownDData
      ud2 ! ShutDownDData
      ud3 ! ShutDownDData
    }

The output is as follows:

    [INFO] [12/24/2018 08:33:40.500] [DDataSystem-akka.actor.default-dispatcher-16] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ******* Incrementing counter... *****
    [INFO] [12/24/2018 08:33:40.585] [DDataSystem-akka.actor.default-dispatcher-26] [akka.tcp://DDataSystem@localhost:2552/user/updator-2552] **********Add to ORSet successfully ********
    [INFO] [12/24/2018 08:33:40.585] [DDataSystem-akka.actor.default-dispatcher-21] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ********** Counter updated successfully ********
    [INFO] [12/24/2018 08:33:40.585] [DDataSystem-akka.actor.default-dispatcher-21] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] **********Add to ORSet successfully ********
    [INFO] [12/24/2018 08:33:40.726] [DDataSystem-akka.actor.default-dispatcher-19] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ********Current count: 1*******
    [INFO] [12/24/2018 08:33:40.726] [DDataSystem-akka.actor.default-dispatcher-19] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ********Items in ORSet: Set(Orange, Apple)*******
    [INFO] [12/24/2018 08:33:40.775] [DDataSystem-akka.actor.default-dispatcher-21] [akka.tcp://DDataSystem@localhost:2552/user/updator-2552] ********Items in ORSet: Set(Apple, Orange)*******
    [INFO] [12/24/2018 08:33:40.775] [DDataSystem-akka.actor.default-dispatcher-21] [akka.tcp://DDataSystem@localhost:2552/user/updator-2552] ********Current count: 1*******
    [INFO] [12/24/2018 08:33:40.829] [DDataSystem-akka.actor.default-dispatcher-23] [akka.tcp://DDataSystem@localhost:2553/user/updator-2553] ********Current count: 1*******
    [INFO] [12/24/2018 08:33:40.829] [DDataSystem-akka.actor.default-dispatcher-23] [akka.tcp://DDataSystem@localhost:2553/user/updator-2553] ********Items in ORSet: Set(Apple, Orange)*******
    [INFO] [12/24/2018 08:34:19.707] [DDataSystem-akka.actor.default-dispatcher-23] [akka.tcp://DDataSystem@localhost:2552/user/updator-2552] ******* Incrementing counter... *****
    [INFO] [12/24/2018 08:34:19.707] [DDataSystem-akka.actor.default-dispatcher-19] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ******* Incrementing counter... *****
    [INFO] [12/24/2018 08:34:19.710] [DDataSystem-akka.actor.default-dispatcher-23] [akka.tcp://DDataSystem@localhost:2552/user/updator-2552] ********** Counter updated successfully ********
    [INFO] [12/24/2018 08:34:19.711] [DDataSystem-akka.actor.default-dispatcher-19] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ********** Counter updated successfully ********
    [INFO] [12/24/2018 08:34:19.712] [DDataSystem-akka.actor.default-dispatcher-28] [akka.tcp://DDataSystem@localhost:2552/user/updator-2552] **********Add to ORSet successfully ********
    [INFO] [12/24/2018 08:34:19.723] [DDataSystem-akka.actor.default-dispatcher-17] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ********Current count: 3*******
    [INFO] [12/24/2018 08:34:19.723] [DDataSystem-akka.actor.default-dispatcher-17] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ********Items in ORSet: Set(Orange, Apple, Pineapple)*******
    [INFO] [12/24/2018 08:34:19.733] [DDataSystem-akka.actor.default-dispatcher-19] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] *********** got counter=3 with item: Cat ************
    [INFO] [12/24/2018 08:34:19.767] [DDataSystem-akka.actor.default-dispatcher-21] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ********** Counter updated successfully ********
    [INFO] [12/24/2018 08:34:19.772] [DDataSystem-akka.actor.default-dispatcher-19] [akka.tcp://DDataSystem@localhost:2552/user/updator-2552] ********Current count: 4*******
    [INFO] [12/24/2018 08:34:19.773] [DDataSystem-akka.actor.default-dispatcher-19] [akka.tcp://DDataSystem@localhost:2552/user/updator-2552] ********Items in ORSet: Set(Apple, Orange, Pineapple)*******
    [INFO] [12/24/2018 08:34:19.774] [DDataSystem-akka.actor.default-dispatcher-19] [akka.tcp://DDataSystem@localhost:2552/user/updator-2552] ******** Items in ORMultiMap: Map(3 -> Set(Cat))*******
    [INFO] [12/24/2018 08:34:19.828] [DDataSystem-akka.actor.default-dispatcher-17] [akka.tcp://DDataSystem@localhost:2553/user/updator-2553] ********Current count: 4*******
    [INFO] [12/24/2018 08:34:19.828] [DDataSystem-akka.actor.default-dispatcher-17] [akka.tcp://DDataSystem@localhost:2553/user/updator-2553] ********Items in ORSet: Set(Apple, Orange, Pineapple)*******
    [INFO] [12/24/2018 08:34:19.828] [DDataSystem-akka.actor.default-dispatcher-17] [akka.tcp://DDataSystem@localhost:2553/user/updator-2553] ******** Items in ORMultiMap: Map(3 -> Set(Cat))*******
    [INFO] [12/24/2018 08:34:20.222] [DDataSystem-akka.actor.default-dispatcher-21] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ******** Items in ORMultiMap: Map(3 -> Set(Cat))*******
    [INFO] [12/24/2018 08:34:20.223] [DDataSystem-akka.actor.default-dispatcher-21] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ********Current count: 4*******
    [INFO] [12/24/2018 08:34:45.918] [DDataSystem-akka.actor.default-dispatcher-25] [akka.tcp://DDataSystem@localhost:2552/user/updator-2552] *********** got counter=4 with item: Tiger ************
    [INFO] [12/24/2018 08:34:45.919] [DDataSystem-akka.actor.default-dispatcher-16] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] *********** got counter=4 with item: Dog ************
    [INFO] [12/24/2018 08:34:45.920] [DDataSystem-akka.actor.default-dispatcher-15] [akka.tcp://DDataSystem@localhost:2553/user/updator-2553] ********Current items read in ORSet: Set(Apple, Orange, Pineapple)*******
    [INFO] [12/24/2018 08:34:45.922] [DDataSystem-akka.actor.default-dispatcher-22] [akka.tcp://DDataSystem@localhost:2553/user/updator-2553] ********Current items read in ORMultiMap: Map(3 -> Set(Cat))*******
    [INFO] [12/24/2018 08:34:45.925] [DDataSystem-akka.actor.default-dispatcher-21] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ********** Counter updated successfully ********
    [INFO] [12/24/2018 08:34:45.926] [DDataSystem-akka.actor.default-dispatcher-27] [akka.tcp://DDataSystem@localhost:2552/user/updator-2552] ********** Counter updated successfully ********
    [INFO] [12/24/2018 08:34:46.221] [DDataSystem-akka.actor.default-dispatcher-2] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ******** Items in ORMultiMap: Map(4 -> Set(Dog, Tiger), 3 -> Set(Cat))*******
    [INFO] [12/24/2018 08:34:46.221] [DDataSystem-akka.actor.default-dispatcher-2] [akka.tcp://DDataSystem@localhost:2551/user/updator-2551] ********Current count: 6*******
    [INFO] [12/24/2018 08:34:46.272] [DDataSystem-akka.actor.default-dispatcher-27] [akka.tcp://DDataSystem@localhost:2552/user/updator-2552] ******** Items in ORMultiMap: Map(4 -> Set(Tiger, Dog), 3 -> Set(Cat))*******
    [INFO] [12/24/2018 08:34:46.272] [DDataSystem-akka.actor.default-dispatcher-27] [akka.tcp://DDataSystem@localhost:2552/user/updator-2552] ********Current count: 6*******
    [INFO] [12/24/2018 08:34:46.326] [DDataSystem-akka.actor.default-dispatcher-22] [akka.tcp://DDataSystem@localhost:2553/user/updator-2553] ******** Items in ORMultiMap: Map(4 -> Set(Dog, Tiger), 3 -> Set(Cat))*******
    [INFO] [12/24/2018 08:34:46.326] [DDataSystem-akka.actor.default-dispatcher-22] [akka.tcp://DDataSystem@localhost:2553/user/updator-2553] ********Current count: 6*******

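Note that the reads near the end of the output were performed on a third node, 2553, and returned the ORSet and ORMultiMap data that had been updated on the other nodes. The entry Map(4 -> Set(Dog, Tiger)) comes from two AddToMap requests, each of which read the counter before updating the map. Because the two messages were sent so close together, both reads returned 4 before either counter increment had been applied and replicated, so both items ended up under the same key.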
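
The interleaving behind that Map(4 -> Set(Dog, Tiger)) entry can be replayed in a few lines of plain Scala. This is a single-process model written for illustration, not Akka code: State, put and inc are hypothetical names that stand in for the replicated counter and multi-map.

```scala
object InterleavingSketch {
  // Stand-in for the replicated counter plus the multi-map keyed by counter value.
  final case class State(counter: Long, map: Map[Long, Set[String]]) {
    def put(idx: Long, item: String): State =
      copy(map = map.updated(idx, map.getOrElse(idx, Set.empty[String]) + item))
    def inc: State = copy(counter = counter + 1)
  }

  // Both requests read the counter before either increment is visible.
  def bothReadFirst(start: State): State = {
    val idxDog = start.counter
    val idxTiger = start.counter
    start.put(idxDog, "Dog").inc.put(idxTiger, "Tiger").inc
  }

  // The second request reads only after the first one has finished.
  def oneAtATime(start: State): State = {
    val afterDog = start.put(start.counter, "Dog").inc
    afterDog.put(afterDog.counter, "Tiger").inc
  }

  def main(args: Array[String]): Unit = {
    val start = State(4L, Map(3L -> Set("Cat")))
    println(bothReadFirst(start).map(4L)) // Set(Dog, Tiger)
    println(bothReadFirst(start).counter) // 6
    println(oneAtATime(start).map(5L))    // Set(Tiger)
  }
}
```

In both orderings the counter ends at 6 and no item is lost, which is the CRDT guarantee. What the counter does not provide is a unique index per request, because reading it and incrementing it are two separate messages.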

Here is the complete source code of the example:

build.sbt

    name := "akka-distributed-data"
    version := "0.1"
    scalaVersion := "2.12.8"
    libraryDependencies ++= Seq(
      "com.typesafe.akka" %% "akka-actor" % "2.5.19",
      "com.typesafe.akka" %% "akka-cluster-tools" % "2.5.19",
      "com.typesafe.akka" %% "akka-distributed-data" % "2.5.19"
    )

resources/application.conf

    akka {
      actor.provider = "cluster"
      remote {
        netty.tcp.port = 0
        netty.tcp.hostname = "localhost"
      }
      extensions = ["akka.cluster.ddata.DistributedData"]
      cluster {
        seed-nodes = [
          "akka.tcp://DDataSystem@localhost:2551",
          "akka.tcp://DDataSystem@localhost:2552"]
        # auto downing is NOT safe for production deployments.
        # you may want to use it during development, read more about it in the docs.
        #
        # auto-down-unreachable-after = 10s
      }
    }

DDataUpdator.scala

    import akka.actor._
    import akka.cluster.ddata._
    import Replicator._
    import akka.cluster.Cluster
    import com.typesafe.config.ConfigFactory
    import scala.concurrent.duration._

    object DDataUpdator {
      case object IncCounter
      case class AddToSet(item: String)
      case class AddToMap(item: String)
      case object ReadSet
      case object ReadMap
      case object ShutDownDData

      val KeyCounter = GCounterKey("counter")
      val KeySet = ORSetKey[String]("gset")
      val KeyMap = ORMultiMapKey[Long, String]("ormap")

      val timeout = 300.millis
      val writeAll = WriteAll(timeout)
      val readAll = ReadAll(timeout)

      // Starts an ActorSystem on the given port and creates one updator actor in it.
      def create(port: Int): ActorRef = {
        val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port = $port")
          .withFallback(ConfigFactory.load())
        val system = ActorSystem("DDataSystem", config)
        system.actorOf(Props[DDataUpdator], s"updator-$port")
      }
    }

    class DDataUpdator extends Actor with ActorLogging {
      import DDataUpdator._

      // The CRDT update operators (+, increment, ...) need an implicit Cluster.
      implicit val cluster = Cluster(context.system)
      val replicator = DistributedData(context.system).replicator

      replicator ! Subscribe(KeyCounter, self)
      replicator ! Subscribe(KeySet, self)
      replicator ! Subscribe(KeyMap, self)

      override def receive: Receive = {
        // ---- counter ----
        case IncCounter =>
          log.info(s"******* Incrementing counter... *****")
          replicator ! Update(KeyCounter, GCounter(), writeAll)(_ + 1)
        case UpdateSuccess(KeyCounter, _) =>
          log.info(s"********** Counter updated successfully ********")
        case UpdateTimeout(KeyCounter, _) =>
          log.info(s"******* Counter update timed out! *****")
        case ModifyFailure(KeyCounter, msg, err, _) =>
          log.info(s"******* Counter update failed with error: ${msg} *****")
        case StoreFailure(KeyCounter, _) =>
          log.info(s"******* Counter value store failed! *****")
        case c @ Changed(KeyCounter) =>
          val data = c.get(KeyCounter)
          log.info("********Current count: {}*******", data.getValue)

        // ---- set ----
        case AddToSet(item) =>
          replicator ! Update(KeySet, ORSet.empty[String], writeAll)(_ + item)
        case UpdateSuccess(KeySet, _) =>
          log.info(s"**********Add to ORSet successfully ********")
        case UpdateTimeout(KeySet, _) =>
          log.info(s"******* Add to ORSet timed out! *****")
        case ModifyFailure(KeySet, msg, err, _) =>
          log.info(s"******* Add to ORSet failed with error: ${msg} *****")
        case StoreFailure(KeySet, _) =>
          log.info(s"******* ORSet items store failed! *****")
        case c @ Changed(KeySet) =>
          val data = c.get(KeySet)
          log.info("********Items in ORSet: {}*******", data.elements)
        case ReadSet =>
          replicator ! Get(KeySet, readAll)
        case g @ GetSuccess(KeySet, req) =>
          val value = g.get(KeySet)
          log.info("********Current items read in ORSet: {}*******", value.elements)
        case NotFound(KeySet, req) =>
          log.info("******No item found in ORSet!!!*******")

        // ---- map: read the counter first, then use its value as the map key ----
        case AddToMap(item) =>
          replicator ! Get(KeyCounter, readAll, Some(AddToMap(item)))
        case g @ GetSuccess(KeyCounter, Some(AddToMap(item))) =>
          val idx: Long = g.get(KeyCounter).getValue.longValue()
          log.info(s"*********** got counter=${idx} with item: $item ************")
          replicator ! Update(KeyMap, ORMultiMap.empty[Long, String], writeAll)(_ + (idx -> Set(item)))
          replicator ! Update(KeyCounter, GCounter(), writeAll)(_ + 1)
        case c @ Changed(KeyMap) =>
          val data = c.get(KeyMap).entries
          log.info("******** Items in ORMultiMap: {}*******", data)
        case ReadMap =>
          replicator ! Get(KeyMap, readAll)
        case g @ GetSuccess(KeyMap, req) =>
          val value = g.get(KeyMap)
          log.info("********Current items read in ORMultiMap: {}*******", value.entries)
        case NotFound(KeyMap, req) =>
          log.info("****** No item found in ORMultiMap!!! *******")

        case ShutDownDData => context.system.terminate()
      }
    }

    object DDataDemo extends App {
      import DDataUpdator._

      val ud1 = create(2551)
      val ud2 = create(2552)
      val ud3 = create(2553)

      scala.io.StdIn.readLine()
      ud1 ! IncCounter
      ud2 ! AddToSet("Apple")
      ud1 ! AddToSet("Orange")

      scala.io.StdIn.readLine()
      ud2 ! IncCounter
      ud2 ! AddToSet("Pineapple")
      ud1 ! IncCounter
      ud1 ! AddToMap("Cat")

      scala.io.StdIn.readLine()
      ud1 ! AddToMap("Dog")
      ud2 ! AddToMap("Tiger")

      scala.io.StdIn.readLine()
      ud3 ! ReadSet
      ud3 ! ReadMap

      scala.io.StdIn.readLine()
      ud1 ! ShutDownDData
      ud2 ! ShutDownDData
      ud3 ! ShutDownDData
    }

 
