Flume + Kafka + Storm Integration
Source: cnblogs | Author: Hongten | Published: 2018/12/19 8:59:26


1. Requirements

A client application produces log messages. We use Flume to collect those messages and write them into a Kafka topic named flume-to-kafka.

Storm then reads from the topic flume-to-kafka and processes the messages (the processing logic here is a filter that keeps only matching log lines). After processing, Storm writes the results into a second Kafka topic named storm-to-kafka.
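
Putting it together, the data flows through the pipeline like this:

  Client (Avro RPC) --> Flume agent on node2 (avro source -> memory channel -> Kafka sink)
                    --> Kafka topic: flume-to-kafka
                    --> Storm (KafkaSpout -> FilterBolt -> KafkaBolt)
                    --> Kafka topic: storm-to-kafka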

2. Component Layout

I set up three nodes in total: node1, node2, and node3.

ZooKeeper is installed on node1, node2, and node3.

Flume is installed on node2.

Kafka is installed on node1, node2, and node3.

Storm is installed on node1, node2, and node3.

 

3. Installing the JDK

  --Install the JDK on node1, node2, and node3
  --See http://blog.51cto.com/vvxyz/1642258 (three ways to install the JDK on Linux)
  --Install from the RPM package
  rpm -ivh your-package.rpm
  --Edit the environment variables
  vi /etc/profile
  JAVA_HOME=/usr/java/jdk1.7.0_67
  JRE_HOME=/usr/java/jdk1.7.0_67/jre
  CLASS_PATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
  PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
  export JAVA_HOME JRE_HOME CLASS_PATH PATH
  :wq
  --Apply the changes
  source /etc/profile
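
To confirm the JDK is visible on the PATH, check the version; the output should report the release you installed (1.7.0_67 here):

  --Verify the installation
  java -version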

 

4. Installing ZooKeeper

  --Extract the ZooKeeper archive
  tar -zxvf zookeeper-3.4.6.tar.gz
  --Create a symlink for ZooKeeper
  ln -sf /root/zookeeper-3.4.6 /home/zk
  --Configure ZooKeeper
  cd /home/zk/conf/

  --Copy the sample config zoo_sample.cfg to zoo.cfg
  cp zoo_sample.cfg zoo.cfg
  --Edit the zoo.cfg configuration file
  vi zoo.cfg
  --Set the ZooKeeper data directory:
  --find dataDir=/tmp/zookeeper and change it to
  dataDir=/opt/zookeeper
  --Define the ZooKeeper ensemble
  server.1=node1:2888:3888
  server.2=node2:2888:3888
  server.3=node3:2888:3888
  :wq
  --Create the /opt/zookeeper directory
  mkdir /opt/zookeeper
  --Enter the /opt/zookeeper directory
  cd /opt/zookeeper
  --Create a file named myid
  vi myid
  --Enter the value 1
  1
  :wq
  --Likewise, on node2 and node3 the values are 2 and 3 respectively

  --Copy the zookeeper data directory to /opt/ on node2 and node3
  cd ..
  scp -r zookeeper/ root@node2:/opt/
  scp -r zookeeper/ root@node3:/opt/

  --On node2 and node3, edit /opt/zookeeper/myid and set the values to 2 and 3 respectively

  --With the above done, copy the ZooKeeper installation from node1 to node2 and node3
  scp -r zookeeper-3.4.6 root@node2:~/
  scp -r zookeeper-3.4.6 root@node3:~/

  --On node2 and node3, create the symlink
  ln -sf /root/zookeeper-3.4.6/ /home/zk
  --Configure the ZooKeeper environment variables
  cd /home/zk/bin
  --Edit /etc/profile and add ZooKeeper's bin directory to the PATH
  vi /etc/profile
  PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin:/home/zk/bin
  --Apply the changes
  source /etc/profile
  --Do the same on node2 and node3: edit /etc/profile and add ZooKeeper's bin directory to the PATH

  --With the environment variables in place, ZooKeeper can be started.
  --Start ZooKeeper on node1, node2, and node3
  zkServer.sh start
  --Check whether it started successfully
  jps
  --Look for a QuorumPeerMain process
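
Besides checking jps, you can ask each node for its role in the ensemble with zkServer.sh status; with all three nodes up, one should report Mode: leader and the other two Mode: follower:

  --Check the ensemble state on each node
  zkServer.sh status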

 

5. Installing Flume

  --Install Flume
  --Upload the archive to node2
  cd
  tar -zxvf apache-flume-1.6.0-bin.tar.gz
  --Create a symlink
  ln -s /root/apache-flume-1.6.0-bin /home/flume
  --Configure Flume
  cd /root/apache-flume-1.6.0-bin/conf
  cp flume-env.sh.template flume-env.sh
  vi flume-env.sh
  --Point Flume at the JDK
  export JAVA_HOME=/usr/java/jdk1.7.0_67
  :wq
  --Add Flume to the environment variables
  vi /etc/profile
  export FLUME_HOME=/root/apache-flume-1.6.0-bin
  export PATH=$PATH:$FLUME_HOME/bin
  :wq
  source /etc/profile
  --Verify the installation
  flume-ng version
  Flume 1.6.0
  Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
  Revision: 2561a23240a71ba20bf288c7c2cda88f443c2080
  Compiled by hshreedharan on Mon May 11 11:15:44 PDT 2015
  From source with checksum b29e416802ce9ece3269d34233baf43f

 

6. Installing Kafka

  --Install Kafka on node1, node2, and node3
  --Start on node1
  mkdir /root/kafka
  --Extract into /root/kafka
  tar -zxvf kafka_2.10-0.8.2.2.tgz -C /root/kafka
  --Create a symlink
  ln -s /root/kafka/kafka_2.10-0.8.2.2 /home/kafka-2.10

  --Configure Kafka
  cd /root/kafka/kafka_2.10-0.8.2.2/config
  vi server.properties
  --broker.id: node1=0, node2=1, node3=2
  broker.id=0
  log.dirs=/opt/kafka-log
  zookeeper.connect=node1:2181,node2:2181,node3:2181
  :wq
  --For convenient startup, create a launch script
  cd /root/kafka/kafka_2.10-0.8.2.2
  vi start-kafka.sh
  nohup bin/kafka-server-start.sh config/server.properties > kafka.log 2>&1 &
  :wq
  chmod 755 start-kafka.sh
  --Once configured, distribute to node2 and node3
  --(create /root/kafka on node2 and node3 first)
  cd /root/kafka/
  scp -r kafka_2.10-0.8.2.2/ root@node2:/root/kafka
  scp -r kafka_2.10-0.8.2.2/ root@node3:/root/kafka
  --On node2
  cd /root/kafka/kafka_2.10-0.8.2.2/config
  vi server.properties
  --broker.id: node1=0, node2=1, node3=2
  broker.id=1
  :wq
  --On node3
  cd /root/kafka/kafka_2.10-0.8.2.2/config
  vi server.properties
  --broker.id: node1=0, node2=1, node3=2
  broker.id=2
  :wq
  --Make sure ZooKeeper is running first
  zkServer.sh start
  --Then, on each of node1, node2, and node3, start Kafka
  cd /root/kafka/kafka_2.10-0.8.2.2
  ./start-kafka.sh
  --Check whether it started
  jps
  --Look for a Kafka process
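
Before wiring in Flume and Storm, it is worth smoke-testing the brokers on their own with the console tools that ship with Kafka. A minimal sketch (the topic name test is just an example):

  --On node1: create a test topic
  cd /root/kafka/kafka_2.10-0.8.2.2
  bin/kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --create --topic test --partitions 3 --replication-factor 3
  --Produce a few messages (type some lines, then Ctrl+C)
  bin/kafka-console-producer.sh --broker-list node1:9092,node2:9092,node3:9092 --topic test
  --In a second terminal, read them back
  bin/kafka-console-consumer.sh --zookeeper node1:2181,node2:2181,node3:2181 --from-beginning --topic test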

 

7. Installing Storm

  --Distributed Storm installation
  --Deploy on node1, node2, and node3
  --On node1
  cd /root/apache-storm-0.10.0/conf
  vi storm.yaml
  --Configure as follows (note that storm.zookeeper.servers must be uncommented)
  storm.zookeeper.servers:
      - "node1"
      - "node2"
      - "node3"
  nimbus.host: "node1"
  storm.local.dir: "/opt/storm"
  supervisor.slots.ports:
      - 6700
      - 6701
      - 6702
      - 6703
  :wq
  --Distribute from node1 to node2 and node3
  scp -r apache-storm-0.10.0 root@node2:/root/
  scp -r apache-storm-0.10.0 root@node3:/root/

  --On node2 and node3, create the symlink
  ln -sf /root/apache-storm-0.10.0 /home/storm-0.10

  --On node1, node2, and node3, create a quick-start script
  cd /root/apache-storm-0.10.0
  vi start-storm.sh
  nohup bin/storm nimbus >> logs/nimbus.out 2>&1 &
  nohup bin/storm supervisor >> logs/supervisor.out 2>&1 &

  --Only on node1 (node2 and node3 do not need the UI):
  nohup bin/storm ui >> logs/ui.out 2>&1 &
  nohup bin/storm drpc >> logs/drpc.out 2>&1 &
  :wq
  --On node1, node2, and node3, create a quick-stop script
  vi stop-storm.sh
  --the `core` line (the UI process) is only needed on node1
  kill -9 `jps | grep core | awk '{print $1}'`
  kill -9 `jps | grep supervisor | awk '{print $1}'`
  kill -9 `jps | grep nimbus | awk '{print $1}'`
  kill -9 `jps | grep worker | awk '{print $1}'`
  kill -9 `jps | grep LogWriter | awk '{print $1}'`
  :wq
  chmod 755 start-storm.sh
  chmod 755 stop-storm.sh
  --Start the ZooKeeper service
  --on node1, node2, and node3
  zkServer.sh start
  --Start Storm on node1, node2, and node3
  cd /root/apache-storm-0.10.0
  ./start-storm.sh
  --(Optional) upload a test topology jar such as storm_wc.jar and submit it
  bin/storm jar /root/storm_wc.jar storm.wordcount.Test wordcount

  --Storm DRPC configuration
  --On node1
  cd /root/apache-storm-0.10.0/conf
  vi storm.yaml
  drpc.servers:
      - "node1"
  :wq
  --Distribute the changed config from node1 to node2 and node3
  cd /root/apache-storm-0.10.0/conf/
  scp storm.yaml root@node2:/root/apache-storm-0.10.0/conf/
  scp storm.yaml root@node3:/root/apache-storm-0.10.0/conf/
  --Once configured, on node1, node2, and node3
  cd /root/apache-storm-0.10.0
  ./start-storm.sh &
 

8. Integrating Flume, Kafka, and Storm

8.1. Configuring Flume

  --On node2
  mkdir -p /root/flumedir
  cd /root/flumedir
  vi flume_to_kafka
  --The agent on node2 is configured as follows
  a1.sources = r1
  a1.sinks = k1
  a1.channels = c1
  # Describe/configure the source
  a1.sources.r1.type = avro
  a1.sources.r1.bind = node2
  a1.sources.r1.port = 41414
  # Describe the sink
  a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
  a1.sinks.k1.topic = flume-to-kafka
  a1.sinks.k1.brokerList = node1:9092,node2:9092,node3:9092
  a1.sinks.k1.requiredAcks = 1
  a1.sinks.k1.batchSize = 20
  # Use a channel which buffers events in memory
  a1.channels.c1.type = memory
  a1.channels.c1.capacity = 1000000
  a1.channels.c1.transactionCapacity = 10000
  # Bind the source and sink to the channel
  a1.sources.r1.channels = c1
  a1.sinks.k1.channel = c1
  :wq

 

8.2. Starting ZooKeeper

  --Start ZooKeeper on node1, node2, and node3
  --Turn off the firewall
  service iptables stop
  --Start ZooKeeper
  zkServer.sh start
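
Note that service iptables stop only turns the firewall off until the next reboot; on CentOS 6 you can disable it permanently (only do this in a trusted test environment):

  chkconfig iptables off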

 

8.3. Starting Kafka

  --Start Kafka
  --On each of node1, node2, and node3
  cd /root/kafka/kafka_2.10-0.8.2.2
  ./start-kafka.sh

 

8.4. Starting Flume

  --On node2, start the agent
  cd /root/flumedir
  flume-ng agent -n a1 -c conf -f flume_to_kafka -Dflume.root.logger=DEBUG,console
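
Before running the Java client below, you can smoke-test the avro source with Flume's built-in avro-client, which sends the lines of a local file as events (the file path here is just an example):

  flume-ng avro-client -H node2 -p 41414 -F /root/test.log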

 

8.5. Starting the Client

Start the client so that it produces log messages.

The code below follows the "RPC clients - Avro and Thrift" example from the Flume developer guide.

  import org.apache.flume.Event;
  import org.apache.flume.EventDeliveryException;
  import org.apache.flume.api.RpcClient;
  import org.apache.flume.api.RpcClientFactory;
  import org.apache.flume.event.EventBuilder;

  import java.nio.charset.Charset;

  public class MyApp {
      public static void main(String[] args) {
          MyRpcClientFacade1 client = new MyRpcClientFacade1();
          // Initialize client with the remote Flume agent's host and port
          client.init("node2", 41414);

          // Send 5 events to the remote Flume agent. That agent should be
          // configured to listen with an AvroSource.
          String sampleData = "Hello ERROR ! ------ Test";
          for (int i = 500; i < 505; i++) {
              client.sendDataToFlume(sampleData + " " + i);
              System.out.println(sampleData + " " + i);
          }

          client.cleanUp();
      }
  }

  class MyRpcClientFacade1 {
      private RpcClient client;
      private String hostname;
      private int port;

      public void init(String hostname, int port) {
          // Setup the RPC connection
          this.hostname = hostname;
          this.port = port;
          this.client = RpcClientFactory.getDefaultInstance(hostname, port);
          // Use the following method to create a thrift client (instead of the above line):
          // this.client = RpcClientFactory.getThriftInstance(hostname, port);
      }

      public void sendDataToFlume(String data) {
          // Create a Flume Event object that encapsulates the sample data
          Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));
          // Send the event
          try {
              client.append(event);
          } catch (EventDeliveryException e) {
              // clean up and recreate the client
              client.close();
              client = null;
              client = RpcClientFactory.getDefaultInstance(hostname, port);
              // Use the following method to create a thrift client (instead of the above line):
              // this.client = RpcClientFactory.getThriftInstance(hostname, port);
          }
      }

      public void cleanUp() {
          // Close the RPC connection
          client.close();
      }
  }
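
To compile and run the client outside Eclipse, the Flume SDK and its dependencies must be on the classpath. A rough sketch, assuming you run it on node2 and reuse the jars shipped with Flume (otherwise copy flume-ng-sdk-1.6.0.jar and its dependencies to your workstation):

  javac -cp "/root/apache-flume-1.6.0-bin/lib/*" MyApp.java
  java -cp ".:/root/apache-flume-1.6.0-bin/lib/*" MyApp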

 

The output in the Eclipse console:

  [ WARN ] - [ org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:505) ] Invalid value for batchSize: 0; Using default value.
  [ WARN ] - [ org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:634) ] Using default maxIOWorkers
  Hello ERROR ! ------ Test 500
  Hello ERROR ! ------ Test 501
  Hello ERROR ! ------ Test 502
  Hello ERROR ! ------ Test 503
  Hello ERROR ! ------ Test 504

 

8.6. Viewing the Kafka Topics

  --On node3, list the Kafka topics
  cd /home/kafka-2.10/bin
  ./kafka-topics.sh --zookeeper node1,node2,node3 --list

As you can see, running the client code caused the topic flume-to-kafka to be created automatically in Kafka.

 

8.7. Starting a Kafka Consumer on flume-to-kafka

Here we watch the messages arriving on the topic flume-to-kafka:

  --On node3, start a Kafka consumer
  cd /home/kafka-2.10/bin
  ./kafka-console-consumer.sh --zookeeper node1,node2,node3 --from-beginning --topic flume-to-kafka

 

Console output:

  Hello ERROR ! ------ Test 500
  Hello ERROR ! ------ Test 501
  Hello ERROR ! ------ Test 502
  Hello ERROR ! ------ Test 503
  Hello ERROR ! ------ Test 504

 

8.8. Creating the Topic storm-to-kafka

Create another topic in Kafka:

  --On node1, create a topic named storm-to-kafka
  --with 3 partitions
  --and a replication factor of 3
  ./kafka-topics.sh --zookeeper node1,node2,node3 --create --topic storm-to-kafka --partitions 3 --replication-factor 3
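
You can confirm the partition count and replica assignment with --describe:

  --Verify the new topic
  ./kafka-topics.sh --zookeeper node1,node2,node3 --describe --topic storm-to-kafka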

 

8.9. Running the Storm Code

  package storm.logfilter;

  import java.util.ArrayList;
  import java.util.List;
  import java.util.Properties;

  import storm.kafka.KafkaSpout;
  import storm.kafka.SpoutConfig;
  import storm.kafka.StringScheme;
  import storm.kafka.ZkHosts;
  import storm.kafka.bolt.KafkaBolt;
  import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
  import storm.kafka.bolt.selector.DefaultTopicSelector;
  import backtype.storm.Config;
  import backtype.storm.LocalCluster;
  import backtype.storm.StormSubmitter;
  import backtype.storm.spout.SchemeAsMultiScheme;
  import backtype.storm.topology.BasicOutputCollector;
  import backtype.storm.topology.OutputFieldsDeclarer;
  import backtype.storm.topology.TopologyBuilder;
  import backtype.storm.topology.base.BaseBasicBolt;
  import backtype.storm.tuple.Fields;
  import backtype.storm.tuple.Tuple;
  import backtype.storm.tuple.Values;

  public class LogFilterTopology {

      public static class FilterBolt extends BaseBasicBolt {
          private static final long serialVersionUID = 1L;

          @Override
          public void execute(Tuple tuple, BasicOutputCollector collector) {
              String line = tuple.getString(0);
              // Keep only the lines that contain "ERROR"
              if (line.contains("ERROR")) {
                  System.err.println("Filter: " + line + " ~ filtered.");
                  collector.emit(new Values(line + " ~ filtered."));
              }
          }

          @Override
          public void declareOutputFields(OutputFieldsDeclarer declarer) {
              // Declare the "message" field consumed later by FieldNameBasedTupleToKafkaMapper
              declarer.declare(new Fields("message"));
          }
      }

      public static void main(String[] args) throws Exception {
          TopologyBuilder builder = new TopologyBuilder();

          // https://github.com/apache/storm/tree/master/external/storm-kafka
          // Configure the Kafka spout and the topic it reads from
          String topic = "flume-to-kafka";
          ZkHosts zkHosts = new ZkHosts("node1:2181,node2:2181,node3:2181");
          // "/MyKafka" is the ZooKeeper root path under which the spout records consumed offsets
          SpoutConfig spoutConfig = new SpoutConfig(zkHosts, topic, "/MyKafka", "MyTrack");
          List<String> zkServers = new ArrayList<String>();
          for (String host : zkHosts.brokerZkStr.split(",")) {
              zkServers.add(host.split(":")[0]);
          }
          spoutConfig.zkServers = zkServers;
          spoutConfig.zkPort = 2181;
          // Whether to start consuming from the beginning of the topic
          spoutConfig.forceFromStart = true;
          spoutConfig.socketTimeoutMs = 60 * 1000;
          // StringScheme decodes the raw byte stream into strings
          spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
          KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

          // Set the Kafka spout
          builder.setSpout("kafkaSpout", kafkaSpout, 3);
          // Set the filter bolt
          builder.setBolt("filterBolt", new FilterBolt(), 8).shuffleGrouping("kafkaSpout");

          // Write the results back out to Kafka:
          // withTopicSelector uses the default selector to pick the output topic storm-to-kafka;
          // withTupleToKafkaMapper maps each tuple to a Kafka key and message
          KafkaBolt kafka_bolt = new KafkaBolt().withTopicSelector(new DefaultTopicSelector("storm-to-kafka"))
                  .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
          builder.setBolt("kafkaBolt", kafka_bolt, 2).shuffleGrouping("filterBolt");

          Config conf = new Config();
          // Set the producer properties
          Properties props = new Properties();
          props.put("metadata.broker.list", "node1:9092,node2:9092,node3:9092");
          /**
           * Kafka producer ack levels:
           *  0 : the producer does not wait for any acknowledgement from the broker
           *  1 : the producer waits until the leader has acknowledged the message
           * -1 : the producer waits until the follower replicas have acknowledged the message
           */
          props.put("request.required.acks", "1");
          props.put("serializer.class", "kafka.serializer.StringEncoder");
          conf.put("kafka.broker.properties", props);
          conf.put(Config.STORM_ZOOKEEPER_SERVERS, zkServers);

          if (args == null || args.length == 0) {
              // Run in local mode
              LocalCluster localCluster = new LocalCluster();
              localCluster.submitTopology("storm-kafka-topology", conf, builder.createTopology());
          } else {
              // Run on the cluster
              conf.setNumWorkers(3);
              StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
          }
      }
  }
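
With no arguments, main() runs the topology in a LocalCluster, which is the easiest way to test from the IDE. To run it on the Storm cluster instead, package the class together with its storm-kafka and Kafka client dependencies into a jar and submit it with the topology name as an argument; a sketch, assuming the jar is named logfilter.jar:

  --Submit to the cluster (takes the args.length > 0 branch)
  cd /root/apache-storm-0.10.0
  bin/storm jar /root/logfilter.jar storm.logfilter.LogFilterTopology logfilter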

 

8.10. Starting a Kafka Consumer on storm-to-kafka

Here we watch the messages arriving on the topic storm-to-kafka:

  --On node1, start a Kafka consumer
  cd /home/kafka-2.10/bin
  ./kafka-console-consumer.sh --zookeeper node1,node2,node3 --from-beginning --topic storm-to-kafka

 

Console output:

  Hello ERROR ! ------ Test 504 ~ filtered.
  Hello ERROR ! ------ Test 500 ~ filtered.
  Hello ERROR ! ------ Test 501 ~ filtered.
  Hello ERROR ! ------ Test 503 ~ filtered.
  Hello ERROR ! ------ Test 502 ~ filtered.

 

========================================================

More reading, and English is important.

I'm Hongten

 


E | hongtenzone@foxmail.com  B | http://www.cnblogs.com/hongten

========================================================

 
