Getting Started with Kafka

Posted: 2021-01-15 11:46:10

Producer
KafkaExtendProducer.send("topic","key","value");
Consumer

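import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

import javax.annotation.PostConstruct;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.stereotype.Component;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

// Assumption: JSONUtil is Hutool's cn.hutool.json.JSONUtil. KafkaConsumerBuilder,
// MqConstant and UserAction are project classes whose imports the post does not show.
import cn.hutool.json.JSONUtil;

@Slf4j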
@RequiredArgsConstructor
@Component
public class ConsumerThread implements Runnable {

private final KafkaConsumerBuilder kafkaConsumerBuilder;

@PostConstruct
public void init() {
	for (int i = 0; i < MqConstant.MESSAGE_THREAD_COUNT; i++) {
		new Thread(this, "consume-proxy-thread-" + i).start();
	}
}

@Override
public void run() {
	KafkaConsumer<?, ?> consumer = kafkaConsumerBuilder.addTopic("11").build();
	log.info("ConsumerThread {} run...", Thread.currentThread().getName());
	List<UserAction> list = new ArrayList<>();
	// Start the clock now so that a partial batch is still flushed by the timeout
	// below, even if the size threshold has never been reached.
	long lastInsertTime = System.currentTimeMillis();
	while (true) {
		ConsumerRecords<?, ?> records = consumer.poll(Duration.ofSeconds(1));
		try {
			if (!records.isEmpty()) {
				for (ConsumerRecord<?, ?> record : records) {
					UserAction userAction = JSONUtil.toBean((String) record.value(), UserAction.class);
					list.add(userAction);
					if (list.size() >= MqConstant.DATA_SIZE) {
						// The batch insert itself is not shown in the original post;
						// persist `list` here, before the offsets are committed.
						log.info("Batch inserting {} records", list.size());
						lastInsertTime = System.currentTimeMillis();
						consumer.commitSync();
						list.clear();
					}
				}
			}
			else {
				if ((System.currentTimeMillis() - lastInsertTime) > MqConstant.INVALID_TIME
						&& list.size() > 0) {
					log.info("Batch inserting user-action records buffered for more than 30s");
					lastInsertTime = System.currentTimeMillis();
					consumer.commitSync();
					list.clear();
				}
			}
		}
		catch (Exception e) {
			// Passing the exception as the last argument logs the full stack trace.
			log.error("Failed to process user-action records", e);
			// TODO: collect the error details and persist them
			// String errStr = Convert.toStr(list);
			// ConsumeErrorMsg consumeErrorMsg = new ConsumeErrorMsg();
			// consumeErrorMsg.setErrFrom(MqConstant.ACTION_TOPIC);
			// consumeErrorMsg.setErrInfo(e.getMessage());
			// consumeErrorMsg.setErrData(errStr);
			// consumeErrorMsg.setState(0);
			// consumeErrorMsg.setCreateAt(LocalDateTime.now());
			// consumeErrorMsgService.saveMsg(consumeErrorMsg);
			list.clear();
		}
	}
}

}
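
The flush rule inside the consumer loop (flush when the batch is full, or when a non-empty batch has waited too long) can be pulled out into a small class so it can be tested without a broker. What follows is only a sketch under assumptions: BatchBuffer, its constructor parameters and the injected clock are hypothetical names that do not appear in the original post, and the sink stands in for whatever batch insert plus commitSync() the real consumer performs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

/** Hypothetical helper (not from the original post): buffers items and flushes by size or age. */
class BatchBuffer<T> {
    private final int maxSize;            // plays the role of MqConstant.DATA_SIZE
    private final long maxAgeMillis;      // plays the role of MqConstant.INVALID_TIME
    private final LongSupplier clock;     // injectable time source, so timing can be tested
    private final Consumer<List<T>> sink; // e.g. batch insert followed by an offset commit
    private final List<T> items = new ArrayList<>();
    private long oldestAt;                // clock value when the oldest buffered item arrived

    BatchBuffer(int maxSize, long maxAgeMillis, LongSupplier clock, Consumer<List<T>> sink) {
        this.maxSize = maxSize;
        this.maxAgeMillis = maxAgeMillis;
        this.clock = clock;
        this.sink = sink;
    }

    /** Adds one item and flushes immediately once the batch is full. */
    void add(T item) {
        if (items.isEmpty()) {
            oldestAt = clock.getAsLong();
        }
        items.add(item);
        if (items.size() >= maxSize) {
            flush();
        }
    }

    /** Intended to be called once per poll iteration: flushes a non-empty batch that is too old. */
    void flushIfStale() {
        if (!items.isEmpty() && clock.getAsLong() - oldestAt > maxAgeMillis) {
            flush();
        }
    }

    /** Number of items currently buffered. */
    int size() {
        return items.size();
    }

    private void flush() {
        sink.accept(new ArrayList<>(items)); // hand over a copy, then reset the buffer
        items.clear();
    }
}
```

In the loop above, add(...) would replace the list.add(...) plus size check, and flushIfStale() would run after every poll. Unlike the original timestamp, the age here is measured from the arrival of the oldest buffered item, which is a deliberate difference in this sketch.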

Original source: https://www.cnblogs.com/lyj98/p/14276782.html
