提高apachespark到redis的写性能

kpbpu008  于 2021-06-30  发布在  Java
关注(0)|答案(2)|浏览(326)

我有一个应用程序,它使用apachespark将键、值数据写入redis。这个应用程序运行起来没有任何问题。但是,应用程序要慢得多。我在这里寻找一些建议来提高写吞吐量,并在将数据写入redis时提高并行性。
这是密码

Dataset<Row> rowkeyMapping = services.select(regexp_replace(col("rowkey"), "_", "").as("rowkey"),struct(regexp_replace(col("name"), "\\[", ","), regexp_replace(col("oname"), "\\[", ","), col("cid")).as("detailsinfo"));

rowkeyMapping.foreach(obj -> {
    JedisPoolConfig poolConfig = new JedisPoolConfig();
    poolConfig.setMaxTotal(5000);
    JedisPool pool = new JedisPool(poolConfig, "redis-host", Integer.parseInt("6379"));
    Jedis jedis = pool.getResource();
    ObjectMapper om = new ObjectMapper();
    String[] rowArray = obj.mkString()
        .replaceAll("[\\[]", ",")
        .split(",");
    String key = rowArray[0];
    DetailInfo detail = new DetailInfo();
    detail.setName(rowArray[1]);
    detail.setOName(rowArray[2]);
    detail.setCid(rowArray[3]);

    String value = om.writeValueAsString(detail);
    logger.info("writing key value pairs to Redis cache (Key) :: " + key);
    jedis.set(key, value);
    jedis.quit();
});

我对redis的流水线技术知之甚少。但是,我认为流水线更多的是成批处理命令。在这里,就我而言,我处理的是数百万的数据。我不确定流水线是否最合适。
如有任何帮助,我们将不胜感激。

1u4esq0p

1u4esq0p1#

我对spark和redis都不是Maven,但我认为以下几行应该在foreach循环之外:

JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(5000);
JedisPool pool = new JedisPool(poolConfig, "redis-host", Integer.parseInt("6379"));
dz6r00yl

dz6r00yl2#

正如@amir kost在他的回答中提到的,您的问题是,当您设置单个键值对时,您将创建一个新的连接。为了提高性能,您应该为一批键值对重用连接。
正如您在评论中提到的,您必须在executor中创建连接。所以为了重用连接,您需要使用 foreachPartition 方法 Dataset<Row> ,而不是 foreach . foreachPartition 运行给定的 ForeachPartitionFunction<T> 整个分区的函数。因此您可以创建一个连接,并对分区中的所有项重复使用它。查看文档了解详细信息。
还有,用 foreachPartition ,您可以在分区中获得一批项,然后可以使用redis pipline来获得更好的性能。查看管道文件以了解详细信息

相关问题