关于returner的基础使用请参考returner文章。
继续上面的话题,这里编写c/s端来采集数据。
继续下面话题之前,你需要了解event、returner、zmq协议框架。
步骤:
1、在syndic上运行客户端程序,用来收集数据,其实就是master-minion架构。
2、收集的数据首先写入本地log中,其次发送到顶级master端。
3、顶级master运行服务端程序,用来接收数据,并写入本地数据库。
4、确保数据不丢失,采用zmq协议框架,使用REQ/REP套接字。
先说说客户端:
1、使用event去过滤事件,包含new_job和ret事件。
2、关于new_job的信息使用单线程发送。
3、关于ret事件使用多线程发送。
client.py 程序如下
#!/usr/bin/env python
# coding: utf-8
import sys
import threading
import salt.utils.event
import msgpack
import zmq
import config
class JobProcess(object):
    '''
    Report a new job to the server: one (jid, minion id) record per
    targeted minion.

    All records are sent sequentially over a single zmq REQ socket from
    the calling thread.
    '''

    def __init__(self, jid, minions):
        '''
        jid     -- the job id of the new job
        minions -- a list of the minion ids targeted by the job
        '''
        self.jid = jid
        self.minions = minions

    def run(self):
        '''
        Connect to config.server:config.server_port and send one
        msgpack-encoded {'jid': ..., 'id': ...} dict per minion, waiting
        for the server's reply after every send.
        '''
        context = zmq.Context()
        socket = context.socket(zmq.REQ)
        try:
            socket.connect("tcp://%s:%d" % (config.server, config.server_port))
            for minion_id in self.minions:
                data = {'jid': self.jid, 'id': minion_id}
                socket.send(msgpack.packb(data))
                # A REQ socket must read the reply before it may send
                # again; the reply content itself is not used.
                socket.recv()
        except BaseException:
            # On failure (including Ctrl-C while blocked in recv) drop any
            # message still queued, otherwise context.term() below could
            # block until the server comes back.
            socket.setsockopt(zmq.LINGER, 0)
            raise
        finally:
            # Always release the socket and the context.
            socket.close()
            context.term()
class ResultThread(threading.Thread):
    '''
    Thread that reports one minion's job return to the server.

    The record is appended to the local access log first, so the
    execution result can still be looked up there if sending fails.
    '''

    def __init__(self, fun_args, jid, result, success, fun, id):
        '''
        fun_args -- list of arguments the function was called with
        jid      -- job id
        result   -- the minion's return data (stored, but not sent)
        success  -- success flag of the execution
        fun      -- name of the executed function
        id       -- minion id
        '''
        super(ResultThread, self).__init__()
        self.fun_args = fun_args
        self.jid = jid
        self.result = result
        self.success = success
        self.fun = fun
        self.id = id

    def run(self):
        '''
        Log the record locally, then send it msgpack-encoded over a zmq
        REQ socket to config.server:config.server_port and wait for the
        reply.
        '''
        data = {'fun_args': self.fun_args, 'jid': self.jid,
                'success': self.success, 'fun': self.fun, 'id': self.id}
        context = zmq.Context()
        socket = context.socket(zmq.REQ)
        try:
            socket.connect("tcp://%s:%d" % (config.server, config.server_port))
            self.write_to_log(data)
            socket.send(msgpack.packb(data))
            # Wait for the server's acknowledgement; its content is unused.
            socket.recv()
        except BaseException:
            # Discard anything still queued so that context.term() in the
            # finally clause cannot block on an undeliverable message.
            socket.setsockopt(zmq.LINGER, 0)
            raise
        finally:
            # Release the socket and context even when logging or sending
            # fails; otherwise every failing thread leaks descriptors.
            socket.close()
            context.term()

    def write_to_log(self, data):
        '''
        Append str(data) plus a newline to config.access_log.
        '''
        with open(config.access_log, 'a+') as fp:
            fp.writelines([str(data), '\n'])
