经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 大数据/云/AI » Flink » 查看文章
[Flink]测试用的fake温度传感器
来源:cnblogs  作者:HmLy  时间:2019/11/13 8:39:57  对本文有异议

 

Flink-测试用的fake温度传感器

 

Flink中,测试时,会用到自定义的source。

下为一例。。 该例使用温度传感器的格式演示fake日志数据源。

代码用Scala写的。

 

传感器...

 

  • 传感器 - 样例类

    SensorReads.scala

     
     
     
    ?x
     
     
     
     
    1
    1. package sr
    2
    1. ?
    3
    1. /**
    4
    1. *
    5
    1. */
    6
    1. case class SensorReads(id:String,
    7
    1. timestap:Long,
    8
    1. tempture:Double)
     
     

 

  • 传感器 - 数据源模拟

    SnsorSrc_4096T.scala

     
     
     
    46
     
     
     
     
    1
    1. package sr
    2
    1. ?
    3
    1. import org.apache.flink.streaming.api.functions.source.SourceFunction
    4
    1. import scala.util.Random
    5
    1. ?
    6
    1. /**
    7
    1. * period, is 4096 millis.
    8
    1. */
    9
    1. case class SnsorSrc_4096T extends SourceFunction[SensorReads] {
    10
    1.  
    11
    1. var isInRunning: Boolean = true
    12
    1.  
    13
    1. ////
    14
    1. override def run(sourceContext: SourceFunction.SourceContext[
    15
    1. SensorReads]): Unit = {
    16
    1.  
    17
    1.  
    18
    1. val rand: Random = new Random
    19
    1.  
    20
    1. var tptNow4 =
    21
    1. (1 to 4).map(
    22
    1. "snsor_" + _.toString -> (23 + 16 * rand.nextGaussian))
    23
    1. ?
    24
    1.  
    25
    1.  
    26
    1. while (isInRunning) {
    27
    1. tptNow4 = tptNow4.map(
    28
    1. t => t._1 -> (t._2 + rand.nextGaussian))
    29
    1.  
    30
    1.  
    31
    1. val timeStampNow: Long = System.currentTimeMillis
    32
    1.  
    33
    1. tptNow4.foreach{
    34
    1. t =>
    35
    1. sourceContext.collect( // O.U.T
    36
    1. SensorReads(t._1, timeStampNow, t._2) )
    37
    1. Thread.sleep(512) }
    38
    1. //not set, is stm
    39
    1.  
    40
    1. Thread.sleep(2048) }
    41
    1.  
    42
    1. }
    43
    1.  
    44
    1. override def cancel(): Unit = isInRunning = false
    45
    1.  
    46
    1. }
     
     

 

 

测试

 

SnsrSrcAappli.scala

 
 
 
13
 
 
 
 
1
  1. package applis
2
  1. ?
3
  1. import org.apache.flink.streaming.api.scala._
4
  1. import sr._
5
  1. ?
6
  1. object SnsrSrcAappli extends App{
7
  1. val env = StreamExecutionEnvironment.getExecutionEnvironment
8
  1.  
9
  1. env.addSource(SnsorSrc_4096T() )
10
  1. .print("aaa")
11
  1.  
12
  1. env.execute()
13
  1. }
 
 

 

数据源模拟用case-class,此处使用则可以不写new。

 

输出

 

IDEA控制台上run:

 
 
 
17
 
 
 
 
1
  1. log4j:WARN No appenders could be found for logger (org.apache.flink.api.scala.ClosureCleaner$).
2
  1. log4j:WARN Please initialize the log4j system properly.
3
  1. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
4
  1. aaa:3> SensorReads(snsor_1,1573556705508,30.383394411578916)
5
  1. aaa:4> SensorReads(snsor_2,1573556705508,21.397405872448672)
6
  1. aaa:5> SensorReads(snsor_3,1573556705508,20.598086139457727)
7
  1. aaa:6> SensorReads(snsor_4,1573556705508,18.30066983735531)
8
  1. aaa:7> SensorReads(snsor_1,1573556709627,30.120955223032546)
9
  1. aaa:8> SensorReads(snsor_2,1573556709627,22.38746867201145)
10
  1. aaa:1> SensorReads(snsor_3,1573556709627,20.45357507067989)
11
  1. aaa:2> SensorReads(snsor_4,1573556709627,17.18467261133715)
12
  1. aaa:3> SensorReads(snsor_1,1573556713729,31.686487593592904)
13
  1. aaa:4> SensorReads(snsor_2,1573556713729,20.67106361911623)
14
  1. aaa:5> SensorReads(snsor_3,1573556713729,21.27724215221553)
15
  1. aaa:6> SensorReads(snsor_4,1573556713729,16.84273306583804)
16
  1. ?
17
  1. Process finished with exit code -1
 
 

 

...

如果SnsorSrc_4096T.scala中,「当前温度」.foreach这样写:

 
 
 
5
 
 
 
 
1
  1. tptNow4.foreach{
2
  1. t =>
3
  1. sourceContext.collect( // O.U.T
4
  1. SensorReads(t._1, System.currentTimeMillis, t._2) )
5
  1. Thread.sleep(512) }
 
 

 

那么结果就会是:

 
 
 
25
 
 
 
 
1
  1. log4j:WARN No appenders could be found for logger (org.apache.flink.api.scala.ClosureCleaner$).
2
  1. log4j:WARN Please initialize the log4j system properly.
3
  1. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
4
  1. aaa:5> SensorReads(snsor_1,1573561932216,20.427373784204445)
5
  1. aaa:6> SensorReads(snsor_2,1573561932739,19.043151948599565)
6
  1. aaa:7> SensorReads(snsor_3,1573561933251,16.506314894849734)
7
  1. aaa:8> SensorReads(snsor_4,1573561933764,42.18791135873409)
8
  1. aaa:1> SensorReads(snsor_1,1573561936326,20.216273863226476)
9
  1. aaa:2> SensorReads(snsor_2,1573561936838,19.77488458362011)
10
  1. aaa:3> SensorReads(snsor_3,1573561937351,17.49661332626548)
11
  1. aaa:4> SensorReads(snsor_4,1573561937864,42.37076203420432)
12
  1. aaa:5> SensorReads(snsor_1,1573561940425,19.582646754534526)
13
  1. aaa:6> SensorReads(snsor_2,1573561940938,18.148182987020572)
14
  1. aaa:7> SensorReads(snsor_3,1573561941451,17.028248074961432)
15
  1. aaa:8> SensorReads(snsor_4,1573561941963,42.969281620777075)
16
  1. aaa:1> SensorReads(snsor_1,1573561944525,20.659855873131406)
17
  1. aaa:2> SensorReads(snsor_2,1573561945038,19.437515708059177)
18
  1. aaa:3> SensorReads(snsor_3,1573561945550,18.336847248220565)
19
  1. aaa:4> SensorReads(snsor_4,1573561946063,43.58727112744526)
20
  1. aaa:5> SensorReads(snsor_1,1573561948625,19.317498008380674)
21
  1. aaa:6> SensorReads(snsor_2,1573561949137,21.86602577501872)
22
  1. aaa:7> SensorReads(snsor_3,1573561949650,19.109322091177216)
23
  1. aaa:8> SensorReads(snsor_4,1573561950163,43.48043890977487)
24
  1. ?
25
  1. Process finished with exit code -1
 
 

这样一来事件时间就都不一样了。 可根据需要模拟的情况改动....

原文链接:http://www.cnblogs.com/senwren/p/fake-snsr-Rd-src.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号