BIO

Java中的BIO,即Blocking I/O(同步阻塞I/O),是传统的Java IO编程方式。它的特点是同步并阻塞,服务器实现模式为一个连接一个线程。也就是说,当客户端有连接请求时,服务器端会启动一个线程进行处理。如果这个连接在一段时间内没有进行数据读写操作,那么线程就会处于阻塞状态,等待数据的到来。这种方式在连接数较少且固定的场景下表现良好,但对于连接数较多或需要高并发的场景,BIO可能会导致线程资源的浪费和性能瓶颈。
在BIO的工作机制中,服务器端会启动一个ServerSocket,注册端口,并调用accept方法监听客户端的Socket连接。当客户端发起连接请求时,服务器端的accept方法会返回一个新的Socket对象,然后服务器端会为这个Socket对象创建一个新的线程来处理数据读写操作。

示例代码:
以下代码通过 ServerSocket 创建连接,当调用 serverSocket.accept() 方法时将会阻塞等待连接,这种情况下这个主线程只能接受一个连接

public class BIOServer {   
    public static void main(String[] args) {   
        try {   
            // 创建ServerSocket对象并监听指定端口  
            ServerSocket serverSocket = new ServerSocket(8080);  
            // 等待客户端连接  
            Socket socket = serverSocket.accept();  
            // 获取输入流,读取客户端发送的数据  
            BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));  
            System.out.println(in.readLine()); 
            // 获取输出流,向客户端发送数据  
            PrintWriter out = new PrintWriter(socket.getOutputStream(), true);  
            out.println("Hello from server!");  
            // 关闭资源  
            in.close();  
            out.close();  
            socket.close();  
            serverSocket.close();  
        }  catch (IOException e) {   
            e.printStackTrace();  
        }   
    }   
} 
public class BIOClient {   
    public static void main(String[] args) {   
        try {   
            // 创建Socket对象并连接到服务器  
            Socket socket = new Socket("localhost", 8080);  
            // 获取输出流,向服务器发送数据  
            PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
            out.println("Hello from client!");  
            // 获取输入流,读取服务器返回的数据  
            BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));  
            String response = in.readLine();  
            System.out.println(response);
            // 关闭资源  
            in.close();  
            out.close();  
            socket.close();  
        }  catch (IOException e) {   
            e.printStackTrace();  
        }   
    }   
} 

修改:
如果想使用 bio 实现同时接收多个连接的话,则需要通过创建多个线程用于接收客户端连接

public class BioServer { 
    public static void main(String[] args) { 
        try { 
            ServerSocket serverSocket = new ServerSocket(8080);
            for (int i = 0; i < 2; i++) { 
                new Thread(() -> { 
                    try { 
                        Socket socket = serverSocket.accept();
                        BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                        while (true) { 
                            String x = in.readLine();
                            if (x != null) { 
                                System.out.println(x);
                            } 
                        } 
                    }  catch (IOException e) { 
                        e.printStackTrace();
                    } 
                } ).start();
            } 
        }  catch (IOException e) { 
            e.printStackTrace();
        } 
    } 
} 
public class BioClient1 { 
    public static void main(String[] args) { 
        try { 
            Socket socket = new Socket("localhost", 8080);
            PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
            for (int i = 0; i < 10; i++) { 
                out.println("[连接1]:Hello from client!");
                TimeUnit.SECONDS.sleep(2);
            } 
            out.close();
            socket.close();
        }  catch (IOException e) { 
            e.printStackTrace();
        }  catch (InterruptedException e) { 
            e.printStackTrace();
        } 
    } 
} 
public class BioClient2 { 
    public static void main(String[] args) { 
        try { 
            Socket socket = new Socket("localhost", 8080);
            PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
            for (int i = 0; i < 10; i++) { 
                out.println("[连接2]:Hello from client!");
                TimeUnit.SECONDS.sleep(2);
            } 
            out.close();
            socket.close();
        }  catch (IOException e) { 
            e.printStackTrace();
        }  catch (InterruptedException e) { 
            e.printStackTrace();
        } 
    } 
} 

image.png
所以通过 bio 同样可以实现多个客户端建立长连接,但是这样的话一个客户端就需要占用服务端一个线程

NIO

