Netty框架搭建服务端

Netty框架搭建服务端

最近项目组来了新任务,要写一个socket服务端用于接收、处理设备上报的数据。捣鼓了好几天,从没接触过网络IO编程的我,又学到了新东西,下面把它记录下来。

首先理解概念:

Java 网络IO编程:BIO、NIO、AIO的区别

1、BIO编程

网络编程的基本模型是C/S模型,即两个进程间的通信。

服务端提供IP和监听端口,客户端通过连接操作想服务端监听的地址发起连接请求,通过三次握手连接,如果连接成功建立,双方就可以通过套接字进行通信。

传统的同步阻塞模型开发中,ServerSocket负责绑定IP地址,启动监听端口;Socket负责发起连接操作。连接成功后,双方通过输入和输出流进行同步阻塞式通信。

简单的描述一下BIO的服务端通信模型:采用BIO通信模型的服务端,通常由一个独立的Acceptor线程负责监听客户端的连接,它接收到客户端连接请求之后为每个客户端创建一个新的线程进行链路处理没处理完成后,通过输出流返回应答给客户端,线程销毁。即典型的一请求一应答通宵模型。

该模型最大的问题就是缺乏弹性伸缩能力,当客户端并发访问量增加后,服务端的线程个数和客户端并发访问数呈1:1的正比关系,Java中的线程也是比较宝贵的系统资源,线程数量快速膨胀后,系统的性能将急剧下降,随着访问量的继续增大,系统最终就死掉了。

2、NIO 编程

NIO提供了与传统BIO模型中的Socket和ServerSocket相对应的SocketChannel和ServerSocketChannel两种不同的套接字通道实现。

新增的着两种通道都支持阻塞和非阻塞两种模式。

阻塞模式使用就像传统中的支持一样,比较简单,但是性能和可靠性都不好;非阻塞模式正好与之相反。

对于低负载、低并发的应用程序,可以使用同步阻塞I/O来提升开发速率和更好的维护性;对于高负载、高并发的(网络)应用,应使用NIO的非阻塞模式来开发。

BIO与NIO一个比较重要的不同,是我们使用BIO的时候往往会引入多线程,每个连接一个单独的线程;而NIO则是使用单线程或者只使用少量的多线程,每个连接共用一个线程。

NIO的最重要的地方是当一个连接创建后,不需要对应一个线程,这个连接会被注册到多路复用器上面,所以所有的连接只需要一个线程就可以搞定,当这个线程中的多路复用器进行轮询的时候,发现连接上有请求的话,才开启一个线程进行处理,也就是一个请求一个线程模式。

3、AIO 编程

NIO 2.0引入了新的异步通道的概念,并提供了异步文件通道和异步套接字通道的实现。

异步的套接字通道是真正的异步非阻塞I/O,对应于UNIX网络编程中的事件驱动I/O(AIO)。他不需要过多的Selector对注册的通道进行轮询即可实现异步读写,从而简化了NIO的编程模型。

与NIO不同,当进行读写操作时,AIO只须直接调用API的read或write方法即可。这两种方法均为异步的,对于读操作而言,当有流可读取时,操作系统会将可读的流传入read方法的缓冲区,并通知应用程序;对于写操作而言,当操作系统将write方法传递的流写入完毕时,操作系统主动通知应用程序。

即可以理解为,read/write方法都是异步的,完成后会主动调用回调函数。 在JDK1.7中,这部分内容被称作NIO.2,主要在java.nio.channels包下增加了下面四个异步通道:

  1. AsynchronousSocketChannel
  2. AsynchronousServerSocketChannel
  3. AsynchronousFileChannel
  4. AsynchronousDatagramChannel

搞清楚了网络IO编程的概念后,接下来就是如何编码实现的问题。经了解发现在这方面已经有了很多很成熟的框架可以应用,例如:

  1. t-io
  2. Voovan
  3. Netty
  4. Mina

我选择使用Netty框架来完成这次的任务,下面上代码。

NIO服务端

package com.huaweisoft.service.netty1;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PreDestroy;
import java.util.HashMap;
import java.util.Map;

/**
 * @Title: NioServer
 * @Description: nio服务端
 * @date 2020/10/29
 */
@Component
@Slf4j
public class NioServer {

    @Autowired
    private ServerChannelInitializer serverChannelInitializer;

    /**
     * // 从配置文件中(获取服务端监听端口号
     */
    @Value("${socket.port}")
    private Integer port;

    //配置服务端的NIO线程组
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();

    /**
     * 创建bootstrap
     */
    ServerBootstrap serverBootstrap = new ServerBootstrap();

    //连接map
    public static Map<String, ChannelHandlerContext> map = new HashMap<String, ChannelHandlerContext>();

