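Right after the program starts, a background thread begins sending messages (similar to a continuous listener, except that it keeps sending instead of receiving).
With a brand-new RabbitMQ account in its default configuration, the console prints the output below once the program starts. The symptom is that the message is sent before the exchange, queues, and bindings have been created, so the send cannot find the exchange. The same code works without problems in a SpringTest test, however. My thought: should I follow the example of connectionFactory.createConnection(); and trigger it manually?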
2021-04-28 10:25:35.026 ERROR 41654 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'exchange.callRcsService' in vhost '/', class-id=60, method-id=40)
2021-04-28 10:25:35.028 WARN 41654 --- [nectionFactory2] c.a.j.demo.Config.RabbitMQConfig : Message Confirm: correlationData:CorrelationData [id=111] ack:false cause:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'exchange.callRcsService' in vhost '/', class-id=60, method-id=40)
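To make the suspected ordering concrete, here is a toy model in plain Java. It is not the Spring AMQP API: `ToyBroker`, `ToyConnectionFactory`, and `sendOnce` are hypothetical names invented for illustration. It assumes that declarations are performed by a listener that fires when the connection is opened, and that a listener registered after the connection is already open fires immediately, which is too late for a message that has already been published.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model (not the Spring AMQP API) of "declare when the connection opens". */
public class DeclareOrderDemo {

    /** Hypothetical stand-in for a broker: it only tracks which exchanges exist. */
    static class ToyBroker {
        private final Set<String> exchanges = new HashSet<>();

        void declareExchange(String name) {
            exchanges.add(name);
        }

        /** Returns 200 if the exchange exists, 404 (NOT_FOUND) otherwise. */
        int publish(String exchange) {
            return exchanges.contains(exchange) ? 200 : 404;
        }
    }

    /** Hypothetical stand-in for a connection factory with connection listeners. */
    static class ToyConnectionFactory {
        private final List<Runnable> listeners = new ArrayList<>();
        private boolean connected = false;

        void addListener(Runnable onCreate) {
            listeners.add(onCreate);
            if (connected) {
                onCreate.run(); // a late listener still fires, but only after the fact
            }
        }

        void connect() {
            if (!connected) {
                connected = true;
                listeners.forEach(Runnable::run);
            }
        }
    }

    /** Publishes once; the flag controls whether the declaring listener is registered before the send. */
    static int sendOnce(boolean registerAdminFirst) {
        ToyBroker broker = new ToyBroker();
        ToyConnectionFactory factory = new ToyConnectionFactory();
        Runnable admin = () -> broker.declareExchange("exchange.callRcsService");

        if (registerAdminFirst) {
            factory.addListener(admin);
        }
        factory.connect(); // the first send opens the connection
        int reply = broker.publish("exchange.callRcsService");
        if (!registerAdminFirst) {
            factory.addListener(admin); // declarations arrive after the publish
        }
        return reply;
    }

    public static void main(String[] args) {
        System.out.println("send during bean wiring: " + sendOnce(false));
        System.out.println("send after startup:      " + sendOnce(true));
    }
}
```

In this model the early send gets 404 and the later send gets 200. If the real application behaves the same way, that would explain both the NOT_FOUND error above and why the exchange still shows up on the broker afterwards.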
This is application.yml:
server:
  port: 4110
spring:
  datasource:
    url: "jdbc:mysql://127.0.0.1:3306/xx?characterEncoding=utf8&useSSL=false"
    username: "root"
    password: "root"
    driver-class-name: com.mysql.cj.jdbc.Driver
  rabbitmq:
    host: "localhost"
    port: 5672
    username: "guest"
    password: "guest"
    publisher-confirm-type: correlated
    publisher-returns: true
RabbitMQ configuration class:
@Slf4j
@Configuration
public class RabbitMQConfig {

    @Autowired
    GenPreTaskService genPreTaskService;

    @Autowired
    public void GenPreTaskService() {
        try {
            genPreTaskService.GenPreTask();
        } catch (Exception e) {
            log.error(e.toString(), e);
        }
    }

    @Bean
    public Exchange exchange() {
        return new DirectExchange("exchange.callRcsService");
    }

    @Bean
    public Exchange dlxExchange() {
        return new TopicExchange("exchange.dlx");
    }

    @Bean
    public Queue queue() {
        return new Queue("queue.callRcsService");
    }

    @Bean
    public Queue dlxQueue() {
        return new Queue("queue.dlx");
    }

    @Bean
    public Binding binding() {
        return new Binding(
                "queue.callRcsService",
                Binding.DestinationType.QUEUE,
                "exchange.callRcsService",
                "key.callRcsService",
                null);
    }

    @Bean
    public Binding dlxBinding() {
        return new Binding(
                "queue.dlx",
                Binding.DestinationType.QUEUE,
                "exchange.dlx",
                "#",
                null);
    }

    @Bean
    public RabbitTemplate rabbitTemplate(@Autowired ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnsCallback(returned -> {
            log.warn("Message Return:\t" +
                            "replyCode:{}\treplyText:{}\texchange:{}\troutingKey:{}\r\n",
                    returned.getReplyCode(), returned.getReplyText(), returned.getExchange(), returned.getRoutingKey());
        });
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            log.warn("Message Confirm:\t" +
                            "correlationData:{}\tack:{}\tcause:{}\r\n",
                    correlationData, ack, cause);
        });
        return rabbitTemplate;
    }
}
Message sending:
@Slf4j
@Service
public class GenPreTaskService {

    ObjectMapper objectMapper = new ObjectMapper();

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // The problem is here.
    // Once this method is entered, the message is sent first, and only
    // afterwards are the exchange, queues, and bindings created.
    public void GenPreTask() {
        try {
            String messageToSend = objectMapper.writeValueAsString("Hello World");
            MessageProperties messageProperties = new MessageProperties();
            // Set the per-message expiration time (in milliseconds)
            messageProperties.setExpiration("15000");
            Message message = new Message(messageToSend.getBytes(), messageProperties);
            CorrelationData correlationData = new CorrelationData();
            correlationData.setId("111");
            // Here
            rabbitTemplate.send(
                    "exchange.callRcsService",
                    "key.callRcsService",
                    message,
                    correlationData);
        } catch (Exception e) {
            log.error(e.toString(), e);
        }
    }
}
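One possible way to avoid the ordering problem, sketched under assumptions and not tested against this project: rather than calling GenPreTask() from an @Autowired method on the configuration class (which runs while beans are still being wired), the send could be deferred until the application context has finished starting. The sketch below uses Spring Boot's ApplicationRunner. The class name `StartupSendConfig` and the bean name `sendAfterStartup` are hypothetical. It assumes that GenPreTaskService is in the same package and that the @Autowired GenPreTaskService() method is removed from RabbitMQConfig.

```java
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical class: not part of the original project.
@Configuration
public class StartupSendConfig {

    // ApplicationRunner beans are invoked after the application context has been
    // refreshed, that is, after all singleton beans have been initialized.
    @Bean
    public ApplicationRunner sendAfterStartup(GenPreTaskService genPreTaskService) {
        return args -> genPreTaskService.GenPreTask();
    }
}
```

If the declarations are indeed performed when the first connection is opened, then by the time this runner fires the component responsible for them should already be in place, so the exchange, queues, and bindings would be declared before the message is published.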