自定义分区器: import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.PartitionInfo; im ...
分类:
其他好文 时间:
2020-06-25 15:30:00
阅读次数:
55
kafka只接收bytes字节数组,所以自定义序列化器内部实现需按照bytes字节数组转换为标准。 重点:本例子只是提供参考怎样写自定义序列化器,因为关系到性能,一般默认使用StringSerializer即可,效率很高。 1) 自定义序列化类,转换成bytes字节数组: import cn.enj ...
分类:
其他好文 时间:
2020-06-25 13:42:15
阅读次数:
81
1)引入maven依赖 我这里使用的是springboot 2.1.3.RELEASE 版本: <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </depe ...
分类:
其他好文 时间:
2020-06-25 12:20:32
阅读次数:
155
springboot集成kafka是比较简单的是事情,但是kafka发送消息的失败回调在日常工作中,如果不容忍消息丢失的话,发送失败需要再次发送或者放到数据库中用任务重推。以下是演示用的发送类代码 @Slf4j @Component public class TestRunner implement ...
分类:
编程语言 时间:
2020-06-25 12:04:39
阅读次数:
143
kafka是什么Kafka最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的,发布/订阅模式的消息队列(Message Queue),Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。经过多年发展,Kafka已经由最初的日志 ...
分类:
其他好文 时间:
2020-06-25 10:16:04
阅读次数:
74
安装 pip3 install setuptools pip3 install pykafka pip3 install kafka-python 单台kafka import sys from kafka import KafkaConsumer from kafka.structs import ...
分类:
编程语言 时间:
2020-06-24 21:46:18
阅读次数:
80
消息队列 为什么用? 解耦,削峰,异步 基本模型 生产者 -> MQ -> 消费者 Kafka 十万级/s 适用于大数据领域实时计算,日志采集, ActiveMQ 万/s , RabbitMQ 万/s , RocketMQ 十万/s 造成问题及解决 1.系统可用性降低 (MQ发生故障,全崩了) 2. ...
分类:
其他好文 时间:
2020-06-24 19:24:18
阅读次数:
68
package test import org.apache.spark.sql.{DataFrame, ForeachWriter, Row, SparkSession} import org.apache.spark.sql.streaming.{ProcessingTime, Trigger} ...
分类:
数据库 时间:
2020-06-24 16:23:50
阅读次数:
132
Kafka-文件管理 文件管理 保留数据是kafka的一个基本特性,kafaka不会一直保留数据,也不会等到所有消费者都读取了消息之后才删除消息。相反,kafka为每个主题配置了数据保留期限,规定数据被删除之前可以保留多长时间,或者清理数据之前可以保留的数据量大小。 因为在一个大文件里查找和删除消息 ...
分类:
其他好文 时间:
2020-06-24 00:44:34
阅读次数:
76
一、消息队列概述 消息队列(MessageQueue,简称为MQ)其本质是就是个队列,FIFO先进先出,只不过是队列中储放的主要内容是message,因而叫消息队列主要用于:不同的服务server、进程process、线程thread相互间通信二、选用消息队列的场景①异步处理②流量控制③服务解耦④发 ...
分类:
系统相关 时间:
2020-06-24 00:34:13
阅读次数:
122