tinyrpc是一个高性能的基于protocol buffer
的rpc框架。项目代码非常少,很适合初学者进行golang的学习。
tinyrpc功能
tinyrpc
基于TCP协议,支持各种压缩格式,基于protocol buffer
的序列化协议。其rpc是基于golang原生的net/rpc
开发而成。
tinyrpc项目结构
tinyrpc
基于net/rpc
开发而成,在此基础上集成了额外的能力。项目结构如图:

功能目录如下:
- codec 编码模块
- compressor 压缩模块
- header 请求/响应头模块
- protoc-gen-tinyrpc 代码生成插件
- serializer 序列化模块
tinyrpc源码解读
客户端和服务端构建
客户端是以net/rpc
的rpc.Client
为基础构建,在此基础上定义了Option
以配置压缩方式和序列化方式:
- type Option func(o *options)
-
- type options struct {
- compressType compressor.CompressType
- serializer serializer.Serializer
- }
在创建客户端的时候将配置好的压缩算法和序列化方式作为创建客户端的参数:
- func NewClient(conn io.ReadWriteCloser, opts ...Option) *Client {
- options := options{
- compressType: compressor.Raw,
- serializer: serializer.Proto,
- }
- for _, option := range opts {
- option(&options)
- }
- return &Client{rpc.NewClientWithCodec(
- codec.NewClientCodec(conn, options.compressType, options.serializer))}
- }
服务端是以net/rpc
的rpc.Server
为基础构建,在此基础上扩展了Server
的定义:
- type Server struct {
- *rpc.Server
- serializer.Serializer
- }
在创建客户端和开启服务时传入序列化方式:
- func NewServer(opts ...Option) *Server {
- options := options{
- serializer: serializer.Proto,
- }
- for _, option := range opts {
- option(&options)
- }
-
- return &Server{&rpc.Server{}, options.serializer}
- }
-
- func (s *Server) Serve(lis net.Listener) {
- log.Printf("tinyrpc started on: %s", lis.Addr().String())
- for {
- conn, err := lis.Accept()
- if err != nil {
- continue
- }
- go s.Server.ServeCodec(codec.NewServerCodec(conn, s.Serializer))
- }
- }
压缩算法compressor
压缩算法的实现中首先是定义了压缩的接口:
- type Compressor interface {
- Zip([]byte) ([]byte, error)
- Unzip([]byte) ([]byte, error)
- }
压缩的接口包含压缩和解压方法。
压缩算法使用的是uint
类型,使用iota
来初始化,并且使用map来进行所有压缩算法实现的管理:
- type CompressType uint16
-
- const (
- Raw CompressType = iota
- Gzip
- Snappy
- Zlib
- )
-
- // Compressors which supported by rpc
- var Compressors = map[CompressType]Compressor{
- Raw: RawCompressor{},
- Gzip: GzipCompressor{},
- Snappy: SnappyCompressor{},
- Zlib: ZlibCompressor{},
- }
序列化 serializer
序列化部分代码非常简单,提供了一个接口:
- type Serializer interface {
- Marshal(message interface{}) ([]byte, error)
- Unmarshal(data []byte, message interface{}) error
- }
目前只有ProtoSerializer
一个实现,ProtoSerializer
内部的实现是基于"google.golang.org/protobuf/proto"
来实现的,并没有什么特殊的处理,因此就不花费笔墨详述了。
请求/响应头 header
tinyrpc
定义了自己的请求头和响应头:
- // RequestHeader request header structure looks like:
- // +--------------+----------------+----------+------------+----------+
- // | CompressType | Method | ID | RequestLen | Checksum |
- // +--------------+----------------+----------+------------+----------+
- // | uint16 | uvarint+string | uvarint | uvarint | uint32 |
- // +--------------+----------------+----------+------------+----------+
- type RequestHeader struct {
- sync.RWMutex
- CompressType compressor.CompressType
- Method string
- ID uint64
- RequestLen uint32
- Checksum uint32
- }
请求头由压缩类型,方法,id,请求长度和校验码组成。
- // ResponseHeader request header structure looks like:
- // +--------------+---------+----------------+-------------+----------+
- // | CompressType | ID | Error | ResponseLen | Checksum |
- // +--------------+---------+----------------+-------------+----------+
- // | uint16 | uvarint | uvarint+string | uvarint | uint32 |
- // +--------------+---------+----------------+-------------+----------+
- type ResponseHeader struct {
- sync.RWMutex
- CompressType compressor.CompressType
- ID uint64
- Error string
- ResponseLen uint32
- Checksum uint32
- }
响应头由压缩类型,id,错误信息,返回长度和校验码组成。
为了实现头的重用,tinyrpc
为头构建了缓存池:
- var (
- RequestPool sync.Pool
- ResponsePool sync.Pool
- )
-
- func init() {
- RequestPool = sync.Pool{New: func() interface{} {
- return &RequestHeader{}
- }}
- ResponsePool = sync.Pool{New: func() interface{} {
- return &ResponseHeader{}
- }}
- }
在使用时get出来,生命周期结束后放回池子,并且在put之前需要进行重置:
- h := header.RequestPool.Get().(*header.RequestHeader)
- defer func() {
- h.ResetHeader()
- header.RequestPool.Put(h)
- }()
- // ResetHeader reset request header
- func (r *RequestHeader) ResetHeader() {
- r.Lock()
- defer r.Unlock()
- r.ID = 0
- r.Checksum = 0
- r.Method = ""
- r.CompressType = 0
- r.RequestLen = 0
- }
-
- // ResetHeader reset response header
- func (r *ResponseHeader) ResetHeader() {
- r.Lock()
- defer r.Unlock()
- r.Error = ""
- r.ID = 0
- r.CompressType = 0
- r.Checksum = 0
- r.ResponseLen = 0
- }
搞清楚了头的结构以及对象池的复用逻辑,那么具体的头的编码与解码就是很简单的拆装工作,就不在此一行一行解析了,大家有兴趣可以自行去阅读。
编码 codec
由于tinyrpc
是基于net/rpc
开发,那么其codec
模块自然也是依赖于net/rpc
的ClientCodec
和ServerCodec
接口来实现的。
客户端实现
客户端是基于ClientCodec
实现的能力:
- type ClientCodec interface {
- WriteRequest(*Request, any) error
- ReadResponseHeader(*Response) error
- ReadResponseBody(any) error
-
- Close() error
- }
client
定义了一个clientCodec
类型,并且实现了ClientCodec
的接口方法:
- type clientCodec struct {
- r io.Reader
- w io.Writer
- c io.Closer
-
- compressor compressor.CompressType // rpc compress type(raw,gzip,snappy,zlib)
- serializer serializer.Serializer
- response header.ResponseHeader // rpc response header
- mutex sync.Mutex // protect pending map
- pending map[uint64]string
- }
WriteRequest
实现:
- // WriteRequest Write the rpc request header and body to the io stream
- func (c *clientCodec) WriteRequest(r *rpc.Request, param interface{}) error {
- c.mutex.Lock()
- c.pending[r.Seq] = r.ServiceMethod
- c.mutex.Unlock()
-
- if _, ok := compressor.Compressors[c.compressor]; !ok {
- return NotFoundCompressorError
- }
- reqBody, err := c.serializer.Marshal(param)
- if err != nil {
- return err
- }
- compressedReqBody, err := compressor.Compressors[c.compressor].Zip(reqBody)
- if err != nil {
- return err
- }
- h := header.RequestPool.Get().(*header.RequestHeader)
- defer func() {
- h.ResetHeader()
- header.RequestPool.Put(h)
- }()
- h.ID = r.Seq
- h.Method = r.ServiceMethod
- h.RequestLen = uint32(len(compressedReqBody))
- h.CompressType = compressor.CompressType(c.compressor)
- h.Checksum = crc32.ChecksumIEEE(compressedReqBody)
-
- if err := sendFrame(c.w, h.Marshal()); err != nil {
- return err
- }
- if err := write(c.w, compressedReqBody); err != nil {
- return err
- }
-
- c.w.(*bufio.Writer).Flush()
- return nil
- }
可以看到代码的实现还是比较清晰的,主要分为几个步骤:
- 将数据进行序列化构成请求体
- 选择相应的压缩算法进行压缩
- 从Pool中获取请求头实例将数据全部填入其中构成最后的请求头
- 分别通过io操作发送处理过的请求头和请求体
ReadResponseHeader
实现:
- // ReadResponseHeader read the rpc response header from the io stream
- func (c *clientCodec) ReadResponseHeader(r *rpc.Response) error {
- c.response.ResetHeader()
- data, err := recvFrame(c.r)
- if err != nil {
- return err
- }
- err = c.response.Unmarshal(data)
- if err != nil {
- return err
- }
- c.mutex.Lock()
- r.Seq = c.response.ID
- r.Error = c.response.Error
- r.ServiceMethod = c.pending[r.Seq]
- delete(c.pending, r.Seq)
- c.mutex.Unlock()
- return nil
- }
此方法作用是读取返回的响应头,并解析成具体的结构体
ReadResponseBody
实现:
- func (c *clientCodec) ReadResponseBody(param interface{}) error {
- if param == nil {
- if c.response.ResponseLen != 0 {
- if err := read(c.r, make([]byte, c.response.ResponseLen)); err != nil {
- return err
- }
- }
- return nil
- }
-
- respBody := make([]byte, c.response.ResponseLen)
- err := read(c.r, respBody)
- if err != nil {
- return err
- }
-
- if c.response.Checksum != 0 {
- if crc32.ChecksumIEEE(respBody) != c.response.Checksum {
- return UnexpectedChecksumError
- }
- }
-
- if c.response.GetCompressType() != c.compressor {
- return CompressorTypeMismatchError
- }
-
- resp, err := compressor.Compressors[c.response.GetCompressType()].Unzip(respBody)
- if err != nil {
- return err
- }
-
- return c.serializer.Unmarshal(resp, param)
- }
此方法是用于读取返回的响应结构体,流程如下:
- 读取流获取响应体
- 根据响应头中的校验码来比对响应体是否完整
- 根据压缩算法来解压具体的结构体
- 进行反序列化
服务端实现
服务端是基于ServerCodec
实现的能力:
- type ServerCodec interface {
- ReadRequestHeader(*Request) error
- ReadRequestBody(any) error
- WriteResponse(*Response, any) error
-
- // Close can be called multiple times and must be idempotent.
- Close() error
- }
和客户端类似,server
定义了一个serverCodec
类型,并且实现了ServerCodec
的接口方法:
- type serverCodec struct {
- r io.Reader
- w io.Writer
- c io.Closer
-
- request header.RequestHeader
- serializer serializer.Serializer
- mutex sync.Mutex // protects seq, pending
- seq uint64
- pending map[uint64]*reqCtx
- }
ReadRequestHeader
实现:
- // ReadRequestHeader read the rpc request header from the io stream
- func (s *serverCodec) ReadRequestHeader(r *rpc.Request) error {
- s.request.ResetHeader()
- data, err := recvFrame(s.r)
- if err != nil {
- return err
- }
- err = s.request.Unmarshal(data)
- if err != nil {
- return err
- }
- s.mutex.Lock()
- s.seq++
- s.pending[s.seq] = &reqCtx{s.request.ID, s.request.GetCompressType()}
- r.ServiceMethod = s.request.Method
- r.Seq = s.seq
- s.mutex.Unlock()
- return nil
- }
此方法用于读取请求头并解析成结构体
ReadRequestBody
实现:
- // ReadRequestBody read the rpc request body from the io stream
- func (s *serverCodec) ReadRequestBody(param interface{}) error {
- if param == nil {
- if s.request.RequestLen != 0 {
- if err := read(s.r, make([]byte, s.request.RequestLen)); err != nil {
- return err
- }
- }
- return nil
- }
-
- reqBody := make([]byte, s.request.RequestLen)
-
- err := read(s.r, reqBody)
- if err != nil {
- return err
- }
-
- if s.request.Checksum != 0 {
- if crc32.ChecksumIEEE(reqBody) != s.request.Checksum {
- return UnexpectedChecksumError
- }
- }
-
- if _, ok := compressor.
- Compressors[s.request.GetCompressType()]; !ok {
- return NotFoundCompressorError
- }
-
- req, err := compressor.
- Compressors[s.request.GetCompressType()].Unzip(reqBody)
- if err != nil {
- return err
- }
-
- return s.serializer.Unmarshal(req, param)
- }
此方法用于读取请求体,流程和读取响应体差不多,大致如下:
- 读取流并解析成请求体
- 根据请求头中的校验码进行校验
- 根据压缩算法进行解压
- 反序列化
WriteResponse
实现:
- // WriteResponse Write the rpc response header and body to the io stream
- func (s *serverCodec) WriteResponse(r *rpc.Response, param interface{}) error {
- s.mutex.Lock()
- reqCtx, ok := s.pending[r.Seq]
- if !ok {
- s.mutex.Unlock()
- return InvalidSequenceError
- }
- delete(s.pending, r.Seq)
- s.mutex.Unlock()
-
- if r.Error != "" {
- param = nil
- }
- if _, ok := compressor.
- Compressors[reqCtx.compareType]; !ok {
- return NotFoundCompressorError
- }
-
- var respBody []byte
- var err error
- if param != nil {
- respBody, err = s.serializer.Marshal(param)
- if err != nil {
- return err
- }
- }
-
- compressedRespBody, err := compressor.
- Compressors[reqCtx.compareType].Zip(respBody)
- if err != nil {
- return err
- }
- h := header.ResponsePool.Get().(*header.ResponseHeader)
- defer func() {
- h.ResetHeader()
- header.ResponsePool.Put(h)
- }()
- h.ID = reqCtx.requestID
- h.Error = r.Error
- h.ResponseLen = uint32(len(compressedRespBody))
- h.Checksum = crc32.ChecksumIEEE(compressedRespBody)
- h.CompressType = reqCtx.compareType
-
- if err = sendFrame(s.w, h.Marshal()); err != nil {
- return err
- }
-
- if err = write(s.w, compressedRespBody); err != nil {
- return err
- }
- s.w.(*bufio.Writer).Flush()
- return nil
- }
此方法用于写入响应体,大致与写入请求体差不多,流程如下:
- 将响应体序列化
- 使用压缩算法将响应体进行压缩
- 使用Pool管理响应头
- 分别发送返回头和返回体
总结
tinyrpc
是基于golang
原生的net/rpc
包实现,在此基础上实现了压缩和序列化等能力扩展。整体来看tinyrpc
的代码非常简单,比较适合刚接触golang
的程序员来进行阅读学习,学习一些golang
的基础的开发技巧和一些语言特性。
以上就是Golang中tinyrpc框架的源码解读详解的详细内容,更多关于Golang tinyrpc框架的资料请关注w3xue其它相关文章!