Administrator
发布于 2021-07-29 / 1314 阅读 / 0 评论 / 0 点赞

解决mq发消息比存数据库快的问题的另一种方法-亲测

解决mq发消息比存数据库快的问题的另一种方法-亲测

1、背景描述

在一个事务中,既做了插入数据库的操作,又做了向mq生产者发消息的操作,这个时候,mq生产者接收消息过快,导致mq消费者立刻消费,而在消费时,做了查询刚刚插入数据库的那条记录的操作,就会查不到导致空指针异常。

2、解决方案

使用spring的ApplicationEvent、ApplicationListener事件监听来达到延迟效果,就是在插入数据库的那个事务里,不再直接发消息给mq生产者,而是推送一个事件,在监听事件执行时,延迟几秒再发送消息给mq生产者。

直接上代码

  • 第一步,先建立一个Event事件类,继承ApplicationEvent
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.ApplicationEvent;

/**
 * @Author: PengCheng.Wu
 * @Date: 2021/5/19 11:13
 * @Description: <自定义Mq事件>
 */

public class RabbitMqEvent extends ApplicationEvent {

    private static final long serialVersionUID = 5801729608204580636L;
    private RabbitTemplate rabbitTemplate;
    private String key;
    private Message msg;
    private CorrelationData correlationData;

    RabbitMqEvent(Object source, RabbitTemplate rabbitTemplate, String key, Message msg, CorrelationData correlationData) {
        super(source);
        this.rabbitTemplate = rabbitTemplate;
        this.key = key;
        this.msg = msg;
        this.correlationData = correlationData;
    }

    public RabbitTemplate getRabbitTemplate() {
        return rabbitTemplate;
    }

    public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public String getKey() {
        return key;
    }

    public void setKey(String key) {
        this.key = key;
    }

    public Message getMsg() {
        return msg;
    }

    public void setMsg(Message msg) {
        this.msg = msg;
    }

    public CorrelationData getCorrelationData() {
        return correlationData;
    }

    public void setCorrelationData(CorrelationData correlationData) {
        this.correlationData = correlationData;
    }
}
  • 第二步,在建一个EventPublisher用于推送创建RabbitMqEvent事件
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;

@Component
public class RabbitMqEventPublisher {

    private ApplicationContext applicationContext;

    @Autowired
    public void setApplicationContext(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    public void publish(RabbitTemplate rabbitTemplate, String key, Message msg, CorrelationData correlationData) {
        applicationContext.publishEvent(new RabbitMqEvent(this, rabbitTemplate, key, msg, correlationData));
    }
}
  • 第三步,建立事件监听类,继承ApplicationListener
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import java.util.Objects;

/**
 * @Author: PengCheng.Wu (wupengcheng@industics.com)
 * @Date: 2021/5/19 11:13
 * @Description: <实现ApplicationListener接口,同时声明监听的事件类型>
 */
@Component
public class RabbitMqEventListener implements ApplicationListener<RabbitMqEvent> {

    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMqEventListener.class);

    private RabbitMqConfig rabbitMqConfig;
    private ReportDownloadMsgDao reportDownloadMsgDao;

    @Async
    @Override
    public void onApplicationEvent(RabbitMqEvent rabbitMqEvent) {
        RabbitTemplate rabbitTemplate = rabbitMqEvent.getRabbitTemplate();
        String key = rabbitMqEvent.getKey();
        Message msg = rabbitMqEvent.getMsg();
        CorrelationData correlationData = rabbitMqEvent.getCorrelationData();

        long startTime = System.currentTimeMillis();
        LOGGER.info("(bean-RabbitMqEventListener)接收到了bean-RabbitMqEventPublisher发布的消息, 将在2秒后执行, 唯一键为 : {}", correlationData.getId());

        try {
            Thread.sleep(2000);
            ReportDownloadMsgEntity msgEntity = reportDownloadMsgDao.getById(correlationData.getId());
            try {
                rabbitTemplate.convertAndSend(rabbitMqConfig.getExchangeName(), key, msg, correlationData);
                msgEntity.setSyncStatus(MdConstants.MqRecordStatus.SENT);
                LOGGER.info("send to msg queue success: MsgId({})", msgEntity.getMsgId());
            } catch (Exception e) {
                msgEntity.setSyncStatus(MdConstants.MqRecordStatus.SENDING_FAILED);
                msgEntity.setErrorMsg(e.getMessage());
                LOGGER.error("send to msg queue error: MsgId [{}], message: {}", msgEntity.getMsgId(), e.getMessage(), e);
            } finally {
                if (Objects.nonNull(msgEntity)) {
                    reportDownloadMsgDao.updateById(msgEntity);
                }
            }
            LOGGER.info("监听唯一键为:{}的事件执行结束,MQ对外发送消息发送成功,耗时:{}", correlationData.getId(), (System.currentTimeMillis() - startTime));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    @Autowired
    public void setRabbitMqConfig(RabbitMqConfig rabbitMqConfig) {
        this.rabbitMqConfig = rabbitMqConfig;
    }

    @Autowired
    public void setReportDownloadMsgDao(ReportDownloadMsgDao reportDownloadMsgDao) {
        this.reportDownloadMsgDao = reportDownloadMsgDao;
    }
}
  • 调用方法

rabbitMqEventPublisher.publish(rabbitTemplate, key, msg, correlationData);

    @Autowired
		private RabbitMqEventPublisher rabbitMqEventPublisher;
		/**
     * 对外发送消息
     *
     * @param message 具体的消息内容
     * @param prop    附件属性
     * @throws Exception
     */
    public void sendMsgToMq(Object message, Map<String, Object> prop, String key) {
        ReportTaskHead taskHead = JSON.parseObject(message.toString(), ReportTaskHead.class);
        LOGGER.info("send sendMsgToMq:{'MSG_ID':{},'REPORT_ID':{}},'REQUEST_ID':{}}} to queue:{}", taskHead.getMsgId(), taskHead.getReportId(), taskHead.getRequestId(), key);
        ReportDownloadMsgEntity msgEntity = reportDownloadMsgDao.createSyncMsg(taskHead);

        MessageProperties messageProperties = new MessageProperties();
        prop.forEach(messageProperties::setHeader);
        Message msg = new Message(message.toString().getBytes(), messageProperties);

        // 设置回调
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnsCallback);

        // 指定此条消息唯一的ID
        CorrelationData correlationData = new CorrelationData(msgEntity.getMsgId());
        //建立事件,延时给mq发消息

        rabbitMqEventPublisher.publish(rabbitTemplate, key, msg, correlationData);
        LOGGER.info("发送执行[MQ对外发送消息]事件成功,唯一键为: " + correlationData.getId() + ", routingKey: " + key);
    }

  • 上面这一段,有个reportDownloadMsgDao.createSyncMsg(taskHead);这个插入数据库的操作,就是这里存在的问题
  • 到此,每次插入数据库的时候,都会推送一个事件,然后由监听器去获取事件然后延迟推送给Mq

3、注意点

  • 在RabbitMqEventListener这个类中的重写的方法onApplicationEvent上面,一定一定一定要加@Async,让这个方法异步执行,否则依然不会达到延迟的效果。
  • 如果要开启@Async,则需要在WebApplication上加上注解@EnableAsync开启异步

感谢大家的支持,给个一键三连呗~

End.


评论