经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 程序设计 » Go语言 » 查看文章
Golang中tinyrpc框架的源码解读详解
来源:jb51  时间:2023/1/16 9:16:08  对本文有异议

tinyrpc是一个高性能的基于protocol buffer的rpc框架。项目代码非常少,很适合初学者进行golang的学习。

tinyrpc功能

tinyrpc基于TCP协议,支持各种压缩格式,基于protocol buffer的序列化协议。其rpc是基于golang原生的net/rpc开发而成。

tinyrpc项目结构

tinyrpc基于net/rpc开发而成,在此基础上集成了额外的能力。项目结构如图:

功能目录如下:

  • codec 编码模块
  • compressor 压缩模块
  • header 请求/响应头模块
  • protoc-gen-tinyrpc 代码生成插件
  • serializer 序列化模块

tinyrpc源码解读

客户端和服务端构建

客户端是以net/rpcrpc.Client为基础构建,在此基础上定义了Option以配置压缩方式和序列化方式:

  1. type Option func(o *options)
  2.  
  3. type options struct {
  4. compressType compressor.CompressType
  5. serializer serializer.Serializer
  6. }

在创建客户端的时候将配置好的压缩算法和序列化方式作为创建客户端的参数:

  1. func NewClient(conn io.ReadWriteCloser, opts ...Option) *Client {
  2. options := options{
  3. compressType: compressor.Raw,
  4. serializer: serializer.Proto,
  5. }
  6. for _, option := range opts {
  7. option(&options)
  8. }
  9. return &Client{rpc.NewClientWithCodec(
  10. codec.NewClientCodec(conn, options.compressType, options.serializer))}
  11. }

服务端是以net/rpcrpc.Server为基础构建,在此基础上扩展了Server的定义:

  1. type Server struct {
  2. *rpc.Server
  3. serializer.Serializer
  4. }

在创建客户端和开启服务时传入序列化方式:

  1. func NewServer(opts ...Option) *Server {
  2. options := options{
  3. serializer: serializer.Proto,
  4. }
  5. for _, option := range opts {
  6. option(&options)
  7. }
  8.  
  9. return &Server{&rpc.Server{}, options.serializer}
  10. }
  11.  
  12. func (s *Server) Serve(lis net.Listener) {
  13. log.Printf("tinyrpc started on: %s", lis.Addr().String())
  14. for {
  15. conn, err := lis.Accept()
  16. if err != nil {
  17. continue
  18. }
  19. go s.Server.ServeCodec(codec.NewServerCodec(conn, s.Serializer))
  20. }
  21. }

压缩算法compressor

压缩算法的实现中首先是定义了压缩的接口:

  1. type Compressor interface {
  2. Zip([]byte) ([]byte, error)
  3. Unzip([]byte) ([]byte, error)
  4. }

压缩的接口包含压缩和解压方法。

压缩算法使用的是uint类型,使用iota来初始化,并且使用map来进行所有压缩算法实现的管理:

  1. type CompressType uint16
  2.  
  3. const (
  4. Raw CompressType = iota
  5. Gzip
  6. Snappy
  7. Zlib
  8. )
  9.  
  10. // Compressors which supported by rpc
  11. var Compressors = map[CompressType]Compressor{
  12. Raw: RawCompressor{},
  13. Gzip: GzipCompressor{},
  14. Snappy: SnappyCompressor{},
  15. Zlib: ZlibCompressor{},
  16. }

序列化 serializer

序列化部分代码非常简单,提供了一个接口:

  1. type Serializer interface {
  2. Marshal(message interface{}) ([]byte, error)
  3. Unmarshal(data []byte, message interface{}) error
  4. }

目前只有ProtoSerializer一个实现,ProtoSerializer内部的实现是基于"google.golang.org/protobuf/proto"来实现的,并没有什么特殊的处理,因此就不花费笔墨详述了。

请求/响应头 header

tinyrpc定义了自己的请求头和响应头:

  1. // RequestHeader request header structure looks like:
  2. // +--------------+----------------+----------+------------+----------+
  3. // | CompressType | Method | ID | RequestLen | Checksum |
  4. // +--------------+----------------+----------+------------+----------+
  5. // | uint16 | uvarint+string | uvarint | uvarint | uint32 |
  6. // +--------------+----------------+----------+------------+----------+
  7. type RequestHeader struct {
  8. sync.RWMutex
  9. CompressType compressor.CompressType
  10. Method string
  11. ID uint64
  12. RequestLen uint32
  13. Checksum uint32
  14. }

