🌟个人博客:www.hellocode.top
🏰Java知识导航:Java-Navigate
🔥CSDN:HelloCode.
🌞知乎HelloCode
🌴掘金HelloCode
⚡如有问题,欢迎指正,一起学习~~

NIO

non-blocking io(new io) 非阻塞的IO

三大组件

Channel & Buffer

  1. Channel:双向的数据传输通道

    • FileChannel
    • DatagramChannel
    • SocketChannel
    • ServerSocketChannel
  2. Buffer:内存缓冲区

    • ByteBuffer:MappedByteBuffer、DirectByteBuffer、HeapByteBuffer
    • ShortBuffer
    • IntBuffer
    • LongBuffer
    • FloatBuffer
    • DoubleBuffer
    • CharBuffer

Selector

多线程版本

在NIO出现之前,服务器处理多客户端连接时采用多线程版设计,每个客户端即为一个socket,服务器会启动一个线程为socket提供服务,每个线程专管一个socket连接

  • 内存占用过高
  • 线程上下文切换成本高
  • 只适合连接数少的场景

线程池版本

通过线程池来复用线程资源,控制最大线程数

  • 阻塞模式下,线程仅能处理一个socket连接
  • 仅适合短连接场景

Selector版本

配合一个线程来管理多个channel,获取这些channel上发生的事件,这些channel在非阻塞模式下,不会让线程吊死在一个channel上。适合连接数特别多,但流量低的场景。(在线程和连接中添加一个selector层来监控channel上的读写事件)

ByteBuffer

进行文件操作,获取FileChannel,读取文件时,需要有一个暂存缓冲区存放对应内容(ByteBuffer)

ByteBuffer.allocate()获取Bytebuffer

获取channel:

  1. 通过输入输出流获取(getChannel方法)
  2. RandomAccessFile(getChannel方法)

ByteBuffer结构:

类似数组,核心参数:

  • capacity:容量
  • position:读写指针
  • limit:读写限制
在读写模式切换时,其实就是position和limit指针的变换过程,切换后才能读取到正确的数据
  • clean转换写模式时,如果有未读取完毕的数据,会直接覆盖
  • compact意为压缩,转换写模式时,如果有未读取完的数据,会保留,将position移至未读取完的元素后继续写入

常用buffer方法:

  • allocate(int n):为ByteBuffer分配空间(堆内存)

    • allocateDirect:分配空间(直接内存)
    • 堆内存读写效率较低,受到 垃圾回收机制 (GC)的影响
    • 直接内存使用的是系统内存,效率会高一些(少一次数据拷贝),不受 GC 影响,分配内存时效率低,使用不当会造成内存泄漏
  • channel.read(buffer):通过channel向buffer写入内容

    • buffer.put((byte) 127):buffer自己的写入方法
  • buffer.get():buffer自己的获取字节方法

    • channel.write(buffer):从buffer读向channel写
    • get可以让position指针后移,如果想重复读取数据,可以调用rewind方法将position重新置为0,或者调用 get(int i)方法获取索引i的内容,它不会移动指针
    • mark & reset:mark是做一个标记,记录positiopn位置,reset是跳转到mark记录的位置
  • hasRemaining():是否还有剩余未读数据
  • flip():切换为读模式
  • clear()compact():切换为写模式(清空buffer)

字符串与ByteBuffer互相转换:

  1. 字符串转换为ByteBuffer

    • ByteBuffer.allocate():为ByteBuffer分配空间
    • buffer.put("hello".getBytes()):向buffer填入字节数组
  2. 使用Charset

    • StandardCharsets.UTF_8.encode("hello"):获得指定字符串的ByteBuffer
    • StandardCharsets.UTF_8.decode(Bytebuffer buffer).toString():将ByteBuffer转为字符串
  3. wrap

    • ByteBuffer.wrap(byte[] bytes)
第一种方式在写入完毕后,还是写模式,position指针还是指向末尾;而后两种方法在写入后会自动切换为读模式,将position指向0位置

分散读、集中写

Scattering Reads(分散读取),将一个文件分散读取到多个ByteBuffer中

  • 同样是ByteBuffer的read方法
  • 传入一个ByteBuffer数组,每个ByteBuffer分配好空间,读满就会把后续内容读入后面的ByteBuffer

Gathering Writes(集中写入),将多个ByteBuffer写入到一个文件中

  • 使用ByteBuffer的write方法传入一个BufferByte数组
减少数据在ByteBuffer之间的数据拷贝

黏包、半包分析

网络上多条数据发送给服务器时,假设数据使用 \n进行分割

Hello,word\n
I am Zhangsan\n
How are you\n

可能在接收时,被进行了重新组合,如下:

Hello,word\nI am Zhangsan\nHo
w are you\n

这就是出现了黏包和半包问题

public static void split(ByteBuffer source){
    source.flip();
    for(int i = 0; i < source.limit(); i++){
        // 找到一条完整消息
        if(source.get(i) == '\n'){
            // 计算消息长度
            int length = i + 1 - source.position();
            ByteBuffer target = ByteBuffer.allocate(length);
      
            for(int j = 0; j < length; j++){
                target.put(source.get());
            }
      
            // 打印target
        }
    }
    source.compact();
}

文件编程

FileChannel

FileChannel智能工作在阻塞模式下

获取:

  • 通过FileInputStream的getchannel获取,只能读
  • 通过FileOutputStream的getchannel获取,只能写
  • 通过RandomAccessFile 是否能读写根据构造 RandomAccessFile 时的读写模式决定

读取:

  • channel.read:读取到的数据暂存到ByteBuffer中
  • 返回值代表读取到的字节数,-1表示读取到末尾了

写入:

ByteBuffer buffer = ...;
buffer.put(...);    // 存入数据
buffer.flip();    // 切换读模式

while(buffer.hasRemaining()){
    channel.write(buffer);
}
write不能保证一次将buffer中的内容全部写入channel,FileChannel可以,但是SocketChannel不行,推荐上面的规范写法

关闭:

推荐使用try-with-source关闭

位置:

  • 获取当前位置:channel.position()
  • 设置当前位置:channel.position(newPos)

    • 如果设置为文件的末尾,读取时会返回-1
    • 设置为文件的末尾,写入会追加数据
    • 如果超过了文件末尾,再写入时会有空洞(00)

大小:

使用size方法获取文件的大小

强制写入:

操作系统出于性能的考虑,会将数据缓存,不是立刻写入磁盘,可以调用 force(true)方法将文件内容和元数据(文件的权限等信息)立刻写入磁盘

两个Channel传输数据

  • channel.transferTo(起始位置,传输的数据量,目标channel)
  • 一次最多传输2G的数据,如果数据过大,可以多次传输
使用了操作系统的零拷贝进行优化,效率高

Path

JDK7 引入了 Path 和 Paths 类
  • Path 用来表示文件路径
  • Paths 是工具类,用来获取Path实例
Path source = Paths.get("1.txt");        // 相对路径,使用user.dir 环境变量来定位 1.txt
Path source = Paths.get("d:\\1.txt");    // 绝对路径,代表了 d:\1.txt
Path source = Paths.get("d:/1.txt");    // 绝对路径,代表了 d:\1.txt
Path source = Paths.get("d:\\data","projects");        // 代表了  d:\data\projects
  • .代表了当前路径
  • ..代表了上一级路径

Files

  • 检查文件是否存在:Files.exists(Path)
  • 创建一级目录:Files.createDirectory(Path)

    • 如果目录已经存在,抛异常
    • 不能一次创建多级目录,否则会抛异常
  • 创建多级目录:Files.createDircetories(Path)
  • 拷贝文件:Files.copy(Path source,Path target)

    • 如果文件已存在,会抛异常
    • 如果希望source覆盖掉target,需要用 StandardCopyOption 来控制:Files.copy(source, target, StandardCoptyOption.REPLACE_EXISTING)
  • 移动文件:Files.move(source, target, StandardCopyOption.ATOMIC_MOVE)

    • StandardCopyOption.ATOMIC_MOVE 保证文件移动的原子性
  • 删除文件:Files.delete(target),文件不存在,抛异常
  • 删除目录:Files.delete(target),如果目录还有内容,抛异常
  • 遍历目录

    • 使用 Files.walkFileTree(Path 起始目录, SimpleFileVistor<> visitor)

      重写匿名内部类的方法完成遍历

删除操作执行后,被删除的文件不会进入回收站

网络编程

非阻塞 VS 阻塞

阻塞模式

/**
 * @blog: <a href="https://www.hellocode.top">...</a>
 * @Author: HelloCode.
 * @CreateTime: 2023-10-19  15:10
 * @Description: TODO
 */
public class Server {
    public static void main(String[] args) throws IOException {
        // 使用nio,单线程

        // 1. 创建服务器
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ByteBuffer buffer = ByteBuffer.allocate(10);
        // 2. 指定监听端口
        ssc.bind(new InetSocketAddress(8080));

        // 3. 建立客户端连接,SocketChannel与客户端之间通信
        List<SocketChannel> channels = new ArrayList<>();
        while (true){
            System.out.println("Connecting ...");
            SocketChannel sc = ssc.accept();        // 阻塞,直到有客户端连接
            channels.add(sc);

            for(SocketChannel channel : channels){
                System.out.println("Before Read ...");
                channel.read(buffer);       // 阻塞,直到客户端发送了数据
                buffer.flip();
                while(buffer.hasRemaining()){
                    System.out.print((char) buffer.get());
                }
                buffer.clear();
                System.out.println("After Read ...");
            }
        }
    }
}
/**
 * @blog: https://www.hellocode.top
 * @Author: HelloCode.
 * @CreateTime: 2023-10-19  15:27
 * @Description: TODO
 */
public class Client {
    public static void main(String[] args) throws IOException {
        SocketChannel sc = SocketChannel.open();
        sc.connect(new InetSocketAddress("localhost",8080));
        System.out.println("waiting....");
    }
}
阻塞模式下,很多方法都会导致线程阻塞

非阻塞模式

  • 在服务器绑定端口前,通过 serverSocketChannel.configureBlocking(false)可以设置非阻塞模式,在该配置下,accept建立客户端连接时即为非阻塞,线程会继续向下运行,如果没有连接建立,accept方法的返回值为null
  • SocketChannel也可以设置为非阻塞模式,方法名一致,配置非阻塞后当使用 channel.read()时,也为非阻塞模式,如果没有读到数据,read返回0
非阻塞模式通过while-true循环,可以让单线程监控多个连接,但是会导致CPU空转,太过繁忙

Selector方式实现非阻塞

  1. 创建selector,管理多个channel:Selector.open()
  2. 将channel注册到selector中:channel.register(selector, 关注事件, 附件),返回SelectionKey

    1. SelectionKey,事件发生后可以通过它知道事件和哪个channel的事件
    2. 事件类型:

      accept:会在有连接请求时触发

      connect:在客户端连接后触发

      read:可读事件,表示有数据到了

      write:可写事件

    3. 附件一般即为各自channel的附属Buffer
  3. SelectionKey对象调用 interestOps()方法设置感兴趣的事件;ServerSocketChannel一般只需要对accept处理,ServerSocket一般对read和write处理

Selector何时不阻塞:

  1. 事件发生时
  2. 调用 selector.wakeup()
  3. 调用 selector.close()
  4. selector所在线程interrupt
public static void main(String[] args) throws IOException {
    // 创建selector,管理多个channel
    Selector selector = Selector.open();

    ByteBuffer buffer = ByteBuffer.allocate(16);
    ServerSocketChannel ssc = ServerSocketChannel.open();

    ssc.configureBlocking(false);

    // 将ssc注册到selector
    SelectionKey sscKey = ssc.register(selector, 0, null);
    // 声明关注事件(accept)
    sscKey.interestOps(SelectionKey.OP_ACCEPT);

    ssc.bind(new InetSocketAddress(8080));
    List<SocketChannel> channels = new ArrayList<>();

    while(true){
        // 调用selector的select方法,没有实现非阻塞,当有事件发生时,线程才会恢复运行
        selector.select();
        // 处理事件,selectionKeys集合内部包含了所有发生的事件
        Set<SelectionKey> selectionKeys = selector.selectedKeys();
        Iterator<SelectionKey> iterator = selectionKeys.iterator();
        while(iterator.hasNext()){
            SelectionKey key = iterator.next();
            System.out.println(key);
            // 区分事件类型
            if (key.isAcceptable()) {
                // accept事件
                ServerSocketChannel channel =(ServerSocketChannel) key.channel();
                SocketChannel sc = channel.accept();
                sc.configureBlocking(false);
                SelectionKey scKey = sc.register(selector, 0, null);
                scKey.interestOps(SelectionKey.OP_READ);
                System.out.println(sc);
            } else if (key.isReadable()) {
                try {
                    // read事件,读取数据
                    SocketChannel channel = (SocketChannel) key.channel();
                    // 正常断开时,read返回值为-1
                    int read = channel.read(buffer);
                    if(read == -1){
                        // 正常断开时,需要手动取消事件
                        key.cancel();
                    }else{
                        buffer.flip();
                        System.out.println(Charset.defaultCharset().decode(buffer));
                    }
                } catch (IOException e) {
                    // 当客户端关闭时(断开连接),会走read事件
                    // 需要手动取消对应的key
                    e.printStackTrace();
                    key.cancel();
                }
            }
            // 处理完后删除对应的key(重要)
            iterator.remove();
        }
    }
}
  • Select在事件未处理时不会阻塞(使用cancle取消事件也可以),不能置之不理
  • Selector会在发生事件后,向selectedKeys中添加key,但是不会自动删除,在处理完后需要我们手动删除
  • 当客户端断开连接时,会触发read事件,我们需要通过try...catch处理异常,手动取消事件(cancel),从Selector的key集合中真正的删除 key
  • 此外,当客户端通过close方法断开时,不会触发catch中的代码,但是当断开时触发的read事件,read方法返回值为-1

处理消息的边界

当ByteBuffer分配的内存不足以一次读取完客户端发送的数据时,会分多次进行读取,可能会出现半包问题(乱码)

解决方法

  • 一种思想是固定消息长度,数据包大小一样,服务器按预定长度读取,缺点是浪费带宽
  • 另一种思想是按分隔符拆分,缺点是效率低
  • TLV格式,即 Type类型、Length长度、Value 数据,类型和长度已知的情况下,就可以方便获取消息大小,分配合适的buffer,缺点是buffer 需要提前分配,如果内容过大,则影响server吞吐量

    • Http 1.1 是 TLV 格式
    • Http 2.0 是 LTV 格式
public static void split(ByteBuffer source){
    source.flip();
    for(int i = 0; i < source.limit(); i++){
        // 找到一条完整消息
        if(source.get(i) == '\n'){
            // 计算消息长度
            int length = i + 1 - source.position();
            ByteBuffer target = ByteBuffer.allocate(length);

            for(int j = 0; j < length; j++){
                target.put(source.get());
            }
            target.flip();
            // 打印target
            System.out.println(Charset.defaultCharset().decode(target));
        }
    }
    source.compact();
}
public static void main(String[] args) throws IOException {
    // 创建selector,管理多个channel
    Selector selector = Selector.open();


    ServerSocketChannel ssc = ServerSocketChannel.open();

    ssc.configureBlocking(false);

    // 将ssc注册到selector
    SelectionKey sscKey = ssc.register(selector, 0, null);
    // 声明关注事件(accept)
    sscKey.interestOps(SelectionKey.OP_ACCEPT);

    ssc.bind(new InetSocketAddress(8080));
    List<SocketChannel> channels = new ArrayList<>();

    while(true){
        // 调用selector的select方法,没有实现非阻塞,当有事件发生时,线程才会恢复运行
        selector.select();
        // 处理事件,selectionKeys集合内部包含了所有发生的事件
        Set<SelectionKey> selectionKeys = selector.selectedKeys();
        Iterator<SelectionKey> iterator = selectionKeys.iterator();
        while(iterator.hasNext()){
            SelectionKey key = iterator.next();
            System.out.println(key);
            // 区分事件类型
            if (key.isAcceptable()) {
                // accept事件
                ServerSocketChannel channel =(ServerSocketChannel) key.channel();
                SocketChannel sc = channel.accept();
                sc.configureBlocking(false);
                // 将Buffer作为附件关联到SelectionKey 上
                ByteBuffer buffer = ByteBuffer.allocate(5);
                SelectionKey scKey = sc.register(selector, 0, buffer);
                scKey.interestOps(SelectionKey.OP_READ);
                System.out.println(sc);
            } else if (key.isReadable()) {
                try {
                    // read事件,读取数据
                    SocketChannel channel = (SocketChannel) key.channel();

                    // 获取 SelectionKey 附件中的 Buffer
                    ByteBuffer buffer = (ByteBuffer) key.attachment();
                    int read = channel.read(buffer);
                    if(read == -1){
                        key.cancel();
                    }else{
                        split(buffer);
                        if(buffer.position() == buffer.limit()){
                            // buffer满了,需要扩容了
                            ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() << 1);
                            buffer.flip();
                            newBuffer.put(buffer);
                            //替换附件
                            key.attach(newBuffer);
                        }
                    }
                } catch (IOException e) {
                    // 当客户端关闭时(断开连接),会走read事件
                    // 需要手动取消对应的key
                    e.printStackTrace();
                    key.cancel();
                }
            }
            // 处理完后删除对应的key(重要)
            iterator.remove();
        }
    }
}
public class Client {
    public static void main(String[] args) throws IOException {
        SocketChannel sc = SocketChannel.open();
        sc.connect(new InetSocketAddress("localhost",8080));
        sc.write(Charset.defaultCharset().encode("hello\nword\n"));
        System.out.println("waiting....");
    }
}

ByteBuffer大小分配

  • 每个channel 都需要记录可能被切分的消息,因为ByteBuffer 不是线程安全的,因此需要为每个 channel 维护一个独立的 ByteBuffer
  • ByteBuffer 不能太大,比如一个 ByteBuffer 1Mb的话,要支持百万连接就需要1 TB内存,因此需要设计大小可变的 ByteBuffer

    • 一种思路是首先分配一个较小的 buffer,例如4k,如果发现数据不够,再分配 8k 的buffer,将4k buffer 内容拷贝至 8 k buffer,优点是消息连续容易处理,缺点是数据拷贝耗费性能
    • 另一种思想是用多个数组组成buffer,一个数组不够,把多出来的内容写入新的数组,与前面的区别是消息存储不连续解析复杂,优点是避免了拷贝引起的性能损耗

