Spring Cloud Stream throws an exception when sending messages consecutively

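Hello instructor,

   With the Stream component, the first message I send to MQ is received, but the second message I send to MQ throws an exception. The code and log are below: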

Version: //img1.sycdn.imooc.com//szimg/5addfd4c0001074009990532.jpg

Receiver: //img1.sycdn.imooc.com//szimg/5addfd6700014e9b09270615.jpg

Sender: //img1.sycdn.imooc.com//szimg/5addfd7b0001221d10920751.jpg

The exception is as follows:

2018-04-23 23:36:44.378  INFO 5968 --- [nio-8001-exec-8] com.imooc.order.message.StreamReceiver   : StreamReceiver:nowMon Apr 23 23:36:44 CST 2018

------------- The first message is received normally, but every send after that throws the following exception:

2018-04-23 23:36:51.255  WARN 5968 --- [-iESOJ1qN8HUw-1] o.s.a.r.r.RejectAndDontRequeueRecoverer  : Retries exhausted for message (Body:'[B@5b76b8a2(byte[31])' MessageProperties [headers={contentType=text/plain}, contentType=application/octet-stream, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=myMessage, receivedRoutingKey=myMessage, deliveryTag=12, consumerTag=amq.ctag-PI6t3RYOyapbdv0eqW-u8g, consumerQueue=myMessage.anonymous.nTzF24HHQ-iESOJ1qN8HUw])


org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception

at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1474) ~[spring-rabbit-2.0.0.M5.jar:na]

at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1385) ~[spring-rabbit-2.0.0.M5.jar:na]

at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1305) ~[spring-rabbit-2.0.0.M5.jar:na]

at sun.reflect.GeneratedMethodAccessor136.invoke(Unknown Source) ~[na:na]

at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_91]

at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_91]

at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:338) ~[spring-aop-5.0.0.RC3.jar:5.0.0.RC3]

at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:197) ~[spring-aop-5.0.0.RC3.jar:5.0.0.RC3]

at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.0.0.RC3.jar:5.0.0.RC3]

at org.springframework.retry.interceptor.RetryOperationsInterceptor$1.doWithRetry(RetryOperationsInterceptor.java:91) ~[spring-retry-1.2.1.RELEASE.jar:na]

at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287) [spring-retry-1.2.1.RELEASE.jar:na]

at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:180) [spring-retry-1.2.1.RELEASE.jar:na]

at org.springframework.retry.interceptor.RetryOperationsInterceptor.invoke(RetryOperationsInterceptor.java:115) ~[spring-retry-1.2.1.RELEASE.jar:na]

at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185) ~[spring-aop-5.0.0.RC3.jar:5.0.0.RC3]

at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) ~[spring-aop-5.0.0.RC3.jar:5.0.0.RC3]

at org.springframework.amqp.rabbit.listener.$Proxy185.invokeListener(Unknown Source) ~[na:na]

at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1292) ~[spring-rabbit-2.0.0.M5.jar:na]

at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1271) ~[spring-rabbit-2.0.0.M5.jar:na]

at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:746) ~[spring-rabbit-2.0.0.M5.jar:na]

at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:730) ~[spring-rabbit-2.0.0.M5.jar:na]

at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$500(SimpleMessageListenerContainer.java:74) ~[spring-rabbit-2.0.0.M5.jar:na]

at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:945) ~[spring-rabbit-2.0.0.M5.jar:na]

at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_91]

Caused by: org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'unknown.channel.name'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=nowMon Apr 23 23:36:48 CST 2018, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=myMessage, amqp_receivedExchange=myMessage, amqp_deliveryTag=12, amqp_consumerQueue=myMessage.anonymous.nTzF24HHQ-iESOJ1qN8HUw, amqp_redelivered=false, id=33bbd6cf-69c8-6ab8-7a0c-b7c6662ea713, amqp_consumerTag=amq.ctag-PI6t3RYOyapbdv0eqW-u8g, contentType=text/plain, timestamp=1524497811252}]

2018-04-23 23:36:51.273  WARN 5968 --- [-iESOJ1qN8HUw-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.


org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Retry Policy Exhausted

Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception

... 17 common frames omitted

Caused by: org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception


2 answers

廖师兄 2018-04-24 00:17:11

Let's leave this issue for now; at the end we will upgrade to the official release version.

  • Instructor, I ran into this problem too.
    2018-06-07 18:24:46
一路向北吧 2019-08-11 19:53:20

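I ran into the same problem.

With the initial configuration below, sendMessage could be called many times and the messages were consumed normally without errors.

StreamClient.java defined @Input("myMessage"); StreamReceiver.java defined @StreamListener("message")

Later I switched to referencing a shared constant. The first message was consumed, but every send after that threw the same exception as the original poster's.

String INPUT = "myMessage";

@Input(StreamClient.INPUT)

Even after switching back to the first approach the problem remained; only restarting IDEA restored normal behavior.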

  • Solved. The cause is "Spring Cloud Stream Input and Output having the same name"; see https://www.oschina.net/question/3937016_2284328?sort=default
    ------------------------------------
    StreamClient.java
    -----
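    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.cloud.stream.annotation.Output;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.SubscribableChannel;

    public interface StreamClient {
    
    //    Earlier attempts (commented out) used "myMessage" for both bindings.
    //    String INPUT = "myMessage";
        String INPUT = "input";
        String OUTPUT = "output";
    
    //    @Input("myMessage")
    //    @Input(StreamClient.INPUT)
        @Input(StreamClient.INPUT)
        SubscribableChannel input();
    
    //    @Output("myMessage")
    //    @Output(StreamClient.INPUT)
        @Output(StreamClient.OUTPUT)
        MessageChannel output();
    }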
    ------------------------------------
    StreamReceiver.java
    -----
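    import lombok.extern.slf4j.Slf4j;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.messaging.handler.annotation.SendTo;
    import org.springframework.stereotype.Component;

    // The three class annotations are assumed; the posted snippet omitted them.
    // Lombok's @Slf4j supplies the `log` field used below.
    @Component
    @EnableBinding(StreamClient.class)
    @Slf4j
    public class StreamReceiver {
    
    //    @StreamListener("myMessage")
    ////    @StreamListener(StreamClient.INPUT)
    //    public void process(Object message) {
    //      log.info("StreamReceiver: {}", message);
    //    }
    
        // Receives on "input" and forwards the payload to "output".
        @StreamListener(StreamClient.INPUT)
        @SendTo(StreamClient.OUTPUT)
        public Object processInput(Object message) {
            log.info("message: {}", message);
            return message;
        }
    
        @StreamListener(StreamClient.OUTPUT)
        public void process(Object message) {
            log.info("StreamReceiver: {}", message);
        }
    }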
    2019-08-11 20:51:03
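
For anyone who wants to catch this kind of naming clash earlier, here is a minimal sketch of a guard that could run in a unit test. It only compares the declared names and reports any name used for both an input and an output. `ChannelNameGuard` and `sharedNames` are hypothetical names, not part of Spring Cloud Stream, and the name lists are assumed to be supplied by hand rather than discovered from the annotations.

```java
import java.util.Collection;
import java.util.Set;
import java.util.TreeSet;

/**
 * Hypothetical helper (not part of Spring Cloud Stream): reports channel
 * names that are declared as both an input and an output.
 */
final class ChannelNameGuard {

    private ChannelNameGuard() {
    }

    /** Returns the names present in both collections, sorted for stable output. */
    static Set<String> sharedNames(Collection<String> inputs, Collection<String> outputs) {
        Set<String> shared = new TreeSet<>(inputs);
        shared.retainAll(outputs);
        return shared;
    }

    public static void main(String[] args) {
        // Naming from the original question: both sides use "myMessage".
        System.out.println(sharedNames(Set.of("myMessage"), Set.of("myMessage"))); // prints [myMessage]
        // Naming from the fix above: distinct names, so nothing is shared.
        System.out.println(sharedNames(Set.of("input"), Set.of("output"))); // prints []
    }
}
```

A test could fail the build whenever `sharedNames(...)` is non-empty, so that renaming a constant such as `StreamClient.INPUT` does not quietly reintroduce the clash.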