将hashmap的键匹配到scala中spark rdd的条目,并在找到匹配项时添加值,然后将rdd写回hbase

qyuhtwio  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(238)

我正在尝试使用scala读取hbase表,然后根据hbase表中行的内容添加一个新列作为标记。我把这张表读作spark rdd。我还有一个hashmap,其中的键值对如下所示:要将键与spark rdd(从hbase表生成)的条目匹配,如果找到匹配项,则将hashmap中的值添加到一个新列中。在新列名中写入hbase表的函数如下:

def convert (a:Int,s:String) : Tuple2[ImmutableBytesWritable,Put]={
                        val p = new Put(a.toString.getBytes())
                        p.add(Bytes.toBytes("columnfamily"),Bytes.toBytes("col_2"), s.toString.getBytes())//a.toString.getBytes())
                        println("the value of a is: " + a)
                        new Tuple2[ImmutableBytesWritable,Put](new ImmutableBytesWritable(Bytes.toBytes(a)), p);
                   }
 new PairRDDFunctions(newrddtohbaseLambda.map(x=>convert(x, ranjan))).saveAsHadoopDataset(jobConfig)

然后从hashmap读取字符串并比较和添加回代码如下:

csvhashmap.keys.foreach{i=> if (arrayRDD.zipWithIndex.foreach{case(a,j) => a.split(" ").exists(i contains _); p = j.toInt}==true){new PairRDDFunctions(convert(p,csvhashmap(i))).saveAsHadoopDataset(jobConfig)}}

这里csvhashmap是上面描述的hashmap,“words”是我们试图匹配字符串的rdd。当运行上述命令时,出现以下错误:

error: type mismatch;
 found   : (org.apache.hadoop.hbase.io.ImmutableBytesWritable, org.apache.hadoop.hbase.client.Put)
 required: org.apache.spark.rdd.RDD[(?, ?)]

如何摆脱它?我尝试了很多方法来更改数据类型,但每次都会出现一些错误。另外,我已经检查了上面代码段中的各个函数,它们都很好。当我把它们整合在一起时,我得到了上面的错误。任何帮助都将不胜感激。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题