经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » Java相关 » Java » 查看文章
Tigase 发送消息的流程源码分析
来源:cnblogs  作者:jianfulove  时间:2018/11/1 9:42:55  对本文有异议
XMPP 的<message/>节是使用基本的”push”方法来从一个地方到另一个地方得到消息。因为消息通常是不告知的,它们是一种”fire-and-forget”(发射后自寻目的)的机制来从一个地方到另一个地方快速获取信息
消息节有五种不同的类型,通过 type 属性来进行区分:例如 chat 类型为 chat 的消息在两个实体间的实时对话中交换,例如两个朋友之间的即时通讯聊天。除了 type 属性外,消息节还包括一个 to 和 from 地址,并且也可以包含一个用于跟踪目的的 id  属性(我们在使用更为广泛的 IQ  节中详细的讨论 IDs)。to  地址是预期接收人的
JabberID,from 地址是发送者的JabberID。from 地址不由发送客户端提供,而是由发送者的服务器添加邮戳,以避免地址欺骗。
在Tigase中,有两个重要的组成,一个组件,二是插件,可以去官方网去看下他的架构介绍 https://docs.tigase.net/tigase-server/7.1.4/Development_Guide/html/#writePluginCode
例如最著名的组件的一个例子是MUC或PubSub。在Tigase中,几乎所有东西实际上都是一个组件:会话管理器、s2s连接管理器、消息路由器等等,组件是根据服务器配置加载的,新的组件可以在运行时加载和激活。您可以轻松地替换组件实现,唯一要做的更改是配置条目中的类名。

Tigase 中定义一个最简单的消息组件,需要实现MessageReceiver或继承 extends AbstractMessageReceiver 类, MessageReceiver 的抽象类: AbstractMessageReceiver 子类 :
一、ClientConnectionManager
二、SessionManager
三、 MessageRouter
  1. public void setProperties(Map<String, Object> props){
  2. for (String name : msgrcv_names) {
  3. mr = conf.getMsgRcvInstance(name);
  4. if (mr instanceof MessageReceiver) {
  5. ((MessageReceiver) mr).setParent(this);
  6. ((MessageReceiver) mr).start();
  7. }
  8. }
  9. }

1、当客户端发送的message消息到tigase服务端,每个一SOCKET连接都会被包装成IOService对象,IOService包含一系列操作socket的方法(接收发送数据等),processSocketData()接收网络数据,由tigase.net处理解析成xml对象,并将packet放到接收队列receivedPackets中再调用serviceListener.packetsReady(this)。由于ConnectionManager实现IOServiceListener接口,实现上调用的的是ConnectionManager中的packetsReady()来开始处理数据

此时的packet :packetFrom=null,packetTo=null。
 
ClientConnectionManager.processSocketData方法中设置packet的一些属性:
此时: packetFrom=c2s@llooper/192.168.0.33_5222_192.168.0.33_38624, packetTo=sess-man@llooper
  1. ClientConnectionManager.processSocketData(XMPPIOService<Object>serv)
  2. JID id = serv.getConnectionId(); //c2s@llooper/192.168.0.33_5222_192.168.0.33_38624
  3. p.setPacketFrom(id); //packetFrom 设置为onnectionId
  4. p.setPacketTo(serv.getDataReceiver()); //packetTo 设置为sess-man --> SessionManager
  5. addOutPacket(p);//将会委托给父 MessageRouter 路由
  6. }
//packet 被设置上一些源信息,和目的地信息,接下来,这个数据包将会委托给父 MessageRouter 帮忙路由到 SessionManager组件中进行处理
packet = (tigase.server.Message) from=c2s@llooper/192.168.0.33_5222_192.168.0.33_38624, to=sess-man@llooper, DATA=<message xmlns="jabber:client" id="44grM-176" type="chat" to="llooper@llooper"><thread>SWjZv5</thread><composing xmlns="http://jabber.org/protocol/chatstates"/></message>, SIZE=170, XMLNS=jabber:client, PRIORITY=NORMAL, PERMISSION=NONE, TYPE=chat
 
