程序员求职经验分享与学习资料整理平台

网站首页 > 文章精选 正文

高性能netty之多通道实现(多通道用户界面及交互技术)

balukai 2025-04-09 14:10:30 文章精选 9 ℃



一、netty简介

Netty的主要目的是基于NIO构建具有网络和业务逻辑组件的分离和松耦合的高性能协议服务器。它可以实现多种协议,例如HTTP或你自己的特定协议。

Netty有一系列丰富的特性:

  • 有一套统一的API来处理异步和同步编程模式
  • 使用非常灵活
  • 简单但却强大的线程机制
  • 业务组件分离方便重用
  • 极小的缩减不必要的Memory Copy
  • 二、netty核心概念

    Netty是一个非阻塞框架。与阻塞IO相比,这导致高吞吐量。了解无阻塞IO对于了解Netty的核心组件及其关系至关重要

    三、netty核心组件

    1.Channel

    Channel是Java NIO的基础。它表示一个开放的连接,能够执行IO操作,例如读取和写入。简单的说,Channel 就是代表连接,实体之间的连接,程序之间的连接,文件之间的连接,设备之间的连接。同时它也是数据入站和出站的载体。

    2.Future

    Netty 通道中的每个IO操作都是非阻塞的。这意味着调用后立即返回所有操作。标准Java库中有一个Future接口,但是对于Netty而言并不方便-我们只能向Future询问操作的完成情况,或在操作完成之前阻塞当前线程。这就是Netty拥有自己的ChannelFuture接口的原因。我们可以将回调传递给ChannelFuture,该回调将在操作完成时被调用。

    3.EventLoop 和 EventLoopGroup

    既然有了 Channel 连接服务,让信息之间可以流动。如果服务发出的消息称作“出站”消息,服务接受的消息称作“入站”消息。那么消息的“出站”/“入站”就会产生事件(Event)。例如:连接已激活;数据读取;用户事件;异常事件;打开链接;关闭链接等等。

    在netty中一个Channel都会分配一个EventLoop,一个EventLoop可以服务于多个Channel。

    EventLoopGroup,可以理解为EventLoop组,一个EventLoopGroup包含了多个EventLoop。

    4.Handlers

    Netty提供了ChannelHandler实现的巨大层次结构。值得注意的是适配器只是空的实现,例如
    ChannelInboundHandlerAdapter和
    ChannelOutboundHandlerAdapter。当我们只需要处理所有事件的子集时,可以扩展这些适配器。而且,有许多特定协议(例如HTTP)的实现,例如HttpRequestDecoder,HttpResponseEncoder,HttpObjectAggregator。

    5.Encoders and Decoders

    在使用网络协议时,我们需要执行数据序列化和反序列化。为此,Netty 为能够解码传入数据的解码器引入了ChannelInboundHandler的特殊扩展。大多数解码器的基类是ByteToMessageDecoder。

    为了对传出的数据进行编码,Netty具有ChannelOutboundHandler的扩展,称为编码器。MessageToByteEncoder是大多数编码器实现的基础。我们可以使用编码器和解码器将消息从字节序列转换为Java对象,反之亦然。

    四,netty多通道

    上面我们对netty做了简单介绍及了解了netty相关的核心组件,下面我们讨论下netty多通道的实现。

    一般我们通过 handler() 或childHandler() 都只添加了一个 Channel通道,对于复杂的应用单通道可能无法满足要求,这里我通过示例介绍一种多通道的实现方式:

    1.Netty服务端:
    nettyServer.java

    @Component
    public class NettyServer {
    
    /**netty主机*/
    @Value("${rzt.inquiry.netty-host}")
    private String nettyHost;
    /**netty端口*/
    @Value("${rzt.inquiry.netty-port}")
    private Integer nettyPort;
    /**netty最大长度*/
    @Value("${rzt.inquiry.netty-maxlength}")
    private Integer nettyMaxLength;
    
    /**通过nio方式来接收连接和处理连接*/
    private EventLoopGroup boss = new NioEventLoopGroup();
    
    /**worker*/
    private EventLoopGroup work = new NioEventLoopGroup();
    
    /**创建bootstrap*/
    private ServerBootstrap bootstrap = new ServerBootstrap();
    
    /**通道适配器*/
    @Resource
    private NettyServerHandler nettyServerHandler;
    
    /**NETTY编码格式*/
    public final String CHARSET = "UTF-8";
    
    /**
    * 启动netty
    */
    public void start() {
    try {
    bootstrap.group(boss, work);
    bootstrap.channel(NioServerSocketChannel.class);
    bootstrap.option(ChannelOption.SO_BACKLOG, 100);
    bootstrap.handler(new LoggingHandler(LogLevel.INFO));
    
    /**设置过滤器(设置事件处理)*/
    bootstrap.childHandler(new ChannelInitializer() {
    @Override
    protected void initChannel(SocketChannel ch) {
    ChannelPipeline cp = ch.pipeline();
    //1. 添加心跳支持(每隔指定时间来检查一下channelRead方法被调用的情况,如果在指定时间内该链上的channelRead方法都没有被触发,就会调用userEventTriggered)
    cp.addLast(new IdleStateHandler(0, 0, 0, TimeUnit.DAYS));
    /**设置已@CRT为分隔符*/
    ByteBuf delimiter = Unpooled.copiedBuffer(CreditConstantParamUtils.CREDIT_NETTY_DELIMITER.getBytes());
    cp.addLast("framer",
    new DelimiterBasedFrameDecoder(nettyMaxLength * nettyMaxLength, false, delimiter));
    /**解码器*/
    cp.addLast("decoder", new StringDecoder(Charset.forName(CHARSET)));
    /**编码器*/
    cp.addLast("encoder", new StringEncoder(Charset.forName(CHARSET)));
    cp.addLast(new DefaultEventExecutorGroup(8));
    /**服务端业务逻辑*/
    cp.addLast("handler", nettyServerHandler);
    
    }
    });
    /**服务端绑定IP及端口监听*/
    MySlf4j.textInfo("netty服务器在[{0}]端口启动监听", nettyPort);
    ChannelFuture futrue = bootstrap.bind(nettyHost, nettyPort).sync();
    /**监听服务器关闭监听*/
    futrue.channel().closeFuture().sync();
    } catch (InterruptedException ex) {
    MySlf4j.textError("netty服务端启动异常{0}", MySlf4j.ExceptionToString(ex));
    boss.shutdownGracefully();
    work.shutdownGracefully();
    }
    }
    
    /**
    * 关闭服务器方法
    */
    @PreDestroy
    public void close() {
    MySlf4j.textInfo("关闭netty服务端....");
    //优雅退出
    boss.shutdownGracefully();
    work.shutdownGracefully();
    }
    
    }

    NettyServerHandler.java

    @Component
    @ChannelHandler.Sharable
    public class NettyServerHandler extends SimpleChannelInboundHandler {
    
    @Autowired
    private ICreditInquiryService creditInquiryService;
    
    /**释放通道数*/
    private int lossConnectCount = 0;
    
    /**
    * 服务端消息处理
    */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
    msg = msg.replaceAll(CreditConstantParamUtils.CREDIT_NETTY_DELIMITER, "");
    if (msg instanceof String) {
    if ("ping-pong-ping-pong".equals(msg)) {
    MySlf4j.textInfo("[心跳监测] {0}:通道活跃", channelHandlerContext.channel().id());
    lossConnectCount = 0;
    return;
    }
    }
    ..........
    //接受netty客户端请求数据,并响应
    channelFuture = channelHandlerContext.writeAndFlush(responseBody);
    }
    
    /**
    * 触发器
    */
    @Override
    public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object evt) throws Exception {
    MySlf4j.textInfo("[已经有xx小时没有接收到客户端消息]");
    if (evt instanceof IdleStateEvent) {
    IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
    // READER_IDLE:一段时间内没有数据接收
    if (idleStateEvent.state() == IdleState.READER_IDLE) {
    lossConnectCount++;
    if (lossConnectCount > 2) {
    MySlf4j.textInfo("[释放不活跃通道] {0}", channelHandlerContext.channel().id());
    channelHandlerContext.channel().close();
    }
    }
    } else {
    super.userEventTriggered(channelHandlerContext, evt);
    }
    }
    
    }

    2.netty客户端

    nettyClient.java

    @Component
    public class NettyClient {
    
    /**netty主机*/
    @Value("${rzt.auth.netty-host}")
    private String nettyHost;
    /**netty端口*/
    @Value("${rzt.auth.netty-port}")
    private Integer nettyPort;
    
    /**初始化重试次数*/
    private static int retry = 0;
    
    /**初始化Bootstrap实例*/
    private Bootstrap bootstrap = new Bootstrap();
    
    /** 工人线程组*/
    private EventLoopGroup workerGroup = new NioEventLoopGroup();
    
    /**key为目标host,value为目标host的连接池*/
    public static ChannelPoolMap poolMap;
    
    @Autowired
    private NettyChannelPoolHandler nettyChannelPoolHandler;
    
    /**
    * netty客户端启动
    */
    public void init() {
    MySlf4j.textInfo("netty客户端启动连接,host:{0},port:{1}", nettyHost, nettyPort);
    bootstrap.group(workerGroup).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
    .option(ChannelOption.SO_KEEPALIVE, true);
    //1. 创建连接池map
    poolMap = new AbstractChannelPoolMap() {
    @Override
    protected SimpleChannelPool newPool(InetSocketAddress inetSocketAddress) {
    //maxConnections: 最大连接数,超过则进入pendingAcquireQueue等待获取连接
    return new FixedChannelPool(bootstrap.remoteAddress(inetSocketAddress), nettyChannelPoolHandler,
    20);
    }
    };
    
    InetSocketAddress addr1 = new InetSocketAddress(nettyHost, nettyPort);
    //创建4个channel连接发送消息
    for (int i = 0; i < 4; i++) {
    //2. 取出连接addr1地址的连接池
    final SimpleChannelPool pool1 = poolMap.get(addr1);
    
    //3. 获取一个连接
    Future channelFuture = pool1.acquire();
    channelFuture.addListener(new FutureListener() {
    @Override
    public void operationComplete(Future channelFuture) throws Exception {
    if (channelFuture.isSuccess()) {
    //连接地址1的某个channel
    Channel ch = channelFuture.getNow();
    //使用连接发送消息
    ch.writeAndFlush("ping-pong-ping-pong".concat(CreditConstantParamUtils.CREDIT_NETTY_DELIMITER));
    //释放连接,将连接放回连接池
    pool1.release(ch);
    }
    }
    });
    }
    }
    }

    NettyClientHandler.java

    @Component
    @ChannelHandler.Sharable
    public class NettyClientHandler extends SimpleChannelInboundHandler {
    
    @Autowired
    private IAnalysisWriteService analysisWriteService;
    
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) {
    MySlf4j.textInfo("[OVERLENGTH]客户端收到消息{0},消息长度:{1}", msg, msg.length());
    /** 创建线程池 */
    ExecutorService excutorService = Executors.newFixedThreadPool(5);
    excutorService.execute(new Runnable() {
    @Override
    public void run() {
    //通过线程池处理响应数据
    .....
    }
    });
    }
    }

    服务启动后,我们通过netstat -an就可以看到建立了4条Channel。


    Tags:

    最近发表
    标签列表