经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 大数据/云/AI » Apache Kafka » 查看文章
Kafka2.0消费者协调器源码
来源:cnblogs  作者:O'Neal  时间:2019/7/17 11:01:00  对本文有异议

消费组和消费者

  1. 消费组和消费者是一对多的关系。
  2. 同一个消费组的消费者可以消费多个分区,且是独占的。
  3. 消费者的分区分配策略由接口PartitionAssignor定义,内置三种分配策略RangeAssignorRoundRobinAssignorStickyAssignor,支持自定义策略。
  4. 不同消费组可以消费相同的分区,互不干扰。

消费者协调器和组协调器

  1. 客户端的消费者协调器ConsumerCoordinator和服务端的组协调器GroupCoordinator通过心跳不断保持通信。
  2. 消费者进行消费之前,需要确保协调器是 ready 的。
    1. 选择具有最少请求的节点Node,即具有最少的InFlightRequests的节点。
    2. 向该节点发送获取协调器节点的请求,发送流程类似发送拉取请求。
    3. 向找到的协调器节点发送加入组请求,此时会禁止心跳线程。
    4. 加入组响应处理器JoinGroupResponseHandler对响应进行处理,响应包含generationIdmemberIdleaderIdprotocol
    5. 如果是 leader 消费者,即memberId=leaderId,则需要根据分配策略protocol计算分区分配。
    6. 将分区分配结果封装到同步组请求,再向协调器节点发送同步组请求。
    7. 同步组响应处理器SyncGroupResponseHandler对上述请求的响应进行处理。
    8. 如果第5步判断不是 follower 消费者,同样需要向协调器发送同步组请求,只是请求中不需要封装分区分配结果,而是从组协调器获取。
    9. 加入组成功后,启动心跳线程。
    10. 更新本地缓存的分区分配,此处会调用消费者再平衡监听器。

消费者状态

  • UNJOINED:消费者初始状态为UNJOINED,表示未加入消费组。
  • REBALANCING:消费者向协调器发送加入组请求之前,状态变更为REBALANCING,表示再平衡状态
  • STABLE:消费者监听到消息成功返回,状态变更为STABLE,表示稳定状态,如果是失败的消息,状态重置为UNJOINED

心跳线程

  1. 消费者加入消费组之后会启动心跳线程,并保持和组协调器的通信。
  2. 如果消费者状态不是STABLE,则不发送心跳。
  3. 如果组协调器未知,则等待一段时间重试。
  4. 如果心跳会话超时,则标记协调器节点未知。
  5. 如果心跳轮询超时,则发送离开组请求。
  6. 如果暂不需要发送心跳,则等待一段时间重试。
  7. 发送心跳,注册响应监听器,接收到响应后,设置接收时间,并进行下一轮的心跳。

偏移量

拉取偏移量

  1. 如果有指定的分区,消费者协调器从组协调器拉取一组分区和已提交偏移量的映射关系,缓存到SubscriptionState
  2. 设置偏移量重置策略:LATEST, EARLIEST,NONE
  3. 异步地更新消费的偏移量位置。

提交偏移量

  1. 消费者协调器获取当前的协调器节点。
  2. 向该节点发送提交偏移量请求,返回Future

加入组流程

加入组流程

