经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 程序设计 » C# » 查看文章
哎呀,我老大写Bug啦——记一次MessageQueen的优化
来源:cnblogs  作者:山治先生  时间:2018/11/8 9:57:22  对本文有异议

  MessageQueen,顾名思义消息队列,在系统开发中也是用的比较多的一个中间件吧。我们这里主要用它来做日志管理和订单管理的,记得老老大(恩,是的,就是老老大,因为他已经跳槽了)还在的时候,当时也是为了赶项目进度,他也参与开发了,那时候我才刚刚入职,他负责写后端这块,我来了就把他手上的任务接过来了,(接着接着……就辞职了)。

之后我们的开发仍然有条不紊的开发着,直到今年的一月份吧,才上线开始运行,然后就出现了常规状态,上线之后就开始爆炸,

                                                                                     

这个页面打不开呀,那个内容没东西呀,第三方登录问题呀,支付问题呀,临时再改需求呀……(该来的都来了),加班、debug、测试、再debug……,然后经过几天的修复,终于完成了跟自己电脑一样稳定的运行,组员们都美滋滋的,今晚加个鸡腿才行。

                                                                                    

都说祸不单行,古人是不会骗我们的,Bug怎么会修得完呢?天真,要是Bug能修得完还要我们来干啥,好景不长,果然,过了一周之后,组员突然群里叫喳喳,

what is it ? 

 

 

来了,今天的主角登场了,我也要开始加班了。

RabbitMQ

  这个是今天要说的东西,基础概念什么的不是今天要说的重点,重点是:

 

RabbitMQ内存使得整个服务器濒临瘫痪,远程登录服务器都差点挤不进去的状态,别看截图目前才1.3G,吃个午饭回来,就2.3G了,可怕不可怕?咋回事?

老板喊你回来加班啦

  先不管了,线上优先解决,手动先Reset回收资源以释放空间,这个只是临时的办法,然后检查一下rabbitMQ的配置有没有问题,路径在

 C:\Users\Administrator\AppData\Roaming\RabbitMQ 

完全是默认的配置,完全ojbk啊,那到底咋回事?继续检查,想想不如从项目开始吧,然后查看项目中的代码,都是从来自【MessageLib】的组件调用

好了,叫我老老大要这个组件的代码,他把git的地址就发给我,我把项目down下来,

这个封装的组件内容不多,主要的文件一目了然,其实就是用到这个两个组件来进行的二次封装来调用

