经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » Java相关 » Spring » 查看文章
基于SpringBoot+Netty实现即时通讯(IM)功能
来源:cnblogs  作者:seizedays  时间:2023/10/20 8:45:58  对本文有异议

简单记录一下实现的整体框架,具体细节在实际生产中再细化就可以了。

第一步 引入netty依赖

SpringBoot的其他必要的依赖像Mybatis、Lombok这些都是老生常谈了 就不在这里放了

  1. <dependency>
  2. <groupId>io.netty</groupId>
  3. <artifactId>netty-all</artifactId>
  4. <version>4.1.85.Final</version>
  5. </dependency>

 

第二步 接下来就是准备工作。

消息服务类(核心代码) 聊天服务的功能就是靠这个类的start()函数来启动的 绑定端口8087 之后可以通socket协议访问这个端口来执行通讯

  1. import com.bxt.demo.im.handler.WebSocketHandler;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.channel.*;
  4. import io.netty.channel.group.ChannelGroup;
  5. import io.netty.channel.group.DefaultChannelGroup;
  6. import io.netty.channel.nio.NioEventLoopGroup;
  7. import io.netty.channel.socket.SocketChannel;
  8. import io.netty.channel.socket.nio.NioServerSocketChannel;
  9. import io.netty.handler.codec.http.HttpObjectAggregator;
  10. import io.netty.handler.codec.http.HttpServerCodec;
  11. import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
  12. import io.netty.handler.stream.ChunkedWriteHandler;
  13. import io.netty.util.concurrent.GlobalEventExecutor;
  14. import lombok.extern.slf4j.Slf4j;
  15. import java.util.Map;
  16. import java.util.concurrent.ConcurrentHashMap;
  17. /**
  18. * @Description: 即时通讯服务类
  19. * @author: bhw
  20. * @date: 2023年09月27日 13:44
  21. */
  22. @Slf4j
  23. public class IMServer {
      // 用来存放连入服务器的用户集合
  24. public static final Map<String, Channel> USERS = new ConcurrentHashMap<>(1024);
  25.   // 用来存放创建的群聊连接
  26. public static final ChannelGroup GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
  27. public static void start() throws InterruptedException {
  28. log.info("IM服务开始启动");
  29. EventLoopGroup bossGroup = new NioEventLoopGroup();
  30. EventLoopGroup workGroup = new NioEventLoopGroup();
  31. // 绑定端口
  32. ServerBootstrap bootstrap = new ServerBootstrap();
  33. bootstrap.group(bossGroup,workGroup)
  34. .channel(NioServerSocketChannel.class)
  35. .childHandler(new ChannelInitializer<SocketChannel>() {
  36. @Override
  37. protected void initChannel(SocketChannel socketChannel) throws Exception {
  38. ChannelPipeline pipeline = socketChannel.pipeline();
  39. // 添加http编码解码器
  40. pipeline.addLast(new HttpServerCodec())
  41. //支持大数据流
  42. .addLast(new ChunkedWriteHandler())
  43. // 对http消息做聚合操作 FullHttpRequest FullHttpResponse
  44. .addLast(new HttpObjectAggregator(1024*64))
  45. //支持websocket
  46. .addLast(new WebSocketServerProtocolHandler("/"))
  47. .addLast(new WebSocketHandler());
  48. }
  49. });
  50. ChannelFuture future = bootstrap.bind(8087).sync();
  51. log.info("服务器启动开始监听端口: {}", 8087);
  52. future.channel().closeFuture().sync();
  53. //关闭主线程组
  54. bossGroup.shutdownGracefully();
  55. //关闭工作线程组
  56. workGroup.shutdownGracefully();
  57. }
  58. }

 

 创建聊天消息实体类

  1. /**
  2. * @Description: 聊天消息对象 可以自行根据实际业务扩展
  3. * @author: seizedays
  4. */
  5. @Data
  6. public class ChatMessage extends IMCommand {
  7. //消息类型
  8. private Integer type;
  9. //消息目标对象
  10. private String target;
  11. //消息内容
  12. private String content;
  13. }

