
Creating a Netty Server

Date: 2019-11-11 23:06:10


Let's use the classic EchoServer from netty-example as our starting point.



// Create bossGroup and workerGroup; together they implement the reactor pattern
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
    ServerBootstrap b = new ServerBootstrap();
    // Configure the ServerSocketChannel and SocketChannel properties on the ServerBootstrap
    b.group(bossGroup, workerGroup)
     .channel(NioServerSocketChannel.class)
     .option(ChannelOption.SO_BACKLOG, 100)
     .attr(AttributeKey.newInstance("attr"), "attr")
     .handler(new LoggingHandler(LogLevel.INFO))
     .childHandler(new ChannelInitializer<SocketChannel>() {
         @Override
         public void initChannel(SocketChannel ch) throws Exception {
             ChannelPipeline p = ch.pipeline();
             if (sslCtx != null) {
                 p.addLast(sslCtx.newHandler(ch.alloc()));
             }
             //p.addLast(new LoggingHandler(LogLevel.INFO));
             p.addLast(serverHandler);
         }
     })
     .childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
     .childAttr(AttributeKey.newInstance("childAttr"), "childAttr");

    // bind() starts the server
    ChannelFuture f = b.bind(PORT).sync();

    // sync() blocks the main thread until the server channel is closed
    f.channel().closeFuture().sync();
} finally {
    // Shut down both groups so that all event loop threads terminate
    bossGroup.shutdownGracefully();
    workerGroup.shutdownGracefully();
}


First, let's look at how the EventLoopGroup is constructed.


public NioEventLoopGroup(int nThreads) {
    this(nThreads, (Executor) null);
}

public NioEventLoopGroup(int nThreads, Executor executor){
    this(nThreads, executor, SelectorProvider.provider());
}

public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider) {
    this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}

public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                         final SelectStrategyFactory selectStrategyFactory) {
    super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
    this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}

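Stepping through the chain of overloaded constructors, we see that the group is configured with a thread count, an Executor that supplies threads (null here, so a default is created later), a SelectorProvider, a SelectStrategyFactory,
and a rejected-execution handler. Pay particular attention to the thread count: the no-arg constructor passes 0, which MultithreadEventLoopGroup replaces with DEFAULT_EVENT_LOOP_THREADS, and that defaults to twice the number of available CPU cores.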
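
As a rough illustration of how that default is derived, the following plain-Java sketch mirrors the computation. It assumes the Netty 4.1 behavior of reading the io.netty.eventLoopThreads system property and falling back to twice the available processors, with a floor of 1. The class and method names are made up for this example and are not part of Netty.

```java
/** Hypothetical helper that mimics how the default event loop thread count is chosen. */
final class DefaultThreadCount {
    // Assumed fallback: twice the processors visible to the JVM, never less than 1.
    static int defaultEventLoopThreads() {
        int fallback = Runtime.getRuntime().availableProcessors() * 2;
        return Math.max(1, Integer.getInteger("io.netty.eventLoopThreads", fallback));
    }

    // Mirrors the rule in MultithreadEventLoopGroup: 0 means "use the default".
    static int resolve(int nThreads) {
        return nThreads == 0 ? defaultEventLoopThreads() : nThreads;
    }

    public static void main(String[] args) {
        System.out.println("bossGroup threads:   " + resolve(1));
        System.out.println("workerGroup threads: " + resolve(0));
    }
}
```

With this rule, new NioEventLoopGroup(1) keeps exactly one thread, while the no-arg form scales with the machine.
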
We then arrive at the constructor that does the real work, in MultithreadEventExecutorGroup:



protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }

    // 1: Create the executor (one new thread per submitted task); its thread factory mainly sets the thread-name prefix
    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }

    children = new EventExecutor[nThreads];

    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            // 2: Create an event loop; each one is later bound to exactly one thread from the executor
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }

                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        // Let the caller handle the interruption.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }

    // 3: Set up the chooser that picks the next NioEventLoop in round-robin order
    chooser = chooserFactory.newChooser(children);

    // 4: Add a termination listener to every NioEventLoop
    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };

    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }

    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
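
To make step 3 more concrete, here is a minimal round-robin chooser in plain Java. It is a simplified stand-in, not Netty's class: as far as I can tell, Netty's default chooser factory uses a bit mask when the number of children is a power of two and a modulo otherwise, and this sketch follows the same idea, using Math.floorMod for the general case. All names here are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Simplified stand-in for a round-robin chooser; not Netty's actual class. */
final class RoundRobinChooser<T> {
    private final AtomicInteger idx = new AtomicInteger();
    private final T[] children;
    private final boolean powerOfTwo;

    RoundRobinChooser(T[] children) {
        if (children.length == 0) {
            throw new IllegalArgumentException("children must not be empty");
        }
        this.children = children;
        // A positive int is a power of two when it equals its lowest set bit.
        this.powerOfTwo = (children.length & -children.length) == children.length;
    }

    T next() {
        int i = idx.getAndIncrement();
        if (powerOfTwo) {
            // The bit mask stays valid even after the counter overflows to negative values.
            return children[i & (children.length - 1)];
        }
        // floorMod keeps the index non-negative after overflow.
        return children[Math.floorMod(i, children.length)];
    }

    public static void main(String[] args) {
        RoundRobinChooser<String> chooser =
                new RoundRobinChooser<>(new String[] {"loop-0", "loop-1", "loop-2"});
        for (int n = 0; n < 5; n++) {
            System.out.println(chooser.next()); // loop-0, loop-1, loop-2, loop-0, loop-1
        }
    }
}
```
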

Let's step into the second marked point, where each NioEventLoop is created, and focus on the openSelector() method.



private SelectorTuple openSelector() {
    final Selector unwrappedSelector;
    try {
        // The unwrapped selector, i.e. the plain JDK NIO selector
        unwrappedSelector = provider.openSelector();
    } catch (IOException e) {
        throw new ChannelException("failed to open a new selector", e);
    }

    // If the selected-key-set optimization is disabled, return a tuple holding only the unwrapped selector
    if (DISABLE_KEY_SET_OPTIMIZATION) {
        return new SelectorTuple(unwrappedSelector);
    }

    Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
        @Override
        public Object run() {
            try {
                return Class.forName(
                        "sun.nio.ch.SelectorImpl",
                        false,
                        PlatformDependent.getSystemClassLoader());
            } catch (Throwable cause) {
                return cause;
            }
        }
    });

    final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
    // The optimized SelectedSelectionKeySet is backed by an array instead of a hash set; it is swapped into the Selector via reflection
    final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

    AccessController.doPrivileged(new PrivilegedAction<Object>() {
        @Override
        public Object run() {
            try {
                Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

                ReflectionUtil.trySetAccessible(selectedKeysField, true);
                ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);

                selectedKeysField.set(unwrappedSelector, selectedKeySet);
                publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                return null;
            } catch (NoSuchFieldException | IllegalAccessException e) {
                return e;
            }
        }
    });

    return new SelectorTuple(unwrappedSelector,
                             new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}
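
The idea behind the array-backed key set can be sketched in a few lines of plain Java. This is a simplified, hypothetical model in the spirit of SelectedSelectionKeySet, not its actual source: add() only appends to an array (no hashing, no duplicate check), and reset() empties the set for the next select round.

```java
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Simplified, hypothetical array-backed set in the spirit of SelectedSelectionKeySet. */
final class ArrayBackedSet<E> extends AbstractSet<E> {
    private Object[] items = new Object[4];
    private int size;

    @Override
    public boolean add(E e) {
        if (e == null) {
            return false;
        }
        if (size == items.length) {
            items = Arrays.copyOf(items, size << 1); // grow by doubling
        }
        items[size++] = e; // append only: no hashing, no duplicate check
        return true;
    }

    /** Clears the used slots so the set can be reused for the next select round. */
    void reset() {
        Arrays.fill(items, 0, size, null);
        size = 0;
    }

    @Override
    public int size() {
        return size;
    }

    @Override
    public Iterator<E> iterator() {
        return new Iterator<E>() {
            private int i;

            @Override
            public boolean hasNext() {
                return i < size;
            }

            @Override
            @SuppressWarnings("unchecked")
            public E next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return (E) items[i++];
            }
        };
    }

    public static void main(String[] args) {
        ArrayBackedSet<String> keys = new ArrayBackedSet<>();
        for (int n = 0; n < 6; n++) {
            keys.add("key-" + n);
        }
        System.out.println(keys.size() + " keys, first = " + keys.iterator().next());
        keys.reset();
        System.out.println("after reset: " + keys.size());
    }
}
```

Skipping the duplicate check is acceptable here under the assumption that the selector adds each ready key at most once per select round.
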

The group is now fully created. bossGroup and workerGroup are identical apart from their thread counts.

Next comes the ServerBootstrap configuration.



// group() stores bossGroup as the parent group and workerGroup as the child group
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
    super.group(parentGroup);
    ObjectUtil.checkNotNull(childGroup, "childGroup");
    if (this.childGroup != null) {
        throw new IllegalStateException("childGroup set already");
    }
    this.childGroup = childGroup;
    return this;
}

// channel() installs a factory that produces instances of the given generic type. Judging by the
// factory's name (reflective) and its field (a constructor), we can guess that it combines reflection,
// generics and the factory pattern to create the configured ServerSocketChannel.
public B channel(Class<? extends C> channelClass) {
    return channelFactory(new ReflectiveChannelFactory<C>(
            ObjectUtil.checkNotNull(channelClass, "channelClass")
    ));
}

public ReflectiveChannelFactory(Class<? extends T> clazz) {
    ObjectUtil.checkNotNull(clazz, "clazz");
    try {
        this.constructor = clazz.getConstructor();
    } catch (NoSuchMethodException e) {
        throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
                " does not have a public non-arg constructor", e);
    }
}
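
The reflection plus generics plus factory combination can be tried out without Netty. The sketch below is a hypothetical, stripped-down factory that looks up the public no-arg constructor once and then creates instances from it; the class name ReflectiveFactory and the use of StringBuilder as the product are assumptions made purely for the demo.

```java
import java.lang.reflect.Constructor;

/** Hypothetical generic factory that instantiates a class through its public no-arg constructor. */
final class ReflectiveFactory<T> {
    private final Constructor<? extends T> constructor;

    ReflectiveFactory(Class<? extends T> clazz) {
        try {
            // Look the constructor up once so that later newInstance() calls need no lookup.
            this.constructor = clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(
                    clazz.getSimpleName() + " has no public no-arg constructor", e);
        }
    }

    T newInstance() {
        try {
            return constructor.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(
                    "Unable to create instance of " + constructor.getDeclaringClass(), e);
        }
    }

    public static void main(String[] args) {
        ReflectiveFactory<CharSequence> factory = new ReflectiveFactory<CharSequence>(StringBuilder.class);
        CharSequence a = factory.newInstance();
        CharSequence b = factory.newInstance();
        System.out.println(a.getClass().getSimpleName() + ", distinct instances: " + (a != b));
    }
}
```

Failing fast in the constructor means a class without a usable constructor is rejected at configuration time rather than when the first channel is created.
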


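In addition, six further properties are set: handler, childHandler, option, childOption, attr and childAttr. Following the naming of the groups,
we can guess that the ones without the child prefix apply to the server channel served by bossGroup's NioEventLoop, while the child-prefixed ones apply to the accepted channels served by workerGroup's NioEventLoops.

Once the ServerBootstrap is configured, the real startup work begins. Step into b.bind() and follow the calls down to AbstractBootstrap.doBind(SocketAddress localAddress).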



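private ChannelFuture doBind(final SocketAddress localAddress) {
    // Initialize and register the channel
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();

    // If registration has already completed, bind right away; otherwise add a listener that binds once it completes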
    if (regFuture.isDone()) {
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener((ChannelFutureListener) future ->
                doBind0(regFuture, channel, localAddress, promise));
        return promise;
    }
}

Let's look at initAndRegister() first. It begins by creating the NioServerSocketChannel, instantiating the channel through the factory.



final ChannelFuture initAndRegister() {
    Channel channel = channelFactory.newChannel();
    init(channel);

    ChannelFuture regFuture = config().group().register(channel);

    // If we are here and the promise is not failed, it's one of the following cases:
    // 1) If we attempted registration from the event loop, the registration has been completed at this point.
    //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
    // 2) If we attempted registration from the other thread, the registration request has been successfully
    //    added to the event loop's task queue for later execution.
    //    i.e. It's safe to attempt bind() or connect() now:
    //         because bind() or connect() will be executed *after* the scheduled registration task is executed
    //         because register(), bind(), and connect() are all bound to the same thread.
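    // In short: if we get here and the promise has not failed, then either registration was attempted from the
    // event loop thread and has already completed, or it was attempted from another thread and the registration
    // task is queued on the event loop. Either way it is now safe to bind or connect.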
    return regFuture;
}

// As guessed, the ServerSocketChannel is instantiated through the constructor obtained via reflection.
@Override
public T newChannel() {
    try {
        return constructor.newInstance();
    } catch (Throwable t) {
        throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
    }
}

public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

// Note that the OP_ACCEPT interest op is passed in here
public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

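// Calls the parent constructor, which stores the parent channel, the ServerSocketChannel created by the SelectorProvider, and the OP_ACCEPT interest op, and switches the channel to non-blocking mode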
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        ch.configureBlocking(false);
    } catch (IOException e) {
        try {
            ch.close();
        } catch (IOException e2) {
            logger.warn(
                        "Failed to close a partially initialized socket.", e2);
        }

        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}

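// The top-level abstract parent creates the id, unsafe and pipeline. Netty's Unsafe is the class tied to the socket (or other
// I/O-capable component) that performs the actual I/O. Like the JDK's Unsafe, it is generally not something users need to touch.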
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

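Next comes initialization. The options and attrs configured on the ServerBootstrap are applied to the server channel, and a ChannelInitializer is added to its pipeline.
That ChannelInitializer in turn stores childOptions, childAttrs, workerGroup and childHandler in a handler named ServerBootstrapAcceptor.
Judging by its name, this handler deals with incoming client connections. Once its initChannel method (implemented by the subclass) has run, the ChannelInitializer removes itself from the pipeline.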

    
void init(Channel channel) {
    setChannelOptions(channel, options0().entrySet().toArray(newOptionArray(0)), logger);
    setAttributes(channel, attrs0().entrySet().toArray(newAttrArray(0)));

    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions =
            childOptions.entrySet().toArray(newOptionArray(0));
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));

    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            ch.eventLoop().execute(() -> pipeline.addLast(new ServerBootstrapAcceptor(
                    ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)));
        }
    });
}

Registration follows initialization, and it is a little more involved.



@Override
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}

@Override
    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        AbstractChannel.this.eventLoop = eventLoop;

        // If the current thread is the NioEventLoop's own thread, register directly; otherwise submit the registration to the event loop as a task
        if (eventLoop.inEventLoop()) {
            register0(promise);
        } else {
            try {
                eventLoop.execute(() -> register0(promise));
            } catch (Throwable t) {
                logger.warn(
                        "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                        AbstractChannel.this, t);
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }
    }

// Queues the task so that it runs on the NioEventLoop's thread
@Override
public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }

    boolean inEventLoop = inEventLoop();
    addTask(task);
    if (!inEventLoop) {
        startThread();
        if (isShutdown()) {
            boolean reject = false;
            try {
                if (removeTask(task)) {
                    reject = true;
                }
            } catch (UnsupportedOperationException e) {
                // The task queue does not support removal so the best thing we can do is to just move on and
                // hope we will be able to pick-up the task before its completely terminated.
                // In worst case we will log on termination.
            }
            if (reject) {
                reject();
            }
        }
    }

    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}

// Checks whether the current thread is the NioEventLoop's thread
@Override
public boolean inEventLoop() {
    return inEventLoop(Thread.currentThread());
}

@Override
public boolean inEventLoop(Thread thread) {
    return thread == this.thread;
}

    @Override
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            // Register the JDK ServerSocketChannel with the selector. Note that the interest ops are 0 here, meaning no events are of interest yet
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                // Force the Selector to select now as the "canceled" SelectionKey may still be
                // cached and not removed because no Select.select(..) operation was called yet.
                eventLoop().selectNow();
                selected = true;
            } else {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;
            }
        }
    }
}

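register first checks whether the channel is already registered and whether the supplied eventLoop is a NioEventLoop (both checks are omitted from the listing above).
It then checks whether the current thread is the NioEventLoop's thread: if so, it registers directly; otherwise it submits the registration to the event loop as a task.
Inside the event loop's execute method, the task is added to the task queue and the thread check is repeated. If the caller is not the event loop thread,
startThread() starts the event loop's own thread if it is not already running, and that thread then runs the queued task.
In the register0 to doRegister call chain, Netty ultimately registers the underlying JDK channel with the selector. Because the port has not been bound yet,
the interest ops are 0. The NioServerSocketChannel itself (this) is attached to the SelectionKey so that, when the selector later reports events, the channel can be retrieved and acted upon.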
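
The thread-affinity pattern described here can be modeled with a toy event loop in plain Java. This is a deliberately simplified sketch, not Netty's SingleThreadEventExecutor: the class MiniEventLoop and its fields are hypothetical, it has no shutdown or wakeup logic, and it only shows the enqueue-first, start-lazily, run-in-order behavior.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

/** Toy single-threaded event loop; a simplified model, not Netty's implementation. */
final class MiniEventLoop {
    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
    private final AtomicBoolean started = new AtomicBoolean();
    private volatile Thread thread;

    /** True only when called from the loop's own thread. */
    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    void execute(Runnable task) {
        tasks.add(task); // always enqueue first
        if (!inEventLoop() && started.compareAndSet(false, true)) {
            Thread t = new Thread(this::run, "mini-event-loop");
            t.setDaemon(true);
            t.start(); // started lazily, exactly once
        }
    }

    private void run() {
        thread = Thread.currentThread();
        try {
            for (;;) {
                tasks.take().run(); // a real loop would also poll the selector here
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MiniEventLoop loop = new MiniEventLoop();
        CountDownLatch done = new CountDownLatch(2);
        loop.execute(() -> {
            System.out.println("register runs in loop: " + loop.inEventLoop());
            done.countDown();
        });
        loop.execute(() -> {
            System.out.println("bind runs after register, in loop: " + loop.inEventLoop());
            done.countDown();
        });
        System.out.println("main thread in loop: " + loop.inEventLoop());
        done.await();
    }
}
```

Because both tasks go through the same queue and the same thread, the second task can only run after the first has finished, which is the ordering guarantee that register and bind rely on.
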
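With registration complete, we return to AbstractBootstrap and run doBind0. The bind must also be executed by the NioEventLoop, so it too is submitted to the event loop as a task.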



private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return pipeline.bind(localAddress, promise);
}

@Override
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return tail.bind(localAddress, promise);
}


@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    if (isNotValidPromise(promise, false)) {
        // cancelled
        return promise;
    }

    // Find the next outbound handler context and invoke it; by default the call propagates to HeadContext
    final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeBind(localAddress, promise);
    } else {
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                next.invokeBind(localAddress, promise);
            }
        }, promise, null);
    }
    return promise;
}

// The call reaches HeadContext, which delegates to unsafe.
@Override
public void bind(
        ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
    unsafe.bind(localAddress, promise);
}

// AbstractUnsafe.bind (precondition checks omitted)
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    boolean wasActive = isActive();
    try {
        doBind(localAddress);
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }

    // The channel has just become active: fire channelActive later on the event loop
    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireChannelActive();
            }
        });
    }

    safeSetSuccess(promise);
}

// unsafe ultimately calls the JDK channel to perform the bind
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}

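The NioEventLoop performs the bind through the pipeline. The pipeline calls bind on TailContext, which keeps looking for the next outbound handler to run bind.
By default the call reaches HeadContext, which delegates to the underlying unsafe. After unsafe completes the bind, it has the pipeline call fireChannelActive.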
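
The tail-to-head direction of outbound operations is easy to lose track of, so here is a toy model in plain Java. It is a sketch under simplifying assumptions: every outbound handler simply forwards the call, handlers are identified by name only, and MiniPipeline and Ctx are hypothetical classes rather than Netty types.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy pipeline showing outbound propagation from tail to head; names are illustrative only. */
final class MiniPipeline {
    /** One node of the doubly linked handler chain. */
    static final class Ctx {
        final String name;
        final boolean outbound;
        Ctx prev;
        Ctx next;

        Ctx(String name, boolean outbound) {
            this.name = name;
            this.outbound = outbound;
        }
    }

    final List<String> trace = new ArrayList<>();
    private final Ctx head = new Ctx("head", true);   // head handles outbound operations itself
    private final Ctx tail = new Ctx("tail", false);

    MiniPipeline() {
        head.next = tail;
        tail.prev = head;
    }

    /** Inserts a handler just before the tail, like addLast. */
    MiniPipeline addLast(String name, boolean outbound) {
        Ctx ctx = new Ctx(name, outbound);
        ctx.prev = tail.prev;
        ctx.next = tail;
        tail.prev.next = ctx;
        tail.prev = ctx;
        return this;
    }

    /** Outbound operations start at the tail and walk towards the head. */
    void bind(int port) {
        Ctx ctx = tail;
        do {
            ctx = ctx.prev;                  // step to the previous context
            if (ctx.outbound) {
                trace.add(ctx.name);         // inbound-only handlers are skipped
            }
        } while (ctx != head);
        trace.add("unsafe.bind(" + port + ")"); // head delegates to unsafe
    }

    public static void main(String[] args) {
        MiniPipeline p = new MiniPipeline()
                .addLast("LoggingHandler", true)
                .addLast("ServerBootstrapAcceptor", false);
        p.bind(8007);
        System.out.println(p.trace); // [LoggingHandler, head, unsafe.bind(8007)]
    }
}
```
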

fireChannelActive is first delivered to HeadContext:



@Override
public void channelActive(ChannelHandlerContext ctx) {
    ctx.fireChannelActive();

    readIfIsAutoRead();
}

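HeadContext first propagates the active event and then calls readIfIsAutoRead. When auto-read is enabled, this issues a read on the channel, which travels through the pipeline until it reaches HeadContext again; HeadContext then calls unsafe's beginRead, which ends up in doBeginRead: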



@Override
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

In doBeginRead, the channel's interest ops are updated to include the OP_ACCEPT event that was passed in when the NioServerSocketChannel was created.
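
The same register, bind, begin-read sequence can be reproduced with nothing but the JDK. The sketch below is not Netty code: the class name NioStartupSketch is hypothetical, and binding to port 0 (any free port) and the backlog of 100 are assumptions chosen for the demo.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

/** Plain JDK NIO walk-through of the register, bind, begin-read sequence (no Netty involved). */
final class NioStartupSketch {
    /** Returns the interest ops observed after registration and after begin-read. */
    static int[] start(Object attachment) throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel ch = ServerSocketChannel.open()) {
            ch.configureBlocking(false); // required before register()

            // Step 1: register with interest ops 0 and attach our own object.
            SelectionKey key = ch.register(selector, 0, attachment);
            int afterRegister = key.interestOps();

            // Step 2: bind; port 0 asks the OS for any free port (an assumption of this demo).
            ch.bind(new InetSocketAddress(0), 100);

            // Step 3: begin reading by OR-ing OP_ACCEPT into the existing interest ops.
            int ops = key.interestOps();
            if ((ops & SelectionKey.OP_ACCEPT) == 0) {
                key.interestOps(ops | SelectionKey.OP_ACCEPT);
            }

            // The attachment is how the owner of the key is found again later.
            if (key.attachment() != attachment) {
                throw new IllegalStateException("attachment lost");
            }
            return new int[] {afterRegister, key.interestOps()};
        }
    }

    public static void main(String[] args) throws IOException {
        int[] ops = start("server-channel");
        // OP_ACCEPT is 1 << 4, so this prints: after register: 0, after beginRead: 16
        System.out.println("after register: " + ops[0] + ", after beginRead: " + ops[1]);
    }
}
```
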

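At this point creation, initialization, registration and binding are all complete. The main thread then calls sync, which calls await, and blocks there while Netty does its work.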

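Summary:
⑴ Server startup has four stages: creation, initialization, registration and binding
⑵ Netty uses reflection to replace the selector's selected-key set with an array-backed implementation
⑶ In this example bossGroup has one thread, and workerGroup defaults to twice the number of CPU cores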


Original article: https://www.cnblogs.com/spiritsx/p/11839066.html