def filter_events():
    '''
    Listen on the master event bus and forward job events to the server.

    Events whose tag contains 'salt/job/' and whose data carries both
    'id' and 'return' are treated as minion returns and handed to a
    ResultThread each. Events tagged exactly 'new_job' are sent
    synchronously through JobProcess. saltutil.find_job events and all
    other events are ignored.

    Any exception, including KeyboardInterrupt/SystemExit, is printed,
    appended to config.error_log, and the process exits with status 2.
    '''
    sock_dir = config.sock_dir
    event = salt.utils.event.MasterEvent(sock_dir)
    try:
        for eachevent in event.iter_events(full=True):
            ret = eachevent['data']
            tag = eachevent['tag']
            if 'salt/job/' in tag:
                # A minion's return for a job.
                if 'id' in ret and 'return' in ret:
                    # Ignore saltutil.find_job events.
                    if ret['fun'] == "saltutil.find_job":
                        continue
                    # Send each return from its own thread.
                    result_thd = ResultThread(ret['fun_args'], ret['jid'],
                                              ret['return'], ret['success'],
                                              ret['fun'], ret['id'])
                    result_thd.start()
            elif tag == 'new_job':
                # A new job: carries the jid and the targeted minions.
                # Ignore saltutil.find_job events.
                if ret['fun'] == "saltutil.find_job":
                    continue
                # Sent from this thread, blocking until the server replies.
                job_pro = JobProcess(ret['jid'], ret['minions'])
                job_pro.run()
            # Every other event is ignored.
    except BaseException as e:
        # Exceptions with an empty message (e.g. KeyboardInterrupt) are
        # reported as 'Terminated'.
        err = str(e) or 'Terminated'
        print(err)
        with open(config.error_log, 'a+') as fp:
            fp.writelines([err, '\n'])
        sys.exit(2)
def main():
    '''
    Client entry point: run the event filter loop.
    '''
    filter_events()
if __name__ == '__main__':
    main()

配置文件config.py如下
#!/usr/bin/env python
# coding: utf-8

# listen server ip and port
server = '192.168.110.150'
server_port = 10000

# the mysql server info
# NOTE(review): sample credentials from the article; replace before real use.
host = 'localhost'
user = 'salt'
passwd = 'salt'
db = 'salt'
port = 3306

# the master local sock dir
sock_dir = '/opt/app/salt/run/master'

# log
access_log = '/tmp/salt_access.log'
error_log = '/tmp/salt_error.log'
filter_events函数是使用event过滤new_job和ret事件的。new_job事件采用JobProcess类处理,使用单线程(直接在主线程中同步发送)。ret事件采用ResultThread类处理,使用多线程。
这里有三个东西解释下:
第一个就是为什么采用zmq协议框架。首先是不用考虑服务端和客户端的启动顺序,其次就是server挂了,客户端运行,只要启动server,原先的数据并不会丢失。如果使用传统socket,server挂了,就需要重启客户端再启动服务端,数据会丢失。
第二个就是为什么需要采集new_job信息,我们知道new_job包含jid和有关的minions,我们先把这些数据入库,然后根据ret信息将jid+id对应记录更新。但是有些minion没有启动的话,是没有ret信息的,如果此时不先把new_job信息入库,那没有返回ret信息的minion就不能统计到。
第三个就是为什么new_job采用单线程而ret采用多线程发送。你想想,如果new_job也是用多线程,server端挂了,那么此时在client端会堆积很多线程,每个线程都占用着socket文件描述符,会导致"Too many open files",从而导致client被迫退出。而如果采用单线程发送的话,server挂了,那么此时客户端会阻塞在recv上,就不会堆积很多线程了。
服务端:
1、接受客户端的数据
2、new_job信息采用单线程写入数据库,ret信息使用多线程写入数据库
3、确保new_job信息先写入,ret信息用于更新记录
4、根据信息是否有success属性判断信息类别
server.py如下
#!/usr/bin/env python
# coding: utf-8
import zmq
import msgpack
import MySQLdb
import threading
from contextlib import contextmanager
from sys import exit
import config
@contextmanager
def _get_serv(commit=False):
    '''
    Context manager yielding a MySQL cursor on a fresh connection.

    The connection is opened with the host/user/passwd/db/port values
    from config. On a clean exit from the with-body the transaction is
    committed when ``commit`` is true and rolled back otherwise. If the
    body raises MySQLdb.DatabaseError, the error is written to stderr,
    the transaction is rolled back and the exception is re-raised. The
    connection is closed in every case.
    '''
    # Imported here because the server script's header only does
    # `from sys import exit`, so the name `sys` is not otherwise bound.
    import sys

    conn = MySQLdb.connect(host=config.host, user=config.user,
                           passwd=config.passwd, db=config.db,
                           port=config.port)
    cursor = conn.cursor()
    try:
        yield cursor
    except MySQLdb.DatabaseError as err:
        # Format the exception itself: its args may hold more than one
        # element (error code and message), so unpacking a single value
        # from err.args is not safe.
        sys.stderr.write('%s\n' % (err,))
        cursor.execute("ROLLBACK")
        # Bare raise keeps the original traceback.
        raise
    else:
        if commit:
            cursor.execute("COMMIT")
        else:
            cursor.execute("ROLLBACK")
    finally:
        conn.close()
