经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 数据库/运维 » MySQL » 查看文章
通过Python将监控数据由influxdb写入到MySQL
来源:cnblogs  作者:东山絮柳仔  时间:2021/5/17 9:17:19  对本文有异议

一.项目背景

我们知道InfluxDB是最受欢迎的时序数据库(TSDB)。InfluxDB具有 持续高并发写入、无更新;数据压缩存储;低查询延时 的特点。从下面这个权威的统计图中,就可以看出InfluxDB的热度。

 InfluxDB可以作为 性能监控、应用程序指标、物联网传感器数据和实时分析等的后端存储。

我们的DB性能监控体系是基于Telegraf+InfluxDB+Grafana组件搭建,如下图所示。

 但是这个体系没有和既有的CMDB打通,例如,不清楚公司目前有多少台数据库实例已部署了监控?是不是有部分实例的监控漏掉了?而目前公司CMDB的信息都保存在了MySQL数据库中,所以,需要先实现 Influxdb 与 MySQL DB 的数据互通互联 。此功能的实现时借助Python完成的。

在此项目中,为便于说明演示,抽象简化后,需求概况为:将InfluxDB中保存的各个服务器的IP查询出来保存到指定的MySQL数据库中。进一步分解任务,因为measurement(表)为disk 存储有 Server host的数据,根据其命名规则,可host逆向拼凑出Server IP数据。

所以,此需求简化为:从InfluxDB的disk【measurement、表】中找出host【tag】对应的value,加工处理后,保存到MySQL。

二.安装运行环境遇到的错误

1.TypeError: Struct() 错误

调试时,报如下错误,查找资料发现,和python版本有关。

错误信息如下:

  1. ......
  2. File "/usr/lib/python2.7/site-packages/influxdb/client.py", line 527, in query
  3. expected_response_code=expected_response_code
  4. File "/usr/lib/python2.7/site-packages/influxdb/client.py", line 361, in request
  5. raw=False)
  6. File "/usr/lib/python2.7/site-packages/msgpack/fallback.py", line 129, in unpackb
  7. ret = unpacker._unpack()
  8. File "/usr/lib/python2.7/site-packages/msgpack/fallback.py", line 671, in _unpack
  9. ret[key] = self._unpack(EX_CONSTRUCT)
  10. File "/usr/lib/python2.7/site-packages/msgpack/fallback.py", line 644, in _unpack
  11. ret.append(self._unpack(EX_CONSTRUCT))
  12. File "/usr/lib/python2.7/site-packages/msgpack/fallback.py", line 671, in _unpack
  13. ret[key] = self._unpack(EX_CONSTRUCT)
  14. File "/usr/lib/python2.7/site-packages/msgpack/fallback.py", line 644, in _unpack
  15. ret.append(self._unpack(EX_CONSTRUCT))
  16. File "/usr/lib/python2.7/site-packages/msgpack/fallback.py", line 671, in _unpack
  17. ret[key] = self._unpack(EX_CONSTRUCT)
  18. File "/usr/lib/python2.7/site-packages/msgpack/fallback.py", line 644, in _unpack
  19. ret.append(self._unpack(EX_CONSTRUCT))
  20. File "/usr/lib/python2.7/site-packages/msgpack/fallback.py", line 644, in _unpack
  21. ret.append(self._unpack(EX_CONSTRUCT))
  22. File "/usr/lib/python2.7/site-packages/msgpack/fallback.py", line 697, in _unpack
  23. return self._ext_hook(n, bytes(obj))
  24. File "/usr/lib/python2.7/site-packages/influxdb/client.py", line 1247, in _msgpack_parse_hook
  25. (epoch_s, epoch_ns) = struct.unpack(">QI", data)
  26. TypeError: Struct() argument 1 must be string, not unicode

报错的python版本为Python 2.7.5,查看资料,建议升级到2.7.7以上。为规避这个错误,我们将版本升级到了Python 3.6.8

2.升级安装Python 3.6.8

安装执行make install时报错,错误信息如下:

  1. zipimport.ZipImportError: cant decompress data; zlib not available
  2. make: * [install] Error 1

原因是缺少了zlib的解压缩类库,

解决方案,执行以下命令

  1. yum -y install zlib*

 3.引入influxdb插件报错

运行报错,提示信息如下:

  1. ..........
  2. from influxdb import InfluxDBClient
  3. ModuleNotFoundError: No module named 'influxdb'

解决方案:

  1. git clone https://github.com/influxdb/influxdb-python.git
  2. cd influxdb-python
  3. pip3 install -r requirements.txt
  4. python3 setup.py install

成功安装的记录如下:

  1. 。。。。。。。。。。。。
  2. Using /usr/local/lib/python3.6/site-packages
  3. Searching for urllib3==1.25.6
  4. Best match: urllib3 1.25.6
  5. Adding urllib3 1.25.6 to easy-install.pth file
  6. Using /usr/lib/python3.6/site-packages
  7. Finished processing dependencies for influxdb==5.3.1

验证是否成功安装,打开python输入

  1. from influxdb import client as influxdb

如果没有错误信息,则表示安装成功

