经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 程序设计 » Go语言 » 查看文章
微服务实践之分布式定时任务
来源:cnblogs  作者:Kevin Wan  时间:2021/2/1 11:59:20  对本文有异议

承接上篇:上篇文章讲到改造 go-zero 生成的 app module 中的 gateway & RPC 。本篇讲讲如何接入 异步任务 以及 log的使用

Delay Job

日常任务开放中,我们会有很多异步、批量、定时、延迟任务要处理,go-zero中有 go-queue,推荐使用 go-queue 去处理,go-queue 本身也是基于 go-zero 开发的,其本身是有两种模式:

  • dq : 依赖于 beanstalkd ,分布式,可存储,延迟、定时设置,关机重启可以重新执行,消息会丢失,使用非常简单,go-queue中使用了redis setnx保证了每个消息只被消费一次,使用场景主要是用来做日常任务使用
  • kq:依赖于 kafka ,这个就不多介绍啦,大名鼎鼎的 kafka ,使用场景主要是做日志用

我们主要说一下dq,kq使用也一样的,只是依赖底层不同,如果没使用过beanstalkd,没接触过beanstalkd的可以先google一下,使用起来还是挺容易的。

我在jobs下使用goctl新建了一个message-job.api服务

  1. info(
  2. title: //消息任务
  3. desc: // 消息任务
  4. author: "Mikael"
  5. email: "13247629622@163.com"
  6. )
  7. type BatchSendMessageReq {}
  8. type BatchSendMessageResp {}
  9. service message-job-api {
  10. @handler batchSendMessageHandler // 批量发送短信
  11. post batchSendMessage(BatchSendMessageReq) returns(BatchSendMessageResp)
  12. }

因为不需要使用路由,所以handler下的routes.go被我删除了,在handler下新建了一个jobRun.go,内容如下:

  1. package handler
  2. import (
  3. "fishtwo/lib/xgo"
  4. "fishtwo/app/jobs/message/internal/svc"
  5. )
  6. /**
  7. * @Description 启动job
  8. * @Author Mikael
  9. * @Date 2021/1/18 12:05
  10. * @Version 1.0
  11. **/
  12. func JobRun(serverCtx *svc.ServiceContext) {
  13. xgo.Go(func() {
  14. batchSendMessageHandler(serverCtx)
  15. //...many job
  16. })
  17. }

其实xgo.Go就是 go batchSendMessageHandler(serverCtx) ,封装了一下go携程,防止野生goroutine panic

然后修改一下启动文件message-job.go

  1. package main
  2. import (
  3. "flag"
  4. "fmt"
  5. "fishtwo/app/jobs/message/internal/config"
  6. "fishtwo/app/jobs/message/internal/handler"
  7. "fishtwo/app/jobs/message/internal/svc"
  8. "github.com/tal-tech/go-zero/core/conf"
  9. "github.com/tal-tech/go-zero/rest"
  10. )
  11. var configFile = flag.String("f", "etc/message-job-api.yaml", "the config file")
  12. func main() {
  13. flag.Parse()
  14. var c config.Config
  15. conf.MustLoad(*configFile, &c)
  16. ctx := svc.NewServiceContext(c)
  17. server := rest.MustNewServer(c.RestConf)
  18. defer server.Stop()
  19. handler.JobRun(ctx)
  20. fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port)
  21. server.Start()
  22. }

主要是handler.RegisterHandlers(server, ctx) 修改为handler.JobRun(ctx)

接下来,我们就可以引入dq了,首先在etc/xxx.yaml下添加dqConf

  1. .....
  2. DqConf:
  3. Beanstalks:
  4. - Endpoint: 127.0.0.1:7771
  5. Tube: tube1
  6. - Endpoint: 127.0.0.1:7772
  7. Tube: tube2
  8. Redis:
  9. Host: 127.0.0.1:6379
  10. Type: node

我这里本地用不同端口,模拟开了2个节点,7771、7772

在internal/config/config.go添加配置解析对象

  1. type Config struct {
  2. ....
  3. DqConf dq.DqConf
  4. }

