无法通过mqtt代理接收kafka上的数据

nbewdwxp  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(480)

关于Kafka镜头的使用,我有一个特别的问题。传感器端未接收到传感器数据。
也就是说,我尝试使用docker容器创建一个网络连接,其中每个容器表示一个服务。服务如下:
第一个是使用mqtt协议发送数据的传感器(在本例中是连接了传感器的raspberry pi)
mqtt代理—在本例中是mosquitto,用于获取消息并与透镜交互
lenses kafka-用于本例中mosquitto与其他服务(如另一端的influxdb)之间交互的系统
我可以肯定地说,发送的数据是成功接收的,因为我使用apachejmeter检查从mosquitto站点接收到的数据。问题出现在镜头侧,在那里可以识别连接,但没有接收到数据,并显示以下错误:

MqttException (0) - java.net.NoRouteToHostException: Host is unreachable (Host unreachable) 
 at org.eclipse.paho.client.mqttv3.internal.ExceptionHelper.createMqttException(ExceptionHelper.java:38)
 at org.eclipse.paho.client.mqttv3.internal.ClientComms$ConnectBG.run(ClientComms.java:715)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) 
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748) 
Caused by: java.net.NoRouteToHostException: Host is unreachable (Host unreachable)
 at java.net.PlainSocketImpl.socketConnect(Native Method)
 at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
 at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
 at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) 
 at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
 at java.net.Socket.connect(Socket.java:589) 
 at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:84)
 at org.eclipse.paho.client.mqttv3.internal.ClientComms$ConnectBG.run(ClientComms.java:701) 
... 7 more

透镜侧用于读取数据的配置代码是:

connector.class=com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector
connect.mqtt.clean=true
connect.mqtt.timeout=1000
tasks.max=1
connect.mqtt.keep.alive=1000
name=Mosquitto
connect.mqtt.kcql=INSERT INTO kafka_sensor_data SELECT * FROM /sensor_data WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter`
connect.mqtt.service.quality=1
connect.mqtt.client.id=dm_source_id
connect.mqtt.converter.throw.on.error=true
connect.mqtt.hosts=tcp://XXX:1883

(仅用于安全目的,已经有一个经过测试的ip地址)。
这种方法实际上一直有效,直到最近,但突然我变成了这个错误。我检查了连接和端口连接,但一切正常。不太确定去哪里查了。

qrjkbowd

qrjkbowd1#

过了一段时间我终于找到了问题所在。
它是通过使用docker容器实现的。虽然该服务设法保持端口和地址对外部网络开放和组织,但内部存在问题,因为lenses无法识别mqtt代理。
在配置了我自己的网络而不是让它在默认设置下创建之后,我设法解决了这个问题。

相关问题