码迷,mamicode.com
首页 > Web开发 > 详细

netty-read

时间:2017-08-26 23:36:30      阅读:415      评论:0      收藏:0      [点我收藏+]

标签:let   protected   cti   size   buffer   bootstra   abstract   cal   ota   

 

NioEventLoop 是 ServerSocketChannel 和 SocketChannel 通用的 EventLoop,下面从 NioEventLoop 的 run() 方法的执行逻辑开始分析:

 // Main loop of the event loop. Every iteration asks the select strategy what to do, then
 // processes the selected keys and finally runs queued tasks. When ioRatio is not 100 the
 // value handed to runAllTasks(...) is derived from the time spent on I/O and from ioRatio.
 1 protected void run() {
 2   for (;;) {
 3     switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
 4       case SelectStrategy.CONTINUE:
 5         continue; // start the next iteration without processing keys or tasks
 6       case SelectStrategy.SELECT:
 7         select(wakenUp.getAndSet(false)); // resets wakenUp and passes its previous value to select()
 8         if (wakenUp.get()) { // wakenUp became true again after it was reset above
 9           selector.wakeup();
10         }
11       default:
12         // fallthrough
13     }
14 
15     cancelledKeys = 0;
16     needsToSelectAgain = false;
17     final int ioRatio = this.ioRatio; // local copy used for the whole iteration
18     if (ioRatio == 100) {
19       try {
20         // Process the ready I/O events
21         processSelectedKeys();
22       } finally {
23         // Ensure we always run tasks.
24         // Run the tasks waiting in the queue
25         runAllTasks();
26       }
27     } else {
28       final long ioStartTime = System.nanoTime();
29       try {
30         processSelectedKeys();
31       } finally {
32         // Ensure we always run tasks.
33         final long ioTime = System.nanoTime() - ioStartTime; // nanoseconds spent in processSelectedKeys()
34         runAllTasks(ioTime * (100 - ioRatio) / ioRatio); // presumably a time budget for tasks, scaled from ioTime - confirm against runAllTasks(long)
35       }
36     }
37   }
38 }

 

 // Processes every key of the given selected-key set. The event loop holds the selector, so
 // the selected keys can be fetched from it again whenever needsToSelectAgain is set.
 1 private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
 2   if (selectedKeys.isEmpty()) {
 3     return;
 4   }
 5 
 6   Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
 7   while (true) {
 8     final SelectionKey key = keyIterator.next();
 9     // Per the original notes: the key of a java.nio ServerSocketChannel carries the Netty
10     // ServerSocketChannel as its attachment, and the key of a java.nio SocketChannel carries
11     // the Netty SocketChannel.
12     final Object attachment = key.attachment();
13     keyIterator.remove();
14 
15     if (!(attachment instanceof AbstractNioChannel)) {
16       @SuppressWarnings("unchecked")
17       NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) attachment;
18       processSelectedKey(key, task);
19     } else {
20       processSelectedKey(key, (AbstractNioChannel) attachment);
21     }
22 
23     if (!keyIterator.hasNext()) {
24       break;
25     }
26     if (!needsToSelectAgain) {
27       continue;
28     }
29 
30     // needsToSelectAgain is set: select again, re-read the key set and create a new
31     // iterator to avoid a ConcurrentModificationException.
32     selectAgain();
33     selectedKeys = selector.selectedKeys();
34     if (selectedKeys.isEmpty()) {
35       break;
36     }
37     keyIterator = selectedKeys.iterator();
38   }
39 }

 

 // Dispatches one ready key of a Netty NIO channel. Covers the four events
 // SelectionKey.OP_CONNECT, SelectionKey.OP_WRITE, SelectionKey.OP_READ and SelectionKey.OP_ACCEPT.
 // A key that is no longer valid, or that is cancelled while it is being handled, leads to the
 // channel being closed instead of a CancelledKeyException escaping into the event loop.
 1 private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
 2   final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
 3 
 4   if (!k.isValid()) {
 5     final Object owner;
 6     try {
 7       owner = ch.eventLoop();
 8     } catch (Throwable ignored) {
 9       // The channel cannot report an event loop, so there is nothing to do for this key here.
10       return;
11     }
12     // Only close the channel when it still belongs to this loop; the key may be invalid
13     // simply because the channel was registered with another event loop.
14     if (owner != this) {
15       return;
16     }
17     unsafe.close(unsafe.voidPromise());
18     return;
19   }
20 
21   try {
22     int readyOps = k.readyOps();
23 
24     if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
25       // Remove OP_CONNECT from the interest set before completing the connect.
26       int ops = k.interestOps();
27       ops &= ~SelectionKey.OP_CONNECT;
28       k.interestOps(ops);
29 
30       unsafe.finishConnect();
31     }
32 
33     if ((readyOps & SelectionKey.OP_WRITE) != 0) {
34       ch.unsafe().forceFlush();
35     }
36     // readyOps == 0 is routed to read() as well (upstream Netty describes this as a
37     // workaround for a JDK selector issue - confirm against the Netty sources).
38     if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
39       // Main point of interest: the read/accept path.
40       unsafe.read();
41     }
42   } catch (java.nio.channels.CancelledKeyException ignored) {
43     // readyOps()/interestOps() throw when the key has been cancelled in the meantime;
44     // close the channel rather than letting the exception escape.
45     unsafe.close(unsafe.voidPromise());
46   }
47 }

 

