标签:系统 设置 sch 一个 mes crm系统 ret container prope
公司最近在开发CRM系统的时候,需要将ERP的订单数据实时的传输到CRM系统中,但是由于每天的订单量特别大,采用实时获取后并存储到数据库中,接口的相应速度较慢,性能较差。经过经过多方位评估采用在数据库与接口层添加RabbitMQ作为缓冲层来实现。
具体为:
1、ESB将订单数据实时推送至CRM Controller接口
2、CRM Controller调用RabbitMQ的生产者将数据推送至RabbitMQ服务器
3、消费者实时消费RabbitMQ的订单并进行处理并存储至数据库中
这里只是实现了RabbitMQ的生产者和消费者,采用的RabbitMQ Direct模式,代码如下:
@SpringBootApplication
@EnableScheduling
public class RabbitMQApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitMQApplication.class, args);
}
}
@Configuration
public class ProducerConfig {
private static final Logger logger = LoggerFactory.getLogger(ProducerConfig.class);
@Autowired
private CachingConnectionFactory connectionFactory;
@Bean
public RabbitTemplate rabbitTemplate() {
connectionFactory.setPublisherConfirms(true); //Use full publisher confirms, with correlation data and a callback for each message.
connectionFactory.setPublisherReturns(true);
connectionFactory.setConnectionTimeout(6000); //超时时间
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); //采用默认模式创建模板
rabbitTemplate.setMandatory(true); //强制
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
logger.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
}
});
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
//TODO 数据没有投递成功,可以在这里进行一些操作,比如讲消息保存到数据库中,等待下次进行投递
logger.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message);
}
});
return rabbitTemplate;
}
}
@Component
public class ProducerProcessing {
@Autowired
private RabbitTemplate rabbitTemplate;
@Scheduled(fixedDelay = 1000, initialDelay = 500)
public void directSend() {
rabbitTemplate.convertAndSend(ReciverProcessing.DIRECT_EXCHANGE, ReciverProcessing.DIRECT_ROUTINGKEY, "订单信息");
System.out.println("消息投递完成");
}
// @Scheduled(fixedDelay = 1000, initialDelay = 500)
public void fanoutSend() {
rabbitTemplate.convertAndSend(ReciverProcessing.FANOUT_EXCHANGE, "", "订单信息");
System.out.println("消息投递完成");
}
}
@Configuration
public class ReciverProcessing {
//Direct模式
public static final String DIRECT_EXCHANGE = "direct_exchange";
public static final String DIRECT_ROUTINGKEY = "direct_rountingkey";
public static final String DIRECT_QUEUE = "direct_queue";
@Autowired
private ConnectionFactory connectionFactory;
//发布订阅模式
public static final String FANOUT_EXCHANGE = "fanout_exchange";
public static final String FANOUT_QUEUE1 = "fanout_queue1";
public static final String FANOUT_QUEUE2 = "fanout_queue2";
/**
* 实例化 DirectExchange
*
* @return
*/
@Bean
public DirectExchange directExchange() {
/**
* Exchange名称
* Exchange持久化
* Exchange 是否自动删除
*/
return new DirectExchange(DIRECT_EXCHANGE, true, false);
}
/**
* 创建广播模式交换机
*
* @return
*/
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(FANOUT_EXCHANGE, true, false);
}
@Bean
public Queue directQueue() {
/**
* 队列名称
* 队列是否持久化
*/
return new Queue(DIRECT_QUEUE, true); //队列持久
}
@Bean
public Binding directBinding() {
/**
* 交换器与队列绑定
*/
return BindingBuilder.bind(directQueue()).to(directExchange()).with(DIRECT_ROUTINGKEY);
}
@Bean
public SimpleMessageListenerContainer DirectMessageContainer() {
//创建队列监听容器
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
//监听的队列,可多个
simpleMessageListenerContainer.setQueues(directQueue());
simpleMessageListenerContainer.setExposeListenerChannel(true);
//最大消费者数量
simpleMessageListenerContainer.setMaxConcurrentConsumers(10);
//设置并发数
simpleMessageListenerContainer.setConcurrentConsumers(3);
//一次拉取消息的数量
simpleMessageListenerContainer.setPrefetchCount(1);
//确认模式
simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
simpleMessageListenerContainer.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
String body = new String(message.getBody());
//TODO 这里开始处理消息,处理消息成功以后可以发送确认消息成功
System.out.println("当前线程 " + Thread.currentThread().getName() + " 收到的消息为: " + body);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费
//channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
}
});
return simpleMessageListenerContainer;
}
/**
* 队列1
*
* @return
*/
@Bean
public Queue fanoutQueue1() {
return new Queue(FANOUT_QUEUE1, true);
}
/**
* 队列2
*
* @return
*/
@Bean
public Queue fanoutQueue2() {
return new Queue(FANOUT_QUEUE2, true);
}
/**
* 队列1绑定
*
* @return
*/
@Bean
public Binding fanoutBinding1() {
return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
}
/**
* 队列2绑定
*
* @return
*/
@Bean
public Binding fanoutBinding2() {
return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
}
/**
* 监听队列进行消费,这里相当于一个消费者监听了两个队列,在实际开发应用中,发布订阅模式监听两个队列,收到的消息是一样的,所以这样做是没有什么意义的
* 可以设置两个消费者,每个消费者只监听其中的一个队列,收到消息后根据需要来进行不同的处理
*
* @return
*/
@Bean
public SimpleMessageListenerContainer fanoutMessageContainer() {
//创建队列监听容器
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
//监听的队列,可多个
simpleMessageListenerContainer.setQueues(fanoutQueue1(), fanoutQueue2());
simpleMessageListenerContainer.setExposeListenerChannel(true);
//最大消费者数量
simpleMessageListenerContainer.setMaxConcurrentConsumers(10);
//设置并发数
simpleMessageListenerContainer.setConcurrentConsumers(3);
//一次拉取消息的数量
simpleMessageListenerContainer.setPrefetchCount(1);
//确认模式
simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
simpleMessageListenerContainer.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
String body = new String(message.getBody());
//TODO 这里开始处理消息,处理消息成功以后可以发送确认消息成功
System.out.println("当前线程 " + Thread.currentThread().getName() + " 收到的消息为: " + body);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费
//channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
}
});
return simpleMessageListenerContainer;
}
}
标签:系统 设置 sch 一个 mes crm系统 ret container prope
原文地址:https://www.cnblogs.com/haizhilangzi/p/12301742.html