尝试在Spring Batch中实现Process Indicator模式

unftdfkk  于 5个月前  发布在  Spring
关注(0)|答案(1)|浏览(57)

我正在努力寻找一个Process Indicator Pattern示例。到目前为止,我有一个源STUDENTS表,其中有一个STATUS字段来指示记录是否已经处理。我正在使用任务执行器进行多线程处理。
在我的编写器中,我将处理过的记录插入到一个新的PROCESSED_STUDENTS表中,并将源STUDENTS表中处理过的记录的状态更新为Processed,我在一个事务块中执行这两个操作,以防任何失败的情况下恢复更改。
这不适用于JdbcPagingItemReader,因为有些记录在处理结束时未被处理。
有人能告诉我我错过了什么吗?

读者

@Bean
@StepScope
public ItemReader<SourceData> reader(DataSource dataSource) {
        
    Map<String, Object> parameterValues = new HashMap<>();
    parameterValues.put("status", "ToBeProcessed");     
    
    JdbcPagingItemReader<SourceData> reader = new JdbcPagingItemReader<>();
    reader.setName("Oracle_RCP");
    reader.setDataSource(dataSource);
    reader.setRowMapper(SourceData.rowMapper());        
    reader.setParameterValues(parameterValues);
    reader.setPageSize(100);        
    reader.setQueryProvider(getQueryProvider(new OraclePagingQueryProvider(), "SELECT ID, NAME, CREATED_TIME", "FROM STUDENTS", "WHERE STATUS = :status", CREATED_TIME, Order.ASCENDING));
    reader.setSaveState(false);     
    
    try {
        reader.afterPropertiesSet();
    } catch (Exception e) {
        log.error(e.getMessage(), e.getStackTrace());
    }
    
    return reader;
}

public PagingQueryProvider getQueryProvider(AbstractSqlPagingQueryProvider queryProvider, String select, String from, String where, String sortKey, Order order) {
    queryProvider.setSelectClause(select);
    queryProvider.setFromClause(from);
    
    if (where != null) {
        queryProvider.setWhereClause(where);
    }
    
    Map<String, Order> sortConfiguration = new HashMap<>();
    sortConfiguration.put(sortKey, order);
    queryProvider.setSortKeys(sortConfiguration);
    
    return queryProvider;
}

字符串

处理器

@Bean
@StepScope
public ItemProcessor<SourceData, OutData> processor(
        @Value("#{jobParameters['processDate']}") String processDate) {
    return new CustomItemProcessor(processDate);        
}

作家

@SuppressWarnings("unchecked")
@Bean
public ItemWriter<OutData> writer(Utils utils) {        
    return OutDataList -> utils.batchOperation((List<OutData>) OutDataList, chunk); 
}

作业、步骤和任务执行器

@Bean("MainJob")
@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public Job mainJob(JobBuilderFactory jobBuilderFactory, Step step) {        
    return jobBuilderFactory.get("mainJob")   
      .incrementer(new RunIdIncrementer())          
      .flow(step)
      .end()
      .build();
}

@Bean
public Step step(StepBuilderFactory stepBuilderFactory) {        
    return stepBuilderFactory.get("step")               
            .<SourceData, OutData> chunk(chunk)
            .reader(reader(null))
            .processor(processor(null))
            .writer(writer(null))   
            .taskExecutor(taskExecutor())
            .build();
}

@Bean
@StepScope
public ThreadPoolTaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setMaxPoolSize(50);
    executor.setCorePoolSize(25);       
    executor.setQueueCapacity(25);
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    executor.setThreadNamePrefix("MultiThreaded-Executor");
    return executor;
}


batchOperation方法,用于在PROCESSED_STUDENTS表中插入已处理的记录并更新STUDENTS源表中已处理的记录:

public void batchOperation(List<OutData> outDataList, int batchSize) {
    try (
            Connection con = jdbcTemplate.getDataSource().getConnection(); 
            PreparedStatement psInsert = con.prepareStatement("INSERT INTO PROCESSED_STUDENTS (ID, NAME) VALUES (?, ?)");
            PreparedStatement psUpdate = con.prepareStatement("UPDATE STUDENTS SET STATUS = 'Processed' WHERE ID = ?");) {
        
        // Starting transaction block
        con.setAutoCommit(false);
        
        int i = 0;
        for (OutData argument : outDataList) {          
            psInsert.setLong(1, argument.getId());
            psInsert.setString(2, argument.getName());
                        
            psUpdate.setLong(1, argument.getId());
                        
            psInsert.addBatch();
            psUpdate.addBatch();
                        
            i++;
                        
            if (i % batchSize == 0) {
                psInsert.executeBatch();
                psUpdate.executeBatch();
            }               
        }
        
        // Executing remaining batch if total record count is an odd number
        psInsert.executeBatch();
        psUpdate.executeBatch();
        
        // End transaction block, commit changes
        con.commit();

        // Setting it back to default true
        con.setAutoCommit(true);
        
    } catch (Exception e) {
        log.error(e.getMessage());
    }
}

gajydyqb

gajydyqb1#

在这种情况下,您不能使用JdbcPagingItemReader,因为您正在更改用作筛选器(状态)的参数。分页将不起作用。
使用JdbcCursorItemReader代替。
标签:(Spring Batch) Not all records are proccessed

相关问题