主要的代码是在【MessageQueue.cs】文件里,展示一下当时的代码情况:

  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Text;
  5. using System.Threading.Tasks;
  6. using MessageLib.ClassBean;
  7. using EasyNetQ;
  8. using System.Threading;
  9. namespace MessageLib
  10. {
  11. public static class MessageQueue
  12. {
  13. public static IBus bus = MQBusBuilder.CreateMessageBus();
  14. //消息队列
  15. private static Queue<Item> NoticQueue = new Queue<Item>(5000);
  16. //日志队列
  17. private static Queue<Item> LogQueue = new Queue<Item>(5000);
  18. //队列数目发布数量
  19. private static int max_count_to_pulish = 1000;
  20. /// <summary>
  21. /// 可供外部使用的消息入列操作
  22. /// </summary>
  23. public static void push(Item item)
  24. {
  25. if (item.type == ItemType.notic)
  26. {
  27. NoticQueue.Enqueue(item);
  28. }
  29. if (item.type == ItemType.log)
  30. {
  31. LogQueue.Enqueue(item);
  32. }
  33. }
  34. /// <summary>
  35. /// 监听后需要调用的发布接口
  36. /// </summary>
  37. private static void Pulish(object source, System.Timers.ElapsedEventArgs e)
  38. {
  39. if (NoticQueue.Count > 0 || LogQueue.Count > 0)
  40. {
  41. if (bus == null || !bus.IsConnected)
  42. {
  43. bus = MQBusBuilder.CreateMessageBus();
  44. }
  45. if (bus.IsConnected)
  46. {
  47. Send(ItemType.notic);
  48. Send(ItemType.log);
  49. }
  50. }
  51. }
  52. /// <summary>
  53. /// 程序自运行并开始监听
  54. /// </summary>
  55. public static void Run()
  56. {
  57. System.Timers.Timer timer = new System.Timers.Timer();
  58. timer.Interval = 1000;
  59. timer.Elapsed += new System.Timers.ElapsedEventHandler(Pulish);//到达时间的时候执行事件;
  60. timer.AutoReset = true;//设置是执行一次(false)还是一直执行(true);
  61. timer.Enabled = true;//是否执行System.Timers.Timer.Elapsed事件;
  62. }
  63. /// <summary>
  64. /// 启动线程异步调用
  65. /// </summary>
  66. /// <param name="channelType"></param>
  67. private static void Send(string channelType)
  68. {
  69. Thread thread = new Thread(new ParameterizedThreadStart(PublishAction));
  70. thread.IsBackground = true;
  71. thread.Start(channelType);
  72. }
  73. /// <summary>
  74. /// 调用发布日志及提醒两个接口
  75. /// </summary>
  76. /// <param name="channel"></param>
  77. private static void PublishAction(object channel)
  78. {
  79. PublisLog();
  80. PublisNotic();
  81. }
  82. /// <summary>
  83. /// 日志消息发送至RabbitMQ指定exchange、Queue
  84. /// </summary>
  85. private static void PublisLog()
  86. {
  87. string channelName = ItemType.log;
  88. try
  89. {
  90. var routingKey = channelName;
  91. var mqqueue = bus.Advanced.QueueDeclare(string.Format("Queue.{0}", channelName));
  92. var exchange = bus.Advanced.ExchangeDeclare(string.Format("Exchange.{0}",channelName), "direct");
  93. var binding = bus.Advanced.Bind(exchange, mqqueue, routingKey);
  94. while (LogQueue.Count > 0)
  95. {
  96. Item item = LogQueue.Dequeue();
  97. if (item != null)
  98. {
  99. var properties = new MessageProperties();
  100. var Message = new Message<string>(Newtonsoft.Json.JsonConvert.SerializeObject(item));
  101. Message.Properties.AppId = item.appid;
  102. bus.Advanced.Publish(exchange, routingKey, false, Message);
  103. }
  104. }
  105. }
  106. catch (Exception ex)
  107. {
  108. throw ex;
  109. }
  110. }
  111. /// <summary>
  112. /// 提醒消息发送至RabbitMQ指定exchange、Queue
  113. /// </summary>
  114. private static void PublisNotic()
  115. {
  116. string channelName = ItemType.notic;
  117. var routingKey = channelName;
  118. var mqqueue = bus.Advanced.QueueDeclare(string.Format("Queue.{0}", channelName));
  119. var exchange = bus.Advanced.ExchangeDeclare(string.Format("Exchange.{0}", channelName), "direct");
  120. var binding = bus.Advanced.Bind(exchange, mqqueue, routingKey);
  121. while(NoticQueue.Count > 0)
  122. {
  123. Item item = NoticQueue.Dequeue();
  124. if (item != null)
  125. {
  126. var properties = new MessageProperties();
  127. var Message = new Message<string>(Newtonsoft.Json.JsonConvert.SerializeObject(item));
  128. Message.Properties.AppId = item.appid;
  129. bus.Advanced.Publish(exchange, routingKey, false, Message);
  130. }
  131. }
  132. }
  133. }
  134. }
View Code

然后我就发现了这一段代码!

  1. /// <summary>
  2. /// 程序自运行并开始监听
  3. /// </summary>
  4. public static void Run()
  5. {
  6. System.Timers.Timer timer = new System.Timers.Timer();
  7. timer.Interval = 1000;
  8. timer.Elapsed += new System.Timers.ElapsedEventHandler(Pulish);//到达时间的时候执行事件;
  9. timer.AutoReset = true;//设置是执行一次(false)还是一直执行(true);
  10. timer.Enabled = true;//是否执行System.Timers.Timer.Elapsed事件;
  11. }
  1. /// <summary>
  2. /// 启动线程异步调用
  3. /// </summary>
  4. /// <param name="channelType"></param>
  5. private static void Send(string channelType)
  6. {
  7. Thread thread = new Thread(new ParameterizedThreadStart(PublishAction));
  8. thread.IsBackground = true;
  9. thread.Start(channelType);
  10. }

  老老大写Bug了,当Run()起来之后,队列中【NoticQueue】有内容,就开始推送消息,发送消息Send(),每来一次推送new一个线程并设置为后台线程,然后发送消息。好了,明白了,这里的线程很混乱,因为线程操作不当,new了N多个频道,并且没有主动回收,这也难怪内存暴涨呢。并且要是Run()调用多次,后果更加不堪设想。

