如何异步启动Spring Batch Job

hfwmuf9z  于 5个月前  发布在  Spring
关注(0)|答案(8)|浏览(69)

我已经遵循了 Spring 批处理文档,但无法让我的作业异步运行。
因此,我从Web容器运行作业,作业将通过REST端点触发。

我想获取JobInstance ID,以便在完成整个作业之前将其作为响应传递。因此,他们可以稍后使用JobInstance ID检查作业的状态,而不是等待。但我无法让它工作。下面是我尝试的示例代码。请让我知道我错过了什么或错了什么。
使用BatchConfig创建Async JobLauncher

@Configuration
public class BatchConfig {

    @Autowired
    JobRepository jobRepository;

    @Bean
    public JobLauncher simpleJobLauncher() throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }
}

字符串

控制器

@Autowired
JobLauncher jobLauncher;

@RequestMapping(value="/trigger-job", method = RequestMethod.GET)
public Long workHard() throws Exception {
    JobParameters jobParameters = new JobParametersBuilder().
            addLong("time", System.currentTimeMillis())
            .toJobParameters();
    JobExecution jobExecution = jobLauncher.run(batchComponent.customJob("paramhere"), jobParameters);
    System.out.println(jobExecution.getJobInstance().getInstanceId());
    System.out.println("OK RESPONSE");
    return jobExecution.getJobInstance().getInstanceId();
}


JobBuilder作为组件

@Component
public class BatchComponent {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    public Job customJob(String someParam) throws Exception {
        return jobBuilderFactory.get("personProcessor")
                .incrementer(new RunIdIncrementer()).listener(listener())
                .flow(personPorcessStep(someParam)).end().build();
    }

    private Step personPorcessStep(String someParam) throws Exception {
        return stepBuilderFactory.get("personProcessStep").<PersonInput, PersonOutput>chunk(1)
                .reader(new PersonReader(someParam)).faultTolerant().
                        skipPolicy(new DataDuplicateSkipper()).processor(new PersonProcessor())
                .writer(new PersonWriter()).build();
    }

    private JobExecutionListener listener() {
        return new PersonJobCompletionListener();
    }

    private class PersonInput {
        String firstName;

        public PersonInput(String firstName) {
            this.firstName = firstName;
        }

        public String getFirstName() {
            return firstName;
        }

        public void setFirstName(String firstName) {
            this.firstName = firstName;
        }
    }

    private class PersonOutput {
        String firstName;

        public String getFirstName() {
            return firstName;
        }

        public void setFirstName(String firstName) {
            this.firstName = firstName;
        }
    }

    public class PersonReader implements ItemReader<PersonInput> {
        private List<PersonInput> items;
        private int count = 0;

        public PersonReader(String someParam) throws InterruptedException {
            Thread.sleep(10000L); //to simulate processing
            //manipulate and provide data in the read method
            //just for testing i have given some dummy example
            items = new ArrayList<PersonInput>();
            PersonInput pi = new PersonInput("john");
            items.add(pi);
        }

        @Override
        public PersonInput read() {
            if (count < items.size()) {
                return items.get(count++);
            }
            return null;
        }
    }

    public class DataDuplicateSkipper implements SkipPolicy {

        @Override
        public boolean shouldSkip(Throwable exception, int skipCount) throws SkipLimitExceededException {
            if (exception instanceof DataIntegrityViolationException) {
                return true;
            }
            return true;
        }
    }

    private class PersonProcessor implements ItemProcessor<PersonInput, PersonOutput> {

        @Override
        public PersonOutput process(PersonInput item) throws Exception {
            return null;
        }
    }

    private class PersonWriter implements org.springframework.batch.item.ItemWriter<PersonOutput> {
        @Override
        public void write(List<? extends PersonOutput> results) throws Exception {
            return;
        }
    }

    private class PersonJobCompletionListener implements JobExecutionListener {
        public PersonJobCompletionListener() {
        }

        @Override
        public void beforeJob(JobExecution jobExecution) {

        }

        @Override
        public void afterJob(JobExecution jobExecution) {
            System.out.println("JOB COMPLETED");
        }
    }
}


主要功能

@SpringBootApplication
@EnableBatchProcessing
@EnableScheduling
@EnableAsync
public class SpringBatchTestApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBatchTestApplication.class, args);
    }
}


我正在使用基于注解的配置,并将gradle与下面的批处理包一起使用。

compile('org.springframework.boot:spring-boot-starter-batch')


如果需要更多的信息,请告诉我。我找不到任何例子来运行这个常见的用例。
谢谢你的时间。

3zwtqj6y

3zwtqj6y1#

试试这个,在您的配置中您需要使用 @Bean(name =“myJobLauncher”) 创建带有 SimpleAsyncTaskExecutor 的customJobLauncher *,并且在您的控制器中使用 *@ Debugger *。

@Bean(name = "myJobLauncher")
public JobLauncher simpleJobLauncher() throws Exception {
    SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
    jobLauncher.setJobRepository(jobRepository);
    jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
    jobLauncher.afterPropertiesSet();
    return jobLauncher;
}

字符串
在控制器中

@Autowired
@Qualifier("myJobLauncher")
private JobLauncher jobLauncher;

zbwhf8kr

zbwhf8kr2#

