经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » Java相关 » Java » 查看文章
SpringBoot Kafka 整合 实例 源码
来源:cnblogs  作者:传陆博客  时间:2018/11/1 9:42:31  对本文有异议

 

1、使用IDEA新建工程引导方式,创建消息生产工程 springboot-kafka-producer。

工程POM文件代码如下:

  1. 1 <?xml version="1.0" encoding="UTF-8"?>
  2. 2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. 3 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  4. 4 <modelVersion>4.0.0</modelVersion>
  5. 5
  6. 6 <groupId>com.miniooc</groupId>
  7. 7 <artifactId>springboot-kafka-producer</artifactId>
  8. 8 <version>1.0.0-SNAPSHOT</version>
  9. 9 <packaging>jar</packaging>
  10. 10
  11. 11 <name>springboot-kafka-producer</name>
  12. 12 <description>Demo project for Spring Boot</description>
  13. 13
  14. 14 <parent>
  15. 15 <groupId>org.springframework.boot</groupId>
  16. 16 <artifactId>spring-boot-starter-parent</artifactId>
  17. 17 <version>2.0.3.RELEASE</version>
  18. 18 <relativePath/>
  19. 19 </parent>
  20. 20
  21. 21 <properties>
  22. 22 <spring-cloud.version>Finchley.RELEASE</spring-cloud.version>
  23. 23 </properties>
  24. 24
  25. 25 <dependencies>
  26. 26 <dependency>
  27. 27 <groupId>org.springframework.boot</groupId>
  28. 28 <artifactId>spring-boot-starter-web</artifactId>
  29. 29 </dependency>
  30. 30 <dependency>
  31. 31 <groupId>org.springframework.boot</groupId>
  32. 32 <artifactId>spring-boot-starter-actuator</artifactId>
  33. 33 </dependency>
  34. 34 <dependency>
  35. 35 <groupId>org.springframework.kafka</groupId>
  36. 36 <artifactId>spring-kafka</artifactId>
  37. 37 </dependency>
  38. 38 <dependency>
  39. 39 <groupId>org.springframework.boot</groupId>
  40. 40 <artifactId>spring-boot-starter-test</artifactId>
  41. 41 <scope>test</scope>
  42. 42 </dependency>
  43. 43
  44. 44 <!-- 添加 gson 依赖 -->
  45. 45 <dependency>
  46. 46 <groupId>com.google.code.gson</groupId>
  47. 47 <artifactId>gson</artifactId>
  48. 48 <version>2.8.5</version>
  49. 49 </dependency>
  50. 50 <!-- 添加 lombok 依赖 -->
  51. 51 <dependency>
  52. 52 <groupId>org.projectlombok</groupId>
  53. 53 <artifactId>lombok</artifactId>
  54. 54 <version>1.16.22</version>
  55. 55 <scope>provided</scope>
  56. 56 </dependency>
  57. 57 </dependencies>
  58. 58
  59. 59 <dependencyManagement>
  60. 60 <dependencies>
  61. 61 <dependency>
  62. 62 <groupId>org.springframework.cloud</groupId>
  63. 63 <artifactId>spring-cloud-dependencies</artifactId>
  64. 64 <version>${spring-cloud.version}</version>
  65. 65 <type>pom</type>
  66. 66 <scope>import</scope>
  67. 67 </dependency>
  68. 68 </dependencies>
  69. 69 </dependencyManagement>
  70. 70
  71. 71 <build>
  72. 72 <plugins>
  73. 73 <plugin>
  74. 74 <groupId>org.springframework.boot</groupId>
  75. 75 <artifactId>spring-boot-maven-plugin</artifactId>
  76. 76 </plugin>
  77. 77 </plugins>
  78. 78 </build>
  79. 79
  80. 80
  81. 81 </project>

注释部分为手动添加的 gson、lombok 依赖。

2、创建消息实体类

  1. 1 package com.miniooc.kafka.message;
  2. 2
  3. 3 import lombok.Data;
  4. 4
  5. 5 import java.io.Serializable;
  6. 6 import java.util.Date;
  7. 7 import java.util.List;
  8. 8
  9. 9 @Data
  10. 10 public class OrderBasic implements Serializable {
  11. 11
  12. 12 /**
  13. 13 * 订单ID
  14. 14 */
  15. 15 private String orderId;
  16. 16 /**
  17. 17 * 订单编号
  18. 18 */
  19. 19 private String orderNumber;
  20. 20 /**
  21. 21 * 订单日期
  22. 22 */
  23. 23 private Date date;
  24. 24 /**
  25. 25 * 订单信息
  26. 26 */
  27. 27 private List<String> desc;
  28. 28
  29. 29 }

