经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 数据库/运维 » Redis » 查看文章
GO实现Redis:GO实现TCP服务器(1)
来源:cnblogs  作者:csgopher  时间:2023/3/24 9:07:05  对本文有异议

interface/tcp/Handler.go

  1. type Handler interface {
  2. Handle(ctx context.Context, conn net.Conn)
  3. Close() error
  4. }
  • Handler:业务逻辑的处理接口
    • Handle(ctx context.Context, conn net.Conn) 处理连接

tcp/server.go

  1. type Config struct {
  2. Address string
  3. }
  4. func ListenAndServeWithSignal(cfg *Config, handler tcp.Handler) error {
  5. closeChan := make(chan struct{})
  6. listen, err := net.Listen("tcp", cfg.Address)
  7. if err != nil {
  8. return err
  9. }
  10. logger.Info("start listen")
  11. ListenAndServe(listen, handler, closeChan)
  12. return nil
  13. }
  14. func ListenAndServe(listener net.Listener,
  15. handler tcp.Handler,
  16. closeChan <-chan struct{}) {
  17. ctx := context.Background()
  18. var waitDone sync.WaitGroup
  19. for true {
  20. conn, err := listener.Accept()
  21. if err != nil {
  22. break
  23. }
  24. logger.Info("accept link")
  25. waitDone.Add(1)
  26. go func() {
  27. defer func() {
  28. waitDone.Done()
  29. }()
  30. handler.Handler(ctx, conn)
  31. }()
  32. }
  33. waitDone.Wait()
  34. }
  • Config:启动tcp服务器的配置
    • Address:监听地址
  • ListenAndServe:ctx是上下文,可以传递一些参数。死循环中接收到新连接时,让一个协程去处理连接
  • 如果listener.Accept()出错了就会break跳出来,这时候需要等待已经服务的客户端退出。使用WaitGroup等待客服端退出

  1. func ListenAndServe(listener net.Listener,
  2. handler tcp.Handler,
  3. closeChan <-chan struct{}) {
  4. go func() {
  5. <-closeChan
  6. logger.Info("shutting down...")
  7. _ = listener.Close()
  8. _ = handler.Close()
  9. }()
  10. defer func() {
  11. _ = listener.Close()
  12. _ = handler.Close()
  13. }()
  14. ......
  15. }

listener和handler在退出的时候需要关掉。如果用户直接kill掉了程序,我们也需要关掉listener和handler,这时候要使用closeChan,一旦接收到关闭信号,就执行关闭逻辑

  1. func ListenAndServeWithSignal(cfg *Config, handler tcp.Handler) error {
  2. closeChan := make(chan struct{})
  3. sigCh := make(chan os.Signal)
  4. signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
  5. go func() {
  6. sig := <-sigCh
  7. switch sig {
  8. case syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
  9. closeChan <- struct{}{}
  10. }
  11. }()
  12. listen, err := net.Listen("tcp", cfg.Address)
  13. if err != nil {
  14. return err
  15. }
  16. logger.Info("start listen")
  17. ListenAndServe(listen, handler, closeChan)
  18. return nil
  19. }

当系统对程序发送信号时,sigCh会接收到信号

tcp/echo.go

  1. type EchoHandler struct {
  2. activeConn sync.Map
  3. closing atomic.Boolean
  4. }

EchoHandler:

  • activeConn:记录连接
  • closing:是否正在关闭,有并发竞争,使用atomic.Boolean

  1. type EchoClient struct {
  2. Conn net.Conn
  3. Waiting wait.Wait
  4. }
  5. func (c *EchoClient) Close() error {
  6. c.Waiting.WaitWithTimeout(10 * time.Second)
  7. _ = c.Conn.Close()
  8. return nil
  9. }

EchoClient:一个客户端就是一个连接。Close方法关闭客户端连接,超时时间设置为10s

  1. func MakeHandler() *EchoHandler {
  2. return &EchoHandler{}
  3. }
  4. func (h *EchoHandler) Handle(ctx context.Context, conn net.Conn) {
  5. // 连接正在关闭,不接收新连接
  6. if h.closing.Get() {
  7. _ = conn.Close()
  8. }
  9. client := &EchoClient{
  10. Conn: conn,
  11. }
  12. h.activeConn.Store(client, struct{}{})
  13. reader := bufio.NewReader(conn)
  14. for {
  15. msg, err := reader.ReadString('\n')
  16. if err != nil {
  17. if err == io.EOF {
  18. logger.Info("connection close")
  19. h.activeConn.Delete(client)
  20. } else {
  21. logger.Warn(err)
  22. }
  23. return
  24. }
  25. // 正在处理业务,不要关掉
  26. client.Waiting.Add(1)
  27. // 将数据原封不动写回去,测试
  28. b := []byte(msg)
  29. _, _ = conn.Write(b)
  30. client.Waiting.Done()
  31. }
  32. }
  33. func (h *EchoHandler) Close() error {
  34. logger.Info("handler shutting down...")
  35. h.closing.Set(true)
  36. h.activeConn.Range(func(key interface{}, val interface{}) bool {
  37. client := key.(*EchoClient)
  38. _ = client.Close()
  39. return true
  40. })
  41. return nil
  42. }
  • MakeEchoHandler:创建EchoHandler
  • Handle:处理客户端的连接。
    • 1.连接正在关闭时,不接收新连接
    • 2.存储新连接,value用空结构体
    • 3.使用缓存区接收用户发来的数据,使用\n作为结束的标志
  • Close:将所有客户端连接关掉

main.go

  1. const configFile string = "redis.conf"
  2. var defaultProperties = &config.ServerProperties{
  3. Bind: "0.0.0.0",
  4. Port: 6379,
  5. }
  6. func fileExists(filename string) bool {
  7. info, err := os.Stat(filename)
  8. return err == nil && !info.IsDir()
  9. }
  10. func main() {
  11. logger.Setup(&logger.Settings{
  12. Path: "logs",
  13. Name: "godis",
  14. Ext: "log",
  15. TimeFormat: "2022-02-02",
  16. })
  17. if fileExists(configFile) {
  18. config.SetupConfig(configFile)
  19. } else {
  20. config.Properties = defaultProperties
  21. }
  22. err := tcp.ListenAndServeWithSignal(
  23. &tcp.Config{
  24. Address: fmt.Sprintf("%s:%d",
  25. config.Properties.Bind,
  26. config.Properties.Port),
  27. },
  28. EchoHandler.MakeHandler())
  29. if err != nil {
  30. logger.Error(err)
  31. }
  32. }

原文链接:https://www.cnblogs.com/csgopher/p/17248642.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号