写入内容过多问题

/**
 * @blog: https://www.hellocode.top
 * @Author: HelloCode.
 * @CreateTime: 2023-10-19  18:22
 * @Description: TODO
 */
public class WriterServer {
    public static void main(String[] args) throws IOException {
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        ssc.bind(new InetSocketAddress(8080));

        Selector selector = Selector.open();
        ssc.register(selector, SelectionKey.OP_ACCEPT);

        while(true){
            selector.select();
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                if(key.isAcceptable()){
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);

                    // 向客户端发送大量数据
                    StringBuilder sb = new StringBuilder();
                    for (int i = 0; i < 3000000; i++) {
                        sb.append("a");
                    }
                    ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
                    // 返回值代表实际写入的字节数
                    while (buffer.hasRemaining()){
                        int write = sc.write(buffer);
                        System.out.println(write);
                    }
                }
            }
        }
    }
}
/**
 * @blog: https://www.hellocode.top
 * @Author: HelloCode.
 * @CreateTime: 2023-10-19  18:27
 * @Description: TODO
 */
public class WriteClient {
    public static void main(String[] args) throws IOException {
        SocketChannel sc = SocketChannel.open();
        sc.connect(new InetSocketAddress("localhost",8080));

        // 接收数据
        int count = 0;
        while(true){
            ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
            count += sc.read(buffer);
            System.out.println(count);
            buffer.clear();
        }
    }
}

image-20231019183213855

image-20231019183224859

在发送很多数据时,一次发送不完,就会一直循环发送,反复尝试直到写完,不符合非阻塞模式

配合可写事件改进:

/**
 * @blog: https://www.hellocode.top
 * @Author: HelloCode.
 * @CreateTime: 2023-10-19  18:22
 * @Description: TODO
 */
public class WriterServer {
    public static void main(String[] args) throws IOException {
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        ssc.bind(new InetSocketAddress(8080));

        Selector selector = Selector.open();
        ssc.register(selector, SelectionKey.OP_ACCEPT);
  
        while(true){
            selector.select();
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                if(key.isAcceptable()){
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);
                    SelectionKey scKey = sc.register(selector, 0, null);

                    // 向客户端发送大量数据
                    StringBuilder sb = new StringBuilder();
                    for (int i = 0; i < 3000000; i++) {
                        sb.append("a");
                    }
                    ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
                    // 返回值代表实际写入的字节数
                    int write = sc.write(buffer);
                    System.out.println(write);
                    // 判断是否有剩余内容
                    if(buffer.hasRemaining()){
                        // 关注一个可写事件(可能原来还关注了其他事件)
                        // 通过相加可以不覆盖原来关注的事件
//                        scKey.interestOps(scKey.interestOps() + SelectionKey.OP_WRITE);
                        scKey.interestOps(scKey.interestOps() | SelectionKey.OP_WRITE);
                        // 把未写完的数据挂到sckey上
                        scKey.attach(buffer);
                    }
                }else if(key.isWritable()){
                    ByteBuffer buffer = (ByteBuffer) key.attachment();
                    SocketChannel channel = (SocketChannel) key.channel();
                    int write = channel.write(buffer);
                    System.out.println(write);
                    // 清理操作
                    if(!buffer.hasRemaining()){
                        // 清除Buffer
                        key.attach(null);
                        // 取消可写事件
                        key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
                    }
                }
            }
        }
    }
}

多线程优化

现在都是多核CPU,设计时要充分考虑别让cpu 的力量被白白浪费

分两组选择器

  • 单线程配一个选择器,专门处理accept事件
  • 创建 cpu 核心数的线程,每个线程配一个选择器,轮流处理 read 事件
/**
 * @blog: <a href="https://www.hellocode.top">HelloCode.</a>
 * @Author: HelloCode.
 * @CreateTime: 2023-10-20  14:36
 * @Description: TODO
 */
public class MultiThreadServer {
    public static void main(String[] args) throws IOException {
        Thread.currentThread().setName("Boss");
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.bind(new InetSocketAddress(8080));
        ssc.configureBlocking(false);

        Selector boss = Selector.open();
        SelectionKey bossKey = ssc.register(boss, SelectionKey.OP_ACCEPT, null);
        // 创建固定数量的worker
        // Runtime.getRuntime().availableProcessors() 获取cpu核心数
        Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors() + 1];
        for (int i = 0; i < 2; i++) {
            workers[i] = new Worker("worker-" + i);
        }

        AtomicInteger index = new AtomicInteger();
        while(true){
            boss.select();
            Iterator<SelectionKey> iter = boss.selectedKeys().iterator();
            while(iter.hasNext()){
                SelectionKey key = iter.next();
                iter.remove();
                if(key.isAcceptable()){
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);
                    // 关联 selector
                    // 轮询算法,负载均衡
                    workers[index.getAndIncrement() % workers.length].register(sc);

                }
            }
        }
    }

    static class Worker implements Runnable{
        private Thread thread;
        private Selector selector;
        private String name;
        private volatile boolean start = false;      // 还未初始化
        private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();

        public Worker(String name){
            this.name = name;
        }

        // 初始化线程和Selector
        public void register(SocketChannel sc) throws IOException {
            if(!start){
                thread = new Thread(this,name);
                selector = Selector.open();
                thread.start();
                start = true;
            }
            queue.add(() -> {
                try {
                    sc.register(selector, SelectionKey.OP_READ, null);
                } catch (ClosedChannelException e) {
                    throw new RuntimeException(e);
                }
            });
            selector.wakeup();      // 唤醒select方法

        }

        @Override
        public void run() {
            // 监控读写事件
            while(true){
                try {
                    selector.select();
                    Runnable task = queue.poll();
                    if(task != null){
                        task.run(); // 执行了 sc.register(selector, SelectionKey.OP_READ, null);
                    }
                    Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                    while(iter.hasNext()){
                        SelectionKey key = iter.next();
                        iter.remove();
                        if (key.isReadable()) {
                            ByteBuffer buffer = ByteBuffer.allocate(16);
                            SocketChannel channel = (SocketChannel) key.channel();
                            channel.read(buffer);
                            buffer.flip();
                            System.out.println(Charset.defaultCharset().decode(buffer));
                        }
                    }
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }
}

如何拿到CPU个数

  • Runtime.getRuntime().availableProcessors()
  • 如果工作在docker容器下,因为容器不是物理隔离的,会拿到物理CPU个数,而不是容器申请时的个数
  • 这个问题到jdk 10 才修复,使用jvm 参数 UseContainerSupport 配置,默认开启

概念剖析

NIO vs BIO

stream vs channel

  • stream 不会自动缓冲数据,channel 会利用系统提供的发送缓冲区、接收缓冲区(更为底层)
  • stream 仅支持阻塞 API,channel 同时支持阻塞、非阻塞 API,网络channel 可以配合 selector 实现多路复用
  • 二者均为全双工,即读写可以同时进行

IO 模型

同步阻塞、同步非阻塞、(同步)多路复用、异步非阻塞

  • 同步:线程自己去获取结果(一个线程)
  • 异步:线程自己不去获取结果,而是由其他线程送结果(至少两个线程)

多路复用是同步的

异步都是非阻塞的

当调用一次 channel.readstream.read 后,会切换至操作系统内核态来完成真正数据读取,而读取又分为两个阶段,分别为:

  • 等待数据阶段
  • 复制数据阶段

image-20231020164523568

  • 阻塞 IO
  • 非阻塞 IO
  • 多路复用
  • 信号驱动
  • 异步 IO

零拷贝

传统的IO将一个文件通过socket 写出

File f = new File("helloword/data.txt");
RandomAccessFile file = new RandomAccessFile(file, "r");
byte[] buf = new byte[(int)f.Tength()];
file.read(buf);
Socket socket =...;
socket.getOutputStream().write(buf);

image-20230421160027006

  1. java 本身并不具备 IO 读写能力,因此 read 方法调用后,要从 java 程序的用户态切换至内核态,去调用操作系统(Kernel) 的读能力,将数据读入内核缓冲区。这期间用户线程阻塞,操作系统使用 DMA(Direct Memory Access)来实现文件读,期间也不会使用 cpu

    DMA 也可以理解为硬件单元,用来解放 CPU 完成文件 IO
  2. 从内核态切换回用户态,将数据从内核缓冲区读入用户缓冲区(即 byte[] buf),这期间 cpu 会参与拷贝,无法利用 DMA
  3. 调用 write 方法,这时将数据从用户缓冲区(bytel[] buf)写入 socket 缓冲区,cpu 会参与拷贝
  4. 接下来要向网卡写数据,这项能力 java 又不具备,因此又得从用户态切换至内核态,调用操作系统的写能
    力,使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 cpu

可以看到中间环节较多,java 的 lO 实际不是物理设备级别的读写,而是缓存的复制,底层的真正读写是操作系统
来完成的

  • 用户态与内核态的切换发生了 3 次,这个操作比较重量级
  • 数据拷贝了共 4 次

NIO 优化

通过DirectByteBuffer

  • ByteBuffer.allocate(10) -> HeapByteBuffer:使用的还是Java 内存
  • ByteBuffer.allocateDirect(10) -> DirectByteBuffer:使用的是操作系统内存(操作系统和Java程序共享,都可以访问)

image-20230421182614849

大部分步骤与优化前相同,不再赘述。唯有一点:java 可以使用 DirectByteBuffer 将堆外内存映射到 jvm 内存中来直接访问使用

  • 这块内存不受 jvm 垃圾回收的影响,因此内存地址固定,有助于 IO 读写
  • java 中的 DirectByteBuffer 对象仅维护了此内存的虚引用,内存回收分成两步

    • DirectByteBuffer 对象被垃圾回收,将虚引用加入引用队列
    • 通过专门线程访问引用队列,根据虚引用释放堆外内存
  • 减少了一次数据拷贝,用户态与内核态的切换次数没有减少

进一步优化(底层采用了 linux 2.1 后提供的 sendFile 方法),java 中对应着两个 channel 调用
transferTo/transferFrom 方法拷贝数据

image-20230421182906793

  1. java 调用 transferTo 方法后,要从 java 程序的用户态切换至内核态,使用 DMA 将数据读入内核缓冲区,不会使用 cpu
  2. 数据从内核缓冲区传输到 socket 缓冲区,cpu 会参与拷贝
  3. 最后使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 cpu

可以看到:

  • 只发生了一次用户态与内核态的切换
  • 数据拷贝了 3 次

进一步优化(linux2.4)

image-20230421183206129

  1. java 调用 transferTo 方法后,要从 java 程序的用户态切换至内核态,使用 DMA 将数据读入内核缓冲区,不会使用 cpu
  2. 只会将一些 offset 和 length 信息拷入 socket 缓冲区,几乎无消耗
  3. 使用 DMA 将内核缓冲区的数据写入网卡,不会使用 cpu

整个过程

  • 仅仅只发生了一次用户态与内核态的切换
  • 数据拷贝了 2 次
  • 所谓的【零拷贝】,并不是真正无拷贝,而是在不会拷贝重复数据到 ivm 内存中

零拷贝的优点有

  • 更少的用户态与内核态的切换
  • 不利用 cpu 计算,减少 cpu 缓存伪共享
  • 零拷贝适合小文件传输
上述的 transferTo 和 sendFile 都是零拷贝,并不是一次拷贝都没有,而是不会在Java中再进行数据拷贝

AIO

AIO 用来解决数据复制阶段的阻塞问题

  • 同步意味着,在进行读写操作时,线程需要等待结果,还是相当于闲置
  • 异步意味着,在进行读写操作时,线程不必等待结果,而是将来由操作系统来通过回调方式由另外的线程来获得结果

异步模型需要底层操作系统(Kernel)提供支持

  • Windows 系统通过 IOCP 实现了真正的异步 IO
  • Linux 系统异步 IO 在 2.6 版本引入,但其底层实现还是用多路复用模拟了异步 IO,性能没有优势

文件 AIO

先来看看 AsynchronousFileChannel

@Slf4j
public class AioDemo1 {
    public static void main(String[] args) throws IOException {
        try{
            AsynchronousFileChannel s = 
                AsynchronousFileChannel.open(
                    Paths.get("1.txt"), StandardOpenOption.READ);
            ByteBuffer buffer = ByteBuffer.allocate(2);
            log.debug("begin...");
            s.read(buffer, 0, null, new CompletionHandler<Integer, ByteBuffer>() {
                @Override
                public void completed(Integer result, ByteBuffer attachment) {
                    log.debug("read completed...{}", result);
                    buffer.flip();
                    debug(buffer);
                }

                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {
                    log.debug("read failed...");
                }
            });

        } catch (IOException e) {
            e.printStackTrace();
        }
        log.debug("do other things...");
        System.in.read();
    }
}
13:44:56 [DEBUG] [main] c.i.aio.AioDemo1 - begin...
13:44:56 [DEBUG] [main] c.i.aio.AioDemo1 - do other things...
13:44:56 [DEBUG] [Thread-5] c.i.aio.AioDemo1 - read completed...2
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 0d                                           |a.              |
+--------+-------------------------------------------------+----------------+

可以看到

  • 响应文件读取成功的是另一个线程 Thread-5
  • 主线程并没有 IO 操作阻塞

守护线程

默认文件 AIO 使用的线程都是守护线程,所以最后要执行 System.in.read() 以避免守护线程意外结束

public class AioServer {
    public static void main(String[] args) throws IOException {
        AsynchronousServerSocketChannel ssc = AsynchronousServerSocketChannel.open();
        ssc.bind(new InetSocketAddress(8080));
        ssc.accept(null, new AcceptHandler(ssc));
        System.in.read();
    }

    private static void closeChannel(AsynchronousSocketChannel sc) {
        try {
            System.out.printf("[%s] %s closen", Thread.currentThread().getName(), sc.getRemoteAddress());
            sc.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private static class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
        private final AsynchronousSocketChannel sc;

        public ReadHandler(AsynchronousSocketChannel sc) {
            this.sc = sc;
        }

        @Override
        public void completed(Integer result, ByteBuffer attachment) {
            try {
                if (result == -1) {
                    closeChannel(sc);
                    return;
                }
                System.out.printf("[%s] %s readn", Thread.currentThread().getName(), sc.getRemoteAddress());
                attachment.flip();
                System.out.println(Charset.defaultCharset().decode(attachment));
                attachment.clear();
                // 处理完第一个 read 时,需要再次调用 read 方法来处理下一个 read 事件
                sc.read(attachment, attachment, this);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
            closeChannel(sc);
            exc.printStackTrace();
        }
    }

    private static class WriteHandler implements CompletionHandler<Integer, ByteBuffer> {
        private final AsynchronousSocketChannel sc;

        private WriteHandler(AsynchronousSocketChannel sc) {
            this.sc = sc;
        }

        @Override
        public void completed(Integer result, ByteBuffer attachment) {
            // 如果作为附件的 buffer 还有内容,需要再次 write 写出剩余内容
            if (attachment.hasRemaining()) {
                sc.write(attachment);
            }
        }

        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
            exc.printStackTrace();
            closeChannel(sc);
        }
    }

    private static class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object> {
        private final AsynchronousServerSocketChannel ssc;

        public AcceptHandler(AsynchronousServerSocketChannel ssc) {
            this.ssc = ssc;
        }

        @Override
        public void completed(AsynchronousSocketChannel sc, Object attachment) {
            try {
                System.out.printf("[%s] %s connectedn", Thread.currentThread().getName(), sc.getRemoteAddress());
            } catch (IOException e) {
                e.printStackTrace();
            }
            ByteBuffer buffer = ByteBuffer.allocate(16);
            // 读事件由 ReadHandler 处理
            sc.read(buffer, buffer, new ReadHandler(sc));
            // 写事件由 WriteHandler 处理
            sc.write(Charset.defaultCharset().encode("server hello!"), ByteBuffer.allocate(16), new WriteHandler(sc));
            // 处理完第一个 accpet 时,需要再次调用 accept 方法来处理下一个 accept 事件
            ssc.accept(null, this);
        }

        @Override
        public void failed(Throwable exc, Object attachment) {
            exc.printStackTrace();
        }
    }
}

Nettey入门

Netty 是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端

这里的异步并不是指异步 IO ,Netty的 IO 模型还是基于 IO 多路复用的

Netty 在 Java 网络应用框架中的地位就类似于 Spring 在 JavaEE 开发的地位

Netty vs NIO

  • NIO 工作量大,bug 多
  • 需要自己构建协议
  • 解决 TCP 传输问题,如粘包、半包
  • epoll 空轮询导致 CPU 100%
  • Netty 对 API 进行增强,使之更易使用

Hello World

  1. 加入依赖
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.39.Final</version>
</dependency>
  1. 服务器代码
/**
 * @blog: <a href="https://www.hellocode.top">HelloCode.</a>
 * @Author: HelloCode.
 * @CreateTime: 2023-10-22  11:03
 * @Description: TODO
 */
public class HelloServer {
    public static void main(String[] args) {
        // 启动器,负责组装netty 组件
        new ServerBootstrap()
                // 添加EventLoop组:NioEventLoopGroup(selector、thread)
                .group(new NioEventLoopGroup())
                // 选择一个服务器的ServerSocketChannel的实现
                .channel(NioServerSocketChannel.class)
                // boss负责处理连接,worker负责处理读写
                // childHandler 就是负责指明 类似于 worker(child) 需要负责处理的事情(具体逻辑)
                .childHandler(
                        // channel 代表和客户端进行数据读写的通道
                        // Initializer 初始化,负责添加其他 Handler
                        new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        // 添加具体的 Handler
                        nioSocketChannel.pipeline().addLast(new StringDecoder());  // 负责解码的Handler
                        // 自定义的 Handler
                        nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            @Override   // 读事件
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                // 打印上一步转换好的字符串
                                System.out.println(msg);
                            }
                        });
                    }
                })
                // 绑定监听端口
                .bind(8080);
    }
}
  1. 客户端