修改handler/batchsendmessagehandler.go

  1. package handler
  2. import (
  3. "context"
  4. "fishtwo/app/jobs/message/internal/logic"
  5. "fishtwo/app/jobs/message/internal/svc"
  6. "github.com/tal-tech/go-zero/core/logx"
  7. )
  8. func batchSendMessageHandler(ctx *svc.ServiceContext){
  9. rootCxt:= context.Background()
  10. l := logic.NewBatchSendMessageLogic(context.Background(), ctx)
  11. err := l.BatchSendMessage()
  12. if err != nil{
  13. logx.WithContext(rootCxt).Error("【JOB-ERR】 : %+v ",err)
  14. }
  15. }

修改logic下batchsendmessagelogic.go,写我们的consumer消费逻辑

  1. package logic
  2. import (
  3. "context"
  4. "fishtwo/app/jobs/message/internal/svc"
  5. "fmt"
  6. "github.com/tal-tech/go-zero/core/logx"
  7. )
  8. type BatchSendMessageLogic struct {
  9. logx.Logger
  10. ctx context.Context
  11. svcCtx *svc.ServiceContext
  12. }
  13. func NewBatchSendMessageLogic(ctx context.Context, svcCtx *svc.ServiceContext) BatchSendMessageLogic {
  14. return BatchSendMessageLogic{
  15. Logger: logx.WithContext(ctx),
  16. ctx: ctx,
  17. svcCtx: svcCtx,
  18. }
  19. }
  20. func (l *BatchSendMessageLogic) BatchSendMessage() error {
  21. fmt.Println("job BatchSendMessage start")
  22. l.svcCtx.Consumer.Consume(func(body []byte) {
  23. fmt.Printf("job BatchSendMessage %s \n" + string(body))
  24. })
  25. fmt.Printf("job BatchSendMessage finish \n")
  26. return nil
  27. }

这样就大功告成了,启动message-job.go就ok课

  1. go run message-job.go

之后我们就可以在业务代码中向dq添加任务,它就可以自动消费了

producer.Delay 向dq中投递5个延迟任务:

  1. producer := dq.NewProducer([]dq.Beanstalk{
  2. {
  3. Endpoint: "localhost:7771",
  4. Tube: "tube1",
  5. },
  6. {
  7. Endpoint: "localhost:7772",
  8. Tube: "tube2",
  9. },
  10. })
  11. for i := 1000; i < 1005; i++ {
  12. _, err := producer.Delay([]byte(strconv.Itoa(i)), time.Second * 1)
  13. if err != nil {
  14. fmt.Println(err)
  15. }
  16. }

producer.At 可以指定某个时间执行,非常好用,感兴趣的朋友自己可以研究下。

错误日志

在前面说到gateway改造时候,如果眼神好的童鞋,在上面的httpresult.go中已经看到了log的身影:

我们在来看下rpc中怎么处理的

是的,我在每个rpc启动的main中加入了grpc拦截器 https://www.yuque.com/tal-tech/go-zero/ttzlo1,那让我们看看grpc拦截器里面做了什么

然后我代码里面使用github/pkg/errors这个包去处理错误的,这个包还是很好用的

所以呢:

我们在 grpc 中打印日志 logx.WithContext(ctx).Errorf("【RPC-SRV-ERR】 %+v",err)

api 中打印日志 logx.WithContext(r.Context()).Error("【GATEWAY-SRV-ERR】 : %+v ",err)

go-zero 中打印日志,使用logx.WithContext会把trace-id带入,这样一个请求下来,比如

  1. user-api --> user-srv --> message-srv

那如果 messsage-srv 出错,他们三个是同一个 trace-id ,是不是就可以在elk通过输入这个trace-id一次性搜索出来这条请求报错堆栈信息呢?当然你也可以接入 jaeger、zipkin、skywalking 等,这个我暂时还没接入。

框架地址

https://github.com/tal-tech/go-zero

欢迎使用 go-zero 并 star 支持我们!??

go-zero 系列文章见『微服务实践』公众号

原文链接:http://www.cnblogs.com/kevinwan/p/14355293.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号