码迷,mamicode.com
首页 > 其他好文 > 详细

ZeroMQ_04 发布订阅模式

时间:2020-04-22 13:35:29      阅读:102      评论:0      收藏:0      [点我收藏+]

标签:通过   time   des   报错   错误   rsize   nbsp   text   long   

简单来说,就是服务端不断发布消息,客户端订阅了就会收到消息。

下面我们看个简单的实力:

Server:

#include <stdlib.h> 
#include <zmq.h>
#include <string.h>
#include <unistd.h>
#include <time.h> 

#define buffersize 4096
#define randof(num)  (int) ((float) (num) * random () / (RAND_MAX + 1.0))

int main(int argc, char* argv[])
{
    // [0]创建对象
    void* ctx = zmq_ctx_new();
    void* publisher = zmq_socket(ctx, ZMQ_PUB);
    // [1]绑定到5566端口
    zmq_bind(publisher, "tcp://*:5566");

     //  初始化随机数生成器
    srandom ((unsigned) time (NULL));
    while (1) {
       int zipcode, temperature, relhumidity;
        zipcode     = randof (100000);
        temperature = randof (215) - 80;
        relhumidity = randof (50) + 10;

        //  Send message to all subscribers
        char update [20];
        sprintf (update, "%05d %d %d", zipcode, temperature, relhumidity);
        printf("server send: %s\n", update);
        //s_send (publisher, update);
        zmq_send (publisher, update, strlen (update), 0);
        sleep(1);
    }
    zmq_close(publisher);
    zmq_ctx_destroy(ctx);
    return 0;
}

Client:

#include <stdlib.h> 
#include <zmq.h>
#include <string.h>
#include <unistd.h>
#include <time.h> 
#include <assert.h>

static char *s_recv (void *socket) {
    char buffer [256];
    int size = zmq_recv (socket, buffer, 255, 0);
    if (size == -1)
        return NULL;
    buffer[size] = \0;

    return strndup (buffer, sizeof(buffer) - 1);
}

int main (int argc, char *argv [])
{
    //  [0]创建对象,连接到5566端口
    printf ("Collecting updates from weather server...\n");
    void *context = zmq_ctx_new ();
    void *subscriber = zmq_socket (context, ZMQ_SUB);
    int rc = zmq_connect (subscriber, "tcp://localhost:5566");
    assert (rc == 0);

    //  [1]设置过滤条件,设置为空,表示全订阅,这里“1”表示匹配开头为“1”的数据
    const char *filter =  "1";
    rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE,
                         filter, strlen (filter));
    assert (rc == 0);
    //  [2]接受数据
    int update_nbr;
    long total_temp = 0;
    for (update_nbr = 0; update_nbr < 100; update_nbr++) {
        
        char *string = s_recv (subscriber);
        printf ("client: %s\n", string);
        int zipcode, temperature, relhumidity;
        sscanf (string, "%d %d %d",
            &zipcode, &temperature, &relhumidity);
        total_temp += temperature;
        free (string);
    }
    printf ("Average temperature for zipcode ‘%s‘ was %dF\n",
        filter, (int) (total_temp / update_nbr));

    zmq_close (subscriber);
    zmq_ctx_destroy (context);
    return 0;
}

 

out:

// server
server send: 43345 -41 19
server send: 44203 110 59
server send: 78038 2 25
server send: 55377 59 18
server send: 40135 -65 36
server send: 37950 43 10

// client
zf@eappsvr-0:~/ds/zmq/test/pub_sub> ./client
Collecting updates from weather server...
client....
client: 10057 67 11
client: 16839 94 25

技术图片

 

注意: 

需要注意的是,在使用SUB套接字时,必须使用zmq_setsockopt()方法来设置订阅的内容。如果你不设置订阅内容,那将什么消息都收不到,新手很容易犯这个错误。订阅信息可以是任何字符串,可以设置多次。只要消息满足其中一条订阅信息,SUB套接字就会收到。订阅者可以选择不接收某类消息,也是通过zmq_setsockopt()方法实现的。

PUB-SUB套接字组合是异步的。客户端在一个循环体中使用zmq_recv()接收消息,如果向SUB套接字发送消息则会报错;类似地,服务端可以不断地使用zmq_send()发送消息,但不能在PUB套接字上使用zmq_recv()。

关于PUB-SUB套接字,还有一点需要注意:你无法得知SUB是何时开始接收消息的。就算你先打开了SUB套接字,后打开PUB发送消息,这时SUB还是会丢失一些消息的,因为建立连接是需要一些时间的。很少,但并不是零。

ZeroMQ_04 发布订阅模式

标签:通过   time   des   报错   错误   rsize   nbsp   text   long   

原文地址:https://www.cnblogs.com/vczf/p/12751249.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!