解决mq发消息比存数据库快的问题的另一种方法-亲测
1、背景描述
在一个事务中,既做了插入数据库的操作,又做了向mq生产者发消息的操作,这个时候,mq生产者接收消息过快,导致mq消费者立刻消费,而在消费时,做了查询刚刚插入数据库的那条记录的操作,就会查不到导致空指针异常。
2、解决方案
使用spring的ApplicationEvent、ApplicationListener事件监听来达到延迟效果,就是在插入数据库的那个事务里,不再直接发消息给mq生产者,而是推送一个事件,在监听事件执行时,延迟几秒再发送消息给mq生产者。
直接上代码
- 第一步,先建立一个Event事件类,继承ApplicationEvent
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.ApplicationEvent;
/**
* @Author: PengCheng.Wu
* @Date: 2021/5/19 11:13
* @Description: <自定义Mq事件>
*/
public class RabbitMqEvent extends ApplicationEvent {
private static final long serialVersionUID = 5801729608204580636L;
private RabbitTemplate rabbitTemplate;
private String key;
private Message msg;
private CorrelationData correlationData;
RabbitMqEvent(Object source, RabbitTemplate rabbitTemplate, String key, Message msg, CorrelationData correlationData) {
super(source);
this.rabbitTemplate = rabbitTemplate;
this.key = key;
this.msg = msg;
this.correlationData = correlationData;
}
public RabbitTemplate getRabbitTemplate() {
return rabbitTemplate;
}
public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public Message getMsg() {
return msg;
}
public void setMsg(Message msg) {
this.msg = msg;
}
public CorrelationData getCorrelationData() {
return correlationData;
}
public void setCorrelationData(CorrelationData correlationData) {
this.correlationData = correlationData;
}
}
- 第二步,在建一个EventPublisher用于推送创建RabbitMqEvent事件
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
@Component
public class RabbitMqEventPublisher {
private ApplicationContext applicationContext;
@Autowired
public void setApplicationContext(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}
public void publish(RabbitTemplate rabbitTemplate, String key, Message msg, CorrelationData correlationData) {
applicationContext.publishEvent(new RabbitMqEvent(this, rabbitTemplate, key, msg, correlationData));
}
}
- 第三步,建立事件监听类,继承ApplicationListener
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.util.Objects;
/**
* @Author: PengCheng.Wu (wupengcheng@industics.com)
* @Date: 2021/5/19 11:13
* @Description: <实现ApplicationListener接口,同时声明监听的事件类型>
*/
@Component
public class RabbitMqEventListener implements ApplicationListener<RabbitMqEvent> {
private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMqEventListener.class);
private RabbitMqConfig rabbitMqConfig;
private ReportDownloadMsgDao reportDownloadMsgDao;
@Async
@Override
public void onApplicationEvent(RabbitMqEvent rabbitMqEvent) {
RabbitTemplate rabbitTemplate = rabbitMqEvent.getRabbitTemplate();
String key = rabbitMqEvent.getKey();
Message msg = rabbitMqEvent.getMsg();
CorrelationData correlationData = rabbitMqEvent.getCorrelationData();
long startTime = System.currentTimeMillis();
LOGGER.info("(bean-RabbitMqEventListener)接收到了bean-RabbitMqEventPublisher发布的消息, 将在2秒后执行, 唯一键为 : {}", correlationData.getId());
try {
Thread.sleep(2000);
ReportDownloadMsgEntity msgEntity = reportDownloadMsgDao.getById(correlationData.getId());
try {
rabbitTemplate.convertAndSend(rabbitMqConfig.getExchangeName(), key, msg, correlationData);
msgEntity.setSyncStatus(MdConstants.MqRecordStatus.SENT);
LOGGER.info("send to msg queue success: MsgId({})", msgEntity.getMsgId());
} catch (Exception e) {
msgEntity.setSyncStatus(MdConstants.MqRecordStatus.SENDING_FAILED);
msgEntity.setErrorMsg(e.getMessage());
LOGGER.error("send to msg queue error: MsgId [{}], message: {}", msgEntity.getMsgId(), e.getMessage(), e);
} finally {
if (Objects.nonNull(msgEntity)) {
reportDownloadMsgDao.updateById(msgEntity);
}
}
LOGGER.info("监听唯一键为:{}的事件执行结束,MQ对外发送消息发送成功,耗时:{}", correlationData.getId(), (System.currentTimeMillis() - startTime));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@Autowired
public void setRabbitMqConfig(RabbitMqConfig rabbitMqConfig) {
this.rabbitMqConfig = rabbitMqConfig;
}
@Autowired
public void setReportDownloadMsgDao(ReportDownloadMsgDao reportDownloadMsgDao) {
this.reportDownloadMsgDao = reportDownloadMsgDao;
}
}
- 调用方法
rabbitMqEventPublisher.publish(rabbitTemplate, key, msg, correlationData);
@Autowired
private RabbitMqEventPublisher rabbitMqEventPublisher;
/**
* 对外发送消息
*
* @param message 具体的消息内容
* @param prop 附件属性
* @throws Exception
*/
public void sendMsgToMq(Object message, Map<String, Object> prop, String key) {
ReportTaskHead taskHead = JSON.parseObject(message.toString(), ReportTaskHead.class);
LOGGER.info("send sendMsgToMq:{'MSG_ID':{},'REPORT_ID':{}},'REQUEST_ID':{}}} to queue:{}", taskHead.getMsgId(), taskHead.getReportId(), taskHead.getRequestId(), key);
ReportDownloadMsgEntity msgEntity = reportDownloadMsgDao.createSyncMsg(taskHead);
MessageProperties messageProperties = new MessageProperties();
prop.forEach(messageProperties::setHeader);
Message msg = new Message(message.toString().getBytes(), messageProperties);
// 设置回调
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnCallback(returnsCallback);
// 指定此条消息唯一的ID
CorrelationData correlationData = new CorrelationData(msgEntity.getMsgId());
//建立事件,延时给mq发消息
rabbitMqEventPublisher.publish(rabbitTemplate, key, msg, correlationData);
LOGGER.info("发送执行[MQ对外发送消息]事件成功,唯一键为: " + correlationData.getId() + ", routingKey: " + key);
}
- 上面这一段,有个reportDownloadMsgDao.createSyncMsg(taskHead);这个插入数据库的操作,就是这里存在的问题
- 到此,每次插入数据库的时候,都会推送一个事件,然后由监听器去获取事件然后延迟推送给Mq
3、注意点
- 在RabbitMqEventListener这个类中的重写的方法onApplicationEvent上面,一定一定一定要加
@Async
,让这个方法异步执行,否则依然不会达到延迟的效果。 - 如果要开启
@Async
,则需要在WebApplication上加上注解@EnableAsync
开启异步
感谢大家的支持,给个一键三连呗~
End.