无法使用pyspark应用模式-字段不一致

hi3rlvi2  于 2021-05-29  发布在  Hadoop
关注(0)|答案(2)|浏览(270)

Pypark newb在这里,我正试图组织一个笨拙的历史数据rdd。我阅读了(来自wasb)中的数据,需要了解它的结构。我对模式有一个大致的想法,但是因为这是一个大的摘录,我可以看到并不是所有的记录都是一致的。
我正在努力解决的是按位置引用rdd元素,这样我就可以尝试从数据中提取一些意义。由于不一致性,我现在无法提交到模式-这意味着dataframes不是一个选项,我失去了df灵活的查询样式。
有关环境和数据的快速摘要:
azure hdinsight群集,wasb中的数据
hdfs公司。2.7
Yarnv。2.7
spark v 1.6(ha配置,8个工作节点(每个16核x 112 gb ram)
jupyter-皮斯巴克
数据:奇怪的分隔“csv”与约765mm的记录
读取数据 ops = sc.textFile("wasb://blob@storageaccount.windows.net/raw_ops.csv") 按时髦的分隔符拆分 ops = ops.map(lambda s: s.split(u"\ufffd")).cache() 显示5条rdd记录
ops.take(5) [ [u'ALER RECOMMENDED BRAKE FLUID EXCHANGE - $139.88 ~', u'~PERFORMED DEALER RECOMMENDED BRAKE FLUID EXCHANGE USING A SPECIAL BRAKE FLUID EXCHANGE MACHINE,A PRESSURIZED SUPPLY OF MOC BRAKE FLUID', u'HIST_LD', u'2016-03-08 16:02:53', u'ARCHIVE'] ,[u'04638', u'0734140', u'2011-10-19', u'345267460', u'2', u'TIR', u'', u'0', u'685745051', u'TIRE INFLATION RULE CK ALL TIRES FOR ACCURATE PSI PER MANUFACTURER SPECIFICATION', u'HIST_LD', u'2016-03-08 16:01:39', u'ARCHIVE'] ,[u'04638', u'0734140', u'2011-10-19', u'345267460', u'1', u'PRIME ITEM', u'', u'0', u'0', u'TIRE INFLATION RULE CK ALL TIRES FOR ACCURATE PSI PER MANUFACTURER SPECIFICATION ~', u'~TIRE INFLATION RULE CK ALL TIRES FOR ACCURATE PSI PER MANUFACTURER SPECIFICATIONS AND DOCUMENT PSI ~', u'~ ~', u'~20450 SET AT 36 PSI.', u'HIST_LD', u'2016-03-08 16:01:39', u'ARCHIVE'] ,[u'12093', u'0399468', u'2011-10-19', u'345268559', u'2', u'201', u'', u'1.5', u'0', u'REPLACED GAS CAP AND SANDED FILLER NECK', u'HIST_LD', u'2016-03-08 16:07:15', u'ARCHIVE'] ,[u'12093', u'0399468', u'2011-10-19', u'345268559', u'1', u'PRIME ITEM', u'', u'0', u'0', u'REPLACED GAS CAP AND SANDED FILLER NECK ~', u'~REPLACE GAS CAP AND SAND FILLER NECK', u'HIST_LD', u'2016-03-08 16:07:15', u'ARCHIVE'] ] 我看到第3列可能是一个日期,如何从rdd中的每一行提取这个值?
(在此处查找2013年数据的伪代码示例): ops.filter(lambda x[2]: year(x[2])==2013) 我在网上找到的关于如何做到这一点的文档非常有限,特别是当它涉及到在没有决定性模式的情况下处理结构不一致的数据时。底线是“伪代码”应该是什么?
我的最终目标是解析出2013-2015年的数据,将这些数据划分成各自的Dataframe,并将它们写入hive。谢谢你的帮助!

vi4fp9gy

vi4fp9gy1#

所以这是解决问题的一种方法:

from datetime import datetime  

def only_pass(maybe_date):  
    try:  
        datetime.strptime(maybe_date,"%Y-%m-%d").date()  
        return 1  
    except Exception as err:    
        return 0   

only_rows_with_dates = rdd.filter(lambda row: only_pass(row[2]) == 1)

这里什么也找不到,希望你能明白。

dauxcl2d

dauxcl2d2#

我认为目前最好的选择是
根据分隔符拆分rdd中的行 rddNew = rddOriginal.map(lambda s: s.split(u"\ufffd")).cache() 搜索您怀疑是日期的元素,并使用“re”库中的regex技巧为时间“partition”创建一个新的rdd import re rdd2013 = opcode.filter(lambda x : re.search('^2013', x[2]))

相关问题