spark+cassandra性能相关问题(java代码)

jecbmhm3  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(307)

我使用cassandra作为我的垃圾场,在这里我有多个作业运行来处理数据和更新不同的系统。以下是与作业相关的筛选器
工作1。基于活动标志的数据过滤,更新日期时间和到期时间,并对过滤后的数据进行处理。
工作2。基于更新\日期\时间的数据过滤器处理数据
工作3。基于创建日期时间和活动标志的数据过滤器
运行where条件的db列是(一个查询中的一个或多个列)
活动->是/否
创建日期->时间戳
到期时间->时间戳
更新日期->时间戳
我对这些条件的问题是:-
我应该如何形成我的Cassandra主键?因为我看不到任何方法来实现这个唯一性(id是存在的,但我不需要它来处理数据)。
如果我使用表扫描对Spark代码进行过滤,我甚至需要主键吗?
考虑到数百万条记录的处理。

oaxa6hgo

oaxa6hgo1#

回答您的问题-您需要一个主键,即使它只包含分区键:-)
更详细的答案实际上取决于这些作业运行的频率、总体数据量、集群中的节点数、使用的硬件等。通常,我们会尽可能多地向cassandra推送过滤,因此它只返回相关数据,而不是所有数据。最有效的过滤发生在第一个集群列上,例如,如果我只想处理新创建的条目,那么我可以使用具有以下结构的表:

create table test.test (
  pk int,
  tm timestamp,
  c2 int,
  v1 int,
  v2 int,
  primary key(pk, tm, c2));

然后我可以使用以下方法仅获取新创建的条目:

import org.apache.spark.sql.cassandra._
val data = spark.read.cassandraFormat("test", "test").load()
val filtered = data.filter("tm >= cast('2019-03-10T14:41:34.373+0000' as timestamp)")

或者我可以在给定的时间段内获取条目:

val filtered = data.filter("""ts >= cast('2019-03-10T14:41:34.373+0000' as timestamp)
  AND ts <= cast('2019-03-10T19:01:56.316+0000' as timestamp)""")

可以通过执行 explain 并检查 PushedFilters 第节-标记为的条件 * 将在Cassandra那边被处决。。。
但并不总是可以设计表来匹配所有查询,因此需要为执行频率最高的作业设计主键。就你而言, update_date_time 可能是一个很好的候选,但是如果您将其作为集群列,那么在更新它时您需要小心—您需要以批处理的方式执行更改,如下所示:

begin batch
delete from table where pk = ... and update_date_time = old_timestamp;
insert into table (pk, update_date_time, ...) values (..., new_timestamp, ...);
apply batch;

或者类似的。

相关问题