My data source is Kafka. I read the data from Kafka like this:
import spark.implicits._  // required for .as[String]

var df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094")
  .option("subscribe", "raw_weather")
  .load()
df = df.selectExpr("CAST(value AS STRING)")
  .as[String]
  .select("value")
The `value` I receive looks like this: (725030:14732,2008,12,31,11,0.6,-6.7,1001.7,80,6.2,8,0.0,0.0). In total, 8784 rows (24*366) are sent to Kafka.
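For reference, each `value` is expected to carry 13 comma-separated fields in the same order as the columns of the INSERT statement in JDBCSink, and 8784 corresponds to one reading per hour for 2008, which is a leap year. The plain-Scala sketch below (no Spark needed) checks both points against the sample record. The object name `RecordCheck` is hypothetical, and the outer parentheses around the sample are assumed to be display only, not part of the Kafka payload.

```scala
import java.time.Year

// Hypothetical helper: parses one Kafka value and computes the expected row count.
object RecordCheck {
  // Sample value from the question, assuming the surrounding parentheses are not part of the payload.
  val sample = "725030:14732,2008,12,31,11,0.6,-6.7,1001.7,80,6.2,8,0.0,0.0"

  // Column order taken from the INSERT statement in JDBCSink.
  val columns = Seq("wsid", "year", "month", "day", "hour", "temperature", "dewpoint",
    "pressure", "wind_direction", "wind_speed", "sky_condition", "one_hour_precip", "six_hour_precip")

  // Splits the CSV value and pairs each field with its column name.
  def parse(line: String): Map[String, String] = {
    val fields = line.split(",").toSeq
    require(fields.length == columns.length, s"expected ${columns.length} fields, got ${fields.length}")
    columns.zip(fields).toMap
  }

  // One reading per hour for the whole year: Year.length() is 366 for 2008.
  val expectedRows: Int = Year.of(2008).length() * 24
}
```

For example, `RecordCheck.parse(RecordCheck.sample)("wsid")` gives `"725030:14732"`, which is why JDBCSink has to quote that field while the other twelve go in unquoted.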
I am trying to stream this data into a DB2 database using a class that extends org.apache.spark.sql.ForeachWriter[org.apache.spark.sql.Row]. This is how I write the data:
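import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.Trigger

// url, user and password are defined elsewhere in the application
def writeToDb2(spark: SparkSession, df: DataFrame): Unit = {
  val writer = new JDBCSink(url, user, password)
  val query = df.writeStream
    .foreach(writer)
    .outputMode("append")
    .trigger(Trigger.ProcessingTime(2000))  // start a micro-batch every 2000 ms
    .start()
  query.awaitTermination()
}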
And this is what my JDBCSink looks like:
class JDBCSink(url: String, user: String, pwd: String)
    extends org.apache.spark.sql.ForeachWriter[org.apache.spark.sql.Row] {
  val driver = "com.ibm.db2.jcc.DB2Driver"
  var connection: java.sql.Connection = _
  var statement: java.sql.Statement = _
  val schema = "SPARK"
  val rawTableName = "RAW_WEATHER_DATA"
  val dailyPrecipitationTable = "DAILY_PRECIPITATION_TABLE"

  // Called once per partition and epoch: open a DB2 connection for this task
  def open(partitionId: Long, version: Long): Boolean = {
    Class.forName(driver)
    connection = java.sql.DriverManager.getConnection(url, user, pwd)
    statement = connection.createStatement
    true
  }

  // Called for every row: split the CSV value and insert it as one table row
  def process(valz: org.apache.spark.sql.Row): Unit = {
    val value = valz(0).toString.split(",")
    val stmt = s"INSERT INTO $schema.$rawTableName(wsid, year, month, day, hour, temperature, dewpoint, pressure, wind_direction, wind_speed, sky_condition, one_hour_precip, six_hour_precip) " +
      "VALUES ('" + value(0) + "'," +               // wsid is inserted as a quoted string literal
      (1 to 12).map(i => value(i)).mkString(",") +  // the other 12 fields are inserted unquoted
      ")"
    println(value(1) + "," + value(2) + "," + value(3) + "," + value(4) + "," + value(11))
    statement.executeUpdate(stmt)
  }

  // Called when the partition is finished (errorOrNull is null if no error occurred)
  def close(errorOrNull: Throwable): Unit = {
    if (statement != null) statement.close()
    if (connection != null) connection.close()
  }
}
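JDBCSink relies on the order in which Structured Streaming calls the ForeachWriter methods. As described in the Structured Streaming Programming Guide, each task works on its own deserialized copy of the writer, calls open once per partition and epoch, calls process for every row of that partition only if open returned true, and calls close afterwards (with the error, if one was thrown). The sketch below models that call order without Spark. The trait `WriterLike`, the object `ForeachLifecycle` and the class `CountingWriter` are hypothetical stand-ins for illustration, not Spark's API or implementation; `CountingWriter` simply records how many rows each (partition, epoch) pair actually passed to process.

```scala
import scala.collection.mutable

// Hypothetical stand-in for org.apache.spark.sql.ForeachWriter, so the call order runs without Spark.
trait WriterLike[T] {
  def open(partitionId: Long, epochId: Long): Boolean
  def process(value: T): Unit
  def close(errorOrNull: Throwable): Unit
}

object ForeachLifecycle {
  // Models one epoch: every partition gets a fresh writer, open -> process* -> close.
  def runEpoch[T](epochId: Long, partitions: Seq[Seq[T]], newWriter: () => WriterLike[T]): Unit =
    partitions.zipWithIndex.foreach { case (rows, pid) =>
      val writer = newWriter()                       // each task works on its own copy
      val opened = writer.open(pid.toLong, epochId)  // if open throws, close is not called
      var error: Throwable = null
      try {
        if (opened) rows.foreach(writer.process)     // rows are skipped when open returns false
      } catch {
        case t: Throwable =>
          error = t
          throw t
      } finally {
        writer.close(error)                          // null unless process threw
      }
    }
}

// Hypothetical writer that counts the rows processed per (partitionId, epochId).
class CountingWriter(log: mutable.Map[(Long, Long), Int]) extends WriterLike[String] {
  private var key = (0L, 0L)
  def open(partitionId: Long, epochId: Long): Boolean = {
    key = (partitionId, epochId)
    log(key) = 0
    true
  }
  def process(value: String): Unit = log(key) += 1
  def close(errorOrNull: Throwable): Unit = ()
}
```

`newWriter` is a factory rather than a shared instance because each task gets a fresh copy of the writer, so per-instance fields such as `connection` and `statement` in JDBCSink are never shared between partitions.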
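The problem is that when I send the data to the stream, Spark does not read all of the rows. This becomes clear when I look at the rows the program tries to write. When I run COUNT(*) on my table, it does not contain all 8784 rows: on some runs the number of rows written hovers around 7000, on others around 7900, and so on. In other words, not every row gets written.

What could be the reason for this? I followed the Structured Streaming guide, and I also tried running with various other triggers, but none of them made any difference.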
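To see which readings are missing rather than only how many, one option is to compare the (month, day, hour) triples that reached the table with every hourly slot of 2008. The plain-Scala sketch below does that comparison. The object name `MissingRows` is hypothetical, it assumes the hour field runs from 0 to 23, and loading the written triples from DB2 (for example with `SELECT month, day, hour FROM SPARK.RAW_WEATHER_DATA`) is left out.

```scala
import java.time.LocalDateTime

// Hypothetical diagnostic: lists the hourly slots of a year that have no row in the table.
object MissingRows {
  // Every (month, day, hour) slot of the given year, in chronological order.
  def expectedSlots(year: Int): Vector[(Int, Int, Int)] = {
    val start = LocalDateTime.of(year, 1, 1, 0, 0)
    val end = start.plusYears(1)
    Iterator.iterate(start)(_.plusHours(1))
      .takeWhile(_.isBefore(end))
      .map(t => (t.getMonthValue, t.getDayOfMonth, t.getHour))
      .toVector
  }

  // Slots for which no (month, day, hour) triple was written.
  def missing(year: Int, written: Set[(Int, Int, Int)]): Vector[(Int, Int, Int)] =
    expectedSlots(year).filterNot(written.contains)
}
```

If the missing slots form one contiguous block (for example, all at the start of the year), that points to rows never reaching the query; if they are scattered, it points to rows being lost between process and the table.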