/**
 * @blog: <a href="https://www.hellocode.top">HelloCode.</a>
 * @Author: HelloCode.
 * @CreateTime: 2023-10-22  11:15
 * @Description: TODO
 */
public class HelloClient {
    public static void main(String[] args) throws InterruptedException {
        // 启动类
        new Bootstrap()
                // 添加组件
                .group(new NioEventLoopGroup())
                // 选择客户端channel 实现
                .channel(NioSocketChannel.class)
                // 添加处理器
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override   // 在连接建立后被调用
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        // 添加字符串编码器
                        nioSocketChannel.pipeline().addLast(new StringEncoder());
                    }
                })
                // 连接到服务器
                .connect(new InetSocketAddress("localhost",8080))
                // 阻塞方法,直到连接建立才向下执行
                .sync()
                // 代表连接对象
                .channel()
                // 向服务器发送数据
                .writeAndFlush("hello world");
    }
}

image-20230516132020193

  • channel可以理解为数据通道
  • msg理解为流动的数据,最开始是ByteBuf,但经过pipeline(流水线)的加工,会变成其他类型的对象,最后输出又变成了ByteBuf
  • handler理解为数据的处理工序

    • 工序有多道,合在一起就是pipeline,pipeline负责发布时间(读、读取完成…)传播给每个handler
    • handler对自己感兴趣的事件进行处理(重写了相应时间的处理方法)
    • handler分Inbound(入站)和Outbound(出站)
  • eventLoop理解为处理数据的工人

    • 工人可以管理多个channel的io操作,并且一旦工人负责了某个channel,就要负责到底
    • 工人既可以执行io操作,也可以进行任务处理,每位工人有任务队列,队列里可以对方多个channel的待处理任务,任务分为普通任务、定时任务
    • 工人按照pipeline顺序,依此按照handler的规划处理数据,可以为每道工序指定不同的工人

组件

EventLoop

EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理 Channel 上源源不断的 io 事件。

img

它的继承关系比较复杂

  • 一条线是继承自 j.u.c.ScheduledExecutorService 因此包含了线程池中所有的方法
  • 另一条线是继承自 netty 自己的 OrderedEventExecutor,

    • 提供了 boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此 EventLoop
    • 提供了 parent 方法来看看自己属于哪个 EventLoopGroup

事件循环组

EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)

  • 继承自 netty 自己的 EventExecutorGroup

    • 实现了 Iterable 接口提供遍历 EventLoop 的能力
    • 另有 next 方法获取集合中下一个 EventLoop

常用方法

普通任务 & 定时任务

/**
 * @blog: <a href="https://www.hellocode.top">HelloCode.</a>
 * @Author: HelloCode.
 * @CreateTime: 2023-10-22  12:11
 * @Description: TODO
 */
public class TestEventLoop {
    public static void main(String[] args) {
        // EventLoopGroup 继承了线程池,拥有线程池的一些方法
        // 可以处理 IO 事件、普通任务、定时任务(空参构造默认线程数是 CPU 核心数 * 2,可以指定)
        // System.out.println(NettyRuntime.availableProcessors());
        EventLoopGroup group = new NioEventLoopGroup();
        // 可以处理 普通任务、定时任务
        // EventLoopGroup group = new DefaultEventLoop();

        // 常用方法
        // 获取下一个事件循环对象
        System.out.println(group.next());

        // 执行普通任务
        group.next().submit(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.println(Thread.currentThread().getName());
            }
        });

        // 执行定时任务
        group.next().scheduleAtFixedRate(() -> {
            System.out.println(Thread.currentThread().getName() + ":ok");
        }, 0, 1, TimeUnit.SECONDS);

        System.out.println(Thread.currentThread().getName());
    }
}

IO 事件

public static void main(String[] args) {
    // IO 事件
    new ServerBootstrap()
        .group(new NioEventLoopGroup())
        .channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                        ByteBuf buf = (ByteBuf) msg;
                        System.out.println(buf.toString(Charset.defaultCharset()));
                    }
                });
            }
        })
        .bind(8080);
}

可以看到两个工人轮流处理channel,但工人与 channel 之间进行了绑定

img

分工细化

netty还是推荐我们将 eventLoop划分的细一些,类似前面的 boss 和 worker

细分1:

  • 在group方法中,可以传递两个EventLoop参数,一个负责accept事件,另一个负责读写事件
new ServerBootstrap()
    .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))

细分2:

  • 当Handler中的事件耗时较长时,可能会影响NIO 线程的工作,此时还需细分
  • 创建一个独立的EventLoopGroup(不需要处理IO 事件,DefaultEventLoopGroup即可)
  • pipeline.addLast()方法中,可以传递三个参数:group、handler 的 name、handler
  • 当有多个handler时,需要交给下一handler处理时,调用 super.channelRead()等方法
public static void main(String[] args) {
    // IO 事件
    EventLoopGroup group = new NioEventLoopGroup();
    new ServerBootstrap()
        .group(new NioEventLoopGroup())
        .channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                nioSocketChannel.pipeline().addLast(group,"handler1", new ChannelInboundHandlerAdapter() {
                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                        ByteBuf buf = (ByteBuf) msg;
                        System.out.println(buf.toString(Charset.defaultCharset()));

                        // 将消息传递给下一个handler
                        ctx.fireChannelRead(msg);
                    }
                }).addLast("handler2",new ChannelInboundHandlerAdapter(){
                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                        ByteBuf buf = (ByteBuf) msg;
                        System.out.println(buf.toString(Charset.defaultCharset()));
                    }
                });
            }
        })
        .bind(8080);
}

handler 执行中如何换人

关键代码:io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead()

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    // 下一个 handler 的事件循环是否与当前的事件循环是同一个线程
    EventExecutor executor = next.executor();    // 返回下一个handler 的 EventLoop
  
    // 是,直接调用
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } 
    // 不是,将要执行的代码作为任务提交给下一个事件循环处理(换人)
    else {
        executor.execute(new Runnable() {
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }

}
如果两个 handler 绑定的是同一个线程,那么就直接调用;否则,就把要调用的代码封装为一个任务对象,由下一个handler 的线程来调用

Channel

主要作用:

  • close()可以用来关闭 channel
  • closeFuture()用来处理channel 的关闭

    • sync 方法作用是同步等待 channel 关闭
    • 而 addListener 方法是异步等待 channel 关闭
  • pipeline()方法添加处理器
  • write()方法将数据写入
  • writeAndFlush()方法将数据写入并刷出

ChannelFuture

方法一:使用sync同步结果

  • 在sync处阻塞等待(直到nio线程连接建立完毕)
public static void main(String[] args) throws InterruptedException {
    // 启动类
    // 带有 Future、Promise 的类型 都是和异步方法配套使用,用来处理结果
    ChannelFuture channelFuture = new Bootstrap()
        // 添加组件
        .group(new NioEventLoopGroup())
        // 选择客户端channel 实现
        .channel(NioSocketChannel.class)
        // 添加处理器
        .handler(new ChannelInitializer<NioSocketChannel>() {
            @Override   // 在连接建立后被调用
            protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                // 添加字符串编码器
                nioSocketChannel.pipeline().addLast(new StringEncoder());
            }
        })
        // 连接到服务器
        // 异步非阻塞   main发起了调用,真正执行 connect 的是nio 线程
        .connect(new InetSocketAddress("localhost", 8080));
    channelFuture.sync();
    // 如果没有sync,直接过去channel对话,连接还没有建立好,就无法发送数据
    Channel channel = channelFuture.channel();
    // 向服务器发送数据
    channel.writeAndFlush("hello");
}

方法二:使用 addListener(回调对象) 方法异步处理结果

public static void main(String[] args) throws InterruptedException {
    // 启动类
    ChannelFuture channelFuture = new Bootstrap()
        // 添加组件
        .group(new NioEventLoopGroup())
        // 选择客户端channel 实现
        .channel(NioSocketChannel.class)
        // 添加处理器
        .handler(new ChannelInitializer<NioSocketChannel>() {
            @Override   // 在连接建立后被调用
            protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                // 添加字符串编码器
                nioSocketChannel.pipeline().addLast(new StringEncoder());
            }
        })
        // 连接到服务器
        .connect(new InetSocketAddress("localhost", 8080));
  
    // 添加回调方法
    channelFuture.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture channelFuture) throws Exception {
            // 当 nio 线程连接建立完成 之后调用该方法
            Channel channel = channelFuture.channel();
            channel.writeAndFlush("hello");
        }
    });
}

关闭问题

  • 调用 channel.close()关闭channel时,也是异步操作,会交给nio 线程来关闭channel
  • 类似的,如果有的操作是需要在关闭之后进行操作,有同步和异步两种方法
  • 需要使用 channel.closeFuture()获取ChannelFuture 对象

同步处理关闭

  • 调用 future.sync()方法,进行同步阻塞
  • 当channel 成功关闭时会解除阻塞

异步处理关闭

  • 调用 future.addListener()方法,传递回调参数
  • 当nio 线程执行完关闭操作后,会调用对应的回调方法
当 channel 关闭时,对应的 Java 客户端程序并没有结束运行,是因为 NioEventLoopGroup 里面还有一些线程,并没有被结束

优雅关闭

  • 在关闭 channel 连接后,调用 NioEventLoopGroup 中的 shutdownGracefully()方法
  • 不是立刻停止,会把手头的工作处理完后停止
服务器和客户端一样,在需要关闭时,不能忽略 NioEventLoopGroup 中资源的释放

为什么要异步

  • 单线程没法异步提高效率,必须配合多线程、多核 cpu 才能发挥异步的优势
  • 异步并没有缩短响应时间,反而有所增加
  • 合理进行任务拆分,也是利用异步的关键
  • 提高的是单位时间内处理请求的个数(吞吐量)

Future & Promise

在异步处理时,经常用到这两个接口

首先要说明 Netty 中的 Future 与 jdk 中的 Future 同名,但是是两个接口,netty 的 Future 继承自 jdk 的 Future,而 Promise 又对 netty Future 进行了扩展。

  • jdk Future:只能同步等待任务结束(或成功、或失败)才能得到结果
  • netty Future:可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束;
  • netty Promise:不仅有 netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器
功能/名称jdk Futurenetty FuturePromise
cancel取消任务--
isCanceled任务是否取消--
isDone任务是否完成,不能区分成功失败--
get获取任务结果,阻塞等待--
getNow-获取任务结果,非阻塞,还未产生结果时返回 null-
await-等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断-
sync-等待任务结束,如果任务失败,抛出异常-
isSuccess-判断任务是否成功-
cause-获取失败信息,非阻塞,如果没有失败,返回null-
addLinstener-添加回调,异步接收结果-
setSuccess--设置成功结果
setFailure--设置失败结果

Handler & Pipeline

ChannelHandler 用来处理 Channel 上的各种事件,分为入站、出站两种。所有 ChannelHandler 被连成一串,就是 Pipeline

  • 入站处理器通常是 ChannelInboundHandlerAdapter 的子类,主要用来读取客户端数据,写回结果
  • 出站处理器通常是 ChannelOutboundHandlerAdapter 的子类,主要对写回结果进行加工

打个比喻,每个 Channel 是一个产品的加工车间,Pipeline 是车间中的流水线,ChannelHandler 就是流水线上的各道工序,而后面要讲的 ByteBuf 是原材料,经过很多工序的加工:先经过一道道入站工序,再经过一道道出站工序最终变成产品

先搞清楚顺序,服务端:

/**
 * @blog: <a href="https://www.hellocode.top">HelloCode.</a>
 * @Author: HelloCode.
 * @CreateTime: 2023-10-23  21:17
 * @Description: TODO
 */
public class TestPipeline {
    public static void main(String[] args) {
        new ServerBootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel socketChannel) throws Exception {
                        // 通过 channel 拿到 pipeline
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        // 向pipeline 添加处理器
                        // netty 会默认添加一个 head <-> tail(双向链表)
                        pipeline.addLast("h1",new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                System.out.println("1");
                                // 将数据传递给下一个 handler
                                // 内部就是 ctx.fireChannelRead(msg);
                                super.channelRead(ctx, msg);
                            }
                        });

                        pipeline.addLast("h2",new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                System.out.println("2");
                                super.channelRead(ctx, msg);
                            }
                        });

                        pipeline.addLast("h3",new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                System.out.println("3");
                                super.channelRead(ctx, msg);
                                // 只有有写出的代码时,才会执行出站 handler
                                socketChannel.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));
                            }
                        });

                        // 出站 handler
                        pipeline.addLast("h4",new ChannelOutboundHandlerAdapter(){
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                System.out.println("4");
                                super.write(ctx, msg, promise);
                            }
                        });

                        pipeline.addLast("h5",new ChannelOutboundHandlerAdapter(){
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                System.out.println("5");
                                super.write(ctx, msg, promise);
                            }
                        });

                        pipeline.addLast("h6",new ChannelOutboundHandlerAdapter(){
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                System.out.println("6");
                                super.write(ctx, msg, promise);
                            }
                        });
                        // head <-> h1 <-> h2 <-> h3 <-> h4 <-> h5 <-> h6 <-> tail
                    }
                })
                .bind(8080);
    }
}
1
2
3
6
5
4

可以看到,ChannelInboundHandlerAdapter 是按照 addLast 的顺序执行的,而ChannelOutboundHandlerAdapter 是按照 addLast 的逆序执行的。ChannelPipeline 的实现是一个 ChannelHandlerContext(包装了 ChannelHandler) 组成的双向链表

img

  • 入站处理器中,ctx.fireChannelRead(msg) 是 调用下一个入站处理器
  • 在出站处理器调用时,如果是使用 channel.write(),则是从tail 往前找 出站处理器;如果使用的 ctx.write(),则是从当前 handler 往前找出站 handler

img

EmbeddedChannel

  • 用来测试的channel
  • 可以绑定多个 handler,在测试时就不需要去启动服务端和客户端了
/**
 * @blog: <a href="https://www.hellocode.top">HelloCode.</a>
 * @Author: HelloCode.
 * @CreateTime: 2023-10-23  21:41
 * @Description: TODO
 */
public class TestEmbeddedChannel {
    public static void main(String[] args) {
        ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter(){
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                System.out.println("1");
                super.channelRead(ctx, msg);
            }
        };
        ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter(){
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                System.out.println("2");
                super.channelRead(ctx, msg);
            }
        };
        ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter(){
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                System.out.println("3");
                super.write(ctx, msg, promise);
            }
        };
        ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter(){
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                System.out.println("4");
                super.write(ctx, msg, promise);
            }
        };

        EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4);
        // 模拟入站
        channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes()));
        // 模拟出站
        channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("world".getBytes()));
    }
}

ByteBuf

  • ByteBufAllocator.DEFAULT.buffer():创建一个默认的ByteBuf(池化基于直接内存的 ByteBuf)
  • 默认容量256,可以传参指定
  • 支持动态扩容

优势

  • 池化-可以重用 ByteBuf 实例,更节约内存,减少内存溢出的可能
  • 读写指针分离,不需要像ByteBuffer 一样切换读写模式
  • 可以自动扩容
  • 支持链式调用,使用更流畅
  • 很多地方体现零拷贝,例如:slice、duplicate、CompositeByteBuf
public class TestByteBuf {
    public static void main(String[] args) {
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
        System.out.println(buffer);
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 300; i++) {
            sb.append("a");
        }
        buffer.writeBytes(sb.toString().getBytes());
        System.out.println(buffer);
    }
}
PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 256)
PooledUnsafeDirectByteBuf(ridx: 0, widx: 300, cap: 512)

直接内存 vs 堆内存

  • 基于堆:ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(10);
  • 基于直接内存:ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(10);
  • 默认使用直接内存
  • 直接内存创建和销毁的代价昂贵,但读写性能高(少一次内存复制),适合配合池化功能一起用
  • 直接内存对 GC 压力小,因为这部分内存不受 JVM 垃圾回收的管理,但也要注意及时主动释放

池化 vs 非池化

池化的最大意义在于可以重用 ByteBuf

  • 没有池化,则每次都得创建新的 ByteBuf 实例,这个操作对直接内存代价昂贵,就算是堆内存,也会增加 GC 压力
  • 有了池化,则可以重用池中 ByteBuf 实例,并且采用了与 jemalloc 类似的内存分配算法提升分配效率
  • 高并发时,池化功能更节约内存,减少内存溢出的可能

池化功能是否开启,可以通过下面的系统环境变量来设置

-Dio.netty.allocator.type={unpooled | pooled}

  • 4.1 之后,非 Android 平台默认启用池化实现,Android 平台启用非池化实现
  • 4.1 之前,池化功能还不成熟,默认是非池化实现

组成

  • 容量
  • 最大容量:默认是整数最大值
  • 读指针
  • 写指针

最开始读写指针都在 0 位置

在 NIO 中读写共用一个指针,需要切换读写模式,Netty 采用双指针,简化操作

常用方法

写入:

方法签名含义备注
writeBoolean(boolean value)写入 boolean 值用一字节 01/00 代表 true/false
writeByte(int value)写入 byte 值
writeShort(int value)写入 short 值
writeInt(int value)写入 int 值Big Endian(大端),即 0x250,写入后 00 00 02 50
writeIntLE(int value)写入 int 值Little Endian(小端),即 0x250,写入后 50 02 00 00
writeLong(long value)写入 long 值
writeChar(int value)写入 char 值
writeFloat(float value)写入 float 值
writeDouble(double value)写入 double 值
writeBytes(ByteBuf src)写入 netty 的 ByteBuf
writeBytes(byte[] src)写入 byte[]
writeBytes(ByteBuffer src)写入 nio 的 ByteBuffer
int writeCharSequence(CharSequence sequence, Charset charset)写入字符串

注意

  • 这些方法的未指明返回值的,其返回值都是 ByteBuf,意味着可以链式调用
  • 网络传输,默认习惯是 Big Endian(大端)

先写入 4 个字节:

ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
buffer.writeBytes(new byte[]{1, 2, 3, 4});
log(buffer);

read index:0 write index:4 capacity:10
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04                                     |....            |
+--------+-------------------------------------------------+----------------+

再写进一个int整数,也就是四个字节

