经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 程序设计 » Go语言 » 查看文章
Golang实现简易的rpc调用
来源:jb51  时间:2023/3/8 10:59:10  对本文有异议

RPC(Remote Procedure Call Protocol)远程过程调用协议。 一个通俗的描述是:客户端在不知道调用细节的情况下,调用存在于远程计算机上的某个对象,就像调用本地应用程序中的对象一样。 比较正式的描述是:一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议 从使用的方面来说,服务端和客户端通过TCP/UDP/HTTP等通讯协议通讯,在通讯的时候客户端指定好服务端的方法、参数等信息通过序列化传送到服务端,服务端可以通过已有的元信息找到需要调用的方法,然后完成一次调用后序列化返回给客户端(rpc更多的是指服务与服务之间的通信,可以使用效率更高的协议和序列化格式去进行,并且可以进行有效的负载均衡和熔断超时等,因此跟前后端之间的web的交互概念上是有点不一样的) 用一张简单的图来表示

开始

本文只实现一个rpc框架基本的功能,不对性能做保证,因此尽量使用go原生自带的net/json库等进行操作,对使用方面不做stub(偷懒,只使用简单的json格式指定需要调用的方法),用最简单的方式实现一个简易rpc框架,也不保证超时调用和服务发现等集成的逻辑,服务发现可以参考下文 本文代码地址(https://github.com/wuhuZhao/rpc_demo)

实现两点之间的通讯(transport)

本段先实现两端之间的通讯,只确保两个端之间能互相通讯即可 server.go

  1. package server
  2.  
  3. import (
  4. "fmt"
  5. "log"
  6. "net"
  7. )
  8.  
  9. // Server: transport底层实现,通过Server去接受客户端的字节流
  10. type Server struct {
  11. ls net.Listener
  12. port int
  13. }
  14.  
  15. // NewServer: 根据端口创建一个server
  16. func NewServer(port int) *Server {
  17. s := &Server{port: port}
  18. s.init()
  19. return s
  20. }
  21.  
  22. // init: 初始化服务端连接
  23. func (s *Server) init() {
  24. l, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", s.port))
  25. if err != nil {
  26. panic(err)
  27. }
  28. s.ls = l
  29. }
  30.  
  31. // Start: 启动服务端的端口监听,采取一个conn一个g的模型,没有使用reactor等高性能模型
  32. func (s *Server) Start() {
  33. go func() {
  34. log.Printf("server [%s] start....", s.ls.Addr().String())
  35. for {
  36. conn, err := s.ls.Accept()
  37. if err != nil {
  38. panic(err)
  39. }
  40. go func() {
  41. buf := make([]byte, 1024)
  42. for {
  43. idx, err := conn.Read(buf)
  44. if err != nil {
  45. panic(err)
  46. }
  47. if len(buf) == 0 {
  48. continue
  49. }
  50. // todo 等序列化的信息
  51. log.Printf("[conn: %v] get data: %v\n", conn.RemoteAddr(), string(buf[:idx]))
  52.  
  53. }
  54. }()
  55. }
  56. }()
  57.  
  58. }
  59.  
  60. // Close: 关闭服务监听
  61. func (s *Server) Close() error {
  62. return s.ls.Close()
  63. }
  64.  
  65.  
  66. // Close: 关闭服务监听
  67. func (s *Server) Close() error {
  68. return s.ls.Close()
  69. }

client.go

  1. package client
  2.  
  3. import (
  4. "fmt"
  5. "log"
  6. "net"
  7. "unsafe"
  8. )
  9.  
  10. type Client struct {
  11. port int
  12. conn net.Conn
  13. }
  14.  
  15. func NewClient(port int) *Client {
  16. c := &Client{port: port}
  17. c.init()
  18. return c
  19. }
  20.  
  21. // init: initialize tcp client
  22. func (c *Client) init() {
  23. conn, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", c.port))
  24. if err != nil {
  25. panic(err)
  26. }
  27. c.conn = conn
  28.  
  29. }
  30.  
  31. func (c *Client) Send(statement string) error {
  32. _, err := c.conn.Write(*(*[]byte)(unsafe.Pointer(&statement)))
  33. if err != nil {
  34. panic(err)
  35. }
  36. return nil
  37. }
  38.  
  39. // Close: use to close connection
  40. func (c *Client) Close() error {
  41. return c.conn.Close()
  42. }

使用main.go做测试 main.go

  1. package main
  2.  
  3. import (
  4. "rpc_demo/internal/client"
  5. "rpc_demo/internal/server"
  6. "time"
  7. )
  8.  
  9. func main() {
  10. s := server.NewServer(9999)
  11. s.Start()
  12. time.Sleep(5 * time.Second)
  13. c := client.NewClient(9999)
  14. c.Send("this is a test\n")
  15. time.Sleep(5 * time.Second)
  16. }

执行一次main.go, go run main.go

2023/03/05 14:39:11 server [127.0.0.1:9999] start....
2023/03/05 14:39:16 [conn: 127.0.0.1:59126] get data: this is a test

可以证明第一部分的任务已经完成,可以实现两端之间的通讯了

实现反射调用已注册的方法

实现了双端的通信以后,我们在internal.go里实现两个方法,一个是注册,一个是调用,因为go有运行时的反射,所以我们使用反射去注册每一个需要调用到的方法,然后提供全局唯一的函数名,让client端可以实现指定方法的调用

internal.go

  1. package internal
  2.  
  3. import (
  4. "errors"
  5. "fmt"
  6. "reflect"
  7. "runtime"
  8. "strings"
  9. )
  10.  
  11. // 全局唯一
  12. var GlobalMethod = &Method{methods: map[string]reflect.Value{}}
  13.  
  14. type Method struct {
  15. methods map[string]reflect.Value
  16. }
  17.  
  18. func (m *Method) register(impl interface{}) error {
  19. pl := reflect.ValueOf(impl)
  20. if pl.Kind() != reflect.Func {
  21. return errors.New("impl should be function")
  22. }
  23. // 获取函数名
  24. methodName := runtime.FuncForPC(pl.Pointer()).Name()
  25. if len(strings.Split(methodName, ".")) < 1 {
  26. return errors.New("invalid function name")
  27. }
  28. lastFuncName := strings.Split(methodName, ".")[1]
  29. m.methods[lastFuncName] = pl
  30. fmt.Printf("methods: %v\n", m.methods)
  31. return nil
  32. }
  33.  
  34. func (m *Method) call(methodName string, callParams ...interface{}) ([]interface{}, error) {
  35. fn, ok := m.methods[methodName]
  36. if !ok {
  37. return nil, errors.New("impl method not found! Please Register first")
  38. }
  39. in := make([]reflect.Value, len(callParams))
  40. for i := 0; i < len(callParams); i++ {
  41. in[i] = reflect.ValueOf(callParams[i])
  42. }
  43. res := fn.Call(in)
  44. out := make([]interface{}, len(res))
  45. for i := 0; i < len(res); i++ {
  46. out[i] = res[i].Interface()
  47. }
  48. return out, nil
  49. }
  50.  
  51. func Call(methodName string, callParams ...interface{}) ([]interface{}, error) {
  52. return GlobalMethod.call(methodName, callParams...)
  53. }
  54.  
  55. func Register(impl interface{}) error {
  56. return GlobalMethod.register(impl)
  57. }

在单测里测试一下这个注册和调用的功能internal_test.go

  1. package internal
  2.  
  3. import (
  4. "testing"
  5. )
  6.  
  7. func Sum(a, b int) int {
  8. return a + b
  9. }
  10. func TestRegister(t *testing.T) {
  11. err := Register(Sum)
  12. if err != nil {
  13. t.Fatalf("err: %v\n", err)
  14. }
  15. t.Logf("test success\n")
  16. }
  17.  
  18. func TestCall(t *testing.T) {
  19. TestRegister(t)
  20. result, err := Call("Sum", 1, 2)
  21. if err != nil {
  22. t.Fatalf("err: %v\n", err)
  23. }
  24. if len(result) != 1 {
  25. t.Fatalf("len(result) is not equal to 1\n")
  26. }
  27. t.Logf("Sum(1,2) = %d\n", result[0].(int))
  28. if err := recover(); err != nil {
  29. t.Fatalf("%v\n", err)
  30. }
  31. }

执行调用

/usr/local/go/bin/go test -timeout 30s -run ^TestCall$ rpc_demo/internal -v

Running tool: /usr/local/go/bin/go test -timeout 30s -run ^TestCall$ rpc_demo/internal -v

=== RUN   TestCall
methods: map[Sum:<func(int, int) int Value>]
    /root/go/src/juejin_demo/rpc_demo/internal/internal_test.go:15: test success
    /root/go/src/juejin_demo/rpc_demo/internal/internal_test.go:27: Sum(1,2) = 3
--- PASS: TestCall (0.00s)
PASS
ok      rpc_demo/internal    0.002s

可以看到这个注册和调用的过程已经实现并且达到指定方法调用的作用

设计struct完整表达一次完整的rpc调用,并且封装json库中的Decoder和Encoder,完成序列化和反序列化

internal.go

  1. type RpcRequest struct {
  2. MethodName string
  3. Params []interface{}
  4. }
  5.  
  6. type RpcResponses struct {
  7. Returns []interface{}
  8. Err error
  9. }

transport.go考虑可以对接更多的格式,所以抽象了一层进行使用(demo肯定没有更多格式了)

  1. package transport
  2.  
  3. // Transport: 序列化格式的抽象层,从connection中读取数据序列化并且反序列化到connection中
  4. type Transport interface {
  5. Decode(v interface{}) error
  6. Encode(v interface{}) error
  7. Close()
  8. }

json_transport.go

  1. package transport
  2.  
  3. import (
  4. "encoding/json"
  5. "net"
  6. )
  7.  
  8. var _ Transport = (*JSONTransport)(nil)
  9.  
  10. type JSONTransport struct {
  11. encoder *json.Encoder
  12. decoder *json.Decoder
  13. }
  14.  
  15. // NewJSONTransport: 负责读取和写入conn
  16. func NewJSONTransport(conn net.Conn) *JSONTransport {
  17. return &JSONTransport{json.NewEncoder(conn), json.NewDecoder(conn)}
  18. }
  19.  
  20. // Decode: use json package to decode
  21. func (t *JSONTransport) Decode(v interface{}) error {
  22. if err := t.decoder.Decode(v); err != nil {
  23. return err
  24. }
  25. return nil
  26. }
  27.  
  28. // Encode: use json package to encode
  29. func (t *JSONTransport) Encode(v interface{}) error {
  30. if err := t.encoder.Encode(v); err != nil {
  31. return err
  32. }
  33. return nil
  34. }
  35.  
  36. // Close: not implement
  37. func (dec *JSONTransport) Close() {
  38.  
  39. }

然后我们将服务端和客户端的逻辑进行修改,改成通过上面两个结构体进行通信,然后返回一次调用 server.go

  1. //...
  2. for {
  3. conn, err := s.ls.Accept()
  4. if err != nil {
  5. panic(err)
  6. }
  7. tsp := transport.NewJSONTransport(conn)
  8. go func() {
  9. for {
  10. request := &internal.RpcRequest{}
  11. err := tsp.Decode(request)
  12. if err != nil {
  13. panic(err)
  14. }
  15. log.Printf("[server] get request: %v\n", request)
  16. result, err := internal.Call(request.MethodName, request.Params...)
  17. log.Printf("[server] invoke method: %v\n", result)
  18. if err != nil {
  19. response := &internal.RpcResponses{Returns: nil, Err: err}
  20. tsp.Encode(response)
  21. continue
  22. }
  23. response := &internal.RpcResponses{Returns: result, Err: err}
  24. if err := tsp.Encode(response); err != nil {
  25. log.Printf("[server] encode response err: %v\n", err)
  26. continue
  27. }
  28. }
  29. }()
  30. }
  31. //...

client.go

  1. // ...
  2. // Call: remote invoke
  3. func (c *Client) Call(methodName string, params ...interface{}) (res *internal.RpcResponses) {
  4. request := internal.RpcRequest{MethodName: methodName, Params: params}
  5. log.Printf("[client] create request to invoke server: %v\n", request)
  6. err := c.tsp.Encode(request)
  7. if err != nil {
  8. panic(err)
  9. }
  10. res = &internal.RpcResponses{}
  11. if err := c.tsp.Decode(res); err != nil {
  12. panic(err)
  13. }
  14. log.Printf("[client] get response from server: %v\n", res)
  15. return res
  16. }
  17. // ...

main.go

  1. package main
  2.  
  3. import (
  4. "log"
  5. "rpc_demo/internal"
  6. "rpc_demo/internal/client"
  7. "rpc_demo/internal/server"
  8. "strings"
  9. "time"
  10. )
  11.  
  12. // Rpc方法的一个简易实现
  13. func Join(a ...string) string {
  14. res := &strings.Builder{}
  15. for i := 0; i < len(a); i++ {
  16. res.WriteString(a[i])
  17. }
  18. return res.String()
  19. }
  20.  
  21. func main() {
  22. internal.Register(Join)
  23. s := server.NewServer(9999)
  24. s.Start()
  25. time.Sleep(5 * time.Second)
  26. c := client.NewClient(9999)
  27. res := c.Call("Join", "aaaaa", "bbbbb", "ccccccccc", "end")
  28. if res.Err != nil {
  29. log.Printf("[main] get an error from server: %v\n", res.Err)
  30. return
  31. }
  32. log.Printf("[main] get a response from server: %v\n", res.Returns[0].(string))
  33. time.Sleep(5 * time.Second)
  34. }

接下来我们运行一下main

[root@hecs-74066 rpc_demo]# go run main.go 
2023/03/05 14:39:11 server [127.0.0.1:9999] start....
2023/03/05 14:39:16 [conn: 127.0.0.1:59126] get data: this is a test

[root@hecs-74066 rpc_demo]# go run main.go 
2023/03/05 21:53:41 server [127.0.0.1:9999] start....
2023/03/05 21:53:46 [client] create request to invoke server: {Join [aaaaa bbbbb ccccccccc end]}
2023/03/05 21:53:46 [server] get request: &{Join [aaaaa bbbbb ccccccccc end]}
2023/03/05 21:53:46 [server] invoke method: [aaaaabbbbbcccccccccend]
2023/03/05 21:53:46 [client] get response from server: &{[aaaaabbbbbcccccccccend] <nil>}
2023/03/05 21:53:46 [main] get a response from server: aaaaabbbbbcccccccccend

总结(自我pua)

这样我们就实现了一个简单的rpc框架了,符合最简单的架构图,从client->序列化请求->transport -> 反序列化 ->server然后从server->序列化请求->transport->反序列化请求->client。当然从可用性的角度来说是差远了,没有实现stub代码,也没有idl的实现,导致所有的注册方法都是硬编码,可用性不高,而且没有集成服务发现(可以参考我的另一篇文章去集成)和熔断等功能,也没用中间件(也是我的另一篇文章)和超时等丰富的功能在里面,并且最近看了不少rpc框架的源码,感觉这个demo的设计也差远了。不过因为时间问题和代码的复杂性问题(单纯懒),起码算是实现了一个简单的rpc框架。

推荐一些比较好的框架实现

到此这篇关于Golang实现简易的rpc调用的文章就介绍到这了,更多相关Golang rpc调用内容请搜索w3xue以前的文章或继续浏览下面的相关文章希望大家以后多多支持w3xue!

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号