一、netty简介
Netty的主要目的是基于NIO构建具有网络和业务逻辑组件的分离和松耦合的高性能协议服务器。它可以实现多种协议,例如HTTP或你自己的特定协议。
Netty有一系列丰富的特性:
二、netty核心概念
Netty是一个非阻塞框架。与阻塞IO相比,这导致高吞吐量。了解无阻塞IO对于了解Netty的核心组件及其关系至关重要。
三、netty核心组件
1.Channel
Channel是Java NIO的基础。它表示一个开放的连接,能够执行IO操作,例如读取和写入。简单的说,Channel 就是代表连接,实体之间的连接,程序之间的连接,文件之间的连接,设备之间的连接。同时它也是数据入站和出站的载体。
2.Future
Netty 通道中的每个IO操作都是非阻塞的。这意味着调用后立即返回所有操作。标准Java库中有一个Future接口,但是对于Netty而言并不方便-我们只能向Future询问操作的完成情况,或在操作完成之前阻塞当前线程。这就是Netty拥有自己的ChannelFuture接口的原因。我们可以将回调传递给ChannelFuture,该回调将在操作完成时被调用。
3.EventLoop 和 EventLoopGroup
既然有了 Channel 连接服务,让信息之间可以流动。如果服务发出的消息称作“出站”消息,服务接受的消息称作“入站”消息。那么消息的“出站”/“入站”就会产生事件(Event)。例如:连接已激活;数据读取;用户事件;异常事件;打开链接;关闭链接等等。
在netty中一个Channel都会分配一个EventLoop,一个EventLoop可以服务于多个Channel。
EventLoopGroup,可以理解为EventLoop组,一个EventLoopGroup包含了多个EventLoop。
4.Handlers
Netty提供了ChannelHandler实现的巨大层次结构。值得注意的是适配器只是空的实现,例如
ChannelInboundHandlerAdapter和
ChannelOutboundHandlerAdapter。当我们只需要处理所有事件的子集时,可以扩展这些适配器。而且,有许多特定协议(例如HTTP)的实现,例如HttpRequestDecoder,HttpResponseEncoder,HttpObjectAggregator。
5.Encoders and Decoders
在使用网络协议时,我们需要执行数据序列化和反序列化。为此,Netty 为能够解码传入数据的解码器引入了ChannelInboundHandler的特殊扩展。大多数解码器的基类是ByteToMessageDecoder。
为了对传出的数据进行编码,Netty具有ChannelOutboundHandler的扩展,称为编码器。MessageToByteEncoder是大多数编码器实现的基础。我们可以使用编码器和解码器将消息从字节序列转换为Java对象,反之亦然。
四,netty多通道
上面我们对netty做了简单介绍及了解了netty相关的核心组件,下面我们讨论下netty多通道的实现。
一般我们通过 handler() 或childHandler() 都只添加了一个 Channel通道,对于复杂的应用单通道可能无法满足要求,这里我通过示例介绍一种多通道的实现方式:
1.Netty服务端:
nettyServer.java
@Component
public class NettyServer {
/**netty主机*/
@Value("${rzt.inquiry.netty-host}")
private String nettyHost;
/**netty端口*/
@Value("${rzt.inquiry.netty-port}")
private Integer nettyPort;
/**netty最大长度*/
@Value("${rzt.inquiry.netty-maxlength}")
private Integer nettyMaxLength;
/**通过nio方式来接收连接和处理连接*/
private EventLoopGroup boss = new NioEventLoopGroup();
/**worker*/
private EventLoopGroup work = new NioEventLoopGroup();
/**创建bootstrap*/
private ServerBootstrap bootstrap = new ServerBootstrap();
/**通道适配器*/
@Resource
private NettyServerHandler nettyServerHandler;
/**NETTY编码格式*/
public final String CHARSET = "UTF-8";
/**
* 启动netty
*/
public void start() {
try {
bootstrap.group(boss, work);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.option(ChannelOption.SO_BACKLOG, 100);
bootstrap.handler(new LoggingHandler(LogLevel.INFO));
/**设置过滤器(设置事件处理)*/
bootstrap.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline cp = ch.pipeline();
//1. 添加心跳支持(每隔指定时间来检查一下channelRead方法被调用的情况,如果在指定时间内该链上的channelRead方法都没有被触发,就会调用userEventTriggered)
cp.addLast(new IdleStateHandler(0, 0, 0, TimeUnit.DAYS));
/**设置已@CRT为分隔符*/
ByteBuf delimiter = Unpooled.copiedBuffer(CreditConstantParamUtils.CREDIT_NETTY_DELIMITER.getBytes());
cp.addLast("framer",
new DelimiterBasedFrameDecoder(nettyMaxLength * nettyMaxLength, false, delimiter));
/**解码器*/
cp.addLast("decoder", new StringDecoder(Charset.forName(CHARSET)));
/**编码器*/
cp.addLast("encoder", new StringEncoder(Charset.forName(CHARSET)));
cp.addLast(new DefaultEventExecutorGroup(8));
/**服务端业务逻辑*/
cp.addLast("handler", nettyServerHandler);
}
});
/**服务端绑定IP及端口监听*/
MySlf4j.textInfo("netty服务器在[{0}]端口启动监听", nettyPort);
ChannelFuture futrue = bootstrap.bind(nettyHost, nettyPort).sync();
/**监听服务器关闭监听*/
futrue.channel().closeFuture().sync();
} catch (InterruptedException ex) {
MySlf4j.textError("netty服务端启动异常{0}", MySlf4j.ExceptionToString(ex));
boss.shutdownGracefully();
work.shutdownGracefully();
}
}
/**
* 关闭服务器方法
*/
@PreDestroy
public void close() {
MySlf4j.textInfo("关闭netty服务端....");
//优雅退出
boss.shutdownGracefully();
work.shutdownGracefully();
}
}
NettyServerHandler.java
@Component
@ChannelHandler.Sharable
public class NettyServerHandler extends SimpleChannelInboundHandler {
@Autowired
private ICreditInquiryService creditInquiryService;
/**释放通道数*/
private int lossConnectCount = 0;
/**
* 服务端消息处理
*/
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
msg = msg.replaceAll(CreditConstantParamUtils.CREDIT_NETTY_DELIMITER, "");
if (msg instanceof String) {
if ("ping-pong-ping-pong".equals(msg)) {
MySlf4j.textInfo("[心跳监测] {0}:通道活跃", channelHandlerContext.channel().id());
lossConnectCount = 0;
return;
}
}
..........
//接受netty客户端请求数据,并响应
channelFuture = channelHandlerContext.writeAndFlush(responseBody);
}
/**
* 触发器
*/
@Override
public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object evt) throws Exception {
MySlf4j.textInfo("[已经有xx小时没有接收到客户端消息]");
if (evt instanceof IdleStateEvent) {
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
// READER_IDLE:一段时间内没有数据接收
if (idleStateEvent.state() == IdleState.READER_IDLE) {
lossConnectCount++;
if (lossConnectCount > 2) {
MySlf4j.textInfo("[释放不活跃通道] {0}", channelHandlerContext.channel().id());
channelHandlerContext.channel().close();
}
}
} else {
super.userEventTriggered(channelHandlerContext, evt);
}
}
}
2.netty客户端
nettyClient.java
@Component
public class NettyClient {
/**netty主机*/
@Value("${rzt.auth.netty-host}")
private String nettyHost;
/**netty端口*/
@Value("${rzt.auth.netty-port}")
private Integer nettyPort;
/**初始化重试次数*/
private static int retry = 0;
/**初始化Bootstrap实例*/
private Bootstrap bootstrap = new Bootstrap();
/** 工人线程组*/
private EventLoopGroup workerGroup = new NioEventLoopGroup();
/**key为目标host,value为目标host的连接池*/
public static ChannelPoolMap poolMap;
@Autowired
private NettyChannelPoolHandler nettyChannelPoolHandler;
/**
* netty客户端启动
*/
public void init() {
MySlf4j.textInfo("netty客户端启动连接,host:{0},port:{1}", nettyHost, nettyPort);
bootstrap.group(workerGroup).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true);
//1. 创建连接池map
poolMap = new AbstractChannelPoolMap() {
@Override
protected SimpleChannelPool newPool(InetSocketAddress inetSocketAddress) {
//maxConnections: 最大连接数,超过则进入pendingAcquireQueue等待获取连接
return new FixedChannelPool(bootstrap.remoteAddress(inetSocketAddress), nettyChannelPoolHandler,
20);
}
};
InetSocketAddress addr1 = new InetSocketAddress(nettyHost, nettyPort);
//创建4个channel连接发送消息
for (int i = 0; i < 4; i++) {
//2. 取出连接addr1地址的连接池
final SimpleChannelPool pool1 = poolMap.get(addr1);
//3. 获取一个连接
Future channelFuture = pool1.acquire();
channelFuture.addListener(new FutureListener() {
@Override
public void operationComplete(Future channelFuture) throws Exception {
if (channelFuture.isSuccess()) {
//连接地址1的某个channel
Channel ch = channelFuture.getNow();
//使用连接发送消息
ch.writeAndFlush("ping-pong-ping-pong".concat(CreditConstantParamUtils.CREDIT_NETTY_DELIMITER));
//释放连接,将连接放回连接池
pool1.release(ch);
}
}
});
}
}
}
NettyClientHandler.java
@Component
@ChannelHandler.Sharable
public class NettyClientHandler extends SimpleChannelInboundHandler {
@Autowired
private IAnalysisWriteService analysisWriteService;
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) {
MySlf4j.textInfo("[OVERLENGTH]客户端收到消息{0},消息长度:{1}", msg, msg.length());
/** 创建线程池 */
ExecutorService excutorService = Executors.newFixedThreadPool(5);
excutorService.execute(new Runnable() {
@Override
public void run() {
//通过线程池处理响应数据
.....
}
});
}
}
服务启动后,我们通过netstat -an就可以看到建立了4条Channel。