Hadoop secondary sort of values: losing values

crcmnpdw posted on 2021-06-04 in Hadoop

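This is a very common problem, and I don't know which approach to pick.
I have the fields: id, creationdate, state, datediff
id is the natural key.
I want my reducer to receive:
key (id), values (creationdate, state, datediff)
The values (creationdate, state, datediff) should be sorted in this order: creationdate, state
Which key should I choose? I did create a composite key (id, creationdate, state).
I implemented a partitioner by id,
a grouper (grouping comparator) by id,
and sorting by id, creationdate, state.
My reducer only gets unique ids... For example, with this input: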

1 123 true  6
1 456 false 6
1 789 true  7

I only get

1 123 true  6

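in my reducer. It seems I don't understand the sorter, the partitioner and the grouper :( I'd appreciate any help understanding them.
Here is my code: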
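The roles of the sort comparator and the grouping comparator can be illustrated without a cluster. Below is a minimal plain-Java sketch (Java 16+ for records) that imitates what the shuffle does with the three sample records: sort by the full composite key, then start a new reduce() call only when the grouping comparator (id alone) sees a different key. `Rec`, `shuffle` and the two comparators are hypothetical names, not Hadoop API; partitioning is left out because all three records share id 1 and would land on the same reducer anyway.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SecondarySortDemo {

    // Hypothetical stand-in for one map output record: composite key fields plus datediff.
    record Rec(long id, long creationDate, boolean state, long dateDiff) {}

    // Sort comparator: the full composite key (id, creationDate, state).
    static final Comparator<Rec> SORT = Comparator.comparingLong(Rec::id)
            .thenComparingLong(Rec::creationDate)
            .thenComparing(Rec::state);

    // Grouping comparator: the natural key (id) only.
    static final Comparator<Rec> GROUP = Comparator.comparingLong(Rec::id);

    // Sorts the records, then splits them into runs of equal id; each run is one reduce() call.
    static List<List<Rec>> shuffle(List<Rec> mapOutput) {
        List<Rec> sorted = new ArrayList<>(mapOutput);
        sorted.sort(SORT);
        List<List<Rec>> groups = new ArrayList<>();
        for (Rec r : sorted) {
            List<Rec> last = groups.isEmpty() ? null : groups.get(groups.size() - 1);
            if (last != null && GROUP.compare(last.get(0), r) == 0) {
                last.add(r);                              // same id: same reduce() call
            } else {
                groups.add(new ArrayList<>(List.of(r)));  // new id: new reduce() call
            }
        }
        return groups;
    }

    public static void main(String[] args) {
        List<List<Rec>> groups = shuffle(List.of(
                new Rec(1, 789, true, 7),
                new Rec(1, 123, true, 6),
                new Rec(1, 456, false, 6)));
        System.out.println("reduce() calls: " + groups.size());
        groups.get(0).forEach(System.out::println);
    }
}
```

In this model all three records arrive in a single reduce() call for id 1, already ordered by creationdate, so seeing only one key per id is expected and does not by itself mean values were dropped.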

public class POIMapper extends Mapper<LongWritable, Text, XVLRKey, XVLRValue>{

    private static final Log LOG = LogFactory.getLog(POIMapper.class);

    @Override
    public void map(LongWritable key, Text csvLine, Context context) throws IOException, InterruptedException {
        Pair<XVLRKey, XVLRValue> xvlrPair = POIUtil.parseKeyAndValue(csvLine.toString(), POIUtil.CSV_DELIMITER);
        context.write(xvlrPair.getValue0(), xvlrPair.getValue1());
    }

}

public class POIReducer extends Reducer<XVLRKey, XVLRValue, LongWritable, Text>{

    private static final Log LOG = LogFactory.getLog(POIReducer.class);

    private final Text textForOutput = new Text();

    @Override
    public void reduce(XVLRKey key, Iterable<XVLRValue> values, Context context)
            throws IOException, InterruptedException {
        XVLROutput out = null;
        // Just check that values are correctly attached to keys. No logic here...
        LOG.info("\nPOIReducer: key:"+key);
        for(XVLRValue value : values){
            LOG.info("\n --- --- --- value:"+value+"\n");
            textForOutput.set(print(key, value));
            context.write(key.getMsisdn(), textForOutput);
        }
    }

    private String print(XVLRKey key, XVLRValue value){
        StringBuilder builder = new StringBuilder();
        builder.append(value.getLac())          .append("\t")
               .append(value.getCellId())       .append("\t")
               .append(key.getDateOccurrence()) .append("\t")
               .append(value.getTimeDelta());
        return builder.toString();
    }
}

Job code:

JobBuilder<POIJob> jobBuilder = createTestableJobInstance();

jobBuilder.withOutputKey(XVLRKey.class);
jobBuilder.withOutputValue(XVLRValue.class);

jobBuilder.withMapper(POIMapper.class);
jobBuilder.withReducer(POIReducer.class);

jobBuilder.withInputFormat(TextInputFormat.class);
jobBuilder.withOutputFormat(TextOutputFormat.class);

jobBuilder.withPartitioner(XVLRKeyPartitioner.class);
jobBuilder.withSortComparator(XVLRCompositeKeyComparator.class);
jobBuilder.withGroupingComparator(XVLRKeyGroupingComparator.class);

boolean result = buildSubmitAndWaitForCompletion(jobBuilder);
MatcherAssert.assertThat(result, Matchers.is(true));

public class XVLRKeyPartitioner extends Partitioner<XVLRKey, XVLRValue> {

    @Override
    public int getPartition(XVLRKey key, XVLRValue value, int numPartitions) {
        return Math.abs(key.getMsisdn().hashCode() * 127) % numPartitions;
    }
}
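Unrelated to the missing values, a partition formula of the form `Math.abs(hash * 127) % numPartitions`, as in getPartition() above, can return a negative number: `Math.abs(Integer.MIN_VALUE)` overflows and stays `Integer.MIN_VALUE`, and `Integer.MIN_VALUE * 127` wraps around to `Integer.MIN_VALUE`. A map task will typically reject such a value as an illegal partition. The sketch below compares that formula with the sign-bit mask that Hadoop's `HashPartitioner` uses; `PartitionDemo` and its methods are hypothetical helpers taking a plain `int` hash.

```java
public class PartitionDemo {

    // Same arithmetic as the question's getPartition(): Math.abs(hash * 127) % numPartitions.
    // Math.abs(Integer.MIN_VALUE) is still negative, so this can return a negative index.
    static int absPartition(int hash, int numPartitions) {
        return Math.abs(hash * 127) % numPartitions;
    }

    // Clears the sign bit first (the formula Hadoop's HashPartitioner uses),
    // so the result is always in [0, numPartitions).
    static int maskedPartition(int hash, int numPartitions) {
        return (hash & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        int hash = Integer.MIN_VALUE; // Integer.MIN_VALUE * 127 wraps to Integer.MIN_VALUE
        System.out.println(absPartition(hash, 3));    // -2, not a valid partition
        System.out.println(maskedPartition(hash, 3)); // 0
    }
}
```

In the question's partitioner that would mean returning `(key.getMsisdn().hashCode() & Integer.MAX_VALUE) % numPartitions`; the `* 127` factor adds nothing once the sign bit is masked.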

public class XVLRCompositeKeyComparator extends WritableComparator {

    protected XVLRCompositeKeyComparator() {
        super(XVLRKey.class, true);
    }

    @Override
    public int compare(WritableComparable writable1, WritableComparable writable2) {
        XVLRKey key1 = (XVLRKey) writable1;
        XVLRKey key2 = (XVLRKey) writable2;
        return key1.compareTo(key2);
    }
}
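XVLRCompositeKeyComparator delegates to `XVLRKey.compareTo`, which is not shown in the question. For the secondary sort to work, that method has to order by the natural key first and then by the secondary fields. A minimal sketch of such an ordering, assuming plain `long`/`boolean` fields instead of the Writable wrappers (`CompositeKeyOrderDemo.Key` is a hypothetical mirror of XVLRKey, not the asker's class):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class CompositeKeyOrderDemo {

    // Hypothetical plain-Java mirror of XVLRKey's three fields.
    static final class Key implements Comparable<Key> {
        final long msisdn;
        final long dateOccurrence;
        final boolean state;

        Key(long msisdn, long dateOccurrence, boolean state) {
            this.msisdn = msisdn;
            this.dateOccurrence = dateOccurrence;
            this.state = state;
        }

        // Natural key first, then the secondary-sort fields, so values for one msisdn
        // reach the reducer ordered by dateOccurrence, then state (false before true).
        @Override
        public int compareTo(Key o) {
            int c = Long.compare(msisdn, o.msisdn);
            if (c != 0) return c;
            c = Long.compare(dateOccurrence, o.dateOccurrence);
            if (c != 0) return c;
            return Boolean.compare(state, o.state);
        }
    }

    public static void main(String[] args) {
        List<Key> keys = new ArrayList<>(List.of(
                new Key(1, 789, true), new Key(1, 123, true), new Key(1, 456, false)));
        Collections.sort(keys);
        for (Key k : keys) {
            System.out.println(k.msisdn + " " + k.dateOccurrence + " " + k.state);
        }
    }
}
```

The grouping comparator then only needs the first of these three comparisons, which is what XVLRKeyGroupingComparator does with `getMsisdn()`.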

public class XVLRKeyGroupingComparator extends WritableComparator {

    protected XVLRKeyGroupingComparator() {
        super(XVLRKey.class, true);
    }

    @Override
    public int compare(WritableComparable writable1, WritableComparable writable2) {

        XVLRKey key1 = (XVLRKey) writable1;
        XVLRKey key2 = (XVLRKey) writable2;

        return key1.getMsisdn().compareTo(key2.getMsisdn());

    }
}

public class XVLRKey implements WritableComparable<XVLRKey>{

    private  final LongWritable msisdn;
    private  final LongWritable dateOccurrence;
    private  final BooleanWritable state;
//getters-setters
}

public class XVLRValue implements WritableComparable<XVLRValue> {

    private final LongWritable lac;
    private final LongWritable cellId;
    private final LongWritable timeDelta;
    private final LongWritable dateOccurrence;
    private final BooleanWritable state;
//getters-setters
}
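Both classes also need `write(DataOutput)` and `readFields(DataInput)` from Hadoop's `Writable` interface (elided above), and Hadoop instantiates them reflectively, so they need a no-argument constructor. The one hard rule is that `readFields` reads the fields back in exactly the order `write` emitted them, overwriting the existing instance. The sketch below shows that round trip in plain Java with `java.io` streams; `KeySerializationDemo.Key` is a hypothetical stand-in that does not implement the Hadoop interface, so it compiles without Hadoop on the classpath.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class KeySerializationDemo {

    // Hypothetical stand-in for XVLRKey; same method signatures as Hadoop's Writable.
    static final class Key {
        long msisdn;
        long dateOccurrence;
        boolean state;

        void write(DataOutput out) throws IOException {
            out.writeLong(msisdn);
            out.writeLong(dateOccurrence);
            out.writeBoolean(state);
        }

        // Reads the fields in exactly the order write() emitted them.
        void readFields(DataInput in) throws IOException {
            msisdn = in.readLong();
            dateOccurrence = in.readLong();
            state = in.readBoolean();
        }
    }

    // Serializes a key and reads it back into a fresh instance.
    static Key roundTrip(Key k) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            k.write(new DataOutputStream(bytes));
            Key copy = new Key();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Key k = new Key();
        k.msisdn = 1;
        k.dateOccurrence = 456;
        k.state = false;
        Key copy = roundTrip(k);
        System.out.println(copy.msisdn + " " + copy.dateOccurrence + " " + copy.state);
    }
}
```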

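Please note that XVLRKey and XVLRValue have duplicated fields. I copied dateOccurrence and state into XVLRKey because I want the values to arrive sorted in my reducer. They should be sorted by dateOccurrence.
I can't find a way to solve this without the duplication.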

j8ag8udp


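In a secondary sort situation like the one you describe, the contents of the key you are holding change each time you retrieve the next value from the iterator.
This is because the Hadoop framework reuses object instances to avoid object creation and garbage collection as much as possible.
So when you call next(), the framework also changes the data inside the key instance.
So if you move the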

LOG.info("\nPOIReducer: key:"+key);

statement inside the for loop, you should see all the keys.
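To see why POIReducer's log line before the for loop shows only the first key, here is a small plain-Java simulation (no Hadoop dependency) of the reuse described above: one mutable key instance is shared for the whole reduce() call, and a hypothetical iterator overwrites it on every next(). `KeyReuseDemo`, `valuesFor` and `run` are made-up names for illustration; in Hadoop the equivalent happens inside the framework's reduce-side iterator.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class KeyReuseDemo {

    // Mutable key; a single instance is shared for the whole reduce() call.
    static final class Key {
        long id;
        long creationDate;

        @Override
        public String toString() {
            return id + " " + creationDate;
        }
    }

    // Hypothetical reduce-side iterator: every next() overwrites the shared key with the
    // composite key of the record it returns. Each record is {id, creationDate, dateDiff}.
    static Iterable<Long> valuesFor(Key shared, List<long[]> sortedRecords) {
        return () -> new Iterator<Long>() {
            private int i = 0;

            @Override
            public boolean hasNext() {
                return i < sortedRecords.size();
            }

            @Override
            public Long next() {
                if (!hasNext()) throw new NoSuchElementException();
                long[] r = sortedRecords.get(i++);
                shared.id = r[0];
                shared.creationDate = r[1];
                return r[2];
            }
        };
    }

    // Logs the key once before the loop (like POIReducer does) and again for every value.
    static List<String> run(List<long[]> sortedRecords) {
        Key shared = new Key();
        shared.id = sortedRecords.get(0)[0];           // before the loop the key holds
        shared.creationDate = sortedRecords.get(0)[1]; // the group's first record
        List<String> log = new ArrayList<>();
        log.add("before loop: " + shared);
        for (Long dateDiff : valuesFor(shared, sortedRecords)) {
            log.add("in loop: " + shared + " -> " + dateDiff);
        }
        return log;
    }

    public static void main(String[] args) {
        run(List.of(new long[] {1, 123, 6}, new long[] {1, 456, 6}, new long[] {1, 789, 7}))
                .forEach(System.out::println);
    }
}
```

The line logged before the loop only ever shows `1 123`, while inside the loop the key's fields match the value that was just returned.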
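So in my jobs I basically follow these "rules":
- The key is only used by the framework to direct values to the correct reducer.
- This means that everything I need must be present in the value.
- In the reducer I only look at the values and always discard/ignore the key.
- The attributes used to create the key can also be found in the value.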
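A reducer that follows these rules reads nothing from the key. The plain-Java sketch below shows that shape, assuming a hypothetical `PoiValue` record that carries every output field, including the ones also used to build the composite key; it is an illustration of the rules, not a Hadoop Reducer. Since Hadoop reuses the value instance as well, each output line is built from the value immediately instead of keeping references to it.

```java
import java.util.ArrayList;
import java.util.List;

public class ValueOnlyReducerDemo {

    // Hypothetical value type carrying every field the reducer needs,
    // including the ones that were also used to build the composite key.
    record PoiValue(long msisdn, long dateOccurrence, boolean state, long timeDelta) {}

    // Reducer logic that never reads the key; each line is built from the current value.
    static List<String> reduce(Object ignoredKey, Iterable<PoiValue> values) {
        List<String> lines = new ArrayList<>();
        for (PoiValue v : values) {
            lines.add(v.msisdn() + "\t" + v.dateOccurrence() + "\t" + v.state() + "\t" + v.timeDelta());
        }
        return lines;
    }

    public static void main(String[] args) {
        reduce(null, List.of(
                new PoiValue(1, 123, true, 6),
                new PoiValue(1, 456, false, 6),
                new PoiValue(1, 789, true, 7)))
                .forEach(System.out::println);
    }
}
```

The trade-off is the duplication the question mentions: the fields appear in both key and value, in exchange for a reducer that does not depend on how the framework reuses the key.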