请求头由压缩类型,方法,id,请求长度和校验码组成。

  1. // ResponseHeader request header structure looks like:
  2. // +--------------+---------+----------------+-------------+----------+
  3. // | CompressType | ID | Error | ResponseLen | Checksum |
  4. // +--------------+---------+----------------+-------------+----------+
  5. // | uint16 | uvarint | uvarint+string | uvarint | uint32 |
  6. // +--------------+---------+----------------+-------------+----------+
  7. type ResponseHeader struct {
  8. sync.RWMutex
  9. CompressType compressor.CompressType
  10. ID uint64
  11. Error string
  12. ResponseLen uint32
  13. Checksum uint32
  14. }

响应头由压缩类型,id,错误信息,返回长度和校验码组成。

为了实现头的重用,tinyrpc为头构建了缓存池:

  1. var (
  2. RequestPool sync.Pool
  3. ResponsePool sync.Pool
  4. )
  5.  
  6. func init() {
  7. RequestPool = sync.Pool{New: func() interface{} {
  8. return &RequestHeader{}
  9. }}
  10. ResponsePool = sync.Pool{New: func() interface{} {
  11. return &ResponseHeader{}
  12. }}
  13. }

在使用时get出来,生命周期结束后放回池子,并且在put之前需要进行重置:

  1. h := header.RequestPool.Get().(*header.RequestHeader)
  2. defer func() {
  3. h.ResetHeader()
  4. header.RequestPool.Put(h)
  5. }()
  1. // ResetHeader reset request header
  2. func (r *RequestHeader) ResetHeader() {
  3. r.Lock()
  4. defer r.Unlock()
  5. r.ID = 0
  6. r.Checksum = 0
  7. r.Method = ""
  8. r.CompressType = 0
  9. r.RequestLen = 0
  10. }
  11.  
  12. // ResetHeader reset response header
  13. func (r *ResponseHeader) ResetHeader() {
  14. r.Lock()
  15. defer r.Unlock()
  16. r.Error = ""
  17. r.ID = 0
  18. r.CompressType = 0
  19. r.Checksum = 0
  20. r.ResponseLen = 0
  21. }

搞清楚了头的结构以及对象池的复用逻辑,那么具体的头的编码与解码就是很简单的拆装工作,就不在此一行一行解析了,大家有兴趣可以自行去阅读。

编码 codec

由于tinyrpc是基于net/rpc开发,那么其codec模块自然也是依赖于net/rpcClientCodecServerCodec接口来实现的。

客户端实现

客户端是基于ClientCodec实现的能力:

  1. type ClientCodec interface {
  2. WriteRequest(*Request, any) error
  3. ReadResponseHeader(*Response) error
  4. ReadResponseBody(any) error
  5.  
  6. Close() error
  7. }

client定义了一个clientCodec类型,并且实现了ClientCodec的接口方法:

  1. type clientCodec struct {
  2. r io.Reader
  3. w io.Writer
  4. c io.Closer
  5.  
  6. compressor compressor.CompressType // rpc compress type(raw,gzip,snappy,zlib)
  7. serializer serializer.Serializer
  8. response header.ResponseHeader // rpc response header
  9. mutex sync.Mutex // protect pending map
  10. pending map[uint64]string
  11. }

WriteRequest实现:

  1. // WriteRequest Write the rpc request header and body to the io stream
  2. func (c *clientCodec) WriteRequest(r *rpc.Request, param interface{}) error {
  3. c.mutex.Lock()
  4. c.pending[r.Seq] = r.ServiceMethod
  5. c.mutex.Unlock()
  6.  
  7. if _, ok := compressor.Compressors[c.compressor]; !ok {
  8. return NotFoundCompressorError
  9. }
  10. reqBody, err := c.serializer.Marshal(param)
  11. if err != nil {
  12. return err
  13. }
  14. compressedReqBody, err := compressor.Compressors[c.compressor].Zip(reqBody)
  15. if err != nil {
  16. return err
  17. }
  18. h := header.RequestPool.Get().(*header.RequestHeader)
  19. defer func() {
  20. h.ResetHeader()
  21. header.RequestPool.Put(h)
  22. }()
  23. h.ID = r.Seq
  24. h.Method = r.ServiceMethod
  25. h.RequestLen = uint32(len(compressedReqBody))
  26. h.CompressType = compressor.CompressType(c.compressor)
  27. h.Checksum = crc32.ChecksumIEEE(compressedReqBody)
  28.  
  29. if err := sendFrame(c.w, h.Marshal()); err != nil {
  30. return err
  31. }
  32. if err := write(c.w, compressedReqBody); err != nil {
  33. return err
  34. }
  35.  
  36. c.w.(*bufio.Writer).Flush()
  37. return nil
  38. }

