经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 其他 » 网络安全 » 查看文章
redis未授权访问批量验证脚本编写[python]
来源:cnblogs  作者:Feather·Lee  时间:2019/3/4 9:14:51  对本文有异议

漏洞简介

简单来说,redis是一个数据库。在默认的配置下,redis绑定在0.0.0.0:6379,也就是说,如果服务器有公网ip,可以通过访问其公网ip的6379端口来操作redis。最为致命的是,redis默认是没有密码验证的,可以免密码登录操作,攻击者可以通过操作redis进一步控制服务器。

漏洞的危害

  1. 无密码验证登录redis后,可读取、删除、更改数据
  2. 攻击者可以通过redis读写文件,植入后门
  3. 如果redis以root权限运行,攻击者可以写入ssh公钥文件,然后即可远程ssh登录服务器
    ...

漏洞修复

修复方案大概有以下几种:

  1. 把redis绑定在127.0.0.1即本地上
  2. 配置登录验证
  3. 防火墙设置白名单,拒绝不信任的连接
    ...

本文主要讲解验证脚本的编写,故不再过多阐述漏洞原理、利用等细节。

验证方式

登录redis后,执行info命令,可以获得类似下面的信息:

  1. # Server
  2. redis_version:5.0.3
  3. redis_git_sha1:00000000
  4. redis_git_dirty:0
  5. redis_build_id:68e47d9309ff01ae
  6. redis_mode:standalone
  7. ...

如果登录失败,是不可以执行命令的,所以我们可以向目标ip的6379(redis默认端口)发起连接,发送info命令,只要得到的响应中存在上面信息中的某些独特的字符串,如redis_version,我们就认为目标存在redis未授权访问漏洞。

代码如下:

  1. ...
  2. sock = socket.socket() # 创建套接字
  3. try:
  4. sock.connect((ip, 6379)) # 连接
  5. sock.send(payload) # 发送info命令
  6. response = sock.recv(1024).decode() # 接收响应数据
  7. if 'redis_version' in response:
  8. result = True # 存在漏洞
  9. else:
  10. result = False # 不存在漏洞
  11. except (socket.error, socket.timeout):
  12. # 连接失败,可能端口6379未开放,或者被拦截,此时认为漏洞不存在
  13. result = False
  14. ...

好了,现在的关键就在:如何发送info命令?

python有操作redis的第三方库,可以很方便的操作redis。然而,我们并不使用这些第三方库,归根结底,发送info命令其实是发送了一个可以让redis服务识别的特定的数据而已,只要我们知道这个数据是什么,我们就可以使用info命令了。

下面我们就来分析,redis是如何发送info命令的。

截获info命令

我们需要搭建一个redis环境,使用抓包工具来截获使用info命令时redis发送的数据,为了方便,我使用了linux系统的命令netcat、tee来充当抓包工具,读者可以自己在linux系统下搭建redis环境尝试。

我们使用netcat连接到本地的redis服务,然后使用另一个netcat进程监听127.0.0.1:9000,将接受的连接发来的数据,重定向至连接到redis服务的netcat进程的输入,即可完成连接的转发,我们在这两个netcat经常之间,使用tee来截获数据,流程大致如下:

流程

在实际的操作中,我们还需要使用命名管道来实现双向通信,否则客户端无法接受到redis的登录响应就会阻塞,无法发生命令。

具体操作如下:

  1. 启动redis服务
  2. 创建两个管道文件:pipe1、pipe2
    shell $ mkfifo pipe1 $ mkfifo pipe2
  3. 启动一个netcat进程监听在本地的9000端口上,以pipe1作为输入,输出重定向到tee进程,tee进程负责将数据输出到pipe2和屏幕上:
    shell $ ncat -l 127.0.0.1 9000 < pipe1 | tee pipe2
  4. 启动另一个终端,启动netcat进程,负责连接redis服务,以pipe2作为输入,输出重定向到pipe1中:
    shell $ ncat 127.0.0.1 6379 < pipe2 > pipe1
  5. 用redis客户端连接本地9000端口:
    shell $ redis-cli -h 127.0.0.1 -p 9000
  6. 在redis-cli中发送info命令,tee进程在终端上的输出即为整个过程需要发送给redis的数据

