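I am running a word-count experiment with Spark Streaming (2.2.0) and Flume (1.6.0) using the pull-based approach. Everything works on my local virtual machine, so I moved the whole setup to a VPS on Vultr, which I manage through a third-party panel and where I configured the security group.
This is what I did:
Started the Flume agent, which binds the source to port 44444 and the Spark sink to port 41414. Checking with netstat -anp|grep 44444 and netstat -anp|grep 41414 shows both ports in use as expected, so Flume itself is working.
Ran telnet localhost 44444 to simulate input.
Ran my program from IDEA, which is where the problem appears.
Here is the error message: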
20/09/11 13:39:33 ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.io.IOException: Error connecting to hadoop/96.30.196.34:41414
at org.apache.avro.ipc.NettyTransceiver.getChannel(NettyTransceiver.java:261)
at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:203)
at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:138)
at org.apache.spark.streaming.flume.FlumePollingReceiver$$anonfun$onStart$1.apply(FlumePollingInputDStream.scala:83)
at org.apache.spark.streaming.flume.FlumePollingReceiver$$anonfun$onStart$1.apply(FlumePollingInputDStream.scala:82)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.streaming.flume.FlumePollingReceiver.onStart(FlumePollingInputDStream.scala:82)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:131)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:607)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:597)
at org.apache.spark.SparkContext$$anonfun$34.apply(SparkContext.scala:2173)
at org.apache.spark.SparkContext$$anonfun$34.apply(SparkContext.scala:2173)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.ConnectException: Connection refused: hadoop/96.30.196.34:41414
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:714)
at org.jboss.netty.channel.socket.nio.NioClientBoss.connect(NioClientBoss.java:152)
at org.jboss.netty.channel.socket.nio.NioClientBoss.processSelectedKeys(NioClientBoss.java:105)
at org.jboss.netty.channel.socket.nio.NioClientBoss.process(NioClientBoss.java:79)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337)
at org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42)
at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
... 3 more
What could cause a connection refused error when using a VPS?
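The netstat check described above runs on the server itself, so it does not show whether port 41414 is reachable from the machine where the Spark program runs. A minimal sketch of a client-side TCP probe follows; the object name PortProbe, the method canConnect and the 3000 ms default timeout are assumptions made for illustration and are not part of Spark or Flume.

```scala
import java.io.IOException
import java.net.{InetSocketAddress, Socket}

// Hypothetical helper (not part of Spark or Flume): tries to open a plain
// TCP connection, which is the first thing the polling receiver needs.
object PortProbe {
  // Returns true if a TCP connection to host:port succeeds within timeoutMs.
  def canConnect(host: String, port: Int, timeoutMs: Int = 3000): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      // Covers connection refused, timeouts and unknown hosts.
      case _: IOException => false
    } finally {
      socket.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val Array(host, port) = args
    println(s"$host:$port reachable = ${canConnect(host, port.toInt)}")
  }
}
```

Running it with the arguments hadoop 41414 on the machine that runs IDEA exercises the same host and port as the receiver. If it prints false while netstat on the VPS shows the port listening, the problem is likely on the path between the two machines (for example the address the sink is bound to, a firewall, or the security group) rather than in the Spark code.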
Here is my word-count example. The hostname and port are passed as program arguments, set to hadoop 41414, and hadoop is mapped to the VPS's public IP in my hosts file.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Count {
  def main(args: Array[String]): Unit = {
    // Expect exactly two arguments: the Flume sink host and port
    if (args.length != 2) {
      System.err.println("Usage: FlumePullWordCount <hostname> <port>")
      System.exit(1)
    }
    val Array(hostname, port) = args

    // local[2]: one thread for the receiver, one for processing
    val sparkConf = new SparkConf().setAppName("Count").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Get stream from 41414 based on the flume conf
    val flumeStream = FlumeUtils.createPollingStream(ssc, hostname, port.toInt)

    // Decode each event body, split it into words and count them per batch
    flumeStream.map(x => new String(x.event.getBody.array()).trim)
      .flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
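For reference, the per-batch transformation in the program above can be sketched on plain Scala collections, without Spark or Flume. This only illustrates what the map, flatMap and reduceByKey chain computes for one batch of lines; the object name WordCountSketch and the method countWords are hypothetical, and groupBy stands in for reduceByKey.

```scala
// Hypothetical stand-alone illustration of the counting logic only.
object WordCountSketch {
  // Trims each line, splits it on single spaces and counts each word.
  def countWords(lines: Seq[String]): Map[String, Int] =
    lines
      .map(_.trim)
      .flatMap(_.split(" "))
      .groupBy(identity)
      .map { case (word, occurrences) => word -> occurrences.size }

  def main(args: Array[String]): Unit = {
    // Two sample lines, as they might be typed into the telnet session
    println(countWords(Seq("a b a", " b ")))
  }
}
```

Like the streaming version, this splits on a single space, so consecutive spaces inside a line produce empty strings that are counted as words.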