如果我看一下你的代码,我会发现几个错误。首先,你的自定义配置没有加载,因为,如果加载了,注入会因为相同接口的重复bean示例而失败。
在spring Boot 中有很多魔法,但是如果你不告诉他做一些组件扫描,什么都不会被加载。
我看到的第二个问题是你的BatchConfig类:它没有扩展DefaultBatchConfigure,也没有覆盖getJobLauncher(),所以即使 Boot 魔法将加载所有内容,你也会得到默认的。

批量配置

@Configuration
@EnableBatchProcessing(modular = true)
@Slf4j
public class BatchConfig extends DefaultBatchConfigurer {

  @Override
  @Bean
  public JobLauncher getJobLauncher() {
    try {
      SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
      jobLauncher.setJobRepository(getJobRepository());
      jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
      jobLauncher.afterPropertiesSet();
      return jobLauncher;

    } catch (Exception e) {
      log.error("Can't load SimpleJobLauncher with SimpleAsyncTaskExecutor: {} fallback on default", e);
      return super.getJobLauncher();
    }
  }
}

字符串

主要功能

@SpringBootApplication
@EnableScheduling
@EnableAsync
@ComponentScan(basePackageClasses = {BatchConfig.class})
public class SpringBatchTestApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBatchTestApplication.class, args);
    }
}

8mmmxcuj

8mmmxcuj3#

虽然你有自己的自定义jobLauncher,但你正在使用Spring提供的默认jobLauncher运行作业。你能在你的控制器中自动连接simpleJobLauncher并给予尝试吗?

ac1kyiln

ac1kyiln4#

我知道这是一个老问题,但我还是为未来的用户发布了这个答案。
在检查了你的代码之后,我不知道为什么你会有这个问题,但是我可以建议你使用一个注解加上像这样使用ThreadPoolTaskExecutor,看看它是否能解决你的问题。
你也可以查看这个教程:Asynchronous Spring Batch Job Processing了解更多细节。它将帮助你异步配置一个spring批处理作业。这个教程是我写的。

@Configuration
public class BatchConfig {
 
 @Autowired
 private JobRepository jobRepository;
 
 @Bean
 public TaskExecutor threadPoolTaskExecutor(){
  
  ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setMaxPoolSize(12);
        executor.setCorePoolSize(8);
        executor.setQueueCapacity(15);
  
   return executor;
 }
 
 @Bean
    public JobLauncher asyncJobLauncher() throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.setTaskExecutor(threadPoolTaskExecutor());
        return jobLauncher;
 }
}

字符串

oyt4ldly

oyt4ldly5#

JobExecution jobExecution = jobLauncher.run(batchComponent.customJob("paramhere"), jobParameters);. Joblauncher将在Job完成后等待返回任何内容,这就是为什么你的服务可能需要很长时间才能响应,如果这是你的问题。如果你想要异步功能,你可能想看看Spring的@EnableAsync@Async
@EnableAsync

daolsyd0

daolsyd06#

根据spring文档,要异步返回http请求的响应,需要使用org.springframework.core.task.SimpleAsyncTaskExecutor。
spring TaskExecutor接口的任何实现都可以用于控制作业如何异步执行。
spring batch documentation

<bean id="jobLauncher"
  class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository" />
<property name="taskExecutor">
    <bean class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
</property>

字符串

owfi6suc

owfi6suc7#

如果你正在使用Lombok,这可能会帮助你:

TLDR:Lombok @AllArgsConstructor似乎不能很好地与@Qualifier annotation配合使用EDIT:如果您在lombok.config文件中启用了@Qualifier annotation,则可以像这样将@Qualifier@AllArgsConstructor配合使用:

lombok.copyableAnnotations += org.springframework.beans.factory.annotation.Qualifier

字符串
我知道老问题,但我有完全相同的问题,没有答案解决它。
我像这样配置了jobLauncher,并添加了限定符以确保jobLauncher被注入:

@Bean(name = "asyncJobLauncher")
 public JobLauncher simpleJobLauncher(JobRepository jobRepository) throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }


然后像这样注射

@Qualifier("asyncJobLauncher")
private final JobLauncher jobLauncher;


我在将其更改为autowire后使用Lombok @AllArgsConstructor,正确的作业启动器被注入,作业现在异步执行:

@Autowired
@Qualifier("asyncJobLauncher")
private JobLauncher jobLauncher;


此外,我不必从DefaultBatchConfigurer扩展配置

yyhrrdl8

yyhrrdl88#

创建一个名为BatchParallelProcessingConfiguration.java的新java文件并添加以下代码,

import javax.sql.DataSource;

import org.springframework.batch.core.configuration.BatchConfigurationException;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.support.DefaultBatchConfiguration;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.TaskExecutorJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.batch.BatchAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;

@Configuration
// @EnableBatchProcessing
public class BatchParallelProcessingConfiguration {

    @Bean
    public TaskExecutor taskExecutor(){
        return new SimpleAsyncTaskExecutor();
    }

    @Bean
    public JobLauncher jobLauncher(){
        TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
        jobLauncher.setTaskExecutor(taskExecutor());
        return jobLauncher;
    }
}

字符串
为了通过spring Boot 覆盖默认的jobLauncher bean provider,我们需要在application.properties中添加以下属性。

spring.main.allow-bean-definition-overriding=true


这就是现在你的工作可以异步运行和可移植的。

相关问题