配置单元:如何按键对键/值字符串中的值求和

7gyucuyw  于 2021-06-28  发布在  Hive
关注(0)|答案(3)|浏览(242)

我有两个 string 列包含 key/value 配对变量。像这样:

column1         column2
a:1,b:2,c:3         a:5,c:3
   a:12,b:4     a:9,b:3,d:5

如何将这些值(在现实生活中,我不知道我有多少个键,有些键只能在一列中找到)按特定键求和以得到:

column12
  a:6,b:2,c:6,d:0
 a:21,b:7,c:0,d:5

或者这个:

a  b  c  d
 6  2  6  0
21  7  0  5

谢谢你的帮助!

oyjwcjzk

oyjwcjzk1#

这里有一个解决方案。这是一个有点黑客,但将工作,无论你有多少钥匙。
udf0.py版本


# /usr/bin/python

import sys
from collections import Counter

for line in sys.stdin:
    words = line.strip().split('\t')
    c = Counter()

    for word in words:
        d = {}
        s = word.split(',')
        for ss in s:
            k,v = ss.split(':')
            d[k] = int(v)

        c.update(d)

    print ','.join([str(k)+':'+str(v) for k,v in dict(c).iteritems()])

udf1.py版本


# !/usr/bin/python

import sys

for line in sys.stdin:
    w0, w1 = line.strip().split('\t')

    out = {}
    d = {}
    l = []

    s0 = w0.strip().split(',')
    s1 = w1.strip().split(',')

    for ss in s0:
        k,v = ss.split(':')
        d[k] = int(v)

    for ss in s1:
        l.append(ss)

    for keys in l:
        if d.get(keys, None) is not None:
            out[keys] = d[keys]
        else:
            out[keys] = 0

     print ','.join([str(k)+':'+str(v) for k,v in out.iteritems()])

配置单元查询:

add file /home/username/udf0.py;
add file /home/username/udf1.py;

SELECT TRANSFORM(dict, unique_keys)
       USING 'python udf1.py'
       AS (final_map STRING)
FROM (
  SELECT DISTINCT dict
    , CONCAT_WS(',', unique_keys) as unique_keys
  FROM (
    SELECT dict
      , COLLECT_SET(keys) OVER () AS unique_keys
    FROM (
      SELECT dict
        , keys
      FROM (
        SELECT dict
          , map_keys(str_to_map(dict)) AS key_arr
        FROM (
          SELECT TRANSFORM (col1, col2)
                 USING 'python udf0.py'
                 AS (dict STRING)
          FROM db.tbl ) x ) z
      LATERAL VIEW EXPLODE(key_arr) exptbl AS keys ) a ) b ) c

输出:

a:6,b:2,c:6,d:0
a:21,b:7,c:0,d:5

说明:
第一个udf将获取您的字符串,将其转换为python字典并更新键(即将具有匹配键的值添加到一起)。因为您不知道实际的键,所以需要从每个字典中提取键( map_keys() 在配置单元查询中),分解表,然后将它们收集回唯一的集合。现在,你可以在任何字典里找到所有可能的键。然后,您可以使用第二个udf导入在第一个udf中创建的字典,检查每个键是否存在,如果不存在,则为其值设置一个零。

iyfjxgzm

iyfjxgzm2#

假设每一行都有一个唯一的标识符“id”,下面的查询
应该有用。

select id, collect_list(CONCAT(key,':',val)) as column12  
from  
(  
  select id, key, SUM(val) as val  
  from  
  (  
    select id, k1 as key, k2 as key, v1 as val, v2 as val  
    from  
    (  
      select id, str_to_map(col1,',',':') as c1, str_to_map(col2,',',':') as c2  
      from table  
    )x  
    LATERAL VIEW explode(c1) e1 as k1,v1  
    LATERAL VIEW explode(c2) e2 as k2,v2  
  )y  
)z
k97glaaz

k97glaaz3#

其中一种方法是使用hcatalog编写自定义mapreduce代码。
下面的mr获取表1的输入列,并将结果列写入表2中(不需要reducer,因为逻辑在Map器中处理)

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.util.*;
import org.apache.hcatalog.common.*;
import org.apache.hcatalog.mapreduce.*;
import org.apache.hcatalog.data.*;
import org.apache.hcatalog.data.schema.*;
import org.apache.commons.lang3.ArrayUtils;

public class HCatSum extends Configured implements Tool {

    public static class Map
            extends
            Mapper<WritableComparable, HCatRecord, WritableComparable, HCatRecord> {
        String column1;
        String column2;
        String val;

        @Override
        protected void map(
                WritableComparable key,
                HCatRecord value,
                org.apache.hadoop.mapreduce.Mapper<WritableComparable, HCatRecord, WritableComparable, HCatRecord>.Context context)
                throws IOException, InterruptedException {

            column1 = (String) value.get(0); // a:1,b:2,c:3
            column2 = (String) value.get(1); // a:5,c:3

            String colArray[] = (String[]) ArrayUtils.addAll(
                    column1.split(","), column2.split(","));
            HashMap<String, Integer> hs = new HashMap<String, Integer>();
            for (String token : colArray) {
                String tokensplit[] = token.split(":");
                String k = tokensplit[0];
                int v = Integer.parseInt(tokensplit[1]);
                if (hs.containsKey(k)) {
                    int prev = hs.get(k);
                    hs.put(k, prev + v);
                } else {
                    hs.put(k, v);                       
                }
            }

            val = Arrays.toString(hs.entrySet().toArray()); // [a=6,b=2,c=6]
            HCatRecord record = new DefaultHCatRecord(1);
            record.set(0, val);
            context.write(null, record);
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        args = new GenericOptionsParser(conf, args).getRemainingArgs();

        // Get the input and output table names as arguments
        String inputTableName = args[0];
        String outputTableName = args[1];
        // Assume the default database
        String dbName = null;

        Job job = new Job(conf, "HCatsum");
        HCatInputFormat.setInput(job,
                InputJobInfo.create(dbName, inputTableName, null));
        job.setJarByClass(HCatSum.class);
        job.setMapperClass(Map.class);

        // An HCatalog record as input
        job.setInputFormatClass(HCatInputFormat.class);

        // Mapper emits a string as key and an integer as value
        job.setMapOutputKeyClass(WritableComparable.class);
        job.setMapOutputValueClass(DefaultHCatRecord.class);

        // Ignore the key for the reducer output; emitting an HCatalog record as
        // value

        job.setOutputKeyClass(WritableComparable.class);
        job.setOutputValueClass(DefaultHCatRecord.class);
        job.setOutputFormatClass(HCatOutputFormat.class);

        HCatOutputFormat.setOutput(job,
                OutputJobInfo.create(dbName, outputTableName, null));
        HCatSchema s = HCatOutputFormat.getTableSchema(job);
        System.err.println("INFO: output schema explicitly set for writing:"
                + s);
        HCatOutputFormat.setSchema(job, s);
        return (job.waitForCompletion(true) ? 0 : 1);
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new HCatSum(), args);
        System.exit(exitCode);
    }
}

如何使用hcatalog运行mr(参考链接):配置单元手册
希望这有帮助。

相关问题