java - Empty ORC file

ibrsph3r posted on 2021-05-29 in Hadoop

I'm trying to write ORC files with orc-core so they can later be read by Hive.
The file being written has the correct number of rows, but the columns have no content. I can see this both when trying to read the file with a SELECT query in Hive and when inspecting it with hive --orcfiledump -d.
I tried the example provided in the guide, which writes two long columns, and the resulting file is read correctly by Hive. I suspect the problem is related to the fact that I'm writing string columns, but I still can't make it work.
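For reference, this is more or less the two-long-column example from the ORC core documentation (a minimal, self-contained sketch; the output path and schema here are just placeholders), and files written this way are read fine by Hive:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

public class OrcLongExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        TypeDescription schema = TypeDescription.fromString("struct<x:bigint,y:bigint>");
        Writer writer = OrcFile.createWriter(new Path("/tmp/long-example.orc"),
                OrcFile.writerOptions(conf).setSchema(schema));

        VectorizedRowBatch batch = schema.createRowBatch();
        LongColumnVector x = (LongColumnVector) batch.cols[0];
        LongColumnVector y = (LongColumnVector) batch.cols[1];

        for (int r = 0; r < 10000; r++) {
            int row = batch.size++;
            //  Long columns can be filled by assigning into the vector directly
            x.vector[row] = r;
            y.vector[row] = r * 3;
            if (batch.size == batch.getMaxSize()) {
                writer.addRowBatch(batch);
                batch.reset();
            }
        }
        if (batch.size != 0) {
            writer.addRowBatch(batch);
            batch.reset();
        }
        writer.close();
    }
}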
This is how I'm currently writing the file:

//  File schema
    String outputFormat = "struct<";
    for(int i=0;i<outputSchema.length;i++){
        outputFormat+=outputSchema[i]+":string,";
    }
    outputFormat+="lastRecordHash:string,currentHash:string>";
    TypeDescription orcSchema = TypeDescription.fromString(outputFormat);

    //  Initializes buffers
    VectorizedRowBatch batch = orcSchema.createRowBatch();
    ArrayList<BytesColumnVector> orcBuffers = new ArrayList<>(numFields+2);
    for(int i=0;i<numFields+2;i++){
        BytesColumnVector bcv = (BytesColumnVector) batch.cols[i];
        orcBuffers.add(i, bcv);
    }

    ...

    //  Initializes writer
    Writer writer=null;
    try{
        writer = OrcFile.createWriter(new Path(hdfsUri+outputPath), OrcFile.writerOptions(conf).setSchema(orcSchema));
        partitionCounter++;
    }
    catch(IOException e){
        log.error("Cannot open hdfs file. Reason: "+e.getMessage());
        session.transfer(flowfile, hdfsFailure);
        return;
    }

    //  Writes content
    String[] records = ...

    for(int i=0;i<records.length;i++){
        fields = records[i].split(fieldSeparator);

        int row=batch.size++;

        //  Filling the orc buffers
        for(int j=0;j<numFields;j++){
            orcBuffers.get(j).vector[row] = fields[j].getBytes();
            hashDigest.append(fields[j]);
        }
        if (batch.size == batch.getMaxSize()) {
            try{
                writer.addRowBatch(batch);
                batch.reset();
            }
            catch(IOException e){
                log.error("Cannot write to hdfs. Reason: "+e.getMessage());
                return;
            }
        }         
    }
    if (batch.size != 0) {
        try{
            writer.addRowBatch(batch);
            batch.reset();
        }
        catch(IOException e){
            log.error("Cannot write to hdfs. Reason: "+e.getMessage());
            return;
        }
    }
    writer.close();

Any advice or useful references would be greatly appreciated.
Thanks, everyone.


qmelpv7a 1#

It seems that what I needed was a deeper review of the API documentation. What I was missing:
calling initBuffer() on each BytesColumnVector during the initialization phase;
assigning the column values by calling setVal(). This can also be done using setRef(), which is documented to be the faster of the two, but I don't know whether it fits my specific case, so I will try it.
As I understand it, a BytesColumnVector keeps per-row start/length offsets alongside its byte buffers; setVal() and setRef() maintain those, whereas assigning into vector[row] directly leaves the lengths at zero, which is why the columns came out empty.
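For illustration, a minimal sketch of the two calls on a BytesColumnVector (the column index, method name and sample text are just placeholders, not the actual processor code):

import java.nio.charset.StandardCharsets;

import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

public class BytesColumnSketch {
    //  Fills one row of the first (string) column, showing setVal() vs setRef()
    static void fillFirstColumn(VectorizedRowBatch batch, String text) {
        BytesColumnVector col = (BytesColumnVector) batch.cols[0];
        //  Allocates the shared buffer that setVal() copies into
        //  (normally done once, right after createRowBatch())
        col.initBuffer();

        int row = batch.size++;
        byte[] value = text.getBytes(StandardCharsets.UTF_8);

        //  setVal() copies the bytes into the vector's own buffer,
        //  so "value" can be reused or modified afterwards
        col.setVal(row, value);

        //  setRef() only stores a reference plus start/length, avoiding the copy,
        //  but then "value" must stay untouched until the batch is written out:
        //  col.setRef(row, value, 0, value.length);
    }
}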
Here is the updated code:

//  File schema
String outputFormat = "struct<";
for(int i=0;i<outputSchema.length;i++){
    outputFormat+=outputSchema[i]+":string,";
}
outputFormat+="lastRecordHash:string,currentHash:string>";
TypeDescription orcSchema = TypeDescription.fromString(outputFormat);

//  Initializes buffers
VectorizedRowBatch batch = orcSchema.createRowBatch();
ArrayList<BytesColumnVector> orcBuffers = new ArrayList<>(numFields+2);
for(int i=0;i<numFields+2;i++){
    BytesColumnVector bcv = (BytesColumnVector) batch.cols[i];
    bcv.initBuffer();
    orcBuffers.add(i, bcv);
}

...

//  Initializes writer
Writer writer=null;
try{
    writer = OrcFile.createWriter(new Path(hdfsUri+outputPath), OrcFile.writerOptions(conf).setSchema(orcSchema));
    partitionCounter++;
}
catch(IOException e){
    log.error("Cannot open hdfs file. Reason: "+e.getMessage());
    session.transfer(flowfile, hdfsFailure);
    return;
}

//  Writes content
String[] records = ...

for(int i=0;i<records.length;i++){
    fields = records[i].split(fieldSeparator);

    int row=batch.size++;

    //  Filling the orc buffers
    for(int j=0;j<numFields;j++){
        orcBuffers.get(j).setVal(row, fields[j].getBytes());
        hashDigest.append(fields[j]);
    }
    if (batch.size == batch.getMaxSize()) {
        try{
            writer.addRowBatch(batch);
            batch.reset();
        }
        catch(IOException e){
            log.error("Cannot write to hdfs. Reason: "+e.getMessage());
            return;
        }
    }         
}
if (batch.size != 0) {
    try{
        writer.addRowBatch(batch);
        batch.reset();
    }
    catch(IOException e){
        log.error("Cannot write to hdfs. Reason: "+e.getMessage());
        return;
    }
}
writer.close();
