经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » Java相关 » Spring » 查看文章
SkyWalking?自定义插件(Spring?RabbitMQ)具体分析过程
来源:jb51  时间:2022/2/14 13:35:56  对本文有异议

SkyWalking 自定义插件(Spring RabbitMQ) 官方

RabbitMQ插件问题

skywalking官方提供的RabbitMQ插件存在缺陷,其只针对RabbitMQ官方原生Client实现扩展,但我们在项目中一般不直接使用原生Client,而是使用Spring RabitMQ Client,因Spring RabitMQ Consumer中存在跨线程操作,导致跟踪ID断链。

具体分析过程

1.官方插件源码的拦截点是原生Consumer的handleDelivery方法,源码如下:

2.而Spring RabbitMQ消费者的默认实现是BlockingQueueConsumer, handleDelivery核心逻辑是把消息放到内部的BlockingQueue队列,不做真正的消费处理,因此拦截此处无法关联到消费者逻辑,源码如下

  1. @Override
  2. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
  3. byte[] body) {
  4. ...
  5. try {
  6. if (BlockingQueueConsumer.this.abortStarted > 0) {
  7. if (!BlockingQueueConsumer.this.queue.offer(
  8. new Delivery(consumerTag, envelope, properties, body, this.queueName),
  9. BlockingQueueConsumer.this.shutdownTimeout, TimeUnit.MILLISECONDS)) {
  10.  
  11. Channel channelToClose = super.getChannel();
  12. RabbitUtils.setPhysicalCloseRequired(channelToClose, true);
  13. // Defensive - should never happen
  14. BlockingQueueConsumer.this.queue.clear();
  15. if (!this.canceled) {
  16. RabbitUtils.cancel(channelToClose, consumerTag);
  17. }
  18. try {
  19. channelToClose.close();
  20. catch (@SuppressWarnings("unused") TimeoutException e) {
  21. // no-op
  22. }
  23. }
  24. else {
  25. BlockingQueueConsumer.this.queue
  26. .put(new Delivery(consumerTag, envelope, properties, body, this.queueName));
  27. }
  28. catch (@SuppressWarnings("unused") InterruptedException e) {
  29. Thread.currentThread().interrupt();
  30. catch (Exception e) {
  31. BlockingQueueConsumer.logger.warn("Unexpected exception during delivery", e);
  32. }

3.真正的消费处理在SimpleMessageListenerContainer,SimpleMessageListenerContainer继承Runnable接口,在其run方法中while循环调用mainLoop方法,整体调用链路为

4.SimpleMessageListenerContainer.run() -> SimpleMessageListenerContainer.mainLoop() -> SimpleMessageListenerContainer.receiveAndExecute() -> SimpleMessageListenerContainer.doReceiveAndExecute() -> AbstractMessageListenerContainer.executeListener()。最终在executeListener中执行消费逻辑

  1. protected void executeListener(Channel channel, Object data) {
  2. ...
  3. try {
  4. // 执行消费逻辑
  5. doExecuteListener(channel, data);
  6. if (sample != null) {
  7. this.micrometerHolder.success(sample, data instanceof Message
  8. ? ((Message) data).getMessageProperties().getConsumerQueue()
  9. : queuesAsListString());
  10. }
  11. }
  12. catch (RuntimeException ex) {
  13. ....
  14. }
  15. }

实现自定义插件

从上面可以分析出,AbstractMessageListenerContainer.executeListener()是最佳的拦截点
实现源码已放到码云仓库:https://gitee.com/eureka-gitee/apm-sniffer-pro/tree/v7.0.0.0/

效果展示

SkyWalking调用链路

logback日志

到此这篇关于SkyWalking 自定义插件(Spring RabbitMQ)的文章就介绍到这了,更多相关SkyWalking 自定义插件内容请搜索w3xue以前的文章或继续浏览下面的相关文章希望大家以后多多支持w3xue!

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号