经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 大数据/云/AI » Flink » 查看文章
Flink+Druid构建实时OLAP的探索
来源:cnblogs  作者:ChouYarn  时间:2019/8/12 8:39:58  对本文有异议

场景

k12在线教育公司的业务场景中,有一些业务场景需要实时统计和分析,如分析在线上课老师数量、学生数量,实时销售额,课堂崩溃率等,需要实时反应上课的质量问题,以便于对整个公司的业务情况有大致的了解。

方案对比

对比了很多解决方案,如下几种,列出来供参考。

方案实时入库SQL支持度
Spark+CarbonData 支持 Spark SQL语法丰富
Kylin 不支持 支持join
Flink+Druid 支持 0.15以前不支持SQL,不支持join
  1. 上一篇文章所示,使用Spark+CarbonData也是一种解决方案,但是他的缺点也是比较明显,如不能和Flink进行结合,因为我们整个的大数据规划的大致方向是,Spark用来作为离线计算,Flink作为实时计算,并且这两个大方向短时间内不会改变;
  2. Kylin一直是老牌OLAP引擎,但是有个缺点无法满足我们的需求,就是在技术选型的那个时间点kylin还不支持实时入库(后续2.0版本支持实时入库),所以就选择了放弃;
  3. 使用Flink+Druid方式实现,这个时间选择这个方案,简直是顺应潮流呀,Flink现在如日中天,各大厂都在使用,Druid是OLAP的新贵,关于它的文章也有很多,我也不赘述太多。有兴趣的可以看下这篇文章,我的博客其它文章也有最新版本的安装教程,实操方案哦。

设计方案

实时处理采用Flink SQL,实时入库Druid方式采用 druid-kafka-indexing-service,另一种方式入库方式,Tranquility,这种方式测试下来问题多多,放弃了。数据流向如下图。

 

场景举例

实时计算课堂连接掉线率。此事件包含两个埋点上报,进入教室和掉线分别上报数据。druid设计的字段

flink的处理

将上报的数据进行解析,上报使用的是json格式,需要解析出所需要的字段然后发送到kafka。字段包含如下

  1. sysTimeDateTime格式
  2. pt,格式yyyy-MM-dd
  3. eventId,事件类型(enterRoom|disconnect)
  4. lessonId,课程ID
Druid处理

启动Druid Supervisor,消费Kafka里的数据,使用预聚合,配置如下

  1. {
  2. "type": "kafka",
  3. "dataSchema": {
  4. "dataSource": "sac_core_analyze_v1",
  5. "parser": {
  6. "parseSpec": {
  7. "dimensionsSpec": {
  8. "spatialDimensions": [],
  9. "dimensions": [
  10. "eventId",
  11. "pt"
  12. ]
  13. },
  14. "format": "json",
  15. "timestampSpec": {
  16. "column": "sysTime",
  17. "format": "auto"
  18. }
  19. },
  20. "type": "string"
  21. },
  22. "metricsSpec": [
  23. {
  24. "filter": {
  25. "type": "selector",
  26. "dimension": "msg_type",
  27. "value": "disconnect"
  28. },
  29. "aggregator": {
  30. "name": "lesson_offline_molecule_id",
  31. "type": "cardinality",
  32. "fields": ["lesson_id"]
  33. },
  34. "type": "filtered"
  35. }, {
  36. "filter": {
  37. "type": "selector",
  38. "dimension": "msg_type",
  39. "value": "enterRoom"
  40. },
  41. "aggregator": {
  42. "name": "lesson_offline_denominator_id",
  43. "type": "cardinality",
  44. "fields": ["lesson_id"]
  45. },
  46. "type": "filtered"
  47. }
  48. ],
  49. "granularitySpec": {
  50. "type": "uniform",
  51. "segmentGranularity": "DAY",
  52. "queryGranularity": {
  53. "type": "none"
  54. },
  55. "rollup": true,
  56. "intervals": null
  57. },
  58. "transformSpec": {
  59. "filter": null,
  60. "transforms": []
  61. }
  62. },
  63. "tuningConfig": {
  64. "type": "kafka",
  65. "maxRowsInMemory": 1000000,
  66. "maxBytesInMemory": 0,
  67. "maxRowsPerSegment": 5000000,
  68. "maxTotalRows": null,
  69. "intermediatePersistPeriod": "PT10M",
  70. "basePersistDirectory": "/tmp/1564535441619-2",
  71. "maxPendingPersists": 0,
  72. "indexSpec": {
  73. "bitmap": {
  74. "type": "concise"
  75. },
  76. "dimensionCompression": "lz4",
  77. "metricCompression": "lz4",
  78. "longEncoding": "longs"
  79. },
  80. "buildV9Directly": true,
  81. "reportParseExceptions": false,
  82. "handoffConditionTimeout": 0,
  83. "resetOffsetAutomatically": false,
  84. "segmentWriteOutMediumFactory": null,
  85. "workerThreads": null,
  86. "chatThreads": null,
  87. "chatRetries": 8,
  88. "httpTimeout": "PT10S",
  89. "shutdownTimeout": "PT80S",
  90. "offsetFetchPeriod": "PT30S",
  91. "intermediateHandoffPeriod": "P2147483647D",
  92. "logParseExceptions": false,
  93. "maxParseExceptions": 2147483647,
  94. "maxSavedParseExceptions": 0,
  95. "skipSequenceNumberAvailabilityCheck": false
  96. },
  97. "ioConfig": {
  98. "topic": "sac_druid_analyze_v2",
  99. "replicas": 2,
  100. "taskCount": 1,
  101. "taskDuration": "PT600S",
  102. "consumerProperties": {
  103. "bootstrap.servers": "bd-prod-kafka01:9092,bd-prod-kafka02:9092,bd-prod-kafka03:9092"
  104. },
  105. "pollTimeout": 100,
  106. "startDelay": "PT5S",
  107. "period": "PT30S",
  108. "useEarliestOffset": false,
  109. "completionTimeout": "PT1200S",
  110. "lateMessageRejectionPeriod": null,
  111. "earlyMessageRejectionPeriod": null,
  112. "stream": "sac_druid_analyze_v2",
  113. "useEarliestSequenceNumber": false
  114. },
  115. "context": null,
  116. "suspended": false
  117. }
View Code

 

最重要的配置是metricsSpec,他主要定义了预聚合的字段和条件。

数据查询

数据格式如下

pteventIdlesson_offline_molecule_idlesson_offline_denominator_id
2019-08-09 enterRoom "AQAAAAAAAA==" "AQAAAAAAAA=="
2019-08-09 disconnect "AQAAAAAAAA==" "AQAAAAAAAA=="

结果可以按照这样的SQL出

  1. SELECT pt,CAST(APPROX_COUNT_DISTINCT(lesson_offline_molecule_id) AS DOUBLE)/CAST(APPROX_COUNT_DISTINCT(lesson_offline_denominator_id) AS DOUBLE) from sac_core_analyze_v1 group by pt

可以使用Druid的接口查询结果,肥肠的方便~

原文链接:http://www.cnblogs.com/ChouYarn/p/11328900.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号