Netty框架学习(一) -- Netty框架简介 & I/O模型

@[TOC]

介绍

  1. Netty是由JBOSS提供的一个Java开源框架,现为Github独立项目。
  2. Netty是一个异步的、基于事件驱动的网络应用框架,用以快速开发高性能,高可用的网络IO程序。
  3. Netty主要针对TCP协议下,面向Client端高并发应用,或者Peer-to-Peer场景下大量数据持续传输的应用。
  4. Netty本质是一个NIO框架,适用于服务器通讯相关的多种应用场景

在这里插入图片描述

官网介绍 : [ Netty是一个NIO客户端服务器框架,可以快速轻松地开发网络应用程序,例如协议服务器和客户端。它极大地简化和简化了诸如TCP和UDP套接字服务器之类的网络编程。]

  • 更高的吞吐量,更低的延迟
  • 减少资源消耗
  • 减少不必要的内存复制

应用场景

  1. 分布式系统中的各个节点之间服务远程调用,RPC框架必不可少,所以Netty做为异步高性能的通信框架,做为基础通信组件被RPC框架使用
  2. 阿里的分布式框架Dubbo, Dubbo协议默认使用Netty作为基础通信组件,用于实现各进程节点之间内部通信

I/O模型 (BIO、NIO、AIO)

  • BIO :同步阻塞(传统阻塞型),服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销。

  • NIO :同步非阻塞,服务器实现模式为一个线程处理多个请求,即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有I/O请求就进行处理。

  • AIO : 异步非阻塞,AIO引入异步通道的概念,采用了 Proactor模式,简化了程序编写,有效的请求菜启动线程,它的特点是先由操作系统完成后才通知服务端和程序启动线程去处理,一般适用于连接数多且连接时间较长的应用

I/O使用场景:

  • BIO方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4以前的唯一选择,简单易懂。
  • NIO方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,弹幕系统,服务器之间通讯,JDK1.4后支持
  • AIP方式适用于连接数多且连接比较长(重操作)的架构,比如相册服务器,充分调用OS参与并发操作,JDK1.7后支持。

BIO

在这里插入图片描述 BIO编程简单流程:

  1. 服务端启动一个ServerSocket
  2. 客户端启动socket对服务器进行通信,默认情况下服务器端需要对每个客户建立一个线程与之通讯
  3. 客户端发出请求后,先咨询服务器是否有线程响应,如果没有则会等待,或者被拒绝
  4. 如果有响应,客户端线程会等待请求结束后,在继续执行

案例 :

  1. 使用BIO模型编写一个服务端,监听端口,当有客户端连接时,就启动一个线程与之通讯
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author yxl
 * @version 1.0
 * @date 2021/3/9 20:18
 */
