码迷,mamicode.com
首页 > Windows程序 > 详细

MQTT的学习研究(十四) MQTT moquette 的 Callback API 消息发布订阅的实现

时间:2015-07-02 13:44:24      阅读:262      评论:0      收藏:0      [点我收藏+]

标签:

moquette-mqtt 提供了回调(callback)模式的发布和订阅,但是在订阅之后没有发现接收消息的方法。参看源码可知,moquette-mqtt 中 Blocking、Future 式的发布订阅都是以 callback 式的发布订阅为基础实现的;但是本人在研究源代码并进行测试时,发现 callback 方式的消息接收没有成功。所以本文只给出 callback 式的发布和订阅,不包含消息接收的过程,原因尚未查明。

 

 

采用Callback式 发布主题

Java代码  技术分享
  1. package com.etrip.mqtt.callback;  
  2.   
  3. import java.net.URISyntaxException;  
  4.   
  5. import org.fusesource.hawtbuf.Buffer;  
  6. import org.fusesource.hawtbuf.UTF8Buffer;  
  7. import org.fusesource.mqtt.client.Callback;  
  8. import org.fusesource.mqtt.client.CallbackConnection;  
  9. import org.fusesource.mqtt.client.Listener;  
  10. import org.fusesource.mqtt.client.MQTT;  
  11. import org.fusesource.mqtt.client.QoS;  
  12. import org.fusesource.mqtt.client.Topic;  
  13. import org.slf4j.Logger;  
  14. import org.slf4j.LoggerFactory;  
  15.   
  16. /** 
  17.  *  
  18.  * MQTT moquette 的Server 段用于并发布主题信息 
  19.  *  
  20.  * 采用Callback式 发布主题  
  21.  *  
  22.  * @author longgangbai 
  23.  */  
  24. public class MQTTCallbackServer {  
  25.       private static final Logger LOG = LoggerFactory.getLogger(MQTTCallbackServer.class);  
  26.         private final static String CONNECTION_STRING = "tcp://localhost:1883";  
  27.         private final static boolean CLEAN_START = true;  
  28.         private final static short KEEP_ALIVE = 30;// 低耗网络,但是又需要及时获取数据,心跳30s  
  29.         public  static Topic[] topics = {  
  30.                         new Topic("china/beijing", QoS.EXACTLY_ONCE),  
  31.                         new Topic("china/tianjin", QoS.AT_LEAST_ONCE),  
  32.                         new Topic("china/henan", QoS.AT_MOST_ONCE)};  
  33.         public final  static long RECONNECTION_ATTEMPT_MAX=6;  
  34.         public final  static long RECONNECTION_DELAY=2000;  
  35.           
  36.         public final static int SEND_BUFFER_SIZE=2*1024*1024;//发送最大缓冲为2M  
  37.           
  38.           
  39.       public static void main(String[] args)   {  
  40.         //创建MQTT对象  
  41.         MQTT mqtt = new MQTT();  
  42.         try {  
  43.             //设置mqtt broker的ip和端口  
  44.             mqtt.setHost(CONNECTION_STRING);  
  45.             //连接前清空会话信息  
  46.             mqtt.setCleanSession(CLEAN_START);  
  47.             //设置重新连接的次数  
  48.             mqtt.setReconnectAttemptsMax(RECONNECTION_ATTEMPT_MAX);  
  49.             //设置重连的间隔时间  
  50.             mqtt.setReconnectDelay(RECONNECTION_DELAY);  
  51.             //设置心跳时间  
  52.             mqtt.setKeepAlive(KEEP_ALIVE);  
  53.             //设置缓冲的大小  
  54.             mqtt.setSendBufferSize(SEND_BUFFER_SIZE);  
  55.               
  56.               
  57.   
  58.               
  59.             //获取mqtt的连接对象BlockingConnection  
  60.              final CallbackConnection connection = mqtt.callbackConnection();  
  61.                
  62.               
  63.             //添加连接的监听事件  
  64.             connection.listener(new Listener() {  
  65.                 
  66.                 public void onDisconnected() {  
  67.                 }  
  68.                 public void onConnected() {  
  69.                 }  
  70.   
  71.                 public void onPublish(UTF8Buffer topic, Buffer payload, Runnable ack) {  
  72.                     // You can now process a received message from a topic.  
  73.                     // Once process execute the ack runnable.  
  74.                     ack.run();  
  75.                     System.out.println("topic"+topic.toString()+"="+new String(payload.getData()));  
  76.                 }  
  77.                 public void onFailure(Throwable value) {  
  78.                 }  
  79.             });  
  80.             //添加连接事件  
  81.             connection.connect(new Callback<Void>() {  
  82.                 /** 
  83.                  * 连接失败的操作 
  84.                  */  
  85.                 public void onFailure(Throwable value) {  
  86.                      // If we could not connect to the server.  
  87.                     System.out.println("MQTTCallbackServer.CallbackConnection.connect.onFailure"+"连接失败......"+value.getMessage());  
  88.                     value.printStackTrace();  
  89.                 }  
  90.             
  91.                 /** 
  92.                  * 连接成功的操作 
  93.                  * @param v 
  94.                  */  
  95.                 public void onSuccess(Void v) {  
  96.                   
  97.                  int count=1;  
  98.                  while(true){  
  99.                     count++;  
  100.                     // 用于发布消息,目前手机段不需要向服务端发送消息  
  101.                     //主题的内容  
  102.                     final String message="hello "+count+"chinese people !";  
  103.                     final String topic = "china/beijing";  
  104.                     System.out.println("MQTTCallbackServer  publish  topic="+topic+" message :"+message);  
  105.                     connection.publish(topic, message.getBytes(), QoS.AT_LEAST_ONCE, false, new Callback<Void>() {  
  106.                         public void onSuccess(Void v) {  
  107.                           // the pubish operation completed successfully.  
  108.                         }  
  109.                         public void onFailure(Throwable value) {  
  110.                             value.printStackTrace();  
  111.                         }  
  112.                     });  
  113.                     try {  
  114.                         Thread.sleep(2000);  
  115.                     } catch (InterruptedException e) {  
  116.                         // TODO Auto-generated catch block  
  117.                         e.printStackTrace();  
  118.                     }  
  119.                  }  
  120.                       
  121. //                  //连接断开  
  122. //                  connection.disconnect(new Callback<Void>() {  
  123. //                      public void onSuccess(Void v) {  
  124. //                        // called once the connection is disconnected.  
  125. //                          System.out.println("MQTTSubscribeClient.CallbackConnection.connect.disconnect.onSuccess", "called once the connection is disconnected.");  
  126. //                      }  
  127. //                      public void onFailure(Throwable value) {  
  128. //                        // Disconnects never fail.  
  129. //                          System.out.println("MQTTSubscribeClient.CallbackConnection.connect.disconnect.onFailure", "Disconnects never fail."+value.getMessage());  
  130. //                          value.printStackTrace();  
  131. //                      }  
  132. //                  });  
  133.                       
  134.                       
  135.                 }  
  136.             });  
  137.             Thread.sleep(10000000000L);  
  138.         } catch (URISyntaxException e) {  
  139.             // TODO Auto-generated catch block  
  140.             e.printStackTrace();  
  141.         } catch (Exception e) {  
  142.             // TODO Auto-generated catch block  
  143.             e.printStackTrace();  
  144.         }finally{  
  145.               
  146.         }  
  147.     }  
  148. }  

 

