Java的IO机制

张贤 2020年03月17日 146次浏览

BIO、NIO、AIO 的区别

BIO

包括基于字节流的 InputStream 和 OutputStream,以及基于字符流的 Reader 和Writer。

NIO

NonBlock-IO:构建多路复用的、同步非阻塞的 IO 操作

  • Channels
    其中 FileChannels 有 transferTo 和 transferFrom 两个方法,适合大文件的拷贝,避免了两次用户态和内核态的上下文切换,即"零拷贝",效率高

  • Buffers

  • Selectors
    允许单线程处理多个 Channels。底层调用的是系统级别的 select\poll\epoll


使用单线程的轮询事件的机制,可以高效定位到就绪的 channel,只有 select 阶段是阻塞的,可以避免大量客户端连接时频繁切换线程的开销。

select、poll 和 epoll 的区别

  • select
    单个进程所能够打开的最大连接数由 FD_SETSIZE 宏定义,大小是 32 个整数的大小(在 32 为机器上,大小是3232,64 位机器上,大小是 3264),我们可以对其进行修改,然后重新编译内核,但是性能无法保证,需要做进一步测试
  • poll
    本质上与 select 没有区别,大师它没有最大连接数的限制,因为它是基于链表来存储的
  • epoll
    虽然连接数有上限,但是很大,1G 内存可以打开 10 万左右的连接

FD 剧增后带来的 IO 效率问题

  • select
    因为每次调用时都会对连接进行线性遍历,所以随着 FD 的增加会造成遍历速度的"线性下降"

  • poll
    同上

  • epoll
    由于 epoll 是根据每个 fd 上的 callback 函数来实现的,只有活跃的 socket 才会主动调用 callback,所以在活跃 socket 较少的情况下,使用 epoll 不会出现性能线性下降的问题。但是在所有 socket 都很活跃的情况下,可能会有性能问题

消息传递方式

  • select
    内核需要将消息传递到用户态,需要内核的拷贝动作

  • poll
    同上

  • epoll
    通过内核态和用户态共享同一块内存来实现,性能较高

AIO

Asynchronous IO,基于事件和回调机制,异步非阻塞

  • 基于回调:实现 Completionhandler 接口,调用时触发回调函数
  • 返回 Future:通过 isDone() 查看是否准备好,通过 get() 等待返回数据

BIO 例子

public class BIOPlainEchoServer {
    public void serve(int port) throws IOException {
        //将 ServerSocket 绑定到指定的端口
        final ServerSocket socket = new ServerSocket(port);
        while (true) {
            //阻塞直到收到客户端的连接
            final Socket clientSocket = socket.accept();
            System.out.println("Accepted connection from " + clientSocket);
            //创建一个子线程去处理客户端的请求
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        BufferedReader reader = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
                        PrintWriter writer = new PrintWriter(clientSocket.getOutputStream());
                        //从客户端读取数据并原封不动写回去
                        while (true) {
                            writer.println(reader.readLine());
                            writer.flush();
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }).start();

        }
    }

    public void improvedServe(int port) throws IOException {
        //将 ServerSocket 绑定到指定的端口
        final ServerSocket socket = new ServerSocket(port);
        //创建一个线程池
        ExecutorService service = Executors.newFixedThreadPool(6);
        while (true) {
            //阻塞直到收到客户端的连接
            final Socket clientSocket = socket.accept();
            System.out.println("Accepted connection from " + clientSocket);
            service.execute(() -> {
                try {
                    BufferedReader reader = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
                    PrintWriter writer = new PrintWriter(clientSocket.getOutputStream());
                    //从客户端读取数据并原封不动写回去
                    while (true) {
                        writer.println(reader.readLine());
                        writer.flush();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });

        }
    }
}

NIO 例子

public class NIOPlainEchoServer {

    public void serve(int port) throws IOException {
        System.out.println("Listening for connection on port: " + port);
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        ServerSocket ss = serverSocketChannel.socket();
        InetSocketAddress address = new InetSocketAddress(port);
        //将 ServerSocket 绑定到指定的端口
        ss.bind(address);
        serverSocketChannel.configureBlocking(false);
        Selector selector = Selector.open();
        //将 channel 注册到 selector 里,并说明 selector 关注的点,这里是关注建立连接这个事件
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        while (true) {
            //阻塞等待就绪的 channel,即没有与客户端建立连接前就一直轮询
            selector.select();
            //获取到selector里所有就绪的 SelectionKey 实例,每将一个 channel 注册到 selector 就会产生一个 SelectionKey
            Set<SelectionKey> readyKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = readyKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                //将就绪的 SelectionKey 从 selector 中移除,因为马上就要去处理它,防止重复执行
                iterator.remove();

                //若 SelectionKey 处于 Acceptable 状态
                if (key.isAcceptable()) {
                    ServerSocketChannel server = (ServerSocketChannel) key.channel();
                    SocketChannel clientChannel = server.accept();
                    System.out.println("Accepted connection from " + clientChannel);
                    clientChannel.configureBlocking(false);
                    //向 selector 注册 clientChannel,主要关注读写事件,并传入一个ByteBuffer实例供读写缓存
                    clientChannel.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, ByteBuffer.allocate(100));
                }

                //若 SelectionKey 处于可读状态
                if (key.isReadable()) {
                    SocketChannel clientChannel = (SocketChannel) key.channel();
                    ByteBuffer out = (ByteBuffer) key.attachment();
                    //从 channel 里读取数据存入 ByteBuffer
                    clientChannel.read(out);
                }

                //若 SelectionKey 处于可写状态
                if (key.isWritable()) {
                    SocketChannel clientChannel = (SocketChannel) key.channel();
                    ByteBuffer out = (ByteBuffer) key.attachment();
                    out.flip();
                    //将 ByteBuffer 的数据写入 channel
                    clientChannel.write(out);
                    out.compact();
                }
            }
        }
    }
}

BIO、NIO、AIO 对比

属性模型阻塞 BIO非阻塞 NIO异步 AIO
blocking阻塞并同步非阻塞但同步非阻塞并异步
线程数(server:client)|1:1|1:N0:N
复杂度简单较复杂复杂
吞吐量