packet = from=c2s@llooper/192.168.0.33_5222_192.168.0.33_38624, to=sess-man@llooper, DATA=<message to="admin@llooper" type="chat" id="2jePE-253" xmlns="jabber:client"><thread>7VKMRq</thread><composing xmlns="http://jabber.org/protocol/chatstates"/></message>, SIZE=168, XMLNS=jabber:client, PRIORITY=NORMAL, PERMISSION=NONE, TYPE=chat
 
2、MessageRouter.processPacket(Packet packet)部分代码如下:
 
  1. //我们不会处理没有目标地址的数据包,只是丢弃它们并写一个日志消息
  2. if (packet.getTo() == null) {
  3. log.log(Level.WARNING, "Packet with TO attribute set to NULL: {0}", packet);
  4. return;
  5. }
  6. //它不是一个服务发现包,我们必须找到一个处理组件
  7. //下面的代码块是“快速”找到一个组件if
  8. //这个包TO 组件ID,格式在以下一项:
  9. // 1。组件名+“@”+默认域名
  10. // 2。组件名+“@”+任何虚拟主机名
  11. // 3。组件名+ "."+默认域名
  12. // 4。组件名+ "."+任何虚拟主机名
  13. ServerComponent comp = getLocalComponent(packet.getTo()); //SessionManager
  14. comp.processPacket(packet, results);

 3、SessionManager.processPacket(final Packet packet)处理,有要代码如下。 例如A->B,这样做的目的是为了首先确定用户A有权限发送packet,然后是确定用户B有权限接收数据。如果用户B不在线,那么离线消息处理器会把packet保存到数据库当中。

  1. //XMPPResourceConnection session——用户会话保存所有用户会话数据,并提供对用户数据存储库的访问。它只允许在会话的生命周期内将信息存储在永久存储或内存中。如果在分组处理时没有联机用户会话,则此参数可以为空。
  2. XMPPResourceConnection conn = getXMPPResourceConnection(packet);
  3. //现在要走SessionManager的处理函数,主要是走插件流程,插件在Tigase中也是一个重要的组成,入口就是在这里,SM plugin
  4. processPacket(packet, conn);

   插入下SM plugin 流程说明 :

这个设计有一个惊人的结果。如果你看下面的图片,显示了两个用户之间的通信,你可以看到数据包被复制了两次才送到最终目的地: 

会话管理器(SessionManager)必须对数据包进行两次处理。第一次以用户A的名义将其作为传出包进行处理,第二次以用户B的名义将其作为传入包进行处理。
这是为了确保用户A有权限发送一个包,所有的processor都应用到packet上,也为了确保用户B有权限接收packet,所有的processor都应用到packet了。例如,如果用户B是脱机的,那么有一个脱机消息processor应该将包发送到数据库,而不是用户B。
 
  1. protected XMPPResourceConnection getXMPPResourceConnection(Packet p) {
  2. XMPPResourceConnection conn = null;
  3. //首先根据这个包的发起者,来查找他的连接资源类,找不到则找接收者的资源类
  4. JID from = p.getPacketFrom();
  5. if (from != null) {
  6. conn = connectionsByFrom.get(from);
  7. if (conn != null) {
  8. return conn;
  9. }
  10. }
  11. //这个接收者它可能是这个服务器上某个用户的消息,让我们为这个用户查找已建立的会话
  12. JID to = p.getStanzaTo();
  13. if (to != null) {
  14. if (log.isLoggable(Level.FINEST)) {
  15. log.finest("Searching for resource connection for: " + to);
  16. }
  17. conn = getResourceConnection(to);
  18. } else {
  19. // Hm, not sure what should I do now....
  20. // Maybe I should treat it as message to admin....
  21. log.log(Level.INFO,
  22. "Message without TO attribute set, don''t know what to do wih this: {0}", p);
  23. } // end of else
  24.  
  25. return conn;
  26. }
  27. protected void processPacket(Packet packet, XMPPResourceConnection conn) {
  28. ...
  29. packet.setPacketTo(getComponentId()); //sess-man@llooper
  30. ...
  31. if (!stop) {
  32. //授权匹配的processor处理packet
  33. walk(packet, conn);
  34. try {
  35. if ((conn != null) && conn.getConnectionId().equals(packet.getPacketFrom())) {
  36. handleLocalPacket(packet, conn);
  37. }
  38. } catch (NoConnectionIdException ex) {
  39. ...
  40. }
  41. }
  42. ...
  43. }

 