加班改起来

  开始动手吧,业务主要推送有普通消息、错误消息和通知消息,那么将队列与线程组装一起,新增一个类QueueTask.cs:

  1. public class QueueTask
  2. {
  3. private Queue<Item> NoticQueue = new Queue<Item>(5000);
  4. //队列数目发布数量
  5. private int max_count_to_pulish = 1000;
  6. public bool isRunning = false;
  7. private string itemType = ItemType.info;
  8. private string MessageRouter = ItemType.info;
  9. public QueueTask(string itemType,string MessageRouter)
  10. {
  11. this.itemType = itemType;
  12. this.MessageRouter = MessageRouter;
  13. }
  14. /// <summary>
  15. /// 可供外部使用的消息入列操作
  16. /// </summary>
  17. public void Push(Item item, IBus IBus)
  18. {
  19. NoticQueue.Enqueue(item);
  20. if (!isRunning)
  21. Run(IBus);
  22. }
  23. public void Run(IBus IBus)
  24. {
  25. if (!isRunning)
  26. {
  27. Timer timerNotic = new Timer(PulishMsg, IBus, 1000, 1000);
  28. isRunning = true;
  29. }
  30. }
  31. private void PulishMsg(object state)
  32. {
  33. IBus IBus = state as IBus;
  34. if (NoticQueue.Count > 0)
  35. {
  36. PublisMsg(itemType, IBus);
  37. }
  38. }
  39. private void PublisMsg(object channel, IBus BusInstance)
  40. {
  41. try
  42. {
  43. string channelName = channel as string;
  44. if (NoticQueue.Count > 0)
  45. {
  46. var routingKey = MessageRouter;
  47. var mqqueue = BusInstance.Advanced.QueueDeclare(string.Format("Queue.{0}", channelName));
  48. var exchange = BusInstance.Advanced.ExchangeDeclare(string.Format("Exchange.{0}", channelName), "direct");
  49. var binding = BusInstance.Advanced.Bind(exchange, mqqueue, routingKey);
  50. while (NoticQueue.Count > 0)
  51. {
  52. Item item = NoticQueue.Dequeue();
  53. if (item != null)
  54. {
  55. var properties = new MessageProperties();
  56. var Message = new EasyNetQ.Message<string>(Newtonsoft.Json.JsonConvert.SerializeObject(item));
  57. Message.Properties.AppId = item.appid;
  58. BusInstance.Advanced.Publish(exchange, routingKey, false, Message);
  59. }
  60. }
  61. }
  62. }
  63. catch (Exception ex)
  64. {
  65. Console.WriteLine("PublisMsg error:" + ex.Message);
  66. }
  67. }
  68. public void Read<T>(IBus BusInstance,Action<Item> dealAction) where T : Item
  69. {
  70. try
  71. {
  72. string channelName = itemType;
  73. var routingKey = MessageRouter;
  74. var mqqueue = BusInstance.Advanced.QueueDeclare(string.Format("Queue.{0}", channelName));
  75. var exchange = BusInstance.Advanced.ExchangeDeclare(string.Format("Exchange.{0}", channelName), "direct");
  76. var binding = BusInstance.Advanced.Bind(exchange, mqqueue, routingKey);
  77. var Consume = BusInstance.Advanced.Consume(mqqueue, registration =>
  78. {
  79. registration.Add<string>((message, info) =>
  80. {
  81. Item data = Newtonsoft.Json.JsonConvert.DeserializeObject<T>(message.Body);
  82. dealAction(data);
  83. });
  84. });
  85. }
  86. catch (Exception ex)
  87. {
  88. Console.WriteLine("Read error:" + ex.Message);
  89. }
  90. }
  91. }

