码迷,mamicode.com
首页 > Web开发 > 详细

netty简单NIO模型

时间:2019-02-20 17:07:21      阅读:154      评论:0      收藏:0      [点我收藏+]

标签:read   java   NPU   []   bad   相关   throw   conf   例子   



首先是使用java原生nio类库编写的例子,开发一套nio框架不简单,所以选择了netty,该例完成后,是netty举例。

 

package com.smkj.netty;

public class TimeServer {
    public static void main(String[] args) {
        int port = 8080;
        if(args!=null&&args.length!=0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                //采用默认值
            }
        }
        
        MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);
        new Thread(timeServer,"NIO-MultiplexerTimeServer-001").start();
    }
}

 

package com.smkj.netty;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class MultiplexerTimeServer implements Runnable {
    private Selector selector;
    private ServerSocketChannel servChannel;
    private volatile boolean stop;

    /**
     * 初始化多路复用器,绑定监听端口
     * 
     * @param port
     */
    public MultiplexerTimeServer(int port) {
        try {
            selector = Selector.open();
            servChannel = ServerSocketChannel.open();
            servChannel.configureBlocking(false);
            servChannel.socket().bind(new InetSocketAddress(port), 1024);
            servChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("The time server is start in port:" + port);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    public void stop() {
        this.stop = true;
    }

    @Override
    public void run() {
        while (!stop) {
            try {
                selector.select(1000);
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectedKeys.iterator();
                SelectionKey key = null;
                while (it.hasNext()) {
                    key = it.next();
                    it.remove();
                    try {
                        handleInput(key); // 处理key
                    } catch (Exception e) {
                        // TODO: handle exception
                        if (key != null) {
                            key.cancel();
                            if (key.channel() != null) {
                                key.channel().close();
                            }
                        }
                    }
                }
            } catch (Exception e) {
                // TODO: handle exception
                e.printStackTrace();
            }
        }
        // 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册,所以不需要重复释放资源
        if (selector != null) {
            try {
                selector.close();
            } catch (Exception e) {
                // TODO: handle exception
                e.printStackTrace();
            }
        }

    }

    private void handleInput(SelectionKey key) throws IOException {
        if (key.isValid()) {
            // 处理新接入的请求消息
            if (key.isAcceptable()) {

                // Accept the new connection
                ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                SocketChannel sc = ssc.accept();
                sc.configureBlocking(false);

                // Add the new connection to the selector
                sc.register(selector, SelectionKey.OP_READ);
            }
            if (key.isReadable()) {
                // Read the Data
                SocketChannel sc = (SocketChannel) key.channel();
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                int readBytes = sc.read(readBuffer);
                if (readBytes > 0) {
                    readBuffer.flip();
                    byte[] bytes = new byte[readBuffer.remaining()];
                    readBuffer.get(bytes);
                    String body = new String(bytes, "UTF-8");
                    System.out.println("the time server receive order:" + body);

                    String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)
                            ? new java.util.Date(System.currentTimeMillis()).toString()
                            : "Bad Order";
                            doWrite(sc,currentTime);
                }else if(readBytes<0) {
                    //对端链路关闭
                    key.cancel();
                    sc.close();
                }else {
                    //读到0字节 忽略
                    ;
                }
            }
        }
    }
    
    private void doWrite(SocketChannel channel,String response) throws IOException {
        
        if(response!=null&&response.trim().length()>0) {
            byte[] bytes = response.getBytes();
            ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
            writeBuffer.put(bytes);
            writeBuffer.flip();
            channel.write(writeBuffer);
               if (!writeBuffer.hasRemaining())

                    System.out.println("Send response to client succeed.");

                }
        }
        
}

 

package com.smkj.netty;

public class TimeClient {
public static void main(String[] args) {
    int port = 8080;
    if(args!=null&&args.length!=0) {
        try {
            port = Integer.valueOf(args[0]);
        } catch (NumberFormatException e) {
            //采用默认值
        }
    }
    
    new Thread(new TimeClientHandle("127.0.0.1",port),"TimeClient-001").start();
}
}
package com.smkj.netty;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class TimeClientHandle implements Runnable {

    private String host;

    private int port;

    private Selector selector;

    private SocketChannel socketChannel;

    private volatile boolean stop;

 
    public TimeClientHandle(String host, int port) {

    this.host = host == null ? "127.0.0.1" : host;

    this.port = port;

    try {

        selector = Selector.open();

        socketChannel = SocketChannel.open();

        socketChannel.configureBlocking(false);

    } catch (IOException e) {

        e.printStackTrace();

        System.exit(1);

    }
    }
    /*
022
     * (non-Javadoc)
023
     *
024
     * @see java.lang.Runnable#run()
025
     */
    @Override
    public void run() {
    try {
        doConnect();
    } catch (IOException e) {
        e.printStackTrace();

        System.exit(1);

    }

    while (!stop) {

        try {

        selector.select(1000);

        Set<SelectionKey> selectedKeys = selector.selectedKeys();

        Iterator<SelectionKey> it = selectedKeys.iterator();

        SelectionKey key = null;

        while (it.hasNext()) {

            key = it.next();

            it.remove();

            try {

            handleInput(key);

            } catch (Exception e) {

            if (key != null) {

                key.cancel();

                if (key.channel() != null)

                key.channel().close();

            }

            }

        }

        } catch (Exception e) {

        e.printStackTrace();

        System.exit(1);

        }

    }

 

    // 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源

    if (selector != null)

        try {

        selector.close();

        } catch (IOException e) {

        e.printStackTrace();

        }

    }

 

    private void handleInput(SelectionKey key) throws IOException {

 

    if (key.isValid()) {

        // 判断是否连接成功

        SocketChannel sc = (SocketChannel) key.channel();

        if (key.isConnectable()) {

        if (sc.finishConnect()) {

            sc.register(selector, SelectionKey.OP_READ);

            doWrite(sc);

        } else

            System.exit(1);// 连接失败,进程退出

        }

        if (key.isReadable()) {

        ByteBuffer readBuffer = ByteBuffer.allocate(1024);

        int readBytes = sc.read(readBuffer);

        if (readBytes > 0) {

            readBuffer.flip();

            byte[] bytes = new byte[readBuffer.remaining()];

            readBuffer.get(bytes);

            String body = new String(bytes, "UTF-8");

            System.out.println("Now is : " + body);

            this.stop = true;

        } else if (readBytes < 0) {

            // 对端链路关闭

            key.cancel();

            sc.close();
    } else

            ; // 读到0字节,忽略

        }

    }

 

    }
    private void doConnect() throws IOException {

    // 如果直接连接成功,则注册到多路复用器上,发送请求消息,读应答

    if (socketChannel.connect(new InetSocketAddress(host, port))) {

        socketChannel.register(selector, SelectionKey.OP_READ);

        doWrite(socketChannel);

    } else

        socketChannel.register(selector, SelectionKey.OP_CONNECT);

    }


    private void doWrite(SocketChannel sc) throws IOException {

    byte[] req = "QUERY TIME ORDER".getBytes();

    ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);

    writeBuffer.put(req);

    writeBuffer.flip();

    sc.write(writeBuffer);

    if (!writeBuffer.hasRemaining())

        System.out.println("Send order 2 server succeed.");

    }

}

