spark rdd按键查找

kxkpmulp  于 2021-06-09  发布在  Hbase
关注(0)|答案(1)|浏览(434)

我有一个从hbase转换而来的rdd:
val hbaserdd:rdd[(string,array[string]),其中tuple.\u 1是行键。数组是hbase中的值。

4929101-ACTIVE, ["4929101","2015-05-20 10:02:44","dummy1","dummy2"]
4929102-ACTIVE, ["4929102","2015-05-20 10:02:44","dummy1","dummy2"]
4929103-ACTIVE, ["4929103","2015-05-20 10:02:44","dummy1","dummy2"]

我还将schemardd(id,date1,col1,col2,col3)转换为
val refdatardd:rdd[(string,array[string])],我将对其进行迭代并检查它是否存在于hbaserdd中:

4929103, ["2015-05-21 10:03:44","EV01","col2","col3"]
4929104, ["2015-05-21 10:03:44","EV02","col2","col3"]

问题是,
如何检查hbaserdd中是否存在键(tuple.\u 1)/(“4929103”)并获取相应的值(tuple.\u 2)?-我不能在rdd.filter中使用pairdd的lookup函数,它抛出“scala.matcherror:null”,但它在外部工作

val filteredRDD = rdd.filter(sqlRow => {
  val hbaseLookup = hbaseRDD.lookup(sqlRow(0).toString + "-ACTIVE")
  // if found, check if date1 of hbaseRDD < sqlRow(1)
  // else if not found, retain row
  true
})

不过,我不确定这是否是问题所在,因为当我将查找行切换到以下位置时,我也遇到了npe:

val sqlRowHbase = hbaseRDD.filter(row => {

注意:我在这些行之前进行hbaserdd.count。而hbaserdd.lookup在rdd.filter之外工作正常
所以基本上,我试图通过hbaserdd中的键来“查找”并获取行/值。连接它们有点复杂,因为两个RDD中的某些值可能都是空的。这取决于很多场景,用什么数据保留哪一行。

yhqotfr8

yhqotfr81#

假设您需要查找的一组\u id包含在rdd中,我认为您可以使用leftouterjoin,而不是迭代和查找每个值。
我在上面看到了你关于date1可能改变的位置的评论。我不是在下面讨论它,我认为应该在查找之前通过每一行的某种特定Map来处理它。
如果我得到的伪代码是正确的,你有一个rdd (id, date) 并希望通过在hbase中查找数据来更新它,如果在hbase中找到此id的行并且其日期早于refdata中的日期,则更新日期。对吗?
如果是的话,假设你有这样的参考数据:

val refData = sc.parallelize(Array(
 ("4929103","2015-05-21 10:03:44"),
 ("4929104","2015-05-21 10:03:44")
))

以及来自hbase的一些行数据:

val hbaseRDD = sc.parallelize(Array(
    ("4929101-ACTIVE", Array("4929101","2015-05-20 10:02:44")),
    ("4929102-ACTIVE", Array("4929102","2015-05-20 10:02:44")),
    ("4929103-ACTIVE", Array("4929103","2015-05-20 10:02:44"))
))

然后,您可以使用一个简单的leftouterjoin将refdata中的每个id查找到hbase中,对于找到的每一行:如果需要,更新日期:

refData
  // looks up in Hbase all rows whose date1 a_id value matches the id in searchedIds
  .leftOuterJoin(hbaseRDD.map{ case (rowkey, Array(a_id, date1)) => (a_id, date1)})

  // update the date in refData if date from hBase is earlier
  .map { case (rowKey, (refDate, maybeRowDate)) => ( rowKey, chooseDate (refDate, maybeRowDate)) }
  .collect

def chooseDate(refDate: String, rowDate: Option[String]) =  rowDate match {

  // if row not found in Hbase: keep ref date
  case None => refDate

  case Some(rDate) => 
    if (true) /* replace this by first parsing the date, then check if rowDate < refDate */ 
        rowDate
    else
        refDate
}

相关问题