MapReduce job (written in Python) runs slowly on EMR

x33g5p2x · published 2021-06-03 · in Hadoop
Follow (0) | Answers (1) | Views (363)

I am trying to write a MapReduce job using Python's mrjob package. The job processes about 36,000 files stored in S3, each roughly 2 MB. When I run the job locally (after downloading the S3 bucket to my machine), it takes about an hour. But when I try to run it on EMR, it takes far longer (I stopped it at 8 hours with the mapper only 10% complete). I've attached the code for my `mapper_init` and `mapper` below. Does anyone know what could cause a problem like this, and how to fix it? I should also note that when I limit the input to a sample of 100 files, it works fine.

# Module-level imports these methods rely on:
import re

from nltk.corpus import stopwords

def mapper_init(self):
    """
    Set class variables that will be useful to our mapper:
        filename: the path and filename to the current recipe file
        previous_line: The line previously parsed. We need this because the
          ingredient name is in the line after the tag
    """

    #self.filename = os.environ["map_input_file"]  # Not currently used
    self.previous_line = "None yet"
    # Checking membership in a list is O(n), while checking membership
    #  in a set is O(1)
    self.stopwords = set(stopwords.words('english'))

def mapper(self, _, line):
    """
    Takes a line from an html file and yields ingredient words from it

    Given a line of input from an html file, we check to see if it
    contains the identifier that it is an ingredient. Due to the
    formatting of our html files from allrecipes.com, the ingredient name
    is actually found on the following line. Therefore, we save the
    current line so that it can be referenced in the next pass of the
    function to determine if we are on an ingredient line.

    :param line: a line of text from the html file as a str
    :yield: a tuple containing each word in the ingredient as well as a
        counter for each word. The counter is not currently being used,
        but is left in for future development. e.g. "chicken breast" would
        yield "chicken" and "breast"
    """

    # TODO is there a better way to get the tag?
    if re.search(r'span class="ingredient-name" id="lblIngName"',
                 self.previous_line):
        self.previous_line = line
        line = self.process_text(line)
        line_list = set(line.split())
        for word in line_list:
            if word not in self.stopwords:
                yield (word, 1)
    else:
        self.previous_line = line
    # NOTE: this fires on every call, so the mapper emits one ('', 0)
    #  pair per input line; that inflates shuffle traffic and can be
    #  dropped if the counter is not needed
    yield ('', 0)
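The two-line state machine in the mapper (tag on one line, ingredient text on the next) can be replayed outside mrjob. This is a minimal standalone sketch of that logic; the small `STOPWORDS` set is a stand-in for NLTK's English stopword list, and the sample HTML lines are invented for illustration:

```python
import re

STOPWORDS = {"a", "an", "the", "of", "and"}  # stand-in for nltk stopwords
TAG = r'span class="ingredient-name" id="lblIngName"'

def extract_ingredient_words(lines):
    """When a line matches the ingredient tag, the *next* line holds
    the ingredient text; yield each non-stopword word from it."""
    previous_line = "None yet"
    for line in lines:
        if re.search(TAG, previous_line):
            for word in set(line.lower().split()):
                if word not in STOPWORDS:
                    yield (word, 1)
        previous_line = line

html = [
    '<span class="ingredient-name" id="lblIngName">',
    'skinless chicken breast',
    '</span>',
]
print(sorted(extract_ingredient_words(html)))
# -> [('breast', 1), ('chicken', 1), ('skinless', 1)]
```

Testing the per-line logic this way confirms the mapper itself is cheap; the slowdown on EMR comes from elsewhere, as the answer below explains.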

Answer 1 (lc8prwob):

The problem is that you have too many small files. Add a bootstrap step that uses s3-dist-cp to copy the files onto the EMR cluster, and while copying, aggregate the small files into ~128 MB files (roughly one HDFS block each).
Hadoop handles large numbers of small files poorly: each file becomes its own input split, so 36,000 files of ~2 MB each means 36,000 tiny map tasks, and the per-task scheduling plus per-file S3 open/read overhead dominates the runtime. Your local run was faster because you had already downloaded the files and read them from local disk with no per-file remote overhead.
Once you have copied and aggregated the files with s3-dist-cp, point the job at the files in HDFS.
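The copy-and-aggregate step above could look roughly like the following EMR step command. The bucket name and paths are placeholders, the `--groupBy` regex assumes the recipe files end in `.html`, and `--targetSize` is in MiB:

```shell
# Copy recipe files from S3 into HDFS, concatenating files whose
# --groupBy match groups are equal into ~128 MiB output parts.
s3-dist-cp \
  --src s3://your-bucket/recipes/ \
  --dest hdfs:///recipes/ \
  --groupBy '.*(\.html)' \
  --targetSize=128
```

This can be run as an EMR step (or from a shell on the master node) before the mrjob step; the job then reads `hdfs:///recipes/` instead of S3.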