下面分别对NioMessageUnsafe以及NioByteUnsafe的read操作进行分析

NioMessageUnsafe 用于 ServerSocketChannel,它"读取"到的不是字节,而是 accept 得到的 SocketChannel 对象:

 // Unsafe implementation whose read() collects whole messages into readBuf via
 // doReadMessages(...) and then fires them through the pipeline one by one.
 1 private final class NioMessageUnsafe extends AbstractNioUnsafe {
 2     // ... (other members omitted)
 3     
 4     public void read() {
 5       assert eventLoop().inEventLoop();
 6       final ChannelConfig config = config();
 7       final ChannelPipeline pipeline = pipeline();
 8       final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
 9       allocHandle.reset(config);
10 
11       boolean closed = false;
12       Throwable exception = null;
13 
14       try {
15         do {
16             // The actual read operation
17           int localRead = doReadMessages(readBuf);
18           // Nothing more to read
19           if (localRead == 0) {
20             break;
21           }
22           // A negative result flags the channel for closing (original note: the peer has closed)
23           if (localRead < 0) {
24             closed = true;
25             break;
26           }
27 
28           allocHandle.incMessagesRead(localRead);
29         } while (allocHandle.continueReading());
30       } catch (Throwable t) {
31         exception = t; // remembered and reported only after the messages read so far are delivered
32       }
33 
34       int size = readBuf.size();
35       for (int i = 0; i < size; i ++) {
36         readPending = false;
37         // Hand the message to the handlers, i.e. invoke the handlers' channelRead callback.
38         // For a server socket channel readBuf holds SocketChannels;
39         // fireChannelRead passes them to ServerBootstrapAcceptor, which registers each SocketChannel.
40         pipeline.fireChannelRead(readBuf.get(i));
41       }
42       readBuf.clear();
43       allocHandle.readComplete();
44       // Invoke the handlers' channelReadComplete callback
45       pipeline.fireChannelReadComplete();
46 
47       if (exception != null) {
48         closed = closeOnReadError(exception);
49             // Invoke the handlers' exceptionCaught callback
50         pipeline.fireExceptionCaught(exception);
51       }
52 
53       if (closed) {
54         inputShutdown = true;
55         if (isOpen()) {
56             close(voidPromise());
57         }
58       }
59     }
60     
61     // ... (other members omitted)
62 }

 

 // Accepts one connection from the underlying java.nio ServerSocketChannel and, when one is
 // available, wraps it in a NioSocketChannel that is appended to buf.
 // Returns 1 when a channel was added to buf, otherwise 0 (nothing accepted, or wrapping failed).
 1 protected int doReadMessages(List<Object> buf) throws Exception {
 2   // Calls java.nio.channels.ServerSocketChannel.accept()
 3   SocketChannel ch = SocketUtils.accept(javaChannel());
 4 
 5   try {
 6     if (ch != null) {
 7       buf.add(new NioSocketChannel(this, ch));
 8       return 1;
 9     }
10   } catch (Throwable t) {
11     // Do not swallow the failure silently: record why the accepted socket could not be wrapped.
12     logger.warn("Failed to create a new channel from an accepted socket.", t);
13 
14     // Best effort: close the accepted socket so that it is not leaked.
15     try {
16       ch.close();
17     } catch (Throwable t2) {
18       logger.warn("Failed to close a socket.", t2);
19     }
20   }
21   return 0;
22 }

 

