我有一个应用程序,它使用apachespark将键、值数据写入redis。这个应用程序运行起来没有任何问题。但是,应用程序要慢得多。我在这里寻找一些建议来提高写吞吐量,并在将数据写入redis时提高并行性。
这是密码
Dataset<Row> rowkeyMapping = services.select(regexp_replace(col("rowkey"), "_", "").as("rowkey"),struct(regexp_replace(col("name"), "\\[", ","), regexp_replace(col("oname"), "\\[", ","), col("cid")).as("detailsinfo"));
rowkeyMapping.foreach(obj -> {
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(5000);
JedisPool pool = new JedisPool(poolConfig, "redis-host", Integer.parseInt("6379"));
Jedis jedis = pool.getResource();
ObjectMapper om = new ObjectMapper();
String[] rowArray = obj.mkString()
.replaceAll("[\\[]", ",")
.split(",");
String key = rowArray[0];
DetailInfo detail = new DetailInfo();
detail.setName(rowArray[1]);
detail.setOName(rowArray[2]);
detail.setCid(rowArray[3]);
String value = om.writeValueAsString(detail);
logger.info("writing key value pairs to Redis cache (Key) :: " + key);
jedis.set(key, value);
jedis.quit();
});
我对redis的流水线技术知之甚少。但是,我认为流水线更多的是成批处理命令。在这里,就我而言,我处理的是数百万的数据。我不确定流水线是否最合适。
如有任何帮助,我们将不胜感激。
2条答案
按热度按时间1u4esq0p1#
我对spark和redis都不是Maven,但我认为以下几行应该在foreach循环之外:
dz6r00yl2#
正如@amir kost在他的回答中提到的,您的问题是,当您设置单个键值对时,您将创建一个新的连接。为了提高性能,您应该为一批键值对重用连接。
正如您在评论中提到的,您必须在executor中创建连接。所以为了重用连接,您需要使用
foreachPartition
方法Dataset<Row>
,而不是foreach
.foreachPartition
运行给定的ForeachPartitionFunction<T>
整个分区的函数。因此您可以创建一个连接,并对分区中的所有项重复使用它。查看文档了解详细信息。还有,用
foreachPartition
,您可以在分区中获得一批项,然后可以使用redis pipline来获得更好的性能。查看管道文件以了解详细信息