Spring Batch async processor configuration for best performance
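I am having trouble creating an asynchronous processor in Spring Batch. My processor receives an ID from the reader and builds an object from the response to a SOAP call. Sometimes a single input (ID) needs, for example, 60-100 SOAP calls, and sometimes only 1. I tried a multi-threaded step that processed, for example, 50 inputs at a time, but it did not help: 49 threads finished their work within 1 second and then sat blocked, waiting for the one thread that was making 60-100 SOAP calls. I now use AsyncItemProcessor + AsyncItemWriter, but this solution is too slow for me. Because my input (IDs) is large, around 25k items read from the DB, I would like to start ~50-100 inputs at a time.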
Here is my configuration:
@Configuration
public class BatchConfig {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DatabaseConfig databaseConfig;

    @Value(value = "classpath:Categories.txt")
    private Resource categories;

    @Bean
    public Job processJob() throws Exception {
        return jobBuilderFactory.get("processJob")
                .incrementer(new RunIdIncrementer())
                .listener(listener())
                .flow(orderStep1())
                .end()
                .build();
    }

    @Bean
    public Step orderStep1() throws Exception {
        return stepBuilderFactory.get("orderStep1")
                .<Category, CategoryDailyResult>chunk(1)
                .reader(reader())
                .processor(asyncItemProcessor())
                .writer(asyncItemWriter())
                .taskExecutor(taskExecutor())
                .build();
    }

    @Bean
    public JobExecutionListener listener() {
        return new JobCompletionListener();
    }

    @Bean
    public ItemWriter asyncItemWriter() {
        AsyncItemWriter<CategoryDailyResult> asyncItemWriter = new AsyncItemWriter<>();
        asyncItemWriter.setDelegate(itemWriter());
        return asyncItemWriter;
    }

    @Bean
    public ItemWriter<CategoryDailyResult> itemWriter() {
        return new Writer();
    }

    @Bean
    public ItemProcessor asyncItemProcessor() {
        AsyncItemProcessor<Category, CategoryDailyResult> asyncItemProcessor = new AsyncItemProcessor<>();
        asyncItemProcessor.setDelegate(itemProcessor());
        asyncItemProcessor.setTaskExecutor(taskExecutor());
        return asyncItemProcessor;
    }

    @Bean
    public ItemProcessor<Category, CategoryDailyResult> itemProcessor() {
        return new Processor();
    }

    @Bean
    public TaskExecutor taskExecutor() {
        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
        taskExecutor.setConcurrencyLimit(50);
        return taskExecutor;
    }

    @Bean(destroyMethod = "")
    public ItemReader<Category> reader() throws Exception {
        String query = "select c from Category c where not exists elements(c.children)";
        JpaPagingItemReader<Category> reader = new JpaPagingItemReader<>();
        reader.setSaveState(false);
        reader.setQueryString(query);
        reader.setEntityManagerFactory(databaseConfig.entityManagerFactory().getObject());
        reader.setPageSize(1);
        return reader;
    }
}
How can I improve my application? Am I doing something wrong? Any feedback is welcome ;)
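@Edit: For input IDs 1 to 100, for example, I want 50 threads executing the processor. I want them not to block each other: while Thread1 processes input "1" for 2 minutes, I want Thread2 to process inputs "2", "8", "64", which are small and finish within a few seconds.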
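To make the behaviour I am after concrete, here is a minimal plain-Java sketch without Spring Batch. The class IndependentItemsSketch, the method names, and the sleep-based cost model are hypothetical stand-ins for my real processor. Every item is submitted as its own task to a fixed pool, so a slow item should occupy only one thread while the others keep taking new items.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class IndependentItemsSketch {

    // Hypothetical stand-in for the SOAP-calling processor: the cost of an
    // item is simulated by sleeping for the given number of milliseconds.
    static String process(int id, long costMillis) {
        try {
            Thread.sleep(costMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while processing " + id, e);
        }
        return "result-" + id;
    }

    // Submits every item as a separate task. A slow item holds one pool
    // thread; the remaining threads continue with the queued items.
    static List<String> processAll(int[] ids, long[] costs, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<CompletableFuture<String>> futures = new ArrayList<>();
            for (int i = 0; i < ids.length; i++) {
                final int id = ids[i];
                final long cost = costs[i];
                futures.add(CompletableFuture.supplyAsync(() -> process(id, cost), pool));
            }
            // Collect results in submission order once all items are in flight.
            List<String> results = new ArrayList<>();
            for (CompletableFuture<String> future : futures) {
                results.add(future.join());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Input "1" is slow; "2", "8" and "64" are cheap.
        int[] ids = {1, 2, 8, 64};
        long[] costs = {200, 10, 10, 10};
        System.out.println(processAll(ids, costs, 2));
    }
}
```

With two threads, input "1" keeps one thread busy while the second thread works through "2", "8" and "64" one after another. This is the scheduling I would like the step to achieve.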
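@Edit2: My goal: I have 25k IDs in the database. I read them with JpaPagingItemReader, and every ID is processed by the processor. The items are independent of each other. For each ID I make a SOAP call 0-100 times in a loop, then I create an object that I pass to the Writer and save in the database. How can I get the best performance for this kind of task?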
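As a simplified model of this read, process, write pipeline, here is a plain-Java sketch. It ignores the step-level task executor, and the class ChunkedPipelineSketch, its methods, and the rule "calls = id % 101" are all hypothetical. Each chunk's items are processed concurrently, and the writer stand-in waits for the whole chunk before saving it, so the chunk size bounds how many items are in flight. With a chunk size of 1 this model handles one item at a time.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ChunkedPipelineSketch {

    // Hypothetical processor: pretends to make `calls` remote calls for one
    // ID and returns a summary of the form "id:numberOfResponses".
    static String process(int id, int calls) {
        int responses = 0;
        for (int i = 0; i < calls; i++) {
            responses++; // placeholder for one SOAP round trip
        }
        return id + ":" + responses;
    }

    // Reads IDs chunk by chunk. All items of a chunk run concurrently, and
    // the "writer" waits for the whole chunk before saving it.
    static List<List<String>> run(List<Integer> ids, int chunkSize, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        List<List<String>> written = new ArrayList<>();
        try {
            for (int start = 0; start < ids.size(); start += chunkSize) {
                int end = Math.min(start + chunkSize, ids.size());
                List<CompletableFuture<String>> futures = new ArrayList<>();
                for (int id : ids.subList(start, end)) {
                    int calls = id % 101; // assumed: 0-100 calls per ID
                    futures.add(CompletableFuture.supplyAsync(() -> process(id, calls), pool));
                }
                List<String> batch = new ArrayList<>();
                for (CompletableFuture<String> future : futures) {
                    batch.add(future.join()); // blocks until this item is done
                }
                written.add(batch); // stands in for Writer saving to the database
            }
        } finally {
            pool.shutdown();
        }
        return written;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(1, 2, 3, 4, 5), 2, 4));
    }
}
```

In this model the slowest item of a chunk delays the write of that chunk, which is why I suspect the chunk size and page size of 1 in my configuration matter, although I have not verified that against the real step.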