经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 程序设计 » Go语言 » 查看文章
go 源码学习之---Tail 源码分析
来源:cnblogs  作者:python修行路  时间:2018/9/25 20:30:23  对本文有异议

已经有两个月没有写博客了,也有好几个月没有看go相关的内容了,由于工作原因最近在做java以及大数据相关的内容,导致最近工作较忙,博客停止了更新,正好想捡起之前go的东西,所以找了一个源码学习

这个也是之前用go写日志收集的时候用到的一个包 :github.com/hpcloud/tail, 这次就学习一下人家的源码,为了方便看这个代码,我将这个包进行了简化,也是用于方便理解,代码放到了:https://github.com/pythonsite/tail, 这个代码包可能无法正常用,只是为了方面理解tail这个包,以及学习人家的代码

精简后的代码目录

  1. tail.go
  2. └─watch
  3. filechanges.go
  4. inotify.go
  5. inotify_tracker.go
  6. watch.go

tail.go: 这里包含着tail包的核心代码,主要的逻辑处理时在这个里面

watch: 这个包主要用于对文件的监控,用于将文件的变化通知到tail.如:文件修改了,文件删除了,文件内容追加了

 

tail.go 代码分析

在tail.go中主要有几下几个结构体:

  1. // Line 结构体用于存读每行的时候的对象
  2. type Line struct {
  3. Text string //当前行的内容
  4. Time time.Time // 时间
  5. Err error // Error from tail
  6. }
  7. type SeekInfo struct {
  8. Offset int64
  9. Whence int
  10. }
  11. // 关于配置的结构体
  12. type Config struct {
  13. Location *SeekInfo
  14. ReOpen bool
  15. MustExist bool // 要打开的文件是否必须存在
  16. Poll bool
  17. Pipe bool
  18. Follow bool // 是否继续读取新的一行,可以理解为tail -f 命令
  19. }
  20. // 核心的结构体Tail
  21. type Tail struct {
  22. Filename string // 要打开的文件名
  23. Lines chan *Line // 用于存每行内容的Line结构体
  24. Config
  25. watcher watch.FileWatcher
  26. changes *watch.FileChanges
  27. tomb.Tomb
  28. file *os.File
  29. reader *bufio.Reader
  30. lk sync.Mutex
  31. }
Line 结构体用于存读取文件的每行内容
Tail 是核心的结构体,我们使用tail这个包的时候其实就是会先调用初始化这个struct的方法TailFile,如我在写日志收集的时候的使用:
  1. tail,err := tail.TailFile(conf.LogPath,tail.Config{
  2. ReOpen:true,
  3. Follow:true,
  4. Location:&tail.SeekInfo{Offset:0,Whence:2},
  5. MustExist:false,
  6. Poll:true,
  7. })

既然我们使用的时候就会在最开始的时候调用tail.TailFile方法,就直接看这个方法:

  1. // 主要用于Tail结构体的初始化
  2. func TailFile(filename string, config Config) (*Tail, error) {
  3. t := &Tail {
  4. Filename: filename,
  5. Lines: make(chan *Line),
  6. Config: config,
  7. }
  8. t.watcher = watch.NewInotifyFileWatcher(filename)
  9. if t.MustExist {
  10. var err error
  11. t.file, err = OpenFile(t.Filename)
  12. if err != nil {
  13. return nil, err
  14. }
  15. }
  16. go t.tailFileSync()
  17. return t, nil
  18. }

从这个代码里我们就可以看到它首先初始化了Tail结构体并且对Tail中的watcher进行的复制,先暂时不看watch相关的内容

