我们有相同的用例。如前所述,在运行时重新创建Bean的主要问题之一是如何更新已经注入的引用。这是主要的挑战。
为了解决这个问题,我使用了Java的AtomicReference<>类。我没有直接注入bean,而是将其包装为AtomicReference,然后注入它。由于 AtomicReference 包装的对象可以以线程安全的方式重置,因此在检测到数据库更改时,我可以使用它来更改基础对象。以下是此模式的配置/用法示例:
@Configuration
public class KafkaConfiguration {
private static final String KAFKA_SERVER_LIST = "kafka.server.list";
private static AtomicReference<String> serverList;
@Resource
MyService myService;
@PostConstruct
public void init() {
serverList = new AtomicReference<>(myService.getPropertyValue(KAFKA_SERVER_LIST));
}
// Just a helper method to check if the value for the server list has changed
// Not a big fan of the static usage but needed a way to compare the old / new values
public static boolean isRefreshNeeded() {
MyService service = Registry.getApplicationContext().getBean("myService", MyService.class);
String newServerList = service.getPropertyValue(KAFKA_SERVER_LIST);
// Arguably serverList does not need to be Atomic for this usage as this is executed
// on a single thread
if (!StringUtils.equals(serverList.get(), newServerList)) {
serverList.set(newServerList);
return true;
}
return false;
}
public ProducerFactory<String, String> kafkaProducerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.CLIENT_ID_CONFIG, "...");
// Here we are pulling the value for the serverList that has been set
// see the init() and isRefreshNeeded() methods above
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, serverList.get());
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
@Lazy
public AtomicReference<KafkaTemplate<String, String>> kafkaTemplate() {
KafkaTemplate<String, String> template = new KafkaTemplate<>(kafkaProducerFactory());
AtomicReference<KafkaTemplate<String, String>> ref = new AtomicReference<>(template);
return ref;
}
}
然后,我将豆子注射到需要的地方,例如
public MyClass1 {
@Resource
AtomicReference<KafkaTemplate<String, String>> kafkaTemplate;
...
}
public MyClass2 {
@Resource
AtomicReference<KafkaTemplate<String, String>> kafkaTemplate;
...
}
在一个单独的类中,我运行一个调度程序线程,该线程在应用程序上下文启动时启动。该类如下所示:
class Manager implements Runnable {
private ScheduledExecutorService scheduler;
public void start() {
scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(this, 0, 120, TimeUnit.SECONDS);
}
public void stop() {
scheduler.shutdownNow();
}
@Override
public void run() {
try {
if (KafkaConfiguration.isRefreshNeeded()) {
AtomicReference<KafkaTemplate<String, String>> kafkaTemplate =
(AtomicReference<KafkaTemplate<String, String>>) Registry.getApplicationContext().getBean("kafkaTemplate");
// Get new instance here. This will have the new value for the server list
// that was "refreshed"
KafkaConfiguration config = new KafkaConfiguration();
// The set here replaces the wrapped objet in a thread safe manner with the new bean
// and thus all injected instances now use the newly created object
kafkaTemplate.set(config.kafkaTemplate().get());
}
} catch (Exception e){
} finally {
}
}
}
如果这是我主张做的事情,我仍然在围栏上,因为它确实有轻微的气味。但是在有限和谨慎的使用中,它确实为所述用例提供了另一种方法。请注意,从 Kafka 的角度来看,此代码示例将使旧生产者保持打开状态。实际上,需要正确地对旧生产者进行 flush() 调用以关闭它。但这不是这个例子想要展示的。