承接上篇:上篇文章讲到改造 go-zero 生成的 app module 中的 gateway & RPC 。本篇讲讲如何接入 异步任务 以及 log的使用。
Delay Job
日常任务开放中,我们会有很多异步、批量、定时、延迟任务要处理,go-zero中有 go-queue,推荐使用 go-queue 去处理,go-queue 本身也是基于 go-zero 开发的,其本身是有两种模式:
- dq : 依赖于-  beanstalkd,分布式,可存储,延迟、定时设置,关机重启可以重新执行,消息会丢失,使用非常简单,go-queue中使用了redis setnx保证了每个消息只被消费一次,使用场景主要是用来做日常任务使用
- kq:依赖于- kafka,这个就不多介绍啦,大名鼎鼎的- kafka,使用场景主要是做日志用
我们主要说一下dq,kq使用也一样的,只是依赖底层不同,如果没使用过beanstalkd,没接触过beanstalkd的可以先google一下,使用起来还是挺容易的。
我在jobs下使用goctl新建了一个message-job.api服务
- info(
- 	title: //消息任务
- 	desc: // 消息任务
- 	author: "Mikael"
- 	email: "13247629622@163.com"
- )
- type BatchSendMessageReq {}
- type BatchSendMessageResp {}
- service message-job-api {
- 	@handler batchSendMessageHandler // 批量发送短信
- 	post batchSendMessage(BatchSendMessageReq) returns(BatchSendMessageResp)
- }
因为不需要使用路由,所以handler下的routes.go被我删除了,在handler下新建了一个jobRun.go,内容如下:
- package handler
- import (
- 	"fishtwo/lib/xgo"
- 	"fishtwo/app/jobs/message/internal/svc"
- )
- /**
- * @Description 启动job
- * @Author Mikael
- * @Date 2021/1/18 12:05
- * @Version 1.0
- **/
- func JobRun(serverCtx *svc.ServiceContext)  {
- 	xgo.Go(func() {
- 		batchSendMessageHandler(serverCtx)
-     //...many job
- 	})
- }
其实xgo.Go就是 go batchSendMessageHandler(serverCtx) ,封装了一下go携程,防止野生goroutine panic
然后修改一下启动文件message-job.go
- package main
- import (
-    "flag"
-    "fmt"
-    "fishtwo/app/jobs/message/internal/config"
-    "fishtwo/app/jobs/message/internal/handler"
-    "fishtwo/app/jobs/message/internal/svc"
-    "github.com/tal-tech/go-zero/core/conf"
-    "github.com/tal-tech/go-zero/rest"
- )
- var configFile = flag.String("f", "etc/message-job-api.yaml", "the config file")
- func main() {
-    flag.Parse()
-    var c config.Config
-    conf.MustLoad(*configFile, &c)
-    ctx := svc.NewServiceContext(c)
-    server := rest.MustNewServer(c.RestConf)
-    defer server.Stop()
-    handler.JobRun(ctx)
-    fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port)
-    server.Start()
- }
主要是handler.RegisterHandlers(server, ctx) 修改为handler.JobRun(ctx)
接下来,我们就可以引入dq了,首先在etc/xxx.yaml下添加dqConf
- .....
- DqConf:
-   Beanstalks:
-     - Endpoint: 127.0.0.1:7771
-       Tube: tube1
-     - Endpoint: 127.0.0.1:7772
-       Tube: tube2
-   Redis:
-     Host: 127.0.0.1:6379
-     Type: node
我这里本地用不同端口,模拟开了2个节点,7771、7772
在internal/config/config.go添加配置解析对象
- type Config struct {
- 	....
- 	DqConf dq.DqConf
- }
修改handler/batchsendmessagehandler.go
- package handler
- import (
- 	"context"
- 	"fishtwo/app/jobs/message/internal/logic"
- 	"fishtwo/app/jobs/message/internal/svc"
- 	"github.com/tal-tech/go-zero/core/logx"
- )
- func batchSendMessageHandler(ctx *svc.ServiceContext){
- 	rootCxt:= context.Background()
- 	l := logic.NewBatchSendMessageLogic(context.Background(), ctx)
- 	err := l.BatchSendMessage()
- 	if err != nil{
- 		logx.WithContext(rootCxt).Error("【JOB-ERR】 : %+v ",err)
- 	}
- }
修改logic下batchsendmessagelogic.go,写我们的consumer消费逻辑
- package logic
- import (
-    "context"
-    "fishtwo/app/jobs/message/internal/svc"
-    "fmt"
-    "github.com/tal-tech/go-zero/core/logx"
- )
- type BatchSendMessageLogic struct {
-    logx.Logger
-    ctx    context.Context
-    svcCtx *svc.ServiceContext
- }
- func NewBatchSendMessageLogic(ctx context.Context, svcCtx *svc.ServiceContext) BatchSendMessageLogic {
-    return BatchSendMessageLogic{
-    	Logger: logx.WithContext(ctx),
-    	ctx:    ctx,
-    	svcCtx: svcCtx,
-    }
- }
- func (l *BatchSendMessageLogic) BatchSendMessage() error {
-    fmt.Println("job BatchSendMessage start")
-    l.svcCtx.Consumer.Consume(func(body []byte) {
-    	fmt.Printf("job BatchSendMessage %s \n" + string(body))
-    })
-    fmt.Printf("job BatchSendMessage finish \n")
-    return nil
- }
这样就大功告成了,启动message-job.go就ok课
- go run message-job.go
之后我们就可以在业务代码中向dq添加任务,它就可以自动消费了
producer.Delay 向dq中投递5个延迟任务:
- 	producer := dq.NewProducer([]dq.Beanstalk{
- 		{
- 			Endpoint: "localhost:7771",
- 			Tube:     "tube1",
- 		},
- 		{
- 			Endpoint: "localhost:7772",
- 			Tube:     "tube2",
- 		},
- 	})
- 	for i := 1000; i < 1005; i++ {
- 		_, err := producer.Delay([]byte(strconv.Itoa(i)), time.Second * 1)
- 		if err != nil {
- 			fmt.Println(err)
- 		}
- 	}
producer.At 可以指定某个时间执行,非常好用,感兴趣的朋友自己可以研究下。
错误日志
在前面说到gateway改造时候,如果眼神好的童鞋,在上面的httpresult.go中已经看到了log的身影:

我们在来看下rpc中怎么处理的

是的,我在每个rpc启动的main中加入了grpc拦截器 https://www.yuque.com/tal-tech/go-zero/ttzlo1,那让我们看看grpc拦截器里面做了什么

然后我代码里面使用github/pkg/errors这个包去处理错误的,这个包还是很好用的


所以呢:
我们在 grpc 中打印日志 logx.WithContext(ctx).Errorf("【RPC-SRV-ERR】 %+v",err);
在api 中打印日志 logx.WithContext(r.Context()).Error("【GATEWAY-SRV-ERR】 : %+v ",err)
go-zero 中打印日志,使用logx.WithContext会把trace-id带入,这样一个请求下来,比如
- user-api --> user-srv --> message-srv
那如果 messsage-srv 出错,他们三个是同一个 trace-id ,是不是就可以在elk通过输入这个trace-id一次性搜索出来这条请求报错堆栈信息呢?当然你也可以接入 jaeger、zipkin、skywalking 等,这个我暂时还没接入。
框架地址
https://github.com/tal-tech/go-zero
欢迎使用 go-zero 并 star 支持我们!??
go-zero 系列文章见『微服务实践』公众号