    public void start() {
        try {
            serverBootstrap.group(bossGroup, workerGroup) // 绑定线程池
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    // .option(ChannelOption.SO_KEEPALIVE, true) // 2小时无数据激活心跳机制
//                    .childHandler(new ServerChannelInitializer());
                    .childHandler(serverChannelInitializer);
            log.info("netty服务器在[{}]端口启动监听", port);
            // 服务器异步创建绑定
            ChannelFuture f = serverBootstrap.bind(port).sync();
            // 关闭服务器通道
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            log.info("[出现异常] 释放资源");
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    /**
     * 关闭服务器方法
     */
    @PreDestroy
    public void close() {
        log.info("关闭服务器....");
        //优雅退出
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}

服务业务实现

package com.huaweisoft.service.netty1;

import cn.hutool.core.date.DateUtil;
import com.huaweisoft.dao.SocketMapper;
import com.huaweisoft.dto.RiverDataDTO;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.math.BigDecimal;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @Title: NioServerHandler
 * @Description: 服务业务实现
 * @date 2020/10/29
 */

@Service
@Slf4j
@ChannelHandler.Sharable
public class NioServerHandler extends SimpleChannelInboundHandler<Object> {

    @Autowired
    private SocketMapper mapper;

    /**
     * 读取客户端发来的数据
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {

        String data = msg.toString();
        if (data != null) {
            log.info("服务端接收到的数据为:" + data);
            // 解析数据并插入数据库
            parseData(data);
        } else {
            //logger.info(""+"不是监测数据"+ msg.toString()+" "+ DateUtils.dateToString(new Date()));
        }
        ctx.writeAndFlush(DateUtil.date(new Date()).toString());
//        ctx.flush();
    }

    private void parseData(String dataSource) throws RuntimeException {
        try {
            String[] strs = dataSource.split(" ");
            SimpleDateFormat sdf = new SimpleDateFormat("yyMMddHHmmss");
            String flag = strs[0].replace("*", "");
            switch (flag) {
                case "SW":
                    RiverDataDTO dto = new RiverDataDTO();
                    dto.setStcd(strs[1].substring(0, strs[1].length() - 2))
                            .setExkey(strs[1].substring(strs[1].length() - 2))
                            .setTm(sdf.parse(strs[2])).setUpz(new BigDecimal(strs[4]))
                            .setRdtm(new Date());
                    log.info("解析数据:" + dto.toString());
                    mapper.insertRiverData(dto);
                    log.info("插入数据库成功!");
                    break;
                default:
                    break;
            }
        } catch (Exception e) {
            log.error("系统出错:" + e);
            throw new RuntimeException("系统出错:" + e);
        }
    }

    /**
     * 读取完毕客户端发送过来的数据之后的操作
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("服务端接收数据完毕..");
//        ctx.channel().write(DateUtil.date(new Date()).toString());
//        ctx.channel().flush();
        ctx.flush();
    }

    /**
     * 客户端主动断开服务端的链接,关闭流
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("链接断开:{}", ctx.channel().remoteAddress());
        removeChannnelMap(ctx);
        // 关闭流
        ctx.close();
    }

    /**
     * 客户端主动连接服务端
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("链接创建:{}", ctx.channel().remoteAddress());
        ctx.writeAndFlush(DateUtil.date(new Date()).toString());
        super.channelActive(ctx);
    }

    /**
     * 发生异常处理
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        log.error("连接异常,连接异常:" + DateUtil.formatDate(new Date()) + cause.getMessage(), cause);
        ctx.fireExceptionCaught(cause);
        removeChannnelMap(ctx);
        ctx.close();
    }

    /**
     * 删除map中ChannelHandlerContext
     */
    private void removeChannnelMap(ChannelHandlerContext ctx) {
        for (String key : NioServer.map.keySet()) {
            if (NioServer.map.get(key) != null && NioServer.map.get(key).equals(ctx)) {
                NioServer.map.remove(key);
            }
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        super.userEventTriggered(ctx, evt);
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state().equals(IdleState.READER_IDLE))
                ctx.close();
            //标志该链接已经close 了 
        }
    }

}

服务器Channel通道初始化设置

package com.huaweisoft.service.netty1;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @Title: NioServerHandler
 * @Description: 服务器Channel通道初始化设置
 * @date 2020/10/29
 */
@Component
@ChannelHandler.Sharable
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Autowired
    private NioServerHandler nioServerHandler;

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
 
          /* LineBasedFrameDecoder的工作原理是:依次遍历ByteBuf中的可读字节,
        判断看其是否有”\n” 或 “\r\n”, 如果有就以此位置为结束位置。
        从可读索引到结束位置的区间的字节就组成了一行。 它是以换行符为结束标志的解码器,
        支持携带结束符和不带结束符两种解码方式,同时支持配置单行的最大长度,
        如果读到了最大长度之后仍然没有发现换行符,则抛出异常,同时忽略掉之前读到的异常码流。*/
        pipeline.addLast(new LineBasedFrameDecoder(10010));
        //字符串解码和编码
        //LineBasedFrameDecoder + StringDecoder 就是一个按行切换的文本解码器。
        pipeline.addLast(new StringDecoder());
        pipeline.addLast(new StringEncoder());
        //服务器的逻辑
        pipeline.addLast("handler", nioServerHandler);
    }
}

整合SpringBoot

springboot启动类代码,用于服务启动时自动初始化Socket服务端:

package com.huaweisoft;

import com.huaweisoft.service.netty1.NioServer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;

@SpringBootApplication
@ComponentScan("com.huaweisoft.*")
public class Application implements CommandLineRunner {

    @Autowired
    private NioServer server;

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        server.start();
    }
}

启动服务,测试,大功告成!

image.png

总结

网络IO编程看似简单,但其中要考虑的东西并不少,多线程、同步/异步、阻塞/非阻塞等。我也只是刚学会如何应用,对底层的逻辑还不太了解,有时间一定得研究框架的源码。

(完)

Copyright: 采用 知识共享署名4.0 国际许可协议进行许可

Links: https://www.keikei.vip/archives/netty框架搭建服务端