经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » Java相关 » Scala » 查看文章
深圳scala-meetup-20180902(2)- Future vs Task and ReaderMonad依赖注入
来源:cnblogs  作者:雪川大虫  时间:2018/9/25 20:22:00  对本文有异议

  在对上一次3月份的scala-meetup里我曾分享了关于Future在函数组合中的问题及如何用Monix.Task来替代。具体分析可以查阅这篇博文。在上篇示范里我们使用了Future来实现某种non-blocking数据库操作,现在可以用Task替换Future部分:

  1. class KVStore[K,V] {
  2. private val kvs = new ConcurrentHashMap[K,V]()
  3. def create(k: K, v: V): Task[Unit] = Task.delay(kvs.putIfAbsent(k,v))
  4. def read(k: K): Task[Option[V]] = Task.delay(Option(kvs.get(k)))
  5. def update(k: K, v: V): Task[Unit] = Task.delay(kvs.put(k,v))
  6. def delete(k: K): Task[Boolean] = Task.delay(kvs.remove(k) != null)
  7. }

Task是一个真正的Monad,我们可以放心的用来实现函数组合:

  1. type FoodName = String
  2. type Quantity = Int
  3. type FoodStore = KVStore[String,Int]
  4. def addFood(food: FoodName, qty: Quantity)(implicit fs: FoodStore): Task[Quantity] = for {
  5. current <- fs.read(food)
  6. newQty = current.map(cq => cq + qty).getOrElse(qty)
  7. _ <- fs.update(food,newQty)
  8. } yield newQty
  9. def takeFood(food: FoodName, qty: Quantity)(implicit fs: FoodStore): Task[Quantity] = for {
  10. current <- fs.read(food)
  11. cq = current.getOrElse(0)
  12. taken = Math.min(cq,qty)
  13. left = cq - taken
  14. _ <- if(left > 0) fs.update(food,left) else fs.delete(food)
  15. } yield taken
  16. def cookSauce(qty: Quantity)(get: (FoodName,Quantity) => Task[Quantity],
  17. put: (FoodName,Quantity) => Task[Quantity]): Task[Quantity] = for {
  18. tomato <- get("Tomato",qty)
  19. vaggies <- get("Veggies",qty)
  20. _ <- get("Galic",10)
  21. sauceQ = tomato/2 + vaggies * 3 / 2
  22. _ <- put("Sauce",sauceQ)
  23. } yield sauceQ
  24. def cookPasta(qty: Quantity)(get: (FoodName,Quantity) => Task[Quantity],
  25. put: (FoodName,Quantity) => Task[Quantity]): Task[Quantity] = for {
  26. pasta <- get("Pasta", qty)
  27. sauce <- get("Sauce", qty)
  28. _ <- get("Spice", 3)
  29. portions = Math.min(pasta, sauce)
  30. _ <- put("Meal", portions)
  31. } yield portions

跟上次我们使用Future时的方式没有两样。值得研究的是如何获取Task运算结果,及如何更精确的控制Task运算如取消运行中的Task:

  1. implicit val refridge = new FoodStore
  2. val shopping: Task[Unit] = for {
  3. _ <- addFood("Tomato",10)
  4. _ <- addFood("Veggies",15)
  5. _ <- addFood("Garlic", 42)
  6. _ <- addFood("Spice", 100)
  7. _ <- addFood("Pasta", 6)
  8. } yield()
  9. val cooking: Task[Quantity] = for {
  10. _ <- shopping
  11. sauce <- cookSauce(10)(takeFood(_,_),addFood(_,_))
  12. meals <- cookPasta(10)(takeFood(_,_),addFood(_,_))
  13. } yield meals
  14. import scala.util._
  15. import monix.execution.Scheduler.Implicits.global
  16. val cancellableCooking = Cooking.runOnComplete { result =>
  17. result match {
  18. case Success(meals) => println(s"we have $meals pasta meals for the day.")
  19. case Failure(err) => println(s"cooking trouble: ${err.getMessage}")
  20. }
  21. }
  22. global.scheduleOnce(1 second) {
  23. println(s"its taking too long, cancelling cooking ...")
  24. cancellableCooking.cancel()
  25. }

