sql—配置单元联接的替代方法

mec1mxoz  于 2021-06-02  发布在  Hadoop
关注(0)|答案(5)|浏览(243)

我的 hive 里有两种景色

+------------+
| Table_1    |
+------------+
| hash       |
| campaignId |
+------------+

+-----------------+
| Table_2         |
+-----------------+
| campaignId      |
| accountId       |
| parentAccountID |
+-----------------+

现在,我必须获取由accountid和parentaccountid过滤的“表1”数据,为此我编写了以下查询:

SELECT /*+ MAPJOIN(T2) */ T1.hash, COUNT(T1.campaignId) num_campaigns
FROM Table_1 T1
JOIN Table_2 T2 ON T1.campaignId = T2.campaignId
WHERE (T2.accountId IN ('aid1', 'aid2') OR T2.parentAccountID IN ('aid1', 'aid2')
GROUP BY T1.hash

此查询正在运行,但速度很慢。除了这个(加入)还有更好的选择吗?
我正在通过spark阅读Kafka的表1。
幻灯片持续时间为5秒
窗口持续时间为2分钟
表2在rdbms中,spark通过jdbc读取,它有4500条记录。
每隔5秒,Kafka就会以csv格式输出大约2k条记录。
我需要在5秒内处理数据,但目前需要8到16秒。
根据建议:
我已经分别按campaignid和hash列重新划分了表1。
我已经分别按accountid和parentaccountid列重新划分了表2。
我已经实现了mapjoin。
但仍然没有改善。
注意:如果我删除了窗口持续时间,那么进程将在指定的时间内执行。可能是因为要处理的数据较少。但这不是要求。

wr98u20j

wr98u20j1#

好 啊。。
我终于做到了。
我创建了表2的散列。
然后通过使用广播变量,我将数据传递给每个节点。
这使我省去了加入的麻烦。
谢谢大家的时间和帮助。快乐编码:)

9o685dep

9o685dep2#

使用正确的索引,以下操作可以更快:

SELECT T1.*
FROM Table_1 T1 JOIN
     Table_2 T2
     ON T1.campaignId = T2.campaignId
WHERE T2.accountId IN ('aid1', 'aid2') 
UNION ALL
SELECT T1.*
FROM Table_1 T1 JOIN
     Table_2 T2
     ON T1.campaignId = T2.campaignId
WHERE T2.parentAccountID IN ('aid1', 'aid2') AND
      T2.accountId NOT IN ('aid1', 'aid2') ;

第一个可以考虑上的索引 Table_2(accountId, campaignId) 第二个在 Table_2(parentAccountID, accountId, campaignId) .

piok6c0g

piok6c0g3#

因为这是我们讨论的Hive,所以您需要了解的不仅仅是传统的dbms。
减少io。对数据使用压缩的列格式。兽人或Parquet地板。不是rc。先把你的table转换成兽人。除非数据被压缩成柱状,否则其他任何东西都不会有多大影响。
为配置单元选择适当的连接策略。这篇2011年的旧论文仍然相关。
把你的table装饰一下
使用现代执行引擎:tez或spark。

wkyowqbh

wkyowqbh4#

如果t2filtered足够小,可以放入内存中,请尝试重写查询并将筛选器移到子查询中,然后查看是否会在Map器上执行join。此外,您不需要t2中的列,可以使用左半联接代替内联接:

set hive.cbo.enable=true; 
set hive.auto.convert.join=true;

SELECT T1.* 
FROM Table_1 T1 
LEFT SEMI JOIN 
     (select campaignId  from Table_2 T2 
        where T2.accountId IN ('aid1', 'aid2') 
           OR T2.parentAccountID IN ('aid1', 'aid2')
     ) T2 ON T1.campaignId = T2.campaignId 
;
ffvjumwh

ffvjumwh5#

我建议您使用本机spark转换而不是hivesql:
1.将表2(rdbms)中的数据读入rdd并放入缓存ex:

rddTbl1.map(campaignIdKey, (accountId, parentAccountId)) //filter out before getting into RDD if needed
rddTbl2.cache()

2.现在读取表1流(Kafka)

//get campaigns of relevant account & parentaccountid
val rddTbl2_1 = rddTbl2.filter(x => x._2._1.equals("aid1") || x._2._1.equals("aid2") || x._2._2.equals("aid1") || x._2._2.equals("aid2"))

dstream.foreachRDD{ rddTbl1 =>
  rddTbl1.map(x => x._2.split(",")).
          map(x => (x(1), x(2)). //campaignId, hash
          join(rddTbl2_1).
          map(x => (x._2._1, 1)). //get (hash,1)
          reduceByKey(_+_).
          foreach(println) //save it if needed
}

相关问题