kafka_2.11-2.0.0: Common Operations
Source: cnblogs  Author: 踏歌行666  Date: 2018/9/25 19:47:19

 

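Reference post: Kafka Consumer Groups (consumer group)

Reference post: Kafka 1.0 Chinese Documentation (9): Operations

Reference post: Deploying and Installing kafka-manager, a Kafka Cluster Management Tool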

 

 

The operations below can be run on any one of mini01, mini02, or mini03.

 

1. Managing Kafka through a web UI

Reference post: Deploying and Installing kafka-manager, a Kafka Cluster Management Tool

 

 

2. Creating a topic

    # --replication-factor 2 means each partition has 2 replicas
    # --partitions 4 means the topic has 4 partitions
    [yun@mini01 ~]$ kafka-topics.sh --create --zookeeper mini01:2181 --replication-factor 2 --partitions 4 --topic test
    Created topic "test".
    [yun@mini01 ~]$ kafka-topics.sh --create --zookeeper mini01:2181 --replication-factor 3 --partitions 4 --topic zhang
    Created topic "zhang".
    [yun@mini01 ~]$ kafka-topics.sh --list --zookeeper mini01:2181 # list the topics to confirm
    zhang
    test

 

2.1. Inspecting each host

mini01

    [yun@mini01 logs]$ pwd
    /app/kafka/logs
    [yun@mini01 logs]$ ll
    total 160
    ………………
    drwxrwxr-x 2 yun yun 141 Sep 15 18:53 test-1
    drwxrwxr-x 2 yun yun 141 Sep 15 18:53 test-2
    drwxrwxr-x 2 yun yun 141 Sep 15 18:53 test-3

mini02

    [yun@mini02 logs]$ pwd
    /app/kafka/logs
    [yun@mini02 logs]$ ll
    total 260
    ………………
    drwxrwxr-x 2 yun yun 141 Sep 15 18:53 test-0
    drwxrwxr-x 2 yun yun 141 Sep 15 18:53 test-2

mini03

    [yun@mini03 logs]$ pwd
    /app/kafka/logs
    [yun@mini03 logs]$ ll
    total 132
    ………………
    drwxrwxr-x 2 yun yun 141 Sep 15 18:53 test-0
    drwxrwxr-x 2 yun yun 141 Sep 15 18:53 test-1
    drwxrwxr-x 2 yun yun 141 Sep 15 18:53 test-3

 

 

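3. Modifying a topic

3.1. Increasing the partition count

Note: the partition count can only be increased. Kafka currently does not support reducing the number of partitions of a topic.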

    [yun@mini01 ~]$ kafka-topics.sh --list --zookeeper mini01:2181
    __consumer_offsets
    test
    test01
    test02
    test03
    test04
    zhang
    [yun@mini01 ~]$ kafka-topics.sh --describe --zookeeper mini01:2181 --topic test01
    Topic:test01 PartitionCount:5 ReplicationFactor:1 Configs:
    Topic: test01 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
    Topic: test01 Partition: 1 Leader: 1 Replicas: 1 Isr: 1
    Topic: test01 Partition: 2 Leader: 2 Replicas: 2 Isr: 2
    Topic: test01 Partition: 3 Leader: 0 Replicas: 0 Isr: 0
    Topic: test01 Partition: 4 Leader: 1 Replicas: 1 Isr: 1
    [yun@mini01 ~]$ kafka-topics.sh --alter --zookeeper mini01:2181 --partitions 2 --topic test01 # fails: the partition count cannot be reduced
    WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
    Error while executing topic command : The number of partitions for a topic can only be increased. Topic test01 currently has 5 partitions, 2 would not be an increase.
    [2018-09-16 09:12:40,034] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: The number of partitions for a topic can only be increased. Topic test01 currently has 5 partitions, 2 would not be an increase.
    (kafka.admin.TopicCommand$)
    [yun@mini01 ~]$ kafka-topics.sh --alter --zookeeper mini01:2181 --partitions 7 --topic test01 # increase the partition count
    WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
    Adding partitions succeeded!
    [yun@mini01 ~]$ kafka-topics.sh --describe --zookeeper mini01:2181 --topic test01
    Topic:test01 PartitionCount:7 ReplicationFactor:1 Configs:
    Topic: test01 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
    Topic: test01 Partition: 1 Leader: 1 Replicas: 1 Isr: 1
    Topic: test01 Partition: 2 Leader: 2 Replicas: 2 Isr: 2
    Topic: test01 Partition: 3 Leader: 0 Replicas: 0 Isr: 0
    Topic: test01 Partition: 4 Leader: 1 Replicas: 1 Isr: 1
    Topic: test01 Partition: 5 Leader: 2 Replicas: 2 Isr: 2
    Topic: test01 Partition: 6 Leader: 0 Replicas: 0 Isr: 0
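
Since a request to shrink a topic is rejected only after the command has been sent, a script can compare the counts first. The following is a minimal sketch and not part of Kafka: `partition_count` and `can_increase_partitions` are hypothetical helper names, and the parsing assumes the `PartitionCount:` summary line printed by `--describe` as in the session above.

```shell
# Hypothetical helper: extracts PartitionCount from the summary line of
# `kafka-topics.sh --describe` output read on stdin.
partition_count() {
  sed -n 's/.*PartitionCount:[[:space:]]*\([0-9][0-9]*\).*/\1/p' | head -n 1
}

# Hypothetical helper: succeeds only if the requested partition count is
# strictly greater than the current one.
can_increase_partitions() {
  local current="$1" requested="$2"
  [ "$requested" -gt "$current" ]
}

# Usage sketch (needs a running cluster, so it is left commented out):
# current=$(kafka-topics.sh --describe --zookeeper mini01:2181 --topic test01 | partition_count)
# if can_increase_partitions "$current" 7; then
#   kafka-topics.sh --alter --zookeeper mini01:2181 --partitions 7 --topic test01
# fi
```

With the values from the session above, a request for 2 partitions on a 5-partition topic is refused locally, while a request for 7 goes through to `--alter`.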

 

 

4. Deleting a topic

    # Set delete.topic.enable=true in server.properties (it already defaults to true in this version); otherwise the topic is only marked for deletion, or a restart is needed
    [yun@mini01 ~]$ kafka-topics.sh --delete --zookeeper mini01:2181 --topic test
    Topic test is marked for deletion.
    Note: This will have no impact if delete.topic.enable is not set to true.
    [yun@mini01 ~]$ kafka-topics.sh --list --zookeeper mini01:2181 # list again; only zhang remains, so test was really deleted
    zhang

 

 

5. Listing all topics

    [yun@mini01 ~]$ kafka-topics.sh --list --zookeeper mini01:2181
    __consumer_offsets
    test
    zhang

 

 

6. Viewing the details of a topic

    [yun@mini01 ~]$ kafka-topics.sh --describe --zookeeper mini01:2181 --topic zhang
    Topic:zhang PartitionCount:4 ReplicationFactor:3 Configs:
    Topic: zhang Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
    Topic: zhang Partition: 1 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
    Topic: zhang Partition: 2 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
    Topic: zhang Partition: 3 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
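
For a topic with many partitions it can help to summarize how leadership is spread across the brokers. The sketch below assumes the `Leader:` field layout shown in the describe output above; `leaders_per_broker` is a hypothetical helper name, not a Kafka tool.

```shell
# Hypothetical helper: reads `kafka-topics.sh --describe` output on stdin and
# prints "broker <id>: <n> leader(s)" for each broker, sorted by broker id.
leaders_per_broker() {
  awk '
    # Per-partition lines carry a "Leader:" token followed by the broker id;
    # the topic summary line has no such token and is ignored.
    {
      for (i = 1; i < NF; i++) {
        if ($i == "Leader:") { count[$(i + 1)]++ }
      }
    }
    END {
      for (b in count) { printf "broker %s: %d leader(s)\n", b, count[b] }
    }
  ' | sort -k2,2n
}

# Usage sketch (needs a running cluster, so it is left commented out):
# kafka-topics.sh --describe --zookeeper mini01:2181 --topic zhang | leaders_per_broker
```

Fed the describe output for zhang shown above, this reports one leader on broker 0, two on broker 1, and one on broker 2.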

 

 

7. Producing messages from the shell

7.1. Entering messages one at a time

    [yun@mini01 ~]$ kafka-console-producer.sh --broker-list mini01:9092 --topic zhang
    >111
    >222
    >333
    >444
    >555
    >666
    >777
    >888
    >999

 

7.2. Bulk-importing data

    [yun@mini01 zhangliang]$ kafka-console-producer.sh --broker-list mini01:9092 --topic liang < 001.info

 

 

8. Consuming messages from the shell

    # --from-beginning reads from the earliest offset
    # kafka-console-consumer.sh --zookeeper mini01:2181 --from-beginning --topic zhang # older versions
    [yun@mini01 ~]$ kafka-console-consumer.sh --bootstrap-server mini01:9092 --from-beginning --topic zhang
    111
    555
    999
    333
    777
    444
    888
    222
    666

 

 

9. Consuming with a consumer group

9.1. Creating the topic

    [yun@mini01 ~]$ kafka-topics.sh --create --zookeeper mini01:2181 --replication-factor 1 --partitions 4 --topic order
    Created topic "order".
    [yun@mini01 ~]$ kafka-topics.sh --list --zookeeper mini01:2181 # list all topics
    __consumer_offsets
    order
    test
    zhang
    [yun@mini01 ~]$ kafka-topics.sh --describe --zookeeper mini01:2181 --topic order # view the topic details
    Topic:order PartitionCount:4 ReplicationFactor:1 Configs:
    Topic: order Partition: 0 Leader: 0 Replicas: 0 Isr: 0
    Topic: order Partition: 1 Leader: 1 Replicas: 1 Isr: 1
    Topic: order Partition: 2 Leader: 2 Replicas: 2 Isr: 2
    Topic: order Partition: 3 Leader: 0 Replicas: 0 Isr: 0

 

9.2. Producing messages

    [yun@mini01 ~]$ kafka-console-producer.sh --broker-list mini01:9092 --topic order
    >111
    >222
    >333
    >444
    >555

 

9.3. Consuming messages as a group

Run on mini02:

    # --group specifies the consumer group
    [yun@mini02 ~]$ kafka-console-consumer.sh --bootstrap-server mini01:9092 --topic order --group order-group

Run on mini03:

    # --group specifies the consumer group
    [yun@mini03 ~]$ kafka-console-consumer.sh --bootstrap-server mini01:9092 --topic order --group order-group

    # run this in a second terminal window
    [yun@mini03 ~]$ kafka-console-consumer.sh --bootstrap-server mini01:9092 --topic order --group order-group

The consumer group order-group now has 3 consumers, all consuming messages from the topic order.

 

9.4. Viewing the consumer group's offsets

    [yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --describe --group order-group # view the consumption status

    TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                      HOST          CLIENT-ID
    order  0          4               4               0    consumer-1-2e9805db-e021-4595-8c62-92f8691fbf20  /172.16.1.13  consumer-1
    order  1          5               5               0    consumer-1-2e9805db-e021-4595-8c62-92f8691fbf20  /172.16.1.13  consumer-1
    order  2          5               5               0    consumer-1-9e65dcfb-246f-4043-aaf7-3ee83532237f  /172.16.1.13  consumer-1
    order  3          4               4               0    consumer-1-ee17939d-1ffe-42c7-8261-b19be8acea43  /172.16.1.12  consumer-1
    [yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --describe --group order-group --members --verbose

    CONSUMER-ID                                      HOST          CLIENT-ID   #PARTITIONS  ASSIGNMENT
    consumer-1-9e65dcfb-246f-4043-aaf7-3ee83532237f  /172.16.1.13  consumer-1  1            order(2)
    consumer-1-2e9805db-e021-4595-8c62-92f8691fbf20  /172.16.1.13  consumer-1  2            order(0,1)
    consumer-1-ee17939d-1ffe-42c7-8261-b19be8acea43  /172.16.1.12  consumer-1  1            order(3)
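
When monitoring a group, the per-partition LAG values are often reduced to a single number. The sketch below sums the LAG column of the describe output; `total_lag` is a hypothetical helper name, and it assumes the whitespace-separated column layout with a `TOPIC ... LAG ...` header row as shown above.

```shell
# Hypothetical helper: sums the LAG column of
# `kafka-consumer-groups.sh --describe --group <group>` output read on stdin.
total_lag() {
  awk '
    # Locate the LAG column from the header row instead of hard-coding it.
    $1 == "TOPIC" { for (i = 1; i <= NF; i++) if ($i == "LAG") col = i; next }
    # Ignore anything before the header and rows whose lag is not a number.
    col && $col ~ /^[0-9]+$/ { sum += $col }
    END { print sum + 0 }
  '
}

# Usage sketch (needs a running cluster, so it is left commented out):
# kafka-consumer-groups.sh --bootstrap-server mini01:9092 --describe --group order-group | total_lag
```

For the order-group output above every partition has a lag of 0, so the total is 0 as well.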

 

 

10. Managing consumer groups

10.1. Listing all consumer groups

    [yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --list
    console-consumer-26727
    console-consumer-92984
    console-consumer-60755
    console-consumer-11661
    console-consumer-31713
    console-consumer-20244
    console-consumer-65733

10.2. Viewing a consumer group's consumption status (offsets)

    [yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --describe --group console-consumer-26727
    Consumer group 'console-consumer-26727' has no active members.

    TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
    zhang  3          11              11              0    -            -     -
    zhang  0          9               9               0    -            -     -
    zhang  2          8               8               0    -            -     -
    zhang  1          11              11              0    -            -     -
    [yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --describe --group console-consumer-65733

    TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                      HOST          CLIENT-ID
    zhang  0          11              11              0    consumer-1-17c812f0-116b-42a9-88d8-90d1a85949e1  /172.16.1.13  consumer-1
    zhang  1          12              12              0    consumer-1-17c812f0-116b-42a9-88d8-90d1a85949e1  /172.16.1.13  consumer-1
    zhang  2          10              10              0    consumer-1-17c812f0-116b-42a9-88d8-90d1a85949e1  /172.16.1.13  consumer-1
    zhang  3          12              12              0    consumer-1-17c812f0-116b-42a9-88d8-90d1a85949e1  /172.16.1.13  consumer-1

 

--members

    # --members lists all active members of the consumer group.
    [yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --describe --group console-consumer-65733 --members

    CONSUMER-ID                                      HOST          CLIENT-ID   #PARTITIONS
    consumer-1-17c812f0-116b-42a9-88d8-90d1a85949e1  /172.16.1.13  consumer-1  4

--verbose

    # --verbose additionally shows the partitions assigned to each member.
    [yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --describe --group console-consumer-65733 --members --verbose

    CONSUMER-ID                                      HOST          CLIENT-ID   #PARTITIONS  ASSIGNMENT
    consumer-1-17c812f0-116b-42a9-88d8-90d1a85949e1  /172.16.1.13  consumer-1  4            zhang(0,1,2,3)

--state

    # --state shows useful group-level information.
    [yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --describe --group console-consumer-65733 --state

    COORDINATOR (ID)  ASSIGNMENT-STRATEGY  STATE   #MEMBERS
    mini01:9092 (0)   range                Stable  1

 

10.3. Deleting one or more consumer groups

    [yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --list
    console-consumer-3826
    console-consumer-92984
    console-consumer-60755
    console-consumer-11661
    console-consumer-31713
    console-consumer-20244
    console-consumer-65733
    # delete one or more groups
    [yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --delete --group console-consumer-11661 --group console-consumer-31713
    Deletion of requested consumer groups ('console-consumer-31713', 'console-consumer-11661') was successful.
    [yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --list
    console-consumer-3826
    console-consumer-92984
    console-consumer-60755
    console-consumer-20244
    console-consumer-65733

 

 

11. Increasing the replication factor

    [yun@mini01 kafka_20180916]$ kafka-topics.sh --create --zookeeper mini01:2181 --replication-factor 1 --partitions 4 --topic order
    Created topic "order".
    [yun@mini01 kafka_20180916]$ kafka-topics.sh --describe --zookeeper mini01:2181 --topic order
    Topic:order PartitionCount:4 ReplicationFactor:1 Configs:
    Topic: order Partition: 0 Leader: 0 Replicas: 0 Isr: 0
    Topic: order Partition: 1 Leader: 1 Replicas: 1 Isr: 1
    Topic: order Partition: 2 Leader: 2 Replicas: 2 Isr: 2
    Topic: order Partition: 3 Leader: 0 Replicas: 0 Isr: 0

Goal: change the replication factor of the topic order from 1 to 2. The broker that currently holds each partition is shown in the output above.

Plan: partition 0 is placed on brokers 0 and 1; partition 1 on brokers 1 and 2; partition 2 on brokers 2 and 0; partition 3 on brokers 0 and 1.

 

11.1. Creating a reassignment plan file

    [yun@mini01 kafka_20180916]$ cat increase-replication-factor.json
    {
      "version":1,
      "partitions":[
        {"topic": "order","partition": 0,"replicas": [0,1]},
        {"topic": "order","partition": 1,"replicas": [1,2]},
        {"topic": "order","partition": 2,"replicas": [2,0]},
        {"topic": "order","partition": 3,"replicas": [0,1]}
      ]
    }
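
Writing this file by hand becomes tedious for topics with many partitions. The sketch below generates a plan with the same shape; `make_reassignment_json` is a hypothetical helper, and the layout rule (replica r of partition p goes on broker (p + r) % brokers, with brokers numbered from 0) is an assumption chosen for illustration that happens to reproduce the plan above. It keeps each partition's current broker first only when the existing leaders already follow that pattern, as they do for order.

```shell
# Hypothetical helper: prints a reassignment plan for <topic> with <partitions>
# partitions spread over brokers 0..<brokers>-1 and <rf> replicas each.
# Assumed layout: replica r of partition p is placed on broker (p + r) % brokers.
make_reassignment_json() {
  local topic="$1" partitions="$2" brokers="$3" rf="$4"
  local p=0 r replicas sep
  printf '{\n  "version": 1,\n  "partitions": [\n'
  while [ "$p" -lt "$partitions" ]; do
    # Build the comma-separated replica list for this partition.
    replicas=""
    r=0
    while [ "$r" -lt "$rf" ]; do
      replicas="${replicas}${replicas:+,}$(( (p + r) % brokers ))"
      r=$((r + 1))
    done
    # Every entry except the last is followed by a comma.
    sep=","
    if [ "$p" -eq $((partitions - 1)) ]; then sep=""; fi
    printf '    {"topic": "%s", "partition": %d, "replicas": [%s]}%s\n' \
      "$topic" "$p" "$replicas" "$sep"
    p=$((p + 1))
  done
  printf '  ]\n}\n'
}

# Usage sketch: 4 partitions, 3 brokers, replication factor 2.
# make_reassignment_json order 4 3 2 > increase-replication-factor.json
```

It is worth comparing the generated file against the current `--describe` output before executing it, since the rule above knows nothing about where the data actually lives.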

 

11.2. Executing the plan

    [yun@mini01 kafka_20180916]$ kafka-reassign-partitions.sh --zookeeper mini01:2181 --reassignment-json-file increase-replication-factor.json --execute
    Current partition replica assignment

    {"version":1,"partitions":[{"topic":"order","partition":2,"replicas":[2],"log_dirs":["any"]},{"topic":"order","partition":1,"replicas":[1],"log_dirs":["any"]},{"topic":"order","partition":3,"replicas":[0],"log_dirs":["any"]},{"topic":"order","partition":0,"replicas":[0],"log_dirs":["any"]}]}

    Save this to use as the --reassignment-json-file option during rollback
    Successfully started reassignment of partitions.

 

11.3. Verifying that the reassignment succeeded

    [yun@mini01 kafka_20180916]$ kafka-reassign-partitions.sh --zookeeper mini01:2181 --reassignment-json-file increase-replication-factor.json --verify
    Status of partition reassignment:
    Reassignment of partition order-0 completed successfully
    Reassignment of partition order-1 completed successfully
    Reassignment of partition order-2 completed successfully
    Reassignment of partition order-3 completed successfully

  

11.4. Viewing the topic details again

    [yun@mini01 kafka_20180916]$ kafka-topics.sh --describe --zookeeper mini01:2181 --topic order # the output below shows the reassignment succeeded
    Topic:order PartitionCount:4 ReplicationFactor:2 Configs:
    Topic: order Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1
    Topic: order Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
    Topic: order Partition: 2 Leader: 2 Replicas: 2,0 Isr: 2,0
    Topic: order Partition: 3 Leader: 0 Replicas: 0,1 Isr: 0,1

 

 

12. How partitions are assigned to brokers at creation time

    kafka-topics.sh --create --zookeeper mini01:2181 --replication-factor 1 --partitions 5 --topic test01
    kafka-topics.sh --create --zookeeper mini01:2181 --replication-factor 1 --partitions 11 --topic test02

Note how the partitions are distributed across the machines:

    mini01
    test01-0
    test01-3
    test02-2
    test02-5
    test02-8

    mini02
    test01-1
    test01-4
    test02-0
    test02-3
    test02-6
    test02-9

    mini03
    test01-2
    test02-1
    test02-10
    test02-4
    test02-7
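
The listing above is consistent with a round-robin placement that begins at some starting broker and wraps around: test01 starts at broker 0 (mini01) and test02 at broker 1 (mini02). The sketch below simulates that rule for a replication factor of 1. `simulate_assignment` is a hypothetical helper, the `start` argument stands in for whatever starting broker Kafka chose when the topic was created, and brokers are assumed to be numbered 0 (mini01), 1 (mini02), 2 (mini03) as the describe outputs earlier suggest.

```shell
# Hypothetical helper: prints "<topic>-<partition> -> broker <id>" for every
# partition, using broker = (start + partition) % brokers.
simulate_assignment() {
  local topic="$1" partitions="$2" brokers="$3" start="$4"
  local p=0
  while [ "$p" -lt "$partitions" ]; do
    printf '%s-%d -> broker %d\n' "$topic" "$p" $(( (start + p) % brokers ))
    p=$((p + 1))
  done
}

# Reproduce the two topics from this section on a 3-broker cluster.
simulate_assignment test01 5 3 0
simulate_assignment test02 11 3 1
```

With these starting values the simulated placement matches the directories listed per machine, for example test02-0, test02-3, test02-6, and test02-9 on broker 1.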

 