Java的NIO(New I/O)是Java中的一个非阻塞I/O模型,也是I/O多路复用的基础。从JDK 1.4开始,Java提供了一系列改进的输入/输出处理的新特性,这些特性被统称为NIO。
NIO的主要特点包括:

  1. 非阻塞IO:与传统的IO流(如FileInputStream和FileOutputStream)不同,NIO的流是非阻塞的。这意味着,当线程调用读取或写入方法时,如果没有数据可读或无法写入数据,线程不会阻塞,而是可以继续执行其他任务。这大大提高了并发性和系统吞吐量。
  2. 通道(Channels)和缓冲区(Buffers):NIO使用通道和缓冲区来处理数据。通道是数据进出的通道,而缓冲区则用于存储数据。所有的I/O操作都是通过通道开始的,数据总是从通道中读入缓冲区,或者从缓冲区写入到通道中。
  3. 多路复用:NIO引入了选择器(Selector)的概念,它允许一个单独的线程监视多个通道,以查看哪些通道已经准备好进行读取或写入操作。这大大减少了线程的数量,提高了系统的可伸缩性。

NIO的应用场景非常广泛,包括网络编程、文件操作、数据库操作以及多线程数据同步等。在网络编程中,NIO的高并发性能使得它可以高效地处理大量的网络连接和I/O操作。在文件操作中,NIO提供了更高效的读写方式,如内存映射文件(MappedByteBuffer),比传统的InputStream和OutputStream更快、更可靠。
总之,Java的NIO是一个现代化、高效率的IO操作框架,适用于需要高效处理各种数据读写、具有高并发的网络/IO操作、高速光盘读取、大容量文件传输、多线程数据同步等场景。

示例代码:

  1. 创建一个 serveSocketChannel 通道

  2. 设置非阻塞模式并绑定端口

  3. 创建一个 selector

  4. 将 serverSocket 的 channel 注册到 selector 中,并指定监听类型

  5. selector.select() 方法会阻塞等待事件触发

  6. selector.selectedKeys() 会获取所有触发过的事件(不会自动清空)

  7. 当获取到连接时, 将客户端 socketChannel 注册到 selector 中

    public class NioServer { 
     public static void main(String[] args) throws IOException { 
         int port = 8080;
         // 打开一个ServerSocketChannel
         ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
         // 设置为非阻塞模式
         serverSocketChannel.configureBlocking(false);
         // 绑定端口
         serverSocketChannel.bind(new InetSocketAddress(port));
         // 打开一个Selector
         Selector selector = Selector.open();
         // 将ServerSocketChannel注册到Selector上,并监听ACCEPT事件
         serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
         System.out.println("Server started on port " + port);
         // 处理循环
         while (true) { 
             // 阻塞,等待至少一个通道就绪
             int readyChannels = selector.select();
             if (readyChannels == 0) { 
                 continue;
             } 
             // 获取就绪的通道集合
             Set<SelectionKey> selectedKeys = selector.selectedKeys();
             Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
    
             while (keyIterator.hasNext()) { 
                 SelectionKey key = keyIterator.next();
    
                 if (key.isAcceptable()) { 
                     // 处理新的连接
                     ServerSocketChannel server = (ServerSocketChannel) key.channel();
                     SocketChannel client = server.accept();
                     client.configureBlocking(false);
                     // 将客户端 socketChannel 注册到 selector 中
                     client.register(selector, SelectionKey.OP_READ);
                     System.out.println("Accepted connection from " + client);
                 }  else if (key.isReadable()) { 
                     // 处理读操作
                     SocketChannel client = (SocketChannel) key.channel();
                     CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
                     // 这里的缓冲区设置了 1024 字节,消息太大时将无法接收
                     ByteBuffer buffer = ByteBuffer.allocate(1024);
                     int bytesRead = client.read(buffer);
                     if (bytesRead == -1) { 
                         // 连接关闭
                         client.close();
                     }  else { 
                         buffer.flip();
                         CharBuffer charBuffer = decoder.decode(buffer);
                         String data = charBuffer.toString();
                         System.out.println(data);
                     } 
                 } 
    
                 // 从集合中移除已处理的键(注意清除的是 selector 的 selectedKeys 集合,防止重复处理 key)
                 keyIterator.remove();
             } 
         } 
     } 
    } 

netty

Netty 是一个异步事件驱动的网络应用框架,用于快速开发可维护的高性能服务器和客户端。
简单地说 Netty 封装了 JDK 的 NIO,不用再写一大堆复杂的代码。既然代替了原生的 NIO ,肯定有比它好的理由,主要有如下几点:

  1. Netty 底层IO模型可以随意切换,比如可以从NIO切换到BIO,但一般很少会这么做。
  2. Netty 自带拆包解包,从NIO各种繁复的细节中脱离出来,让开发者重点关心业务逻辑。
  3. Netty 解决了 NIO 中 Selector 空轮询 BUG,这个 BUG 应该很多人听说过,虽然官方声明 jdk1.6 的 update18 修复了该问题,只不过是降低了发生的概率。
  4. 对 Selector 做了很多细小的优化,reactor 线程模型能做到高效的并发处理。