然后就是关于文件是否必须存在的判断处理,最后开启了一个一个线程执行tailFileSync()方法,我们接着看tailFileSync方法

  1. func (tail *Tail) tailFileSync(){
  2. defer tail.Done()
  3. defer tail.close()
  4. if !tail.MustExist {
  5. err := tail.reopen()
  6. if err != nil {
  7. if err != tomb.ErrDying {
  8. tail.Kill(err)
  9. }
  10. return
  11. }
  12. }
  13. tail.openReader()
  14. var offset int64
  15. var err error
  16. // 一行行读文件内容
  17. for {
  18. if !tail.Pipe {
  19. offset,err = tail.Tell()
  20. if err != nil {
  21. tail.Kill(err)
  22. return
  23. }
  24. }
  25. line, err := tail.readLine()
  26. if err == nil {
  27. // 将读取的一行内容放到chan中
  28. tail.sendLine(line)
  29. } else if err == io.EOF {
  30. // 表示读到文件的最后了
  31. // 如果Follow 设置为false的话就不会继续读文件
  32. if !tail.Follow {
  33. if line != "" {
  34. tail.sendLine(line)
  35. }
  36. return
  37. }
  38. // 如果Follow设置为True则会继续读
  39. if tail.Follow && line != "" {
  40. err := tail.seekTo(SeekInfo{Offset: offset, Whence: 0})
  41. if err != nil {
  42. tail.Kill(err)
  43. return
  44. }
  45. }
  46. // 如果读到文件最后,文件并没有新的内容增加
  47. err := tail.waitForChanges()
  48. if err != nil {
  49. if err != ErrStop {
  50. tail.Kill(err)
  51. }
  52. return
  53. }
  54. } else {
  55. // 既不是文件结尾,也没有error
  56. tail.Killf("error reading %s :%s", tail.Filename, err)
  57. return
  58. }
  59. select {
  60. case <- tail.Dying():
  61. if tail.Err() == errStopAtEOF {
  62. continue
  63. }
  64. return
  65. default:
  66. }
  67. }
  68. }

这个方法里主要是先调用了openReader方法,这个方法其实并没有做什么,只是对tail.reqader进行了赋值:tail.reader = bufio.NewReader(tail.file)

接着就是循环一行行的读文件

在循环里最开始判断了tail.Pipe的值,这个值一般开始我也并不会设置,所以默认就是false,所以就会执行tail.Tell()方法,这个方法主要是用于获取文件当前行的位置信息,下面是Tell的代码内容:

  1. // 获取文件当前行的位置信息
  2. func (tail *Tail) Tell()(offset int64, err error) {
  3. if tail.file == nil {
  4. return
  5. }
  6. offset, err = tail.file.Seek(0, os.SEEK_CUR)
  7. if err != nil {
  8. return
  9. }
  10. tail.lk.Lock()
  11. defer tail.lk.Unlock()
  12. if tail.reader == nil {
  13. return
  14. }
  15. offset -= int64(tail.reader.Buffered())
  16. return
  17. }

接着会调用tail.readLine()方法,这个方法就是用于获取文件的一行内容,同时将一行内容实例化为Line对象,然后扔到管道tail.Lines中

  1. //将读取的文件的每行内容存入到Line结构体中,并最终存入到tail.Lines的chan中
  2. func (tail *Tail) sendLine(line string) bool {
  3. now := time.Now()
  4. lines := []string{line}
  5. for _, line := range lines {
  6. tail.Lines <- &Line {
  7. line,
  8. now,
  9. nil,
  10. }
  11. }
  12. return true
  13. }

最后的大量if 判断其实主要是针对读到文件末尾后的一些操作,

Tail结构体在最后定义的时候有一个参数:Follow, 这个参数的目的就是当读到文件最后的时候是否继续读文件, 如果最开始设置了false,那么读到最后之后就不会在读文件了

