简单记录一下实现的整体框架,具体细节在实际生产中再细化就可以了。
第一步 引入netty依赖
Spring Boot 的其他必要依赖(如 MyBatis、Lombok 等)都是老生常谈了,就不在这里列出了。
<!-- Netty all-in-one artifact; provides the io.netty.* classes used by the IM server below. -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.85.Final</version>
</dependency>
第二步 接下来就是准备工作。
消息服务类(核心代码):聊天服务靠这个类的 start() 方法启动,绑定端口 8087,之后客户端可以通过 WebSocket 协议访问这个端口进行通讯。
- import com.bxt.demo.im.handler.WebSocketHandler;
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.channel.*;
- import io.netty.channel.group.ChannelGroup;
- import io.netty.channel.group.DefaultChannelGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
- import io.netty.handler.codec.http.HttpObjectAggregator;
- import io.netty.handler.codec.http.HttpServerCodec;
- import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
- import io.netty.handler.stream.ChunkedWriteHandler;
- import io.netty.util.concurrent.GlobalEventExecutor;
- import lombok.extern.slf4j.Slf4j;
- import java.util.Map;
- import java.util.concurrent.ConcurrentHashMap;
- /**
- * @Description: 即时通讯服务类
- * @author: bhw
- * @date: 2023年09月27日 13:44
- */
- @Slf4j
- public class IMServer {
// 用来存放连入服务器的用户集合 - public static final Map<String, Channel> USERS = new ConcurrentHashMap<>(1024);
- // 用来存放创建的群聊连接
- public static final ChannelGroup GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
- public static void start() throws InterruptedException {
- log.info("IM服务开始启动");
- EventLoopGroup bossGroup = new NioEventLoopGroup();
- EventLoopGroup workGroup = new NioEventLoopGroup();
- // 绑定端口
- ServerBootstrap bootstrap = new ServerBootstrap();
- bootstrap.group(bossGroup,workGroup)
- .channel(NioServerSocketChannel.class)
- .childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel socketChannel) throws Exception {
- ChannelPipeline pipeline = socketChannel.pipeline();
- // 添加http编码解码器
- pipeline.addLast(new HttpServerCodec())
- //支持大数据流
- .addLast(new ChunkedWriteHandler())
- // 对http消息做聚合操作 FullHttpRequest FullHttpResponse
- .addLast(new HttpObjectAggregator(1024*64))
- //支持websocket
- .addLast(new WebSocketServerProtocolHandler("/"))
- .addLast(new WebSocketHandler());
- }
- });
- ChannelFuture future = bootstrap.bind(8087).sync();
- log.info("服务器启动开始监听端口: {}", 8087);
- future.channel().closeFuture().sync();
- //关闭主线程组
- bossGroup.shutdownGracefully();
- //关闭工作线程组
- workGroup.shutdownGracefully();
- }
- }
创建聊天消息实体类
- /**
- * @Description: 聊天消息对象 可以自行根据实际业务扩展
- * @author: seizedays
- */
- @Data
- public class ChatMessage extends IMCommand {
- //消息类型
- private Integer type;
- //消息目标对象
- private String target;
- //消息内容
- private String content;
- }
命令类型枚举类:暂时定义了建立连接、发送消息和加入群组三种命令码,另有 ERROR 表示无法识别的命令。
/**
 * Command codes a client can send to the IM server.
 */
public enum CommandType {
    /** Establish a connection. */
    CONNECT(10001),
    /** Send a chat message. */
    CHAT(10002),
    /** Join the group chat. */
    JOIN_GROUP(10003),
    /** Fallback for any unrecognised code. */
    ERROR(-1);

    // final: enum constants are shared singletons and must not carry mutable state
    private final Integer code;

    CommandType(Integer code) {
        this.code = code;
    }

    /**
     * @return the numeric code of this command
     */
    public Integer getCode() {
        return code;
    }

    /**
     * Looks up the command for a numeric code.
     *
     * @param code numeric code; may be {@code null}
     * @return the matching constant, or {@link #ERROR} when {@code code} is {@code null} or unknown
     */
    public static CommandType match(Integer code) {
        if (code == null) {
            return ERROR;
        }
        for (CommandType value : CommandType.values()) {
            if (value.code.equals(code)) {
                return value;
            }
        }
        return ERROR;
    }
}
命令动作为聊天的时候,消息类型又划分为私聊和群聊两种,枚举类如下:
/**
 * Kinds of chat message carried by a chat command.
 */
public enum MessageType {
    /** One-to-one message. */
    PRIVATE(1),
    /** Group message. */
    GROUP(2),
    /** Fallback for any unrecognised type. */
    ERROR(-1);

    // final: enum constants are shared singletons and must not carry mutable state
    private final Integer type;

    MessageType(Integer type) {
        this.type = type;
    }

    /**
     * @return the numeric code of this message type
     */
    public Integer getType() {
        return type;
    }

