我们有一个场景,每当一个作业执行STARTED Status时,服务器可能会崩溃。在这种情况下,为了重新启动我从这里理解的batch Spring Batch resume after server's failure,需要将batch_job_execution和batch_step_execution中的status和end_time列值从STARTED更新为FAQs,以便JobOperator.restart(jobExecutionId)API工作。
但是调用它会出现“NoSuchJobException:No job configuration with the name [] was registered”,因为JobOperator中的jobRegistryMap是空的。
现在,如何在服务器重启时重新填充JobRegistry中的这些作业信息,以便我们可以调用JobOperator.restart(jobExecutionId)。
下面是我的代码
/** The Test properties. */
@Autowired
private TestProperties TestProperties;
@Primary
@Bean(name = "TestDataSource")
public DataSource batchDataSource() {
DataSource TestDBSrc = DataSourceBuilder.create().username(getUsername()).password(getPassword()).url(getUrl()).build();
if (TestDBSrc != null && TestDBSrc instanceof HikariDataSource) {
@SuppressWarnings("resource")
HikariDataSource hikariDatsource = (HikariDataSource) TestDBSrc;
hikariDatsource.setSchema(getSchema());
}
return TestDBSrc;
}
private String getSchema() {
return TestProperties.getValue("spring.datasource.hikari.schema", "");
}
private String getUsername() {
return TestProperties.getValue("spring.datasource.username", "");
}
private String getPassword() {
return TestProperties.getValue("spring.datasource.password", "");
}
private String getUrl() {
return TestProperties.getValue("spring.datasource.jdbc-url", "");
}
@Bean(name = "transactionManager")
public JdbcTransactionManager batchTransactionManager(@Qualifier("TestDataSource") DataSource dataSource) {
return new JdbcTransactionManager(dataSource);
}
@Bean(name = "TestBatchJobRepository")
public JobRepository jobRepository(@Qualifier("TestDataSource") DataSource batchDataSource,
@Qualifier("transactionManager") JdbcTransactionManager batchTransactionManager) throws Exception {
JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
factory.setDataSource(batchDataSource);
factory.setTransactionManager(batchTransactionManager);
factory.afterPropertiesSet();
return factory.getObject();
}
@Bean(name = "TestBatchJobLauncher")
public JobLauncher jobLauncher(@Qualifier("TestBatchJobRepository") JobRepository jobRepository) throws Exception {
TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
jobLauncher.setJobRepository(jobRepository);
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
@Bean (name = "TestBatchJobExplorer")
public JobExplorer jobExplorer(@Qualifier("TestDataSource") DataSource dataSource,@Qualifier("transactionManager")JdbcTransactionManager batchTransactionManager) throws Exception {
final JobExplorerFactoryBean bean = new JobExplorerFactoryBean();
bean.setDataSource(dataSource);
bean.setTransactionManager(batchTransactionManager);
bean.setTablePrefix("BATCH_");
bean.setJdbcOperations(new JdbcTemplate(dataSource));
bean.afterPropertiesSet();
return bean.getObject();
}
@Bean (name ="TestBatchJobRegistry")
public JobRegistry jobRegistry() throws Exception {
return new MapJobRegistry();
}
@Bean
public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(@Qualifier("TestBatchJobRegistry") JobRegistry jobRegistry) {
JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();
postProcessor.setJobRegistry(jobRegistry);
return postProcessor;
}
@Bean (name = "TestBatchJobOperator")
public JobOperator jobOperator(@Qualifier("TestBatchJobLauncher") JobLauncher jobLauncher, @Qualifier("TestBatchJobRepository") JobRepository jobRepository,
@Qualifier("TestBatchJobRegistry") JobRegistry jobRegistry, @Qualifier("TestBatchJobExplorer") JobExplorer jobExplorer) {
final SimpleJobOperator jobOperator = new SimpleJobOperator();
jobOperator.setJobLauncher(jobLauncher);
jobOperator.setJobRepository(jobRepository);
jobOperator.setJobRegistry(jobRegistry);
jobOperator.setJobExplorer(jobExplorer);
return jobOperator;
}
}
字符串
因为我使用默认的内存Map
@Bean (name ="TestBatchJobRegistry")
public JobRegistry jobRegistry() throws Exception {
return new MapJobRegistry();
}
型
它在服务器重启时被清除。
更新24/10/2023:x1c 0d1x
在服务器重新启动时,
作业注册表为空。已尝试,
public void restart(@Qualifier("febpBatchJobExplorer") JobExplorer jobExplorer,@Qualifier("febpBatchJobRepository") JobRepository jobRepository, @Qualifier("febpBatchJobOperator") JobOperator jobOperator){
try {
List<String> jobNames=jobExplorer.getJobNames();
for(String jobName:jobNames)
{
List<JobInstance> jobInstances = jobExplorer.getJobInstances(jobName,0,1);// this will get one latest job from the database
if(CollectionUtils.isNotEmpty(jobInstances)){
JobInstance jobInstance = jobInstances.get(0);
List<JobExecution> jobExecutions = jobExplorer.getJobExecutions(jobInstance);
if(CollectionUtils.isNotEmpty(jobExecutions)){
for(JobExecution execution: jobExecutions){
// If the job status is STARTED then update the status to FAILED and restart the job using JobOperator.java
if(execution.getStatus().equals(BatchStatus.STARTED) || execution.getStatus().equals(BatchStatus.FAILED)){
execution.setEndTime(LocalDateTime.now());
execution.setStatus(BatchStatus.FAILED);
execution.setExitStatus(ExitStatus.FAILED);
jobRepository.update(execution);
jobOperator.restart(execution.getId());
}
}
}
}
}
}catch (Exception e1) {
e1.printStackTrace();
}
}
型
但是jobOperator.restart(execution.getId())再次失败,因为jobRegistry为空。
在低于Spring批次代码
时失败
**更新2023年10月27日:**提供示例应用程序以重现问题https://github.com/PSHREYASHOLLA/SamplebatchApplication。
它是一个maven项目,所以你可以调用mvn install,它将创建\target\SamplebatchApplication-0.0.1-SNAPSHOT.jar。现在你可以像启动任何springboot应用程序(启用Liquibase)一样启动它,java -jar SamplebatchApplication-0.0.1-SNAPSHOT.jar。
如果你看到application.properties文件,我们将其指向一个postgres数据库。我们所有的批处理配置https:github.com/PSHREYASHOLLA/SamplebatchApplication/blob/main/src/main/java/com/example/postgresql/model/FEBPDBConfig.java。
请通过调用rest post API启动批处理,http://localhost:8080/batch/batcher-or-resume-application-batch-job JSON body {“appRestartJobExecutionId”:““}如果我们使用空appRestartJobExecutionId调用此,则流程如下,com.example.postgresql.batchController.batcherOrResumeApplicationBatchJobByB2E()->com.example.postgresql.model.FebpApplicationJobServiceImpl. batcherApplicationBatchJob()-->我们做JobLauncher.run().现在,此作业将作为reader的一部分从febp_emp_detail_test读取50条记录,并作为writer的一部分将更新的记录写入febp_emp_tax_detail_test。这是一个愉快的流程。
现在,如果你调用上面的API,并说5秒后你杀死了服务器,只有部分提交将发生在febp_emp_tax_detail_test中,批处理状态将处于STARTED状态。现在,假设我重新启动服务器并使用失败的作业执行ID调用相同的post API,它现在将调用om.example.postgresql.Buller.BatchController.BullerOrResumeApplicationBatchJobByB2E()-> com.example.postgresql.model.FebpApplicationJobServiceImpl.resumeApplicationBatchJob()-> jobOperator.restart(failedBatchExecutionId);此处由于jobRegistry为空,重启API失败。
更新02/11/2023:
根据Mahmoud Ben Hassine的建议,将我的作业更改为Bean后,我可以重新启动作业。但现在重新启动后,新的作业执行开始并显示完成,但无法处理数据。如果作业在单线程上运行,则处理所有数据,但如果是多线程,则不做任何事情。请检查https://github.com/PSHREYASHOLLA/SamplebatchApplication/blob/main/src/main/java/com/example/postgresql/model/EmployeeTaxCalculationBatchConfig.java
第60行,Step step = new StepBuilder(“FEBP_EMP_TAX_CALCULATION_STEP”,jobRepository).<pageDetail,pageTaxDetail>chunk(5,transactionManager).reader(reader.getPagingItemReader()).processor(itemProcessor).writer(itemWriter).taskExecutor(actStmntTaskExecutor()).throttleLimit(50).build();
这里,如果throttleLimit为1,则在重新启动后处理记录,但如果是多线程的,则不处理示例表中的剩余记录。
1条答案
按热度按时间5jvtdoz21#
这不应该是这样的,因为你已经注册了一个
JobRegistryBeanPostProcessor
。这是bean后处理器,每次Spring应用程序上下文启动(重新启动)时,它都会填充作业注册表。这里的问题是作业没有被声明为bean,而是配置类(显然不是
Job
类型)被声明为该名称的bean。因此,JobRegistryBeanPostProcessor
找不到作业,也没有在注册表中注册它。Job应该在应用程序上下文中注册为bean,以便
JobRegistryBeanPostProcessor
对bean进行后处理并在注册表中注册。