为什么在spark流中读取广播变量在运行数天后出现异常?

zrfyljdw  于 2021-06-09  发布在  Hbase
关注(0)|答案(1)|浏览(291)

在我的项目中,我使用spark streaming(spark v1.6.0)和hbase,hbase(hbase v1.1.2)配置通过广播变量在执行器之间传输。spark流媒体应用程序首先工作,而大约2天后,会出现异常。

val hBaseContext: HBaseContext = new HBaseContext(sc, HBaseCock.hBaseConfiguration())
  private def _materialDStream(dStream: DStream[(String, Int)], columnName: String, batchSize: Int) = {
    hBaseContext.streamBulkIncrement[(String, Int)](
      dStream,
      hTableName,
      (t) => {
        val rowKey = t._1
        val incVal = t._2
        val increment = new Increment(Bytes.toBytes(rowKey))
        increment.addColumn(Bytes.toBytes(hFamily), Bytes.toBytes(columnName), incVal)
        increment
      }, batchSize)
  }

hbasecontext的整个源文件可以在hbasecontext.scala中找到,下面可以找到一些片段。
运行数天后,将出现异常,堆栈跟踪为:

Unable to getConfig from broadcast
16/02/01 10:08:10 ERROR Executor: Exception in task 3.0 in stage 187175.0 (TID 561527)

逻辑如下:
使用config(hbasecontext)创建hbasecontext并广播配置(如果指定了文件路径,则将配置保存到文件)
在连接hbase之前,它首先检查字段config是否为空,如果为空,则从指定的文件还原,如果未指定文件路径,则从广播变量还原。
问题发生在从广播变量恢复配置时,在“configbroadcast.value.value”中从广播读取值时发生异常。
如果master失败,那么spark streaming不会恢复广播变量,而getorcreate()用于获取sparkstreaming示例。我更好奇的是在hbasecontext.scala源代码中,那个文件是首选广播变量来还原值的。那么在spark流媒体中使用广播的最佳实践是什么呢?我是否需要将它们存储在文件中,比如hdfs中的文件?

class HBaseContext(@transient sc: SparkContext, @transient config: Configuration, val tmpHdfsConfgFile: String = null) extends Serializable{
    @transient var tmpHdfsConfiguration: Configuration = config

    val broadcastedConf = sc.broadcast(new SerializableWritable(config))

    if(tmpHdfsConfgFile != null && config != null){
      // save config to file
    }

    private def getConf(configBroadcast: Broadcast[SerializableWritable[Configuration]]): Configuration = {

      if (tmpHdfsConfiguration != null) {
        tmpHdfsConfiguration
      } else if (tmpHdfsConfgFile != null) {
        // read config from file

        tmpHdfsConfiguration
      }
      if (tmpHdfsConfiguration == null) {
        try {
          // Exception happens here!!!
          tmpHdfsConfiguration = configBroadcast.value.value
          tmpHdfsConfiguration
        } catch {
          case ex: Exception => {
            println("Unable to getConfig from broadcast")
          }
        }
      }
    tmpHdfsConfiguration
  }
}
sirbozc5

sirbozc51#

由于某种原因重新启动spark作业后,广播变量被重置。或驱动程序在作业失败后与尝试重新关联。
在流作业的情况下,要使用广播变量,应该在创建streamingcontext之前从sprarkcontext初始化广播。这将确保流媒体开始时广播变量可用。

JavaSparkContext javaSparkContext = createSparkContext();

Broadcast<BroadcastContext> broadcastContext = getBroadcastContext(javaSparkContext);

JavaStreamingContext javaStreamingContext = JavaStreamingContext.getOrCreate(sparkCheckPointDir,
                () -> processor.create(sparkCheckPointDir, javaSparkContext));

相关问题