如何使用java和spring 3.0同时处理来自JMS主题(而不是队列)的多条消息?

2022-09-03 07:02:15

请注意,我希望多个消息侦听器同时处理来自该主题的连续消息。此外,我希望每个消息侦听器都以事务方式运行,以便给定消息侦听器中的处理失败将导致该侦听器的消息保留在主题上。

Spring DefaultMessageListenerContainer 似乎仅支持 JMS 队列的并发。

我是否需要实例化多个默认消息列表容器?

如果时间沿垂直轴向动:

ListenerA reads msg 1        ListenerB reads msg 2        ListenerC reads msg 3
ListenerA reads msg 4        ListenerB reads msg 5        ListenerC reads msg 6
ListenerA reads msg 7        ListenerB reads msg 8        ListenerC reads msg 9
ListenerA reads msg 10       ListenerB reads msg 11       ListenerC reads msg 12
...

更新:
感谢您@T.Rob和@skaffman的反馈。

我最终所做的是创建多个 with,然后将逻辑放入消息侦听器中,以便只有一个线程处理给定的消息 ID。DefaultMessageListenerContainersconcurrency=1


答案 1

您不需要多个实例,不,但您需要使用 concurrentConsumers 属性将 配置为并发DefaultMessageListenerContainerDefaultMessageListenerContainer

指定要创建的并发使用者数。默认值为 1。

为此设置指定较高的值将增加运行时计划的并发使用者的标准级别:这实际上是在任何给定时间将调度的并发使用者的最小数量。这是一个静态设置;对于动态缩放,请考虑改为指定“maxCurrentConsumers”设置。

建议增加并发使用者的数量,以便扩展来自队列的消息的使用。但是,请注意,一旦注册了多个使用者,任何订购保证都将丢失。通常,对于低容量队列,请坚持使用 1 个使用者。

但是,底部有很大的警告:

不要增加主题的并发使用者数。这将导致并发使用同一消息,这几乎从来都不是可取的。

这很有趣,当你想到它时,这是有道理的。如果您有多个实例,也会发生同样的情况。DefaultMessageListenerContainer

我想也许你需要重新思考你的设计,尽管我不确定我会有什么建议。同时使用 pub/sub 消息似乎是一件完全合理的事情,但是如何避免同时向所有使用者传递相同的消息呢?


答案 2

至少在ActiveMQ中,你想要的是完全支持的,他的名字是VirtualTopic。

概念是:

  1. 您创建一个虚拟主题(只需使用前缀创建一个主题)例如。VirtualTopic.VirtualTopic.Color
  2. 创建一个订阅此虚拟主题的使用者,以匹配此模式,例如。,这样做,Activemq将创建一个具有该名称的队列,该队列将订阅,然后发布到此虚拟主题的每条消息都将传递到client1队列,请注意,它的工作方式类似于rabbitmq交换。Consumer.<clientName>.VirtualTopic.<topicName>Consumer.client1.VirtualTopic.ColorVirtualTopic.Color
  3. 您已经完成了,现在您可以像使用每个队列一样使用 client1 队列,具有许多使用者,DLQ,自定义的重新传递策略等。
  4. 在这一点上我想你已经明白了,你可以创建 client2client3 以及你想要多少订阅者,他们都会收到一个发布到的消息的副本VirtualTopic.Color

这里的代码

@Component
public class ColorReceiver {

    private static final Logger LOGGER = LoggerFactory.getLogger(MailReceiver.class);

    @Autowired
    private JmsTemplate jmsTemplate;

    // simply generating data to the topic
    long id=0;
    @Scheduled(fixedDelay = 500)
    public void postMail() throws JMSException, IOException {

        final Color colorName = new Color[]{Color.BLUE, Color.RED, Color.WHITE}[new Random().nextInt(3)];
        final Color color = new Color(++id, colorName.getName());
        final ActiveMQObjectMessage message = new ActiveMQObjectMessage();
        message.setObject(color);
        message.setProperty("color", color.getName());
        LOGGER.info("status=color-post, color={}", color);
        jmsTemplate.convertAndSend(new ActiveMQTopic("VirtualTopic.color"), message);
    }

    /**
     * Listen all colors messages
     */
    @JmsListener(
        destination = "Consumer.client1.VirtualTopic.color", containerFactory = "colorContainer"
        selector = "color <> 'RED'"
    )
    public void genericReceiveMessage(Color color) throws InterruptedException {
        LOGGER.info("status=GEN-color-receiver, color={}", color);
    }

    /**
     * Listen only red colors messages
     *
     * the destination ClientId have not necessary exists (it means that his name can be a fancy name), the unique requirement is that
     * the containers clientId need to be different between each other
     */
    @JmsListener(
//      destination = "Consumer.redColorContainer.VirtualTopic.color",
        destination = "Consumer.client1.VirtualTopic.color",
        containerFactory = "redColorContainer", selector = "color='RED'"
    )
    public void receiveMessage(ObjectMessage message) throws InterruptedException, JMSException {
        LOGGER.info("status=RED-color-receiver, color={}", message.getObject());
    }

    /**
     * Listen all colors messages
     */
    @JmsListener(
        destination = "Consumer.client2.VirtualTopic.color", containerFactory = "colorContainer"
    )
    public void genericReceiveMessage2(Color color) throws InterruptedException {
        LOGGER.info("status=GEN-color-receiver-2, color={}", color);
    }

}

@SpringBootApplication
@EnableJms
@EnableScheduling
@Configuration
public class Config {

    /**
     * Each @JmsListener declaration need a different containerFactory because ActiveMQ requires different
     * clientIds per consumer pool (as two @JmsListener above, or two application instances)
     * 
     */
    @Bean
    public JmsListenerContainerFactory<?> colorContainer(ActiveMQConnectionFactory connectionFactory, 
        DefaultJmsListenerContainerFactoryConfigurer configurer) {

        final DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrency("1-5");
        configurer.configure(factory, connectionFactory);
        // container.setClientId("aId..."); lets spring generate a random ID
        return factory;
    }

    @Bean
    public JmsListenerContainerFactory<?> redColorContainer(ActiveMQConnectionFactory connectionFactory,
        DefaultJmsListenerContainerFactoryConfigurer configurer) {

        // necessary when post serializable objects (you can set it at application.properties)
        connectionFactory.setTrustedPackages(Arrays.asList(Color.class.getPackage().getName()));

        final DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrency("1-2");
        configurer.configure(factory, connectionFactory);
        return factory;
    }

}

public class Color implements Serializable {

    public static final Color WHITE = new Color("WHITE");
    public static final Color BLUE = new Color("BLUE");
    public static final Color RED = new Color("RED");

    private String name;
    private long id;

    // CONSTRUCTORS, GETTERS AND SETTERS
}

推荐