map-only-spark中的作业(与hadoop流媒体相比)

vsaztqbk  于 2021-06-03  发布在  Hadoop
关注(0)|答案(1)|浏览(352)

我有一个函数 process_line 从输入格式Map到输出格式
有些行已损坏,需要忽略。
我成功地将此代码作为python流式处理作业运行:

for input_line in sys.stdin:
    try:
        output_line=process_line(input_line.strip())
        print (output_line)
    except:
        sys.stderr.write('Error with line: {l}\n'.format(l=input_line))
        continue

如何在pyspark中运行等效代码?这就是我所尝试的:

input = sc.textFile(input_dir, 1)
output=lines.map(process_line)
output.saveAsTextFile(output_dir)

如何跟踪损坏的行并对其计数进行统计?

oyjwcjzk

oyjwcjzk1#

您试图只将文本文件读取到一个分区,这可能会导致作业运行缓慢,因为您基本上放弃了并行性。
尝试这样做:

input = sc.textFile(input_dir)
output = lines.map(process_line)
output.saveAsTextFile(output_dir)

对于损坏的行,可以使用try-except机制 process_line 函数,并可能将有问题的行写入某个日志文件,或者尝试执行其他逻辑。

相关问题