可以看到代码的实现还是比较清晰的,主要分为几个步骤:

  • 将数据进行序列化构成请求体
  • 选择相应的压缩算法进行压缩
  • 从Pool中获取请求头实例将数据全部填入其中构成最后的请求头
  • 分别通过io操作发送处理过的请求头和请求体

ReadResponseHeader实现:

  1. // ReadResponseHeader read the rpc response header from the io stream
  2. func (c *clientCodec) ReadResponseHeader(r *rpc.Response) error {
  3. c.response.ResetHeader()
  4. data, err := recvFrame(c.r)
  5. if err != nil {
  6. return err
  7. }
  8. err = c.response.Unmarshal(data)
  9. if err != nil {
  10. return err
  11. }
  12. c.mutex.Lock()
  13. r.Seq = c.response.ID
  14. r.Error = c.response.Error
  15. r.ServiceMethod = c.pending[r.Seq]
  16. delete(c.pending, r.Seq)
  17. c.mutex.Unlock()
  18. return nil
  19. }

此方法作用是读取返回的响应头,并解析成具体的结构体

ReadResponseBody实现:

  1. func (c *clientCodec) ReadResponseBody(param interface{}) error {
  2. if param == nil {
  3. if c.response.ResponseLen != 0 {
  4. if err := read(c.r, make([]byte, c.response.ResponseLen)); err != nil {
  5. return err
  6. }
  7. }
  8. return nil
  9. }
  10.  
  11. respBody := make([]byte, c.response.ResponseLen)
  12. err := read(c.r, respBody)
  13. if err != nil {
  14. return err
  15. }
  16.  
  17. if c.response.Checksum != 0 {
  18. if crc32.ChecksumIEEE(respBody) != c.response.Checksum {
  19. return UnexpectedChecksumError
  20. }
  21. }
  22.  
  23. if c.response.GetCompressType() != c.compressor {
  24. return CompressorTypeMismatchError
  25. }
  26.  
  27. resp, err := compressor.Compressors[c.response.GetCompressType()].Unzip(respBody)
  28. if err != nil {
  29. return err
  30. }
  31.  
  32. return c.serializer.Unmarshal(resp, param)
  33. }

此方法是用于读取返回的响应结构体,流程如下:

  • 读取流获取响应体
  • 根据响应头中的校验码来比对响应体是否完整
  • 根据压缩算法来解压具体的结构体
  • 进行反序列化

服务端实现

服务端是基于ServerCodec实现的能力:

  1. type ServerCodec interface {
  2. ReadRequestHeader(*Request) error
  3. ReadRequestBody(any) error
  4. WriteResponse(*Response, any) error
  5.  
  6. // Close can be called multiple times and must be idempotent.
  7. Close() error
  8. }

和客户端类似,server定义了一个serverCodec类型,并且实现了ServerCodec的接口方法:

  1. type serverCodec struct {
  2. r io.Reader
  3. w io.Writer
  4. c io.Closer
  5.  
  6. request header.RequestHeader
  7. serializer serializer.Serializer
  8. mutex sync.Mutex // protects seq, pending
  9. seq uint64
  10. pending map[uint64]*reqCtx
  11. }

ReadRequestHeader实现:

  1. // ReadRequestHeader read the rpc request header from the io stream
  2. func (s *serverCodec) ReadRequestHeader(r *rpc.Request) error {
  3. s.request.ResetHeader()
  4. data, err := recvFrame(s.r)
  5. if err != nil {
  6. return err
  7. }
  8. err = s.request.Unmarshal(data)
  9. if err != nil {
  10. return err
  11. }
  12. s.mutex.Lock()
  13. s.seq++
  14. s.pending[s.seq] = &reqCtx{s.request.ID, s.request.GetCompressType()}
  15. r.ServiceMethod = s.request.Method
  16. r.Seq = s.seq
  17. s.mutex.Unlock()
  18. return nil
  19. }