buffer.writeInt(5);
read index:0 write index:8 capacity:10
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05                         |........        |
+--------+-------------------------------------------------+----------------+

还有一类方法是 set 开头的一系列方法,也可以写入数据,但不会改变写指针位置。

buffer.setByte(4,1);

扩容

再写进一个整数时,容量就不够了(初始容量为10),这个时候就会引发扩容

ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
buffer.writeBytes(new byte[]{1, 2, 3, 4});
buffer.writeInt(5);
log(buffer);
buffer.writeInt(6);
log(buffer);

img

具体的扩容规则:

  • 如果写入后数据大小未超过 512,则选择下一个 16 的整数倍,例如写入后大小为 12 ,则扩容后 capacity 是 16
  • 如果写入后数据大小超过 512,则选择下一个 2^n,例如写入后大小为 513,则扩容后 capacity 是 2^10 = 1024(2^9=512 已经不够了)
  • 扩容不能超过 max capacity 会报错

读取:

例如读了 4 次,每次一个字节

ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
buffer.writeBytes(new byte[]{1, 2, 3, 4});
buffer.writeInt(5);
log(buffer);
buffer.writeInt(6);
log(buffer);
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
log(buffer);

读过的内容,就属于废弃部分了,再读只能读那些尚未读取的部分

img

如果需要重复读取 int 整数 5,怎么办?

可以在 read 前先做个标记 mark

buffer.markReaderIndex();
System.out.println(buffer.readInt());
log(buffer);

img

这时要重复读取的话,重置到标记位置 reset

buffer.resetReaderIndex();
log(buffer);
System.out.println(buffer.readInt());
log(buffer);

img

还有种办法是采用 get 开头的一系列方法,这些方法不会改变 read index

内存释放

retain & release

由于堆外内存并不直接控制于JVM,因此只能等到full GC的时候才能垃圾回收

由于 Netty 中有堆外内存的 ByteBuf 实现,堆外内存最好是手动来释放,而不是等 GC 垃圾回收。

  • UnpooledHeapByteBuf 使用的是 JVM 内存,只需等 GC 回收内存即可
  • UnpooledDirectByteBuf 使用的就是直接内存了,需要特殊的方法来回收内存
  • PooledByteBuf 和它的子类使用了池化机制,需要更复杂的规则来回收内存

回收内存的源码实现,请关注下面方法的不同实现

protected abstract void deallocate()

Netty 这里采用了引用计数法来控制回收内存,每个 ByteBuf 都实现了 ReferenceCounted 接口

  • 每个 ByteBuf 对象的初始计数为 1
  • 调用 release 方法计数减 1,如果计数为 0,ByteBuf 内存被回收
  • 调用 retain 方法计数加 1,表示调用者没用完之前,其它 handler 即使调用了 release 也不会造成回收
  • 当计数为 0 时,底层内存会被回收,这时即使 ByteBuf 对象还在,其各个方法均无法正常使用

谁来负责 release 呢?

不是我们想象的(一般情况下)

ByteBuf buf = ...
try {
    ...
} finally {
    buf.release();
}

请思考,因为 pipeline 的存在,一般需要将 ByteBuf 传递给下一个 ChannelHandler,如果在 finally 中 release 了,就失去了传递性(当然,如果在这个 ChannelHandler 内这个 ByteBuf 已完成了它的使命,那么便无须再传递)
基本规则是,谁是最后使用者,谁负责 release,详细分析如下:

  • 起点,对于 NIO 实现来讲,在 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read 方法中首次创建 ByteBuf 放入 pipeline(line 163 pipeline.fireChannelRead(byteBuf))
  • 入站 ByteBuf 处理原则

    • 对原始 ByteBuf 不做处理,调用 ctx.fireChannelRead(msg) 向后传递,这时无须 release
    • 将原始 ByteBuf 转换为其它类型的 Java 对象,这时 ByteBuf 就没用了,必须 release
    • 如果不调用 ctx.fireChannelRead(msg) 向后传递,那么也必须 release
    • 注意各种异常,如果 ByteBuf 没有成功传递到下一个 ChannelHandler,必须 release
    • 假设消息一直向后传,那么 TailContext 会负责释放未处理消息(原始的 ByteBuf)
  • 出站 ByteBuf 处理原则

    • 出站消息最终都会转为 ByteBuf 输出,一直向前传,由 HeadContext flush 后 release
  • 异常处理原则

    • 有时候不清楚 ByteBuf 被引用了多少次,但又必须彻底释放,可以循环调用 release 直到返回 true

slice

【零拷贝】的体现之一,对原始 ByteBuf 进行切片成多个 ByteBuf,切片后的 ByteBuf 并没有发生内存复制,还是使用原始 ByteBuf 的内存,切片后的 ByteBuf 维护独立的 read,write 指针

img

原始 ByteBuf 进行一些初始操作

ByteBuf origin = ByteBufAllocator.DEFAULT.buffer(10);
origin.writeBytes(new byte[]{1, 2, 3, 4});
origin.readByte();
System.out.println(ByteBufUtil.prettyHexDump(origin));
+-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 02 03 04                                        |...             |
+--------+-------------------------------------------------+----------------+

这时调用 slice 进行切片,无参 slice 是从原始 ByteBuf 的 read index 到 write index 之间的内容进行切片,切片后的 max capacity 被固定为这个区间的大小,因此不能追加 write

ByteBuf slice = origin.slice();
System.out.println(ByteBufUtil.prettyHexDump(slice));
// slice.writeByte(5); 如果执行,会报 IndexOutOfBoundsException 异常
+-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 02 03 04                                        |...             |
+--------+-------------------------------------------------+----------------+

如果原始 ByteBuf 再次读操作(又读了一个字节)

origin.readByte();
System.out.println(ByteBufUtil.prettyHexDump(origin));

这时的 slice 不受影响,因为它有独立的读写指针

+-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 02 03 04                                        |...             |
+--------+-------------------------------------------------+----------------+

如果 slice 的内容发生了更改

slice.setByte(2, 5);
System.out.println(ByteBufUtil.prettyHexDump(slice));
+-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 02 03 05                                        |...             |
+--------+-------------------------------------------------+----------------+

这时,原始 ByteBuf 也会受影响,因为底层都是同一块内存

System.out.println(ByteBufUtil.prettyHexDump(origin));
+-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 03 05                                           |..              |
+--------+-------------------------------------------------+----------------+

duplicate

【零拷贝】的体现之一,就好比截取了原始 ByteBuf 所有内容,并且没有 max capacity 的限制,也是与原始 ByteBuf 使用同一块底层内存,只是读写指针是独立的

img

copy

会将底层内存数据进行深拷贝,因此无论读写,都与原始 ByteBuf 无关

CompositeByteBuf

【零拷贝】的体现之一,可以将多个 ByteBuf 合并为一个逻辑上的 ByteBuf,避免拷贝

有两个 ByteBuf 如下

ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(5);
buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});
ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(5);
buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});
System.out.println(ByteBufUtil.prettyHexDump(buf1));
System.out.println(ByteBufUtil.prettyHexDump(buf2));
+-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05                                  |.....           |
+--------+-------------------------------------------------+----------------+
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 06 07 08 09 0a                                  |.....           |
+--------+-------------------------------------------------+----------------+

现在需要一个新的 ByteBuf,内容来自于刚才的 buf1 和 buf2,如何实现?

方法1:

ByteBuf buf3 = ByteBufAllocator.DEFAULT
    .buffer(buf1.readableBytes()+buf2.readableBytes());
buf3.writeBytes(buf1);
buf3.writeBytes(buf2);
System.out.println(ByteBufUtil.prettyHexDump(buf3));
+-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05 06 07 08 09 0a                   |..........      |
+--------+-------------------------------------------------+----------------+
这种方法不太好,因为进行了数据的内存复制操作

方法2:

CompositeByteBuf buf3 = ByteBufAllocator.DEFAULT.compositeBuffer();
// true 表示增加新的 ByteBuf 自动递增 write index, 否则 write index 会始终为 0
buf3.addComponents(true, buf1, buf2);
+-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05 06 07 08 09 0a                   |..........      |
+--------+-------------------------------------------------+----------------+

CompositeByteBuf 是一个组合的 ByteBuf,它内部维护了一个 Component 数组,每个 Component 管理一个 ByteBuf,记录了这个 ByteBuf 相对于整体偏移量等信息,代表着整体中某一段的数据。

  • 优点,对外是一个虚拟视图,组合这些 ByteBuf 不会产生内存复制
  • 缺点,复杂了很多,多次操作会带来性能的损耗

Unpooled

Unpooled 是一个工具类,类如其名,提供了非池化的 ByteBuf 创建、组合、复制等操作

这里仅介绍其跟【零拷贝】相关的 wrappedBuffer 方法,可以用来包装 ByteBuf

ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(5);
buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});
ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(5);
buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});
 
// 当包装 ByteBuf 个数超过一个时, 底层使用了 CompositeByteBuf
ByteBuf buf3 = Unpooled.wrappedBuffer(buf1, buf2);
System.out.println(ByteBufUtil.prettyHexDump(buf3));
+-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05 06 07 08 09 0a                   |..........      |
+--------+-------------------------------------------------+----------------+

也可以用来包装普通字节数组,底层也不会有拷贝操作

ByteBuf buf4 = Unpooled.wrappedBuffer(new byte[]{1, 2, 3}, new byte[]{4, 5, 6});
System.out.println(buf4.getClass());
System.out.println(ByteBufUtil.prettyHexDump(buf4));
class io.netty.buffer.CompositeByteBuf
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05 06                               |......          |
+--------+-------------------------------------------------+----------------+

双向通信

练习

实现一个 echo server

public class TestEchoService {

    public static void main(String[] args) {

        new ServerBootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringDecoder());
                        ch.pipeline().addLast(new StringEncoder());
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                log.debug((String) msg);
                                ch.writeAndFlush(msg);
                                super.channelRead(ctx, msg);
                            }
                        });
                    }
                }).bind(8080);
    }

}

客户端

public class TestEchoClient {

    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();
        Channel channel = new Bootstrap()
                .channel(NioSocketChannel.class)
                .group(group)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringDecoder());
                        ch.pipeline().addLast(new StringEncoder());
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                log.debug((String) msg);
                                super.channelRead(ctx, msg);
                            }
                        });
                    }
                })
                .connect(new InetSocketAddress("localhost", 8080))
                .sync()
                .channel();
        channel.closeFuture().addListener(future -> {
            group.shutdownGracefully();
        });

        new Thread(() -> {
            Scanner scanner = new Scanner(System.in);
            while (true) {
                String line = scanner.nextLine();
                if ("q".equals(line)) {
                    channel.close();
                    break;
                }
                channel.writeAndFlush(line);
            }
        }).start();

    }
}

读和写的误解

我最初在认识上有这样的误区,认为只有在 netty,nio 这样的多路复用 IO 模型时,读写才不会相互阻塞,才可以实现高效的双向通信,但实际上,Java Socket 是全双工的:在任意时刻,线路上存在A 到 B 和 B 到 A 的双向信号传输。即使是阻塞 IO,读和写是可以同时进行的,只要分别采用读线程和写线程即可,读不会阻塞写、写也不会阻塞读

public class TestServer {
    public static void main(String[] args) throws IOException {
        ServerSocket ss = new ServerSocket(8888);
        Socket s = ss.accept();

        new Thread(() -> {
            try {
                BufferedReader reader = new BufferedReader(new InputStreamReader(s.getInputStream()));
                while (true) {
                    System.out.println(reader.readLine());
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();

        new Thread(() -> {
            try {
                BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(s.getOutputStream()));
                // 例如在这个位置加入 thread 级别断点,可以发现即使不写入数据,也不妨碍前面线程读取客户端数据
                for (int i = 0; i < 100; i++) {
                    writer.write(String.valueOf(i));
                    writer.newLine();
                    writer.flush();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

客户端:

public class TestClient {
    public static void main(String[] args) throws IOException {
        Socket s = new Socket("localhost", 8888);

        new Thread(() -> {
            try {
                BufferedReader reader = new BufferedReader(new InputStreamReader(s.getInputStream()));
                while (true) {
                    System.out.println(reader.readLine());
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();

        new Thread(() -> {
            try {
                BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(s.getOutputStream()));
                for (int i = 0; i < 100; i++) {
                    writer.write(String.valueOf(i));
                    writer.newLine();
                    writer.flush();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

Nettey进阶

粘包与半包

TCP 传输中,客户端发送数据,实际是把数据写入到了 TCP 的缓存中,粘包和半包也就会在此时产生。客户端给服务端发送了两条消息ABC和DEF,服务端这边的接收会有多少种情况呢?

有可能是一次性收到了所有的消息ABCDEF,有可能是收到了三条消息AB、CD、EF。

粘包现象

  • 上面所说的一次性收到了所有的消息ABCDEF,类似于粘包。
  • 如果客户端发送的包的大小比 TCP 的缓存容量小,并且 TCP 缓存可以存放多个包,那么客户端和服务端的一次通信就可能传递了多个包,这时候服务端从 TCP 缓存就可能一下读取了多个包,这种现象就叫粘包。

服务端

/**
 * @blog: <a href="https://www.hellocode.top">HelloCode.</a>
 * @Author: HelloCode.
 * @CreateTime: 2023-10-25  13:58
 * @Description: TODO
 */
public class HelloWorldServer {
    public static void main(String[] args) {
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.channel(NioServerSocketChannel.class)
                    .group(boss,worker)
                    .childHandler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                            nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                    System.out.println(((ByteBuf)msg).toString());
                                }
                            });
                        }
                    });
            ChannelFuture future = serverBootstrap.bind(8080).sync();
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            System.out.println("server error:" + e);
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}

客户端:

/**
 * @blog: <a href="https://www.hellocode.top">HelloCode.</a>
 * @Author: HelloCode.
 * @CreateTime: 2023-10-25  14:03
 * @Description: TODO
 */
public class HelloWorldClient {
    public static void main(String[] args) {
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try{
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class)
                    .group(worker)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                                // 在channel 建立成功后, 触发 avtive 事件
                                @Override
                                public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                    for (int i = 0; i < 10; i++) {
                                        ByteBuf buf = ctx.alloc().buffer(16);
                                        buf.writeBytes(new byte[]{0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15});
                                        ctx.writeAndFlush(buf);
                                    }
                                }
                            });
                        }
                    });
            ChannelFuture future = bootstrap.connect("localhost", 8080);
            future.channel().closeFuture().sync();
        }catch (Exception e){
            System.out.println("client error:" + e);
        }finally {
            worker.shutdownGracefully();
        }
    }
}
PooledUnsafeDirectByteBuf(ridx: 0, widx: 160, cap: 1024)

上面代码一次性发送了160 的数据,按理说应该分10次,每次16,这就是粘包现象

半包现象

  • 上面说的后面那种收到了三条消息AB、CD、EF,类似于半包。
  • 如果客户端发送的包的大小比 TCP 的缓存容量大,那么这个数据包就会被分成多个包,通过 Socket 多次发送到服务端,服务端第一次从接受缓存里面获取的数据,实际是整个包的一部分,这时候就产生了半包(半包不是说只收到了全包的一半,是说收到了全包的一部分)。

在服务端加上一下配置,将接收缓冲区调小一些

serverBootstrap.option(ChannelOption.SO_RCVBUF,10);

对应的打印信息如下:

PooledUnsafeDirectByteBuf(ridx: 0, widx: 36, cap: 1024)
PooledUnsafeDirectByteBuf(ridx: 0, widx: 40, cap: 1024)
PooledUnsafeDirectByteBuf(ridx: 0, widx: 40, cap: 512)
PooledUnsafeDirectByteBuf(ridx: 0, widx: 40, cap: 512)
PooledUnsafeDirectByteBuf(ridx: 0, widx: 4, cap: 496)

第一次出现了粘包(16正常,36粘包)

从第2次开始出现半包现象

在使用 TCP 协议时都会有粘包和半包问题,UDP没有该问题

TCP 滑动窗口

  • TCP 以一个段(segment)为单位,每发送一个段就需要进行一次确认应答(ack)处理,但如果这么做,缺点是包的往返时间越长性能就越差

在这里插入图片描述

为了解决此问题,引入了窗口概念,窗口大小即决定了无需等待应答而可以继续发送的数据最大值

在这里插入图片描述

  • 窗口实际就起到一个缓冲区的作用,同时也能起到流量控制的作用

    • 图中深色的部分即要发送的数据,高亮的部分即窗口
    • 窗口内的数据才允许被发送,当应答未到达前,窗口必须停止滑动
    • 如果 1001~2000 这个段的数据 ack 回来了,窗口就可以向前滑动
    • 接收方也会维护一个窗口,只有落在窗口内的数据才能允许接收

MSS限制

  • 链路层对一次能够发送的最大数据有限制,这个限制称之为 MTU(maximum transmission unit),不同的链路设备的 MTU 值也有所不同,例如

    • 以太网的 MTU 是 1500
    • FDDI(光纤分布式数据接口)的 MTU 是 4352
    • 本地回环地址的 MTU 是 65535 - 本地测试不走网卡
  • MSS 是最大段长度(maximum segment size),它是 MTU 刨去 tcp 头和 ip 头后剩余能够作为数据传输的字节数
  • ipv4 tcp 头占用 20 bytes,ip 头占用 20 bytes,因此以太网 MSS 的值为 1500 - 40 = 1460
  • TCP 在传递大量数据时,会按照 MSS 大小将数据进行分割发送
  • MSS 的值在三次握手时通知对方自己 MSS 的值,然后在两者之间选择一个小值作为 MSS

解决方法

方法一:短链接

将 TCP 连接改成短连接,一个请求一个短连接。这样的话,建立连接到释放连接之间的消息即为传输的信息,消息也就产生了边界。这样的方法就是十分简单,不需要在我们的应用中做过多修改。但缺点也就很明显了,效率低下,TCP连接和断开都会涉及三次握手以及四次握手,每个消息都会涉及这些过程,十分浪费性能。 因此,并不推荐这种方式。

  • 客户端在一次消息发送完毕后断开连接
  • 能解决粘包问题,不能解决半包问题

    • 调整系统的接收缓冲区(滑动窗口):serverBootstrap.option(ChannelOption.SO_RCVBUF, 10)
    • 调整netty 的接收缓冲区(bytebuf):serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(16,16,16))

      默认 1024;三个参数分别代表最小值、初始值、最大值

    • 系统接收缓冲区是全局的,netty 的缓冲区是针对 channel 的
/**
 * @blog: <a href="https://www.hellocode.top">HelloCode.</a>
 * @Author: HelloCode.
 * @CreateTime: 2023-10-25  14:03
 * @Description: TODO
 */
public class HelloWorldClient {
    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            send();
        }
    }
    private static void send(){
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try{
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class)
                    .group(worker)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                                // 在channel 建立成功后, 触发 avtive 事件
                                @Override
                                public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                    ByteBuf buf = ctx.alloc().buffer(16);
                                    buf.writeBytes(new byte[]{0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15});
                                    ctx.writeAndFlush(buf);
                                    ctx.channel().close();
                                }
                            });
                        }
                    });
            ChannelFuture future = bootstrap.connect("localhost", 8080);
            future.channel().closeFuture().sync();
        }catch (Exception e){
            System.out.println("client error:" + e);
        }finally {
            worker.shutdownGracefully();
        }
    }
}
PooledUnsafeDirectByteBuf(ridx: 0, widx: 16, cap: 1024)
PooledUnsafeDirectByteBuf(ridx: 0, widx: 16, cap: 1024)
PooledUnsafeDirectByteBuf(ridx: 0, widx: 16, cap: 1024)
PooledUnsafeDirectByteBuf(ridx: 0, widx: 16, cap: 1024)
PooledUnsafeDirectByteBuf(ridx: 0, widx: 16, cap: 1024)
PooledUnsafeDirectByteBuf(ridx: 0, widx: 16, cap: 1024)
PooledUnsafeDirectByteBuf(ridx: 0, widx: 16, cap: 1024)
PooledUnsafeDirectByteBuf(ridx: 0, widx: 16, cap: 1024)
PooledUnsafeDirectByteBuf(ridx: 0, widx: 16, cap: 1024)
PooledUnsafeDirectByteBuf(ridx: 0, widx: 16, cap: 1024)

