Writing to two Kafka topics in a single transaction with Spring Kafka
2022-09-04 22:29:30
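I'm trying to work out whether Kafka's transaction feature can be used to write to two topics within a single transaction.
I know the typical scenario for Kafka transactions is the consumer-producer pattern, and that seems to be well documented.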
What I've tried:
- Created a KafkaTransactionManager per topic
- Configured each ProducerFactory to use its respective transaction manager
- Created a ChainedTransactionManager with the two KafkaTransactionManager instances
- Created a KafkaTemplate per topic
I then used the @Transactional(transactionManager = "chainedTx") annotation on a method that does the following:
template1.send("topic1", "example payload");
template2.send("topic2", "example payload");
This doesn't work. The KafkaTemplate is transactional, but when the send() method is called there is no transaction in progress, and I get an IllegalStateException.
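One possible cause worth checking (an assumption, not something verified against this setup): Spring binds the transactional resource to the specific ProducerFactory instance held by the KafkaTransactionManager, and the KafkaTemplate looks the transaction up under its own ProducerFactory instance. In the configuration classes in this post, factory() is a private helper and not a @Bean, so every call builds a new factory, and the manager and the template never share one. The sketch below is a simplified, standard-library-only model of that lookup. Every class in it (FakeProducerFactory, TxRegistry, FakeTransactionManager, FakeTemplate) is a hypothetical stand-in, not Spring's actual code.

```java
import java.util.IdentityHashMap;
import java.util.Map;

public class FactoryIdentityDemo {

    /** Hypothetical stand-in for a producer factory; only its identity matters here. */
    static final class FakeProducerFactory { }

    /** Hypothetical per-thread registry of transactions, keyed by factory instance. */
    static final class TxRegistry {
        private static final ThreadLocal<Map<Object, String>> RESOURCES =
                ThreadLocal.withInitial(IdentityHashMap::new);

        static void bind(Object key, String tx) { RESOURCES.get().put(key, tx); }
        static String lookup(Object key) { return RESOURCES.get().get(key); }
        static void clear() { RESOURCES.get().clear(); }
    }

    /** Hypothetical transaction manager: binds a transaction under its own factory. */
    static final class FakeTransactionManager {
        private final FakeProducerFactory factory;
        FakeTransactionManager(FakeProducerFactory factory) { this.factory = factory; }
        void begin() { TxRegistry.bind(factory, "tx"); }
    }

    /** Hypothetical template: refuses to send unless a transaction is bound to its own factory. */
    static final class FakeTemplate {
        private final FakeProducerFactory factory;
        FakeTemplate(FakeProducerFactory factory) { this.factory = factory; }
        String send(String topic, String value) {
            if (TxRegistry.lookup(factory) == null) {
                throw new IllegalStateException("No transaction is in process");
            }
            return topic + ":" + value;
        }
    }

    /** Mirrors the private factory() helper in the question: a new instance on every call. */
    static FakeProducerFactory factory() { return new FakeProducerFactory(); }

    /** Manager and template each call factory(), so the lookup misses and send() throws. */
    public static boolean sendFailsWithSeparateFactories() {
        TxRegistry.clear();
        FakeTransactionManager tm = new FakeTransactionManager(factory());
        FakeTemplate template = new FakeTemplate(factory());
        tm.begin();
        try {
            template.send("topic1", "example payload");
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    /** Manager and template share one factory instance, so both sends find the transaction. */
    public static boolean sendWorksWithSharedFactory() {
        TxRegistry.clear();
        FakeProducerFactory shared = factory();
        FakeTransactionManager tm = new FakeTransactionManager(shared);
        FakeTemplate template = new FakeTemplate(shared);
        tm.begin();
        return "topic1:example payload".equals(template.send("topic1", "example payload"))
                && "topic2:example payload".equals(template.send("topic2", "example payload"));
    }

    public static void main(String[] args) {
        System.out.println("separate factories fail: " + sendFailsWithSeparateFactories());
        System.out.println("shared factory works: " + sendWorksWithSharedFactory());
    }
}
```

If this model matches what is happening, exposing the ProducerFactory as a single shared @Bean and passing that same instance to both the KafkaTransactionManager and the KafkaTemplate would be the thing to try. One transactional template can send to both topics.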
I was going to try the KafkaTemplate.executeInTransaction() method, but the Javadoc states that it is only for local transactions, so it doesn't appear to fit my needs.
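My next step is to try Kafka's Producer API directly to see whether this pattern works, but I'd appreciate it if someone could tell me whether I'm wasting my time and Kafka simply doesn't support writing to multiple topics transactionally.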
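I did find this statement in Confluent's blog post on Kafka's transaction support:
Transactions enable atomic writes to multiple Kafka topics and partitions...
But I haven't found any examples that demonstrate it.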
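For illustration, the call sequence for an atomic write to two topics through one transactional producer might look like the sketch below. To keep it runnable without a broker, it is written against a hypothetical TxProducer interface whose method names mirror the transactional calls on Kafka's Producer (beginTransaction, send, commitTransaction, abortTransaction). InMemoryTxProducer, the topic names, and the void return type of send are assumptions made for this sketch only.

```java
import java.util.ArrayList;
import java.util.List;

public class TwoTopicTransactionSketch {

    /** Hypothetical minimal interface; names mirror the transactional calls on Kafka's Producer. */
    interface TxProducer {
        void beginTransaction();
        void send(String topic, String value);
        void commitTransaction();
        void abortTransaction();
    }

    /** In-memory stand-in: records become visible in 'committed' only after commit. */
    static final class InMemoryTxProducer implements TxProducer {
        private final List<String> pending = new ArrayList<>();
        final List<String> committed = new ArrayList<>();

        @Override public void beginTransaction() { pending.clear(); }

        @Override public void send(String topic, String value) {
            if (value == null) {
                throw new IllegalArgumentException("null payload"); // simulated send failure
            }
            pending.add(topic + "=" + value);
        }

        @Override public void commitTransaction() {
            committed.addAll(pending);
            pending.clear();
        }

        @Override public void abortTransaction() { pending.clear(); }
    }

    /** Sends to both topics through ONE producer: either both records commit or neither does. */
    public static boolean writeBoth(TxProducer producer, String control, String payload) {
        producer.beginTransaction();
        try {
            producer.send("topic1", control);
            producer.send("topic2", payload);
            producer.commitTransaction();
            return true;
        } catch (RuntimeException e) {
            producer.abortTransaction(); // discard everything sent in this transaction
            return false;
        }
    }

    /** Helper for the sketch: runs writeBoth on a fresh in-memory producer and returns what committed. */
    public static List<String> committedAfter(String control, String payload) {
        InMemoryTxProducer producer = new InMemoryTxProducer();
        writeBoth(producer, control, payload);
        return producer.committed;
    }

    public static void main(String[] args) {
        System.out.println(committedAfter("control", "payload"));
        System.out.println(committedAfter("control", null));
    }
}
```

With the real client, the producer also needs a transactional.id in its configuration and a one-time initTransactions() call before the first beginTransaction(). Some failures, such as a ProducerFencedException, call for closing the producer instead of aborting.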
Config for the first producer
@Configuration
public class ControlProducerConfig {
@Bean("controlTransactionManager")
KafkaTransactionManager<String, String> transactionManager() {
return new KafkaTransactionManager<>(factory());
}
@Bean("controlTemplate")
public KafkaTemplate<String, String> template() {
return new KafkaTemplate<>(factory());
}
private ProducerFactory<String, String> factory() {
DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(config());
factory.setTransactionIdPrefix("abcd");
return factory;
}
private Map<String, Object> config() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx.xxx.xxx.xxx");
props.put("schema.registry.url", "http://xxx.xxx.xxx.xxx/");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
// you can't set idempotence without setting max in flight requests to <= 5
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "1234");
return props;
}
}
Config for the second producer
@Configuration
public class PayloadProducerConfig {
@Bean("payloadTransactionManager")
KafkaTransactionManager<String, String> transactionManager() {
return new KafkaTransactionManager<>(factory());
}
@Bean("payloadTemplate")
public KafkaTemplate<String, String> template() {
return new KafkaTemplate<>(factory());
}
private ProducerFactory<String, String> factory() {
DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(config());
factory.setTransactionIdPrefix("abcd");
return factory;
}
private Map<String, Object> config() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx.xxx.xxx.xxx");
props.put("schema.registry.url", "http://xxx.xxx.xxx.xxx/");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
// you can't set idempotence without setting max in flight requests to <= 5
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "1234");
return props;
}
}
Main class
@EnableTransactionManagement
@SpringBootApplication
public class App {
public static void main(String[] args) {
SpringApplication.run(App.class, args);
}
@Bean("chainedTx")
public ChainedTransactionManager chained(
@Qualifier("controlTransactionManager") KafkaTransactionManager controlTransactionManager,
@Qualifier("payloadTransactionManager") KafkaTransactionManager payloadTransactionManager) {
return new ChainedTransactionManager(controlTransactionManager, payloadTransactionManager);
}
@Bean OnStart onStart(PostTwoMessages postTwoMessages) {
return new OnStart(postTwoMessages);
}
@Bean
public PostTwoMessages postTwoMessages(
@Qualifier("controlTemplate") KafkaTemplate<String, String> controlTemplate,
@Qualifier("controlTemplate") KafkaTemplate<String, String> payloadTemplate) {
return new PostTwoMessages(controlTemplate, payloadTemplate);
}
}
On application start
public class OnStart implements ApplicationListener<ApplicationReadyEvent> {
private PostTwoMessages postTwoMessages;
public OnStart(PostTwoMessages postTwoMessages) {
this.postTwoMessages = postTwoMessages;
}
@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
postTwoMessages.run();
}
}
Post two messages
public class PostTwoMessages {
private final KafkaTemplate<String, String> controlTemplate;
private final KafkaTemplate<String, String> payloadTemplate;
public PostTwoMessages(
@Qualifier("controlTemplate") KafkaTemplate<String, String> controlTemplate,
@Qualifier("payloadTemplate") KafkaTemplate<String, String> payloadTemplate) {
this.controlTemplate = controlTemplate;
this.payloadTemplate = payloadTemplate;
}
@Transactional(transactionManager = "chainedTx")
public void run() {
UUID uuid = UUID.randomUUID();
controlTemplate.send("private.s0869y.trx.model3a", "control: " + uuid);
payloadTemplate.send("private.s0869y.trx.model3b", "payload: " + uuid);
}
}