弹簧批处理异步处理器配置,实现最佳性能

2022-09-03 16:10:37

我在Spring Batch中创建异步处理器时遇到问题。我的处理器正在根据调用的响应获取和创建对象。有时对于1个输入(),必须有例如60-100个呼叫,有时只有1个。我试图进行多线程步骤,它一次处理例如50个输入,但它是无用的,因为49个线程在1秒内完成了它们的工作并被阻止了,等待这个线程执行60-100个调用。现在我使用+,但这个解决方案对我来说很慢。由于我的输入()很大,从DB读取大约25k个项目,我想一次开始~50-100个输入。IDreaderSOAPIDSOAPSOAPAsyncItemProcessorAsyncItemWriterIDs

这是我的配置:

@Configuration
public class BatchConfig {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;
    @Autowired
    public StepBuilderFactory stepBuilderFactory;
    @Autowired
    private DatabaseConfig databaseConfig;
    @Value(value = "classpath:Categories.txt")
    private Resource categories;

    @Bean
    public Job processJob() throws Exception {
        return jobBuilderFactory.get("processJob").incrementer(new RunIdIncrementer()).listener(listener()).flow(orderStep1()).end().build();
    }

    @Bean
    public Step orderStep1() throws Exception {
        return stepBuilderFactory.get("orderStep1").<Category, CategoryDailyResult>chunk(1).reader(reader()).processor(asyncItemProcessor()).writer(asyncItemWriter()).taskExecutor(taskExecutor()).build();
    }

    @Bean
    public JobExecutionListener listener() {
        return new JobCompletionListener();
    }


    @Bean
    public ItemWriter asyncItemWriter() {
        AsyncItemWriter<CategoryDailyResult> asyncItemWriter = new AsyncItemWriter<>();
        asyncItemWriter.setDelegate(itemWriter());
        return asyncItemWriter;
    }

    @Bean
    public ItemWriter<CategoryDailyResult> itemWriter(){
        return new Writer();
    }

    @Bean
    public ItemProcessor asyncItemProcessor() {
        AsyncItemProcessor<Category, CategoryDailyResult> asyncItemProcessor = new AsyncItemProcessor<>();
        asyncItemProcessor.setDelegate(itemProcessor());
        asyncItemProcessor.setTaskExecutor(taskExecutor());
        return asyncItemProcessor;
    }

    @Bean
    public ItemProcessor<Category, CategoryDailyResult> itemProcessor(){
        return new Processor();
    }

    @Bean
    public TaskExecutor taskExecutor(){
        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
        taskExecutor.setConcurrencyLimit(50);
        return taskExecutor;
    }

    @Bean(destroyMethod = "")
    public ItemReader<Category> reader() throws Exception {
        String query = "select c from Category c where not exists elements(c.children)";

        JpaPagingItemReader<Category> reader = new JpaPagingItemReader<>();
        reader.setSaveState(false);
        reader.setQueryString(query);
        reader.setEntityManagerFactory(databaseConfig.entityManagerFactory().getObject());
        reader.setPageSize(1);

        return reader;
    }
}

如何提升我的申请?也许我做错了什么?任何反馈欢迎;)

@Edit:对于ID的输入:1到100,例如,我想要50个正在执行处理器的线程。我希望它们不要相互阻塞:Thread1进程输入“1”2分钟,此时我希望Thread2处理输入“2”,“8”,“64”,这些输入很小,并在几秒钟内执行。

@Edit2:我的目标是:我在数据库中有25k个ID,我用它们读取它们,每个ID都由处理器处理。每个项目都是相互独立的。对于每个ID,我在循环中调用0-100次,然后我创建对象,我传递给并保存在数据库中。如何获得此类任务的最佳性能?JpaPagingItemReaderSOAPWriter


答案 1

您应该对作业进行分区。添加一个分区步骤,如下所示:

@Bean
public Step partitionedOrderStep1(Step orderStep1) {
    return stepBuilder.get("partitionedOrderStep1")
            .partitioner(orderStep1)
            .partitioner("orderStep1", new SimplePartitioner())
            .taskExecutor(taskExecutor())
            .gridSize(10)  //Number of concurrent partitions
            .build();
}

然后在作业定义中使用该步骤。.gridSize() 调用配置要并发执行的分区数。如果任何读取器、处理器或编写器对象是有状态的,则需要使用@StepScope对其进行批注。


答案 2

@KCrookedHand:我处理过类似的情况,我不得不阅读几千个,并且需要调用SOAP服务(我已将其注入 itemReader)来匹配条件。

我的配置如下所示,基本上您有几个选项来实现并行处理,其中两个是“分区”和“客户端服务器”方法。我之所以选择分区,是因为根据我的数据,我可以更好地控制我需要多少个分区。

请像@MichaelMinella提到的那样,使用任务let进行下面的步骤执行,如果它适用。

<batch:step id="notificationMapper">
            <batch:partition partitioner="partitioner"
                step="readXXXStep" />
        </batch:step>
    </batch:job>


    <batch:step id="readXXXStep">
        <batch:job ref="jobRef" job-launcher="jobLauncher"
            job-parameters-extractor="jobParameterExtractor" />
    </batch:step>

    <batch:job id="jobRef">

        <batch:step id="dummyStep" next="skippedItemsDecision">
            <batch:tasklet ref="dummyTasklet"/>
            <batch:listeners>
                <batch:listener ref="stepListener" />
            </batch:listeners>
        </batch:step>

        <batch:step id="xxx.readItems" next="xxx.then.finish">
            <batch:tasklet>
                <batch:chunk reader="xxxChunkReader" processor="chunkProcessor"
                    writer="itemWriter" commit-interval="100">
                </batch:chunk>
            </batch:tasklet>
            <batch:listeners>
                <batch:listener ref="taskletListener" />
            </batch:listeners>
        </batch:step>

        ...