How to read the first N rows of a Parquet file in Java without downloading the whole file

kx5bkwkv · posted 2021-05-27 in Hadoop

My requirement is to read a Parquet file from S3/SFTP/FTP, read a few rows from it, and write them to a CSV file.
Since I have not found any generic way to read a Parquet file directly from S3/SFTP/FTP, I am downloading the Parquet file to a local temp file via an InputStream.

File tmp = null;
File parquetFile = null;
try {
      tmp = File.createTempFile("csvFile", ".csv");
      parquetFile = File.createTempFile("parquetFile", ".parquet");

      //downloading file to local
      StreamUtils.dumpToDisk(parquetFile, feed.getInputStream());
      parquetReaderUtils.parquetReader(new org.apache.hadoop.fs.Path(parquetFile.getAbsolutePath()), tmp);

    } catch(IOException e){
        System.out.println("Error reading parquet file.");
    }
    finally {
              FileUtils.deleteQuietly(tmp);
              FileUtils.deleteQuietly(parquetFile);
    }

Once the file is downloaded, I call the parquetReader() method of the ParquetReaderUtils class to read it from the local path and write the first 5 rows of the Parquet file to a CSV file.
Here is the ParquetReaderUtils class definition:

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.io.ColumnIOFactory;
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.io.RecordReader;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
import org.springframework.stereotype.Component;

import java.io.*;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.temporal.JulianFields;

@Component
public class ParquetReaderUtils {
    private static final String CSV_DELIMITER = ",";

// Reads the Parquet file at the given local path and writes its first 5 rows to a CSV file.
// The InputStream is expected to be a Hadoop configuration XML resource and is added to the Configuration.
    public void parquetReader(org.apache.hadoop.fs.Path path, File csvOutputFile, InputStream in) throws IllegalArgumentException {
        Configuration conf = new Configuration();
        conf.addResource(in);
        int rowsRead = 0;
        try {
            ParquetMetadata readFooter = ParquetFileReader.readFooter(conf, path, ParquetMetadataConverter.NO_FILTER);
            MessageType schema = readFooter.getFileMetaData().getSchema();
            ParquetFileReader r = new ParquetFileReader(conf, path, readFooter);
            BufferedWriter w = new BufferedWriter(new FileWriter(csvOutputFile));
            PageReadStore pages = null;
            try {
                while (null != (pages = r.readNextRowGroup())) {
                    final long rows = pages.getRowCount();
                    System.out.println("Number of rows: " + rows);

                    final MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
                    final RecordReader recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema));
                    for (int i = 0; i <= 5; i++) {
                        final Group g = (Group) recordReader.read();
                        //printGroup(g);
                        writeGroup(w, g, schema, headerRow);
                        rowsRead++;
                    }
                    if(rowsRead==5)
                        break;
                }
            } finally {
                r.close();
                w.close();
            }
        } catch (IOException e) {
            System.out.println("Error reading parquet file.");
            e.printStackTrace();
        }
    }

// Writes one row to the CSV file, preceded by a header line when no rows have been written yet.
    private static void writeGroup(BufferedWriter w, Group g, MessageType schema, int rowsAlreadyWritten)
            throws IOException {
        if (rowsAlreadyWritten < 1) {
            for (int j = 0; j < schema.getFieldCount(); j++) {
                if (j > 0) {
                    w.write(CSV_DELIMITER);
                }
                Type fieldType = g.getType().getType(j);
                String fieldName = fieldType.getName();
                w.write(fieldName);
            }
            w.write('\n');
        }
        for (int j = 0; j < schema.getFieldCount(); j++) {
            try {
                if (j > 0) {
                    w.write(CSV_DELIMITER);
                }
                Type fieldType = g.getType().getType(j);
                PrimitiveType pt = (PrimitiveType) g.getType().getFields().get(j);
                int valueCount = g.getFieldRepetitionCount(j);
                String valueToString = g.getValueToString(j, 0);
                if (pt.getPrimitiveTypeName().name().equals("INT96")) {
                    for (int index = 0; index < valueCount; index++) {
                        if (fieldType.isPrimitive()) {
                            LocalDateTime dateTime = convertToDate(g.getInt96(j, index).getBytes());
                            valueToString = String.valueOf(dateTime);
                        }
                    }
                }
                w.write(valueToString);
            } catch (Exception e) {
                w.write("");
                continue;
            }
        }
        w.write('\n');
    }

// Method to convert INT96 value to LocalDateTime.
    private static LocalDateTime convertToDate(byte[] int96Bytes) {
        // Find Julian day
        int julianDay = 0;
        int index = int96Bytes.length;
        while (index > 8) {
            index--;
            julianDay <<= 8;
            julianDay += int96Bytes[index] & 0xFF;
        }

        // Find nanos since midday (since Julian days start at midday)
        long nanos = 0;
        // Continue from the index we got to
        while (index > 0) {
            index--;
            nanos <<= 8;
            nanos += int96Bytes[index] & 0xFF;
        }

        LocalDateTime timestamp = LocalDate.MIN
                .with(JulianFields.JULIAN_DAY, julianDay)
                .atTime(LocalTime.NOON)
                .plusNanos(nanos);
        System.out.println("Timestamp: " + timestamp);
        return timestamp;
    }

}

Here I download the whole file to the local system; if the Parquet file is large, this solution does not scale, and downloading the complete file is of no use to me.
Is there a way to read a Parquet file directly from an InputStream, instead of downloading it locally and then reading the local file?
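
For illustration only (this is not part of the original post, and the bucket, key and credential values are placeholders): since ParquetFileReader only needs seekable access to the file, one possible direction for the S3 case is to point it at an s3a:// path and let Hadoop's S3A filesystem read the object in place, so that only the footer and the row groups actually consumed are fetched with ranged requests. The sketch below assumes the hadoop-aws module (and its AWS SDK dependency) is on the classpath and reuses the imports of the ParquetReaderUtils class above.

// Sketch: read the first row group of a Parquet object directly from S3 via s3a://,
// without downloading the whole file. Credentials, bucket and key are placeholders.
public void readFirstRowsFromS3(File csvOutputFile) throws IOException {
    Configuration conf = new Configuration();
    conf.set("fs.s3a.access.key", "<access-key>");
    conf.set("fs.s3a.secret.key", "<secret-key>");

    org.apache.hadoop.fs.Path s3Path =
            new org.apache.hadoop.fs.Path("s3a://my-bucket/path/to/file.parquet");

    // The S3A input stream is seekable, so only the footer is fetched here.
    ParquetMetadata footer =
            ParquetFileReader.readFooter(conf, s3Path, ParquetMetadataConverter.NO_FILTER);
    MessageType schema = footer.getFileMetaData().getSchema();

    try (ParquetFileReader reader = new ParquetFileReader(conf, s3Path, footer)) {
        // Only the pages of the first row group are downloaded.
        PageReadStore firstRowGroup = reader.readNextRowGroup();
        MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
        RecordReader<Group> recordReader =
                columnIO.getRecordReader(firstRowGroup, new GroupRecordConverter(schema));
        // ...decode the first 5 records and write them to csvOutputFile,
        // exactly as parquetReader() does above.
    }
}

For SFTP/FTP sources there is no comparable seekable Hadoop filesystem available out of the box, and a plain, non-seekable InputStream cannot be read by parquet-mr without buffering, because the footer sits at the end of the file. Short of buffering to a temp file (as in the question) or into memory, another direction in recent parquet-mr versions is to wrap a seekable source in a custom org.apache.parquet.io.InputFile and open it with ParquetFileReader.open(InputFile); whether that fits depends on the protocol supporting random reads.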

No answers yet.