public class BIOServer {
    public static void main(String[] args) throws IOException {

        //1.创建一个线程池
        ExecutorService executorService = Executors.newCachedThreadPool();

        ServerSocket serverSocket = new ServerSocket(6666);
        System.out.println("服务器启动了");
        while(true){
            Socket socket = serverSocket.accept();
            System.out.println("连接到一个客户端");
            //创建线程池与之
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    handler(socket);
                }
            });
        }
    }

    public static void handler(Socket socket){
        try {
        	System.out.println("当前线程ID"+ Thread.currentThread().getId() + "名称" + Thread.currentThread().getName());
            byte[] bytes = new byte[1024];
            InputStream inputStream = socket.getInputStream();
            while (true){
                int read = inputStream.read();
                if(read != -1){
                    System.out.println("-");
                    System.out.println(new String(bytes,0,read));
                }else{
                    break;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            System.out.println("关闭客户端的连接");
        }

    }
}

使用 telnet 测试

在这里插入图片描述

在这里插入图片描述

缺点:

  1. 每个请求都需要创建独立都线程,与对应都客户端进行数据Read, 业务处理, 数据 Write .
  2. 当并发数较大时,需要创建大量线程来处理连接 ,系统资源占有较大。
  3. 连接建立后,如果当前线程暂时没有数据可读,则线程阻塞在Read操作上,造成资源浪费。

NIO

  1. Java NIO 全称 Java non-blocking IO, 是指JDK提供新当API,从JDK1.4开始,Java 提供了一系列改进当输入/输出的新特性,被统称为 NIO ,是异步非阻塞的
  2. NIO 相关类都被放在 Java.nio 包及子包下,并且对原 Java.io 包中很多类进行改写。
  3. NIO 有三大核心部分 : Channel (通道),Buffer(缓存区),Selector(选择器)
  4. NIO 是面向缓存区,或者面向快编程对,数据读到一个它稍后处理对缓存区,需要时可在缓存区中前后移动,这就增加类处理过程中的灵活性。使用它可以提供非阻塞式的高伸缩性网络

一、Buffer :

├─ByteBuffer
├─IntBuffer
├─LongBuffer
├─ShortBuffer
├─StringCharBuffer
├─DoubleBuffer
├─CharBuffer
└ FloatBuffer
  • 缓存区 Bufer ,本质上是一个可以读取数据的内存块,可以理解成是一个容器对象(含数组),该对象提供了一组方法,可以更轻松的使用内存块,缓存区对象内置了一些机制 ,能够跟踪和记录缓存区的状态变化。Channel 提供文件网络读取的渠道,但是读取或者写入的数据必须经由buffer 。
public class BasicBuffer {

    public static void main(String[] args) {
        //创建一个IntBuffer 大小为5
        IntBuffer intBuffer = IntBuffer.allocate(5);
        intBuffer.put(1);
        intBuffer.put(2);
        intBuffer.put(3);
        intBuffer.put(4);
        intBuffer.put(45);
        //读写转换
        intBuffer.flip();
        while (intBuffer.hasRemaining()){
            System.out.println(intBuffer.get());
        }

    }
}

在这里插入图片描述

所有的 Boffer 都继承 并且有几个重要的参数

public abstract class IntBuffer
    extends Buffer
    implements Comparable<IntBuffer>
public abstract class Buffer {

    /**
     * The characteristics of Spliterators that traverse and split elements
     * maintained in Buffers.
     */
    static final int SPLITERATOR_CHARACTERISTICS =
        Spliterator.SIZED | Spliterator.SUBSIZED | Spliterator.ORDERED;

    // Invariants: mark <= position <= limit <= capacity
    private int mark = -1;    //
    private int position = 0;  //当你写数据到Buffer中时,position表示当前的位置 初始值为o
    private int limit;
    private int capacity;
}
  • mark: 标记

  • capacity:容量,既可以容纳的最大数据量,在缓存区创建时被设定并且不能更改。

  • position:位置,下一个要被读或者写的元素的索引,每次读写缓存区数据时都会改变值,为下次读写做准备。

  • limit:表示缓存区的当前终点,不能对缓存区超过极限的位置进行读写,且极限时可以修改的。

断点:

在这里插入图片描述

二、Channel :

  1. Java的通道类似于流,但是有些区别,通道可以同时进行读写,而流只能读或者写,也可以实现异步读写数据,也可以从缓存区读数据,写入到缓存区数据
  2. Channel 是一个 NIO 的接口
  3. 常用的 Channel 类有 :FileChannel、DatagramChannrl、ServerSocketChannel

FileChannel :

public class BasicFileChannel {

    public static void main(String[] args) throws IOException {
        String str = "Hello World";
        //创建一个输出流Channel
        FileOutputStream fileOutputStream = new FileOutputStream("/Users/yanxiaolong/a.txt");

        //拿到FileChannel
        FileChannel channel = fileOutputStream.getChannel();
        
        //创建缓存区Buffer
        ByteBuffer allocate = ByteBuffer.allocate(1034);
        allocate.put(str.getBytes(StandardCharsets.UTF_8));
        //对ByteBuffer进行flip
        allocate.flip();
        //将ByteBuffer写入FileChannel
        channel.write(allocate);

        //关闭流
        fileOutputStream.close();
    }
}

在这里插入图片描述

public class BasicFileChannel2 {

    public static void main(String[] args) throws IOException {
        //读取文件创建输入流
        File file = new File("/Users/yanxiaolong/a.txt");
        FileInputStream fileInputStream = new FileInputStream(file);
        FileChannel channel = fileInputStream.getChannel();

        //创建缓存区Buffer
        ByteBuffer allocate = ByteBuffer.allocate((int)file.length());

        channel.read(allocate);
        System.out.println(new String(allocate.array()));

        fileInputStream.close();
    }
}

在这里插入图片描述

MappedByteBuffer 文件Copy

public class BasicFileChannel3 {

    public static void main(String[] args) throws IOException {
        long start = System.currentTimeMillis();
        FileChannel inChannel = FileChannel.open(Paths.get("/Users/yanxiaolong/a.txt"), StandardOpenOption.READ);
        FileChannel outChannel = FileChannel.open(Paths.get("/Users/yanxiaolong/b.txt"), StandardOpenOption.WRITE,StandardOpenOption.READ,StandardOpenOption.CREATE_NEW);

        //内存映射文件
        MappedByteBuffer inMappedBuf = inChannel.map(FileChannel.MapMode.READ_ONLY, 0, inChannel.size());
        MappedByteBuffer outMappedBuf = outChannel.map(FileChannel.MapMode.READ_WRITE, 0, inChannel.size());

        //直接对缓冲区进行数据的读写操作
        byte[] dst = new byte[inMappedBuf.limit()];
        inMappedBuf.get(dst);
        outMappedBuf.put(dst);

        inChannel.close();
        outChannel.close();
        long end = System.currentTimeMillis();
        System.out.println("内存映射文件所花时间:"+(end-start));

    }
}

在这里插入图片描述

三、Selector :

  1. Java 的 NIO,非阻塞的iO方式/可以用一个线程,处理多个的客户端连接,就会使用到selector(选择器), 是SelecttableChannle 对象的多路复用器
  2. Selector 能够检测多个注册的通道上是否有事情发生(注意: 多个Channel以事件的方式可以注册到同一个Selector),如果有事情发生,便获取事件然后针对每个事件进行相应的处理。这样就可以只用一个单线程区管理多个通道,也就是管理多个连接和请求。
  3. 只有连接真正有读写事件发生时,才会进行读写,就大大地减少了系统开销,并且不必为每个连接都创建一个线程,不用区维护多个线程
  4. 避免了多线程之间都上下文切换导致的开销

在这里插入图片描述

服务端

public class NIOServer {

    public static void main(String[] args) throws IOException {
        //创建ServerSocketChannel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

        //创建选择器
        Selector selector = Selector.open();

        serverSocketChannel.bind(new InetSocketAddress(6666));
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
        //循环等待客户端连接
        while (true){
            if(selector.select(1000) == 0){
                System.out.println("服务器等待了一秒 无连接");
                continue;
            }
            //如果大于0就获取相关的selectedKeys连接,已经获取到关注到事件
            //selector.selectedKeys(); 返回事件的集合
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()){
                //获取下一个元素
                SelectionKey next = iterator.next();
                //如果是OP_ACCEPT,有新的客户端连接
                if(next.isAcceptable()){
                    //该客户端生成一个SocketChannel
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    socketChannel.configureBlocking(false);
                    System.out.println("客户端连接成功 生成一个 socketChannel" + socketChannel.hashCode());
                    //注册,关联ByteBuffer
                    socketChannel.register(selector,SelectionKey.OP_READ, ByteBuffer.allocate(1024));
                }
                //发送OP_READ
                if(next.isReadable()){
                    SocketChannel socketChannel = (SocketChannel) next.channel();
                    ByteBuffer byteBuffer =  (ByteBuffer) next.attachment();
                    socketChannel.read(byteBuffer);
                    System.out.println("from 客户端:" + new String(byteBuffer.array()));
                }
                iterator.remove();
            }


        }
    }


}