4.Python3 环境执行mysql报错

  1. ...
  2. import MySQLdb
  3. ModuleNotFoundError: No module named 'MySQLdb'

环境测试

原因分析: Python 2安装的是mysql-python,而Python 3应该安装mysqlclient。

因此需要:

pip3 install mysqlclient

但是报错,错误信息如下:

  1. Traceback (most recent call last):
  2. File "/usr/local/bin/pip3", line 11, in <module>
  3. load_entry_point('pip==1.5.4', 'console_scripts', 'pip3')()
  4. File "/usr/local/lib/python3.6/site-packages/setuptools-39.2.0-py3.6.egg/pkg_resources/__init__.py", line 476, in load_entry_point
  5. File "/usr/local/lib/python3.6/site-packages/setuptools-39.2.0-py3.6.egg/pkg_resources/__init__.py", line 2700, in load_entry_point
  6. File "/usr/local/lib/python3.6/site-packages/setuptools-39.2.0-py3.6.egg/pkg_resources/__init__.py", line 2318, in load
  7. File "/usr/local/lib/python3.6/site-packages/setuptools-39.2.0-py3.6.egg/pkg_resources/__init__.py", line 2324, in resolve
  8. File "/usr/local/lib/python3.6/site-packages/pip-1.5.4-py3.6.egg/pip/__init__.py", line 9, in <module>
  9. from pip.log import logger
  10. File "/usr/local/lib/python3.6/site-packages/pip-1.5.4-py3.6.egg/pip/log.py", line 9, in <module>
  11. from pip._vendor import colorama, pkg_resources
  12. File "/usr/local/lib/python3.6/site-packages/pip-1.5.4-py3.6.egg/pip/_vendor/pkg_resources.py", line 1423, in <module>
  13. register_loader_type(importlib_bootstrap.SourceFileLoader, DefaultProvider)
  14. AttributeError: module 'importlib._bootstrap' has no attribute 'SourceFileLoader'

原因分析:

pip-1.5.4,远低于pip目前的版本,

解决方案:

下载新的版本安装更新pip,下载网址https://pypi.org/project/pip/#files

例如下载了pip-21.1.1.tar.gz,安装步骤如下:

  1. step 1
    tar
    -zxvf pip-21.1.1.tar.gz
    step 2
    cd pip-21.1.1
    step 3
    python3 setup.py build
    step 4
    python3 setup.py install

再次安装 mysqlclient

  1. pip3 install mysqlclient

安装过程不再报错,验证安装OK。

 三.部分代码说明

1.对象类型及属性查看--print(type(?))和print(dir(?))

因为我们平常对influxdb使用的相对较少,不像关系型数据库那么熟练,通过python查看influxdb数据,比较陌生,不知道返回值对象的类型是什么或者怎么操作。这时候可以通过print(type(?)) 和print(dir(?))来查看。

如下图,假如response是influxdb的query返回值。

 print显示的返回信息如下:

注意 上面有一个 get_points 方法,不知道你找到了吗? 这个需要特别注意,后面我们就会讲到。

2. Getting all points

Using query() that returns data in 'influxdb.resultset.ResultSet' format.This is the sample output of the query():

  1. Result: ResultSet({'(u'cpu', None)': [{u'usage_guest_nice': 0, u'usage_user': 0.90783871790308868, u'usage_nice': 0, u'usage_steal': 0, u'usage_iowait': 0.056348610076366427, u'host': u'xxx.xxx.hostname.com', u'usage_guest': 0, u'usage_idle': 98.184322579062794, u'usage_softirq': 0.0062609566755314457, u'time': u'2016-06-26T16:25:00Z', u'usage_irq': 0, u'cpu': u'cpu-total', u'usage_system': 0.84522915123660536}]})

Using rs.get_points() will return a generator for all the points in the ResultSet.

you can Filtering by measurement

  1. rs = cli.query("SELECT * from cpu")
  2. cpu_points = list(rs.get_points(measurement='cpu'))

or you can Filtering by tags

  1. rs = cli.query("SELECT * from cpu")
  2. cpu_influxdb_com_points = list(rs.get_points(tags={"host_name": "influxdb.com"}))

or you can Filtering by measurement and tags

  1. rs = cli.query("SELECT * from cpu")
  2. points = list(rs.get_points(measurement='cpu', tags={'host_name': 'influxdb.com'}))

3.telegraf模板中关于host的命名

我们知道telegraf 模板中有host参数(默认在/etc/telegraf.conf设置),在grafana界面上可以根据这个host参数进行刷选,进一步定位到想要查看的 Server 或 DB 实例。因为公司有多个项目组,每个项目组负责不同的系统,有各自的DB Server 、实例。为了区分这个Server究竟属于那个项目组(Team),所以,我们在定义Host时,不是简单的赋值Server IP,而是 产品线 + Server IP的后两位。如此,也方便 监控、研发、运维的同学快速找到Server,判断相应的业务项目组。

例如 订单中心 所在的 DB Server 为 18.19.20.21 其host为 order_20_21;CRM 所在的 DB Server 为 18.19.22.23 其host为crm_22_23;ERP所在的DB Server 为 18.19.24.25其host为erp_24_25;app所在的DB Server 为 18.19.34.35其host为app_34_35;等等。

