经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 程序设计 » Go语言 » 查看文章
通过Consul Raft库打造自己的分布式系统
来源:cnblogs  作者:egmkang  时间:2020/12/8 9:09:39  对本文有异议

 

通用的CP系统有etcd和consul, 通用的对立面就是专用系统. 所以在某些场合是有这种需求的.

然而etcd embed的可用性极差, Windows上面跑会出现各种问题, 而且不能定制协议, 你必须得用etcd定义好的协议和客户端来和etcd集群通讯. 所以这时候的选择:

1. 忍着

2. 自己实现一个raft算法库, 在这上面做应用

 有一定的可能性, 起码MIT 6.824可以做出来, 但是和工业应用还是有很大的差距

3. 找一个工业级raft库, 然后在这上面做应用

   这时候到Raft Consensus Algorithm上面看看就能找到几个可选的Raft算法库, 例如braft, hashicorp/raft, lni/dragonboat.

   但是呢, C++代码比较难写的, 所以就pass掉了braft. 就剩下consul raft和dragonboat.

 

本文就用consul raft做一个简单的KeyValue服务.

 

首先前端用的gin, 提供put/get/inc/delete几个接口, 三个接口都走raft状态机, 因为要支持多节点, 所以内部非leader节点就需要把请求转发给leader节点.

前端的代码类似于这样:

  1. func (this *ApiService) Start() error {
  2. //转发请求给leader节点
  3. this.router.Use(this.proxyHandler())
  4.  
  5. this.router.POST("/get", this.Get)
  6. this.router.POST("/put", this.Put)
  7. this.router.POST("/delete", this.Delete)
  8. this.router.POST("/inc", this.Inc)
  9.  
  10. address := fmt.Sprintf(":%d", this.port)
  11. return this.router.Run(address)
  12. }

请求都很简单, 就是直接把命令, 或者叫服务提供的原语塞到Raft状态机里面等候Raft状态Apply, 然后才能拿到结果(future/promise模式), 例如put命令:

  1. func (this *ApiService) Put(ctx *gin.Context) {
  2. req := &Request{}
  3. if err := ctx.ShouldBindJSON(req); err != nil {
  4. ctx.JSON(http.StatusBadRequest, Response{
  5. Error: err.Error(),
  6. })
  7. return
  8. }
  9. result, err := this.raft.ApplyCommand(raft.CommandPut, req.Key, req.Value)
  10. if err != nil {
  11. ctx.JSON(http.StatusInternalServerError, Response{
  12. Error: err.Error(),
  13. })
  14. return
  15. }
  16. ctx.JSON(http.StatusOK, Response{
  17. Value: result.Value,
  18. })
  19. }

前端还有一个转发请求到leader节点的拦截器(? 应该叫这个名字, 实际上是pipeline模式的一种)

  1. func (this *ApiService) proxyHandler() gin.HandlerFunc {
  2. return func(context *gin.Context) {
  3. if this.raft.IsLeader() {
  4. context.Next()
  5. } else {
  6. leaderServiceAddress := this.raft.GetLeaderServiceAddress()
  7. if this.leaderServiceAddress != leaderServiceAddress {
  8. Director := func(req *http.Request) {
  9. req.URL.Scheme = "http"
  10. req.URL.Host = leaderServiceAddress
  11. }
  12. this.leaderProxy = &httputil.ReverseProxy{
  13. Director: Director,
  14. }
  15. this.leaderServiceAddress = leaderServiceAddress
  16. }
  17. this.leaderProxy.ServeHTTP(context.Writer, context.Request)
  18. context.Abort()
  19. }
  20. }
  21. }

 

下面是对协议的处理:

  1. func (this *FSM) Apply(log *raft.Log) interface{} {
  2. result := &FSMApplyResult{
  3. Success: false,
  4. }
  5. t, cmd, err := raftLogToCommand(log)
  6. if err != nil {
  7. result.Error = err
  8. return result
  9. }
  10. binary.LittleEndian.PutUint64(keyCache, uint64(cmd.Key))
  11. binary.LittleEndian.PutUint64(valueCache, uint64(cmd.Value))
  12. switch t {
  13. case CommandPut:
  14. result.Success, result.Error = this.add(keyCache, valueCache)
  15. case CommandDelete:
  16. result.Success, result.Error = this.delete(keyCache)
  17. case CommandGet:
  18. result.Value, result.Error = this.get(keyCache)
  19. case CommandInc:
  20. result.Value, result.Error = this.inc(keyCache, cmd.Value)
  21. }
  22. return result
  23. }

输入给Raft状态的命令实际上都是序列化好的, Raft状态机会自己把命令保存到Storage里面(可以是内存, 也可以是磁盘/DB等). 所以Apply命令的时候, 先对raft log进行解码, 然后switch去处理.

这边再看看例如inc的处理:

  1. func (this *FSM) inc(key []byte, add int64) (int64, error) {
  2. var value int64 = 0
  3. err := this.db.Update(func(tx *bbolt.Tx) error {
  4. b, err := tx.CreateBucketIfNotExists(BBoltBucket)
  5. if err != nil {
  6. return err
  7. }
  8. valueBytes := b.Get(key)
  9. if len(valueBytes) != 8 {
  10. logging.Errorf("FSM.inc, key:%d, value length:%d, Reset",
  11. int64(binary.LittleEndian.Uint64(key)), len(valueBytes))
  12. valueBytes = make([]byte, 8)
  13. }
  14. value = int64(binary.LittleEndian.Uint64(valueBytes))
  15. value += add
  16. binary.LittleEndian.PutUint64(valueBytes, uint64(value))
  17. err = b.Put(key, valueBytes)
  18. return err
  19. })
  20. if err != nil {
  21. return -1, err
  22. }
  23. return value, err
  24. }

这个指令稍微复杂一点, 需要先到db里面去找, 找到的话, 再加一个N, 然后存储, 然后返回新的值. 因为raft状态机apply log的时候, 是顺序的, 所以不需要加锁啥的, inc本身就是原子的.

 

至此一个简单的分布式KeyValue服务就实现, 而且还是一个CP系统.

当然这只是一个demo, 实际的应用远远比这个复杂, 本文只是提供一种思路.

 

 

不必非要把自己绑死在Etcd上, 条条大路通罗马. 如果你的系统只需要提供有限的操作原语, 那么是可以考虑Consul Raft或者DragonBoat来制作自定义协议的CP服务. 蚂蚁的SOFARaft也可以干这种事.

 

参考:

1) RaftKV (https://gitee.com/egmkang/raft-kv)

2) Consul Raft (https://github.com/hashicorp/raft)

3) DragonBoat (https://github.com/lni/dragonboat)

4) Dapr (https://github.com/dapr/dapr/tree/master/cmd/placement)

原文链接:http://www.cnblogs.com/egmkang/p/14045593.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号