您可以在 https://github.com/HabeebCycle/spring-cloud-stream-implemention 查看我的存储库以获取演示项目
它展示了如何使用RabbitMQ和Kafka为供应商和消费者实现云流,以及两种服务的端到端测试。
对于您的情况:在您的供应商豆中,做这样的事情:
@Bean
public Supplier<DataEvent<String, User>> savedMessage() {
return () -> {
return null;
};
}
Spring在功能包中提供了可用于发送事件的StreamBridge。假设您有一个保存到数据库中的服务层。首先要做的是创建一个由构造函数绑定注入的自动连接的 StreamBridge,并使用它来发送消息,如下所示。请注意,供应商的名称应为输出的绑定名称,如文档中所述。
private final StreamBridge stream;
private final UserRepository repo;
// Store your topic/binding name as the supplier name as follows
private static final String SUPPLIER_BINDING_NAME = "savedMessage-out-0"
public UserService(UserRepository repo, StreamBridge stream) {
this.repo = repo;
this.stream = stream;
}
// Your save method
public void saveUser(User user) {
// Do some checking...
//save your record
User user = repo.save(user);
//check if user is saved or not null
//create your message event (Assuming you have a DataEvent class)
DataEvent<String, User> event = new DataEvent<>("User Saved", user);
boolean sent = stream.send(SUPPLIER_BINDING_NAME, event));
// Check the repo above for proper implementation.
}
对于使用者实现,请查看上面的存储库。
这里还有一个实现,尽管是用 Kotlin 编写的 https://piotrminkowski.com/2020/06/05/introduction-to-event-driven-microservices-with-spring-cloud-stream/
你也可以在GitHub上查看Spring最近的项目,https://github.com/spring-cloud/spring-cloud-stream-samples/