经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » Java相关 » Java » 查看文章
Kafka 入门实战(4)--开启 Kerberos 认证
来源:cnblogs  作者:且行且码  时间:2022/1/2 16:17:00  对本文有异议

本主要介绍在 Kafka 中如何配置 Kerberos 认证,文中所使用到的软件版本:Java 1.8.0_191、Kafka 2.13-2.4.1、Kerberos 1.15.1。

1、Kerberos 安装

要使用 Kerberos 服务,需先安装 Kerberos,安装方法可参考:Kerberos 入门实战(2)--Kerberos 安装及使用

2、Zookeeper 开启 Kerberos 认证

Zookeeper 开启 Kerberos 认证的方法参见:Zookeeper 入门实战(4)--开启 Kerberos 认证

3、Kafka 开启 Kerberos 认证

假设 Kafka 集群信息如下:

ip 主机名 描述
10.49.196.10 pxc1 zookeeper、kafka
10.49.196.11 pxc2 zookeeper、kafka
10.49.196.12 pxc3 zookeeper、kafka

3.1、创建 keytab

在安装 Kerberos 的机器上进入 kadmin(Kerberos 服务端上使用 kadmin.local,安装了 Kerberos Client 的机器上可以使用 kadmin),然后执行如下命令分别创建服务端和客户端的 keytab:

  1. kadmin.local: add_principal -randkey kafka-server/pxc1@ABC.COM
  2. kadmin.local: add_principal -randkey kafka-server/pxc2@ABC.COM
  3. kadmin.local: add_principal -randkey kafka-server/pxc3@ABC.COM
  4. kadmin.local: add_principal -randkey kafka-client@ABC.COM
  5. kadmin.local: xst -k /root/zk-server.keytab kafka-server/pxc1@ABC.COM
  6. kadmin.local: xst -k /root/zk-server.keytab kafka-server/pxc2@ABC.COM
  7. kadmin.local: xst -k /root/zk-server.keytab kafka-server/pxc3@ABC.COM
  8. kadmin.local: xst -k /root/zk-client.keytab kafka-client@ABC.COM

3.2、配置

3.2.1、配置 hosts 文件

  1. 10.49.196.10 pxc1
  2. 10.49.196.11 pxc2
  3. 10.49.196.12 pxc3

3.2.2、Kerberos 相关配置

拷贝 krb5.conf 及 keytab 文件到所有安装 Kafka 的机器,这里把文件都放到 Kafka 的 config/kerveros 目录下(kerberos 目录需新建)。

2.2.3、Kafka 服务端配置

A、修改 config/server.properties,增加如下配置:

  1. security.inter.broker.protocol=SASL_PLAINTEXT
  2. sasl.mechanism.inter.broker.protocol=GSSAPI
  3. sasl.enabled.mechanisms=GSSAPI
  4. sasl.kerberos.service.name=kafka-server

B、新建 kafka-server-jaas.conf 文件,该文件也放到 Kafka 的 config/kerveros 目录下。

  1. KafkaServer {
  2. com.sun.security.auth.module.Krb5LoginModule required
  3. useKeyTab=true
  4. keyTab="/home/hadoop/app/kafka_2.13-2.4.1/config/kerberos/kafka-server.keytab"
  5. storeKey=true
  6. useTicketCache=false
  7. principal="kafka-server/pxc1@ABC.COM"; #不同的主机,需修改为本机的主机名
  8. };
  9. //Zookeeper client authentication
  10. Client {
  11. com.sun.security.auth.module.Krb5LoginModule required
  12. useKeyTab=true
  13. keyTab="/home/hadoop/app/kafka_2.13-2.4.1/config/kerberos/kafka-server.keytab"
  14. storeKey=true
  15. useTicketCache=false
  16. principal="kafka-server/pxc1@ABC.COM"; #不同的主机,需修改为本机的主机名
  17. };

C、修改 bin/kafka-server-start.sh 脚本,倒数第二行增加如下配置:

  1. export KAFKA_OPTS="-Dzookeeper.sasl.client=true -Dzookeeper.sasl.client.username=zk-server -Djava.security.krb5.conf=/home/hadoop/app/kafka_2.13-2.4.1/config/kerberos/krb5.conf -Djava.security.auth.login.config=/home/hadoop/app/kafka_2.13-2.4.1/config/kerberos/kafka-server-jaas.conf"

如果 Zookeeper 没有开启 Kerberos 认证,则这里 zookeeper.sasl.client 可设为 false;仅仅启用 Kafka 的 Kerberos 认证。

3.2.4、Kafka 客户端配置

该配置主要为了使用 bin/kafka-topics.sh、bin/kafka-console-consumer.sh、kafka-console-producer.sh 等命令。

A、新建 client.properties 文件,该文件也放到 Kafka 的 config/kerveros 目录下。

  1. security.protocol=SASL_PLAINTEXt
  2. sasl.mechanism=GSSAPI
  3. sasl.kerberos.service.name=kafka-server

B、新建 kafka-client-jaas.conf 文件,该文件也放到 Kafka 的 config/kerveros 目录下。

  1. KafkaClient {
  2. com.sun.security.auth.module.Krb5LoginModule required
  3. useKeyTab=true
  4. keyTab="/home/hadoop/app/kafka_2.13-2.4.1/config/kerberos/kafka-client.keytab"
  5. storeKey=true
  6. useTicketCache=false
  7. principal="kafka-client@ABC.COM";
  8. };

