码迷,mamicode.com
首页 > 其他好文 > 详细

MQTT Implementation

时间:2019-10-31 16:15:08      阅读:97      评论:0      收藏:0      [点我收藏+]

标签:protocol   lin   res   str   wildcards   esc   buffer   htm   keepaliv   

1. MQTT Message

技术图片 

  技术图片 

/**
 * A decoded MQTT message made of three parts: the fixed header, a
 * variable header and a payload.
 *
 * <p>The variable header and payload are held as {@code Object} because
 * their concrete type differs per message; either may be {@code null}.
 */
public class MqttMessage {
    private final MqttFixedHeader mqttFixedHeader;
    private final Object variableHeader;
    private final Object payload;

    /**
     * Creates a message from its three parts.
     *
     * @param mqttFixedHeader the fixed header of the message
     * @param variableHeader  the decoded variable header, or {@code null}
     * @param payload         the decoded payload, or {@code null}
     */
    public MqttMessage(MqttFixedHeader mqttFixedHeader, Object variableHeader, Object payload) {
        // The fields are final, so they must be assigned here for the class to compile.
        this.mqttFixedHeader = mqttFixedHeader;
        this.variableHeader = variableHeader;
        this.payload = payload;
    }

    /** @return the fixed header of this message */
    public MqttFixedHeader fixedHeader() {
        return mqttFixedHeader;
    }

    /** @return the variable header, or {@code null} if none was supplied */
    public Object variableHeader() {
        return variableHeader;
    }

    /** @return the payload, or {@code null} if none was supplied */
    public Object payload() {
        return payload;
    }
}

/**
 * Immutable holder for the fields of an MQTT fixed header: message type,
 * DUP flag, QoS level, RETAIN flag and the Remaining Length value.
 */
public final class MqttFixedHeader {
    private final MqttMessageType messageType;
    private final boolean isDup;
    private final MqttQoS qosLevel;
    private final boolean isRetain;
    private final int remainingLength;

    /**
     * Creates a fixed header. The parameter order matches the call made by
     * the fixed-header decoder.
     *
     * @param messageType     the control packet type
     * @param isDup           the DUP flag
     * @param qosLevel        the QoS level
     * @param isRetain        the RETAIN flag
     * @param remainingLength the decoded Remaining Length value
     */
    public MqttFixedHeader(
            MqttMessageType messageType,
            boolean isDup,
            MqttQoS qosLevel,
            boolean isRetain,
            int remainingLength) {
        // The fields are final, so they must be assigned here for the class to compile.
        this.messageType = messageType;
        this.isDup = isDup;
        this.qosLevel = qosLevel;
        this.isRetain = isRetain;
        this.remainingLength = remainingLength;
    }

    /** @return the control packet type */
    public MqttMessageType messageType() {
        return messageType;
    }

    /** @return the DUP flag */
    public boolean isDup() {
        return isDup;
    }

    /** @return the QoS level */
    public MqttQoS qosLevel() {
        return qosLevel;
    }

    /** @return the RETAIN flag */
    public boolean isRetain() {
        return isRetain;
    }

    /** @return the Remaining Length value */
    public int remainingLength() {
        return remainingLength;
    }
}

/**
 * MQTT control packet types, each tied to the numeric code carried in the
 * upper four bits of the first fixed-header byte.
 */
public enum MqttMessageType {
    CONNECT(1),
    CONNACK(2),
    PUBLISH(3),
    PUBACK(4),
    PUBREC(5),
    PUBREL(6),
    PUBCOMP(7),
    SUBSCRIBE(8),
    SUBACK(9),
    UNSUBSCRIBE(10),
    UNSUBACK(11),
    PINGREQ(12),
    PINGRESP(13),
    DISCONNECT(14);

    private final int value;

    MqttMessageType(int value) {
        this.value = value;
    }

    /** @return the numeric code of this packet type */
    public int value() {
        return value;
    }

    /**
     * Looks up the packet type for a numeric code.
     *
     * @param type the numeric code, e.g. the first header byte shifted right by four
     * @return the matching constant
     * @throws IllegalArgumentException if no constant has that code (only 1-14 are defined here)
     */
    public static MqttMessageType valueOf(int type) {
        for (MqttMessageType candidate : values()) {
            if (candidate.value == type) {
                return candidate;
            }
        }
        throw new IllegalArgumentException("unknown message type: " + type);
    }
}

