标签:gate new tin zed message action subscript localize 使用
初始化SDK:
/**
* 初始化SDK
*
* @param context context
*/
public void initSDK(Context context) {
String clientId = String.valueOf(System.currentTimeMillis()+userId);
mqttAndroidClient = new MqttAndroidClient(mContext, serverUri, clientId);
subscriptionTopics = new ArrayList<>();
mqttAndroidClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
if (reconnect) {
Log.d(TAG, "Reconnected to : " + serverURI);
// Because Clean Session is true, we need to re-subscribe
// subscribeToTopic();
//publishMessage();
} else {
Log.d(TAG, "Connected to: " + serverURI);
}
connectSuccess = true;
subscribeToTopic();
}
@Override
public void connectionLost(Throwable cause) {
connectSuccess = false;
Log.e(TAG, "The Connection was lost." + cause.getLocalizedMessage());
}
// THIS DOES NOT WORK!
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
Log.d(TAG, "Incoming message: " +topic+ new String(message.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
});
}
连接远程服务:
/**
* 连接远程服务
*/
public void connectServer() {
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setAutomaticReconnect(true);
mqttConnectOptions.setCleanSession(false);
try {
//addToHistory("Connecting to " + serverUri);
mqttAndroidClient.connect(mqttConnectOptions, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
connectSuccess = true;
DisconnectedBufferOptions disconnectedBufferOptions = new DisconnectedBufferOptions();
disconnectedBufferOptions.setBufferEnabled(true);
disconnectedBufferOptions.setBufferSize(100);
disconnectedBufferOptions.setPersistBuffer(false);
disconnectedBufferOptions.setDeleteOldestMessages(false);
mqttAndroidClient.setBufferOpts(disconnectedBufferOptions);
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
Log.e(TAG, "Failed to connect to: " + serverUri);
exception.printStackTrace();
Log.d(TAG, "onFailure: " + exception.getCause());
connectSuccess = false;
}
});
} catch (MqttException ex) {
ex.printStackTrace();
}
}
获取订阅信息:
/**
*获取订阅信息
*/
public void connectGateway(String gatewayId, String userId) {
//获取订阅信息
if (!subscriptionTopics.contains(gatewayId)) {
subscriptionTopics.add(gatewayId);
}
Log.d(TAG, "pre sub topic: connect status=" + connectSuccess);
Log.d(TAG, "subtopic " + subscriptionTopics);
subscribeToTopic();
}
订阅mqtt消息:
/**
* 订阅mqtt消息
*/
private void subscribeToTopic() {
try {
if(subscriptionTopics.size()==0)
return;
String[] topics = new String[subscriptionTopics.size()];
subscriptionTopics.toArray(topics);
int[] qoc = new int[topics.length];
IMqttMessageListener[] mqttMessageListeners = new IMqttMessageListener[topics.length];
for (int i = 0; i < topics.length; i++) {
IMqttMessageListener mqttMessageListener = new IMqttMessageListener() {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
// message Arrived!消息送达后做出的处理
Log.d(TAG, topic + " : " + new String(message.getPayload()));
handleReceivedMessage(new String(message.getPayload()), topic);
}
};
mqttMessageListeners[i] = mqttMessageListener;
Log.d(TAG, "subscribeToTopic: qoc= " + qoc[i]);
}
mqttAndroidClient.subscribe(topics, qoc, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken iMqttToken) {
Log.d(TAG, "Subscribed!");
}
@Override
public void onFailure(IMqttToken iMqttToken, Throwable throwable) {
Log.d(TAG, "Failed to subscribe");
}
}, mqttMessageListeners);
} catch (MqttException ex) {
System.err.println("Exception whilst subscribing");
ex.printStackTrace();
}
}
处理收到的消息:
private void handleReceivedMessage(String message, String gatewayId) {
//可以发送一条广播通知程序
}
发送mqtt消息:
/**
* 发送 mqtt 消息
*
* @param publishMessage 要发送的信息的 字符串
*/
private void publishMessage(String publishMessage, String publishTopic) {
try {
publishTopic = userId + "/" + publishTopic;
MqttMessage message = new MqttMessage();
message.setPayload(publishMessage.getBytes());
mqttAndroidClient.publish(publishTopic, message);
Log.d(TAG, "publishMessage:Message Published \n" + publishTopic + ":" + message);
if (!mqttAndroidClient.isConnected()) {
Log.d(TAG, mqttAndroidClient.getBufferedMessageCount() + " messages in buffer.");
}
} catch (MqttException e) {
System.err.println("Error Publishing: " + e.getMessage());
e.printStackTrace();
}
}
没有封装的类:
public class SubscribeClient {
private final static String CONNECTION_STRING = "tcp://mqtt地址:mqtt端口";
private final static boolean CLEAN_START = true;
private final static short KEEP_ALIVE = 30;//低耗网络,但是又需要及时获取数据,心跳30s
private final static String CLIENT_ID = "client1";
private final static String[] TOPICS = {
//订阅信息
};
private final static int[] QOS_VALUES = {0, 0, 2, 0};
private MqttClient mqttClient = null;
public SubscribeClient(String i) {
try {
mqttClient = new MqttClient(CONNECTION_STRING);
SimpleCallbackHandler simpleCallbackHandler = new SimpleCallbackHandler();
mqttClient.registerSimpleHandler(simpleCallbackHandler);//注册接收消息方法
mqttClient.connect(CLIENT_ID + i, CLEAN_START, KEEP_ALIVE);
mqttClient.subscribe(TOPICS, QOS_VALUES);//订阅接主题
/**
* 完成订阅后,可以增加心跳,保持网络通畅,也可以发布自己的消息
*/
mqttClient.publish(PUBLISH_TOPICS, "keepalive".getBytes(), QOS_VALUES[0], true);
} catch (MqttException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
/**
* 简单回调函数,处理client接收到的主题消息
*
* @author pig
*/
class SimpleCallbackHandler implements MqttSimpleCallback {
/**
* 当客户机和broker意外断开时触发
* 可以再此处理重新订阅
*/
@Override
public void connectionLost() throws Exception {
// TODO Auto-generated method stub
System.out.println("客户机和broker已经断开");
}
/**
* 客户端订阅消息后,该方法负责回调接收处理消息
*/
@Override
public void publishArrived(String topicName, byte[] payload, int Qos, boolean retained) throws Exception {
// TODO Auto-generated method stub
System.out.println("订阅主题: " + topicName);
System.out.println("消息数据: " + new String(payload));
System.out.println("消息级别(0,1,2): " + Qos);
System.out.println("是否是实时发送的消息(false=实时,true=服务器上保留的最后消息): " + retained);
}
}
/**
* 高级回调
*
* @author pig
*/
class AdvancedCallbackHandler implements MqttSimpleCallback {
@Override
public void connectionLost() throws Exception {
// TODO Auto-generated method stub
}
@Override
public void publishArrived(String arg0, byte[] arg1, int arg2,
boolean arg3) throws Exception {
// TODO Auto-generated method stub
}
}
/**
* @param args
*/
public static void main(String[] args) {
// TODO Auto-generated method stub
new SubscribeClient("" + i);
}
}
标签:gate new tin zed message action subscript localize 使用
原文地址:http://www.cnblogs.com/IT-lss/p/8004749.html