方法二:封装成帧

定长解码器

  • 在服务端添加 handler:nioSocketChannel.pipeline().addLast(new FixedLengthFrameDecoder(10));(表示固定长度为10)
  • 固定长度 这种方式下,消息边界也就是固定长度即可。 优点就是实现很简单,缺点就是空间有极大的浪费,如果传递的消息中大部分都比较短,这样就会有很多空间是浪费的。 因此,这种方式一般也是不推介的。

行解码器(分隔符)

  • netty 中提供了解码器 handler,用来实现行解码器

    • LineBasedFrameDecoder:指定最大长度,以换行符为分割(\n或者 \r\n
    • DelimiterBasedFrameDecoder:指定最大长度和分隔符
  • 需要指定最大长度,也就是到了最大长度还没有找到分隔符,就会抛一个异常(避免消息本身格式不对而无限寻找下去)
  • 分隔符 这种方式下,消息边界也就是分隔符本身。优点是空间不再浪费,实现也比较简单。缺点是当内容本身出现分割符时需要转义,所以无论是发送还是接受,都需要进行整个内容的扫描。 因此,这种方式效率也不是很高,但可以尝试使用。
/**
 * @blog: <a href="https://www.hellocode.top">HelloCode.</a>
 * @Author: HelloCode.
 * @CreateTime: 2023-10-25  14:03
 * @Description: TODO
 */
public class Client3 {
    public static void main(String[] args) {
        send();
    }
    private static void send(){
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try{
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class)
                    .group(worker)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                                // 在channel 建立成功后, 触发 avtive 事件
                                @Override
                                public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                    ByteBuf buf = ctx.alloc().buffer();
                                    char c = '0';
                                    Random r = new Random();
                                    for (int i = 0; i < 10; i++) {
                                        StringBuilder sb = makeString(c, r.nextInt(256) + 1);
                                        buf.writeBytes(sb.toString().getBytes());
                                        System.out.println(sb.toString());
                                        c++;
                                    }
                                    ctx.writeAndFlush(buf);
                                    System.out.println("finish ...");
                                }
                            });
                        }
                    });
            ChannelFuture future = bootstrap.connect("localhost", 8080);
            future.channel().closeFuture().sync();
        }catch (Exception e){
            System.out.println("client error:" + e);
        }finally {
            worker.shutdownGracefully();
        }
    }
    private static StringBuilder makeString(char ch, int len){
        StringBuilder sb = new StringBuilder(len + 2);
        for (int i = 0; i < len; i++) {
            sb.append(ch);
        }
        sb.append("\n");
        return sb;
    }
}
/**
 * @blog: <a href="https://www.hellocode.top">HelloCode.</a>
 * @Author: HelloCode.
 * @CreateTime: 2023-10-25  13:58
 * @Description: TODO
 */
public class Server3 {
    public static void main(String[] args) {
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.channel(NioServerSocketChannel.class)
                    .group(boss,worker)
                    .childHandler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                            // 行解码器(最大长度约定为10,分隔符为换行符)
                            nioSocketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
                            nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                    System.out.println(msg);
                                    ByteBuf buf = (ByteBuf) msg;
                                    System.out.println(((ByteBuf) msg).toString(Charset.defaultCharset()));
                                }
                            });
                        }
                    });
//            serverBootstrap.option(ChannelOption.SO_RCVBUF,10);
            serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,new AdaptiveRecvByteBufAllocator(16,16,16));
            ChannelFuture future = serverBootstrap.bind(8080).sync();
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            System.out.println("server error:" + e);
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}

LTC解码器(基于长度字段的帧解码器)

  • netty 提供了 LengthFieldBasedFrameDecoder解码器

    • maxFrameLength:帧的最大长度(超过该长度还没发现分割标准就失败,防止格式不正确)
    • lengthFieldOffset:长度字段偏移量(长度这个字段从哪里开始)
    • lengthFieldLength:长度字段长度
    • lengthAdjustment:长度字段为基准,跳过几个字节到内容(除了长度和内容,可能还有header 这种附加内容)
    • initialBytesToStrip:从头剥离几个字节(如果解析后的内容不需要长度等字段,可以剥离)
  • 专门的 length 字段 这种方式,就有点类似 Http 请求中的 Content-Length,有一个专门的字段存储消息的长度。作为服务端,接受消息时,先解析固定长度的字段(length字段)获取消息总长度,然后读取后续内容。优点是精确定位用户数据,内容也不用转义。缺点是长度理论上有限制,需要提前限制可能的最大长度从而定义长度占用字节数。 因此,十分推介用这种方式。
/**
 * @blog: <a href="https://www.hellocode.top">HelloCode.</a>
 * @Author: HelloCode.
 * @CreateTime: 2023-10-25  16:33
 * @Description: TODO
 */
public class TestLengthFieldDecoder {
    public static void main(String[] args) {
        // 4字节长度  1字节版本号  内容
        EmbeddedChannel embeddedChannel = new EmbeddedChannel(
                // 保留 版本号、内容(剥离了长度)
                new LengthFieldBasedFrameDecoder(
                        1024,
                        0,
                        4,
                        1,
                        4),
                new ChannelInboundHandlerAdapter(){
                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                        System.out.println(((ByteBuf)msg).toString(Charset.defaultCharset()));
                    }
                }
        );

        // 4字节长度  实际内容
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
        send(buffer,"Hello World");
        send(buffer,"Hi");
        embeddedChannel.writeInbound(buffer);
    }

    private static void send(ByteBuf buf,String content) {
        byte[] bytes = content.getBytes();    // 实际内容
        int length = bytes.length;      // 实际内容长度
        buf.writeInt(length);        // int 4字节
        buf.writeByte(1);           // 版本号
        buf.writeBytes(bytes);
    }
}

协议设计与解析

Redis

  • set key value
  • Redis中把命令看成一个数组,首先要求传递一个数组长度
  • 后续发送每个命令或者键值的长度
*3
$3
set
$4
name
$8
zhangsan

Java代码根据Redis 协议规则向Redis 服务器发送请求并接收响应:

/**
 * @blog: <a href="https://www.hellocode.top">HelloCode.</a>
 * @Author: HelloCode.
 * @CreateTime: 2023-10-25  17:27
 * @Description: TODO
 */
public class TestRedis {
    public static void main(String[] args) {
        // 换行的ASCII码:\n \r\n
        final byte[] LINE = {13, 10};
        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap()
                    .channel(NioSocketChannel.class)
                    .group(group)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // 向Redis 发送命令
                            socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                                @Override
                                public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                    ByteBuf buf = ctx.alloc().buffer();
                                    buf.writeBytes("*3".getBytes());
                                    buf.writeBytes(LINE);
                                    buf.writeBytes("$3".getBytes());
                                    buf.writeBytes(LINE);
                                    buf.writeBytes("set".getBytes());
                                    buf.writeBytes(LINE);
                                    buf.writeBytes("$4".getBytes());
                                    buf.writeBytes(LINE);
                                    buf.writeBytes("name".getBytes());
                                    buf.writeBytes(LINE);
                                    buf.writeBytes("$8".getBytes());
                                    buf.writeBytes(LINE);
                                    buf.writeBytes("zhangsan".getBytes());
                                    buf.writeBytes(LINE);
                                    ctx.writeAndFlush(buf);
                                }
                            });
                            // 接收 Redis 服务器的响应
                            socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                    ByteBuf buf = (ByteBuf) msg;
                                    System.out.println(buf.toString(Charset.defaultCharset()));
                                }
                            });
                        }
                    });
            ChannelFuture future = bootstrap.connect("localhost", 6379);
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            group.shutdownGracefully();
        }

    }
}

image-20231025173624904

image-20231025173616824

只要你按照对应的协议去编写内容,就可以和相应协议进行通信;Netty 提供了很多现成的协议,比如 Redis、Http等

HTTP

/**
 * @blog: <a href="https://www.hellocode.top">HelloCode.</a>
 * @Author: HelloCode.
 * @CreateTime: 2023-10-25  17:39
 * @Description: TODO
 */
public class TestHttp {
    public static void main(String[] args) {
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap()
                    .group(boss,worker)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // Netty 提供的 HTTP 协议编解码器(组合了编码和解码器)
                            // 在命名上,一般是Codec 就同时包括编码和解码
                            // 在HttpServerCodec解析后,会分为 HttpRequest和 HttpContent 两部分
                            // 既是入站处理器也是出站处理器
                            socketChannel.pipeline().addLast(new HttpServerCodec());
                            /* 处理方式一:
                            socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                    if(msg instanceof HttpRequest){
                                        // 处理请求行,请求头
                                    }else if(msg instanceof HttpContent){
                                        // 请求体
                                    }
                                }
                            });
                            */

                            // 处理方式二:
                            // SimpleChannelInboundHandler:只会处理感兴趣的消息(泛型)
                            // 该处理器会跳过其他类型的消息
                            socketChannel.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
                                @Override
                                protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpRequest httpRequest) throws Exception {
                                    System.out.println(httpRequest.uri());

                                    // 返回响应
                                    DefaultFullHttpResponse response =
                                            new DefaultFullHttpResponse(
                                                httpRequest.protocolVersion(),      // 协议版本
                                                HttpResponseStatus.OK
                                            );
                                    byte[] bytes = "<h1>Hello, World !</h1>".getBytes();
                                    // 响应头
                                    response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH,bytes.length);
                                    // 响应内容
                                    response.content().writeBytes(bytes);
                              
                                    channelHandlerContext.writeAndFlush(response);
                                }
                            });
                        }
                    });
            ChannelFuture future = bootstrap.bind(8080).sync();
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}

image-20231025180644076

自定义协议

要素:

  • 魔数,用来在第一时间判定是否是无效的数据包
  • 版本号,可以支持协议的升级
  • 序列化算法,消息正文到底采用哪种序列化反序列化方式,可以由此扩展,例如:json、protobuf、hessian、jdk
  • 指令类型,是登录、注册、单聊、群聊......跟业务相关
  • 请求序号,为了双工通信,提供异步能力
  • 正文长度
  • 消息正文

应用场景:聊天室

  • 定义一个编解码器,需要继承 ByteTiMessageCodec类,泛型表示需要将ByteBuf转换为什么类型
  • 重写 encodedecode方法

    • encode:编码,将 自定义消息转换为 ByteBuf
    • decode:解码,将 ByteBuf 转换为 自定义消息

Message(抽象父类,定义消息协议基本信息)

/**
 * @blog: <a href="https://www.hellocode.top">HelloCode.</a>
 * @Author: HelloCode.
 * @CreateTime: 2023-10-29  13:59
 * @Description: TODO
 */
@Data
public abstract class Message implements Serializable {
    public static Class<?> getMessageClass(int messageType){
        return messageClasses.get(messageType);
    }

    private int sequenceId;
    private int messageType;

    public abstract int getMessageType();

    // 登录业务
    public static final int LOGIN_REQUEST_MESSAGE = 0;
    public static final int LOGIN_RESPONSE_MESSAGE = 1;

    // 聊天业务
    public static final int CHAT_REQUEST_MESSAGE = 2;
    public static final int CHAT_RESPONSE_MESSAGE = 3;

    // 群组业务
    public static final int GROUP_CREATE_REQUEST_MESSAGE = 4;
    public static final int GROUP_CREATE_RESPONSE_MESSAGE = 5;
    public static final int GROUP_JOIN_REQUEST_MESSAGE = 6;
    public static final int GROUP_JOIN_RESPONSE_MESSAGE = 7;
    public static final int GROUP_QUIT_REQUEST_MESSAGE = 8;
    public static final int GROUP_QUIT_RESPONSE_MESSAGE = 9;
    public static final int GROUP_CHAT_REQUEST_MESSAGE = 10;
    public static final int GROUP_CHAT_RESPONSE_MESSAGE = 11;
    public static final int GROUP_MEMBERS_REQUEST_MESSAGE = 12;
    public static final int GROUP_MEMBERS_RESPONSE_MESSAGE = 13;

    private static final Map<Integer,Class<?>> messageClasses = new HashMap<>();
}

具体子类(根据业务类型定义)

/**
 * @blog: <a href="https://www.hellocode.top">HelloCode.</a>
 * @Author: HelloCode.
 * @CreateTime: 2023-10-29  14:11
 * @Description: TODO
 */
@Data
@ToString(callSuper = true)
public class LoginRequestMessage extends Message{
    private String username;
    private String password;
    private String nickname;

    public LoginRequestMessage(){
    }

    public LoginRequestMessage(String username, String password, String nickname){
        this.username = username;
        this.password = password;
        this.nickname = nickname;
    }
    @Override
    public int getMessageType() {
        return LOGIN_REQUEST_MESSAGE;
    }
}

自定义Codec,指定具体编解码方法:

/**
 * @blog: <a href="https://www.hellocode.top">HelloCode.</a>
 * @Author: HelloCode.
 * @CreateTime: 2023-10-29  13:59
 * @Description: TODO
 */
public class MessageCodec extends ByteToMessageCodec<Message> {
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf byteBuf) throws Exception {
        // 编码器
        // 将自定义消息包装到形参中的 ByteBuf 即可

        // 1. 魔数(我们定义为4字节的 1234)
        byteBuf.writeBytes(new byte[]{1,2,3,4});

        // 2. 1字节的版本
        byteBuf.writeByte(1);

        // 3. 1字节的序列化算法(为了简单,采用jdk)
        // 为了可扩展:0代表jdk方式、1 代表 json
        byteBuf.writeByte(0);

        // 4. 指令类型(和业务相关),比如:登录消息、注册消息、单聊、群聊等消息(1字节)
        // 通过抽象父类 Message 定义了指令类型,由子类具体声明
        byteBuf.writeByte(message.getMessageType());

        // 5. 4字节请求序号
        byteBuf.writeInt(message.getSequenceId());
        byteBuf.writeByte(0xff);        // 仅仅为了对齐填充(使协议长度为 2 的整数倍)

        // 6. 获取消息内容的字节数组(父类 Message 实现了 Serializable,可以被序列化)
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        // 将message 消息写入对象输出流,对象输出流将对应消息写入 ByteArrayOutputStream
        oos.writeObject(message);
        byte[] bytes = bos.toByteArray();

        // 7. 正文长度
        byteBuf.writeInt(bytes.length);

        // 8. 正文内容
        byteBuf.writeBytes(bytes);

        // 自定义协议:4 + 1 + 1 + 1 + 4 + 正文内容 = 15 + 正文内容
        // 为了内容对其,需要是 2 的整数倍,因此在第 5 步后加 1 字节无意义的内容填充为 16 字节
    }

    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        // 解码器(和编码是逆过程)
        // 和编码规定的字节数 一一对应

        // 1. 魔数
        int magicNum = byteBuf.readInt();

        // 2. 版本
        byte version = byteBuf.readByte();

        // 3. 序列化方式
        byte serializerType = byteBuf.readByte();
        // 4. 指令类型
        byte messageType = byteBuf.readByte();
        // 5. 指令序号
        int sequenceId = byteBuf.readInt();
        // 跳过无意义的填充字节
        byteBuf.readByte();
        // 正文长度
        int length = byteBuf.readInt();
        // 读取正文内容
        byte[] bytes = new byte[length];
        byteBuf.readBytes(bytes, 0, length);

        // 反序列化
        if(serializerType == 0){
            // JDK 序列化方式
            ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
            Message message = (Message) ois.readObject();

            // 将解码对象存到List中(Netty 规定,便于后续 handler 使用)
            list.add(message);

            // 打印测试
            System.out.println(magicNum + ":" + version + ":" + serializerType + ":" + messageType
                    + ":" + sequenceId + ":" + length);
            System.out.println(message);
        }
    }
}

测试:

/**
 * @blog: <a href="https://www.hellocode.top">HelloCode.</a>
 * @Author: HelloCode.
 * @CreateTime: 2023-10-29  14:33
 * @Description: TODO
 */