如果设置为True,那么读到文件最后之后会保存文件的位置信息,并执行waitForChanges() 去等待文件的变化,waitForChanges()代码内容如下:

  1. // 等待文件的变化事件
  2. func (tail *Tail) waitForChanges() error {
  3. if tail.changes == nil {
  4. // 这里是获取文件指针的当前位置
  5. pos, err := tail.file.Seek(0,os.SEEK_CUR)
  6. if err != nil {
  7. return err
  8. }
  9. tail.changes, err = tail.watcher.ChangeEvents(&tail.Tomb, pos)
  10. if err != nil {
  11. return err
  12. }
  13. }
  14. // 和inotify中进行很巧妙的配合,这里通过select 来进行查看那个chan变化了,来知道文件的变化
  15. select {
  16. case <- tail.changes.Modified: // 文件被修改
  17. return nil
  18. case <- tail.changes.Deleted: // 文件被删除或者移动到其他目录
  19. tail.changes = nil
  20. // 如果文件被删除或者被移动到其他目录,则会尝试重新打开文件
  21. if tail.ReOpen {
  22. fmt.Printf("Re-opening moved/deleted file %s...",tail.Filename)
  23. if err := tail.reopen();err != nil {
  24. return err
  25. }
  26. fmt.Printf("Successfully reopened %s", tail.Filename)
  27. tail.openReader()
  28. return nil
  29. } else {
  30. fmt.Printf("Stoping tail as file not longer exists: %s", tail.Filename)
  31. return ErrStop
  32. }
  33. case <- tail.changes.Truncated: // 文件被追加新的内容
  34. fmt.Printf("Re-opening truncated file %s....", tail.Filename)
  35. if err := tail.reopen();err != nil {
  36. return err
  37. }
  38. fmt.Printf("SuccessFuly reopend truncated %s", tail.Filename)
  39. tail.openReader()
  40. return nil
  41. case <- tail.Dying():
  42. return nil
  43. }
  44. panic("unreachable")
  45. }

看到这里的时候其实就能感觉到,别人写的代码其实也并不是非常复杂,也是很普通的代码,但是你会觉得人家很多地方用的非常巧妙,

这段代码中主要的是的内容就是select部分,这个部分通过select监控

tail.changes.Modified
tail.changes.Deleted
tail.changes.Truncated

从而知道文件的变化,是修改了,还是删除了,还是追加内容了,这几个其实都是一个channel,这几个channel中的内容是怎么放进去的呢,接下来看watch包中的内容

watch包代码分析

首先先看一下watch包中的watch.go,这个里面其实就是定一个了一个FileWatcher的接口

  1. type FileWatcher interface {
  2. BlockUntilExists(*tomb.Tomb) error
  3. ChangeEvents(*tomb.Tomb, int64) (*FileChanges, error)
  4. }

