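Flume + Kafka + Storm Integration
1. Requirements:
A client application generates log messages. Flume collects these messages and publishes them to a Kafka topic named flume-to-kafka.
Storm then reads from the topic flume-to-kafka and processes the logs (the processing here is a simple filter that keeps only the lines containing ERROR). Once processed, Storm writes the resulting lines to a second Kafka topic named storm-to-kafka.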
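The filter step described above can be sketched in plain Java, independent of Storm. This is only an illustration under stated assumptions: the class name LogFilterSketch and its method are hypothetical, and the keyword and suffix simply mirror the values used by the Storm bolt later in this post.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class LogFilterSketch {
    // Assumed values, mirroring the FilterBolt shown in section 8.8.
    static final String KEYWORD = "ERROR";
    static final String SUFFIX = " ~ filtered.";

    // Keep only the lines that contain KEYWORD and tag each kept line with SUFFIX.
    public static List<String> filter(List<String> lines) {
        List<String> kept = new ArrayList<String>();
        for (String line : lines) {
            if (line.contains(KEYWORD)) {
                kept.add(line + SUFFIX);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList(
                "Hello ERROR ! ------ Test 500",
                "Hello INFO ! ------ Test 501");
        // Only the first line contains ERROR, so only it is printed.
        for (String line : filter(input)) {
            System.out.println(line);
        }
    }
}
```

In the real pipeline the input lines come from the topic flume-to-kafka and the kept lines go to storm-to-kafka; here both ends are replaced by in-memory lists.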

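2. Component layout
The cluster has three nodes: node1, node2, and node3.
ZooKeeper is installed on node1, node2, and node3.
Flume is installed on node2.
Kafka is installed on node1, node2, and node3.
Storm is installed on node1, node2, and node3.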

3. Installing the JDK
- --install the JDK on node1, node2, and node3
- --reference: http://blog.51cto.com/vvxyz/1642258 (three ways to install the JDK on Linux)
- --install from the rpm package
- rpm -ivh your-package.rpm
- --edit the environment variables
- vi /etc/profile
- JAVA_HOME=/usr/java/jdk1.7.0_67
- JRE_HOME=/usr/java/jdk1.7.0_67/jre
- CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
- PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
- export JAVA_HOME JRE_HOME CLASSPATH PATH
- :wq
- --apply the changes
- source /etc/profile
4. Installing ZooKeeper
- ---============================
- --unpack the ZooKeeper archive
- tar -zxvf zookeeper-3.4.6.tar.gz
- --create a symlink to the ZooKeeper directory
- ln -sf /root/zookeeper-3.4.6 /home/zk
- --configure ZooKeeper
- cd /home/zk/conf/
-
- --copy zoo_sample.cfg to zoo.cfg
- cp zoo_sample.cfg zoo.cfg
- --edit zoo.cfg
- vi zoo.cfg
- --set the ZooKeeper data directory:
- --find dataDir=/tmp/zookeeper and change it to the value below
- dataDir=/opt/zookeeper
- --define the ZooKeeper ensemble
- server.1=node1:2888:3888
- server.2=node2:2888:3888
- server.3=node3:2888:3888
- :wq
- --create the /opt/zookeeper directory
- mkdir /opt/zookeeper
- --enter /opt/zookeeper
- cd /opt/zookeeper
- --create a file named myid
- vi myid
- --enter 1 (the N of this node's server.N line in zoo.cfg)
- 1
- :wq
- --likewise, the value on node2 and node3 is 2 and 3 respectively
-
- --copy the zookeeper data directory to /opt/ on node2 and node3
- cd ..
- scp -r zookeeper/ root@node2:/opt/
- scp -r zookeeper/ root@node3:/opt/
-
- --log in to node2 and node3 and change /opt/zookeeper/myid to 2 and 3 respectively
-
-
- --with the configuration done, copy the ZooKeeper installation from node1 to node2 and node3
- scp -r /root/zookeeper-3.4.6 root@node2:~/
- scp -r /root/zookeeper-3.4.6 root@node3:~/
-
- --on node2 and node3, create the symlink
- ln -sf /root/zookeeper-3.4.6/ /home/zk
- --configure the ZooKeeper environment variables
- cd /home/zk/bin
- --edit /etc/profile and add ZooKeeper's bin directory to PATH
- vi /etc/profile
- PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin:/home/zk/bin
- --apply the changes
- source /etc/profile
- --make the same /etc/profile change on node2 and node3
-
-
- --with the environment variables in place, ZooKeeper can be started
- --start ZooKeeper on node1, node2, and node3
- zkServer.sh start
- --check that it started
- jps
- --look for a QuorumPeerMain process
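The link between the server.N lines in zoo.cfg and each node's myid file is easy to get wrong, so here is a small sketch of it. It assumes the three-node zoo.cfg shown above; the class name MyIdLookup and its method are hypothetical helpers, not part of ZooKeeper.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class MyIdLookup {
    // Return the myid value for a host, taken from the N of its server.N line,
    // or -1 if the host does not appear in the given zoo.cfg text.
    public static int myIdFor(String zooCfg, String hostname) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(zooCfg));
        } catch (IOException e) {
            // Not expected when reading from an in-memory string.
            throw new IllegalStateException(e);
        }
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith("server.")) {
                // The value has the form host:peerPort:electionPort.
                String host = props.getProperty(key).split(":")[0].trim();
                if (host.equals(hostname)) {
                    return Integer.parseInt(key.substring("server.".length()));
                }
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        String zooCfg = "dataDir=/opt/zookeeper\n"
                + "server.1=node1:2888:3888\n"
                + "server.2=node2:2888:3888\n"
                + "server.3=node3:2888:3888\n";
        // The number printed is what /opt/zookeeper/myid must contain on node2.
        System.out.println(myIdFor(zooCfg, "node2"));
    }
}
```

If the number in myid does not match the node's own server.N entry, that node cannot join the ensemble correctly, which is why the myid file is edited separately on node2 and node3 after copying.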
5. Installing Flume
- ---------------------------------------------------
- --install Flume
- --upload the archive to node2
- cd
- tar -zxvf apache-flume-1.6.0-bin.tar.gz
- --create a symlink
- ln -s /root/apache-flume-1.6.0-bin /home/flume
- --configure Flume
- cd /root/apache-flume-1.6.0-bin/conf
- cp flume-env.sh.template flume-env.sh
- vi flume-env.sh
- --set the JDK location
- export JAVA_HOME=/usr/java/jdk1.7.0_67
- :wq
- --add Flume to the system environment variables
- vi /etc/profile
- export FLUME_HOME=/root/apache-flume-1.6.0-bin
- export PATH=$PATH:$FLUME_HOME/bin
- :wq
- source /etc/profile
- --verify the installation
- flume-ng version
- --expected output:
- Flume 1.6.0
- Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
- Revision: 2561a23240a71ba20bf288c7c2cda88f443c2080
- Compiled by hshreedharan on Mon May 11 11:15:44 PDT 2015
- From source with checksum b29e416802ce9ece3269d34233baf43f
6. Installing Kafka
- ---------------------------------------------------
- --install Kafka
- --set up Kafka on node1, node2, and node3
- --start on node1
- mkdir /root/kafka
- --unpack into /root/kafka
- tar -zxvf kafka_2.10-0.8.2.2.tgz -C /root/kafka
- --create a symlink
- ln -s /root/kafka/kafka_2.10-0.8.2.2 /home/kafka-2.10
-
- --configure
- cd /root/kafka/kafka_2.10-0.8.2.2/config
- vi server.properties
- --broker.id per node: node1=0, node2=1, node3=2
- broker.id=0
- log.dirs=/opt/kafka-log
- zookeeper.connect=node1:2181,node2:2181,node3:2181
- :wq
- --add a start script for convenience
- cd /root/kafka/kafka_2.10-0.8.2.2
- vi start-kafka.sh
- nohup bin/kafka-server-start.sh config/server.properties > kafka.log 2>&1 &
- :wq
- chmod 755 start-kafka.sh
- --once configured,
- --distribute to node2 and node3 (the /root/kafka directory must already exist on both)
- cd /root/kafka/
- scp -r kafka_2.10-0.8.2.2/ root@node2:/root/kafka
- scp -r kafka_2.10-0.8.2.2/ root@node3:/root/kafka
- --on node2
- cd /root/kafka/kafka_2.10-0.8.2.2/config
- vi server.properties
- --broker.id per node: node1=0, node2=1, node3=2
- broker.id=1
- :wq
- --on node3
- cd /root/kafka/kafka_2.10-0.8.2.2/config
- vi server.properties
- --broker.id per node: node1=0, node2=1, node3=2
- broker.id=2
- :wq
- --start Kafka
- --ZooKeeper must be running first; start it on every node if it is not
- zkServer.sh start
- --on each of node1, node2, and node3
- cd /root/kafka/kafka_2.10-0.8.2.2
- ./start-kafka.sh
- --check that it started
- jps
- --look for a Kafka process
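Only broker.id differs between the three server.properties files edited above. As a rough sketch, assuming the node<N> host naming used in this post, the per-node settings could be generated rather than typed by hand. The class name BrokerIdSketch and both methods are hypothetical.

```java
public class BrokerIdSketch {
    // Map a host name such as "node2" to the broker.id used in this post
    // (node1=0, node2=1, node3=2), i.e. the trailing number minus one.
    public static int brokerIdFor(String hostname) {
        if (!hostname.matches("node[1-9]\\d*")) {
            throw new IllegalArgumentException("unexpected host name: " + hostname);
        }
        return Integer.parseInt(hostname.substring("node".length())) - 1;
    }

    // Render the three server.properties entries that this post sets by hand.
    public static String settingsFor(String hostname) {
        return "broker.id=" + brokerIdFor(hostname) + "\n"
                + "log.dirs=/opt/kafka-log\n"
                + "zookeeper.connect=node1:2181,node2:2181,node3:2181\n";
    }

    public static void main(String[] args) {
        for (String host : new String[] {"node1", "node2", "node3"}) {
            System.out.println("# " + host);
            System.out.print(settingsFor(host));
        }
    }
}
```

The important property is that every broker in the cluster gets a unique broker.id; the particular numbering is just the convention chosen here.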
7. Installing Storm
8. Integrating Flume, Kafka, and Storm
8.1. Configuring Flume
- --on node2
- cd /root/flumedir
- vi flume_to_kafka
- --the configuration on node2 is as follows
- a1.sources = r1
- a1.sinks = k1
- a1.channels = c1
- # Describe/configure the source
- a1.sources.r1.type = avro
- a1.sources.r1.bind = node2
- a1.sources.r1.port = 41414
- # Describe the sink
- a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
- a1.sinks.k1.topic = flume-to-kafka
- a1.sinks.k1.brokerList = node1:9092,node2:9092,node3:9092
- a1.sinks.k1.requiredAcks = 1
- a1.sinks.k1.batchSize = 20
- # Use a channel which buffers events in memory
- a1.channels.c1.type = memory
- a1.channels.c1.capacity = 1000000
- a1.channels.c1.transactionCapacity = 10000
- # Bind the source and sink to the channel
- a1.sources.r1.channels = c1
- a1.sinks.k1.channel = c1
- :wq
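A common mistake in a Flume configuration like the one above is a source and sink that are not bound to the same channel, in which case events never reach Kafka. The following sketch checks that wiring with java.util.Properties. The class name FlumeWiringCheck and its method are hypothetical and not part of Flume; the property names follow the configuration shown above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Arrays;
import java.util.Properties;

public class FlumeWiringCheck {
    // Return true if the sink's channel is one of the channels the source writes to.
    public static boolean isWired(String config, String agent, String source, String sink) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(config));
        } catch (IOException e) {
            // Not expected when reading from an in-memory string.
            throw new IllegalStateException(e);
        }
        // A source may list several channels separated by whitespace ("channels"),
        // while a sink reads from exactly one ("channel").
        String sourceChannels = props.getProperty(agent + ".sources." + source + ".channels");
        String sinkChannel = props.getProperty(agent + ".sinks." + sink + ".channel");
        if (sourceChannels == null || sinkChannel == null) {
            return false;
        }
        return Arrays.asList(sourceChannels.trim().split("\\s+")).contains(sinkChannel.trim());
    }

    public static void main(String[] args) {
        String config = "a1.sources = r1\n"
                + "a1.sinks = k1\n"
                + "a1.channels = c1\n"
                + "# Bind the source and sink to the channel\n"
                + "a1.sources.r1.channels = c1\n"
                + "a1.sinks.k1.channel = c1\n";
        System.out.println(isWired(config, "a1", "r1", "k1"));
    }
}
```

Note the asymmetry in the property names: the source uses the plural channels and the sink uses the singular channel, exactly as in the last two lines of the configuration.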
8.2. Starting ZooKeeper
- --start ZooKeeper on node1, node2, and node3
- --stop the firewall
- service iptables stop
- --start ZooKeeper
- zkServer.sh start
8.3. Starting Kafka
- --start Kafka
- --on each of node1, node2, and node3
- cd /root/kafka/kafka_2.10-0.8.2.2
- ./start-kafka.sh
8.4. Starting Flume
- --on node2, start the agent
- cd /root/flumedir
- flume-ng agent -n a1 -c conf -f flume_to_kafka -Dflume.root.logger=DEBUG,console
8.4. Starting the client
Start the client to generate log messages.
The code below is adapted from the "RPC clients - Avro and Thrift" example in the Flume Developer Guide.
- import org.apache.flume.Event;
- import org.apache.flume.EventDeliveryException;
- import org.apache.flume.api.RpcClient;
- import org.apache.flume.api.RpcClientFactory;
- import org.apache.flume.event.EventBuilder;
- import java.nio.charset.Charset;
- public class MyApp {
- public static void main(String[] args) {
- MyRpcClientFacade1 client = new MyRpcClientFacade1();
- // Initialize client with the remote Flume agent's host and port
- client.init("node2", 41414);
- // Send 5 events to the remote Flume agent. That agent should be
- // configured to listen with an AvroSource.
- String sampleData = "Hello ERROR ! ------ Test";
- for (int i = 500; i < 505; i++) {
- client.sendDataToFlume(sampleData + " " + i);
- System.out.println(sampleData + " " + i);
- }
- client.cleanUp();
- }
- }
- class MyRpcClientFacade1 {
- private RpcClient client;
- private String hostname;
- private int port;
- public void init(String hostname, int port) {
- // Setup the RPC connection
- this.hostname = hostname;
- this.port = port;
- this.client = RpcClientFactory.getDefaultInstance(hostname, port);
- // Use the following method to create a thrift client (instead of the
- // above line):
- // this.client = RpcClientFactory.getThriftInstance(hostname, port);
- }
- public void sendDataToFlume(String data) {
- // Create a Flume Event object that encapsulates the sample data
- Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));
- // Send the event
- try {
- client.append(event);
- } catch (EventDeliveryException e) {
- // clean up and recreate the client
- client.close();
- client = null;
- client = RpcClientFactory.getDefaultInstance(hostname, port);
- // Use the following method to create a thrift client (instead of
- // the above line):
- // this.client = RpcClientFactory.getThriftInstance(hostname, port);
- }
- }
- public void cleanUp() {
- // Close the RPC connection
- client.close();
- }
- }
The Eclipse console shows the following output:
- [ WARN ] - [ org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:505) ] Invalid value for batchSize: 0; Using default value.
- [ WARN ] - [ org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:634) ] Using default maxIOWorkers
- Hello ERROR ! ------ Test 500
- Hello ERROR ! ------ Test 501
- Hello ERROR ! ------ Test 502
- Hello ERROR ! ------ Test 503
- Hello ERROR ! ------ Test 504
8.5. Listing the Kafka topics
- --on node3, list the Kafka topics
- cd /home/kafka-2.10/bin
- ./kafka-topics.sh --zookeeper node1,node2,node3 --list
Because the client code has run, the topic flume-to-kafka has been created automatically in Kafka.
8.6. Starting a Kafka consumer on flume-to-kafka
Here we look at the messages in the topic flume-to-kafka.
- --on node3, start a Kafka console consumer
- cd /home/kafka-2.10/bin
- ./kafka-console-consumer.sh --zookeeper node1,node2,node3 --from-beginning --topic flume-to-kafka
Console output:
- Hello ERROR ! ------ Test 500
- Hello ERROR ! ------ Test 501
- Hello ERROR ! ------ Test 502
- Hello ERROR ! ------ Test 503
- Hello ERROR ! ------ Test 504
8.7. Creating the topic storm-to-kafka
Create a second topic in Kafka:
- --on node1, create the topic storm-to-kafka
- --with 3 partitions
- --and replication-factor=3
- cd /home/kafka-2.10/bin
- ./kafka-topics.sh --zookeeper node1,node2,node3 --create --topic storm-to-kafka --partitions 3 --replication-factor 3
8.8. Running the Storm code
- package storm.logfilter;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Properties;
- import storm.kafka.KafkaSpout;
- import storm.kafka.SpoutConfig;
- import storm.kafka.StringScheme;
- import storm.kafka.ZkHosts;
- import storm.kafka.bolt.KafkaBolt;
- import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
- import storm.kafka.bolt.selector.DefaultTopicSelector;
- import backtype.storm.Config;
- import backtype.storm.LocalCluster;
- import backtype.storm.StormSubmitter;
- import backtype.storm.spout.SchemeAsMultiScheme;
- import backtype.storm.topology.BasicOutputCollector;
- import backtype.storm.topology.OutputFieldsDeclarer;
- import backtype.storm.topology.TopologyBuilder;
- import backtype.storm.topology.base.BaseBasicBolt;
- import backtype.storm.tuple.Fields;
- import backtype.storm.tuple.Tuple;
- import backtype.storm.tuple.Values;
- public class LogFilterTopology {
- public static class FilterBolt extends BaseBasicBolt {
- private static final long serialVersionUID = 1L;
- @Override
- public void execute(Tuple tuple, BasicOutputCollector collector) {
- String line = tuple.getString(0);
- // keep only the lines that contain ERROR
- if (line.contains("ERROR")) {
- System.err.println("Filter: " + line + " ~ filtered.");
- collector.emit(new Values(line + " ~ filtered."));
- }
- }
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- // declare the field "message", which FieldNameBasedTupleToKafkaMapper reads downstream
- declarer.declare(new Fields("message"));
- }
- }
- public static void main(String[] args) throws Exception {
- TopologyBuilder builder = new TopologyBuilder();
- // https://github.com/apache/storm/tree/master/external/storm-kafka
- // configure the Kafka spout: the topic to read from
- String topic = "flume-to-kafka";
- ZkHosts zkHosts = new ZkHosts("node1:2181,node2:2181,node3:2181");
- // /MyKafka is the ZooKeeper root path for offsets; it records how far the topic has been consumed
- SpoutConfig spoutConfig = new SpoutConfig(zkHosts, topic, "/MyKafka", "MyTrack");
- List<String> zkServers = new ArrayList<String>();
- for (String host : zkHosts.brokerZkStr.split(",")) {
- zkServers.add(host.split(":")[0]);
- }
- spoutConfig.zkServers = zkServers;
- spoutConfig.zkPort = 2181;
- // whether to consume from the beginning of the topic
- spoutConfig.forceFromStart = true;
- spoutConfig.socketTimeoutMs = 60 * 1000;
- // StringScheme decodes the raw message bytes into a string
- spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
- KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
- // set kafka spout
- builder.setSpout("kafkaSpout", kafkaSpout, 3);
- // set bolt
- builder.setBolt("filterBolt", new FilterBolt(), 8).shuffleGrouping("kafkaSpout");
- // write the results out
- // set kafka bolt
- // withTopicSelector: the default selector names the target topic, storm-to-kafka
- // withTupleToKafkaMapper: maps a tuple to the Kafka key and message
- KafkaBolt kafka_bolt = new KafkaBolt().withTopicSelector(new DefaultTopicSelector("storm-to-kafka"))
- .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
- builder.setBolt("kafkaBolt", kafka_bolt, 2).shuffleGrouping("filterBolt");
- Config conf = new Config();
- // set producer properties.
- Properties props = new Properties();
- props.put("metadata.broker.list", "node1:9092,node2:9092,node3:9092");
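- /**
- * Kafka producer acknowledgement modes (request.required.acks):
- * 0: the producer does not wait for any acknowledgement from the broker before sending the next message;
- * 1: the producer waits until the partition leader has acknowledged the message;
- * -1: the producer waits until all in-sync replicas have acknowledged the message.
- */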
- props.put("request.required.acks", "1");
- props.put("serializer.class", "kafka.serializer.StringEncoder");
- conf.put("kafka.broker.properties", props);
- conf.put(Config.STORM_ZOOKEEPER_SERVERS, zkServers);
-
- if (args == null || args.length == 0) {
- // run in local mode
- LocalCluster localCluster = new LocalCluster();
- localCluster.submitTopology("storm-kafka-topology", conf, builder.createTopology());
- } else {
- // run on the cluster
- conf.setNumWorkers(3);
- StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
- }
-
- }
- }
8.9. Starting a Kafka consumer on storm-to-kafka
Here we look at the messages in the topic storm-to-kafka.
- --on node1, start a Kafka console consumer
- cd /home/kafka-2.10/bin
- ./kafka-console-consumer.sh --zookeeper node1,node2,node3 --from-beginning --topic storm-to-kafka
Console output:
- Hello ERROR ! ------ Test 504 ~ filtered.
- Hello ERROR ! ------ Test 500 ~ filtered.
- Hello ERROR ! ------ Test 501 ~ filtered.
- Hello ERROR ! ------ Test 503 ~ filtered.
- Hello ERROR ! ------ Test 502 ~ filtered.
========================================================
I'm Hongten
E | hongtenzone@foxmail.com B | http://www.cnblogs.com/hongten
========================================================