@Slf4j
public class TestMessageCodec {
    public static void main(String[] args) throws Exception {
        EmbeddedChannel channel = new EmbeddedChannel(
                new LoggingHandler(),
                new MessageCodec());

        // 测试 encode
        LoginRequestMessage message = new LoginRequestMessage("zhangsan","123","张三");
        channel.writeOutbound(message);

        // 测试 decode
        ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
        // 将 message 中的内容通过编码传输到 buf 中(buf 中的数据即为编码后的数据)
        new MessageCodec().encode(null,message,buf);
        // 入站
        channel.writeInbound(buf);
    }
}
15:16:43,944 DEBUG LoggingHandler:147 - [id: 0xembedded, L:embedded - R:embedded] WRITE: 257B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 01 00 00 00 00 00 00 ff 00 00 00 f1 |................|
|00000010| ac ed 00 05 73 72 00 2e 74 6f 70 2e 68 65 6c 6c |....sr..top.hell|
|00000020| 6f 63 6f 64 65 2e 63 68 61 74 2e 6d 65 73 73 61 |ocode.chat.messa|
|00000030| 67 65 2e 4c 6f 67 69 6e 52 65 71 75 65 73 74 4d |ge.LoginRequestM|
|00000040| 65 73 73 61 67 65 34 b8 1c 84 de 68 cc 48 02 00 |essage4....h.H..|
|00000050| 03 4c 00 08 6e 69 63 6b 6e 61 6d 65 74 00 12 4c |.L..nicknamet..L|
|00000060| 6a 61 76 61 2f 6c 61 6e 67 2f 53 74 72 69 6e 67 |java/lang/String|
|00000070| 3b 4c 00 08 70 61 73 73 77 6f 72 64 71 00 7e 00 |;L..passwordq.~.|
|00000080| 01 4c 00 08 75 73 65 72 6e 61 6d 65 71 00 7e 00 |.L..usernameq.~.|
|00000090| 01 78 72 00 22 74 6f 70 2e 68 65 6c 6c 6f 63 6f |.xr."top.helloco|
|000000a0| 64 65 2e 63 68 61 74 2e 6d 65 73 73 61 67 65 2e |de.chat.message.|
|000000b0| 4d 65 73 73 61 67 65 45 13 8f 99 de 7c a1 0a 02 |MessageE....|...|
|000000c0| 00 02 49 00 0b 6d 65 73 73 61 67 65 54 79 70 65 |..I..messageType|
|000000d0| 49 00 0a 73 65 71 75 65 6e 63 65 49 64 78 70 00 |I..sequenceIdxp.|
|000000e0| 00 00 00 00 00 00 00 74 00 06 e5 bc a0 e4 b8 89 |.......t........|
|000000f0| 74 00 03 31 32 33 74 00 08 7a 68 61 6e 67 73 61 |t..123t..zhangsa|
|00000100| 6e                                              |n               |
+--------+-------------------------------------------------+----------------+
15:16:43,945 DEBUG LoggingHandler:147 - [id: 0xembedded, L:embedded - R:embedded] FLUSH
15:16:43,948 DEBUG LoggingHandler:147 - [id: 0xembedded, L:embedded - R:embedded] READ: 257B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 01 00 00 00 00 00 00 ff 00 00 00 f1 |................|
|00000010| ac ed 00 05 73 72 00 2e 74 6f 70 2e 68 65 6c 6c |....sr..top.hell|
|00000020| 6f 63 6f 64 65 2e 63 68 61 74 2e 6d 65 73 73 61 |ocode.chat.messa|
|00000030| 67 65 2e 4c 6f 67 69 6e 52 65 71 75 65 73 74 4d |ge.LoginRequestM|
|00000040| 65 73 73 61 67 65 34 b8 1c 84 de 68 cc 48 02 00 |essage4....h.H..|
|00000050| 03 4c 00 08 6e 69 63 6b 6e 61 6d 65 74 00 12 4c |.L..nicknamet..L|
|00000060| 6a 61 76 61 2f 6c 61 6e 67 2f 53 74 72 69 6e 67 |java/lang/String|
|00000070| 3b 4c 00 08 70 61 73 73 77 6f 72 64 71 00 7e 00 |;L..passwordq.~.|
|00000080| 01 4c 00 08 75 73 65 72 6e 61 6d 65 71 00 7e 00 |.L..usernameq.~.|
|00000090| 01 78 72 00 22 74 6f 70 2e 68 65 6c 6c 6f 63 6f |.xr."top.helloco|
|000000a0| 64 65 2e 63 68 61 74 2e 6d 65 73 73 61 67 65 2e |de.chat.message.|
|000000b0| 4d 65 73 73 61 67 65 45 13 8f 99 de 7c a1 0a 02 |MessageE....|...|
|000000c0| 00 02 49 00 0b 6d 65 73 73 61 67 65 54 79 70 65 |..I..messageType|
|000000d0| 49 00 0a 73 65 71 75 65 6e 63 65 49 64 78 70 00 |I..sequenceIdxp.|
|000000e0| 00 00 00 00 00 00 00 74 00 06 e5 bc a0 e4 b8 89 |.......t........|
|000000f0| 74 00 03 31 32 33 74 00 08 7a 68 61 6e 67 73 61 |t..123t..zhangsa|
|00000100| 6e                                              |n               |
+--------+-------------------------------------------------+----------------+
16909060:1:0:0:0:241
LoginRequestMessage(super=Message(sequenceId=0, messageType=0), username=zhangsan, password=123, nickname=张三)
15:16:43,983 DEBUG LoggingHandler:147 - [id: 0xembedded, L:embedded - R:embedded] READ COMPLETE
此时还有一个问题,可能会出现粘包 或者 半包 的问题

可以配合 LengthFieldBasedFrameDecoder解码器解决问题

改进:

EmbeddedChannel channel = new EmbeddedChannel(
                // 最大长度1024,正文长度偏移量12,4个字节,不需要调整,保留所有内容
                new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0),
                new LoggingHandler(),
                new MessageCodec());

如果出现半包问题,没有加帧解码器,就有可能出现异常,比如读到了正文长度是255,然后去读255个正文内容,但是内容不完整,就有异常

加上帧解码器后,判断如果消息不完整,则不会向后续 handler 传递,会等待消息

Sharable

如果要将Handler 抽取出来,复用,应该怎么做

如果直接抽取一个变量,多次使用

  • 对于会保存内容的,如:帧解码器(半包时会记录内容,等待后续消息),在多线程下,就可能出现数据混乱的问题
  • 对于不会保存内容的,如:LoggingHandler,只打印日志,不记录数据,就不会出现问题

在 Netty 中,通过 @Sharable注解作为标记,能够被多线程使用的类,会使用该注解进行标记说明:

image-20231029153303497

image-20231029153246903

如果我们的自定义 Handler 需要添加 @Sharable注解,标识可共享,步骤如下:

image-20231029154205121

  • 首先,ByteToMessageCodec类不允许子类添加 @Sharable注解,它认为 ByteBuf 到 Message 可能会出现半包、粘包问题,因此需要保存状态信息,即不能被多线程共享
  • 因此,可以使用 MessageToMessageCodec类,它认为 从 Message 到 Message,即已经接收到完整的消息了,不用再保存状态信息,因此可以添加 @Sharable注解,被多线程共享

image-20231029154522610

聊天室案例

包结构:

image-20231029225715148

Message

/**
 * @blog: <a href="https://www.hellocode.top">HelloCode.</a>
 * @Author: HelloCode.
 * @CreateTime: 2023-10-29  13:59
 * @Description: TODO
 */
@Data
public abstract class Message implements Serializable {
    public static Class<?> getMessageClass(int messageType){
        return messageClasses.get(messageType);
    }

    private int sequenceId;
    private int messageType;

    public abstract int getMessageType();

    // 登录业务
    public static final int LOGIN_REQUEST_MESSAGE = 0;
    public static final int LOGIN_RESPONSE_MESSAGE = 1;

    // 聊天业务
    public static final int CHAT_REQUEST_MESSAGE = 2;
    public static final int CHAT_RESPONSE_MESSAGE = 3;

    // 群组业务
    public static final int GROUP_CREATE_REQUEST_MESSAGE = 4;
    public static final int GROUP_CREATE_RESPONSE_MESSAGE = 5;
    public static final int GROUP_JOIN_REQUEST_MESSAGE = 6;
    public static final int GROUP_JOIN_RESPONSE_MESSAGE = 7;
    public static final int GROUP_QUIT_REQUEST_MESSAGE = 8;
    public static final int GROUP_QUIT_RESPONSE_MESSAGE = 9;
    public static final int GROUP_CHAT_REQUEST_MESSAGE = 10;
    public static final int GROUP_CHAT_RESPONSE_MESSAGE = 11;
    public static final int GROUP_MEMBERS_REQUEST_MESSAGE = 12;
    public static final int GROUP_MEMBERS_RESPONSE_MESSAGE = 13;

    // 心跳
    public static final int PING_MESSAGE = 14;
    public static final int PONG_MESSAGE = 15;

    private static final Map<Integer,Class<?>> messageClasses = new HashMap<>();

}
/**
 * @blog: <a href="https://www.hellocode.top">HelloCode.</a>
 * @Author: HelloCode.
 * @CreateTime: 2023-10-29  14:11
 * @Description: TODO
 */
@Data
@ToString(callSuper = true)
public class LoginRequestMessage extends Message{
    private String username;
    private String password;

    public LoginRequestMessage(){
    }

    public LoginRequestMessage(String username, String password){
        this.username = username;
        this.password = password;
    }
    @Override
    public int getMessageType() {
        return LOGIN_REQUEST_MESSAGE;
    }
}
/**
 * @blog: <a href="https://www.hellocode.top">HelloCode.</a>
 * @Author: HelloCode.
 * @CreateTime: 2023-10-29  18:02
 * @Description: TODO
 */
@Data
@ToString(callSuper = true)
public class LoginResponseMessage extends AbstractResponseMessage{
    public LoginResponseMessage(boolean success, String reason){
        super(success, reason);
    }
    @Override
    public int getMessageType() {
        return LOGIN_RESPONSE_MESSAGE;
    }
}
@Data
@ToString(callSuper = true)
public class ChatRequestMessage extends Message {
    private String content;
    private String to;
    private String from;

    public ChatRequestMessage() {
    }

    public ChatRequestMessage(String from, String to, String content) {
        this.from = from;
        this.to = to;
        this.content = content;
    }

    @Override
    public int getMessageType() {
        return CHAT_REQUEST_MESSAGE;
    }
}
@Data
@ToString(callSuper = true)
public class ChatResponseMessage extends AbstractResponseMessage {

    private String from;
    private String content;

    public ChatResponseMessage(boolean success, String reason) {
        super(success, reason);
    }

    public ChatResponseMessage(String from, String content) {
        this.from = from;
        this.content = content;
    }

    @Override
    public int getMessageType() {
        return CHAT_RESPONSE_MESSAGE;
    }
}

......

protocol

/**
 * @blog: <a href="https://www.hellocode.top">HelloCode.</a>
 * @Author: HelloCode.
 * @CreateTime: 2023-10-29  15:43
 * @Description: TODO
 */
@ChannelHandler.Sharable
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Message message, List<Object> list) throws Exception {
        // 编码器
        // 将自定义消息包装到形参中的 ByteBuf 即可

        ByteBuf byteBuf = channelHandlerContext.alloc().buffer();
        // 1. 魔数(我们定义为4字节的 1234)
        byteBuf.writeBytes(new byte[]{1,2,3,4});

        // 2. 1字节的版本
        byteBuf.writeByte(1);

        // 3. 1字节的序列化算法(为了简单,采用jdk)
        // 为了可扩展:0代表jdk方式、1 代表 json
        byteBuf.writeByte(0);

        // 4. 指令类型(和业务相关),比如:登录消息、注册消息、单聊、群聊等消息(1字节)
        // 通过抽象父类 Message 定义了指令类型,由子类具体声明
        byteBuf.writeByte(message.getMessageType());

        // 5. 4字节请求序号
        byteBuf.writeInt(message.getSequenceId());
        byteBuf.writeByte(0xff);        // 仅仅为了对齐填充(使协议长度为 2 的整数倍)

        // 6. 获取消息内容的字节数组(父类 Message 实现了 Serializable,可以被序列化)
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        // 将message 消息写入对象输出流,对象输出流将对应消息写入 ByteArrayOutputStream
        oos.writeObject(message);
        byte[] bytes = bos.toByteArray();

        // 7. 正文长度
        byteBuf.writeInt(bytes.length);

        // 8. 正文内容
        byteBuf.writeBytes(bytes);

        list.add(byteBuf);

        // 自定义协议:4 + 1 + 1 + 1 + 4 + 正文内容 = 15 + 正文内容
        // 为了内容对其,需要是 2 的整数倍,因此在第 5 步后加 1 字节无意义的内容填充为 16 字节
    }

    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        // 解码器(和编码是逆过程)
        // 和编码规定的字节数 一一对应

        // 1. 魔数
        int magicNum = byteBuf.readInt();

        // 2. 版本
        byte version = byteBuf.readByte();

        // 3. 序列化方式
        byte serializerType = byteBuf.readByte();
        // 4. 指令类型
        byte messageType = byteBuf.readByte();
        // 5. 指令序号
        int sequenceId = byteBuf.readInt();
        // 跳过无意义的填充字节
        byteBuf.readByte();
        // 正文长度
        int length = byteBuf.readInt();
        // 读取正文内容
        byte[] bytes = new byte[length];
        byteBuf.readBytes(bytes, 0, length);

        // 反序列化
        if(serializerType == 0){
            // JDK 序列化方式
            ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
            Message message = (Message) ois.readObject();

            // 将解码对象存到List中(Netty 规定,便于后续 handler 使用)
            list.add(message);
        }
    }
}
/**
 * @blog: <a href="https://www.hellocode.top">HelloCode.</a>
 * @Author: HelloCode.
 * @CreateTime: 2023-10-29  17:33
 * @Description: 封装Decoder
 */
public class ProcotolFrameDecoder extends LengthFieldBasedFrameDecoder {
    public ProcotolFrameDecoder(){
        this(1024,12,4,0,0);
    }

    public ProcotolFrameDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) {
        super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
    }
}

Server

handler:

/**
 * @blog: <a href="https://www.hellocode.top">HelloCode.</a>
 * @Author: HelloCode.
 * @CreateTime: 2023-10-29  21:31
 * @Description: TODO
 */
@ChannelHandler.Sharable
public class ChatRequestMessageHandler extends SimpleChannelInboundHandler<ChatRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, ChatRequestMessage chatRequestMessage) throws Exception {
        String to = chatRequestMessage.getTo();
        String from = chatRequestMessage.getFrom();
        // 获取到接收者的 channel
        Channel channel = SessionFactory.getSession().getChannel(to);
        if(channel != null){
            // 对方在线,可以发送消息
            channel.writeAndFlush(new ChatResponseMessage(from, chatRequestMessage.getContent()));
        }else{
            // 对方不在线
            channelHandlerContext.writeAndFlush(new ChatResponseMessage(false,"对方用户不存在或不在线"));
        }
    }
}
/**
 * @blog: <a href="https://www.hellocode.top">HelloCode.</a>
 * @Author: HelloCode.
 * @CreateTime: 2023-10-29  21:42
 * @Description: 群聊处理器
 */
@ChannelHandler.Sharable
public class GroupChatRequestMessageHandler extends SimpleChannelInboundHandler<GroupChatRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, GroupChatRequestMessage groupChatRequestMessage) throws Exception {
        String groupName = groupChatRequestMessage.getGroupName();
        String from = groupChatRequestMessage.getFrom();
        String content = groupChatRequestMessage.getContent();
        List<Channel> channels = GroupSessionFactory.getGroupSession().getMembersChannel(groupName);
        for (Channel channel : channels) {
            channel.writeAndFlush(new GroupChatResponseMessage(from,content));
        }
    }
}
/**
 * @blog: <a href="https://www.hellocode.top">HelloCode.</a>
 * @Author: HelloCode.
 * @CreateTime: 2023-10-29  21:43
 * @Description: 创建群聊处理器
 */
@ChannelHandler.Sharable
public class GroupCreateRequestMessageHandler extends SimpleChannelInboundHandler<GroupCreateRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, GroupCreateRequestMessage groupCreateRequestMessage) throws Exception {
        String groupName = groupCreateRequestMessage.getGroupName();
        Set<String> members = groupCreateRequestMessage.getMembers();
        // 群管理器
        GroupSession groupSession = GroupSessionFactory.getGroupSession();
        Group group = groupSession.createGroup(groupName, members);
        if(group == null){
            // 创建成功
            channelHandlerContext.writeAndFlush(new GroupCreateResponseMessage(true,groupName + "创建成功"));

            // 向对应用户发送拉群消息
            // 获取在线的群成员
            List<Channel> membersChannel = groupSession.getMembersChannel(groupName);
            for (Channel channel : membersChannel) {
                // 向群组每位成员通知进群消息
                channel.writeAndFlush(new GroupCreateResponseMessage(true,"您已被拉入" + groupName));
            }
        }else{
            // 创建失败
            channelHandlerContext.writeAndFlush(new GroupCreateResponseMessage(false,groupName + "创建失败,群名不能重复"));
        }

    }
}
/**
 * @author: HelloCode.
 * @date: 2023/10/29 21:28
 * @description: 登录处理器
 */
@ChannelHandler.Sharable
public class LoginRequestMessageHandler extends SimpleChannelInboundHandler<LoginRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, LoginRequestMessage msg) throws Exception {
        String username = msg.getUsername();
        String password = msg.getPassword();
        UserService userService = UserServiceFactory.getUserService();
        boolean login = userService.login(username, password);
        // 响应消息
        LoginResponseMessage resp;
        if (login) {
            // 登录成功
            resp = new LoginResponseMessage(true, "登录成功");
            // 保存Channel 信息
            SessionFactory.getSession().bind(channelHandlerContext.channel(), username);
        } else {
            resp = new LoginResponseMessage(false, "用户名或密码错误");
        }
        channelHandlerContext.writeAndFlush(resp);
    }
}
/**
 * @blog: <a href="https://www.hellocode.top">HelloCode.</a>
 * @Author: HelloCode.
 * @CreateTime: 2023-10-29  22:12
 * @Description: 用户退出处理器
 */
