经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » Java相关 » Java » 查看文章
Storm框架:如何消费RabbitMq消息(代码案例)
来源:cnblogs  作者:本拉邓  时间:2018/11/2 9:19:28  对本文有异议

1、定义拓扑topology

  1. public class MessageTopology {
  2. public static void main(String[] args) throws Exception {
  3. //组装topology
  4. TopologyBuilder topologyBuilder = new TopologyBuilder();
  5. topologyBuilder.setSpout("RabbitmqSpout", new RabbitmqSpout());
  6. topologyBuilder.setBolt("FilterBolt", new FilterBolt()).shuffleGrouping("RabbitmqSpout");
  7. Config conf = new Config ();
  8. try {
  9. if (args.length > 0) {
  10. StormSubmitter.submitTopology(args[0], conf, topologyBuilder.createTopology());
  11. } else {
  12. LocalCluster localCluster = new LocalCluster();
  13. localCluster.submitTopology("messageTopology", conf, topologyBuilder.createTopology());
  14. }
  15. } catch (AlreadyAliveException e) {
  16. e.printStackTrace();
  17. }
  18. }
  19. }

2、定义数据源RabbitmqSpout

RabbitmqSpout继承自org.apache.storm.topology.IRichSpout接口,实现对应的方法:open(),close(),activate(),deactivate(),nextTuple(),ack(),fail()。

unconfirmedMap对象存储了MQ所有发射出去等待确认的消息唯一标识deliveryTag,当storm系统回调ack、fail方法后进行MQ消息的成功确认或失败重回队列操作(Storm系统回调方法会在bolt操作中主动调用ack、fail方法时触发)。

  1. public class RabbitmqSpout implements IRichSpout {
  2. private final Logger LOGGER = LoggerFactory.getLogger(RabbitmqSpout.class);
  3. private Map map;
  4. private TopologyContext topologyContext;
  5. private SpoutOutputCollector spoutOutputCollector;
  6. private Connection connection;
  7. private Channel channel;
  8. private static final String QUEUE_NAME = "message_queue";
  9. private final Map<String, Long> unconfirmedMap = Collections.synchronizedMap(new HashMap<String, Long>());
  10. //连接mq服务
  11. private void connect() throws IOException, TimeoutException {
  12. ConnectionFactory factory = new ConnectionFactory();
  13. factory.setHost("127.0.0.1");
  14. factory.setUsername("admin");
  15. factory.setPassword("admin");
  16. factory.setVirtualHost("/");
  17. connection = factory.newConnection();
  18. channel = connection.createChannel();
  19. channel.queueDeclare(QUEUE_NAME, true, false, false, null);
  20. }
  21. @Override
  22. public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
  23. this.map = map;
  24. this.topologyContext = topologyContext;
  25. this.spoutOutputCollector = spoutOutputCollector;
  26. try {
  27. this.connect();
  28. } catch (IOException e) {
  29. e.printStackTrace();
  30. } catch (TimeoutException e) {
  31. e.printStackTrace();
  32. }
  33. }
  34. @Override
  35. public void close() {
  36. try {
  37. channel.close();
  38. connection.close();
  39. } catch (IOException e) {
  40. e.printStackTrace();
  41. } catch (TimeoutException e) {
  42. e.printStackTrace();
  43. }
  44. }
  45. @Override
  46. public void nextTuple() {
  47. try {
  48. GetResponse response = channel.basicGet(QUEUE_NAME, false);
  49. if (response == null) {
  50. Utils.sleep(3000);
  51. } else {
  52. AMQP.BasicProperties props = response.getProps();
  53. String messageId = UUID.randomUUID().toString();
  54. Long deliveryTag = response.getEnvelope().getDeliveryTag();
  55. String body = new String(response.getBody());
  56. unconfirmedMap.put(messageId, deliveryTag);
  57. LOGGER.info("RabbitmqSpout: {}, {}, {}, {}", body, messageId, deliveryTag, props);
  58. this.spoutOutputCollector.emit(new Values(body), messageId);
  59. }
  60. } catch (IOException e) {
  61. e.printStackTrace();
  62. }
  63. }
  64. @Override
  65. public void ack(Object o) {
  66. String messageId = o.toString();
  67. Long deliveryTag = unconfirmedMap.get(messageId);
  68. LOGGER.info("ack: {}, {}, {}\n\n", messageId, deliveryTag, unconfirmedMap.size());
  69. try {
  70. unconfirmedMap.remove(messageId);
  71. channel.basicAck(deliveryTag, false);
  72. } catch (IOException e) {
  73. e.printStackTrace();
  74. }
  75. }
  76. @Override
  77. public void fail(Object o) {
  78. String messageId = o.toString();
  79. Long deliveryTag = unconfirmedMap.get(messageId);
  80. LOGGER.info("fail: {}, {}, {}\n\n", messageId, deliveryTag, unconfirmedMap.size());
  81. try {
  82. unconfirmedMap.remove(messageId);
  83. channel.basicNack(deliveryTag, false, true);
  84. } catch (IOException e) {
  85. e.printStackTrace();
  86. }
  87. }
  88. @Override
  89. public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
  90. outputFieldsDeclarer.declare(new Fields("body"));
  91. }
  92. @Override
  93. public Map<String, Object> getComponentConfiguration() {
  94. return null;
  95. }
  96. @Override
  97. public void activate() {
  98. }
  99. @Override
  100. public void deactivate() {
  101. }
  102. }

3、定义数据流处理FilterBolt

  1. public class FilterBolt implements IRichBolt {
  2. private final Logger LOGGER = LoggerFactory.getLogger(FilterBolt.class);
  3. private Map map;
  4. private TopologyContext topologyContext;
  5. private OutputCollector outputCollector;
  6. @Override
  7. public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
  8. this.map = map;
  9. this.topologyContext = topologyContext;
  10. this.outputCollector = outputCollector;
  11. }
  12. @Override
  13. public void execute(Tuple tuple) {
  14. String value = tuple.getStringByField("body");
  15. LOGGER.info("FilterBolt:{}", value);
  16. outputCollector.ack(tuple);
  17. }
  18. @Override
  19. public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
  20. outputFieldsDeclarer.declare(new Fields("body"));
  21. }
  22. @Override
  23. public Map<String, Object> getComponentConfiguration() {
  24. return null;
  25. }
  26. @Override
  27. public void cleanup() {
  28. }
  29. }
 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号