spark结构流媒体中spark滤波器、当前时间戳和hbase自定义接收器的scala不一致性和突变行为

lh80um4z  于 2021-06-09  发布在  Hbase
关注(0)|答案(0)|浏览(201)

我有一个hbase表,在静态Dataframe中类似于下面的hbasestaticrecorddf

---------------------------------------------------------------
|rowkey|Name|Number|message|lastTS|
|-------------------------------------------------------------|
|266915488007398|somename|8759620897|Hi|1539931239            |
|266915488007399|somename|8759620898|Welcome|1540314926       |
|266915488007400|somename|8759620899|Hello|1540315092         |
|266915488007401|somename|8759620900|Namaskar|1537148280      |
 --------------------------------------------------------------

现在我有了一个文件流源,我将从中获得流rowkey。现在必须检查流式处理rowkey的时间戳(lastts)是否早于一天。为此,我编写了以下代码,其中joinedf是一个流Dataframe,它是通过如下方式连接另一个流Dataframe和hbase静态Dataframe而形成的。

val HBaseStreamDF = HBaseStaticRecorddf.join(anotherStreamDF,"rowkey")

val newdf = HBaseStreamDF.filter(HBaseStreamDF.col("lastTS").cast("Long") < ((System.currentTimeMillis - 86400*1000)/1000))//records older than one day are eligible to get updated

过滤完成后,我想将此记录保存到hbase,如下所示。

newDF.writeStream
  .foreach(new ForeachWriter[Row] {
    println("inside foreach")

    val tableName: String = "dummy"
    val hbaseConfResources: Seq[String] = Seq("hbase-site.xml")
    private var hTable: Table = _
    private var connection: Connection = _

    override def open(partitionId: Long, version: Long): Boolean = {
      connection = createConnection()
      hTable = getHTable(connection)
      true
    }

    def createConnection(): Connection = {
      val hbaseConfig = HBaseConfiguration.create()
      hbaseConfResources.foreach(hbaseConfig.addResource)
      ConnectionFactory.createConnection(hbaseConfig)

    }

    def getHTable(connection: Connection): Table = {
      connection.getTable(TableName.valueOf(tableName))
    }

    override def process(record: Row): Unit = {
      var put = saveToHBase(record)
      hTable.put(put)
    }

    override def close(errorOrNull: Throwable): Unit = {
      hTable.close()
      connection.close()
    }

    def saveToHBase(record: Row): Put = {
    val p = new Put(Bytes.toBytes(record.getString(0)))
    println("Now updating HBase for " + record.getString(0))

      p.add(Bytes.toBytes("messageInfo"),
        Bytes.toBytes("ts"),
        Bytes.toBytes((System.currentTimeMillis/1000).toString)) //saving as second

      p
    }

  }
  ).outputMode(OutputMode.Update())
  .start().awaitTermination()

现在,当任何记录即将到来时,hbase只会第一次更新。如果同样的记录后来出现,它只是被忽视,没有工作。但是,如果某个独特的记录没有被spark应用程序处理,那么它就工作了。所以任何重复的记录都不会被第二次处理。
现在有件有趣的事。
如果我从(system.currenttimemillis-86400*1000)/1000中删除86400秒减法,那么即使传入记录之间存在冗余,也会处理所有内容。但它不是有意的,也没有用,因为它不过滤1天以前的记录。
如果我在filter条件中以毫秒为单位进行比较而不除以1000(这也需要以毫秒为单位的hbase数据),并将记录保存为put对象中的second,那么所有内容都会再次被处理。但是如果我在put对象中将格式改为seconds,那么它就不起作用了。
我试着分别测试过滤器和hbase-put,它们都工作得很好。但如果system.currenttimemillis在过滤器中有一些算术运算,比如/1000或-864000,它们一起就会出问题。如果我移除hbase接收器部分并使用

newDF.writeStream.format("console").start().awaitTermination()

然后过滤逻辑再次工作。如果我移除过滤器,hbase接收器就可以正常工作。但是,hbase的定制接收器只能第一次用于唯一记录。我尝试了其他几个过滤器逻辑,如下面,但问题仍然是一样的。

val newDF = newDF1.filter(col("lastTS").lt(LocalDateTime.now().minusDays(1).toEpochSecond(ZoneOffset.of("+05:30"))))

val newDF = newDF1.filter(col("lastTS").cast("Long") < LocalDateTime.now().minusDays(1).toEpochSecond(ZoneOffset.of("+05:30")))

如何使过滤器工作并将过滤后的记录保存到具有更新的时间戳的hbase中?我参考了其他几篇文章。但结果是一样的。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题