参考文章:
Netty入门与实战教程 - LingBrown - 博客园

基本使用

导入依赖

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
</dependency>

服务端代码

在 xxl-job 框架中,就用到了 netty 监听指定端口,然后自定义处理器获取请求内容并处理,所以启动了 socketService 之后也是可以直接通过 http 请求的方式进行请求的
但是如果 Netty 服务器只是简单地作为一个 TCP 服务器,并没有实现 HTTP 协议的处理,那么浏览器发出的 HTTP 请求将不会被正确解析。浏览器期望收到符合 HTTP 协议的响应,而简单的 TCP 服务器可能只返回原始字节流,这不会被浏览器识别为有效的 HTTP 响应

public class NettyServer { 

    public static void main(String[] args) throws Exception { 
        // 配置服务器的 NIO 线程组
        // 用于接收客户端的连接
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // 用于处理已经被接收的连接
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try { 
            // 创建服务器端的启动对象,配置参数
            ServerBootstrap bootstrap = new ServerBootstrap();
            // 设置两个线程组
            bootstrap.group(bossGroup, workerGroup)
                    // 使用 NIO 的ServerSocketChannel作为服务器的通道实现
                    .channel(NioServerSocketChannel.class)
                    // 设置线程队列等待连接的个数
                    .option(ChannelOption.SO_BACKLOG, 128)
                    // 创建一个通道初始化对象(匿名内部类)
                    .childHandler(new ChannelInitializer<SocketChannel>() { 
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception { 
                            // 给 pipeline 设置处理器
                            // 打印日志,方便进行网络调试
                            ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
                            // 将接收到的对象转成字符串
                            ch.pipeline().addLast(new StringDecoder());
                            // 将发送对象转成字符串
                            ch.pipeline().addLast(new StringEncoder());
                            // 自定义的处理器  
                            ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() { 
                                @Override
                                protected void channelRead0(ChannelHandlerContext ctx, String msg) { 
                                    System.out.println("Server received: " + msg);
                                    ctx.writeAndFlush("Server response: " + msg);
                                } 
                                @Override
                                public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 
                                    cause.printStackTrace();
                                    ctx.close();
                                } 
                            } );
                        } 
                    } );
            // 绑定端口,同步等待成功
            ChannelFuture future = bootstrap.bind(8080).sync();
            // 对关闭通道进行监听
            future.channel().closeFuture().sync();
        }  finally { 
            // 优雅地关闭所有的 EventLoopGroup,释放所有的资源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        } 
    } 
} 

这里是一些关键部分的解释:

  • EventLoopGroup: Netty 用于处理 I/O 操作的多线程事件循环。bossGroup 用于接收传入的连接,workerGroup 用于处理已经被接受的连接。
  • ServerBootstrap: Netty 用于设置服务器的引导类,它提供了一系列配置方法,用于设置服务器的参数。
  • channel(NioServerSocketChannel.class): 指定服务器通道的实现类,这里使用 NIO 的实现。
  • option(ChannelOption.SO_BACKLOG, 128): 设置 TCP 参数,SO_BACKLOG 是操作系统用于 TCP 连接队列的长度。
  • childHandler(…): 为新接受的连接设置处理器。在这里,我们添加了一个日志处理器、字符串解码器、字符串编码器和自定义的业务处理器。
  • addLast(…): 向 ChannelPipeline 中添加处理器。ChannelPipeline 是 Netty 中处理网络事件的关键组件,它管理了一个处理器链,用于处理入站和出站数据以及事件。
  • bind(8080).sync(): 绑定服务器到指定端口(这里是 8080),并同步等待操作完成。
  • closeFuture().sync(): 等待服务器套接字关闭。这是一种阻塞调用,它会一直等待直到服务器通道关闭。
  • shutdownGracefully(): 优雅地关闭 EventLoopGroup,释放所有资源。

请注意,代码示例中还需要一个 NettyServerHandler 类,该类需要继承 ChannelInboundHandlerAdapter 或其他 ChannelHandler 实现,并处理服务器的业务逻辑,例如接收消息、发送响应等。在上面的代码中,NettyServerHandler 被添加到了 ChannelPipeline 中。

