如何使JobRegistry包含Spring批处理中服务器重启时的作业信息

vmjh9lq9  于 7个月前  发布在  Spring
关注(0)|答案(1)|浏览(60)

我们有一个场景,每当一个作业执行STARTED Status时,服务器可能会崩溃。在这种情况下,为了重新启动我从这里理解的batch Spring Batch resume after server's failure,需要将batch_job_execution和batch_step_execution中的status和end_time列值从STARTED更新为FAQs,以便JobOperator.restart(jobExecutionId)API工作。
但是调用它会出现“NoSuchJobException:No job configuration with the name [] was registered”,因为JobOperator中的jobRegistryMap是空的。
现在,如何在服务器重启时重新填充JobRegistry中的这些作业信息,以便我们可以调用JobOperator.restart(jobExecutionId)。
下面是我的代码

/** The Test properties. */
  @Autowired
  private TestProperties TestProperties;

  @Primary
  @Bean(name = "TestDataSource")

  public DataSource batchDataSource() {
    DataSource TestDBSrc = DataSourceBuilder.create().username(getUsername()).password(getPassword()).url(getUrl()).build();
    if (TestDBSrc != null && TestDBSrc instanceof HikariDataSource) {
      @SuppressWarnings("resource")
      HikariDataSource hikariDatsource = (HikariDataSource) TestDBSrc;
      hikariDatsource.setSchema(getSchema());
    }
    return TestDBSrc;
  }

  private String getSchema() {
    return TestProperties.getValue("spring.datasource.hikari.schema", "");
  }

  private String getUsername() {
    return TestProperties.getValue("spring.datasource.username", "");
  }

  private String getPassword() {
    return TestProperties.getValue("spring.datasource.password", "");
  }

  private String getUrl() {
    return TestProperties.getValue("spring.datasource.jdbc-url", "");
  }

  @Bean(name = "transactionManager")
  public JdbcTransactionManager batchTransactionManager(@Qualifier("TestDataSource") DataSource dataSource) {
    return new JdbcTransactionManager(dataSource);
  }

  @Bean(name = "TestBatchJobRepository")
  public JobRepository jobRepository(@Qualifier("TestDataSource") DataSource batchDataSource,
      @Qualifier("transactionManager") JdbcTransactionManager batchTransactionManager) throws Exception {
    JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
    factory.setDataSource(batchDataSource);
    factory.setTransactionManager(batchTransactionManager);
    factory.afterPropertiesSet();
    return factory.getObject();
  }

  @Bean(name = "TestBatchJobLauncher")
  public JobLauncher jobLauncher(@Qualifier("TestBatchJobRepository") JobRepository jobRepository) throws Exception {
    TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
    jobLauncher.setJobRepository(jobRepository);
    jobLauncher.afterPropertiesSet();
    return jobLauncher;
  }

  @Bean (name = "TestBatchJobExplorer")
  public JobExplorer jobExplorer(@Qualifier("TestDataSource") DataSource dataSource,@Qualifier("transactionManager")JdbcTransactionManager batchTransactionManager) throws Exception {
      final JobExplorerFactoryBean bean = new JobExplorerFactoryBean();
      bean.setDataSource(dataSource);
      bean.setTransactionManager(batchTransactionManager);
      bean.setTablePrefix("BATCH_");
      bean.setJdbcOperations(new JdbcTemplate(dataSource));
      bean.afterPropertiesSet();
      return bean.getObject();
  }
  
  @Bean (name ="TestBatchJobRegistry")
  public JobRegistry jobRegistry() throws Exception {
    return new MapJobRegistry();
  }
  
  @Bean
  public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(@Qualifier("TestBatchJobRegistry") JobRegistry jobRegistry) {
      JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();
      postProcessor.setJobRegistry(jobRegistry);
      return postProcessor;
  }
  
  @Bean (name = "TestBatchJobOperator")
  public JobOperator jobOperator(@Qualifier("TestBatchJobLauncher") JobLauncher jobLauncher, @Qualifier("TestBatchJobRepository") JobRepository jobRepository,
          @Qualifier("TestBatchJobRegistry") JobRegistry jobRegistry, @Qualifier("TestBatchJobExplorer") JobExplorer jobExplorer) {
      final SimpleJobOperator jobOperator = new SimpleJobOperator();
      jobOperator.setJobLauncher(jobLauncher);
      jobOperator.setJobRepository(jobRepository);
      jobOperator.setJobRegistry(jobRegistry);
      jobOperator.setJobExplorer(jobExplorer);
      return jobOperator;
  }
}

字符串
因为我使用默认的内存Map

@Bean (name ="TestBatchJobRegistry")
  public JobRegistry jobRegistry() throws Exception {
    return new MapJobRegistry();
  }


它在服务器重启时被清除。
更新24/10/2023:x1c 0d1x
在服务器重新启动时,

作业注册表为空。已尝试,

