经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 数据库/运维 » Redis » 查看文章
使用 Redis 流实现消息队列的代码
来源:jb51  时间:2019/11/4 11:55:02  对本文有异议

在介绍了 Redis 流的基本功能之后, 现在是时候使用这些功能来构建一些实际的应用了。 消息队列作为流的典型应用之一, 具有非常好的示范性, 因此我们将使用 Redis 流的相关功能构建一个消息队列应用, 这个消息队列跟我们之前使用其他 Redis 数据结构构建的消息队列具有相似的功能。

代码清单 10-1 展示了一个具有基本功能的消息队列实现:

  • 代码最开头的是几个转换函数, 它们负责对程序的相关输入输出进行转换和格式化;
  • MessageQueue 类用于实现消息队列, 它的添加消息、移除消息以及返回消息数量三个方法分别使用了流的 XADD 命令、 XDEL 命令和 XLEN 命令;
  • 消息队列的两个获取方法 get_message() 和 get_by_range() 分别以两种形式调用了流的 XRANGE 命令;
  • 最后, 用于迭代消息的 iterate() 方法使用了 XREAD 命令对流进行迭代。

代码清单 10-1 使用 Redis 流实现的消息队列: /stream/message_queue.py

  1. def reconstruct_message_list(message_list):
  2. """
  3. 为了让多条消息能够以更结构化的方式返回给调用者,
  4. 将 Redis 返回的多条消息从原来的格式:
  5. [(id1, {k1:v1, k2:v2, ...}), (id2, {k1:v1, k2:v2, ...}), ...]
  6. 转换成以下格式:
  7. [{id1: {k1:v1, k2:v2, ...}}, {id2: {k1:v1, k2:v2, ...}}, ...]
  8. """ result = []
  9. for id, kvs in message_list:
  10. result.append({id: kvs})
  11. return result
  12. def get_message_from_nested_list(lst):
  13. """
  14. 从嵌套列表中取出消息本体。
  15. """
  16. return lst[0][1]
  17. class MessageQueue:
  18. """
  19. 使用 Redis 流实现的消息队列。
  20. """
  21. def __init__(self, client, stream_key):
  22. self.client = client
  23. self.stream = stream_key
  24. def add_message(self, key_value_pairs):
  25. """
  26. 将给定的键值对存入到消息里面,并返回相应的消息 ID 。
  27. """
  28. return self.client.xadd(self.stream, key_value_pairs)
  29. def get_message(self, message_id):
  30. """
  31. 根据给定的消息 ID 返回相应的消息,如果消息不存在则返回 None 。
  32. """
  33. reply = self.client.xrange(self.stream, message_id, message_id)
  34. if len(reply) == 1:
  35. return get_message_from_nested_list(reply)
  36.  
  37. def remove_message(self, message_id):
  38. """
  39. 根据给定的消息 ID 删除相应的消息,如果消息不存在则忽略该动作。
  40. """
  41. self.client.xdel(self.stream, message_id)
  42.  
  43. def len(self):
  44. """
  45. 返回消息队列的长度。
  46. """
  47. return self.client.xlen(self.stream)
  48.  
  49. def get_by_range(self, start_id, end_id, max_item=10):
  50. """
  51. 根据给定的 ID 区间范围返回队列中的消息。
  52. """
  53. reply = self.client.xrange(self.stream, start_id, end_id, max_item)
  54. return reconstruct_message_list(reply)
  55.  
  56. def iterate(self, start_id=0, max_item=10):
  57. """
  58. 对消息队列进行迭代,返回最多 N 条大于给定 ID 的消息。
  59. """
  60. reply = self.client.xread({self.stream: start_id}, max_item)
  61. if len(reply) == 0:
  62. return list()
  63. else:
  64. messages = get_message_from_nested_list(reply)
  65. return reconstruct_message_list(messages)

对于这个消息队列实现, 我们可以通过执行以下代码, 创建出它的实例:

  1. >>> from redis import Redis
  2. >>> from message_queue import MessageQueue
  3. >>> client = Redis(decode_responses=True)
  4. >>> mq = MessageQueue(client, "mq")

然后通过执行以下代码, 向队列里面添加十条消息:

  1. >>> for i in range(10):
  2. ... key = "key{0}".format(i)
  3. ... value = "value{0}".format(i)
  4. ... msg = {key:value}
  5. ... mq.add_message(msg)
  6. ...
  7. '1554113926280-0'
  8. '1554113926280-1'
  9. '1554113926281-0'
  10. '1554113926281-1'
  11. '1554113926281-2'
  12. '1554113926281-3'
  13. '1554113926281-4'
  14. '1554113926281-5'
  15. '1554113926281-6'
  16. '1554113926282-0'

还可以根据 ID 获取指定的消息, 又或者使用 get_by_range() 方法同时获取多条消息:

  1. >>> mq.get_message('1554113926280-0')
  2. {'key0': 'value0'}
  3. >>> mq.get_message('1554113926280-1')
  4. {'key1': 'value1'}
  5. >>> mq.get_by_range("-", "+", 3)
  6. [{'1554113926280-0': {'key0': 'value0'}}, {'1554113926280-1': {'key1': 'value1'}}, {'1554113926281-0': {'key2': 'value2'}}]

又或者使用 iterate() 方法对消息队列进行迭代, 等等:

  1. >>> mq.iterate(0, 3)
  2. [{'1554113926280-0': {'key0': 'value0'}}, {'1554113926280-1': {'key1': 'value1'}}, {'1554113926281-0': {'key2': 'value2'}}]
  3. >>> mq.iterate('1554113926281-0', 3)
  4. [{'1554113926281-1': {'key3': 'value3'}}, {'1554113926281-2': {'key4': 'value4'}}, {'1554113926281-3': {'key5': 'value5'}}]

总结

以上所述是小编给大家介绍的使用 Redis 流实现消息队列的代码,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对w3xue网站的支持!
如果你觉得本文对你有帮助,欢迎转载,烦请注明出处,谢谢!

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号