Spring RabbitMQ - using manual channel acknowledgement on a service with @RabbitListener configuration

2022-09-01 12:49:02

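How can I acknowledge messages manually instead of relying on auto acknowledgement? Is there a way to do this with the @RabbitListener and @EnableRabbit style of configuration? Most of the documentation tells us to use SimpleMessageListenerContainer together with ChannelAwareMessageListener. However, with that approach we lose the flexibility that the annotations provide. I have configured my service as follows: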

@Service
public class EventReceiver {

    @Autowired
    private MessageSender messageSender;

    @RabbitListener(queues = "${eventqueue}")
    public void receiveMessage(Order order) throws Exception {

        // code for processing order
    }
}

My RabbitMQ configuration is as follows:

@EnableRabbit
public class RabbitApplication implements RabbitListenerConfigurer {

    public static void main(String[] args) {
        SpringApplication.run(RabbitApplication.class, args);
    }

    @Bean
    public MappingJackson2MessageConverter jackson2Converter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        return converter;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(rabbitConnectionFactory());
        factory.setMaxConcurrentConsumers(5);
        factory.setMessageConverter((MessageConverter) jackson2Converter());
        // Manual mode: the listener itself must ack or reject each delivery
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }

    @Bean
    public ConnectionFactory rabbitConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost("localhost");
        return connectionFactory;
    }

    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        registrar.setContainerFactory(myRabbitListenerContainerFactory());
    }

    @Autowired
    private EventReceiver receiver;
}

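Any help on how to combine manual channel acknowledgement with the above style of configuration would be appreciated. If we implement ChannelAwareMessageListener, the onMessage signature changes. Can we implement ChannelAwareMessageListener on a service?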


Answer 1

Add the Channel to the @RabbitListener method...

@RabbitListener(queues = "${eventqueue}")
public void receiveMessage(Order order, Channel channel,
    @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
    ...
}

...and use the tag in the basicAck or basicReject call.
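As a rough illustration of how the tag might be used, the sketch below acks a delivery when processing succeeds and rejects it without requeueing when processing throws. It is not the listener from the question: AckChannel, OrderHandler, ManualAckSketch and RecordingChannel are hypothetical names introduced here so the example compiles without a broker or the RabbitMQ client on the classpath. AckChannel only mirrors the signatures of basicAck(long, boolean) and basicReject(long, boolean) on com.rabbitmq.client.Channel.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in mirroring two methods of com.rabbitmq.client.Channel.
interface AckChannel {
    void basicAck(long deliveryTag, boolean multiple) throws IOException;
    void basicReject(long deliveryTag, boolean requeue) throws IOException;
}

// Hypothetical callback standing in for "code for processing order".
interface OrderHandler {
    void handle(String order) throws Exception;
}

class ManualAckSketch {

    // What the body of a @RabbitListener method could look like in MANUAL mode.
    static void receive(String order, AckChannel channel, long tag, OrderHandler handler)
            throws IOException {
        boolean processed;
        try {
            handler.handle(order);
            processed = true;
        } catch (Exception e) {
            processed = false;
        }
        if (processed) {
            // multiple = false: acknowledge only this delivery
            channel.basicAck(tag, false);
        } else {
            // requeue = false: the broker will not redeliver this message to the queue
            channel.basicReject(tag, false);
        }
    }
}

// Records calls so the decision logic can be checked without a broker.
class RecordingChannel implements AckChannel {
    final List<String> calls = new ArrayList<>();

    @Override
    public void basicAck(long deliveryTag, boolean multiple) {
        calls.add("ack:" + deliveryTag);
    }

    @Override
    public void basicReject(long deliveryTag, boolean requeue) {
        calls.add("reject:" + deliveryTag + ":" + requeue);
    }
}
```

In a real listener the same if/else would call the injected Channel directly. Whether a failed message should be requeued (requeue = true) or not is an application decision that this sketch does not try to settle.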

EDIT

@SpringBootApplication
@EnableRabbit
public class So38728668Application {

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So38728668Application.class, args);
        context.getBean(RabbitTemplate.class).convertAndSend("", "so38728668", "foo");
        context.getBean(Listener.class).latch.await(60, TimeUnit.SECONDS);
        context.close();
    }

    @Bean
    public Queue so38728668() {
        return new Queue("so38728668");
    }

    @Bean
    public Listener listener() {
        return new Listener();
    }

    public static class Listener {

        private final CountDownLatch latch = new CountDownLatch(1);

        @RabbitListener(queues = "so38728668")
        public void receive(String payload, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
                throws IOException {
            System.out.println(payload);
            channel.basicAck(tag, false);
            latch.countDown();
        }

    }

}

application.properties:

spring.rabbitmq.listener.acknowledge-mode=manual

Answer 2

Just in case you need to use onMessage() from ChannelAwareMessageListener, you can do it this way.

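@Component
public class MyMessageListener implements ChannelAwareMessageListener {

    // Assumption: an SLF4J logger; the original snippet used an undeclared `log`
    private static final org.slf4j.Logger log =
            org.slf4j.LoggerFactory.getLogger(MyMessageListener.class);

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        log.info("Message received.");
        // do something with the message
        // basicAck throws IOException, hence the throws clause on this method
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}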

And for the RabbitMQ configuration:

@Configuration
public class RabbitConfig {

    public static final String topicExchangeName = "exchange1";

    public static final String queueName = "queue1";

    public static final String routingKey = "queue1.route.#";

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
        connectionFactory.setUsername("xxxx");
        connectionFactory.setPassword("xxxxxxxxxx");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("vHost1");
        return connectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(connectionFactory());
    }

    @Bean
    Queue queue() {
        return new Queue(queueName, true);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange(topicExchangeName);
    }

    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(routingKey);
    }


    @Bean
    public SimpleMessageListenerContainer listenerContainer(MyMessageListener myRabbitMessageListener) {
        SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory());
        listenerContainer.setQueueNames(queueName);
        listenerContainer.setMessageListener(myRabbitMessageListener);
        listenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        listenerContainer.setConcurrency("4");
        listenerContainer.setPrefetchCount(20);
        return listenerContainer;
    }
}
