码迷,mamicode.com
首页 > 其他好文 > 详细

rabbitmq用x-delayed-message的exchange特性支持消息延迟消费

时间:2020-12-19 13:38:01      阅读:4      评论:0      收藏:0      [点我收藏+]

标签:延迟   lse   tick   rpe   key   rop   erro   rgba   with   

rabbitmq版本:3.6.14

spring-core 版本: 4.2.5.RELEASE     

备注:如果springboot升级到比较高的版本,能用到更高版本的amqp依赖包和spring-core依赖包,有比以下实现delay延迟消费的更好的代码。

 

声明exchange:

        Map<String, Object> argMaps = new HashMap<>();
        argMaps.put("x-delayed-type", "direct");
        CustomExchange exchange = new CustomExchange("ticket-exchange-joyce-test", "x-delayed-message", true, false, argMaps);
        admin.declareExchange(exchange);
        admin.declareBinding(BindingBuilder.bind(queueForPendingPayment).to(exchange).with(ticketRouteKeyPendingPayment).noargs());
当消费者catch到异常时,触发delay延迟消费机制:
private static final String MAX_RETRY_TIME = "max_retry_time";
private static final String CURRENT_RETRY_ROUND = "current_retry_round";
public static final int[] RETRY_INTERVAL = {0, 2, 4, 6, 8, 10, 12, 14};
private static final String DELAY = "x-delay";
Map<String, Object> header = message.getMessageProperties().getHeaders();
            int maxRetryTimes = header.containsKey(MAX_RETRY_TIME) ? (int) header.get(MAX_RETRY_TIME) : RETRY_INTERVAL.length;
            int currentRound = header.containsKey(CURRENT_RETRY_ROUND) ? Integer.valueOf(header.get(CURRENT_RETRY_ROUND).toString()) : 1;
            if (currentRound > maxRetryTimes) {
                LOGGER.info("Message has retryed exceed max retry times, current round = {}, max retry times = {}, header current round = {}, header delay = {}" +
                                ", quit! body: {}"
                        , currentRound, maxRetryTimes, header.get(DELAY), header.get(CURRENT_RETRY_ROUND), origin);
            } else {
                int delay = RETRY_INTERVAL[Math.min(currentRound, RETRY_INTERVAL.length) - 1];
                header.put(CURRENT_RETRY_ROUND, currentRound + 1);
                header.put(DELAY, delay * 1000);
                LOGGER.info("Retrying send message to rabbitmq, current round: {}, delay(ms): {}, header current round = {}, header delay = {}, body: {}"
                        , currentRound, delay * 1000, header.get(CURRENT_RETRY_ROUND), header.get(DELAY), origin);
                AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                        .deliveryMode(2) // message is persistent
                        .contentEncoding(StandardCharsets.UTF_8.name())
                        .headers(header)
                        .build();
                try {
                    String exchange = message.getMessageProperties().getReceivedExchange();
                    String routekey = message.getMessageProperties().getReceivedRoutingKey();
                    LOGGER.info("retry info, exchange = {}, routekey = {}", exchange, routekey);
                    channel.basicPublish(
                            message.getMessageProperties().getReceivedExchange()
                            , message.getMessageProperties().getReceivedRoutingKey()
                            , properties
                            , message.getBody());


                } catch (IOException ex) {
                    LOGGER.error("Rabbitmq channel error", ex);
                }

 




 

end.

rabbitmq用x-delayed-message的exchange特性支持消息延迟消费

标签:延迟   lse   tick   rpe   key   rop   erro   rgba   with   

原文地址:https://www.cnblogs.com/zhuwenjoyce/p/14140003.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!