@Slf4j
@ChannelHandler.Sharable
public class QuitHandler extends ChannelInboundHandlerAdapter {
    @Override
    /* 连接断开时会触发(正常断开,执行quit命令) */
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        SessionFactory.getSession().unbind(ctx.channel());
        log.debug("{} 已经断开",ctx.channel());
    }

    @Override
    /* 连接断开时会触发(异常断开,捕捉到异常时会触发,比如客户端直接点击按钮终止程序) */
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        SessionFactory.getSession().unbind(ctx.channel());
        log.debug("{} 异常断开,异常是 {}",ctx.channel(),cause.getMessage());
    }
}

......

service:

/**
 * @blog: <a href="https://www.hellocode.top">HelloCode.</a>
 * @Author: HelloCode.
 * @CreateTime: 2023-10-29  18:01
 * @Description: UserService工厂类,提升扩展性
 */
public class UserServiceFactory {
    public static UserService getUserService(){
        return new UserServiceMemoryImpl();
    }
}
/**
 * @blog: <a href="https://www.hellocode.top">HelloCode.</a>
 * @Author: HelloCode.
 * @CreateTime: 2023-10-29  16:20
 * @Description: 用户管理接口
 */
public interface UserService {
    /**
     * @author: HelloCode.
     * @date: 2023/10/29 16:21
     * @param: username
     * @param: password
     * @return: boolean
     * @description: 登录成功返回 true,失败返回 false
     */
    boolean login(String username, String password);
}
/**
 * @blog: <a href="https://www.hellocode.top">HelloCode.</a>
 * @Author: HelloCode.
 * @CreateTime: 2023-10-29  16:22
 * @Description: TODO
 */
public class UserServiceMemoryImpl implements UserService{
    private Map<String, String> allUserMap = new ConcurrentHashMap<>();

    {
        allUserMap.put("zhangsan","123");
        allUserMap.put("lisi","123");
        allUserMap.put("wangwu","123");
        allUserMap.put("zhaoliu","123");
        allUserMap.put("qianqi","123");
    }
    @Override
    public boolean login(String username, String password) {
        String pass = allUserMap.get(username);
        if(pass == null){
            return false;
        }
        return pass.equals(password);
    }
}

......

session:

public abstract class SessionFactory {

    private static Session session = new SessionMemoryImpl();

    public static Session getSession() {
        return session;
    }
}
/**
 * @blog: <a href="https://www.hellocode.top">HelloCode.</a>
 * @Author: HelloCode.
 * @CreateTime: 2023-10-29  16:24
 * @Description: 会话管理接口
 */
public interface Session {
    /**
     * @author: HelloCode.
     * @date: 2023/10/29 16:24
     * @param: channel
     * @param: username
     * @return: void
     * @description: 绑定会话
     */
    void bind(Channel channel, String username);

    /**
     * @author: HelloCode.
     * @date: 2023/10/29 16:25
     * @param: channel
     * @return: void
     * @description: 解绑会话
     */
    void unbind(Channel channel);

    /**
     * @author: HelloCode.
     * @date: 2023/10/29 16:26
     * @param: channel
     * @param: name
     * @return: java.lang.Object
     * @description: 获取属性
     */
    Object getAttribute(Channel channel, String name);

    /**
     * @author: HelloCode.
     * @date: 2023/10/29 16:27
     * @param: channel
     * @param: name
     * @param: value
     * @return: void
     * @description: 设置属性
     */
    void setAttribute(Channel channel, String name, Object value);

    /**
     * @author: HelloCode.
     * @date: 2023/10/29 16:27
     * @param: username
     * @return: java.nio.channels.Channel
     * @description: 根据用户名获取 Channel
     */
    Channel getChannel(String username);
}
/**
 * @blog: <a href="https://www.hellocode.top">HelloCode.</a>
 * @Author: HelloCode.
 * @CreateTime: 2023-10-29  16:28
 * @Description: TODO
 */
public class SessionMemoryImpl implements Session{
    private final Map<String, Channel> usernameChannelMap = new ConcurrentHashMap<>();
    private final Map<Channel, String> channelUsernameMap = new ConcurrentHashMap<>();
    private final Map<Channel,Map<String,Object>> channelAttributesMap = new ConcurrentHashMap<>();

    @Override
    public void bind(Channel channel, String username) {
        usernameChannelMap.put(username,channel);
        channelUsernameMap.put(channel,username);
        channelAttributesMap.put(channel,new ConcurrentHashMap<>());
    }

    @Override
    public void unbind(Channel channel) {
        String username = channelUsernameMap.remove(channel);
        usernameChannelMap.remove(username);
        channelAttributesMap.remove(channel);
    }

    @Override
    public Object getAttribute(Channel channel, String name) {
        return channelAttributesMap.get(channel).get(name);
    }

    @Override
    public void setAttribute(Channel channel, String name, Object value) {
        channelAttributesMap.get(channel).put(name,value);
    }

    @Override
    public Channel getChannel(String username) {
        return usernameChannelMap.get(username);
    }
}
/**
 * @blog: <a href="https://www.hellocode.top">HelloCode.</a>
 * @Author: HelloCode.
 * @CreateTime: 2023-10-29  16:33
 * @Description: 群组业务
 */
@Data
public class Group {
    // 聊天室名称
    private String name;
    // 聊天室成员
    private Set<String> members;

    public static final Group EMPTY_GROUP = new Group("empty", Collections.emptySet());

    public Group(String name, Set<String> members) {
        this.name = name;
        this.members = members;
    }
}
public abstract class GroupSessionFactory {

    private static GroupSession session = new GroupSessionMemoryImpl();

    public static GroupSession getGroupSession() {
        return session;
    }
}
/**
 * @blog: <a href="https://www.hellocode.top">HelloCode.</a>
 * @Author: HelloCode.
 * @CreateTime: 2023-10-29  16:32
 * @Description: 聊天组会话管理接口
 */
public interface GroupSession {
    /**
     * @author: HelloCode.
     * @date: 2023/10/29 16:33
     * @param: name
     * @param: members
     * @return: top.hellocode.chat.server.session.Group
     * @description: 创建一个聊天组,如果不存在则创建成功,否则返回组对象 (名字不能重复)
     */
    Group createGroup(String name, Set<String> members);

    /**
     * @author: HelloCode.
     * @date: 2023/10/29 16:34
     * @param: name
     * @param: member
     * @return: top.hellocode.chat.server.session.Group
     * @description: 加入聊天组,不存在返回null,否则返回组对象
     */
    Group joinMember(String name, String member);

    /**
     * @author: HelloCode.
     * @date: 2023/10/29 16:36
     * @param: name
     * @param: member
     * @return: top.hellocode.chat.server.session.Group
     * @description: 移除组成员,组不存在返回null,否则返回组对象
     */
    Group removeMember(String name, String member);

    /**
     * @author: HelloCode.
     * @date: 2023/10/29 16:36
     * @param: name
     * @return: top.hellocode.chat.server.session.Group
     * @description: 移除聊天组,组不存在返回null,否则返回组对象
     */
    Group removeGroup(String name);

    /**
     * @author: HelloCode.
     * @date: 2023/10/29 16:37
     * @param: name
     * @return: java.util.Set<java.lang.String>
     * @description: 获取组成员,没有成员则返回空集合
     */
    Set<String> getMembers(String name);

    /**
     * @author: HelloCode.
     * @date: 2023/10/29 16:38
     * @param: name
     * @return: java.util.List<java.nio.channels.Channel>
     * @description: 获取组成员的 channel 集合,只有在线的 channel 才会返回
     */
    List<Channel> getMembersChannel(String name);
}
public class GroupSessionMemoryImpl implements GroupSession {
    private final Map<String, Group> groupMap = new ConcurrentHashMap<>();

    @Override
    public Group createGroup(String name, Set<String> members) {
        Group group = new Group(name, members);
        return groupMap.putIfAbsent(name, group);
    }

    @Override
    public Group joinMember(String name, String member) {
        return groupMap.computeIfPresent(name, (key, value) -> {
            value.getMembers().add(member);
            return value;
        });
    }

    @Override
    public Group removeMember(String name, String member) {
        return groupMap.computeIfPresent(name, (key, value) -> {
            value.getMembers().remove(member);
            return value;
        });
    }

    @Override
    public Group removeGroup(String name) {
        return groupMap.remove(name);
    }

    @Override
    public Set<String> getMembers(String name) {
        return groupMap.getOrDefault(name, Group.EMPTY_GROUP).getMembers();
    }

    @Override
    public List<Channel> getMembersChannel(String name) {
        //判断群聊存不存在
        if(groupMap.get(name) == null){
            return null;
        }
        return getMembers(name).stream()
                .map(member -> SessionFactory.getSession().getChannel(member))
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    }
}
/**
 * @blog: <a href="https://www.hellocode.top">HelloCode.</a>
 * @Author: HelloCode.
 * @CreateTime: 2023-10-29  17:28
 * @Description: TODO
 */
@Slf4j
public class ChatServer {
    public static void main(String[] args) {
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();
        final LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
        final MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
        LoginRequestMessageHandler loginHandler = new LoginRequestMessageHandler();
        ChatRequestMessageHandler chatHandler = new ChatRequestMessageHandler();
        GroupCreateRequestMessageHandler groupCreateHandler = new GroupCreateRequestMessageHandler();
        GroupJoinRequestMessageHandler groupJoinHandler = new GroupJoinRequestMessageHandler();
        GroupMembersRequestMessageHandler groupMembersHandler = new GroupMembersRequestMessageHandler();
        GroupQuitRequestMessageHandler groupQuitHandler = new GroupQuitRequestMessageHandler();
        GroupChatRequestMessageHandler groupChatHandler = new GroupChatRequestMessageHandler();
        QuitHandler quitHandler = new QuitHandler();

        try{
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(boss,worker)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                            nioSocketChannel.pipeline()
                                    // 空闲检测器:防止假死现象发生,监控读写空闲时间是否过长
                                    // 5s 内如果没有收到 channel 的数据,就会触发一个事件
                                    .addLast(new IdleStateHandler(5, 0, 0))
                                    // 自定义 Handler 处理读写空闲事件,ChannelDuplexHandler 既可以作为出站处理器,也可以作为入站处理器
                                    .addLast(new ChannelDuplexHandler(){
                                        @Override
                                        // 用来触发特殊事件(一些自定义事件)
                                        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                                            IdleStateEvent event = (IdleStateEvent) evt;
                                            if (event.state() == IdleState.READER_IDLE) {
                                                // 触发了读空闲事件
                                                log.debug("已经 5 秒没有读到数据了......");
                                                ctx.channel().close();
                                            }
                                        }
                                    })
                                    .addLast(new ProcotolFrameDecoder())
                                    .addLast(LOGGING_HANDLER)
                                    .addLast(MESSAGE_CODEC);
                            // 自定义处理器(使用SimpleChannelInboundHandler,只针对泛型指定的消息做处理)
                            nioSocketChannel.pipeline()
                                    // 登录业务
                                    .addLast(loginHandler)
                                    // 聊天业务
                                    .addLast(chatHandler)
                                    // 群聊业务
                                    .addLast(groupCreateHandler)
                                    .addLast(groupJoinHandler)
                                    .addLast(groupMembersHandler)
                                    .addLast(groupQuitHandler)
                                    .addLast(groupChatHandler)
                                    // 退出处理
                                    .addLast(quitHandler);
                        }
                    });
            Channel channel = serverBootstrap.bind(8080).sync().channel();
            channel.closeFuture().sync();
        }catch (Exception e){
            log.error("server err",e);
        }finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}

......

client:

/**
 * @blog: <a href="https://www.hellocode.top">HelloCode.</a>
 * @Author: HelloCode.
 * @CreateTime: 2023-10-29  17:25
 * @Description: TODO
 */
@Slf4j
public class ChatClient {
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup();
        final MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
        // 用于线程之间通信
        CountDownLatch WAIT_FOR_LOGIN = new CountDownLatch(1);
        // 登录状态标记
        AtomicBoolean LOGIN = new AtomicBoolean(false);

        try{
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class)
                    .group(group)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline()
                                    .addLast(new ProcotolFrameDecoder())
                                    .addLast(MESSAGE_CODEC)
                                    // 空闲检测器:防止假死现象发生,监控读写空闲时间是否过长
                                    // 3s 内如果没有向服务器写数据,就会触发一个写空闲事件(写的频率要比服务器读时间间隔短)
                                    .addLast(new IdleStateHandler(0, 3, 0))
                                    // 自定义 Handler 处理读写空闲事件,ChannelDuplexHandler 既可以作为出站处理器,也可以作为入站处理器
                                    .addLast(new ChannelDuplexHandler(){
                                        @Override
                                        // 用来触发特殊事件(一些自定义事件)
                                        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                                            IdleStateEvent event = (IdleStateEvent) evt;
                                            if (event.state() == IdleState.WRITER_IDLE) {
                                                // 触发了写空闲事件
//                                                log.debug("3s 没有写数据了,发送一个心跳包......");
                                                ctx.writeAndFlush(new PingMessage());
                                            }
                                        }
                                    });
                            // 创建自定义 Handler(业务相关)
                            socketChannel.pipeline().addLast("client handler",
                                    new ChannelInboundHandlerAdapter(){
                                        @Override
                                        // 连接建立后触发
                                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                            super.channelActive(ctx);
                                            // 负责接收用户在控制台的输入,向服务器发送各种消息
                                            // 创建新线程,防止用户输入阻塞 NIO 线程
                                            new Thread(() -> {
                                                Scanner sc = new Scanner(System.in);
                                                System.out.print("请输入用户名:");
                                                String username = sc.nextLine();
                                                System.out.print("请输入密码:");
                                                String password = sc.nextLine();

                                                // 构造消息对象进行登录(省略校验部分)
                                                LoginRequestMessage message = new LoginRequestMessage(username, password);

                                                // 发送消息(入站处理器,写入之后就会进行出站操作)
                                                ctx.writeAndFlush(message);
                                                System.out.println("等待后续操作...");

                                                try {
                                                    // 线程同步操作,等待登录响应通知
                                                    WAIT_FOR_LOGIN.await();
                                                } catch (InterruptedException e) {
                                                    throw new RuntimeException(e);
                                                }
                                                if (!LOGIN.get()) {
                                                    // 登录失败,释放资源
                                                    ctx.channel().close();
                                                    return;
                                                }
                                                // 登录成功
                                                while (true){
                                                    // 菜单
                                                    System.out.println("===============================");
                                                    System.out.println("send [username] [content]");
                                                    System.out.println("gsend [group name] [content]");
                                                    System.out.println("gcreate [group name] [m1,m2,m3...]");
                                                    System.out.println("gmembers [group name]");
                                                    System.out.println("gjoin [group name]");
                                                    System.out.println("gquit [group name]");
                                                    System.out.println("quit");
                                                    System.out.println("===============================");
                                                    // 接收命令
                                                    String command = sc.nextLine();
                                                    // 解析命令(省略非法命令校验,假设命令都是正确格式)
                                                    String[] s = command.split(" ");
                                                    switch (s[0]){
                                                        case "send":
                                                            ctx.writeAndFlush(new ChatRequestMessage(username, s[1], s[2]));
                                                            break;
                                                        case "gsend":
                                                            ctx.writeAndFlush(new GroupChatRequestMessage(username,s[1],s[2]));
                                                            break;
                                                        case "gcreate":
                                                            Set<String> members = new HashSet<>(Arrays.asList(s[2].split(",")));
                                                            // 加入自己
                                                            members.add(username);
                                                            ctx.writeAndFlush(new GroupCreateRequestMessage(s[1],members));
                                                            break;
                                                        case "gmembers":
                                                            ctx.writeAndFlush(new GroupMembersRequestMessage(s[1]));
                                                            break;
                                                        case "gjoin":
                                                            ctx.writeAndFlush(new GroupJoinRequestMessage(username,s[1]));
                                                            break;
                                                        case "gquit":
                                                            ctx.writeAndFlush(new GroupQuitRequestMessage(username,s[1]));
                                                            break;
                                                        case "quit":
                                                            ctx.channel().close();
                                                            return;
                                                        default:
                                                            System.out.println("指令输入错误,请重试");
                                                            break;
                                                    }
                                                }
                                            },"system in").start();
                                        }

                                        // 接收响应消息
                                        @Override
                                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                            log.debug("msg: {}", msg);
                                            if(msg instanceof LoginResponseMessage){
                                                // 是登录响应
                                                LoginResponseMessage resp = (LoginResponseMessage) msg;
                                                if (resp.isSuccess()) {
                                                    // 设置登录标记
                                                    LOGIN.compareAndSet(false,true);
                                                }
                                                // 已经接收到响应,计数器减一,唤醒System in 线程
                                                WAIT_FOR_LOGIN.countDown();
                                            }
                                        }

                                        // 连接断开时触发
                                        @Override
                                        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                                            log.debug("连接已断开,按任意键退出...");
                                            ctx.channel().close();
                                        }

                                        // 出现异常时触发
                                        @Override
                                        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                                            log.debug("连接异常:{},按任意键退出",cause.getMessage());
                                            ctx.channel().close();
                                        }
                                    });
                        }
                    });
            Channel channel = bootstrap.connect("localhost", 8080).sync().channel();
            channel.closeFuture().sync();
        }catch (Exception e){
            log.error("client error",e);
        }finally {
            group.shutdownGracefully();
        }
    }
}

连接假死

原因:

  • 网络设备出现故障,例如网卡、机房等,底层的 TCP 连接已经断开了,但应用程序没有感知到,仍然占用着资源
  • 公网网络不稳定,出现丢包。如果连续出现丢包,这时现象就是客户端数据发不出去,服务端也一直收不到数据,就这么一直耗着
  • 应用程序线程阻塞,无法进行数据读写

问题:

  • 假死的连接占用的资源不能自动释放
  • 向假死的连接发送数据,得到的反馈是发送超时