public void restart(@Qualifier("febpBatchJobExplorer") JobExplorer jobExplorer,@Qualifier("febpBatchJobRepository") JobRepository jobRepository, @Qualifier("febpBatchJobOperator") JobOperator jobOperator){
      try {
          List<String> jobNames=jobExplorer.getJobNames();
          for(String jobName:jobNames)
          {
              List<JobInstance> jobInstances = jobExplorer.getJobInstances(jobName,0,1);// this will get one latest job from the database
              if(CollectionUtils.isNotEmpty(jobInstances)){
                 JobInstance jobInstance =  jobInstances.get(0);
                 List<JobExecution> jobExecutions = jobExplorer.getJobExecutions(jobInstance);
                 if(CollectionUtils.isNotEmpty(jobExecutions)){
                     for(JobExecution execution: jobExecutions){
                         // If the job status is STARTED then update the status to FAILED and restart the job using JobOperator.java
                         if(execution.getStatus().equals(BatchStatus.STARTED) || execution.getStatus().equals(BatchStatus.FAILED)){ 
                             execution.setEndTime(LocalDateTime.now());
                             execution.setStatus(BatchStatus.FAILED);                               
                             execution.setExitStatus(ExitStatus.FAILED);                               
                             jobRepository.update(execution);
                             jobOperator.restart(execution.getId());
                         }
                     }
                 }
              }
          } 
      }catch (Exception e1) {
          e1.printStackTrace();
      }
  }


但是jobOperator.restart(execution.getId())再次失败,因为jobRegistry为空。
在低于Spring批次代码

时失败

**更新2023年10月27日:**提供示例应用程序以重现问题https://github.com/PSHREYASHOLLA/SamplebatchApplication

它是一个maven项目,所以你可以调用mvn install,它将创建\target\SamplebatchApplication-0.0.1-SNAPSHOT.jar。现在你可以像启动任何springboot应用程序(启用Liquibase)一样启动它,java -jar SamplebatchApplication-0.0.1-SNAPSHOT.jar。
如果你看到application.properties文件,我们将其指向一个postgres数据库。我们所有的批处理配置https:github.com/PSHREYASHOLLA/SamplebatchApplication/blob/main/src/main/java/com/example/postgresql/model/FEBPDBConfig.java。
请通过调用rest post API启动批处理,http://localhost:8080/batch/batcher-or-resume-application-batch-job JSON body {“appRestartJobExecutionId”:““}如果我们使用空appRestartJobExecutionId调用此,则流程如下,com.example.postgresql.batchController.batcherOrResumeApplicationBatchJobByB2E()->com.example.postgresql.model.FebpApplicationJobServiceImpl. batcherApplicationBatchJob()-->我们做JobLauncher.run().现在,此作业将作为reader的一部分从febp_emp_detail_test读取50条记录,并作为writer的一部分将更新的记录写入febp_emp_tax_detail_test。这是一个愉快的流程。
现在,如果你调用上面的API,并说5秒后你杀死了服务器,只有部分提交将发生在febp_emp_tax_detail_test中,批处理状态将处于STARTED状态。现在,假设我重新启动服务器并使用失败的作业执行ID调用相同的post API,它现在将调用om.example.postgresql.Buller.BatchController.BullerOrResumeApplicationBatchJobByB2E()-> com.example.postgresql.model.FebpApplicationJobServiceImpl.resumeApplicationBatchJob()-> jobOperator.restart(failedBatchExecutionId);此处由于jobRegistry为空,重启API失败。

更新02/11/2023:

根据Mahmoud Ben Hassine的建议,将我的作业更改为Bean后,我可以重新启动作业。但现在重新启动后,新的作业执行开始并显示完成,但无法处理数据。如果作业在单线程上运行,则处理所有数据,但如果是多线程,则不做任何事情。请检查https://github.com/PSHREYASHOLLA/SamplebatchApplication/blob/main/src/main/java/com/example/postgresql/model/EmployeeTaxCalculationBatchConfig.java
第60行,Step step = new StepBuilder(“FEBP_EMP_TAX_CALCULATION_STEP”,jobRepository).<pageDetail,pageTaxDetail>chunk(5,transactionManager).reader(reader.getPagingItemReader()).processor(itemProcessor).writer(itemWriter).taskExecutor(actStmntTaskExecutor()).throttleLimit(50).build();
这里,如果throttleLimit为1,则在重新启动后处理记录,但如果是多线程的,则不处理示例表中的剩余记录。

5jvtdoz2

5jvtdoz21#

这不应该是这样的,因为你已经注册了一个JobRegistryBeanPostProcessor。这是bean后处理器,每次Spring应用程序上下文启动(重新启动)时,它都会填充作业注册表。

  • 编辑:提供最小示例后更新答案 *

这里的问题是作业没有被声明为bean,而是配置类(显然不是Job类型)被声明为该名称的bean。因此,JobRegistryBeanPostProcessor找不到作业,也没有在注册表中注册它。
Job应该在应用程序上下文中注册为bean,以便JobRegistryBeanPostProcessor对bean进行后处理并在注册表中注册。

相关问题