如何使用spark和java跨行集成?

aiazj4mn  于 2021-06-02  发布在  Hadoop
关注(0)|答案(20)|浏览(356)

我目前正试图用java编写一个spark作业来计算数据集中一列的积分。
数据如下所示:

DateTime                velocity (in km/h)        vehicle 
    2016-03-28 11:00:45     80                        A
    2016-03-28 11:00:45     75                        A
    2016-03-28 11:00:46     70                        A
    2016-03-28 11:00:47     68                        A
    2016-03-28 11:00:48     72                        A
    2016-03-28 11:00:48     75                        A
    ... 
    2016-03-28 11:00:47     68                        B
    2016-03-28 11:00:48     72                        B
    2016-03-28 11:00:48     75                        B

为了计算每条线路的距离(以公里为单位),我必须定义当前线路和下一条线路之间的时间差,并将其与速度相乘。然后,必须将结果添加到前一行的结果中,以检索此时行驶的“总距离”。
我现在想出了这样的办法。但它会计算出每个Map作业一辆车,可能会有数百万条记录。。。。

final JavaRDD<String[]> input = sc.parallelize(Arrays.asList(
                new String[]{"2016-03-28", "11:00", "80", "VIN1"},
                new String[]{"2016-03-28", "11:00", "60", "VIN1"},
                new String[]{"2016-03-28", "11:00", "50", "VIN1"},
                new String[]{"2016-03-28", "11:01", "80", "VIN1"},
                new String[]{"2016-03-28", "11:05", "80", "VIN1"},
                new String[]{"2016-03-28", "11:09", "80", "VIN1"},
                new String[]{"2016-03-28", "11:00", "80", "VIN2"},
                new String[]{"2016-03-28", "11:01", "80", "VIN2"}
        ));

        // grouping by vehicle and date:
        final JavaPairRDD<String, Iterable<String[]>> byVinAndDate = input.groupBy(new Function<String[], String>() {
            @Override
            public String call(String[] record) throws Exception {
                return record[0] + record[3]; // date, vin
            }
        });

        // mapping each "value" (all record matching key) to result
        final JavaRDD<String[]> result = byVinAndDate.mapValues(new Function<Iterable<String[]>, String[]>() {
            @Override
            public String[] call(Iterable<String[]> records) throws Exception {
                final Iterator<String[]> iterator = records.iterator();

                String[] previousRecord = iterator.next();

                for (String[] record : records) {

                     // Calculate difference current <-> previous record
                     // Add result to new list

                    previousRecord = record;
                }

                return new String[]{
                        previousRecord[0],
                        previousRecord[1],
                        previousRecord[2],
                        previousRecord[3],
                        NewList.get(previousRecord[0]+previousRecord[1]+previousRecord[2]+previousRecord[2])

                };
            }
        }).values();

我完全不知道如何将这个问题转化为map/reduce转换,同时又不失去分布式计算的好处。
我知道这违背了mr和spark的本质,但是任何关于如何链接数据行或以优雅的方式解决这个问题的建议都会非常有用:)
谢谢!

kcrjzv8t

kcrjzv8t2#

+----+ |2016-03-28|VIN1|11:00|0.017592592592592594| 1| |2016-03-28|VIN1|11:01|0.022222222222222223| 2| |2016-03-28|VIN1|11:05|0.022222222222222223| 3| |2016-03-28|VIN1|11:09|0.022222222222222223| 4| |2016-03-28|VIN2|11:00|0.022222222222222223| 1| |2016-03-28|VIN2|11:01|0.022222222222222223| 2| +

dgenwo3n

dgenwo3n3#

+-------+ |2016-03-28|VIN1|11:00|0.017592592592592594| 60| |2016-03-28|VIN1|11:01|0.022222222222222223| 240| |2016-03-28|VIN1|11:05|0.022222222222222223| 240| |2016-03-28|VIN1|11:09|0.022222222222222223| 0| |2016-03-28|VIN2|11:00|0.022222222222222223| 60| |2016-03-28|VIN2|11:01|0.022222222222222223| 0| +

jdzmm42g

jdzmm42g5#

+-------+` 计算整个距离的累积和

val distance = seconds.withColumn("distance", seconds("avg_velocity") * seconds("seconds"))
  distance.show()
  val cumulativeDistance = sum(distance("distance")).over(overDataAndId)

  val all = distance.withColumn("cum_distance", cumulativeDistance)
  all.show()

它将输出累计距离(秒==0的行是每次每个vechile id的总距离)。删除某些列后,将显示: `+