NioByteUnsafe 用于 SocketChannel,读取的是字节序列,每次读到的数据都放入一个 ByteBuf 中:

 // Unsafe implementation whose read() repeatedly allocates a ByteBuf, fills it through
 // doReadBytes(...) and fires each filled buffer through the pipeline.
 1 protected class NioByteUnsafe extends AbstractNioUnsafe {
 2     // ... (other members omitted)
 3     public final void read() {
 4       final ChannelConfig config = config();
 5       final ChannelPipeline pipeline = pipeline();
 6       final ByteBufAllocator allocator = config.getAllocator();
 7       final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
 8       allocHandle.reset(config);
 9 
10       ByteBuf byteBuf = null;
11       boolean close = false;
12       try {
13         do {
14             // A new buffer is allocated for every read
15           byteBuf = allocHandle.allocate(allocator);
16           // The actual read operation
17           allocHandle.lastBytesRead(doReadBytes(byteBuf));
18           // Reading 0 means all data has been read; -1 means the peer has closed
19           if (allocHandle.lastBytesRead() <= 0) {
20             // nothing was read. release the buffer.
21             byteBuf.release();
22             byteBuf = null;
23             // A negative result (-1) means the peer has closed
24             close = allocHandle.lastBytesRead() < 0;
25             break;
26           }
27 
28           allocHandle.incMessagesRead(1);
29           readPending = false;
30           // Hand the data that was read to the handlers, i.e. invoke the handlers' channelRead callback
31           pipeline.fireChannelRead(byteBuf);
32           byteBuf = null; // already forwarded; keeps the catch block from being handed this buffer
33         } while (allocHandle.continueReading());
34             
35         allocHandle.readComplete();
36         // Invoke the handlers' channelReadComplete callback
37         pipeline.fireChannelReadComplete();
38 
39         if (close) {
40           closeOnRead(pipeline);
41         }
42       } catch (Throwable t) {
43         handleReadException(pipeline, byteBuf, t, close, allocHandle);// per the original note: calls pipeline.fireChannelReadComplete and pipeline.fireExceptionCaught
44       }
45     }
46     // ... (other members omitted)
47 }

 

// Records how many bytes byteBuf can still take as the attempted read size on the receive
// allocator handle, then fills byteBuf from the underlying channel and returns the result of
// byteBuf.writeBytes(...).
1 protected int doReadBytes(ByteBuf byteBuf) throws Exception {
2   final RecvByteBufAllocator.Handle handle = unsafe().recvBufAllocHandle();
3   final int writable = byteBuf.writableBytes();
4   handle.attemptedBytesRead(writable);
5 
6   // Read handle.attemptedBytesRead() bytes from the java.nio.channels.SocketChannel into byteBuf
7   return byteBuf.writeBytes(javaChannel(), handle.attemptedBytesRead());
8 }

 

netty-read

标签:let   protected   cti   size   buffer   bootstra   abstract   cal   ota   

原文地址:http://www.cnblogs.com/holoyong/p/7436612.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!