采用Callback式 订阅主题

 

Java代码  技术分享
  1. package com.etrip.mqtt.callback;  
  2.   
  3. import java.net.URISyntaxException;  
  4.   
  5. import org.fusesource.hawtbuf.Buffer;  
  6. import org.fusesource.hawtbuf.UTF8Buffer;  
  7. import org.fusesource.mqtt.client.Callback;  
  8. import org.fusesource.mqtt.client.CallbackConnection;  
  9. import org.fusesource.mqtt.client.Listener;  
  10. import org.fusesource.mqtt.client.MQTT;  
  11. import org.fusesource.mqtt.client.QoS;  
  12. import org.fusesource.mqtt.client.Topic;  
  13. import org.slf4j.Logger;  
  14. import org.slf4j.LoggerFactory;  
  15.   
  16. /** 
  17.  *  
  18.  * MQTT moquette 的Client 段用于订阅主题,并接收主题信息 
  19.  *  
  20.  * 采用Callback式 订阅主题  
  21.  *  
  22.  * @author longgangbai 
  23.  */  
  24. public class MQTTCallbackClient {  
  25.       private static final Logger LOG = LoggerFactory.getLogger(MQTTCallbackClient.class);  
  26.         private final static String CONNECTION_STRING = "tcp://localhost:1883";  
  27.         private final static boolean CLEAN_START = true;  
  28.         private final static short KEEP_ALIVE = 30;// 低耗网络,但是又需要及时获取数据,心跳30s  
  29.         public  static Topic[] topics = {  
  30.                         new Topic("china/beijing", QoS.AT_MOST_ONCE),  
  31.                         new Topic("china/tianjin", QoS.AT_LEAST_ONCE),  
  32.                         new Topic("china/henan", QoS.AT_MOST_ONCE)};  
  33.         public final  static long RECONNECTION_ATTEMPT_MAX=6;  
  34.         public final  static long RECONNECTION_DELAY=2000;  
  35.          final String topic = "china/beijing";  
  36.         public final static int SEND_BUFFER_SIZE=2*1024*1024;//发送最大缓冲为2M  
  37.           
  38.           
  39.           
  40.       public static void main(String[] args)   {  
  41.         //创建MQTT对象  
  42.         MQTT mqtt = new MQTT();  
  43.         //设置mqtt broker的ip和端口  
  44.         try {  
  45.             mqtt.setHost(CONNECTION_STRING);  
  46.         } catch (URISyntaxException e1) {  
  47.             e1.printStackTrace();  
  48.         }  
  49.         //连接前清空会话信息  
  50.         mqtt.setCleanSession(CLEAN_START);  
  51.         //设置重新连接的次数  
  52.         mqtt.setReconnectAttemptsMax(RECONNECTION_ATTEMPT_MAX);  
  53.         //设置重连的间隔时间  
  54.         mqtt.setReconnectDelay(RECONNECTION_DELAY);  
  55.         //设置心跳时间  
  56.         mqtt.setKeepAlive(KEEP_ALIVE);  
  57.         //设置缓冲的大小  
  58.         mqtt.setSendBufferSize(SEND_BUFFER_SIZE);  
  59.         //获取mqtt的连接对象CallbackConnection  
  60.         final CallbackConnection connection= mqtt.callbackConnection();  
  61.         try {  
  62.       
  63.             //添加连接的监听事件  
  64.             connection.listener(new Listener() {  
  65.                 
  66.                 public void onDisconnected() {  
  67.                 }  
  68.                 public void onConnected() {  
  69.                     System.out.println(" 连接成功!");  
  70.                 }  
  71.   
  72.                 public void onPublish(UTF8Buffer topic, Buffer payload, Runnable onComplete) {  
  73.                   
  74.                 }  
  75.                 public void onFailure(Throwable value) {  
  76.                   
  77.                 }  
  78.             });  
  79.             //添加连接事件  
  80.             connection.connect(new Callback<Void>() {  
  81.                 /** 
  82.                  * 连接失败的操作 
  83.                  */  
  84.                 public void onFailure(Throwable value) {  
  85.                      // If we could not connect to the server.  
  86.                     System.out.println("MQTTSubscribeClient.CallbackConnection.connect.onFailure  连接失败......"+value.getMessage());  
  87.                     value.printStackTrace();  
  88.                 }  
  89.              
  90.   
  91.                 /** 
  92.                  * 连接成功的操作 
  93.                  * @param v 
  94.                  */  
  95.                 public void onSuccess(Void v) {  
  96.                     System.out.println("MQTTSubscribeClient.CallbackConnection.connect.onSuccess 订阅连接成功......");  
  97.                         
  98.                     //订阅相关的主题  
  99.                     connection.subscribe(topics, new Callback<byte[]>() {  
  100.                         public void onSuccess(byte[] qoses) {  
  101.                             System.out.println("MQTTSubscribeClient.CallbackConnection.connect.subscribe.onSuccess 订阅主题成功......");  
  102.                         }  
  103.                         public void onFailure(Throwable value) {  
  104.                              // subscribe failed.  
  105.                             System.out.println("MQTTSubscribeClient.CallbackConnection.connect.subscribe.onSuccess 订阅主题失败!"+value.getMessage());  
  106.                             value.printStackTrace();  
  107.                         }  
  108.                     });  
  109.   
  110.                       
  111.                 }  
  112.             });  
  113.             Thread.sleep(100000000000L);  
  114.         } catch (Exception e) {  
  115.             // TODO Auto-generated catch block  
  116.             e.printStackTrace();  
  117.         }finally{  
  118. //            //连接断开  
  119.             connection.disconnect(new Callback<Void>() {  
  120.                 public void onSuccess(Void v) {  
  121.                   // called once the connection is disconnected.  
  122.                     System.out.println("MQTTSubscribeClient.CallbackConnection.connect.disconnect.onSuccess called once the connection is disconnected.");  
  123.                 }  
  124.                 public void onFailure(Throwable value) {  
  125.                   // Disconnects never fail.  
  126.                     System.out.println("MQTTSubscribeClient.CallbackConnection.connect.disconnect.onFailure  Disconnects never fail."+value.getMessage());  
  127.                     value.printStackTrace();  
  128.                 }  
  129.             });  
  130.         }  
  131.     }  
  132. }  

MQTT的学习研究(十四) MQTT moquette 的 Callback API 消息发布订阅的实现

标签:

原文地址:http://www.cnblogs.com/yudar/p/4615699.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!