然后,在MessageQueue.cs修改为单例模式:

  1. public static class MessageQueue
  2. {
  3. /*Install-Package EasyNetQ-dotnet-core -Version 2.0.2-radicalgeek-netc0001 -Pre*/
  4.  
  5. private static IBus bus = null;
  6. public static bool isRunning = false;
  7. //消息队列
  8. private static QueueTask NoticQueue = null;
  9. //日志队列
  10. private static QueueTask LogQueue = null;
  11. //自定义
  12. private static QueueTask InfoQueue = null;
  13. #region 同步锁
  14. private static readonly object obj = new object();
  15. #endregion
  16.  
  17. public static void Init(string Connection, string routeKey)
  18. {
  19. if (NoticQueue == null)
  20. NoticQueue = new QueueTask(ItemType.notic, ItemType.notic);
  21. if (LogQueue == null)
  22. LogQueue = new QueueTask(ItemType.error, ItemType.error);
  23. if (InfoQueue == null)
  24. InfoQueue = new QueueTask(ItemType.info, routeKey);
  25. if (string.IsNullOrEmpty(MQBusBuilder.Connnection))
  26. MQBusBuilder.Connnection = Connection;
  27. }
  28. public static IBus BusInstance
  29. {
  30. get
  31. {
  32. if (bus == null)
  33. {
  34. lock (obj)
  35. {
  36. if (bus == null|| !bus.IsConnected)
  37. {
  38. bus = MQBusBuilder.CreateMessageBus();
  39. }
  40. }
  41. }
  42. return bus;
  43. }
  44. }
  45. /// <summary>
  46. /// 可供外部使用的消息入列操作
  47. /// </summary>
  48. public static void PushAndRun(Item item)
  49. {
  50. if (string.IsNullOrWhiteSpace(MQBusBuilder.Connnection) || BusInstance == null)
  51. return;
  52. if (item.type == ItemType.notic)
  53. {
  54. NoticQueue.Push(item, BusInstance);
  55. }
  56. if (item.type == ItemType.error)
  57. {
  58. LogQueue.Push(item, BusInstance);
  59. }
  60. if (item.type == ItemType.info)
  61. {
  62. InfoQueue.Push(item, BusInstance);
  63. }
  64. }
  65. public static void Read(string itemType, Action<Item> dealAction)
  66. {
  67. if (itemType == ItemType.notic)
  68. {
  69. NoticQueue.Read<NoticItem>(BusInstance, dealAction);
  70. }
  71. if (itemType == ItemType.error)
  72. {
  73. LogQueue.Read<ErrorItem>(BusInstance, dealAction);
  74. }
  75. if (itemType == ItemType.info)
  76. {
  77. InfoQueue.Read<Message>(BusInstance, dealAction);
  78. }
  79. }
  80. }
View Code

每次推送消息的时候,每个QueueTask就自己维护自己的线程和队列了,当调用推送之后,就开始运作起来。恩,应该没问题了。然后就发布nuget,再更新项目,然后发布。观察一段时间,恩,完美。

 

事件二

  事情过后,B端开始搞起来了,然后涉及到订单系统,跟老大(不是老老大,老老大那时候已经跑了)商量之后确定使用消息队列来做订单的事件的拓展,然后就直接美滋滋的调用好之前写的了,没想到啊,这次是线程涨!因为订单是从B端推送过来的,B端肯定没事,订单后台订阅消息之后,读取过程中出现的线程增多,然后看看之前写的Read()方法,感觉没啥问题啊,每运行完一次,就多了一个线程,这个神奇了啊,那么源代码撸起来。

翻来覆去,看到这个Consume方法,继承的是IDisposable接口,得勒,知道咋回事了。

Consume.Dispose(); 用完请记得主动释放啊。

这回真的可以浪了。

 

总结

  遇到问题,冷静下来,耐得了寂寞才行。线上的问题优先解决,然后再慢慢Debug,解决不了,看源码,再解决不了,降级处理,欢迎共同探讨。同时也感谢一下技术群里的兄弟给的一些建议,并帮忙查找资料,还好EasyNetQ是开源了,不然也打算说先不用了,毕竟一开始没什么用户量,所以没必要整那么麻烦,加班加点的弄这个问题。不过最终都完美的解决了,心里还是挺美滋滋的,程序猿随之而来的成就感。

  别看我们在工位上默不作声,我们可能在拯救世界呢!老板,该加工资啦!

                                                                                             

 

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号