经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » Java相关 » Java » 查看文章
基于Java NIO 写的一个简单版 Netty 服务端
来源:cnblogs  作者:gg12138  时间:2024/4/3 9:21:42  对本文有异议

A Simple Netty Based On JAVA NIO

基于Java NIO 写的一个简单版 Netty 服务端

前置知识

NIO

  • NIO 一般指 同步非阻塞 IO,同样用于**描述程序访问数据方式 **的还有BIO(同步阻塞)、AIO(异步非阻塞)
  • 同步异步指获取结果的方式,同步为主动去获取结果,不管结果是否准备好,异步为等待结果准备好的通知
  • 阻塞非阻塞是线程在结果没有到来之前,是否进行等待,阻塞为进行等待,非阻塞则不进行等待
  • NIO 主动地去获取结果,但是在结果没有准备好之前,不会进行等待。而是通过一个 多路复用器 管理多个通道,由一个线程轮训地去检查是否准备好即可。在网络编程中,多路复用器通常由操作系统提供,Linux中主要有 select、poll、epoll。同步非阻塞指线程不等待数据的传输,而是完成后由多路复用器通知,线程再将数据从内核缓冲区拷贝到用户空间内存进行处理。

Java NIO

  • 基于 NIO 实现的网络框架,可以用少量的线程,处理大量的连接,更适用于高并发场景。于是,Java提供了NIO包提供相关组件,用于实现同步非阻塞IO
    • 核心三个类Channel、Buffer、Selector。Channel代表一个数据传输通道,但不进行数据存取,有Buffer类进行数据管理,Selector为一个复用器,管理多个通道

Bytebuffer

  • 该类为NIO 包中用于操作内存的抽象类,具体实现由HeapByteBuffer、DirectByteBuffer两种
  • HeapByteBuffer为堆内内存,底层通过 byte[ ] 存取数据
  • DirectByteBuffer 为堆外内存,通过JDK提供的 Unsafe类去存取;同时创建对象会关联的一个Cleaner对象,当对象被GC时,通过cleaner对象去释放堆外内存

各核心组件介绍

NioServer

为启动程序类,监听端口,初始化Channel

  • 下面为NIO模式下简单服务端处理代码
  1. // 1、创建服务端Channel,绑定端口并配置非阻塞
  2. ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
  3. serverSocketChannel.socket().bind(new InetSocketAddress(6666));
  4. serverSocketChannel.configureBlocking(false);
  5. // 2、创建多路复用器selector,并将channel注册到多路复用器上
  6. // 不能直接调用channel的accept方法,因为属于非阻塞,直接调用没有新连接会直接返回
  7. Selector selector = Selector.open();
  8. serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
  9. // 3、循环处理多路复用器的IO事件
  10. while(true){
  11. // 3.1、select属于阻塞的方法,这里阻塞等待1秒
  12. // 如果返回0,说明没有事件处理
  13. if (selector.select(1000) == 0){
  14. System.out.println("服务器等待了1秒,无IO事件");
  15. continue;
  16. }
  17. // 3.2、遍历事件进行处理
  18. Set<SelectionKey> selectionKeys = selector.selectedKeys();
  19. Iterator<SelectionKey> iterator = selectionKeys.iterator();
  20. while(iterator.hasNext()){
  21. SelectionKey key = iterator.next();
  22. // accept事件,说明有新的客户端连接
  23. if (key.isAcceptable()){
  24. // 新建一个socketChannel,注册到selector,并关联buffer
  25. SocketChannel socketChannel = serverSocketChannel.accept();
  26. socketChannel.configureBlocking(false);
  27. socketChannel.register(selector,SelectionKey.OP_READ, ByteBuffer.allocate(1024));
  28. System.out.println("客户端连接:"+socketChannel.getRemoteAddress());
  29. }
  30. // read事件 (内核缓冲区的数据准备好了)
  31. if(key.isReadable()){
  32. SocketChannel channel = (SocketChannel)key.channel();
  33. ByteBuffer byteBuffer = (ByteBuffer)key.attachment();
  34. try {
  35. // 将数据写进buffer
  36. int readNum = channel.read(byteBuffer);
  37. if (readNum == -1){
  38. System.out.println("读取-1时,表示IO流已结束");
  39. channel.close();
  40. break;
  41. }
  42. // 打印buffer
  43. byteBuffer.flip();
  44. byte[] bytes = new byte[readNum];
  45. byteBuffer.get(bytes, 0, readNum);
  46. System.out.println("读取到数据:" + new String(bytes));
  47. } catch (IOException e) {
  48. System.out.println("读取发生异常,广播socket");
  49. channel.close();
  50. }
  51. }
  52. // write事件 (操作系统有内存写出了)
  53. if (key.isWritable()){
  54. SocketChannel channel = (SocketChannel)key.channel();
  55. // 读取read时暂存数据
  56. byte[] bytes = (byte[])key.attachment();
  57. if (bytes != null){
  58. System.out.println("可写事件发生,写入数据: " + new String(bytes));
  59. channel.write(ByteBuffer.wrap(bytes));
  60. }
  61. // 清空暂存数据,并切换成关注读事件
  62. key.attach(null);
  63. key.interestOps(SelectionKey.OP_READ);
  64. }
  65. iterator.remove();
  66. }
  67. }

