只从cassandra提取spark中所需的列,而不加载所有列

vmpqdwk3  于 2021-05-29  发布在  Spark
关注(0)|答案(2)|浏览(355)

使用spark elasticsearch连接器可以直接从es向spark加载所需的列。然而,似乎不存在这样一个直接的选项来做同样的,使用SparkCassandra连接器
将es中的数据读取到spark中--此处仅将es中所需的列带到spark中:

spark.conf.set('es.nodes', ",".join(ES_CLUSTER))
es_epf_df = spark.read.format("org.elasticsearch.spark.sql") \
        .option("es.read.field.include", "id_,employee_name") \
        .load("employee_0001") \

将数据从cassandra读入spark——这里所有列的数据都被带到spark,然后应用select来拉取感兴趣的列:

spark.conf.set('spark.cassandra.connection.host', ','.join(CASSANDRA_CLUSTER))
cass_epf_df = spark.read.format('org.apache.spark.sql.cassandra') \
        .options(keyspace="db_0001", table="employee") \
        .load() \
        .select("id_", "employee_name")

对Cassandra也可以这样吗?如果是,那么怎么做。如果没有,那为什么不呢。

a0x5cqrl

a0x5cqrl1#

你写的代码已经在这么做了。您在加载后编写了select,您可能认为首先会拉取所有列,然后过滤所选列,但事实并非如此。
假设: select * from db_0001.employee; 实际值: select id_, employee_name from db_0001.employee; spark将理解您需要的列,并只查询cassandra数据库中的列。此功能称为 predicate 下推。这不仅仅局限于cassandra,很多源代码都支持这个特性(这是spark的特性,而不是cassandra)。
更多信息:https://docs.datastax.com/en/dse/6.7/dse-dev/datastax_enterprise/spark/sparkpredicatepushdown.html

acruukt9

acruukt92#

实际上,连接器应该自己做,而不需要显式设置任何东西,它被称为“ predicate 下推”,cassandra连接器就是这样做的,根据文档:
连接器将自动将所有有效 predicate 下推到cassandra。数据源还将自动仅从cassandra中选择完成查询所需的列。这可以通过explain命令进行监视。
资料来源:https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md

相关问题