packetTo被设置为组件ID(sess-man@llooper),其值原先也是这个。
其中walk(packet, conn)方法,匹配处理器(授权)。对于message,此处匹配到的processor是amp和message-carbons,message-carbons没有怎么处理,主要是amp在处理,packet被塞amp的队列中等待处理。

  1. private void walk(final Packet packet, final XMPPResourceConnection connection) {
  2. for (XMPPProcessorIfc proc_t : processors.values()) {
  3. XMPPProcessorIfc processor = proc_t;
  4. //根据element和xmlns,授权匹配成功的processor
  5. Authorization result = processor.canHandle(packet, connection);
  6. if (result == Authorization.AUTHORIZED) {
  7. ....
  8. ProcessingThreads pt = workerThreads.get(processor.id());
  9. if (pt == null) {
  10. pt = workerThreads.get(defPluginsThreadsPool);
  11. }
  12. //packet 放到(addItem)授权了的processor的队列
  13. if (pt.addItem(processor, packet, connection)) {
  14. packet.processedBy(processor.id());
  15. } else {
  16. ...
  17. }
  18. } else {
  19. ...
  20. }
  21. }
  22. }
WorkerThread.run() 从队列中取出packet,由SessionManager.process(QueueItem item)给amp处理。
SessionManager.pocess(QueueItem item) 如下:
  1. @Override
  2. public void process(QueueItem item) {
  3. XMPPProcessorIfc processor = item.getProcessor();
  4. try {
  5. //由授权的 processor 处理 packet
  6. processor.process(item.getPacket(), item.getConn(), naUserRepository,local_results, plugin_config.get(processor.id()));
  7. if (item.getConn() != null) {
  8. setPermissions(item.getConn(), local_results);
  9. }
  10. addOutPackets(item.getPacket(), item.getConn(), local_results);
  11. } catch (PacketErrorTypeException e) {
  12. ...
  13. } catch (XMPPException e) {
  14. ...
  15. }
  16. }
  17. //其中processor.process()------> MessageAmp.process(),如下:
  18. @Override
  19. public void process(Packet packet, XMPPResourceConnection session,
  20. NonAuthUserRepository repo, Queue results, Map settings) throws XMPPException {
  21. if (packet.getElemName() == "presence") {
  22. ...
  23. } else {
  24. Element amp = packet.getElement().getChild("amp", XMLNS);
  25. if ((amp == null) || (amp.getAttributeStaticStr("status") != null)) {
  26. messageProcessor.process(packet, session, repo, results, settings);
  27. } else {
  28. ...
  29. }
  30. }
  31. // 其中messageProcessor.process() --------> Message.process(),如下
  32. @Override
  33. public void process(Packet packet, XMPPResourceConnection session,
  34. NonAuthUserRepository repo, Queue results, Map settings) throws XMPPException {
  35. ...
  36. try {
  37. ...
  38. // 在比较JIDs之前,记住要去除资源部分
  39. id = (packet.getStanzaFrom() != null)
  40. ? packet.getStanzaFrom().getBareJID()
  41. : null;
  42. // 检查这是否是来自客户端的数据包
  43. if (session.isUserId(id)) {
  44. // 这是来自这个客户端的数据包,最简单的操作是转发到它的目的地:
  45. // Simple clone the XML element and....
  46. // ... putting it to results queue is enough
  47. results.offer(packet.copyElementOnly());
  48. return;
  49. }
  50. } catch (NotAuthorizedException e) {
  51. ...
  52. } // end of try-catch
  53. }

 

检查stanzaFfrom与session匹配通过后,将packet.copyElementOnly()放到results中,作后续投递,原来的packet 就丢弃了。
此时投递的packet :packetFrom=null,packetTo=null。
packet在SessionManager.addOutPacket(Packet packet)中判断packetFrom是否为空,为空则将其设置为ComponentId(此处为sess-man@llooper),然后调用父类(AbstractMessageReceiver.java) 的addOutPacket(packet)方法塞到out_queue 队列中。
此时packet::packetFrom=sess-man@llooper,packetTo=null。
 

4、上层组件MessageRouter处理,把packet塞到in_queues. 又回到了MessageRouter.processPacket(Packet packet)处理:

 
不同的是 PacketTo为空,packet.getTo()的返回值是stanzaTo。
getLocalComponent(packet.getTo());方法根据stanzaTo与compId、comp name、Component都匹配不到。
此时packet会给组件SessionManager处理,Packet will be processed by: sess-man@llooper,由AbstractMessageReceiver的非阻塞性方法addPacketNB(Packet packet)加入到in_queues。
 
 5、第二次来到SessionManager.processPacket(final Packet packet)处理。不同的是在getXMPPResourceConnection(packet)方法中,
conn = connectionsByFrom.get(from)返回值是null,所以是根据stanzaTo取获取接收方的session,返回接收方连接的Connection。
  1. protected XMPPResourceConnection getXMPPResourceConnection(Packet p) {
  2. XMPPResourceConnection conn = null;
  3. JID from = p.getPacketFrom();
  4. if (from != null) {
  5. conn = connectionsByFrom.get(from);
  6. if (conn != null) {
  7. return conn;
  8. }
  9. }
  10. // It might be a message _to_ some user on this server
  11. // so let's look for established session for this user...
  12. JID to = p.getStanzaTo();
  13. if (to != null) {
  14. ...
  15. conn = getResourceConnection(to);
  16. } else {
  17. ...
  18. } // end of else
  19.  
  20. return conn;
  21. }

 

 6、如同步骤3,此时packet作为一个以用户B的名义将其作为传入包进行处理。

然后packetTo被设置为组件ID(sess-man@llooper)

此时packet: packetFrom = sess-man@llooper,packetTo =sess-man@llooper。

之后packet又经walk(packet, conn)方法,匹配处理器(授权),扔给amp处理。

 如同前面: 直到Message.process(),如下:
  1. @Override
  2. public void process(Packet packet, XMPPResourceConnection session,
  3. NonAuthUserRepository repo, Queue<Packet> results, Map<String, Object> settings) throws XMPPException {
  4. // For performance reasons it is better to do the check
  5. // before calling logging method.
  6. if (log.isLoggable(Level.FINEST)) {
  7. log.log(Level.FINEST, "Processing packet: {0}, for session: {1}", new Object[] {
  8. packet,
  9. session });
  10. }
  11. // You may want to skip processing completely if the user is offline.
  12. if (session == null) {
  13. processOfflineUser( packet, results );
  14. return;
  15. } // end of if (session == null)
  16. try {
  17. // Remember to cut the resource part off before comparing JIDs
  18. BareJID id = (packet.getStanzaTo() != null)
  19. ? packet.getStanzaTo().getBareJID()
  20. : null;
  21. // Checking if this is a packet TO the owner of the session
  22. if (session.isUserId(id)) {
  23. if (log.isLoggable(Level.FINEST)) {
  24. log.log(Level.FINEST, "Message 'to' this user, packet: {0}, for session: {1}",
  25. new Object[] { packet,
  26. session });
  27. }
  28. if (packet.getStanzaFrom() != null && session.isUserId(packet.getStanzaFrom().getBareJID())) {
  29. JID connectionId = session.getConnectionId();
  30. if (connectionId.equals(packet.getPacketFrom())) {
  31. results.offer(packet.copyElementOnly());
  32. // this would cause message packet to be stored in offline storage and will not
  33. // send recipient-unavailable error but it will behave the same as a message to
  34. // unavailable resources from other sessions or servers
  35. return;
  36. }
  37. }
  38. // Yes this is message to 'this' client
  39. List<XMPPResourceConnection> conns = new ArrayList<XMPPResourceConnection>(5);
  40. // This is where and how we set the address of the component
  41. // which should rceive the result packet for the final delivery
  42. // to the end-user. In most cases this is a c2s or Bosh component
  43. // which keep the user connection.
  44. String resource = packet.getStanzaTo().getResource();
  45. if (resource == null) {
  46. // If the message is sent to BareJID then the message is delivered to
  47. // all resources
  48. conns.addAll(getConnectionsForMessageDelivery(session));
  49. } else {
  50. // Otherwise only to the given resource or sent back as error.
  51. XMPPResourceConnection con = session.getParentSession().getResourceForResource(
  52. resource);
  53. if (con != null) {
  54. conns.add(con);
  55. }
  56. }
  57. // MessageCarbons: message cloned to all resources? why? it should be copied only
  58. // to resources with non negative priority!!
  59.  
  60. if (conns.size() > 0) {
  61. for (XMPPResourceConnection con : conns) {
  62. Packet result = packet.copyElementOnly();
  63. result.setPacketTo(con.getConnectionId());
  64. // In most cases this might be skept, however if there is a
  65. // problem during packet delivery an error might be sent back
  66. result.setPacketFrom(packet.getTo());
  67. // Don't forget to add the packet to the results queue or it
  68. // will be lost.
  69. results.offer(result);
  70. if (log.isLoggable(Level.FINEST)) {
  71. log.log(Level.FINEST, "Delivering message, packet: {0}, to session: {1}",
  72. new Object[] { packet,
  73. con });
  74. }
  75. }
  76. } else {
  77. // if there are no user connections we should process packet
  78. // the same as with missing session (i.e. should be stored if
  79. // has type 'chat'
  80. processOfflineUser( packet, results );
  81. }
  82. return;
  83. } // end of else
  84. // Remember to cut the resource part off before comparing JIDs
  85. id = (packet.getStanzaFrom() != null)
  86. ? packet.getStanzaFrom().getBareJID()
  87. : null;
  88. // Checking if this is maybe packet FROM the client
  89. if (session.isUserId(id)) {
  90. // This is a packet FROM this client, the simplest action is
  91. // to forward it to is't destination:
  92. // Simple clone the XML element and....
  93. // ... putting it to results queue is enough
  94. results.offer(packet.copyElementOnly());
  95. return;
  96. }
  97. // Can we really reach this place here?
  98. // Yes, some packets don't even have from or to address.
  99. // The best example is IQ packet which is usually a request to
  100. // the server for some data. Such packets may not have any addresses
  101. // And they usually require more complex processing
  102. // This is how you check whether this is a packet FROM the user
  103. // who is owner of the session:
  104. JID jid = packet.getFrom();
  105. // This test is in most cases equal to checking getElemFrom()
  106. if (session.getConnectionId().equals(jid)) {
  107. // Do some packet specific processing here, but we are dealing
  108. // with messages here which normally need just forwarding
  109. Element el_result = packet.getElement().clone();
  110. // If we are here it means FROM address was missing from the
  111. // packet, it is a place to set it here:
  112. el_result.setAttribute("from", session.getJID().toString());
  113. Packet result = Packet.packetInstance(el_result, session.getJID(), packet
  114. .getStanzaTo());
  115. // ... putting it to results queue is enough
  116. results.offer(result);
  117. }
  118. } catch (NotAuthorizedException e) {
  119. log.log(Level.FINE, "NotAuthorizedException for packet: " + packet + " for session: " + session, e);
  120. results.offer(Authorization.NOT_AUTHORIZED.getResponseMessage(packet,
  121. "You must authorize session first.", true));
  122. } // end of try-catch
  123. }

检查stanzaTo与session匹配通过后,根据session拿到接收方所有的连接(可能多端登陆),然后Packet result = packet.copyElementOnly()生成新的packet(原packet丢弃了),并将packetTo设置为接收方连接的ConnectionId(例如:c2s@llooper/192.168.0.33_5222_192.168.0.33_38624),通过addOutPacket()方法塞到out_queue队列。
此时packet:packetFrom = sess-man@llooper,packetTo =c2s@llooper/192.168.0.33_5222_192.168.0.33_38624。

7、 如同前面步骤2,不同的是根据packetTo匹配到组件 c2s@llooper

8、 组件 c2s@llooper 从queue中取出packet,分发到目的地

  1. public void processPacket(final Packet packet) {
  2. ...
  3. if (packet.isCommand() && (packet.getCommand() != Command.OTHER)) {
  4. ...
  5. } else {
  6. // 把packet 发送给客户端
  7. if (!writePacketToSocket(packet)) {
  8. ...
  9. }
  10. } // end of else
  11. }

 

后续有时间会不断更新,欢迎加入QQ群 310790965 更多的交流

 
 
 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号