经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 程序设计 » Elasticsearch » 查看文章
Python对ElasticSearch获取数据及操作
来源:jb51  时间:2019/4/24 12:29:27  对本文有异议

使用Python对ElasticSearch获取数据及操作,供大家参考,具体内容如下

Version

Python :2.7

ElasticSearch:6.3

代码:

  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. """
  4. @Time : 2018/7/4
  5. @Author : LiuXueWen
  6. @Site :
  7. @File : ElasticSearchOperation.py
  8. @Software: PyCharm
  9. @Description: 对elasticsearch数据的操作,包括获取数据,发送数据
  10. """
  11. import elasticsearch
  12. import json
  13.  
  14. import Util_Ini_Operation
  15.  
  16. class elasticsearch_data():
  17. def __init__(self,hosts,username,password,maxsize,is_ssl):
  18. # 初始化ini操作脚本,获取配置文件
  19. try:
  20. # 判断请求方式是否ssl加密
  21. if is_ssl == "true":
  22. # 获取证书地址
  23. cert_pem = Util_Ini_Operation.get_ini("config.ini").get_key_value("certs","certs")
  24. es_ssl = elasticsearch.Elasticsearch(
  25. # 地址
  26. hosts=hosts,
  27. # 用户名密码
  28. http_auth=(username,password),
  29. # 开启ssl
  30. use_ssl=True,
  31. # 确认有加密证书
  32. verify_certs=True,
  33. # 对应的加密证书地址
  34. client_cert=cert_pem
  35. )
  36. self.es = es_ssl
  37. elif is_ssl == "false":
  38. # 创建普通类型的ES客户端
  39. es_ordinary = elasticsearch.Elasticsearch(hosts, http_auth=(username, password), maxsize=int(maxsize))
  40. self.es = es_ordinary
  41. except Exception as e:
  42. print(e)
  43.  
  44.  
  45. def query_data(self,keywords_list,date):
  46. gte = "now-"+str(date)
  47. query_data = {
  48. # 查询语句
  49. "query": {
  50. "bool": {
  51. "must": [
  52. {
  53. "query_string": {
  54. "query": keywords_list,
  55. "analyze_wildcard": True
  56. }
  57. },
  58. {
  59. "range": {
  60. "@timestamp": {
  61. "gte": gte,
  62. "lte": "now",
  63. "format": "epoch_millis"
  64. }
  65. }
  66. }
  67. ],
  68. "must_not": []
  69. }
  70. }
  71. }
  72. return query_data
  73.  
  74. # 从es获取数据
  75. def get_datas_by_query(self,index_name,keywords,param,date):
  76. '''
  77. :param index_name: 索引名称
  78. :param keywords: 关键字词,数组
  79. :param param: 需要数据条件,例如_source
  80. :param date: 过去时间范围,字符串格式,例如过去30分钟内数据,"30m"
  81. :return: all_datas 返回查询到的所有数据(已经过param过滤)
  82. '''
  83.  
  84. all_datas = []
  85. # 遍历所有的查询条件
  86. for keywords_list in keywords:
  87. # DSL语句
  88. query_data = self.query_data(keywords_list,date)
  89. res = self.es.search(
  90. index=index_name,
  91. body=query_data
  92. )
  93. for hit in res['hits']['hits']:
  94. # 获取指定的内容
  95. response = hit[param]
  96. # 添加所有数据到数据集中
  97. all_datas.append(response)
  98. # 返回所有数据内容
  99. return all_datas
  100.  
  101. # 当索引不存在创建索引
  102. def create_index(self,index_name):
  103. '''
  104. :param index_name: 索引名称
  105. :return:如果创建成功返回创建结果信息,试过已经存在创建新的index失败返回index的名称
  106. '''
  107. # 获取索引的映射
  108. # index_mapping = IndexMapping.index_mapping
  109. # # 判断索引是否存在
  110. # if self.es.indices.exists(index=index_name) is not True:
  111. # # 创建索引
  112. # res = self.es.indices.create(index=index_name,body=index_mapping)
  113. # # 返回结果
  114. # return res
  115. # else:
  116. # # 返回索引名称
  117. # return index_name
  118. pass
  119.  
  120. # 插入指定的单条数据内容
  121. def insert_single_data(self,index_name,doc_type,data):
  122. '''
  123. :param index_name: 索引名称
  124. :param doc_type: 文档类型
  125. :param data: 需要插入的数据内容
  126. :return: 执行结果
  127. '''
  128. res = self.es.index(index=index_name,doc_type=doc_type,body=data)
  129. return res
  130.  
  131. # 向ES中新增数据,批量插入
  132. def insert_datas(self,index_name):
  133. '''
  134. :desc 通过读取指定的文件内容获取需要插入的数据集
  135. :param index_name: 索引名称
  136. :return: 插入成功的数据条数
  137. '''
  138. insert_datas = []
  139. # 判断插入数据的索引是否存在
  140. self.createIndex(index_name=index_name)
  141. # 获取插入数据的文件地址
  142. data_file_path = self.ini.get_key_value("datafile","datafilepath")
  143. # 获取需要插入的数据集
  144. with open(data_file_path,"r+") as data_file:
  145. # 获取文件所有数据
  146. data_lines = data_file.readlines()
  147. for data_line in data_lines:
  148. # string to json
  149. data_line = json.loads(data_line)
  150. insert_datas.append(data_line)
  151. # 批量处理
  152. res = self.es.bulk(index=index_name,body=insert_datas,raise_on_error=True)
  153. return res
  154.  
  155. # 从ES中在指定的索引中删除指定数据(根据id判断)
  156. def delete_data_by_id(self,index_name,doc_type,id):
  157. '''
  158. :param index_name: 索引名称
  159. :param index_type: 文档类型
  160. :param id: 唯一标识id
  161. :return: 删除结果信息
  162. '''
  163. res = self.es.delete(index=index_name,doc_type=doc_type,id=id)
  164. return res
  165.  
  166. # 根据条件删除数据
  167. def delete_data_by_query(self,index_name,doc_type,param,gt_time,lt_time):
  168. '''
  169. :param index_name:索引名称,为空查询所有索引
  170. :param doc_type:文档类型,为空查询所有文档类型
  171. :param param:过滤条件值
  172. :param gt_time:时间范围,大于该时间
  173. :param lt_time:时间范围,小于该时间
  174. :return:执行条件删除后的结果信息
  175. '''
  176. # DSL语句
  177. query_data = {
  178. # 查询语句
  179. "query": {
  180. "bool": {
  181. "must": [
  182. {
  183. "query_string": {
  184. "query": param,
  185. "analyze_wildcard": True
  186. }
  187. },
  188. {
  189. "range": {
  190. "@timestamp": {
  191. "gte": gt_time,
  192. "lte": lt_time,
  193. "format": "epoch_millis"
  194. }
  195. }
  196. }
  197. ],
  198. "must_not": []
  199. }
  200. }
  201. }
  202. res = self.es.delete_by_query(index=index_name,doc_type=doc_type,body=query_data,_source=True)
  203. return res
  204.  
  205. # 指定index中删除指定时间段内的全部数据
  206. def delete_all_datas(self,index_name,doc_type,gt_time,lt_time):
  207. '''
  208. :param index_name:索引名称,为空查询所有索引
  209. :param doc_type:文档类型,为空查询所有文档类型
  210. :param gt_time:时间范围,大于该时间
  211. :param lt_time:时间范围,小于该时间
  212. :return:执行条件删除后的结果信息
  213. '''
  214. # DSL语句
  215. query_data = {
  216. # 查询语句
  217. "query": {
  218. "bool": {
  219. "must": [
  220. {
  221. "match_all": {}
  222. },
  223. {
  224. "range": {
  225. "@timestamp": {
  226. "gte": gt_time,
  227. "lte": lt_time,
  228. "format": "epoch_millis"
  229. }
  230. }
  231. }
  232. ],
  233. "must_not": []
  234. }
  235. }
  236. }
  237. res = self.es.delete_by_query(index=index_name, doc_type=doc_type, body=query_data, _source=True)
  238. return res
  239.  
  240. # 修改ES中指定的数据
  241. def update_data_by_id(self,index_name,doc_type,id,data):
  242. '''
  243. :param index_name: 索引名称
  244. :param doc_type: 文档类型,为空表示所有类型
  245. :param id: 文档唯一标识编号
  246. :param data: 更新的数据
  247. :return: 更新结果信息
  248. '''
  249. res = self.es.update(index=index_name,doc_type=doc_type,id=id,body=data)
  250. return res

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持w3xue。

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号