客户端

public class NIOClient {
    public static void main(String[] args) throws IOException {
        //得到网络通道
        SocketChannel socketChannel = SocketChannel.open();
        //设置非阻塞
        socketChannel.configureBlocking(false);
        //socket连接地址
        InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666);
        //连接服务器
        if(!socketChannel.connect(inetSocketAddress)){
            while(!socketChannel.finishConnect()){
                System.out.println("因为连接需要时间,客户端不会阻塞,可以做其他工作");
            }
        }

        //连接成功就发送数据
        String str = "hello world";

        ByteBuffer byteBuffer = ByteBuffer.wrap(str.getBytes(StandardCharsets.UTF_8));
        //发送数据 将Buffer 数据写入Channel
        socketChannel.write(byteBuffer);
        System.in.read();


    }
}

在这里插入图片描述

原理:

  1. 当客户端连接时,会通过ServerSocketChannel得到SocketChannel
  2. Selector继续监听,select方法 返回有事件发送的通道的个数
  3. 将socketChannel 注册到Selector上register (Selector sel,int ops),一个selector可以注册多个SocketChannel
  4. 注册后返回一个SelectionKey和Selector 关联
  5. 进一步得到各个SelectionKey, 通过SelectionKey 反向获取 SocketChannel , 方法Channel() ,通过 channel 写业务代码

  • SelectionKey : 表示SelectableChannel 和 Selector 之间的注册关系,每次向选择器注册通道时就会选择一个事件(选择键)。选择键包含2个表示为整数值的操作集。操作集的每一位都表示该键的通道锁支持的一类可选操作

    • SelectionKey.OP_READ
    • SelectionKey.OP_WRITE
    • SelectionKey.OP_CONNECT
    • SelectionKey.OP_ACCEPT

