码迷,mamicode.com
首页 > 其他好文 > 详细

监控kafka队列长度

时间:2020-09-23 23:30:54      阅读:30      评论:0      收藏:0      [点我收藏+]

标签:one   data   groups   pen   kafka   das   strftime   inf   column   

#!/usr/local/python37/bin/python #获取kafka命令中lags的值,来判定现在队列中有多少未消费,如果超过5000,则可能有延迟 import os import re import datetime import time import pandas as pd import numpy as np import subprocess #now=datetime.datetime.now().strftime("%Y-%m-%dT%H:%M") now=time.time() lagsInfos=os.popen("sh /opt/elk/kafka_node1/bin/kafka-consumer-groups.sh --describe --bootstrap-server 192.168.10.100:9092 --group logstash | awk ‘{if($5>20){print $1,$5}}‘").read() #定义dataframe的index及value的列表 columnList=[] #lagList=[] #print(lagsInfos.splitlines()) for i in range(1,len(lagsInfos.splitlines())): lagList=[] lagInfo=lagsInfos.splitlines()[i].split() lagList.append(lagInfo[0]) lagList.append(int(lagInfo[1])) columnList.append(lagList) df=pd.DataFrame(columnList,columns=["topics","LAG"]) dfResult=df.groupby("topics",as_index=False).sum() h1=dfResult.loc[dfResult["LAG"]>100,["topics","LAG"]] #print(h1) if len(h1)==0: print("OK") else: #将要发送的短信内容中的空格和换行符替换成url里面的格式,否则在发送短信时会报错 msg=str(h1).replace(" ","%20").replace("\n","%0a") url="‘http://sms.domain.com/Smsweb/sms?pid=smsPid&pwd=Mjdfadklfae&phone=1111111111&msg="+msg+"‘" print(url) result=subprocess.getoutput("curl " + url) print(result)

监控kafka队列长度

标签:one   data   groups   pen   kafka   das   strftime   inf   column   

原文地址:https://blog.51cto.com/happyting/2536191

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有 京ICP备13008772号-2
迷上了代码!