四 主要代码

 1.在MySQL实例上创建保存Server信息的表

  1. CREATE TABLE `monitor_serverdb` (
  2. `id` int(11) NOT NULL AUTO_INCREMENT,
  3. `ip_address` varchar(255) NOT NULL DEFAULT '',
  4. `datetime_created` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '数据行创建时间',
  5. PRIMARY KEY (`id`)
  6. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

2.连接MySQL的python文件db_conn.py

代码如下:

  1. #!/usr/bin/python3
  2. # -*- coding: UTF-8 -*-
  3.  
  4. import MySQLdb
  5. # 打开数据库连接
  6. db = MySQLdb.connect("server DB实例IP","DB 用户名","DB PWD","DB Name",charset='utf8mb4',port=数据库端口号)

3.连接InfluxDB的python文件collect_dbhost_telegraf_info.py

主要代码:

  1. #!/usr/bin/python
  2. # -*- coding: UTF-8 -*-
  3.  
  4.  
  5.  
  6. from influxdb import InfluxDBClient
  7. import pytz
  8. import time
  9. import dateutil.parser
  10. import datetime
  11. class DBApi(object):
  12. """
  13. 通过infludb获取数据
  14. """
  15.  
  16. def __init__(self, ip, port):
  17. """
  18. 初始化数据
  19. :param ip:influxdb地址
  20. :param port: 端口
  21. """
  22. self.db_name = 'telegraf'
  23. self.use_cpu_table = 'cpu' # cpu使用率表
  24. self.phy_mem_table = 'mem'# 物理内存表
  25. self.traffic_table = 'net'# 接收流量表
  26. self.disk_table = 'disk'# 磁盘表
  27. self.client = InfluxDBClient(ip, port, 'DB UID', 'DB PWD', self.db_name) # 连接influxdb数据库
  28. print ('test link influxdb')
  29. def get_telegraf_list(self):
  30. """
  31. :param host: 查询的主机host (telegraf 配置参数中的host栏位)
  32. """
  33.  
  34. print ('step 1 check get deployment')
  35. response = {}
  36. telegraf_list = self.client.query('SHOW TAG VALUES FROM disk WITH KEY = "host";')
  37. return telegraf_list

4.执行文件collect_monitordb_info.py

  1. #!/usr/bin/python
  2. # -*- coding: UTF-8 -*-
  3.  
  4.  
  5. import os
  6. import time
  7. from collect_dbhost_telegraf_info.py import DBApi
  8. ## get mysqldb connection
  9. import db_conn
  10. mysqldb = db_conn.db
  11. # use cursor
  12. cursor = mysqldb.cursor()
  13. ###数据收集前,清除之前收集的数据
  14. sql_delete = "delete from monitor_serverdb "
  15. cursor.execute(sql_delete)
  16. mysqldb.commit()
  17. # 连接 influxdb
  18. # INFLUXDB_IP influxdb所在主机
  19. # INFLUXDB_PROT influxdb端口
  20. db = DBApi(ip='influxdb 所在主机', port='端口号')
  21. ###print(db)
  22. response = db.get_telegraf_list()
  23. #print (response)
  24. #print(type(response))
  25. #print(dir(response))
  26. disk_points = list(response.get_points(measurement='disk'))
  27. #print(disk_points)
  28. #print(dir(disk_points))
  29. for disk_check in disk_points:
  30. ##print(disk_check)
  31. for host_key in disk_check.keys():
  32. if host_key == "value":
  33. ##print(disk_check[host_key])
  34. ##基于host的命名进行切割,分割符为_,返回值为列表
  35. diskhost_split = disk_check[host_key].split('_')
  36. ##将列表中的后两个元素提取出来,组成server IP,因为集团IP前两位一样,所以如此拼凑。
  37. ##print(type(diskhost_split))
  38. ##print(diskhost_split)
  39. ##print(diskhost_split[-2:-3:-1][0])
  40. disk_ip = '110.' + '120.' + diskhost_split[-2:-3:-1][0] + '.' + diskhost_split[-1:-2:-1][0]
  41. print(disk_ip)
  42. sql_insert = "insert into monitor_serverdb(ip_address) " "values('%s')" % (disk_ip)
  43. cursor.execute(sql_insert)
  44. mysqldb.commit()

5.执行命令如下

  1. python3 collect_monitordb_info.py

 五.参考资料

1.https://stackoverflow.com/questions/38040644/processing-influx-db-output-of-influxdb-resultset-resultset/38055771

2.python 获取指定字符前面或后面的所有字符

https://www.cnblogs.com/syw20170419/p/10969191.html

3.https://www.cnblogs.com/jadexia/p/7797791.html

4.https://blog.csdn.net/Linking_sea/article/details/112690038

5.InfluxDB 入门

https://www.jianshu.com/p/f0905f36e9c3

6.https://grafana.com/grafana/

7.https://github.com/dbarun/mysql_archiver#readme

原文链接:http://www.cnblogs.com/xuliuzai/p/14721413.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号