EventLoop

处理 Channel 中数据的读写

  • 在上面的Server中,大量并发时单线程地处理读写事件会导致延迟,因此将读写处理抽取出来,可利用多线程实现高并发
  • 一个EventLoop会关联一个selector,只会处理这个selector上的Channel
  1. public class EventLoop2 implements Runnable{
  2. private final Thread thread;
  3. /**
  4. * 复用器,当前线程只处理这个复用器上的channel
  5. */
  6. public Selector selector;
  7. /**
  8. * 待处理的注册任务
  9. */
  10. private final Queue<Runnable> queue = new LinkedBlockingQueue<>();
  11. /**
  12. * 初始化复用器,线程启动
  13. * @throws IOException
  14. */
  15. public EventLoop2() throws IOException {
  16. this.selector = SelectorProvider.provider().openSelector();
  17. this.thread = new Thread(this);
  18. thread.start();
  19. }
  20. /**
  21. * 将通道注册给当前的线程处理
  22. * @param socketChannel
  23. * @param keyOps
  24. */
  25. public void register(SocketChannel socketChannel,int keyOps){
  26. // 将注册新的socketChannel到当前selector封装成一个任务
  27. queue.add(()->{
  28. try {
  29. MyChannel myChannel = new MyChannel(socketChannel, this);
  30. SelectionKey key = socketChannel.register(selector, keyOps);
  31. key.attach(myChannel);
  32. } catch (Exception e){
  33. e.printStackTrace();
  34. }
  35. });
  36. // 唤醒阻塞等待的selector线程
  37. selector.wakeup();
  38. }
  39. /**
  40. * 循环地处理 注册事件、读写事件
  41. */
  42. @Override
  43. public void run() {
  44. while (!thread.isInterrupted()){
  45. try {
  46. int select = selector.select(1000);
  47. // 处理注册到当前selector的事件
  48. if (select == 0){
  49. Runnable task;
  50. while ((task = queue.poll()) != null){
  51. task.run();
  52. }
  53. continue;
  54. }
  55. // 处理读写事件
  56. System.out.println("服务器收到读写事件,select:" + select);
  57. processReadWrite();
  58. }catch (Exception e){
  59. e.printStackTrace();
  60. }
  61. }
  62. }
  63. /**
  64. * 处理读写事件
  65. * @throws Exception
  66. */
  67. private void processReadWrite() throws Exception{
  68. System.out.println(Thread.currentThread() + "开始监听读写事件");
  69. // 3.2、遍历事件进行处理
  70. Set<SelectionKey> selectionKeys = selector.selectedKeys();
  71. Iterator<SelectionKey> iterator = selectionKeys.iterator();
  72. while(iterator.hasNext()){
  73. SelectionKey key = iterator.next();
  74. MyChannel myChannel = (MyChannel)key.attachment();
  75. if(key.isReadable()){
  76. // 将数据读进buffer
  77. myChannel.doRead(key);
  78. }
  79. if (key.isWritable()){
  80. myChannel.doWrite(key);
  81. }
  82. iterator.remove();
  83. }
  84. }
  85. }

