spark只写一个hbase区域服务器

ckx4rj1h  于 2021-06-02  发布在  Hadoop
关注(0)|答案(2)|浏览(319)
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark.rdd.PairRDDFunctions

def bulkWriteToHBase(sparkSession: SparkSession, sparkContext: SparkContext, jobContext: Map[String, String], sinkTableName: String, outRDD: RDD[(ImmutableBytesWritable, Put)]): Unit = {
val hConf = HBaseConfiguration.create()
hConf.set("hbase.zookeeper.quorum", jobContext("hbase.zookeeper.quorum"))
hConf.set("zookeeper.znode.parent", jobContext("zookeeper.znode.parent"))
hConf.set(TableInputFormat.INPUT_TABLE, sinkTableName)

val hJob = Job.getInstance(hConf)
hJob.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, sinkTableName)
hJob.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]]) 

outRDD.saveAsNewAPIHadoopDataset(hJob.getConfiguration())
}

通过使用这个hbase批量插入,我发现每次spark只会从hbase写入一个区域服务器,这就成为了瓶颈。
然而,当我使用几乎相同的方法但从hbase读取时,它使用多个执行器来执行并行读取。

def bulkReadFromHBase(sparkSession: SparkSession, sparkContext: SparkContext, jobContext: Map[String, String], sourceTableName: String) = {
val hConf = HBaseConfiguration.create()
hConf.set("hbase.zookeeper.quorum", jobContext("hbase.zookeeper.quorum"))
hConf.set("zookeeper.znode.parent", jobContext("zookeeper.znode.parent"))
hConf.set(TableInputFormat.INPUT_TABLE, sourceTableName)

val inputRDD = sparkContext.newAPIHadoopRDD(hConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
inputRDD
}

有人能解释一下为什么会这样吗?或者我对spark hbase批量i/o使用了错误的方法?

cidc1ykv

cidc1ykv1#

问题:我对spark hbase批量i/o使用了错误的方法?

不,你的方法是对的,不过,你需要在手工创建带有预裂区域的表之前预裂区域。

例如 create 'test_table', 'f1', SPLITS=> ['1', '2', '3', '4', '5', '6', '7', '8', '9'] 上表占9个地区。。
以1-9开始设计好的行键
你可以使用Guava杂音哈希如下。

import com.google.common.hash.HashCode;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;

/**
     * getMurmurHash.
     * 
     * @param content
     * @return HashCode
     */
    public static HashCode getMurmurHash(String content) {
        final HashFunction hf = Hashing.murmur3_128();
        final HashCode hc = hf.newHasher().putString(content, Charsets.UTF_8).hash();
        return hc;
    }

final long hash = getMurmur128Hash(Bytes.toString(yourrowkey as string)).asLong();
            final int prefix = Math.abs((int) hash % 9);

现在将此前缀附加到行键
例如
1rowkey1//将进入第一个区域
2rowkey2//将进入第二个区域
3rowkey3//将进入第三个区域。。。9rowkey9//将进入第九区域
如果正在执行预拆分,并且希望手动管理区域拆分,还可以通过将hbase.hregion.max.filesize设置为一个较大的数字并将拆分策略设置为constantsizeregionsplitpolicy来禁用区域拆分。但是,应该使用100gb这样的保护值,这样区域的增长就不会超出区域服务器的能力。您可以考虑禁用自动拆分并依赖于预拆分的初始区域集,例如,如果您对密钥前缀使用统一哈希,并且可以确保每个区域的读/写负载及其大小在表中的各个区域中是统一的
1) 请确保在将数据加载到hbase表2)设计好的rowkey之前可以预先拆分该表,如下所述使用杂音哈希或其他哈希技术。确保各地区的统一分布。
也看看http://hortonworks.com/blog/apache-hbase-region-splitting-and-merging/
问:有人能解释一下为什么会发生这种情况吗?
原因是非常明显和简单的热点数据在一个特定的原因,因为该表的行键差。。。
考虑java中的hashmap,它的元素的hashcode为1234。然后它会把所有的元素都填满一个桶,是吗?如果hashmap元素分布在不同的 hashcode 然后它会把元素放在不同的桶里。hbase也是如此。这里你的hashcode就像你的rowkey。。。

更进一步说,

如果我已经有了一个表,我想把区域分割开来。。。
这个 RegionSplitter 类为选择手动拆分区域而不是让hbase自动处理区域的开发人员提供了几个实用程序,以帮助他们在管理生命周期中进行操作。

最有用的实用程序包括:

创建具有指定数量的预拆分区域的表
对现有表上的所有区域执行滚动拆分
例子:

$ hbase org.apache.hadoop.hbase.util.RegionSplitter test_table HexStringSplit -c 10 -f f1

其中-c 10将请求的区域数指定为10,-f指定表中所需的列族,用“:”分隔。该工具将创建一个名为“test\u table”的表,其中包含10个区域:

13/01/18 18:49:32 DEBUG hbase.HRegionInfo: Current INFO from scan results = {NAME => 'test_table,,1358563771069.acc1ad1b7962564fc3a43e5907e8db33.', STARTKEY => '', ENDKEY => '19999999', ENCODED => acc1ad1b7962564fc3a43e5907e8db33,}
13/01/18 18:49:32 DEBUG hbase.HRegionInfo: Current INFO from scan results = {NAME => 'test_table,19999999,1358563771096.37ec12df6bd0078f5573565af415c91b.', STARTKEY => '19999999', ENDKEY => '33333332', ENCODED => 37ec12df6bd0078f5573565af415c91b,}
...

正如在评论中所讨论的,您发现我在写入hbase之前的最后一个rdd只有一个分区!这表明只有一个执行者持有整个数据。。。我还在努力找出原因。
还有,检查一下 spark.default.parallelism 默认为所有机器上的所有核心数。parallelizeapi没有父rdd来确定分区的数量,因此它使用 spark.default.parallelism .
因此可以通过重新分区来增加分区。

注意:我注意到,在mapreduce中,regions/input split的分区数=启动的Map程序数。。类似地,在您的案例中,数据加载到一个特定区域的情况也可能相同,这就是为什么一个执行器启动的原因。请核实一下

ohfgkhjo

ohfgkhjo2#

虽然您没有提供示例数据或足够的解释,但这主要不是由于您的代码或配置造成的。由于非最佳行键设计,这种情况正在发生。您正在写入的数据的键(hbase rowkey)结构不正确(可能是单调递增或其他方式)。因此,正在写入其中一个区域。您可以通过各种方法(rowkey设计的各种推荐做法,如盐析、反转和其他技术)来防止这种情况发生。作为参考,你可以看到http://hbase.apache.org/book.html#rowkey.design
在这种情况下,如果您想知道是并行地对所有区域进行写操作,还是逐个进行写操作(问题不清楚),请看以下内容:http://hbase.apache.org/book.html#_bulk_load.

相关问题