至少在ActiveMQ中,你想要的是完全支持的,他的名字是VirtualTopic。
概念是:
- 您创建一个虚拟主题(只需使用前缀创建一个主题)例如。
VirtualTopic.
VirtualTopic.Color
- 创建一个订阅此虚拟主题的使用者,以匹配此模式,例如。,这样做,Activemq将创建一个具有该名称的队列,该队列将订阅,然后发布到此虚拟主题的每条消息都将传递到client1队列,请注意,它的工作方式类似于rabbitmq交换。
Consumer.<clientName>.VirtualTopic.<topicName>
Consumer.client1.VirtualTopic.Color
VirtualTopic.Color
- 您已经完成了,现在您可以像使用每个队列一样使用 client1 队列,具有许多使用者,DLQ,自定义的重新传递策略等。
- 在这一点上我想你已经明白了,你可以创建 client2,client3 以及你想要多少订阅者,他们都会收到一个发布到的消息的副本
VirtualTopic.Color
这里的代码
@Component
public class ColorReceiver {
private static final Logger LOGGER = LoggerFactory.getLogger(MailReceiver.class);
@Autowired
private JmsTemplate jmsTemplate;
// simply generating data to the topic
long id=0;
@Scheduled(fixedDelay = 500)
public void postMail() throws JMSException, IOException {
final Color colorName = new Color[]{Color.BLUE, Color.RED, Color.WHITE}[new Random().nextInt(3)];
final Color color = new Color(++id, colorName.getName());
final ActiveMQObjectMessage message = new ActiveMQObjectMessage();
message.setObject(color);
message.setProperty("color", color.getName());
LOGGER.info("status=color-post, color={}", color);
jmsTemplate.convertAndSend(new ActiveMQTopic("VirtualTopic.color"), message);
}
/**
* Listen all colors messages
*/
@JmsListener(
destination = "Consumer.client1.VirtualTopic.color", containerFactory = "colorContainer"
selector = "color <> 'RED'"
)
public void genericReceiveMessage(Color color) throws InterruptedException {
LOGGER.info("status=GEN-color-receiver, color={}", color);
}
/**
* Listen only red colors messages
*
* the destination ClientId have not necessary exists (it means that his name can be a fancy name), the unique requirement is that
* the containers clientId need to be different between each other
*/
@JmsListener(
// destination = "Consumer.redColorContainer.VirtualTopic.color",
destination = "Consumer.client1.VirtualTopic.color",
containerFactory = "redColorContainer", selector = "color='RED'"
)
public void receiveMessage(ObjectMessage message) throws InterruptedException, JMSException {
LOGGER.info("status=RED-color-receiver, color={}", message.getObject());
}
/**
* Listen all colors messages
*/
@JmsListener(
destination = "Consumer.client2.VirtualTopic.color", containerFactory = "colorContainer"
)
public void genericReceiveMessage2(Color color) throws InterruptedException {
LOGGER.info("status=GEN-color-receiver-2, color={}", color);
}
}
@SpringBootApplication
@EnableJms
@EnableScheduling
@Configuration
public class Config {
/**
* Each @JmsListener declaration need a different containerFactory because ActiveMQ requires different
* clientIds per consumer pool (as two @JmsListener above, or two application instances)
*
*/
@Bean
public JmsListenerContainerFactory<?> colorContainer(ActiveMQConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
final DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrency("1-5");
configurer.configure(factory, connectionFactory);
// container.setClientId("aId..."); lets spring generate a random ID
return factory;
}
@Bean
public JmsListenerContainerFactory<?> redColorContainer(ActiveMQConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
// necessary when post serializable objects (you can set it at application.properties)
connectionFactory.setTrustedPackages(Arrays.asList(Color.class.getPackage().getName()));
final DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrency("1-2");
configurer.configure(factory, connectionFactory);
return factory;
}
}
public class Color implements Serializable {
public static final Color WHITE = new Color("WHITE");
public static final Color BLUE = new Color("BLUE");
public static final Color RED = new Color("RED");
private String name;
private long id;
// CONSTRUCTORS, GETTERS AND SETTERS
}