码迷,mamicode.com
首页 > Web开发 > 详细

Pigeon源码分析(五) -- 服务端netty部分

时间:2021-06-07 21:09:49      阅读:0      评论:0      收藏:0      [点我收藏+]

标签:host   adl   channel   ali   stat   expand   oaf   tty   object   

服务端netty的channelHandler有这么多

public class NettyServerPipelineFactory implements ChannelPipelineFactory {

    private NettyServer server;

    private static CodecConfig codecConfig = CodecConfigFactory.createClientConfig();

    public NettyServerPipelineFactory(NettyServer server) {
        this.server = server;
    }

    public ChannelPipeline getPipeline() {
        ChannelPipeline pipeline = pipeline();
        pipeline.addLast("framePrepender", new FramePrepender());
        pipeline.addLast("frameDecoder", new FrameDecoder());
        pipeline.addLast("crc32Handler", new Crc32Handler(codecConfig));
        pipeline.addLast("compressHandler", new CompressHandler(codecConfig));
        pipeline.addLast("providerDecoder", new ProviderDecoder());
        pipeline.addLast("providerEncoder", new ProviderEncoder());
        pipeline.addLast("serverHandler", new NettyServerHandler(server));
        return pipeline;
    }

}

这其中跟业务相关最强的是这三个  FrameDecoder  ProviderDecoder  NettyServerHandler

     //magic
        os.write(CodecConstants.MAGIC); // 0x39 0x3A
        //serialize
        os.writeByte(msg.getSerialize());//序列化类型 这里注意 消息头部分就3个字节 和上面可是不一样的 
        //bodyLength
        os.writeInt(Integer.MAX_VALUE);//消息体长度

我们先分析 FrameDecoder  FrameDecoder#decode

protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer)
            throws Exception {

        Object message = null;

        if (buffer.readableBytes() <= 2) {
            return message;
        }

        byte[] headMsgs = new byte[2];

        buffer.getBytes(buffer.readerIndex(), headMsgs);

        if ((0x39 == headMsgs[0] && 0x3A == headMsgs[1])) {
            //old protocol
            message = doDecode(buffer);

        } else if ((byte) 0xAB == headMsgs[0] && (byte) 0xBA == headMsgs[1]) {
            //new protocol
            message = _doDecode(buffer);

        } else {
            throw new IllegalArgumentException("Decode invalid message head:" +
                    headMsgs[0] + " " + headMsgs[1] + ", " + "message:" + buffer);
        }

        return message;

    }

这里分析下 非thrift协议,也就是 

 if ((0x39 == headMsgs[0] && 0x3A == headMsgs[1])) {
            //old protocol
            message = doDecode(buffer);
protected Object doDecode(ChannelBuffer buffer)
            throws Exception {

        CodecEvent codecEvent = null;

        if (buffer.readableBytes() <= CodecConstants.FRONT_LENGTH) { HEAD_LENGTH + BODY_FIELD_LENGTH 3 + 4
            return codecEvent;
        }

        int totalLength = (int) buffer.getUnsignedInt(
                buffer.readerIndex() +
                        CodecConstants.HEAD_LENGTH);//从开始位置数3个字节,读出来四个字节,该值就是消息体字节数

        int frameLength = totalLength + CodecConstants.FRONT_LENGTH;//消息体长度 + 7个字节的头长度 就是总长度

        if (buffer.readableBytes() >= frameLength) {

            ChannelBuffer frame = buffer.slice(buffer.readerIndex(), frameLength);
            buffer.readerIndex(buffer.readerIndex() + frameLength);

            codecEvent = new CodecEvent(frame, false);
            codecEvent.setReceiveTime(System.currentTimeMillis());//从bytebuffer中切出来 frameLength 字节数,构成一个CodecEvent
        }

        return codecEvent;
    }

  经过了 FrameDecoder的解码,现在handler链中的对象就不再是原生的 ByteBuffer了,而是CodecEvent

  接下来分析  ProviderDecoder ProviderDecoder 继承自 AbstractDecoder,其主要的逻辑也是在 AbstractDecoder

  AbstractDecoder # decode

public Object decode(ChannelHandlerContext ctx, Channel channel, Object msg)
            throws Exception {

        if (msg == null || !(msg instanceof CodecEvent)) {
            return null;
        }

        CodecEvent codecEvent = (CodecEvent) msg;

        if (codecEvent.isValid()) {

            Object message = null;

            if (codecEvent.isUnified()) {
                message = _doDecode(ctx, channel, codecEvent);
                codecEvent.setInvocation((InvocationSerializable) message);
            } else {
                message = doDecode(ctx, channel, codecEvent);
                codecEvent.setInvocation((InvocationSerializable) message);
            }

        }

        return codecEvent;
    }

  还是看非thrift协议的解码

  

protected Object doDecode(ChannelHandlerContext ctx, Channel channel, CodecEvent codecEvent)
            throws IOException {
        Object msg = null;
        ChannelBuffer buffer = codecEvent.getBuffer();
        //head
        buffer.skipBytes(CodecConstants.MEGIC_FIELD_LENGTH);//跳过前两个字节
        byte serialize = buffer.readByte();//读一个字节的序列化类型
        Long sequence = null;

        try {
            //body length
            int totalLength = buffer.readInt();//消息体长度
            int frameLength = totalLength + CodecConstants.FRONT_LENGTH;//帧长度
            //body
            int bodyLength = (totalLength - CodecConstants.TAIL_LENGTH);//消息体里包含了11个字节的尾巴。其中8个字节long类型的序列号,后三个字节固定是 30,29,28
            ChannelBuffer frame = extractFrame(buffer, buffer.readerIndex(), bodyLength);//这是切出来纯的请求体
            buffer.readerIndex(buffer.readerIndex() + bodyLength);
            //tail
            sequence = buffer.readLong();
            buffer.skipBytes(CodecConstants.EXPAND_FIELD_LENGTH);
            //deserialize
            ChannelBufferInputStream is = new ChannelBufferInputStream(frame);

            msg = deserialize(serialize, is);//对纯的请求体做反序列化
            //after
            doAfter(channel, msg, serialize, frameLength, codecEvent.getReceiveTime());
        } catch (Throwable e) {
            SerializationException se = new SerializationException(e);

            try {
                if (sequence != null) {
                    doFailResponse(ctx, channel, ProviderUtils.createThrowableResponse(sequence.longValue(),
                            serialize, se));
                }

                logger.error("Deserialize failed. host:"
                        + ((InetSocketAddress) channel.getRemoteAddress()).getAddress().getHostAddress()
                        + "\n" + e.getMessage(), se);

            } catch (Throwable t) {
                logger.error("[doDecode] doFailResponse failed.", t);
            }
        }
        return msg;
    }

ProviderDecoder # deserialize

public Object deserialize(byte serializerType, InputStream is) {
        Object decoded = SerializerFactory.getSerializer(serializerType).deserializeRequest(is);
        return decoded;
    }

没啥好说的了,就是反序列化了

注意,此时CodecEvent中已经有了反序列化好的原始请求对象了

message = doDecode(ctx, channel, codecEvent);
                codecEvent.setInvocation((InvocationSerializable) message);

剩下的就是请求到达 NettyServerHandler。这部分逻辑其实在 Pigeon源码分析(四) -- 服务端接收请求过程 - MaXianZhe - 博客园 (cnblogs.com)分析过了

Pigeon源码分析(五) -- 服务端netty部分

标签:host   adl   channel   ali   stat   expand   oaf   tty   object   

原文地址:https://www.cnblogs.com/juniorMa/p/14859716.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!