nginx Lua integration with Kafka (-小鱼-)
Source: cnblogs  Author: -小鱼-  Date: 2019/8/5 9:36:49

Integrating Kafka with NGINX Lua

Step 1: Enter the OpenResty directory

  [root@node03 openresty]# cd /export/servers/openresty/
  [root@node03 openresty]# ll
  total 356
  drwxr-xr-x 2 root root 4096 Jul 26 11:33 bin
  drwxrwxr-x 44 1000 1000 4096 Jul 26 11:31 build
  drwxrwxr-x 43 1000 1000 4096 Nov 13 2017 bundle
  -rwxrwxr-x 1 1000 1000 45908 Nov 13 2017 configure
  -rw-rw-r-- 1 1000 1000 22924 Nov 13 2017 COPYRIGHT
  drwxr-xr-x 6 root root 4096 Jul 26 11:33 luajit
  drwxr-xr-x 6 root root 4096 Aug 1 08:14 lualib
  -rw-r--r-- 1 root root 5413 Jul 26 11:32 Makefile
  drwxr-xr-x 11 root root 4096 Jul 26 11:35 nginx
  drwxrwxr-x 2 1000 1000 4096 Nov 13 2017 patches
  drwxr-xr-x 44 root root 4096 Jul 26 11:33 pod
  -rw-rw-r-- 1 1000 1000 3689 Nov 13 2017 README.markdown
  -rw-rw-r-- 1 1000 1000 8690 Nov 13 2017 README-win32.txt
  -rw-r--r-- 1 root root 218352 Jul 26 11:33 resty.index
  drwxr-xr-x 5 root root 4096 Jul 26 11:33 site
  drwxr-xr-x 2 root root 4096 Aug 1 10:54 testlua
  drwxrwxr-x 2 1000 1000 4096 Nov 13 2017 util
  [root@node03 openresty]#

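Note: from here on we focus on two directories, lualib and nginx:

1. lualib: holds the library packages bundled with OpenResty

2. nginx: the nginx server directory

Next, let's take a look inside the lualib directory: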

  [root@node03 openresty]# cd lualib/
  [root@node03 lualib]# ll
  total 116
  -rwxr-xr-x 1 root root 101809 Jul 26 11:33 cjson.so
  drwxr-xr-x 3 root root 4096 Jul 26 11:33 ngx
  drwxr-xr-x 2 root root 4096 Jul 26 11:33 rds
  drwxr-xr-x 2 root root 4096 Jul 26 11:33 redis
  drwxr-xr-x 9 root root 4096 Aug 1 10:34 resty

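Here we see the bundled redis and ngx packages, which means we can use Redis and the nginx Lua API directly without importing any dependency.

Next, let's see what is inside resty: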

  [root@node03 lualib]# cd resty/
  [root@node03 resty]# ll
  total 152
  -rw-r--r-- 1 root root 6409 Jul 26 11:33 aes.lua
  drwxr-xr-x 2 root root 4096 Jul 26 11:33 core
  -rw-r--r-- 1 root root 596 Jul 26 11:33 core.lua
  drwxr-xr-x 2 root root 4096 Jul 26 11:33 dns
  drwxr-xr-x 2 root root 4096 Aug 1 10:42 kafka    # imported by us
  drwxr-xr-x 2 root root 4096 Jul 26 11:33 limit
  -rw-r--r-- 1 root root 4616 Jul 26 11:33 lock.lua
  drwxr-xr-x 2 root root 4096 Jul 26 11:33 lrucache
  -rw-r--r-- 1 root root 4620 Jul 26 11:33 lrucache.lua
  -rw-r--r-- 1 root root 1211 Jul 26 11:33 md5.lua
  -rw-r--r-- 1 root root 14544 Jul 26 11:33 memcached.lua
  -rw-r--r-- 1 root root 21577 Jul 26 11:33 mysql.lua
  -rw-r--r-- 1 root root 616 Jul 26 11:33 random.lua
  -rw-r--r-- 1 root root 9227 Jul 26 11:33 redis.lua
  -rw-r--r-- 1 root root 1192 Jul 26 11:33 sha1.lua
  -rw-r--r-- 1 root root 1045 Jul 26 11:33 sha224.lua
  -rw-r--r-- 1 root root 1221 Jul 26 11:33 sha256.lua
  -rw-r--r-- 1 root root 1045 Jul 26 11:33 sha384.lua
  -rw-r--r-- 1 root root 1359 Jul 26 11:33 sha512.lua
  -rw-r--r-- 1 root root 236 Jul 26 11:33 sha.lua
  -rw-r--r-- 1 root root 698 Jul 26 11:33 string.lua
  -rw-r--r-- 1 root root 5178 Jul 26 11:33 upload.lua
  drwxr-xr-x 2 root root 4096 Jul 26 11:33 upstream
  drwxr-xr-x 2 root root 406 Jul 26 11:33 websocket

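Here we see the familiar mysql.lua and redis.lua; ignore the others for now.

Note: the kafka package does not ship with OpenResty, i.e. OpenResty does not bundle a Kafka client. I imported the Kafka integration package beforehand (its files correspond to the open-source lua-resty-kafka library).

Let's see which files the kafka package contains: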

  [root@node03 resty]# cd kafka
  [root@node03 kafka]# ll
  total 48
  -rw-r--r-- 1 root root 1369 Aug 1 10:42 broker.lua
  -rw-r--r-- 1 root root 5537 Aug 1 10:42 client.lua
  -rw-r--r-- 1 root root 710 Aug 1 10:42 errors.lua
  -rw-r--r-- 1 root root 10718 Aug 1 10:42 producer.lua
  -rw-r--r-- 1 root root 4072 Aug 1 10:42 request.lua
  -rw-r--r-- 1 root root 2118 Aug 1 10:42 response.lua
  -rw-r--r-- 1 root root 1494 Aug 1 10:42 ringbuffer.lua
  -rw-r--r-- 1 root root 4845 Aug 1 10:42 sendbuffer.lua

The Kafka integration package:

Link: https://pan.baidu.com/s/1pFLhz3E_txb3ZWIRWxfQYg
Extraction code: 0umg

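Step 2: Create the Kafka test Lua file

1. Go back to the openresty directory

  [root@node03 kafka]# cd /export/servers/openresty/

2. Create a directory for the test script

  [root@node03 openresty]# mkdir -p testlua

You can choose the directory name and location yourself, but remember the path: nginx.conf refers to it later!

3. Enter the new directory and create the script file kafkalua.lua

Create the file with vim kafkalua.lua or touch kafkalua.lua: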

  [root@node03 openresty]# cd testlua/
  [root@node03 testlua]# ll
  total 8
  -rw-r--r-- 1 root root 3288 Aug 1 10:54 kafkalua.lua

kafkalua.lua:

  -- test output (optional): confirms that the location is wired to this file
  ngx.say('hello kafka file configuration successful!!!!!!')
  -- collection threshold: if active connections exceed it, nothing is collected
  local DEFAULT_THRESHOLD = 100000
  -- number of Kafka partitions; must equal the partition count of the topic (3 in Step 6)
  local PARTITION_NUM = 3
  -- Kafka topic name
  local TOPIC = 'B2CDATA_COLLECTION1'
  -- shared-dictionary key of the round-robin counter
  local POLLING_KEY = "POLLING_KEY"
  -- custom partitioner: the message key is the partition number chosen below;
  -- "% num" keeps it within the partition count reported by Kafka metadata
  local function partitioner(key, num, correlation_id)
      return tonumber(key) % num
  end
  -- Kafka cluster (broker addresses; the IPs must match each broker's host.name setting)
  local BROKER_LIST = {
      {host = "192.168.52.100", port = 9092},
      {host = "192.168.52.110", port = 9092},
      {host = "192.168.52.120", port = 9092},
  }
  -- producer parameters
  local CONNECT_PARAMS = {
      producer_type = "async",
      socket_timeout = 30000,
      flush_time = 10000,
      request_timeout = 20000,
      partitioner = partitioner,
  }
  -- shared-memory counter (lua_shared_dict shared_data) used for round-robin
  local shared_data = ngx.shared.shared_data
  -- atomically increment the counter, creating it with 0 on first use
  local pollingVal = shared_data:incr(POLLING_KEY, 1, 0) or 0
  -- counter modulo PARTITION_NUM spreads messages evenly across partitions
  local partitions = tostring(pollingVal % PARTITION_NUM)
  -- overload protection: stop collecting when $connections_active exceeds the threshold
  local isGone = true
  if tonumber(ngx.var.connections_active) > DEFAULT_THRESHOLD then
      isGone = false
  end
  -- data collection
  if isGone then
      -- nginx variables; a missing value becomes an empty string
      local time_local = ngx.var.time_local or ""
      local request = ngx.var.request or ""
      local request_method = ngx.var.request_method or ""
      local content_type = ngx.var.content_type or ""
      -- the body must be read before $request_body is populated
      ngx.req.read_body()
      local request_body = ngx.var.request_body or ""
      local http_referer = ngx.var.http_referer or ""
      local remote_addr = ngx.var.remote_addr or ""
      local http_user_agent = ngx.var.http_user_agent or ""
      local time_iso8601 = ngx.var.time_iso8601 or ""
      local server_addr = ngx.var.server_addr or ""
      local http_cookie = ngx.var.http_cookie or ""
      -- assemble the message, fields separated by #CS#
      local message = table.concat({
          time_local, request, request_method, content_type, request_body,
          http_referer, remote_addr, http_user_agent, time_iso8601,
          server_addr, http_cookie,
      }, "#CS#")
      -- load the Kafka producer
      local producer = require "resty.kafka.producer"
      -- create the producer
      local bp = producer:new(BROKER_LIST, CONNECT_PARAMS)
      -- send; the key (partition number) is passed to the partitioner
      local ok, err = bp:send(TOPIC, partitions, message)
      -- log errors (in async mode send() only buffers, so this reports buffering failures)
      if not ok then
          ngx.log(ngx.ERR, "kafka send err:", err)
          return
      end
  end
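The partition selection in the script is a round-robin counter: each request increments a value in the lua_shared_dict (shared by all nginx worker processes) and takes it modulo the partition count, so consecutive messages go to consecutive partitions. As a rough analogue only, the Java sketch below lets threads stand in for nginx workers and an AtomicLong stand in for ngx.shared.shared_data; the class and method names are hypothetical and not part of the original project.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;

/** Hypothetical Java analogue of the round-robin partition choice in kafkalua.lua. */
final class RoundRobinPartitioner {
    // Stands in for the POLLING_KEY counter in the shared dictionary.
    private final AtomicLong counter = new AtomicLong(0);
    private final int partitionNum;

    RoundRobinPartitioner(int partitionNum) {
        this.partitionNum = partitionNum;
    }

    /** Atomically bumps the counter and maps the new value to a partition id. */
    int nextPartition() {
        long value = counter.incrementAndGet();
        return (int) (value % partitionNum);
    }

    /** Runs `workers` threads that each send `perWorker` messages; returns messages per partition. */
    static long[] simulate(int workers, int perWorker, int partitionNum) throws InterruptedException {
        RoundRobinPartitioner p = new RoundRobinPartitioner(partitionNum);
        AtomicLongArray counts = new AtomicLongArray(partitionNum);
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int w = 0; w < workers; w++) {
            pool.submit(() -> {
                for (int i = 0; i < perWorker; i++) {
                    counts.incrementAndGet(p.nextPartition());
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        long[] result = new long[partitionNum];
        for (int i = 0; i < partitionNum; i++) {
            result[i] = counts.get(i);
        }
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        long[] counts = simulate(4, 300, 3);
        System.out.println(java.util.Arrays.toString(counts)); // [400, 400, 400]
    }
}
```

Because the atomic increment hands out every value exactly once, 4 threads × 300 messages over 3 partitions give exactly 400 messages per partition. If the counter were read and written in two separate steps, two workers could occasionally pick the same partition, which is why a single atomic increment is the safer design.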

Step 3: Edit the nginx configuration file nginx.conf

1. Go to the nginx/conf directory

  [root@node03 openresty]# cd /export/servers/openresty/nginx/conf/
  [root@node03 conf]# ll
  total 76
  -rw-r--r-- 1 root root 1077 Jul 26 11:33 fastcgi.conf
  -rw-r--r-- 1 root root 1077 Jul 26 11:33 fastcgi.conf.default
  -rw-r--r-- 1 root root 1007 Jul 26 11:33 fastcgi_params
  -rw-r--r-- 1 root root 1007 Jul 26 11:33 fastcgi_params.default
  -rw-r--r-- 1 root root 2837 Jul 26 11:33 koi-utf
  -rw-r--r-- 1 root root 2223 Jul 26 11:33 koi-win
  -rw-r--r-- 1 root root 5170 Jul 26 11:33 mime.types
  -rw-r--r-- 1 root root 5170 Jul 26 11:33 mime.types.default
  -rw-r--r-- 1 root root 3191 Aug 1 10:52 nginx.conf
  -rw-r--r-- 1 root root 2656 Jul 26 11:33 nginx.conf.default
  -rw-r--r-- 1 root root 636 Jul 26 11:33 scgi_params
  -rw-r--r-- 1 root root 636 Jul 26 11:33 scgi_params.default
  -rw-r--r-- 1 root root 664 Jul 26 11:33 uwsgi_params
  -rw-r--r-- 1 root root 664 Jul 26 11:33 uwsgi_params.default
  -rw-r--r-- 1 root root 3610 Jul 26 11:33 win-utf

2. Edit nginx.conf

  [root@node03 conf]# vim nginx.conf
  # 1. Find the first server block.
  # 2. Add the two directives below just above it (inside the http { } block).
  # 3. Add the Kafka-related location inside the server block.
  #------------------ added ------------------
  # shared dictionary of 10 MB, visible to all nginx worker processes
  lua_shared_dict shared_data 10m;
  # DNS resolver used by Lua cosockets (here a local DNS server)
  resolver 127.0.0.1;
  #------------------ added ------------------
  server {
      listen       80;
      server_name  localhost;
      #charset koi8-r;
      #access_log  logs/host.access.log  main;
      location / {
          root   html;
          index  index.html index.htm;
      }
      #------------------ added ------------------
      # kafkalua is the URI prefix (the "project name"); requests under /kafkalua go to the Lua script
      location /kafkalua {
          # stub_status module (the script reads its $connections_active variable)
          stub_status on;
          # response Content-Type for the ngx.say output
          default_type text/html;
          # location of kafkalua.lua created in Step 2 (the path you were told to remember!)
          content_by_lua_file /export/servers/openresty/testlua/kafkalua.lua;
      }
      #------------------ added ------------------
  }

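Note: kafkalua in location /kafkalua {...} is the URI prefix, which the author calls the project name. You can pick any name that does not clash with an existing location such as /, but you must remember it: requests have to be sent to this path.

We have now configured two locations, location / {...} and location /kafkalua {...}. What is the difference between them? Read on and it will become clear.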

Step 4: Start nginx

1. Go to nginx/sbin

  [root@node03 conf]# cd /export/servers/openresty/nginx/sbin/
  [root@node03 sbin]# ll
  total 16356
  -rwxr-xr-x 1 root root 16745834 Jul 26 11:33 nginx

2. Test whether the configuration file is valid

  [root@node03 sbin]# nginx -t
  nginx: the configuration file /export/servers/openresty/nginx/conf/nginx.conf syntax is ok
  nginx: configuration file /export/servers/openresty/nginx/conf/nginx.conf test is successful
  # the test passed (nginx is on the PATH here; otherwise run ./nginx -t in this directory)

3. Start nginx

  [root@node03 sbin]# nginx
  # no output usually means it started successfully

4. Check whether nginx is running

  [root@node03 sbin]# ps -ef | grep nginx
  root 3730 1 0 09:24 ? 00:00:00 nginx: master process nginx
  nobody 3731 3730 0 09:24 ? 00:00:20 nginx: worker process is shutting down
  nobody 5766 3730 0 12:17 ? 00:00:00 nginx: worker process
  root 5824 3708 0 12:24 pts/1 00:00:00 grep nginx
  # an nginx master process and a worker process are listed, so nginx is running

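5. Access nginx from a browser

Open node03/kafkalua in the browser; the page should show the test line printed by ngx.say in kafkalua.lua.

Note: if the hosts file has no entry for node03, use the IP address of the OpenResty machine instead, e.g. 192.168.52.120/kafkalua

Then open node03/ or 192.168.52.120/; this request is served by location / and shows the default index page from the html directory.

Also try node03:80/kafkalua and node03:80/.

In node03:80/kafkalua, node03 is the server's host alias (you can also write the server's IP address directly) and 80 is the port set by listen 80;. Port 80 can be omitted; if you configured listen 8088; instead, the browser would need node03:8088/kafkalua (here 8088 cannot be omitted). kafkalua is the project name, i.e. the location prefix. Here is the relevant part of nginx.conf again: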

  server {
      listen       80;
      server_name  localhost;
      #charset koi8-r;
      #access_log  logs/host.access.log  main;
      location / {
          root   html;
          index  index.html index.htm;
      }
      #------------------ added ------------------
      # kafkalua is the URI prefix (the "project name")
      location /kafkalua {
          # stub_status module (the script reads its $connections_active variable)
          stub_status on;
          # response Content-Type for the ngx.say output
          default_type text/html;
          # location of kafkalua.lua created in Step 2
          content_by_lua_file /export/servers/openresty/testlua/kafkalua.lua;
      }
  }
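The breakdown above (host, an optional port that defaults to 80, then a path that nginx matches against the location prefixes) can be modelled in a few lines of Java. This is a simplified sketch that assumes only plain prefix locations such as / and /kafkalua; real nginx also evaluates exact (=), ^~ and regex locations, which are ignored here. LocationMatcher and its methods are hypothetical names.

```java
import java.net.URI;
import java.util.Arrays;
import java.util.List;

/** Simplified, hypothetical model of how a request URL reaches a plain prefix location. */
final class LocationMatcher {

    /** Port the browser connects to: the explicit one, else 80 (assumes an http:// URL). */
    static int effectivePort(URI uri) {
        return uri.getPort() != -1 ? uri.getPort() : 80;
    }

    /** Longest-prefix match among plain prefix locations; null if none matches. */
    static String matchPrefix(String path, List<String> locations) {
        String best = null;
        for (String loc : locations) {
            if (path.startsWith(loc) && (best == null || loc.length() > best.length())) {
                best = loc;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        List<String> locations = Arrays.asList("/", "/kafkalua");
        String[] urls = {
            "http://node03/kafkalua",
            "http://node03:80/",
            "http://node03:8088/kafkalua/B2C40/query/jaxb/direct/query.ao"
        };
        for (String url : urls) {
            URI uri = URI.create(url);
            // an empty path (e.g. "http://node03") is requested as "/"
            String path = uri.getPath().isEmpty() ? "/" : uri.getPath();
            System.out.println(url + " -> port " + effectivePort(uri)
                    + ", location " + matchPrefix(path, locations));
        }
    }
}
```

Under this model the crawler's URLs such as http://node03/kafkalua/B2C40/... land in location /kafkalua because the longest matching prefix wins, while any other path falls back to location /.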

Step 5: Create a test crawler program

1. Create a Maven project and add the dependencies

  <dependencies>
      <dependency>
          <groupId>org.jsoup</groupId>
          <artifactId>jsoup</artifactId>
          <version>1.11.3</version>
      </dependency>
      <dependency>
          <groupId>org.apache.httpcomponents</groupId>
          <artifactId>httpclient</artifactId>
          <version>4.5.4</version>
      </dependency>
  </dependencies>

2. The pseudo-crawler program (it only generates test traffic against /kafkalua)

  import java.text.DateFormat;
  import java.text.SimpleDateFormat;
  import java.util.ArrayList;
  import java.util.Calendar;
  import java.util.Date;
  import java.util.GregorianCalendar;
  import java.util.List;
  import java.util.Locale;

  import org.apache.http.NameValuePair;
  import org.apache.http.client.entity.UrlEncodedFormEntity;
  import org.apache.http.client.methods.CloseableHttpResponse;
  import org.apache.http.client.methods.HttpPost;
  import org.apache.http.impl.client.CloseableHttpClient;
  import org.apache.http.impl.client.HttpClients;
  import org.apache.http.message.BasicNameValuePair;
  import org.apache.http.util.EntityUtils;

  public class SpiderGoAirCN {
      private static String basePath = "http://node03/kafkalua";
      // one client reused for all requests and closed at the end (a client per request leaks connections)
      private static final CloseableHttpClient HTTP_CLIENT = HttpClients.createDefault();
      private static final String USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36";
      private static final String REFERER_PAGE = "http://b2c.csair.com/B2C40/modules/bookingnew/main/flightSelectDirect.html";
      private static final String REFERER_FIXED = REFERER_PAGE + "?t=S&c1=CAN&c2=WUH&d1=2018-02-20&at=1&ct=0&it=0";
      private static final String QUERY_JSON = "{\"depcity\":\"CAN\", \"arrcity\":\"WUH\", \"flightdate\":\"20180220\", \"adultnum\":\"1\", \"childnum\":\"0\", \"infantnum\":\"0\", \"cabinorder\":\"0\", \"airline\":\"1\", \"flytype\":\"0\", \"international\":\"0\", \"action\":\"0\", \"segtype\":\"1\", \"cache\":\"0\", \"preUrl\":\"\", \"isMember\":\"\"}";

      public static void main(String[] args) throws Exception {
          try {
              for (int i = 0; i < 50000; i++) {
                  spiderQueryao(); // query request
                  spiderHtml();    // html request
                  spiderJs();      // js request
                  spiderCss();     // css request
                  spiderPng();     // png request
                  spiderJpg();     // jpg request
                  Thread.sleep(100);
              }
          } finally {
              HTTP_CLIENT.close();
          }
      }

      // target pattern ^.*/B2C40/query/jaxb/direct/query.ao.*$
      public static void spiderQueryao() throws Exception {
          send("/B2C40/query/jaxb/direct/query.ao",
                  REFERER_PAGE + "?t=S&c1=CAN&c2=WUH&d1=" + getGoTime() + "&at=1&ct=0&it=0",
                  "192.168.56.80", "243.45.78.132", buildCookie(getGoTime()));
      }

      // target pattern ^.*html.*$
      public static void spiderHtml() throws Exception {
          send("/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=S&c1=CAN&c2=CTU&d1=2018-01-17&at=1&ct=0&it=0",
                  REFERER_FIXED, "192.168.56.1", "192.168.56.80", buildCookie("2018-01-17"));
      }

      public static void spiderJs() throws Exception {
          send("/B2C40/dist/main/modules/common/requireConfig.js",
                  REFERER_FIXED, "192.168.56.1", "192.168.56.80", buildCookie("2018-01-17"));
      }

      public static void spiderCss() throws Exception {
          send("/B2C40/dist/main/css/flight.css",
                  REFERER_PAGE, "192.168.56.1", "192.168.56.80", buildCookie("2018-01-17"));
      }

      public static void spiderPng() throws Exception {
          send("/B2C40/dist/main/images/common.png",
                  REFERER_FIXED, "192.168.56.1", "192.168.56.80", buildCookie("2018-01-17"));
      }

      public static void spiderJpg() throws Exception {
          send("/B2C40/dist/main/images/loadingimg.jpg",
                  REFERER_FIXED, "192.168.56.1", "192.168.56.80", buildCookie("2018-01-17"));
      }

      /** Shared body of the six methods above: build one simulated POST and send it. */
      private static void send(String path, String referer, String remoteAddr,
                               String serverAddr, String cookie) throws Exception {
          HttpPost httpPost = new HttpPost(basePath + path);
          // simulated headers; kafkalua.lua only takes Content-Type, Referer, User-Agent and Cookie
          // from them (time, request line, client and server address come from nginx itself)
          httpPost.setHeader("Time-Local", getLocalDateTime());
          httpPost.setHeader("Request", "POST /B2C40/query/jaxb/direct/query.ao HTTP/1.1");
          // HTTP header names may not contain spaces, hence the hyphenated names
          httpPost.setHeader("Request-Method", "POST");
          httpPost.setHeader("Content-Type", "application/x-www-form-urlencoded; charset=UTF-8");
          httpPost.setHeader("Referer", referer);
          httpPost.setHeader("Remote-Address", remoteAddr);
          httpPost.setHeader("User-Agent", USER_AGENT);
          httpPost.setHeader("Time-Iso8601", getISO8601Timestamp());
          httpPost.setHeader("Server-Address", serverAddr);
          httpPost.setHeader("Cookie", cookie);
          // form body: a single parameter "json" carrying the flight query
          List<NameValuePair> parameters = new ArrayList<>();
          parameters.add(new BasicNameValuePair("json", QUERY_JSON));
          httpPost.setEntity(new UrlEncodedFormEntity(parameters));
          // execute, read the response fully and release the connection
          try (CloseableHttpResponse response = HTTP_CLIENT.execute(httpPost)) {
              EntityUtils.consume(response.getEntity());
              System.out.println(response.getStatusLine().getStatusCode());
          }
      }

      /** Captured cookie; only the travel date differs between requests. */
      private static String buildCookie(String date) {
          return "JSESSIONID=782121159357B98CA6112554CF44321E; sid=b5cc11e02e154ac5b0f3609332f86803; aid=8ae8768760927e280160bb348bef3e12; identifyStatus=N; userType4logCookie=M; userId4logCookie=13818791413; useridCookie=13818791413; userCodeCookie=13818791413; temp_zh=cou%3D0%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-13%3B%E5%B9%BF%E5%B7%9E-%E5%8C%97%E4%BA%AC%3B1%2C0%2C0%3B%26cou%3D1%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D"
                  + date
                  + "%3B%E5%B9%BF%E5%B7%9E-%E6%88%90%E9%83%BD%3B1%2C0%2C0%3B%26; JSESSIONID=782121159357B98CA6112554CF44321E; WT-FPC=id=211.103.142.26-608782688.30635197:lv=1516170718655:ss=1516170709449:fs=1513243317440:pn=2:vn=10; language=zh_CN; WT.al_flight=WT.al_hctype(S)%3AWT.al_adultnum(1)%3AWT.al_childnum(0)%3AWT.al_infantnum(0)%3AWT.al_orgcity1(CAN)%3AWT.al_dstcity1(CTU)%3AWT.al_orgdate1("
                  + date + ")";
      }

      public static String getLocalDateTime() {
          DateFormat df = new SimpleDateFormat("dd/MMM/yyyy'T'HH:mm:ss +08:00", Locale.ENGLISH);
          return df.format(new Date());
      }

      public static String getISO8601Timestamp() {
          DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss+08:00");
          return df.format(new Date());
      }

      public static String getGoTime() {
          DateFormat df = new SimpleDateFormat("yyyy-MM-dd");
          return df.format(new Date());
      }

      public static String getBackTime() {
          Calendar calendar = new GregorianCalendar();
          calendar.setTime(new Date());    // today
          calendar.add(Calendar.DATE, 1);  // one day later; use -1 for one day earlier
          return new SimpleDateFormat("yyyy-MM-dd").format(calendar.getTime());
      }
  }

Step 6: Start Kafka

1. Create the topic (3 partitions here; PARTITION_NUM in kafkalua.lua must equal this number)

  [root@node01 bin]# kafka-topics.sh --zookeeper node01:2181 --partitions 3 --replication-factor 3 --create --topic B2CDATA_COLLECTION1

2. Start a Kafka console consumer

  [root@node01 bin]# kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic B2CDATA_COLLECTION1

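Step 7: Run the crawler and observe the results

1. Start the crawler program

2. Watch the consumer window: each line it prints is one collected request, with its fields joined by #CS#.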
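Each line in the consumer window is the message assembled in kafkalua.lua: eleven nginx fields joined by #CS#, in the order time_local, request, request_method, content_type, request_body, http_referer, remote_addr, http_user_agent, time_iso8601, server_addr, http_cookie. A downstream job has to split it back into fields; the Java sketch below shows one way, assuming exactly that field order. CollectedRecord is a hypothetical name, and the sample record in main is made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical parser for the "#CS#"-delimited records produced by kafkalua.lua. */
final class CollectedRecord {
    static final String SEPARATOR = "#CS#";
    // Field order as concatenated in kafkalua.lua.
    static final String[] FIELDS = {
        "time_local", "request", "request_method", "content_type", "request_body",
        "http_referer", "remote_addr", "http_user_agent", "time_iso8601",
        "server_addr", "http_cookie"
    };

    /** Splits one record into named fields; limit -1 keeps trailing empty fields. */
    static Map<String, String> parse(String line) {
        String[] parts = line.split(SEPARATOR, -1);
        if (parts.length != FIELDS.length) {
            throw new IllegalArgumentException(
                    "expected " + FIELDS.length + " fields but got " + parts.length);
        }
        Map<String, String> record = new LinkedHashMap<>();
        for (int i = 0; i < FIELDS.length; i++) {
            record.put(FIELDS[i], parts[i]);
        }
        return record;
    }

    public static void main(String[] args) {
        // made-up sample record with an empty cookie as the last field
        String sample = String.join(SEPARATOR,
                "05/Aug/2019:09:36:49 +0800",
                "POST /kafkalua/B2C40/query/jaxb/direct/query.ao HTTP/1.1",
                "POST", "application/x-www-form-urlencoded; charset=UTF-8", "json=%7B%7D",
                "http://b2c.csair.com/", "192.168.52.1", "Mozilla/5.0",
                "2019-08-05T09:36:49+08:00", "192.168.52.120", "");
        System.out.println(parse(sample).get("request_method")); // POST
    }
}
```

split is called with limit -1 so that trailing empty fields (for example an empty cookie) are kept. The sketch assumes that no field itself contains #CS#; a request body or cookie that does would break the split.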

Step 8: Observe with kafka-manager

1. Start kafka-manager

  [root@node01 conf]# cd /export/servers/kafka-manager-1.3.3.23/bin/
  [root@node01 bin]# ll
  total 36
  -rwxr-xr-x 1 root root 13747 May 1 06:27 kafka-manager
  -rw-r--r-- 1 root root 9975 May 1 06:27 kafka-manager.bat
  -rwxr-xr-x 1 root root 1383 May 1 06:27 log-config
  -rw-r--r-- 1 root root 105 May 1 06:27 log-config.bat
  [root@node01 bin]#
  # start it
  [root@node01 bin]# ./kafka-manager


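2. Access it from a browser

Enter node01:9000 in the browser.

How to use kafka-manager is not covered here; just look at how the messages of topic B2CDATA_COLLECTION1 are distributed: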

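- The topic has three partitions; if each partition has received about the same number of messages, the setup works.

- If the counts differ noticeably, check the partitioning in kafkalua.lua: the custom partitioner must be set in CONNECT_PARAMS and PARTITION_NUM must match the topic's partition count; otherwise the data can become skewed across partitions.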
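To make "about the same number of messages per partition" concrete, you can compare the largest and smallest per-partition counts shown in kafka-manager. The Java sketch below computes (max - min) / max as a skew ratio; the class name, the 10% tolerance and the sample counts are arbitrary assumptions for illustration, not values from the original article.

```java
/** Hypothetical helper: flags skew when partition message counts differ by more than a tolerance. */
final class PartitionSkew {

    /** Returns (max - min) / max; 0.0 means perfectly balanced. */
    static double skew(long[] counts) {
        if (counts.length == 0) {
            throw new IllegalArgumentException("no partitions");
        }
        long max = Long.MIN_VALUE;
        long min = Long.MAX_VALUE;
        for (long c : counts) {
            max = Math.max(max, c);
            min = Math.min(min, c);
        }
        return max == 0 ? 0.0 : (double) (max - min) / max;
    }

    /** True when the skew ratio does not exceed the given tolerance (e.g. 0.10 for 10%). */
    static boolean isBalanced(long[] counts, double tolerance) {
        return skew(counts) <= tolerance;
    }

    public static void main(String[] args) {
        long[] balanced = {100_000, 99_998, 100_002}; // made-up counts
        long[] skewed = {300_000, 0, 0};              // made-up counts
        System.out.println(isBalanced(balanced, 0.10)); // true
        System.out.println(isBalanced(skewed, 0.10));   // false
    }
}
```

A ratio near 0 matches the "roughly equal" outcome described above, while a ratio near 1 means nearly all messages went to one partition.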

Done!

Original article: http://www.cnblogs.com/-xiaoyu-/p/11294905.html