NIO与零拷贝

介绍:

  1. 零拷贝是网络编程中的关键,很多性能优化都离不开
  2. 在Java 程序中,常用都零拷贝有 mmap(内存映射) 和 sendFile 。

传统I/O :

在这里插入图片描述

  • 传统的IO拷贝技术需要经过四次拷贝(CPU copy 和 DMA copy),四次的状态转换(用户态和内核态),效率较为低下

mmap优化 :

在这里插入图片描述

  • mmap 通过内存映射,将文件映射搭配内核缓存区,同时,用户看见可以共享内核的数据。这样,在进行网络传输时,就可以减少内核空间到用户软件到拷贝次数

sendFile :

在这里插入图片描述

  • Linux 2.1 版本 提供了 sendFile 函数,其基本原理如下:数据根本不经过用户状态,直接从内核缓冲区进入到 Socket Buffer,同时,由于和用户态完全无关,就减少了一次上下文切换。

mmap 和 sendFile的区别

  1. mmap适合小数据量读写,sendFile适合大文件传输
  2. mmap需要4次上下文切换,3次数据拷贝,sendFile 需要3次上下文切换,最少2次数据拷贝
  3. sendFile 可以利用DMA 方式,减少CPU拷贝,mmap (必须从内核拷贝到Socket 缓存区 )

案例:

public class NewIoServer {
    public static void main(String[] args) throws Exception {

        InetSocketAddress address = new InetSocketAddress(7001);

        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

        ServerSocket serverSocket = serverSocketChannel.socket();

        serverSocket.bind(address);

        //创建buffer
        ByteBuffer byteBuffer = ByteBuffer.allocate(4096);

        while (true) {
            SocketChannel socketChannel = serverSocketChannel.accept();

            int readcount = 0;
            while (-1 != readcount) {
                try {

                    readcount = socketChannel.read(byteBuffer);

                } catch (Exception ex) {
                    break;
                }
                //倒带 position = 0 mark 作废
                byteBuffer.rewind();
            }
        }
    }
}

public class NewIoClient {
    public static void main(String[] args) throws Exception {

        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(new InetSocketAddress("localhost", 7001));
        String filename = "protoc-3.6.1-win32.zip";

        //得到一个文件channel
        FileChannel fileChannel = new FileInputStream(filename).getChannel();

        //准备发送
        long startTime = System.currentTimeMillis();

        //在linux下一个transferTo 方法就可以完成传输
        //在windows 下 一次调用 transferTo 只能发送8m , 就需要分段传输文件, 而且要主要
        //传输时的位置 =》 课后思考...
        //transferTo 底层使用到零拷贝
        long transferCount = fileChannel.transferTo(0, fileChannel.size(), socketChannel);

        System.out.println("发送的总的字节数 =" + transferCount + " 耗时:" + (System.currentTimeMillis() - startTime));

        //关闭
        fileChannel.close();

    }
}

AIO

  1. JDK1.7 引入 Asynchronous I/O, 即AIO ,在进行I/O 编程中,常用到两种哦是:Reactor 和 Proactor. Java 的 NIO 就是 Reactor , 当有事件触发时,服务端得到通知,进行相应的处理
  2. AIO 即 NIO 模式2.0 ,叫做异步不阻塞 的 IO。AIO 引入异步通道的概念,采用了Proactor模式,简化了程序编写,有效的请求才启动线程,它的特点是由操作系统完成后才通知服务端程序启动去处理线程,一般用于连接数较多且连接事件较长的应用
  3. 目前AIO还没有广泛流行

NIO vs BIO

  1. BIO以流的方式处理数据,而 NIO 以块的方式处理数据,块 I/O的效率比流 I/O 高很多
  2. BIO是阻塞的,NIO是非阻塞的
  3. BIO基于字节流和字符流进行操作,而 NIO 基于 Channel (通道) 和 Buffer (缓存区) 进行操作,数据总是从通道读取到缓存区中,或者从缓存区写入到通道中,Selector (选择器) 用于监听多个通道事件 (比如 :连接请求,数据到达等),因此使用单个线程就可以监听多个客户端通道

Netty线程模型

  • 原NIO存在 :API繁琐,需要熟练掌握三大组件、开发与维护难度大,需要考虑断线重连,网络闪断,半包读写,网络阻塞等

