
Spring Boot with Storm: integrating Redis

Date: 2019-09-02 19:04:21


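This post integrates Storm into a Spring Boot application to count ad impressions found in log data. The real-time counts are stored in Redis, and the counts held in Redis then drive follow-up steps such as persisting the data to MySQL.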
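The flow described above (increment a per-ad counter, then decide on a follow-up step from the stored count) can be tried without a Redis cluster. The following is a minimal sketch that swaps the Redis hash for an in-memory map; `FLUSH_THRESHOLD`, the `ad_list_` prefix, and the class name are assumptions made for illustration and do not come from the original post.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * In-memory stand-in for the Redis hash, so the counting flow can run locally.
 * Only AD_LIST_SHOW_COUNT is taken from the post; every other name is hypothetical.
 */
class ShowCounterSketch {
    static final String AD_LIST_SHOW_COUNT = "AD_LIST_SHOW_COUNT";
    // Hypothetical threshold: the post does not say which count triggers the next step.
    static final long FLUSH_THRESHOLD = 3L;

    // hash name -> (field -> counter), mimicking the layout of a Redis hash
    private final Map<String, Map<String, Long>> hashes = new ConcurrentHashMap<>();

    /** Mimics HINCRBY: adds delta to the field and returns the new value. */
    long increment(String hash, String field, long delta) {
        return hashes.computeIfAbsent(hash, h -> new ConcurrentHashMap<>())
                     .merge(field, delta, Long::sum);
    }

    /** Records one impression and reports whether the follow-up step should run. */
    boolean recordShow(String creativeId) {
        // "ad_list_" is an assumed prefix standing in for LogAdIdKeyEnum.AD_LIST_TYPE.getPrefix()
        long count = increment(AD_LIST_SHOW_COUNT, "ad_list_" + creativeId, 1L);
        return count >= FLUSH_THRESHOLD;
    }

    public static void main(String[] args) {
        ShowCounterSketch sketch = new ShowCounterSketch();
        for (int i = 1; i <= 4; i++) {
            boolean flush = sketch.recordShow("1001");
            System.out.println("show #" + i + " flush=" + flush);
        }
    }
}
```

In the real topology the increment is performed by Redis itself, so several bolt instances can update the same field without coordinating with each other.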

1. Configure the Redis parameters. Redis runs in cluster mode, so the cluster nodes must be configured.

spring:
  redis:
    database: 0
    password:
    cluster:
      nodes:
        - 127.0.0.1:6380
      maxRedirects: 3
    pool:
      max-idle: 8
      min-idle: 0
      max-active: 8
      max-wait: -1
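Each entry under `nodes` is a `host:port` string. A malformed entry typically only surfaces when the connection factory is created, so it can help to validate the list early. The sketch below is one way to do that with the JDK alone; `ClusterNodeParser` and `Node` are hypothetical names, not Spring types.

```java
import java.util.ArrayList;
import java.util.List;

/** Validates "host:port" entries such as those under spring.redis.cluster.nodes. */
class ClusterNodeParser {

    /** Simple host/port pair; a hypothetical holder for this sketch. */
    static final class Node {
        final String host;
        final int port;

        Node(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    /** Parses every entry, throwing IllegalArgumentException on malformed input. */
    static List<Node> parse(List<String> entries) {
        List<Node> nodes = new ArrayList<>();
        for (String entry : entries) {
            int idx = entry.lastIndexOf(':');
            if (idx <= 0 || idx == entry.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got '" + entry + "'");
            }
            // NumberFormatException is a subclass of IllegalArgumentException
            int port = Integer.parseInt(entry.substring(idx + 1).trim());
            if (port < 1 || port > 65535) {
                throw new IllegalArgumentException("Port out of range in '" + entry + "'");
            }
            nodes.add(new Node(entry.substring(0, idx).trim(), port));
        }
        return nodes;
    }

    public static void main(String[] args) {
        List<String> configured = new ArrayList<>();
        configured.add("127.0.0.1:6380");
        for (Node node : parse(configured)) {
            System.out.println(node.host + " -> " + node.port);
        }
    }
}
```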

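2. Redis configuration class

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
@EnableConfigurationProperties(RedisProperties.class)
public class RedisConfig {

    @Autowired
    private JedisConnectionFactory jedisConnectionFactory;

    @Bean("redisTemplate")
    public RedisTemplate<?, ?> getRedisTemplate() {
        RedisTemplate<?, ?> template = new StringRedisTemplate(jedisConnectionFactory);
        // Keys and hash keys are plain strings; values and hash values are stored as JSON.
        GenericJackson2JsonRedisSerializer jsonSerializer = new GenericJackson2JsonRedisSerializer();
        StringRedisSerializer stringSerializer = new StringRedisSerializer();
        template.setKeySerializer(stringSerializer);
        template.setValueSerializer(jsonSerializer);
        template.setHashKeySerializer(stringSerializer);
        template.setHashValueSerializer(jsonSerializer);
        return template;
    }
}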

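import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
import org.springframework.data.redis.connection.RedisClusterConfiguration;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

// Serializer is the project's own helper class; its source is not shown in this post.
public class RedisConfUtils {

    /**
     * Builds the cluster configuration in the same way as
     * {@link org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration}.
     *
     * @param redisProperties the bound spring.redis.* properties
     * @return the cluster configuration, or null if no cluster section is configured
     */
    public static RedisClusterConfiguration getClusterConfiguration(RedisProperties redisProperties) {
        if (redisProperties.getCluster() == null) {
            return null;
        }
        RedisProperties.Cluster clusterProperties = redisProperties.getCluster();
        RedisClusterConfiguration config = new RedisClusterConfiguration(
                clusterProperties.getNodes());

        if (clusterProperties.getMaxRedirects() != null) {
            config.setMaxRedirects(clusterProperties.getMaxRedirects());
        }
        return config;
    }

    /**
     * Rebuilds a RedisTemplate from serialized RedisProperties. This runs inside the
     * Storm worker, where the Spring context is not available.
     */
    public static RedisTemplate<String, Long> buildRedisTemplate(byte[] redisProperties) {
        JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory(
                RedisConfUtils.getClusterConfiguration(
                        (RedisProperties) Serializer.INSTANCE.deserialize(redisProperties)));
        // Not managed by Spring here, so the lifecycle callback is invoked by hand.
        jedisConnectionFactory.afterPropertiesSet();

        RedisTemplate<String, Long> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(jedisConnectionFactory);

        GenericJackson2JsonRedisSerializer jsonSerializer = new GenericJackson2JsonRedisSerializer();
        StringRedisSerializer stringSerializer = new StringRedisSerializer();
        redisTemplate.setKeySerializer(stringSerializer);
        redisTemplate.setValueSerializer(jsonSerializer);
        redisTemplate.setHashKeySerializer(stringSerializer);
        redisTemplate.setHashValueSerializer(jsonSerializer);
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }
}
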
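3. The Storm log bolt builder reads the Redis configuration and passes it to the bolt

import lombok.Getter;
import lombok.Setter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;

// BoltBuilder and Serializer are the project's own classes; their source is not shown in this post.
@Getter
@Setter
@Configuration
@DependsOn("redisTemplate")
@ConfigurationProperties(prefix = "storm.bolt.logConsoleBolt")
public class LogConsoleBoltBuilder extends BoltBuilder {
    @Autowired
    private RedisProperties redisProperties;
    private int emitFrequencyInSeconds = 60; // emit data once every 60 s

    @Bean("logConsoleBolt")
    public LogConsoleBolt buildBolt() {
        super.setId("logConsoleBolt");
        LogConsoleBolt logConsoleBolt = new LogConsoleBolt();
        // The bolt is shipped to the workers, so it receives the settings as bytes.
        logConsoleBolt.setRedisProperties(Serializer.INSTANCE.serialize(redisProperties));
        logConsoleBolt.setEmitFrequencyInSeconds(emitFrequencyInSeconds);
        return logConsoleBolt;
    }
}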
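The builder hands the Redis settings to the bolt as a `byte[]` because the bolt instance is serialized and shipped to worker processes, where the Spring context is not available. The post does not show its `Serializer.INSTANCE` helper, so the following is only a sketch of the idea using plain Java serialization. `ClusterSettings` is a hypothetical holder introduced here; Java serialization only works for types that implement `java.io.Serializable`, so whether `RedisProperties` can be passed directly depends on what the project's `Serializer` actually does.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/** Round-trips connection settings through a byte[] using JDK serialization. */
class SettingsSerializationSketch {

    /** Hypothetical Serializable holder for the values the bolt needs. */
    static final class ClusterSettings implements Serializable {
        private static final long serialVersionUID = 1L;
        final ArrayList<String> nodes;
        final Integer maxRedirects;

        ClusterSettings(List<String> nodes, Integer maxRedirects) {
            this.nodes = new ArrayList<>(nodes);
            this.maxRedirects = maxRedirects;
        }
    }

    /** Turns a Serializable value into bytes that can travel with the bolt. */
    static byte[] serialize(Serializable value) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new IllegalStateException("Serialization failed", e);
        }
    }

    /** Restores the value on the worker side, for example inside prepare(). */
    static Object deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Deserialization failed", e);
        }
    }

    public static void main(String[] args) {
        List<String> nodes = new ArrayList<>();
        nodes.add("127.0.0.1:6380");
        byte[] payload = serialize(new ClusterSettings(nodes, 3));
        ClusterSettings restored = (ClusterSettings) deserialize(payload);
        System.out.println(restored.nodes + " maxRedirects=" + restored.maxRedirects);
    }
}
```

Keeping only a `byte[]` in the bolt's fields and building the `RedisTemplate` inside `prepare()` avoids having to serialize the connection objects themselves.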

4. The Storm log bolt rebuilds the Redis client from that configuration and stores the computed counts in Redis

// Package names below assume Storm 1.x or later (org.apache.storm).
import java.util.HashMap;
import java.util.Map;

import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.storm.Config;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.springframework.data.redis.core.HashOperations;

@Slf4j
public class LogConsoleBolt extends BaseRichBolt {
    private final static String AD_LIST_SHOW_COUNT = "AD_LIST_SHOW_COUNT";
    private OutputCollector collector;
    private HashOperations<String, String, Long> hashOperations;
    @Setter
    private byte[] redisProperties;
    @Setter
    private int emitFrequencyInSeconds;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        // Build the Redis client on the worker; only the serialized settings travel with the bolt.
        this.hashOperations = RedisConfUtils.buildRedisTemplate(redisProperties).opsForHash();
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        Map<String, Object> conf = new HashMap<String, Object>();
        // Configure how often tick tuples are sent to this bolt.
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
        return conf;
    }

    @Override
    public void execute(Tuple input) {
        try {
            log.info(input.toString());
            // Check the log type: tuples with fewer than 5 fields (tick tuples among them)
            // are not the log records we need, so they are acknowledged and skipped.
            if (input.size() < 5) {
                collector.ack(input);
            } else {
                String value = input.getStringByField("value");

                AdShowLogEntity adShowLogEntity = AdShowLogEntity.logToEntity(value);
                if (adShowLogEntity != null) {
                    AdShowLogEntity.Message msg = adShowLogEntity.getMessage().get(0);
                    // Store the count in Redis.
                    hashOperations.increment(AD_LIST_SHOW_COUNT,
                            LogAdIdKeyEnum.AD_LIST_TYPE.getPrefix() + msg.getCreativeId(), 1L);
                    collector.emit(new Values(Integer.parseInt(msg.getCreativeId()),
                            System.currentTimeMillis(), 0.01f));
                }
                // The tuple must be acked, otherwise the Kafka message is consumed again.
                collector.ack(input);
            }
        } catch (Exception e) {
            log.error("Failed to process tuple", e);
            collector.fail(input);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Output fields: ad ID, update time, price
        declarer.declare(new Fields("adId", "updateTime", "price"));
    }
}
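The bolt asks Storm for tick tuples through `getComponentConfiguration()`, but `execute` only tells them apart from log tuples by their field count. A more explicit check compares the tuple's source component and stream ID with Storm's system identifiers. The sketch below reproduces that comparison with plain strings so it runs without Storm on the classpath; the two literals are the values of Storm's `Constants.SYSTEM_COMPONENT_ID` and `Constants.SYSTEM_TICK_STREAM_ID`, while the class name and the `kafkaSpout` component are hypothetical.

```java
/** Stand-alone illustration of how a Storm tick tuple can be recognized. */
class TickCheckSketch {
    // Values of Constants.SYSTEM_COMPONENT_ID and Constants.SYSTEM_TICK_STREAM_ID in Storm
    static final String SYSTEM_COMPONENT_ID = "__system";
    static final String SYSTEM_TICK_STREAM_ID = "__tick";

    /**
     * Returns true when both identifiers match the system tick stream.
     * In a bolt the arguments would come from input.getSourceComponent()
     * and input.getSourceStreamId().
     */
    static boolean isTick(String sourceComponent, String sourceStreamId) {
        return SYSTEM_COMPONENT_ID.equals(sourceComponent)
                && SYSTEM_TICK_STREAM_ID.equals(sourceStreamId);
    }

    public static void main(String[] args) {
        // "kafkaSpout" is a hypothetical component ID used only for this demo
        System.out.println("tick tuple: " + isTick("__system", "__tick"));
        System.out.println("log tuple:  " + isTick("kafkaSpout", "default"));
    }
}
```

Storm 1.x also ships `org.apache.storm.utils.TupleUtils.isTick(Tuple)`, which performs the same comparison; with either check, the periodic work (for example reading the counts from Redis and writing them to MySQL) can be placed in the tick branch of `execute`.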



Original article: https://www.cnblogs.com/flyyu1/p/11448039.html