在上面例子里的addFood,takeFood函数中都有个fs:FoodStore参数。这样做可以使函数更加通用,可以对用不同方式实施的FoodStore进行操作。这里FoodStore就是函数的依赖,我们是通过函数参数来传递这个依赖的。重新组织一下代码使这种关系更明显:

  1. class Refridge {
  2. def addFood(food: FoodName, qty: Quantity): FoodStore => Task[Quantity] = { foodStore =>
  3. for {
  4. current <- foodStore.read(food)
  5. newQty = current.map(c => c + qty).getOrElse(qty)
  6. _ <- foodStore.update(food, newQty)
  7. } yield newQty
  8. }
  9. def takeFood(food: FoodName, qty: Quantity): FoodStore => Task[Quantity] = { foodStore =>
  10. for {
  11. current <- foodStore.read(food)
  12. cq = current.getOrElse(0)
  13. taken = Math.min(cq, qty)
  14. left = cq - taken
  15. _ <- if (left > 0) foodStore.update(food, left) else foodStore.delete(food)
  16. } yield taken
  17. }
  18. }

现在我们用一个函数类型的结果来代表依赖注入。这样做的好处是简化了函数主体,彻底把依赖与函数进行了分割,使用函数时不必考虑依赖。

scala的函数式组件库cats提供了一个Kleisli类型,reader monad就是从它推导出来的:

  1. final case class Kleisli[M[_], A, B](run: A => M[B]) { self =>
  2. ...
  3. trait KleisliFunctions {
  4. /**Construct a Kleisli from a Function1 */
  5. def kleisli[M[_], A, B](f: A => M[B]): Kleisli[M, A, B] = Kleisli(f)
  6. def >=>[C](k: Kleisli[M, B, C])(implicit b: Bind[M]): Kleisli[M, A, C] =
  7. kleisli((a: A) => b.bind(this(a))(k.run))
  8. Kleisli的用途就是进行函数的转换
  9. // (A=>M[B]) >=> (B=>M[C]) >=> (C=>M[D]) = M[D]

实际上Kleisli就是ReaderT:

  1. type ReaderT[F[_], E, A] = Kleisli[F, E, A]
  2. val ReaderT = Kleisli
  3. val reader = ReaderT[F,B,A](A => F[B])
  4. val readerTask = ReaderT[Task,B,A](A => Task[B])
  5. val injection = ReaderT { foodStore => Task.delay { foodStore.takeFood } }
  6. val food = injection.run(db) // run(kvs), run(dbConfig) …

这段代码里我们也针对上面的例子示范了ReaderT的用法。现在我们可以把例子改成下面这样:

  1. type FoodName = String
  2. type Quantity = Int
  3. type FoodStore = KVStore[String,Int]
  4. class Refridge {
  5. def addFood(food: FoodName, qty: Quantity): ReaderT[Task,FoodStore,Quantity] = ReaderT{ foodStore =>
  6. for {
  7. current <- foodStore.read(food)
  8. newQty = current.map(c => c + qty).getOrElse(qty)
  9. _ <- foodStore.update(food, newQty)
  10. } yield newQty
  11. }
  12. def takeFood(food: FoodName, qty: Quantity): ReaderT[Task,FoodStore,Quantity] = ReaderT{ foodStore =>
  13. for {
  14. current <- foodStore.read(food)
  15. cq = current.getOrElse(0)
  16. taken = Math.min(cq, qty)
  17. left = cq - taken
  18. _ <- if (left > 0) foodStore.update(food, left) else foodStore.delete(food)
  19. } yield taken
  20. }
  21. }

ReaderT[F[_],E,A]就是ReaderT[Task,FoodStore,Quantity]. FoodStore是注入的依赖,ReaderT.run返回Task:

  1. val cooking: ReaderT[Task,FoodStore,Quantity] = for {
  2. _ <- shopping
  3. sauce <- cooker.cookSauce(10)
  4. pasta <- cooker.cookPasta(10)
  5. } yield pasta
  6. import scala.concurrent.duration._
  7. import scala.util._
  8. import monix.execution.Scheduler.Implicits.global
  9. val timedCooking = cooking.run(foodStore).timeoutTo(1 seconds, Task.raiseError( new RuntimeException(
  10. "oh no, take too long to cook ...")))
  11. val cancellableCooking = timedCooking.runOnComplete { result =>
  12. result match {
  13. case Success(meals) => println(s"we have $meals specials for the day.")
  14. case Failure(exception) => println(s"kitchen problem! ${exception.getMessage}")
  15. }
  16. }
  17. global.scheduleOnce(3 seconds) {
  18. println("3 seconds passed,cancelling ...")
  19. cancellableCooking.cancel()
  20. }

我们知道cooking是个ReaderT,用run(foodStore)来注入依赖foodStore。那么如果我们还有一个kvStore或者jdbcDB,mongoDB可以直接用run(kvStore), run(jdbcDB), run(mongoDB) ... 返回的结果都是Task。

 

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号