我有一个hbase表,在静态Dataframe中类似于下面的hbasestaticrecorddf
---------------------------------------------------------------
|rowkey|Name|Number|message|lastTS|
|-------------------------------------------------------------|
|266915488007398|somename|8759620897|Hi|1539931239 |
|266915488007399|somename|8759620898|Welcome|1540314926 |
|266915488007400|somename|8759620899|Hello|1540315092 |
|266915488007401|somename|8759620900|Namaskar|1537148280 |
--------------------------------------------------------------
现在我有了一个文件流源,我将从中获得流rowkey。现在必须检查流式处理rowkey的时间戳(lastts)是否早于一天。为此,我编写了以下代码,其中joinedf是一个流Dataframe,它是通过如下方式连接另一个流Dataframe和hbase静态Dataframe而形成的。
val HBaseStreamDF = HBaseStaticRecorddf.join(anotherStreamDF,"rowkey")
val newdf = HBaseStreamDF.filter(HBaseStreamDF.col("lastTS").cast("Long") < ((System.currentTimeMillis - 86400*1000)/1000))//records older than one day are eligible to get updated
过滤完成后,我想将此记录保存到hbase,如下所示。
newDF.writeStream
.foreach(new ForeachWriter[Row] {
println("inside foreach")
val tableName: String = "dummy"
val hbaseConfResources: Seq[String] = Seq("hbase-site.xml")
private var hTable: Table = _
private var connection: Connection = _
override def open(partitionId: Long, version: Long): Boolean = {
connection = createConnection()
hTable = getHTable(connection)
true
}
def createConnection(): Connection = {
val hbaseConfig = HBaseConfiguration.create()
hbaseConfResources.foreach(hbaseConfig.addResource)
ConnectionFactory.createConnection(hbaseConfig)
}
def getHTable(connection: Connection): Table = {
connection.getTable(TableName.valueOf(tableName))
}
override def process(record: Row): Unit = {
var put = saveToHBase(record)
hTable.put(put)
}
override def close(errorOrNull: Throwable): Unit = {
hTable.close()
connection.close()
}
def saveToHBase(record: Row): Put = {
val p = new Put(Bytes.toBytes(record.getString(0)))
println("Now updating HBase for " + record.getString(0))
p.add(Bytes.toBytes("messageInfo"),
Bytes.toBytes("ts"),
Bytes.toBytes((System.currentTimeMillis/1000).toString)) //saving as second
p
}
}
).outputMode(OutputMode.Update())
.start().awaitTermination()
现在,当任何记录即将到来时,hbase只会第一次更新。如果同样的记录后来出现,它只是被忽视,没有工作。但是,如果某个独特的记录没有被spark应用程序处理,那么它就工作了。所以任何重复的记录都不会被第二次处理。
现在有件有趣的事。
如果我从(system.currenttimemillis-86400*1000)/1000中删除86400秒减法,那么即使传入记录之间存在冗余,也会处理所有内容。但它不是有意的,也没有用,因为它不过滤1天以前的记录。
如果我在filter条件中以毫秒为单位进行比较而不除以1000(这也需要以毫秒为单位的hbase数据),并将记录保存为put对象中的second,那么所有内容都会再次被处理。但是如果我在put对象中将格式改为seconds,那么它就不起作用了。
我试着分别测试过滤器和hbase-put,它们都工作得很好。但如果system.currenttimemillis在过滤器中有一些算术运算,比如/1000或-864000,它们一起就会出问题。如果我移除hbase接收器部分并使用
newDF.writeStream.format("console").start().awaitTermination()
然后过滤逻辑再次工作。如果我移除过滤器,hbase接收器就可以正常工作。但是,hbase的定制接收器只能第一次用于唯一记录。我尝试了其他几个过滤器逻辑,如下面,但问题仍然是一样的。
val newDF = newDF1.filter(col("lastTS").lt(LocalDateTime.now().minusDays(1).toEpochSecond(ZoneOffset.of("+05:30"))))
或
val newDF = newDF1.filter(col("lastTS").cast("Long") < LocalDateTime.now().minusDays(1).toEpochSecond(ZoneOffset.of("+05:30")))
如何使过滤器工作并将过滤后的记录保存到具有更新的时间戳的hbase中?我参考了其他几篇文章。但结果是一样的。
暂无答案!
目前还没有任何答案,快来回答吧!