Java — how to replace groupBy with a more efficient approach

1tu0hz3e  posted 2021-05-29 in Hadoop
Follow (0) | Answers (1) | Views (342)

My task is to analyze Kennedy Space Center logs using Apache Spark. The code works, but I would like to get rid of the groupBy because of its cost.
The code below collects the requests with 5xx error codes and counts the number of failed requests.
My code:

SparkSession session = SparkSession.builder().master("local").appName(application_name).getOrCreate();
JavaSparkContext jsc = new JavaSparkContext(session.sparkContext());
JavaRDD<LogEntry> input = jsc.textFile(hdfs_connect + args[0])
                .map(App::log_entry_extractor)
                .filter(Objects::nonNull);

Dataset<Row> dataSet = session.createDataFrame(input, LogEntry.class);

// task 1
dataSet.filter(col("returnCode").between(500, 599))
                .groupBy("request")
                .count()
                .select("request", "count")
//                .sort(desc("count"))
                .coalesce(1)
                .toJavaRDD()
                .saveAsTextFile(hdfs_connect + output_folder_task_1);

Sample data:

199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245
unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985
199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] "GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0" 200 4085
burger.letters.com - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/countdown/liftoff.html HTTP/1.0" 304 0
199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0" 200 4179
burger.letters.com - - [01/Jul/1995:00:00:12 -0400] "GET /images/NASA-logosmall.gif HTTP/1.0" 304 0
burger.letters.com - - [01/Jul/1995:00:00:12 -0400] "GET /shuttle/countdown/video/livevideo.gif HTTP/1.0" 200 0
205.212.115.106 - - [01/Jul/1995:00:00:12 -0400] "GET /shuttle/countdown/countdown.html HTTP/1.0" 200 3985
d104.aa.net - - [01/Jul/1995:00:00:13 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985
129.94.144.152 - - [01/Jul/1995:00:00:13 -0400] "GET / HTTP/1.0" 200 7074
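For context, the question does not show App::log_entry_extractor; a minimal sketch of a parser for the Common Log Format lines above might look like this (the field layout and class name are assumptions, not the asker's actual code):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch: the question does not include log_entry_extractor,
// so this parser and its field layout are assumptions based on the sample data.
public class LogParser {
    // host ident authuser [timestamp] "method request protocol" status size
    private static final Pattern LOG_LINE = Pattern.compile(
            "^(\\S+) \\S+ \\S+ \\[([^\\]]+)\\] \"(\\S+) (\\S+)[^\"]*\" (\\d{3}) (\\S+)$");

    /** Returns {host, timestamp, method, request, status, size}, or null if malformed. */
    public static String[] parse(String line) {
        Matcher m = LOG_LINE.matcher(line);
        if (!m.matches()) {
            return null; // null entries would be dropped by filter(Objects::nonNull)
        }
        return new String[] {
            m.group(1), m.group(2), m.group(3), m.group(4), m.group(5), m.group(6)
        };
    }
}
```

Malformed lines map to null, which matches the filter(Objects::nonNull) step in the question's pipeline.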
y0u0uwnf (answer 1)

There is nothing wrong with groupBy here: for Dataframe/Dataset aggregations the groupBy behavior is optimized by Spark, and there is really no viable replacement for it. coalesce(1), on the other hand, is an anti-pattern in most cases, and in the worst case it can turn your job into a sequential one. Quoting the Spark documentation on coalesce:

However, if you're doing a drastic coalesce, e.g. to numPartitions = 1, this may result in your computation taking place on fewer nodes than you like (e.g. one node in the case of numPartitions = 1). To avoid this, you can call repartition. This will add a shuffle step, but means the current upstream partitions will be executed in parallel (per whatever the current partitioning is).

Consider replacing it with repartition(1), or removing it altogether.
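To illustrate the suggestion, here is a sketch of the question's pipeline with repartition(1) swapped in for coalesce(1) (not a tested program; it assumes the question's variables and a running Spark session). The aggregation still runs in parallel across the upstream partitions, and the added shuffle only merges the small aggregated result before the write:

```java
// Sketch: same pipeline as in the question, with repartition(1) replacing
// coalesce(1). Upstream stages stay parallel; only the final (small)
// aggregated result is shuffled into a single partition for output.
dataSet.filter(col("returnCode").between(500, 599))
        .groupBy("request")
        .count()
        .select("request", "count")
        .repartition(1)          // adds a shuffle, but keeps upstream parallel
        .toJavaRDD()
        .saveAsTextFile(hdfs_connect + output_folder_task_1);
```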