class HandleProcess(object):
    '''
    Insert a new job's (id, jid) pair into the salt_returns table unless
    a row for that pair already exists.
    '''

    def __init__(self, data):
        '''
        data -- dict holding at least the keys 'id' and 'jid'
        '''
        self.data = data

    def run(self):
        '''
        Run the lookup and, if no row was found, the insert, inside one
        committed transaction.
        '''
        key = (self.data['id'], self.data['jid'])
        with _get_serv(commit=True) as cur:
            sql = "select jid,id from salt_returns where id=%s and jid=%s"
            # execute() returns the number of matching rows here.
            found = cur.execute(sql, key)
            if not found:
                sql = "insert into salt_returns(id,jid) values(%s,%s)"
                cur.execute(sql, key)
class HandleThread(threading.Thread):
    '''
    Thread that writes a minion's return into the existing salt_returns
    row identified by (id, jid).
    '''

    def __init__(self, data):
        '''
        data -- dict holding the keys 'id', 'jid', 'fun', 'fun_args' and
                'success'
        '''
        super(HandleThread, self).__init__()
        self.data = data

    def run(self):
        '''
        Update fun_args, success and fun of the matching row inside one
        committed transaction. Nothing is written when no row matches.
        '''
        # fun_args is a list and is stored as its string representation.
        fun_args = str(self.data['fun_args'])
        params = (fun_args, self.data['success'], self.data['fun'],
                  self.data['id'], self.data['jid'])
        sql = "update salt_returns set fun_args=%s,success=%s,fun=%s where id=%s and jid=%s"
        with _get_serv(commit=True) as cur:
            # No prior SELECT is needed: the WHERE clause already makes
            # the UPDATE a no-op when the (id, jid) row does not exist.
            cur.execute(sql, params)
def handle_data(message):
    '''
    Decode one msgpack message and store it.

    A message containing the key 'success' is a minion return and is
    written by a HandleThread; any other message is treated as new-job
    info and written synchronously by HandleProcess.
    '''
    unpack_data = msgpack.unpackb(message)
    if 'success' in unpack_data:
        thd = HandleThread(unpack_data)
        thd.start()
    else:
        pro = HandleProcess(unpack_data)
        pro.run()
def main():
    '''
    Server entry point: bind a zmq REP socket on
    config.server:config.server_port and process messages forever.

    Each received message is passed to handle_data and answered with
    'OK'. A bind failure is logged and exits with status 1; any exception
    in the receive loop (including KeyboardInterrupt) is logged to
    config.error_log and exits with status 2.
    '''
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    try:
        socket.bind("tcp://%s:%d" % (config.server, config.server_port))
    except zmq.error.ZMQError as msg:
        print('Bind failed:' + str(msg))
        with open(config.error_log, 'a+') as fp:
            fp.writelines(['Bind failed: ', str(msg), '\n'])
        socket.close()
        context.term()
        exit(1)
    while True:
        try:
            message = socket.recv()
            handle_data(message)
            # The reply is sent as bytes so the same code also runs on
            # Python 3; on Python 2 b'OK' is the same str as before.
            socket.send(b'OK')
        except BaseException as e:
            # Exceptions with an empty message (e.g. KeyboardInterrupt)
            # are reported as 'Terminated'.
            err = str(e) or 'Terminated'
            print(err)
            with open(config.error_log, 'a+') as fp:
                fp.writelines([err, '\n'])
            socket.close()
            context.term()
            exit(2)
if __name__ == "__main__":
    main()

这个结构可以用于大型的master-syndic-minion框架,只要稍微修改下数据的写入就行。
以后这个会用于web界面的执行结果统计。
本文出自 “fly天地” 博客,请务必保留此出处http://liuping0906.blog.51cto.com/2516248/1533289
原文地址:http://liuping0906.blog.51cto.com/2516248/1533289