EventloopGroup

一组EventLoop,轮训地为eventLoop分配Channel

  1. public class EventLoopGroup {
  2. private EventLoop2[] children = new EventLoop2[1];
  3. private AtomicInteger idx = new AtomicInteger(0);
  4. public EventLoopGroup() throws IOException {
  5. for (int i = 0; i < children.length; i++){
  6. children[i] = new EventLoop2();
  7. }
  8. }
  9. public EventLoop2 next(){
  10. // 轮训每一个children
  11. return children[idx.getAndIncrement() & (children.length - 1)];
  12. }
  13. public void register(SocketChannel channel,int ops){
  14. next().register(channel,ops);
  15. }
  16. }

Channel

封装了SocketChannel 和 Pipline,将从Channel读写的消息,沿着Pipline上的节点进行处理

  • 在上面EventLoop中,注册Channel到对应的Selector前,会进行封装,将自定义的Channel放在读写事件触发时会返回的SelectionKey里面
  • 同时提供了数据读写处理方法,读写事件触发时调用该方法,数据会沿着pipline上去处理
  1. public class MyChannel {
  2. private SocketChannel channel;
  3. private EventLoop2 eventLoop;
  4. private Queue<ByteBuffer> writeQueue;
  5. private PipLine pipLine;
  6. /**
  7. * 一个channel关联一个eventLoop、一个pipLine、一个socketChannel、一个writeQueue
  8. * @param channel
  9. * @param eventLoop
  10. */
  11. public MyChannel(SocketChannel channel, EventLoop2 eventLoop) {
  12. this.channel = channel;
  13. this.eventLoop = eventLoop;
  14. this.writeQueue = new ArrayDeque<>();
  15. this.pipLine = new PipLine(this,eventLoop);
  16. this.pipLine.addLast(new MyHandler1());
  17. this.pipLine.addLast(new MyHandler2());
  18. }
  19. /**
  20. * 读事件处理
  21. * @param key
  22. * @throws Exception
  23. */
  24. public void doRead(SelectionKey key) throws Exception{
  25. try {
  26. ByteBuffer buffer = ByteBuffer.allocate(1024);
  27. int readNum = channel.read(buffer);
  28. if (readNum == -1){
  29. System.out.println("读取-1时,表示IO流已结束");
  30. channel.close();
  31. return;
  32. }
  33. // 转成可读状态
  34. buffer.flip();
  35. // 消息放入pipLine,交给头节点, 头节点开始传递
  36. pipLine.headContext.fireChannelRead(buffer);
  37. } catch (IOException e) {
  38. System.out.println("读取发生异常,广播socket");
  39. channel.close();
  40. }
  41. }
  42. /**
  43. * 真正地写出数据,关注写事件后,会触发
  44. * @param key
  45. * @throws IOException
  46. */
  47. public void doWrite(SelectionKey key) throws IOException{
  48. ByteBuffer buffer;
  49. while ((buffer =writeQueue.poll()) != null){
  50. channel.write(buffer);
  51. }
  52. // 回复读取状态
  53. key.interestOps(SelectionKey.OP_READ);
  54. }
  55. /**
  56. * 写出到队列
  57. * @param msg
  58. */
  59. public void doWriteQueue(ByteBuffer msg){
  60. writeQueue.add(msg);
  61. }
  62. /**
  63. * 从最后一个节点进行写出,写出到头节点是调用doWriteQueue
  64. * @param msg
  65. */
  66. public void write(Object msg){
  67. this.pipLine.tailContext.write(msg);
  68. }
  69. /**
  70. * 从最后一个节点进行flush,写出到头节点时调用doFlush
  71. */
  72. public void flush(){
  73. this.pipLine.tailContext.flush();
  74. }
  75. /**
  76. * 关注写事件,才能进行真正地写出
  77. */
  78. public void doFlush(){
  79. this.channel.keyFor(eventLoop.selector).interestOps(SelectionKey.OP_WRITE);
  80. }
  81. }