可以发现服务端的最后进行了remove()操作,将SelectionKey从迭代器中删除了,博主一开始总觉得很纳闷,SelectionKey中可是记录了相关的channel信息,如果将SelectionKey删除了,那不就代表着将通道信息也抹除了吗,那后续还怎么继续获取通道,说来惭愧,这问题问的确实缺乏水准。

后来博主理了理selector的思路,要知道,一码事归一码事,channel是注册在selector中的,在后面的轮询中,是先将已准备好的channel挑选出来,即selector.select(),再通过selectedKeys()生成的一个SelectionKey迭代器进行轮询的,一次轮询会将这个迭代器中的每个SelectionKey都遍历一遍,每次访问后都remove()相应的SelectionKey,但是移除了selectedKeys中的SelectionKey不代表移除了selector中的channel信息(这点很重要),注册过的channel信息会以SelectionKey的形式存储在selector.keys()中,也就是说每次select()后的selectedKeys迭代器中是不能还有成员的,但keys()中的成员是不会被删除的(以此来记录channel信息)。

那么为什么要删除呢,要知道,迭代器如果只需要访问的话,直接访问就好了,完全没必要remove()其中的元素啊,查询了相关资料,一致的回答是为了防止重复处理(大雾),后来又有信息说明:每次循环调用remove()是因为selector不会自己从已选择集合中移除selectionKey实例,必须在处理完通道时自己移除,这样,在下次select时,会将这个就绪通道添加到已选择通道集合中,其实到这里就已经可以理解了,selector不会自己删除selectedKeys()集合中的selectionKey,那么如果不人工remove(),将导致下次select()的时候selectedKeys()中仍有上次轮询留下来的信息,这样必然会出现错误,假设这次轮询时该通道并没有准备好,却又由于上次轮询未被remove()的原因被认为已经准备好了,这样能不出错吗?

即selector.select()会将准备好的channel以SelectionKey的形式放置于selector的selectedKeys()中供使用者迭代,使用的过程中需将selectedKeys清空,这样下次selector.select()时就不会出现错误了。

netty简单NIO模型

标签:read   java   NPU   []   bad   相关   throw   conf   例子   

原文地址:https://www.cnblogs.com/fengwenzhee/p/10407587.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!