Sampling records in a Hadoop mapper

eagi6jfj  posted on 2021-06-04 in Hadoop

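I have a dataset whose key has three parts: a, b, and c. In my mapper, I want to emit records with "a" as the key and "a,b,c" as the value.
How can I emit 10% of the total records for each "a" that the mapper sees in Hadoop? Should I consider saving the total record count for each "a" from a previous MapReduce job in a temporary file?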

bkkx9g8r #1

If approximately 10% is good enough, you can use Random. Here is an example mapper:

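import java.io.IOException;
import java.util.Random;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class Test extends Mapper<LongWritable, Text, LongWritable, Text> {

    private final Random r = new Random();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // nextInt(10) returns 0..9 uniformly, so about 1 record in 10 passes.
        if (r.nextInt(10) == 0) {
            context.write(key, value);
        }
    }

}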
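
Because each record is kept independently with probability 0.1, every "a" group should end up with roughly 10% of its records without knowing the per-key totals in advance; small groups can deviate noticeably from 10%. The following is a minimal plain-Java sketch (no Hadoop dependency) that illustrates this. The class name `PerKeySampleSketch`, the `sample` helper, and the comma-separated "a,b,c" record layout are assumptions made for illustration, not part of the original question or answer.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Plain-Java simulation of per-record 10% sampling, grouped by "a".
public class PerKeySampleSketch {

    // Keeps each record independently with probability `rate` and groups the
    // kept records by their first comma-separated field ("a").
    // Assumption: records look like "a,b,c"; the real layout is not given.
    public static Map<String, List<String>> sample(
            List<String> records, double rate, long seed) {
        // A fixed seed is used only to make the demo repeatable.
        Random random = new Random(seed);
        Map<String, List<String>> kept = new LinkedHashMap<>();
        for (String record : records) {
            if (random.nextDouble() < rate) {
                String a = record.split(",", 2)[0];
                kept.computeIfAbsent(a, k -> new ArrayList<>()).add(record);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> records = new ArrayList<>();
        for (int i = 0; i < 10000; i++) records.add("x," + i + ",c");
        for (int i = 0; i < 30000; i++) records.add("y," + i + ",c");
        Map<String, List<String>> kept = sample(records, 0.1, 42L);
        // Each count should be close to 10% of that key's total.
        kept.forEach((a, list) -> System.out.println(a + " -> " + list.size()));
    }
}
```

In a real mapper the same idea would amount to splitting the input line, testing the random draw, and writing "a" as the key and the full "a,b,c" string as the value.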

d8tt03nd #2

Use the following Java code to randomly select 10%:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class RandomSample {

    public static class Map extends Mapper<LongWritable, Text, Text, NullWritable> {

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Keep each input line with probability 0.1; drop everything else.
            if (Math.random() < 0.1) {
                context.write(value, NullWritable.get());
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf, "randomsample");
        job.setJarByClass(RandomSample.class);

        // Without this, Hadoop falls back to the identity mapper and copies every record.
        job.setMapperClass(Map.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Map-only job: sampled lines are written directly by the mappers.
        job.setNumReduceTasks(0);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}

And use this bash script to run it:

echo "Running Job"
hadoop jar RandomSample.jar RandomSample "$1" tmp
echo "copying result to local path (RandomSample)"
hadoop fs -getmerge tmp RandomSample
echo "Clean up"
hadoop fs -rm -r tmp

For example, if the script is named random_sample.sh, then to select 10% of the records in the folder /example/, just run

./random_sample.sh /example/