C、修改 bin/kafka-topics.sh、bin/kafka-console-consumer.sh、kafka-console-producer.sh 脚本,倒数第二行增加如下配置:

  1. export KAFKA_OPTS="-Djava.security.krb5.conf=/home/hadoop/app/kafka_2.13-2.4.1/config/kerberos/krb5.conf -Djava.security.auth.login.config=/home/hadoop/app/kafka_2.13-2.4.1/config/kerberos/kafka-client-jaas.conf"

3.3、启动并测试

配置完成后就可以启动 Kafka 集群了:

  1. cd /home/hadoop/app/kafka_2.13-2.4.1/bin
    ./kafka-server-start.sh -daemon ../config/server.properties

启动完成后可以使用 bin/kafka-topics.sh、bin/kafka-console-consumer.sh、kafka-console-producer.sh 来测试:

查看 topic 信息:

  1. cd /home/hadoop/app/kafka_2.13-2.4.1/bin
  2. ./kafka-topics.sh --list --bootstrap-server 10.49.196.10:9092 --command-config ../config/kerberos/client.properties

发送消息:

  1. cd /home/hadoop/app/kafka_2.13-2.4.1/bin
  2. ./kafka-console-producer.sh --broker-list 10.49.196.10:9092 --topic test --producer.config ../config/kerberos/client.properties

接受消息:

  1. cd /home/hadoop/app/kafka_2.13-2.4.1/bin
  2. ./kafka-console-consumer.sh --bootstrap-server 10.49.196.10:9092 --topic test --from-beginning --consumer.config ../config/kerberos/client.properties

3.4、java 程序连接 Zookeeper

java 可以使用 JAAS 来进行 Kerberos 认证,需要 JAAS 配置文件、keytab 文件及 Kerberos 配置文件。

A、配置文件

JAAS 配置文件(kafka-client-jaas.conf):

  1. KafkaClient {
  2. com.sun.security.auth.module.Krb5LoginModule required
  3. useKeyTab=true
  4. keyTab="D:\\workspaceidea\\demo\\demo\\src\\main\\resources\\kerberos\\kafka-client.keytab"
  5. storeKey=true
  6. useTicketCache=false
  7. principal="kafka-client@ABC.COM";
  8. };

keytab 文件:

从 Kerberos 服务器上拷贝到目标机器,拷贝路径即为 JAAS 配置中间配置的路径:D:\\workspaceidea\\demo\\demo\\src\\main\\resources\\kerberos\\kafka-client.keytab。

Kerberos 配置文件(krb5.conf):

从 Kerberos 服务器上拷贝 /etc/krb5.conf 到目标机器即可。

B、配置 hosts 文件

在 hosts 文件中添加:

  1. 10.49.196.10 pxc1
  2. 10.49.196.11 pxc2
  3. 10.49.196.12 pxc3

C、引入依赖

  1. <dependency>
  2. <groupId>org.apache.kafka</groupId>
  3. <artifactId>kafka-clients</artifactId>
  4. <version>2.4.1</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.kafka</groupId>
  8. <artifactId>kafka-streams</artifactId>
  9. <version>2.4.1</version>
  10. </dependency>

D、样例程序

  1. package com.inspur.demo.general.kafka;
  2. import org.apache.kafka.clients.CommonClientConfigs;
  3. import org.apache.kafka.clients.admin.AdminClient;
  4. import org.apache.kafka.clients.admin.ListTopicsOptions;
  5. import org.apache.kafka.clients.admin.ListTopicsResult;
  6. import org.apache.kafka.clients.admin.TopicListing;
  7. import org.junit.After;
  8. import org.junit.Before;
  9. import org.junit.Test;
  10. import java.util.Collection;
  11. import java.util.Properties;
  12. public class KafkaKerberos {
  13. private AdminClient adminClient;
  14. @Before
  15. public void before() {
  16. System.setProperty("java.security.auth.login.config", "D:\\workspaceidea\\demo\\demo\\src\\main\\resources\\kerberos\\kafka-client-jaas.conf");
  17. System.setProperty("java.security.krb5.conf", "D:\\workspaceidea\\demo\\demo\\src\\main\\resources\\kerberos\\krb5.conf");
  18. Properties props = new Properties();
  19. props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "10.49.196.10:9092,10.49.196.11:9092,10.49.196.12:9092");
  20. props.put("sasl.mechanism", "GSSAPI");
  21. props.put("security.protocol", "SASL_PLAINTEXT");
  22. props.put("sasl.kerberos.service.name", "kafka-server");
  23. adminClient = AdminClient.create(props);
  24. }
  25. @After
  26. public void after() {
  27. adminClient.close();
  28. }
  29. @Test
  30. public void listTopics() throws Exception {
  31. ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
  32. //是否罗列内部主题
  33. listTopicsOptions.listInternal(true);
  34. ListTopicsResult result = adminClient.listTopics(listTopicsOptions);
  35. Collection<TopicListing> list = result.listings().get();
  36. System.out.println(list);
  37. }
  38. }

 

原文链接:http://www.cnblogs.com/wuyongyin/p/15668848.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号