RocketMq配置死信队列

1周前 (2020-07-27)     作者:石马人山     分类:MQ     阅读次数:     评论(0)    文章页统计代码

    RocketMq死信队列,通过对正常队列中消息设置失效时间,并指定失效后策略(设置死信交换机),达到延时处理任务的目的。通过该方式能极大减少无意义的批处理任务,同时减少传统批处理任务大量sql空查询,减少资源消耗。

注:以下代码仅为部分示例,需要配置mq连接池之后才能真正运行起来。


生产者推送消息配置

package com.longlonggo.test.mq;

import com.mqclient.config.ClientConfig;
import com.mqclient.dto.MsgDto;
import com.mqclient.dto.MsgTypeEnum;
import com.mqclient.dto.SendModelEnum;
import com.mqclient.util.ChannelUtil;
import com.mqclient.util.MqConfig;
import cn.hutool.core.date.DateUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;

/**
 * 生产者测试类
 */
public class ProducerTest {
    public static void main(String[] args) throws Exception{

        //正常队列
        String normalExchange = "longlonggo_normal_queue";
        String normalQueue = "longlonggo_normal_queue";
        String normalRoutingKey = "tolonglonggoDlx";
        String normalConnectionName = "longlonggo_normal_conn";

        //死信队列配置,交换机名称使用默认队列
        String dlxExchange = "longlonggo_dlx_exchange";
        String dlxQueue = "longlonggo_dlx_queue";
        String dlxRoutingKey = "#";
        String dlxConnectionName = "longlonggo_dlx_conn";

        //校验MQ是否初始化
        ClientConfig.checkAppType();

        //信道持久化模式:永久; 
	//设置消息等待N毫秒推送至死信队列(失效时间也可以设置在队列上,如果都设置则以近的优先)
        AMQP.BasicProperties pros = new AMQP.BasicProperties().builder()
                .deliveryMode(2)
                .expiration("10000")
                .build();

        // 构建消息队列入参
        Map<String, String> map = new HashMap<>(1);
        map.put("content", DateUtil.formatDateTime(new Date()));
        map.put("mqType", "mqType");

        // 设置队列消息过期后,丢到的死信交换机
        Map<String, Object> arguments = new HashMap<String, Object>(16);
        arguments.put("x-dead-letter-exchange", dlxExchange);
        // 设置队列中的消息 10s 钟后过期【失效时间可以设置在队列上,也可以设置在消息上】
        //arguments.put("x-message-ttl", 10000);

        //正常的队列绑定
        Channel channel = ChannelUtil.getChannel(normalConnectionName);
        channel.exchangeDeclare(normalExchange, "topic", true, false, null);
        channel.queueDeclare(normalQueue, true, false, false, arguments);
        channel.queueBind(normalQueue, normalExchange, normalRoutingKey);

        // 创建死信交换器和队列
        channel.exchangeDeclare(dlxExchange, "topic", true, false, null);
        channel.queueDeclare(dlxQueue, true, false, false, null);
        channel.queueBind(dlxQueue, dlxExchange, dlxRoutingKey);

        //执行推送
        channel.basicPublish(normalExchange, normalRoutingKey, false, false, pros, map.toString().getBytes());

        System.err.println("消息发送完成......");
    }

}


消费者配置

package com.longlonggo.test.mq;

import com.mqclient.config.ClientConfig;
import com.mqclient.util.ChannelUtil;
import com.mqclient.util.MqConfig;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * 消费者测试类
 */
public class ConsumerTest {

    //消费端监听的是死信队列,如果conusmer收到了消息,表明死信队列里面有消息了
    private static final String QUEUE_NAME = "longlonggo_dlx_queue";
    public static void main(String[] args) throws Exception {

        // 创建信道
        final Channel channel = ChannelUtil.getChannel("longlonggo_dlx_conn");

        System.out.println("消费者启动 ..........");

        com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String resultMsg = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
                resultMsg += "死信队列接收到消息:";
                resultMsg += new String(body);
                System.err.println(resultMsg);
                System.err.println("deliveryTag:" + envelope.getDeliveryTag());
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        channel.basicConsume(QUEUE_NAME, consumer);
    }
}


除非注明,发表在“石马人山的博客”的文章『RocketMq配置死信队列』版权归石马人山所有。 转载请注明出处为“本文转载于『石马人山的博客』原地址http://www.longlonggo.com/html/1///150/180/521.html
文章页分享代码