标签:bean factory let 部署 lin ctr red git 博文
Pinpoint 是一套APM (Application Performance Management)工具,主要用于帮助分析系统的总体结构和组件如何相互调用,也可用于追踪线上性能问题,方便定位出现问题的点。
Pinpoint主要有如下几个组成部分:
Spring Task:Spring内置的定时任务。
Spring Batch: 一个大数据量的并行处理框架。
通过Spring Task的定时任务,每分钟做一次通过SpringBatch的批处理检查。
https://github.com/naver/pinpoint/tree/master/web/src/main/java/com/navercorp/pinpoint/web/alarm
<task:scheduled-tasks scheduler="scheduler"> <task:scheduled ref="batchJobLauncher" method="alarmJob" cron="0 0/1 * * * *" /> <!--省略--> </task:scheduled-tasks> <task:scheduler id="scheduler" pool-size="5"/>
此段代码标签使用的是SpringTask的标签,大概意思为,定义了一个线程池大小为5的调度器scheduler。
scheduler执行的任务有三个,batchJobLauncher的alarmJob方法,每一分钟执行一次,alarmJob,顾名思义,报警的任务。
前置说明-SpringBatch批处理框架中与此相关的解释:
如果是批处理的话,自然有批处理任务(对应下面的Job标签),每个任务自然有一个或者多个步骤(对应下面的Step标签)。
每个步骤有三个操作,读取数据(对应reader),处理数据(对应processor),回写数据(对应writer)。这三者中的参数是按照顺序传递的。
大家可能会想,报警机制和读取数据,处理数据,回写数据有什么关系吗?下面我说明一下pinpoint相关的对应的业务关系:
reader:读取数据 => 通过用户配置的规则提供Checker,即异常校验器。
processor:处理数据 => 用Checker进行校验,标记异常状态。
writer:回写数据 => 判断Checker是否有异常情况,有则报警。
下面的配置的源码:
<!--定义了一个alramJob的批处理任务-->
<batch:job id="alarmJob">
<batch:step id="alarmPartitionStep">
<!--此alarmJob只有一个Step-->
<batch:partition step="alarmStep" partitioner="alarmPartitioner">
<!--设置执行的线程池-->
<batch:handler task-executor="alarmPoolTaskExecutorForPartition" />
</batch:partition>
</batch:step>
<batch:listeners>
<batch:listener ref="jobFailListener"/>
</batch:listeners>
</batch:job>
<batch:step id="alarmStep">
<!--代表step的一种处理策略-->
<batch:tasklet>
<!--批处理流程-->
<!-- 顺序执行
reader:读取数据 => 提供Checker,即异常校验器,见下面的bean
processor:处理数据 => 用Checker进行校验,见下面的bean
writer:回写数据 => 判断Checker是否有异常情况,有则报警,见下面的bean
-->
<batch:chunk reader="reader" processor="processor" writer="writer" commit-interval="1"/>
</batch:tasklet>
</batch:step>
<bean id="alarmPartitioner" class="com.navercorp.pinpoint.web.alarm.AlarmPartitioner"/>
<bean id="reader" class="com.navercorp.pinpoint.web.alarm.AlarmReader" scope="step"/>
<bean id="processor" class="com.navercorp.pinpoint.web.alarm.AlarmProcessor" scope="step"/>
<bean id="writer" class="com.navercorp.pinpoint.web.alarm.AlarmWriter" scope="step"/>
// StepExecutionListener监听器,可以定义Step开始前后的操作
public class AlarmReader implements ItemReader<AlarmChecker>, StepExecutionListener {
...
// 报警所用的Checker在内存里
private final Queue<AlarmChecker> checkers = new ConcurrentLinkedDeque<>();
...
// Checker出队,供processor使用,
@Override
public AlarmChecker read() {
return checkers.poll();
}
// 批处理之前,将应用的报警规则加入Checker之中
@Override
public void beforeStep(StepExecution stepExecution) {
// 查询所有的应用
List<Application> applicationList = applicationIndexDao.selectAllApplicationNames();
// 根据应用用户配置的规则,添加Checker到队列当中
for (Application application : applicationList) {
addChecker(application);
}
}
private void addChecker(Application application) {
// 根据应用名称获取所有的规则,应用名称就是配置agent的时候,指定的applicationName
List<Rule> rules = alarmService.selectRuleByApplicationId(application.getName());
long timeSlotEndTime = System.currentTimeMillis();
Map<DataCollectorCategory, DataCollector> collectorMap = new HashMap<>();
// 遍历规则
for (Rule rule : rules) {
// CheckerCategory是一个枚举类,预置了所有的报警规则模版,比如失败请求次数、慢请求次数等
CheckerCategory checkerCategory = CheckerCategory.getValue(rule.getCheckerName());
// 数据收集器是为检验规则准备的,例如Rule是失败请求次数,但是次数从哪里来,就是从这个收集器来的
DataCollector collector = collectorMap.get(checkerCategory.getDataCollectorCategory());
// 这里是一个基于Map的缓存
if (collector == null) {
collector = dataCollectorFactory.createDataCollector(checkerCategory, application, timeSlotEndTime);
collectorMap.put(collector.getDataCollectorCategory(), collector);
}
// 创建Checker,有兴趣的读者可以看看CheckerCategroy的源码,设计的还是很不错的。
// AlaramChecker是一个抽象方法,具体的功能由子类实现
AlarmChecker checker = checkerCategory.createChecker(collector, rule);
// 加入队列
checkers.add(checker);
}
}
...
}
public class AlarmProcessor implements ItemProcessor<AlarmChecker, AlarmChecker> {
// 此处的AlarmChecker是上面的read()方法传递过来的
@Override
public AlarmChecker process(AlarmChecker checker) {
// check,顾名思义,检验,标记是否有异常情况,check()方法见下
checker.check();
return checker;
}
}
com.navercorp.pinpoint.web.alarm.checker.AlarmProcessor
protected abstract boolean decideResult(T value); public void check() { // 收集数据 dataCollector.collect(); // 标记是否有异常情况,意为是否满足报警的阀值,decideResult是一个抽象方法 // detected字段在后续的Writter中会被检查 detected = decideResult(getDetectedValue()); }
public class AlarmWriter implements ItemWriter<AlarmChecker> {
// 需要用户自定义配置在Spring中的AlarmMessageSender,如果不配置,则是一个空实现
@Autowired(required = false)
private AlarmMessageSender alarmMessageSender = new EmptyMessageSender();
@Autowired
private AlarmService alarmService;
// 实现的接口的方法,主要内容
@Override
public void write(List<? extends AlarmChecker> checkers) throws Exception {
Map<String, CheckerResult> beforeCheckerResults = alarmService.selectBeforeCheckerResults(checkers.get(0).getRule().getApplicationId());
// 遍历上面传递的Checker
for (AlarmChecker checker : checkers) {
CheckerResult beforeCheckerResult = beforeCheckerResults.get(checker.getRule().getCheckerName());
if (beforeCheckerResult == null) {
beforeCheckerResult = new CheckerResult(checker.getRule().getApplicationId(), checker.getRule().getCheckerName(), false, 0, 1);
}
// 对上面的Processor标记的detected值进行检查
if (checker.isDetected()) {
sendAlarmMessage(beforeCheckerResult, checker);
}
// 记录报警历史
alarmService.updateBeforeCheckerResult(beforeCheckerResult, checker);
}
}
private void sendAlarmMessage(CheckerResult beforeCheckerResult, AlarmChecker checker) {
if (isTurnToSendAlarm(beforeCheckerResult)) {
// 是否配置了发送报警短信
if (checker.isSMSSend()) {
alarmMessageSender.sendSms(checker, beforeCheckerResult.getSequenceCount() + 1);
}
// 是否配置了发送报警邮件
if (checker.isEmailSend()) {
alarmMessageSender.sendEmail(checker, beforeCheckerResult.getSequenceCount() + 1);
}
}
}
// ...
}
标签:bean factory let 部署 lin ctr red git 博文
原文地址:https://www.cnblogs.com/langshiquan/p/9497464.html