接着我们看一下inotify.go文件,这个里面我们就可以看到定一个InotifyFileWatcher结构体,并且实现了FileWatcher 这个接口

  1. type InotifyFileWatcher struct {
  2. Filename string
  3. Size int64
  4. }
  5. func NewInotifyFileWatcher(filename string) *InotifyFileWatcher {
  6. fw := &InotifyFileWatcher {
  7. filepath.Clean(filename),
  8. 0,
  9. }
  10. return fw
  11. }
  12. // 关于文件改变事件的处理,当文件被修改了或者文件内容被追加了,进行通知
  13. func (fw *InotifyFileWatcher) ChangeEvents(t *tomb.Tomb, pos int64) (*FileChanges, error) {
  14. err := Watch(fw.Filename)
  15. if err != nil {
  16. return nil, err
  17. }
  18. changes := NewFileChanges()
  19. fw.Size = pos
  20. go func() {
  21. events := Events(fw.Filename)
  22. for {
  23. prevSize := fw.Size
  24. var evt fsnotify.Event
  25. var ok bool
  26. select {
  27. case evt, ok = <- events:
  28. if !ok {
  29. RemoveWatch(fw.Filename)
  30. return
  31. }
  32. case <- t.Dying():
  33. RemoveWatch(fw.Filename)
  34. return
  35. }
  36. switch {
  37. case evt.Op & fsnotify.Remove == fsnotify.Remove:
  38. fallthrough
  39. case evt.Op & fsnotify.Rename == fsnotify.Rename:
  40. RemoveWatch(fw.Filename)
  41. changes.NotifyDeleted()
  42. return
  43.  
  44. case evt.Op & fsnotify.Chmod == fsnotify.Chmod:
  45. fallthrough
  46. case evt.Op & fsnotify.Write == fsnotify.Write:
  47. fi, err := os.Stat(fw.Filename)
  48. if err != nil {
  49. // 文件如果被删除了通知文件删除到chan
  50. if os.IsNotExist(err) {
  51. RemoveWatch(fw.Filename)
  52. changes.NotifyDeleted()
  53. return
  54. }
  55. }
  56. fw.Size = fi.Size()
  57. if prevSize > 0 && prevSize > fw.Size {
  58. // 表示文件内容增加了
  59. changes.NotifyTruncated()
  60. } else {
  61. // 表示文件被修改了
  62. changes.NotifyModified()
  63. }
  64. prevSize = fw.Size
  65. }
  66. }
  67. }()
  68. return changes, nil
  69. }
  70. func (fw *InotifyFileWatcher) BlockUntilExists(t *tomb.Tomb) error {
  71. err := WatchCreate(fw.Filename)
  72. if err != nil {
  73. return err
  74. }
  75. defer RemoveWatchCreate(fw.Filename)
  76. if _, err := os.Stat(fw.Filename);!os.IsNotExist(err) {
  77. return err
  78. }
  79. events := Events(fw.Filename)
  80. for {
  81. select {
  82. case evt, ok := <- events:
  83. if !ok {
  84. return fmt.Errorf("inotify watcher has been closed")
  85. }
  86. evtName, err := filepath.Abs(evt.Name)
  87. if err != nil {
  88. return err
  89. }
  90. fwFilename, err := filepath.Abs(fw.Filename)
  91. if err != nil {
  92. return err
  93. }
  94. if evtName == fwFilename {
  95. return nil
  96. }
  97. case <- t.Dying():
  98. return tomb.ErrDying
  99. }
  100. }
  101. panic("unreachable")
  102. }

实现的接口就两个方法:

ChangeEvents: 这个主要是监控文件的变化,是删除了,还是被修改了,或者是文件,然后将状态信息通过调用:changes.NotifyTruncated()或者
changes.NotifyDeleted() 或者changes.NotifyModified() 将状态信息更新到channel中,这样我们在分析tail.go 中最后的分析的那部分channel中的数据,就是在这里
放进去的
BlockUntilExists:这个主要是关于文件不存在的时候,如果最开始的时候可以允许文件不存在,那么就会 在这里通过for循环一直等待,知道文件存在
 
再看看filechanges.go 文件,代码内容如下:
  1. type FileChanges struct {
  2. Modified chan bool // 修改
  3. Truncated chan bool // 增加
  4. Deleted chan bool // 删除
  5. }
  6. func NewFileChanges() *FileChanges {
  7. return &FileChanges{
  8. make(chan bool, 1),
  9. make(chan bool, 1),
  10. make(chan bool, 1),
  11. }
  12. }
  13. func (fc *FileChanges) NotifyModified() {
  14. sendOnlyIfEmpty(fc.Modified)
  15. }
  16. func (fc *FileChanges) NotifyTruncated() {
  17. sendOnlyIfEmpty(fc.Truncated)
  18. }
  19. func (fc *FileChanges) NotifyDeleted() {
  20. sendOnlyIfEmpty(fc.Deleted)
  21. }
  22. func sendOnlyIfEmpty(ch chan bool) {
  23. select {
  24. case ch <- true:
  25. default:
  26. }
  27. }

在这个里面也是可以学习到人家写的这个地方非常巧妙,虽然谈不上代码高达上,但是看着会让你很舒服,通过这个结构体,当文件被删除,修改和增加的时候就会让对应的channel中插入一个true,并且这里

的channel都是不带缓冲区的,只有当tail中触发一次之后,channel中的内容就会被获取出来,从而触发tail继续读文件的内容

 

 

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号