1、使用IDEA新建工程引导方式,创建消息生产工程 springboot-kafka-producer。

工程POM文件代码如下:
1 <?xml version="1.0" encoding="UTF-8"?>
2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4 <modelVersion>4.0.0</modelVersion>
5
6 <groupId>com.miniooc</groupId>
7 <artifactId>springboot-kafka-producer</artifactId>
8 <version>1.0.0-SNAPSHOT</version>
9 <packaging>jar</packaging>
10
11 <name>springboot-kafka-producer</name>
12 <description>Demo project for Spring Boot</description>
13
14 <parent>
15 <groupId>org.springframework.boot</groupId>
16 <artifactId>spring-boot-starter-parent</artifactId>
17 <version>2.0.3.RELEASE</version>
18 <relativePath/>
19 </parent>
20
21 <properties>
22 <spring-cloud.version>Finchley.RELEASE</spring-cloud.version>
23 </properties>
24
25 <dependencies>
26 <dependency>
27 <groupId>org.springframework.boot</groupId>
28 <artifactId>spring-boot-starter-web</artifactId>
29 </dependency>
30 <dependency>
31 <groupId>org.springframework.boot</groupId>
32 <artifactId>spring-boot-starter-actuator</artifactId>
33 </dependency>
34 <dependency>
35 <groupId>org.springframework.kafka</groupId>
36 <artifactId>spring-kafka</artifactId>
37 </dependency>
38 <dependency>
39 <groupId>org.springframework.boot</groupId>
40 <artifactId>spring-boot-starter-test</artifactId>
41 <scope>test</scope>
42 </dependency>
43
44 <!-- 添加 gson 依赖 -->
45 <dependency>
46 <groupId>com.google.code.gson</groupId>
47 <artifactId>gson</artifactId>
48 <version>2.8.5</version>
49 </dependency>
50 <!-- 添加 lombok 依赖 -->
51 <dependency>
52 <groupId>org.projectlombok</groupId>
53 <artifactId>lombok</artifactId>
54 <version>1.16.22</version>
55 <scope>provided</scope>
56 </dependency>
57 </dependencies>
58
59 <dependencyManagement>
60 <dependencies>
61 <dependency>
62 <groupId>org.springframework.cloud</groupId>
63 <artifactId>spring-cloud-dependencies</artifactId>
64 <version>${spring-cloud.version}</version>
65 <type>pom</type>
66 <scope>import</scope>
67 </dependency>
68 </dependencies>
69 </dependencyManagement>
70
71 <build>
72 <plugins>
73 <plugin>
74 <groupId>org.springframework.boot</groupId>
75 <artifactId>spring-boot-maven-plugin</artifactId>
76 </plugin>
77 </plugins>
78 </build>
79
80
81 </project>
注释部分为手动添加的 gson、lombok 依赖。
2、创建消息实体类
1 package com.miniooc.kafka.message;
2
3 import lombok.Data;
4
5 import java.io.Serializable;
6 import java.util.Date;
7 import java.util.List;
8
9 @Data
10 public class OrderBasic implements Serializable {
11
12 /**
13 * 订单ID
14 */
15 private String orderId;
16 /**
17 * 订单编号
18 */
19 private String orderNumber;
20 /**
21 * 订单日期
22 */
23 private Date date;
24 /**
25 * 订单信息
26 */
27 private List<String> desc;
28
29 }
3、创建消息生产类
1 /**
2 *
3 */
4 package com.miniooc.kafka.producer;
5
6 import com.google.gson.GsonBuilder;
7 import com.miniooc.kafka.message.OrderBasic;
8 import lombok.extern.java.Log;
9 import org.springframework.beans.factory.annotation.Value;
10 import org.springframework.kafka.core.KafkaTemplate;
11 import org.springframework.stereotype.Component;
12
13 import javax.annotation.Resource;
14
15 /**
16 * Kafka消息生产类
17 */
18 @Log
19 @Component
20 public class KafkaProducer {
21
22 @Resource
23 private KafkaTemplate<String, String> kafkaTemplate;
24
25 @Value("${kafka.topic.order}")
26 private String topicOrder;
27
28 /**
29 * 发送订单消息
30 *
31 * @param orderBasic 订单信息
32 */
33 public void sendOrderMessage(OrderBasic orderBasic) {
34 GsonBuilder builder = new GsonBuilder();
35 builder.setPrettyPrinting();
36 builder.setDateFormat("yyyy-MM-dd HH:mm:ss");
37 String message = builder.create().toJson(orderBasic);
38 kafkaTemplate.send(topicOrder, message);
39 log.info("\n" + message);
40 }
41 }
4、编辑资源配置文件 application.properties
1 server.port=9526
2 spring.application.name=kafka-producer
3 kafka.bootstrap.servers=localhost:9092
4 kafka.topic.order=topic-order
5 kafka.group.id=group-order
5、启动 zookeeper
D:\kafka>bin\windows\zookeeper-server-start.bat config\zookeeper.properties

6、启动 kafka
D:\kafka>bin\windows\kafka-server-start.bat config\server.properties

7、运行工程,通过控制器调用消息生产类,创建一条消息到kafka

看到红框内容,说明消息发送成功。
8、再使用IDEA新建工程引导方式,创建消息消费工程 springboot-kafka-producer。
9、创建消息消费类,并监听topic。
1 package com.miniooc.kafka.consumer;
2
3 import com.google.gson.Gson;
4 import com.google.gson.GsonBuilder;
5 import com.google.gson.reflect.TypeToken;
6 import com.miniooc.kafka.message.OrderBasic;
7 import lombok.extern.java.Log;
8 import org.springframework.kafka.annotation.KafkaListener;
9 import org.springframework.messaging.handler.annotation.Payload;
10 import org.springframework.stereotype.Component;
11
12 @Log
13 @Component
14 public class KafkaConsumer {
15
16 @KafkaListener(topics = "${kafka.topic.order}", containerFactory = "kafkaListenerContainerFactory")
17 public void consume(@Payload String message) {
18 GsonBuilder builder = new GsonBuilder();
19 builder.setPrettyPrinting();
20 builder.setDateFormat("yyyy-MM-dd HH:mm:ss");
21 Gson gson = builder.create();
22 OrderBasic orderBasic = gson.fromJson(message, new TypeToken<OrderBasic>() {
23 }.getType());
24 String json = gson.toJson(orderBasic);
25 log.info("\n接受并消费消息\n" + json);
26 }
27 }
10、运行工程。

看到红框内容,说明消息消费成功。
SpringBoot Kafka 整合完成!