Handler 和 HandlerContext

handler 接口定义了可以扩展处理的消息,由开发人员实现具体的处理

handlerContext 类封装了handler的实现类,将handler的上一个节点和下一个节点,让消息可以延者链表传递

  1. public interface Handler {
  2. /**
  3. * 读取数据处理
  4. * @param ctx
  5. * @param msg
  6. */
  7. void channelRead(HandlerContext ctx,Object msg);
  8. /**
  9. * 写出数据
  10. * @param ctx
  11. * @param msg
  12. */
  13. void write(HandlerContext ctx,Object msg);
  14. /**
  15. * 刷下数据
  16. * @param ctx
  17. */
  18. void flush(HandlerContext ctx);
  19. }
  1. public class HandlerContext {
  2. private Handler handler;
  3. MyChannel channel;
  4. HandlerContext prev;
  5. HandlerContext next;
  6. public HandlerContext(Handler handler, MyChannel channel) {
  7. this.handler = handler;
  8. this.channel = channel;
  9. }
  10. /**
  11. * 读消息的传递,从头节点开始往后传
  12. * @param msg
  13. */
  14. public void fireChannelRead(Object msg){
  15. HandlerContext next = this.next;
  16. if (next != null){
  17. next.handler.channelRead(next,msg);
  18. }
  19. }
  20. /**
  21. * 从尾节点开始往前传
  22. * @param msg
  23. */
  24. public void write(Object msg){
  25. HandlerContext prev = this.prev;
  26. if (prev != null){
  27. prev.handler.write(prev,msg);
  28. }
  29. }
  30. /**
  31. * 从尾节点开始往前传
  32. */
  33. public void flush(){
  34. HandlerContext prev = this.prev;
  35. if (prev != null){
  36. prev.handler.flush(prev);
  37. }
  38. }
  39. }

Pipline

本质是链表,包含了头尾节点的HandlerContext,提供方法给开发人员加节点

  1. public class PipLine {
  2. private MyChannel channel;
  3. private EventLoop2 eventLoop;
  4. public HandlerContext headContext;
  5. public HandlerContext tailContext;
  6. public PipLine(MyChannel channel, EventLoop2 eventLoop) {
  7. this.channel = channel;
  8. this.eventLoop = eventLoop;
  9. PipHandler headHandler = new PipHandler();
  10. this.headContext = new HandlerContext(headHandler,channel);
  11. PipHandler tailHandler = new PipHandler();
  12. this.tailContext = new HandlerContext(tailHandler,channel);
  13. // 构建链表
  14. this.headContext.next = this.tailContext;
  15. this.tailContext.prev = this.headContext;
  16. }
  17. public void addLast(Handler handler){
  18. HandlerContext curr = new HandlerContext(handler, channel);
  19. // 连接在倒数第二个后面
  20. HandlerContext lastButOne = this.tailContext.prev;
  21. lastButOne.next = curr;
  22. curr.prev = lastButOne;
  23. // 连接在最后一个前面
  24. curr.next = tailContext;
  25. tailContext.prev = curr;
  26. }
  27. public static class PipHandler implements Handler{
  28. @Override
  29. public void channelRead(HandlerContext ctx, Object msg) {
  30. System.out.println("接收"+(String) msg +"进行资源释放");
  31. }
  32. @Override
  33. public void write(HandlerContext ctx, Object msg) {
  34. System.out.println("写出"+msg.toString());
  35. }
  36. @Override
  37. public void flush(HandlerContext ctx) {
  38. System.out.println("flush");
  39. }
  40. }
  41. }

原文链接:https://www.cnblogs.com/gg12138/p/18111254

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号