此方法用于读取请求头并解析成结构体

ReadRequestBody实现:

  1. // ReadRequestBody read the rpc request body from the io stream
  2. func (s *serverCodec) ReadRequestBody(param interface{}) error {
  3. if param == nil {
  4. if s.request.RequestLen != 0 {
  5. if err := read(s.r, make([]byte, s.request.RequestLen)); err != nil {
  6. return err
  7. }
  8. }
  9. return nil
  10. }
  11.  
  12. reqBody := make([]byte, s.request.RequestLen)
  13.  
  14. err := read(s.r, reqBody)
  15. if err != nil {
  16. return err
  17. }
  18.  
  19. if s.request.Checksum != 0 {
  20. if crc32.ChecksumIEEE(reqBody) != s.request.Checksum {
  21. return UnexpectedChecksumError
  22. }
  23. }
  24.  
  25. if _, ok := compressor.
  26. Compressors[s.request.GetCompressType()]; !ok {
  27. return NotFoundCompressorError
  28. }
  29.  
  30. req, err := compressor.
  31. Compressors[s.request.GetCompressType()].Unzip(reqBody)
  32. if err != nil {
  33. return err
  34. }
  35.  
  36. return s.serializer.Unmarshal(req, param)
  37. }

此方法用于读取请求体,流程和读取响应体差不多,大致如下:

  • 读取流并解析成请求体
  • 根据请求头中的校验码进行校验
  • 根据压缩算法进行解压
  • 反序列化

WriteResponse实现:

  1. // WriteResponse Write the rpc response header and body to the io stream
  2. func (s *serverCodec) WriteResponse(r *rpc.Response, param interface{}) error {
  3. s.mutex.Lock()
  4. reqCtx, ok := s.pending[r.Seq]
  5. if !ok {
  6. s.mutex.Unlock()
  7. return InvalidSequenceError
  8. }
  9. delete(s.pending, r.Seq)
  10. s.mutex.Unlock()
  11.  
  12. if r.Error != "" {
  13. param = nil
  14. }
  15. if _, ok := compressor.
  16. Compressors[reqCtx.compareType]; !ok {
  17. return NotFoundCompressorError
  18. }
  19.  
  20. var respBody []byte
  21. var err error
  22. if param != nil {
  23. respBody, err = s.serializer.Marshal(param)
  24. if err != nil {
  25. return err
  26. }
  27. }
  28.  
  29. compressedRespBody, err := compressor.
  30. Compressors[reqCtx.compareType].Zip(respBody)
  31. if err != nil {
  32. return err
  33. }
  34. h := header.ResponsePool.Get().(*header.ResponseHeader)
  35. defer func() {
  36. h.ResetHeader()
  37. header.ResponsePool.Put(h)
  38. }()
  39. h.ID = reqCtx.requestID
  40. h.Error = r.Error
  41. h.ResponseLen = uint32(len(compressedRespBody))
  42. h.Checksum = crc32.ChecksumIEEE(compressedRespBody)
  43. h.CompressType = reqCtx.compareType
  44.  
  45. if err = sendFrame(s.w, h.Marshal()); err != nil {
  46. return err
  47. }
  48.  
  49. if err = write(s.w, compressedRespBody); err != nil {
  50. return err
  51. }
  52. s.w.(*bufio.Writer).Flush()
  53. return nil
  54. }

此方法用于写入响应体,大致与写入请求体差不多,流程如下:

  • 将响应体序列化
  • 使用压缩算法将响应体进行压缩
  • 使用Pool管理响应头
  • 分别发送返回头和返回体

总结

tinyrpc是基于golang原生的net/rpc包实现,在此基础上实现了压缩和序列化等能力扩展。整体来看tinyrpc的代码非常简单,比较适合刚接触golang的程序员来进行阅读学习,学习一些golang的基础的开发技巧和一些语言特性。

以上就是Golang中tinyrpc框架的源码解读详解的详细内容,更多关于Golang tinyrpc框架的资料请关注w3xue其它相关文章!

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号