java—定义文件输入的手动拆分算法

oyt4ldly  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(391)

我是spark和hadoop生态系统的新手,已经爱上了它。现在,我正在尝试将现有的java应用程序移植到spark。
此java应用程序的结构如下:
使用 BufferedReader 使用对输入数据进行大量计算的自定义解析器类。每个输入文件的大小为1到最大2.5 gb。
将数据存储在内存中(以 HashMap<String, TreeMap<DateTime, List<DataObjectInterface>>> )
将内存中的数据存储写成json。这些json文件的大小较小。
我编写了一个scala应用程序,它确实由一个工人来处理我的文件,但这显然不是spark能给我带来的最大性能好处。
现在我的问题是把这个移植到spark上:输入文件是基于行的。我通常每行有一条信息。但是,有些消息依赖前面的行在解析器中形成实际的有效消息。例如,我可能会在输入文件中按以下顺序获取数据:
{时间戳}#0x033#{数据_字节}\n
{时间戳}#0x034#{数据_字节}\n
{时间戳}#0x035#{数据_字节}\n
{时间戳}#0x0fe#{数据_字节}\n
{时间戳}#0x036#{数据_字节}\n
为了形成从“合成消息”0x036中提取的实际消息,解析器还需要来自消息0x033、0x034和0x035的行。其他消息也可以在这组需要的消息之间传递。不过,大多数消息都可以通过读取一行来解析。
现在我的问题终于来了:如何让spark为我的目的正确分割我的文件?文件不能“随机”分割;它们的拆分方式必须确保我的所有消息都可以被解析,并且解析器不会等待他永远不会得到的输入。这意味着每个合成消息(依赖于前面行的消息)需要在一个分割中。
我想有几种方法可以实现正确的输出,但我也会在这篇文章中提出一些想法:
为文件输入定义手动分割算法?这将检查拆分的最后几行是否不包含“大”消息的开头[0x033、0x034、0x035]。
按spark的意愿分割文件,但也要从上一次分割到下一次分割添加固定数量的行(比如说50行,这肯定能完成任务)。解析器类将正确处理多个数据,并且不会带来任何问题。
第二种方法可能更简单,但是我不知道如何在spark中实现这一点。有人能给我指出正确的方向吗?
提前谢谢!

xe55xuns

xe55xuns1#

我在我的博客上看到了你的评论http://blog.ae.be/ingesting-data-spark-using-custom-hadoop-fileinputformat/ 决定在这里发表我的意见。
首先,我不太清楚你想做什么。请帮我一把:您的文件包含包含0x033、0x034、0x035和0x036的行,所以spark将分别处理它们?而实际上这些行需要一起处理?
如果是这样的话,你不应该把它解释为“腐败的分裂”。正如您在blogpost中看到的,spark将文件拆分为可以单独处理的记录。默认情况下,它通过在换行符上拆分记录来实现这一点。然而,在您的情况下,您的“记录”实际上分布在多行上。所以,可以使用自定义的fileinputformat。不过,我不确定这是否是最简单的解决办法。
您可以尝试使用一个定制的fileinputformat来解决这个问题,它执行以下操作:不是像默认的fileinputformat那样逐行给出,而是解析文件并跟踪遇到的记录(0x033、0x034等)。同时,您可以过滤掉0x0fe之类的记录(不确定是否要在其他地方使用它们)。这样做的结果是spark将所有这些物理记录作为一个逻辑记录来获取。
另一方面,逐行读取文件并使用功能键(例如,[对象33,0x033],[对象33,0x034],…)Map记录可能更容易。这样,可以使用所选的键组合这些行。
当然还有其他选择。你选择哪一个取决于你的用例。

相关问题