上文讨论了PAIR/PAIR,REQ/REP两种模式,现在看看PUB/SUB和PUSH/PULL模式。
PUB/SUB:发布订阅模式,跟我们订阅新闻类似的,采用异步IO,多对多模式,如果没有订阅,服务端发送的消息直接丢弃掉。
pub_server.py
import zmq
import random
import sys
import time
port = "5556"
if len(sys.argv) > 1:
port = sys.argv[1]
int(port)
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:%s" % port)
while True:
topic = random.randrange(9999,10005)
messagedata = random.randrange(1,215) - 80
print "%d %d" % (topic, messagedata)
socket.send("%d %d" % (topic, messagedata))
time.sleep(1)sub_client.py
import sys
import time
import zmq
port = "5556"
# Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)
print "Collecting updates from weather server..."
socket.connect("tcp://localhost:%s" % port)
#socket.set(zmq.UNSUBSCRIBE, messagedata)
topicfilter = "10001"
socket.set(zmq.SUBSCRIBE, topicfilter)
#Process 5 updates
total_value = 0
#for update_nbr in range (5):
while True:
string = socket.recv()
topic, messagedata = string.split()
# total_value += int(messagedata)
print topic, messagedata
time.sleep(1)zmq.SUBCRIBE是用来指明订阅某种消息,这里订阅的是出现10001的信息
PUSH/PULL:任务分发模式,主要用于分布式计算的,将很多个任务分发到worker,然后worker将计算结果发送到结果收集器。
producer.py
import time import zmq import random context = zmq.Context() sender = context.socket(zmq.PUSH) sender.bind(‘tcp://*:5557‘) # sync start of batch # be sure all worker connect success sink = context.socket(zmq.PUSH) sink.connect(‘tcp://0.0.0.0:5558‘) print ‘Press Enter when the workers are ready:‘ _ = raw_input() print ‘Sending tasks to workers...‘ sink.send(b‘0‘) for task_nbr in xrange(1000000): workload = random.randint(1,10) sender.send_string(u‘%i‘ % workload) for i in range(10): sender.send_string(u‘0‘) time.sleep(1)
consumer.py
import sys import time import zmq context = zmq.Context() # Socket to recevie messages on receiver = context.socket(zmq.PULL) receiver.connect(‘tcp://localhost:5557‘) # socket to send messages sender = context.socket(zmq.PUSH) sender.connect(‘tcp://localhost:5558‘) while True: a_str = receiver.recv_string() num = int(a_str) if num % 2 == 0 or a_str == u‘0‘: sender.send_string(a_str)
result.py
import sys
import time
import zmq
context = zmq.Context()
# Socket to receive messages on
receiver = context.socket(zmq.PULL)
receiver.bind("tcp://*:5558")
# Wait for start of batch
s = receiver.recv()
sum = 0
flag = 0
# Start our clock now
tstart = time.time()
while True:
a_str = receiver.recv_string()
num = int(a_str)
sum += num
if a_str == ‘0‘:
flag += 1
if flag == 10:
break
tend = time.time()
tdiff = tend - tstart
total_msec = tdiff * 1000
print "Total elapsed time: %d msec" % total_msec这个结果并不精确,分别是启动1个、2个、3个、4个consumer.py进程的测试结果,说明计算缩短了时间。
Queue,Forwarder,Streamer分别是REQ/REP、PUB/SUB、PUSH/PULL的代理,用于代理不同网段的机器。
关于代理的用法,这里不讲述。请参考下面地址。
参考地址:
http://learning-0mq-with-pyzmq.readthedocs.org/en/latest/pyzmq/pyzmq.html
https://github.com/anjuke/zguide-cn
本文出自 “fly天地” 博客,请务必保留此出处http://liuping0906.blog.51cto.com/2516248/1431721
了解saltstack的通信协议zeromq(二),布布扣,bubuko.com
原文地址:http://liuping0906.blog.51cto.com/2516248/1431721