alpakka-kafka(1)-producer
Source: cnblogs  Author: 雪川大虫  Date: 2021/2/22 9:06:55

  The alpakka project is a scala/java open-source project built on top of the akka-streams stream-processing toolkit; it provides connectors to various data sources so that their data can be processed inside akka-streams. alpakka-kafka is the kafka connector within the alpakka project. For us this means we can use alpakka-kafka to talk to kafka and make use of the features kafka provides. Put another way: alpakka-kafka is a scala development tool that implements kafka functionality with akka-streams.

alpakka-kafka covers kafka's core functionality, producer and consumer, which are responsible respectively for writing data from akka-streams into kafka and for reading data out of kafka into akka-streams. Integrating kafka with akka-streams typically shows up in business-integration scenarios: business A produces some operation commands and writes them to kafka, kafka delivers those commands to business B, and business B reads the commands from kafka and performs the corresponding operations. For example, take two business modules, goods receiving and inventory management: on one side, goods receiving writes receiving records to kafka; on the other side, inventory management reads those records from kafka and updates the related stock quantities. Note that the two businesses operate independently of each other. In alpakka, the actual business operation is essentially the data processing (transform) done inside akka-streams. This is in fact the classic CQRS pattern: writer and reader are decoupled; the writer does not care who the audience is or how the data will be used, and the reader does not care who wrote it. Here the writer and reader correspond to kafka's producer and consumer.

In this article we first introduce alpakka-kafka's producer functionality and how to use it. As mentioned above, alpakka implements the kafka-producer functionality with akka-streams. The producers alpakka provides are therefore akka-streams components themselves, and can be combined with other akka-streams components to form larger akka-streams graphs. Building a producer requires a few supporting pieces first:

1. producer-settings: the akka.kafka.producer section in alpakka-kafka's reference.conf provides default producer settings sufficient for basic operation. Users can adjust them flexibly with the typesafe config tooling.

2. de/serializer: the kafka client library ships String serializers/deserializers (org.apache.kafka.common.serialization.StringSerializer) that can be used directly.

3. bootstrap-server: a comma-separated list of kafka-cluster node addresses.

Here is a concrete example:

  implicit val system = ActorSystem("kafka_sys")
  val bootstrapServers = "localhost:9092"
  val config = system.settings.config.getConfig("akka.kafka.producer")
  val producerSettings =
    ProducerSettings(config, new StringSerializer, new StringSerializer)
      .withBootstrapServers(bootstrapServers)

Here the ActorSystem is used only to read the configuration from the .conf file; no akka-streams component has been used yet. The akka.kafka.producer section already has default values in alpakka-kafka's reference.conf, so it does not need to be redefined in application.conf.
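If the defaults do need tweaking, they can be overridden in application.conf or adjusted programmatically on ProducerSettings. Below is a minimal sketch of the programmatic route; the specific values (parallelism of 20, acks=all, a 30-second close timeout) are illustrative assumptions, not recommendations:

  import akka.actor.ActorSystem
  import akka.kafka.ProducerSettings
  import org.apache.kafka.clients.producer.ProducerConfig
  import org.apache.kafka.common.serialization.StringSerializer
  import scala.concurrent.duration._

  object TunedProducerSettings extends App {
    implicit val system = ActorSystem("kafka_sys")
    val config = system.settings.config.getConfig("akka.kafka.producer")

    val producerSettings =
      ProducerSettings(config, new StringSerializer, new StringSerializer)
        .withBootstrapServers("localhost:9092")
        .withParallelism(20)                              // max in-flight sends per producer stage
        .withCloseTimeout(30.seconds)                     // how long to wait when closing the underlying producer
        .withProperty(ProducerConfig.ACKS_CONFIG, "all")  // passed straight through to the kafka client

    println(producerSettings)
    system.terminate()
  }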

alpakka-kafka also provides a most basic producer that is not an akka-streams component: SendProducer. Below we demonstrate how SendProducer is used and what it does:

 

  import akka.actor.ActorSystem
  import akka.kafka.scaladsl.{Consumer, SendProducer}
  import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata}
  import akka.kafka._
  import org.apache.kafka.common.serialization._
  import scala.concurrent.duration._
  import scala.concurrent.{Await, Future}

  object SendProducerDemo extends App {
    implicit val system = ActorSystem("kafka_sys")
    implicit val executionContext = system.dispatcher
    val bootstrapServers = "localhost:9092"
    val config = system.settings.config.getConfig("akka.kafka.producer")
    val producerSettings =
      ProducerSettings(config, new StringSerializer, new StringSerializer)
        .withBootstrapServers(bootstrapServers)
    val producer = SendProducer(producerSettings)
    val topic = "greatings"
    // build one ProducerRecord per number and send each with SendProducer
    val lstfut: Seq[Future[RecordMetadata]] =
      (100 to 200).reverse
        .map(_.toString)
        .map(value => new ProducerRecord[String, String](topic, s"hello-$value"))
        .map(msg => producer.send(msg))
    // wait until all sends have been acknowledged by kafka
    val futlst = Future.sequence(lstfut)
    Await.result(futlst, 2.seconds)
    scala.io.StdIn.readLine()
    producer.close()
    system.terminate()
  }

 

The example above uses SendProducer to write a batch of hello messages (hello-100 through hello-200) to kafka. It simply iterates over a plain collection; no akka-streams Source is involved. To check the result we can use the command-line tools that ship with kafka, as follows:

 

 

  \w> ./kafka-topics --create --topic greatings --bootstrap-server localhost:9092
  Created topic greatings.
  \w> ./kafka-console-consumer --topic greatings --bootstrap-server localhost:9092
  hello-100
  hello-101
  hello-102
  hello-103
  hello-104
  hello-105
  hello-106
  ...

 

Since the producer represents the writing side, in akka-streams terms it maps to a Sink or a Flow component. The next example demonstrates the producer Sink component plainSink:

  import akka.Done
  import akka.actor.ActorSystem
  import akka.kafka.scaladsl._
  import akka.kafka._
  import akka.stream.scaladsl._
  import org.apache.kafka.clients.producer.ProducerRecord
  import org.apache.kafka.common.serialization._
  import scala.concurrent._
  import scala.concurrent.duration._

  object plain_sink extends App {
    implicit val system = ActorSystem("kafka_sys")
    val bootstrapServers = "localhost:9092"
    val config = system.settings.config.getConfig("akka.kafka.producer")
    val producerSettings =
      ProducerSettings(config, new StringSerializer, new StringSerializer)
        .withBootstrapServers(bootstrapServers)
    implicit val executionContext = system.dispatcher
    val topic = "greatings"
    // a Source of numbers is mapped to ProducerRecords and run into the plainSink
    val done: Future[Done] =
      Source(1 to 100)
        .map(_.toString)
        .map(value => new ProducerRecord[String, String](topic, s"hello-$value"))
        .runWith(Producer.plainSink(producerSettings))
    Await.ready(done, 3.seconds)
    scala.io.StdIn.readLine()
    system.terminate()
  }

This is a typical akka-streams application, where Producer.plainSink is an akka-streams Sink component.

Both examples above construct values of type ProducerRecord and write them to kafka. ProducerRecord is kafka's basic message type:

  public ProducerRecord(String topic, K key, V value) {
      this(topic, null, null, key, value, null);
  }
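The examples so far passed only a topic and a value; this constructor also accepts an explicit key, which kafka uses when choosing a partition for the record. A minimal sketch (the key "k-1" and the value are just illustrative):

  val keyedRecord =
    new ProducerRecord[String, String]("greatings", "k-1", "hello-with-key")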

Here topic is a String, while key and value take the record's generic types K and V. On top of ProducerRecord, alpakka-kafka adds a richer message type, ProducerMessage.Envelope:

  sealed trait Envelope[K, V, +PassThrough] {
    def passThrough: PassThrough
    def withPassThrough[PassThrough2](value: PassThrough2): Envelope[K, V, PassThrough2]
  }

  final case class Message[K, V, +PassThrough](
      record: ProducerRecord[K, V],
      passThrough: PassThrough
  ) extends Envelope[K, V, PassThrough] {
    override def withPassThrough[PassThrough2](value: PassThrough2): Message[K, V, PassThrough2] =
      copy(passThrough = value)
  }

ProducerMessage.Envelope adds a PassThrough type parameter for carrying extra metadata along with the message. alpakka-kafka stream components use this message type as their stream element and ultimately turn it into one or more ProducerRecords written to kafka, as shown below:

  object EventMessages {
    // one envelope -> one ProducerRecord
    def createMessage[KeyType, ValueType, PassThroughType](
        topic: String,
        key: KeyType,
        value: ValueType,
        passThrough: PassThroughType): ProducerMessage.Envelope[KeyType, ValueType, PassThroughType] = {
      val single = ProducerMessage.single(
        new ProducerRecord[KeyType, ValueType](topic, key, value),
        passThrough
      )
      single
    }
    // one envelope -> multiple ProducerRecords
    def createMultiMessage[KeyType, ValueType, PassThroughType](
        topics: List[String],
        key: KeyType,
        value: ValueType,
        passThrough: PassThroughType): ProducerMessage.Envelope[KeyType, ValueType, PassThroughType] = {
      import scala.collection.immutable
      val msgs = topics.map { topic =>
        new ProducerRecord(topic, key, value)
      }.toSeq
      val multi = ProducerMessage.multi(
        msgs,
        passThrough
      )
      multi
    }
    // no record at all, only the pass-through metadata
    def createPassThroughMessage[KeyType, ValueType, PassThroughType](
        topic: String,
        key: KeyType,
        value: ValueType,
        passThrough: PassThroughType): ProducerMessage.Envelope[KeyType, ValueType, PassThroughType] = {
      ProducerMessage.passThrough(passThrough)
    }
  }

flexiFlow is an alpakka-kafka Flow component: ProducerMessage.Envelope values flow in and Results values flow out:

  def flexiFlow[K, V, PassThrough](
      settings: ProducerSettings[K, V]
  ): Flow[Envelope[K, V, PassThrough], Results[K, V, PassThrough], NotUsed] = { ... }

The Results family includes Result, MultiResult and PassThroughResult; the single-record Result is defined as follows:

  final case class Result[K, V, PassThrough] private (
      metadata: RecordMetadata,
      message: Message[K, V, PassThrough]
  ) extends Results[K, V, PassThrough] {
    def offset: Long = metadata.offset()
    def passThrough: PassThrough = message.passThrough
  }

In other words, flexiFlow hands back the status metadata that kafka returns after the write. Let's look at a flexiFlow usage example:

  import akka.kafka.ProducerMessage._
  import akka.actor.ActorSystem
  import akka.kafka.scaladsl._
  import akka.kafka.{ProducerMessage, ProducerSettings}
  import akka.stream.scaladsl.{Sink, Source}
  import org.apache.kafka.clients.producer.ProducerRecord
  import org.apache.kafka.common.serialization.StringSerializer

  import scala.concurrent._
  import scala.concurrent.duration._

  object flexi_flow extends App {
    implicit val system = ActorSystem("kafka_sys")
    val bootstrapServers = "localhost:9092"
    val config = system.settings.config.getConfig("akka.kafka.producer")
    val producerSettings =
      ProducerSettings(config, new StringSerializer, new StringSerializer)
        .withBootstrapServers(bootstrapServers)

    // needed for the future flatMap/onComplete in the end
    implicit val executionContext = system.dispatcher
    val topic = "greatings"

    val done = Source(1 to 100)
      .map { number =>
        val value = number.toString
        EventMessages.createMessage(topic, "key", value, number)
      }
      .via(Producer.flexiFlow(producerSettings))
      // each Results element reports where the record(s) ended up in kafka
      .map {
        case ProducerMessage.Result(metadata, ProducerMessage.Message(record, passThrough)) =>
          s"${metadata.topic}/${metadata.partition} ${metadata.offset}: ${record.value}"

        case ProducerMessage.MultiResult(parts, passThrough) =>
          parts
            .map {
              case MultiResultPart(metadata, record) =>
                s"${metadata.topic}/${metadata.partition} ${metadata.offset}: ${record.value}"
            }
            .mkString(", ")

        case ProducerMessage.PassThroughResult(passThrough) =>
          s"passed through"
      }
      .runWith(Sink.foreach(println(_)))

    Await.ready(done, 3.seconds)

    scala.io.StdIn.readLine()
    system.terminate()
  }

  // the same helper object shown earlier
  object EventMessages {
    def createMessage[KeyType, ValueType, PassThroughType](
        topic: String,
        key: KeyType,
        value: ValueType,
        passThrough: PassThroughType): ProducerMessage.Envelope[KeyType, ValueType, PassThroughType] = {
      val single = ProducerMessage.single(
        new ProducerRecord[KeyType, ValueType](topic, key, value),
        passThrough
      )
      single
    }
    def createMultiMessage[KeyType, ValueType, PassThroughType](
        topics: List[String],
        key: KeyType,
        value: ValueType,
        passThrough: PassThroughType): ProducerMessage.Envelope[KeyType, ValueType, PassThroughType] = {
      import scala.collection.immutable
      val msgs = topics.map { topic =>
        new ProducerRecord(topic, key, value)
      }.toSeq
      val multi = ProducerMessage.multi(
        msgs,
        passThrough
      )
      multi
    }
    def createPassThroughMessage[KeyType, ValueType, PassThroughType](
        topic: String,
        key: KeyType,
        value: ValueType,
        passThrough: PassThroughType): ProducerMessage.Envelope[KeyType, ValueType, PassThroughType] = {
      ProducerMessage.passThrough(passThrough)
    }
  }

Besides writing business events or commands to kafka, a producer can also commit the read position (offset) of a consumed message back to kafka. alpakka-kafka's producers therefore fall into two groups: plainSink and flexiFlow demonstrated above only write business data, while the other group, such as committableSink, additionally commits the offset at which the message was read. For example:

  val control =
    Consumer
      .committableSource(consumerSettings, Subscriptions.topics(topic1, topic2))
      .map { msg =>
        ProducerMessage.single(
          new ProducerRecord(targetTopic, msg.record.key, msg.record.value),
          msg.committableOffset
        )
      }
      .toMat(Producer.committableSink(producerSettings, committerSettings))(DrainingControl.apply)
      .run()

  control.drainAndShutdown()

As shown above, committableSource reads the business messages from kafka together with their read positions (committableOffset), and Producer.committableSink then writes the business messages back to kafka and commits the offsets.
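The snippet above assumes that consumerSettings and committerSettings are already in scope. As a rough sketch of how they might be built (the group id and bootstrap address are illustrative assumptions):

  import akka.actor.ActorSystem
  import akka.kafka.{CommitterSettings, ConsumerSettings}
  import org.apache.kafka.common.serialization.StringDeserializer

  object PassThroughSettings {
    implicit val system = ActorSystem("kafka_sys")

    // consumer side: deserializers mirror the producer's serializers
    val consumerSettings =
      ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
        .withBootstrapServers("localhost:9092")
        .withGroupId("demo-group")          // illustrative consumer group id

    // committer side: picks up the akka.kafka.committer defaults from reference.conf
    val committerSettings = CommitterSettings(system)
  }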

In the next article we will look at the consumer in detail.

 

Original link: http://www.cnblogs.com/tiger-xc/p/14419008.html
