1、使用IDEA新建工程引导方式,创建消息生产工程 springboot-kafka-producer。
工程POM文件代码如下:
1 <?xml version="1.0" encoding="UTF-8"?> 2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 3 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4 <modelVersion>4.0.0</modelVersion> 5 6 <groupId>com.miniooc</groupId> 7 <artifactId>springboot-kafka-producer</artifactId> 8 <version>1.0.0-SNAPSHOT</version> 9 <packaging>jar</packaging>10 11 <name>springboot-kafka-producer</name>12 <description>Demo project for Spring Boot</description>13 14 <parent>15 <groupId>org.springframework.boot</groupId>16 <artifactId>spring-boot-starter-parent</artifactId>17 <version>2.0.3.RELEASE</version>18 <relativePath/>19 </parent>20 21 <properties>22 <spring-cloud.version>Finchley.RELEASE</spring-cloud.version>23 </properties>24 25 <dependencies>26 <dependency>27 <groupId>org.springframework.boot</groupId>28 <artifactId>spring-boot-starter-web</artifactId>29 </dependency>30 <dependency>31 <groupId>org.springframework.boot</groupId>32 <artifactId>spring-boot-starter-actuator</artifactId>33 </dependency>34 <dependency>35 <groupId>org.springframework.kafka</groupId>36 <artifactId>spring-kafka</artifactId>37 </dependency>38 <dependency>39 <groupId>org.springframework.boot</groupId>40 <artifactId>spring-boot-starter-test</artifactId>41 <scope>test</scope>42 </dependency>43 44 <!-- 添加 gson 依赖 -->45 <dependency>46 <groupId>com.google.code.gson</groupId>47 <artifactId>gson</artifactId>48 <version>2.8.5</version>49 </dependency>50 <!-- 添加 lombok 依赖 -->51 <dependency>52 <groupId>org.projectlombok</groupId>53 <artifactId>lombok</artifactId>54 <version>1.16.22</version>55 <scope>provided</scope>56 </dependency>57 </dependencies>58 59 <dependencyManagement>60 <dependencies>61 <dependency>62 <groupId>org.springframework.cloud</groupId>63 <artifactId>spring-cloud-dependencies</artifactId>64 <version>${spring-cloud.version}</version>65 <type>pom</type>66 <scope>import</scope>67 </dependency>68 </dependencies>69 </dependencyManagement>70 71 <build>72 <plugins>73 <plugin>74 <groupId>org.springframework.boot</groupId>75 <artifactId>spring-boot-maven-plugin</artifactId>76 </plugin>77 </plugins>78 </build>79 80 81 </project>
注释部分为手动添加的 gson、lombok 依赖。
2、创建消息实体类
1 package com.miniooc.kafka.message; 2 3 import lombok.Data; 4 5 import java.io.Serializable; 6 import java.util.Date; 7 import java.util.List; 8 9 @Data10 public class OrderBasic implements Serializable {11 12 /**13 * 订单ID14 */15 private String orderId;16 /**17 * 订单编号18 */19 private String orderNumber;20 /**21 * 订单日期22 */23 private Date date;24 /**25 * 订单信息26 */27 private List<String> desc;28 29 }
3、创建消息生产类
1 /** 2 * 3 */ 4 package com.miniooc.kafka.producer; 5 6 import com.google.gson.GsonBuilder; 7 import com.miniooc.kafka.message.OrderBasic; 8 import lombok.extern.java.Log; 9 import org.springframework.beans.factory.annotation.Value;10 import org.springframework.kafka.core.KafkaTemplate;11 import org.springframework.stereotype.Component;12 13 import javax.annotation.Resource;14 15 /**16 * Kafka消息生产类17 */18 @Log19 @Component20 public class KafkaProducer {21 22 @Resource23 private KafkaTemplate<String, String> kafkaTemplate;24 25 @Value("${kafka.topic.order}")26 private String topicOrder;27 28 /**29 * 发送订单消息30 *31 * @param orderBasic 订单信息32 */33 public void sendOrderMessage(OrderBasic orderBasic) {34 GsonBuilder builder = new GsonBuilder();35 builder.setPrettyPrinting();36 builder.setDateFormat("yyyy-MM-dd HH:mm:ss");37 String message = builder.create().toJson(orderBasic);38 kafkaTemplate.send(topicOrder, message);39 log.info("\n" + message);40 }41 }
4、编辑资源配置文件 application.properties
1 server.port=95262 spring.application.name=kafka-producer3 kafka.bootstrap.servers=localhost:90924 kafka.topic.order=topic-order5 kafka.group.id=group-order
5、启动 zookeeper
D:\kafka>bin\windows\zookeeper-server-start.bat config\zookeeper.properties
6、启动 kafka
D:\kafka>bin\windows\kafka-server-start.bat config\server.properties
7、运行工程,通过控制器调用消息生产类,创建一条消息到kafka
看到红框内容,说明消息发送成功。
8、再使用IDEA新建工程引导方式,创建消息消费工程 springboot-kafka-producer。
9、创建消息消费类,并监听topic。
1 package com.miniooc.kafka.consumer; 2 3 import com.google.gson.Gson; 4 import com.google.gson.GsonBuilder; 5 import com.google.gson.reflect.TypeToken; 6 import com.miniooc.kafka.message.OrderBasic; 7 import lombok.extern.java.Log; 8 import org.springframework.kafka.annotation.KafkaListener; 9 import org.springframework.messaging.handler.annotation.Payload;10 import org.springframework.stereotype.Component;11 12 @Log13 @Component14 public class KafkaConsumer {15 16 @KafkaListener(topics = "${kafka.topic.order}", containerFactory = "kafkaListenerContainerFactory")17 public void consume(@Payload String message) {18 GsonBuilder builder = new GsonBuilder();19 builder.setPrettyPrinting();20 builder.setDateFormat("yyyy-MM-dd HH:mm:ss");21 Gson gson = builder.create();22 OrderBasic orderBasic = gson.fromJson(message, new TypeToken<OrderBasic>() {23 }.getType());24 String json = gson.toJson(orderBasic);25 log.info("\n接受并消费消息\n" + json);26 }27 }
10、运行工程。
看到红框内容,说明消息消费成功。
SpringBoot Kafka 整合完成!
本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728