调度spark作业java

yrdbyhpb  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(402)

我有一个spark工作,它读取hbase表、一些聚合并将数据存储到mongodb。当前此作业正在使用spark submit脚本手动运行。我想安排一个固定的时间间隔。
如何使用java实现这一点。
有图书馆吗?或者我可以用java中的线程来实现这一点?
感谢您的建议!

k97glaaz

k97glaaz1#

如果你想继续使用 spark-submit 我更喜欢crontab或者类似的东西,比如运行bash脚本。
但是如果您需要从java运行“spark submit”,您可以查看org.apache.spark.launcher包。使用这种方法,您可以用编程方式启动应用程序 SparkLauncher .

import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

...

     public void startApacheSparkApplication(){
        SparkAppHandle handler = new SparkLauncher()
         .setAppResource("pathToYourSparkApp.jar")
         .setMainClass("your.package.main.Class")
         .setMaster("local")
         .setConf(...)
         .startApplication(); // <-- and start spark job app
     }
...

但你的问题是关于某个排程库。你可以用一个简单的 TimerDate 在java util中提供( java.util.TimerTask ),但我更喜欢使用quartz作业调度库-它非常流行(因为我知道spring也使用quartz调度程序)。
spring还提供了集成类,用于支持与timer(自1.3以来jdk的一部分)的调度,以及quartz scheduler(http://quartz-scheduler.org) ....
使用quartz你可以设置cron调度,对我来说,使用quartz更容易。
只需添加maven依赖项

<!-- https://mvnrepository.com/artifact/org.quartz-scheduler/quartz -->
<dependency>
    <groupId>org.quartz-scheduler</groupId>
    <artifactId>quartz</artifactId>
    <version>2.2.3</version>
</dependency>

创建spark-quartz作业

public class SparkLauncherQuartzJob implements Job {
         startApacheSparkApplication();
   ...

现在创建一个触发器并计划它

// trigger runs every hour
 Trigger trigger = new Trigger() 
             .withIdentity("sparkJob1Trigger", "sparkJobsGroup")
             .withSchedule(
                 CronScheduleBuilder.cronSchedule("0 * * * * ?"))
             .build();

  JobDetail sparkQuartzJob = JobBuilder.newJob(SparkLauncherQuartzJob.class).withIdentity("SparkLauncherQuartzJob", "sparkJobsGroup").build();

  Scheduler scheduler = new StdSchedulerFactory().getScheduler();
  scheduler.start();
  scheduler.scheduleJob(sparkQuartzJob , trigger);

不太可能-如果你有spring启动应用程序,你可以使用调度运行一些方法非常容易-只是 @EnableScheduling 在配置和类似的情况下:

@Scheduled(fixedRate = 300000)
public void periodicalRunningSparkJob() {
    log.info("Spark job periodically execution");
    startApacheSparkApplication();
}

相关问题