MapReduce job using HBase tables as source and sink in a Spring XD batch job

bt1cpqcv · posted 2021-06-09 in HBase

How do I configure a MapReduce job in Spring that uses HBase tables as both source and sink? I plan to use Spring XD to create a batch job that runs a MapReduce job, but I want this Hadoop job to read from and write to HBase tables, along the lines of TableMapReduceUtil.initTableMapperJob() and TableMapReduceUtil.initTableReducerJob(). The <hdp:job> namespace does not currently support specifying input/output tables.

af7jpaap

I was able to solve this by using another bean that takes the Hadoop job as input and returns it after setting up the Scan and the source and sink HBase tables. With scope="job" on the tasklet and scope="prototype" on the job beans I was able to run the same MR job multiple times in Spring XD; without this, after the first successful run you hit the "Job in state RUNNING instead of DEFINE" problem.

import java.io.IOException;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class InitJobTasklet {

    private Job job;

    public void setJob(Object job) {
        this.job = (Job) job;
    }

    public Job getJob() throws IOException {
        Scan scan = new Scan();
        System.out.println("Initializing the hadoop job with hbase tables and scan object... ");

        // Wire the source table and scan into the job. Mapper.class is a
        // placeholder for your TableMapper implementation (see the sketch
        // below); Text/Result are its output key/value types.
        TableMapReduceUtil.initTableMapperJob("SourceTable",
                scan,
                Mapper.class,
                Text.class, Result.class, job);

        // Wire the sink table; Reducer.class is a placeholder for your
        // TableReducer implementation.
        TableMapReduceUtil.initTableReducerJob(
                "TargetTable",   // output table
                Reducer.class,   // reducer class
                job);
        job.setNumReduceTasks(1);

        return job;
    }
}
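TableMapReduceUtil.initTableMapperJob() expects the mapper to be a subclass of org.apache.hadoop.hbase.mapreduce.TableMapper. As a minimal sketch of what the placeholder could be replaced with, here is a hypothetical CopyTableMapper for a plain row-copy job (with a mapper like this, the output key/value classes passed to initTableMapperJob would be ImmutableBytesWritable.class and Put.class):

import java.io.IOException;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;

// Hypothetical example, not part of the original answer: reads each row
// delivered by TableInputFormat and re-emits its cells as a Put keyed by
// the row key, so the reducer can write them to the target table.
public class CopyTableMapper extends TableMapper<ImmutableBytesWritable, Put> {

    @Override
    protected void map(ImmutableBytesWritable rowKey, Result columns, Context context)
            throws IOException, InterruptedException {
        Put put = new Put(rowKey.get());
        for (Cell cell : columns.rawCells()) {
            put.add(cell); // copy every cell of the row unchanged
        }
        context.write(rowKey, put);
    }
}

With such a mapper, the reducer placeholder can be HBase's org.apache.hadoop.hbase.mapreduce.IdentityTableReducer, which simply writes the incoming Put values through to the output table.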
The Spring Batch job configuration file:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:hdp="http://www.springframework.org/schema/hadoop"
xmlns:batch="http://www.springframework.org/schema/batch"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/hadoop http://www.springframework.org/schema/hadoop/spring-hadoop.xsd
    http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd">

<hdp:job id="mr-hbase-job"
    output-path="/output"
  mapper="mapperclass" reducer="reduceclass"
  map-key="org.apache.hadoop.hbase.io.ImmutableBytesWritable" map-value="org.apache.hadoop.hbase.client.Result" input-format="org.apache.hadoop.hbase.mapreduce.TableInputFormat" output-format="org.apache.hadoop.hbase.mapreduce.TableOutputFormat" jar-by-class="processor class" scope="prototype">

</hdp:job>

<batch:job id="job"  >
              <batch:step id="step1">
        <hdp:job-tasklet id="hadoop-tasklet" job="#{initTask.job}" wait-for-completion="true"  scope="job"/>
    </batch:step>
</batch:job>

<hdp:configuration id="hadoopConfiguration">
     fs.defaultFS=hdfs://localhost:9000
 hadoop.tmp.dir=/home/smunigati/hadoop/temp
 hbase.zookeeper.quorum=localhost
 hbase.zookeeper.property.clientPort=2181
    </hdp:configuration>

<hdp:hbase-configuration id="hbaseConfiguration" configuration-ref="hadoopConfiguration">
</hdp:hbase-configuration>

   <bean id="initTask" class="com.somthing.InitJobTasklet" scope="prototype" >
    <property name="job" ref="mr-hbase-job" />  
    </bean>
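A note on how this wires together: the SpEL expression #{initTask.job} makes Spring call getJob() on the initTask bean when the job-scoped tasklet is created, which is the point where the prototype <hdp:job> gets initialized with the HBase source and sink tables. Because the tasklet is job-scoped and both initTask and the Hadoop job are prototypes, every launch of the batch job gets a fresh Job instance; a Hadoop Job cannot be submitted twice, which is what produces the "Job in state RUNNING instead of DEFINE" error when a singleton job is reused.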
