标签:
netty是对Nio的一个封装,关于网络的所有操作都是通过事件的方式完成的。例如连接创建、read事件、write事件都是通过Nio来完成 的。那netty是怎么启动监听的呢? 在什么地方启动的呢?此处不为大家设置悬念,一次性告诉大家。通过循环扫描的方式来实现监听的。具体的方法类位于NioEventLoop的run方法中 (赶紧进去看看吧!! 浅显易懂)。
下面是netty的acceptor线程创建连接的代码。位于类NioEventLoop的processSelectedKey中(至于 processSelectedKey是怎么被调用的,自己看看调用链就行了(eclipse用ctrl+Shift+H就可以查看到选中方法的调用 链))。
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}
try {
//得到当前的key关注的事件
int readyOps = k.readyOps();
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
//一个刚刚创建的NioServersocketChannel感兴趣的事件是0。
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {//可以读取操作 --对于serverSocket来说就是acceptor事件、对于socketChannel来说就是read事件
//INFO: channel类型为io.netty.channel.socket.nio.NioSocketChannel unsafe类型为io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe
Object obj = k.attachment();//得到NioServerSocketChannel或者NioSocketChannel
if(obj instanceof NioServerSocketChannel){
System.out.println(obj.getClass().getName()+ " 开始接收连接");
}else{
System.out.println(obj.getClass().getName()+ " 开始接收字节");
}
//不同的socketChannel对于那个的unsafe是不同的。例如Server端的是messageUnsafe 而 clinet端是byteUnsafe
unsafe.read();//对于接受链接或者read兴趣都会添加进入read操作调用serverSocket->NioMessageUnsafe
if (!ch.isOpen()) {
// Connection already closed - no need to handle write.
return;
}
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {//对于半包消息进行输出操作
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
} catch (CancelledKeyException e) {
unsafe.close(unsafe.voidPromise());
}
}
这里我们以Read事件的处理(NioByteUnsafe)为线索进行讲解。后续会有基于byte的unsafe进行讲解的(Unsafe不知道为啥要这 么叫,本人也感到挺费解的,不过现在看来感觉就是一个工具对象。不要从名称上惧怕它)。下面来看NioByteUnsafe(该类是AbstractNioByteChannel的一个内部类)的read方法进行讲 解。直接讲代码(后面也会有图形讲解,方便大家理解):
public void read() {
//得到config对象、pipeline对象
final ChannelConfig config = config();
//得到对应的管道对象
final ChannelPipeline pipeline = pipeline();
//实际的内存分配器---
final ByteBufAllocator allocator = config.getAllocator();
final int maxMessagesPerRead = config.getMaxMessagesPerRead();
RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
if (allocHandle == null) {
//创建一个allocHandle对象--AdaptiveRecvByteBufAllocator
//RecvByteBufAllocator负责内存分配的算法问题
this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
}
if (!config.isAutoRead()) {
removeReadOp();
}
ByteBuf byteBuf = null;
int messages = 0;
boolean close = false;
try {
int byteBufCapacity = allocHandle.guess();
int totalReadAmount = 0;
do {
//可能是 direct或者 heap 从与当前socket相关的allocator得到byteBuf数组
// byteBuf =allocHandle.allocate(allocator);
//每次从内核中读取数据netty都会分配内存
byteBuf = allocator.ioBuffer(byteBufCapacity);
//获得可以写入的容量的大小
int writable = byteBuf.writableBytes(); //分一个多大的内存就从socket中读取多大的数据
int localReadAmount = doReadBytes(byteBuf);//从socket中读取数据到bytebuf中
if (localReadAmount <= 0) {//发生了读取事件,但是读取的长度是负数,
// not was read release the buffer
byteBuf.release();//释放到Thread Cache中
close = localReadAmount < 0;//是否进行关闭,关键要看读取到的数据的长度是否为-1;
break;
}
//发起读取事件---如果是第一次积累数据的话,那么就会将当前的bytebuf作为累积对象,供继续使用
pipeline.fireChannelRead(byteBuf);
byteBuf = null;//由pipeline进行byteBuf的释放
//避免内存溢出,
if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
// Avoid overflow.
totalReadAmount = Integer.MAX_VALUE;
break;
}
totalReadAmount += localReadAmount;
if (localReadAmount < writable) {
// Read less than what the buffer can hold,
// which might mean we drained the recv buffer completely.
break;
}
} while (++ messages < maxMessagesPerRead);//每次读取的消息的数量都会有限制的,也就说,每次处理read事件的消息量是可以配置的
//读取完成---处理完一次 读取事件
pipeline.fireChannelReadComplete();
//对本次读取的数据量进行记录,便于下一次为当前的Channel分配合适大小的buffer
allocHandle.record(totalReadAmount);
if (close) {
closeOnRead(pipeline);
close = false;
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close);
}
}
}
//上述代码段说明: /** config.getRecvByteBufAllocator().newHandle(); 负责内存分配算法 而 ByteBufAllocator 负责具体的内存分配-分配到堆还是直接内存 */
这就是对一个read的处理基本流程,就是将从socket中读取到的放入到分配器分配的bytebuf,然后将其传入到pipeline.fireChannelRead(byteBuf);中,至于在pipeline是怎样的传递的,我们从这个方法中是无法查看到的。这也是我们这篇文章的主要内容(别的内存也很重要哦!关键是我已经添加了很多注释了!)。就是要看看在得到bytebuf后,pipeline是怎么处理传入进去的bytebuf的。我们来对pipeline.fireChannelRead(byteBuf);穷追(ctrl+shift+H eclipse)到具体的实现,
我们发现,最终会调用到的ChannelHandler接口的
void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
ChannelHandler有很多我们具体选用哪一个呢?动动脑子就知道,我们pipeline中存储的都是ChannelHandler,有哪些个Handler,就要看我们在启动代码中是怎样设置了。来看看我的启动代码(精简版,没有写全,所以这里看不懂得话,建议你写个Netty的小demo).上代码:
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
// .option(ChannelOption.ALLOCATOR, )//设置内存分配器
.option(ChannelOption.SO_SNDBUF, 1024)//发送缓冲器
.option(ChannelOption.SO_RCVBUF, 1024)
.option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT)//接收缓冲器
.handler(new LoggingHandler(LogLevel.INFO))//serverSocketChannel对应的ChannelPipeline的handler
.childHandler(new ChannelInitializer<SocketChannel>() {//客户端新接入的连接socketChannel对应的ChannelPipeline的Handler
@Override
public void initChannel(SocketChannel ch) {
SocketChannelConfig config=ch.config();
ChannelPipeline p = ch.pipeline();
p
.addLast(new LineBasedFrameDecoder(30))//也会将回车符删除掉--是以换行符作为分隔的
.addLast(new DiscardServerHandler());
}
});
由此可以看到,这里第一个被调用的ChannelHandler是LineBasedFrameDecoder。看看LineBasedFrameDecoder是怎么实现ChannelRead方法的。翻看了弗雷之后,我们终于找到了channelRead方法。由此可以看到,在AbstractNioByteChannel的read方法中的pipeline.fireChannelRead(byteBuf);按照我的启动代码(虽然说是按照我的,但是按照你们的也是这样,因为byte在通过网络接收之后,都要进行decode,第一个经过的channelHandler肯定是ByteToMessageDecoder,不信,你看看自己的启动代码试试),最终调用的是ByteToMessageDecoder.channelRead() ,上代码:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
RecyclableArrayList out = RecyclableArrayList.newInstance();
try {
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
if (first) {
cumulation = data;
} else {
//缓冲区的大小没有超过需要写入的数据的大小
if (cumulation.writerIndex() > cumulation.maxCapacity() - data.readableBytes()) {
expandCumulation(ctx, data.readableBytes());//扩展缓冲区--查看实现后,就是通过分配一个更大的,然后复制一下字节数据
}
cumulation.writeBytes(data);//将数据写入到积累对象中
data.release();//释放bytebuffer(heap或者direct)--通过引用的方式进行释放缓冲区(至于什么是引用方式释放,我们会有一个特定的章节进行讲解)
}
//收集完毕之后解析收集到的字符串---通常调用子类的方法实现,在具体实现中,用out来承载解析出来的msg
callDecode(ctx, cumulation, out);//实现的时候,不要释放我们的累积对象cumulation
} catch (DecoderException e) {
throw e;
} catch (Throwable t) {
throw new DecoderException(t);
} finally {
if (cumulation != null && !cumulation.isReadable()) {//如果累积对象为null或者没有可读内容的话,那么就将累积对象释放掉(因为空了或者为null了)
cumulation.release();
cumulation = null;
}
int size = out.size();
decodeWasNull = size == 0;
//针对解析后的out结果中的msg的对象,将解析出来的message(具体的类型,请自己看实现.是怎样做的)传递到pipeline中。
for (int i = 0; i < size; i ++) {
ctx.fireChannelRead(out.get(i));
}
out.recycle();
}
} else {
ctx.fireChannelRead(msg);
}
}
提示: 一个pipeline,为某个socketChannel所有,也就是说pipeline里的channelHandler,也是为某个socketchannel所享用的。不会出现多个线程共享一个channelHanler的情况(我们可以让他们共享一个handler,但是我们得保证这个共享的handler是一个无状态的handler,例如我们现在就要讲解的ByteToMessageDecoder就是一个有状态的handler,所以就不能共享,就要在每次初始化socketChannel的pipeline时,都要重新new一个ByteToMessageDecoder,不信大家,可以可以看一下ByteToMessageDecoder的实现。我直接粘贴代码吧!!(看看我的注释哦)如下:).
public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {
ByteBuf cumulation;//因为单词cumulation --累积 意思,也就是,这个成员对象,就是用来作为半包的累积存储的对象来使用的
private boolean singleDecode;
private boolean decodeWasNull;
private boolean first;
}
----------------------------------------------------------------------------------------------------------
讲到这里,会涉及到一个解析出来的message在被pipeline中的其它handler处理完毕后的内存释放问题。怎么解决? 什么时候释放这些message占用的空间呢?
标签:
原文地址:http://my.oschina.net/hotbain/blog/421150