连接类型枚举类,暂时定义为建立连接、发送消息和加入群组三种状态码

  1. @AllArgsConstructor
  2. @Getter
  3. public enum CommandType {
  4. //建立连接
  5. CONNECT(10001),
  6. //发送消息
  7. CHAT(10002),
  8. //加入群聊
  9. JOIN_GROUP(10003),
  10. ERROR(-1)
  11. ;
  12. private Integer code;
  13. public static CommandType match(Integer code){
  14. for (CommandType value : CommandType.values()) {
  15. if (value.code.equals(code)){
  16. return value;
  17. }
  18. }
  19. return ERROR;
  20. }
  21. }

命令动作为聊天的时候 消息类型又划分为私聊和群聊两种 枚举类如下:

  1. @AllArgsConstructor
  2. @Getter
  3. public enum MessageType {
  4. //私聊
  5. PRIVATE(1),
  6. //群聊
  7. GROUP(2),
  8. ERROR(-1)
  9. ;
  10. private Integer type;
  11. public static MessageType match(Integer code){
  12. for (MessageType value : MessageType.values()) {
  13. if (value.type.equals(code)){
  14. return value;
  15. }
  16. }
  17. return ERROR;
  18. }
  19. }

 

创建连接请求的拦截器

  1. import com.alibaba.fastjson2.JSON;
  2. import com.bxt.common.vo.Result;
  3. import com.bxt.demo.im.cmd.IMCommand;
  4. import com.bxt.demo.im.server.IMServer;
  5. import io.netty.channel.ChannelHandlerContext;
  6. import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
  7. /**
  8. * @Description: 用户连接到服务端的拦截器
  9. * @author: bhw
  10. * @date: 2023年09月27日 14:28
  11. */
  12. public class ConnectionHandler {
  13. public static void execute(ChannelHandlerContext ctx, IMCommand command) {
  14. if (IMServer.USERS.containsKey(command.getNickName())) {
  15. ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error(command.getNickName() + "已经在线,不能重复连接"))));
  16. ctx.channel().disconnect();
  17. return;
  18. }
  19. IMServer.USERS.put(command.getNickName(), ctx.channel());
  20. ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.success("系统消息:" + command.getNickName() + "与服务端连接成功"))));
  21. ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.success(JSON.toJSONString(IMServer.USERS.keySet())))));
  22. }
  23. }

加入群组功能的拦截器

  1. /**
  2. * @Description: 加入群聊拦截器
  3. * @author: bhw
  4. * @date: 2023年09月27日 15:07
  5. */
  6. public class JoinGroupHandler {
  7. public static void execute(ChannelHandlerContext ctx) {
  8. try {
  9. IMServer.GROUP.add(ctx.channel());
  10. ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.success("加入系统默认群组成功!"))));
  11. } catch (Exception e) {
  12. ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error("消息内容异常"))));
  13. }
  14. }
  15. }