3、创建消息生产类

  1. 1 /**
  2. 2 *
  3. 3 */
  4. 4 package com.miniooc.kafka.producer;
  5. 5
  6. 6 import com.google.gson.GsonBuilder;
  7. 7 import com.miniooc.kafka.message.OrderBasic;
  8. 8 import lombok.extern.java.Log;
  9. 9 import org.springframework.beans.factory.annotation.Value;
  10. 10 import org.springframework.kafka.core.KafkaTemplate;
  11. 11 import org.springframework.stereotype.Component;
  12. 12
  13. 13 import javax.annotation.Resource;
  14. 14
  15. 15 /**
  16. 16 * Kafka消息生产类
  17. 17 */
  18. 18 @Log
  19. 19 @Component
  20. 20 public class KafkaProducer {
  21. 21
  22. 22 @Resource
  23. 23 private KafkaTemplate<String, String> kafkaTemplate;
  24. 24
  25. 25 @Value("${kafka.topic.order}")
  26. 26 private String topicOrder;
  27. 27
  28. 28 /**
  29. 29 * 发送订单消息
  30. 30 *
  31. 31 * @param orderBasic 订单信息
  32. 32 */
  33. 33 public void sendOrderMessage(OrderBasic orderBasic) {
  34. 34 GsonBuilder builder = new GsonBuilder();
  35. 35 builder.setPrettyPrinting();
  36. 36 builder.setDateFormat("yyyy-MM-dd HH:mm:ss");
  37. 37 String message = builder.create().toJson(orderBasic);
  38. 38 kafkaTemplate.send(topicOrder, message);
  39. 39 log.info("\n" + message);
  40. 40 }
  41. 41 }

 4、编辑资源配置文件 application.properties

  1. 1 server.port=9526
  2. 2 spring.application.name=kafka-producer
  3. 3 kafka.bootstrap.servers=localhost:9092
  4. 4 kafka.topic.order=topic-order
  5. 5 kafka.group.id=group-order

5、启动 zookeeper

  1. D:\kafka>bin\windows\zookeeper-server-start.bat config\zookeeper.properties

6、启动 kafka

  1. D:\kafka>bin\windows\kafka-server-start.bat config\server.properties

7、运行工程,通过控制器调用消息生产类,创建一条消息到kafka

看到红框内容,说明消息发送成功。

8、再使用IDEA新建工程引导方式,创建消息消费工程 springboot-kafka-producer。

9、创建消息消费类,并监听topic。

  1. 1 package com.miniooc.kafka.consumer;
  2. 2
  3. 3 import com.google.gson.Gson;
  4. 4 import com.google.gson.GsonBuilder;
  5. 5 import com.google.gson.reflect.TypeToken;
  6. 6 import com.miniooc.kafka.message.OrderBasic;
  7. 7 import lombok.extern.java.Log;
  8. 8 import org.springframework.kafka.annotation.KafkaListener;
  9. 9 import org.springframework.messaging.handler.annotation.Payload;
  10. 10 import org.springframework.stereotype.Component;
  11. 11
  12. 12 @Log
  13. 13 @Component
  14. 14 public class KafkaConsumer {
  15. 15
  16. 16 @KafkaListener(topics = "${kafka.topic.order}", containerFactory = "kafkaListenerContainerFactory")
  17. 17 public void consume(@Payload String message) {
  18. 18 GsonBuilder builder = new GsonBuilder();
  19. 19 builder.setPrettyPrinting();
  20. 20 builder.setDateFormat("yyyy-MM-dd HH:mm:ss");
  21. 21 Gson gson = builder.create();
  22. 22 OrderBasic orderBasic = gson.fromJson(message, new TypeToken<OrderBasic>() {
  23. 23 }.getType());
  24. 24 String json = gson.toJson(orderBasic);
  25. 25 log.info("\n接受并消费消息\n" + json);
  26. 26 }
  27. 27 }

10、运行工程。

看到红框内容,说明消息消费成功。

SpringBoot Kafka 整合完成! 

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号