线程模型基本介绍:

  1. 不同的线程模型,对程序对性能有很大影响
  2. 目前存在对线程模型有 : 传统阻塞I/O服务模型 、 Reactor模式
  3. 根据Reactor 的数量和处理资源池线程的数量不同,有3种典型的实现 - 单Reactor 单线程 - 单Reactor 多线程 - 主从Reactor 多线程
  4. Netty线程模型(Netty主要基于主从 Reactor 多线程模型做了一定的改进,其中中从 Reactor 多线程模型有多个 Reactor)

Reactor 模式 :

  1. 基于I/O 复用模型,多个连接公用一个阻塞对象,应用程序只需要在一个阻塞对象等待,无需等待所有连接,当某个连接有新的数据可以处理时,操作系统通知应用程序,线程从阻塞状态返回,开始业务处理
  2. 基于线程是复用线程资源,不必再为每个连接而创建线程,将连接完成后的业务处理任务分配给线程进行处理,一个线程可以处理多个业务

单线程模型 :

在这里插入图片描述

  • Reactor 内部通过selector 监控连接事件,收到事件后通过dispatch进行分发,如果是连接建立都事件,则由Acceptor处理,Acceptor通过accept接受连接,并创建一个Handler来处理连接后续都各种事件。如果是读写事件,直接调用连接对应都Handler来处理
  • Handler 完成read -> (decode -> compute -> encode) -> send的业务流程
  • 这种模型好处是简单,坏处却很明显,当某个Handler阻塞时,会导致其他客户端的handler和accpetor都的不到执行,无法做到高性能,只适合用于业务处理非常快都场景

单线程模型就是只指定一个线程执行客户端连接和读写操作,也就是在一个Reactor中完成,对应在Netty中的实现就是将NioEventLoopGroup线程数设置为1,核心代码是:

 NioEventLoopGroup group = new NioEventLoopGroup(1);
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(group)
                .channel(NioServerSocketChannel.class)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .childHandler(new ServerHandlerInitializer());

多线程模型 :

在这里插入图片描述

  • 主线程中,Reactor对象通过selector监控连接事件,收到事件后通过dispatch进行分发,如果是连接建立事件,则由Acceptor处理,Acceptor 通过accept 接收连接,并创建一个Handler来处理后续事件,而Handler只负责响应事件,不进行业务操作,也就是只进行read读取操作和write写出数据,业务处理交给一个线程池进行处理
  • 线程池分配一个线程完成真正都业务处理,将响应结果交给主线程都Handler处理,Handler将结果send给client 在这里插入图片描述

多线程模型就是在一个单Reactor中进行客户端连接处理,然后业务处理交给线程池,核心代码如下:

NioEventLoopGroup eventGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(eventGroup)
        .channel(NioServerSocketChannel.class)
        .option(ChannelOption.TCP_NODELAY, true)
        .option(ChannelOption.SO_BACKLOG, 1024)
        .childHandler(new ServerHandlerInitializer());

主从多线程模型

在这里插入图片描述

  • 存在多个Reactor,每个Reactor都有自己的selector选择器,线程和dispatch
  • 主线程中的mainReactor通过自己的selector监控连接建立事件,收到事件后通过Accpetor接收,将新的连接分配给某个子线程
  • 子线程中的subReactor将mainReactor分配的连接加入连接队列中通过自己的selector进行监听,并创建一个Handler用于处理后续事件
  • Handler完成read->业务处理->send的完整业务流程

在这里插入图片描述

主从多线程模型是有多个Reactor,也就是存在多个selector,所以我们定义一个bossGroup和一个workGroup,核心代码如下:

NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup,workerGroup)
        .channel(NioServerSocketChannel.class)
        .option(ChannelOption.TCP_NODELAY, true)
        .option(ChannelOption.SO_BACKLOG, 1024)
        .childHandler(new ServerHandlerInitializer());

Reactor模式优点:

  1. 响应快,不必为单个同步时间所阻塞,虽然 Reactor 本身依然是同步都
  2. 可以最大程度都避免复杂多线程及同步问题,并且避免来多线程/进程都切换开销
  3. 扩展性好,可以方便都通过增加 Reactor 实例来充分利用CPU资源
  4. 复用性好,Reactor 模型本身与集体事件处理逻辑无关,具有很高的复用性


个人博客地址:http://blog.yanxiaolong.cn/

end
  • 作者:yxl(联系作者)
  • 发表时间:2021-03-24 16:22
  • 版权声明:自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)
  • 转载声明:如果是转载栈主转载的文章,请附上原文链接
  • 公众号转载:请在文末添加作者公众号二维码(公众号二维码见右边,欢迎关注)
  • 评论