Spark Streaming connecting to Flume on a VPS: connection refused

ckx4rj1h · posted 2021-05-27 in Spark

I'm running a word-count experiment with Spark Streaming (2.2.0) and Flume (1.6.0) using the pull approach. Everything works fine on my local VM, so I moved the whole setup to a VPS on Vultr, managed through a third-party panel, with the security group configured.
Here's what I did:
Start the Flume agent, which binds the source to port 44444 and the Spark sink to port 41414 (the agent config is sketched after these steps). Checking with netstat -anp | grep for 44444 and 41414 shows both ports in use, so Flume is running correctly.
Run telnet localhost 44444 to feed in test input.
Run my program from IDEA, which is where the problem appears.
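For reference, the pull-mode agent config looks roughly like the sketch below (the agent, source, channel, and sink names are placeholders I've filled in, not copied from my actual file):

agent.sources = netcat-src
agent.channels = mem-ch
agent.sinks = spark-sink

# netcat source on 44444 so telnet can feed in test lines
agent.sources.netcat-src.type = netcat
agent.sources.netcat-src.bind = 0.0.0.0
agent.sources.netcat-src.port = 44444
agent.sources.netcat-src.channels = mem-ch

agent.channels.mem-ch.type = memory

# Spark sink buffers events on 41414 until the polling receiver pulls them
agent.sinks.spark-sink.type = org.apache.spark.streaming.flume.sink.SparkSink
agent.sinks.spark-sink.hostname = 0.0.0.0
agent.sinks.spark-sink.port = 41414
agent.sinks.spark-sink.channel = mem-ch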
Here is the error message:

20/09/11 13:39:33 ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.io.IOException: Error connecting to hadoop/96.30.196.34:41414
    at org.apache.avro.ipc.NettyTransceiver.getChannel(NettyTransceiver.java:261)
    at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:203)
    at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:138)
    at org.apache.spark.streaming.flume.FlumePollingReceiver$$anonfun$onStart$1.apply(FlumePollingInputDStream.scala:83)
    at org.apache.spark.streaming.flume.FlumePollingReceiver$$anonfun$onStart$1.apply(FlumePollingInputDStream.scala:82)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.streaming.flume.FlumePollingReceiver.onStart(FlumePollingInputDStream.scala:82)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:131)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:607)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:597)
    at org.apache.spark.SparkContext$$anonfun$34.apply(SparkContext.scala:2173)
    at org.apache.spark.SparkContext$$anonfun$34.apply(SparkContext.scala:2173)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.ConnectException: Connection refused: hadoop/96.30.196.34:41414
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:714)
    at org.jboss.netty.channel.socket.nio.NioClientBoss.connect(NioClientBoss.java:152)
    at org.jboss.netty.channel.socket.nio.NioClientBoss.processSelectedKeys(NioClientBoss.java:105)
    at org.jboss.netty.channel.socket.nio.NioClientBoss.process(NioClientBoss.java:79)
    at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337)
    at org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42)
    at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    ... 3 more

What could cause the connection-refused error when running on the VPS?
Here is my word-count example. The hostname and port are passed as program arguments as hadoop 41414, and hadoop is mapped to the VPS's public IP in the hosts file (a sketch of that mapping follows the code).

import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Count {

  def main(args: Array[String]): Unit = {

    if (args.length != 2) {
      System.err.println("Usage: FlumePullWordCount <hostname> <port>")
      System.exit(1)
    }

    val Array(hostname, port) = args
    val sparkConf = new SparkConf().setAppName("Count").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Get stream from 41414 based on the flume conf
    val flumeStream = FlumeUtils.createPollingStream(ssc, hostname, port.toInt)

    flumeStream.map(x => new String(x.event.getBody.array()).trim)
      .flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
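For completeness, the hosts mapping and the IDEA run configuration amount to roughly this (a sketch; the IP is the one that appears in the error log, and the actual files are not reproduced here):

# /etc/hosts on the machine running the Spark job
96.30.196.34  hadoop

# Program arguments in the IDEA run configuration
hadoop 41414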