j2qf4p5b

j2qf4p5b7#

+-------+ | data| id| time| avg_velocity|seconds| +

qjp7pelc

qjp7pelc9#

--------+` 我发现它是一个更具可读性的解决方案,可以让spark管理Dataframe上的操作。代码是用scala编写的,但是可以很容易地用java翻译。

zsohkypk

zsohkypk10#

我宁愿把这个问题转化为dataframeapi,使用spark,让spark管理map/reduce(避免迭代器和数组)。实际上,我们要计算每辆车/每段时间的距离。以下是我使用的步骤:
将rdd转换为Dataframe

case class Vechicle(data: String, time: String, velocity: Int, id: String)
    val df = sc.parallelize(List(
        Vechicle("2016-03-28", "11:00", 80, "VIN1"),
        Vechicle("2016-03-28", "11:00", 60, "VIN1"),
        Vechicle("2016-03-28", "11:00", 50, "VIN1"),
        Vechicle("2016-03-28", "11:01", 80, "VIN1"),
        Vechicle("2016-03-28", "11:05", 80, "VIN1"),
        Vechicle("2016-03-28", "11:09", 80, "VIN1"),
        Vechicle("2016-03-28", "11:00", 80, "VIN2"),
        Vechicle("2016-03-28", "11:01", 80, "VIN2")
      )).toDF()

因为有些数据是同时推送的(minutes:seconds)计算平均值(以秒为度量单位)
val速度=df.groupby(df(“data”)、df(“id”)、df(“time”))).agg((avg(“velocity”)/3600).as(“avg\u velocity”))
它将提供以下输出: `+

kse8i1jr

kse8i1jr12#

--------+ | data| id| cum_distance| +

mrphzbgm

mrphzbgm15#

我想说你做得对,你不应该害怕数百万张唱片:
apachespark可以很好地平衡它,一个工作人员可能忙于长时间的任务,而另一个工作人员可能处理短时间的任务,
如果你能解析时间和距离,那么你可能会得到一个双精度甚至整数,循环遍历几百万个双精度,这并不需要太多的担心。
在给定的输入中,不应该有数百万条记录,因为一天只有1440分钟。
虽然您的方法不需要任何额外的内存来计算,但我提出了另一种方法—使用aggregatebykey,首先将所有时间和距离组合成每个键(vin、日期)的数组。对于这个例子我很抱歉,它是Java8。

final JavaRDD<String[]> input = jsc.parallelize(Arrays.asList(
            new String[]{"2016-03-28", "11:00", "80", "VIN1"},
            new String[]{"2016-03-28", "11:00", "60", "VIN1"},
            new String[]{"2016-03-28", "11:00", "50", "VIN1"},
            new String[]{"2016-03-28", "11:01", "80", "VIN1"},
            new String[]{"2016-03-28", "11:05", "80", "VIN1"},
            new String[]{"2016-03-28", "11:09", "80", "VIN1"},
            new String[]{"2016-03-28", "11:00", "80", "VIN2"},
            new String[]{"2016-03-28", "11:01", "80", "VIN2"}
    ));

    input
            .mapToPair(v -> new Tuple2<>(v[0] + v[3], new Tuple2<>(v[1], v[2])))
            .aggregateByKey(
                    new Tuple2<>(new ArrayList<>(N), new ArrayList<>(N)),
                    (Tuple2<ArrayList<String>, ArrayList<String>> t, Tuple2<String, String> v) -> { //function to add new values to the collection
                        t._1().add(v._1());
                        t._2().add(v._2());
                        return t;
                    },
                    (Tuple2<ArrayList<String>, ArrayList<String>> t1, Tuple2<ArrayList<String>, ArrayList<String>> t2) -> { //function to combine collections
                        t1._1().addAll(t2._1());
                        t1._2().addAll(t2._2());
                        return t1;
                    })
            .foreach(v -> { //prints
                System.out.println();
                System.out.print(v);
            });

这个代码给了我以下信息

(2016-03-28VIN2,([11:00, 11:01],[80, 80]))
(2016-03-28VIN1,([11:00, 11:00, 11:00, 11:01, 11:05, 11:09],[80, 60, 50, 80, 80, 80]))

而不是打印在foreach你必须使用 mapValues 同时在两个数组上循环以获得差分和相乘,然后使用 reduceByKey((a, b) -> a + b) 得到总数。
为了节省一些内存并创建更少的arrayList,您可以在aggregatebykey的第一行开始时创建足够大的数组,而不是像1000000这样提供smth。

相关问题