如何将hdfs中的二进制文件读入sparkDataframe?

czfnxgou  于 2021-05-29  发布在  Hadoop
关注(0)|答案(3)|浏览(384)

我正在尝试从pandas移植一些代码到(py)spark。不幸的是,我的输入部分已经失败了,我想读入二进制数据并将其放入sparkDataframe中。
到目前为止我用的是 fromfile 来自numpy:

dt = np.dtype([('val1', '<i4'),('val2','<i4'),('val3','<i4'),('val4','f8')])
data = np.fromfile('binary_file.bin', dtype=dt)
data=data[1:]                                           #throw away header
df_bin = pd.DataFrame(data, columns=data.dtype.names)

但对于spark我找不到怎么做。到目前为止,我的解决方法是使用csv文件而不是二进制文件,但这不是一个理想的解决方案。我知道我不应该用numpy的 fromfile 带着Spark。如何读入已经加载到hdfs中的二进制文件?
我试过类似的方法

fileRDD=sc.parallelize(['hdfs:///user/bin_file1.bin','hdfs:///user/bin_file2.bin])
fileRDD.map(lambda x: ???)

但它给了我一个机会 No such file or directory 错误。
我看到过这个问题:python中的spark:通过用numpy.fromfile加载二进制数据来创建rdd,但是只有当我将文件存储在驱动程序节点的主节点中时,这个问题才起作用。

b1payxdu

b1payxdu1#

我最近做了这样的事情:

from struct import unpack_from

# creates an RDD of binaryrecords for determinted record length

binary_rdd = sc.binaryRecords("hdfs://" + file_name, record_length)

# map()s each binary record to unpack() it

unpacked_rdd = binary_rdd.map(lambda record: unpack_from(unpack_format, record))

# registers a data frame with this schema; registerTempTable() it as table_name

raw_df = sqlc.createDataFrame(unpacked_rdd, sparkSchema)
raw_df.registerTempTable(table_name)

其中解包格式和sparkschema必须“同步”。
unpack\ u格式是python的unpack()和unpack\ u from()函数使用的格式,如中所述https://docs.python.org/2/library/struct.html#format-人物
sparkschema是定义Dataframe模式的变量。示例见https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html#pyspark.sql.sqlcontext.createdataframe
我有一个脚本,可以动态生成解包格式和sparkschema变量;两者同时存在(它是一个更大的代码库的一部分,因此不在此处发布以提高可读性)
解包格式和sparkschema可以定义如下,例如,

from pyspark.sql.types import *

unpack_format = '<'   # '<' means little-endian: https://docs.python.org/2/library/struct.html#byte-order-size-and-alignment
sparkSchema = StructType()
record_length = 0

unpack_format += '35s'    # 35 bytes that represent a character string
sparkSchema.add("FirstName", 'string', True)  # True = nullable
record_length += 35

unpack_format += 'H'    # 'H' = unsigned 2-byte integer
sparkSchema.add("ZipCode", 'integer', True)
record_length += 2

# and so on for each field..
fiei3ece

fiei3ece2#

编辑:请检查sc.binaryfiles的使用,如下所述:https://stackoverflow.com/a/28753276/5088142
尝试使用:

hdfs://machine_host_name:8020/user/bin_file1.bin

您可以在core-site.xml的fs.defaultfs中输入主机名

oxalkeyp

oxalkeyp3#

所以,对于任何一个以spark为起点,以我为起点,偶然发现二进制文件的人来说。我是这样解决的:

dt=np.dtype([('idx_metric','>i4'),('idx_resource','>i4'),('date','>i4'),
             ('value','>f8'),('pollID','>i2')])
schema=StructType([StructField('idx_metric',IntegerType(),False),
                   StructField('idx_resource',IntegerType(),False), 
                   StructField('date',IntegerType),False), 
                   StructField('value',DoubleType(),False), 
                   StructField('pollID',IntegerType(),False)])

filenameRdd=sc.binaryFiles('hdfs://nameservice1:8020/user/*.binary')

def read_array(rdd):
    #output=zlib.decompress((bytes(rdd[1])),15+32) # in case also zipped
    array=np.frombuffer(bytes(rdd[1])[20:],dtype=dt) # remove Header (20 bytes)
    array=array.newbyteorder().byteswap() # big Endian
    return array.tolist()

unzipped=filenameRdd.flatMap(read_array)
bin_df=sqlContext.createDataFrame(unzipped,schema)

现在你可以用你的Dataframe在spark里做任何你想做的事情。

相关问题