如何使用Pyspark通过SSH隧道读取MySQL数据库?

wh6knrhe  于 5个月前  发布在  Mysql
关注(0)|答案(1)|浏览(46)

我在这个奇怪的Pyspark行为破解我的头,在互联网上找不到任何东西。
我们在私有网络中有一个MySQL集群,我们可以通过SSH隧道访问它。我试图使用Pyspark和SSHTunnelForwarder从这个数据库读取数据,我是这样做的:
1.创建隧道:

server = SSHTunnelForwarder(
        (target_tunnel_ip_address, 22),
        ssh_username=tunnel_username",
        ssh_private_key=private_key_filepath",
        remote_bind_address=(mysql_address, 3306)
        )

server.start()

字符串
1.使用数据库信息创建JDBC URL,如下所示:

hostname = "localhost" #Because I forwarded I forwarded the remote port to my localhost
port = server.local_bind_port #To access which port the SSHTunnelForwarder used
username = my_username
password = my_password
database = my_database
jdbcUrl = "jdbc:mysql://{}:{}/{}?user={}&password={}".format(hostname, port, database, username, password)


1.从数据库中阅读数据:

data = spark.read \
                  .format("jdbc") \
                  .option("url", jdbcUrl) \
                  .option("driver", "com.mysql.cj.jdbc.Driver") \
                  .option("query", query) \
                  .load()


到目前为止一切顺利,这似乎工作,我可以看到表列:[变量数据输出][1] [1]:https://i.stack.imgur.com/YJhCC.png

DataFrame[id: int, company_id: int, life_id: int, type_id: int, cep: string, address: string, number: int, complement: string, neighborhood: string, city: string, state: string, origin: string, created_at: timestamp, updated_at: timestamp, active: boolean]


但是一旦我调用任何实际读取数据的方法,比如.head(),.collect()或其他变体,我就会得到这个错误:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 7629.0 failed 4 times, most recent failure: Lost task 0.3 in stage 7629.0 (TID 11996, XX.XXX.XXX.XXX, executor 0): com.mysql.cj.jdbc.exceptions.CommunicationsException: Communications link failure


有人知道为什么会发生这种情况以及如何解决它吗?

dphi5xsq

dphi5xsq1#

该代码在驱动程序中执行,但任务在执行器上运行,当您引用“localhost”时,每个执行器将自行解释并无法连接。
而是获取驱动程序的主机名(例如socket.gethostname())并在JDBC URL中使用它

相关问题