如何在Hadoop中减少步骤差异?

sxpgvts3  于 2022-11-21  发布在  Hadoop
关注(0)|答案(1)|浏览(77)

如何 在 Hadoop 中 减少 步骤 差异 ?
我 有 一 个 理解 Hadoop 的 问题 。 我 有 两 个 文件 , 首先 我 做 了 这些 文件 之间 的 连接 。 一 个 文件 是 关于 国家 , 另 一 个 是 关于 每个 国家 的 客户 端 。
例如 , clients . csv :

Bertram Pearcy  ,bueno,SO
Steven Ulman  ,regular,ZA

中 的 每 一 个
Countries.csv

Name,Code   
Afghanistan,AF
Ã…land Islands,AX
Albania,AL  
…

格式
我 做 了 一 个 Map 减少 , 给 我 多少 " 好 " ( bueno ) 客户 有 一 个 国家 ( ZA , SO ) 和 国家 。 csv 我 知道 与 国家 , 我们 正在 谈论 。
我 编程 :

def steps(self): 
        # ordenamos las operaciones para su ejecución.
        return [ 
            MRStep(mapper=self.mapper 
                   ,reducer=self.reducer),            
            MRStep(mapper=self.mapper1
                   ,combiner=self.combiner_cuenta_palabras
                   ,reducer=self.reducer2
                    ),
        ]

格式
Map/减少 的 结果 是 :

["South Georgia and the South Sandwich Islands"]    1
["South Sudan"] 1
["Spain"]   3

格式
现在 , 我 想 知道 哪 一 个 最 好 。
我 又 加 了 一 减 。

def reducer3(self, _, values):            
        yield  _, max (values)
        
    def steps(self): 
        # ordenamos las operaciones para su ejecución.
        return [ 
            MRStep(mapper=self.mapper 
                   ,reducer=self.reducer),  
            MRStep(mapper=self.mapper1
                   ,combiner=self.combiner_cuenta_palabras
                   ,reducer=self.reducer2),
            MRStep(#mapper=self.mapper3,
                   reducer=self.reducer3
                   #,reducer=self.reducer3
            ),            
        ]

格式
但 我 得到 的 答案 和 没有 减速 器 时 一样
我 试 着 使用 一 个 Map/减少 程序 添加 另 一 个 减少 。 它 不 工作 。
我 第 一 次 减少 我 得到 :

A, 10
C, 2
D, 5

格式
现在 , 我 想 使用 我 得到 的 结果 :一 、 十
附加 备注 :
输入 [ 文件 1 ] + [ 文件 2 ] = 〉 enter image description here
Map/减少 = 〉 输出
enter image description here 格式
现在 , 我 需要 与 额外 的 Map/减少 ( 我 想 使用 我 所 做 的 ) 得到 另 一 个 答案 。
例如 , 一 个 且 只有 一 个 答案 。 例 :x1月 1 日
第 二 ) 所有 具有 最 好 或 更 大 的 数字 , 3 Spain3 Guan
我 试 着 用 :

def reducer3(self, _, values):            
        yield  _, max (values)

格式
我 还要 补充 一 点 ,

def steps(self): 
        # ordenamos las operaciones para su ejecución.
        return [ 
            MRStep(mapper=self.mapper 
                   ,reducer=self.reducer),  
            MRStep(mapper=self.mapper1
                   ,combiner=self.combiner_cuenta_palabras
                   ,reducer=self.reducer2),
            MRStep(reducer=self.reducer3
            ),            
        ]

格式
但是 我 还是 得到 了 相同 的 结果 , 我 知道 REDUCER3 在 使用 , 因为 如果 我 写 max(values)+1000 , 会 得到 相同 的 结果 , 但是 数字 是 10011003

u0njafvf

u0njafvf1#

你的reducer得到了3个不同的键,因此你找到了每个键的最大值,而values只有一个元素(试着打印它的长度...),因此你得到了3个结果。
例如,您需要第三个Map器返回(None, f'{key}|{value}),然后所有记录将被发送到一个reducer,然后您可以在其中迭代、解析和聚合结果

def reducer3(self, _, values):
    _max = float('-inf')
    k_out = None
    for x in values:
        k, v = x.split('|')
        if int(v) > _max:
            _max = v
            k_out = k
    yield  k_out, _max

如果你想捕获相等的max值,我认为你需要多次迭代列表,然后在找到max元素的循环中产生

相关问题