2. Decode MQTT FixedHeader

技术图片

技术图片

  技术图片

技术图片

 技术图片

The maximum Remaining Length is 268,435,455 bytes (four length bytes: 0xFF 0xFF 0xFF 0x7F), i.e. one byte short of 256 MB.

private static MqttFixedHeader decodeFixedHeader(ByteBuf buffer) {
    //read 1 byte
    short b1 = buffer.readUnsignedByte();
    
    //mqtt type 
    MqttMessageType messageType = MqttMessageType.valueOf(b1 >> 4);
    
    //dup flag if PUBLISH TYPE
    boolean dupFlag = (b1 & 0x08) == 0x08;
    
    //QoS if PUBLISH TYPE
    int qosLevel = (b1 & 0x06) >> 1;
    
    //RETAIN if PUBLISH TYPE
    boolean retain = (b1 & 0x01) != 0;

    //read length of payload
    int remainingLength = 0;
    int multiplier = 1;
    short digit;
    int loops = 0;
    do {
        //read 1 byte
        digit = buffer.readUnsignedByte();
        
        // 127 = 0x7F
        remainingLength += (digit & 127) * multiplier;
        
        //128 = 0x80
        multiplier *= 128;
        
        //max bytes is 4 
        loops++;
    } while ((digit & 128) != 0 && loops < 4);

    // MQTT protocol limits Remaining Length to 4 bytes
    if (loops == 4 && (digit & 128) != 0) {
        throw new DecoderException("remaining length exceeds 4 digits (" + messageType + ‘)‘);
    }
    
    //construct MqttFixedHeader
    MqttFixedHeader decodedFixedHeader = new MqttFixedHeader(messageType, dupFlag, MqttQoS.valueOf(qosLevel), retain, remainingLength);
    return validateFixedHeader(resetUnusedFields(decodedFixedHeader));
}

 

3. Decode MQTT VariableHeader

技术图片  