消费者加入组流程的源码分析

  1. boolean updateAssignmentMetadataIfNeeded(final long timeoutMs) {
  2. final long startMs = time.milliseconds();
  3. if (!coordinator.poll(timeoutMs)) { // 获取协调器
  4. return false;
  5. }
  6. // 更新偏移量
  7. return updateFetchPositions(remainingTimeAtLeastZero(timeoutMs, time.milliseconds() - startMs));
  8. }
  9. // 获取协调器
  10. public boolean poll(final long timeoutMs) {
  11. final long startTime = time.milliseconds();
  12. long currentTime = startTime;
  13. long elapsed = 0L;
  14. if (subscriptions.partitionsAutoAssigned()) { // 是自动分配主题类型
  15. // 更新心跳的上一次的轮询时间
  16. pollHeartbeat(currentTime);
  17. if (coordinatorUnknown()) { // 协调器未知
  18. // 确保协调器已经 ready
  19. if (!ensureCoordinatorReady(remainingTimeAtLeastZero(timeoutMs, elapsed))) {
  20. return false;
  21. }
  22. }
  23. if (rejoinNeededOrPending()) { // 需要加入消费组
  24. // 加入组、同步组
  25. if (!ensureActiveGroup(remainingTimeAtLeastZero(timeoutMs, elapsed))) {
  26. return false;
  27. }
  28. currentTime = time.milliseconds();
  29. }
  30. } else { // 指定分区类型
  31. if (metadata.updateRequested() && !client.hasReadyNodes(startTime)) {// 如果没有准备就绪的节点
  32. // 阻塞等待元数据更新
  33. final boolean metadataUpdated = client.awaitMetadataUpdate(remainingTimeAtLeastZero(timeoutMs, elapsed));
  34. if (!metadataUpdated && !client.hasReadyNodes(time.milliseconds())) {
  35. return false; // 更新元数据失败
  36. }
  37. currentTime = time.milliseconds();
  38. }
  39. }
  40. maybeAutoCommitOffsetsAsync(currentTime); // 异步自动提交偏移量
  41. return true;
  42. }
  43. // 确保协调器已经 ready
  44. protected synchronized boolean ensureCoordinatorReady(final long timeoutMs) {
  45. final long startTimeMs = time.milliseconds();
  46. long elapsedTime = 0L;
  47. while (coordinatorUnknown()) { // 如果协调器未知
  48. final RequestFuture<Void> future = lookupCoordinator(); // 向当前请求队列最少的节点,发送获取协调器的请求
  49. client.poll(future, remainingTimeAtLeastZero(timeoutMs, elapsedTime));
  50. if (!future.isDone()) {
  51. break; // 响应未完成,退出
  52. }
  53. }
  54. return !coordinatorUnknown();
  55. }
  56. // 加入组、同步组
  57. boolean ensureActiveGroup(long timeoutMs, long startMs) {
  58. startHeartbeatThreadIfNeeded(); // 启动心跳线程
  59. return joinGroupIfNeeded(joinTimeoutMs, joinStartMs);
  60. }
  61. boolean joinGroupIfNeeded(final long timeoutMs, final long startTimeMs) {
  62. long elapsedTime = 0L;
  63. while (rejoinNeededOrPending()) {
  64. // 发送加入组请求
  65. final RequestFuture<ByteBuffer> future = initiateJoinGroup();
  66. client.poll(future, remainingTimeAtLeastZero(timeoutMs, elapsedTime));
  67. if (!future.isDone()) {
  68. // we ran out of time
  69. return false;
  70. }
  71. if (future.succeeded()) { // 加入成功,回调处理响应,更新缓存的分区分配
  72. ByteBuffer memberAssignment = future.value().duplicate();
  73. onJoinComplete(generation.generationId, generation.memberId, generation.protocol, memberAssignment);
  74. }
  75. }
  76. return true;
  77. }
  78. // 发送加入组请求
  79. private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
  80. if (joinFuture == null) {
  81. disableHeartbeatThread(); // 暂停心跳线程
  82. state = MemberState.REBALANCING; // 状态改为 REBALANCING
  83. joinFuture = sendJoinGroupRequest(); // 向协调器发送加入组请求
  84. joinFuture.addListener(new RequestFutureListener<ByteBuffer>() { // 响应监听器
  85. @Override
  86. public void onSuccess(ByteBuffer value) { // 成功
  87. synchronized (AbstractCoordinator.this) {
  88. state = MemberState.STABLE; // 状态改为 STABLE
  89. rejoinNeeded = false; // 不需要加入了
  90. if (heartbeatThread != null)
  91. heartbeatThread.enable(); // 启动暂停了的心跳
  92. }
  93. }
  94. @Override
  95. public void onFailure(RuntimeException e) { // 失败
  96. synchronized (AbstractCoordinator.this) {
  97. state = MemberState.UNJOINED; // 状态改为 UNJOINED
  98. }
  99. }
  100. });
  101. }
  102. return joinFuture;
  103. }
  104. // 向协调器发送加入组请求
  105. RequestFuture<ByteBuffer> sendJoinGroupRequest() {
  106. JoinGroupRequest.Builder requestBuilder = new JoinGroupRequest.Builder(
  107. groupId,
  108. this.sessionTimeoutMs,
  109. this.generation.memberId,
  110. protocolType(),
  111. metadata()).setRebalanceTimeout(this.rebalanceTimeoutMs);
  112. int joinGroupTimeoutMs = Math.max(rebalanceTimeoutMs, rebalanceTimeoutMs + 5000);
  113. return client.send(coordinator, requestBuilder, joinGroupTimeoutMs)
  114. .compose(new JoinGroupResponseHandler()); // 异步回调响应处理类
  115. }
  116. // 异步回调响应处理类
  117. private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
  118. @Override
  119. public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
  120. Errors error = joinResponse.error();
  121. if (error == Errors.NONE) {
  122. synchronized (AbstractCoordinator.this) {
  123. if (state != MemberState.REBALANCING) { // 如果是 REBALANCING,状态异常
  124. future.raise(new UnjoinedGroupException());
  125. } else {
  126. AbstractCoordinator.this.generation = new Generation(joinResponse.generationId(), joinResponse.memberId(), joinResponse.groupProtocol());
  127. if (joinResponse.isLeader()) { // 当前消费组是 leader
  128. onJoinLeader(joinResponse).chain(future);
  129. } else { // 当消费者是 follower
  130. onJoinFollower().chain(future);
  131. }
  132. }
  133. }
  134. }
  135. }
  136. }
  137. // 发送 leader 消费者同步组请求
  138. private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
  139. try {
  140. // 根据响应的分配策略,给消费者分配分区
  141. Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.leaderId(), joinResponse.groupProtocol(), joinResponse.members());
  142. SyncGroupRequest.Builder requestBuilder = new SyncGroupRequest.Builder(groupId, generation.generationId, generation.memberId, groupAssignment);
  143. return sendSyncGroupRequest(requestBuilder);
  144. } catch (RuntimeException e) {
  145. return RequestFuture.failure(e);
  146. }
  147. }
  148. // 发送 follower 消费者同步组请求
  149. private RequestFuture<ByteBuffer> onJoinFollower() {
  150. SyncGroupRequest.Builder requestBuilder =
  151. new SyncGroupRequest.Builder(groupId, generation.generationId, generation.memberId,
  152. Collections.<String, ByteBuffer>emptyMap()); // 发送不带分配信息的请求
  153. return sendSyncGroupRequest(requestBuilder);
  154. }

原文链接:http://www.cnblogs.com/bigshark/p/11198481.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号