附上动图:
获取payload

可以知道payload为:

  1. *1
  2. $4
  3. info

写成python字节串就是:b'*1\r\n$4\r\ninfo\r\n''\r\n'是换行符)

所以我们只需使用socket发送以上字节串即可达到同样的效果

编写验证poc

验证漏洞的代码如下:

  1. def poc(url):
  2. url = url2ip(url) # 将url转换成ip地址
  3. if url:
  4. port = int(url.split(':', -1)) if ':' in url else 6379 # redis默认端口是6379
  5. host = url.split(':')[0]
  6. payload = b'b'*1\r\n$4\r\ninfo\r\n' # 发送的数据
  7. s = socket.socket()
  8. socket.setdefaulttimeout(3) # 设置超时时间
  9. try:
  10. s.connect((host, port))
  11. s.send(payload) # 发送info命令
  12. response = s.recv(1024).decode()
  13. s.close()
  14. if response and 'redis_version' in data:
  15. return True,'%s:%s'%(host,port)
  16. except (socket.error, socket.timeout):
  17. pass
  18. return False, None

其中url转换成ip地址的函数如下:

  1. def url2ip(url):
  2. """
  3. url转换成ip
  4. argument: url
  5. return: 形如www.a.com:80格式的字符串 若转换失败则返回None
  6. """
  7. try:
  8. if not url.startswith('http://') and not url.startswith('https://'):
  9. url = 'http://' + url
  10. ip = urlparse(url).netloc
  11. return ip
  12. except (ValueError, socket.gaierror):
  13. pass
  14. return None

处理输入

我们把要验证漏洞的目标放在一个文件里,每一行为一个目标,现在来编写一个函数,读取文件,将所有目标放到一个队列里,代码如下:

  1. def create_queue(file_name):
  2. """
  3. 创建数据队列
  4. argument: file_name -> 输入文件名
  5. return: data,total 数据队列,数据总数
  6. """
  7. total = 0
  8. data = Queue()
  9. for line in open(file_name):
  10. url = line.strip()
  11. if url:
  12. # 跳过空白的行
  13. data.put(url)
  14. total += 1
  15. data.put(None) # 结束标记
  16. return data,total

创建多个线程

我们的start_jobs函数用于启动多个线程来验证目标,其代码如下:

  1. def start_jobs(data, num):
  2. """
  3. 启动所有工作线程
  4. argument: data -> 数据队列 num -> 线程数
  5. """
  6. is_alive = [True]
  7. def job():
  8. """工作线程"""
  9. while is_alive[0]:
  10. try:
  11. url = data.get()
  12. if url == None:
  13. # 遇到结束标记
  14. break
  15. code, result = poc(url) # 验证漏洞
  16. if code:
  17. print(result) # 存在漏洞
  18. except:
  19. is_alive[0] = False
  20. data.put(None) # 结束标记
  21. jobs = [ Thread(target=job) for i in range(num) ] # 创建多个线程
  22. for j in jobs:
  23. j.setDaemon(True)
  24. j.start() # 启动线程
  25. for j in jobs:
  26. j.join() # 等待线程退出

编写主程序框架

现在我们需要一个主函数来控制整个流程,代码很简单:

  1. def main():
  2. import sys
  3. if len(sys.argv) != 3:
  4. print('Usage: python %s inputFile numOfThread' % sys.argv[0])
  5. return
  6. file_name = sys.argv[1] # 输入文件
  7. num = int(sys.argv[2]) # 线程数
  8. data, total = create_queue(file_name) # 创建数据队列
  9. print('total: %s' % total)
  10. begin = time()
  11. start_jobs(data, num) # 启动工作线程
  12. end = time()
  13. print('spent %ss' % str(end-begin))
  14. if __name__ == '__main__':
  15. main()

