我正在尝试从pandas移植一些代码到(py)spark。不幸的是,我的输入部分已经失败了,我想读入二进制数据并将其放入sparkDataframe中。
到目前为止我用的是 fromfile
来自numpy:
dt = np.dtype([('val1', '<i4'),('val2','<i4'),('val3','<i4'),('val4','f8')])
data = np.fromfile('binary_file.bin', dtype=dt)
data=data[1:] #throw away header
df_bin = pd.DataFrame(data, columns=data.dtype.names)
但对于spark我找不到怎么做。到目前为止,我的解决方法是使用csv文件而不是二进制文件,但这不是一个理想的解决方案。我知道我不应该用numpy的 fromfile
带着Spark。如何读入已经加载到hdfs中的二进制文件?
我试过类似的方法
fileRDD=sc.parallelize(['hdfs:///user/bin_file1.bin','hdfs:///user/bin_file2.bin])
fileRDD.map(lambda x: ???)
但它给了我一个机会 No such file or directory
错误。
我看到过这个问题:python中的spark:通过用numpy.fromfile加载二进制数据来创建rdd,但是只有当我将文件存储在驱动程序节点的主节点中时,这个问题才起作用。
3条答案
按热度按时间b1payxdu1#
我最近做了这样的事情:
其中解包格式和sparkschema必须“同步”。
unpack\ u格式是python的unpack()和unpack\ u from()函数使用的格式,如中所述https://docs.python.org/2/library/struct.html#format-人物
sparkschema是定义Dataframe模式的变量。示例见https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html#pyspark.sql.sqlcontext.createdataframe
我有一个脚本,可以动态生成解包格式和sparkschema变量;两者同时存在(它是一个更大的代码库的一部分,因此不在此处发布以提高可读性)
解包格式和sparkschema可以定义如下,例如,
fiei3ece2#
编辑:请检查sc.binaryfiles的使用,如下所述:https://stackoverflow.com/a/28753276/5088142
尝试使用:
您可以在core-site.xml的fs.defaultfs中输入主机名
oxalkeyp3#
所以,对于任何一个以spark为起点,以我为起点,偶然发现二进制文件的人来说。我是这样解决的:
现在你可以用你的Dataframe在spark里做任何你想做的事情。