    /**
     * Looks up the message type for a numeric code.
     *
     * @param code numeric type code; may be {@code null}
     * @return the matching constant, or {@link #ERROR} when {@code code} is {@code null} or unknown
     */
    public static MessageType match(Integer code) {
        if (code == null) {
            return ERROR;
        }
        for (MessageType value : MessageType.values()) {
            if (value.type.equals(code)) {
                return value;
            }
        }
        return ERROR;
    }
}
创建连接请求的拦截器
- import com.alibaba.fastjson2.JSON;
- import com.bxt.common.vo.Result;
- import com.bxt.demo.im.cmd.IMCommand;
- import com.bxt.demo.im.server.IMServer;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
- /**
- * @Description: 用户连接到服务端的拦截器
- * @author: bhw
- * @date: 2023年09月27日 14:28
- */
- public class ConnectionHandler {
- public static void execute(ChannelHandlerContext ctx, IMCommand command) {
- if (IMServer.USERS.containsKey(command.getNickName())) {
- ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error(command.getNickName() + "已经在线,不能重复连接"))));
- ctx.channel().disconnect();
- return;
- }
- IMServer.USERS.put(command.getNickName(), ctx.channel());
- ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.success("系统消息:" + command.getNickName() + "与服务端连接成功"))));
- ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.success(JSON.toJSONString(IMServer.USERS.keySet())))));
- }
- }
加入群组功能的拦截器
- /**
- * @Description: 加入群聊拦截器
- * @author: bhw
- * @date: 2023年09月27日 15:07
- */
- public class JoinGroupHandler {
- public static void execute(ChannelHandlerContext ctx) {
- try {
- IMServer.GROUP.add(ctx.channel());
- ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.success("加入系统默认群组成功!"))));
- } catch (Exception e) {
- ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error("消息内容异常"))));
- }
- }
- }
发送聊天到指定对象的功能拦截器
- import com.alibaba.excel.util.StringUtils;
- import com.alibaba.fastjson2.JSON;
- import com.bxt.common.vo.Result;
- import com.bxt.demo.im.cmd.ChatMessage;
- import com.bxt.demo.im.cmd.MessageType;
- import com.bxt.demo.im.server.IMServer;
- import io.netty.channel.Channel;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
- import java.util.Objects;
- /**
- * @Description: 聊天拦截器
- * @author: bhw
- * @date: 2023年09月27日 15:07
- */
- public class ChatHandler {
- public static void execute(ChannelHandlerContext ctx, TextWebSocketFrame frame) {
- try {
- ChatMessage message = JSON.parseObject(frame.text(), ChatMessage.class);
- MessageType msgType = MessageType.match(message.getType());
- if (msgType.equals(MessageType.PRIVATE)) {
- if (StringUtils.isBlank(message.getTarget())){
- ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error("系统消息:消息发送失败,请选择消息发送对象"))));
- return;
- }
- Channel channel = IMServer.USERS.get(message.getTarget());
- if (Objects.isNull(channel) || !channel.isActive()){
- ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error("系统消息:消息发送失败,对方不在线"))));
- IMServer.USERS.remove(message.getTarget());
- return;
- }
- channel.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.success("私聊消息(" + message.getTarget() + "):" + message.getContent()))));
- } else if (msgType.equals(MessageType.GROUP)) {
- IMServer.GROUP.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.success("群消息:发送者(" + message.getNickName() + "):" + message.getContent()))));
- }else {
- ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error("系统消息:不支持的消息类型"))));
- }
- } catch (Exception e) {
- ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error("消息内容异常"))));
- }
- }
- }
最后是 WebSocket 拦截器:接收到客户端的指令后,选择对应的拦截器实现相应的功能:
- import com.alibaba.fastjson2.JSON;
- import com.bxt.common.vo.Result;
- import com.bxt.demo.im.cmd.CommandType;
- import com.bxt.demo.im.cmd.IMCommand;
- import com.bxt.demo.im.server.IMServer;
- import io.netty.channel.Channel;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.SimpleChannelInboundHandler;
- import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
- import lombok.extern.slf4j.Slf4j;
- /**
- * @Description: websocket拦截器
- * @author: bhw
- * @date: 2023年09月27日 13:59
- */
- @Slf4j
- public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame frame) {
- System.out.println(frame.text());
- try {
- IMCommand command = JSON.parseObject(frame.text(), IMCommand.class);
- CommandType cmdType = CommandType.match(command.getCode());
- if (cmdType.equals(CommandType.CONNECT)){
- ConnectionHandler.execute(ctx, command);
- } else if (cmdType.equals(CommandType.CHAT)) {
- ChatHandler.execute(ctx,frame);
- } else if (cmdType.equals(CommandType.JOIN_GROUP)) {
- JoinGroupHandler.execute(ctx);
- } else {
- ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error("不支持的code"))));
- }
- }catch (Exception e){
- ctx.channel().writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(Result.error(e.getMessage()))));
- }
- }
- @Override
- public void channelInactive(ChannelHandlerContext ctx) throws Exception {
- // 当连接断开时被调用
- Channel channel = ctx.channel();
- // 从 USERS Map 中移除对应的 Channel
- removeUser(channel);
- super.channelInactive(ctx);
- }
- private void removeUser(Channel channel) {
- // 遍历 USERS Map,找到并移除对应的 Channel
- IMServer.USERS.entrySet().removeIf(entry -> entry.getValue() == channel);
- }
- }
第三步 启动服务
- @SpringBootApplication
- public class DemoApplication {
- public static void main(String[] args) {
- SpringApplication.run(DemoApplication.class, args);
- // 启动IM服务
- try {
- IMServer.start();
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
- }
现在,客户端通过 WebSocket 协议访问 8087 端口(例如 ws://localhost:8087/)即可实现基本的聊天室功能了!