发送聊天到指定对象的功能拦截器

  1. import com.alibaba.excel.util.StringUtils;
  2. import com.alibaba.fastjson2.JSON;
  3. import com.bxt.common.vo.Result;
  4. import com.bxt.demo.im.cmd.ChatMessage;
  5. import com.bxt.demo.im.cmd.MessageType;
  6. import com.bxt.demo.im.server.IMServer;
  7. import io.netty.channel.Channel;
  8. import io.netty.channel.ChannelHandlerContext;
  9. import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
  10. import java.util.Objects;
  11. /**
  12. * @Description: 聊天拦截器
  13. * @author: bhw
  14. * @date: 2023年09月27日 15:07
  15. */
  16. public class ChatHandler {
  17. public static void execute(ChannelHandlerContext ctx, TextWebSocketFrame frame) {
  18. try {
  19. ChatMessage message = JSON.parseObject(frame.text(), ChatMessage.class);
  20. MessageType msgType = MessageType.match(message.getType());
  21. if (msgType.equals(MessageType.PRIVATE)) {
  22. if (StringUtils.isBlank(message.getTarget())){
  23. ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error("系统消息:消息发送失败,请选择消息发送对象"))));
  24. return;
  25. }
  26. Channel channel = IMServer.USERS.get(message.getTarget());
  27. if (Objects.isNull(channel) || !channel.isActive()){
  28. ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error("系统消息:消息发送失败,对方不在线"))));
  29. IMServer.USERS.remove(message.getTarget());
  30. return;
  31. }
  32. channel.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.success("私聊消息(" + message.getTarget() + "):" + message.getContent()))));
  33. } else if (msgType.equals(MessageType.GROUP)) {
  34. IMServer.GROUP.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.success("群消息:发送者(" + message.getNickName() + "):" + message.getContent()))));
  35. }else {
  36. ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error("系统消息:不支持的消息类型"))));
  37. }
  38. } catch (Exception e) {
  39. ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error("消息内容异常"))));
  40. }
  41. }
  42. }

最后是websocket拦截器 接收到客户端的指令后选择对应的拦截器实现相应的功能:

  1. import com.alibaba.fastjson2.JSON;
  2. import com.bxt.common.vo.Result;
  3. import com.bxt.demo.im.cmd.CommandType;
  4. import com.bxt.demo.im.cmd.IMCommand;
  5. import com.bxt.demo.im.server.IMServer;
  6. import io.netty.channel.Channel;
  7. import io.netty.channel.ChannelHandlerContext;
  8. import io.netty.channel.SimpleChannelInboundHandler;
  9. import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
  10. import lombok.extern.slf4j.Slf4j;
  11. /**
  12. * @Description: websocket拦截器
  13. * @author: bhw
  14. * @date: 2023年09月27日 13:59
  15. */
  16. @Slf4j
  17. public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
  18. @Override
  19. protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame frame) {
  20. System.out.println(frame.text());
  21. try {
  22. IMCommand command = JSON.parseObject(frame.text(), IMCommand.class);
  23. CommandType cmdType = CommandType.match(command.getCode());
  24. if (cmdType.equals(CommandType.CONNECT)){
  25. ConnectionHandler.execute(ctx, command);
  26. } else if (cmdType.equals(CommandType.CHAT)) {
  27. ChatHandler.execute(ctx,frame);
  28. } else if (cmdType.equals(CommandType.JOIN_GROUP)) {
  29. JoinGroupHandler.execute(ctx);
  30. } else {
  31. ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error("不支持的code"))));
  32. }
  33. }catch (Exception e){
  34. ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error(e.getMessage()))));
  35. }
  36. }
  37. @Override
  38. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  39. // 当连接断开时被调用
  40. Channel channel = ctx.channel();
  41. // 从 USERS Map 中移除对应的 Channel
  42. removeUser(channel);
  43. super.channelInactive(ctx);
  44. }
  45. private void removeUser(Channel channel) {
  46. // 遍历 USERS Map,找到并移除对应的 Channel
  47. IMServer.USERS.entrySet().removeIf(entry -> entry.getValue() == channel);
  48. }
  49. }

第三步 启动服务

  1. @SpringBootApplication
  2. public class DemoApplication {
  3. public static void main(String[] args) {
  4. SpringApplication.run(DemoApplication.class, args);
  5. // 启动IM服务
  6. try {
  7. IMServer.start();
  8. } catch (InterruptedException e) {
  9. throw new RuntimeException(e);
  10. }
  11. }
  12. }

现在 客户端通过socket协议访问8087端口即可实现基本的聊天室功能了!

原文链接:https://www.cnblogs.com/seizedays/p/17772433.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号