Flume + Kafka + Storm + HBase + HDFS + POI Integration

Source: cnblogs  Author: Hongten  Date: 2018/12/24 10:26:48


Requirements:

For a website, we want to log user behavior and analyze the data that is useful to us.

For example, on the site www.hongten.com (a fictitious e-commerce site), users can perform many familiar actions: register, log in, view, click, double-click, buy, add to the shopping cart, add records, edit records, delete records, comment, log out, and so on. All of these actions are recorded in log files, and we want to analyze those logs.

In this article we analyze two actions, buying and adding to the shopping cart, and then generate a report. From the report we can see at what times users tend to buy and at what times they tend to add items to their carts, so that we can act at the right moment: encourage users to buy, and recommend products for them to add to their carts (a user who adds to the cart is a potential buyer).

After all, making the site profitable is the goal, right?

 

1. Abstracting user actions

  // user actions
  public static final String[] USER_ACTION = { "Register", "Login", "View", "Click", "Double_Click", "Buy", "Shopping_Car", "Add", "Edit", "Delete", "Comment", "Logout" };

 

2. Log format definition

  115.19.62.102 海南 2018-12-20 1545286960749 1735787074662918890 www.hongten.com Edit
  27.177.45.84 新疆 2018-12-20 1545286962255 6667636903937987930 www.hongten.com Delete
  176.54.120.96 宁夏 2018-12-20 1545286962256 6988408478348165495 www.hongten.com Comment
  175.117.33.187 辽宁 2018-12-20 1545286962257 8411202446705338969 www.hongten.com Shopping_Car
  17.67.62.213 天津 2018-12-20 1545286962258 7787584752786413943 www.hongten.com Add
  137.81.41.9 海南 2018-12-20 1545286962259 6218367085234099455 www.hongten.com Shopping_Car
  125.187.107.57 山东 2018-12-20 1545286962260 3358658811146151155 www.hongten.com Double_Click
  104.167.205.87 内蒙 2018-12-20 1545286962261 2303468282544965471 www.hongten.com Shopping_Car
  64.106.149.83 河南 2018-12-20 1545286962262 8422202443986582525 www.hongten.com Delete
  138.22.156.183 浙江 2018-12-20 1545286962263 7649154147863130337 www.hongten.com Shopping_Car
  41.216.103.31 河北 2018-12-20 1545286962264 6785302169446728008 www.hongten.com Shopping_Car
  132.144.93.20 广东 2018-12-20 1545286962265 6444575166009004406 www.hongten.com Add

Log format:

  // log format
  String log = ip + "\t" + address + "\t" + d + "\t" + timestamp + "\t" + userid + "\t" + Common.WEB_SITE + "\t" + action;
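
Downstream components rely on these tab-separated positions (ProcessBolt later reads index 2 as the date and index 6 as the action), so a quick way to see the layout is to split one line back into its fields; the class and sample line below are only for illustration:

  public class LogLineParseDemo {
    public static void main(String[] args) {
      // one record in the format above, fields joined by tabs
      String log = "115.19.62.102\t海南\t2018-12-20\t1545286960749\t1735787074662918890\twww.hongten.com\tEdit";
      String[] fields = log.split("\\t");
      System.out.println("date   : " + fields[2]); // 2018-12-20
      System.out.println("action : " + fields[6]); // Edit
    }
  }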

 

3. System architecture

The data flow is: Java log generator → Flume (avro source on node2) → Kafka topic all_my_log → Storm (filter out Buy / Shopping_Car events) → Kafka topic filtered_log → Storm (count events) → HBase → POI Excel report.

 

4. Report layout

Because the data is randomly generated, the counts we see grow roughly linearly.

Here I only implemented an hourly report; reports for a day, a quarter, a year, three years, or five years can be built the same way, according to the actual requirements.

 

5. Component layout

I set up 4 nodes: node1, node2, node3, node4 (note: all 4 nodes need the JDK).

ZooKeeper is installed on node1, node2, node3

The Hadoop cluster runs on node1, node2, node3, node4

The HBase cluster runs on node1, node2, node3, node4

Flume is installed on node2

Kafka is installed on node1, node2, node3

Storm is installed on node1, node2, node3

 

6. Implementation

6.1. Configure Flume

  --node2
  cd flumedir
  vi flume2kafka
  --node2 configuration as follows
  a1.sources = r1
  a1.sinks = k1
  a1.channels = c1
  # Describe/configure the source
  a1.sources.r1.type = avro
  a1.sources.r1.bind = node2
  a1.sources.r1.port = 41414
  # Describe the sink
  a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
  a1.sinks.k1.topic = all_my_log
  a1.sinks.k1.brokerList = node1:9092,node2:9092,node3:9092
  a1.sinks.k1.requiredAcks = 1
  a1.sinks.k1.batchSize = 20
  # Use a channel which buffers events in memory
  a1.channels.c1.type = memory
  a1.channels.c1.capacity = 1000000
  a1.channels.c1.transactionCapacity = 10000
  # Bind the source and sink to the channel
  a1.sources.r1.channels = c1
  a1.sinks.k1.channel = c1
  :wq
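
Once the agent is up (see 6.4), the avro source can be smoke-tested with Flume's built-in avro-client before wiring up the Java client; the file path below is just a placeholder:

  --optional smoke test; any small text file will do
  flume-ng avro-client -H node2 -p 41414 -F /tmp/test.log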

 

6.2. Start ZooKeeper

  --stop the firewall on node1, node2, node3, node4
  service iptables stop
  --start ZooKeeper on node1, node2, node3
  zkServer.sh start
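
A quick way to confirm that each node has joined the ensemble (as leader or follower) is:

  zkServer.sh status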

 

6.3. Start Kafka

  --start Kafka
  --on each of node1, node2, node3
  cd /root/kafka/kafka_2.10-0.8.2.2
  ./start-kafka.sh

 

6.4. Start the Flume agent

  --on node2, start the agent
  cd /root/flumedir
  flume-ng agent -n a1 -c conf -f flume2kafka -Dflume.root.logger=DEBUG,console

 

6.5. Generate log data and send it to Flume

Run the Java code below to generate log records and send them to the Flume server.

  package com.b510.big.data.flume.client;

  import java.nio.charset.Charset;
  import java.text.SimpleDateFormat;
  import java.util.Date;
  import java.util.Random;
  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;
  import java.util.concurrent.TimeUnit;

  import org.apache.flume.Event;
  import org.apache.flume.EventDeliveryException;
  import org.apache.flume.api.RpcClient;
  import org.apache.flume.api.RpcClientFactory;
  import org.apache.flume.event.EventBuilder;

  /**
   * @author Hongten
   *
   * Purpose: simulate user log records and send them to Flume.
   */
  public class FlumeClient {
    public static void main(String[] args) {
      ExecutorService exec = Executors.newCachedThreadPool();
      exec.execute(new GenerateDataAndSend2Flume());
      exec.shutdown();
    }
  }

  class GenerateDataAndSend2Flume implements Runnable {
    FlumeRPCClient flumeRPCClient;
    static Random random = new Random();

    GenerateDataAndSend2Flume() {
      // initialize the RPC client
      flumeRPCClient = new FlumeRPCClient();
      flumeRPCClient.init(Common.FLUME_HOST_NAME, Common.FLUME_PORT);
    }

    @Override
    public void run() {
      while (true) {
        Date date = new Date();
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat(Common.DATE_FORMAT_YYYYDDMM);
        String d = simpleDateFormat.format(date);
        Long timestamp = new Date().getTime();
        // generate a random IP address
        String ip = random.nextInt(Common.MAX_IP_NUMBER) + "." + random.nextInt(Common.MAX_IP_NUMBER) + "." + random.nextInt(Common.MAX_IP_NUMBER) + "." + random.nextInt(Common.MAX_IP_NUMBER);
        // region for the IP (test data only; no real IP-to-region lookup is done)
        String address = Common.ADDRESS[random.nextInt(Common.ADDRESS.length)];
        Long userid = Math.abs(random.nextLong());
        String action = Common.USER_ACTION[random.nextInt(Common.USER_ACTION.length)];
        // build the log record
        // example : 199.80.45.117 云南 2018-12-20 1545285957720 3086250439781555145 www.hongten.com Buy
        String data = ip + "\t" + address + "\t" + d + "\t" + timestamp + "\t" + userid + "\t" + Common.WEB_SITE + "\t" + action;
        //System.out.println(data);
        // send the record to Flume
        flumeRPCClient.sendData2Flume(data);
        try {
          TimeUnit.MICROSECONDS.sleep(random.nextInt(1000));
        } catch (InterruptedException e) {
          flumeRPCClient.cleanUp();
          System.out.println("interrupted exception : " + e);
        }
      }
    }
  }

  class FlumeRPCClient {
    private RpcClient client;
    private String hostname;
    private int port;

    public void init(String hostname, int port) {
      this.hostname = hostname;
      this.port = port;
      this.client = getRpcClient(hostname, port);
    }

    public void sendData2Flume(String data) {
      Event event = EventBuilder.withBody(data, Charset.forName(Common.CHAR_FORMAT));
      try {
        client.append(event);
      } catch (EventDeliveryException e) {
        // rebuild the connection after a delivery failure
        cleanUp();
        client = null;
        client = getRpcClient(hostname, port);
      }
    }

    public RpcClient getRpcClient(String hostname, int port) {
      return RpcClientFactory.getDefaultInstance(hostname, port);
    }

    public void cleanUp() {
      // close the RPC connection
      client.close();
    }
  }

  // all constants used by the client
  class Common {
    public static final String CHAR_FORMAT = "UTF-8";
    public static final String DATE_FORMAT_YYYYDDMM = "yyyy-MM-dd";
    // this is a test web site
    public static final String WEB_SITE = "www.hongten.com";
    // user actions
    public static final String[] USER_ACTION = { "Register", "Login", "View", "Click", "Double_Click", "Buy", "Shopping_Car", "Add", "Edit", "Delete", "Comment", "Logout" };
    public static final int MAX_IP_NUMBER = 224;
    // regions used as fake addresses for the generated IPs
    public static String[] ADDRESS = { "北京", "天津", "上海", "广东", "重庆", "河北", "山东", "河南", "云南", "山西", "甘肃", "安徽", "福建", "黑龙江", "海南", "四川", "贵州", "宁夏", "新疆", "湖北", "湖南", "山西", "辽宁", "吉林", "江苏", "浙江", "青海", "江西", "西藏", "内蒙", "广西", "香港", "澳门", "台湾", };
    // Flume conf
    public static final String FLUME_HOST_NAME = "node2";
    public static final int FLUME_PORT = 41414;
  }
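
To run the generator, compile the classes above and put the flume-ng-sdk jar and its dependencies on the classpath; the classpath layout below is only an assumption:

  --classpath layout is an assumption
  java -cp "classes:lib/*" com.b510.big.data.flume.client.FlumeClient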

 

6.6. Consume from Kafka (all_my_log topic)

  --on node3, start a Kafka console consumer
  cd /home/kafka-2.10/bin
  ./kafka-console-consumer.sh --zookeeper node1,node2,node3 --from-beginning --topic all_my_log

 

Sample output:

  168.208.193.207 安徽 2018-12-20 1545287646527 5462770148222682599 www.hongten.com Login
  103.143.79.127 新疆 2018-12-20 1545287646529 3389475301916412717 www.hongten.com Login
  111.208.80.39 山东 2018-12-20 1545287646531 535601622597096753 www.hongten.com Shopping_Car
  105.30.86.46 四川 2018-12-20 1545287646532 7825340079790811845 www.hongten.com Login
  205.55.33.74 新疆 2018-12-20 1545287646533 4228838365367235561 www.hongten.com Logout
  34.44.60.134 安徽 2018-12-20 1545287646536 702584874247456732 www.hongten.com Double_Click
  154.169.15.145 广东 2018-12-20 1545287646537 1683351753576425036 www.hongten.com View
  126.28.192.28 湖南 2018-12-20 1545287646538 8319814684518483148 www.hongten.com Edit
  5.140.156.73 台湾 2018-12-20 1545287646539 7432409906375230025 www.hongten.com Logout
  72.175.210.95 西藏 2018-12-20 1545287646540 5233707593244910849 www.hongten.com View
  121.25.190.25 广西 2018-12-20 1545287646541 268200251881841673 www.hongten.com Buy

 

6.7. Create a Kafka topic

  --on node1, create a topic named filtered_log
  --with 3 partitions
  --and replication-factor 3
  ./kafka-topics.sh --zookeeper node1,node2,node3 --create --topic filtered_log --partitions 3 --replication-factor 3
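
To double-check the partition and replica assignment, the same script can describe the topic:

  ./kafka-topics.sh --zookeeper node1,node2,node3 --describe --topic filtered_log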

 

6.8. Filter the data with Storm

  • Storm consumes data from Kafka
  • Storm filters the records (Buy - completed purchase, Shopping_Car - potential purchase)
  • Storm writes the filtered records back to Kafka
  package com.b510.big.data.storm.process;

  import java.util.ArrayList;
  import java.util.List;
  import java.util.Properties;

  import storm.kafka.KafkaSpout;
  import storm.kafka.SpoutConfig;
  import storm.kafka.StringScheme;
  import storm.kafka.ZkHosts;
  import storm.kafka.bolt.KafkaBolt;
  import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
  import storm.kafka.bolt.selector.DefaultTopicSelector;

  import backtype.storm.Config;
  import backtype.storm.LocalCluster;
  import backtype.storm.StormSubmitter;
  import backtype.storm.generated.AlreadyAliveException;
  import backtype.storm.generated.InvalidTopologyException;
  import backtype.storm.spout.SchemeAsMultiScheme;
  import backtype.storm.topology.BasicOutputCollector;
  import backtype.storm.topology.OutputFieldsDeclarer;
  import backtype.storm.topology.TopologyBuilder;
  import backtype.storm.topology.base.BaseBasicBolt;
  import backtype.storm.tuple.Fields;
  import backtype.storm.tuple.Tuple;
  import backtype.storm.tuple.Values;

  public class LogFilterTopology {
    public static void main(String[] args) {
      ZkHosts zkHosts = new ZkHosts(Common.ZOOKEEPER_QUORUM);
      // the spout reads from the 'all_my_log' topic
      SpoutConfig spoutConfig = new SpoutConfig(zkHosts, Common.ALL_MY_LOG_TOPIC, Common.ZOOKEEPER_ROOT, Common.ZOOKEEPER_ID);
      List<String> zkServers = new ArrayList<>();
      for (String host : zkHosts.brokerZkStr.split(",")) {
        zkServers.add(host.split(":")[0]);
      }
      spoutConfig.zkServers = zkServers;
      spoutConfig.zkPort = Common.ZOOKEEPER_PORT;
      spoutConfig.forceFromStart = true;
      spoutConfig.socketTimeoutMs = 60 * 60 * 1000;
      spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
      // create the KafkaSpout
      KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
      TopologyBuilder builder = new TopologyBuilder();
      // Storm consumes data from Kafka
      builder.setSpout(Common.KAFKA_SPOUT, kafkaSpout, 3);
      // Storm filters the records (Buy - completed purchase, Shopping_Car - potential purchase)
      builder.setBolt(Common.FILTER_BOLT, new FilterBolt(), 8).shuffleGrouping(Common.KAFKA_SPOUT);
      // create the KafkaBolt
      @SuppressWarnings({ "unchecked", "rawtypes" })
      KafkaBolt kafkaBolt = new KafkaBolt().withTopicSelector(new DefaultTopicSelector(Common.FILTERED_LOG_TOPIC)).withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
      // Storm writes the filtered records back to Kafka
      builder.setBolt(Common.KAFKA_BOLT, kafkaBolt, 2).shuffleGrouping(Common.FILTER_BOLT);
      Properties props = new Properties();
      props.put("metadata.broker.list", Common.STORM_METADATA_BROKER_LIST);
      props.put("request.required.acks", Common.STORM_REQUEST_REQUIRED_ACKS);
      props.put("serializer.class", Common.STORM_SERILIZER_CLASS);
      Config conf = new Config();
      conf.put("kafka.broker.properties", props);
      conf.put(Config.STORM_ZOOKEEPER_SERVERS, zkServers);
      if (args == null || args.length == 0) {
        // run in local mode
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("storm-kafka-topology", conf, builder.createTopology());
      } else {
        // run on the cluster
        conf.setNumWorkers(3);
        try {
          StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } catch (AlreadyAliveException | InvalidTopologyException e) {
          System.out.println("error : " + e);
        }
      }
    }
  }

  class FilterBolt extends BaseBasicBolt {
    private static final long serialVersionUID = 1L;

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
      String logStr = input.getString(0);
      // keep only the records that contain the keywords we care about,
      // i.e. lines containing 'Buy' or 'Shopping_Car'
      if (logStr.contains(Common.KEY_WORD_BUY) || logStr.contains(Common.KEY_WORD_SHOPPING_CAR)) {
        collector.emit(new Values(logStr));
      }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields(FieldNameBasedTupleToKafkaMapper.BOLT_MESSAGE));
    }
  }

  class Common {
    public static final String ALL_MY_LOG_TOPIC = "all_my_log";
    public static final String FILTERED_LOG_TOPIC = "filtered_log";
    public static final String DATE_FORMAT_YYYYDDMMHHMMSS = "yyyyMMddHHmmss";
    public static final String DATE_FORMAT_HHMMSS = "HHmmss";
    public static final String DATE_FORMAT_HHMMSS_DEFAULT_VALUE = "000001";
    public static final String HBASE_ZOOKEEPER_LIST = "node1:2888,node2:2888,node3:2888";
    public static final int ZOOKEEPER_PORT = 2181;
    public static final String ZOOKEEPER_QUORUM = "node1:" + ZOOKEEPER_PORT + ",node2:" + ZOOKEEPER_PORT + ",node3:" + ZOOKEEPER_PORT + "";
    public static final String ZOOKEEPER_ROOT = "/MyKafka";
    public static final String ZOOKEEPER_ID = "MyTrack";
    public static final String KAFKA_SPOUT = "kafkaSpout";
    public static final String FILTER_BOLT = "filterBolt";
    public static final String PROCESS_BOLT = "processBolt";
    public static final String HBASE_BOLT = "hbaseBolt";
    // Storm conf
    public static final String STORM_METADATA_BROKER_LIST = "node1:9092,node2:9092,node3:9092";
    public static final String STORM_REQUEST_REQUIRED_ACKS = "1";
    public static final String STORM_SERILIZER_CLASS = "kafka.serializer.StringEncoder";
    // key words
    public static final String KEY_WORD_BUY = "Buy";
    public static final String KEY_WORD_SHOPPING_CAR = "Shopping_Car";
    // hbase
    public static final String TABLE_USER_ACTION = "t_user_actions";
    public static final String COLUMN_FAMILY = "cf";
    // interval in seconds between writes to HBase
    public static final int WRITE_RECORD_TO_TABLE_PER_SECOND = 1;
    public static final int TABLE_MAX_VERSION = (60 / WRITE_RECORD_TO_TABLE_PER_SECOND) * 60 * 24;
  }
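
To run this topology on the cluster instead of in local mode, submit it with the standard storm jar command; the jar name below is an assumption, and the first argument becomes args[0], the topology name:

  --jar name is an assumption
  storm jar big-data-log.jar com.b510.big.data.storm.process.LogFilterTopology log-filter-topology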

 

6.9. Consume from Kafka (filtered_log topic)

  --on node3, start a Kafka console consumer
  cd /home/kafka-2.10/bin
  ./kafka-console-consumer.sh --zookeeper node1,node2,node3 --from-beginning --topic filtered_log

 

Sample output:

  87.26.135.185 黑龙江 2018-12-20 1545290594658 7290881731606227972 www.hongten.com Shopping_Car
  60.96.96.38 青海 2018-12-20 1545290594687 6935901257286057015 www.hongten.com Shopping_Car
  43.159.110.193 江苏 2018-12-20 1545290594727 7096698224110515553 www.hongten.com Shopping_Car
  21.103.139.11 山西 2018-12-20 1545290594693 7805867078876194442 www.hongten.com Shopping_Car
  139.51.213.184 广东 2018-12-20 1545290594729 8048796865619113514 www.hongten.com Buy
  58.213.148.89 河北 2018-12-20 1545290594708 5176551342435592748 www.hongten.com Buy
  36.205.221.116 湖南 2018-12-20 1545290594715 4484717918039766421 www.hongten.com Shopping_Car
  135.194.103.53 北京 2018-12-20 1545290594769 4833011508087432349 www.hongten.com Shopping_Car
  180.21.100.66 贵州 2018-12-20 1545290594752 5270357330431599426 www.hongten.com Buy
  167.71.65.70 山西 2018-12-20 1545290594790 275898530145861990 www.hongten.com Buy
  125.51.21.199 宁夏 2018-12-20 1545290594814 3613499600574777198 www.hongten.com Buy

 

 

6.10. Storm consumes the filtered data again and writes the results to HBase

  • Storm consumes data from Kafka again
  • Storm aggregates the counts (Buy - number of completed purchases, Shopping_Car - number of potential buyers)
  • Storm writes the results to HBase
  package com.b510.big.data.storm.process;

  import java.io.IOException;
  import java.text.SimpleDateFormat;
  import java.util.ArrayList;
  import java.util.Date;
  import java.util.List;
  import java.util.Map;
  import java.util.Properties;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HColumnDescriptor;
  import org.apache.hadoop.hbase.HTableDescriptor;
  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.client.HBaseAdmin;
  import org.apache.hadoop.hbase.client.HConnection;
  import org.apache.hadoop.hbase.client.HConnectionManager;
  import org.apache.hadoop.hbase.client.HTableInterface;
  import org.apache.hadoop.hbase.client.Put;

  import storm.kafka.KafkaSpout;
  import storm.kafka.SpoutConfig;
  import storm.kafka.StringScheme;
  import storm.kafka.ZkHosts;

  import backtype.storm.Config;
  import backtype.storm.LocalCluster;
  import backtype.storm.StormSubmitter;
  import backtype.storm.generated.AlreadyAliveException;
  import backtype.storm.generated.InvalidTopologyException;
  import backtype.storm.spout.SchemeAsMultiScheme;
  import backtype.storm.task.TopologyContext;
  import backtype.storm.topology.BasicOutputCollector;
  import backtype.storm.topology.IBasicBolt;
  import backtype.storm.topology.OutputFieldsDeclarer;
  import backtype.storm.topology.TopologyBuilder;
  import backtype.storm.topology.base.BaseBasicBolt;
  import backtype.storm.tuple.Fields;
  import backtype.storm.tuple.Tuple;
  import backtype.storm.tuple.Values;

  public class LogProcessTopology {
    public static void main(String[] args) {
      ZkHosts zkHosts = new ZkHosts(Common.ZOOKEEPER_QUORUM);
      // the spout reads from the 'filtered_log' topic
      SpoutConfig spoutConfig = new SpoutConfig(zkHosts, Common.FILTERED_LOG_TOPIC, Common.ZOOKEEPER_ROOT, Common.ZOOKEEPER_ID);
      List<String> zkServers = new ArrayList<>();
      for (String host : zkHosts.brokerZkStr.split(",")) {
        zkServers.add(host.split(":")[0]);
      }
      spoutConfig.zkServers = zkServers;
      spoutConfig.zkPort = Common.ZOOKEEPER_PORT;
      spoutConfig.forceFromStart = true;
      spoutConfig.socketTimeoutMs = 60 * 60 * 1000;
      spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
      // create the KafkaSpout
      KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
      TopologyBuilder builder = new TopologyBuilder();
      // Storm consumes data from Kafka again
      builder.setSpout(Common.KAFKA_SPOUT, kafkaSpout, 3);
      // Storm aggregates the counts (Buy - completed purchases, Shopping_Car - potential buyers)
      builder.setBolt(Common.PROCESS_BOLT, new ProcessBolt(), 3).shuffleGrouping(Common.KAFKA_SPOUT);
      // Storm writes the results to HBase
      builder.setBolt(Common.HBASE_BOLT, new HbaseBolt(), 3).shuffleGrouping(Common.PROCESS_BOLT);
      Properties props = new Properties();
      props.put("metadata.broker.list", Common.STORM_METADATA_BROKER_LIST);
      props.put("request.required.acks", Common.STORM_REQUEST_REQUIRED_ACKS);
      props.put("serializer.class", Common.STORM_SERILIZER_CLASS);
      Config conf = new Config();
      conf.put("kafka.broker.properties", props);
      conf.put(Config.STORM_ZOOKEEPER_SERVERS, zkServers);
      if (args == null || args.length == 0) {
        // run in local mode
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("storm-kafka-topology", conf, builder.createTopology());
      } else {
        // run on the cluster
        conf.setNumWorkers(3);
        try {
          StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } catch (AlreadyAliveException | InvalidTopologyException e) {
          System.out.println("error : " + e);
        }
      }
    }
  }

  class ProcessBolt extends BaseBasicBolt {
    private static final long serialVersionUID = 1L;

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
      String logStr = input.getString(0);
      if (logStr != null) {
        String infos[] = logStr.split("\\t");
        // 180.21.100.66 贵州 2018-12-20 1545290594752 5270357330431599426 www.hongten.com Buy
        collector.emit(new Values(infos[2], infos[6]));
      }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("date", "user_action"));
    }
  }

  class HbaseBolt implements IBasicBolt {
    private static final long serialVersionUID = 1L;
    HBaseDAO hBaseDAO = null;
    SimpleDateFormat simpleDateFormat = null;
    SimpleDateFormat simpleDateFormatHHMMSS = null;
    int userBuyCount = 0;
    int userShoopingCarCount = 0;
    // throttle: avoid writing to HBase too frequently
    int writeToHbaseMaxNum = Common.WRITE_RECORD_TO_TABLE_PER_SECOND * 1000;
    long begin = System.currentTimeMillis();
    long end = 0;

    @SuppressWarnings("rawtypes")
    @Override
    public void prepare(Map map, TopologyContext context) {
      hBaseDAO = new HBaseDAOImpl();
      simpleDateFormat = new SimpleDateFormat(Common.DATE_FORMAT_YYYYDDMMHHMMSS);
      simpleDateFormatHHMMSS = new SimpleDateFormat(Common.DATE_FORMAT_HHMMSS);
      hBaseDAO.createTable(Common.TABLE_USER_ACTION, new String[]{Common.COLUMN_FAMILY}, Common.TABLE_MAX_VERSION);
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
      // at the first second past midnight, reset the counters;
      // this check is not exact, because data from the previous day may still be in flight
      if (simpleDateFormatHHMMSS.format(new Date()).equals(Common.DATE_FORMAT_HHMMSS_DEFAULT_VALUE)) {
        userBuyCount = 0;
        userShoopingCarCount = 0;
      }
      if (input != null) {
        // based on ProcessBolt.declareOutputFields()
        String date = input.getString(0);
        String userAction = input.getString(1);
        if (userAction.equals(Common.KEY_WORD_BUY)) {
          // the same user may repeat the 'Buy' action within one day
          userBuyCount++;
        }
        if (userAction.equals(Common.KEY_WORD_SHOPPING_CAR)) {
          userShoopingCarCount++;
        }
        end = System.currentTimeMillis();
        if ((end - begin) > writeToHbaseMaxNum) {
          System.out.println("hbase_key: " + Common.KEY_WORD_BUY + "_" + date + " , userBuyCount: " + userBuyCount + ", userShoopingCarCount :" + userShoopingCarCount);
          // write the counters to HBase
          String quailifer = simpleDateFormat.format(new Date());
          hBaseDAO.insert(Common.TABLE_USER_ACTION,
              Common.KEY_WORD_BUY + "_" + date,
              Common.COLUMN_FAMILY,
              new String[] { quailifer },
              new String[] { "{user_buy_count:" + userBuyCount + "}" }
          );
          hBaseDAO.insert(Common.TABLE_USER_ACTION,
              Common.KEY_WORD_SHOPPING_CAR + "_" + date,
              Common.COLUMN_FAMILY,
              new String[] { quailifer },
              new String[] { "{user_shopping_car_count:" + userShoopingCarCount + "}" }
          );
          begin = System.currentTimeMillis();
        }
      }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
      return null;
    }

    @Override
    public void cleanup() {
    }
  }

  interface HBaseDAO {
    public void createTable(String tableName, String[] columnFamilys, int maxVersion);
    public void insert(String tableName, String rowKey, String family, String quailifer[], String value[]);
  }

  class HBaseDAOImpl implements HBaseDAO {
    HConnection hConnection = null;
    static Configuration conf = null;

    public HBaseDAOImpl() {
      conf = new Configuration();
      conf.set("hbase.zookeeper.quorum", Common.HBASE_ZOOKEEPER_LIST);
      try {
        hConnection = HConnectionManager.createConnection(conf);
      } catch (IOException e) {
        e.printStackTrace();
      }
    }

    public void createTable(String tableName, String[] columnFamilys, int maxVersion) {
      try {
        HBaseAdmin admin = new HBaseAdmin(conf);
        if (admin.tableExists(tableName)) {
          System.err.println("table existing in hbase.");
        } else {
          HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
          for (String columnFamily : columnFamilys) {
            HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(columnFamily);
            hColumnDescriptor.setMaxVersions(maxVersion);
            tableDesc.addFamily(hColumnDescriptor);
          }
          admin.createTable(tableDesc);
          System.err.println("table is created.");
        }
        admin.close();
      } catch (Exception e) {
        e.printStackTrace();
      }
    }

    @Override
    public void insert(String tableName, String rowKey, String family, String quailifer[], String value[]) {
      HTableInterface table = null;
      try {
        table = hConnection.getTable(tableName);
        Put put = new Put(rowKey.getBytes());
        for (int i = 0; i < quailifer.length; i++) {
          String col = quailifer[i];
          String val = value[i];
          put.add(family.getBytes(), col.getBytes(), val.getBytes());
        }
        table.put(put);
        System.err.println("save record successfully.");
      } catch (Exception e) {
        e.printStackTrace();
      } finally {
        try {
          table.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
    }
  }

 

Storm processing logic:

1. Write the counters to HBase every second.

2. Reset the counters shortly after midnight the next day.

If we keep the program above running, the system keeps writing data into HBase, and this gives us the data we need to build the report.
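
What has landed in HBase can be inspected from the HBase shell; the row key follows the 'Buy_<date>' convention used by HbaseBolt, and the date below is only an example:

  hbase shell
  scan 't_user_actions', {LIMIT => 5}
  get 't_user_actions', 'Buy_2018-12-20', {COLUMN => 'cf', VERSIONS => 3}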

The report implementation follows below.

 

6.11. Read the HBase data and generate an Excel report with POI

  • Read the data from HBase
  • Generate the Excel report with POI
  package com.b510.big.data.poi;

  import java.io.File;
  import java.io.FileInputStream;
  import java.io.FileOutputStream;
  import java.io.IOException;
  import java.io.InputStream;
  import java.util.ArrayList;
  import java.util.List;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.Cell;
  import org.apache.hadoop.hbase.CellUtil;
  import org.apache.hadoop.hbase.client.Get;
  import org.apache.hadoop.hbase.client.HConnection;
  import org.apache.hadoop.hbase.client.HConnectionManager;
  import org.apache.hadoop.hbase.client.HTableInterface;
  import org.apache.hadoop.hbase.client.Result;
  import org.apache.poi.xssf.usermodel.XSSFCell;
  import org.apache.poi.xssf.usermodel.XSSFSheet;
  import org.apache.poi.xssf.usermodel.XSSFWorkbook;

  public class ReportUtil {
    public static void main(String[] args) throws Exception {
      String year = "2018";
      String month = "12";
      String day = "21";
      String hour = "14";
      generateReport(year, month, day, hour);
    }

    private static void generateReport(String year, String month, String day, String hour) {
      HBaseDAO hBaseDAO = new HBaseDAOImpl();
      // format: yyyyMMddHH
      String begin = year + month + day + hour;
      String[] split = generateQuailifers(begin);
      List<Integer> userBuyCountList = getData(hBaseDAO, year, month, day, split, Common.KEY_WORD_BUY);
      List<Integer> userShoppingCarCountList = getData(hBaseDAO, year, month, day, split, Common.KEY_WORD_SHOPPING_CAR);
      //System.err.println(userBuyCountList.size());
      //System.err.println(userShoppingCarCountList.size());
      writeExcel(year, month, day, hour, userBuyCountList, userShoppingCarCountList);
    }

    private static void writeExcel(String year, String month, String day, String hour, List<Integer> userBuyCountList, List<Integer> userShoppingCarCountList) {
      try {
        File file = new File(Common.REPORT_TEMPLATE);
        InputStream in = new FileInputStream(file);
        XSSFWorkbook wb = new XSSFWorkbook(in);
        XSSFSheet sheet = wb.getSheetAt(0);
        if (sheet != null) {
          XSSFCell cell = null;
          cell = sheet.getRow(0).getCell(0);
          cell.setCellValue("One Hour Report-" + year + "-" + month + "-" + day + " From " + hour + ":00 To " + hour + ":59");
          putData(userBuyCountList, sheet, 3);
          putData(userShoppingCarCountList, sheet, 7);
          FileOutputStream out = new FileOutputStream(Common.REPORT_ONE_HOUR);
          wb.write(out);
          out.close();
          System.err.println("done.");
        }
      } catch (Exception e) {
        System.err.println("Exception" + e);
      }
    }

    private static void putData(List<Integer> userBuyCountList, XSSFSheet sheet, int rowNum) {
      XSSFCell cell;
      if (userBuyCountList != null && userBuyCountList.size() > 0) {
        for (int i = 0; i < userBuyCountList.size(); i++) {
          cell = sheet.getRow(rowNum).getCell(i + 1);
          cell.setCellValue(userBuyCountList.get(i));
        }
      }
    }

    private static List<Integer> getData(HBaseDAO hBaseDAO, String year, String month, String day, String[] split, String preKey) {
      List<Integer> list = new ArrayList<Integer>();
      Result rs = hBaseDAO.getOneRowAndMultiColumn(Common.TABLE_USER_ACTION, preKey + "_" + year + "-" + month + "-" + day, split);
      for (Cell cell : rs.rawCells()) {
        String value = new String(CellUtil.cloneValue(cell)).split(":")[1].trim();
        value = value.substring(0, value.length() - 1);
        list.add(Integer.valueOf(value));
      }
      return list;
    }

    private static String[] generateQuailifers(String begin) {
      StringBuilder sb = new StringBuilder();
      for (int i = 0; i < 60;) {
        if (i == 0 || i == 5) {
          sb.append(begin).append("0").append(i).append("00").append(",");
        } else {
          sb.append(begin).append(i).append("00").append(",");
        }
        i = i + 5;
      }
      sb.append(begin).append("5959");
      String sbStr = sb.toString();
      String[] split = sbStr.split(",");
      return split;
    }
  }

  interface HBaseDAO {
    Result getOneRowAndMultiColumn(String tableName, String rowKey, String[] cols);
  }

  class HBaseDAOImpl implements HBaseDAO {
    HConnection hConnection = null;
    static Configuration conf = null;

    public HBaseDAOImpl() {
      conf = new Configuration();
      conf.set("hbase.zookeeper.quorum", Common.HBASE_ZOOKEEPER_LIST);
      try {
        hConnection = HConnectionManager.createConnection(conf);
      } catch (IOException e) {
        e.printStackTrace();
      }
    }

    @Override
    public Result getOneRowAndMultiColumn(String tableName, String rowKey, String[] cols) {
      HTableInterface table = null;
      Result rsResult = null;
      try {
        table = hConnection.getTable(tableName);
        Get get = new Get(rowKey.getBytes());
        for (int i = 0; i < cols.length; i++) {
          get.addColumn(Common.COLUMN_FAMILY.getBytes(), cols[i].getBytes());
        }
        rsResult = table.get(get);
      } catch (Exception e) {
        e.printStackTrace();
      } finally {
        try {
          table.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
      return rsResult;
    }
  }

  class Common {
    // report
    public static final String REPORT_TEMPLATE = "./resources/report.xlsx";
    public static final String REPORT_ONE_HOUR = "./resources/one_report.xlsx";
    public static final String DATE_FORMAT_YYYYDDMMHHMMSS = "yyyyMMddHHmmss";
    public static final String HBASE_ZOOKEEPER_LIST = "node1:2888,node2:2888,node3:2888";
    // key word
    public static final String KEY_WORD_BUY = "Buy";
    public static final String KEY_WORD_SHOPPING_CAR = "Shopping_Car";
    // hbase
    public static final String TABLE_USER_ACTION = "t_user_actions";
    public static final String COLUMN_FAMILY = "cf";
  }

 

7. Source code download

Source Code: Flume_Kafka_Storm_Hbase_Hdfs_Poi_src.zip

The required JAR dependencies are not bundled because they are too large; add them yourself based on the import statements.

 

8. Summary

I have been learning Big Data for a while now. Building the application I wanted through my own study and exploration is very satisfying... and of course, stepping on a few landmines along the way is also a worthwhile experience. :)

 

========================================================

More reading, and English is important.

I'm Hongten

 

  If you found this useful, a small tip is welcome - your support is my greatest motivation. Thank you.
    Hongten's blog ranks within the top 100 on cnblogs, with over a thousand followers.
    From Hongten, only quality work.

E | hongtenzone@foxmail.com  B | http://www.cnblogs.com/hongten

========================================================

 
