apachespark-hive内部连接、限制和自定义udf

gv8xihay  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(319)

我正在尝试在配置单元中运行查询:
这里是最简单的设置(我知道我可以做一个=但是我使用了一个自定义的udf,它不仅仅是一个等式比较)
数据集a和b各有大约30000行 SELECT * FROM a INNER JOIN b ON Custom_UDF_Equals_Comparison(a.id, b.id) LIMIT 5 其中custom_udf_equals_comparison只是在a.id=b.id之间进行相等检查
当我运行这个查询时,我可以在我的日志输出中看到很多m/r任务正在运行,假设它在两个数据集之间进行比较,直到比较所有可能的排列,并且远远超过5的限制(我预计只有少数m/r任务,因为我知道大多数数据可以在每个表的前几行中联接),为什么会发生这种情况?和/或如何修复?
编辑:
您好,zero323,这是一个类似的问题,但并不确切,它解释了为什么在使用udf进行比较时会执行2个rdd之间的完全比较,但它没有解释为什么在找到限制5时,限制不会停止比较。例如,如果在前10次连接尝试中找到5行,那么为什么在剩余的3000030000次连接尝试中会找到5行。是因为在所有连接发生之后才应用限制吗?e、 它将3000030000行合并,然后将它们减少到5行?
编辑2:

def levenshtein(str1: String, str2: String): Int = {
val lenStr1 = str1.length
val lenStr2 = str2.length

val d: Array[Array[Int]] = Array.ofDim(lenStr1 + 1, lenStr2 + 1)

for (i <- 0 to lenStr1) d(i)(0) = i
for (j <- 0 to lenStr2) d(0)(j) = j

for (i <- 1 to lenStr1; j <- 1 to lenStr2) {
  val cost = if (str1(i - 1) == str2(j-1)) 0 else 1

  d(i)(j) = min(
    d(i-1)(j  ) + 1,     // deletion
    d(i  )(j-1) + 1,     // insertion
    d(i-1)(j-1) + cost   // substitution
  )
}

d(lenStr1)(lenStr2)

}

def min(nums: Int*): Int = nums.min

def join_views( joinType: String, parameters: Any, col1: Any, col2: Any) : Boolean = {
if (joinType == "Equals") {
  if (col1 == null || col2 == null) {
    return false
  }

  return col1 == col2
}
else if (joinType == "Fuzzy_String") {
  if (col1 == null || col2 == null) {
    return false
  }

  val val1 = col1.asInstanceOf[String]
  val val2 = col2.asInstanceOf[String]

  val ratio = Utils.distancePercentage(val1, val2)

  if (ratio == 1.0) {
    return val1 == val2
  }

  return (ratio >= parameters.asInstanceOf[Double])
}

return false;

}
... 在连接视图上(“fuzzy\u string”,“0.1”,a.col1,b.col1)限制5=20秒
... 在连接视图上(“fuzzy\u string”,“0.9”,a.col1,b.col1)限制5=100秒

ahy6op9u

ahy6op9u1#

所以这里有三个不同的问题:
spark通过使用散列和排序来优化连接,因此这些优化只适用于equi连接。其他类型的联接(包括依赖于udf的联接)需要成对比较,因此需要笛卡尔积。您可以检查为什么在sql查询中使用udf会导致笛卡尔积?详情。
数据移动后的限制操作,尤其是洗牌,无法完全优化。你可以在sun rui提供的关于限制大rdd的漂亮答案中找到一个很好的解释。
由于缺少无序处理,您的情况相对简单,但您仍然需要将分区组合在一起。
限制优化基于分区,而不是记录。spark开始检查第一个分区,如果满足条件的元素的数量低于要求,它会迭代增加每个迭代要使用的分区的数量(据我所知,因子是4)。如果你正在寻找一个罕见的事件,这可以增加相当快。

相关问题