通过spark将kafka消息保存到hbase中会话从不关闭

doinxwow  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(488)

我正在尝试使用spark流来接收来自kafka的消息,将它们转换为put并插入到hbase中。我创造了一个 inputDstream 从kafka接收消息,然后创建jobconf,最后使用 saveAsHadoopDataset(JobConf) 将记录保存到hbase。
每次将记录插入hbase时,都会设置一个从hbase到zookeeper的会话,但从不关闭。如果连接数的增加超过zookeeper的最大客户端连接数,spark流将崩溃。
我的代码如下所示:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka._
import kafka.serializer.StringDecoder

object ReceiveKafkaAsDstream {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("ReceiveKafkaAsDstream")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    val topics = "test"
    val brokers = "10.0.2.15:6667"

    val topicSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)

    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)

    val tableName = "KafkaTable"
    val conf = HBaseConfiguration.create()
    conf.set("zookeeper.znode.parent", "/hbase-unsecure")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)

    val jobConfig: JobConf = new JobConf(conf, this.getClass)
    jobConfig.set("mapreduce.output.fileoutputformat", "/user/root/out")
    jobConfig.setOutputFormat(classOf[TableOutputFormat])
    jobConfig.set(TableOutputFormat.OUTPUT_TABLE, tableName)

      val records = messages
        .map(_._2)
        .map(SampleKafkaRecord.parseToSampleRecord)
      records.print()  
      records.foreachRDD{ stream => stream.map(SampleKafkaRecord.SampleToHbasePut).saveAsHadoopDataset(jobConfig) }

    ssc.start()
    ssc.awaitTermination()
  }

  case class SampleKafkaRecord(id: String, name: String)
  object SampleKafkaRecord extends Serializable {
    def parseToSampleRecord(line: String): SampleKafkaRecord = {
      val values = line.split(";")
      SampleKafkaRecord(values(0), values(1))
    }

    def SampleToHbasePut(CSVData: SampleKafkaRecord): (ImmutableBytesWritable, Put) = {
      val rowKey = CSVData.id
      val putOnce = new Put(rowKey.getBytes)

      putOnce.addColumn("cf1".getBytes, "column-Name".getBytes, CSVData.name.getBytes)
      return (new ImmutableBytesWritable(rowKey.getBytes), putOnce)
    }
  }
}

我在zookeeper conf文件zoo.cfg中将ssc(sparkstreamingcontext)的duration设置为1s,并将maxclientcnxns设置为10,因此从一个客户端到zookeeper最多允许10个连接。
10秒钟后(从hbase到zookeeper设置了10个会话),我得到如下错误:

16/08/24 14:59:30 WARN RecoverableZooKeeper: Possibly transient ZooKeeper, quorum=localhost:2181, exception=org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /hbase-unsecure/hbaseid
16/08/24 14:59:31 INFO ClientCnxn: Opening socket connection to server localhost.localdomain/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
16/08/24 14:59:31 INFO ClientCnxn: Socket connection established to localhost.localdomain/127.0.0.1:2181, initiating session
16/08/24 14:59:31 WARN ClientCnxn: Session 0x0 for server localhost.localdomain/127.0.0.1:2181, unexpected error, closing socket connection and attempting reconnect
java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
        at sun.nio.ch.IOUtil.read(IOUtil.java:192)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:384)
        at org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:68)
        at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:366)
        at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1125)

在我的理解中,这个错误的存在是因为连接数超过了zookeeper的最大连接数。如果我将maxclientcnxn设置为20,流处理可以持续20秒。我知道我可以设置maxclientcnxn为无限,但我真的不认为这是一个解决这个问题的好方法。
另一件事是,如果我使用textfilestream获取文本文件作为数据流,并使用saveashadoopdataset(jobconf)将它们保存到hbase中,它运行得非常好。如果我只是用 val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers) 只要把信息打印出来,也没有问题。当我收到kafka消息并将它们保存到应用程序的hbase中时,问题就来了。
我的环境是hdp2.4沙盒。版本:1.6,hbase:1.1.2, kafka:2.10.0,Zookeeper:3.4.6。
感谢您的帮助。

gopyfrb3

gopyfrb31#

好吧,我终于成功了。
属性集:
有一个名为“zookeeper.connection.timeout.ms”的属性。此属性应设置为1s。
更改为新api:
变更方法 saveAsHadoopDataset(JobConf)saveAsNewAPIHadoopDataset(JobConf) . 我仍然不知道为什么旧的api不起作用。
改变 import org.apache.hadoop.hbase.mapred.TableOutputFormatimport org.apache.hadoop.hbase.mapreduce.TableOutputFormat

相关问题