解决:

  • Netty 提供了空闲状态检测器来解决该问题:IdleStateHandler
  • 参数:读最大空闲时间、写最大空闲时间、读写最大空闲时间
  • 指定的最大空闲时间内没有收到或者写出数据,就会触发相应事件(事件类型为 IdleStateEvent

    • IdleState.READER_IDLE:读空闲事件
    • IdleState.WRITER_IDLE:写空闲事件
    • IdleState.ALL_IDLE:都空闲事件
nioSocketChannel.pipeline()
    // 空闲检测器:防止假死现象发生,监控读写空闲时间是否过长
    // 5s 内如果没有收到 channel 的数据,就会触发一个事件
    .addLast(new IdleStateHandler(5, 0, 0))
    // 自定义 Handler 处理读写空闲事件,ChannelDuplexHandler 既可以作为出站处理器,也可以作为入站处理器
    .addLast(new ChannelDuplexHandler(){
        @Override
        // 用来触发特殊事件(一些自定义事件)
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.READER_IDLE) {
                // 触发了读空闲事件
                log.debug("已经 5 秒没有读到数据了......");
            }
        }
    });

心跳机制

  • 可以配合心跳机制解决连接假死问题
  • 让客户端每 3 秒自动向服务器发送一个心跳包来打破 5 秒限制
  • 这样可以防止正常用户被服务端断开的现象
socketChannel.pipeline()
                // 空闲检测器:防止假死现象发生,监控读写空闲时间是否过长
                // 3s 内如果没有向服务器写数据,就会触发一个写空闲事件(写的频率要比服务器读时间间隔短)
                .addLast(new IdleStateHandler(0, 3, 0))
                // 自定义 Handler 处理读写空闲事件,ChannelDuplexHandler 既可以作为出站处理器,也可以作为入站处理器
                .addLast(new ChannelDuplexHandler(){
                    @Override
                    // 用来触发特殊事件(一些自定义事件)
                    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                        IdleStateEvent event = (IdleStateEvent) evt;
                        if (event.state() == IdleState.WRITER_IDLE) {
                            // 触发了写空闲事件
                            //                                                log.debug("3s 没有写数据了,发送一个心跳包......");
                            ctx.writeAndFlush(new PingMessage());
                        }
                    }
                });

场景参数及优化

扩展序列化算法

序列化、反序列化主要用在消息正文的转换上

  • 序列化时,需要将 Java 对象变为要传输的数据(可以是 byte[] 或 json 等,最终都需要变成 byte[])
  • 反序列化时,需要将传入的正文数据还原成 Java 对象,便于处理

参数

连接超时

CONNECT_TIMEOUT_MILLIS

  • 属于 SocketChannel 参数
  • 用在客户端建立连接时,如果在指定毫秒内无法连接,会抛出 timeout 异常
  • SO_TIMEOUT 主要用在阻塞 IO,阻塞 IO 中 accept,read 等都是无限等待的,如果不希望永远阻塞,使用它调整超时时间

客户端:

Bootstrap bootstrap = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    // 参数配置
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,300)
                    .handler(new LoggingHandler());

服务端:

Bootstrap bootstrap = new ServerBootstrap()
                    .group(group)
                    .channel(NioServerSocketChannel.class)
                    // ServerSocket参数配置
                    .option()
                    // Socket参数配置
                    .childOption()
                    .handler(new LoggingHandler());

SO_BACKLOG

  • 属于 ServerSocketChannel

image-20231031160115376

  1. 第一次握手,client 发送 SYN 到 server,状态修改为 SYN_SEND,server 收到,状态改变为 SYN_REVD,并将该请求放入 sync queue 队列
  2. 第二次握手,server 回复 SYN + ACK 给 client,client 收到,状态改变为 ESTABLISHED,并发送 ACK 给 server
  3. 第三次握手,server 收到 ACK,状态改变为 ESTABLISHED,将该请求从 sync queue 放入 accept queue

其中

  • 在 Linux 2.2 之前,backlog 大小包括了两个队列的大小,在 2.2 之后,分别用下面两个参数来控制
  • sync queue-半连接队列

    • 大小通过 /proc/sys/net/ipv4/tcp_max_syn_backlog指定,在 syncookies启用的情况下,逻辑上没有最大值限制,这个设置便被忽略
  • accept queue-全连接队列

    • 其大小通过 /proc/sys/net/core/somaxconn指定,在使用 listen 函数时,内核会根据传入的 backlog 参数与系统参数,取二者的较小值
    • 如果 accept queue 队列满了,server 将发送一个拒绝连接的错误信息到 client
Bootstrap bootstrap = new ServerBootstrap()
                    .group(group)
                    .channel(NioServerSocketChannel.class)
                    // ServerSocket参数配置
                    .option(ChannelOption.SO_BACKLOG, 200)    // 全连接队列大小
                    .handler(new LoggingHandler());
在 Netty 中,accept 的能力很强,只有在 accept 处理不了的时候,才会在队列中堆积;如果全连接队列满了,就会抛出异常

ulimit -n

  • 属于操作系统参数
  • 允许一个进程打开的最大文件(描述符)数量

TCP_NODELAY

  • 属于 SocketChannel 参数
  • 使用 childOption 进行设置
  • nagle 算法的开关,Netty 中默认开启(数据量小的时候,等待多个凑成大的再统一发送,可能造成延迟)
  • 建议设置为 true,不延迟

SO_SNDBUF & SO_RCVBUF

  • SO_SNDBUF 属于 SocketChannel 参数
  • SO_RCVBUF 既可用于 SocketChannel 参数,也可以用于 ServerSocketChannel 参数(建议设置到 ServerSocketChannel 上)
  • 决定了滑动窗口的上限,建议不要手动调整,由操作系统动态调整

ALLOCATOR

  • 属于 SocketChannel 参数
  • 用来分配 ByteBuf
  • 调用 ctx.alloc()就会拿到 ALLOCATOR 分配器对象

RCVBUF_ALLOCATOR

  • 属于 SocketChannel 参数
  • 控制 Netty 接收缓冲区(ByteBuf)大小和是否是 Direct 内存
  • 负责入站数据的分配,决定入站缓冲区的大小(并可动态调整),统一采用 direct 直接内存,具体池化还是非池化由 allocator 决定

RPC 框架

为了简化起见,在原来聊天项目的基础上新增 RPC 请求和响应消息

/**
 * @blog: <a href="https://www.hellocode.top">HelloCode.</a>
 * @Author: HelloCode.
 * @CreateTime: 2023-10-29  13:59
 * @Description: TODO
 */
@Data
public abstract class Message implements Serializable {
  
    // 其他内容省略.....

    // RPC 消息
    public static final int RPC_MESSAGE_TYPE_REQUEST = 101;
    public static final int RPC_MESSAGE_TYPE_RESPONSE = 102;

    private static final Map<Integer,Class<?>> messageClasses = new HashMap<>();

    static {
        // 其他内容省略.....
  
        messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);
        messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);
    }
}

请求消息:

/**
 * @blog: <a href="https://www.hellocode.top">HelloCode.</a>
 * @Author: HelloCode.
 * @CreateTime: 2023-10-29  14:11
 * @Description: TODO
 */
@Data
@ToString(callSuper = true)
public class RpcRequestMessage extends Message{
    // 调用的接口全限定名,服务端根据它找到实现
    private String interfaceName;
    // 调用接口中的方法名
    private String methodName;
    // 方法返回类型
    private Class<?> returnType;
    // 方法参数类型数组
    private Class[] parameterTypes;
    // 方法参数数值数组
    private Object[] parameterValues;

    public RpcRequestMessage(){
    }

    public RpcRequestMessage(int sequenceId, String interfaceName, String methodName,
                             Class<?> returnType, Class[] parameterTypes, Object[] parameterValues){
        super.setSequenceId(sequenceId);
        this.interfaceName = interfaceName;
        this.methodName = methodName;
        this.returnType = returnType;
        this.parameterTypes = parameterTypes;
        this.parameterValues = parameterValues;
    }
    @Override
    public int getMessageType() {
        return RPC_MESSAGE_TYPE_REQUEST;
    }
}

响应消息:

/**
 * @blog: <a href="https://www.hellocode.top">HelloCode.</a>
 * @Author: HelloCode.
 * @CreateTime: 2023-10-29  14:11
 * @Description: TODO
 */
@Data
@ToString(callSuper = true)
public class RpcResponseMessage extends Message{
    // 返回值
    private Object returnValue;
    // 异常值
    private Exception exceptionValue;

    @Override
    public int getMessageType() {
        return RPC_MESSAGE_TYPE_RESPONSE;
    }
}

服务端:

/**
 * @blog: <a href="https://www.hellocode.top">HelloCode.</a>
 * @Author: HelloCode.
 * @CreateTime: 2023-10-31  21:32
 * @Description: TODO
 */
@Slf4j
public class RpcServer {
    public static void main(String[] args) {
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();
        LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
        MessageCodecSharable codec = new MessageCodecSharable();

        // rpc 消息处理器,待实现
        RpcRequestMessageHandler rpcHandler = new RpcRequestMessageHandler();
        try{
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(boss,worker)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                            nioSocketChannel.pipeline().addLast(new ProcotolFrameDecoder())
                                    .addLast(loggingHandler)
                                    .addLast(codec)
                                    .addLast(rpcHandler);
                        }
                    });
            Channel channel = serverBootstrap.bind(8080).sync().channel();
            channel.closeFuture().sync();
        }catch (Exception e){
            log.error("sever error...",e);
        }finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }

    }
}
/**
 * @blog: <a href="https://www.hellocode.top">HelloCode.</a>
 * @Author: HelloCode.
 * @CreateTime: 2023-10-31  22:03
 * @Description: TODO
 */
@Slf4j
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcResponseMessage rpcResponseMessage) throws Exception {
        log.debug("{}",rpcResponseMessage);
    }
}

客户端:

/**
 * @blog: <a href="https://www.hellocode.top">HelloCode.</a>
 * @Author: HelloCode.
 * @CreateTime: 2023-10-31  21:37
 * @Description: TODO
 */
@Slf4j
public class RpcClient {
    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup();
        LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
        MessageCodecSharable codec = new MessageCodecSharable();

        // rpc 响应消息处理器,待实现
        RpcResponseMessageHandler rpcHandler = new RpcResponseMessageHandler();

        try{
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new ProcotolFrameDecoder())
                                    .addLast(loggingHandler)
                                    .addLast(codec)
                                    .addLast(rpcHandler);
                        }
                    });
            Channel channel = bootstrap.connect("127.0.0.1", 8080).sync().channel();

            // 发送 RPC 消息
            RpcRequestMessage message = new RpcRequestMessage(
                    1,
                    "top.hellocode.chat.server.service.HelloService",
                    "sayHello",
                    String.class,
                    new Class[]{String.class},
                    new Object[]{"张三"}
            );
            channel.writeAndFlush(message);

            channel.closeFuture().sync();
        }catch (Exception e){
            log.error("client error...",e);
        }finally {
            group.shutdownGracefully();
        }
    }
}
/**
 * @blog: <a href="https://www.hellocode.top">HelloCode.</a>
 * @Author: HelloCode.
 * @CreateTime: 2023-10-31  21:41
 * @Description: Rpc 请求处理器
 */
@Slf4j
@ChannelHandler.Sharable
public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcRequestMessage message){
        RpcResponseMessage resp = new RpcResponseMessage();

        try {
            // 获取接口实现类
            HelloService service = (HelloService) ServicesFactory.getService(
                    Class.forName(message.getInterfaceName())
            );
            // 获取方法
            Method method = service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());
            // 执行方法
            Object ret = method.invoke(service, message.getParameterValues());

            // 构造 Response
            resp.setReturnValue(ret);
        } catch (Exception e) {
            resp.setExceptionValue(e);
            throw new RuntimeException(e);
        }
        channelHandlerContext.writeAndFlush(resp);
    }


    // 反射测试
    public static void main(String[] args) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
        RpcRequestMessage message = new RpcRequestMessage(
                1,
                "top.hellocode.chat.server.service.HelloService",
                "sayHello",
                String.class,
                new Class[]{String.class},
                new Object[]{"张三"}
        );

        // 获取接口实现类
        HelloService service = (HelloService) ServicesFactory.getService(
                Class.forName(message.getInterfaceName())
        );
        // 获取方法
        Method method = service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());
        // 执行方法
        Object ret = method.invoke(service, message.getParameterValues());
        System.out.println(ret);
    }
}

在使用 Gson 序列化和反序列化 Class 类型时(如 String.class等),会出现异常,需要自己指定序列化和反序列化方式:

static class ClassCodec implements JsonSerializer<Class<?>>, JsonDeserializer<Class<?>>{

    @Override
    public Class<?> deserialize(JsonElement jsonElement, Type type, JsonDeserializationContext jsonDeserializationContext) throws JsonParseException {
        // json -> class
        try {
            return Class.forName(jsonElement.getAsString());
        } catch (ClassNotFoundException e) {
            throw new JsonParseException(e);
        }
    }

    @Override
    public JsonElement serialize(Class<?> aClass, Type type, JsonSerializationContext jsonSerializationContext) {
        // class -> json
        return new JsonPrimitive(aClass.getName());
    }
}

使用:

Gson gson = new GsonBuilder().registerTypeAdapter(Class.class, new ClassCodec()).create();

优化

RpcClientManager:

/**
 * @blog: <a href="https://www.hellocode.top">HelloCode.</a>
 * @Author: HelloCode.
 * @CreateTime: 2023-10-31  21:37
 * @Description: TODO
 */
@Slf4j
public class RpcClientManager {
    private static Channel channel = null;
  
    // 测试
    public static void main(String[] args) {
        HelloService service = getProxyService(HelloService.class);
        System.out.println(service.sayHello("张三"));
    }

    // 单例模式,获取唯一的channel对象
    public static Channel getChannel() {
        if (channel != null) {
            return channel;
        }
        synchronized (RpcClientManager.class) {
            if (channel != null) {
                return channel;
            }
            initChannel();
            return channel;
        }
    }

    // 创建代理类
    public static <T> T getProxyService(Class<T> serviceClass) {
        ClassLoader loader = serviceClass.getClassLoader();
        Class<?>[] interfaces = new Class[]{serviceClass};

        // 将方法调用转换为 消息对象
        Object proxyObj = Proxy.newProxyInstance(loader, interfaces, (proxy, method, args) -> {
            int sequenceId = SequenceIdGenerator.nextId();
            RpcRequestMessage message = new RpcRequestMessage(
                    sequenceId,
                    serviceClass.getName(),
                    method.getName(),
                    method.getReturnType(),
                    method.getParameterTypes(),
                    args
            );
            // 发送消息
            getChannel().writeAndFlush(message);

            // 准备一个空 Promise 对象,接收结果            指定 promise 对象异步接收结果的线程
            DefaultPromise<Object> promise = new DefaultPromise<>(getChannel().eventLoop());
            // 将 Promise 暂存起来,等待结果
            RpcResponseMessageHandler.PROMISES.put(sequenceId, promise);

            // 等待结果
            promise.await();
            if(promise.isSuccess()){
                return promise.getNow();
            }else{
                // 抛出异常
                throw new RuntimeException(promise.cause());
            }
        });
        return (T) proxyObj;
    }

    // 初始化 channel 对象
    private static void initChannel() {
        NioEventLoopGroup group = new NioEventLoopGroup();
        LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
        MessageCodecSharable codec = new MessageCodecSharable();

        // rpc 响应消息处理器,待实现
        RpcResponseMessageHandler rpcHandler = new RpcResponseMessageHandler();


        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new ProcotolFrameDecoder())
                                .addLast(loggingHandler)
                                .addLast(codec)
                                .addLast(rpcHandler);
                    }
                });
        try {
            channel = bootstrap.connect("127.0.0.1", 8080).sync().channel();

            channel.closeFuture().addListener(future -> {
                group.shutdownGracefully();
            });
        } catch (Exception e) {
            log.error("client error...", e);
        }
    }
}
/**
 * @blog: <a href="https://www.hellocode.top">HelloCode.</a>
 * @Author: HelloCode.
 * @CreateTime: 2023-10-31  21:41
 * @Description: Rpc 请求处理器
 */
@Slf4j
@ChannelHandler.Sharable
public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcRequestMessage message){
        RpcResponseMessage resp = new RpcResponseMessage();
        resp.setSequenceId(message.getSequenceId());

        try {
            // 获取接口实现类
            HelloService service = (HelloService) ServicesFactory.getService(
                    Class.forName(message.getInterfaceName())
            );
            // 获取方法
            Method method = service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());
            // 执行方法
            Object ret = method.invoke(service, message.getParameterValues());

            // 构造 Response
            resp.setReturnValue(ret);
        } catch (Exception e) {
            e.printStackTrace();
            resp.setExceptionValue(new RuntimeException("远程过程调用:" + e.getCause().getMessage()));
        }
        channelHandlerContext.writeAndFlush(resp);
    }
}

获取结果:

调用方法是主线程,而获取结果是另一个线程干的,想要获取结果,就需要通过 Promise 来交换(通过SeqenceId),使用一个Map一一对应

image-20231101100941746

/**
 * @blog: <a href="https://www.hellocode.top">HelloCode.</a>
 * @Author: HelloCode.
 * @CreateTime: 2023-10-31  22:03
 * @Description: TODO
 */
@Slf4j
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {
    // squenceId -> Promise
    // 便于获取结果
    public static final Map<Integer, Promise<Object>> PROMISES = new ConcurrentHashMap<>();

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcResponseMessage rpcResponseMessage) throws Exception {
        log.debug("{}",rpcResponseMessage);
        // 向 Promise 填充结果
        // 泛型如果使用 ? 的话,只能取,不能放(null 可以),所以这里用了 Object
        // 使用 remove,避免无用 Promise 占用资源
        Promise<Object> promise = PROMISES.remove(rpcResponseMessage.getSequenceId());
        if(promise != null){
            Object value = rpcResponseMessage.getReturnValue();
            Exception exception = rpcResponseMessage.getExceptionValue();
            if(exception != null){
                // 失败消息
                promise.setFailure(exception);
            }else{
                // 成功消息
                promise.setSuccess(value);
            }
        }
    }
}

源码分析

推荐:https://blog.csdn.net/yyuggjggg/article/details/126634821

最后修改:2023 年 11 月 01 日
如果觉得我的文章对你有用,请随意赞赏