客户端代码

public class NettyClient { 

    private final String host;
    private final int port;

    public NettyClient(String host, int port) { 
        this.host = host;
        this.port = port;
    } 

    public void start() throws InterruptedException { 
        // 创建客户端事件循环组  
        EventLoopGroup group = new NioEventLoopGroup();

        try { 
            // 创建客户端启动辅助类  
            Bootstrap bootstrap = new Bootstrap();

            // 设置事件循环组  
            bootstrap.group(group)
                    // 设置通道类型为NioSocketChannel  
                    .channel(NioSocketChannel.class)
                    // 设置日志处理器,用于输出日志信息,便于调试  
                    .handler(new LoggingHandler(LogLevel.INFO))
                    // 添加自定义的处理器  
                    .handler(new ChannelInitializer<SocketChannel>() { 
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception { 
                            // 添加字符串编解码器  
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new StringEncoder());
                            // 添加自定义的业务处理器  
                            ch.pipeline().addLast(new MyClientHandler());
                        } 
                    } );

            // 连接到服务器  
            ChannelFuture future = bootstrap.connect(host, port).sync();

            // 等待连接关闭  
            future.channel().closeFuture().sync();
        }  finally { 
            // 关闭事件循环组  
            group.shutdownGracefully();
        } 
    } 

    /**
     * 自定义的业务处理器,用于处理接收到的消息和发送消息等操作  
     */
    private static class MyClientHandler extends SimpleChannelInboundHandler<String> { 

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { 
            // 处理接收到的消息  
            System.out.println("Received from server: " + msg);
        } 

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 
            // 处理异常  
            cause.printStackTrace();
            ctx.close();
        } 

        // 发送消息到服务器  
        public void sendMessage(ChannelHandlerContext ctx, String msg) { 
            ctx.writeAndFlush(msg);
        } 
    } 

    public static void main(String[] args) throws InterruptedException { 
        // 创建Netty客户端并启动  
        new NettyClient("localhost", 8080).start();
    } 

} 

Http 服务器

在自定义处理器的实现接口中,指定类型为 FullHttpRequest 即可接收和返回 http 响应

public class NettyServer { 

    public static void main(String[] args) throws Exception { 
        // 配置服务器的 NIO 线程组
        // 用于接收客户端的连接
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // 用于处理已经被接收的连接
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try { 
            // 创建服务器端的启动对象,配置参数
            ServerBootstrap bootstrap = new ServerBootstrap();
            // 设置两个线程组
            bootstrap.group(bossGroup, workerGroup)
                    // 使用 NIO 的ServerSocketChannel作为服务器的通道实现
                    .channel(NioServerSocketChannel.class)
                    // 设置线程队列等待连接的个数
                    .option(ChannelOption.SO_BACKLOG, 128)
                    // 创建一个通道初始化对象(匿名内部类)
                    .childHandler(new ChannelInitializer<SocketChannel>() { 
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception { 
                            ChannelPipeline pipeline = ch.pipeline();
                            // 添加 HTTP 编码器和解码器
                            pipeline.addLast(new HttpServerCodec());
                            // 添加 ChunkedWriteHandler 来支持大文件的流式传输
                            pipeline.addLast(new ChunkedWriteHandler());
                            // 添加聚合器,将多个消息转换成单一的 FullHttpRequest 或 LastHttpContent
                            pipeline.addLast(new HttpObjectAggregator(65536));
                            // 添加自定义的处理器
                            pipeline.addLast(new SimpleChannelInboundHandler<FullHttpRequest>() { 
                                @Override
                                protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception { 
                                    // 这里处理 HTTP 请求
                                    FullHttpResponse response = new DefaultFullHttpResponse(
                                            HttpVersion.HTTP_1_1,
                                            HttpResponseStatus.OK,
                                            Unpooled.copiedBuffer("Hello, World!", CharsetUtil.UTF_8)
                                    );
                                    response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
                                    response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
                                    // 发送响应
                                    ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
                                } 

                                @Override
                                public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 
                                    cause.printStackTrace();
                                    ctx.close();
                                } 
                            } );
                        } 
                    } );
            // 绑定端口,同步等待成功
            ChannelFuture future = bootstrap.bind(8080).sync();
            // 对关闭通道进行监听
            future.channel().closeFuture().sync();
        }  finally { 
            // 优雅地关闭所有的 EventLoopGroup,释放所有的资源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        } 
    } 
} 

image.png