使用方法

现在假设输入文件名为input.txt,脚本文件名为redis_unauth.py,使用16个线程来批量验证漏洞,我们可以启动以下命令:

  1. $ python redis_unauth.py input.txt 16

完整代码

只是一个小脚本,就没必要放到github上了,这里直接贴出,需要的读者可以复制:

  1. #!/usr/python3
  2. '''
  3. created by feather
  4. '''
  5. import socket
  6. from threading import Thread
  7. from queue import Queue
  8. from time import sleep,time
  9. from urllib.parse import urlparse
  10. def poc(url):
  11. url = url2ip(url) # 将url转换成ip地址
  12. if url:
  13. port = int(url.split(':', -1)) if ':' in url else 6379 # redis默认端口是6379
  14. host = url.split(':')[0]
  15. payload = b'*1\r\n$4\r\ninfo\r\n' # 发送的数据
  16. s = socket.socket()
  17. socket.setdefaulttimeout(3) # 设置超时时间
  18. try:
  19. s.connect((host, port))
  20. s.send(payload) # 发送info命令
  21. response = s.recv(1024).decode()
  22. s.close()
  23. if response and 'redis_version' in response:
  24. return True,'%s:%s'%(host,port)
  25. except (socket.error, socket.timeout):
  26. pass
  27. return False, None
  28. def url2ip(url):
  29. """
  30. url转换成ip
  31. argument: url
  32. return: 形如www.a.com:80格式的字符串 若转换失败则返回None
  33. """
  34. try:
  35. if not url.startswith('http://') and not url.startswith('https://'):
  36. url = 'http://' + url
  37. ip = urlparse(url).netloc
  38. return ip
  39. except (ValueError, socket.gaierror):
  40. pass
  41. return None
  42. def create_queue(file_name):
  43. """
  44. 创建数据队列
  45. argument: file_name -> 输入文件名
  46. return: data,total 数据队列,数据总数
  47. """
  48. total = 0
  49. data = Queue()
  50. for line in open(file_name):
  51. url = line.strip()
  52. if url:
  53. # 跳过空白的行
  54. data.put(url)
  55. total += 1
  56. data.put(None) # 结束标记
  57. return data,total
  58. def start_jobs(data, num):
  59. """
  60. 启动所有工作线程
  61. argument: data -> 数据队列 num -> 线程数
  62. """
  63. is_alive = [True]
  64. def job():
  65. """工作线程"""
  66. while is_alive[0]:
  67. try:
  68. url = data.get()
  69. if url == None:
  70. # 遇到结束标记
  71. break
  72. code, result = poc(url) # 验证漏洞
  73. if code:
  74. print(result) # 存在漏洞
  75. except:
  76. is_alive[0] = False
  77. data.put(None) # 结束标记
  78. jobs = [ Thread(target=job) for i in range(num) ] # 创建多个线程
  79. for j in jobs:
  80. j.setDaemon(True)
  81. j.start() # 启动线程
  82. for j in jobs:
  83. j.join() # 等待线程退出
  84. def main():
  85. import sys
  86. if len(sys.argv) != 3:
  87. print('Usage: python %s inputFile numOfThread' % sys.argv[0])
  88. return
  89. file_name = sys.argv[1] # 输入文件
  90. num = int(sys.argv[2]) # 线程数
  91. data, total = create_queue(file_name) # 创建数据队列
  92. print('total: %s' % total)
  93. begin = time()
  94. start_jobs(data, num) # 启动工作线程
  95. end = time()
  96. print('spent %ss' % str(end-begin))
  97. if __name__ == '__main__':
  98. main()

原文链接:http://www.cnblogs.com/featherl/p/10461935.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号