private static Result<Integer> decodeMsbLsb(ByteBuf buffer, int min, int max) {
    short msbSize = buffer.readUnsignedByte();
    short lsbSize = buffer.readUnsignedByte();
    final int numberOfBytesConsumed = 2;
    int result = msbSize << 8 | lsbSize;
    if (result < min || result > max) {
        result = -1;
    }
    return new Result<Integer>(result, numberOfBytesConsumed);

 

/**
 * Dispatches to the variable-header decoder that matches the packet type
 * stored in the fixed header.
 *
 * @param buffer          the buffer positioned at the start of the variable header
 * @param mqttFixedHeader the already decoded fixed header
 * @return the decoded variable header and the number of bytes consumed; for
 *         packet types without a variable header the value is {@code null}
 *         and the byte count is 0
 */
private static Result<?> decodeVariableHeader(ByteBuf buffer, MqttFixedHeader mqttFixedHeader) {
    switch (mqttFixedHeader.messageType()) {
        case CONNECT:
            return decodeConnectionVariableHeader(buffer);

        case CONNACK:
            return decodeConnAckVariableHeader(buffer);

        case PUBLISH:
            return decodePublishVariableHeader(buffer, mqttFixedHeader);

        // All of these carry only a message id in the variable header.
        case PUBACK:
        case PUBREC:
        case PUBREL:
        case PUBCOMP:
        case SUBSCRIBE:
        case SUBACK:
        case UNSUBSCRIBE:
        case UNSUBACK:
            return decodeMessageIdVariableHeader(buffer);

        default:
            // PINGREQ, PINGRESP and DISCONNECT have an empty variable header;
            // any other value falls back to the same empty result.
            return new Result<Object>(null, 0);
    }
}

3.1 If Connect Type

技术图片 

/**
 * Decodes a 16-bit unsigned value stored as two bytes, most significant
 * byte first, and validates it against {@code [min, max]}.
 *
 * @param buffer source buffer; two bytes are read from it
 * @param min    lower bound, inclusive
 * @param max    upper bound, inclusive
 * @return a result holding the value (or {@code -1} when out of range) and a
 *         consumed-byte count of 2
 */
private static Result<Integer> decodeMsbLsb(ByteBuf buffer, int min, int max) {
    final short high = buffer.readUnsignedByte();
    final short low = buffer.readUnsignedByte();

    final int combined = (high << 8) | low;
    final boolean inRange = combined >= min && combined <= max;

    return new Result<Integer>(inRange ? combined : -1, 2);
}

 

/**
 * Decodes a length-prefixed UTF-8 string: a two-byte length followed by
 * that many bytes of text.
 *
 * <p>The string bytes are always skipped, even when the length is rejected,
 * so the reader index ends up after the field either way.
 *
 * @param buffer   source buffer
 * @param minBytes smallest accepted byte length (inclusive)
 * @param maxBytes largest accepted byte length (inclusive)
 * @return the decoded text, or a {@code null} value if the length is outside
 *         {@code [minBytes, maxBytes]}, plus the total bytes consumed
 *         (length prefix + string bytes)
 */
private static Result<String> decodeString(ByteBuf buffer, int minBytes, int maxBytes) {
    final Result<Integer> lengthPrefix = decodeMsbLsb(buffer);
    final int length = lengthPrefix.value;
    final int totalConsumed = lengthPrefix.numberOfBytesConsumed + length;

    // Only materialise the text when the length is acceptable.
    final boolean lengthAccepted = length >= minBytes && length <= maxBytes;
    final String text = lengthAccepted
            ? buffer.toString(buffer.readerIndex(), length, CharsetUtil.UTF_8)
            : null;

    // toString(index, ...) does not move the reader index, so advance it here.
    buffer.skipBytes(length);
    return new Result<String>(text, totalConsumed);
}

 

 技术图片 

 技术图片 

 技术图片 

/**
 * Decodes the CONNECT variable header: protocol name, protocol level,
 * connect-flags byte and keep-alive value, in that order.
 *
 * @param buffer the buffer positioned at the start of the variable header
 * @return the decoded header and the number of bytes consumed
 * @throws DecoderException if the version is MQTT 3.1.1 and bit 0 of the
 *         connect flags (the reserved flag) is set
 */
private static Result<MqttConnectVariableHeader> decodeConnectionVariableHeader(ByteBuf buffer) {
    // Protocol name (length-prefixed string).
    final Result<String> protocolName = decodeString(buffer);

    // Protocol level (1 byte), resolved together with the name to a version.
    final byte level = buffer.readByte();
    final MqttVersion version = MqttVersion.fromProtocolNameAndLevel(protocolName.value, level);

    // Connect flags (1 byte).
    final int connectFlags = buffer.readUnsignedByte();

    // Keep alive (2 bytes).
    final Result<Integer> keepAliveField = decodeMsbLsb(buffer);

    final int consumed = protocolName.numberOfBytesConsumed
            + 1   // protocol level
            + 1   // connect flags
            + keepAliveField.numberOfBytesConsumed;

    // MQTT v3.1.1: The Server MUST validate that the reserved flag in the CONNECT Control Packet is
    // set to zero and disconnect the Client if it is not zero.
    // See http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc385349230
    if (version == MqttVersion.MQTT_3_1_1 && (connectFlags & 0x01) != 0) {
        throw new DecoderException("non-zero reserved flag");
    }

    final boolean userNamePresent = (connectFlags & 0x80) != 0;
    final boolean passwordPresent = (connectFlags & 0x40) != 0;
    final boolean retainWill = (connectFlags & 0x20) != 0;
    final int qosOfWill = (connectFlags >> 3) & 0x03;
    final boolean willPresent = (connectFlags & 0x04) != 0;
    final boolean cleanSessionRequested = (connectFlags & 0x02) != 0;

    final MqttConnectVariableHeader header = new MqttConnectVariableHeader(
            version.protocolName(),
            version.protocolLevel(),
            userNamePresent,
            passwordPresent,
            retainWill,
            qosOfWill,
            willPresent,
            cleanSessionRequested,
            keepAliveField.value);
    return new Result<MqttConnectVariableHeader>(header, consumed);
}

3.2 If Publish Type

技术图片 

/**
 * Decodes the PUBLISH variable header: the topic name, followed by a message
 * id when the QoS level in the fixed header is greater than 0.
 *
 * @param buffer          the buffer positioned at the start of the variable header
 * @param mqttFixedHeader the fixed header, consulted for the QoS level
 * @return the decoded header (message id is {@code -1} when none is present)
 *         and the number of bytes consumed
 * @throws DecoderException if {@code isValidPublishTopicName} rejects the topic
 */
private static Result<MqttPublishVariableHeader> decodePublishVariableHeader(ByteBuf buffer,MqttFixedHeader mqttFixedHeader) {
    final Result<String> topicField = decodeString(buffer);
    final String topicName = topicField.value;
    if (!isValidPublishTopicName(topicName)) {
        throw new DecoderException("invalid publish topic name: " + topicName + " (contains wildcards)");
    }

    int consumed = topicField.numberOfBytesConsumed;
    int packetId = -1;

    // The message id field is read only when QoS > 0.
    final boolean carriesMessageId = mqttFixedHeader.qosLevel().value() > 0;
    if (carriesMessageId) {
        final Result<Integer> idField = decodeMessageId(buffer);
        packetId = idField.value;
        consumed += idField.numberOfBytesConsumed;
    }

    return new Result<MqttPublishVariableHeader>(
            new MqttPublishVariableHeader(topicName, packetId), consumed);
}

3.3 If SUBSCRIBE:UNSUBSCRIBE:SUBACK:UNSUBACK:PUBACK:PUBREC:PUBCOMP:PUBREL Type

技术图片 

/**
 * Decodes a variable header that consists of a message id only.
 *
 * @param buffer the buffer positioned at the message id
 * @return the header built from the decoded id, and the bytes consumed while decoding it
 */
private static Result<MqttMessageIdVariableHeader> decodeMessageIdVariableHeader(ByteBuf buffer) {
    final Result<Integer> idField = decodeMessageId(buffer);
    final MqttMessageIdVariableHeader header = MqttMessageIdVariableHeader.from(idField.value);
    return new Result<MqttMessageIdVariableHeader>(header, idField.numberOfBytesConsumed);
}

3.4 If CONNACK Type

技术图片 

/**
 * Decodes the two-byte CONNACK variable header: an acknowledge-flags byte
 * (bit 0 = session present) followed by the connect return code.
 *
 * @param buffer the buffer positioned at the start of the variable header
 * @return the decoded header and a consumed-byte count of 2
 */
private static Result<MqttConnAckVariableHeader> decodeConnAckVariableHeader(ByteBuf buffer) {
    final short ackFlags = buffer.readUnsignedByte();
    final byte code = buffer.readByte();

    final boolean sessionPresent = (ackFlags & 0x01) != 0;
    final MqttConnAckVariableHeader header =
            new MqttConnAckVariableHeader(MqttConnectReturnCode.valueOf(code), sessionPresent);

    return new Result<MqttConnAckVariableHeader>(header, 2);
}

 4. Payload Parser

4.1 If CONNECT Type

  Each payload field uses the same format as a string: length MSB, length LSB, then the data bytes.

技术图片

 

 

 

/**
 * Decodes the CONNECT payload. The client identifier always comes first;
 * will topic + will message, user name and password follow only when the
 * corresponding flag is set in the CONNECT variable header.
 *
 * @param buffer                    the buffer positioned at the start of the payload
 * @param mqttConnectVariableHeader the already decoded CONNECT variable header
 * @return the decoded payload (absent fields are {@code null}) and the bytes consumed
 * @throws MqttIdentifierRejectedException if {@code isValidClientId} rejects the client id
 */
private static Result<MqttConnectPayload> decodeConnectionPayload(ByteBuf buffer,MqttConnectVariableHeader mqttConnectVariableHeader) {
    final Result<String> clientIdField = decodeString(buffer);
    final String clientId = clientIdField.value;

    final MqttVersion version = MqttVersion.fromProtocolNameAndLevel(
            mqttConnectVariableHeader.name(),
            (byte) mqttConnectVariableHeader.version());
    if (!isValidClientId(version, clientId)) {
        throw new MqttIdentifierRejectedException("invalid clientIdentifier: " + clientId);
    }

    int consumed = clientIdField.numberOfBytesConsumed;

    // Will topic and will message travel together behind the will flag.
    String willTopic = null;
    byte[] willMessage = null;
    if (mqttConnectVariableHeader.isWillFlag()) {
        final Result<String> willTopicField = decodeString(buffer, 0, 32767);
        final Result<byte[]> willMessageField = decodeByteArray(buffer);
        willTopic = willTopicField.value;
        willMessage = willMessageField.value;
        consumed += willTopicField.numberOfBytesConsumed + willMessageField.numberOfBytesConsumed;
    }

    String userName = null;
    if (mqttConnectVariableHeader.hasUserName()) {
        final Result<String> userNameField = decodeString(buffer);
        userName = userNameField.value;
        consumed += userNameField.numberOfBytesConsumed;
    }

    byte[] password = null;
    if (mqttConnectVariableHeader.hasPassword()) {
        final Result<byte[]> passwordField = decodeByteArray(buffer);
        password = passwordField.value;
        consumed += passwordField.numberOfBytesConsumed;
    }

    final MqttConnectPayload connectPayload =
            new MqttConnectPayload(clientId, willTopic, willMessage, userName, password);
    return new Result<MqttConnectPayload>(connectPayload, consumed);
}

4.2 If SUBSCRIBE Type

技术图片

/**
 * Decodes the SUBSCRIBE payload: a sequence of (topic string, QoS byte)
 * pairs that fills the rest of the packet.
 *
 * @param buffer                       the buffer positioned at the start of the payload
 * @param bytesRemainingInVariablePart number of payload bytes to decode
 * @return the list of topic subscriptions and the number of bytes consumed
 */
private static Result<MqttSubscribePayload> decodeSubscribePayload(ByteBuf buffer,int bytesRemainingInVariablePart) {
    final List<MqttTopicSubscription> subscriptions = new ArrayList<MqttTopicSubscription>();
    int consumed = 0;

    while (consumed < bytesRemainingInVariablePart) {
        final Result<String> topicField = decodeString(buffer);

        // Only the two low bits of the options byte carry the requested QoS.
        final int requestedQos = buffer.readUnsignedByte() & 0x03;
        consumed += topicField.numberOfBytesConsumed + 1;

        final MqttTopicSubscription subscription =
                new MqttTopicSubscription(topicField.value, MqttQoS.valueOf(requestedQos));
        subscriptions.add(subscription);
    }

    return new Result<MqttSubscribePayload>(new MqttSubscribePayload(subscriptions), consumed);
}

4.3 If SUBACK Type

技术图片

/**
 * Decodes the SUBACK payload: one return-code byte per subscription.
 *
 * <p>A byte equal to {@code MqttQoS.FAILURE.value()} is kept as is; any other
 * byte is masked down to its two low bits.
 *
 * @param buffer                       the buffer positioned at the start of the payload
 * @param bytesRemainingInVariablePart number of payload bytes to decode
 * @return the list of return codes and the number of bytes consumed
 */
private static Result<MqttSubAckPayload> decodeSubackPayload(ByteBuf buffer,int bytesRemainingInVariablePart) {
    final List<Integer> returnCodes = new ArrayList<Integer>();

    int consumed;
    for (consumed = 0; consumed < bytesRemainingInVariablePart; consumed++) {
        final int raw = buffer.readUnsignedByte();
        final boolean isFailure = raw == MqttQoS.FAILURE.value();
        returnCodes.add(isFailure ? raw : (raw & 0x03));
    }

    return new Result<MqttSubAckPayload>(new MqttSubAckPayload(returnCodes), consumed);
}

4.4 If PUBLISH Type

The Payload contains the Application Message that is being published. The content and format of the data is application specific.
The length of the payload can be calculated by subtracting the length of the variable header from the Remaining Length field
that is in the Fixed Header. It is valid for a PUBLISH Packet to contain a zero length payload.

// Payload size = Remaining Length from the fixed header minus the bytes
// already consumed by the variable header.
bytesRemainingInVariablePart = mqttFixedHeader.remainingLength();
bytesRemainingInVariablePart -= mqttVariableHeader.numberOfBytesConsumed;
/**
 * Takes the PUBLISH payload out of the buffer as a retained slice; the
 * content is not interpreted.
 *
 * @param buffer                       the buffer positioned at the start of the payload
 * @param bytesRemainingInVariablePart number of payload bytes (may be 0)
 * @return the slice and the number of bytes consumed, which equals
 *         {@code bytesRemainingInVariablePart}
 */
private static Result<ByteBuf> decodePublishPayload(ByteBuf buffer, int bytesRemainingInVariablePart) {
    // NOTE(review): readRetainedSlice presumably bumps the reference count, so the
    // consumer of the result would be responsible for releasing it — confirm.
    final ByteBuf payloadSlice = buffer.readRetainedSlice(bytesRemainingInVariablePart);
    return new Result<ByteBuf>(payloadSlice, bytesRemainingInVariablePart);
}

 

MQTT Implementation

标签:protocol   lin   res   str   wildcards   esc   buffer   htm   keepaliv   

原文地址:https://www.cnblogs.com/iiiDragon/p/MQTT.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!