Changing the number of splits for a Hadoop job

qvsjd97n · asked 2021-06-01 in Hadoop

I am currently writing code to process a single image using Hadoop, so my input is just one file (a .png). I have working code that runs the job, but instead of running several mappers it only ever runs one mapper and never spawns any others.
I created my own subclasses of FileInputFormat and RecordReader, intending to produce (what I thought would be) "n" custom splits and therefore "n" map tasks.
I have been searching the web frantically for examples of this kind to learn from, but everything I could find either treats an entire file as a single split (and therefore exactly one mapper) or assigns a fixed number of lines of a text file (three, for example) to each map task.
What I want to do is send a pair of coordinates ((x1, y1), (x2, y2)) to each mapper, where the coordinates correspond to the top-left/bottom-right pixels of some rectangle in the image.
Any advice/guidance/examples/links to examples would be greatly appreciated.
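
For reference, the driver currently wires everything together roughly like this (a trimmed sketch: ImageJobDriver and ImageMapper are placeholder names and the paths are illustrative; the configuration keys are the ones my RecordReader reads back):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ImageJobDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // These keys are the ones RecordReader1.initialize() reads back.
        conf.set("imageName", "input.png");
        conf.setInt("v_slices", 4);   // 4 chunk columns
        conf.setInt("h_slices", 4);   // 4 chunk rows -> 16 chunks total
        conf.setInt("kernel_rad", 2);
        conf.setInt("grad_rad", 1);
        conf.set("hdfs_address", args[0]);
        conf.set("local_directory", args[1]);

        Job job = Job.getInstance(conf, "image-chunks");
        job.setJarByClass(ImageJobDriver.class);
        job.setInputFormatClass(FileInputFormat1.class);
        job.setMapperClass(ImageMapper.class);   // placeholder mapper class
        job.setMapOutputKeyClass(KeyChunk1.class);
        job.setMapOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path("input.png"));
        FileOutputFormat.setOutputPath(job, new Path("out"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}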

Custom FileInputFormat

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

public class FileInputFormat1 extends FileInputFormat<KeyChunk1, NullWritable>
{
    @Override
    public RecordReader<KeyChunk1, NullWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        return new RecordReader1();
    }

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        // Allowing the file to be split is not enough by itself: the number
        // of splits is decided by getSplits(), which is not overridden here.
        return true;
    }
}

Custom RecordReader

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

public class RecordReader1 extends RecordReader<KeyChunk1, NullWritable> {

    private KeyChunk1 key;
    private NullWritable value;

    // Hands out one image chunk (as the key) per call to nextKeyValue().
    private ImagePreprocessor IMAGE;

    public RecordReader1()
    {

    }

    @Override
    public void close() throws IOException {

    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return IMAGE.getProgress();
    }

    @Override
    public KeyChunk1 getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public NullWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {

        boolean gotNextValue = IMAGE.hasAnotherChunk();

        if (gotNextValue)
        {
            if (key == null)
            {
                key = new KeyChunk1();
            }
            if (value == null)
            {
                value = NullWritable.get();
            }

            // data = { chunk row, chunk column, linear chunk index, chunk width, chunk height }
            int[] data = IMAGE.getChunkIndicesAndIndex();
            key.setChunkIndex(data[2]);
            key.setStartRow(data[0]);   // note: these are chunk-grid indices, not pixel offsets
            key.setStartCol(data[1]);
            key.setChunkWidth(data[3]);
            key.setChunkHeight(data[4]);
        }
        else
        {
            key = null;
            value = null;
        }

        return gotNextValue;
    }

    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        // The InputSplit argument is ignored, so every split would process the
        // whole image; the chunk grid comes entirely from the job configuration.
        Configuration config = taskAttemptContext.getConfiguration();
        IMAGE = new ImagePreprocessor(
                config.get("imageName"),
                config.getInt("v_slices", 1),
                config.getInt("h_slices", 1),
                config.getInt("kernel_rad", 2),
                config.getInt("grad_rad", 1),
                config.get("hdfs_address"),
                config.get("local_directory")
        );
    }
}
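
KeyChunk1 (not shown above)

KeyChunk1 is referenced by the RecordReader but was omitted from the post. For the code above to compile it has to be a Writable key along these lines; this is a sketch inferred from the setters used in nextKeyValue(), not the original class:

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class KeyChunk1 implements WritableComparable<KeyChunk1> {

    private int chunkIndex;
    private int startRow, startCol;       // chunk grid coordinates
    private int chunkWidth, chunkHeight;  // chunk size in pixels

    public void setChunkIndex(int chunkIndex) { this.chunkIndex = chunkIndex; }
    public void setStartRow(int startRow)     { this.startRow = startRow; }
    public void setStartCol(int startCol)     { this.startCol = startCol; }
    public void setChunkWidth(int w)          { this.chunkWidth = w; }
    public void setChunkHeight(int h)         { this.chunkHeight = h; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(chunkIndex);
        out.writeInt(startRow);
        out.writeInt(startCol);
        out.writeInt(chunkWidth);
        out.writeInt(chunkHeight);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        chunkIndex = in.readInt();
        startRow = in.readInt();
        startCol = in.readInt();
        chunkWidth = in.readInt();
        chunkHeight = in.readInt();
    }

    @Override
    public int compareTo(KeyChunk1 other) {
        return Integer.compare(chunkIndex, other.chunkIndex);
    }
}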

ImagePreprocessor class (used by the custom RecordReader; only the relevant parts are shown)

import java.awt.image.BufferedImage;
import java.io.IOException;

public class ImagePreprocessor {

    private String filename;
    private int num_v_slices;   // number of chunk columns (vertical strips)
    private int num_h_slices;   // number of chunk rows (horizontal strips)
    private int minSize;        // smallest chunk dimension the kernels still fit in

    private int width, height;  // full image dimensions in pixels

    private int chunkWidth, chunkHeight;
    private int indexI, indexJ; // current chunk row/column while iterating

    String hdfs_address, local_directory;

    public ImagePreprocessor(String filename, int num_v_slices, int num_h_slices, int kernel_radius, int gradient_radius,
                             String hdfs_address, String local_directory) throws IOException{
        this.hdfs_address = hdfs_address;
        this.local_directory = local_directory;

        // all "validate" methods throw errors if input data is invalid

        checkValidFilename(filename);
        checkValidNumber(num_v_slices, "vertical strips");
        this.num_v_slices = num_v_slices;
        checkValidNumber(num_h_slices, "horizontal strips");
        this.num_h_slices = num_h_slices;
        checkValidNumber(kernel_radius, "kernel radius");
        checkValidNumber(gradient_radius, "gradient radius");

        this.minSize = 1 + 2 * (kernel_radius + gradient_radius);

        getImageData(); // loads image and saves width/height to class variables
        validateImageSize();

        chunkWidth = validateWidth((int)Math.ceil(((double)width) / num_v_slices));
        chunkHeight = validateHeight((int)Math.ceil(((double)height) / num_h_slices));

        indexI = 0;
        indexJ = 0;

    }

    public boolean hasAnotherChunk()
    {
        return indexI < num_h_slices;
    }

    public int[] getChunkIndicesAndIndex()
    {
        int[] ret = new int[5];
        ret[0] = indexI;                        // chunk row
        ret[1] = indexJ;                        // chunk column
        ret[2] = indexI*num_v_slices + indexJ;  // linear chunk index (row-major)
        ret[3] = chunkWidth;
        ret[4] = chunkHeight;

        // advance to the next chunk, row-major across the grid
        indexJ += 1;
        if (indexJ >= num_v_slices)
        {
            indexJ = 0;
            indexI += 1;
        }

        return ret;
    }
}
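
Since the goal is to hand each mapper a ((x1, y1), (x2, y2)) pair, a chunk's grid position maps to that pixel rectangle by simple arithmetic. The helper below is hypothetical (it is not in the original class) and clamps the last row/column so it does not run past the image edge:

// Hypothetical addition to ImagePreprocessor: converts a chunk's grid
// position (i, j) into the ((x1, y1), (x2, y2)) pixel rectangle.
public int[] chunkToRectangle(int i, int j)
{
    int x1 = j * chunkWidth;
    int y1 = i * chunkHeight;
    int x2 = Math.min(x1 + chunkWidth, width) - 1;   // clamp to image edge
    int y2 = Math.min(y1 + chunkHeight, height) - 1;
    return new int[] { x1, y1, x2, y2 };
}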

Thanks for your time!


myzjeezk1#

You should override the getSplits method in your FileInputFormat1 class. (The commonly quoted signature public InputSplit[] getSplits(JobConf job, int numSplits) belongs to the old mapred API; since your code uses the new org.apache.hadoop.mapreduce API, the method to override is public List<InputSplit> getSplits(JobContext context).) Create your own InputSplit subclass that carries the rectangle coordinates, so that in the RecordReader you can read them back and return the correct key/value pairs to the mapper. Without that override, the default FileInputFormat.getSplits produces splits based on HDFS block size, and a single small .png fits inside one block, which is why you only ever get one mapper no matter what the RecordReader does. The implementation of getSplits in Hadoop's own FileInputFormat is a useful reference.
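
A minimal sketch of that idea using the new org.apache.hadoop.mapreduce API (RectangleSplit is an illustrative name, not part of Hadoop, and the image_width/image_height configuration keys are assumed to be set by the driver):

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Illustrative InputSplit that carries one rectangle of the image.
public class RectangleSplit extends InputSplit implements Writable {

    private int x1, y1, x2, y2;   // top-left and bottom-right pixels

    public RectangleSplit() { }   // no-arg constructor required for deserialization

    public RectangleSplit(int x1, int y1, int x2, int y2) {
        this.x1 = x1; this.y1 = y1; this.x2 = x2; this.y2 = y2;
    }

    public int getX1() { return x1; }
    public int getY1() { return y1; }
    public int getX2() { return x2; }
    public int getY2() { return y2; }

    @Override
    public long getLength() {
        // "Size" of the split: number of pixels in the rectangle.
        return (long) (x2 - x1 + 1) * (y2 - y1 + 1);
    }

    @Override
    public String[] getLocations() {
        // No locality preference; every task reads the image itself.
        return new String[0];
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(x1); out.writeInt(y1);
        out.writeInt(x2); out.writeInt(y2);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        x1 = in.readInt(); y1 = in.readInt();
        x2 = in.readInt(); y2 = in.readInt();
    }
}

FileInputFormat1 would then build one split per rectangle:

// Added to FileInputFormat1 (extra imports: java.util.List, java.util.ArrayList,
// org.apache.hadoop.conf.Configuration)
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException {
    Configuration conf = context.getConfiguration();
    int vSlices = conf.getInt("v_slices", 1);
    int hSlices = conf.getInt("h_slices", 1);
    // Assumption: the driver stores the image dimensions in the configuration.
    int width  = conf.getInt("image_width", 0);
    int height = conf.getInt("image_height", 0);

    int chunkWidth  = (int) Math.ceil((double) width  / vSlices);
    int chunkHeight = (int) Math.ceil((double) height / hSlices);

    List<InputSplit> splits = new ArrayList<>();
    for (int i = 0; i < hSlices; i++) {         // chunk rows
        for (int j = 0; j < vSlices; j++) {     // chunk columns
            int x1 = j * chunkWidth;
            int y1 = i * chunkHeight;
            int x2 = Math.min(x1 + chunkWidth, width) - 1;
            int y2 = Math.min(y1 + chunkHeight, height) - 1;
            splits.add(new RectangleSplit(x1, y1, x2, y2));
        }
    }
    return splits;   // one split per rectangle -> one mapper per rectangle
}

With splits like these, RecordReader1.initialize() would cast its InputSplit to RectangleSplit and emit a single key per split, instead of iterating over every chunk of the image itself. That is what turns "n" rectangles into "n" map tasks.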
